diff --git a/sdk/main.ts b/sdk/main.ts index 286c16ea2..b94a7eb51 100644 --- a/sdk/main.ts +++ b/sdk/main.ts @@ -76,9 +76,9 @@ interface MatrixRTCSdk { stop: () => void; data$: Observable<{ rtcBackendIdentity: string; data: string }>; /** - * flattened list of members + * flattened list of remote members */ - members$: Behavior< + remoteMembers$: Behavior< { connection: Connection | null; membership: CallMembership; @@ -86,7 +86,7 @@ interface MatrixRTCSdk { }[] >; /** - * flattened local members + * flattened local member */ localMember$: Behavior<{ connection: Connection | null; @@ -338,8 +338,8 @@ export async function createMatrixRTCSdk( ), ), connected$: callViewModel.connected$, - members$: scope.behavior( - callViewModel.matrixLivekitMembers$.pipe( + remoteMembers$: scope.behavior( + callViewModel.remoteMatrixLivekitMembers$.pipe( switchMap((members) => { const listOfMemberObservables = members.map((member) => combineLatest([ diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index a12b59578..2b901fdc9 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -127,10 +127,9 @@ import { createConnectionManager$, } from "./remoteMembers/ConnectionManager.ts"; import { - createMatrixLivekitMembers$, + createRemoteMatrixLivekitMembers$, type LocalMatrixLivekitMember, type RemoteMatrixLivekitMember, - type MatrixLivekitMember, } from "./remoteMembers/MatrixLivekitMembers.ts"; import { type AutoLeaveReason, @@ -302,7 +301,7 @@ export interface CallViewModel { /** Participants sorted by livekit room so they can be used in the audio rendering */ livekitRoomItems$: Behavior; /** use the layout instead, this is just for the sdk export. */ - matrixLivekitMembers$: Behavior; + remoteMatrixLivekitMembers$: Behavior; localMatrixLivekitMember$: Behavior; /** List of participants raising their hand */ handsRaised$: Behavior>; @@ -529,13 +528,15 @@ export function createCallViewModel$( ownMembershipIdentity, }); - const matrixLivekitMembers$: Behavior> = - createMatrixLivekitMembers$({ - scope: scope, - membershipsWithTransport$: - membershipsAndTransports.membershipsWithTransport$, - connectionManager: connectionManager, - }); + const remoteMatrixLivekitMembers$: Behavior< + Epoch + > = createRemoteMatrixLivekitMembers$({ + scope: scope, + membershipsWithTransport$: + membershipsAndTransports.membershipsWithTransport$, + connectionManager: connectionManager, + localUser: { userId, deviceId }, + }); const connectOptions$ = scope.behavior( matrixRTCMode$.pipe( @@ -612,6 +613,13 @@ export function createCallViewModel$( ), ); + const matrixLivekitMembers$ = scope.behavior( + combineLatest( + [localMatrixLivekitMember$, remoteMatrixLivekitMembers$], + (local, remote) => [...(local === null ? [] : [local]), ...remote.value], + ), + ); + // ------------------------------------------------------------------------ // matrixMemberMetadataStore @@ -641,7 +649,7 @@ export function createCallViewModel$( connectionManager.connectionManagerData$.pipe(map((d) => d.value)), ); const livekitRoomItems$ = scope.behavior( - matrixLivekitMembers$.pipe( + remoteMatrixLivekitMembers$.pipe( switchMap((members) => { const a$ = combineLatest( members.value.map((member) => @@ -707,43 +715,20 @@ export function createCallViewModel$( * List of user media (camera feeds) that we want tiles for. */ const userMedia$ = scope.behavior( - combineLatest([ - localMatrixLivekitMember$, - matrixLivekitMembers$, - duplicateTiles.value$, - ]).pipe( + combineLatest([matrixLivekitMembers$, duplicateTiles.value$]).pipe( // Generate a collection of user media from the list of expected (whether // present or missing) LiveKit participants. generateItems( "CallViewModel userMedia$", - function* ([ - localMatrixLivekitMember, - matrixLivekitMembers, - duplicateTiles, - ]) { - const computeMediaId = (m: MatrixLivekitMember): string => - `${m.userId}:${m.membership$.value.deviceId}`; - - const localUserMediaId = localMatrixLivekitMember - ? computeMediaId(localMatrixLivekitMember) - : undefined; - - const localAsArray = localMatrixLivekitMember - ? [localMatrixLivekitMember] - : []; - const remoteWithoutLocal = matrixLivekitMembers.value.filter( - (m) => computeMediaId(m) !== localUserMediaId, - ); - const allMatrixLivekitMembers = [ - ...localAsArray, - ...remoteWithoutLocal, - ]; - - for (const matrixLivekitMember of allMatrixLivekitMembers) { - const { userId, participant, connection$, membership$ } = - matrixLivekitMember; - const rtcId = membership$.value.rtcBackendIdentity; // rtcBackendIdentity - const mediaId = computeMediaId(matrixLivekitMember); + function* ([members, duplicateTiles]) { + for (const { + userId, + participant, + connection$, + membership$, + } of members) { + const rtcId = membership$.value.rtcBackendIdentity; + const mediaId = `${userId}:${membership$.value.deviceId}`; for (let dup = 0; dup < 1 + duplicateTiles; dup++) { yield { keys: [dup, mediaId, userId, participant, connection$, rtcId], @@ -861,7 +846,7 @@ export function createCallViewModel$( * multiple devices. */ const participantCount$ = scope.behavior( - matrixLivekitMembers$.pipe(map((ms) => ms.value.length)), + matrixLivekitMembers$.pipe(map((ms) => ms.length)), ); const leaveSoundEffect$ = userMedia$.pipe( @@ -1770,8 +1755,8 @@ export function createCallViewModel$( setGridMode: setGridMode, layout$: layout$, localMatrixLivekitMember$, - matrixLivekitMembers$: scope.behavior( - matrixLivekitMembers$.pipe( + remoteMatrixLivekitMembers$: scope.behavior( + remoteMatrixLivekitMembers$.pipe( map((members) => members.value), tap((v) => { const listForLogs = v diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts index 5d34f7be1..244d70ae8 100644 --- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts +++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts @@ -15,7 +15,7 @@ import { BehaviorSubject, combineLatest, map, type Observable } from "rxjs"; import { type IConnectionManager } from "./ConnectionManager.ts"; import { type RemoteMatrixLivekitMember, - createMatrixLivekitMembers$, + createRemoteMatrixLivekitMembers$, } from "./MatrixLivekitMembers.ts"; import { Epoch, @@ -31,6 +31,7 @@ import { } from "../../../utils/test.ts"; import { type Connection } from "./Connection.ts"; import { constant } from "../../Behavior.ts"; +import { localRtcMember } from "../../../utils/test-fixtures.ts"; let testScope: ObservableScope; @@ -88,16 +89,17 @@ test("should signal participant not yet connected to livekit", async () => { mockConnectionManagerData$, ); - const matrixLivekitMember$ = createMatrixLivekitMembers$({ + const remoteMatrixLivekitMembers$ = createRemoteMatrixLivekitMembers$({ scope: testScope, membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), connectionManager: { connectionManagerData$: connectionManagerData$, } as unknown as IConnectionManager, + localUser: localRtcMember, }); await flushPromises(); - expect(matrixLivekitMember$.value.value).toSatisfy( + expect(remoteMatrixLivekitMembers$.value.value).toSatisfy( (data: RemoteMatrixLivekitMember[]) => { expect(data.length).toEqual(1); expect(data[0].membership$.value).toBe(bobMembership); @@ -157,16 +159,17 @@ test("should signal participant on a connection that is publishing", async () => constant(dataWithPublisher), ); - const matrixLivekitMember$ = createMatrixLivekitMembers$({ + const remoteMatrixLivekitMembers$ = createRemoteMatrixLivekitMembers$({ scope: testScope, membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), connectionManager: { connectionManagerData$: connectionManagerData$, } as unknown as IConnectionManager, + localUser: localRtcMember, }); await flushPromises(); - expect(matrixLivekitMember$.value.value).toSatisfy( + expect(remoteMatrixLivekitMembers$.value.value).toSatisfy( (data: RemoteMatrixLivekitMember[]) => { expect(data.length).toEqual(1); expect(data[0].membership$.value).toBe(bobMembership); @@ -197,15 +200,16 @@ test("should signal participant on a connection that is not publishing", async ( constant(dataWithPublisher), ); - const matrixLivekitMember$ = createMatrixLivekitMembers$({ + const remoteMatrixLivekitMembers$ = createRemoteMatrixLivekitMembers$({ scope: testScope, membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), connectionManager: { connectionManagerData$: connectionManagerData$, } as unknown as IConnectionManager, + localUser: localRtcMember, }); await flushPromises(); - expect(matrixLivekitMember$.value.value).toSatisfy( + expect(remoteMatrixLivekitMembers$.value.value).toSatisfy( (data: RemoteMatrixLivekitMember[]) => { expect(data.length).toEqual(1); expect(data[0].membership$.value).toBe(bobMembership); @@ -245,15 +249,16 @@ describe("Publication edge case", () => { constant(connectionWithPublisher), ); - const matrixLivekitMembers$ = createMatrixLivekitMembers$({ + const remoteMatrixLivekitMembers$ = createRemoteMatrixLivekitMembers$({ scope: testScope, membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), connectionManager: { connectionManagerData$: connectionManagerData$, } as unknown as IConnectionManager, + localUser: localRtcMember, }); await flushPromises(); - expect(matrixLivekitMembers$.value.value).toSatisfy( + expect(remoteMatrixLivekitMembers$.value.value).toSatisfy( (data: RemoteMatrixLivekitMember[]) => { expect(data.length).toEqual(2); expect(data[0].membership$.value).toBe(bobMembership); @@ -303,16 +308,17 @@ test("bob is publishing in the wrong connection", async () => { connectionsWithPublisher$, ); - const matrixLivekitMember$ = createMatrixLivekitMembers$({ + const remoteMatrixLivekitMembers$ = createRemoteMatrixLivekitMembers$({ scope: testScope, membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), connectionManager: { connectionManagerData$: connectionManagerData$, } as unknown as IConnectionManager, + localUser: localRtcMember, }); await flushPromises(); - expect(matrixLivekitMember$.value.value).toSatisfy( + expect(remoteMatrixLivekitMembers$.value.value).toSatisfy( (data: RemoteMatrixLivekitMember[]) => { expect(data.length).toEqual(2); expect(data[0].membership$.value).toBe(bobMembership); diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts index acd5b55f1..0b93a274b 100644 --- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts +++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts @@ -62,7 +62,9 @@ interface Props { Epoch<{ membership: CallMembership; transport?: LivekitTransportConfig }[]> >; connectionManager: IConnectionManager; + localUser: { deviceId: string; userId: string }; } + /** * Combines MatrixRTC and Livekit worlds. * @@ -73,13 +75,14 @@ interface Props { * - out (via public Observable): * - `remoteMatrixLivekitMember` an observable of MatrixLivekitMember[] to track the remote members and associated livekit data. */ -export function createMatrixLivekitMembers$({ +export function createRemoteMatrixLivekitMembers$({ scope, membershipsWithTransport$, connectionManager, + localUser, }: Props): Behavior> { /** - * Stream of all the call members and their associated livekit data (if available). + * Behavior of all the remote call members and their associated livekit data (if available). */ return scope.behavior( combineLatest([ @@ -91,12 +94,19 @@ export function createMatrixLivekitMembers$({ ), map(([ms, data]) => new Epoch([ms.value, data.value] as const, ms.epoch)), generateItemsWithEpoch( - "MatrixLivekitMembers", + "RemoteMatrixLivekitMembers", // Generator function. // creates an array of `{key, data}[]` // Each change in the keys (new key) will result in a call to the factory function. function* ([membershipsWithTransport, managerData]) { for (const { membership, transport } of membershipsWithTransport) { + // Exclude the local membership + if ( + membership.userId === localUser.userId && + membership.deviceId === localUser.deviceId + ) + continue; + const participants = transport ? managerData.getParticipantsForTransport(transport) : []; diff --git a/src/state/CallViewModel/remoteMembers/integration.test.ts b/src/state/CallViewModel/remoteMembers/integration.test.ts index eb2c6ac8c..67d15a38f 100644 --- a/src/state/CallViewModel/remoteMembers/integration.test.ts +++ b/src/state/CallViewModel/remoteMembers/integration.test.ts @@ -29,13 +29,13 @@ import { import { type ProcessorState } from "../../../livekit/TrackProcessorContext.tsx"; import { areLivekitTransportsEqual, - createMatrixLivekitMembers$, + createRemoteMatrixLivekitMembers$, type RemoteMatrixLivekitMember, } from "./MatrixLivekitMembers.ts"; import { createConnectionManager$ } from "./ConnectionManager.ts"; import { membershipsAndTransports$ } from "../../SessionBehaviors.ts"; import { constant } from "../../Behavior.ts"; -import { testJWTToken } from "../../../utils/test-fixtures.ts"; +import { localRtcMember, testJWTToken } from "../../../utils/test-fixtures.ts"; // Test the integration of ConnectionManager and MatrixLivekitMerger @@ -130,14 +130,15 @@ test("bob, carl, then bob joining no tracks yet", () => { ownMembershipIdentity: ownMemberMock, }); - const matrixLivekitMembers$ = createMatrixLivekitMembers$({ + const remoteMatrixLivekitMembers$ = createRemoteMatrixLivekitMembers$({ scope: testScope, membershipsWithTransport$: membershipsAndTransports.membershipsWithTransport$, connectionManager, + localUser: localRtcMember, }); - expectObservable(matrixLivekitMembers$).toBe(vMarble, { + expectObservable(remoteMatrixLivekitMembers$).toBe(vMarble, { a: expect.toSatisfy((e: Epoch) => { const items = e.value; expect(items.length).toBe(1);