diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts index 60b52d69..ccf93a30 100644 --- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts +++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts @@ -12,19 +12,23 @@ import { } from "matrix-js-sdk/lib/matrixrtc"; import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk"; import { getParticipantId } from "matrix-js-sdk/lib/matrixrtc/utils"; +import { combineLatest, map, type Observable } from "rxjs"; import { type IConnectionManager } from "./ConnectionManager.ts"; import { type MatrixLivekitMember, createMatrixLivekitMembers$, - areLivekitTransportsEqual, } from "./MatrixLivekitMembers.ts"; -import { ObservableScope } from "../../ObservableScope.ts"; +import { + Epoch, + mapEpoch, + ObservableScope, + trackEpoch, +} from "../../ObservableScope.ts"; import { ConnectionManagerData } from "./ConnectionManager.ts"; import { mockCallMembership, mockRemoteParticipant, - type OurRunHelpers, withTestScheduler, } from "../../../utils/test.ts"; import { type Connection } from "./Connection.ts"; @@ -32,7 +36,28 @@ import { type Connection } from "./Connection.ts"; let testScope: ObservableScope; let mockMatrixRoom: MatrixRoom; -// The merger beeing tested +const transportA: LivekitTransport = { + type: "livekit", + livekit_service_url: "https://lk.example.org", + livekit_alias: "!alias:example.org", +}; + +const transportB: LivekitTransport = { + type: "livekit", + livekit_service_url: "https://lk.sample.com", + livekit_alias: "!alias:sample.com", +}; + +const bobMembership = mockCallMembership( + "@bob:example.org", + "DEV000", + transportA, +); +const carlMembership = mockCallMembership( + "@carl:sample.com", + "DEV111", + transportB, +); beforeEach(() => { testScope = new ObservableScope(); @@ -53,113 +78,138 @@ afterEach(() => { testScope.end(); }); +function epochMeWith$( + source$: Observable>, + me$: Observable, +): Observable> { + return combineLatest([source$, me$]).pipe( + map(([ep, cd]) => { + return new Epoch(cd, ep.epoch); + }), + ); +} + test("should signal participant not yet connected to livekit", () => { withTestScheduler(({ behavior, expectObservable }) => { - const bobMembership = { - userId: "@bob:example.org", - deviceId: "DEV000", - transports: [ - { - type: "livekit", - livekit_service_url: "https://lk.example.org", - livekit_alias: "!alias:example.org", - }, - ], - } as unknown as CallMembership; + const { memberships$, membershipsWithTransport$ } = fromMemberships$( + behavior("a", { + a: [bobMembership], + }), + ); + + const connectionManagerData$ = epochMeWith$( + memberships$, + behavior("a", { + a: new ConnectionManagerData(), + }), + ); const matrixLivekitMember$ = createMatrixLivekitMembers$({ scope: testScope, - membershipsWithTransport$: behavior("a", { - a: [ - { - membership: bobMembership, - }, - ], - }), + membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), connectionManager: { - connectionManagerData$: behavior("a", { - a: new ConnectionManagerData(), - }), - transports$: behavior("a", { a: [] }), - connections$: behavior("a", { a: [] }), - }, + connectionManagerData$: connectionManagerData$, + } as unknown as IConnectionManager, matrixRoom: mockMatrixRoom, }); - expectObservable(matrixLivekitMember$).toBe("a", { + expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", { a: expect.toSatisfy((data: MatrixLivekitMember[]) => { - return ( - data.length == 1 && - data[0].membership === bobMembership && - data[0].participant === undefined && - data[0].connection === undefined - ); + expect(data.length).toEqual(1); + expectObservable(data[0].membership$).toBe("a", { + a: bobMembership, + }); + expectObservable(data[0].participant$).toBe("a", { + a: null, + }); + expectObservable(data[0].connection$).toBe("a", { + a: null, + }); + return true; }), }); }); }); -function aConnectionManager( - data: ConnectionManagerData, - behavior: OurRunHelpers["behavior"], -): IConnectionManager { - return { - connectionManagerData$: behavior("a", { a: data }), - transports$: behavior("a", { - a: data.getConnections().map((connection) => connection.transport), +// Helper to create epoch'ed memberships$ and membershipsWithTransport$ from memberships observable. +function fromMemberships$(m$: Observable): { + memberships$: Observable>; + membershipsWithTransport$: Observable< + Epoch<{ membership: CallMembership; transport?: LivekitTransport }[]> + >; +} { + const memberships$ = m$.pipe(trackEpoch()); + const membershipsWithTransport$ = memberships$.pipe( + mapEpoch((members) => { + return members.map((m) => { + const tr = m.getTransport(m); + return { + membership: m, + transport: + tr?.type === "livekit" ? (tr as LivekitTransport) : undefined, + }; + }); }), - connections$: behavior("a", { a: data.getConnections() }), + ); + return { + memberships$, + membershipsWithTransport$, }; } test("should signal participant on a connection that is publishing", () => { withTestScheduler(({ behavior, expectObservable }) => { - const transport: LivekitTransport = { - type: "livekit", - livekit_service_url: "https://lk.example.org", - livekit_alias: "!alias:example.org", - }; - - const bobMembership = mockCallMembership( - "@bob:example.org", - "DEV000", - transport, - ); - - const connectionWithPublisher = new ConnectionManagerData(); const bobParticipantId = getParticipantId( bobMembership.userId, bobMembership.deviceId, ); + + const { memberships$, membershipsWithTransport$ } = fromMemberships$( + behavior("a", { + a: [bobMembership], + }), + ); + const connection = { - transport: transport, + transport: bobMembership.getTransport(bobMembership), } as unknown as Connection; - connectionWithPublisher.add(connection, [ + const dataWithPublisher = new ConnectionManagerData(); + dataWithPublisher.add(connection, [ mockRemoteParticipant({ identity: bobParticipantId }), ]); + + const connectionManagerData$ = epochMeWith$( + memberships$, + behavior("a", { + a: dataWithPublisher, + }), + ); + const matrixLivekitMember$ = createMatrixLivekitMembers$({ scope: testScope, - membershipsWithTransport$: behavior("a", { - a: [ - { - membership: bobMembership, - transport, - }, - ], - }), - connectionManager: aConnectionManager(connectionWithPublisher, behavior), + membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), + connectionManager: { + connectionManagerData$: connectionManagerData$, + } as unknown as IConnectionManager, matrixRoom: mockMatrixRoom, }); - expectObservable(matrixLivekitMember$).toBe("a", { + expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", { a: expect.toSatisfy((data: MatrixLivekitMember[]) => { expect(data.length).toEqual(1); - expect(data[0].participant).toBeDefined(); - expect(data[0].connection).toBeDefined(); - expect(data[0].membership).toEqual(bobMembership); - expect( - areLivekitTransportsEqual(data[0].connection!.transport, transport), - ).toBe(true); + expectObservable(data[0].membership$).toBe("a", { + a: bobMembership, + }); + expectObservable(data[0].participant$).toBe("a", { + a: expect.toSatisfy((participant) => { + expect(participant).toBeDefined(); + expect(participant!.identity).toEqual(bobParticipantId); + return true; + }), + }); + expectObservable(data[0].connection$).toBe("a", { + a: connection, + }); return true; }), }); @@ -168,47 +218,46 @@ test("should signal participant on a connection that is publishing", () => { test("should signal participant on a connection that is not publishing", () => { withTestScheduler(({ behavior, expectObservable }) => { - const transport: LivekitTransport = { - type: "livekit", - livekit_service_url: "https://lk.example.org", - livekit_alias: "!alias:example.org", - }; - - const bobMembership = mockCallMembership( - "@bob:example.org", - "DEV000", - transport, + const { memberships$, membershipsWithTransport$ } = fromMemberships$( + behavior("a", { + a: [bobMembership], + }), ); - const connectionWithPublisher = new ConnectionManagerData(); - // const bobParticipantId = getParticipantId(bobMembership.userId, bobMembership.deviceId); const connection = { - transport: transport, + transport: bobMembership.getTransport(bobMembership), } as unknown as Connection; - connectionWithPublisher.add(connection, []); + const dataWithPublisher = new ConnectionManagerData(); + dataWithPublisher.add(connection, []); + + const connectionManagerData$ = epochMeWith$( + memberships$, + behavior("a", { + a: dataWithPublisher, + }), + ); + const matrixLivekitMember$ = createMatrixLivekitMembers$({ scope: testScope, - membershipsWithTransport$: behavior("a", { - a: [ - { - membership: bobMembership, - transport, - }, - ], - }), - connectionManager: aConnectionManager(connectionWithPublisher, behavior), + membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), + connectionManager: { + connectionManagerData$: connectionManagerData$, + } as unknown as IConnectionManager, matrixRoom: mockMatrixRoom, }); - expectObservable(matrixLivekitMember$).toBe("a", { + expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", { a: expect.toSatisfy((data: MatrixLivekitMember[]) => { expect(data.length).toEqual(1); - expect(data[0].participant).not.toBeDefined(); - expect(data[0].connection).toBeDefined(); - expect(data[0].membership).toEqual(bobMembership); - expect( - areLivekitTransportsEqual(data[0].connection!.transport, transport), - ).toBe(true); + expectObservable(data[0].membership$).toBe("a", { + a: bobMembership, + }); + expectObservable(data[0].participant$).toBe("a", { + a: null, + }); + expectObservable(data[0].connection$).toBe("a", { + a: connection, + }); return true; }), }); @@ -218,22 +267,10 @@ test("should signal participant on a connection that is not publishing", () => { describe("Publication edge case", () => { test("bob is publishing in several connections", () => { withTestScheduler(({ behavior, expectObservable }) => { - const transportA: LivekitTransport = { - type: "livekit", - livekit_service_url: "https://lk.example.org", - livekit_alias: "!alias:example.org", - }; - - const transportB: LivekitTransport = { - type: "livekit", - livekit_service_url: "https://lk.sample.com", - livekit_alias: "!alias:sample.com", - }; - - const bobMembership = mockCallMembership( - "@bob:example.org", - "DEV000", - transportA, + const { memberships$, membershipsWithTransport$ } = fromMemberships$( + behavior("a", { + a: [bobMembership, carlMembership], + }), ); const connectionWithPublisher = new ConnectionManagerData(); @@ -254,60 +291,57 @@ describe("Publication edge case", () => { connectionWithPublisher.add(connectionB, [ mockRemoteParticipant({ identity: bobParticipantId }), ]); + + const connectionManagerData$ = epochMeWith$( + memberships$, + behavior("a", { + a: connectionWithPublisher, + }), + ); + const matrixLivekitMember$ = createMatrixLivekitMembers$({ scope: testScope, - membershipsWithTransport$: behavior("a", { - a: [ - { - membership: bobMembership, - transport: transportA, - }, - ], - }), - connectionManager: aConnectionManager( - connectionWithPublisher, - behavior, + membershipsWithTransport$: testScope.behavior( + membershipsWithTransport$, ), + connectionManager: { + connectionManagerData$: connectionManagerData$, + } as unknown as IConnectionManager, matrixRoom: mockMatrixRoom, }); - expectObservable(matrixLivekitMember$).toBe("a", { - a: expect.toSatisfy((data: MatrixLivekitMember[]) => { - expect(data.length).toEqual(1); - expect(data[0].participant).toBeDefined(); - expect(data[0].participant!.identity).toEqual(bobParticipantId); - expect(data[0].connection).toBeDefined(); - expect(data[0].membership).toEqual(bobMembership); - expect( - areLivekitTransportsEqual( - data[0].connection!.transport, - transportA, - ), - ).toBe(true); - return true; - }), - }); + expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe( + "a", + { + a: expect.toSatisfy((data: MatrixLivekitMember[]) => { + expect(data.length).toEqual(2); + expectObservable(data[0].membership$).toBe("a", { + a: bobMembership, + }); + expectObservable(data[0].connection$).toBe("a", { + // The real connection should be from transportA as per the membership + a: connectionA, + }); + expectObservable(data[0].participant$).toBe("a", { + a: expect.toSatisfy((participant) => { + expect(participant).toBeDefined(); + expect(participant!.identity).toEqual(bobParticipantId); + return true; + }), + }); + return true; + }), + }, + ); }); }); test("bob is publishing in the wrong connection", () => { withTestScheduler(({ behavior, expectObservable }) => { - const transportA: LivekitTransport = { - type: "livekit", - livekit_service_url: "https://lk.example.org", - livekit_alias: "!alias:example.org", - }; - - const transportB: LivekitTransport = { - type: "livekit", - livekit_service_url: "https://lk.sample.com", - livekit_alias: "!alias:sample.com", - }; - - const bobMembership = mockCallMembership( - "@bob:example.org", - "DEV000", - transportA, + const { memberships$, membershipsWithTransport$ } = fromMemberships$( + behavior("a", { + a: [bobMembership, carlMembership], + }), ); const connectionWithPublisher = new ConnectionManagerData(); @@ -315,86 +349,54 @@ describe("Publication edge case", () => { bobMembership.userId, bobMembership.deviceId, ); - const connectionA = { - transport: transportA, - } as unknown as Connection; - const connectionB = { - transport: transportB, - } as unknown as Connection; + const connectionA = { transport: transportA } as unknown as Connection; + const connectionB = { transport: transportB } as unknown as Connection; + // Bob is not publishing on A connectionWithPublisher.add(connectionA, []); + // Bob is publishing on B but his membership says A connectionWithPublisher.add(connectionB, [ mockRemoteParticipant({ identity: bobParticipantId }), ]); + + const connectionManagerData$ = epochMeWith$( + memberships$, + behavior("a", { + a: connectionWithPublisher, + }), + ); + const matrixLivekitMember$ = createMatrixLivekitMembers$({ scope: testScope, - membershipsWithTransport$: behavior("a", { - a: [ - { - membership: bobMembership, - transport: transportA, - }, - ], - }), - connectionManager: aConnectionManager( - connectionWithPublisher, - behavior, + membershipsWithTransport$: testScope.behavior( + membershipsWithTransport$, ), + connectionManager: { + connectionManagerData$: connectionManagerData$, + } as unknown as IConnectionManager, matrixRoom: mockMatrixRoom, }); - expectObservable(matrixLivekitMember$).toBe("a", { - a: expect.toSatisfy((data: MatrixLivekitMember[]) => { - expect(data.length).toEqual(1); - expect(data[0].participant).not.toBeDefined(); - expect(data[0].connection).toBeDefined(); - expect(data[0].membership).toEqual(bobMembership); - expect( - areLivekitTransportsEqual( - data[0].connection!.transport, - transportA, - ), - ).toBe(true); - return true; - }), - }); + expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe( + "a", + { + a: expect.toSatisfy((data: MatrixLivekitMember[]) => { + expect(data.length).toEqual(2); + expectObservable(data[0].membership$).toBe("a", { + a: bobMembership, + }); + expectObservable(data[0].connection$).toBe("a", { + // The real connection should be from transportA as per the membership + a: connectionA, + }); + expectObservable(data[0].participant$).toBe("a", { + // No participant as Bob is not publishing on his membership transport + a: null, + }); + return true; + }), + }, + ); }); - - // let lastMatrixLkItems: MatrixLivekitMember[] = []; - // matrixLivekitMerger.matrixLivekitMember$.subscribe((items) => { - // lastMatrixLkItems = items; - // }); - - // vi.mocked(bobMembership).getTransport = vi - // .fn() - // .mockReturnValue(connectionA.transport); - - // fakeMemberships$.next([bobMembership]); - - // const lkMap = new ConnectionManagerData(); - // lkMap.add(connectionA, []); - // lkMap.add(connectionB, [ - // mockRemoteParticipant({ identity: bobParticipantId }) - // ]); - - // fakeManagerData$.next(lkMap); - - // const items = lastMatrixLkItems; - // expect(items).toHaveLength(1); - // const item = items[0]; - - // // Assert the expected membership - // expect(item.membership.userId).toEqual(bobMembership.userId); - // expect(item.membership.deviceId).toEqual(bobMembership.deviceId); - - // expect(item.participant).not.toBeDefined(); - - // // The transport info should come from the membership transports and not only from the publishing connection - // expect(item.connection?.transport?.livekit_service_url).toEqual( - // bobMembership.transports[0]?.livekit_service_url - // ); - // expect(item.connection?.transport?.livekit_alias).toEqual( - // bobMembership.transports[0]?.livekit_alias - // ); }); });