diff --git a/src/livekit/openIDSFU.test.ts b/src/livekit/openIDSFU.test.ts index 20820748..fc0b6d54 100644 --- a/src/livekit/openIDSFU.test.ts +++ b/src/livekit/openIDSFU.test.ts @@ -19,12 +19,14 @@ import fetchMock from "fetch-mock"; import { getSFUConfigWithOpenID, type OpenIDClientParts } from "./openIDSFU"; import { testJWTToken } from "../utils/test-fixtures"; import { ownMemberMock } from "../utils/test"; +import { FailToGetOpenIdToken } from "../utils/errors"; const sfuUrl = "https://sfu.example.org"; describe("getSFUConfigWithOpenID", () => { let matrixClient: MockedObject; beforeEach(() => { + fetchMock.catch(404); matrixClient = { getOpenIdToken: vitest.fn(), getDeviceId: vitest.fn(), @@ -71,9 +73,10 @@ describe("getSFUConfigWithOpenID", () => { "https://sfu.example.org", "!example_room_id", ); - } catch (ex) { - expect((ex as Error).message).toEqual( - "SFU Config fetch failed with status code 500", + } catch (ex: unknown) { + expect(ex).toBeInstanceOf(FailToGetOpenIdToken); + expect((ex as FailToGetOpenIdToken).cause).toEqual( + new Error("SFU Config fetch failed with status code 500"), ); void (await fetchMock.flush()); return; @@ -106,8 +109,9 @@ describe("getSFUConfigWithOpenID", () => { }, ); } catch (ex) { - expect((ex as Error).message).toEqual( - "SFU Config fetch failed with status code 500", + expect(ex).toBeInstanceOf(FailToGetOpenIdToken); + expect((ex as FailToGetOpenIdToken).cause).toEqual( + new Error("SFU Config fetch failed with status code 500"), ); void (await fetchMock.flush()); } @@ -160,8 +164,9 @@ describe("getSFUConfigWithOpenID", () => { }, ); } catch (ex) { - expect((ex as Error).message).toEqual( - "SFU Config fetch failed with status code 500", + expect(ex).toBeInstanceOf(FailToGetOpenIdToken); + expect((ex as FailToGetOpenIdToken).cause).toEqual( + new Error("SFU Config fetch failed with status code 500"), ); void (await fetchMock.flush()); } diff --git a/src/livekit/openIDSFU.ts b/src/livekit/openIDSFU.ts index d3756e6c..dfe04323 100644 --- a/src/livekit/openIDSFU.ts +++ b/src/livekit/openIDSFU.ts @@ -5,11 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { - retryNetworkOperation, - type IOpenIDToken, - type MatrixClient, -} from "matrix-js-sdk"; +import { type IOpenIDToken, type MatrixClient } from "matrix-js-sdk"; import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager"; import { type Logger } from "matrix-js-sdk/lib/logger"; @@ -70,6 +66,7 @@ export type OpenIDClientParts = Pick< MatrixClient, "getOpenIdToken" | "getDeviceId" >; + /** * Gets a bearer token from the homeserver and then use it to authenticate * to the matrix RTC backend in order to get acces to the SFU. @@ -113,9 +110,6 @@ export async function getSFUConfigWithOpenID( ); } logger?.debug("Got openID token", openIdToken); - - logger?.info(`Trying to get JWT for focus ${serviceUrl}...`); - let sfuConfig: { url: string; jwt: string } | undefined; const tryBothJwtEndpoints = opts?.forceJwtEndpoint === undefined; // This is for SFUs where we do not publish. @@ -127,7 +121,10 @@ export async function getSFUConfigWithOpenID( // if we can use both or if we are forced to use the new one. if (tryBothJwtEndpoints || forceMatrix2Jwt) { try { - sfuConfig = await getLiveKitJWTWithDelayDelegation( + logger?.info( + `Trying to get JWT with delegation for focus ${serviceUrl}...`, + ); + const sfuConfig = await getLiveKitJWTWithDelayDelegation( membership, serviceUrl, roomId, @@ -135,33 +132,24 @@ export async function getSFUConfigWithOpenID( opts?.delayEndpointBaseUrl, opts?.delayId, ); - logger?.info(`Got JWT from call's active focus URL.`); + + return extractFullConfigFromToken(sfuConfig); } catch (e) { logger?.debug(`Failed fetching jwt with matrix 2.0 endpoint:`, e); - if (e instanceof NotSupportedError) { - logger?.warn( - `Failed fetching jwt with matrix 2.0 endpoint (retry with legacy) Not supported`, - e, - ); - sfuConfig = undefined; - } else { - logger?.warn( - `Failed fetching jwt with matrix 2.0 endpoint other issues ->`, - `(not going to try with legacy endpoint: forceOldJwtEndpoint is set to false, we did not get a not supported error from the sfu)`, - e, - ); - // Make this throw a hard error in case we force the matrix2.0 endpoint. - if (forceMatrix2Jwt) - throw new NoMatrix2AuthorizationService(e as Error); - // NEVER get bejond this point if we forceMatrix2 and it failed! + // Make this throw a hard error in case we force the matrix2.0 endpoint. + if (forceMatrix2Jwt) { + throw new NoMatrix2AuthorizationService(e as Error); } } } // DEPRECATED - // here we either have a sfuConfig or we alredy exited because of `if (forceMatrix2) throw ...` + // here we either have a sfuConfig or we already exited because of `if (forceMatrix2) throw ...` // The only case we can get into this condition is, if `forceMatrix2` is `false` - if (sfuConfig === undefined) { + try { + logger?.info( + `Trying to get JWT with legacy endpoint for focus ${serviceUrl}...`, + ); sfuConfig = await getLiveKitJWT( membership.deviceId, serviceUrl, @@ -169,15 +157,19 @@ export async function getSFUConfigWithOpenID( openIdToken, ); logger?.info(`Got JWT from call's active focus URL.`); + return extractFullConfigFromToken(sfuConfig); + } catch (ex) { + throw new FailToGetOpenIdToken( + ex instanceof Error ? ex : new Error(`Unknown error ${ex}`), + ); } +} - if (!sfuConfig) { - throw new Error("No `sfuConfig` after trying with old and new endpoints"); - } - - // Pull the details from the JWT +function extractFullConfigFromToken(sfuConfig: { + url: string; + jwt: string; +}): SFUConfig { const [, payloadStr] = sfuConfig.jwt.split("."); - // TODO: Prefer Uint8Array.fromBase64 when widely available const payload = JSON.parse(global.atob(payloadStr)) as SFUJWTPayload; return { jwt: sfuConfig.jwt, @@ -189,16 +181,15 @@ export async function getSFUConfigWithOpenID( livekitIdentity: payload.sub, }; } -const RETRIES = 4; + async function getLiveKitJWT( deviceId: string, livekitServiceURL: string, matrixRoomId: string, openIDToken: IOpenIDToken, ): Promise<{ url: string; jwt: string }> { - let res: Response | undefined; - await retryNetworkOperation(RETRIES, async () => { - res = await fetch(livekitServiceURL + "/sfu/get", { + const res = await doNetworkOperationWithRetry(async () => { + return await fetch(livekitServiceURL + "/sfu/get", { method: "POST", headers: { "Content-Type": "application/json", @@ -211,11 +202,7 @@ async function getLiveKitJWT( }), }); }); - if (!res) { - throw new Error( - `Network error while connecting to jwt service after ${RETRIES} retries`, - ); - } + if (!res.ok) { throw new Error("SFU Config fetch failed with status code " + res.status); } @@ -262,10 +249,8 @@ export async function getLiveKitJWTWithDelayDelegation( }; } - let res: Response | undefined; - - await retryNetworkOperation(RETRIES, async () => { - res = await fetch(livekitServiceURL + "/get_token", { + const res = await doNetworkOperationWithRetry(async () => { + return await fetch(livekitServiceURL + "/get_token", { method: "POST", headers: { "Content-Type": "application/json", @@ -274,11 +259,6 @@ export async function getLiveKitJWTWithDelayDelegation( }); }); - if (!res) { - throw new Error( - `Network error while connecting to jwt service after ${RETRIES} retries`, - ); - } if (!res.ok) { const msg = "SFU Config fetch failed with status code " + res.status; if (res.status === 404) { diff --git a/src/state/CallViewModel/CallViewModelTestUtils.ts b/src/state/CallViewModel/CallViewModelTestUtils.ts index b6bf8a9a..09a43fc3 100644 --- a/src/state/CallViewModel/CallViewModelTestUtils.ts +++ b/src/state/CallViewModel/CallViewModelTestUtils.ts @@ -131,6 +131,9 @@ export function withCallViewModel(mode: MatrixRTCMode) { public getSyncState(): SyncState { return syncState; } + public getAccessToken(): string | null { + return "a-token"; + } })() as Partial as MatrixClient, getMembers: () => roomMembers, getMembersWithMembership: () => roomMembers, diff --git a/src/state/CallViewModel/localMember/LocalTransport.test.ts b/src/state/CallViewModel/localMember/LocalTransport.test.ts index f6c00c7f..cf7555fa 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.test.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.test.ts @@ -19,7 +19,7 @@ import { type CallMembership, type LivekitTransportConfig, } from "matrix-js-sdk/lib/matrixrtc"; -import { BehaviorSubject, lastValueFrom } from "rxjs"; +import { BehaviorSubject, filter, lastValueFrom } from "rxjs"; import fetchMock from "fetch-mock"; import { @@ -27,8 +27,13 @@ import { flushPromises, ownMemberMock, mockRtcMembership, + testScope, } from "../../../utils/test"; -import { createLocalTransport$, JwtEndpointVersion } from "./LocalTransport"; +import { + createLocalTransport$, + JwtEndpointVersion, + type LocalTransportWithSFUConfig, +} from "./LocalTransport"; import { constant } from "../../Behavior"; import { Epoch, ObservableScope, trackEpoch } from "../../ObservableScope"; import { @@ -47,13 +52,11 @@ describe("LocalTransport", () => { livekitIdentity: "@lk_user:ABCDEF", }; - let scope: ObservableScope; - beforeEach(() => (scope = new ObservableScope())); - afterEach(() => scope.end()); + beforeEach(() => vi.clearAllMocks()); it("throws if config is missing", async () => { const { advertised$, active$ } = createLocalTransport$({ - scope, + scope: testScope(), roomId: "!room:example.org", useOldestMember: false, memberships$: constant(new Epoch([])), @@ -140,7 +143,7 @@ describe("LocalTransport", () => { ); const { advertised$, active$ } = createLocalTransport$({ - scope, + scope: testScope(), roomId: "!room:example.org", useOldestMember: false, memberships$: constant(new Epoch([])), @@ -211,6 +214,7 @@ describe("LocalTransport", () => { // Initially, Alice is the only member const memberships$ = new BehaviorSubject([aliceMembership]); + const scope = testScope(); const { advertised$, active$ } = createLocalTransport$({ scope, roomId: "!example_room_id", @@ -267,6 +271,7 @@ describe("LocalTransport", () => { // Initially, there are no members const memberships$ = new BehaviorSubject([]); + const scope = testScope(); const { advertised$, active$ } = createLocalTransport$({ scope, roomId: "!example_room_id", @@ -317,7 +322,7 @@ describe("LocalTransport", () => { customLivekitUrl.setValue(customLivekitUrl.defaultValue); localTransportOpts = { ownMembershipIdentity: ownMemberMock, - scope, + scope: testScope(), roomId: "!example_room_id", useOldestMember: false, forceJwtEndpoint: JwtEndpointVersion.Legacy, @@ -511,7 +516,7 @@ describe("LocalTransport", () => { it("throws if no options are available", async () => { const { advertised$, active$ } = createLocalTransport$({ - scope, + scope: testScope(), ownMembershipIdentity: ownMemberMock, roomId: "!example_room_id", useOldestMember: false, @@ -539,4 +544,86 @@ describe("LocalTransport", () => { ); }); }); + + it("should not update advertised/active transport on delayID changes, but delay Id delegation should be called", async () => { + // For simplicity, we'll just use the config livekit + customLivekitUrl.setValue("https://lk.example.org"); + + const authCallSpy = vi + .spyOn(openIDSFU, "getSFUConfigWithOpenID") + .mockResolvedValue(openIdResponse); + + const delayId$ = new BehaviorSubject(null); + + const { advertised$, active$ } = createLocalTransport$({ + scope: testScope(), + ownMembershipIdentity: ownMemberMock, + roomId: "!example_room_id", + // We want multi-sdu + useOldestMember: false, + forceJwtEndpoint: JwtEndpointVersion.Legacy, + delayId$: delayId$, + memberships$: constant(new Epoch([])), + client: { + getDomain: () => "", + baseUrl: "https://example.org", + // eslint-disable-next-line @typescript-eslint/naming-convention + _unstable_getRTCTransports: async () => Promise.resolve([]), + getAccessToken: vi.fn().mockReturnValue("access_token"), + // These won't be called in this error path but satisfy the type + getOpenIdToken: vi.fn(), + getDeviceId: vi.fn(), + }, + }); + + const advertisedValues: LivekitTransportConfig[] = []; + const activeValues: LocalTransportWithSFUConfig[] = []; + advertised$ + .pipe(filter((v) => v !== null)) + .subscribe((t) => advertisedValues.push(t)); + active$ + .pipe(filter((v) => v !== null)) + .subscribe((t) => activeValues.push(t)); + + await flushPromises(); + + // we have now an active and an advertised + expect(advertisedValues.length).toEqual(1); + expect(activeValues.length).toEqual(1); + expect(advertisedValues[0]!.livekit_service_url).toEqual( + "https://lk.example.org", + ); + expect(activeValues[0]!.transport.livekit_service_url).toEqual( + "https://lk.example.org", + ); + + expect(authCallSpy).toHaveBeenCalledTimes(2); + // Now emits 3 new delays id + delayId$.next("delay_id_1"); + await flushPromises(); + delayId$.next("delay_id_2"); + await flushPromises(); + delayId$.next("delay_id_3"); + await flushPromises(); + + // No new emissions should've happened, it is the same transport. + expect(advertisedValues.length).toEqual(1); + expect(activeValues.length).toEqual(1); + + // Still we should have updated the delayID to auth + expect(authCallSpy).toHaveBeenCalledTimes( + 4 * 2 /* 2 calls for each delayId ?? why */, + ); + + expect(authCallSpy).toHaveBeenLastCalledWith( + expect.anything(), + expect.anything(), + expect.anything(), + expect.anything(), + expect.objectContaining({ + delayId: "delay_id_3", + }), + expect.anything(), + ); + }); }); diff --git a/src/state/CallViewModel/localMember/LocalTransport.ts b/src/state/CallViewModel/localMember/LocalTransport.ts index da4fe1dc..037b6a0b 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.ts @@ -8,16 +8,18 @@ Please see LICENSE in the repository root for full details. import { type CallMembership, isLivekitTransportConfig, - type Transport, type LivekitTransportConfig, } from "matrix-js-sdk/lib/matrixrtc"; -import { MatrixError, type MatrixClient } from "matrix-js-sdk"; +import { type MatrixClient } from "matrix-js-sdk"; import { + catchError, + combineLatest, distinctUntilChanged, first, from, map, merge, + type Observable, of, startWith, switchMap, @@ -42,6 +44,7 @@ import { } from "../../../livekit/openIDSFU.ts"; import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers.ts"; import { customLivekitUrl } from "../../../settings/settings.ts"; +import { RtcTransportAutoDiscovery } from "./RtcTransportAutoDiscovery.ts"; const logger = rootLogger.getChild("[LocalTransport]"); @@ -137,11 +140,116 @@ export const createLocalTransport$ = ({ forceJwtEndpoint, delayId$, }: Props): LocalTransport => { - /** - * The LiveKit transport in use by the oldest RTC membership. `null` when the - * oldest member has no such transport. - */ - const oldestMemberTransport$ = scope.behavior( + // The LiveKit transport in use by the oldest RTC membership. `null` when the + // oldest member has no such transport. + const oldestMemberTransport$ = observerOldestMembership$(scope, memberships$); + + const transportDiscovery = new RtcTransportAutoDiscovery({ + client: client, + resolvedConfig: Config.get(), + wellKnownFetcher: AutoDiscovery.getRawClientConfig.bind(AutoDiscovery), + logger: logger, + }); + + // Get the preferred transport from the current deployment. + const discoveredTransport$ = from( + transportDiscovery.discoverPreferredTransport(), + ); + + const preferredConfig$ = customLivekitUrl.value$ + .pipe( + switchMap((customUrl) => { + if (customUrl) { + return of({ + type: "livekit", + livekit_service_url: customUrl, + } as LivekitTransportConfig); + } else { + return discoveredTransport$; + } + }), + ) + .pipe( + map((config) => { + if (!config) { + // Bubbled up from the preferredConfig$ observable. + throw new MatrixRTCTransportMissingError(client.getDomain() ?? ""); + } + return config; + }), + distinctUntilChanged(areLivekitTransportsEqual), + ); + + const preferredTransport$ = combineLatest([preferredConfig$, delayId$]).pipe( + switchMap(async ([transport, delayId]) => { + try { + return await doOpenIdAndJWTFromUrl( + transport, + forceJwtEndpoint, + ownMembershipIdentity, + roomId, + client, + delayId ?? undefined, + ); + } catch (e) { + logger.error( + `Failed to authenticate to transport ${transport.livekit_service_url}`, + e, + ); + throw mapAuthErrorToUserFriendlyError(e); + } + }), + ); + + if (useOldestMember) { + return observeLocalTransportForOldestMembership( + scope, + oldestMemberTransport$, + preferredTransport$, + client, + ownMembershipIdentity, + roomId, + ); + } + + // --- Multi-SFU mode --- + // Always publish on and advertise the preferred transport. + return { + advertised$: scope.behavior( + preferredTransport$.pipe( + map((t) => t.transport), + distinctUntilChanged(areLivekitTransportsEqual), + ), + null, + ), + active$: scope.behavior( + preferredTransport$.pipe( + // XXX: WORK AROUND due to a reconnection glitch. + // To remove when we have a proper way to refresh the delegation event ID without refreshing + // the whole credentials. + // We deliberately hide any changes to the SFU config because we + // do not want the app to reconnect whenever the JWT + // token changes due to us delegating a new delayed event. The + // initial SFU config for the transport is all the app needs. + distinctUntilChanged((prev, next) => + areLivekitTransportsEqual(prev.transport, next.transport), + ), + ), + null, + ), + }; +}; + +/** + * Observes the oldest member in the room and returns the transport that it uses if it is a livekit transport. + * @param scope - The observable scope. + * @param memberships$ - The observable of the call's memberships.' + */ +function observerOldestMembership$( + scope: ObservableScope, + memberships$: Behavior>, +): Behavior { + return scope.behavior( memberships$.pipe( map((memberships) => { const oldestMember = memberships.value[0]; @@ -170,317 +278,141 @@ export const createLocalTransport$ = ({ distinctUntilChanged(areLivekitTransportsEqual), ), ); - - /** - * The transport that we would personally prefer to publish on (if not for the - * transport preferences of others, perhaps). `null` until fetched and - * validated. - * - * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken - */ - const preferredTransport$ = - scope.behavior( - // preferredTransport$ (used for multi sfu) needs to know if we are using the old or new - // jwt endpoint (`get_token` vs `sfu/get`) based on that the jwt endpoint will compute the rtcBackendIdentity - // differently. (sha(`${userId}|${deviceId}|${memberId}`) vs `${userId}|${deviceId}|${memberId}`) - // When using sticky events (we need to use the new endpoint). - customLivekitUrl.value$.pipe( - switchMap((customUrl) => - startWith(null)( - // Fetch the SFU config, and repeat this asynchronously for every - // change in delay ID. - delayId$.pipe( - switchMap(async (delayId) => { - logger.info( - "Creating preferred transport based on: ", - "customUrl: ", - customUrl, - "delayId: ", - delayId, - "forceJwtEndpoint: ", - forceJwtEndpoint, - ); - return makeTransport( - client, - ownMembershipIdentity, - roomId, - customUrl, - forceJwtEndpoint, - delayId ?? undefined, - ); - }), - // We deliberately hide any changes to the SFU config because we - // do not actually want the app to reconnect whenever the JWT - // token changes due to us delegating a new delayed event. The - // initial SFU config for the transport is all the app needs. - distinctUntilChanged((prev, next) => - areLivekitTransportsEqual(prev.transport, next.transport), - ), - ), - ), - ), - ), - ); - - if (useOldestMember) { - // --- Oldest member mode --- - return { - // Never update the transport that we advertise in our membership. Just - // take the first valid oldest member or preferred transport that we learn - // about, and stick with that. This avoids unnecessary SFU hops and room - // state changes. - advertised$: scope.behavior( - merge( - oldestMemberTransport$, - preferredTransport$.pipe(map((t) => t?.transport ?? null)), - ).pipe( - first((t) => t !== null), - tap((t) => - logger.info(`Advertise transport: ${t.livekit_service_url}`), - ), - ), - null, - ), - // Publish on the transport used by the oldest member. - active$: scope.behavior( - oldestMemberTransport$.pipe( - switchMap((transport) => { - // Oldest member not available (or invalid SFU config). - if (transport === null) return of(null); - // Oldest member available: fetch the SFU config. - const fetchOldestMemberTransport = - async (): Promise => ({ - transport, - sfuConfig: await getSFUConfigWithOpenID( - client, - ownMembershipIdentity, - transport.livekit_service_url, - roomId, - { forceJwtEndpoint: JwtEndpointVersion.Legacy }, - logger, - ), - }); - return from(fetchOldestMemberTransport()).pipe(startWith(null)); - }), - tap((t) => - logger.info( - `Publish on transport: ${t?.transport.livekit_service_url}`, - ), - ), - ), - ), - }; - } - - // --- Multi-SFU mode --- - // Always publish on and advertise the preferred transport. - return { - advertised$: scope.behavior( - preferredTransport$.pipe( - map((t) => t?.transport ?? null), - distinctUntilChanged(areLivekitTransportsEqual), - ), - ), - active$: preferredTransport$, - }; -}; - -const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci"; +} /** - * Determine the correct Transport for the current session, including - * validating auth against the service to ensure it's correct. - * Prefers in order: + * Utility to ensure the user can authenticate with the SFU. + * We will call `getSFUConfigWithOpenID` once per transport here as it's our + * only mechanism of validation. This means we will also ask the + * homeserver for a OpenID token a few times. Since OpenID tokens are single + * use we don't want to risk any issues by re-using a token. * - - * 1. The `urlFromDevSettings` value. If this cannot be validated, the function will throw. - * 2. The transports returned via the homeserver. - * 3. The transports returned via .well-known. - * 4. The transport configured in Element Call's config. + * @param transport The transport to authenticate with. + * @param forceJwtEndpoint Whether to force the JWT endpoint to be used. + * @param membership The identity of the local member. + * @param roomId The room ID to use for the JWT. + * @param client The client to use for the OpenID token. + * @param delayId The delayId to use for the JWT. * - * @param client The authenticated Matrix client for the current user - * @param membership The membership identity of the user. - * @param roomId The ID of the room to be connected to. - * @param urlFromDevSettings Override URL provided by the user's local config. - * @param forceJwtEndpoint Whether to force a specific JWT endpoint - * - `Legacy` / `Matrix_2_0` - * - `get_token` / `sfu/get` - * - not hashing / hashing the backendIdentity - * @param delayId the delay id passed to the jwt service. - * - * @returns A fully validated transport config. - * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken + * @throws FailToGetOpenIdToken, NoMatrix2AuthorizationService */ -async function makeTransport( +async function doOpenIdAndJWTFromUrl( + transport: LivekitTransportConfig, + forceJwtEndpoint: JwtEndpointVersion, + membership: CallMembershipIdentityParts, + roomId: string, client: Pick< MatrixClient, "getDomain" | "baseUrl" | "_unstable_getRTCTransports" | "getAccessToken" > & OpenIDClientParts, - membership: CallMembershipIdentityParts, - roomId: string, - urlFromDevSettings: string | null, - forceJwtEndpoint: JwtEndpointVersion, delayId?: string, ): Promise { - logger.trace("Searching for a preferred transport"); - - async function doOpenIdAndJWTFromUrl( - url: string, - ): Promise { - const sfuConfig = await getSFUConfigWithOpenID( - client, - membership, - url, - roomId, - { - forceJwtEndpoint: forceJwtEndpoint, - delayEndpointBaseUrl: client.baseUrl, - delayId, - }, - logger, - ); - return { - transport: { - type: "livekit", - livekit_service_url: url, - }, - sfuConfig, - }; - } - // We will call `getSFUConfigWithOpenID` once per transport here as it's our - // only mechanism of valiation. This means we will also ask the - // homeserver for a OpenID token a few times. Since OpenID tokens are single - // use we don't want to risk any issues by re-using a token. - // - // If the OpenID request were to fail then it's acceptable for us to fail - // this function early, as we assume the homeserver has got some problems. - - // DEVTOOL: Highest priority: Load from devtool setting - if (urlFromDevSettings !== null) { - // Validate that the SFU is up. Otherwise, we want to fail on this - // as we don't permit other SFUs. - // This will call the jwt/sfu/get endpoint to pre create the livekit room. - logger.info("Using LiveKit transport from dev tools: ", urlFromDevSettings); - return await doOpenIdAndJWTFromUrl(urlFromDevSettings); - } - - async function getFirstUsableTransport( - transports: Transport[], - ): Promise { - for (const potentialTransport of transports) { - if (isLivekitTransportConfig(potentialTransport)) { - try { - logger.info( - `makeTransport: check transport authentication for "${potentialTransport.livekit_service_url}"`, - ); - // This will call the jwt/sfu/get endpoint to pre create the livekit room. - return await doOpenIdAndJWTFromUrl( - potentialTransport.livekit_service_url, - ); - } catch (ex) { - logger.debug( - `makeTransport: Could not use SFU service "${potentialTransport.livekit_service_url}" as SFU`, - ex, - ); - // Explictly throw these - if (ex instanceof FailToGetOpenIdToken) { - throw ex; - } - if (ex instanceof NoMatrix2AuthorizationService) { - throw ex; - } - } - } else { - logger.info( - `makeTransport: "${potentialTransport.livekit_service_url}" is not a valid livekit transport as SFU`, - ); - } - } - return null; - } - - // MSC4143: Attempt to fetch transports from backend. - // TODO: Workaround for an issue in the js-sdk RoomWidgetClient that - // is not yet implementing _unstable_getRTCTransports properly (via widget API new action). - // For now we just skip this call if we are in a widget. - // In widget mode the client is a `RoomWidgetClient` which has no access token (it is using the widget API). - // Could be removed once the js-sdk is fixed (https://github.com/matrix-org/matrix-js-sdk/issues/5245) - const isSPA = !!client.getAccessToken(); - if (isSPA && "_unstable_getRTCTransports" in client) { - logger.info( - "makeTransport: First try to use getRTCTransports end point ...", - ); - try { - // TODO This should also check for server support? - const transportList = await client._unstable_getRTCTransports(); - const selectedTransport = await getFirstUsableTransport(transportList); - if (selectedTransport) { - logger.info( - "makeTransport: ...Using backend-configured (client.getRTCTransports) SFU", - selectedTransport, - ); - return selectedTransport; - } - } catch (ex) { - if (ex instanceof MatrixError && ex.httpStatus === 404) { - // Expected, this is an unstable endpoint and it's not required. - // There will be expected 404 errors in the console. When we check if synapse supports the endpoint. - logger.debug( - "Matrix homeserver does not provide any RTC transports via `/rtc/transports` (will retry with well-known.)", - ); - } else if (ex instanceof FailToGetOpenIdToken) { - throw ex; - } else { - // We got an error that wasn't just missing support for the feature, so log it loudly. - logger.error( - "Unexpected error fetching RTC transports from backend", - ex, - ); - } - } - } - - logger.info( - `makeTransport: Trying to get transports from .well-known/matrix/client on domain ${client.getDomain()} ...`, + const sfuConfig = await getSFUConfigWithOpenID( + client, + membership, + transport.livekit_service_url, + roomId, + { + forceJwtEndpoint: forceJwtEndpoint, + delayEndpointBaseUrl: client.baseUrl, + delayId, + }, + logger, + ); + return { + transport, + sfuConfig, + }; +} + +function observeLocalTransportForOldestMembership( + scope: ObservableScope, + oldestMemberTransport$: Behavior, + preferredTransport$: Observable, + client: Pick< + MatrixClient, + "getDomain" | "baseUrl" | "_unstable_getRTCTransports" | "getAccessToken" + > & + OpenIDClientParts, + ownMembershipIdentity: CallMembershipIdentityParts, + roomId: string, +): LocalTransport { + // Ensure we can authenticate with the SFU. + const authenticatedOldestMemberTransport$ = oldestMemberTransport$.pipe( + switchMap((transport) => { + // Oldest member not available -we are first- (or invalid SFU config). + if (transport === null) return of(null); + + // Whenever there is transport change we want to revert + // to no transport while we do the authentication. + // So do a from(promise) here to be able to startWith(null) + return from( + doOpenIdAndJWTFromUrl( + transport, + JwtEndpointVersion.Legacy, + ownMembershipIdentity, + roomId, + client, + undefined, + ), + ).pipe( + catchError((e: unknown) => { + logger.error( + `Failed to authenticate to transport ${transport.livekit_service_url}`, + e, + ); + throw mapAuthErrorToUserFriendlyError(e); + }), + startWith(null), + ); + }), + ); + + // --- Oldest member mode --- + return { + // Never update the transport that we advertise in our membership. Just + // take the first valid oldest member or preferred transport that we learn + // about, and stick with that. This avoids unnecessary SFU hops and room + // state changes. + advertised$: scope.behavior( + merge( + authenticatedOldestMemberTransport$.pipe( + map((t) => t?.transport ?? null), + ), + preferredTransport$.pipe(map((t) => t.transport)), + ).pipe( + first((t) => t !== null), + tap((t) => + logger.info(`Advertise transport: ${t.livekit_service_url}`), + ), + ), + null, + ), + // Publish on the transport used by the oldest member. + active$: scope.behavior( + authenticatedOldestMemberTransport$.pipe( + tap((t) => + logger.info( + `Publish on transport: ${t?.transport.livekit_service_url}`, + ), + ), + ), + null, + ), + }; +} + +function mapAuthErrorToUserFriendlyError(e: unknown): Error { + if ( + e instanceof FailToGetOpenIdToken || + e instanceof NoMatrix2AuthorizationService + ) { + // rethrow as is + return e; + } + // Catch others and rethrow as FailToGetOpenIdToken that has user friendly message. + return new FailToGetOpenIdToken( + e instanceof Error ? e : new Error(String(e)), ); - - // Legacy MSC4143 (to be removed) WELL_KNOWN: Prioritize the .well-known/matrix/client, if available. - const domain = client.getDomain(); - if (domain) { - // we use AutoDiscovery instead of relying on the MatrixClient having already - // been fully configured and started - const wellKnownFoci = (await AutoDiscovery.getRawClientConfig(domain))?.[ - FOCI_WK_KEY - ]; - const selectedTransport = Array.isArray(wellKnownFoci) - ? await getFirstUsableTransport(wellKnownFoci) - : null; - if (selectedTransport) { - logger.info("Using .well-known SFU", selectedTransport); - return selectedTransport; - } - } - - logger.info( - `makeTransport: No valid transport found via backend or .well-known, falling back to config if available.`, - ); - - // CONFIG: Least prioritized; Load from config file - const urlFromConf = Config.get().livekit?.livekit_service_url; - if (urlFromConf) { - try { - // This will call the jwt/sfu/get endpoint to pre create the livekit room. - logger.info("Using config SFU", urlFromConf); - return await doOpenIdAndJWTFromUrl(urlFromConf); - } catch (ex) { - if (ex instanceof FailToGetOpenIdToken) { - throw ex; - } - logger.error("Failed to validate config SFU", ex); - } - } - - // If we do not have returned a transport by now we throw an error - throw new MatrixRTCTransportMissingError(domain ?? ""); } diff --git a/src/state/CallViewModel/localMember/RtcTransportAutoDiscovery.test.ts b/src/state/CallViewModel/localMember/RtcTransportAutoDiscovery.test.ts new file mode 100644 index 00000000..9314b993 --- /dev/null +++ b/src/state/CallViewModel/localMember/RtcTransportAutoDiscovery.test.ts @@ -0,0 +1,233 @@ +/* +Copyright 2025 Element Creations Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { + beforeEach, + describe, + expect, + it, + type MockedObject, + vi, +} from "vitest"; +import { type IClientWellKnown, MatrixError } from "matrix-js-sdk"; +import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; +import { + type LivekitTransportConfig, + type Transport, +} from "matrix-js-sdk/lib/matrixrtc"; + +import type { ResolvedConfigOptions } from "../../../config/ConfigOptions.ts"; +import { + RtcTransportAutoDiscovery, + type RtcTransportAutoDiscoveryProps, +} from "./RtcTransportAutoDiscovery.ts"; + +type DiscoveryClient = RtcTransportAutoDiscoveryProps["client"]; + +const backendTransport: LivekitTransportConfig = { + type: "livekit", + livekit_service_url: "https://backend.example.org", +}; + +const wellKnownTransport: LivekitTransportConfig = { + type: "livekit", + livekit_service_url: "https://well-known.example.org", +}; + +function makeClient(): MockedObject { + return { + getDomain: vi.fn().mockReturnValue("example.org"), + baseUrl: "https://matrix.example.org", + _unstable_getRTCTransports: vi.fn().mockResolvedValue([]), + getAccessToken: vi.fn().mockReturnValue("access_token"), + getOpenIdToken: vi.fn(), + getDeviceId: vi.fn(), + } as unknown as MockedObject; +} + +function makeResolvedConfig(livekitServiceUrl?: string): ResolvedConfigOptions { + return { + livekit: livekitServiceUrl + ? { + livekit_service_url: livekitServiceUrl, + } + : undefined, + } as ResolvedConfigOptions; +} + +function makeWellKnown(rtcFoci?: Transport[]): IClientWellKnown { + return { + "org.matrix.msc4143.rtc_foci": rtcFoci, + } as unknown as IClientWellKnown; +} + +describe("RtcTransportAutoDiscovery", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + const VALID_TEST_CASES: Array<{ transports: Transport[] }> = [ + { transports: [backendTransport] }, + // will pick the first livekit transport in the list, even if there are other non-livekit transports + { transports: [{ type: "not_livekit" }, backendTransport] }, + ]; + it.each(VALID_TEST_CASES)( + "prefers backend transport over well-known and app config $transports", + async ({ transports }) => { + // it("prefers backend transport over well-known and app config", async () => { + const client = makeClient(); + client._unstable_getRTCTransports.mockResolvedValue(transports); + + const wellKnownFetcher = vi + .fn<(domain: string) => Promise>() + .mockResolvedValue(makeWellKnown([wellKnownTransport])); + + const discovery = new RtcTransportAutoDiscovery({ + client, + resolvedConfig: makeResolvedConfig("https://config.example.org"), + wellKnownFetcher, + logger: rootLogger, + }); + + await expect( + discovery.discoverPreferredTransport(), + ).resolves.toStrictEqual(backendTransport); + + expect(client._unstable_getRTCTransports).toHaveBeenCalledTimes(1); + expect(wellKnownFetcher).not.toHaveBeenCalled(); + }, + ); + + it("Retries limit_exceeded backend transport over well-known", async () => { + const client = makeClient(); + client._unstable_getRTCTransports + .mockRejectedValueOnce( + new MatrixError( + { + errcode: "M_LIMIT_EXCEEDED", + error: "Too many requests", + retry_after_ms: 100, + }, + 429, + ), + ) + .mockResolvedValue([backendTransport]); + + const wellKnownFetcher = vi + .fn<(domain: string) => Promise>() + .mockResolvedValue(makeWellKnown([wellKnownTransport])); + + const discovery = new RtcTransportAutoDiscovery({ + client, + resolvedConfig: makeResolvedConfig("https://config.example.org"), + wellKnownFetcher, + logger: rootLogger, + }); + + await expect(discovery.discoverPreferredTransport()).resolves.toStrictEqual( + backendTransport, + ); + + expect(client._unstable_getRTCTransports).toHaveBeenCalledTimes(2); + expect(wellKnownFetcher).not.toHaveBeenCalled(); + }); + + const INVALID_TEST_CASES: Array<{ transports: Transport[] }> = [ + { transports: [] }, + { transports: [{ type: "not_livekit" }] }, + ]; + it.each(INVALID_TEST_CASES)( + "falls back to well-known when backend has no (valid) livekit transports $transports", + async ({ transports }) => { + const client = makeClient(); + client._unstable_getRTCTransports.mockResolvedValue(transports); + + const wellKnownFetcher = vi + .fn<(domain: string) => Promise>() + .mockResolvedValue(makeWellKnown([wellKnownTransport])); + + const discovery = new RtcTransportAutoDiscovery({ + client, + resolvedConfig: makeResolvedConfig("https://config.example.org"), + wellKnownFetcher, + logger: rootLogger, + }); + + await expect( + discovery.discoverPreferredTransport(), + ).resolves.toStrictEqual(wellKnownTransport); + + expect(wellKnownFetcher).toHaveBeenCalledWith("example.org"); + }, + ); + + it("skips backend discovery in widget mode and uses well-known", async () => { + const client = makeClient(); + // widget mode is detected by the absence of an access token + client.getAccessToken.mockReturnValue(null); + + const wellKnownFetcher = vi + .fn<(domain: string) => Promise>() + .mockResolvedValue(makeWellKnown([wellKnownTransport])); + + const discovery = new RtcTransportAutoDiscovery({ + client, + resolvedConfig: makeResolvedConfig("https://config.example.org"), + wellKnownFetcher, + logger: rootLogger, + }); + + await expect(discovery.discoverPreferredTransport()).resolves.toStrictEqual( + wellKnownTransport, + ); + + expect(client._unstable_getRTCTransports).not.toHaveBeenCalled(); + expect(wellKnownFetcher).toHaveBeenCalledWith("example.org"); + }); + + it("falls back to app config when backend fails and well-known has no rtc_foci", async () => { + const client = makeClient(); + client._unstable_getRTCTransports.mockRejectedValue( + new MatrixError({ errcode: "M_UNKNOWN" }, 404), + ); + + const wellKnownFetcher = vi + .fn<(domain: string) => Promise>() + .mockResolvedValue({} as IClientWellKnown); + + const discovery = new RtcTransportAutoDiscovery({ + client, + resolvedConfig: makeResolvedConfig("https://config.example.org"), + wellKnownFetcher, + logger: rootLogger, + }); + + await expect(discovery.discoverPreferredTransport()).resolves.toStrictEqual( + { + type: "livekit", + livekit_service_url: "https://config.example.org", + }, + ); + }); + + it("returns null when backend, well-known and config are all unavailable", async () => { + const client = makeClient(); + client._unstable_getRTCTransports.mockResolvedValue([]); + + const wellKnownFetcher = vi + .fn<(domain: string) => Promise>() + .mockResolvedValue({} as IClientWellKnown); + + const discovery = new RtcTransportAutoDiscovery({ + client, + resolvedConfig: makeResolvedConfig(undefined), + wellKnownFetcher, + logger: rootLogger, + }); + + await expect(discovery.discoverPreferredTransport()).resolves.toBeNull(); + }); +}); diff --git a/src/state/CallViewModel/localMember/RtcTransportAutoDiscovery.ts b/src/state/CallViewModel/localMember/RtcTransportAutoDiscovery.ts new file mode 100644 index 00000000..6d2bac46 --- /dev/null +++ b/src/state/CallViewModel/localMember/RtcTransportAutoDiscovery.ts @@ -0,0 +1,172 @@ +/* +Copyright 2026 Element Creations Ltd. + +SPDX-License-IdFentifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ +import { + isLivekitTransportConfig, + type LivekitTransportConfig, +} from "matrix-js-sdk/lib/matrixrtc"; +import { type IClientWellKnown, type MatrixClient } from "matrix-js-sdk"; +import { type Logger } from "matrix-js-sdk/lib/logger"; + +import type { ResolvedConfigOptions } from "../../../config/ConfigOptions.ts"; +import { doNetworkOperationWithRetry } from "../../../utils/matrix.ts"; + +type TransportDiscoveryClient = Pick< + MatrixClient, + "getDomain" | "_unstable_getRTCTransports" | "getAccessToken" +>; + +export interface RtcTransportAutoDiscoveryProps { + client: TransportDiscoveryClient; + resolvedConfig: ResolvedConfigOptions; + wellKnownFetcher: (domain: string) => Promise; + logger: Logger; +} + +export class RtcTransportAutoDiscovery { + private readonly client: TransportDiscoveryClient; + private readonly resolvedConfig: ResolvedConfigOptions; + private readonly wellKnownFetcher: ( + domain: string, + ) => Promise; + private readonly logger: Logger; + + public constructor({ + client, + resolvedConfig, + wellKnownFetcher, + logger, + }: RtcTransportAutoDiscoveryProps) { + this.client = client; + this.resolvedConfig = resolvedConfig; + this.wellKnownFetcher = wellKnownFetcher; + this.logger = logger.getChild("[RtcTransportAutoDiscovery]"); + } + + public async discoverPreferredTransport(): Promise { + // 1) backend transports + const backendTransport = await this.tryBackendTransports(); + if (backendTransport) { + this.logger.info( + `Found backend transport: ${backendTransport.livekit_service_url}`, + ); + return backendTransport; + } + + this.logger.info("No backend transport found, falling back to well-known"); + // 2) .well-known transports + const wellKnownTransport = await this.tryWellKnownTransports(); + if (wellKnownTransport) { + this.logger.info( + `Found .well-known transport: ${wellKnownTransport.livekit_service_url}`, + ); + return wellKnownTransport; + } + + this.logger.info( + "No .well-known transport found, falling back to app config", + ); + + // 3) app config URL + const configTransport = this.tryConfigTransport(); + if (configTransport) { + this.logger.info( + `Found app config transport: ${configTransport.livekit_service_url}`, + ); + return configTransport; + } + + return null; + } + + /** + * Fetches the first rtc_foci from the backend. + * This will not throw errors, but instead just log them and return null if the expected config is not found or malformed. + * @private + */ + private async tryBackendTransports(): Promise { + const client = this.client; + // MSC4143: Attempt to fetch transports from backend. + // TODO: Workaround for an issue in the js-sdk RoomWidgetClient that + // is not yet implementing _unstable_getRTCTransports properly (via widget API new action). + // For now we just skip this call if we are in a widget. + // In widget mode the client is a `RoomWidgetClient` which has no access token (it is using the widget API). + // Could be removed once the js-sdk is fixed (https://github.com/matrix-org/matrix-js-sdk/issues/5245) + const isSPA = !!client.getAccessToken(); + if (isSPA && "_unstable_getRTCTransports" in client) { + this.logger.info("First try to use getRTCTransports end point ..."); + try { + const transportList = await doNetworkOperationWithRetry(async () => + client._unstable_getRTCTransports(), + ); + const first = transportList.filter(isLivekitTransportConfig)[0]; + if (first) { + return first; + } else { + this.logger.info( + `No livekit transport found in getRTCTransports end point`, + transportList, + ); + } + } catch (ex) { + this.logger.info(`Failed to use getRTCTransports end point: ${ex}`); + } + } else { + this.logger.debug(`getRTCTransports end point not available`); + } + + return null; + } + + /** + * Fetches the first rtc_foci from the .well-known/matrix/client. + * This will not throw errors, but instead just log them and return null if the expected config is not found or malformed. + * @private + */ + private async tryWellKnownTransports(): Promise { + // Legacy MSC4143 (to be removed) WELL_KNOWN: Prioritize the .well-known/matrix/client, if available. + const client = this.client; + const domain = client.getDomain(); + if (domain) { + // we use AutoDiscovery instead of relying on the MatrixClient having already + // been fully configured and started + + const wellKnownFoci = await this.wellKnownFetcher(domain); + + const fociConfig = wellKnownFoci["org.matrix.msc4143.rtc_foci"]; + if (fociConfig) { + if (!Array.isArray(fociConfig)) { + this.logger.warn( + `org.matrix.msc4143.rtc_foci is not an array in .well-known`, + ); + } else { + return fociConfig[0]; + } + } else { + this.logger.info( + `No .well-known "org.matrix.msc4143.rtc_foci" found for ${domain}`, + wellKnownFoci, + ); + } + } else { + // Should never happen, but just in case + this.logger.warn(`No domain configured for client`); + } + + return null; + } + + private tryConfigTransport(): LivekitTransportConfig | null { + const url = this.resolvedConfig.livekit?.livekit_service_url; + if (url) { + return { + type: "livekit", + livekit_service_url: url, + }; + } + return null; + } +} diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index 2c89eef5..7ad5d775 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -8,6 +8,7 @@ Please see LICENSE in the repository root for full details. import { afterEach, + beforeEach, describe, expect, it, @@ -151,6 +152,19 @@ afterEach(() => { }); describe("Start connection states", () => { + beforeEach(() => { + fetchMock.post( + `https://matrix-rtc.example.org/livekit/jwt/get_token`, + () => { + return { + // Return a non-retryable error, if not, the retry logic will + // wait and fail the test with a timeout. + status: 404, + }; + }, + ); + }); + it("start in initialized state", () => { setupTest();