Logically separate the advertised transport from the active transport

To correctly implement the legacy "oldest membership" mode, we need the code to be more nuanced about the local transport. Specifically, it needs to allow for the transport we advertise in our membership to be different from the transport that we connect to and publish media on. Otherwise, if these two are yoked together, members will resend their memberships whenever an SFU hop occurs, which an attacker could use to cause an amplified wave of state changes.
This commit is contained in:
Robin
2026-02-09 19:07:27 +01:00
parent 8ea773aa24
commit 13d131c2e9
4 changed files with 211 additions and 153 deletions

View File

@@ -444,35 +444,34 @@ export function createCallViewModel$(
memberId: uuidv4(),
};
const localTransport$ = createLocalTransport$({
scope: scope,
memberships$: memberships$,
ownMembershipIdentity,
client,
delayId$: scope.behavior(
(
fromEvent(
matrixRTCSession,
MembershipManagerEvent.DelayIdChanged,
// The type of reemitted event includes the original emitted as the second arg.
) as Observable<[string | undefined, IMembershipManager]>
).pipe(map(([delayId]) => delayId ?? null)),
matrixRTCSession.delayId ?? null,
),
roomId: matrixRoom.roomId,
forceJwtEndpoint$: scope.behavior(
matrixRTCMode$.pipe(
map((v) =>
v === MatrixRTCMode.Matrix_2_0
? JwtEndpointVersion.Matrix_2_0
: JwtEndpointVersion.Legacy,
),
const localTransport$ = scope.behavior(
matrixRTCMode$.pipe(
map((mode) =>
createLocalTransport$({
scope: scope,
memberships$: memberships$,
ownMembershipIdentity,
client,
delayId$: scope.behavior(
(
fromEvent(
matrixRTCSession,
MembershipManagerEvent.DelayIdChanged,
// The type of reemitted event includes the original emitted as the second arg.
) as Observable<[string | undefined, IMembershipManager]>
).pipe(map(([delayId]) => delayId ?? null)),
matrixRTCSession.delayId ?? null,
),
roomId: matrixRoom.roomId,
forceJwtEndpoint:
mode === MatrixRTCMode.Matrix_2_0
? JwtEndpointVersion.Matrix_2_0
: JwtEndpointVersion.Legacy,
useOldestMember: mode === MatrixRTCMode.Legacy,
}),
),
),
useOldestMember$: scope.behavior(
matrixRTCMode$.pipe(map((v) => v === MatrixRTCMode.Legacy)),
),
});
);
const connectionFactory = new ECConnectionFactory(
client,
@@ -491,6 +490,7 @@ export function createCallViewModel$(
connectionFactory: connectionFactory,
localTransport$: scope.behavior(
localTransport$.pipe(
switchMap((t) => t.active$),
catchError((e: unknown) => {
logger.info(
"could not pass local transport to createConnectionManager$. localTransport$ threw an error",
@@ -524,13 +524,13 @@ export function createCallViewModel$(
);
const localMembership = createLocalMembership$({
scope: scope,
scope,
homeserverConnected: createHomeserverConnected$(
scope,
client,
matrixRTCSession,
),
muteStates: muteStates,
muteStates,
joinMatrixRTC: (transport: LivekitTransportConfig) => {
return enterRTCSession(
matrixRTCSession,
@@ -550,9 +550,11 @@ export function createCallViewModel$(
),
);
},
connectionManager: connectionManager,
matrixRTCSession: matrixRTCSession,
localTransport$: localTransport$,
connectionManager,
matrixRTCSession,
localTransport$: scope.behavior(
localTransport$.pipe(switchMap((t) => t.advertised$)),
),
logger: logger.getChild(`[${Date.now()}]`),
});

View File

@@ -62,7 +62,6 @@ import {
} from "../remoteMembers/Connection.ts";
import { type HomeserverConnected } from "./HomeserverConnected.ts";
import { and$ } from "../../../utils/observable.ts";
import { type LocalTransportWithSFUConfig } from "./LocalTransport.ts";
export enum TransportState {
/** Not even a transport is available to the LocalMembership */
@@ -128,7 +127,7 @@ interface Props {
createPublisherFactory: (connection: Connection) => Publisher;
joinMatrixRTC: (transport: LivekitTransportConfig) => void;
homeserverConnected: HomeserverConnected;
localTransport$: Behavior<LocalTransportWithSFUConfig | null>;
localTransport$: Behavior<LivekitTransportConfig | null>;
matrixRTCSession: Pick<
MatrixRTCSession,
"updateCallIntent" | "leaveRoomSession"
@@ -147,7 +146,7 @@ interface Props {
* @param props.createPublisherFactory Factory to create a publisher once we have a connection.
* @param props.joinMatrixRTC Callback to join the matrix RTC session once we have a transport.
* @param props.homeserverConnected The homeserver connected state.
* @param props.localTransport$ The local transport to use for publishing.
* @param props.localTransport$ The transport to advertise in our membership.
* @param props.logger The logger to use.
* @param props.muteStates The mute states for video and audio.
* @param props.matrixRTCSession The matrix RTC session to join.
@@ -237,9 +236,7 @@ export const createLocalMembership$ = ({
return null;
}
return connectionData.getConnectionForTransport(
localTransport.transport,
);
return connectionData.getConnectionForTransport(localTransport);
}),
tap((connection) => {
logger.info(
@@ -549,7 +546,7 @@ export const createLocalMembership$ = ({
if (!shouldConnect) return;
try {
joinMatrixRTC(transport.transport);
joinMatrixRTC(transport);
} catch (error) {
logger.error("Error entering RTC session", error);
if (error instanceof Error)

View File

@@ -13,12 +13,15 @@ import {
} from "matrix-js-sdk/lib/matrixrtc";
import { MatrixError, type MatrixClient } from "matrix-js-sdk";
import {
combineLatest,
distinctUntilChanged,
first,
from,
map,
merge,
of,
startWith,
switchMap,
tap,
} from "rxjs";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
@@ -58,8 +61,8 @@ interface Props {
OpenIDClientParts;
// Used by the jwt service to create the livekit room and compute the livekit alias.
roomId: string;
useOldestMember$: Behavior<boolean>;
forceJwtEndpoint$: Behavior<JwtEndpointVersion>;
useOldestMember: boolean;
forceJwtEndpoint: JwtEndpointVersion;
delayId$: Behavior<string | null>;
}
@@ -93,23 +96,35 @@ export interface LocalTransportWithSFUConfig {
transport: LivekitTransportConfig;
sfuConfig: SFUConfig;
}
export function isLocalTransportWithSFUConfig(
obj: LivekitTransportConfig | LocalTransportWithSFUConfig,
): obj is LocalTransportWithSFUConfig {
return "transport" in obj && "sfuConfig" in obj;
}
interface LocalTransport {
/**
* The transport to be advertised in our MatrixRTC membership. `null` when not
* yet fetched/validated.
*/
advertised$: Behavior<LivekitTransportConfig | null>;
/**
* The transport to connect to and publish media on. `null` when not yet known
* or available.
*/
active$: Behavior<LocalTransportWithSFUConfig | null>;
}
/**
* This class is responsible for managing the local transport.
* "Which transport is the local member going to use"
* Connects to the JWT service and determines the transports that the local member should use.
*
* @prop useOldestMember Whether to use the same transport as the oldest member.
* This will only update once the first oldest member appears. Will not recompute if the oldest member leaves.
*
* @prop useOldJwtEndpoint$ Whether to set forceOldJwtEndpoint on the returned transport and to use the old JWT endpoint.
* @prop useOldJwtEndpoint Whether to set forceOldJwtEndpoint on the returned transport and to use the old JWT endpoint.
* This is used when the connection manager needs to know if it has to use the legacy endpoint which implies a string concatenated rtcBackendIdentity.
* (which is expected for non sticky event based rtc member events)
* @returns The local transport. It will be created using the correct sfu endpoint based on the useOldJwtEndpoint$ value.
* @returns The transport to advertise in the local MatrixRTC membership, along with the transport to actively publish media to.
* @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken
*/
export const createLocalTransport$ = ({
@@ -118,114 +133,156 @@ export const createLocalTransport$ = ({
ownMembershipIdentity,
client,
roomId,
useOldestMember$,
forceJwtEndpoint$,
useOldestMember,
forceJwtEndpoint,
delayId$,
}: Props): Behavior<LocalTransportWithSFUConfig | null> => {
}: Props): LocalTransport => {
/**
* The transport over which we should be actively publishing our media.
* undefined when not joined.
* The LiveKit transport in use by the oldest RTC membership. `null` when the
* oldest member has no such transport.
*/
const oldestMemberTransport$ =
scope.behavior<LocalTransportWithSFUConfig | null>(
combineLatest([memberships$, useOldestMember$]).pipe(
map(([memberships, useOldestMember]) => {
if (!useOldestMember) return null; // No need to do any prefetching if not using oldest member
const oldestMember = memberships.value[0];
const transport = oldestMember?.getTransport(oldestMember);
if (!transport) return null;
return transport;
}),
switchMap((transport) => {
if (transport !== null && isLivekitTransportConfig(transport)) {
// Get the open jwt token to connect to the sfu
const computeLocalTransportWithSFUConfig =
async (): Promise<LocalTransportWithSFUConfig> => {
return {
transport,
sfuConfig: await getSFUConfigWithOpenID(
client,
ownMembershipIdentity,
transport.livekit_service_url,
roomId,
{ forceJwtEndpoint: JwtEndpointVersion.Legacy },
logger,
),
};
};
return from(computeLocalTransportWithSFUConfig());
}
return of(null);
}),
),
null,
);
const oldestMemberTransport$ = scope.behavior<LivekitTransportConfig | null>(
memberships$.pipe(
map((memberships) => {
const oldestMember = memberships.value[0];
if (oldestMember === undefined) {
logger.info("Oldest member: not found");
return null;
}
const transport = oldestMember.getTransport(oldestMember);
if (transport === undefined) {
logger.warn(
`Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has no transport`,
);
return null;
}
if (!isLivekitTransportConfig(transport)) {
logger.warn(
`Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has invalid transport`,
);
return null;
}
logger.info(
"Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has valid transport",
);
return transport;
}),
distinctUntilChanged(areLivekitTransportsEqual),
),
);
/**
* The transport that we would personally prefer to publish on (if not for the
* transport preferences of others, perhaps).
* transport preferences of others, perhaps). `null` until fetched and
* validated.
*
* @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken
*/
const preferredTransport$ = scope.behavior(
// preferredTransport$ (used for multi sfu) needs to know if we are using the old or new
// jwt endpoint (`get_token` vs `sfu/get`) based on that the jwt endpoint will compute the rtcBackendIdentity
// differently. (sha(`${userId}|${deviceId}|${memberId}`) vs `${userId}|${deviceId}|${memberId}`)
// When using sticky events (we need to use the new endpoint).
combineLatest([customLivekitUrl.value$, delayId$, forceJwtEndpoint$]).pipe(
switchMap(([customUrl, delayId, forceEndpoint]) => {
logger.info(
"Creating preferred transport based on: ",
"customUrl: ",
customUrl,
"delayId: ",
delayId,
"forceEndpoint: ",
forceEndpoint,
);
return from(
makeTransport(
client,
ownMembershipIdentity,
roomId,
customUrl,
forceEndpoint,
delayId ?? undefined,
const preferredTransport$ =
scope.behavior<LocalTransportWithSFUConfig | null>(
// preferredTransport$ (used for multi sfu) needs to know if we are using the old or new
// jwt endpoint (`get_token` vs `sfu/get`) based on that the jwt endpoint will compute the rtcBackendIdentity
// differently. (sha(`${userId}|${deviceId}|${memberId}`) vs `${userId}|${deviceId}|${memberId}`)
// When using sticky events (we need to use the new endpoint).
customLivekitUrl.value$.pipe(
switchMap((customUrl) =>
startWith<LocalTransportWithSFUConfig | null>(null)(
// Fetch the SFU config, and repeat this asynchronously for every
// change in delay ID.
delayId$.pipe(
switchMap(async (delayId) => {
logger.info(
"Creating preferred transport based on: ",
"customUrl: ",
customUrl,
"delayId: ",
delayId,
"forceJwtEndpoint: ",
forceJwtEndpoint,
);
return makeTransport(
client,
ownMembershipIdentity,
roomId,
customUrl,
forceJwtEndpoint,
delayId ?? undefined,
);
}),
// We deliberately hide any changes to the SFU config because we
// do not actually want the app to reconnect whenever the JWT
// token changes due to us delegating a new delayed event. The
// initial SFU config for the transport is all the app needs.
distinctUntilChanged((prev, next) =>
areLivekitTransportsEqual(prev.transport, next.transport),
),
),
),
);
}),
),
null,
);
),
),
);
/**
* The chosen transport we should advertise in our MatrixRTC membership.
*/
return scope.behavior(
combineLatest([
useOldestMember$,
oldestMemberTransport$,
preferredTransport$,
]).pipe(
map(([useOldestMember, oldestMemberTransport, preferredTransport]) => {
return useOldestMember
? (oldestMemberTransport ?? preferredTransport)
: preferredTransport;
}),
distinctUntilChanged((t1, t2) => {
logger.info(
"Local Transport Update from:",
t1?.transport.livekit_service_url,
" to ",
t2?.transport.livekit_service_url,
);
return areLivekitTransportsEqual(
t1?.transport ?? null,
t2?.transport ?? null,
);
}),
if (useOldestMember) {
// --- Oldest member mode ---
return {
// Never update the transport that we advertise in our membership. Just
// take the first valid oldest member or preferred transport that we learn
// about, and stick with that. This avoids unnecessary SFU hops and room
// state changes.
advertised$: scope.behavior(
merge(
oldestMemberTransport$,
preferredTransport$.pipe(map((t) => t?.transport ?? null)),
).pipe(
first((t) => t !== null),
tap((t) =>
logger.info(`Advertise transport: ${t.livekit_service_url}`),
),
),
null,
),
// Publish on the transport used by the oldest member.
active$: scope.behavior(
oldestMemberTransport$.pipe(
switchMap((transport) => {
// Oldest member not available (or invalid SFU config).
if (transport === null) return of(null);
// Oldest member available: fetch the SFU config.
const fetchOldestMemberTransport =
async (): Promise<LocalTransportWithSFUConfig> => ({
transport,
sfuConfig: await getSFUConfigWithOpenID(
client,
ownMembershipIdentity,
transport.livekit_service_url,
roomId,
{ forceJwtEndpoint: JwtEndpointVersion.Legacy },
logger,
),
});
return from(fetchOldestMemberTransport()).pipe(startWith(null));
}),
tap((t) =>
logger.info(
`Publish on transport: ${t?.transport.livekit_service_url}`,
),
),
),
),
};
}
// --- Multi-SFU mode ---
// Always publish on and advertise the preferred transport.
return {
advertised$: scope.behavior(
preferredTransport$.pipe(
map((t) => t?.transport ?? null),
distinctUntilChanged(areLivekitTransportsEqual),
),
),
);
active$: preferredTransport$,
};
};
const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci";

View File

@@ -90,7 +90,7 @@ export interface IConnectionManager {
* @param props - Configuration object
* @param props.scope - The observable scope used by this object
* @param props.connectionFactory - Used to create new connections
* @param props.localTransport$ - The local transport to use. (deduplicated with remoteTransports$)
* @param props.localTransport$ - The transport to publish local media on. (deduplicated with remoteTransports$)
* @param props.remoteTransports$ - All other transports. The connection manager will create connections for each transport. (deduplicated with localTransport$)
* @param props.ownMembershipIdentity - The own membership identity to use.
* @param props.logger - The logger to use.
@@ -164,21 +164,21 @@ export function createConnectionManager$({
generateItemsWithEpoch(
"ConnectionManager connections$",
function* (transports) {
for (const transportWithOrWithoutSfuConfig of transports) {
if (
isLocalTransportWithSFUConfig(transportWithOrWithoutSfuConfig)
) {
// This is the local transport only the `LocalTransportWithSFUConfig` has a `sfuConfig` field
const { transport, sfuConfig } = transportWithOrWithoutSfuConfig;
for (const transport of transports) {
if (isLocalTransportWithSFUConfig(transport)) {
// This is the local transport; only the `LocalTransportWithSFUConfig` has a `sfuConfig` field.
yield {
keys: [transport.livekit_service_url, sfuConfig],
keys: [
transport.transport.livekit_service_url,
transport.sfuConfig,
],
data: undefined,
};
} else {
yield {
keys: [
transportWithOrWithoutSfuConfig.livekit_service_url,
undefined as undefined | SFUConfig,
transport.livekit_service_url,
undefined as SFUConfig | undefined,
],
data: undefined,
};
@@ -194,6 +194,8 @@ export function createConnectionManager$({
},
ownMembershipIdentity,
logger,
// TODO: This whole optional SFUConfig parameter is not particularly elegant.
// I would like it if connections always fetched the SFUConfig by themselves.
sfuConfig,
);
// Start the connection immediately