diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 5f7303c6..61afb7b9 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -60,6 +60,7 @@ import { import { accumulate, filterBehavior, + generateItem, generateItems, pauseWhen, } from "../../utils/observable"; @@ -444,35 +445,38 @@ export function createCallViewModel$( memberId: uuidv4(), }; - const localTransport$ = createLocalTransport$({ - scope: scope, - memberships$: memberships$, - ownMembershipIdentity, - client, - delayId$: scope.behavior( - ( - fromEvent( - matrixRTCSession, - MembershipManagerEvent.DelayIdChanged, - // The type of reemitted event includes the original emitted as the second arg. - ) as Observable<[string | undefined, IMembershipManager]> - ).pipe(map(([delayId]) => delayId ?? null)), - matrixRTCSession.delayId ?? null, - ), - roomId: matrixRoom.roomId, - forceJwtEndpoint$: scope.behavior( - matrixRTCMode$.pipe( - map((v) => - v === MatrixRTCMode.Matrix_2_0 - ? JwtEndpointVersion.Matrix_2_0 - : JwtEndpointVersion.Legacy, - ), + const localTransport$ = scope.behavior( + matrixRTCMode$.pipe( + generateItem( + "CallViewModel localTransport$", + // Re-create LocalTransport whenever the mode changes + (mode) => ({ keys: [mode], data: undefined }), + (scope, _data$, mode) => + createLocalTransport$({ + scope: scope, + memberships$: memberships$, + ownMembershipIdentity, + client, + delayId$: scope.behavior( + ( + fromEvent( + matrixRTCSession, + MembershipManagerEvent.DelayIdChanged, + // The type of reemitted event includes the original emitted as the second arg. + ) as Observable<[string | undefined, IMembershipManager]> + ).pipe(map(([delayId]) => delayId ?? null)), + matrixRTCSession.delayId ?? null, + ), + roomId: matrixRoom.roomId, + forceJwtEndpoint: + mode === MatrixRTCMode.Matrix_2_0 + ? JwtEndpointVersion.Matrix_2_0 + : JwtEndpointVersion.Legacy, + useOldestMember: mode === MatrixRTCMode.Legacy, + }), ), ), - useOldestMember$: scope.behavior( - matrixRTCMode$.pipe(map((v) => v === MatrixRTCMode.Legacy)), - ), - }); + ); const connectionFactory = new ECConnectionFactory( client, @@ -491,6 +495,7 @@ export function createCallViewModel$( connectionFactory: connectionFactory, localTransport$: scope.behavior( localTransport$.pipe( + switchMap((t) => t.active$), catchError((e: unknown) => { logger.info( "could not pass local transport to createConnectionManager$. localTransport$ threw an error", @@ -524,13 +529,13 @@ export function createCallViewModel$( ); const localMembership = createLocalMembership$({ - scope: scope, + scope, homeserverConnected: createHomeserverConnected$( scope, client, matrixRTCSession, ), - muteStates: muteStates, + muteStates, joinMatrixRTC: (transport: LivekitTransportConfig) => { return enterRTCSession( matrixRTCSession, @@ -550,9 +555,11 @@ export function createCallViewModel$( ), ); }, - connectionManager: connectionManager, - matrixRTCSession: matrixRTCSession, - localTransport$: localTransport$, + connectionManager, + matrixRTCSession, + localTransport$: scope.behavior( + localTransport$.pipe(switchMap((t) => t.advertised$)), + ), logger: logger.getChild(`[${Date.now()}]`), }); diff --git a/src/state/CallViewModel/localMember/LocalMember.test.ts b/src/state/CallViewModel/localMember/LocalMember.test.ts index b228cd08..e5e9f327 100644 --- a/src/state/CallViewModel/localMember/LocalMember.test.ts +++ b/src/state/CallViewModel/localMember/LocalMember.test.ts @@ -39,7 +39,6 @@ import { constant } from "../../Behavior"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; import { ConnectionState, type Connection } from "../remoteMembers/Connection"; import { type Publisher } from "./Publisher"; -import { type LocalTransportWithSFUConfig } from "./LocalTransport"; import { initializeWidget } from "../../../widget"; initializeWidget(); @@ -216,11 +215,10 @@ describe("LocalMembership", () => { it("throws error on missing RTC config error", () => { withTestScheduler(({ scope, hot, expectObservable }) => { - const localTransport$ = - scope.behavior( - hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")), - null, - ); + const localTransport$ = scope.behavior( + hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")), + null, + ); // we do not need any connection data since we want to fail before reaching that. const mockConnectionManager = { @@ -279,23 +277,11 @@ describe("LocalMembership", () => { }); const aTransport = { - transport: { - livekit_service_url: "a", - } as LivekitTransportConfig, - sfuConfig: { - url: "sfu-url", - jwt: "sfu-token", - }, - } as LocalTransportWithSFUConfig; + livekit_service_url: "a", + } as LivekitTransportConfig; const bTransport = { - transport: { - livekit_service_url: "b", - } as LivekitTransportConfig, - sfuConfig: { - url: "sfu-url", - jwt: "sfu-token", - }, - } as LocalTransportWithSFUConfig; + livekit_service_url: "b", + } as LivekitTransportConfig; const connectionTransportAConnected = { livekitRoom: mockLivekitRoom({ @@ -305,7 +291,7 @@ describe("LocalMembership", () => { } as unknown as LocalParticipant, }), state$: constant(ConnectionState.LivekitConnected), - transport: aTransport.transport, + transport: aTransport, } as unknown as Connection; const connectionTransportAConnecting = { ...connectionTransportAConnected, @@ -314,7 +300,7 @@ describe("LocalMembership", () => { } as unknown as Connection; const connectionTransportBConnected = { state$: constant(ConnectionState.LivekitConnected), - transport: bTransport.transport, + transport: bTransport, livekitRoom: mockLivekitRoom({}), } as unknown as Connection; @@ -368,12 +354,8 @@ describe("LocalMembership", () => { // stop the first Publisher and let the second one life. expect(publishers[0].destroy).toHaveBeenCalled(); expect(publishers[1].destroy).not.toHaveBeenCalled(); - expect(publisherFactory.mock.calls[0][0].transport).toBe( - aTransport.transport, - ); - expect(publisherFactory.mock.calls[1][0].transport).toBe( - bTransport.transport, - ); + expect(publisherFactory.mock.calls[0][0].transport).toBe(aTransport); + expect(publisherFactory.mock.calls[1][0].transport).toBe(bTransport); scope.end(); await flushPromises(); // stop all tracks after ending scopes @@ -446,8 +428,9 @@ describe("LocalMembership", () => { const scope = new ObservableScope(); const connectionManagerData = new ConnectionManagerData(); - const localTransport$ = - new BehaviorSubject(null); + const localTransport$ = new BehaviorSubject( + null, + ); const connectionManagerData$ = new BehaviorSubject( new Epoch(connectionManagerData), ); @@ -519,7 +502,7 @@ describe("LocalMembership", () => { }); ( - connectionManagerData2.getConnectionForTransport(aTransport.transport)! + connectionManagerData2.getConnectionForTransport(aTransport)! .state$ as BehaviorSubject ).next(ConnectionState.LivekitConnected); expect(localMembership.localMemberState$.value).toStrictEqual({ diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index 2f38ad82..eb641ca7 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -62,7 +62,6 @@ import { } from "../remoteMembers/Connection.ts"; import { type HomeserverConnected } from "./HomeserverConnected.ts"; import { and$ } from "../../../utils/observable.ts"; -import { type LocalTransportWithSFUConfig } from "./LocalTransport.ts"; export enum TransportState { /** Not even a transport is available to the LocalMembership */ @@ -128,7 +127,7 @@ interface Props { createPublisherFactory: (connection: Connection) => Publisher; joinMatrixRTC: (transport: LivekitTransportConfig) => void; homeserverConnected: HomeserverConnected; - localTransport$: Behavior; + localTransport$: Behavior; matrixRTCSession: Pick< MatrixRTCSession, "updateCallIntent" | "leaveRoomSession" @@ -147,7 +146,7 @@ interface Props { * @param props.createPublisherFactory Factory to create a publisher once we have a connection. * @param props.joinMatrixRTC Callback to join the matrix RTC session once we have a transport. * @param props.homeserverConnected The homeserver connected state. - * @param props.localTransport$ The local transport to use for publishing. + * @param props.localTransport$ The transport to advertise in our membership. * @param props.logger The logger to use. * @param props.muteStates The mute states for video and audio. * @param props.matrixRTCSession The matrix RTC session to join. @@ -237,9 +236,7 @@ export const createLocalMembership$ = ({ return null; } - return connectionData.getConnectionForTransport( - localTransport.transport, - ); + return connectionData.getConnectionForTransport(localTransport); }), tap((connection) => { logger.info( @@ -549,7 +546,7 @@ export const createLocalMembership$ = ({ if (!shouldConnect) return; try { - joinMatrixRTC(transport.transport); + joinMatrixRTC(transport); } catch (error) { logger.error("Error entering RTC session", error); if (error instanceof Error) diff --git a/src/state/CallViewModel/localMember/LocalTransport.test.ts b/src/state/CallViewModel/localMember/LocalTransport.test.ts index 2476923a..8454b09a 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.test.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.test.ts @@ -13,15 +13,24 @@ import { it, type MockedObject, vi, + type MockInstance, } from "vitest"; -import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc"; +import { + type CallMembership, + type LivekitTransportConfig, +} from "matrix-js-sdk/lib/matrixrtc"; import { BehaviorSubject, lastValueFrom } from "rxjs"; import fetchMock from "fetch-mock"; -import { mockConfig, flushPromises, ownMemberMock } from "../../../utils/test"; +import { + mockConfig, + flushPromises, + ownMemberMock, + mockRtcMembership, +} from "../../../utils/test"; import { createLocalTransport$, JwtEndpointVersion } from "./LocalTransport"; import { constant } from "../../Behavior"; -import { Epoch, ObservableScope } from "../../ObservableScope"; +import { Epoch, ObservableScope, trackEpoch } from "../../ObservableScope"; import { MatrixRTCTransportMissingError, FailToGetOpenIdToken, @@ -43,10 +52,10 @@ describe("LocalTransport", () => { afterEach(() => scope.end()); it("throws if config is missing", async () => { - const localTransport$ = createLocalTransport$({ + const { advertised$, active$ } = createLocalTransport$({ scope, roomId: "!room:example.org", - useOldestMember$: constant(false), + useOldestMember: false, memberships$: constant(new Epoch([])), client: { // eslint-disable-next-line @typescript-eslint/naming-convention @@ -58,14 +67,15 @@ describe("LocalTransport", () => { getDeviceId: vi.fn(), }, ownMembershipIdentity: ownMemberMock, - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), + forceJwtEndpoint: JwtEndpointVersion.Legacy, delayId$: constant("delay_id_mock"), }); await flushPromises(); - expect(() => localTransport$.value).toThrow( + expect(() => advertised$.value).toThrow( new MatrixRTCTransportMissingError(""), ); + expect(() => active$.value).toThrow(new MatrixRTCTransportMissingError("")); }); it("throws FailToGetOpenIdToken when OpenID fetch fails", async () => { @@ -83,10 +93,10 @@ describe("LocalTransport", () => { ); const observations: unknown[] = []; const errors: Error[] = []; - const localTransport$ = createLocalTransport$({ + const { advertised$, active$ } = createLocalTransport$({ scope, roomId: "!example_room_id", - useOldestMember$: constant(false), + useOldestMember: false, memberships$: constant(new Epoch([])), client: { baseUrl: "https://lk.example.org", @@ -98,10 +108,10 @@ describe("LocalTransport", () => { getDeviceId: vi.fn(), }, ownMembershipIdentity: ownMemberMock, - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), + forceJwtEndpoint: JwtEndpointVersion.Legacy, delayId$: constant("delay_id_mock"), }); - localTransport$.subscribe( + active$.subscribe( (o) => observations.push(o), (e) => errors.push(e), ); @@ -111,7 +121,8 @@ describe("LocalTransport", () => { const expectedError = new FailToGetOpenIdToken(new Error("no openid")); expect(observations).toStrictEqual([null]); expect(errors).toStrictEqual([expectedError]); - expect(() => localTransport$.value).toThrow(expectedError); + expect(() => advertised$.value).toThrow(expectedError); + expect(() => active$.value).toThrow(expectedError); }); it("emits preferred transport after OpenID resolves", async () => { @@ -126,10 +137,10 @@ describe("LocalTransport", () => { openIdResolver.promise, ); - const localTransport$ = createLocalTransport$({ + const { advertised$, active$ } = createLocalTransport$({ scope, roomId: "!room:example.org", - useOldestMember$: constant(false), + useOldestMember: false, memberships$: constant(new Epoch([])), client: { // eslint-disable-next-line @typescript-eslint/naming-convention @@ -140,7 +151,7 @@ describe("LocalTransport", () => { baseUrl: "https://lk.example.org", }, ownMembershipIdentity: ownMemberMock, - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), + forceJwtEndpoint: JwtEndpointVersion.Legacy, delayId$: constant("delay_id_mock"), }); @@ -150,14 +161,17 @@ describe("LocalTransport", () => { livekitAlias: "Akph4alDMhen", livekitIdentity: ownMemberMock.userId + ":" + ownMemberMock.deviceId, }); - expect(localTransport$.value).toBe(null); + expect(advertised$.value).toBe(null); + expect(active$.value).toBe(null); await flushPromises(); // final - expect(localTransport$.value).toStrictEqual({ - transport: { - livekit_service_url: "https://lk.example.org", - type: "livekit", - }, + const expectedTransport = { + livekit_service_url: "https://lk.example.org", + type: "livekit", + }; + expect(advertised$.value).toStrictEqual(expectedTransport); + expect(active$.value).toStrictEqual({ + transport: expectedTransport, sfuConfig: { jwt: "jwt", livekitAlias: "Akph4alDMhen", @@ -167,51 +181,122 @@ describe("LocalTransport", () => { }); }); - it("updates local transport when oldest member changes", async () => { - // Use config so transport discovery succeeds, but delay OpenID JWT fetch - mockConfig({ - livekit: { livekit_service_url: "https://lk.example.org" }, + describe("oldest member mode", () => { + const aliceTransport: LivekitTransportConfig = { + type: "livekit", + livekit_service_url: "https://alice.example.org", + }; + const bobTransport: LivekitTransportConfig = { + type: "livekit", + livekit_service_url: "https://bob.example.org", + }; + const aliceMembership = mockRtcMembership("@alice:example.org", "AAA", { + fociPreferred: [aliceTransport], }); - const memberships$ = new BehaviorSubject(new Epoch([])); - const openIdResolver = Promise.withResolvers(); - - vi.spyOn(openIDSFU, "getSFUConfigWithOpenID").mockReturnValue( - openIdResolver.promise, - ); - - const localTransport$ = createLocalTransport$({ - scope, - roomId: "!example_room_id", - useOldestMember$: constant(true), - memberships$, - client: { - getDomain: () => "", - // eslint-disable-next-line @typescript-eslint/naming-convention - _unstable_getRTCTransports: async () => Promise.resolve([]), - getOpenIdToken: vi.fn(), - getDeviceId: vi.fn(), - baseUrl: "https://lk.example.org", - }, - ownMembershipIdentity: ownMemberMock, - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), - delayId$: constant("delay_id_mock"), + const bobMembership = mockRtcMembership("@bob:example.org", "BBB", { + fociPreferred: [bobTransport], }); - openIdResolver.resolve?.(openIdResponse); - expect(localTransport$.value).toBe(null); - await flushPromises(); - // final - expect(localTransport$.value).toStrictEqual({ - transport: { - livekit_service_url: "https://lk.example.org", - type: "livekit", - }, - sfuConfig: { - jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", - livekitAlias: "Akph4alDMhen", - livekitIdentity: "@lk_user:ABCDEF", - url: "https://lk.example.org", - }, + let openIdSpy: MockInstance<(typeof openIDSFU)["getSFUConfigWithOpenID"]>; + beforeEach(() => { + openIdSpy = vi + .spyOn(openIDSFU, "getSFUConfigWithOpenID") + .mockResolvedValue(openIdResponse); + }); + + it("updates active transport when oldest member changes", async () => { + // Initially, Alice is the only member + const memberships$ = new BehaviorSubject([aliceMembership]); + + const { advertised$, active$ } = createLocalTransport$({ + scope, + roomId: "!example_room_id", + useOldestMember: true, + memberships$: scope.behavior(memberships$.pipe(trackEpoch())), + client: { + getDomain: () => "", + // eslint-disable-next-line @typescript-eslint/naming-convention + _unstable_getRTCTransports: async () => Promise.resolve([]), + getOpenIdToken: vi.fn(), + getDeviceId: vi.fn(), + baseUrl: "https://lk.example.org", + }, + ownMembershipIdentity: ownMemberMock, + forceJwtEndpoint: JwtEndpointVersion.Legacy, + delayId$: constant("delay_id_mock"), + }); + + expect(active$.value).toBe(null); + await flushPromises(); + // SFU config should've been fetched + expect(openIdSpy).toHaveBeenCalled(); + // Alice's transport should be active and advertised + expect(active$.value?.transport).toStrictEqual(aliceTransport); + expect(advertised$.value).toStrictEqual(aliceTransport); + + // Now Bob joins the call, but Alice is still the oldest member + openIdSpy.mockClear(); + memberships$.next([aliceMembership, bobMembership]); + await flushPromises(); + // No new SFU config should've been fetched + expect(openIdSpy).not.toHaveBeenCalled(); + // Alice's transport should still be active and advertised + expect(active$.value?.transport).toStrictEqual(aliceTransport); + expect(advertised$.value).toStrictEqual(aliceTransport); + + // Now Bob takes Alice's place as the oldest member + openIdSpy.mockClear(); + memberships$.next([bobMembership, aliceMembership]); + // Active transport should reset to null until we have Bob's SFU config + expect(active$.value).toStrictEqual(null); + await flushPromises(); + // Bob's SFU config should've been fetched + expect(openIdSpy).toHaveBeenCalled(); + // Bob's transport should be active, but Alice's should remain advertised + // (since we don't want the change in oldest member to cause a wave of new + // state events) + expect(active$.value?.transport).toStrictEqual(bobTransport); + expect(advertised$.value).toStrictEqual(aliceTransport); + }); + + it("advertises preferred transport when no other member exists", async () => { + // Initially, there are no members + const memberships$ = new BehaviorSubject([]); + + const { advertised$, active$ } = createLocalTransport$({ + scope, + roomId: "!example_room_id", + useOldestMember: true, + memberships$: scope.behavior(memberships$.pipe(trackEpoch())), + client: { + getDomain: () => "", + // eslint-disable-next-line @typescript-eslint/naming-convention + _unstable_getRTCTransports: async () => + Promise.resolve([aliceTransport]), + getOpenIdToken: vi.fn(), + getDeviceId: vi.fn(), + baseUrl: "https://lk.example.org", + }, + ownMembershipIdentity: ownMemberMock, + forceJwtEndpoint: JwtEndpointVersion.Legacy, + delayId$: constant("delay_id_mock"), + }); + + expect(active$.value).toBe(null); + await flushPromises(); + // Our own preferred transport should be advertised + expect(advertised$.value).toStrictEqual(aliceTransport); + // No transport should be active however (there is still no oldest member) + expect(active$.value).toBe(null); + + // Now Bob joins the call and becomes the oldest member + memberships$.next([bobMembership]); + await flushPromises(); + // We should still advertise our own preferred transport (to avoid + // unnecessary state changes) + expect(advertised$.value).toStrictEqual(aliceTransport); + // Bob's transport should become active + expect(active$.value?.transport).toBe(bobTransport); }); }); @@ -229,8 +314,8 @@ describe("LocalTransport", () => { ownMembershipIdentity: ownMemberMock, scope, roomId: "!example_room_id", - useOldestMember$: constant(false), - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), + useOldestMember: false, + forceJwtEndpoint: JwtEndpointVersion.Legacy, delayId$: constant(null), memberships$: constant(new Epoch([])), client: { @@ -256,15 +341,19 @@ describe("LocalTransport", () => { mockConfig({ livekit: { livekit_service_url: "https://lk.example.org" }, }); - const localTransport$ = createLocalTransport$(localTransportOpts); + const { advertised$, active$ } = + createLocalTransport$(localTransportOpts); openIdResolver.resolve?.(openIdResponse); - expect(localTransport$.value).toBe(null); + expect(advertised$.value).toBe(null); + expect(active$.value).toBe(null); await flushPromises(); - expect(localTransport$.value).toStrictEqual({ - transport: { - livekit_service_url: "https://lk.example.org", - type: "livekit", - }, + const expectedTransport = { + livekit_service_url: "https://lk.example.org", + type: "livekit", + }; + expect(advertised$.value).toStrictEqual(expectedTransport); + expect(active$.value).toStrictEqual({ + transport: expectedTransport, sfuConfig: { jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", livekitAlias: "Akph4alDMhen", @@ -273,13 +362,15 @@ describe("LocalTransport", () => { }, }); }); + it("supports getting transport via user settings", async () => { customLivekitUrl.setValue("https://lk.example.org"); - const localTransport$ = createLocalTransport$(localTransportOpts); + const { advertised$, active$ } = + createLocalTransport$(localTransportOpts); openIdResolver.resolve?.(openIdResponse); - expect(localTransport$.value).toBe(null); + expect(advertised$.value).toBe(null); await flushPromises(); - expect(localTransport$.value).toStrictEqual({ + expect(active$.value).toStrictEqual({ transport: { livekit_service_url: "https://lk.example.org", type: "livekit", @@ -292,19 +383,24 @@ describe("LocalTransport", () => { }, }); }); + it("supports getting transport via backend", async () => { localTransportOpts.client._unstable_getRTCTransports.mockResolvedValue([ { type: "livekit", livekit_service_url: "https://lk.example.org" }, ]); - const localTransport$ = createLocalTransport$(localTransportOpts); + const { advertised$, active$ } = + createLocalTransport$(localTransportOpts); openIdResolver.resolve?.(openIdResponse); - expect(localTransport$.value).toBe(null); + expect(advertised$.value).toBe(null); + expect(active$.value).toBe(null); await flushPromises(); - expect(localTransport$.value).toStrictEqual({ - transport: { - livekit_service_url: "https://lk.example.org", - type: "livekit", - }, + const expectedTransport = { + livekit_service_url: "https://lk.example.org", + type: "livekit", + }; + expect(advertised$.value).toStrictEqual(expectedTransport); + expect(active$.value).toStrictEqual({ + transport: expectedTransport, sfuConfig: { jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", livekitAlias: "Akph4alDMhen", @@ -313,6 +409,7 @@ describe("LocalTransport", () => { }, }); }); + it("fails fast if the openID request fails for backend config", async () => { localTransportOpts.client._unstable_getRTCTransports.mockResolvedValue([ { type: "livekit", livekit_service_url: "https://lk.example.org" }, @@ -320,13 +417,11 @@ describe("LocalTransport", () => { openIdResolver.reject( new FailToGetOpenIdToken(new Error("Test driven error")), ); - try { - await lastValueFrom(createLocalTransport$(localTransportOpts)); - throw Error("Expected test to throw"); - } catch (ex) { - expect(ex).toBeInstanceOf(FailToGetOpenIdToken); - } + await expect(async () => + lastValueFrom(createLocalTransport$(localTransportOpts).active$), + ).rejects.toThrow(expect.any(FailToGetOpenIdToken)); }); + it("supports getting transport via well-known", async () => { localTransportOpts.client.getDomain.mockReturnValue("example.org"); fetchMock.getOnce("https://example.org/.well-known/matrix/client", { @@ -334,15 +429,19 @@ describe("LocalTransport", () => { { type: "livekit", livekit_service_url: "https://lk.example.org" }, ], }); - const localTransport$ = createLocalTransport$(localTransportOpts); + const { advertised$, active$ } = + createLocalTransport$(localTransportOpts); openIdResolver.resolve?.(openIdResponse); - expect(localTransport$.value).toBe(null); + expect(advertised$.value).toBe(null); + expect(active$.value).toBe(null); await flushPromises(); - expect(localTransport$.value).toStrictEqual({ - transport: { - livekit_service_url: "https://lk.example.org", - type: "livekit", - }, + const expectedTransport = { + livekit_service_url: "https://lk.example.org", + type: "livekit", + }; + expect(advertised$.value).toStrictEqual(expectedTransport); + expect(active$.value).toStrictEqual({ + transport: expectedTransport, sfuConfig: { jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", livekitAlias: "Akph4alDMhen", @@ -352,6 +451,7 @@ describe("LocalTransport", () => { }); expect(fetchMock.done()).toEqual(true); }); + it("fails fast if the openId request fails for the well-known config", async () => { localTransportOpts.client.getDomain.mockReturnValue("example.org"); fetchMock.getOnce("https://example.org/.well-known/matrix/client", { @@ -362,20 +462,18 @@ describe("LocalTransport", () => { openIdResolver.reject( new FailToGetOpenIdToken(new Error("Test driven error")), ); - try { - await lastValueFrom(createLocalTransport$(localTransportOpts)); - throw Error("Expected test to throw"); - } catch (ex) { - expect(ex).toBeInstanceOf(FailToGetOpenIdToken); - } + await expect(async () => + lastValueFrom(createLocalTransport$(localTransportOpts).active$), + ).rejects.toThrow(expect.any(FailToGetOpenIdToken)); }); + it("throws if no options are available", async () => { - const localTransport$ = createLocalTransport$({ + const { advertised$, active$ } = createLocalTransport$({ scope, ownMembershipIdentity: ownMemberMock, roomId: "!example_room_id", - useOldestMember$: constant(false), - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), + useOldestMember: false, + forceJwtEndpoint: JwtEndpointVersion.Legacy, delayId$: constant(null), memberships$: constant(new Epoch([])), client: { @@ -390,7 +488,10 @@ describe("LocalTransport", () => { }); await flushPromises(); - expect(() => localTransport$.value).toThrow( + expect(() => advertised$.value).toThrow( + new MatrixRTCTransportMissingError(""), + ); + expect(() => active$.value).toThrow( new MatrixRTCTransportMissingError(""), ); }); diff --git a/src/state/CallViewModel/localMember/LocalTransport.ts b/src/state/CallViewModel/localMember/LocalTransport.ts index 73364094..0b566ba0 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.ts @@ -13,12 +13,15 @@ import { } from "matrix-js-sdk/lib/matrixrtc"; import { MatrixError, type MatrixClient } from "matrix-js-sdk"; import { - combineLatest, distinctUntilChanged, + first, from, map, + merge, of, + startWith, switchMap, + tap, } from "rxjs"; import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; @@ -58,8 +61,8 @@ interface Props { OpenIDClientParts; // Used by the jwt service to create the livekit room and compute the livekit alias. roomId: string; - useOldestMember$: Behavior; - forceJwtEndpoint$: Behavior; + useOldestMember: boolean; + forceJwtEndpoint: JwtEndpointVersion; delayId$: Behavior; } @@ -93,23 +96,35 @@ export interface LocalTransportWithSFUConfig { transport: LivekitTransportConfig; sfuConfig: SFUConfig; } + export function isLocalTransportWithSFUConfig( obj: LivekitTransportConfig | LocalTransportWithSFUConfig, ): obj is LocalTransportWithSFUConfig { return "transport" in obj && "sfuConfig" in obj; } +interface LocalTransport { + /** + * The transport to be advertised in our MatrixRTC membership. `null` when not + * yet fetched/validated. + */ + advertised$: Behavior; + /** + * The transport to connect to and publish media on. `null` when not yet known + * or available. + */ + active$: Behavior; +} + /** - * This class is responsible for managing the local transport. - * "Which transport is the local member going to use" + * Connects to the JWT service and determines the transports that the local member should use. * * @prop useOldestMember Whether to use the same transport as the oldest member. * This will only update once the first oldest member appears. Will not recompute if the oldest member leaves. - * - * @prop useOldJwtEndpoint$ Whether to set forceOldJwtEndpoint on the returned transport and to use the old JWT endpoint. + * @prop useOldJwtEndpoint Whether to set forceOldJwtEndpoint on the returned transport and to use the old JWT endpoint. * This is used when the connection manager needs to know if it has to use the legacy endpoint which implies a string concatenated rtcBackendIdentity. * (which is expected for non sticky event based rtc member events) - * @returns The local transport. It will be created using the correct sfu endpoint based on the useOldJwtEndpoint$ value. + * @returns The transport to advertise in the local MatrixRTC membership, along with the transport to actively publish media to. * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken */ export const createLocalTransport$ = ({ @@ -118,114 +133,156 @@ export const createLocalTransport$ = ({ ownMembershipIdentity, client, roomId, - useOldestMember$, - forceJwtEndpoint$, + useOldestMember, + forceJwtEndpoint, delayId$, -}: Props): Behavior => { +}: Props): LocalTransport => { /** - * The transport over which we should be actively publishing our media. - * undefined when not joined. + * The LiveKit transport in use by the oldest RTC membership. `null` when the + * oldest member has no such transport. */ - const oldestMemberTransport$ = - scope.behavior( - combineLatest([memberships$, useOldestMember$]).pipe( - map(([memberships, useOldestMember]) => { - if (!useOldestMember) return null; // No need to do any prefetching if not using oldest member - const oldestMember = memberships.value[0]; - const transport = oldestMember?.getTransport(oldestMember); - if (!transport) return null; - return transport; - }), - switchMap((transport) => { - if (transport !== null && isLivekitTransportConfig(transport)) { - // Get the open jwt token to connect to the sfu - const computeLocalTransportWithSFUConfig = - async (): Promise => { - return { - transport, - sfuConfig: await getSFUConfigWithOpenID( - client, - ownMembershipIdentity, - transport.livekit_service_url, - roomId, - { forceJwtEndpoint: JwtEndpointVersion.Legacy }, - logger, - ), - }; - }; - return from(computeLocalTransportWithSFUConfig()); - } - return of(null); - }), - ), - null, - ); + const oldestMemberTransport$ = scope.behavior( + memberships$.pipe( + map((memberships) => { + const oldestMember = memberships.value[0]; + if (oldestMember === undefined) { + logger.info("Oldest member: not found"); + return null; + } + const transport = oldestMember.getTransport(oldestMember); + if (transport === undefined) { + logger.warn( + `Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has no transport`, + ); + return null; + } + if (!isLivekitTransportConfig(transport)) { + logger.warn( + `Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has invalid transport`, + ); + return null; + } + logger.info( + "Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has valid transport", + ); + return transport; + }), + distinctUntilChanged(areLivekitTransportsEqual), + ), + ); /** * The transport that we would personally prefer to publish on (if not for the - * transport preferences of others, perhaps). + * transport preferences of others, perhaps). `null` until fetched and + * validated. * * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken */ - const preferredTransport$ = scope.behavior( - // preferredTransport$ (used for multi sfu) needs to know if we are using the old or new - // jwt endpoint (`get_token` vs `sfu/get`) based on that the jwt endpoint will compute the rtcBackendIdentity - // differently. (sha(`${userId}|${deviceId}|${memberId}`) vs `${userId}|${deviceId}|${memberId}`) - // When using sticky events (we need to use the new endpoint). - combineLatest([customLivekitUrl.value$, delayId$, forceJwtEndpoint$]).pipe( - switchMap(([customUrl, delayId, forceEndpoint]) => { - logger.info( - "Creating preferred transport based on: ", - "customUrl: ", - customUrl, - "delayId: ", - delayId, - "forceEndpoint: ", - forceEndpoint, - ); - return from( - makeTransport( - client, - ownMembershipIdentity, - roomId, - customUrl, - forceEndpoint, - delayId ?? undefined, + const preferredTransport$ = + scope.behavior( + // preferredTransport$ (used for multi sfu) needs to know if we are using the old or new + // jwt endpoint (`get_token` vs `sfu/get`) based on that the jwt endpoint will compute the rtcBackendIdentity + // differently. (sha(`${userId}|${deviceId}|${memberId}`) vs `${userId}|${deviceId}|${memberId}`) + // When using sticky events (we need to use the new endpoint). + customLivekitUrl.value$.pipe( + switchMap((customUrl) => + startWith(null)( + // Fetch the SFU config, and repeat this asynchronously for every + // change in delay ID. + delayId$.pipe( + switchMap(async (delayId) => { + logger.info( + "Creating preferred transport based on: ", + "customUrl: ", + customUrl, + "delayId: ", + delayId, + "forceJwtEndpoint: ", + forceJwtEndpoint, + ); + return makeTransport( + client, + ownMembershipIdentity, + roomId, + customUrl, + forceJwtEndpoint, + delayId ?? undefined, + ); + }), + // We deliberately hide any changes to the SFU config because we + // do not actually want the app to reconnect whenever the JWT + // token changes due to us delegating a new delayed event. The + // initial SFU config for the transport is all the app needs. + distinctUntilChanged((prev, next) => + areLivekitTransportsEqual(prev.transport, next.transport), + ), + ), ), - ); - }), - ), - null, - ); + ), + ), + ); - /** - * The chosen transport we should advertise in our MatrixRTC membership. - */ - return scope.behavior( - combineLatest([ - useOldestMember$, - oldestMemberTransport$, - preferredTransport$, - ]).pipe( - map(([useOldestMember, oldestMemberTransport, preferredTransport]) => { - return useOldestMember - ? (oldestMemberTransport ?? preferredTransport) - : preferredTransport; - }), - distinctUntilChanged((t1, t2) => { - logger.info( - "Local Transport Update from:", - t1?.transport.livekit_service_url, - " to ", - t2?.transport.livekit_service_url, - ); - return areLivekitTransportsEqual( - t1?.transport ?? null, - t2?.transport ?? null, - ); - }), + if (useOldestMember) { + // --- Oldest member mode --- + return { + // Never update the transport that we advertise in our membership. Just + // take the first valid oldest member or preferred transport that we learn + // about, and stick with that. This avoids unnecessary SFU hops and room + // state changes. + advertised$: scope.behavior( + merge( + oldestMemberTransport$, + preferredTransport$.pipe(map((t) => t?.transport ?? null)), + ).pipe( + first((t) => t !== null), + tap((t) => + logger.info(`Advertise transport: ${t.livekit_service_url}`), + ), + ), + null, + ), + // Publish on the transport used by the oldest member. + active$: scope.behavior( + oldestMemberTransport$.pipe( + switchMap((transport) => { + // Oldest member not available (or invalid SFU config). + if (transport === null) return of(null); + // Oldest member available: fetch the SFU config. + const fetchOldestMemberTransport = + async (): Promise => ({ + transport, + sfuConfig: await getSFUConfigWithOpenID( + client, + ownMembershipIdentity, + transport.livekit_service_url, + roomId, + { forceJwtEndpoint: JwtEndpointVersion.Legacy }, + logger, + ), + }); + return from(fetchOldestMemberTransport()).pipe(startWith(null)); + }), + tap((t) => + logger.info( + `Publish on transport: ${t?.transport.livekit_service_url}`, + ), + ), + ), + ), + }; + } + + // --- Multi-SFU mode --- + // Always publish on and advertise the preferred transport. + return { + advertised$: scope.behavior( + preferredTransport$.pipe( + map((t) => t?.transport ?? null), + distinctUntilChanged(areLivekitTransportsEqual), + ), ), - ); + active$: preferredTransport$, + }; }; const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci"; diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index e4376c12..727f68bc 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -90,7 +90,7 @@ export interface IConnectionManager { * @param props - Configuration object * @param props.scope - The observable scope used by this object * @param props.connectionFactory - Used to create new connections - * @param props.localTransport$ - The local transport to use. (deduplicated with remoteTransports$) + * @param props.localTransport$ - The transport to publish local media on. (deduplicated with remoteTransports$) * @param props.remoteTransports$ - All other transports. The connection manager will create connections for each transport. (deduplicated with localTransport$) * @param props.ownMembershipIdentity - The own membership identity to use. * @param props.logger - The logger to use. @@ -164,21 +164,21 @@ export function createConnectionManager$({ generateItemsWithEpoch( "ConnectionManager connections$", function* (transports) { - for (const transportWithOrWithoutSfuConfig of transports) { - if ( - isLocalTransportWithSFUConfig(transportWithOrWithoutSfuConfig) - ) { - // This is the local transport only the `LocalTransportWithSFUConfig` has a `sfuConfig` field - const { transport, sfuConfig } = transportWithOrWithoutSfuConfig; + for (const transport of transports) { + if (isLocalTransportWithSFUConfig(transport)) { + // This is the local transport; only the `LocalTransportWithSFUConfig` has a `sfuConfig` field. yield { - keys: [transport.livekit_service_url, sfuConfig], + keys: [ + transport.transport.livekit_service_url, + transport.sfuConfig, + ], data: undefined, }; } else { yield { keys: [ - transportWithOrWithoutSfuConfig.livekit_service_url, - undefined as undefined | SFUConfig, + transport.livekit_service_url, + undefined as SFUConfig | undefined, ], data: undefined, }; @@ -194,6 +194,8 @@ export function createConnectionManager$({ }, ownMembershipIdentity, logger, + // TODO: This whole optional SFUConfig parameter is not particularly elegant. + // I would like it if connections always fetched the SFUConfig by themselves. sfuConfig, ); // Start the connection immediately diff --git a/src/utils/observable.ts b/src/utils/observable.ts index d4182021..2e19748b 100644 --- a/src/utils/observable.ts +++ b/src/utils/observable.ts @@ -213,6 +213,38 @@ export function filterBehavior( ); } +/** + * Maps a changing input value to an item whose lifetime is tied to a certain + * computed key. The item may capture some dynamic data from the input. + */ +export function generateItem< + Input, + Keys extends [unknown, ...unknown[]], + Data, + Item, +>( + name: string, + generator: (input: Input) => { keys: readonly [...Keys]; data: Data }, + factory: ( + scope: ObservableScope, + data$: Behavior, + ...keys: Keys + ) => Item, +): OperatorFunction { + return (input$) => + input$.pipe( + generateItemsInternal( + name, + function* (input) { + yield generator(input); + }, + factory, + (items) => items, + ), + map(([item]) => item), + ); +} + function generateItemsInternal< Input, Keys extends [unknown, ...unknown[]],