From fc842d4be7721446324922d26df01c8e0f2df6fa Mon Sep 17 00:00:00 2001 From: Valere Date: Sat, 8 Nov 2025 13:02:12 +0100 Subject: [PATCH] test: fixup ConnectionManager tests --- .../remoteMembers/ConnectionManager.test.ts | 361 ++++++++++-------- src/state/ObservableScope.test.ts | 2 +- 2 files changed, 196 insertions(+), 167 deletions(-) diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts index ec3c1b2f..17d127f3 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts @@ -10,15 +10,16 @@ import { BehaviorSubject } from "rxjs"; import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { type Participant as LivekitParticipant } from "livekit-client"; -import { ObservableScope } from "../../ObservableScope.ts"; +import { Epoch, ObservableScope } from "../../ObservableScope.ts"; import { - type IConnectionManager, createConnectionManager$, + type ConnectionManagerData, } from "./ConnectionManager.ts"; import { type ConnectionFactory } from "./ConnectionFactory.ts"; import { type Connection } from "./Connection.ts"; -import { flushPromises, withTestScheduler } from "../../../utils/test.ts"; +import { withTestScheduler } from "../../../utils/test.ts"; import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts"; +import { type Behavior } from "../../Behavior.ts"; // Some test constants @@ -34,23 +35,15 @@ const TRANSPORT_2: LivekitTransport = { livekit_alias: "!alias:sample.com", }; -// const TRANSPORT_3: LivekitTransport = { -// type: "livekit", -// livekit_service_url: "https://lk-other.sample.com", -// livekit_alias: "!alias:sample.com", -// }; let fakeConnectionFactory: ConnectionFactory; let testScope: ObservableScope; -let testTransportStream$: BehaviorSubject; -let connectionManagerInputs: { - scope: ObservableScope; - connectionFactory: ConnectionFactory; - inputTransports$: BehaviorSubject; -}; -let manager: IConnectionManager; + +// Can be useful to track all created connections in tests, even the disposed ones +let allCreatedConnections: Connection[]; + beforeEach(() => { testScope = new ObservableScope(); - + allCreatedConnections = []; fakeConnectionFactory = {} as unknown as ConnectionFactory; vi.mocked(fakeConnectionFactory).createConnection = vi .fn() @@ -58,6 +51,7 @@ beforeEach(() => { (transport: LivekitTransport, scope: ObservableScope) => { const mockConnection = { transport, + participantsWithTrack$: new BehaviorSubject([]), } as unknown as Connection; vi.mocked(mockConnection).start = vi.fn(); vi.mocked(mockConnection).stop = vi.fn(); @@ -65,17 +59,10 @@ beforeEach(() => { scope.onEnd(() => { void mockConnection.stop(); }); + allCreatedConnections.push(mockConnection); return mockConnection; }, ); - - testTransportStream$ = new BehaviorSubject([]); - connectionManagerInputs = { - scope: testScope, - connectionFactory: fakeConnectionFactory, - inputTransports$: testTransportStream$, - }; - manager = createConnectionManager$(connectionManagerInputs); }); afterEach(() => { @@ -83,93 +70,122 @@ afterEach(() => { }); describe("connections$ stream", () => { - test("Should create and start new connections for each transports", async () => { - const managedConnections = Promise.withResolvers(); - manager.connections$.subscribe((connections) => { - if (connections.length > 0) managedConnections.resolve(connections); + test("Should create and start new connections for each transports", () => { + withTestScheduler(({ behavior, expectObservable }) => { + const { connections$ } = createConnectionManager$({ + scope: testScope, + connectionFactory: fakeConnectionFactory, + inputTransports$: behavior("a", { + a: new Epoch([TRANSPORT_1, TRANSPORT_2], 0), + }), + }); + + expectObservable(connections$).toBe("a", { + a: expect.toSatisfy((e: Epoch) => { + const connections = e.value; + expect(connections.length).toBe(2); + + expect( + vi.mocked(fakeConnectionFactory).createConnection, + ).toHaveBeenCalledTimes(2); + + const conn1 = connections.find((c) => + areLivekitTransportsEqual(c.transport, TRANSPORT_1), + ); + expect(conn1).toBeDefined(); + expect(conn1!.start).toHaveBeenCalled(); + + const conn2 = connections.find((c) => + areLivekitTransportsEqual(c.transport, TRANSPORT_2), + ); + expect(conn2).toBeDefined(); + expect(conn2!.start).toHaveBeenCalled(); + return true; + }), + }); }); - - connectionManagerInputs.inputTransports$.next([TRANSPORT_1, TRANSPORT_2]); - - const connections = await managedConnections.promise; - - expect(connections.length).toBe(2); - - expect( - vi.mocked(fakeConnectionFactory).createConnection, - ).toHaveBeenCalledTimes(2); - - const conn1 = connections.find((c) => - areLivekitTransportsEqual(c.transport, TRANSPORT_1), - ); - expect(conn1).toBeDefined(); - expect(conn1!.start).toHaveBeenCalled(); - - const conn2 = connections.find((c) => - areLivekitTransportsEqual(c.transport, TRANSPORT_2), - ); - expect(conn2).toBeDefined(); - expect(conn2!.start).toHaveBeenCalled(); }); - test("Should start connection only once", async () => { - const observedConnections: Connection[][] = []; - manager.connections$.subscribe((connections) => { - observedConnections.push(connections); + test("Should start connection only once", () => { + withTestScheduler(({ behavior, expectObservable }) => { + const { connections$ } = createConnectionManager$({ + scope: testScope, + connectionFactory: fakeConnectionFactory, + inputTransports$: behavior("abcdef", { + a: new Epoch([TRANSPORT_1], 0), + b: new Epoch([TRANSPORT_1], 1), + c: new Epoch([TRANSPORT_1], 2), + d: new Epoch([TRANSPORT_1], 3), + e: new Epoch([TRANSPORT_1], 4), + f: new Epoch([TRANSPORT_1, TRANSPORT_2], 5), + }), + }); + + expectObservable(connections$).toBe("xxxxxa", { + x: expect.anything(), + a: expect.toSatisfy((e: Epoch) => { + const connections = e.value; + + expect(connections.length).toBe(2); + expect( + vi.mocked(fakeConnectionFactory).createConnection, + ).toHaveBeenCalledTimes(2); + + const conn2 = connections.find((c) => + areLivekitTransportsEqual(c.transport, TRANSPORT_2), + ); + expect(conn2).toBeDefined(); + + const conn1 = connections.find((c) => + areLivekitTransportsEqual(c.transport, TRANSPORT_1), + ); + expect(conn1).toBeDefined(); + expect(conn1!.start).toHaveBeenCalledOnce(); + + return true; + }), + }); }); - - testTransportStream$.next([TRANSPORT_1]); - testTransportStream$.next([TRANSPORT_1]); - testTransportStream$.next([TRANSPORT_1]); - testTransportStream$.next([TRANSPORT_1]); - testTransportStream$.next([TRANSPORT_1]); - testTransportStream$.next([TRANSPORT_1, TRANSPORT_2]); - - await flushPromises(); - const connections = observedConnections.pop()!; - - expect(connections.length).toBe(2); - expect( - vi.mocked(fakeConnectionFactory).createConnection, - ).toHaveBeenCalledTimes(2); - - const conn2 = connections.find((c) => - areLivekitTransportsEqual(c.transport, TRANSPORT_2), - ); - expect(conn2).toBeDefined(); - - const conn1 = connections.find((c) => - areLivekitTransportsEqual(c.transport, TRANSPORT_1), - ); - expect(conn1).toBeDefined(); - expect(conn1!.start).toHaveBeenCalledOnce(); }); - test("Should cleanup connections when not needed anymore", async () => { - const observedConnections: Connection[][] = []; - manager.connections$.subscribe((connections) => { - observedConnections.push(connections); + test("Should cleanup connections when not needed anymore", () => { + withTestScheduler(({ behavior, expectObservable }) => { + const { connections$ } = createConnectionManager$({ + scope: testScope, + connectionFactory: fakeConnectionFactory, + inputTransports$: behavior("abc", { + a: new Epoch([TRANSPORT_1], 0), + b: new Epoch([TRANSPORT_1, TRANSPORT_2], 1), + c: new Epoch([TRANSPORT_1], 2), + }), + }); + + expectObservable(connections$).toBe("xab", { + x: expect.anything(), + a: expect.toSatisfy((e: Epoch) => { + const connections = e.value; + expect(connections.length).toBe(2); + return true; + }), + b: expect.toSatisfy((e: Epoch) => { + const connections = e.value; + + expect(connections.length).toBe(1); + // The second connection should have been stopped has it is no longer needed. + const connection2 = allCreatedConnections.find((c) => + areLivekitTransportsEqual(c.transport, TRANSPORT_2), + ); + expect(connection2).toBeDefined(); + expect(connection2!.stop).toHaveBeenCalled(); + + // The first connection should still be active + const conn1 = connections[0]; + expect(conn1.stop).not.toHaveBeenCalledOnce(); + + return true; + }), + }); }); - - testTransportStream$.next([TRANSPORT_1]); - testTransportStream$.next([TRANSPORT_1, TRANSPORT_2]); - - await flushPromises(); - - const conn2 = observedConnections - .pop()! - .find((c) => areLivekitTransportsEqual(c.transport, TRANSPORT_2))!; - - testTransportStream$.next([TRANSPORT_1]); - - await flushPromises(); - - // The second connection should have been stopped has it is no longer needed - expect(conn2.stop).toHaveBeenCalled(); - - // The first connection should still be active - const conn1 = observedConnections.pop()![0]; - expect(conn1.stop).not.toHaveBeenCalledOnce(); }); }); @@ -177,7 +193,7 @@ describe("connectionManagerData$ stream", () => { // Used in test to control fake connections' participantsWithTrack$ streams let fakePublishingParticipantsStreams: Map< string, - BehaviorSubject + Behavior >; function keyForTransport(transport: LivekitTransport): string { @@ -186,6 +202,16 @@ describe("connectionManagerData$ stream", () => { beforeEach(() => { fakePublishingParticipantsStreams = new Map(); + + function getPublishingParticipantsFor( + transport: LivekitTransport, + ): Behavior { + return ( + fakePublishingParticipantsStreams.get(keyForTransport(transport)) ?? + new BehaviorSubject([]) + ); + } + // need a more advanced fake connection factory vi.mocked(fakeConnectionFactory).createConnection = vi .fn() @@ -196,7 +222,7 @@ describe("connectionManagerData$ stream", () => { >([]); const mockConnection = { transport, - participantsWithTrack$: fakePublishingParticipants$, + participantsWithTrack$: getPublishingParticipantsFor(transport), } as unknown as Connection; vi.mocked(mockConnection).start = vi.fn(); vi.mocked(mockConnection).stop = vi.fn(); @@ -216,80 +242,83 @@ describe("connectionManagerData$ stream", () => { test("Should report connections with the publishing participants", () => { withTestScheduler(({ expectObservable, schedule, behavior }) => { - manager = createConnectionManager$({ - ...connectionManagerInputs, - inputTransports$: behavior("a", { - a: [TRANSPORT_1, TRANSPORT_2], - }), - }); - - const conn1Participants$ = fakePublishingParticipantsStreams.get( + // Setup the fake participants streams behavior + // ============================== + fakePublishingParticipantsStreams.set( keyForTransport(TRANSPORT_1), - )!; - - schedule("-a-b", { - a: () => { - conn1Participants$.next([ - { identity: "user1A" } as LivekitParticipant, - ]); - }, - b: () => { - conn1Participants$.next([ + behavior("oa-b", { + o: [], + a: [{ identity: "user1A" } as LivekitParticipant], + b: [ { identity: "user1A" } as LivekitParticipant, { identity: "user1B" } as LivekitParticipant, - ]); - }, - }); + ], + }), + ); - const conn2Participants$ = fakePublishingParticipantsStreams.get( + fakePublishingParticipantsStreams.set( keyForTransport(TRANSPORT_2), - )!; + behavior("o-a", { + o: [], + a: [{ identity: "user2A" } as LivekitParticipant], + }), + ); + // ============================== - schedule("--a", { - a: () => { - conn2Participants$.next([ - { identity: "user2A" } as LivekitParticipant, - ]); - }, + const { connectionManagerData$ } = createConnectionManager$({ + scope: testScope, + connectionFactory: fakeConnectionFactory, + inputTransports$: behavior("a", { + a: new Epoch([TRANSPORT_1, TRANSPORT_2], 0), + }), }); - expectObservable(manager.connectionManagerData$).toBe("abcd", { - a: expect.toSatisfy((data) => { - return ( - data.getConnections().length == 2 && - data.getParticipantForTransport(TRANSPORT_1).length == 0 && - data.getParticipantForTransport(TRANSPORT_2).length == 0 - ); + expectObservable(connectionManagerData$).toBe("abcd", { + a: expect.toSatisfy((e) => { + const data: ConnectionManagerData = e.value; + expect(data.getConnections().length).toBe(2); + expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(0); + expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(0); + return true; }), - b: expect.toSatisfy((data) => { - return ( - data.getConnections().length == 2 && - data.getParticipantForTransport(TRANSPORT_1).length == 1 && - data.getParticipantForTransport(TRANSPORT_2).length == 0 && - data.getParticipantForTransport(TRANSPORT_1)[0].identity == "user1A" + b: expect.toSatisfy((e) => { + const data: ConnectionManagerData = e.value; + expect(data.getConnections().length).toBe(2); + expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(1); + expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(0); + expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe( + "user1A", ); + return true; }), - c: expect.toSatisfy((data) => { - return ( - data.getConnections().length == 2 && - data.getParticipantForTransport(TRANSPORT_1).length == 1 && - data.getParticipantForTransport(TRANSPORT_2).length == 1 && - data.getParticipantForTransport(TRANSPORT_1)[0].identity == - "user1A" && - data.getParticipantForTransport(TRANSPORT_2)[0].identity == "user2A" + c: expect.toSatisfy((e) => { + const data: ConnectionManagerData = e.value; + expect(data.getConnections().length).toBe(2); + expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(1); + expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(1); + expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe( + "user1A", ); + expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toBe( + "user2A", + ); + return true; }), - d: expect.toSatisfy((data) => { - return ( - data.getConnections().length == 2 && - data.getParticipantForTransport(TRANSPORT_1).length == 2 && - data.getParticipantForTransport(TRANSPORT_2).length == 1 && - data.getParticipantForTransport(TRANSPORT_1)[0].identity == - "user1A" && - data.getParticipantForTransport(TRANSPORT_1)[1].identity == - "user1B" && - data.getParticipantForTransport(TRANSPORT_2)[0].identity == "user2A" + d: expect.toSatisfy((e) => { + const data: ConnectionManagerData = e.value; + expect(data.getConnections().length).toBe(2); + expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(2); + expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(1); + expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe( + "user1A", ); + expect(data.getParticipantForTransport(TRANSPORT_1)[1].identity).toBe( + "user1B", + ); + expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toBe( + "user2A", + ); + return true; }), }); }); diff --git a/src/state/ObservableScope.test.ts b/src/state/ObservableScope.test.ts index 19fea76c..4b0f3b4f 100644 --- a/src/state/ObservableScope.test.ts +++ b/src/state/ObservableScope.test.ts @@ -6,6 +6,7 @@ Please see LICENSE in the repository root for full details. */ import { describe, expect, it } from "vitest"; +import { BehaviorSubject, timer } from "rxjs"; import { Epoch, @@ -14,7 +15,6 @@ import { trackEpoch, } from "./ObservableScope"; import { withTestScheduler } from "../utils/test"; -import { BehaviorSubject, timer } from "rxjs"; describe("Epoch", () => { it("should map the value correctly", () => {