From 909d980dff83d96bd81db2d692b9c4f9f31745c9 Mon Sep 17 00:00:00 2001 From: Timo K Date: Mon, 15 Dec 2025 18:23:30 +0100 Subject: [PATCH] still with broken tests... --- package.json | 2 +- src/e2ee/matrixKeyProvider.ts | 65 ++- src/livekit/MatrixAudioRenderer.tsx | 3 +- src/room/InCallView.tsx | 1 + src/state/CallViewModel/CallViewModel.test.ts | 3 - src/state/CallViewModel/CallViewModel.ts | 25 +- .../remoteMembers/ConnectionManager.ts | 25 +- .../MatrixLivekitMembers.test.ts | 437 ++++++++---------- .../remoteMembers/MatrixLivekitMembers.ts | 107 ++++- yarn.lock | 42 +- 10 files changed, 353 insertions(+), 357 deletions(-) diff --git a/package.json b/package.json index 21c870ad..1efb504b 100644 --- a/package.json +++ b/package.json @@ -109,7 +109,7 @@ "livekit-client": "^2.13.0", "lodash-es": "^4.17.21", "loglevel": "^1.9.1", - "matrix-js-sdk": "^39.2.0", + "matrix-js-sdk": "github:matrix-org/matrix-js-sdk#head=toger5/use-membershipID-for-session-state-events&commit=f5f1b8efb46b3d55a7eebfabb4a61496640b8b00", "matrix-widget-api": "^1.14.0", "normalize.css": "^8.0.1", "observable-hooks": "^4.2.3", diff --git a/src/e2ee/matrixKeyProvider.ts b/src/e2ee/matrixKeyProvider.ts index 95033f87..1d1c4588 100644 --- a/src/e2ee/matrixKeyProvider.ts +++ b/src/e2ee/matrixKeyProvider.ts @@ -11,6 +11,12 @@ import { type MatrixRTCSession, MatrixRTCSessionEvent, } from "matrix-js-sdk/lib/matrixrtc"; +import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager"; + +import { + computeLivekitParticipantIdentity, + livekitIdentityInput, +} from "../state/CallViewModel/remoteMembers/MatrixLivekitMembers"; export class MatrixKeyProvider extends BaseKeyProvider { private rtcSession?: MatrixRTCSession; @@ -42,31 +48,46 @@ export class MatrixKeyProvider extends BaseKeyProvider { private onEncryptionKeyChanged = ( encryptionKey: Uint8Array, encryptionKeyIndex: number, - participantId: string, + membership: CallMembershipIdentityParts, ): void => { - crypto.subtle - .importKey("raw", encryptionKey, "HKDF", false, [ + const unhashedIdentity = livekitIdentityInput(membership); + + // This is the only way we can get the kind of the membership event we just received the key for. + // best case we want to recompute this once the memberships change (you can receive the key before the participant...) + // + // TODO change this to `?? "rtc"` for newer versions. + const kind = + this.rtcSession?.memberships.find( + (m) => + m.userId === membership.userId && + m.deviceId === membership.deviceId && + m.memberId === membership.memberId, + )?.kind ?? "session"; + + Promise.all([ + crypto.subtle.importKey("raw", encryptionKey, "HKDF", false, [ "deriveBits", "deriveKey", - ]) - .then( - (keyMaterial) => { - this.onSetEncryptionKey( - keyMaterial, - participantId, - encryptionKeyIndex, - ); + ]), + computeLivekitParticipantIdentity(membership, kind), + ]).then( + ([keyMaterial, livekitParticipantId]) => { + this.onSetEncryptionKey( + keyMaterial, + livekitParticipantId, + encryptionKeyIndex, + ); - logger.debug( - `Sent new key to livekit room=${this.rtcSession?.room.roomId} participantId=${participantId} encryptionKeyIndex=${encryptionKeyIndex}`, - ); - }, - (e) => { - logger.error( - `Failed to create key material from buffer for livekit room=${this.rtcSession?.room.roomId} participantId=${participantId} encryptionKeyIndex=${encryptionKeyIndex}`, - e, - ); - }, - ); + logger.debug( + `Sent new key to livekit room=${this.rtcSession?.room.roomId} participantId=${livekitParticipantId} (before hash: ${unhashedIdentity}) encryptionKeyIndex=${encryptionKeyIndex}`, + ); + }, + (e) => { + logger.error( + `Failed to create key material from buffer for livekit room=${this.rtcSession?.room.roomId} participantId before hash=${unhashedIdentity} encryptionKeyIndex=${encryptionKeyIndex}`, + e, + ); + }, + ); }; } diff --git a/src/livekit/MatrixAudioRenderer.tsx b/src/livekit/MatrixAudioRenderer.tsx index 5b1149e9..0fa5d000 100644 --- a/src/livekit/MatrixAudioRenderer.tsx +++ b/src/livekit/MatrixAudioRenderer.tsx @@ -15,7 +15,6 @@ import { type AudioTrackProps, } from "@livekit/components-react"; import { logger } from "matrix-js-sdk/lib/logger"; -import { type ParticipantId } from "matrix-js-sdk/lib/matrixrtc"; import { useEarpieceAudioConfig } from "../MediaDevicesContext"; import { useReactiveState } from "../useReactiveState"; @@ -32,7 +31,7 @@ export interface MatrixAudioRendererProps { * This list needs to be composed based on the matrixRTC members so that we do not play audio from users * that are not expected to be in the rtc session (local user is excluded). */ - validIdentities: ParticipantId[]; + validIdentities: string[]; /** * If set to `true`, mutes all audio tracks rendered by the component. * @remarks diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index 6ae004d8..e9932fdc 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -785,6 +785,7 @@ export const InCallView: FC = ({ onTouchEnd={onControlsTouchEnd} /> )} + {!showControls &&
}
); diff --git a/src/state/CallViewModel/CallViewModel.test.ts b/src/state/CallViewModel/CallViewModel.test.ts index 2e5b5700..be598702 100644 --- a/src/state/CallViewModel/CallViewModel.test.ts +++ b/src/state/CallViewModel/CallViewModel.test.ts @@ -1248,9 +1248,6 @@ describe("CallViewModel", () => { y: () => { rtcSession.membershipStatus = Status.Connected; }, - n: () => { - rtcSession.membershipStatus = Status.Reconnecting; - }, }); schedule(probablyLeftMarbles, { y: () => { diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 5cc33f5d..289b642c 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -591,10 +591,9 @@ export function createCallViewModel$( const audioParticipants$ = scope.behavior( matrixLivekitMembers$.pipe( - switchMap((membersWithEpoch) => { - const members = membersWithEpoch.value; + switchMap((members) => { const a$ = combineLatest( - members.map((member) => + members.value.map((member) => combineLatest([member.connection$, member.participant$]).pipe( map(([connection, participant]) => { // do not render audio for local participant @@ -667,22 +666,22 @@ export function createCallViewModel$( generateItems( function* ([ localMatrixLivekitMember, - { value: matrixLivekitMembers }, + matrixLivekitMembers, duplicateTiles, ]) { - let localParticipantId: string | undefined = undefined; + let localUserMediaId: string | undefined = undefined; // add local member if available if (localMatrixLivekitMember) { const { userId, participant$, connection$, membership$ } = localMatrixLivekitMember; - localParticipantId = `${userId}:${membership$.value.deviceId}`; // should be membership$.value.membershipID which is not optional - // const participantId = membership$.value.membershipID; - if (localParticipantId) { + localUserMediaId = `${userId}:${membership$.value.deviceId}`; // should be membership$.value.membershipID which is not optional + + if (localUserMediaId) { for (let dup = 0; dup < 1 + duplicateTiles; dup++) { yield { keys: [ dup, - localParticipantId, + localUserMediaId, userId, participant$, connection$, @@ -698,13 +697,13 @@ export function createCallViewModel$( participant$, connection$, membership$, - } of matrixLivekitMembers) { - const participantId = `${userId}:${membership$.value.deviceId}`; - if (participantId === localParticipantId) continue; + } of matrixLivekitMembers.value) { + const userMediaId = `${userId}:${membership$.value.deviceId}`; + if (userMediaId === localUserMediaId) continue; // const participantId = membership$.value?.identity; for (let dup = 0; dup < 1 + duplicateTiles; dup++) { yield { - keys: [dup, participantId, userId, participant$, connection$], + keys: [dup, userMediaId, userId, participant$, connection$], data: undefined, }; } diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index 755ba3dd..6660df62 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -6,10 +6,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { - type LivekitTransport, - type ParticipantId, -} from "matrix-js-sdk/lib/matrixrtc"; +import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { combineLatest, map, of, switchMap, tap } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; import { type LocalParticipant, type RemoteParticipant } from "livekit-client"; @@ -62,24 +59,8 @@ export class ConnectionManagerData { const key = transport.livekit_service_url + "|" + transport.livekit_alias; return this.store.get(key)?.[1] ?? []; } - /** - * Get all connections where the given participant is publishing. - * In theory, there could be several connections where the same participant is publishing but with - * only well behaving clients a participant should only be publishing on a single connection. - * @param participantId - */ - public getConnectionsForParticipant( - participantId: ParticipantId, - ): Connection[] { - const connections: Connection[] = []; - for (const [connection, participants] of this.store.values()) { - if (participants.some((p) => p.identity === participantId)) { - connections.push(connection); - } - } - return connections; - } } + interface Props { scope: ObservableScope; connectionFactory: ConnectionFactory; @@ -202,7 +183,7 @@ export function createConnectionManager$({ ); }), ), - new Epoch(new ConnectionManagerData()), + new Epoch(new ConnectionManagerData(), -1), ); return { connectionManagerData$ }; diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts index e675f723..c2e60798 100644 --- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts +++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts @@ -10,8 +10,7 @@ import { type CallMembership, type LivekitTransport, } from "matrix-js-sdk/lib/matrixrtc"; -import { getParticipantId } from "matrix-js-sdk/lib/matrixrtc/utils"; -import { combineLatest, map, type Observable } from "rxjs"; +import { BehaviorSubject, combineLatest, map, type Observable } from "rxjs"; import { type IConnectionManager } from "./ConnectionManager.ts"; import { @@ -26,14 +25,19 @@ import { } from "../../ObservableScope.ts"; import { ConnectionManagerData } from "./ConnectionManager.ts"; import { + flushPromises, mockCallMembership, mockRemoteParticipant, withTestScheduler, } from "../../../utils/test.ts"; import { type Connection } from "./Connection.ts"; +import { constant } from "../../Behavior.ts"; let testScope: ObservableScope; +const fallbackMemberId = (userId: string, deviceId: string): string => + `${userId}:${deviceId}`; + const transportA: LivekitTransport = { type: "livekit", livekit_service_url: "https://lk.example.org", @@ -76,49 +80,41 @@ function epochMeWith$( ); } -test("should signal participant not yet connected to livekit", () => { - withTestScheduler(({ behavior, expectObservable }) => { - const { memberships$, membershipsWithTransport$ } = fromMemberships$( - behavior("a", { - a: [bobMembership], - }), - ); +test("should signal participant not yet connected to livekit", async () => { + const mockedMemberships$ = new BehaviorSubject([bobMembership]); + const mockConnectionManagerData$ = new BehaviorSubject( + new ConnectionManagerData(), + ); + const { memberships$, membershipsWithTransport$ } = + createEpochedMemberships$(mockedMemberships$); - const connectionManagerData$ = epochMeWith$( - memberships$, - behavior("a", { - a: new ConnectionManagerData(), - }), - ); + const connectionManagerData$ = epochMeWith$( + memberships$, + mockConnectionManagerData$, + ); - const matrixLivekitMember$ = createMatrixLivekitMembers$({ - scope: testScope, - membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), - connectionManager: { - connectionManagerData$: connectionManagerData$, - } as unknown as IConnectionManager, - }); - - expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", { - a: expect.toSatisfy((data: MatrixLivekitMember[]) => { - expect(data.length).toEqual(1); - expectObservable(data[0].membership$).toBe("a", { - a: bobMembership, - }); - expectObservable(data[0].participant$).toBe("a", { - a: null, - }); - expectObservable(data[0].connection$).toBe("a", { - a: null, - }); - return true; - }), - }); + const matrixLivekitMember$ = createMatrixLivekitMembers$({ + scope: testScope, + membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), + connectionManager: { + connectionManagerData$: connectionManagerData$, + } as unknown as IConnectionManager, }); + + await flushPromises(); + expect(matrixLivekitMember$.value.value).toSatisfy( + (data: MatrixLivekitMember[]) => { + expect(data.length).toEqual(1); + expect(data[0].membership$.value).toBe(bobMembership); + expect(data[0].participant$.value).toBe(null); + expect(data[0].connection$.value).toBe(null); + return true; + }, + ); }); // Helper to create epoch'ed memberships$ and membershipsWithTransport$ from memberships observable. -function fromMemberships$(m$: Observable): { +function createEpochedMemberships$(m$: Observable): { memberships$: Observable>; membershipsWithTransport$: Observable< Epoch<{ membership: CallMembership; transport?: LivekitTransport }[]> @@ -143,32 +139,115 @@ function fromMemberships$(m$: Observable): { }; } -test("should signal participant on a connection that is publishing", () => { - withTestScheduler(({ behavior, expectObservable }) => { - const bobParticipantId = getParticipantId( +test("should signal participant on a connection that is publishing", async () => { + const bobParticipantId = fallbackMemberId( + bobMembership.userId, + bobMembership.deviceId, + ); + + const { memberships$, membershipsWithTransport$ } = createEpochedMemberships$( + constant([bobMembership]), + ); + + const connection = { + transport: bobMembership.getTransport(bobMembership), + } as unknown as Connection; + const dataWithPublisher = new ConnectionManagerData(); + dataWithPublisher.add(connection, [ + mockRemoteParticipant({ identity: bobParticipantId }), + ]); + + const connectionManagerData$ = epochMeWith$( + memberships$, + constant(dataWithPublisher), + ); + + const matrixLivekitMember$ = createMatrixLivekitMembers$({ + scope: testScope, + membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), + connectionManager: { + connectionManagerData$: connectionManagerData$, + } as unknown as IConnectionManager, + }); + + await flushPromises(); + expect(matrixLivekitMember$.value.value).toSatisfy( + (data: MatrixLivekitMember[]) => { + expect(data.length).toEqual(1); + expect(data[0].membership$.value).toBe(bobMembership); + expect(data[0].participant$.value).toSatisfy((participant) => { + expect(participant).toBeDefined(); + expect(participant!.identity).toEqual(bobParticipantId); + return true; + }); + expect(data[0].connection$.value).toBe(connection); + return true; + }, + ); +}); + +test("should signal participant on a connection that is not publishing", async () => { + const { memberships$, membershipsWithTransport$ } = createEpochedMemberships$( + constant([bobMembership]), + ); + + const connection = { + transport: bobMembership.getTransport(bobMembership), + } as unknown as Connection; + const dataWithPublisher = new ConnectionManagerData(); + dataWithPublisher.add(connection, []); + + const connectionManagerData$ = epochMeWith$( + memberships$, + constant(dataWithPublisher), + ); + + const matrixLivekitMember$ = createMatrixLivekitMembers$({ + scope: testScope, + membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), + connectionManager: { + connectionManagerData$: connectionManagerData$, + } as unknown as IConnectionManager, + }); + await flushPromises(); + expect(matrixLivekitMember$.value.value).toSatisfy( + (data: MatrixLivekitMember[]) => { + expect(data.length).toEqual(1); + expect(data[0].membership$.value).toBe(bobMembership); + expect(data[0].participant$.value).toBe(null); + expect(data[0].connection$.value).toBe(connection); + return true; + }, + ); +}); + +describe("Publication edge case", () => { + test("bob is publishing in several connections", async () => { + const { memberships$, membershipsWithTransport$ } = + createEpochedMemberships$(constant([bobMembership, carlMembership])); + + const connectionWithPublisher = new ConnectionManagerData(); + const bobParticipantId = fallbackMemberId( bobMembership.userId, bobMembership.deviceId, ); - - const { memberships$, membershipsWithTransport$ } = fromMemberships$( - behavior("a", { - a: [bobMembership], - }), - ); - - const connection = { - transport: bobMembership.getTransport(bobMembership), + const connectionA = { + transport: transportA, } as unknown as Connection; - const dataWithPublisher = new ConnectionManagerData(); - dataWithPublisher.add(connection, [ + const connectionB = { + transport: transportB, + } as unknown as Connection; + + connectionWithPublisher.add(connectionA, [ + mockRemoteParticipant({ identity: bobParticipantId }), + ]); + connectionWithPublisher.add(connectionB, [ mockRemoteParticipant({ identity: bobParticipantId }), ]); const connectionManagerData$ = epochMeWith$( memberships$, - behavior("a", { - a: dataWithPublisher, - }), + constant(connectionWithPublisher), ); const matrixLivekitMember$ = createMatrixLivekitMembers$({ @@ -178,207 +257,73 @@ test("should signal participant on a connection that is publishing", () => { connectionManagerData$: connectionManagerData$, } as unknown as IConnectionManager, }); + await flushPromises(); + expect(matrixLivekitMember$.value.value).toSatisfy( + (data: MatrixLivekitMember[]) => { + expect(data.length).toEqual(2); + expect(data[0].membership$.value).toBe(bobMembership); + expect(data[0].connection$.value).toBe(connectionA); + expect(data[0].participant$.value).toSatisfy((participant) => { + expect(participant).toBeDefined(); + expect(participant!.identity).toEqual(bobParticipantId); + return true; + }); - expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", { - a: expect.toSatisfy((data: MatrixLivekitMember[]) => { - expect(data.length).toEqual(1); - expectObservable(data[0].membership$).toBe("a", { - a: bobMembership, - }); - expectObservable(data[0].participant$).toBe("a", { - a: expect.toSatisfy((participant) => { - expect(participant).toBeDefined(); - expect(participant!.identity).toEqual(bobParticipantId); - return true; - }), - }); - expectObservable(data[0].connection$).toBe("a", { - a: connection, - }); return true; - }), - }); - }); -}); - -test("should signal participant on a connection that is not publishing", () => { - withTestScheduler(({ behavior, expectObservable }) => { - const { memberships$, membershipsWithTransport$ } = fromMemberships$( - behavior("a", { - a: [bobMembership], - }), + }, ); - - const connection = { - transport: bobMembership.getTransport(bobMembership), - } as unknown as Connection; - const dataWithPublisher = new ConnectionManagerData(); - dataWithPublisher.add(connection, []); - - const connectionManagerData$ = epochMeWith$( - memberships$, - behavior("a", { - a: dataWithPublisher, - }), - ); - - const matrixLivekitMember$ = createMatrixLivekitMembers$({ - scope: testScope, - membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), - connectionManager: { - connectionManagerData$: connectionManagerData$, - } as unknown as IConnectionManager, - }); - - expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", { - a: expect.toSatisfy((data: MatrixLivekitMember[]) => { - expect(data.length).toEqual(1); - expectObservable(data[0].membership$).toBe("a", { - a: bobMembership, - }); - expectObservable(data[0].participant$).toBe("a", { - a: null, - }); - expectObservable(data[0].connection$).toBe("a", { - a: connection, - }); - return true; - }), - }); }); }); -describe("Publication edge case", () => { - test("bob is publishing in several connections", () => { - withTestScheduler(({ behavior, expectObservable }) => { - const { memberships$, membershipsWithTransport$ } = fromMemberships$( - behavior("a", { - a: [bobMembership, carlMembership], - }), - ); +test("bob is publishing in the wrong connection", async () => { + const mockedMemberships$ = new BehaviorSubject([ + bobMembership, + carlMembership, + ]); - const connectionWithPublisher = new ConnectionManagerData(); - const bobParticipantId = getParticipantId( - bobMembership.userId, - bobMembership.deviceId, - ); - const connectionA = { - transport: transportA, - } as unknown as Connection; - const connectionB = { - transport: transportB, - } as unknown as Connection; + const { memberships$, membershipsWithTransport$ } = + createEpochedMemberships$(mockedMemberships$); - connectionWithPublisher.add(connectionA, [ - mockRemoteParticipant({ identity: bobParticipantId }), - ]); - connectionWithPublisher.add(connectionB, [ - mockRemoteParticipant({ identity: bobParticipantId }), - ]); + const connectionWithPublisher = new ConnectionManagerData(); - const connectionManagerData$ = epochMeWith$( - memberships$, - behavior("a", { - a: connectionWithPublisher, - }), - ); + const bobParticipantId = fallbackMemberId( + bobMembership.userId, + bobMembership.deviceId, + ); + const connectionA = { transport: transportA } as unknown as Connection; + const connectionB = { transport: transportB } as unknown as Connection; - const matrixLivekitMember$ = createMatrixLivekitMembers$({ - scope: testScope, - membershipsWithTransport$: testScope.behavior( - membershipsWithTransport$, - ), - connectionManager: { - connectionManagerData$: connectionManagerData$, - } as unknown as IConnectionManager, - }); + // Bob is not publishing on A + connectionWithPublisher.add(connectionA, []); + // Bob is publishing on B but his membership says A + connectionWithPublisher.add(connectionB, [ + mockRemoteParticipant({ identity: bobParticipantId }), + ]); - expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe( - "a", - { - a: expect.toSatisfy((data: MatrixLivekitMember[]) => { - expect(data.length).toEqual(2); - expectObservable(data[0].membership$).toBe("a", { - a: bobMembership, - }); - expectObservable(data[0].connection$).toBe("a", { - // The real connection should be from transportA as per the membership - a: connectionA, - }); - expectObservable(data[0].participant$).toBe("a", { - a: expect.toSatisfy((participant) => { - expect(participant).toBeDefined(); - expect(participant!.identity).toEqual(bobParticipantId); - return true; - }), - }); - return true; - }), - }, - ); - }); + const connectionsWithPublisher$ = new BehaviorSubject( + connectionWithPublisher, + ); + const connectionManagerData$ = epochMeWith$( + memberships$, + connectionsWithPublisher$, + ); + + const matrixLivekitMember$ = createMatrixLivekitMembers$({ + scope: testScope, + membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), + connectionManager: { + connectionManagerData$: connectionManagerData$, + } as unknown as IConnectionManager, }); - test("bob is publishing in the wrong connection", () => { - withTestScheduler(({ behavior, expectObservable }) => { - const { memberships$, membershipsWithTransport$ } = fromMemberships$( - behavior("a", { - a: [bobMembership, carlMembership], - }), - ); - - const connectionWithPublisher = new ConnectionManagerData(); - const bobParticipantId = getParticipantId( - bobMembership.userId, - bobMembership.deviceId, - ); - const connectionA = { transport: transportA } as unknown as Connection; - const connectionB = { transport: transportB } as unknown as Connection; - - // Bob is not publishing on A - connectionWithPublisher.add(connectionA, []); - // Bob is publishing on B but his membership says A - connectionWithPublisher.add(connectionB, [ - mockRemoteParticipant({ identity: bobParticipantId }), - ]); - - const connectionManagerData$ = epochMeWith$( - memberships$, - behavior("a", { - a: connectionWithPublisher, - }), - ); - - const matrixLivekitMember$ = createMatrixLivekitMembers$({ - scope: testScope, - membershipsWithTransport$: testScope.behavior( - membershipsWithTransport$, - ), - connectionManager: { - connectionManagerData$: connectionManagerData$, - } as unknown as IConnectionManager, - }); - - expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe( - "a", - { - a: expect.toSatisfy((data: MatrixLivekitMember[]) => { - expect(data.length).toEqual(2); - expectObservable(data[0].membership$).toBe("a", { - a: bobMembership, - }); - expectObservable(data[0].connection$).toBe("a", { - // The real connection should be from transportA as per the membership - a: connectionA, - }); - expectObservable(data[0].participant$).toBe("a", { - // No participant as Bob is not publishing on his membership transport - a: null, - }); - return true; - }), - }, - ); - }); - }); + await flushPromises(); + expect(matrixLivekitMember$.value.value).toSatisfy( + (data: MatrixLivekitMember[]) => { + expect(data.length).toEqual(2); + expect(data[0].membership$.value).toBe(bobMembership); + expect(data[0].connection$.value).toBe(connectionA); + expect(data[0].participant$.value).toBe(null); + return true; + }, + ); }); diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts index 79ad933c..9c6a05c9 100644 --- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts +++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts @@ -13,8 +13,11 @@ import { type LivekitTransport, type CallMembership, } from "matrix-js-sdk/lib/matrixrtc"; -import { combineLatest, filter, map } from "rxjs"; +import { combineLatest, filter, map, switchMap } from "rxjs"; import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; +import { sha256 } from "matrix-js-sdk/lib/digest"; +import { encodeUnpaddedBase64Url } from "matrix-js-sdk"; +import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager"; import { type Behavior } from "../../Behavior"; import { type IConnectionManager } from "./ConnectionManager"; @@ -62,64 +65,89 @@ export function createMatrixLivekitMembers$({ membershipsWithTransport$, connectionManager, }: Props): Behavior> { + /** + * This internal observable is used to compute the async sha256 hash of the user's identity. + * a promise is treated like an observable. So we can switchMap on the promise from the identity computation. + * The last update to `membershipsWithTransport$` will always be the last promise we pass to switchMap. + * So we will eventually always end up with the latest memberships and their identities. + */ + const membershipsWithTransportAndLivekitIdentity$ = + membershipsWithTransport$.pipe( + switchMap(async (membershipsWithTransport) => { + const { value, epoch } = membershipsWithTransport; + const membershipsWithTransportAndLkIdentityPromises = value.map( + async (obj) => { + return computeLivekitParticipantIdentity( + obj.membership, + obj.membership.kind, + ); + }, + ); + const identities = await Promise.all( + membershipsWithTransportAndLkIdentityPromises, + ); + const membershipsWithTransportAndLkIdentity = value.map( + ({ transport, membership }, index) => { + return { transport, membership, identity: identities[index] }; + }, + ); + return new Epoch(membershipsWithTransportAndLkIdentity, epoch); + }), + ); + /** * Stream of all the call members and their associated livekit data (if available). */ - return scope.behavior( combineLatest([ - membershipsWithTransport$, + membershipsWithTransportAndLivekitIdentity$, connectionManager.connectionManagerData$, ]).pipe( filter((values) => values.every((value) => value.epoch === values[0].epoch), ), - map( - ([ - { value: membershipsWithTransports, epoch }, - { value: managerData }, - ]) => - new Epoch([membershipsWithTransports, managerData] as const, epoch), - ), + map(([x, y]) => new Epoch([x.value, y.value] as const, x.epoch)), generateItemsWithEpoch( // Generator function. // creates an array of `{key, data}[]` // Each change in the keys (new key, missing key) will result in a call to the factory function. - function* ([membershipsWithTransports, managerData]) { - for (const { membership, transport } of membershipsWithTransports) { - // TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to - const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`; - + function* ([membershipsWithTransportAndLivekitIdentity, managerData]) { + for (const { + membership, + transport, + identity, + } of membershipsWithTransportAndLivekitIdentity) { const participants = transport ? managerData.getParticipantForTransport(transport) : []; const participant = - participants.find((p) => p.identity == participantId) ?? null; + participants.find((p) => p.identity == identity) ?? null; const connection = transport ? managerData.getConnectionForTransport(transport) : null; yield { - keys: [participantId, membership.userId], + keys: [identity, membership.userId, membership.deviceId], data: { membership, participant, connection }, }; } }, // Each update where the key of the generator array do not change will result in updates to the `data$` observable in the factory. - (scope, data$, participantId, userId) => { + (scope, data$, identity, userId, deviceId) => { logger.debug( - `Generating member for participantId: ${participantId}, userId: ${userId}`, + `Generating member for livekitIdentity: ${identity}, userId:deviceId: ${userId}${deviceId}`, ); // will only get called once per `participantId, userId` pair. // updates to data$ and as a result to displayName$ and mxcAvatarUrl$ are more frequent. return { - participantId, + identity, userId, ...scope.splitBehavior(data$), }; }, ), ), + new Epoch([], -1), ); } @@ -136,3 +164,42 @@ export function areLivekitTransportsEqual( if (!t1 && !t2) return true; return false; } + +const livekitParticipantIdentityCache = new Map(); + +/** + * The string that is computed based on the membership and used for the computing the hash. + * `${userId}:${deviceId}:${membershipID}` + * as the direct imput for: await sha256(input) + */ +export const livekitIdentityInput = ({ + userId, + deviceId, + memberId, +}: CallMembershipIdentityParts): string => `${userId}|${deviceId}|${memberId}`; + +export async function computeLivekitParticipantIdentity( + membership: CallMembershipIdentityParts, + kind: "rtc" | "session", +): Promise { + switch (kind) { + case "rtc": { + const input = livekitIdentityInput(membership); + if (livekitParticipantIdentityCache.size > 400) + // prevent memory leaks in a stupid/simple way + livekitParticipantIdentityCache.clear(); + // TODO use non deprecated memberId + if (livekitParticipantIdentityCache.has(input)) + return livekitParticipantIdentityCache.get(input)!; + else { + const hashBuffer = await sha256(input); + const hashedString = encodeUnpaddedBase64Url(hashBuffer); + livekitParticipantIdentityCache.set(input, hashedString); + return hashedString; + } + } + case "session": + default: + return `${membership.userId}:${membership.deviceId}`; + } +} diff --git a/yarn.lock b/yarn.lock index 94b73130..707a6480 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2795,10 +2795,10 @@ __metadata: languageName: node linkType: hard -"@matrix-org/matrix-sdk-crypto-wasm@npm:^15.3.0": - version: 15.3.0 - resolution: "@matrix-org/matrix-sdk-crypto-wasm@npm:15.3.0" - checksum: 10c0/45628f36b7b0e54a8777ae67a7233dbdf3e3cf14e0d95d21f62f89a7ea7e3f907232f1eb7b1262193b1e227759fad47af829dcccc103ded89011f13c66f01d76 +"@matrix-org/matrix-sdk-crypto-wasm@npm:^16.0.0": + version: 16.0.0 + resolution: "@matrix-org/matrix-sdk-crypto-wasm@npm:16.0.0" + checksum: 10c0/13b4ede3e618da819957abff778afefcf3baf9a2faac04a36bb5a07a44fae2ea05fbfa072eb3408d48b2b7b9aaf27242ce52c594c8ce9bf1fb8b3aade2832be1 languageName: node linkType: hard @@ -6571,24 +6571,10 @@ __metadata: languageName: node linkType: hard -"caniuse-lite@npm:^1.0.30001688": - version: 1.0.30001701 - resolution: "caniuse-lite@npm:1.0.30001701" - checksum: 10c0/a814bd4dd8b49645ca51bc6ee42120660a36394bb54eb6084801d3f2bbb9471e5e1a9a8a25f44f83086a032d46e66b33031e2aa345f699b90a7e84a9836b819c - languageName: node - linkType: hard - -"caniuse-lite@npm:^1.0.30001702": - version: 1.0.30001720 - resolution: "caniuse-lite@npm:1.0.30001720" - checksum: 10c0/ba9f963364ec4bfc8359d15d7e2cf365185fa1fddc90b4f534c71befedae9b3dd0cd2583a25ffc168a02d7b61b6c18b59bda0a1828ea2a5250fd3e35c2c049e9 - languageName: node - linkType: hard - -"caniuse-lite@npm:^1.0.30001726": - version: 1.0.30001726 - resolution: "caniuse-lite@npm:1.0.30001726" - checksum: 10c0/2c5f91da7fd9ebf8c6b432818b1498ea28aca8de22b30dafabe2a2a6da1e014f10e67e14f8e68e872a0867b6b4cd6001558dde04e3ab9770c9252ca5c8849d0e +"caniuse-lite@npm:^1.0.30001688, caniuse-lite@npm:^1.0.30001702, caniuse-lite@npm:^1.0.30001726": + version: 1.0.30001760 + resolution: "caniuse-lite@npm:1.0.30001760" + checksum: 10c0/cee26dff5c5b15ba073ab230200e43c0d4e88dc3bac0afe0c9ab963df70aaa876c3e513dde42a027f317136bf6e274818d77b073708b74c5807dfad33c029d3c languageName: node linkType: hard @@ -7547,7 +7533,7 @@ __metadata: livekit-client: "npm:^2.13.0" lodash-es: "npm:^4.17.21" loglevel: "npm:^1.9.1" - matrix-js-sdk: "npm:^39.2.0" + matrix-js-sdk: "github:matrix-org/matrix-js-sdk#head=toger5/use-membershipID-for-session-state-events&commit=f5f1b8efb46b3d55a7eebfabb4a61496640b8b00" matrix-widget-api: "npm:^1.14.0" normalize.css: "npm:^8.0.1" observable-hooks: "npm:^4.2.3" @@ -10352,12 +10338,12 @@ __metadata: languageName: node linkType: hard -"matrix-js-sdk@npm:^39.2.0": - version: 39.2.0 - resolution: "matrix-js-sdk@npm:39.2.0" +"matrix-js-sdk@github:matrix-org/matrix-js-sdk#head=toger5/use-membershipID-for-session-state-events&commit=f5f1b8efb46b3d55a7eebfabb4a61496640b8b00": + version: 39.3.0 + resolution: "matrix-js-sdk@https://github.com/matrix-org/matrix-js-sdk.git#commit=f5f1b8efb46b3d55a7eebfabb4a61496640b8b00" dependencies: "@babel/runtime": "npm:^7.12.5" - "@matrix-org/matrix-sdk-crypto-wasm": "npm:^15.3.0" + "@matrix-org/matrix-sdk-crypto-wasm": "npm:^16.0.0" another-json: "npm:^0.2.0" bs58: "npm:^6.0.0" content-type: "npm:^1.0.4" @@ -10370,7 +10356,7 @@ __metadata: sdp-transform: "npm:^3.0.0" unhomoglyph: "npm:^1.0.6" uuid: "npm:13" - checksum: 10c0/f8b5261de2744305330ba3952821ca9303698170bfd3a0ff8a767b9286d4e8d4ed5aaf6fbaf8a1e8ff9dbd859102a2a47d882787e2da3b3078965bec00157959 + checksum: 10c0/9607b0c063c873a24c1a2d05cc7500d60c32556ec82b666ebaae5c5e829faf5bb7639780efddea7211e6b9873098bd53b97656f041e932e8b0de0c208ccabbff languageName: node linkType: hard