From 4c5f06a8a9d2b27a730f58e4ecfbb410b9f88c5a Mon Sep 17 00:00:00 2001 From: Valere Date: Mon, 3 Nov 2025 13:18:21 +0100 Subject: [PATCH] Refactoring to ease testing of ConnectionManager - Extract a ConnectionFactory - Change Connection manager allPariticipantWithConnection$ for structure that supports members with no participant --- src/state/remoteMembers/ConnectionFactory.ts | 114 +++++++ .../remoteMembers/ConnectionManager.test.ts | 307 ++++++++++++++++++ src/state/remoteMembers/ConnectionManager.ts | 247 ++++++-------- .../remoteMembers/MatrixLivekitMerger.test.ts | 236 +++++++++++++- .../remoteMembers/matrixLivekitMerger.ts | 31 +- 5 files changed, 778 insertions(+), 157 deletions(-) create mode 100644 src/state/remoteMembers/ConnectionFactory.ts create mode 100644 src/state/remoteMembers/ConnectionManager.test.ts diff --git a/src/state/remoteMembers/ConnectionFactory.ts b/src/state/remoteMembers/ConnectionFactory.ts new file mode 100644 index 00000000..a2a02e3e --- /dev/null +++ b/src/state/remoteMembers/ConnectionFactory.ts @@ -0,0 +1,114 @@ +/* +Copyright 2025 Element Creations Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; +import { + type E2EEOptions, + Room as LivekitRoom, + type RoomOptions, +} from "livekit-client"; +import { type Logger } from "matrix-js-sdk/lib/logger"; + +import { type ObservableScope } from "../ObservableScope.ts"; +import { Connection } from "./Connection.ts"; +import type { OpenIDClientParts } from "../../livekit/openIDSFU.ts"; +import type { MediaDevices } from "../MediaDevices.ts"; +import type { Behavior } from "../Behavior.ts"; +import type { ProcessorState } from "../../livekit/TrackProcessorContext.tsx"; +import { defaultLiveKitOptions } from "../../livekit/options.ts"; + +export interface ConnectionFactory { + createConnection( + transport: LivekitTransport, + scope: ObservableScope, + logger: Logger, + ): Connection; +} + +export class ECConnectionFactory implements ConnectionFactory { + private readonly livekitRoomFactory: () => LivekitRoom; + + /** + * Creates a ConnectionFactory for LiveKit connections. + * + * @param client - The OpenID client parts for authentication, needed to get openID and JWT tokens. + * @param devices - Used for video/audio out/in capture options. + * @param processorState$ - Effects like background blur (only for publishing connection?) + * @param e2eeLivekitOptions - The E2EE options to use for the LiveKit Room. + * @param controlledAudioDevices - Option to indicate whether audio output device is controlled externally (native mobile app). + * @param livekitRoomFactory - Optional factory function (for testing) to create LivekitRoom instances. If not provided, a default factory is used. + */ + public constructor( + private client: OpenIDClientParts, + private devices: MediaDevices, + private processorState$: Behavior, + private e2eeLivekitOptions: E2EEOptions | undefined, + private controlledAudioDevices: boolean, + livekitRoomFactory?: () => LivekitRoom, + ) { + const defaultFactory = (): LivekitRoom => + new LivekitRoom( + generateRoomOption( + this.devices, + this.processorState$.value, + this.e2eeLivekitOptions, + this.controlledAudioDevices, + ), + ); + this.livekitRoomFactory = livekitRoomFactory ?? defaultFactory; + } + + public createConnection( + transport: LivekitTransport, + scope: ObservableScope, + logger: Logger, + ): Connection { + return new Connection( + { + transport, + client: this.client, + scope: scope, + livekitRoomFactory: this.livekitRoomFactory, + }, + logger, + ); + } +} + +/** + * Generate the initial LiveKit RoomOptions based on the current media devices and processor state. + */ +function generateRoomOption( + devices: MediaDevices, + processorState: ProcessorState, + e2eeLivekitOptions: E2EEOptions | undefined, + controlledAudioDevices: boolean, +): RoomOptions { + return { + ...defaultLiveKitOptions, + videoCaptureDefaults: { + ...defaultLiveKitOptions.videoCaptureDefaults, + deviceId: devices.videoInput.selected$.value?.id, + processor: processorState.processor, + }, + audioCaptureDefaults: { + ...defaultLiveKitOptions.audioCaptureDefaults, + deviceId: devices.audioInput.selected$.value?.id, + }, + audioOutput: { + // When using controlled audio devices, we don't want to set the + // deviceId here, because it will be set by the native app. + // (also the id does not need to match a browser device id) + deviceId: controlledAudioDevices + ? undefined + : devices.audioOutput.selected$.value?.id, + }, + e2ee: e2eeLivekitOptions, + // TODO test and consider this: + // webAudioMix: true, + }; +} diff --git a/src/state/remoteMembers/ConnectionManager.test.ts b/src/state/remoteMembers/ConnectionManager.test.ts new file mode 100644 index 00000000..a0203840 --- /dev/null +++ b/src/state/remoteMembers/ConnectionManager.test.ts @@ -0,0 +1,307 @@ +/* +Copyright 2025 Element Creations Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { logger } from "matrix-js-sdk/lib/logger"; +import { BehaviorSubject } from "rxjs"; +import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; +import { type Participant as LivekitParticipant } from "livekit-client"; + +import { ObservableScope } from "../ObservableScope.ts"; +import { + ConnectionManager, + type ConnectionManagerData, +} from "./ConnectionManager.ts"; +import { type ConnectionFactory } from "./ConnectionFactory.ts"; +import { type Connection } from "./Connection.ts"; +import { areLivekitTransportsEqual } from "./matrixLivekitMerger.ts"; +import { flushPromises } from "../../utils/test.ts"; + +// Some test constants + +const TRANSPORT_1: LivekitTransport = { + type: "livekit", + livekit_service_url: "https://lk.example.org", + livekit_alias: "!alias:example.org", +}; + +const TRANSPORT_2: LivekitTransport = { + type: "livekit", + livekit_service_url: "https://lk.sample.com", + livekit_alias: "!alias:sample.com", +}; + +const TRANSPORT_3: LivekitTransport = { + type: "livekit", + livekit_service_url: "https://lk-other.sample.com", + livekit_alias: "!alias:sample.com", +}; + +let testScope: ObservableScope; +let fakeConnectionFactory: ConnectionFactory; + +let testTransportStream$: BehaviorSubject; + +// The connection manager under test +let manager: ConnectionManager; + +beforeEach(() => { + testScope = new ObservableScope(); + + fakeConnectionFactory = {} as unknown as ConnectionFactory; + vi.mocked(fakeConnectionFactory).createConnection = vi + .fn() + .mockImplementation( + (transport: LivekitTransport, scope: ObservableScope) => { + const mockConnection = { + transport, + } as unknown as Connection; + vi.mocked(mockConnection).start = vi.fn(); + vi.mocked(mockConnection).stop = vi.fn(); + // Tie the connection's lifecycle to the scope to test scope lifecycle management + scope.onEnd(() => { + void mockConnection.stop(); + }); + return mockConnection; + }, + ); + + testTransportStream$ = new BehaviorSubject([]); + + manager = new ConnectionManager(testScope, fakeConnectionFactory, logger); + manager.registerTransports(testTransportStream$); +}); + +afterEach(() => { + testScope.end(); +}); + +describe("connections$ stream", () => { + test("Should create and start new connections for each transports", async () => { + const managedConnections = Promise.withResolvers(); + manager.connections$.subscribe((connections) => { + if (connections.length > 0) managedConnections.resolve(connections); + }); + + testTransportStream$.next([TRANSPORT_1, TRANSPORT_2]); + + const connections = await managedConnections.promise; + + expect(connections.length).toBe(2); + + expect( + vi.mocked(fakeConnectionFactory).createConnection, + ).toHaveBeenCalledTimes(2); + + const conn1 = connections.find((c) => + areLivekitTransportsEqual(c.transport, TRANSPORT_1), + ); + expect(conn1).toBeDefined(); + expect(conn1!.start).toHaveBeenCalled(); + + const conn2 = connections.find((c) => + areLivekitTransportsEqual(c.transport, TRANSPORT_2), + ); + expect(conn2).toBeDefined(); + expect(conn2!.start).toHaveBeenCalled(); + }); + + test("Should start connection only once", async () => { + const observedConnections: Connection[][] = []; + manager.connections$.subscribe((connections) => { + observedConnections.push(connections); + }); + + testTransportStream$.next([TRANSPORT_1]); + testTransportStream$.next([TRANSPORT_1]); + testTransportStream$.next([TRANSPORT_1]); + testTransportStream$.next([TRANSPORT_1]); + testTransportStream$.next([TRANSPORT_1]); + testTransportStream$.next([TRANSPORT_1, TRANSPORT_2]); + + await flushPromises(); + const connections = observedConnections.pop()!; + + expect(connections.length).toBe(2); + expect( + vi.mocked(fakeConnectionFactory).createConnection, + ).toHaveBeenCalledTimes(2); + + const conn2 = connections.find((c) => + areLivekitTransportsEqual(c.transport, TRANSPORT_2), + ); + expect(conn2).toBeDefined(); + + const conn1 = connections.find((c) => + areLivekitTransportsEqual(c.transport, TRANSPORT_1), + ); + expect(conn1).toBeDefined(); + expect(conn1!.start).toHaveBeenCalledOnce(); + }); + + test("Should cleanup connections when not needed anymore", async () => { + const observedConnections: Connection[][] = []; + manager.connections$.subscribe((connections) => { + observedConnections.push(connections); + }); + + testTransportStream$.next([TRANSPORT_1]); + testTransportStream$.next([TRANSPORT_1, TRANSPORT_2]); + + await flushPromises(); + + const conn2 = observedConnections + .pop()! + .find((c) => areLivekitTransportsEqual(c.transport, TRANSPORT_2))!; + + testTransportStream$.next([TRANSPORT_1]); + + await flushPromises(); + + // The second connection should have been stopped has it is no longer needed + expect(conn2.stop).toHaveBeenCalled(); + + // The first connection should still be active + const conn1 = observedConnections.pop()![0]; + expect(conn1.stop).not.toHaveBeenCalledOnce(); + }); +}); + +describe("connectionManagerData$ stream", () => { + // Used in test to control fake connections' participantsWithTrack$ streams + let fakePublishingParticipantsStreams: Map< + string, + BehaviorSubject + >; + + function keyForTransport(transport: LivekitTransport): string { + return `${transport.livekit_service_url}|${transport.livekit_alias}`; + } + + beforeEach(() => { + fakePublishingParticipantsStreams = new Map(); + // need a more advanced fake connection factory + vi.mocked(fakeConnectionFactory).createConnection = vi + .fn() + .mockImplementation( + (transport: LivekitTransport, scope: ObservableScope) => { + const fakePublishingParticipants$ = new BehaviorSubject< + LivekitParticipant[] + >([]); + const mockConnection = { + transport, + participantsWithTrack$: fakePublishingParticipants$, + } as unknown as Connection; + vi.mocked(mockConnection).start = vi.fn(); + vi.mocked(mockConnection).stop = vi.fn(); + // Tie the connection's lifecycle to the scope to test scope lifecycle management + scope.onEnd(() => { + void mockConnection.stop(); + }); + + fakePublishingParticipantsStreams.set( + keyForTransport(transport), + fakePublishingParticipants$, + ); + return mockConnection; + }, + ); + }); + + test("Should report connections with the publishing participants", async () => { + const managerDataUpdates: ConnectionManagerData[] = []; + manager.connectionManagerData$.subscribe((data) => { + managerDataUpdates.push(data); + }); + + testTransportStream$.next([TRANSPORT_1, TRANSPORT_2]); + await flushPromises(); + + const conn1Participants$ = fakePublishingParticipantsStreams.get( + keyForTransport(TRANSPORT_1), + )!; + + conn1Participants$.next([{ identity: "user1A" } as LivekitParticipant]); + + const conn2Participants$ = fakePublishingParticipantsStreams.get( + keyForTransport(TRANSPORT_2), + )!; + conn2Participants$.next([{ identity: "user2A" } as LivekitParticipant]); + + conn1Participants$.next([ + { identity: "user1A" } as LivekitParticipant, + { identity: "user1B" } as LivekitParticipant, + ]); + + testTransportStream$.next([TRANSPORT_1, TRANSPORT_2, TRANSPORT_3]); + + expect(managerDataUpdates[0].getConnections().length).toEqual(0); + + { + const data = managerDataUpdates[1]; + expect(data.getConnections().length).toEqual(2); + expect(data.getParticipantForTransport(TRANSPORT_1).length).toEqual(0); + expect(data.getParticipantForTransport(TRANSPORT_1).length).toEqual(0); + } + + { + const data = managerDataUpdates[2]; + expect(data.getConnections().length).toEqual(2); + expect(data.getParticipantForTransport(TRANSPORT_1).length).toEqual(1); + expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toEqual( + "user1A", + ); + expect(data.getParticipantForTransport(TRANSPORT_2).length).toEqual(0); + } + { + const data = managerDataUpdates[3]; + expect(data.getConnections().length).toEqual(2); + expect(data.getParticipantForTransport(TRANSPORT_1).length).toEqual(1); + expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toEqual( + "user1A", + ); + expect(data.getParticipantForTransport(TRANSPORT_2).length).toEqual(1); + expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toEqual( + "user2A", + ); + } + + { + const data = managerDataUpdates[4]; + expect(data.getConnections().length).toEqual(2); + expect(data.getParticipantForTransport(TRANSPORT_1).length).toEqual(2); + expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toEqual( + "user1A", + ); + expect(data.getParticipantForTransport(TRANSPORT_1)[1].identity).toEqual( + "user1B", + ); + expect(data.getParticipantForTransport(TRANSPORT_2).length).toEqual(1); + expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toEqual( + "user2A", + ); + } + + { + const data = managerDataUpdates[5]; + expect(data.getConnections().length).toEqual(3); + expect(data.getParticipantForTransport(TRANSPORT_1).length).toEqual(2); + expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toEqual( + "user1A", + ); + expect(data.getParticipantForTransport(TRANSPORT_1)[1].identity).toEqual( + "user1B", + ); + expect(data.getParticipantForTransport(TRANSPORT_2).length).toEqual(1); + expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toEqual( + "user2A", + ); + + expect(data.getParticipantForTransport(TRANSPORT_3).length).toEqual(0); + } + }); +}); diff --git a/src/state/remoteMembers/ConnectionManager.ts b/src/state/remoteMembers/ConnectionManager.ts index 845c2af0..1f4b3a90 100644 --- a/src/state/remoteMembers/ConnectionManager.ts +++ b/src/state/remoteMembers/ConnectionManager.ts @@ -2,6 +2,7 @@ // - make ConnectionManager its own actual class /* +Copyright 2025 Element Creations Ltd. Copyright 2025 New Vector Ltd. SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial @@ -14,52 +15,84 @@ import { } from "matrix-js-sdk/lib/matrixrtc"; import { BehaviorSubject, combineLatest, map, switchMap } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; -import { - type E2EEOptions, - Room as LivekitRoom, - type Participant as LivekitParticipant, - type RoomOptions, -} from "livekit-client"; -import { type MatrixClient } from "matrix-js-sdk"; +import { type Participant as LivekitParticipant } from "livekit-client"; import { type Behavior } from "../Behavior"; -import { Connection } from "./Connection"; +import { type Connection } from "./Connection"; import { type ObservableScope } from "../ObservableScope"; import { generateKeyed$ } from "../../utils/observable"; import { areLivekitTransportsEqual } from "./matrixLivekitMerger"; -import { getUrlParams } from "../../UrlParams"; -import { type ProcessorState } from "../../livekit/TrackProcessorContext"; -import { type MediaDevices } from "../MediaDevices"; -import { defaultLiveKitOptions } from "../../livekit/options"; +import { type ConnectionFactory } from "./ConnectionFactory.ts"; + +export class ConnectionManagerData { + private readonly store: Map = + new Map(); + + public constructor() {} + + public add(connection: Connection, participants: LivekitParticipant[]): void { + const key = this.getKey(connection.transport); + const existing = this.store.get(key); + if (!existing) { + this.store.set(key, [connection, participants]); + } else { + existing[1].push(...participants); + } + } + + private getKey(transport: LivekitTransport): string { + return transport.livekit_service_url + "|" + transport.livekit_alias; + } + + public getConnections(): Connection[] { + return Array.from(this.store.values()).map(([connection]) => connection); + } + + public getConnectionForTransport( + transport: LivekitTransport, + ): Connection | undefined { + return this.store.get(this.getKey(transport))?.[0]; + } + + public getParticipantForTransport( + transport: LivekitTransport, + ): LivekitParticipant[] { + const key = transport.livekit_service_url + "|" + transport.livekit_alias; + const existing = this.store.get(key); + if (existing) { + return existing[1]; + } + return []; + } + /** + * Get all connections where the given participant is publishing. + * In theory, there could be several connections where the same participant is publishing but with + * only well behaving clients a participant should only be publishing on a single connection. + * @param participantId + */ + public getConnectionsForParticipant( + participantId: ParticipantId, + ): Connection[] { + const connections: Connection[] = []; + for (const [connection, participants] of this.store.values()) { + if (participants.some((p) => p.identity === participantId)) { + connections.push(connection); + } + } + return connections; + } +} -export type ParticipantByMemberIdMap = Map< - ParticipantId, - // It can be an array because a bad behaving client could be publishingParticipants$ - // multiple times to several livekit rooms. - { participant: LivekitParticipant; connection: Connection }[] ->; // TODO - write test for scopes (do we really need to bind scope) export class ConnectionManager { - private livekitRoomFactory: () => LivekitRoom; + private readonly logger: Logger; + public constructor( - private scope: ObservableScope, - private client: MatrixClient, - private devices: MediaDevices, - private processorState$: Behavior, - private e2eeLivekitOptions: E2EEOptions | undefined, - private logger?: Logger, - livekitRoomFactory?: () => LivekitRoom, + private readonly scope: ObservableScope, + private readonly connectionFactory: ConnectionFactory, + logger: Logger, ) { - this.scope = scope; - const defaultFactory = (): LivekitRoom => - new LivekitRoom( - generateRoomOption( - this.devices, - this.processorState$.value, - this.e2eeLivekitOptions, - ), - ); - this.livekitRoomFactory = livekitRoomFactory ?? defaultFactory; + this.logger = logger.getChild("ConnectionManager"); } /** @@ -94,6 +127,7 @@ export class ConnectionManager { ), ), ), + [], ); /** @@ -108,26 +142,23 @@ export class ConnectionManager { transport: LivekitTransport, ): ((scope: ObservableScope) => Connection) => (scope) => { - const connection = new Connection( - { - transport, - client: this.client, - scope: scope, - livekitRoomFactory: this.livekitRoomFactory, - }, + const connection = this.connectionFactory.createConnection( + transport, + scope, this.logger, ); + // Start the connection immediately + // Use connection state to track connection progress void connection.start(); + // TODO subscribe to connection state to retry or log issues? return connection; }; - const connections = transports.map((transport) => { + return transports.map((transport) => { const key = transport.livekit_service_url + "|" + transport.livekit_alias; return createOrGet(key, createConnection(transport)); }); - - return connections; }, ), ); @@ -186,67 +217,39 @@ export class ConnectionManager { this.transportsSubscriptions$.next([]); } - // We have a lost of connections, for each of these these - // connection we create a stream of (participant, connection) tuples. - // Then we combine the several streams (1 per Connection) into a single stream of tuples. - private allParticipantsWithConnection$ = this.scope.behavior( - this.connections$.pipe( - switchMap((connections) => { - const listsOfParticipantWithConnection = connections.map( - (connection) => { - return connection.participantsWithTrack$.pipe( - map((participants) => - participants.map((p) => ({ - participant: p, + public connectionManagerData$: Behavior = + this.scope.behavior( + this.connections$.pipe( + switchMap((connections) => { + // Map the connections to list of (connection, participant[])[] tuples + const listOfConnectionsWithPublishingParticipants = connections.map( + (connection) => { + return connection.participantsWithTrack$.pipe( + map((participants): [Connection, LivekitParticipant[]] => [ connection, - })), - ), - ); - }, - ); - return combineLatest(listsOfParticipantWithConnection).pipe( - map((lists) => lists.flatMap((list) => list)), - ); - }), - ), - ); - - /** - * This field makes the connection manager to behave as close to a single SFU as possible. - * Each participant that is found on all connections managed by the manager will be listed. - * - * They are stored an a map keyed by `participant.identity` - * TODO (which is equivalent to the `member.id` field in the `m.rtc.member` event) right now its userId:deviceId - */ - public allParticipantsByMemberId$ = this.scope.behavior( - this.allParticipantsWithConnection$.pipe( - map((participantsWithConnections) => { - const participantsByMemberId = participantsWithConnections.reduce( - (acc, test) => { - const { participant, connection } = test; - if (participant.getTrackPublications().length > 0) { - const currentVal = acc.get(participant.identity); - if (!currentVal) { - acc.set(participant.identity, [{ connection, participant }]); - } else { - // already known - // This is for users publishing on several SFUs - currentVal.push({ connection, participant }); - this.logger?.info( - `Participant ${participant.identity} is publishing on several SFUs ${currentVal.map((v) => v.connection.transport.livekit_service_url).join(", ")}`, - ); - } - } - return acc; - }, - new Map() as ParticipantByMemberIdMap, - ); - - return participantsByMemberId; - }), - ), - ); + participants, + ]), + ); + }, + ); + // combineLatest the several streams into a single stream with the ConnectionManagerData + return combineLatest( + listOfConnectionsWithPublishingParticipants, + ).pipe( + map((lists) => + lists.reduce((data, [connection, participants]) => { + data.add(connection, participants); + return data; + }, new ConnectionManagerData()), + ), + ); + }), + ), + // start empty + new ConnectionManagerData(), + ); } + function removeDuplicateTransports( transports: LivekitTransport[], ): LivekitTransport[] { @@ -256,37 +259,3 @@ function removeDuplicateTransports( return acc; }, [] as LivekitTransport[]); } - -/** - * Generate the initial LiveKit RoomOptions based on the current media devices and processor state. - */ -function generateRoomOption( - devices: MediaDevices, - processorState: ProcessorState, - e2eeLivekitOptions: E2EEOptions | undefined, -): RoomOptions { - const { controlledAudioDevices } = getUrlParams(); - return { - ...defaultLiveKitOptions, - videoCaptureDefaults: { - ...defaultLiveKitOptions.videoCaptureDefaults, - deviceId: devices.videoInput.selected$.value?.id, - processor: processorState.processor, - }, - audioCaptureDefaults: { - ...defaultLiveKitOptions.audioCaptureDefaults, - deviceId: devices.audioInput.selected$.value?.id, - }, - audioOutput: { - // When using controlled audio devices, we don't want to set the - // deviceId here, because it will be set by the native app. - // (also the id does not need to match a browser device id) - deviceId: controlledAudioDevices - ? undefined - : devices.audioOutput.selected$.value?.id, - }, - e2ee: e2eeLivekitOptions, - // TODO test and consider this: - // webAudioMix: true, - }; -} diff --git a/src/state/remoteMembers/MatrixLivekitMerger.test.ts b/src/state/remoteMembers/MatrixLivekitMerger.test.ts index df7aca0d..e3f08405 100644 --- a/src/state/remoteMembers/MatrixLivekitMerger.test.ts +++ b/src/state/remoteMembers/MatrixLivekitMerger.test.ts @@ -6,25 +6,253 @@ Please see LICENSE in the repository root for full details. */ import { + describe, test, vi, - onTestFinished, - it, - describe, expect, beforeEach, afterEach, + type MockedObject, } from "vitest"; +import { BehaviorSubject, take } from "rxjs"; +import { + type CallMembership, + type LivekitTransport, +} from "matrix-js-sdk/lib/matrixrtc"; +import { type Room as MatrixRoom } from "matrix-js-sdk"; +import { getParticipantId } from "matrix-js-sdk/lib/matrixrtc/utils"; -import { MatrixLivekitMerger } from "./matrixLivekitMerger"; +import { + type MatrixLivekitItem, + MatrixLivekitMerger, +} from "./matrixLivekitMerger"; import { ObservableScope } from "../ObservableScope"; +import { + type ConnectionManager, + ConnectionManagerData, +} from "./ConnectionManager"; +import { aliceRtcMember } from "../../utils/test-fixtures"; +import { mockRemoteParticipant } from "../../utils/test.ts"; +import { type Connection } from "./Connection.ts"; let testScope: ObservableScope; +let fakeManagerData$: BehaviorSubject; +let fakeMemberships$: BehaviorSubject; +let mockConnectionManager: MockedObject; +let mockMatrixRoom: MatrixRoom; +const userId = "@local:example.com"; +const deviceId = "DEVICE000"; + +// The merger beeing tested +let matrixLivekitMerger: MatrixLivekitMerger; beforeEach(() => { testScope = new ObservableScope(); + fakeMemberships$ = new BehaviorSubject([]); + fakeManagerData$ = new BehaviorSubject( + new ConnectionManagerData(), + ); + mockConnectionManager = vi.mocked({ + registerTransports: vi.fn(), + connectionManagerData$: fakeManagerData$, + } as unknown as ConnectionManager); + mockMatrixRoom = vi.mocked({ + getMember: vi.fn().mockReturnValue(null), + addEventListener: vi.fn(), + removeEventListener: vi.fn(), + } as unknown as MatrixRoom); + + matrixLivekitMerger = new MatrixLivekitMerger( + testScope, + fakeMemberships$, + mockConnectionManager, + mockMatrixRoom, + userId, + deviceId, + ); }); afterEach(() => { testScope.end(); }); + +test("should signal participant not yet connected to livekit", () => { + fakeMemberships$.next([aliceRtcMember]); + + let items: MatrixLivekitItem[] = []; + matrixLivekitMerger.matrixLivekitItems$.pipe(take(1)).subscribe((emitted) => { + items = emitted; + }); + + expect(items).toHaveLength(1); + const item = items[0]; + + // Assert the expected membership + expect(item.membership).toBe(aliceRtcMember); + + // Assert participant & connection are absent (not just `undefined`) + expect(item.participant).not.toBeDefined(); + expect(item.participant).not.toBeDefined(); +}); + +test("should signal participant on a connection that is publishing", () => { + const fakeConnection = { + transport: aliceRtcMember.getTransport(aliceRtcMember) as LivekitTransport, + } as unknown as Connection; + + fakeMemberships$.next([aliceRtcMember]); + const aliceParticipantId = getParticipantId( + aliceRtcMember.userId, + aliceRtcMember.deviceId, + ); + + const managerData: ConnectionManagerData = new ConnectionManagerData(); + managerData.add(fakeConnection, [ + mockRemoteParticipant({ identity: aliceParticipantId }), + ]); + fakeManagerData$.next(managerData); + + let items: MatrixLivekitItem[] = []; + matrixLivekitMerger.matrixLivekitItems$.pipe(take(1)).subscribe((emitted) => { + items = emitted; + }); + expect(items).toHaveLength(1); + const item = items[0]; + + // Assert the expected membership + expect(item.membership).toBe(aliceRtcMember); + expect(item.participant?.identity).toBe(aliceParticipantId); + expect(item.connection?.transport).toEqual(fakeConnection.transport); +}); + +test("should signal participant on a connection that is not publishing", () => { + const fakeConnection = { + transport: aliceRtcMember.getTransport(aliceRtcMember) as LivekitTransport, + } as unknown as Connection; + + fakeMemberships$.next([aliceRtcMember]); + + const managerData: ConnectionManagerData = new ConnectionManagerData(); + managerData.add(fakeConnection, []); + fakeManagerData$.next(managerData); + + matrixLivekitMerger.matrixLivekitItems$.pipe(take(1)).subscribe((items) => { + expect(items).toHaveLength(1); + const item = items[0]; + + // Assert the expected membership + expect(item.membership).toBe(aliceRtcMember); + expect(item.participant).not.toBeDefined(); + // We have the connection + expect(item.connection?.transport).toEqual(fakeConnection.transport); + }); +}); + +describe("Publication edge case", () => { + const connectionA = { + transport: { + type: "livekit", + livekit_service_url: "https://lk.example.org", + livekit_alias: "!alias:example.org", + }, + } as unknown as Connection; + + const connectionB = { + transport: { + type: "livekit", + livekit_service_url: "https://lk.sample.com", + livekit_alias: "!alias:sample.com", + }, + } as unknown as Connection; + + const bobMembership = { + userId: "@bob:example.org", + deviceId: "DEV000", + transports: [connectionA.transport], + } as unknown as CallMembership; + + const bobParticipantId = getParticipantId( + bobMembership.userId, + bobMembership.deviceId, + ); + + test("bob is publishing in several connections", () => { + let lastMatrixLkItems: MatrixLivekitItem[] = []; + matrixLivekitMerger.matrixLivekitItems$.subscribe((items) => { + lastMatrixLkItems = items; + }); + + vi.mocked(bobMembership).getTransport = vi + .fn() + .mockReturnValue(connectionA.transport); + + fakeMemberships$.next([bobMembership]); + + const lkMap = new ConnectionManagerData(); + lkMap.add(connectionA, [ + mockRemoteParticipant({ identity: bobParticipantId }), + ]); + lkMap.add(connectionB, [ + mockRemoteParticipant({ identity: bobParticipantId }), + ]); + + fakeManagerData$.next(lkMap); + + const items = lastMatrixLkItems; + expect(items).toHaveLength(1); + const item = items[0]; + + // Assert the expected membership + expect(item.membership.userId).toEqual(bobMembership.userId); + expect(item.membership.deviceId).toEqual(bobMembership.deviceId); + + expect(item.participant?.identity).toEqual(bobParticipantId); + + // The transport info should come from the membership transports and not only from the publishing connection + expect(item.connection?.transport?.livekit_service_url).toEqual( + bobMembership.transports[0]?.livekit_service_url, + ); + expect(item.connection?.transport?.livekit_alias).toEqual( + bobMembership.transports[0]?.livekit_alias, + ); + }); + + test("bob is publishing in the wrong connection", () => { + let lastMatrixLkItems: MatrixLivekitItem[] = []; + matrixLivekitMerger.matrixLivekitItems$.subscribe((items) => { + lastMatrixLkItems = items; + }); + + vi.mocked(bobMembership).getTransport = vi + .fn() + .mockReturnValue(connectionA.transport); + + fakeMemberships$.next([bobMembership]); + + const lkMap = new ConnectionManagerData(); + lkMap.add(connectionA, []); + lkMap.add(connectionB, [ + mockRemoteParticipant({ identity: bobParticipantId }), + ]); + + fakeManagerData$.next(lkMap); + + const items = lastMatrixLkItems; + expect(items).toHaveLength(1); + const item = items[0]; + + // Assert the expected membership + expect(item.membership.userId).toEqual(bobMembership.userId); + expect(item.membership.deviceId).toEqual(bobMembership.deviceId); + + expect(item.participant).not.toBeDefined(); + + // The transport info should come from the membership transports and not only from the publishing connection + expect(item.connection?.transport?.livekit_service_url).toEqual( + bobMembership.transports[0]?.livekit_service_url, + ); + expect(item.connection?.transport?.livekit_alias).toEqual( + bobMembership.transports[0]?.livekit_alias, + ); + }); +}); diff --git a/src/state/remoteMembers/matrixLivekitMerger.ts b/src/state/remoteMembers/matrixLivekitMerger.ts index bd6ed353..f2f106f2 100644 --- a/src/state/remoteMembers/matrixLivekitMerger.ts +++ b/src/state/remoteMembers/matrixLivekitMerger.ts @@ -13,7 +13,7 @@ import { } from "matrix-js-sdk/lib/matrixrtc"; import { combineLatest, map, startWith, type Observable } from "rxjs"; // eslint-disable-next-line rxjs/no-internal -import { type HasEventTargetAddRemove } from "rxjs/internal/observable/fromEvent"; +import { type NodeStyleEventEmitter } from "rxjs/src/internal/observable/fromEvent.ts"; import type { Room as MatrixRoom, RoomMember } from "matrix-js-sdk"; // import type { Logger } from "matrix-js-sdk/lib/logger"; @@ -71,8 +71,7 @@ export class MatrixLivekitMerger { // apparently needed to get a room member to later get the Avatar // => Extract an AvatarService instead? // Better with just `getMember` - private matrixRoom: Pick & - HasEventTargetAddRemove, + private matrixRoom: Pick & NodeStyleEventEmitter, private userId: string, private deviceId: string, // parentLogger: Logger, @@ -102,28 +101,32 @@ export class MatrixLivekitMerger { return combineLatest([ membershipsWithTransport$, - this.connectionManager.allParticipantsByMemberId$, + this.connectionManager.connectionManagerData$, displaynameMap$, ]).pipe( - map(([memberships, participantsByMemberId, displayNameMap]) => { + map(([memberships, managerData, displayNameMap]) => { const items: MatrixLivekitItem[] = memberships.map( ({ membership, transport }) => { - const participantsWithConnection = participantsByMemberId.get( - // membership.membershipID, Currently its hardcoded by the jwt service to - `${membership.userId}:${membership.deviceId}`, + // TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to + const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`; + + const participants = transport + ? managerData.getParticipantForTransport(transport) + : []; + const participant = participants.find( + (p) => p.identity == participantId, ); - const participant = - transport && - participantsWithConnection?.find((p) => - areLivekitTransportsEqual(p.connection.transport, transport), - ); const member = getRoomMemberFromRtcMember( membership, this.matrixRoom, )?.member; + const connection = transport + ? managerData.getConnectionForTransport(transport) + : undefined; return { - ...participant, + participant, membership, + connection, // This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) member, displayName: displayNameMap.get(membership.membershipID) ?? "---",