diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 82bcaedb..e2c6e46e 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -444,35 +444,34 @@ export function createCallViewModel$( memberId: uuidv4(), }; - const localTransport$ = createLocalTransport$({ - scope: scope, - memberships$: memberships$, - ownMembershipIdentity, - client, - delayId$: scope.behavior( - ( - fromEvent( - matrixRTCSession, - MembershipManagerEvent.DelayIdChanged, - // The type of reemitted event includes the original emitted as the second arg. - ) as Observable<[string | undefined, IMembershipManager]> - ).pipe(map(([delayId]) => delayId ?? null)), - matrixRTCSession.delayId ?? null, - ), - roomId: matrixRoom.roomId, - forceJwtEndpoint$: scope.behavior( - matrixRTCMode$.pipe( - map((v) => - v === MatrixRTCMode.Matrix_2_0 - ? JwtEndpointVersion.Matrix_2_0 - : JwtEndpointVersion.Legacy, - ), + const localTransport$ = scope.behavior( + matrixRTCMode$.pipe( + map((mode) => + createLocalTransport$({ + scope: scope, + memberships$: memberships$, + ownMembershipIdentity, + client, + delayId$: scope.behavior( + ( + fromEvent( + matrixRTCSession, + MembershipManagerEvent.DelayIdChanged, + // The type of reemitted event includes the original emitted as the second arg. + ) as Observable<[string | undefined, IMembershipManager]> + ).pipe(map(([delayId]) => delayId ?? null)), + matrixRTCSession.delayId ?? null, + ), + roomId: matrixRoom.roomId, + forceJwtEndpoint: + mode === MatrixRTCMode.Matrix_2_0 + ? JwtEndpointVersion.Matrix_2_0 + : JwtEndpointVersion.Legacy, + useOldestMember: mode === MatrixRTCMode.Legacy, + }), ), ), - useOldestMember$: scope.behavior( - matrixRTCMode$.pipe(map((v) => v === MatrixRTCMode.Legacy)), - ), - }); + ); const connectionFactory = new ECConnectionFactory( client, @@ -491,6 +490,7 @@ export function createCallViewModel$( connectionFactory: connectionFactory, localTransport$: scope.behavior( localTransport$.pipe( + switchMap((t) => t.active$), catchError((e: unknown) => { logger.info( "could not pass local transport to createConnectionManager$. localTransport$ threw an error", @@ -524,13 +524,13 @@ export function createCallViewModel$( ); const localMembership = createLocalMembership$({ - scope: scope, + scope, homeserverConnected: createHomeserverConnected$( scope, client, matrixRTCSession, ), - muteStates: muteStates, + muteStates, joinMatrixRTC: (transport: LivekitTransportConfig) => { return enterRTCSession( matrixRTCSession, @@ -550,9 +550,11 @@ export function createCallViewModel$( ), ); }, - connectionManager: connectionManager, - matrixRTCSession: matrixRTCSession, - localTransport$: localTransport$, + connectionManager, + matrixRTCSession, + localTransport$: scope.behavior( + localTransport$.pipe(switchMap((t) => t.advertised$)), + ), logger: logger.getChild(`[${Date.now()}]`), }); diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index 2f38ad82..eb641ca7 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -62,7 +62,6 @@ import { } from "../remoteMembers/Connection.ts"; import { type HomeserverConnected } from "./HomeserverConnected.ts"; import { and$ } from "../../../utils/observable.ts"; -import { type LocalTransportWithSFUConfig } from "./LocalTransport.ts"; export enum TransportState { /** Not even a transport is available to the LocalMembership */ @@ -128,7 +127,7 @@ interface Props { createPublisherFactory: (connection: Connection) => Publisher; joinMatrixRTC: (transport: LivekitTransportConfig) => void; homeserverConnected: HomeserverConnected; - localTransport$: Behavior; + localTransport$: Behavior; matrixRTCSession: Pick< MatrixRTCSession, "updateCallIntent" | "leaveRoomSession" @@ -147,7 +146,7 @@ interface Props { * @param props.createPublisherFactory Factory to create a publisher once we have a connection. * @param props.joinMatrixRTC Callback to join the matrix RTC session once we have a transport. * @param props.homeserverConnected The homeserver connected state. - * @param props.localTransport$ The local transport to use for publishing. + * @param props.localTransport$ The transport to advertise in our membership. * @param props.logger The logger to use. * @param props.muteStates The mute states for video and audio. * @param props.matrixRTCSession The matrix RTC session to join. @@ -237,9 +236,7 @@ export const createLocalMembership$ = ({ return null; } - return connectionData.getConnectionForTransport( - localTransport.transport, - ); + return connectionData.getConnectionForTransport(localTransport); }), tap((connection) => { logger.info( @@ -549,7 +546,7 @@ export const createLocalMembership$ = ({ if (!shouldConnect) return; try { - joinMatrixRTC(transport.transport); + joinMatrixRTC(transport); } catch (error) { logger.error("Error entering RTC session", error); if (error instanceof Error) diff --git a/src/state/CallViewModel/localMember/LocalTransport.ts b/src/state/CallViewModel/localMember/LocalTransport.ts index 73364094..0b566ba0 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.ts @@ -13,12 +13,15 @@ import { } from "matrix-js-sdk/lib/matrixrtc"; import { MatrixError, type MatrixClient } from "matrix-js-sdk"; import { - combineLatest, distinctUntilChanged, + first, from, map, + merge, of, + startWith, switchMap, + tap, } from "rxjs"; import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; @@ -58,8 +61,8 @@ interface Props { OpenIDClientParts; // Used by the jwt service to create the livekit room and compute the livekit alias. roomId: string; - useOldestMember$: Behavior; - forceJwtEndpoint$: Behavior; + useOldestMember: boolean; + forceJwtEndpoint: JwtEndpointVersion; delayId$: Behavior; } @@ -93,23 +96,35 @@ export interface LocalTransportWithSFUConfig { transport: LivekitTransportConfig; sfuConfig: SFUConfig; } + export function isLocalTransportWithSFUConfig( obj: LivekitTransportConfig | LocalTransportWithSFUConfig, ): obj is LocalTransportWithSFUConfig { return "transport" in obj && "sfuConfig" in obj; } +interface LocalTransport { + /** + * The transport to be advertised in our MatrixRTC membership. `null` when not + * yet fetched/validated. + */ + advertised$: Behavior; + /** + * The transport to connect to and publish media on. `null` when not yet known + * or available. + */ + active$: Behavior; +} + /** - * This class is responsible for managing the local transport. - * "Which transport is the local member going to use" + * Connects to the JWT service and determines the transports that the local member should use. * * @prop useOldestMember Whether to use the same transport as the oldest member. * This will only update once the first oldest member appears. Will not recompute if the oldest member leaves. - * - * @prop useOldJwtEndpoint$ Whether to set forceOldJwtEndpoint on the returned transport and to use the old JWT endpoint. + * @prop useOldJwtEndpoint Whether to set forceOldJwtEndpoint on the returned transport and to use the old JWT endpoint. * This is used when the connection manager needs to know if it has to use the legacy endpoint which implies a string concatenated rtcBackendIdentity. * (which is expected for non sticky event based rtc member events) - * @returns The local transport. It will be created using the correct sfu endpoint based on the useOldJwtEndpoint$ value. + * @returns The transport to advertise in the local MatrixRTC membership, along with the transport to actively publish media to. * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken */ export const createLocalTransport$ = ({ @@ -118,114 +133,156 @@ export const createLocalTransport$ = ({ ownMembershipIdentity, client, roomId, - useOldestMember$, - forceJwtEndpoint$, + useOldestMember, + forceJwtEndpoint, delayId$, -}: Props): Behavior => { +}: Props): LocalTransport => { /** - * The transport over which we should be actively publishing our media. - * undefined when not joined. + * The LiveKit transport in use by the oldest RTC membership. `null` when the + * oldest member has no such transport. */ - const oldestMemberTransport$ = - scope.behavior( - combineLatest([memberships$, useOldestMember$]).pipe( - map(([memberships, useOldestMember]) => { - if (!useOldestMember) return null; // No need to do any prefetching if not using oldest member - const oldestMember = memberships.value[0]; - const transport = oldestMember?.getTransport(oldestMember); - if (!transport) return null; - return transport; - }), - switchMap((transport) => { - if (transport !== null && isLivekitTransportConfig(transport)) { - // Get the open jwt token to connect to the sfu - const computeLocalTransportWithSFUConfig = - async (): Promise => { - return { - transport, - sfuConfig: await getSFUConfigWithOpenID( - client, - ownMembershipIdentity, - transport.livekit_service_url, - roomId, - { forceJwtEndpoint: JwtEndpointVersion.Legacy }, - logger, - ), - }; - }; - return from(computeLocalTransportWithSFUConfig()); - } - return of(null); - }), - ), - null, - ); + const oldestMemberTransport$ = scope.behavior( + memberships$.pipe( + map((memberships) => { + const oldestMember = memberships.value[0]; + if (oldestMember === undefined) { + logger.info("Oldest member: not found"); + return null; + } + const transport = oldestMember.getTransport(oldestMember); + if (transport === undefined) { + logger.warn( + `Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has no transport`, + ); + return null; + } + if (!isLivekitTransportConfig(transport)) { + logger.warn( + `Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has invalid transport`, + ); + return null; + } + logger.info( + "Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has valid transport", + ); + return transport; + }), + distinctUntilChanged(areLivekitTransportsEqual), + ), + ); /** * The transport that we would personally prefer to publish on (if not for the - * transport preferences of others, perhaps). + * transport preferences of others, perhaps). `null` until fetched and + * validated. * * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken */ - const preferredTransport$ = scope.behavior( - // preferredTransport$ (used for multi sfu) needs to know if we are using the old or new - // jwt endpoint (`get_token` vs `sfu/get`) based on that the jwt endpoint will compute the rtcBackendIdentity - // differently. (sha(`${userId}|${deviceId}|${memberId}`) vs `${userId}|${deviceId}|${memberId}`) - // When using sticky events (we need to use the new endpoint). - combineLatest([customLivekitUrl.value$, delayId$, forceJwtEndpoint$]).pipe( - switchMap(([customUrl, delayId, forceEndpoint]) => { - logger.info( - "Creating preferred transport based on: ", - "customUrl: ", - customUrl, - "delayId: ", - delayId, - "forceEndpoint: ", - forceEndpoint, - ); - return from( - makeTransport( - client, - ownMembershipIdentity, - roomId, - customUrl, - forceEndpoint, - delayId ?? undefined, + const preferredTransport$ = + scope.behavior( + // preferredTransport$ (used for multi sfu) needs to know if we are using the old or new + // jwt endpoint (`get_token` vs `sfu/get`) based on that the jwt endpoint will compute the rtcBackendIdentity + // differently. (sha(`${userId}|${deviceId}|${memberId}`) vs `${userId}|${deviceId}|${memberId}`) + // When using sticky events (we need to use the new endpoint). + customLivekitUrl.value$.pipe( + switchMap((customUrl) => + startWith(null)( + // Fetch the SFU config, and repeat this asynchronously for every + // change in delay ID. + delayId$.pipe( + switchMap(async (delayId) => { + logger.info( + "Creating preferred transport based on: ", + "customUrl: ", + customUrl, + "delayId: ", + delayId, + "forceJwtEndpoint: ", + forceJwtEndpoint, + ); + return makeTransport( + client, + ownMembershipIdentity, + roomId, + customUrl, + forceJwtEndpoint, + delayId ?? undefined, + ); + }), + // We deliberately hide any changes to the SFU config because we + // do not actually want the app to reconnect whenever the JWT + // token changes due to us delegating a new delayed event. The + // initial SFU config for the transport is all the app needs. + distinctUntilChanged((prev, next) => + areLivekitTransportsEqual(prev.transport, next.transport), + ), + ), ), - ); - }), - ), - null, - ); + ), + ), + ); - /** - * The chosen transport we should advertise in our MatrixRTC membership. - */ - return scope.behavior( - combineLatest([ - useOldestMember$, - oldestMemberTransport$, - preferredTransport$, - ]).pipe( - map(([useOldestMember, oldestMemberTransport, preferredTransport]) => { - return useOldestMember - ? (oldestMemberTransport ?? preferredTransport) - : preferredTransport; - }), - distinctUntilChanged((t1, t2) => { - logger.info( - "Local Transport Update from:", - t1?.transport.livekit_service_url, - " to ", - t2?.transport.livekit_service_url, - ); - return areLivekitTransportsEqual( - t1?.transport ?? null, - t2?.transport ?? null, - ); - }), + if (useOldestMember) { + // --- Oldest member mode --- + return { + // Never update the transport that we advertise in our membership. Just + // take the first valid oldest member or preferred transport that we learn + // about, and stick with that. This avoids unnecessary SFU hops and room + // state changes. + advertised$: scope.behavior( + merge( + oldestMemberTransport$, + preferredTransport$.pipe(map((t) => t?.transport ?? null)), + ).pipe( + first((t) => t !== null), + tap((t) => + logger.info(`Advertise transport: ${t.livekit_service_url}`), + ), + ), + null, + ), + // Publish on the transport used by the oldest member. + active$: scope.behavior( + oldestMemberTransport$.pipe( + switchMap((transport) => { + // Oldest member not available (or invalid SFU config). + if (transport === null) return of(null); + // Oldest member available: fetch the SFU config. + const fetchOldestMemberTransport = + async (): Promise => ({ + transport, + sfuConfig: await getSFUConfigWithOpenID( + client, + ownMembershipIdentity, + transport.livekit_service_url, + roomId, + { forceJwtEndpoint: JwtEndpointVersion.Legacy }, + logger, + ), + }); + return from(fetchOldestMemberTransport()).pipe(startWith(null)); + }), + tap((t) => + logger.info( + `Publish on transport: ${t?.transport.livekit_service_url}`, + ), + ), + ), + ), + }; + } + + // --- Multi-SFU mode --- + // Always publish on and advertise the preferred transport. + return { + advertised$: scope.behavior( + preferredTransport$.pipe( + map((t) => t?.transport ?? null), + distinctUntilChanged(areLivekitTransportsEqual), + ), ), - ); + active$: preferredTransport$, + }; }; const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci"; diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index e4376c12..727f68bc 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -90,7 +90,7 @@ export interface IConnectionManager { * @param props - Configuration object * @param props.scope - The observable scope used by this object * @param props.connectionFactory - Used to create new connections - * @param props.localTransport$ - The local transport to use. (deduplicated with remoteTransports$) + * @param props.localTransport$ - The transport to publish local media on. (deduplicated with remoteTransports$) * @param props.remoteTransports$ - All other transports. The connection manager will create connections for each transport. (deduplicated with localTransport$) * @param props.ownMembershipIdentity - The own membership identity to use. * @param props.logger - The logger to use. @@ -164,21 +164,21 @@ export function createConnectionManager$({ generateItemsWithEpoch( "ConnectionManager connections$", function* (transports) { - for (const transportWithOrWithoutSfuConfig of transports) { - if ( - isLocalTransportWithSFUConfig(transportWithOrWithoutSfuConfig) - ) { - // This is the local transport only the `LocalTransportWithSFUConfig` has a `sfuConfig` field - const { transport, sfuConfig } = transportWithOrWithoutSfuConfig; + for (const transport of transports) { + if (isLocalTransportWithSFUConfig(transport)) { + // This is the local transport; only the `LocalTransportWithSFUConfig` has a `sfuConfig` field. yield { - keys: [transport.livekit_service_url, sfuConfig], + keys: [ + transport.transport.livekit_service_url, + transport.sfuConfig, + ], data: undefined, }; } else { yield { keys: [ - transportWithOrWithoutSfuConfig.livekit_service_url, - undefined as undefined | SFUConfig, + transport.livekit_service_url, + undefined as SFUConfig | undefined, ], data: undefined, }; @@ -194,6 +194,8 @@ export function createConnectionManager$({ }, ownMembershipIdentity, logger, + // TODO: This whole optional SFUConfig parameter is not particularly elegant. + // I would like it if connections always fetched the SFUConfig by themselves. sfuConfig, ); // Start the connection immediately