Files
element-call-Github/src/state/CallViewModel/localMember/LocalTransport.ts
Valere Fedronic c0540c105d Merge pull request #3706 from element-hq/valere/fix_remove_livekit_alias_signaling
The advertised livekit_alias in membership is deprecated
2026-02-04 13:53:30 +01:00

405 lines
14 KiB
TypeScript

/*
Copyright 2025 Element Creations Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
type CallMembership,
isLivekitTransportConfig,
type Transport,
type LivekitTransportConfig,
} from "matrix-js-sdk/lib/matrixrtc";
import { MatrixError, type MatrixClient } from "matrix-js-sdk";
import {
combineLatest,
distinctUntilChanged,
from,
map,
of,
switchMap,
} from "rxjs";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";
import { type Behavior } from "../../Behavior.ts";
import { type Epoch, type ObservableScope } from "../../ObservableScope.ts";
import { Config } from "../../../config/Config.ts";
import {
FailToGetOpenIdToken,
MatrixRTCTransportMissingError,
NoMatrix2AuthorizationService,
} from "../../../utils/errors.ts";
import {
getSFUConfigWithOpenID,
type SFUConfig,
type OpenIDClientParts,
} from "../../../livekit/openIDSFU.ts";
import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers.ts";
import { customLivekitUrl } from "../../../settings/settings.ts";
// Module-level child logger, created from the SDK root logger with the "[LocalTransport]" label.
const logger = rootLogger.getChild("[LocalTransport]");
/*
* It figures out “which LiveKit focus URL/alias the local user should use,”
* optionally aligning with the oldest member, and ensures the SFU path is primed
* before advertising that choice.
*/
/**
 * Inputs of `createLocalTransport$`.
 */
interface Props {
// Scope used to create every behavior in `createLocalTransport$` (via `scope.behavior(...)`).
scope: ObservableScope;
// Identity of the local membership; forwarded to `getSFUConfigWithOpenID` whenever an SFU config is requested.
ownMembershipIdentity: CallMembershipIdentityParts;
// Current call memberships. The first entry (`value[0]`) is treated as the oldest member.
memberships$: Behavior<Epoch<CallMembership[]>>;
// The subset of the Matrix client needed here: domain and base URL lookups, the
// unstable RTC transports endpoint, and the parts required to obtain an OpenID token.
client: Pick<
MatrixClient,
"getDomain" | "baseUrl" | "_unstable_getRTCTransports"
> &
OpenIDClientParts;
// Used by the jwt service to create the livekit room and compute the livekit alias.
roomId: string;
// When true, the oldest member's transport (if available) takes precedence over our own preferred transport.
useOldestMember$: Behavior<boolean>;
// JWT endpoint version passed to `makeTransport` when resolving our own preferred transport.
forceJwtEndpoint$: Behavior<JwtEndpointVersion>;
// Delay id passed on to the jwt service; `null` is forwarded as `undefined`.
delayId$: Behavior<string | null>;
}
/**
 * Which JWT service endpoint is used when requesting an SFU config.
 */
export enum JwtEndpointVersion {
// Legacy endpoint (`get_token`); the backend identity is not hashed.
Legacy = "legacy",
// Matrix 2.0 endpoint (`sfu/get`); the backend identity is hashed.
Matrix_2_0 = "matrix_2_0",
}
// TODO livekit_alias-cleanup
// 1. We need to move away from transports map to connections!!!
//
// 2. We need to stop sending livekit_alias altogether
//
//
// 1.
// Transports are just the JWT service address; they do not contain the information about which room on this transport to use.
// That requires slot and roomId.
//
// We need one connection per room on the transport.
//
// We need an object that contains:
// transport
// roomId
// slotId
//
// To map to the connections. Proposal: `ConnectionIdentifier`
//
// 2.
// We need to make sure we do not send livekit_alias in sticky events and that we drop all code for sending state events!
/**
 * A LiveKit transport together with the SFU config that was obtained for it
 * through `getSFUConfigWithOpenID`.
 */
