From 4f892e358a1d1f2cd07311c30b2c18cb67dc9c83 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 30 Oct 2025 15:15:49 +0100 Subject: [PATCH] start onwMemberhsip.ts --- src/livekit/openIDSFU.ts | 4 +- src/rtcSessionHelpers.ts | 21 ++- src/state/Async.ts | 7 + src/state/CallViewModel.ts | 49 ++----- src/state/ownMember/OwnMembership.ts | 190 ++++++++++++++++++++------- 5 files changed, 175 insertions(+), 96 deletions(-) diff --git a/src/livekit/openIDSFU.ts b/src/livekit/openIDSFU.ts index 70d1786d..073f6c75 100644 --- a/src/livekit/openIDSFU.ts +++ b/src/livekit/openIDSFU.ts @@ -25,7 +25,7 @@ export type OpenIDClientParts = Pick< export async function getSFUConfigWithOpenID( client: OpenIDClientParts, serviceUrl: string, - livekitAlias: string, + matrixRoomId: string, ): Promise { let openIdToken: IOpenIDToken; try { @@ -43,7 +43,7 @@ export async function getSFUConfigWithOpenID( const sfuConfig = await getLiveKitJWT( client, serviceUrl, - livekitAlias, + matrixRoomId, openIdToken, ); logger.info(`Got JWT from call's active focus URL.`); diff --git a/src/rtcSessionHelpers.ts b/src/rtcSessionHelpers.ts index fadc7b37..74023c22 100644 --- a/src/rtcSessionHelpers.ts +++ b/src/rtcSessionHelpers.ts @@ -13,6 +13,7 @@ import { } from "matrix-js-sdk/lib/matrixrtc"; import { logger } from "matrix-js-sdk/lib/logger"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; +import { type MatrixClient } from "matrix-js-sdk"; import { PosthogAnalytics } from "./analytics/PosthogAnalytics"; import { Config } from "./config/Config"; @@ -23,16 +24,13 @@ import { getSFUConfigWithOpenID } from "./livekit/openIDSFU.ts"; const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci"; -export function getLivekitAlias(rtcSession: MatrixRTCSession): string { - // For now we assume everything is a room-scoped call - return rtcSession.room.roomId; -} - async function makeTransportInternal( - rtcSession: MatrixRTCSession, + client: MatrixClient, + roomId: string, ): Promise { logger.log("Searching for a preferred transport"); - const livekitAlias = getLivekitAlias(rtcSession); + //TODO refactor this to use the jwt service returned alias. + const livekitAlias = roomId; // TODO-MULTI-SFU: Either remove this dev tool or make it more official const urlFromStorage = @@ -52,7 +50,7 @@ async function makeTransportInternal( } // Prioritize the .well-known/matrix/client, if available, over the configured SFU - const domain = rtcSession.room.client.getDomain(); + const domain = client.getDomain(); if (domain) { // we use AutoDiscovery instead of relying on the MatrixClient having already // been fully configured and started @@ -85,12 +83,13 @@ async function makeTransportInternal( } export async function makeTransport( - rtcSession: MatrixRTCSession, + client: MatrixClient, + roomId: string, ): Promise { - const transport = await makeTransportInternal(rtcSession); + const transport = await makeTransportInternal(client, roomId); // this will call the jwt/sfu/get endpoint to pre create the livekit room. await getSFUConfigWithOpenID( - rtcSession.room.client, + client, transport.livekit_service_url, transport.livekit_alias, ); diff --git a/src/state/Async.ts b/src/state/Async.ts index 61871f78..f2e0376b 100644 --- a/src/state/Async.ts +++ b/src/state/Async.ts @@ -6,6 +6,7 @@ Please see LICENSE in the repository root for full details. */ import { catchError, from, map, type Observable, of, startWith } from "rxjs"; +import { Behavior } from "./Behavior"; /** * Data that may need to be loaded asynchronously. @@ -51,3 +52,9 @@ export function mapAsync( ): Async { return async.state === "ready" ? ready(project(async.value)) : async; } + +export function unwrapAsync(fallback: A): (async: Async) => A { + return (async: Async) => { + return async.state === "ready" ? async.value : fallback; + }; +} diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index c8f68cbb..31aa2533 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -139,6 +139,7 @@ import { ObservableScope } from "./ObservableScope.ts"; import { memberDisplaynames$ } from "./remoteMembers/displayname.ts"; import { ConnectionManager } from "./remoteMembers/ConnectionManager.ts"; import { MatrixLivekitMerger } from "./remoteMembers/matrixLivekitMerger.ts"; +import { ownMembership$ } from "./ownMember/OwnMembership.ts"; //TODO // Larger rename @@ -237,6 +238,15 @@ export class CallViewModel { this.matrixRoom, ); + private ownMembership = ownMembership$({ + scope: this.scope, + muteStates: this.muteStates, + multiSfu: this.multiSfu, + mediaDevices: this.mediaDevices, + trackProcessorState$: this.trackProcessorState$, + e2eeLivekitOptions: this.e2eeLivekitOptions, + }); + /** * If there is a configuration error with the call (e.g. misconfigured E2EE). * This is a fatal error that prevents the call from being created/joined. @@ -358,45 +368,6 @@ export class CallViewModel { // ); private readonly userId = this.matrixRoom.client.getUserId()!; - private readonly deviceId = this.matrixRoom.client.getDeviceId()!; - - /** - * Whether we are connected to the MatrixRTC session. - */ - // DISCUSSION own membership manager - private readonly matrixConnected$ = this.scope.behavior( - // To consider ourselves connected to MatrixRTC, we check the following: - and$( - // The client is connected to the sync loop - ( - fromEvent(this.matrixRoom.client, ClientEvent.Sync) as Observable< - [SyncState] - > - ).pipe( - startWith([this.matrixRoom.client.getSyncState()]), - map(([state]) => state === SyncState.Syncing), - ), - // Room state observed by session says we're connected - fromEvent( - this.matrixRTCSession, - MembershipManagerEvent.StatusChanged, - ).pipe( - startWith(null), - map(() => this.matrixRTCSession.membershipStatus === Status.Connected), - ), - // Also watch out for warnings that we've likely hit a timeout and our - // delayed leave event is being sent (this condition is here because it - // provides an earlier warning than the sync loop timeout, and we wouldn't - // see the actual leave event until we reconnect to the sync loop) - fromEvent( - this.matrixRTCSession, - MembershipManagerEvent.ProbablyLeft, - ).pipe( - startWith(null), - map(() => this.matrixRTCSession.probablyLeft !== true), - ), - ), - ); /** * Whether various media/event sources should pretend to be disconnected from diff --git a/src/state/ownMember/OwnMembership.ts b/src/state/ownMember/OwnMembership.ts index 56d40b3e..52a09033 100644 --- a/src/state/ownMember/OwnMembership.ts +++ b/src/state/ownMember/OwnMembership.ts @@ -1,31 +1,155 @@ /* Copyright 2025 New Vector Ltd. -SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +SPDX-License-IdFentifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { LiveKitReactNativeInfo } from "livekit-client"; -import { Behavior, constant } from "../Behavior"; -import { LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; -import { ConnectionManager } from "../remoteMembers/ConnectionManager"; +import { type E2EEOptions } from "livekit-client"; +import { logger } from "matrix-js-sdk/lib/logger"; +import { + type LivekitTransport, + type MatrixRTCSession, + MembershipManagerEvent, + Status, +} from "matrix-js-sdk/lib/matrixrtc"; +import { + ClientEvent, + type MatrixClient, + SyncState, + type Room as MatrixRoom, +} from "matrix-js-sdk"; +import { fromEvent, map, type Observable, scan, startWith } from "rxjs"; +import { multiSfu } from "../../settings/settings"; +import { type Behavior } from "../Behavior"; +import { type ConnectionManager } from "../remoteMembers/ConnectionManager"; +import { makeTransport } from "../../rtcSessionHelpers"; +import { type ObservableScope } from "../ObservableScope"; +import { async$, unwrapAsync } from "../Async"; +import { Publisher } from "./Publisher"; +import { type MuteStates } from "../MuteStates"; +import { type ProcessorState } from "../../livekit/TrackProcessorContext"; +import { type MediaDevices } from "../../state/MediaDevices"; +import { and$ } from "../../utils/observable"; -const ownMembership$ = ( - multiSfu: boolean, - preferStickyEvents: boolean, - connectionManager: ConnectionManager, - transport: LivekitTransport, -): { - connected: Behavior; - transport: Behavior; +interface Props { + scope: ObservableScope; + mediaDevices: MediaDevices; + muteStates: MuteStates; + connectionManager: ConnectionManager; + matrixRTCSession: MatrixRTCSession; + matrixRoom: MatrixRoom; + client: MatrixClient; + preferStickyEvents: boolean; + roomId: string; + e2eeLivekitOptions: E2EEOptions | undefined; + trackerProcessorState$: Behavior; +} + +/** + * This class is responsible for managing the own membership in a room. + * We want + * - a publisher + * - + * @param param0 + * @returns + * - publisher: The handle to create tracks and publish them to the room. + * - connected$: the current connection state. Including matrix server and livekit server connection. (only the livekit server relevant for our own participation) + * - transport$: the transport object the ownMembership$ ended up using. + * + */ +export const ownMembership$ = ({ + scope, + muteStates, + mediaDevices, + preferStickyEvents, + connectionManager, + matrixRTCSession, + matrixRoom, + e2eeLivekitOptions, + client, + roomId, + trackerProcessorState$, +}: Props): { + connected$: Behavior; + transport$: Behavior; + publisher: Publisher; } => { - const userId = this.matrixRoom.client.getUserId()!; - const deviceId = this.matrixRoom.client.getDeviceId()!; + const userId = client.getUserId()!; + const deviceId = client.getDeviceId()!; + const multiSfu$ = multiSfu.value$; + /** + * The transport that we would personally prefer to publish on (if not for the + * transport preferences of others, perhaps). + */ + const preferredTransport$ = scope.behavior( + async$(makeTransport(client, roomId)).pipe( + map(unwrapAsync(null)), + ), + ); const connection = connectionManager.registerTransports( - constant([transport]), + scope.behavior(preferredTransport$.pipe(map((t) => (t ? [t] : [])))), + )[0]; + if (!connection) { + logger.warn( + "No connection found when passing transport to connectionManager. transport:", + preferredTransport$.value, + ); + } + + /** + * Whether we are connected to the MatrixRTC session. + */ + // DISCUSSION own membership manager + const matrixConnected$ = scope.behavior( + // To consider ourselves connected to MatrixRTC, we check the following: + and$( + // The client is connected to the sync loop + ( + fromEvent(matrixRoom.client, ClientEvent.Sync) as Observable< + [SyncState] + > + ).pipe( + startWith([matrixRoom.client.getSyncState()]), + map(([state]) => state === SyncState.Syncing), + ), + // Room state observed by session says we're connected + fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe( + startWith(null), + map(() => matrixRTCSession.membershipStatus === Status.Connected), + ), + // Also watch out for warnings that we've likely hit a timeout and our + // delayed leave event is being sent (this condition is here because it + // provides an earlier warning than the sync loop timeout, and we wouldn't + // see the actual leave event until we reconnect to the sync loop) + fromEvent(matrixRTCSession, MembershipManagerEvent.ProbablyLeft).pipe( + startWith(null), + map(() => matrixRTCSession.probablyLeft !== true), + ), + ), + ); + /** + * Whether we are "fully" connected to the call. Accounts for both the + * connection to the MatrixRTC session and the LiveKit publish connection. + */ + const connected$ = scope.behavior( + and$( + matrixConnected$, + connection.state$.pipe( + map((state) => state.state === "ConnectedToLkRoom"), + ), + ), + ); + + const publisher = new Publisher( + scope, + connection, + mediaDevices, + muteStates, + e2eeLivekitOptions, + trackerProcessorState$, ); - const publisher = new Publisher(connection); // HOW IT WAS PREVIEOUSLY CREATED // new PublishConnection( @@ -41,21 +165,13 @@ const ownMembership$ = ( // this.e2eeLivekitOptions(), // this.scope.behavior(this.trackProcessorState$), // ), - /** - * The transport that we would personally prefer to publish on (if not for the - * transport preferences of others, perhaps). - */ - // DISCUSS move to ownMembership - private readonly preferredTransport$ = this.scope.behavior( - async$(makeTransport(this.matrixRTCSession)), - ); /** * The transport over which we should be actively publishing our media. * null when not joined. */ // DISCUSSION ownMembershipManager - private readonly localTransport$: Behavior | null> = + const localTransport$: Behavior | null> = this.scope.behavior( this.transports$.pipe( map((transports) => transports?.local ?? null), @@ -68,7 +184,7 @@ const ownMembership$ = ( * it is a multi-SFU transport and whether we should use sticky events). */ // DISCUSSION ownMembershipManager - private readonly advertisedTransport$: Behavior<{ + const advertisedTransport$: Behavior<{ multiSfu: boolean; preferStickyEvents: boolean; transport: LivekitTransport; @@ -97,27 +213,13 @@ const ownMembership$ = ( ); // MATRIX RELATED - // - /** - * Whether we are "fully" connected to the call. Accounts for both the - * connection to the MatrixRTC session and the LiveKit publish connection. - */ - // DISCUSSION own membership manager - private readonly connected$ = this.scope.behavior( - and$( - this.matrixConnected$, - this.livekitConnectionState$.pipe( - map((state) => state === ConnectionState.Connected), - ), - ), - ); /** * Whether we should tell the user that we're reconnecting to the call. */ // DISCUSSION own membership manager - public readonly reconnecting$ = this.scope.behavior( - this.connected$.pipe( + const reconnecting$ = scope.behavior( + connected$.pipe( // We are reconnecting if we previously had some successful initial // connection but are now disconnected scan( @@ -130,5 +232,5 @@ const ownMembership$ = ( map(({ reconnecting }) => reconnecting), ), ); - return { connected: true, transport$ }; + return { connected$, transport$: preferredTransport$, publisher }; };