Clarify which Matrix-LiveKit members are remote

It was rather confusing that matrixLivekitMembers$ gives you objects of type RemoteMatrixLivekitMembers and yet the *local* member would often be among these. I've attempted to clear this up. To my knowledge this wasn't creating any bugs.
This commit is contained in:
Robin
2026-06-18 09:19:24 +02:00
parent 256219c4bf
commit e77d143ce1
5 changed files with 66 additions and 64 deletions

View File

@@ -76,9 +76,9 @@ interface MatrixRTCSdk {
stop: () => void;
data$: Observable<{ rtcBackendIdentity: string; data: string }>;
/**
* flattened list of members
* flattened list of remote members
*/
members$: Behavior<
remoteMembers$: Behavior<
{
connection: Connection | null;
membership: CallMembership;
@@ -86,7 +86,7 @@ interface MatrixRTCSdk {
}[]
>;
/**
* flattened local members
* flattened local member
*/
localMember$: Behavior<{
connection: Connection | null;
@@ -338,8 +338,8 @@ export async function createMatrixRTCSdk(
),
),
connected$: callViewModel.connected$,
members$: scope.behavior(
callViewModel.matrixLivekitMembers$.pipe(
remoteMembers$: scope.behavior(
callViewModel.remoteMatrixLivekitMembers$.pipe(
switchMap((members) => {
const listOfMemberObservables = members.map((member) =>
combineLatest([

View File

@@ -127,10 +127,9 @@ import {
createConnectionManager$,
} from "./remoteMembers/ConnectionManager.ts";
import {
createMatrixLivekitMembers$,
createRemoteMatrixLivekitMembers$,
type LocalMatrixLivekitMember,
type RemoteMatrixLivekitMember,
type MatrixLivekitMember,
} from "./remoteMembers/MatrixLivekitMembers.ts";
import {
type AutoLeaveReason,
@@ -302,7 +301,7 @@ export interface CallViewModel {
/** Participants sorted by livekit room so they can be used in the audio rendering */
livekitRoomItems$: Behavior<LivekitRoomItem[]>;
/** use the layout instead, this is just for the sdk export. */
matrixLivekitMembers$: Behavior<RemoteMatrixLivekitMember[]>;
remoteMatrixLivekitMembers$: Behavior<RemoteMatrixLivekitMember[]>;
localMatrixLivekitMember$: Behavior<LocalMatrixLivekitMember | null>;
/** List of participants raising their hand */
handsRaised$: Behavior<Record<string, RaisedHandInfo>>;
@@ -529,13 +528,15 @@ export function createCallViewModel$(
ownMembershipIdentity,
});
const matrixLivekitMembers$: Behavior<Epoch<RemoteMatrixLivekitMember[]>> =
createMatrixLivekitMembers$({
scope: scope,
membershipsWithTransport$:
membershipsAndTransports.membershipsWithTransport$,
connectionManager: connectionManager,
});
const remoteMatrixLivekitMembers$: Behavior<
Epoch<RemoteMatrixLivekitMember[]>
> = createRemoteMatrixLivekitMembers$({
scope: scope,
membershipsWithTransport$:
membershipsAndTransports.membershipsWithTransport$,
connectionManager: connectionManager,
localUser: { userId, deviceId },
});
const connectOptions$ = scope.behavior(
matrixRTCMode$.pipe(
@@ -612,6 +613,13 @@ export function createCallViewModel$(
),
);
const matrixLivekitMembers$ = scope.behavior(
combineLatest(
[localMatrixLivekitMember$, remoteMatrixLivekitMembers$],
(local, remote) => [...(local === null ? [] : [local]), ...remote.value],
),
);
// ------------------------------------------------------------------------
// matrixMemberMetadataStore
@@ -641,7 +649,7 @@ export function createCallViewModel$(
connectionManager.connectionManagerData$.pipe(map((d) => d.value)),
);
const livekitRoomItems$ = scope.behavior(
matrixLivekitMembers$.pipe(
remoteMatrixLivekitMembers$.pipe(
switchMap((members) => {
const a$ = combineLatest(
members.value.map((member) =>
@@ -707,43 +715,20 @@ export function createCallViewModel$(
* List of user media (camera feeds) that we want tiles for.
*/
const userMedia$ = scope.behavior<WrappedUserMediaViewModel[]>(
combineLatest([
localMatrixLivekitMember$,
matrixLivekitMembers$,
duplicateTiles.value$,
]).pipe(
combineLatest([matrixLivekitMembers$, duplicateTiles.value$]).pipe(
// Generate a collection of user media from the list of expected (whether
// present or missing) LiveKit participants.
generateItems(
"CallViewModel userMedia$",
function* ([
localMatrixLivekitMember,
matrixLivekitMembers,
duplicateTiles,
]) {
const computeMediaId = (m: MatrixLivekitMember): string =>
`${m.userId}:${m.membership$.value.deviceId}`;
const localUserMediaId = localMatrixLivekitMember
? computeMediaId(localMatrixLivekitMember)
: undefined;
const localAsArray = localMatrixLivekitMember
? [localMatrixLivekitMember]
: [];
const remoteWithoutLocal = matrixLivekitMembers.value.filter(
(m) => computeMediaId(m) !== localUserMediaId,
);
const allMatrixLivekitMembers = [
...localAsArray,
...remoteWithoutLocal,
];
for (const matrixLivekitMember of allMatrixLivekitMembers) {
const { userId, participant, connection$, membership$ } =
matrixLivekitMember;
const rtcId = membership$.value.rtcBackendIdentity; // rtcBackendIdentity
const mediaId = computeMediaId(matrixLivekitMember);
function* ([members, duplicateTiles]) {
for (const {
userId,
participant,
connection$,
membership$,
} of members) {
const rtcId = membership$.value.rtcBackendIdentity;
const mediaId = `${userId}:${membership$.value.deviceId}`;
for (let dup = 0; dup < 1 + duplicateTiles; dup++) {
yield {
keys: [dup, mediaId, userId, participant, connection$, rtcId],
@@ -861,7 +846,7 @@ export function createCallViewModel$(
* multiple devices.
*/
const participantCount$ = scope.behavior(
matrixLivekitMembers$.pipe(map((ms) => ms.value.length)),
matrixLivekitMembers$.pipe(map((ms) => ms.length)),
);
const leaveSoundEffect$ = userMedia$.pipe(
@@ -1770,8 +1755,8 @@ export function createCallViewModel$(
setGridMode: setGridMode,
layout$: layout$,
localMatrixLivekitMember$,
matrixLivekitMembers$: scope.behavior(
matrixLivekitMembers$.pipe(
remoteMatrixLivekitMembers$: scope.behavior(
remoteMatrixLivekitMembers$.pipe(
map((members) => members.value),
tap((v) => {
const listForLogs = v

View File

@@ -15,7 +15,7 @@ import { BehaviorSubject, combineLatest, map, type Observable } from "rxjs";
import { type IConnectionManager } from "./ConnectionManager.ts";
import {
type RemoteMatrixLivekitMember,
createMatrixLivekitMembers$,
createRemoteMatrixLivekitMembers$,
} from "./MatrixLivekitMembers.ts";
import {
Epoch,
@@ -31,6 +31,7 @@ import {
} from "../../../utils/test.ts";
import { type Connection } from "./Connection.ts";
import { constant } from "../../Behavior.ts";
import { localRtcMember } from "../../../utils/test-fixtures.ts";
let testScope: ObservableScope;
@@ -88,12 +89,13 @@ test("should signal participant not yet connected to livekit", async () => {
mockConnectionManagerData$,
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
const matrixLivekitMember$ = createRemoteMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
connectionManagerData$: connectionManagerData$,
} as unknown as IConnectionManager,
localUser: localRtcMember,
});
await flushPromises();
@@ -157,12 +159,13 @@ test("should signal participant on a connection that is publishing", async () =>
constant(dataWithPublisher),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
const matrixLivekitMember$ = createRemoteMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
connectionManagerData$: connectionManagerData$,
} as unknown as IConnectionManager,
localUser: localRtcMember,
});
await flushPromises();
@@ -197,12 +200,13 @@ test("should signal participant on a connection that is not publishing", async (
constant(dataWithPublisher),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
const matrixLivekitMember$ = createRemoteMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
connectionManagerData$: connectionManagerData$,
} as unknown as IConnectionManager,
localUser: localRtcMember,
});
await flushPromises();
expect(matrixLivekitMember$.value.value).toSatisfy(
@@ -245,12 +249,13 @@ describe("Publication edge case", () => {
constant(connectionWithPublisher),
);
const matrixLivekitMembers$ = createMatrixLivekitMembers$({
const matrixLivekitMembers$ = createRemoteMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
connectionManagerData$: connectionManagerData$,
} as unknown as IConnectionManager,
localUser: localRtcMember,
});
await flushPromises();
expect(matrixLivekitMembers$.value.value).toSatisfy(
@@ -303,12 +308,13 @@ test("bob is publishing in the wrong connection", async () => {
connectionsWithPublisher$,
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
const matrixLivekitMember$ = createRemoteMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
connectionManagerData$: connectionManagerData$,
} as unknown as IConnectionManager,
localUser: localRtcMember,
});
await flushPromises();

View File

@@ -62,7 +62,9 @@ interface Props {
Epoch<{ membership: CallMembership; transport?: LivekitTransportConfig }[]>
>;
connectionManager: IConnectionManager;
localUser: { deviceId: string; userId: string };
}
/**
* Combines MatrixRTC and Livekit worlds.
*
@@ -73,13 +75,14 @@ interface Props {
* - out (via public Observable):
* - `remoteMatrixLivekitMember` an observable of MatrixLivekitMember[] to track the remote members and associated livekit data.
*/
export function createMatrixLivekitMembers$({
export function createRemoteMatrixLivekitMembers$({
scope,
membershipsWithTransport$,
connectionManager,
localUser,
}: Props): Behavior<Epoch<RemoteMatrixLivekitMember[]>> {
/**
* Stream of all the call members and their associated livekit data (if available).
* Behavior of all the remote call members and their associated livekit data (if available).
*/
return scope.behavior(
combineLatest([
@@ -91,12 +94,19 @@ export function createMatrixLivekitMembers$({
),
map(([ms, data]) => new Epoch([ms.value, data.value] as const, ms.epoch)),
generateItemsWithEpoch(
"MatrixLivekitMembers",
"RemoteMatrixLivekitMembers",
// Generator function.
// creates an array of `{key, data}[]`
// Each change in the keys (new key) will result in a call to the factory function.
function* ([membershipsWithTransport, managerData]) {
for (const { membership, transport } of membershipsWithTransport) {
// Exclude the local membership
if (
membership.userId === localUser.userId &&
membership.deviceId === localUser.deviceId
)
continue;
const participants = transport
? managerData.getParticipantsForTransport(transport)
: [];

View File

@@ -29,13 +29,13 @@ import {
import { type ProcessorState } from "../../../livekit/TrackProcessorContext.tsx";
import {
areLivekitTransportsEqual,
createMatrixLivekitMembers$,
createRemoteMatrixLivekitMembers$,
type RemoteMatrixLivekitMember,
} from "./MatrixLivekitMembers.ts";
import { createConnectionManager$ } from "./ConnectionManager.ts";
import { membershipsAndTransports$ } from "../../SessionBehaviors.ts";
import { constant } from "../../Behavior.ts";
import { testJWTToken } from "../../../utils/test-fixtures.ts";
import { localRtcMember, testJWTToken } from "../../../utils/test-fixtures.ts";
// Test the integration of ConnectionManager and MatrixLivekitMerger
@@ -130,11 +130,12 @@ test("bob, carl, then bob joining no tracks yet", () => {
ownMembershipIdentity: ownMemberMock,
});
const matrixLivekitMembers$ = createMatrixLivekitMembers$({
const matrixLivekitMembers$ = createRemoteMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$:
membershipsAndTransports.membershipsWithTransport$,
connectionManager,
localUser: localRtcMember,
});
expectObservable(matrixLivekitMembers$).toBe(vMarble, {