export interface LocalTransportWithSFUConfig {
// The LiveKit transport (identified by its `livekit_service_url`).
transport: LivekitTransportConfig;
// The SFU config returned by `getSFUConfigWithOpenID` for that transport's service URL.
sfuConfig: SFUConfig;
}
/**
 * Type guard telling a bare LiveKit transport config apart from a transport
 * that already carries its SFU config.
 *
 * @param obj The value to inspect.
 * @returns `true` when `obj` has both a `transport` and an `sfuConfig` property.
 */
export function isLocalTransportWithSFUConfig(
  obj: LivekitTransportConfig | LocalTransportWithSFUConfig,
): obj is LocalTransportWithSFUConfig {
  // Both properties have to be present for the wrapped shape.
  const requiredKeys = ["transport", "sfuConfig"] as const;
  return requiredKeys.every((key) => key in obj);
}
/**
 * Builds the behavior answering "which transport is the local member going to use?".
 *
 * Two candidates are tracked and combined:
 * - the transport of the oldest member (first entry of `memberships$`), which is
 *   only looked at while `useOldestMember$` is true. Its SFU config is always
 *   requested through the legacy JWT endpoint.
 * - our own preferred transport, resolved by `makeTransport` and re-resolved
 *   whenever the custom LiveKit URL, the delay id or the forced JWT endpoint changes.
 *
 * NOTE(review): earlier documentation stated that the oldest-member transport
 * "will not recompute if the oldest member leaves". The pipeline below
 * re-evaluates on every `memberships$` emission — confirm which is intended.
 *
 * @prop useOldestMember$ Whether to use the same transport as the oldest member
 * (falling back to the preferred transport while none is available).
 *
 * @prop forceJwtEndpoint$ Which JWT endpoint `makeTransport` has to use for the
 * preferred transport. The legacy endpoint implies a string concatenated
 * rtcBackendIdentity (which is expected for non sticky event based rtc member events).
 *
 * @returns The local transport together with its SFU config, or `null` while
 * nothing has been resolved yet.
 * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken (raised by the underlying lookups)
 */
