From 0502f66e21ac31ce07e5511f96aca3fab676c7fa Mon Sep 17 00:00:00 2001 From: Valere Date: Thu, 2 Oct 2025 12:53:59 +0200 Subject: [PATCH] tests: Add publisher observable tests --- src/state/Connection.test.ts | 203 +++++++++++++++++++++++++++-------- 1 file changed, 159 insertions(+), 44 deletions(-) diff --git a/src/state/Connection.test.ts b/src/state/Connection.test.ts index 692aee86..5c725e83 100644 --- a/src/state/Connection.test.ts +++ b/src/state/Connection.test.ts @@ -8,7 +8,7 @@ Please see LICENSE in the repository root for full details. import { afterEach, describe, expect, it, type MockedObject, vi } from "vitest"; import { type CallMembership, type LivekitFocus } from "matrix-js-sdk/lib/matrixrtc"; import { BehaviorSubject } from "rxjs"; -import { ConnectionState, type Room as LivekitRoom, RoomEvent } from "livekit-client"; +import { ConnectionState, type RemoteParticipant, type Room as LivekitRoom, RoomEvent } from "livekit-client"; import fetchMock from "fetch-mock"; import EventEmitter from "events"; import { type IOpenIDToken } from "matrix-js-sdk"; @@ -31,8 +31,8 @@ let fakeMembershipsFocusMap$: BehaviorSubject<{ membership: CallMembership; focu const livekitFocus: LivekitFocus = { livekit_alias: "!roomID:example.org", livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt", - type: "livekit", -} + type: "livekit" +}; function setupTest(): void { testScope = new ObservableScope(); @@ -45,7 +45,7 @@ function setupTest(): void { "expires_in": 3600 } ), - getDeviceId: vi.fn().mockReturnValue("ABCDEF"), + getDeviceId: vi.fn().mockReturnValue("ABCDEF") } as unknown as OpenIDClientParts); fakeMembershipsFocusMap$ = new BehaviorSubject<{ membership: CallMembership; focus: LivekitFocus }[]>([]); @@ -60,7 +60,7 @@ function setupTest(): void { off: fakeRoomEventEmiter.off.bind(fakeRoomEventEmiter), addListener: fakeRoomEventEmiter.addListener.bind(fakeRoomEventEmiter), removeListener: fakeRoomEventEmiter.removeListener.bind(fakeRoomEventEmiter), - removeAllListeners: fakeRoomEventEmiter.removeAllListeners.bind(fakeRoomEventEmiter), + removeAllListeners: fakeRoomEventEmiter.removeAllListeners.bind(fakeRoomEventEmiter) } as unknown as LivekitRoom); } @@ -72,8 +72,8 @@ function setupRemoteConnection(): RemoteConnection { focus: livekitFocus, membershipsFocusMap$: fakeMembershipsFocusMap$, scope: testScope, - livekitRoomFactory: () => fakeLivekitRoom, - } + livekitRoomFactory: () => fakeLivekitRoom + }; fetchMock.post(`${livekitFocus.livekit_service_url}/sfu/get`, () => { @@ -82,9 +82,9 @@ function setupRemoteConnection(): RemoteConnection { body: { "url": "wss://matrix-rtc.m.localhost/livekit/sfu", - "jwt": "ATOKEN", - }, - } + "jwt": "ATOKEN" + } + }; } ); @@ -94,7 +94,7 @@ function setupRemoteConnection(): RemoteConnection { return new RemoteConnection( opts, - undefined, + undefined ); } @@ -105,7 +105,7 @@ describe("Start connection states", () => { vi.useRealTimers(); vi.clearAllMocks(); fetchMock.reset(); - }) + }); it("start in initialized state", () => { setupTest(); @@ -115,11 +115,11 @@ describe("Start connection states", () => { focus: livekitFocus, membershipsFocusMap$: fakeMembershipsFocusMap$, scope: testScope, - livekitRoomFactory: () => fakeLivekitRoom, - } + livekitRoomFactory: () => fakeLivekitRoom + }; const connection = new RemoteConnection( opts, - undefined, + undefined ); expect(connection.focusedConnectionState$.getValue().state) @@ -135,13 +135,13 @@ describe("Start connection states", () => { focus: livekitFocus, membershipsFocusMap$: fakeMembershipsFocusMap$, scope: testScope, - livekitRoomFactory: () => fakeLivekitRoom, - } + livekitRoomFactory: () => fakeLivekitRoom + }; const connection = new RemoteConnection( opts, - undefined, + undefined ); const capturedStates: FocusConnectionState[] = []; @@ -154,14 +154,14 @@ describe("Start connection states", () => { client.getOpenIdToken.mockImplementation(async (): Promise => { return await deferred.promise; - }) + }); connection.start() .catch(() => { // expected to throw - }) + }); - const capturedState = capturedStates.shift(); + let capturedState = capturedStates.pop(); expect(capturedState).toBeDefined(); expect(capturedState!.state).toEqual("FetchingConfig"); @@ -169,6 +169,7 @@ describe("Start connection states", () => { await vi.runAllTimersAsync(); + capturedState = capturedStates.pop(); if (capturedState!.state === "FailedToStart") { expect(capturedState!.error.message).toEqual("Something went wrong"); expect(capturedState!.focus.livekit_alias).toEqual(livekitFocus.livekit_alias); @@ -187,12 +188,12 @@ describe("Start connection states", () => { focus: livekitFocus, membershipsFocusMap$: fakeMembershipsFocusMap$, scope: testScope, - livekitRoomFactory: () => fakeLivekitRoom, - } + livekitRoomFactory: () => fakeLivekitRoom + }; const connection = new RemoteConnection( opts, - undefined, + undefined ); const capturedStates: FocusConnectionState[] = []; @@ -207,8 +208,8 @@ describe("Start connection states", () => { await deferredSFU.promise; return { status: 500, - body: "Internal Server Error", - } + body: "Internal Server Error" + }; } ); @@ -216,15 +217,17 @@ describe("Start connection states", () => { connection.start() .catch(() => { // expected to throw - }) + }); - const capturedState = capturedStates.shift(); - expect(capturedState).toBeDefined() + let capturedState = capturedStates.pop(); + expect(capturedState).toBeDefined(); expect(capturedState?.state).toEqual("FetchingConfig"); deferredSFU.resolve(); await vi.runAllTimersAsync(); + capturedState = capturedStates.pop(); + if (capturedState?.state === "FailedToStart") { expect(capturedState?.error.message).toContain("SFU Config fetch failed with exception Error"); expect(capturedState?.focus.livekit_alias).toEqual(livekitFocus.livekit_alias); @@ -244,17 +247,17 @@ describe("Start connection states", () => { focus: livekitFocus, membershipsFocusMap$: fakeMembershipsFocusMap$, scope: testScope, - livekitRoomFactory: () => fakeLivekitRoom, - } + livekitRoomFactory: () => fakeLivekitRoom + }; const connection = new RemoteConnection( opts, - undefined, + undefined ); const capturedStates: FocusConnectionState[] = []; connection.focusedConnectionState$.subscribe((value) => { - capturedStates.push(value) + capturedStates.push(value); }); @@ -267,9 +270,9 @@ describe("Start connection states", () => { body: { "url": "wss://matrix-rtc.m.localhost/livekit/sfu", - "jwt": "ATOKEN", - }, - } + "jwt": "ATOKEN" + } + }; } ); @@ -283,16 +286,18 @@ describe("Start connection states", () => { connection.start() .catch(() => { // expected to throw - }) + }); - const capturedState = capturedStates.shift(); - expect(capturedState).toBeDefined() + let capturedState = capturedStates.pop(); + expect(capturedState).toBeDefined(); expect(capturedState?.state).toEqual("FetchingConfig"); deferredSFU.resolve(); await vi.runAllTimersAsync(); + capturedState = capturedStates.pop(); + if (capturedState && capturedState?.state === "FailedToStart") { expect(capturedState.error.message).toContain("Failed to connect to livekit"); expect(capturedState.focus.livekit_alias).toEqual(livekitFocus.livekit_alias); @@ -304,7 +309,7 @@ describe("Start connection states", () => { it("connection states happy path", async () => { vi.useFakeTimers(); - setupTest() + setupTest(); const connection = setupRemoteConnection(); @@ -328,7 +333,7 @@ describe("Start connection states", () => { }); it("should relay livekit events once connected", async () => { - setupTest() + setupTest(); const connection = setupRemoteConnection(); @@ -346,8 +351,8 @@ describe("Start connection states", () => { ConnectionState.SignalReconnecting, ConnectionState.Connecting, ConnectionState.Connected, - ConnectionState.Reconnecting, - ] + ConnectionState.Reconnecting + ]; for (const state of states) { fakeRoomEventEmiter.emit(RoomEvent.ConnectionStateChanged, state); } @@ -376,7 +381,7 @@ describe("Start connection states", () => { it("shutting down the scope should stop the connection", async () => { - setupTest() + setupTest(); vi.useFakeTimers(); const connection = setupRemoteConnection(); @@ -407,3 +412,113 @@ describe("Start connection states", () => { }); }); + + +function fakeRemoteLivekitParticipant(id: string): RemoteParticipant { + return vi.mocked({ + identity: id + } as unknown as RemoteParticipant); +} + +function fakeRtcMemberShip(userId: string, deviceId: string): CallMembership { + return vi.mocked({ + sender: userId, + deviceId: deviceId, + } as unknown as CallMembership); +} + +describe("Publishing participants observations", () => { + + + it("should emit the list of publishing participants", async () => { + setupTest(); + + const connection = setupRemoteConnection(); + + const bobIsAPublisher = Promise.withResolvers(); + const danIsAPublisher = Promise.withResolvers(); + const observedPublishers: { participant: RemoteParticipant; membership: CallMembership }[][] = []; + connection.publishingParticipants$.subscribe((publishers) => { + observedPublishers.push(publishers); + if (publishers.some((p) => p.participant.identity === "@bob:example.org:DEV111")) { + bobIsAPublisher.resolve(); + } + if (publishers.some((p) => p.participant.identity === "@dan:example.org:DEV333")) { + danIsAPublisher.resolve(); + } + }); + // The publishingParticipants$ observable is derived from the current members of the + // livekitRoom and the rtc membership in order to publish the members that are publishing + // on this connection. + + let participants: RemoteParticipant[]= [ + fakeRemoteLivekitParticipant("@alice:example.org:DEV000"), + fakeRemoteLivekitParticipant("@bob:example.org:DEV111"), + fakeRemoteLivekitParticipant("@carol:example.org:DEV222"), + fakeRemoteLivekitParticipant("@dan:example.org:DEV333") + ]; + + // Let's simulate 3 members on the livekitRoom + vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get") + .mockReturnValue( + new Map(participants.map((p) => [p.identity, p])) + ); + + for (const participant of participants) { + fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, participant); + } + + // At this point there should be no publishers + expect(observedPublishers.pop()!.length).toEqual(0); + + + const otherFocus: LivekitFocus = { + livekit_alias: "!roomID:example.org", + livekit_service_url: "https://other-matrix-rtc.example.org/livekit/jwt", + type: "livekit" + } + + + const rtcMemberships = [ + // Say bob is on the same focus + { membership: fakeRtcMemberShip("@bob:example.org", "DEV111"), focus: livekitFocus }, + // Alice and carol is on a different focus + { membership: fakeRtcMemberShip("@alice:example.org", "DEV000"), focus: otherFocus }, + { membership: fakeRtcMemberShip("@carol:example.org", "DEV222"), focus: otherFocus }, + // NO DAVE YET + ]; + // signal this change in rtc memberships + fakeMembershipsFocusMap$.next(rtcMemberships); + + // We should have bob has a publisher now + await bobIsAPublisher.promise; + const publishers = observedPublishers.pop(); + expect(publishers?.length).toEqual(1); + expect(publishers?.[0].participant.identity).toEqual("@bob:example.org:DEV111"); + + // Now let's make dan join the rtc memberships + rtcMemberships + .push({ membership: fakeRtcMemberShip("@dan:example.org", "DEV333"), focus: livekitFocus }); + fakeMembershipsFocusMap$.next(rtcMemberships); + + // We should have bob and dan has publishers now + await danIsAPublisher.promise; + const twoPublishers = observedPublishers.pop(); + expect(twoPublishers?.length).toEqual(2); + expect(twoPublishers?.some((p) => p.participant.identity === "@bob:example.org:DEV111")).toBeTruthy(); + expect(twoPublishers?.some((p) => p.participant.identity === "@dan:example.org:DEV333")).toBeTruthy(); + + // Now let's make bob leave the livekit room + participants = participants.filter((p) => p.identity !== "@bob:example.org:DEV111"); + vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get") + .mockReturnValue( + new Map(participants.map((p) => [p.identity, p])) + ); + fakeRoomEventEmiter.emit(RoomEvent.ParticipantDisconnected, fakeRemoteLivekitParticipant("@bob:example.org:DEV111")); + + const updatedPublishers = observedPublishers.pop(); + expect(updatedPublishers?.length).toEqual(1); + expect(updatedPublishers?.some((p) => p.participant.identity === "@dan:example.org:DEV333")).toBeTruthy(); + }) + +});