From d22d7460fe3e1c9de246d127cbb978fccf903308 Mon Sep 17 00:00:00 2001 From: Timo K Date: Tue, 25 Nov 2025 20:18:34 +0100 Subject: [PATCH] Another larger refactor to fix sfu switches and in general proper cleanup. --- locales/en/app.json | 3 + src/room/InCallView.tsx | 2 + src/state/CallViewModel/CallViewModel.ts | 22 +- .../localMember/LocalMembership.test.ts | 77 ++- .../localMember/LocalMembership.ts | 476 ++++++++++-------- .../localMember/LocalTransport.test.ts | 36 ++ .../CallViewModel/localMember/Publisher.ts | 70 +-- .../remoteMembers/Connection.test.ts | 6 +- .../CallViewModel/remoteMembers/Connection.ts | 47 +- .../remoteMembers/ConnectionManager.ts | 3 +- src/state/ObservableScope.ts | 42 +- src/utils/errors.ts | 26 + 12 files changed, 482 insertions(+), 328 deletions(-) diff --git a/locales/en/app.json b/locales/en/app.json index 9e8fbbd3..32d10663 100644 --- a/locales/en/app.json +++ b/locales/en/app.json @@ -108,11 +108,14 @@ "connection_lost_description": "You were disconnected from the call.", "e2ee_unsupported": "Incompatible browser", "e2ee_unsupported_description": "Your web browser does not support encrypted calls. Supported browsers include Chrome, Safari, and Firefox 117+.", + "failed_to_start_livekit": "Failed to start Livekit", "generic": "Something went wrong", "generic_description": "Submitting debug logs will help us track down the problem.", "insufficient_capacity": "Insufficient capacity", "insufficient_capacity_description": "The server has reached its maximum capacity and you cannot join the call at this time. Try again later, or contact your server admin if the problem persists.", "matrix_rtc_transport_missing": "The server is not configured to work with {{brand}}. Please contact your server admin (Domain: {{domain}}, Error Code: {{ errorCode }}).", + "membership_manager": "Membership Manager Error", + "membership_manager_description": "The Membership Manager had to shut down. This is caused by many consequtive failed network requests.", "open_elsewhere": "Opened in another tab", "open_elsewhere_description": "{{brand}} has been opened in another tab. If that doesn't sound right, try reloading the page.", "room_creation_restricted": "Failed to create call", diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index b17d3aae..6ae004d8 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -146,6 +146,8 @@ export const ActiveCall: FC = (props) => { reactionsReader.reactions$, scope.behavior(trackProcessorState$), ); + // TODO move this somewhere else once we use the callViewModel in the lobby as well! + vm.join(); setVm(vm); vm.leave$.pipe(scope.bind()).subscribe(props.onLeft); diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 506eca1b..082da477 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -102,7 +102,6 @@ import { createLocalMembership$, enterRTCSession, LivekitState, - type LocalMemberConnectionState, } from "./localMember/LocalMembership.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { @@ -202,7 +201,7 @@ export interface CallViewModel { hangup: () => void; // joining - join: () => LocalMemberConnectionState; + join: () => void; // screen sharing /** @@ -572,15 +571,6 @@ export function createCallViewModel$( ), ); - // CODESMELL? - // This is functionally the same Observable as leave$, except here it's - // hoisted to the top of the class. This enables the cyclic dependency between - // leave$ -> autoLeave$ -> callPickupState$ -> livekitConnectionState$ -> - // localConnection$ -> transports$ -> joined$ -> leave$. - const leaveHoisted$ = new Subject< - "user" | "timeout" | "decline" | "allOthersLeft" - >(); - /** * Whether various media/event sources should pretend to be disconnected from * all network input, even if their connection still technically works. @@ -840,10 +830,7 @@ export function createCallViewModel$( merge( autoLeave$, merge(userHangup$, widgetHangup$).pipe(map(() => "user" as const)), - ).pipe( - scope.share, - tap((reason) => leaveHoisted$.next(reason)), - ); + ).pipe(scope.share); const spotlightSpeaker$ = scope.behavior( userMedia$.pipe( @@ -1448,16 +1435,13 @@ export function createCallViewModel$( // reassigned here to make it publicly accessible const toggleScreenSharing = localMembership.toggleScreenSharing; - const join = localMembership.requestConnect; - // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked? - join(); return { autoLeave$: autoLeave$, callPickupState$: callPickupState$, ringOverlay$: ringOverlay$, leave$: leave$, hangup: (): void => userHangup$.next(), - join: join, + join: localMembership.requestConnect, toggleScreenSharing: toggleScreenSharing, sharingScreen$: sharingScreen$, diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMembership.test.ts index 9459d419..a3bfe158 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.test.ts @@ -12,12 +12,15 @@ import { } from "matrix-js-sdk/lib/matrixrtc"; import { describe, expect, it, vi } from "vitest"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; -import { map } from "rxjs"; +import { BehaviorSubject, map, of } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; +import { type LocalParticipant } from "livekit-client"; import { MatrixRTCMode } from "../../../settings/settings"; import { + flushPromises, mockConfig, + mockLivekitRoom, mockMuteStates, withTestScheduler, } from "../../../utils/test"; @@ -27,14 +30,19 @@ import { LivekitState, } from "./LocalMembership"; import { MatrixRTCTransportMissingError } from "../../../utils/errors"; -import { Epoch } from "../../ObservableScope"; +import { Epoch, ObservableScope } from "../../ObservableScope"; import { constant } from "../../Behavior"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; -import { type Publisher } from "./Publisher"; +import { type Connection } from "../remoteMembers/Connection"; const MATRIX_RTC_MODE = MatrixRTCMode.Legacy; const getUrlParams = vi.hoisted(() => vi.fn(() => ({}))); vi.mock("../../../UrlParams", () => ({ getUrlParams })); +vi.mock("@livekit/components-core", () => ({ + observeParticipantEvents: vi + .fn() + .mockReturnValue(of({ isScreenShareEnabled: false })), +})); describe("LocalMembership", () => { describe("enterRTCSession", () => { @@ -183,7 +191,7 @@ describe("LocalMembership", () => { processor: undefined, }), logger: logger, - createPublisherFactory: (): Publisher => ({}) as unknown as Publisher, + createPublisherFactory: vi.fn(), joinMatrixRTC: async (): Promise => {}, homeserverConnected$: constant(true), }; @@ -216,7 +224,7 @@ describe("LocalMembership", () => { }); expectObservable(localMembership.connectionState.livekit$).toBe("ne", { - n: { state: LivekitState.Uninitialized }, + n: { state: LivekitState.Connecting }, e: { state: LivekitState.Error, error: expect.toSatisfy( @@ -226,4 +234,63 @@ describe("LocalMembership", () => { }); }); }); + + it("recreates publisher if new connection is used", async () => { + const scope = new ObservableScope(); + const aTransport = { + livekit_service_url: "a", + } as LivekitTransport; + const bTransport = { + livekit_service_url: "b", + } as LivekitTransport; + + const localTransport$ = new BehaviorSubject(aTransport); + + const connectionManagerData = new ConnectionManagerData(); + + connectionManagerData.add( + { + livekitRoom: mockLivekitRoom({ + localParticipant: { + isScreenShareEnabled: false, + trackPublications: [], + } as unknown as LocalParticipant, + }), + state$: constant({ + state: "ConnectedToLkRoom", + }), + transport: aTransport, + } as unknown as Connection, + [], + ); + connectionManagerData.add( + { + state$: constant({ + state: "ConnectedToLkRoom", + }), + transport: bTransport, + } as unknown as Connection, + [], + ); + + const publisherFactory = + defaultCreateLocalMemberValues.createPublisherFactory as ReturnType< + typeof vi.fn + >; + + createLocalMembership$({ + scope, + ...defaultCreateLocalMemberValues, + connectionManager: { + connectionManagerData$: constant(new Epoch(connectionManagerData)), + }, + localTransport$, + }); + await flushPromises(); + localTransport$.next(bTransport); + await flushPromises(); + expect(publisherFactory).toHaveBeenCalledTimes(2); + expect(publisherFactory.mock.calls[0][0].transport).toBe(aTransport); + expect(publisherFactory.mock.calls[1][0].transport).toBe(bTransport); + }); }); diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 36952c5a..cfc715e0 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -30,50 +30,68 @@ import { tap, } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; +import { deepCompare } from "matrix-js-sdk/lib/utils"; -import { type Behavior } from "../../Behavior"; +import { constant, type Behavior } from "../../Behavior"; import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; import { ObservableScope } from "../../ObservableScope"; import { type Publisher } from "./Publisher"; import { type MuteStates } from "../../MuteStates"; import { and$ } from "../../../utils/observable"; -import { ElementCallError, UnknownCallError } from "../../../utils/errors"; +import { + ElementCallError, + MembershipManagerError, + UnknownCallError, +} from "../../../utils/errors"; import { ElementWidgetActions, widget } from "../../../widget"; import { getUrlParams } from "../../../UrlParams.ts"; import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts"; import { MatrixRTCMode } from "../../../settings/settings.ts"; import { Config } from "../../../config/Config.ts"; -import { - type Connection, - type ConnectionState, -} from "../remoteMembers/Connection.ts"; +import { type Connection } from "../remoteMembers/Connection.ts"; export enum LivekitState { - Uninitialized = "uninitialized", - Connecting = "connecting", - Connected = "connected", Error = "error", + /** Not even a transport is available to the LocalMembership */ + WaitingForTransport = "waiting_for_transport", + /** A transport is and we are loading the connection based on the transport */ + Connecting = "connecting", + InitialisingPublisher = "uninitialized", + Initialized = "Initialized", + CreatingTracks = "creating_tracks", + ReadyToPublish = "ready_to_publish", + WaitingToPublish = "publishing", + Connected = "connected", Disconnected = "disconnected", Disconnecting = "disconnecting", } type LocalMemberLivekitState = | { state: LivekitState.Error; error: ElementCallError } - | { state: LivekitState.Connected } + | { state: LivekitState.WaitingForTransport } | { state: LivekitState.Connecting } - | { state: LivekitState.Uninitialized } + | { state: LivekitState.InitialisingPublisher } + | { state: LivekitState.Initialized } + | { state: LivekitState.CreatingTracks } + | { state: LivekitState.ReadyToPublish } + | { state: LivekitState.WaitingToPublish } + | { state: LivekitState.Connected } | { state: LivekitState.Disconnected } | { state: LivekitState.Disconnecting }; export enum MatrixState { + WaitingForTransport = "waiting_for_transport", + Ready = "ready", + Connecting = "connecting", Connected = "connected", Disconnected = "disconnected", - Connecting = "connecting", Error = "Error", } type LocalMemberMatrixState = | { state: MatrixState.Connected } + | { state: MatrixState.WaitingForTransport } + | { state: MatrixState.Ready } | { state: MatrixState.Connecting } | { state: MatrixState.Disconnected } | { state: MatrixState.Error; error: Error }; @@ -102,7 +120,7 @@ interface Props { muteStates: MuteStates; connectionManager: IConnectionManager; createPublisherFactory: (connection: Connection) => Publisher; - joinMatrixRTC: (trasnport: LivekitTransport) => Promise; + joinMatrixRTC: (transport: LivekitTransport) => Promise; homeserverConnected$: Behavior; localTransport$: Behavior; matrixRTCSession: Pick< @@ -136,9 +154,9 @@ export const createLocalMembership$ = ({ muteStates, matrixRTCSession, }: Props): { - requestConnect: () => LocalMemberConnectionState; + requestConnect: () => void; startTracks: () => Behavior; - requestDisconnect: () => Observable | null; + requestDisconnect: () => void; connectionState: LocalMemberConnectionState; sharingScreen$: Behavior; /** @@ -157,27 +175,8 @@ export const createLocalMembership$ = ({ } => { const logger = parentLogger.getChild("[LocalMembership]"); logger.debug(`Creating local membership..`); - const state = { - livekit$: new BehaviorSubject({ - state: LivekitState.Uninitialized, - }), - matrix$: new BehaviorSubject({ - state: MatrixState.Disconnected, - }), - }; - // This should be used in a combineLatest with publisher$ to connect. - // to make it possible to call startTracks before the preferredTransport$ has resolved. - const trackStartRequested$ = new BehaviorSubject(false); - - // This should be used in a combineLatest with publisher$ to connect. - // to make it possible to call startTracks before the preferredTransport$ has resolved. - const connectRequested$ = new BehaviorSubject(false); - - // This should be used in a combineLatest with publisher$ to connect. - const tracks$ = new BehaviorSubject([]); - - // unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error. + // Unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error. const localTransport$ = scope.behavior( localTransportCanThrow$.pipe( catchError((e: unknown) => { @@ -191,7 +190,7 @@ export const createLocalMembership$ = ({ : new Error("Unknown error from localTransport"), ); } - state.livekit$.next({ state: LivekitState.Error, error }); + setLivekitError(error); return of(null); }), ), @@ -203,12 +202,12 @@ export const createLocalMembership$ = ({ connectionManager.connectionManagerData$, localTransport$, ]).pipe( - map(([connectionData, localTransport]) => { + map(([{ value: connectionData }, localTransport]) => { if (localTransport === null) { return null; } - return connectionData.value.getConnectionForTransport(localTransport); + return connectionData.getConnectionForTransport(localTransport); }), tap((connection) => { logger.info( @@ -236,34 +235,6 @@ export const createLocalMembership$ = ({ ), ); - const publisher$ = new BehaviorSubject(null); - localConnection$.pipe(scope.bind()).subscribe((connection) => { - if (connection !== null && publisher$.value === null) { - // TODO looks strange to not change publisher if connection changes. - // @toger5 will take care of this! - publisher$.next(createPublisherFactory(connection)); - } - }); - - // const mutestate= publisher$.pipe(switchMap((publisher) => { - // return publisher.muteState$ - // }); - - combineLatest([publisher$, trackStartRequested$]).subscribe( - ([publisher, shouldStartTracks]) => { - if (publisher && shouldStartTracks) { - publisher - .createAndSetupTracks() - .then((tracks) => { - tracks$.next(tracks); - }) - .catch((error) => { - logger.error("Error creating tracks:", error); - }); - } - }, - ); - // MATRIX RELATED // /** @@ -286,90 +257,230 @@ export const createLocalMembership$ = ({ ), ); + // This should be used in a combineLatest with publisher$ to connect. + // to make it possible to call startTracks before the preferredTransport$ has resolved. + const trackStartRequested$ = new BehaviorSubject(false); + + // This should be used in a combineLatest with publisher$ to connect. + // to make it possible to call startTracks before the preferredTransport$ has resolved. + const connectRequested$ = new BehaviorSubject(false); + + /** + * The publisher is stored in here an abstracts creating and publishing tracks. + */ + const publisher$ = new BehaviorSubject(null); + /** + * Extract the tracks from the published. Also reacts to changing publishers. + */ + const tracks$ = scope.behavior( + publisher$.pipe(switchMap((p) => (p ? p.tracks$ : constant([])))), + ); + const publishing$ = scope.behavior( + publisher$.pipe(switchMap((p) => (p ? p.publishing$ : constant(false)))), + ); + const startTracks = (): Behavior => { trackStartRequested$.next(true); return tracks$; }; - combineLatest([publisher$, tracks$]).subscribe(([publisher, tracks]) => { - if ( - tracks.length === 0 || - // change this to !== Publishing - state.livekit$.value.state !== LivekitState.Uninitialized - ) { - return; + const requestConnect = (): void => { + trackStartRequested$.next(true); + connectRequested$.next(true); + }; + + const requestDisconnect = (): void => { + connectRequested$.next(false); + }; + + // Take care of the publisher$ + // create a new one as soon as a local Connection is available + // + // Recreate a new one once the local connection changes + // - stop publishing + // - destruct all current streams + // - overwrite current publisher + scope.reconcile(localConnection$, async (connection) => { + if (connection !== null) { + publisher$.next(createPublisherFactory(connection)); } - state.livekit$.next({ state: LivekitState.Connecting }); - publisher - ?.startPublishing() - .then(() => { - state.livekit$.next({ state: LivekitState.Connected }); - }) - .catch((error) => { - state.livekit$.next({ state: LivekitState.Error, error }); - }); + return Promise.resolve(async (): Promise => { + await publisher$?.value?.stopPublishing(); + publisher$?.value?.stopTracks(); + }); }); - combineLatest([localTransport$, connectRequested$]).subscribe( - // TODO reconnect when transport changes => create test. - ([transport, connectRequested]) => { - if ( - transport === null || - !connectRequested || - state.matrix$.value.state !== MatrixState.Disconnected - ) { - logger.info( - "Not yet connecting because: ", - "transport === null:", - transport === null, - "!connectRequested:", - !connectRequested, - "state.matrix$.value.state !== MatrixState.Disconnected:", - state.matrix$.value.state !== MatrixState.Disconnected, - ); - return; - } - state.matrix$.next({ state: MatrixState.Connecting }); - logger.info("Matrix State connecting"); + // const mutestate= publisher$.pipe(switchMap((publisher) => { + // return publisher.muteState$ + // }); - joinMatrixRTC(transport).catch((error) => { - logger.error(error); - state.matrix$.next({ state: MatrixState.Error, error }); - }); + // For each publisher create the descired tracks + // If we recreate a new publisher we remember the trackStartRequested$ value and immediately create the tracks + // THIS might be fine without a reconcile. There is no cleanup needed. We always get a working publisher + // track start request can than just toggle the tracks. + // TODO does this need `reconcile` to make sure we wait for createAndSetupTracks before we stop tracks? + combineLatest([publisher$, trackStartRequested$]).subscribe( + ([publisher, shouldStartTracks]) => { + if (publisher && shouldStartTracks) { + publisher.createAndSetupTracks().catch( + // TODO make this set some error state + (e) => logger.error(e), + ); + } else if (publisher) { + publisher.stopTracks(); + } }, ); - // TODO add this and update `state.matrix$` based on it. - // useTypedEventEmitter( - // rtcSession, - // MatrixRTCSessionEvent.MembershipManagerError, - // (error) => setExternalError(new ConnectionLostError()), - // ); + // Use reconcile here to not run concurrent createAndSetupTracks calls + // `tracks$` will update once they are ready. + scope.reconcile( + scope.behavior(combineLatest([publisher$, trackStartRequested$])), + async ([publisher, shouldStartTracks]) => { + if (publisher && shouldStartTracks) { + await publisher.createAndSetupTracks().catch((e) => logger.error(e)); + } else if (publisher) { + publisher.stopTracks(); + } + }, + ); - const requestConnect = (): LocalMemberConnectionState => { - trackStartRequested$.next(true); - connectRequested$.next(true); + // Based on `connectRequested$` we start publishing tracks. (once they are there!) + scope.reconcile( + scope.behavior(combineLatest([publisher$, tracks$, connectRequested$])), + async ([publisher, tracks, shouldConnect]) => { + if (shouldConnect === publisher?.publishing$.value) + return Promise.resolve(); + if (tracks.length !== 0 && shouldConnect) { + try { + await publisher?.startPublishing(); + } catch (error) { + // will take care of "FailedToStartLk" errors. + setLivekitError(error as ElementCallError); + } + } else if (tracks.length !== 0 && !shouldConnect) { + try { + await publisher?.stopPublishing(); + } catch (error) { + setLivekitError(new UnknownCallError(error as Error)); + } + } + }, + ); - return state; + const fatalLivekitError$ = new BehaviorSubject(null); + const setLivekitError = (e: ElementCallError): void => { + if (fatalLivekitError$.value !== null) + logger.error("Multiple Livkit Errors:", e); + else fatalLivekitError$.next(e); }; + const livekitState$: Observable = combineLatest([ + publisher$, + localTransport$, + localConnection$, + tracks$, + publishing$, + connectRequested$, + trackStartRequested$, + fatalLivekitError$, + ]).pipe( + map( + ([ + publisher, + localTransport, + localConnection, + tracks, + publishing, + shouldConnect, + shouldStartTracks, + error, + ]) => { + // read this: + // if(!) return {state: ...} + // if(!) return {state: } + // + // as: + // We do have but not yet so we are in + if (error !== null) return { state: LivekitState.Error, error }; + const hasTracks = tracks.length > 0; + if (!localTransport) return { state: LivekitState.WaitingForTransport }; + if (!localConnection) return { state: LivekitState.Connecting }; + if (!publisher) return { state: LivekitState.InitialisingPublisher }; + if (!shouldStartTracks) return { state: LivekitState.Initialized }; + if (!hasTracks) return { state: LivekitState.CreatingTracks }; + if (!shouldConnect) return { state: LivekitState.ReadyToPublish }; + if (!publishing) return { state: LivekitState.WaitingToPublish }; + return { state: LivekitState.Connected }; + }, + ), + distinctUntilChanged(deepCompare), + ); - const requestDisconnect = (): Behavior | null => { - if (state.livekit$.value.state !== LivekitState.Connected) return null; - state.livekit$.next({ state: LivekitState.Disconnecting }); - combineLatest([publisher$, tracks$], (publisher, tracks) => { - publisher - ?.stopPublishing() - .then(() => { - tracks.forEach((track) => track.stop()); - state.livekit$.next({ state: LivekitState.Disconnected }); - }) - .catch((error) => { - state.livekit$.next({ state: LivekitState.Error, error }); - }); - }); - - return state.livekit$; + const fatalMatrixError$ = new BehaviorSubject(null); + const setMatrixError = (e: ElementCallError): void => { + if (fatalMatrixError$.value !== null) + logger.error("Multiple Matrix Errors:", e); + else fatalMatrixError$.next(e); }; + const matrixState$: Behavior = scope.behavior( + combineLatest([ + localTransport$, + connectRequested$, + homeserverConnected$, + ]).pipe( + map(([localTransport, connectRequested, homeserverConnected]) => { + if (!localTransport) return { state: MatrixState.WaitingForTransport }; + if (!connectRequested) return { state: MatrixState.Ready }; + if (!homeserverConnected) return { state: MatrixState.Connecting }; + return { state: MatrixState.Connected }; + }), + ), + ); + + // Keep matrix rtc session in sync with localTransport$, connectRequested$ and muteStates.video.enabled$ + scope.reconcile( + scope.behavior(combineLatest([localTransport$, connectRequested$])), + async ([transport, shouldConnect]) => { + if (!shouldConnect) return; + + if (!transport) return; + try { + await joinMatrixRTC(transport); + } catch (error) { + logger.error("Error entering RTC session", error); + if (error instanceof Error) + setMatrixError(new MembershipManagerError(error)); + } + + // Update our member event when our mute state changes. + const callIntentScope = new ObservableScope(); + // because this uses its own scope, we can start another reconciliation for the duration of one connection. + callIntentScope.reconcile( + muteStates.video.enabled$, + async (videoEnabled) => + matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"), + ); + + return async (): Promise => { + callIntentScope.end(); + try { + // Update matrixRTCSession to allow udpating the transport without leaving the session! + await matrixRTCSession.leaveRoomSession(); + } catch (e) { + logger.error("Error leaving RTC session", e); + } + try { + await widget?.api.transport.send(ElementWidgetActions.HangupCall, {}); + } catch (e) { + logger.error("Failed to send hangup action", e); + } + }; + }, + ); + + const participant$ = scope.behavior( + localConnection$.pipe(map((c) => c?.livekitRoom?.localParticipant ?? null)), + ); // Pause upstream of all local media tracks when we're disconnected from // MatrixRTC, because it can be an unpleasant surprise for the app to say @@ -377,12 +488,12 @@ export const createLocalMembership$ = ({ // We use matrixConnected$ rather than reconnecting$ because we want to // pause tracks during the initial joining sequence too until we're sure // that our own media is displayed on screen. - combineLatest([localConnection$, homeserverConnected$]) + // TODO refactor this based no livekitState$ + combineLatest([participant$, homeserverConnected$]) .pipe(scope.bind()) - .subscribe(([connection, connected]) => { - if (connection?.state$.value.state !== "ConnectedToLkRoom") return; - const publications = - connection.livekitRoom.localParticipant.trackPublications.values(); + .subscribe(([participant, connected]) => { + if (!participant) return; + const publications = participant.trackPublications.values(); if (connected) { for (const p of publications) { if (p.track?.isUpstreamPaused === true) { @@ -419,85 +530,13 @@ export const createLocalMembership$ = ({ } } }); - // TODO: Refactor updateCallIntent to sth like this: - // combineLatest([muteStates.video.enabled$,localTransport$, state.matrix$]).pipe(map(()=>{ - // matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"), - // })) - // - - // TODO I do not fully understand what this does. - // Is it needed? - // Is this at the right place? - // Can this be simplified? - // Start and stop session membership as needed - // Discussed in statndup -> It seems we can remove this (there is another call to enterRTCSession in this file) - // MAKE SURE TO UNDERSTAND why reconcile is needed and what is potentially missing from the alternative enterRTCSession block. - // @toger5 will try to take care of this. - scope.reconcile(localTransport$, async (transport) => { - if (transport !== null && transport !== undefined) { - try { - state.matrix$.next({ state: MatrixState.Connecting }); - await joinMatrixRTC(transport); - } catch (e) { - logger.error("Error entering RTC session", e); - } - - // Update our member event when our mute state changes. - const intentScope = new ObservableScope(); - intentScope.reconcile(muteStates.video.enabled$, async (videoEnabled) => - matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"), - ); - - return async (): Promise => { - intentScope.end(); - // Only sends Matrix leave event. The LiveKit session will disconnect - // as soon as either the stopConnection$ handler above gets to it or - // the view model is destroyed. - try { - await matrixRTCSession.leaveRoomSession(); - } catch (e) { - logger.error("Error leaving RTC session", e); - } - try { - await widget?.api.transport.send(ElementWidgetActions.HangupCall, {}); - } catch (e) { - logger.error("Failed to send hangup action", e); - } - }; - } - }); - - localConnection$ - .pipe( - distinctUntilChanged(), - switchMap((c) => - c === null ? of({ state: "Initialized" } as ConnectionState) : c.state$, - ), - map((s) => { - logger.trace(`Local connection state update: ${s.state}`); - if (s.state == "FailedToStart") { - return s.error instanceof ElementCallError - ? s.error - : new UnknownCallError(s.error); - } - }), - scope.bind(), - ) - .subscribe((error) => { - if (error !== undefined) - state.livekit$.next({ state: LivekitState.Error, error }); - }); /** * Whether the user is currently sharing their screen. */ const sharingScreen$ = scope.behavior( - localConnection$.pipe( - switchMap((c) => - c !== null - ? observeSharingScreen$(c.livekitRoom.localParticipant) - : of(false), - ), + participant$.pipe( + switchMap((p) => (p !== null ? observeSharingScreen$(p) : of(false))), ), ); @@ -527,24 +566,23 @@ export const createLocalMembership$ = ({ // We also allow screen sharing to be toggled even if the connection // is still initializing or publishing tracks, because there's no // technical reason to disallow this. LiveKit will publish if it can. - localConnection$.value?.livekitRoom.localParticipant - .setScreenShareEnabled(targetScreenshareState, screenshareSettings) + participant$.value + ?.setScreenShareEnabled(targetScreenshareState, screenshareSettings) .catch(logger.error); }; } - const participant$ = scope.behavior( - localConnection$.pipe(map((c) => c?.livekitRoom?.localParticipant ?? null)), - ); return { startTracks, requestConnect, requestDisconnect, - connectionState: state, + connectionState: { + livekit$: scope.behavior(livekitState$), + matrix$: matrixState$, + }, homeserverConnected$, connected$, reconnecting$, - sharingScreen$, toggleScreenSharing, participant$, diff --git a/src/state/CallViewModel/localMember/LocalTransport.test.ts b/src/state/CallViewModel/localMember/LocalTransport.test.ts index d543f97a..c1c36fa5 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.test.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.test.ts @@ -7,6 +7,7 @@ Please see LICENSE in the repository root for full details. import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc"; +import { BehaviorSubject } from "rxjs"; import { mockConfig, flushPromises } from "../../../utils/test"; import { createLocalTransport$ } from "./LocalTransport"; @@ -117,4 +118,39 @@ describe("LocalTransport", () => { type: "livekit", }); }); + + it("updates local transport when oldest member changes", async () => { + // Use config so transport discovery succeeds, but delay OpenID JWT fetch + mockConfig({ + livekit: { livekit_service_url: "https://lk.example.org" }, + }); + const memberships$ = new BehaviorSubject(new Epoch([])); + const openIdResolver = Promise.withResolvers(); + + vi.spyOn(openIDSFU, "getSFUConfigWithOpenID").mockReturnValue( + openIdResolver.promise, + ); + + const localTransport$ = createLocalTransport$({ + scope, + roomId: "!room:example.org", + useOldestMember$: constant(true), + memberships$, + client: { + getDomain: () => "", + getOpenIdToken: vi.fn(), + getDeviceId: vi.fn(), + }, + }); + + openIdResolver.resolve?.({ url: "https://lk.example.org", jwt: "jwt" }); + expect(localTransport$.value).toBe(null); + await flushPromises(); + // final + expect(localTransport$.value).toStrictEqual({ + livekit_alias: "!room:example.org", + livekit_service_url: "https://lk.example.org", + type: "livekit", + }); + }); }); diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 11f35424..df6addb8 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -14,6 +14,7 @@ import { ConnectionState as LivekitConnectionState, } from "livekit-client"; import { + BehaviorSubject, map, NEVER, type Observable, @@ -33,6 +34,7 @@ import { getUrlParams } from "../../../UrlParams.ts"; import { observeTrackReference$ } from "../../MediaViewModel.ts"; import { type Connection } from "../remoteMembers/Connection.ts"; import { type ObservableScope } from "../../ObservableScope.ts"; +import { FailToStartLivekitConnection } from "../../../utils/errors.ts"; /** * A wrapper for a Connection object. @@ -40,7 +42,6 @@ import { type ObservableScope } from "../../ObservableScope.ts"; * The Publisher is also responsible for creating the media tracks. */ export class Publisher { - public tracks: LocalTrack[] = []; /** * Creates a new Publisher. * @param scope - The observable scope to use for managing the publisher. @@ -81,6 +82,9 @@ export class Publisher { }); } + private _tracks$ = new BehaviorSubject[]>([]); + public tracks$ = this._tracks$ as Behavior[]>; + /** * Start the connection to LiveKit and publish local tracks. * @@ -94,50 +98,36 @@ export class Publisher { * @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection. * @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created. */ - public async createAndSetupTracks(): Promise { + public async createAndSetupTracks(): Promise { const lkRoom = this.connection.livekitRoom; // Observe mute state changes and update LiveKit microphone/camera states accordingly this.observeMuteStates(this.scope); - // TODO: This should be an autostarted connection no need to start here. just check the connection state. - // TODO: This will fetch the JWT token. Perhaps we could keep it preloaded - // instead? This optimization would only be safe for a publish connection, - // because we don't want to leak the user's intent to perhaps join a call to - // remote servers before they actually commit to it. - // const { promise, resolve, reject } = Promise.withResolvers(); - // const sub = this.connection.state$.subscribe((s) => { - // if (s.state === "FailedToStart") { - // reject(new Error("Disconnected from LiveKit server")); - // } else if (s.state === "ConnectedToLkRoom") { - // resolve(); - // } - // }); - // try { - // await promise; - // } catch (e) { - // throw e; - // } finally { - // sub.unsubscribe(); - // } // TODO-MULTI-SFU: Prepublish a microphone track const audio = this.muteStates.audio.enabled$.value; const video = this.muteStates.video.enabled$.value; // createTracks throws if called with audio=false and video=false if (audio || video) { // TODO this can still throw errors? It will also prompt for permissions if not already granted - this.tracks = - (await lkRoom.localParticipant - .createTracks({ - audio, - video, - }) - .catch((error) => { - this.logger?.error("Failed to create tracks", error); - })) ?? []; + return lkRoom.localParticipant + .createTracks({ + audio, + video, + }) + .then((tracks) => { + this._tracks$.next(tracks); + }); } - return this.tracks; + throw Error("audio and video is false"); } + private _publishing$ = new BehaviorSubject(false); + public publishing$ = this.scope.behavior(this._publishing$); + /** + * + * @returns + * @throws ElementCallError + */ public async startPublishing(): Promise { const lkRoom = this.connection.livekitRoom; const { promise, resolve, reject } = Promise.withResolvers(); @@ -147,7 +137,7 @@ export class Publisher { resolve(); break; case "FailedToStart": - reject(new Error("Failed to connect to LiveKit server")); + reject(new FailToStartLivekitConnection()); break; default: this.logger?.info("waiting for connection: ", s.state); @@ -160,7 +150,7 @@ export class Publisher { } finally { sub.unsubscribe(); } - for (const track of this.tracks) { + for (const track of this.tracks$.value) { // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally // with a timeout. await lkRoom.localParticipant.publishTrack(track).catch((error) => { @@ -169,7 +159,8 @@ export class Publisher { // TODO: check if the connection is still active? and break the loop if not? } - return this.tracks; + this._publishing$.next(true); + return this.tracks$.value; } public async stopPublishing(): Promise { @@ -185,6 +176,15 @@ export class Publisher { }; localParticipant.trackPublications.forEach(addToTracksIfDefined); await localParticipant.unpublishTracks(tracks); + this._publishing$.next(false); + } + + /** + * Stops all tracks that are currently running + */ + public stopTracks(): void { + this.tracks$.value.forEach((t) => t.stop()); + this._tracks$.next([]); } /// Private methods diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index 3f58bcf6..2ead768b 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -193,7 +193,7 @@ describe("Start connection states", () => { capturedState = capturedStates.pop(); if (capturedState!.state === "FailedToStart") { expect(capturedState!.error.message).toEqual("Something went wrong"); - expect(capturedState!.transport.livekit_alias).toEqual( + expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); } else { @@ -249,7 +249,7 @@ describe("Start connection states", () => { expect(capturedState?.error.message).toContain( "SFU Config fetch failed with exception Error", ); - expect(capturedState?.transport.livekit_alias).toEqual( + expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); } else { @@ -313,7 +313,7 @@ describe("Start connection states", () => { expect(capturedState.error.message).toContain( "Failed to connect to livekit", ); - expect(capturedState.transport.livekit_alias).toEqual( + expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); } else { diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index c17fae2b..81bc9f29 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -50,16 +50,14 @@ export interface ConnectionOpts { export type ConnectionState = | { state: "Initialized" } - | { state: "FetchingConfig"; transport: LivekitTransport } - | { state: "ConnectingToLkRoom"; transport: LivekitTransport } - | { state: "PublishingTracks"; transport: LivekitTransport } - | { state: "FailedToStart"; error: Error; transport: LivekitTransport } + | { state: "FetchingConfig" } + | { state: "ConnectingToLkRoom" } | { state: "ConnectedToLkRoom"; livekitConnectionState$: Observable; - transport: LivekitTransport; } - | { state: "Stopped"; transport: LivekitTransport }; + | { state: "FailedToStart"; error: Error } + | { state: "Stopped" }; /** * A connection to a Matrix RTC LiveKit backend. @@ -77,6 +75,22 @@ export class Connection { */ public readonly state$: Behavior = this._state$; + /** + * The media transport to connect to. + */ + public readonly transport: LivekitTransport; + + public readonly livekitRoom: LivekitRoom; + + /** + * An observable of the participants that are publishing on this connection. (Excluding our local participant) + * This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`. + * It filters the participants to only those that are associated with a membership that claims to publish on this connection. + */ + public readonly remoteParticipantsWithTracks$: Behavior< + PublishingParticipant[] + >; + /** * Whether the connection has been stopped. * @see Connection.stop @@ -104,7 +118,6 @@ export class Connection { try { this._state$.next({ state: "FetchingConfig", - transport: this.transport, }); const { url, jwt } = await this.getSFUConfigWithOpenID(); // If we were stopped while fetching the config, don't proceed to connect @@ -112,7 +125,6 @@ export class Connection { this._state$.next({ state: "ConnectingToLkRoom", - transport: this.transport, }); try { await this.livekitRoom.connect(url, jwt); @@ -143,7 +155,6 @@ export class Connection { this._state$.next({ state: "ConnectedToLkRoom", - transport: this.transport, livekitConnectionState$: connectionStateObserver(this.livekitRoom), }); } catch (error) { @@ -151,7 +162,6 @@ export class Connection { this._state$.next({ state: "FailedToStart", error: error instanceof Error ? error : new Error(`${error}`), - transport: this.transport, }); throw error; } @@ -179,28 +189,11 @@ export class Connection { await this.livekitRoom.disconnect(); this._state$.next({ state: "Stopped", - transport: this.transport, }); this.stopped = true; } - /** - * An observable of the participants that are publishing on this connection. (Excluding our local participant) - * This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`. - * It filters the participants to only those that are associated with a membership that claims to publish on this connection. - */ - public readonly remoteParticipantsWithTracks$: Behavior< - PublishingParticipant[] - >; - - /** - * The media transport to connect to. - */ - public readonly transport: LivekitTransport; - private readonly client: OpenIDClientParts; - public readonly livekitRoom: LivekitRoom; - private readonly logger: Logger; /** diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index d9a0380e..0b9f939c 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -92,7 +92,6 @@ interface Props { } // TODO - write test for scopes (do we really need to bind scope) export interface IConnectionManager { - transports$: Behavior>; connectionManagerData$: Behavior>; } /** @@ -216,7 +215,7 @@ export function createConnectionManager$({ new Epoch(new ConnectionManagerData()), ); - return { transports$, connectionManagerData$ }; + return { connectionManagerData$ }; } function removeDuplicateTransports( diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index 27f501c7..6d867414 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -123,8 +123,22 @@ export class ObservableScope { callback: (value: T) => Promise<(() => Promise) | void>, ): void { let latestValue: T | typeof nothing = nothing; - let reconciledValue: T | typeof nothing = nothing; + let reconcilePromise: Promise | undefined = undefined; let cleanUp: (() => Promise) | void = undefined; + + // While this loop runs it will process the latest from `value$` until it caught up with the updates. + // It might skip updates from `value$` and only process the newest value after callback has resolved. + const reconcileLoop = async (): Promise => { + let prevVal: T | typeof nothing = nothing; + while (latestValue !== prevVal) { + await cleanUp?.(); // Call the previous value's clean-up handler + prevVal = latestValue; + + if (latestValue !== nothing) cleanUp = await callback(latestValue); // Sync current value... + // `latestValue` might have gotten updated during the `await callback`. That is why we loop here + } + }; + value$ .pipe( catchError(() => EMPTY), // Ignore errors @@ -132,23 +146,15 @@ export class ObservableScope { endWith(nothing), // Clean up when the scope ends ) .subscribe((value) => { - void (async (): Promise => { - if (latestValue === nothing) { - latestValue = value; - while (latestValue !== reconciledValue) { - await cleanUp?.(); // Call the previous value's clean-up handler - reconciledValue = latestValue; - if (latestValue !== nothing) - cleanUp = await callback(latestValue); // Sync current value - } - // Reset to signal that reconciliation is done for now - latestValue = nothing; - } else { - // There's already an instance of the above 'while' loop running - // concurrently. Just update the latest value and let it be handled. - latestValue = value; - } - })(); + // Always track the latest value! The `reconcileLoop` will run until it "processed" the "last" `latestValue`. + latestValue = value; + // There's already an instance of the below 'reconcileLoop' loop running + // concurrently. So lets let the loop handle it. NEVER instanciate two `reconcileLoop`s. + if (reconcilePromise) return; + + reconcilePromise = reconcileLoop().finally(() => { + reconcilePromise = undefined; + }); }); } diff --git a/src/utils/errors.ts b/src/utils/errors.ts index b77c0ff0..cdd0e75c 100644 --- a/src/utils/errors.ts +++ b/src/utils/errors.ts @@ -13,6 +13,8 @@ export enum ErrorCode { */ MISSING_MATRIX_RTC_TRANSPORT = "MISSING_MATRIX_RTC_TRANSPORT", CONNECTION_LOST_ERROR = "CONNECTION_LOST_ERROR", + INTERNAL_MEMBERSHIP_MANAGER = "INTERNAL_MEMBERSHIP_MANAGER", + FAILED_TO_START_LIVEKIT = "FAILED_TO_START_LIVEKIT", /** LiveKit indicates that the server has hit its track limits */ INSUFFICIENT_CAPACITY_ERROR = "INSUFFICIENT_CAPACITY_ERROR", E2EE_NOT_SUPPORTED = "E2EE_NOT_SUPPORTED", @@ -27,6 +29,7 @@ export enum ErrorCategory { NETWORK_CONNECTIVITY = "NETWORK_CONNECTIVITY", CLIENT_CONFIGURATION = "CLIENT_CONFIGURATION", UNKNOWN = "UNKNOWN", + SYSTEM_FAILURE = "SYSTEM_FAILURE", // SYSTEM_FAILURE / FEDERATION_FAILURE .. } @@ -83,6 +86,18 @@ export class ConnectionLostError extends ElementCallError { } } +export class MembershipManagerError extends ElementCallError { + public constructor(error: Error) { + super( + t("error.membership_manager"), + ErrorCode.INTERNAL_MEMBERSHIP_MANAGER, + ErrorCategory.SYSTEM_FAILURE, + t("error.membership_manager_description"), + error, + ); + } +} + export class E2EENotSupportedError extends ElementCallError { public constructor() { super( @@ -120,6 +135,17 @@ export class FailToGetOpenIdToken extends ElementCallError { } } +export class FailToStartLivekitConnection extends ElementCallError { + public constructor() { + super( + t("error.failed_to_start_livekit"), + ErrorCode.FAILED_TO_START_LIVEKIT, + ErrorCategory.NETWORK_CONNECTIVITY, + undefined, + ); + } +} + export class InsufficientCapacityError extends ElementCallError { public constructor() { super(