export const createLocalTransport$ = ({
  scope,
  memberships$,
  ownMembershipIdentity,
  client,
  roomId,
  useOldestMember$,
  forceJwtEndpoint$,
  delayId$,
}: Props): Behavior<LocalTransportWithSFUConfig | null> => {
  // Requests the SFU config (OpenID token + JWT) for a transport taken over
  // from the oldest member. This path always uses the legacy JWT endpoint.
  const withLegacySfuConfig = async (
    transport: LivekitTransportConfig,
  ): Promise<LocalTransportWithSFUConfig> => {
    const sfuConfig = await getSFUConfigWithOpenID(
      client,
      ownMembershipIdentity,
      transport.livekit_service_url,
      roomId,
      { forceJwtEndpoint: JwtEndpointVersion.Legacy },
      logger,
    );
    return { transport, sfuConfig };
  };

  /**
   * The oldest member's transport, including the SFU config fetched for it.
   * `null` when `useOldestMember$` is false, when there is no oldest member
   * transport, or when that transport is not a LiveKit transport.
   */
  const oldestMemberTransport$ =
    scope.behavior<LocalTransportWithSFUConfig | null>(
      combineLatest([memberships$, useOldestMember$]).pipe(
        map(([memberships, useOldestMember]) => {
          // No need to do any prefetching if not using oldest member
          if (!useOldestMember) return null;
          const oldestMember = memberships.value[0];
          return oldestMember?.getTransport(oldestMember) || null;
        }),
        switchMap((candidate) =>
          candidate !== null && isLivekitTransportConfig(candidate)
            ? from(withLegacySfuConfig(candidate))
            : of(null),
        ),
      ),
      null,
    );

  /**
   * The transport that we would personally prefer to publish on (if not for the
   * transport preferences of others, perhaps).
   *
   * It has to know whether the old or the new jwt endpoint is used
   * (`get_token` vs `sfu/get`), because the jwt endpoint computes the
   * rtcBackendIdentity differently depending on that
   * (`${userId}|${deviceId}|${memberId}` vs a hash of it).
   * When using sticky events the new endpoint is required.
   *
   * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken
   */
  const preferredTransport$ = scope.behavior(
    combineLatest([customLivekitUrl.value$, delayId$, forceJwtEndpoint$]).pipe(
      switchMap(([customUrl, delayId, forceEndpoint]) => {
        logger.info(
          "Creating preferred transport based on: ",
          "customUrl: ",
          customUrl,
          "delayId: ",
          delayId,
          "forceEndpoint: ",
          forceEndpoint,
        );
        const pendingTransport = makeTransport(
          client,
          ownMembershipIdentity,
          roomId,
          customUrl,
          forceEndpoint,
          delayId ?? undefined,
        );
        return from(pendingTransport);
      }),
    ),
    null,
  );

  /**
   * The chosen transport we should advertise in our MatrixRTC membership.
   */
  const chosenTransport$ = combineLatest([
    useOldestMember$,
    oldestMemberTransport$,
    preferredTransport$,
  ]).pipe(
    map(([useOldestMember, fromOldestMember, preferred]) => {
      if (!useOldestMember) return preferred;
      // Fall back to our own preference until the oldest member's transport is ready.
      return fromOldestMember ?? preferred;
    }),
    // Only the transport itself is compared; the attached SFU config is ignored.
    distinctUntilChanged((previous, current) => {
      logger.info(
        "Local Transport Update from:",
        previous?.transport.livekit_service_url,
        " to ",
        current?.transport.livekit_service_url,
      );
      return areLivekitTransportsEqual(
        previous?.transport ?? null,
        current?.transport ?? null,
      );
    }),
  );
  return scope.behavior(chosenTransport$);
};
// Key under which the list of RTC foci is looked up in the raw .well-known client config.
const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci";
/**
 * Determine the correct Transport for the current session, validating each
 * candidate by actually requesting an SFU config for it.
 * Candidates are tried in this order:
 *
 * 1. The `urlFromDevSettings` value. If this cannot be validated, the function will throw.
 * 2. The transports returned via the homeserver.
 * 3. The transports returned via .well-known.
 * 4. The transport configured in Element Call's config.
 *
 * @param client The authenticated Matrix client for the current user
 * @param membership The membership identity of the user.
 * @param roomId The ID of the room to be connected to.
 * @param urlFromDevSettings Override URL provided by the user's local config.
 * @param forceJwtEndpoint Whether to force a specific JWT endpoint
 * - `Legacy` / `Matrix_2_0`
 * - `get_token` / `sfu/get`
 * - not hashing / hashing the backendIdentity
 * @param delayId the delay id passed to the jwt service.
 *
 * @returns A fully validated transport config.
 * @throws MatrixRTCTransportMissingError when no candidate could be validated.
 * @throws FailToGetOpenIdToken as soon as any validation attempt fails with it.
 * @throws NoMatrix2AuthorizationService when raised for the dev settings URL or
 * for a .well-known candidate (see the note on the homeserver branch below).
 */
