Merge pull request #3838 from element-hq/valere/refactor_transport_discovery

Refactor LocalTransport discovery and validation
This commit is contained in:
Robin
2026-04-06 13:24:16 +02:00
committed by GitHub
8 changed files with 801 additions and 375 deletions

View File

@@ -19,12 +19,14 @@ import fetchMock from "fetch-mock";
import { getSFUConfigWithOpenID, type OpenIDClientParts } from "./openIDSFU";
import { testJWTToken } from "../utils/test-fixtures";
import { ownMemberMock } from "../utils/test";
import { FailToGetOpenIdToken } from "../utils/errors";
const sfuUrl = "https://sfu.example.org";
describe("getSFUConfigWithOpenID", () => {
let matrixClient: MockedObject<OpenIDClientParts>;
beforeEach(() => {
fetchMock.catch(404);
matrixClient = {
getOpenIdToken: vitest.fn(),
getDeviceId: vitest.fn(),
@@ -71,9 +73,10 @@ describe("getSFUConfigWithOpenID", () => {
"https://sfu.example.org",
"!example_room_id",
);
} catch (ex) {
expect((ex as Error).message).toEqual(
"SFU Config fetch failed with status code 500",
} catch (ex: unknown) {
expect(ex).toBeInstanceOf(FailToGetOpenIdToken);
expect((ex as FailToGetOpenIdToken).cause).toEqual(
new Error("SFU Config fetch failed with status code 500"),
);
void (await fetchMock.flush());
return;
@@ -106,8 +109,9 @@ describe("getSFUConfigWithOpenID", () => {
},
);
} catch (ex) {
expect((ex as Error).message).toEqual(
"SFU Config fetch failed with status code 500",
expect(ex).toBeInstanceOf(FailToGetOpenIdToken);
expect((ex as FailToGetOpenIdToken).cause).toEqual(
new Error("SFU Config fetch failed with status code 500"),
);
void (await fetchMock.flush());
}
@@ -160,8 +164,9 @@ describe("getSFUConfigWithOpenID", () => {
},
);
} catch (ex) {
expect((ex as Error).message).toEqual(
"SFU Config fetch failed with status code 500",
expect(ex).toBeInstanceOf(FailToGetOpenIdToken);
expect((ex as FailToGetOpenIdToken).cause).toEqual(
new Error("SFU Config fetch failed with status code 500"),
);
void (await fetchMock.flush());
}

View File

@@ -5,11 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
retryNetworkOperation,
type IOpenIDToken,
type MatrixClient,
} from "matrix-js-sdk";
import { type IOpenIDToken, type MatrixClient } from "matrix-js-sdk";
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";
import { type Logger } from "matrix-js-sdk/lib/logger";
@@ -70,6 +66,7 @@ export type OpenIDClientParts = Pick<
MatrixClient,
"getOpenIdToken" | "getDeviceId"
>;
/**
* Gets a bearer token from the homeserver and then use it to authenticate
* to the matrix RTC backend in order to get acces to the SFU.
@@ -113,9 +110,6 @@ export async function getSFUConfigWithOpenID(
);
}
logger?.debug("Got openID token", openIdToken);
logger?.info(`Trying to get JWT for focus ${serviceUrl}...`);
let sfuConfig: { url: string; jwt: string } | undefined;
const tryBothJwtEndpoints = opts?.forceJwtEndpoint === undefined; // This is for SFUs where we do not publish.
@@ -127,7 +121,10 @@ export async function getSFUConfigWithOpenID(
// if we can use both or if we are forced to use the new one.
if (tryBothJwtEndpoints || forceMatrix2Jwt) {
try {
sfuConfig = await getLiveKitJWTWithDelayDelegation(
logger?.info(
`Trying to get JWT with delegation for focus ${serviceUrl}...`,
);
const sfuConfig = await getLiveKitJWTWithDelayDelegation(
membership,
serviceUrl,
roomId,
@@ -135,33 +132,24 @@ export async function getSFUConfigWithOpenID(
opts?.delayEndpointBaseUrl,
opts?.delayId,
);
logger?.info(`Got JWT from call's active focus URL.`);
return extractFullConfigFromToken(sfuConfig);
} catch (e) {
logger?.debug(`Failed fetching jwt with matrix 2.0 endpoint:`, e);
if (e instanceof NotSupportedError) {
logger?.warn(
`Failed fetching jwt with matrix 2.0 endpoint (retry with legacy) Not supported`,
e,
);
sfuConfig = undefined;
} else {
logger?.warn(
`Failed fetching jwt with matrix 2.0 endpoint other issues ->`,
`(not going to try with legacy endpoint: forceOldJwtEndpoint is set to false, we did not get a not supported error from the sfu)`,
e,
);
// Make this throw a hard error in case we force the matrix2.0 endpoint.
if (forceMatrix2Jwt)
throw new NoMatrix2AuthorizationService(e as Error);
// NEVER get bejond this point if we forceMatrix2 and it failed!
// Make this throw a hard error in case we force the matrix2.0 endpoint.
if (forceMatrix2Jwt) {
throw new NoMatrix2AuthorizationService(e as Error);
}
}
}
// DEPRECATED
// here we either have a sfuConfig or we alredy exited because of `if (forceMatrix2) throw ...`
// here we either have a sfuConfig or we already exited because of `if (forceMatrix2) throw ...`
// The only case we can get into this condition is, if `forceMatrix2` is `false`
if (sfuConfig === undefined) {
try {
logger?.info(
`Trying to get JWT with legacy endpoint for focus ${serviceUrl}...`,
);
sfuConfig = await getLiveKitJWT(
membership.deviceId,
serviceUrl,
@@ -169,15 +157,19 @@ export async function getSFUConfigWithOpenID(
openIdToken,
);
logger?.info(`Got JWT from call's active focus URL.`);
return extractFullConfigFromToken(sfuConfig);
} catch (ex) {
throw new FailToGetOpenIdToken(
ex instanceof Error ? ex : new Error(`Unknown error ${ex}`),
);
}
}
if (!sfuConfig) {
throw new Error("No `sfuConfig` after trying with old and new endpoints");
}
// Pull the details from the JWT
function extractFullConfigFromToken(sfuConfig: {
url: string;
jwt: string;
}): SFUConfig {
const [, payloadStr] = sfuConfig.jwt.split(".");
// TODO: Prefer Uint8Array.fromBase64 when widely available
const payload = JSON.parse(global.atob(payloadStr)) as SFUJWTPayload;
return {
jwt: sfuConfig.jwt,
@@ -189,16 +181,15 @@ export async function getSFUConfigWithOpenID(
livekitIdentity: payload.sub,
};
}
const RETRIES = 4;
async function getLiveKitJWT(
deviceId: string,
livekitServiceURL: string,
matrixRoomId: string,
openIDToken: IOpenIDToken,
): Promise<{ url: string; jwt: string }> {
let res: Response | undefined;
await retryNetworkOperation(RETRIES, async () => {
res = await fetch(livekitServiceURL + "/sfu/get", {
const res = await doNetworkOperationWithRetry(async () => {
return await fetch(livekitServiceURL + "/sfu/get", {
method: "POST",
headers: {
"Content-Type": "application/json",
@@ -211,11 +202,7 @@ async function getLiveKitJWT(
}),
});
});
if (!res) {
throw new Error(
`Network error while connecting to jwt service after ${RETRIES} retries`,
);
}
if (!res.ok) {
throw new Error("SFU Config fetch failed with status code " + res.status);
}
@@ -262,10 +249,8 @@ export async function getLiveKitJWTWithDelayDelegation(
};
}
let res: Response | undefined;
await retryNetworkOperation(RETRIES, async () => {
res = await fetch(livekitServiceURL + "/get_token", {
const res = await doNetworkOperationWithRetry(async () => {
return await fetch(livekitServiceURL + "/get_token", {
method: "POST",
headers: {
"Content-Type": "application/json",
@@ -274,11 +259,6 @@ export async function getLiveKitJWTWithDelayDelegation(
});
});
if (!res) {
throw new Error(
`Network error while connecting to jwt service after ${RETRIES} retries`,
);
}
if (!res.ok) {
const msg = "SFU Config fetch failed with status code " + res.status;
if (res.status === 404) {

View File

@@ -131,6 +131,9 @@ export function withCallViewModel(mode: MatrixRTCMode) {
public getSyncState(): SyncState {
return syncState;
}
public getAccessToken(): string | null {
return "a-token";
}
})() as Partial<MatrixClient> as MatrixClient,
getMembers: () => roomMembers,
getMembersWithMembership: () => roomMembers,

View File

@@ -19,7 +19,7 @@ import {
type CallMembership,
type LivekitTransportConfig,
} from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject, lastValueFrom } from "rxjs";
import { BehaviorSubject, filter, lastValueFrom } from "rxjs";
import fetchMock from "fetch-mock";
import {
@@ -27,8 +27,13 @@ import {
flushPromises,
ownMemberMock,
mockRtcMembership,
testScope,
} from "../../../utils/test";
import { createLocalTransport$, JwtEndpointVersion } from "./LocalTransport";
import {
createLocalTransport$,
JwtEndpointVersion,
type LocalTransportWithSFUConfig,
} from "./LocalTransport";
import { constant } from "../../Behavior";
import { Epoch, ObservableScope, trackEpoch } from "../../ObservableScope";
import {
@@ -47,13 +52,11 @@ describe("LocalTransport", () => {
livekitIdentity: "@lk_user:ABCDEF",
};
let scope: ObservableScope;
beforeEach(() => (scope = new ObservableScope()));
afterEach(() => scope.end());
beforeEach(() => vi.clearAllMocks());
it("throws if config is missing", async () => {
const { advertised$, active$ } = createLocalTransport$({
scope,
scope: testScope(),
roomId: "!room:example.org",
useOldestMember: false,
memberships$: constant(new Epoch<CallMembership[]>([])),
@@ -140,7 +143,7 @@ describe("LocalTransport", () => {
);
const { advertised$, active$ } = createLocalTransport$({
scope,
scope: testScope(),
roomId: "!room:example.org",
useOldestMember: false,
memberships$: constant(new Epoch<CallMembership[]>([])),
@@ -211,6 +214,7 @@ describe("LocalTransport", () => {
// Initially, Alice is the only member
const memberships$ = new BehaviorSubject([aliceMembership]);
const scope = testScope();
const { advertised$, active$ } = createLocalTransport$({
scope,
roomId: "!example_room_id",
@@ -267,6 +271,7 @@ describe("LocalTransport", () => {
// Initially, there are no members
const memberships$ = new BehaviorSubject<CallMembership[]>([]);
const scope = testScope();
const { advertised$, active$ } = createLocalTransport$({
scope,
roomId: "!example_room_id",
@@ -317,7 +322,7 @@ describe("LocalTransport", () => {
customLivekitUrl.setValue(customLivekitUrl.defaultValue);
localTransportOpts = {
ownMembershipIdentity: ownMemberMock,
scope,
scope: testScope(),
roomId: "!example_room_id",
useOldestMember: false,
forceJwtEndpoint: JwtEndpointVersion.Legacy,
@@ -511,7 +516,7 @@ describe("LocalTransport", () => {
it("throws if no options are available", async () => {
const { advertised$, active$ } = createLocalTransport$({
scope,
scope: testScope(),
ownMembershipIdentity: ownMemberMock,
roomId: "!example_room_id",
useOldestMember: false,
@@ -539,4 +544,86 @@ describe("LocalTransport", () => {
);
});
});
it("should not update advertised/active transport on delayID changes, but delay Id delegation should be called", async () => {
// For simplicity, we'll just use the config livekit
customLivekitUrl.setValue("https://lk.example.org");
const authCallSpy = vi
.spyOn(openIDSFU, "getSFUConfigWithOpenID")
.mockResolvedValue(openIdResponse);
const delayId$ = new BehaviorSubject<string | null>(null);
const { advertised$, active$ } = createLocalTransport$({
scope: testScope(),
ownMembershipIdentity: ownMemberMock,
roomId: "!example_room_id",
// We want multi-sdu
useOldestMember: false,
forceJwtEndpoint: JwtEndpointVersion.Legacy,
delayId$: delayId$,
memberships$: constant(new Epoch<CallMembership[]>([])),
client: {
getDomain: () => "",
baseUrl: "https://example.org",
// eslint-disable-next-line @typescript-eslint/naming-convention
_unstable_getRTCTransports: async () => Promise.resolve([]),
getAccessToken: vi.fn().mockReturnValue("access_token"),
// These won't be called in this error path but satisfy the type
getOpenIdToken: vi.fn(),
getDeviceId: vi.fn(),
},
});
const advertisedValues: LivekitTransportConfig[] = [];
const activeValues: LocalTransportWithSFUConfig[] = [];
advertised$
.pipe(filter((v) => v !== null))
.subscribe((t) => advertisedValues.push(t));
active$
.pipe(filter((v) => v !== null))
.subscribe((t) => activeValues.push(t));
await flushPromises();
// we have now an active and an advertised
expect(advertisedValues.length).toEqual(1);
expect(activeValues.length).toEqual(1);
expect(advertisedValues[0]!.livekit_service_url).toEqual(
"https://lk.example.org",
);
expect(activeValues[0]!.transport.livekit_service_url).toEqual(
"https://lk.example.org",
);
expect(authCallSpy).toHaveBeenCalledTimes(2);
// Now emits 3 new delays id
delayId$.next("delay_id_1");
await flushPromises();
delayId$.next("delay_id_2");
await flushPromises();
delayId$.next("delay_id_3");
await flushPromises();
// No new emissions should've happened, it is the same transport.
expect(advertisedValues.length).toEqual(1);
expect(activeValues.length).toEqual(1);
// Still we should have updated the delayID to auth
expect(authCallSpy).toHaveBeenCalledTimes(
4 * 2 /* 2 calls for each delayId ?? why */,
);
expect(authCallSpy).toHaveBeenLastCalledWith(
expect.anything(),
expect.anything(),
expect.anything(),
expect.anything(),
expect.objectContaining({
delayId: "delay_id_3",
}),
expect.anything(),
);
});
});

View File

@@ -8,16 +8,18 @@ Please see LICENSE in the repository root for full details.
import {
type CallMembership,
isLivekitTransportConfig,
type Transport,
type LivekitTransportConfig,
} from "matrix-js-sdk/lib/matrixrtc";
import { MatrixError, type MatrixClient } from "matrix-js-sdk";
import { type MatrixClient } from "matrix-js-sdk";
import {
catchError,
combineLatest,
distinctUntilChanged,
first,
from,
map,
merge,
type Observable,
of,
startWith,
switchMap,
@@ -42,6 +44,7 @@ import {
} from "../../../livekit/openIDSFU.ts";
import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers.ts";
import { customLivekitUrl } from "../../../settings/settings.ts";
import { RtcTransportAutoDiscovery } from "./RtcTransportAutoDiscovery.ts";
const logger = rootLogger.getChild("[LocalTransport]");
@@ -137,11 +140,116 @@ export const createLocalTransport$ = ({
forceJwtEndpoint,
delayId$,
}: Props): LocalTransport => {
/**
* The LiveKit transport in use by the oldest RTC membership. `null` when the
* oldest member has no such transport.
*/
const oldestMemberTransport$ = scope.behavior<LivekitTransportConfig | null>(
// The LiveKit transport in use by the oldest RTC membership. `null` when the
// oldest member has no such transport.
const oldestMemberTransport$ = observerOldestMembership$(scope, memberships$);
const transportDiscovery = new RtcTransportAutoDiscovery({
client: client,
resolvedConfig: Config.get(),
wellKnownFetcher: AutoDiscovery.getRawClientConfig.bind(AutoDiscovery),
logger: logger,
});
// Get the preferred transport from the current deployment.
const discoveredTransport$ = from(
transportDiscovery.discoverPreferredTransport(),
);
const preferredConfig$ = customLivekitUrl.value$
.pipe(
switchMap((customUrl) => {
if (customUrl) {
return of({
type: "livekit",
livekit_service_url: customUrl,
} as LivekitTransportConfig);
} else {
return discoveredTransport$;
}
}),
)
.pipe(
map((config) => {
if (!config) {
// Bubbled up from the preferredConfig$ observable.
throw new MatrixRTCTransportMissingError(client.getDomain() ?? "");
}
return config;
}),
distinctUntilChanged(areLivekitTransportsEqual),
);
const preferredTransport$ = combineLatest([preferredConfig$, delayId$]).pipe(
switchMap(async ([transport, delayId]) => {
try {
return await doOpenIdAndJWTFromUrl(
transport,
forceJwtEndpoint,
ownMembershipIdentity,
roomId,
client,
delayId ?? undefined,
);
} catch (e) {
logger.error(
`Failed to authenticate to transport ${transport.livekit_service_url}`,
e,
);
throw mapAuthErrorToUserFriendlyError(e);
}
}),
);
if (useOldestMember) {
return observeLocalTransportForOldestMembership(
scope,
oldestMemberTransport$,
preferredTransport$,
client,
ownMembershipIdentity,
roomId,
);
}
// --- Multi-SFU mode ---
// Always publish on and advertise the preferred transport.
return {
advertised$: scope.behavior(
preferredTransport$.pipe(
map((t) => t.transport),
distinctUntilChanged(areLivekitTransportsEqual),
),
null,
),
active$: scope.behavior(
preferredTransport$.pipe(
// XXX: WORK AROUND due to a reconnection glitch.
// To remove when we have a proper way to refresh the delegation event ID without refreshing
// the whole credentials.
// We deliberately hide any changes to the SFU config because we
// do not want the app to reconnect whenever the JWT
// token changes due to us delegating a new delayed event. The
// initial SFU config for the transport is all the app needs.
distinctUntilChanged((prev, next) =>
areLivekitTransportsEqual(prev.transport, next.transport),
),
),
null,
),
};
};
/**
* Observes the oldest member in the room and returns the transport that it uses if it is a livekit transport.
* @param scope - The observable scope.
* @param memberships$ - The observable of the call's memberships.'
*/
function observerOldestMembership$(
scope: ObservableScope,
memberships$: Behavior<Epoch<CallMembership[]>>,
): Behavior<LivekitTransportConfig | null> {
return scope.behavior<LivekitTransportConfig | null>(
memberships$.pipe(
map((memberships) => {
const oldestMember = memberships.value[0];
@@ -170,317 +278,141 @@ export const createLocalTransport$ = ({
distinctUntilChanged(areLivekitTransportsEqual),
),
);
/**
* The transport that we would personally prefer to publish on (if not for the
* transport preferences of others, perhaps). `null` until fetched and
* validated.
*
* @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken
*/
const preferredTransport$ =
scope.behavior<LocalTransportWithSFUConfig | null>(
// preferredTransport$ (used for multi sfu) needs to know if we are using the old or new
// jwt endpoint (`get_token` vs `sfu/get`) based on that the jwt endpoint will compute the rtcBackendIdentity
// differently. (sha(`${userId}|${deviceId}|${memberId}`) vs `${userId}|${deviceId}|${memberId}`)
// When using sticky events (we need to use the new endpoint).
customLivekitUrl.value$.pipe(
switchMap((customUrl) =>
startWith<LocalTransportWithSFUConfig | null>(null)(
// Fetch the SFU config, and repeat this asynchronously for every
// change in delay ID.
delayId$.pipe(
switchMap(async (delayId) => {
logger.info(
"Creating preferred transport based on: ",
"customUrl: ",
customUrl,
"delayId: ",
delayId,
"forceJwtEndpoint: ",
forceJwtEndpoint,
);
return makeTransport(
client,
ownMembershipIdentity,
roomId,
customUrl,
forceJwtEndpoint,
delayId ?? undefined,
);
}),
// We deliberately hide any changes to the SFU config because we
// do not actually want the app to reconnect whenever the JWT
// token changes due to us delegating a new delayed event. The
// initial SFU config for the transport is all the app needs.
distinctUntilChanged((prev, next) =>
areLivekitTransportsEqual(prev.transport, next.transport),
),
),
),
),
),
);
if (useOldestMember) {
// --- Oldest member mode ---
return {
// Never update the transport that we advertise in our membership. Just
// take the first valid oldest member or preferred transport that we learn
// about, and stick with that. This avoids unnecessary SFU hops and room
// state changes.
advertised$: scope.behavior(
merge(
oldestMemberTransport$,
preferredTransport$.pipe(map((t) => t?.transport ?? null)),
).pipe(
first((t) => t !== null),
tap((t) =>
logger.info(`Advertise transport: ${t.livekit_service_url}`),
),
),
null,
),
// Publish on the transport used by the oldest member.
active$: scope.behavior(
oldestMemberTransport$.pipe(
switchMap((transport) => {
// Oldest member not available (or invalid SFU config).
if (transport === null) return of(null);
// Oldest member available: fetch the SFU config.
const fetchOldestMemberTransport =
async (): Promise<LocalTransportWithSFUConfig> => ({
transport,
sfuConfig: await getSFUConfigWithOpenID(
client,
ownMembershipIdentity,
transport.livekit_service_url,
roomId,
{ forceJwtEndpoint: JwtEndpointVersion.Legacy },
logger,
),
});
return from(fetchOldestMemberTransport()).pipe(startWith(null));
}),
tap((t) =>
logger.info(
`Publish on transport: ${t?.transport.livekit_service_url}`,
),
),
),
),
};
}
// --- Multi-SFU mode ---
// Always publish on and advertise the preferred transport.
return {
advertised$: scope.behavior(
preferredTransport$.pipe(
map((t) => t?.transport ?? null),
distinctUntilChanged(areLivekitTransportsEqual),
),
),
active$: preferredTransport$,
};
};
const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci";
}
/**
* Determine the correct Transport for the current session, including
* validating auth against the service to ensure it's correct.
* Prefers in order:
* Utility to ensure the user can authenticate with the SFU.
* We will call `getSFUConfigWithOpenID` once per transport here as it's our
* only mechanism of validation. This means we will also ask the
* homeserver for a OpenID token a few times. Since OpenID tokens are single
* use we don't want to risk any issues by re-using a token.
*
* 1. The `urlFromDevSettings` value. If this cannot be validated, the function will throw.
* 2. The transports returned via the homeserver.
* 3. The transports returned via .well-known.
* 4. The transport configured in Element Call's config.
* @param transport The transport to authenticate with.
* @param forceJwtEndpoint Whether to force the JWT endpoint to be used.
* @param membership The identity of the local member.
* @param roomId The room ID to use for the JWT.
* @param client The client to use for the OpenID token.
* @param delayId The delayId to use for the JWT.
*
* @param client The authenticated Matrix client for the current user
* @param membership The membership identity of the user.
* @param roomId The ID of the room to be connected to.
* @param urlFromDevSettings Override URL provided by the user's local config.
* @param forceJwtEndpoint Whether to force a specific JWT endpoint
* - `Legacy` / `Matrix_2_0`
* - `get_token` / `sfu/get`
* - not hashing / hashing the backendIdentity
* @param delayId the delay id passed to the jwt service.
*
* @returns A fully validated transport config.
* @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken
* @throws FailToGetOpenIdToken, NoMatrix2AuthorizationService
*/
async function makeTransport(
async function doOpenIdAndJWTFromUrl(
transport: LivekitTransportConfig,
forceJwtEndpoint: JwtEndpointVersion,
membership: CallMembershipIdentityParts,
roomId: string,
client: Pick<
MatrixClient,
"getDomain" | "baseUrl" | "_unstable_getRTCTransports" | "getAccessToken"
> &
OpenIDClientParts,
membership: CallMembershipIdentityParts,
roomId: string,
urlFromDevSettings: string | null,
forceJwtEndpoint: JwtEndpointVersion,
delayId?: string,
): Promise<LocalTransportWithSFUConfig> {
logger.trace("Searching for a preferred transport");
async function doOpenIdAndJWTFromUrl(
url: string,
): Promise<LocalTransportWithSFUConfig> {
const sfuConfig = await getSFUConfigWithOpenID(
client,
membership,
url,
roomId,
{
forceJwtEndpoint: forceJwtEndpoint,
delayEndpointBaseUrl: client.baseUrl,
delayId,
},
logger,
);
return {
transport: {
type: "livekit",
livekit_service_url: url,
},
sfuConfig,
};
}
// We will call `getSFUConfigWithOpenID` once per transport here as it's our
// only mechanism of valiation. This means we will also ask the
// homeserver for a OpenID token a few times. Since OpenID tokens are single
// use we don't want to risk any issues by re-using a token.
//
// If the OpenID request were to fail then it's acceptable for us to fail
// this function early, as we assume the homeserver has got some problems.
// DEVTOOL: Highest priority: Load from devtool setting
if (urlFromDevSettings !== null) {
// Validate that the SFU is up. Otherwise, we want to fail on this
// as we don't permit other SFUs.
// This will call the jwt/sfu/get endpoint to pre create the livekit room.
logger.info("Using LiveKit transport from dev tools: ", urlFromDevSettings);
return await doOpenIdAndJWTFromUrl(urlFromDevSettings);
}
async function getFirstUsableTransport(
transports: Transport[],
): Promise<LocalTransportWithSFUConfig | null> {
for (const potentialTransport of transports) {
if (isLivekitTransportConfig(potentialTransport)) {
try {
logger.info(
`makeTransport: check transport authentication for "${potentialTransport.livekit_service_url}"`,
);
// This will call the jwt/sfu/get endpoint to pre create the livekit room.
return await doOpenIdAndJWTFromUrl(
potentialTransport.livekit_service_url,
);
} catch (ex) {
logger.debug(
`makeTransport: Could not use SFU service "${potentialTransport.livekit_service_url}" as SFU`,
ex,
);
// Explictly throw these
if (ex instanceof FailToGetOpenIdToken) {
throw ex;
}
if (ex instanceof NoMatrix2AuthorizationService) {
throw ex;
}
}
} else {
logger.info(
`makeTransport: "${potentialTransport.livekit_service_url}" is not a valid livekit transport as SFU`,
);
}
}
return null;
}
// MSC4143: Attempt to fetch transports from backend.
// TODO: Workaround for an issue in the js-sdk RoomWidgetClient that
// is not yet implementing _unstable_getRTCTransports properly (via widget API new action).
// For now we just skip this call if we are in a widget.
// In widget mode the client is a `RoomWidgetClient` which has no access token (it is using the widget API).
// Could be removed once the js-sdk is fixed (https://github.com/matrix-org/matrix-js-sdk/issues/5245)
const isSPA = !!client.getAccessToken();
if (isSPA && "_unstable_getRTCTransports" in client) {
logger.info(
"makeTransport: First try to use getRTCTransports end point ...",
);
try {
// TODO This should also check for server support?
const transportList = await client._unstable_getRTCTransports();
const selectedTransport = await getFirstUsableTransport(transportList);
if (selectedTransport) {
logger.info(
"makeTransport: ...Using backend-configured (client.getRTCTransports) SFU",
selectedTransport,
);
return selectedTransport;
}
} catch (ex) {
if (ex instanceof MatrixError && ex.httpStatus === 404) {
// Expected, this is an unstable endpoint and it's not required.
// There will be expected 404 errors in the console. When we check if synapse supports the endpoint.
logger.debug(
"Matrix homeserver does not provide any RTC transports via `/rtc/transports` (will retry with well-known.)",
);
} else if (ex instanceof FailToGetOpenIdToken) {
throw ex;
} else {
// We got an error that wasn't just missing support for the feature, so log it loudly.
logger.error(
"Unexpected error fetching RTC transports from backend",
ex,
);
}
}
}
logger.info(
`makeTransport: Trying to get transports from .well-known/matrix/client on domain ${client.getDomain()} ...`,
const sfuConfig = await getSFUConfigWithOpenID(
client,
membership,
transport.livekit_service_url,
roomId,
{
forceJwtEndpoint: forceJwtEndpoint,
delayEndpointBaseUrl: client.baseUrl,
delayId,
},
logger,
);
return {
transport,
sfuConfig,
};
}
function observeLocalTransportForOldestMembership(
scope: ObservableScope,
oldestMemberTransport$: Behavior<LivekitTransportConfig | null>,
preferredTransport$: Observable<LocalTransportWithSFUConfig>,
client: Pick<
MatrixClient,
"getDomain" | "baseUrl" | "_unstable_getRTCTransports" | "getAccessToken"
> &
OpenIDClientParts,
ownMembershipIdentity: CallMembershipIdentityParts,
roomId: string,
): LocalTransport {
// Ensure we can authenticate with the SFU.
const authenticatedOldestMemberTransport$ = oldestMemberTransport$.pipe(
switchMap((transport) => {
// Oldest member not available -we are first- (or invalid SFU config).
if (transport === null) return of(null);
// Whenever there is transport change we want to revert
// to no transport while we do the authentication.
// So do a from(promise) here to be able to startWith(null)
return from(
doOpenIdAndJWTFromUrl(
transport,
JwtEndpointVersion.Legacy,
ownMembershipIdentity,
roomId,
client,
undefined,
),
).pipe(
catchError((e: unknown) => {
logger.error(
`Failed to authenticate to transport ${transport.livekit_service_url}`,
e,
);
throw mapAuthErrorToUserFriendlyError(e);
}),
startWith(null),
);
}),
);
// --- Oldest member mode ---
return {
// Never update the transport that we advertise in our membership. Just
// take the first valid oldest member or preferred transport that we learn
// about, and stick with that. This avoids unnecessary SFU hops and room
// state changes.
advertised$: scope.behavior(
merge(
authenticatedOldestMemberTransport$.pipe(
map((t) => t?.transport ?? null),
),
preferredTransport$.pipe(map((t) => t.transport)),
).pipe(
first((t) => t !== null),
tap((t) =>
logger.info(`Advertise transport: ${t.livekit_service_url}`),
),
),
null,
),
// Publish on the transport used by the oldest member.
active$: scope.behavior(
authenticatedOldestMemberTransport$.pipe(
tap((t) =>
logger.info(
`Publish on transport: ${t?.transport.livekit_service_url}`,
),
),
),
null,
),
};
}
function mapAuthErrorToUserFriendlyError(e: unknown): Error {
if (
e instanceof FailToGetOpenIdToken ||
e instanceof NoMatrix2AuthorizationService
) {
// rethrow as is
return e;
}
// Catch others and rethrow as FailToGetOpenIdToken that has user friendly message.
return new FailToGetOpenIdToken(
e instanceof Error ? e : new Error(String(e)),
);
// Legacy MSC4143 (to be removed) WELL_KNOWN: Prioritize the .well-known/matrix/client, if available.
const domain = client.getDomain();
if (domain) {
// we use AutoDiscovery instead of relying on the MatrixClient having already
// been fully configured and started
const wellKnownFoci = (await AutoDiscovery.getRawClientConfig(domain))?.[
FOCI_WK_KEY
];
const selectedTransport = Array.isArray(wellKnownFoci)
? await getFirstUsableTransport(wellKnownFoci)
: null;
if (selectedTransport) {
logger.info("Using .well-known SFU", selectedTransport);
return selectedTransport;
}
}
logger.info(
`makeTransport: No valid transport found via backend or .well-known, falling back to config if available.`,
);
// CONFIG: Least prioritized; Load from config file
const urlFromConf = Config.get().livekit?.livekit_service_url;
if (urlFromConf) {
try {
// This will call the jwt/sfu/get endpoint to pre create the livekit room.
logger.info("Using config SFU", urlFromConf);
return await doOpenIdAndJWTFromUrl(urlFromConf);
} catch (ex) {
if (ex instanceof FailToGetOpenIdToken) {
throw ex;
}
logger.error("Failed to validate config SFU", ex);
}
}
// If we do not have returned a transport by now we throw an error
throw new MatrixRTCTransportMissingError(domain ?? "");
}

View File

@@ -0,0 +1,233 @@
/*
Copyright 2025 Element Creations Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
beforeEach,
describe,
expect,
it,
type MockedObject,
vi,
} from "vitest";
import { type IClientWellKnown, MatrixError } from "matrix-js-sdk";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import {
type LivekitTransportConfig,
type Transport,
} from "matrix-js-sdk/lib/matrixrtc";
import type { ResolvedConfigOptions } from "../../../config/ConfigOptions.ts";
import {
RtcTransportAutoDiscovery,
type RtcTransportAutoDiscoveryProps,
} from "./RtcTransportAutoDiscovery.ts";
type DiscoveryClient = RtcTransportAutoDiscoveryProps["client"];
const backendTransport: LivekitTransportConfig = {
type: "livekit",
livekit_service_url: "https://backend.example.org",
};
const wellKnownTransport: LivekitTransportConfig = {
type: "livekit",
livekit_service_url: "https://well-known.example.org",
};
function makeClient(): MockedObject<DiscoveryClient> {
return {
getDomain: vi.fn().mockReturnValue("example.org"),
baseUrl: "https://matrix.example.org",
_unstable_getRTCTransports: vi.fn().mockResolvedValue([]),
getAccessToken: vi.fn().mockReturnValue("access_token"),
getOpenIdToken: vi.fn(),
getDeviceId: vi.fn(),
} as unknown as MockedObject<DiscoveryClient>;
}
function makeResolvedConfig(livekitServiceUrl?: string): ResolvedConfigOptions {
return {
livekit: livekitServiceUrl
? {
livekit_service_url: livekitServiceUrl,
}
: undefined,
} as ResolvedConfigOptions;
}
function makeWellKnown(rtcFoci?: Transport[]): IClientWellKnown {
return {
"org.matrix.msc4143.rtc_foci": rtcFoci,
} as unknown as IClientWellKnown;
}
describe("RtcTransportAutoDiscovery", () => {
beforeEach(() => {
vi.clearAllMocks();
});
const VALID_TEST_CASES: Array<{ transports: Transport[] }> = [
{ transports: [backendTransport] },
// will pick the first livekit transport in the list, even if there are other non-livekit transports
{ transports: [{ type: "not_livekit" }, backendTransport] },
];
it.each(VALID_TEST_CASES)(
"prefers backend transport over well-known and app config $transports",
async ({ transports }) => {
// it("prefers backend transport over well-known and app config", async () => {
const client = makeClient();
client._unstable_getRTCTransports.mockResolvedValue(transports);
const wellKnownFetcher = vi
.fn<(domain: string) => Promise<IClientWellKnown>>()
.mockResolvedValue(makeWellKnown([wellKnownTransport]));
const discovery = new RtcTransportAutoDiscovery({
client,
resolvedConfig: makeResolvedConfig("https://config.example.org"),
wellKnownFetcher,
logger: rootLogger,
});
await expect(
discovery.discoverPreferredTransport(),
).resolves.toStrictEqual(backendTransport);
expect(client._unstable_getRTCTransports).toHaveBeenCalledTimes(1);
expect(wellKnownFetcher).not.toHaveBeenCalled();
},
);
it("Retries limit_exceeded backend transport over well-known", async () => {
const client = makeClient();
client._unstable_getRTCTransports
.mockRejectedValueOnce(
new MatrixError(
{
errcode: "M_LIMIT_EXCEEDED",
error: "Too many requests",
retry_after_ms: 100,
},
429,
),
)
.mockResolvedValue([backendTransport]);
const wellKnownFetcher = vi
.fn<(domain: string) => Promise<IClientWellKnown>>()
.mockResolvedValue(makeWellKnown([wellKnownTransport]));
const discovery = new RtcTransportAutoDiscovery({
client,
resolvedConfig: makeResolvedConfig("https://config.example.org"),
wellKnownFetcher,
logger: rootLogger,
});
await expect(discovery.discoverPreferredTransport()).resolves.toStrictEqual(
backendTransport,
);
expect(client._unstable_getRTCTransports).toHaveBeenCalledTimes(2);
expect(wellKnownFetcher).not.toHaveBeenCalled();
});
const INVALID_TEST_CASES: Array<{ transports: Transport[] }> = [
{ transports: [] },
{ transports: [{ type: "not_livekit" }] },
];
it.each(INVALID_TEST_CASES)(
"falls back to well-known when backend has no (valid) livekit transports $transports",
async ({ transports }) => {
const client = makeClient();
client._unstable_getRTCTransports.mockResolvedValue(transports);
const wellKnownFetcher = vi
.fn<(domain: string) => Promise<IClientWellKnown>>()
.mockResolvedValue(makeWellKnown([wellKnownTransport]));
const discovery = new RtcTransportAutoDiscovery({
client,
resolvedConfig: makeResolvedConfig("https://config.example.org"),
wellKnownFetcher,
logger: rootLogger,
});
await expect(
discovery.discoverPreferredTransport(),
).resolves.toStrictEqual(wellKnownTransport);
expect(wellKnownFetcher).toHaveBeenCalledWith("example.org");
},
);
it("skips backend discovery in widget mode and uses well-known", async () => {
const client = makeClient();
// widget mode is detected by the absence of an access token
client.getAccessToken.mockReturnValue(null);
const wellKnownFetcher = vi
.fn<(domain: string) => Promise<IClientWellKnown>>()
.mockResolvedValue(makeWellKnown([wellKnownTransport]));
const discovery = new RtcTransportAutoDiscovery({
client,
resolvedConfig: makeResolvedConfig("https://config.example.org"),
wellKnownFetcher,
logger: rootLogger,
});
await expect(discovery.discoverPreferredTransport()).resolves.toStrictEqual(
wellKnownTransport,
);
expect(client._unstable_getRTCTransports).not.toHaveBeenCalled();
expect(wellKnownFetcher).toHaveBeenCalledWith("example.org");
});
it("falls back to app config when backend fails and well-known has no rtc_foci", async () => {
const client = makeClient();
client._unstable_getRTCTransports.mockRejectedValue(
new MatrixError({ errcode: "M_UNKNOWN" }, 404),
);
const wellKnownFetcher = vi
.fn<(domain: string) => Promise<IClientWellKnown>>()
.mockResolvedValue({} as IClientWellKnown);
const discovery = new RtcTransportAutoDiscovery({
client,
resolvedConfig: makeResolvedConfig("https://config.example.org"),
wellKnownFetcher,
logger: rootLogger,
});
await expect(discovery.discoverPreferredTransport()).resolves.toStrictEqual(
{
type: "livekit",
livekit_service_url: "https://config.example.org",
},
);
});
it("returns null when backend, well-known and config are all unavailable", async () => {
const client = makeClient();
client._unstable_getRTCTransports.mockResolvedValue([]);
const wellKnownFetcher = vi
.fn<(domain: string) => Promise<IClientWellKnown>>()
.mockResolvedValue({} as IClientWellKnown);
const discovery = new RtcTransportAutoDiscovery({
client,
resolvedConfig: makeResolvedConfig(undefined),
wellKnownFetcher,
logger: rootLogger,
});
await expect(discovery.discoverPreferredTransport()).resolves.toBeNull();
});
});

View File

@@ -0,0 +1,172 @@
/*
Copyright 2026 Element Creations Ltd.
SPDX-License-IdFentifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
isLivekitTransportConfig,
type LivekitTransportConfig,
} from "matrix-js-sdk/lib/matrixrtc";
import { type IClientWellKnown, type MatrixClient } from "matrix-js-sdk";
import { type Logger } from "matrix-js-sdk/lib/logger";
import type { ResolvedConfigOptions } from "../../../config/ConfigOptions.ts";
import { doNetworkOperationWithRetry } from "../../../utils/matrix.ts";
type TransportDiscoveryClient = Pick<
MatrixClient,
"getDomain" | "_unstable_getRTCTransports" | "getAccessToken"
>;
export interface RtcTransportAutoDiscoveryProps {
client: TransportDiscoveryClient;
resolvedConfig: ResolvedConfigOptions;
wellKnownFetcher: (domain: string) => Promise<IClientWellKnown>;
logger: Logger;
}
export class RtcTransportAutoDiscovery {
private readonly client: TransportDiscoveryClient;
private readonly resolvedConfig: ResolvedConfigOptions;
private readonly wellKnownFetcher: (
domain: string,
) => Promise<IClientWellKnown>;
private readonly logger: Logger;
public constructor({
client,
resolvedConfig,
wellKnownFetcher,
logger,
}: RtcTransportAutoDiscoveryProps) {
this.client = client;
this.resolvedConfig = resolvedConfig;
this.wellKnownFetcher = wellKnownFetcher;
this.logger = logger.getChild("[RtcTransportAutoDiscovery]");
}
public async discoverPreferredTransport(): Promise<LivekitTransportConfig | null> {
// 1) backend transports
const backendTransport = await this.tryBackendTransports();
if (backendTransport) {
this.logger.info(
`Found backend transport: ${backendTransport.livekit_service_url}`,
);
return backendTransport;
}
this.logger.info("No backend transport found, falling back to well-known");
// 2) .well-known transports
const wellKnownTransport = await this.tryWellKnownTransports();
if (wellKnownTransport) {
this.logger.info(
`Found .well-known transport: ${wellKnownTransport.livekit_service_url}`,
);
return wellKnownTransport;
}
this.logger.info(
"No .well-known transport found, falling back to app config",
);
// 3) app config URL
const configTransport = this.tryConfigTransport();
if (configTransport) {
this.logger.info(
`Found app config transport: ${configTransport.livekit_service_url}`,
);
return configTransport;
}
return null;
}
/**
* Fetches the first rtc_foci from the backend.
* This will not throw errors, but instead just log them and return null if the expected config is not found or malformed.
* @private
*/
private async tryBackendTransports(): Promise<LivekitTransportConfig | null> {
const client = this.client;
// MSC4143: Attempt to fetch transports from backend.
// TODO: Workaround for an issue in the js-sdk RoomWidgetClient that
// is not yet implementing _unstable_getRTCTransports properly (via widget API new action).
// For now we just skip this call if we are in a widget.
// In widget mode the client is a `RoomWidgetClient` which has no access token (it is using the widget API).
// Could be removed once the js-sdk is fixed (https://github.com/matrix-org/matrix-js-sdk/issues/5245)
const isSPA = !!client.getAccessToken();
if (isSPA && "_unstable_getRTCTransports" in client) {
this.logger.info("First try to use getRTCTransports end point ...");
try {
const transportList = await doNetworkOperationWithRetry(async () =>
client._unstable_getRTCTransports(),
);
const first = transportList.filter(isLivekitTransportConfig)[0];
if (first) {
return first;
} else {
this.logger.info(
`No livekit transport found in getRTCTransports end point`,
transportList,
);
}
} catch (ex) {
this.logger.info(`Failed to use getRTCTransports end point: ${ex}`);
}
} else {
this.logger.debug(`getRTCTransports end point not available`);
}
return null;
}
/**
* Fetches the first rtc_foci from the .well-known/matrix/client.
* This will not throw errors, but instead just log them and return null if the expected config is not found or malformed.
* @private
*/
private async tryWellKnownTransports(): Promise<LivekitTransportConfig | null> {
// Legacy MSC4143 (to be removed) WELL_KNOWN: Prioritize the .well-known/matrix/client, if available.
const client = this.client;
const domain = client.getDomain();
if (domain) {
// we use AutoDiscovery instead of relying on the MatrixClient having already
// been fully configured and started
const wellKnownFoci = await this.wellKnownFetcher(domain);
const fociConfig = wellKnownFoci["org.matrix.msc4143.rtc_foci"];
if (fociConfig) {
if (!Array.isArray(fociConfig)) {
this.logger.warn(
`org.matrix.msc4143.rtc_foci is not an array in .well-known`,
);
} else {
return fociConfig[0];
}
} else {
this.logger.info(
`No .well-known "org.matrix.msc4143.rtc_foci" found for ${domain}`,
wellKnownFoci,
);
}
} else {
// Should never happen, but just in case
this.logger.warn(`No domain configured for client`);
}
return null;
}
private tryConfigTransport(): LivekitTransportConfig | null {
const url = this.resolvedConfig.livekit?.livekit_service_url;
if (url) {
return {
type: "livekit",
livekit_service_url: url,
};
}
return null;
}
}

View File

@@ -8,6 +8,7 @@ Please see LICENSE in the repository root for full details.
import {
afterEach,
beforeEach,
describe,
expect,
it,
@@ -151,6 +152,19 @@ afterEach(() => {
});
describe("Start connection states", () => {
beforeEach(() => {
fetchMock.post(
`https://matrix-rtc.example.org/livekit/jwt/get_token`,
() => {
return {
// Return a non-retryable error, if not, the retry logic will
// wait and fail the test with a timeout.
status: 404,
};
},
);
});
it("start in initialized state", () => {
setupTest();