Refactor local transport

- use the new domain logic to discover the transport
- then try to authenticate
- Also fix the bug in multi sfu where active$ not updated on delayId change
This commit is contained in:
Valere
2026-04-02 14:38:49 +02:00
parent c5c154c99b
commit 6dcb470162
3 changed files with 189 additions and 328 deletions

View File

@@ -5,11 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
retryNetworkOperation,
type IOpenIDToken,
type MatrixClient,
} from "matrix-js-sdk";
import { type IOpenIDToken, type MatrixClient } from "matrix-js-sdk";
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";
import { type Logger } from "matrix-js-sdk/lib/logger";
@@ -70,6 +66,7 @@ export type OpenIDClientParts = Pick<
MatrixClient,
"getOpenIdToken" | "getDeviceId"
>;
/**
* Gets a bearer token from the homeserver and then use it to authenticate
* to the matrix RTC backend in order to get acces to the SFU.
@@ -113,9 +110,6 @@ export async function getSFUConfigWithOpenID(
);
}
logger?.debug("Got openID token", openIdToken);
logger?.info(`Trying to get JWT for focus ${serviceUrl}...`);
let sfuConfig: { url: string; jwt: string } | undefined;
const tryBothJwtEndpoints = opts?.forceJwtEndpoint === undefined; // This is for SFUs where we do not publish.
@@ -127,7 +121,10 @@ export async function getSFUConfigWithOpenID(
// if we can use both or if we are forced to use the new one.
if (tryBothJwtEndpoints || forceMatrix2Jwt) {
try {
sfuConfig = await getLiveKitJWTWithDelayDelegation(
logger?.info(
`Trying to get JWT with delegation for focus ${serviceUrl}...`,
);
const sfuConfig = await getLiveKitJWTWithDelayDelegation(
membership,
serviceUrl,
roomId,
@@ -135,33 +132,36 @@ export async function getSFUConfigWithOpenID(
opts?.delayEndpointBaseUrl,
opts?.delayId,
);
logger?.info(`Got JWT from call's active focus URL.`);
return extractFullConfigFromToken(sfuConfig);
} catch (e) {
logger?.debug(`Failed fetching jwt with matrix 2.0 endpoint:`, e);
if (e instanceof NotSupportedError) {
logger?.warn(
`Failed fetching jwt with matrix 2.0 endpoint (retry with legacy) Not supported`,
e,
);
sfuConfig = undefined;
} else {
logger?.warn(
`Failed fetching jwt with matrix 2.0 endpoint other issues ->`,
`(not going to try with legacy endpoint: forceOldJwtEndpoint is set to false, we did not get a not supported error from the sfu)`,
e,
);
// Make this throw a hard error in case we force the matrix2.0 endpoint.
if (forceMatrix2Jwt)
throw new NoMatrix2AuthorizationService(e as Error);
// NEVER get bejond this point if we forceMatrix2 and it failed!
}
// Make this throw a hard error in case we force the matrix2.0 endpoint.
if (forceMatrix2Jwt) throw new NoMatrix2AuthorizationService(e as Error);
// if (e instanceof NotSupportedError) {
// logger?.warn(
// `Failed fetching jwt with matrix 2.0 endpoint (retry with legacy) Not supported`,
// e,
// );
// } else {
// logger?.warn(
// `Failed fetching jwt with matrix 2.0 endpoint other issues ->`,
// `(not going to try with legacy endpoint: forceOldJwtEndpoint is set to false, we did not get a not supported error from the sfu)`,
// e,
// );
// // NEVER get bejond this point if we forceMatrix2 and it failed!
// }
}
}
// DEPRECATED
// here we either have a sfuConfig or we alredy exited because of `if (forceMatrix2) throw ...`
// here we either have a sfuConfig or we already exited because of `if (forceMatrix2) throw ...`
// The only case we can get into this condition is, if `forceMatrix2` is `false`
if (sfuConfig === undefined) {
try {
logger?.info(
`Trying to get JWT with legacy endpoint for focus ${serviceUrl}...`,
);
sfuConfig = await getLiveKitJWT(
membership.deviceId,
serviceUrl,
@@ -169,15 +169,19 @@ export async function getSFUConfigWithOpenID(
openIdToken,
);
logger?.info(`Got JWT from call's active focus URL.`);
return extractFullConfigFromToken(sfuConfig);
} catch (ex) {
throw new FailToGetOpenIdToken(
ex instanceof Error ? ex : new Error(`Unknown error ${ex}`),
);
}
}
if (!sfuConfig) {
throw new Error("No `sfuConfig` after trying with old and new endpoints");
}
// Pull the details from the JWT
function extractFullConfigFromToken(sfuConfig: {
url: string;
jwt: string;
}): SFUConfig {
const [, payloadStr] = sfuConfig.jwt.split(".");
// TODO: Prefer Uint8Array.fromBase64 when widely available
const payload = JSON.parse(global.atob(payloadStr)) as SFUJWTPayload;
return {
jwt: sfuConfig.jwt,
@@ -189,16 +193,15 @@ export async function getSFUConfigWithOpenID(
livekitIdentity: payload.sub,
};
}
const RETRIES = 4;
async function getLiveKitJWT(
deviceId: string,
livekitServiceURL: string,
matrixRoomId: string,
openIDToken: IOpenIDToken,
): Promise<{ url: string; jwt: string }> {
let res: Response | undefined;
await retryNetworkOperation(RETRIES, async () => {
res = await fetch(livekitServiceURL + "/sfu/get", {
const res = await doNetworkOperationWithRetry(async () => {
return await fetch(livekitServiceURL + "/sfu/get", {
method: "POST",
headers: {
"Content-Type": "application/json",
@@ -211,11 +214,7 @@ async function getLiveKitJWT(
}),
});
});
if (!res) {
throw new Error(
`Network error while connecting to jwt service after ${RETRIES} retries`,
);
}
if (!res.ok) {
throw new Error("SFU Config fetch failed with status code " + res.status);
}
@@ -262,10 +261,8 @@ export async function getLiveKitJWTWithDelayDelegation(
};
}
let res: Response | undefined;
await retryNetworkOperation(RETRIES, async () => {
res = await fetch(livekitServiceURL + "/get_token", {
const res = await doNetworkOperationWithRetry(async () => {
return await fetch(livekitServiceURL + "/get_token", {
method: "POST",
headers: {
"Content-Type": "application/json",
@@ -274,11 +271,6 @@ export async function getLiveKitJWTWithDelayDelegation(
});
});
if (!res) {
throw new Error(
`Network error while connecting to jwt service after ${RETRIES} retries`,
);
}
if (!res.ok) {
const msg = "SFU Config fetch failed with status code " + res.status;
if (res.status === 404) {

View File

@@ -544,7 +544,7 @@ describe("LocalTransport", () => {
});
});
it.fails(
it(
"should not update advertised transport on delayID changes, but active should update",
async () => {
// For simplicity, we'll just use the config livekit

View File

@@ -8,11 +8,11 @@ Please see LICENSE in the repository root for full details.
import {
type CallMembership,
isLivekitTransportConfig,
type Transport,
type LivekitTransportConfig,
} from "matrix-js-sdk/lib/matrixrtc";
import { MatrixError, type MatrixClient } from "matrix-js-sdk";
import { type MatrixClient } from "matrix-js-sdk";
import {
combineLatest,
distinctUntilChanged,
first,
from,
@@ -42,6 +42,7 @@ import {
} from "../../../livekit/openIDSFU.ts";
import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers.ts";
import { customLivekitUrl } from "../../../settings/settings.ts";
import { RtcTransportAutoDiscovery } from "./RtcTransportAutoDiscovery.ts";
const logger = rootLogger.getChild("[LocalTransport]");
@@ -137,91 +138,75 @@ export const createLocalTransport$ = ({
forceJwtEndpoint,
delayId$,
}: Props): LocalTransport => {
/**
* The LiveKit transport in use by the oldest RTC membership. `null` when the
* oldest member has no such transport.
*/
const oldestMemberTransport$ = scope.behavior<LivekitTransportConfig | null>(
memberships$.pipe(
map((memberships) => {
const oldestMember = memberships.value[0];
if (oldestMember === undefined) {
logger.info("Oldest member: not found");
return null;
}
const transport = oldestMember.getTransport(oldestMember);
if (transport === undefined) {
logger.warn(
`Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has no transport`,
);
return null;
}
if (!isLivekitTransportConfig(transport)) {
logger.warn(
`Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has invalid transport`,
);
return null;
}
logger.info(
"Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has valid transport",
);
return transport;
}),
distinctUntilChanged(areLivekitTransportsEqual),
),
// The LiveKit transport in use by the oldest RTC membership. `null` when the
// oldest member has no such transport.
const oldestMemberTransport$ = observerOldestMembership$(scope, memberships$);
const transportDiscovery = new RtcTransportAutoDiscovery({
client: client,
resolvedConfig: Config.get(),
wellKnownFetcher: AutoDiscovery.getRawClientConfig.bind(AutoDiscovery),
logger: logger,
});
// Get the preferred transport from the current deployement.
const discoveredTransport$ = from(
transportDiscovery.discoverPreferredTransport(),
);
/**
* The transport that we would personally prefer to publish on (if not for the
* transport preferences of others, perhaps). `null` until fetched and
* validated.
*
* @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken
*/
const preferredTransport$ =
scope.behavior<LocalTransportWithSFUConfig | null>(
// preferredTransport$ (used for multi sfu) needs to know if we are using the old or new
// jwt endpoint (`get_token` vs `sfu/get`) based on that the jwt endpoint will compute the rtcBackendIdentity
// differently. (sha(`${userId}|${deviceId}|${memberId}`) vs `${userId}|${deviceId}|${memberId}`)
// When using sticky events (we need to use the new endpoint).
customLivekitUrl.value$.pipe(
switchMap((customUrl) =>
startWith<LocalTransportWithSFUConfig | null>(null)(
// Fetch the SFU config, and repeat this asynchronously for every
// change in delay ID.
delayId$.pipe(
switchMap(async (delayId) => {
logger.info(
"Creating preferred transport based on: ",
"customUrl: ",
customUrl,
"delayId: ",
delayId,
"forceJwtEndpoint: ",
forceJwtEndpoint,
);
return makeTransport(
client,
ownMembershipIdentity,
roomId,
customUrl,
forceJwtEndpoint,
delayId ?? undefined,
);
}),
// We deliberately hide any changes to the SFU config because we
// do not actually want the app to reconnect whenever the JWT
// token changes due to us delegating a new delayed event. The
// initial SFU config for the transport is all the app needs.
distinctUntilChanged((prev, next) =>
areLivekitTransportsEqual(prev.transport, next.transport),
),
),
),
),
),
const preferredConfig$ = customLivekitUrl.value$
.pipe(
startWith(null),
switchMap((customUrl) => {
if (customUrl) {
return of({
type: "livekit",
livekit_service_url: customUrl,
} as LivekitTransportConfig);
} else {
return discoveredTransport$;
}
}),
)
.pipe(
map((config) => {
if (!config) {
// Bubbled up from the preferredConfig$ observable.
throw new MatrixRTCTransportMissingError(client.getDomain() ?? "");
}
return config;
}),
distinctUntilChanged(areLivekitTransportsEqual),
);
const preferredTransport$ = combineLatest([preferredConfig$, delayId$]).pipe(
switchMap(async ([transport, delayId]) => {
try {
return await doOpenIdAndJWTFromUrl(
transport.livekit_service_url,
forceJwtEndpoint,
ownMembershipIdentity,
roomId,
client,
delayId ?? undefined,
);
} catch (e) {
if (
e instanceof FailToGetOpenIdToken ||
e instanceof NoMatrix2AuthorizationService
) {
// rethrow as is
throw e;
}
// Catch others and rethrow as FailToGetOpenIdToken that has user friendly message.
logger.error("Failed to get JWT from preferred transport", e);
throw new FailToGetOpenIdToken(
e instanceof Error ? e : new Error(String(e)),
);
}
}),
);
if (useOldestMember) {
// --- Oldest member mode ---
return {
@@ -232,7 +217,7 @@ export const createLocalTransport$ = ({
advertised$: scope.behavior(
merge(
oldestMemberTransport$,
preferredTransport$.pipe(map((t) => t?.transport ?? null)),
preferredTransport$.pipe(map((t) => t.transport)),
).pipe(
first((t) => t !== null),
tap((t) =>
@@ -268,6 +253,7 @@ export const createLocalTransport$ = ({
),
),
),
null,
),
};
}
@@ -277,210 +263,93 @@ export const createLocalTransport$ = ({
return {
advertised$: scope.behavior(
preferredTransport$.pipe(
map((t) => t?.transport ?? null),
map((t) => t.transport),
distinctUntilChanged(areLivekitTransportsEqual),
),
null,
),
active$: preferredTransport$,
active$: scope.behavior(preferredTransport$, null),
};
};
const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci";
/**
* Determine the correct Transport for the current session, including
* validating auth against the service to ensure it's correct.
* Prefers in order:
*
* 1. The `urlFromDevSettings` value. If this cannot be validated, the function will throw.
* 2. The transports returned via the homeserver.
* 3. The transports returned via .well-known.
* 4. The transport configured in Element Call's config.
*
* @param client The authenticated Matrix client for the current user
* @param membership The membership identity of the user.
* @param roomId The ID of the room to be connected to.
* @param urlFromDevSettings Override URL provided by the user's local config.
* @param forceJwtEndpoint Whether to force a specific JWT endpoint
* - `Legacy` / `Matrix_2_0`
* - `get_token` / `sfu/get`
* - not hashing / hashing the backendIdentity
* @param delayId the delay id passed to the jwt service.
*
* @returns A fully validated transport config.
* @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken
* Observes the oldest member in the room and returns the transport that it uses if it is a livekit transport.
* @param scope - The observable scope.
* @param memberships$ - The observable of the call's memberships.'
*/
async function makeTransport(
function observerOldestMembership$(
scope: ObservableScope,
memberships$: Behavior<Epoch<CallMembership[]>>,
): Behavior<LivekitTransportConfig | null> {
return scope.behavior<LivekitTransportConfig | null>(
memberships$.pipe(
map((memberships) => {
const oldestMember = memberships.value[0];
if (oldestMember === undefined) {
logger.info("Oldest member: not found");
return null;
}
const transport = oldestMember.getTransport(oldestMember);
if (transport === undefined) {
logger.warn(
`Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has no transport`,
);
return null;
}
if (!isLivekitTransportConfig(transport)) {
logger.warn(
`Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has invalid transport`,
);
return null;
}
logger.info(
"Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has valid transport",
);
return transport;
}),
distinctUntilChanged(areLivekitTransportsEqual),
),
);
}
// Utility to ensure the user can authenticate with the SFU.
//
// We will call `getSFUConfigWithOpenID` once per transport here as it's our
// only mechanism of validation. This means we will also ask the
// homeserver for a OpenID token a few times. Since OpenID tokens are single
// use we don't want to risk any issues by re-using a token.
//
// If the OpenID request were to fail, then it's acceptable for us to fail
// this function early, as we assume the homeserver has got some problems.
async function doOpenIdAndJWTFromUrl(
url: string,
forceJwtEndpoint: JwtEndpointVersion,
membership: CallMembershipIdentityParts,
roomId: string,
client: Pick<
MatrixClient,
"getDomain" | "baseUrl" | "_unstable_getRTCTransports" | "getAccessToken"
> &
OpenIDClientParts,
membership: CallMembershipIdentityParts,
roomId: string,
urlFromDevSettings: string | null,
forceJwtEndpoint: JwtEndpointVersion,
delayId?: string,
): Promise<LocalTransportWithSFUConfig> {
logger.trace("Searching for a preferred transport");
async function doOpenIdAndJWTFromUrl(
url: string,
): Promise<LocalTransportWithSFUConfig> {
const sfuConfig = await getSFUConfigWithOpenID(
client,
membership,
url,
roomId,
{
forceJwtEndpoint: forceJwtEndpoint,
delayEndpointBaseUrl: client.baseUrl,
delayId,
},
logger,
);
return {
transport: {
type: "livekit",
livekit_service_url: url,
},
sfuConfig,
};
}
// We will call `getSFUConfigWithOpenID` once per transport here as it's our
// only mechanism of valiation. This means we will also ask the
// homeserver for a OpenID token a few times. Since OpenID tokens are single
// use we don't want to risk any issues by re-using a token.
//
// If the OpenID request were to fail then it's acceptable for us to fail
// this function early, as we assume the homeserver has got some problems.
// DEVTOOL: Highest priority: Load from devtool setting
if (urlFromDevSettings !== null) {
// Validate that the SFU is up. Otherwise, we want to fail on this
// as we don't permit other SFUs.
// This will call the jwt/sfu/get endpoint to pre create the livekit room.
logger.info("Using LiveKit transport from dev tools: ", urlFromDevSettings);
return await doOpenIdAndJWTFromUrl(urlFromDevSettings);
}
async function getFirstUsableTransport(
transports: Transport[],
): Promise<LocalTransportWithSFUConfig | null> {
for (const potentialTransport of transports) {
if (isLivekitTransportConfig(potentialTransport)) {
try {
logger.info(
`makeTransport: check transport authentication for "${potentialTransport.livekit_service_url}"`,
);
// This will call the jwt/sfu/get endpoint to pre create the livekit room.
return await doOpenIdAndJWTFromUrl(
potentialTransport.livekit_service_url,
);
} catch (ex) {
logger.debug(
`makeTransport: Could not use SFU service "${potentialTransport.livekit_service_url}" as SFU`,
ex,
);
// Explictly throw these
if (ex instanceof FailToGetOpenIdToken) {
throw ex;
}
if (ex instanceof NoMatrix2AuthorizationService) {
throw ex;
}
}
} else {
logger.info(
`makeTransport: "${potentialTransport.livekit_service_url}" is not a valid livekit transport as SFU`,
);
}
}
return null;
}
// MSC4143: Attempt to fetch transports from backend.
// TODO: Workaround for an issue in the js-sdk RoomWidgetClient that
// is not yet implementing _unstable_getRTCTransports properly (via widget API new action).
// For now we just skip this call if we are in a widget.
// In widget mode the client is a `RoomWidgetClient` which has no access token (it is using the widget API).
// Could be removed once the js-sdk is fixed (https://github.com/matrix-org/matrix-js-sdk/issues/5245)
const isSPA = !!client.getAccessToken();
if (isSPA && "_unstable_getRTCTransports" in client) {
logger.info(
"makeTransport: First try to use getRTCTransports end point ...",
);
try {
// TODO This should also check for server support?
const transportList = await client._unstable_getRTCTransports();
const selectedTransport = await getFirstUsableTransport(transportList);
if (selectedTransport) {
logger.info(
"makeTransport: ...Using backend-configured (client.getRTCTransports) SFU",
selectedTransport,
);
return selectedTransport;
}
} catch (ex) {
if (ex instanceof MatrixError && ex.httpStatus === 404) {
// Expected, this is an unstable endpoint and it's not required.
// There will be expected 404 errors in the console. When we check if synapse supports the endpoint.
logger.debug(
"Matrix homeserver does not provide any RTC transports via `/rtc/transports` (will retry with well-known.)",
);
} else if (ex instanceof FailToGetOpenIdToken) {
throw ex;
} else {
// We got an error that wasn't just missing support for the feature, so log it loudly.
logger.error(
"Unexpected error fetching RTC transports from backend",
ex,
);
}
}
}
logger.info(
`makeTransport: Trying to get transports from .well-known/matrix/client on domain ${client.getDomain()} ...`,
const sfuConfig = await getSFUConfigWithOpenID(
client,
membership,
url,
roomId,
{
forceJwtEndpoint: forceJwtEndpoint,
delayEndpointBaseUrl: client.baseUrl,
delayId,
},
logger,
);
// Legacy MSC4143 (to be removed) WELL_KNOWN: Prioritize the .well-known/matrix/client, if available.
const domain = client.getDomain();
if (domain) {
// we use AutoDiscovery instead of relying on the MatrixClient having already
// been fully configured and started
const wellKnownFoci = (await AutoDiscovery.getRawClientConfig(domain))?.[
FOCI_WK_KEY
];
const selectedTransport = Array.isArray(wellKnownFoci)
? await getFirstUsableTransport(wellKnownFoci)
: null;
if (selectedTransport) {
logger.info("Using .well-known SFU", selectedTransport);
return selectedTransport;
}
}
logger.info(
`makeTransport: No valid transport found via backend or .well-known, falling back to config if available.`,
);
// CONFIG: Least prioritized; Load from config file
const urlFromConf = Config.get().livekit?.livekit_service_url;
if (urlFromConf) {
try {
// This will call the jwt/sfu/get endpoint to pre create the livekit room.
logger.info("Using config SFU", urlFromConf);
return await doOpenIdAndJWTFromUrl(urlFromConf);
} catch (ex) {
if (ex instanceof FailToGetOpenIdToken) {
throw ex;
}
logger.error("Failed to validate config SFU", ex);
}
}
// If we do not have returned a transport by now we throw an error
throw new MatrixRTCTransportMissingError(domain ?? "");
return {
transport: {
type: "livekit",
livekit_service_url: url,
},
sfuConfig,
};
}