async function makeTransport(
  client: Pick<
    MatrixClient,
    "getDomain" | "baseUrl" | "_unstable_getRTCTransports"
  > &
    OpenIDClientParts,
  membership: CallMembershipIdentityParts,
  roomId: string,
  urlFromDevSettings: string | null,
  forceJwtEndpoint: JwtEndpointVersion,
  delayId?: string,
): Promise<LocalTransportWithSFUConfig> {
  logger.trace("Searching for a preferred transport");

  // Requests an SFU config for `serviceUrl` and wraps it as a LiveKit transport.
  // This calls the jwt service, which also pre creates the livekit room.
  const validateServiceUrl = async (
    serviceUrl: string,
  ): Promise<LocalTransportWithSFUConfig> => {
    const sfuConfig = await getSFUConfigWithOpenID(
      client,
      membership,
      serviceUrl,
      roomId,
      {
        forceJwtEndpoint,
        delayEndpointBaseUrl: client.baseUrl,
        delayId,
      },
      logger,
    );
    return {
      transport: { type: "livekit", livekit_service_url: serviceUrl },
      sfuConfig,
    };
  };

  // Walks the candidates in order and returns the first LiveKit transport that
  // validates, or null if none does. Non-LiveKit transports are skipped.
  const firstUsableTransport = async (
    candidates: Transport[],
  ): Promise<LocalTransportWithSFUConfig | null> => {
    for (const candidate of candidates) {
      if (!isLivekitTransportConfig(candidate)) continue;
      try {
        return await validateServiceUrl(candidate.livekit_service_url);
      } catch (ex) {
        // Explicitly rethrow these; every other failure just moves on to the next candidate.
        if (
          ex instanceof FailToGetOpenIdToken ||
          ex instanceof NoMatrix2AuthorizationService
        ) {
          throw ex;
        }
        logger.debug(
          `Could not use SFU service "${candidate.livekit_service_url}" as SFU`,
          ex,
        );
      }
    }
    return null;
  };

  // `getSFUConfigWithOpenID` is called once per candidate transport, as it is
  // our only mechanism of validation. This means the homeserver is also asked
  // for an OpenID token a few times. Since OpenID tokens are single use we
  // don't want to risk any issues by re-using a token.
  //
  // If the OpenID request were to fail then it's acceptable for us to fail
  // this function early, as we assume the homeserver has got some problems.

  // DEVTOOL: Highest priority: Load from devtool setting
  if (urlFromDevSettings !== null) {
    // Validate that the SFU is up. Otherwise, we want to fail on this
    // as we don't permit other SFUs.
    logger.info("Using LiveKit transport from dev tools: ", urlFromDevSettings);
    return await validateServiceUrl(urlFromDevSettings);
  }

  // MSC4143: Attempt to fetch transports from backend.
  if ("_unstable_getRTCTransports" in client) {
    try {
      const backendTransports = await client._unstable_getRTCTransports();
      const fromBackend = await firstUsableTransport(backendTransports);
      if (fromBackend) {
        logger.info(
          "Using backend-configured (client.getRTCTransports) SFU",
          fromBackend,
        );
        return fromBackend;
      }
    } catch (ex) {
      if (ex instanceof MatrixError && ex.httpStatus === 404) {
        // Expected, this is an unstable endpoint and it's not required.
        // There will be expected 404 errors in the console. When we check if synapse supports the endpoint.
        logger.debug(
          "Matrix homeserver does not provide any RTC transports via `/rtc/transports` (will retry with well-known.)",
        );
      } else if (ex instanceof FailToGetOpenIdToken) {
        throw ex;
      } else {
        // We got an error that wasn't just missing support for the feature, so log it loudly.
        // NOTE(review): a NoMatrix2AuthorizationService rethrown by the candidate
        // loop also ends up here and is only logged, whereas the .well-known
        // branch below lets it propagate — confirm this difference is intended.
        logger.error(
          "Unexpected error fetching RTC transports from backend",
          ex,
        );
      }
    }
  }

  // Legacy MSC4143 (to be removed) WELL_KNOWN: Prioritize the .well-known/matrix/client, if available.
  const domain = client.getDomain();
  if (domain) {
    // we use AutoDiscovery instead of relying on the MatrixClient having already
    // been fully configured and started
    const rawClientConfig = await AutoDiscovery.getRawClientConfig(domain);
    const wellKnownFoci = rawClientConfig?.[FOCI_WK_KEY];
    if (Array.isArray(wellKnownFoci)) {
      const fromWellKnown = await firstUsableTransport(wellKnownFoci);
      if (fromWellKnown) {
        logger.info("Using .well-known SFU", fromWellKnown);
        return fromWellKnown;
      }
    }
  }

  // CONFIG: Least prioritized; Load from config file
  const urlFromConf = Config.get().livekit?.livekit_service_url;
  if (urlFromConf) {
    try {
      logger.info("Using config SFU", urlFromConf);
      return await validateServiceUrl(urlFromConf);
    } catch (ex) {
      if (ex instanceof FailToGetOpenIdToken) {
        throw ex;
      }
      logger.error("Failed to validate config SFU", ex);
    }
  }

  // Nothing could be validated: report the missing transport for this domain.
  throw new MatrixRTCTransportMissingError(domain ?? "");
}