From 870b7066722bf559ed2d6fd5e2bd7bb72606822a Mon Sep 17 00:00:00 2001 From: Valere Date: Tue, 4 Nov 2025 17:13:28 +0100 Subject: [PATCH] Connection & Livekit integ test WIP --- src/state/ownMember/OwnMembership.ts | 15 +- src/state/remoteMembers/Connection.ts | 16 +- .../remoteMembers/ConnectionManager.test.ts | 34 ++-- src/state/remoteMembers/displayname.ts | 28 ++-- src/state/remoteMembers/integration.test.ts | 157 ++++++++++++++++++ .../remoteMembers/matrixLivekitMerger.ts | 1 + 6 files changed, 208 insertions(+), 43 deletions(-) create mode 100644 src/state/remoteMembers/integration.test.ts diff --git a/src/state/ownMember/OwnMembership.ts b/src/state/ownMember/OwnMembership.ts index 3b6d1987..c096c2da 100644 --- a/src/state/ownMember/OwnMembership.ts +++ b/src/state/ownMember/OwnMembership.ts @@ -5,8 +5,7 @@ SPDX-License-IdFentifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { type E2EEOptions } from "livekit-client"; -import { logger } from "matrix-js-sdk/lib/logger"; +import { type E2EEOptions, type Track } from "livekit-client"; import { type LivekitTransport, type MatrixRTCSession, @@ -22,6 +21,7 @@ import { import { BehaviorSubject, combineLatest, + distinctUntilChanged, from, fromEvent, map, @@ -31,19 +31,20 @@ import { startWith, switchMap, } from "rxjs"; +import { deepCompare } from "matrix-js-sdk/lib/utils"; import { multiSfu } from "../../settings/settings"; import { type Behavior } from "../Behavior"; import { type ConnectionManager } from "../remoteMembers/ConnectionManager"; import { makeTransport } from "../../rtcSessionHelpers"; import { type ObservableScope } from "../ObservableScope"; -import { async$, unwrapAsync } from "../Async"; import { Publisher } from "./Publisher"; import { type MuteStates } from "../MuteStates"; import { type ProcessorState } from "../../livekit/TrackProcessorContext"; import { type MediaDevices } from "../../state/MediaDevices"; import { and$ } from "../../utils/observable"; import { areLivekitTransportsEqual } from "../remoteMembers/matrixLivekitMerger"; +import { type ElementCallError } from "../../utils/errors.ts"; /* * - get well known @@ -70,6 +71,10 @@ interface Props { trackerProcessorState$: Behavior; } +export type JoinedState = + | { state: "Initialized" } + | { state: "Error"; error: ElementCallError }; + /** * This class is responsible for managing the own membership in a room. * We want @@ -96,11 +101,11 @@ export const ownMembership$ = ({ trackerProcessorState$, }: Props): { // publisher: Publisher - requestJoin(): Observable; + requestJoin$(): Observable; startTracks(): Track[]; } => { // This should be used in a combineLatest with publisher$ to connect. - const shouldStartTracks$ = BehaviorSubject(false); + const shouldStartTracks$ = new BehaviorSubject(false); // to make it possible to call startTracks before the preferredTransport$ has resolved. const startTracks = () => { diff --git a/src/state/remoteMembers/Connection.ts b/src/state/remoteMembers/Connection.ts index 67b2dc8e..03a9e137 100644 --- a/src/state/remoteMembers/Connection.ts +++ b/src/state/remoteMembers/Connection.ts @@ -211,16 +211,12 @@ export class Connection { this.client = client; this.participantsWithTrack$ = scope.behavior( - connectedParticipantsObserver( - this.livekitRoom, - // VALR: added that while I think about it - { - additionalRoomEvents: [ - RoomEvent.TrackPublished, - RoomEvent.TrackUnpublished, - ], - }, - ), + connectedParticipantsObserver(this.livekitRoom, { + additionalRoomEvents: [ + RoomEvent.TrackPublished, + RoomEvent.TrackUnpublished, + ], + }), [], ); diff --git a/src/state/remoteMembers/ConnectionManager.test.ts b/src/state/remoteMembers/ConnectionManager.test.ts index 79881f66..48c897e3 100644 --- a/src/state/remoteMembers/ConnectionManager.test.ts +++ b/src/state/remoteMembers/ConnectionManager.test.ts @@ -12,9 +12,7 @@ import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { type Participant as LivekitParticipant } from "livekit-client"; import { ObservableScope } from "../ObservableScope.ts"; -import { - ConnectionManager, -} from "./ConnectionManager.ts"; +import { ConnectionManager } from "./ConnectionManager.ts"; import { type ConnectionFactory } from "./ConnectionFactory.ts"; import { type Connection } from "./Connection.ts"; import { areLivekitTransportsEqual } from "./matrixLivekitMerger.ts"; @@ -34,11 +32,11 @@ const TRANSPORT_2: LivekitTransport = { livekit_alias: "!alias:sample.com", }; -const TRANSPORT_3: LivekitTransport = { - type: "livekit", - livekit_service_url: "https://lk-other.sample.com", - livekit_alias: "!alias:sample.com", -}; +// const TRANSPORT_3: LivekitTransport = { +// type: "livekit", +// livekit_service_url: "https://lk-other.sample.com", +// livekit_alias: "!alias:sample.com", +// }; let testScope: ObservableScope; let fakeConnectionFactory: ConnectionFactory; @@ -211,8 +209,8 @@ describe("connectionManagerData$ stream", () => { ); }); - test("Should report connections with the publishing participants", async () => { - withTestScheduler(({ expectObservable, schedule, cold, behavior }) => { + test("Should report connections with the publishing participants", () => { + withTestScheduler(({ expectObservable, schedule, behavior }) => { manager.registerTransports( behavior("a", { a: [TRANSPORT_1, TRANSPORT_2], @@ -257,7 +255,7 @@ describe("connectionManagerData$ stream", () => { ); }), b: expect.toSatisfy((data) => { - return ( + return ( data.getConnections().length == 2 && data.getParticipantForTransport(TRANSPORT_1).length == 1 && data.getParticipantForTransport(TRANSPORT_2).length == 0 && @@ -265,26 +263,28 @@ describe("connectionManagerData$ stream", () => { ); }), c: expect.toSatisfy((data) => { - return ( + return ( data.getConnections().length == 2 && data.getParticipantForTransport(TRANSPORT_1).length == 1 && data.getParticipantForTransport(TRANSPORT_2).length == 1 && - data.getParticipantForTransport(TRANSPORT_1)[0].identity == "user1A"&& + data.getParticipantForTransport(TRANSPORT_1)[0].identity == + "user1A" && data.getParticipantForTransport(TRANSPORT_2)[0].identity == "user2A" ); }), d: expect.toSatisfy((data) => { - return ( + return ( data.getConnections().length == 2 && data.getParticipantForTransport(TRANSPORT_1).length == 2 && data.getParticipantForTransport(TRANSPORT_2).length == 1 && - data.getParticipantForTransport(TRANSPORT_1)[0].identity == "user1A"&& - data.getParticipantForTransport(TRANSPORT_1)[1].identity == "user1B"&& + data.getParticipantForTransport(TRANSPORT_1)[0].identity == + "user1A" && + data.getParticipantForTransport(TRANSPORT_1)[1].identity == + "user1B" && data.getParticipantForTransport(TRANSPORT_2)[0].identity == "user2A" ); }), }); }); }); - }); diff --git a/src/state/remoteMembers/displayname.ts b/src/state/remoteMembers/displayname.ts index 67e11f99..825ad5a1 100644 --- a/src/state/remoteMembers/displayname.ts +++ b/src/state/remoteMembers/displayname.ts @@ -6,7 +6,13 @@ Please see LICENSE in the repository root for full details. */ import { type RoomMember, RoomStateEvent } from "matrix-js-sdk"; -import { combineLatest, fromEvent, type Observable, startWith } from "rxjs"; +import { + combineLatest, + fromEvent, + map, + type Observable, + startWith, +} from "rxjs"; import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc"; import { logger } from "matrix-js-sdk/lib/logger"; import { type Room as MatrixRoom } from "matrix-js-sdk/lib/matrix"; @@ -36,15 +42,14 @@ export const memberDisplaynames$ = ( deviceId: string, ): Behavior> => scope.behavior( - combineLatest( - [ - // Handle call membership changes - memberships$, - // Additionally handle display name changes (implicitly reacting to them) - fromEvent(matrixRoom, RoomStateEvent.Members).pipe(startWith(null)), - // TODO: do we need: pauseWhen(this.pretendToBeDisconnected$), - ], - (memberships, _displaynames) => { + combineLatest([ + // Handle call membership changes + memberships$, + // Additionally handle display name changes (implicitly reacting to them) + fromEvent(matrixRoom, RoomStateEvent.Members).pipe(startWith(null)), + // TODO: do we need: pauseWhen(this.pretendToBeDisconnected$), + ]).pipe( + map((memberships, _displaynames) => { const displaynameMap = new Map([ [ `${userId}:${deviceId}`, @@ -71,8 +76,9 @@ export const memberDisplaynames$ = ( ); } return displaynameMap; - }, + }), ), + new Map(), ); export function getRoomMemberFromRtcMember( diff --git a/src/state/remoteMembers/integration.test.ts b/src/state/remoteMembers/integration.test.ts new file mode 100644 index 00000000..36594680 --- /dev/null +++ b/src/state/remoteMembers/integration.test.ts @@ -0,0 +1,157 @@ +/* +Copyright 2025 Element Creations Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { test, vi, beforeEach, afterEach } from "vitest"; +import { BehaviorSubject, type Observable } from "rxjs"; +import { type Room as LivekitRoom } from "livekit-client"; +import { logger } from "matrix-js-sdk/lib/logger"; +import EventEmitter from "events"; +import fetchMock from "fetch-mock"; + +import { ConnectionManager } from "./ConnectionManager.ts"; +import { ObservableScope } from "../ObservableScope.ts"; +import { ECConnectionFactory } from "./ConnectionFactory.ts"; +import { type OpenIDClientParts } from "../../livekit/openIDSFU.ts"; +import { mockMediaDevices, withTestScheduler } from "../../utils/test"; +import { type ProcessorState } from "../../livekit/TrackProcessorContext.tsx"; +import { MatrixLivekitMerger } from "./matrixLivekitMerger.ts"; +import type { CallMembership, Transport } from "matrix-js-sdk/lib/matrixrtc"; +import { TRANSPORT_1 } from "./ConnectionManager.test.ts"; + +// Test the integration of ConnectionManager and MatrixLivekitMerger + +let testScope: ObservableScope; +let ecConnectionFactory: ECConnectionFactory; +let mockClient: OpenIDClientParts; +let lkRoomFactory: () => LivekitRoom; + +const createdMockLivekitRooms: Map = new Map(); + +// Main test input +const memberships$ = new BehaviorSubject([]); + +// under test +let connectionManager: ConnectionManager; + +function createLkMerger( + memberships$: Observable, +): MatrixLivekitMerger { + const mockRoomEmitter = new EventEmitter(); + return new MatrixLivekitMerger( + testScope, + memberships$, + connectionManager, + { + on: mockRoomEmitter.on.bind(mockRoomEmitter), + off: mockRoomEmitter.off.bind(mockRoomEmitter), + getMember: vi.fn().mockReturnValue(undefined), + }, + "@user:example.com", + "DEV000", + ); +} + +beforeEach(() => { + testScope = new ObservableScope(); + mockClient = { + getOpenIdToken: vi.fn().mockReturnValue(""), + getDeviceId: vi.fn().mockReturnValue("DEV000"), + }; + + lkRoomFactory = vi.fn().mockImplementation(() => { + const emitter = new EventEmitter(); + const base = { + on: emitter.on.bind(emitter), + off: emitter.off.bind(emitter), + emit: emitter.emit.bind(emitter), + disconnect: vi.fn(), + remoteParticipants: new Map(), + } as unknown as LivekitRoom; + + vi.mocked(base).connect = vi.fn().mockImplementation(({ url }) => { + createdMockLivekitRooms.set(url, base); + }); + return base; + }); + + ecConnectionFactory = new ECConnectionFactory( + mockClient, + mockMediaDevices({}), + new BehaviorSubject({ + supported: true, + processor: undefined, + }), + undefined, + false, + lkRoomFactory, + ); + + connectionManager = new ConnectionManager( + testScope, + ecConnectionFactory, + logger, + ); + + //TODO a bit annoying to have to do a http mock? + fetchMock.post(`**/sfu/get`, (url) => { + const domain = new URL(url).hostname; // Extract the domain from the URL + + return { + status: 200, + body: { + url: `wss://${domain}/livekit/sfu`, + jwt: "ATOKEN", + }, + }; + }); +}); + +afterEach(() => { + testScope.end(); + fetchMock.reset(); +}); + +test("example test", () => { + withTestScheduler(({ schedule, expectObservable, cold }) => { + connectionManager.connections$.subscribe((connections) => { + // console.log( + // "Connections updated:", + // connections.map((c) => c.transport), + // ); + }); + + const memberships$ = cold("-a-b-c", { + a: [mockCallmembership("@bob:example.com", "BDEV000")], + b: [ + mockCallmembership("@bob:example.com", "BDEV000"), + mockCallmembership("@carl:example.com", "CDEV000"), + ], + c: [ + mockCallmembership("@bob:example.com", "BDEV000"), + mockCallmembership("@carl:example.com", "CDEV000"), + mockCallmembership("@dave:foo.bar", "DDEV000"), + ], + }); + + // TODO IN PROGRESS + const merger = createLkMerger(memberships$); + }); +}); + +function mockCallmembership( + userId: string, + deviceId: string, + transport?: Transport, +): CallMembership { + const t = transport ?? TRANSPORT_1; + return { + userId: userId, + deviceId: deviceId, + getTransport: vi.fn().mockReturnValue(t), + transports: [t], + } as unknown as CallMembership; +} diff --git a/src/state/remoteMembers/matrixLivekitMerger.ts b/src/state/remoteMembers/matrixLivekitMerger.ts index f2f106f2..1487636c 100644 --- a/src/state/remoteMembers/matrixLivekitMerger.ts +++ b/src/state/remoteMembers/matrixLivekitMerger.ts @@ -177,6 +177,7 @@ export class MatrixLivekitMerger { }); }), ), + [], ); } }