From 13d131c2e939fee8e6b6eb0317eb8078eb23c022 Mon Sep 17 00:00:00 2001 From: Robin Date: Mon, 9 Feb 2026 19:07:27 +0100 Subject: [PATCH 1/4] Logically separate the advertised transport from the active transport To correctly implement the legacy "oldest membership" mode, we need the code to be more nuanced about the local transport. Specifically, it needs to allow for the transport we advertise in our membership to be different from the transport that we connect to and publish media on. Otherwise, if these two are yoked together, members will resend their memberships whenever an SFU hop occurs, which an attacker could use to cause an amplified wave of state changes. --- src/state/CallViewModel/CallViewModel.ts | 66 ++--- .../CallViewModel/localMember/LocalMember.ts | 11 +- .../localMember/LocalTransport.ts | 265 +++++++++++------- .../remoteMembers/ConnectionManager.ts | 22 +- 4 files changed, 211 insertions(+), 153 deletions(-) diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 82bcaedb..e2c6e46e 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -444,35 +444,34 @@ export function createCallViewModel$( memberId: uuidv4(), }; - const localTransport$ = createLocalTransport$({ - scope: scope, - memberships$: memberships$, - ownMembershipIdentity, - client, - delayId$: scope.behavior( - ( - fromEvent( - matrixRTCSession, - MembershipManagerEvent.DelayIdChanged, - // The type of reemitted event includes the original emitted as the second arg. - ) as Observable<[string | undefined, IMembershipManager]> - ).pipe(map(([delayId]) => delayId ?? null)), - matrixRTCSession.delayId ?? null, - ), - roomId: matrixRoom.roomId, - forceJwtEndpoint$: scope.behavior( - matrixRTCMode$.pipe( - map((v) => - v === MatrixRTCMode.Matrix_2_0 - ? JwtEndpointVersion.Matrix_2_0 - : JwtEndpointVersion.Legacy, - ), + const localTransport$ = scope.behavior( + matrixRTCMode$.pipe( + map((mode) => + createLocalTransport$({ + scope: scope, + memberships$: memberships$, + ownMembershipIdentity, + client, + delayId$: scope.behavior( + ( + fromEvent( + matrixRTCSession, + MembershipManagerEvent.DelayIdChanged, + // The type of reemitted event includes the original emitted as the second arg. + ) as Observable<[string | undefined, IMembershipManager]> + ).pipe(map(([delayId]) => delayId ?? null)), + matrixRTCSession.delayId ?? null, + ), + roomId: matrixRoom.roomId, + forceJwtEndpoint: + mode === MatrixRTCMode.Matrix_2_0 + ? JwtEndpointVersion.Matrix_2_0 + : JwtEndpointVersion.Legacy, + useOldestMember: mode === MatrixRTCMode.Legacy, + }), ), ), - useOldestMember$: scope.behavior( - matrixRTCMode$.pipe(map((v) => v === MatrixRTCMode.Legacy)), - ), - }); + ); const connectionFactory = new ECConnectionFactory( client, @@ -491,6 +490,7 @@ export function createCallViewModel$( connectionFactory: connectionFactory, localTransport$: scope.behavior( localTransport$.pipe( + switchMap((t) => t.active$), catchError((e: unknown) => { logger.info( "could not pass local transport to createConnectionManager$. localTransport$ threw an error", @@ -524,13 +524,13 @@ export function createCallViewModel$( ); const localMembership = createLocalMembership$({ - scope: scope, + scope, homeserverConnected: createHomeserverConnected$( scope, client, matrixRTCSession, ), - muteStates: muteStates, + muteStates, joinMatrixRTC: (transport: LivekitTransportConfig) => { return enterRTCSession( matrixRTCSession, @@ -550,9 +550,11 @@ export function createCallViewModel$( ), ); }, - connectionManager: connectionManager, - matrixRTCSession: matrixRTCSession, - localTransport$: localTransport$, + connectionManager, + matrixRTCSession, + localTransport$: scope.behavior( + localTransport$.pipe(switchMap((t) => t.advertised$)), + ), logger: logger.getChild(`[${Date.now()}]`), }); diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index 2f38ad82..eb641ca7 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -62,7 +62,6 @@ import { } from "../remoteMembers/Connection.ts"; import { type HomeserverConnected } from "./HomeserverConnected.ts"; import { and$ } from "../../../utils/observable.ts"; -import { type LocalTransportWithSFUConfig } from "./LocalTransport.ts"; export enum TransportState { /** Not even a transport is available to the LocalMembership */ @@ -128,7 +127,7 @@ interface Props { createPublisherFactory: (connection: Connection) => Publisher; joinMatrixRTC: (transport: LivekitTransportConfig) => void; homeserverConnected: HomeserverConnected; - localTransport$: Behavior; + localTransport$: Behavior; matrixRTCSession: Pick< MatrixRTCSession, "updateCallIntent" | "leaveRoomSession" @@ -147,7 +146,7 @@ interface Props { * @param props.createPublisherFactory Factory to create a publisher once we have a connection. * @param props.joinMatrixRTC Callback to join the matrix RTC session once we have a transport. * @param props.homeserverConnected The homeserver connected state. - * @param props.localTransport$ The local transport to use for publishing. + * @param props.localTransport$ The transport to advertise in our membership. * @param props.logger The logger to use. * @param props.muteStates The mute states for video and audio. * @param props.matrixRTCSession The matrix RTC session to join. @@ -237,9 +236,7 @@ export const createLocalMembership$ = ({ return null; } - return connectionData.getConnectionForTransport( - localTransport.transport, - ); + return connectionData.getConnectionForTransport(localTransport); }), tap((connection) => { logger.info( @@ -549,7 +546,7 @@ export const createLocalMembership$ = ({ if (!shouldConnect) return; try { - joinMatrixRTC(transport.transport); + joinMatrixRTC(transport); } catch (error) { logger.error("Error entering RTC session", error); if (error instanceof Error) diff --git a/src/state/CallViewModel/localMember/LocalTransport.ts b/src/state/CallViewModel/localMember/LocalTransport.ts index 73364094..0b566ba0 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.ts @@ -13,12 +13,15 @@ import { } from "matrix-js-sdk/lib/matrixrtc"; import { MatrixError, type MatrixClient } from "matrix-js-sdk"; import { - combineLatest, distinctUntilChanged, + first, from, map, + merge, of, + startWith, switchMap, + tap, } from "rxjs"; import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; @@ -58,8 +61,8 @@ interface Props { OpenIDClientParts; // Used by the jwt service to create the livekit room and compute the livekit alias. roomId: string; - useOldestMember$: Behavior; - forceJwtEndpoint$: Behavior; + useOldestMember: boolean; + forceJwtEndpoint: JwtEndpointVersion; delayId$: Behavior; } @@ -93,23 +96,35 @@ export interface LocalTransportWithSFUConfig { transport: LivekitTransportConfig; sfuConfig: SFUConfig; } + export function isLocalTransportWithSFUConfig( obj: LivekitTransportConfig | LocalTransportWithSFUConfig, ): obj is LocalTransportWithSFUConfig { return "transport" in obj && "sfuConfig" in obj; } +interface LocalTransport { + /** + * The transport to be advertised in our MatrixRTC membership. `null` when not + * yet fetched/validated. + */ + advertised$: Behavior; + /** + * The transport to connect to and publish media on. `null` when not yet known + * or available. + */ + active$: Behavior; +} + /** - * This class is responsible for managing the local transport. - * "Which transport is the local member going to use" + * Connects to the JWT service and determines the transports that the local member should use. * * @prop useOldestMember Whether to use the same transport as the oldest member. * This will only update once the first oldest member appears. Will not recompute if the oldest member leaves. - * - * @prop useOldJwtEndpoint$ Whether to set forceOldJwtEndpoint on the returned transport and to use the old JWT endpoint. + * @prop useOldJwtEndpoint Whether to set forceOldJwtEndpoint on the returned transport and to use the old JWT endpoint. * This is used when the connection manager needs to know if it has to use the legacy endpoint which implies a string concatenated rtcBackendIdentity. * (which is expected for non sticky event based rtc member events) - * @returns The local transport. It will be created using the correct sfu endpoint based on the useOldJwtEndpoint$ value. + * @returns The transport to advertise in the local MatrixRTC membership, along with the transport to actively publish media to. * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken */ export const createLocalTransport$ = ({ @@ -118,114 +133,156 @@ export const createLocalTransport$ = ({ ownMembershipIdentity, client, roomId, - useOldestMember$, - forceJwtEndpoint$, + useOldestMember, + forceJwtEndpoint, delayId$, -}: Props): Behavior => { +}: Props): LocalTransport => { /** - * The transport over which we should be actively publishing our media. - * undefined when not joined. + * The LiveKit transport in use by the oldest RTC membership. `null` when the + * oldest member has no such transport. */ - const oldestMemberTransport$ = - scope.behavior( - combineLatest([memberships$, useOldestMember$]).pipe( - map(([memberships, useOldestMember]) => { - if (!useOldestMember) return null; // No need to do any prefetching if not using oldest member - const oldestMember = memberships.value[0]; - const transport = oldestMember?.getTransport(oldestMember); - if (!transport) return null; - return transport; - }), - switchMap((transport) => { - if (transport !== null && isLivekitTransportConfig(transport)) { - // Get the open jwt token to connect to the sfu - const computeLocalTransportWithSFUConfig = - async (): Promise => { - return { - transport, - sfuConfig: await getSFUConfigWithOpenID( - client, - ownMembershipIdentity, - transport.livekit_service_url, - roomId, - { forceJwtEndpoint: JwtEndpointVersion.Legacy }, - logger, - ), - }; - }; - return from(computeLocalTransportWithSFUConfig()); - } - return of(null); - }), - ), - null, - ); + const oldestMemberTransport$ = scope.behavior( + memberships$.pipe( + map((memberships) => { + const oldestMember = memberships.value[0]; + if (oldestMember === undefined) { + logger.info("Oldest member: not found"); + return null; + } + const transport = oldestMember.getTransport(oldestMember); + if (transport === undefined) { + logger.warn( + `Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has no transport`, + ); + return null; + } + if (!isLivekitTransportConfig(transport)) { + logger.warn( + `Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has invalid transport`, + ); + return null; + } + logger.info( + "Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has valid transport", + ); + return transport; + }), + distinctUntilChanged(areLivekitTransportsEqual), + ), + ); /** * The transport that we would personally prefer to publish on (if not for the - * transport preferences of others, perhaps). + * transport preferences of others, perhaps). `null` until fetched and + * validated. * * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken */ - const preferredTransport$ = scope.behavior( - // preferredTransport$ (used for multi sfu) needs to know if we are using the old or new - // jwt endpoint (`get_token` vs `sfu/get`) based on that the jwt endpoint will compute the rtcBackendIdentity - // differently. (sha(`${userId}|${deviceId}|${memberId}`) vs `${userId}|${deviceId}|${memberId}`) - // When using sticky events (we need to use the new endpoint). - combineLatest([customLivekitUrl.value$, delayId$, forceJwtEndpoint$]).pipe( - switchMap(([customUrl, delayId, forceEndpoint]) => { - logger.info( - "Creating preferred transport based on: ", - "customUrl: ", - customUrl, - "delayId: ", - delayId, - "forceEndpoint: ", - forceEndpoint, - ); - return from( - makeTransport( - client, - ownMembershipIdentity, - roomId, - customUrl, - forceEndpoint, - delayId ?? undefined, + const preferredTransport$ = + scope.behavior( + // preferredTransport$ (used for multi sfu) needs to know if we are using the old or new + // jwt endpoint (`get_token` vs `sfu/get`) based on that the jwt endpoint will compute the rtcBackendIdentity + // differently. (sha(`${userId}|${deviceId}|${memberId}`) vs `${userId}|${deviceId}|${memberId}`) + // When using sticky events (we need to use the new endpoint). + customLivekitUrl.value$.pipe( + switchMap((customUrl) => + startWith(null)( + // Fetch the SFU config, and repeat this asynchronously for every + // change in delay ID. + delayId$.pipe( + switchMap(async (delayId) => { + logger.info( + "Creating preferred transport based on: ", + "customUrl: ", + customUrl, + "delayId: ", + delayId, + "forceJwtEndpoint: ", + forceJwtEndpoint, + ); + return makeTransport( + client, + ownMembershipIdentity, + roomId, + customUrl, + forceJwtEndpoint, + delayId ?? undefined, + ); + }), + // We deliberately hide any changes to the SFU config because we + // do not actually want the app to reconnect whenever the JWT + // token changes due to us delegating a new delayed event. The + // initial SFU config for the transport is all the app needs. + distinctUntilChanged((prev, next) => + areLivekitTransportsEqual(prev.transport, next.transport), + ), + ), ), - ); - }), - ), - null, - ); + ), + ), + ); - /** - * The chosen transport we should advertise in our MatrixRTC membership. - */ - return scope.behavior( - combineLatest([ - useOldestMember$, - oldestMemberTransport$, - preferredTransport$, - ]).pipe( - map(([useOldestMember, oldestMemberTransport, preferredTransport]) => { - return useOldestMember - ? (oldestMemberTransport ?? preferredTransport) - : preferredTransport; - }), - distinctUntilChanged((t1, t2) => { - logger.info( - "Local Transport Update from:", - t1?.transport.livekit_service_url, - " to ", - t2?.transport.livekit_service_url, - ); - return areLivekitTransportsEqual( - t1?.transport ?? null, - t2?.transport ?? null, - ); - }), + if (useOldestMember) { + // --- Oldest member mode --- + return { + // Never update the transport that we advertise in our membership. Just + // take the first valid oldest member or preferred transport that we learn + // about, and stick with that. This avoids unnecessary SFU hops and room + // state changes. + advertised$: scope.behavior( + merge( + oldestMemberTransport$, + preferredTransport$.pipe(map((t) => t?.transport ?? null)), + ).pipe( + first((t) => t !== null), + tap((t) => + logger.info(`Advertise transport: ${t.livekit_service_url}`), + ), + ), + null, + ), + // Publish on the transport used by the oldest member. + active$: scope.behavior( + oldestMemberTransport$.pipe( + switchMap((transport) => { + // Oldest member not available (or invalid SFU config). + if (transport === null) return of(null); + // Oldest member available: fetch the SFU config. + const fetchOldestMemberTransport = + async (): Promise => ({ + transport, + sfuConfig: await getSFUConfigWithOpenID( + client, + ownMembershipIdentity, + transport.livekit_service_url, + roomId, + { forceJwtEndpoint: JwtEndpointVersion.Legacy }, + logger, + ), + }); + return from(fetchOldestMemberTransport()).pipe(startWith(null)); + }), + tap((t) => + logger.info( + `Publish on transport: ${t?.transport.livekit_service_url}`, + ), + ), + ), + ), + }; + } + + // --- Multi-SFU mode --- + // Always publish on and advertise the preferred transport. + return { + advertised$: scope.behavior( + preferredTransport$.pipe( + map((t) => t?.transport ?? null), + distinctUntilChanged(areLivekitTransportsEqual), + ), ), - ); + active$: preferredTransport$, + }; }; const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci"; diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index e4376c12..727f68bc 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -90,7 +90,7 @@ export interface IConnectionManager { * @param props - Configuration object * @param props.scope - The observable scope used by this object * @param props.connectionFactory - Used to create new connections - * @param props.localTransport$ - The local transport to use. (deduplicated with remoteTransports$) + * @param props.localTransport$ - The transport to publish local media on. (deduplicated with remoteTransports$) * @param props.remoteTransports$ - All other transports. The connection manager will create connections for each transport. (deduplicated with localTransport$) * @param props.ownMembershipIdentity - The own membership identity to use. * @param props.logger - The logger to use. @@ -164,21 +164,21 @@ export function createConnectionManager$({ generateItemsWithEpoch( "ConnectionManager connections$", function* (transports) { - for (const transportWithOrWithoutSfuConfig of transports) { - if ( - isLocalTransportWithSFUConfig(transportWithOrWithoutSfuConfig) - ) { - // This is the local transport only the `LocalTransportWithSFUConfig` has a `sfuConfig` field - const { transport, sfuConfig } = transportWithOrWithoutSfuConfig; + for (const transport of transports) { + if (isLocalTransportWithSFUConfig(transport)) { + // This is the local transport; only the `LocalTransportWithSFUConfig` has a `sfuConfig` field. yield { - keys: [transport.livekit_service_url, sfuConfig], + keys: [ + transport.transport.livekit_service_url, + transport.sfuConfig, + ], data: undefined, }; } else { yield { keys: [ - transportWithOrWithoutSfuConfig.livekit_service_url, - undefined as undefined | SFUConfig, + transport.livekit_service_url, + undefined as SFUConfig | undefined, ], data: undefined, }; @@ -194,6 +194,8 @@ export function createConnectionManager$({ }, ownMembershipIdentity, logger, + // TODO: This whole optional SFUConfig parameter is not particularly elegant. + // I would like it if connections always fetched the SFUConfig by themselves. sfuConfig, ); // Start the connection immediately From 6cf859fd9e242501620c3053507d4a669f56004e Mon Sep 17 00:00:00 2001 From: Robin Date: Fri, 13 Feb 2026 12:39:40 +0100 Subject: [PATCH 2/4] Fix resource leak due to unsafe scope reference --- src/state/CallViewModel/CallViewModel.ts | 51 +++++++++++++----------- src/utils/observable.ts | 32 +++++++++++++++ 2 files changed, 60 insertions(+), 23 deletions(-) diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index e2c6e46e..90bf71fe 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -60,6 +60,7 @@ import { import { accumulate, filterBehavior, + generateItem, generateItems, pauseWhen, } from "../../utils/observable"; @@ -446,29 +447,33 @@ export function createCallViewModel$( const localTransport$ = scope.behavior( matrixRTCMode$.pipe( - map((mode) => - createLocalTransport$({ - scope: scope, - memberships$: memberships$, - ownMembershipIdentity, - client, - delayId$: scope.behavior( - ( - fromEvent( - matrixRTCSession, - MembershipManagerEvent.DelayIdChanged, - // The type of reemitted event includes the original emitted as the second arg. - ) as Observable<[string | undefined, IMembershipManager]> - ).pipe(map(([delayId]) => delayId ?? null)), - matrixRTCSession.delayId ?? null, - ), - roomId: matrixRoom.roomId, - forceJwtEndpoint: - mode === MatrixRTCMode.Matrix_2_0 - ? JwtEndpointVersion.Matrix_2_0 - : JwtEndpointVersion.Legacy, - useOldestMember: mode === MatrixRTCMode.Legacy, - }), + generateItem( + "CallViewModel localTransport$", + // Re-create LocalTransport whenever the mode changes + (mode) => ({ keys: [mode], data: undefined }), + (scope, _data$, mode) => + createLocalTransport$({ + scope: scope, + memberships$: memberships$, + ownMembershipIdentity, + client, + delayId$: scope.behavior( + ( + fromEvent( + matrixRTCSession, + MembershipManagerEvent.DelayIdChanged, + // The type of reemitted event includes the original emitted as the second arg. + ) as Observable<[string | undefined, IMembershipManager]> + ).pipe(map(([delayId]) => delayId ?? null)), + matrixRTCSession.delayId ?? null, + ), + roomId: matrixRoom.roomId, + forceJwtEndpoint: + mode === MatrixRTCMode.Matrix_2_0 + ? JwtEndpointVersion.Matrix_2_0 + : JwtEndpointVersion.Legacy, + useOldestMember: mode === MatrixRTCMode.Legacy, + }), ), ), ); diff --git a/src/utils/observable.ts b/src/utils/observable.ts index d4182021..2e19748b 100644 --- a/src/utils/observable.ts +++ b/src/utils/observable.ts @@ -213,6 +213,38 @@ export function filterBehavior( ); } +/** + * Maps a changing input value to an item whose lifetime is tied to a certain + * computed key. The item may capture some dynamic data from the input. + */ +export function generateItem< + Input, + Keys extends [unknown, ...unknown[]], + Data, + Item, +>( + name: string, + generator: (input: Input) => { keys: readonly [...Keys]; data: Data }, + factory: ( + scope: ObservableScope, + data$: Behavior, + ...keys: Keys + ) => Item, +): OperatorFunction { + return (input$) => + input$.pipe( + generateItemsInternal( + name, + function* (input) { + yield generator(input); + }, + factory, + (items) => items, + ), + map(([item]) => item), + ); +} + function generateItemsInternal< Input, Keys extends [unknown, ...unknown[]], From 2a56830426036fa59c81624c7a70a4372d5a235a Mon Sep 17 00:00:00 2001 From: Robin Date: Fri, 13 Feb 2026 12:43:13 +0100 Subject: [PATCH 3/4] Fix existing LocalTransport tests --- .../localMember/LocalMember.test.ts | 49 ++--- .../localMember/LocalTransport.test.ts | 190 ++++++++---------- 2 files changed, 99 insertions(+), 140 deletions(-) diff --git a/src/state/CallViewModel/localMember/LocalMember.test.ts b/src/state/CallViewModel/localMember/LocalMember.test.ts index b228cd08..e5e9f327 100644 --- a/src/state/CallViewModel/localMember/LocalMember.test.ts +++ b/src/state/CallViewModel/localMember/LocalMember.test.ts @@ -39,7 +39,6 @@ import { constant } from "../../Behavior"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; import { ConnectionState, type Connection } from "../remoteMembers/Connection"; import { type Publisher } from "./Publisher"; -import { type LocalTransportWithSFUConfig } from "./LocalTransport"; import { initializeWidget } from "../../../widget"; initializeWidget(); @@ -216,11 +215,10 @@ describe("LocalMembership", () => { it("throws error on missing RTC config error", () => { withTestScheduler(({ scope, hot, expectObservable }) => { - const localTransport$ = - scope.behavior( - hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")), - null, - ); + const localTransport$ = scope.behavior( + hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")), + null, + ); // we do not need any connection data since we want to fail before reaching that. const mockConnectionManager = { @@ -279,23 +277,11 @@ describe("LocalMembership", () => { }); const aTransport = { - transport: { - livekit_service_url: "a", - } as LivekitTransportConfig, - sfuConfig: { - url: "sfu-url", - jwt: "sfu-token", - }, - } as LocalTransportWithSFUConfig; + livekit_service_url: "a", + } as LivekitTransportConfig; const bTransport = { - transport: { - livekit_service_url: "b", - } as LivekitTransportConfig, - sfuConfig: { - url: "sfu-url", - jwt: "sfu-token", - }, - } as LocalTransportWithSFUConfig; + livekit_service_url: "b", + } as LivekitTransportConfig; const connectionTransportAConnected = { livekitRoom: mockLivekitRoom({ @@ -305,7 +291,7 @@ describe("LocalMembership", () => { } as unknown as LocalParticipant, }), state$: constant(ConnectionState.LivekitConnected), - transport: aTransport.transport, + transport: aTransport, } as unknown as Connection; const connectionTransportAConnecting = { ...connectionTransportAConnected, @@ -314,7 +300,7 @@ describe("LocalMembership", () => { } as unknown as Connection; const connectionTransportBConnected = { state$: constant(ConnectionState.LivekitConnected), - transport: bTransport.transport, + transport: bTransport, livekitRoom: mockLivekitRoom({}), } as unknown as Connection; @@ -368,12 +354,8 @@ describe("LocalMembership", () => { // stop the first Publisher and let the second one life. expect(publishers[0].destroy).toHaveBeenCalled(); expect(publishers[1].destroy).not.toHaveBeenCalled(); - expect(publisherFactory.mock.calls[0][0].transport).toBe( - aTransport.transport, - ); - expect(publisherFactory.mock.calls[1][0].transport).toBe( - bTransport.transport, - ); + expect(publisherFactory.mock.calls[0][0].transport).toBe(aTransport); + expect(publisherFactory.mock.calls[1][0].transport).toBe(bTransport); scope.end(); await flushPromises(); // stop all tracks after ending scopes @@ -446,8 +428,9 @@ describe("LocalMembership", () => { const scope = new ObservableScope(); const connectionManagerData = new ConnectionManagerData(); - const localTransport$ = - new BehaviorSubject(null); + const localTransport$ = new BehaviorSubject( + null, + ); const connectionManagerData$ = new BehaviorSubject( new Epoch(connectionManagerData), ); @@ -519,7 +502,7 @@ describe("LocalMembership", () => { }); ( - connectionManagerData2.getConnectionForTransport(aTransport.transport)! + connectionManagerData2.getConnectionForTransport(aTransport)! .state$ as BehaviorSubject ).next(ConnectionState.LivekitConnected); expect(localMembership.localMemberState$.value).toStrictEqual({ diff --git a/src/state/CallViewModel/localMember/LocalTransport.test.ts b/src/state/CallViewModel/localMember/LocalTransport.test.ts index 2476923a..e63f7c72 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.test.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.test.ts @@ -43,10 +43,10 @@ describe("LocalTransport", () => { afterEach(() => scope.end()); it("throws if config is missing", async () => { - const localTransport$ = createLocalTransport$({ + const { advertised$, active$ } = createLocalTransport$({ scope, roomId: "!room:example.org", - useOldestMember$: constant(false), + useOldestMember: false, memberships$: constant(new Epoch([])), client: { // eslint-disable-next-line @typescript-eslint/naming-convention @@ -58,14 +58,15 @@ describe("LocalTransport", () => { getDeviceId: vi.fn(), }, ownMembershipIdentity: ownMemberMock, - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), + forceJwtEndpoint: JwtEndpointVersion.Legacy, delayId$: constant("delay_id_mock"), }); await flushPromises(); - expect(() => localTransport$.value).toThrow( + expect(() => advertised$.value).toThrow( new MatrixRTCTransportMissingError(""), ); + expect(() => active$.value).toThrow(new MatrixRTCTransportMissingError("")); }); it("throws FailToGetOpenIdToken when OpenID fetch fails", async () => { @@ -83,10 +84,10 @@ describe("LocalTransport", () => { ); const observations: unknown[] = []; const errors: Error[] = []; - const localTransport$ = createLocalTransport$({ + const { advertised$, active$ } = createLocalTransport$({ scope, roomId: "!example_room_id", - useOldestMember$: constant(false), + useOldestMember: false, memberships$: constant(new Epoch([])), client: { baseUrl: "https://lk.example.org", @@ -98,10 +99,10 @@ describe("LocalTransport", () => { getDeviceId: vi.fn(), }, ownMembershipIdentity: ownMemberMock, - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), + forceJwtEndpoint: JwtEndpointVersion.Legacy, delayId$: constant("delay_id_mock"), }); - localTransport$.subscribe( + active$.subscribe( (o) => observations.push(o), (e) => errors.push(e), ); @@ -111,7 +112,8 @@ describe("LocalTransport", () => { const expectedError = new FailToGetOpenIdToken(new Error("no openid")); expect(observations).toStrictEqual([null]); expect(errors).toStrictEqual([expectedError]); - expect(() => localTransport$.value).toThrow(expectedError); + expect(() => advertised$.value).toThrow(expectedError); + expect(() => active$.value).toThrow(expectedError); }); it("emits preferred transport after OpenID resolves", async () => { @@ -126,10 +128,10 @@ describe("LocalTransport", () => { openIdResolver.promise, ); - const localTransport$ = createLocalTransport$({ + const { advertised$, active$ } = createLocalTransport$({ scope, roomId: "!room:example.org", - useOldestMember$: constant(false), + useOldestMember: false, memberships$: constant(new Epoch([])), client: { // eslint-disable-next-line @typescript-eslint/naming-convention @@ -140,7 +142,7 @@ describe("LocalTransport", () => { baseUrl: "https://lk.example.org", }, ownMembershipIdentity: ownMemberMock, - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), + forceJwtEndpoint: JwtEndpointVersion.Legacy, delayId$: constant("delay_id_mock"), }); @@ -150,14 +152,17 @@ describe("LocalTransport", () => { livekitAlias: "Akph4alDMhen", livekitIdentity: ownMemberMock.userId + ":" + ownMemberMock.deviceId, }); - expect(localTransport$.value).toBe(null); + expect(advertised$.value).toBe(null); + expect(active$.value).toBe(null); await flushPromises(); // final - expect(localTransport$.value).toStrictEqual({ - transport: { - livekit_service_url: "https://lk.example.org", - type: "livekit", - }, + const expectedTransport = { + livekit_service_url: "https://lk.example.org", + type: "livekit", + }; + expect(advertised$.value).toStrictEqual(expectedTransport); + expect(active$.value).toStrictEqual({ + transport: expectedTransport, sfuConfig: { jwt: "jwt", livekitAlias: "Akph4alDMhen", @@ -167,53 +172,8 @@ describe("LocalTransport", () => { }); }); - it("updates local transport when oldest member changes", async () => { - // Use config so transport discovery succeeds, but delay OpenID JWT fetch - mockConfig({ - livekit: { livekit_service_url: "https://lk.example.org" }, - }); - const memberships$ = new BehaviorSubject(new Epoch([])); - const openIdResolver = Promise.withResolvers(); - - vi.spyOn(openIDSFU, "getSFUConfigWithOpenID").mockReturnValue( - openIdResolver.promise, - ); - - const localTransport$ = createLocalTransport$({ - scope, - roomId: "!example_room_id", - useOldestMember$: constant(true), - memberships$, - client: { - getDomain: () => "", - // eslint-disable-next-line @typescript-eslint/naming-convention - _unstable_getRTCTransports: async () => Promise.resolve([]), - getOpenIdToken: vi.fn(), - getDeviceId: vi.fn(), - baseUrl: "https://lk.example.org", - }, - ownMembershipIdentity: ownMemberMock, - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), - delayId$: constant("delay_id_mock"), - }); - - openIdResolver.resolve?.(openIdResponse); - expect(localTransport$.value).toBe(null); - await flushPromises(); - // final - expect(localTransport$.value).toStrictEqual({ - transport: { - livekit_service_url: "https://lk.example.org", - type: "livekit", - }, - sfuConfig: { - jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", - livekitAlias: "Akph4alDMhen", - livekitIdentity: "@lk_user:ABCDEF", - url: "https://lk.example.org", - }, - }); - }); + // TODO: This test previously didn't test what it claims to. + it.todo("updates local transport when oldest member changes"); type LocalTransportProps = Parameters[0]; @@ -229,8 +189,8 @@ describe("LocalTransport", () => { ownMembershipIdentity: ownMemberMock, scope, roomId: "!example_room_id", - useOldestMember$: constant(false), - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), + useOldestMember: false, + forceJwtEndpoint: JwtEndpointVersion.Legacy, delayId$: constant(null), memberships$: constant(new Epoch([])), client: { @@ -256,15 +216,19 @@ describe("LocalTransport", () => { mockConfig({ livekit: { livekit_service_url: "https://lk.example.org" }, }); - const localTransport$ = createLocalTransport$(localTransportOpts); + const { advertised$, active$ } = + createLocalTransport$(localTransportOpts); openIdResolver.resolve?.(openIdResponse); - expect(localTransport$.value).toBe(null); + expect(advertised$.value).toBe(null); + expect(active$.value).toBe(null); await flushPromises(); - expect(localTransport$.value).toStrictEqual({ - transport: { - livekit_service_url: "https://lk.example.org", - type: "livekit", - }, + const expectedTransport = { + livekit_service_url: "https://lk.example.org", + type: "livekit", + }; + expect(advertised$.value).toStrictEqual(expectedTransport); + expect(active$.value).toStrictEqual({ + transport: expectedTransport, sfuConfig: { jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", livekitAlias: "Akph4alDMhen", @@ -273,13 +237,15 @@ describe("LocalTransport", () => { }, }); }); + it("supports getting transport via user settings", async () => { customLivekitUrl.setValue("https://lk.example.org"); - const localTransport$ = createLocalTransport$(localTransportOpts); + const { advertised$, active$ } = + createLocalTransport$(localTransportOpts); openIdResolver.resolve?.(openIdResponse); - expect(localTransport$.value).toBe(null); + expect(advertised$.value).toBe(null); await flushPromises(); - expect(localTransport$.value).toStrictEqual({ + expect(active$.value).toStrictEqual({ transport: { livekit_service_url: "https://lk.example.org", type: "livekit", @@ -292,19 +258,24 @@ describe("LocalTransport", () => { }, }); }); + it("supports getting transport via backend", async () => { localTransportOpts.client._unstable_getRTCTransports.mockResolvedValue([ { type: "livekit", livekit_service_url: "https://lk.example.org" }, ]); - const localTransport$ = createLocalTransport$(localTransportOpts); + const { advertised$, active$ } = + createLocalTransport$(localTransportOpts); openIdResolver.resolve?.(openIdResponse); - expect(localTransport$.value).toBe(null); + expect(advertised$.value).toBe(null); + expect(active$.value).toBe(null); await flushPromises(); - expect(localTransport$.value).toStrictEqual({ - transport: { - livekit_service_url: "https://lk.example.org", - type: "livekit", - }, + const expectedTransport = { + livekit_service_url: "https://lk.example.org", + type: "livekit", + }; + expect(advertised$.value).toStrictEqual(expectedTransport); + expect(active$.value).toStrictEqual({ + transport: expectedTransport, sfuConfig: { jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", livekitAlias: "Akph4alDMhen", @@ -313,6 +284,7 @@ describe("LocalTransport", () => { }, }); }); + it("fails fast if the openID request fails for backend config", async () => { localTransportOpts.client._unstable_getRTCTransports.mockResolvedValue([ { type: "livekit", livekit_service_url: "https://lk.example.org" }, @@ -320,13 +292,11 @@ describe("LocalTransport", () => { openIdResolver.reject( new FailToGetOpenIdToken(new Error("Test driven error")), ); - try { - await lastValueFrom(createLocalTransport$(localTransportOpts)); - throw Error("Expected test to throw"); - } catch (ex) { - expect(ex).toBeInstanceOf(FailToGetOpenIdToken); - } + await expect(async () => + lastValueFrom(createLocalTransport$(localTransportOpts).active$), + ).rejects.toThrow(expect.any(FailToGetOpenIdToken)); }); + it("supports getting transport via well-known", async () => { localTransportOpts.client.getDomain.mockReturnValue("example.org"); fetchMock.getOnce("https://example.org/.well-known/matrix/client", { @@ -334,15 +304,19 @@ describe("LocalTransport", () => { { type: "livekit", livekit_service_url: "https://lk.example.org" }, ], }); - const localTransport$ = createLocalTransport$(localTransportOpts); + const { advertised$, active$ } = + createLocalTransport$(localTransportOpts); openIdResolver.resolve?.(openIdResponse); - expect(localTransport$.value).toBe(null); + expect(advertised$.value).toBe(null); + expect(active$.value).toBe(null); await flushPromises(); - expect(localTransport$.value).toStrictEqual({ - transport: { - livekit_service_url: "https://lk.example.org", - type: "livekit", - }, + const expectedTransport = { + livekit_service_url: "https://lk.example.org", + type: "livekit", + }; + expect(advertised$.value).toStrictEqual(expectedTransport); + expect(active$.value).toStrictEqual({ + transport: expectedTransport, sfuConfig: { jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", livekitAlias: "Akph4alDMhen", @@ -352,6 +326,7 @@ describe("LocalTransport", () => { }); expect(fetchMock.done()).toEqual(true); }); + it("fails fast if the openId request fails for the well-known config", async () => { localTransportOpts.client.getDomain.mockReturnValue("example.org"); fetchMock.getOnce("https://example.org/.well-known/matrix/client", { @@ -362,20 +337,18 @@ describe("LocalTransport", () => { openIdResolver.reject( new FailToGetOpenIdToken(new Error("Test driven error")), ); - try { - await lastValueFrom(createLocalTransport$(localTransportOpts)); - throw Error("Expected test to throw"); - } catch (ex) { - expect(ex).toBeInstanceOf(FailToGetOpenIdToken); - } + await expect(async () => + lastValueFrom(createLocalTransport$(localTransportOpts).active$), + ).rejects.toThrow(expect.any(FailToGetOpenIdToken)); }); + it("throws if no options are available", async () => { - const localTransport$ = createLocalTransport$({ + const { advertised$, active$ } = createLocalTransport$({ scope, ownMembershipIdentity: ownMemberMock, roomId: "!example_room_id", - useOldestMember$: constant(false), - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), + useOldestMember: false, + forceJwtEndpoint: JwtEndpointVersion.Legacy, delayId$: constant(null), memberships$: constant(new Epoch([])), client: { @@ -390,7 +363,10 @@ describe("LocalTransport", () => { }); await flushPromises(); - expect(() => localTransport$.value).toThrow( + expect(() => advertised$.value).toThrow( + new MatrixRTCTransportMissingError(""), + ); + expect(() => active$.value).toThrow( new MatrixRTCTransportMissingError(""), ); }); From 450800294765baa2e42f1940a35bc4bd3f65994d Mon Sep 17 00:00:00 2001 From: Robin Date: Fri, 13 Feb 2026 13:46:32 +0100 Subject: [PATCH 4/4] Test local transport code in oldest member mode --- .../localMember/LocalTransport.test.ts | 135 +++++++++++++++++- 1 file changed, 130 insertions(+), 5 deletions(-) diff --git a/src/state/CallViewModel/localMember/LocalTransport.test.ts b/src/state/CallViewModel/localMember/LocalTransport.test.ts index e63f7c72..8454b09a 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.test.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.test.ts @@ -13,15 +13,24 @@ import { it, type MockedObject, vi, + type MockInstance, } from "vitest"; -import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc"; +import { + type CallMembership, + type LivekitTransportConfig, +} from "matrix-js-sdk/lib/matrixrtc"; import { BehaviorSubject, lastValueFrom } from "rxjs"; import fetchMock from "fetch-mock"; -import { mockConfig, flushPromises, ownMemberMock } from "../../../utils/test"; +import { + mockConfig, + flushPromises, + ownMemberMock, + mockRtcMembership, +} from "../../../utils/test"; import { createLocalTransport$, JwtEndpointVersion } from "./LocalTransport"; import { constant } from "../../Behavior"; -import { Epoch, ObservableScope } from "../../ObservableScope"; +import { Epoch, ObservableScope, trackEpoch } from "../../ObservableScope"; import { MatrixRTCTransportMissingError, FailToGetOpenIdToken, @@ -172,8 +181,124 @@ describe("LocalTransport", () => { }); }); - // TODO: This test previously didn't test what it claims to. - it.todo("updates local transport when oldest member changes"); + describe("oldest member mode", () => { + const aliceTransport: LivekitTransportConfig = { + type: "livekit", + livekit_service_url: "https://alice.example.org", + }; + const bobTransport: LivekitTransportConfig = { + type: "livekit", + livekit_service_url: "https://bob.example.org", + }; + const aliceMembership = mockRtcMembership("@alice:example.org", "AAA", { + fociPreferred: [aliceTransport], + }); + const bobMembership = mockRtcMembership("@bob:example.org", "BBB", { + fociPreferred: [bobTransport], + }); + + let openIdSpy: MockInstance<(typeof openIDSFU)["getSFUConfigWithOpenID"]>; + beforeEach(() => { + openIdSpy = vi + .spyOn(openIDSFU, "getSFUConfigWithOpenID") + .mockResolvedValue(openIdResponse); + }); + + it("updates active transport when oldest member changes", async () => { + // Initially, Alice is the only member + const memberships$ = new BehaviorSubject([aliceMembership]); + + const { advertised$, active$ } = createLocalTransport$({ + scope, + roomId: "!example_room_id", + useOldestMember: true, + memberships$: scope.behavior(memberships$.pipe(trackEpoch())), + client: { + getDomain: () => "", + // eslint-disable-next-line @typescript-eslint/naming-convention + _unstable_getRTCTransports: async () => Promise.resolve([]), + getOpenIdToken: vi.fn(), + getDeviceId: vi.fn(), + baseUrl: "https://lk.example.org", + }, + ownMembershipIdentity: ownMemberMock, + forceJwtEndpoint: JwtEndpointVersion.Legacy, + delayId$: constant("delay_id_mock"), + }); + + expect(active$.value).toBe(null); + await flushPromises(); + // SFU config should've been fetched + expect(openIdSpy).toHaveBeenCalled(); + // Alice's transport should be active and advertised + expect(active$.value?.transport).toStrictEqual(aliceTransport); + expect(advertised$.value).toStrictEqual(aliceTransport); + + // Now Bob joins the call, but Alice is still the oldest member + openIdSpy.mockClear(); + memberships$.next([aliceMembership, bobMembership]); + await flushPromises(); + // No new SFU config should've been fetched + expect(openIdSpy).not.toHaveBeenCalled(); + // Alice's transport should still be active and advertised + expect(active$.value?.transport).toStrictEqual(aliceTransport); + expect(advertised$.value).toStrictEqual(aliceTransport); + + // Now Bob takes Alice's place as the oldest member + openIdSpy.mockClear(); + memberships$.next([bobMembership, aliceMembership]); + // Active transport should reset to null until we have Bob's SFU config + expect(active$.value).toStrictEqual(null); + await flushPromises(); + // Bob's SFU config should've been fetched + expect(openIdSpy).toHaveBeenCalled(); + // Bob's transport should be active, but Alice's should remain advertised + // (since we don't want the change in oldest member to cause a wave of new + // state events) + expect(active$.value?.transport).toStrictEqual(bobTransport); + expect(advertised$.value).toStrictEqual(aliceTransport); + }); + + it("advertises preferred transport when no other member exists", async () => { + // Initially, there are no members + const memberships$ = new BehaviorSubject([]); + + const { advertised$, active$ } = createLocalTransport$({ + scope, + roomId: "!example_room_id", + useOldestMember: true, + memberships$: scope.behavior(memberships$.pipe(trackEpoch())), + client: { + getDomain: () => "", + // eslint-disable-next-line @typescript-eslint/naming-convention + _unstable_getRTCTransports: async () => + Promise.resolve([aliceTransport]), + getOpenIdToken: vi.fn(), + getDeviceId: vi.fn(), + baseUrl: "https://lk.example.org", + }, + ownMembershipIdentity: ownMemberMock, + forceJwtEndpoint: JwtEndpointVersion.Legacy, + delayId$: constant("delay_id_mock"), + }); + + expect(active$.value).toBe(null); + await flushPromises(); + // Our own preferred transport should be advertised + expect(advertised$.value).toStrictEqual(aliceTransport); + // No transport should be active however (there is still no oldest member) + expect(active$.value).toBe(null); + + // Now Bob joins the call and becomes the oldest member + memberships$.next([bobMembership]); + await flushPromises(); + // We should still advertise our own preferred transport (to avoid + // unnecessary state changes) + expect(advertised$.value).toStrictEqual(aliceTransport); + // Bob's transport should become active + expect(active$.value?.transport).toBe(bobTransport); + }); + }); type LocalTransportProps = Parameters[0];