From d22d7460fe3e1c9de246d127cbb978fccf903308 Mon Sep 17 00:00:00 2001 From: Timo K Date: Tue, 25 Nov 2025 20:18:34 +0100 Subject: [PATCH 01/11] Another larger refactor to fix sfu switches and in general proper cleanup. --- locales/en/app.json | 3 + src/room/InCallView.tsx | 2 + src/state/CallViewModel/CallViewModel.ts | 22 +- .../localMember/LocalMembership.test.ts | 77 ++- .../localMember/LocalMembership.ts | 476 ++++++++++-------- .../localMember/LocalTransport.test.ts | 36 ++ .../CallViewModel/localMember/Publisher.ts | 70 +-- .../remoteMembers/Connection.test.ts | 6 +- .../CallViewModel/remoteMembers/Connection.ts | 47 +- .../remoteMembers/ConnectionManager.ts | 3 +- src/state/ObservableScope.ts | 42 +- src/utils/errors.ts | 26 + 12 files changed, 482 insertions(+), 328 deletions(-) diff --git a/locales/en/app.json b/locales/en/app.json index 9e8fbbd3..32d10663 100644 --- a/locales/en/app.json +++ b/locales/en/app.json @@ -108,11 +108,14 @@ "connection_lost_description": "You were disconnected from the call.", "e2ee_unsupported": "Incompatible browser", "e2ee_unsupported_description": "Your web browser does not support encrypted calls. Supported browsers include Chrome, Safari, and Firefox 117+.", + "failed_to_start_livekit": "Failed to start Livekit", "generic": "Something went wrong", "generic_description": "Submitting debug logs will help us track down the problem.", "insufficient_capacity": "Insufficient capacity", "insufficient_capacity_description": "The server has reached its maximum capacity and you cannot join the call at this time. Try again later, or contact your server admin if the problem persists.", "matrix_rtc_transport_missing": "The server is not configured to work with {{brand}}. Please contact your server admin (Domain: {{domain}}, Error Code: {{ errorCode }}).", + "membership_manager": "Membership Manager Error", + "membership_manager_description": "The Membership Manager had to shut down. This is caused by many consequtive failed network requests.", "open_elsewhere": "Opened in another tab", "open_elsewhere_description": "{{brand}} has been opened in another tab. If that doesn't sound right, try reloading the page.", "room_creation_restricted": "Failed to create call", diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index b17d3aae..6ae004d8 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -146,6 +146,8 @@ export const ActiveCall: FC = (props) => { reactionsReader.reactions$, scope.behavior(trackProcessorState$), ); + // TODO move this somewhere else once we use the callViewModel in the lobby as well! + vm.join(); setVm(vm); vm.leave$.pipe(scope.bind()).subscribe(props.onLeft); diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 506eca1b..082da477 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -102,7 +102,6 @@ import { createLocalMembership$, enterRTCSession, LivekitState, - type LocalMemberConnectionState, } from "./localMember/LocalMembership.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { @@ -202,7 +201,7 @@ export interface CallViewModel { hangup: () => void; // joining - join: () => LocalMemberConnectionState; + join: () => void; // screen sharing /** @@ -572,15 +571,6 @@ export function createCallViewModel$( ), ); - // CODESMELL? - // This is functionally the same Observable as leave$, except here it's - // hoisted to the top of the class. This enables the cyclic dependency between - // leave$ -> autoLeave$ -> callPickupState$ -> livekitConnectionState$ -> - // localConnection$ -> transports$ -> joined$ -> leave$. - const leaveHoisted$ = new Subject< - "user" | "timeout" | "decline" | "allOthersLeft" - >(); - /** * Whether various media/event sources should pretend to be disconnected from * all network input, even if their connection still technically works. @@ -840,10 +830,7 @@ export function createCallViewModel$( merge( autoLeave$, merge(userHangup$, widgetHangup$).pipe(map(() => "user" as const)), - ).pipe( - scope.share, - tap((reason) => leaveHoisted$.next(reason)), - ); + ).pipe(scope.share); const spotlightSpeaker$ = scope.behavior( userMedia$.pipe( @@ -1448,16 +1435,13 @@ export function createCallViewModel$( // reassigned here to make it publicly accessible const toggleScreenSharing = localMembership.toggleScreenSharing; - const join = localMembership.requestConnect; - // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked? - join(); return { autoLeave$: autoLeave$, callPickupState$: callPickupState$, ringOverlay$: ringOverlay$, leave$: leave$, hangup: (): void => userHangup$.next(), - join: join, + join: localMembership.requestConnect, toggleScreenSharing: toggleScreenSharing, sharingScreen$: sharingScreen$, diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMembership.test.ts index 9459d419..a3bfe158 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.test.ts @@ -12,12 +12,15 @@ import { } from "matrix-js-sdk/lib/matrixrtc"; import { describe, expect, it, vi } from "vitest"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; -import { map } from "rxjs"; +import { BehaviorSubject, map, of } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; +import { type LocalParticipant } from "livekit-client"; import { MatrixRTCMode } from "../../../settings/settings"; import { + flushPromises, mockConfig, + mockLivekitRoom, mockMuteStates, withTestScheduler, } from "../../../utils/test"; @@ -27,14 +30,19 @@ import { LivekitState, } from "./LocalMembership"; import { MatrixRTCTransportMissingError } from "../../../utils/errors"; -import { Epoch } from "../../ObservableScope"; +import { Epoch, ObservableScope } from "../../ObservableScope"; import { constant } from "../../Behavior"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; -import { type Publisher } from "./Publisher"; +import { type Connection } from "../remoteMembers/Connection"; const MATRIX_RTC_MODE = MatrixRTCMode.Legacy; const getUrlParams = vi.hoisted(() => vi.fn(() => ({}))); vi.mock("../../../UrlParams", () => ({ getUrlParams })); +vi.mock("@livekit/components-core", () => ({ + observeParticipantEvents: vi + .fn() + .mockReturnValue(of({ isScreenShareEnabled: false })), +})); describe("LocalMembership", () => { describe("enterRTCSession", () => { @@ -183,7 +191,7 @@ describe("LocalMembership", () => { processor: undefined, }), logger: logger, - createPublisherFactory: (): Publisher => ({}) as unknown as Publisher, + createPublisherFactory: vi.fn(), joinMatrixRTC: async (): Promise => {}, homeserverConnected$: constant(true), }; @@ -216,7 +224,7 @@ describe("LocalMembership", () => { }); expectObservable(localMembership.connectionState.livekit$).toBe("ne", { - n: { state: LivekitState.Uninitialized }, + n: { state: LivekitState.Connecting }, e: { state: LivekitState.Error, error: expect.toSatisfy( @@ -226,4 +234,63 @@ describe("LocalMembership", () => { }); }); }); + + it("recreates publisher if new connection is used", async () => { + const scope = new ObservableScope(); + const aTransport = { + livekit_service_url: "a", + } as LivekitTransport; + const bTransport = { + livekit_service_url: "b", + } as LivekitTransport; + + const localTransport$ = new BehaviorSubject(aTransport); + + const connectionManagerData = new ConnectionManagerData(); + + connectionManagerData.add( + { + livekitRoom: mockLivekitRoom({ + localParticipant: { + isScreenShareEnabled: false, + trackPublications: [], + } as unknown as LocalParticipant, + }), + state$: constant({ + state: "ConnectedToLkRoom", + }), + transport: aTransport, + } as unknown as Connection, + [], + ); + connectionManagerData.add( + { + state$: constant({ + state: "ConnectedToLkRoom", + }), + transport: bTransport, + } as unknown as Connection, + [], + ); + + const publisherFactory = + defaultCreateLocalMemberValues.createPublisherFactory as ReturnType< + typeof vi.fn + >; + + createLocalMembership$({ + scope, + ...defaultCreateLocalMemberValues, + connectionManager: { + connectionManagerData$: constant(new Epoch(connectionManagerData)), + }, + localTransport$, + }); + await flushPromises(); + localTransport$.next(bTransport); + await flushPromises(); + expect(publisherFactory).toHaveBeenCalledTimes(2); + expect(publisherFactory.mock.calls[0][0].transport).toBe(aTransport); + expect(publisherFactory.mock.calls[1][0].transport).toBe(bTransport); + }); }); diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 36952c5a..cfc715e0 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -30,50 +30,68 @@ import { tap, } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; +import { deepCompare } from "matrix-js-sdk/lib/utils"; -import { type Behavior } from "../../Behavior"; +import { constant, type Behavior } from "../../Behavior"; import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; import { ObservableScope } from "../../ObservableScope"; import { type Publisher } from "./Publisher"; import { type MuteStates } from "../../MuteStates"; import { and$ } from "../../../utils/observable"; -import { ElementCallError, UnknownCallError } from "../../../utils/errors"; +import { + ElementCallError, + MembershipManagerError, + UnknownCallError, +} from "../../../utils/errors"; import { ElementWidgetActions, widget } from "../../../widget"; import { getUrlParams } from "../../../UrlParams.ts"; import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts"; import { MatrixRTCMode } from "../../../settings/settings.ts"; import { Config } from "../../../config/Config.ts"; -import { - type Connection, - type ConnectionState, -} from "../remoteMembers/Connection.ts"; +import { type Connection } from "../remoteMembers/Connection.ts"; export enum LivekitState { - Uninitialized = "uninitialized", - Connecting = "connecting", - Connected = "connected", Error = "error", + /** Not even a transport is available to the LocalMembership */ + WaitingForTransport = "waiting_for_transport", + /** A transport is and we are loading the connection based on the transport */ + Connecting = "connecting", + InitialisingPublisher = "uninitialized", + Initialized = "Initialized", + CreatingTracks = "creating_tracks", + ReadyToPublish = "ready_to_publish", + WaitingToPublish = "publishing", + Connected = "connected", Disconnected = "disconnected", Disconnecting = "disconnecting", } type LocalMemberLivekitState = | { state: LivekitState.Error; error: ElementCallError } - | { state: LivekitState.Connected } + | { state: LivekitState.WaitingForTransport } | { state: LivekitState.Connecting } - | { state: LivekitState.Uninitialized } + | { state: LivekitState.InitialisingPublisher } + | { state: LivekitState.Initialized } + | { state: LivekitState.CreatingTracks } + | { state: LivekitState.ReadyToPublish } + | { state: LivekitState.WaitingToPublish } + | { state: LivekitState.Connected } | { state: LivekitState.Disconnected } | { state: LivekitState.Disconnecting }; export enum MatrixState { + WaitingForTransport = "waiting_for_transport", + Ready = "ready", + Connecting = "connecting", Connected = "connected", Disconnected = "disconnected", - Connecting = "connecting", Error = "Error", } type LocalMemberMatrixState = | { state: MatrixState.Connected } + | { state: MatrixState.WaitingForTransport } + | { state: MatrixState.Ready } | { state: MatrixState.Connecting } | { state: MatrixState.Disconnected } | { state: MatrixState.Error; error: Error }; @@ -102,7 +120,7 @@ interface Props { muteStates: MuteStates; connectionManager: IConnectionManager; createPublisherFactory: (connection: Connection) => Publisher; - joinMatrixRTC: (trasnport: LivekitTransport) => Promise; + joinMatrixRTC: (transport: LivekitTransport) => Promise; homeserverConnected$: Behavior; localTransport$: Behavior; matrixRTCSession: Pick< @@ -136,9 +154,9 @@ export const createLocalMembership$ = ({ muteStates, matrixRTCSession, }: Props): { - requestConnect: () => LocalMemberConnectionState; + requestConnect: () => void; startTracks: () => Behavior; - requestDisconnect: () => Observable | null; + requestDisconnect: () => void; connectionState: LocalMemberConnectionState; sharingScreen$: Behavior; /** @@ -157,27 +175,8 @@ export const createLocalMembership$ = ({ } => { const logger = parentLogger.getChild("[LocalMembership]"); logger.debug(`Creating local membership..`); - const state = { - livekit$: new BehaviorSubject({ - state: LivekitState.Uninitialized, - }), - matrix$: new BehaviorSubject({ - state: MatrixState.Disconnected, - }), - }; - // This should be used in a combineLatest with publisher$ to connect. - // to make it possible to call startTracks before the preferredTransport$ has resolved. - const trackStartRequested$ = new BehaviorSubject(false); - - // This should be used in a combineLatest with publisher$ to connect. - // to make it possible to call startTracks before the preferredTransport$ has resolved. - const connectRequested$ = new BehaviorSubject(false); - - // This should be used in a combineLatest with publisher$ to connect. - const tracks$ = new BehaviorSubject([]); - - // unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error. + // Unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error. const localTransport$ = scope.behavior( localTransportCanThrow$.pipe( catchError((e: unknown) => { @@ -191,7 +190,7 @@ export const createLocalMembership$ = ({ : new Error("Unknown error from localTransport"), ); } - state.livekit$.next({ state: LivekitState.Error, error }); + setLivekitError(error); return of(null); }), ), @@ -203,12 +202,12 @@ export const createLocalMembership$ = ({ connectionManager.connectionManagerData$, localTransport$, ]).pipe( - map(([connectionData, localTransport]) => { + map(([{ value: connectionData }, localTransport]) => { if (localTransport === null) { return null; } - return connectionData.value.getConnectionForTransport(localTransport); + return connectionData.getConnectionForTransport(localTransport); }), tap((connection) => { logger.info( @@ -236,34 +235,6 @@ export const createLocalMembership$ = ({ ), ); - const publisher$ = new BehaviorSubject(null); - localConnection$.pipe(scope.bind()).subscribe((connection) => { - if (connection !== null && publisher$.value === null) { - // TODO looks strange to not change publisher if connection changes. - // @toger5 will take care of this! - publisher$.next(createPublisherFactory(connection)); - } - }); - - // const mutestate= publisher$.pipe(switchMap((publisher) => { - // return publisher.muteState$ - // }); - - combineLatest([publisher$, trackStartRequested$]).subscribe( - ([publisher, shouldStartTracks]) => { - if (publisher && shouldStartTracks) { - publisher - .createAndSetupTracks() - .then((tracks) => { - tracks$.next(tracks); - }) - .catch((error) => { - logger.error("Error creating tracks:", error); - }); - } - }, - ); - // MATRIX RELATED // /** @@ -286,90 +257,230 @@ export const createLocalMembership$ = ({ ), ); + // This should be used in a combineLatest with publisher$ to connect. + // to make it possible to call startTracks before the preferredTransport$ has resolved. + const trackStartRequested$ = new BehaviorSubject(false); + + // This should be used in a combineLatest with publisher$ to connect. + // to make it possible to call startTracks before the preferredTransport$ has resolved. + const connectRequested$ = new BehaviorSubject(false); + + /** + * The publisher is stored in here an abstracts creating and publishing tracks. + */ + const publisher$ = new BehaviorSubject(null); + /** + * Extract the tracks from the published. Also reacts to changing publishers. + */ + const tracks$ = scope.behavior( + publisher$.pipe(switchMap((p) => (p ? p.tracks$ : constant([])))), + ); + const publishing$ = scope.behavior( + publisher$.pipe(switchMap((p) => (p ? p.publishing$ : constant(false)))), + ); + const startTracks = (): Behavior => { trackStartRequested$.next(true); return tracks$; }; - combineLatest([publisher$, tracks$]).subscribe(([publisher, tracks]) => { - if ( - tracks.length === 0 || - // change this to !== Publishing - state.livekit$.value.state !== LivekitState.Uninitialized - ) { - return; + const requestConnect = (): void => { + trackStartRequested$.next(true); + connectRequested$.next(true); + }; + + const requestDisconnect = (): void => { + connectRequested$.next(false); + }; + + // Take care of the publisher$ + // create a new one as soon as a local Connection is available + // + // Recreate a new one once the local connection changes + // - stop publishing + // - destruct all current streams + // - overwrite current publisher + scope.reconcile(localConnection$, async (connection) => { + if (connection !== null) { + publisher$.next(createPublisherFactory(connection)); } - state.livekit$.next({ state: LivekitState.Connecting }); - publisher - ?.startPublishing() - .then(() => { - state.livekit$.next({ state: LivekitState.Connected }); - }) - .catch((error) => { - state.livekit$.next({ state: LivekitState.Error, error }); - }); + return Promise.resolve(async (): Promise => { + await publisher$?.value?.stopPublishing(); + publisher$?.value?.stopTracks(); + }); }); - combineLatest([localTransport$, connectRequested$]).subscribe( - // TODO reconnect when transport changes => create test. - ([transport, connectRequested]) => { - if ( - transport === null || - !connectRequested || - state.matrix$.value.state !== MatrixState.Disconnected - ) { - logger.info( - "Not yet connecting because: ", - "transport === null:", - transport === null, - "!connectRequested:", - !connectRequested, - "state.matrix$.value.state !== MatrixState.Disconnected:", - state.matrix$.value.state !== MatrixState.Disconnected, - ); - return; - } - state.matrix$.next({ state: MatrixState.Connecting }); - logger.info("Matrix State connecting"); + // const mutestate= publisher$.pipe(switchMap((publisher) => { + // return publisher.muteState$ + // }); - joinMatrixRTC(transport).catch((error) => { - logger.error(error); - state.matrix$.next({ state: MatrixState.Error, error }); - }); + // For each publisher create the descired tracks + // If we recreate a new publisher we remember the trackStartRequested$ value and immediately create the tracks + // THIS might be fine without a reconcile. There is no cleanup needed. We always get a working publisher + // track start request can than just toggle the tracks. + // TODO does this need `reconcile` to make sure we wait for createAndSetupTracks before we stop tracks? + combineLatest([publisher$, trackStartRequested$]).subscribe( + ([publisher, shouldStartTracks]) => { + if (publisher && shouldStartTracks) { + publisher.createAndSetupTracks().catch( + // TODO make this set some error state + (e) => logger.error(e), + ); + } else if (publisher) { + publisher.stopTracks(); + } }, ); - // TODO add this and update `state.matrix$` based on it. - // useTypedEventEmitter( - // rtcSession, - // MatrixRTCSessionEvent.MembershipManagerError, - // (error) => setExternalError(new ConnectionLostError()), - // ); + // Use reconcile here to not run concurrent createAndSetupTracks calls + // `tracks$` will update once they are ready. + scope.reconcile( + scope.behavior(combineLatest([publisher$, trackStartRequested$])), + async ([publisher, shouldStartTracks]) => { + if (publisher && shouldStartTracks) { + await publisher.createAndSetupTracks().catch((e) => logger.error(e)); + } else if (publisher) { + publisher.stopTracks(); + } + }, + ); - const requestConnect = (): LocalMemberConnectionState => { - trackStartRequested$.next(true); - connectRequested$.next(true); + // Based on `connectRequested$` we start publishing tracks. (once they are there!) + scope.reconcile( + scope.behavior(combineLatest([publisher$, tracks$, connectRequested$])), + async ([publisher, tracks, shouldConnect]) => { + if (shouldConnect === publisher?.publishing$.value) + return Promise.resolve(); + if (tracks.length !== 0 && shouldConnect) { + try { + await publisher?.startPublishing(); + } catch (error) { + // will take care of "FailedToStartLk" errors. + setLivekitError(error as ElementCallError); + } + } else if (tracks.length !== 0 && !shouldConnect) { + try { + await publisher?.stopPublishing(); + } catch (error) { + setLivekitError(new UnknownCallError(error as Error)); + } + } + }, + ); - return state; + const fatalLivekitError$ = new BehaviorSubject(null); + const setLivekitError = (e: ElementCallError): void => { + if (fatalLivekitError$.value !== null) + logger.error("Multiple Livkit Errors:", e); + else fatalLivekitError$.next(e); }; + const livekitState$: Observable = combineLatest([ + publisher$, + localTransport$, + localConnection$, + tracks$, + publishing$, + connectRequested$, + trackStartRequested$, + fatalLivekitError$, + ]).pipe( + map( + ([ + publisher, + localTransport, + localConnection, + tracks, + publishing, + shouldConnect, + shouldStartTracks, + error, + ]) => { + // read this: + // if(!) return {state: ...} + // if(!) return {state: } + // + // as: + // We do have but not yet so we are in + if (error !== null) return { state: LivekitState.Error, error }; + const hasTracks = tracks.length > 0; + if (!localTransport) return { state: LivekitState.WaitingForTransport }; + if (!localConnection) return { state: LivekitState.Connecting }; + if (!publisher) return { state: LivekitState.InitialisingPublisher }; + if (!shouldStartTracks) return { state: LivekitState.Initialized }; + if (!hasTracks) return { state: LivekitState.CreatingTracks }; + if (!shouldConnect) return { state: LivekitState.ReadyToPublish }; + if (!publishing) return { state: LivekitState.WaitingToPublish }; + return { state: LivekitState.Connected }; + }, + ), + distinctUntilChanged(deepCompare), + ); - const requestDisconnect = (): Behavior | null => { - if (state.livekit$.value.state !== LivekitState.Connected) return null; - state.livekit$.next({ state: LivekitState.Disconnecting }); - combineLatest([publisher$, tracks$], (publisher, tracks) => { - publisher - ?.stopPublishing() - .then(() => { - tracks.forEach((track) => track.stop()); - state.livekit$.next({ state: LivekitState.Disconnected }); - }) - .catch((error) => { - state.livekit$.next({ state: LivekitState.Error, error }); - }); - }); - - return state.livekit$; + const fatalMatrixError$ = new BehaviorSubject(null); + const setMatrixError = (e: ElementCallError): void => { + if (fatalMatrixError$.value !== null) + logger.error("Multiple Matrix Errors:", e); + else fatalMatrixError$.next(e); }; + const matrixState$: Behavior = scope.behavior( + combineLatest([ + localTransport$, + connectRequested$, + homeserverConnected$, + ]).pipe( + map(([localTransport, connectRequested, homeserverConnected]) => { + if (!localTransport) return { state: MatrixState.WaitingForTransport }; + if (!connectRequested) return { state: MatrixState.Ready }; + if (!homeserverConnected) return { state: MatrixState.Connecting }; + return { state: MatrixState.Connected }; + }), + ), + ); + + // Keep matrix rtc session in sync with localTransport$, connectRequested$ and muteStates.video.enabled$ + scope.reconcile( + scope.behavior(combineLatest([localTransport$, connectRequested$])), + async ([transport, shouldConnect]) => { + if (!shouldConnect) return; + + if (!transport) return; + try { + await joinMatrixRTC(transport); + } catch (error) { + logger.error("Error entering RTC session", error); + if (error instanceof Error) + setMatrixError(new MembershipManagerError(error)); + } + + // Update our member event when our mute state changes. + const callIntentScope = new ObservableScope(); + // because this uses its own scope, we can start another reconciliation for the duration of one connection. + callIntentScope.reconcile( + muteStates.video.enabled$, + async (videoEnabled) => + matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"), + ); + + return async (): Promise => { + callIntentScope.end(); + try { + // Update matrixRTCSession to allow udpating the transport without leaving the session! + await matrixRTCSession.leaveRoomSession(); + } catch (e) { + logger.error("Error leaving RTC session", e); + } + try { + await widget?.api.transport.send(ElementWidgetActions.HangupCall, {}); + } catch (e) { + logger.error("Failed to send hangup action", e); + } + }; + }, + ); + + const participant$ = scope.behavior( + localConnection$.pipe(map((c) => c?.livekitRoom?.localParticipant ?? null)), + ); // Pause upstream of all local media tracks when we're disconnected from // MatrixRTC, because it can be an unpleasant surprise for the app to say @@ -377,12 +488,12 @@ export const createLocalMembership$ = ({ // We use matrixConnected$ rather than reconnecting$ because we want to // pause tracks during the initial joining sequence too until we're sure // that our own media is displayed on screen. - combineLatest([localConnection$, homeserverConnected$]) + // TODO refactor this based no livekitState$ + combineLatest([participant$, homeserverConnected$]) .pipe(scope.bind()) - .subscribe(([connection, connected]) => { - if (connection?.state$.value.state !== "ConnectedToLkRoom") return; - const publications = - connection.livekitRoom.localParticipant.trackPublications.values(); + .subscribe(([participant, connected]) => { + if (!participant) return; + const publications = participant.trackPublications.values(); if (connected) { for (const p of publications) { if (p.track?.isUpstreamPaused === true) { @@ -419,85 +530,13 @@ export const createLocalMembership$ = ({ } } }); - // TODO: Refactor updateCallIntent to sth like this: - // combineLatest([muteStates.video.enabled$,localTransport$, state.matrix$]).pipe(map(()=>{ - // matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"), - // })) - // - - // TODO I do not fully understand what this does. - // Is it needed? - // Is this at the right place? - // Can this be simplified? - // Start and stop session membership as needed - // Discussed in statndup -> It seems we can remove this (there is another call to enterRTCSession in this file) - // MAKE SURE TO UNDERSTAND why reconcile is needed and what is potentially missing from the alternative enterRTCSession block. - // @toger5 will try to take care of this. - scope.reconcile(localTransport$, async (transport) => { - if (transport !== null && transport !== undefined) { - try { - state.matrix$.next({ state: MatrixState.Connecting }); - await joinMatrixRTC(transport); - } catch (e) { - logger.error("Error entering RTC session", e); - } - - // Update our member event when our mute state changes. - const intentScope = new ObservableScope(); - intentScope.reconcile(muteStates.video.enabled$, async (videoEnabled) => - matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"), - ); - - return async (): Promise => { - intentScope.end(); - // Only sends Matrix leave event. The LiveKit session will disconnect - // as soon as either the stopConnection$ handler above gets to it or - // the view model is destroyed. - try { - await matrixRTCSession.leaveRoomSession(); - } catch (e) { - logger.error("Error leaving RTC session", e); - } - try { - await widget?.api.transport.send(ElementWidgetActions.HangupCall, {}); - } catch (e) { - logger.error("Failed to send hangup action", e); - } - }; - } - }); - - localConnection$ - .pipe( - distinctUntilChanged(), - switchMap((c) => - c === null ? of({ state: "Initialized" } as ConnectionState) : c.state$, - ), - map((s) => { - logger.trace(`Local connection state update: ${s.state}`); - if (s.state == "FailedToStart") { - return s.error instanceof ElementCallError - ? s.error - : new UnknownCallError(s.error); - } - }), - scope.bind(), - ) - .subscribe((error) => { - if (error !== undefined) - state.livekit$.next({ state: LivekitState.Error, error }); - }); /** * Whether the user is currently sharing their screen. */ const sharingScreen$ = scope.behavior( - localConnection$.pipe( - switchMap((c) => - c !== null - ? observeSharingScreen$(c.livekitRoom.localParticipant) - : of(false), - ), + participant$.pipe( + switchMap((p) => (p !== null ? observeSharingScreen$(p) : of(false))), ), ); @@ -527,24 +566,23 @@ export const createLocalMembership$ = ({ // We also allow screen sharing to be toggled even if the connection // is still initializing or publishing tracks, because there's no // technical reason to disallow this. LiveKit will publish if it can. - localConnection$.value?.livekitRoom.localParticipant - .setScreenShareEnabled(targetScreenshareState, screenshareSettings) + participant$.value + ?.setScreenShareEnabled(targetScreenshareState, screenshareSettings) .catch(logger.error); }; } - const participant$ = scope.behavior( - localConnection$.pipe(map((c) => c?.livekitRoom?.localParticipant ?? null)), - ); return { startTracks, requestConnect, requestDisconnect, - connectionState: state, + connectionState: { + livekit$: scope.behavior(livekitState$), + matrix$: matrixState$, + }, homeserverConnected$, connected$, reconnecting$, - sharingScreen$, toggleScreenSharing, participant$, diff --git a/src/state/CallViewModel/localMember/LocalTransport.test.ts b/src/state/CallViewModel/localMember/LocalTransport.test.ts index d543f97a..c1c36fa5 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.test.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.test.ts @@ -7,6 +7,7 @@ Please see LICENSE in the repository root for full details. import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc"; +import { BehaviorSubject } from "rxjs"; import { mockConfig, flushPromises } from "../../../utils/test"; import { createLocalTransport$ } from "./LocalTransport"; @@ -117,4 +118,39 @@ describe("LocalTransport", () => { type: "livekit", }); }); + + it("updates local transport when oldest member changes", async () => { + // Use config so transport discovery succeeds, but delay OpenID JWT fetch + mockConfig({ + livekit: { livekit_service_url: "https://lk.example.org" }, + }); + const memberships$ = new BehaviorSubject(new Epoch([])); + const openIdResolver = Promise.withResolvers(); + + vi.spyOn(openIDSFU, "getSFUConfigWithOpenID").mockReturnValue( + openIdResolver.promise, + ); + + const localTransport$ = createLocalTransport$({ + scope, + roomId: "!room:example.org", + useOldestMember$: constant(true), + memberships$, + client: { + getDomain: () => "", + getOpenIdToken: vi.fn(), + getDeviceId: vi.fn(), + }, + }); + + openIdResolver.resolve?.({ url: "https://lk.example.org", jwt: "jwt" }); + expect(localTransport$.value).toBe(null); + await flushPromises(); + // final + expect(localTransport$.value).toStrictEqual({ + livekit_alias: "!room:example.org", + livekit_service_url: "https://lk.example.org", + type: "livekit", + }); + }); }); diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 11f35424..df6addb8 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -14,6 +14,7 @@ import { ConnectionState as LivekitConnectionState, } from "livekit-client"; import { + BehaviorSubject, map, NEVER, type Observable, @@ -33,6 +34,7 @@ import { getUrlParams } from "../../../UrlParams.ts"; import { observeTrackReference$ } from "../../MediaViewModel.ts"; import { type Connection } from "../remoteMembers/Connection.ts"; import { type ObservableScope } from "../../ObservableScope.ts"; +import { FailToStartLivekitConnection } from "../../../utils/errors.ts"; /** * A wrapper for a Connection object. @@ -40,7 +42,6 @@ import { type ObservableScope } from "../../ObservableScope.ts"; * The Publisher is also responsible for creating the media tracks. */ export class Publisher { - public tracks: LocalTrack[] = []; /** * Creates a new Publisher. * @param scope - The observable scope to use for managing the publisher. @@ -81,6 +82,9 @@ export class Publisher { }); } + private _tracks$ = new BehaviorSubject[]>([]); + public tracks$ = this._tracks$ as Behavior[]>; + /** * Start the connection to LiveKit and publish local tracks. * @@ -94,50 +98,36 @@ export class Publisher { * @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection. * @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created. */ - public async createAndSetupTracks(): Promise { + public async createAndSetupTracks(): Promise { const lkRoom = this.connection.livekitRoom; // Observe mute state changes and update LiveKit microphone/camera states accordingly this.observeMuteStates(this.scope); - // TODO: This should be an autostarted connection no need to start here. just check the connection state. - // TODO: This will fetch the JWT token. Perhaps we could keep it preloaded - // instead? This optimization would only be safe for a publish connection, - // because we don't want to leak the user's intent to perhaps join a call to - // remote servers before they actually commit to it. - // const { promise, resolve, reject } = Promise.withResolvers(); - // const sub = this.connection.state$.subscribe((s) => { - // if (s.state === "FailedToStart") { - // reject(new Error("Disconnected from LiveKit server")); - // } else if (s.state === "ConnectedToLkRoom") { - // resolve(); - // } - // }); - // try { - // await promise; - // } catch (e) { - // throw e; - // } finally { - // sub.unsubscribe(); - // } // TODO-MULTI-SFU: Prepublish a microphone track const audio = this.muteStates.audio.enabled$.value; const video = this.muteStates.video.enabled$.value; // createTracks throws if called with audio=false and video=false if (audio || video) { // TODO this can still throw errors? It will also prompt for permissions if not already granted - this.tracks = - (await lkRoom.localParticipant - .createTracks({ - audio, - video, - }) - .catch((error) => { - this.logger?.error("Failed to create tracks", error); - })) ?? []; + return lkRoom.localParticipant + .createTracks({ + audio, + video, + }) + .then((tracks) => { + this._tracks$.next(tracks); + }); } - return this.tracks; + throw Error("audio and video is false"); } + private _publishing$ = new BehaviorSubject(false); + public publishing$ = this.scope.behavior(this._publishing$); + /** + * + * @returns + * @throws ElementCallError + */ public async startPublishing(): Promise { const lkRoom = this.connection.livekitRoom; const { promise, resolve, reject } = Promise.withResolvers(); @@ -147,7 +137,7 @@ export class Publisher { resolve(); break; case "FailedToStart": - reject(new Error("Failed to connect to LiveKit server")); + reject(new FailToStartLivekitConnection()); break; default: this.logger?.info("waiting for connection: ", s.state); @@ -160,7 +150,7 @@ export class Publisher { } finally { sub.unsubscribe(); } - for (const track of this.tracks) { + for (const track of this.tracks$.value) { // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally // with a timeout. await lkRoom.localParticipant.publishTrack(track).catch((error) => { @@ -169,7 +159,8 @@ export class Publisher { // TODO: check if the connection is still active? and break the loop if not? } - return this.tracks; + this._publishing$.next(true); + return this.tracks$.value; } public async stopPublishing(): Promise { @@ -185,6 +176,15 @@ export class Publisher { }; localParticipant.trackPublications.forEach(addToTracksIfDefined); await localParticipant.unpublishTracks(tracks); + this._publishing$.next(false); + } + + /** + * Stops all tracks that are currently running + */ + public stopTracks(): void { + this.tracks$.value.forEach((t) => t.stop()); + this._tracks$.next([]); } /// Private methods diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index 3f58bcf6..2ead768b 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -193,7 +193,7 @@ describe("Start connection states", () => { capturedState = capturedStates.pop(); if (capturedState!.state === "FailedToStart") { expect(capturedState!.error.message).toEqual("Something went wrong"); - expect(capturedState!.transport.livekit_alias).toEqual( + expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); } else { @@ -249,7 +249,7 @@ describe("Start connection states", () => { expect(capturedState?.error.message).toContain( "SFU Config fetch failed with exception Error", ); - expect(capturedState?.transport.livekit_alias).toEqual( + expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); } else { @@ -313,7 +313,7 @@ describe("Start connection states", () => { expect(capturedState.error.message).toContain( "Failed to connect to livekit", ); - expect(capturedState.transport.livekit_alias).toEqual( + expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); } else { diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index c17fae2b..81bc9f29 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -50,16 +50,14 @@ export interface ConnectionOpts { export type ConnectionState = | { state: "Initialized" } - | { state: "FetchingConfig"; transport: LivekitTransport } - | { state: "ConnectingToLkRoom"; transport: LivekitTransport } - | { state: "PublishingTracks"; transport: LivekitTransport } - | { state: "FailedToStart"; error: Error; transport: LivekitTransport } + | { state: "FetchingConfig" } + | { state: "ConnectingToLkRoom" } | { state: "ConnectedToLkRoom"; livekitConnectionState$: Observable; - transport: LivekitTransport; } - | { state: "Stopped"; transport: LivekitTransport }; + | { state: "FailedToStart"; error: Error } + | { state: "Stopped" }; /** * A connection to a Matrix RTC LiveKit backend. @@ -77,6 +75,22 @@ export class Connection { */ public readonly state$: Behavior = this._state$; + /** + * The media transport to connect to. + */ + public readonly transport: LivekitTransport; + + public readonly livekitRoom: LivekitRoom; + + /** + * An observable of the participants that are publishing on this connection. (Excluding our local participant) + * This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`. + * It filters the participants to only those that are associated with a membership that claims to publish on this connection. + */ + public readonly remoteParticipantsWithTracks$: Behavior< + PublishingParticipant[] + >; + /** * Whether the connection has been stopped. * @see Connection.stop @@ -104,7 +118,6 @@ export class Connection { try { this._state$.next({ state: "FetchingConfig", - transport: this.transport, }); const { url, jwt } = await this.getSFUConfigWithOpenID(); // If we were stopped while fetching the config, don't proceed to connect @@ -112,7 +125,6 @@ export class Connection { this._state$.next({ state: "ConnectingToLkRoom", - transport: this.transport, }); try { await this.livekitRoom.connect(url, jwt); @@ -143,7 +155,6 @@ export class Connection { this._state$.next({ state: "ConnectedToLkRoom", - transport: this.transport, livekitConnectionState$: connectionStateObserver(this.livekitRoom), }); } catch (error) { @@ -151,7 +162,6 @@ export class Connection { this._state$.next({ state: "FailedToStart", error: error instanceof Error ? error : new Error(`${error}`), - transport: this.transport, }); throw error; } @@ -179,28 +189,11 @@ export class Connection { await this.livekitRoom.disconnect(); this._state$.next({ state: "Stopped", - transport: this.transport, }); this.stopped = true; } - /** - * An observable of the participants that are publishing on this connection. (Excluding our local participant) - * This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`. - * It filters the participants to only those that are associated with a membership that claims to publish on this connection. - */ - public readonly remoteParticipantsWithTracks$: Behavior< - PublishingParticipant[] - >; - - /** - * The media transport to connect to. - */ - public readonly transport: LivekitTransport; - private readonly client: OpenIDClientParts; - public readonly livekitRoom: LivekitRoom; - private readonly logger: Logger; /** diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index d9a0380e..0b9f939c 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -92,7 +92,6 @@ interface Props { } // TODO - write test for scopes (do we really need to bind scope) export interface IConnectionManager { - transports$: Behavior>; connectionManagerData$: Behavior>; } /** @@ -216,7 +215,7 @@ export function createConnectionManager$({ new Epoch(new ConnectionManagerData()), ); - return { transports$, connectionManagerData$ }; + return { connectionManagerData$ }; } function removeDuplicateTransports( diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index 27f501c7..6d867414 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -123,8 +123,22 @@ export class ObservableScope { callback: (value: T) => Promise<(() => Promise) | void>, ): void { let latestValue: T | typeof nothing = nothing; - let reconciledValue: T | typeof nothing = nothing; + let reconcilePromise: Promise | undefined = undefined; let cleanUp: (() => Promise) | void = undefined; + + // While this loop runs it will process the latest from `value$` until it caught up with the updates. + // It might skip updates from `value$` and only process the newest value after callback has resolved. + const reconcileLoop = async (): Promise => { + let prevVal: T | typeof nothing = nothing; + while (latestValue !== prevVal) { + await cleanUp?.(); // Call the previous value's clean-up handler + prevVal = latestValue; + + if (latestValue !== nothing) cleanUp = await callback(latestValue); // Sync current value... + // `latestValue` might have gotten updated during the `await callback`. That is why we loop here + } + }; + value$ .pipe( catchError(() => EMPTY), // Ignore errors @@ -132,23 +146,15 @@ export class ObservableScope { endWith(nothing), // Clean up when the scope ends ) .subscribe((value) => { - void (async (): Promise => { - if (latestValue === nothing) { - latestValue = value; - while (latestValue !== reconciledValue) { - await cleanUp?.(); // Call the previous value's clean-up handler - reconciledValue = latestValue; - if (latestValue !== nothing) - cleanUp = await callback(latestValue); // Sync current value - } - // Reset to signal that reconciliation is done for now - latestValue = nothing; - } else { - // There's already an instance of the above 'while' loop running - // concurrently. Just update the latest value and let it be handled. - latestValue = value; - } - })(); + // Always track the latest value! The `reconcileLoop` will run until it "processed" the "last" `latestValue`. + latestValue = value; + // There's already an instance of the below 'reconcileLoop' loop running + // concurrently. So lets let the loop handle it. NEVER instanciate two `reconcileLoop`s. + if (reconcilePromise) return; + + reconcilePromise = reconcileLoop().finally(() => { + reconcilePromise = undefined; + }); }); } diff --git a/src/utils/errors.ts b/src/utils/errors.ts index b77c0ff0..cdd0e75c 100644 --- a/src/utils/errors.ts +++ b/src/utils/errors.ts @@ -13,6 +13,8 @@ export enum ErrorCode { */ MISSING_MATRIX_RTC_TRANSPORT = "MISSING_MATRIX_RTC_TRANSPORT", CONNECTION_LOST_ERROR = "CONNECTION_LOST_ERROR", + INTERNAL_MEMBERSHIP_MANAGER = "INTERNAL_MEMBERSHIP_MANAGER", + FAILED_TO_START_LIVEKIT = "FAILED_TO_START_LIVEKIT", /** LiveKit indicates that the server has hit its track limits */ INSUFFICIENT_CAPACITY_ERROR = "INSUFFICIENT_CAPACITY_ERROR", E2EE_NOT_SUPPORTED = "E2EE_NOT_SUPPORTED", @@ -27,6 +29,7 @@ export enum ErrorCategory { NETWORK_CONNECTIVITY = "NETWORK_CONNECTIVITY", CLIENT_CONFIGURATION = "CLIENT_CONFIGURATION", UNKNOWN = "UNKNOWN", + SYSTEM_FAILURE = "SYSTEM_FAILURE", // SYSTEM_FAILURE / FEDERATION_FAILURE .. } @@ -83,6 +86,18 @@ export class ConnectionLostError extends ElementCallError { } } +export class MembershipManagerError extends ElementCallError { + public constructor(error: Error) { + super( + t("error.membership_manager"), + ErrorCode.INTERNAL_MEMBERSHIP_MANAGER, + ErrorCategory.SYSTEM_FAILURE, + t("error.membership_manager_description"), + error, + ); + } +} + export class E2EENotSupportedError extends ElementCallError { public constructor() { super( @@ -120,6 +135,17 @@ export class FailToGetOpenIdToken extends ElementCallError { } } +export class FailToStartLivekitConnection extends ElementCallError { + public constructor() { + super( + t("error.failed_to_start_livekit"), + ErrorCode.FAILED_TO_START_LIVEKIT, + ErrorCategory.NETWORK_CONNECTIVITY, + undefined, + ); + } +} + export class InsufficientCapacityError extends ElementCallError { public constructor() { super( From e5117f705d78113e8c236ba0d6e1cd6e3c50a9cb Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 27 Nov 2025 14:42:23 +0100 Subject: [PATCH 02/11] More testing and cleaning up --- .../localMember/LocalMembership.test.ts | 252 +++++++++++++++--- .../localMember/LocalMembership.ts | 171 ++++++------ .../CallViewModel/localMember/Publisher.ts | 18 ++ src/state/ObservableScope.ts | 7 +- 4 files changed, 321 insertions(+), 127 deletions(-) diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMembership.test.ts index a3bfe158..6c6c3d6e 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.test.ts @@ -14,7 +14,7 @@ import { describe, expect, it, vi } from "vitest"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { BehaviorSubject, map, of } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; -import { type LocalParticipant } from "livekit-client"; +import { type LocalParticipant, type LocalTrack } from "livekit-client"; import { MatrixRTCMode } from "../../../settings/settings"; import { @@ -34,6 +34,7 @@ import { Epoch, ObservableScope } from "../../ObservableScope"; import { constant } from "../../Behavior"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; import { type Connection } from "../remoteMembers/Connection"; +import { type Publisher } from "./Publisher"; const MATRIX_RTC_MODE = MatrixRTCMode.Legacy; const getUrlParams = vi.hoisted(() => vi.fn(() => ({}))); @@ -235,44 +236,54 @@ describe("LocalMembership", () => { }); }); - it("recreates publisher if new connection is used", async () => { + const aTransport = { + livekit_service_url: "a", + } as LivekitTransport; + const bTransport = { + livekit_service_url: "b", + } as LivekitTransport; + + const connectionManagerData = new ConnectionManagerData(); + + connectionManagerData.add( + { + livekitRoom: mockLivekitRoom({ + localParticipant: { + isScreenShareEnabled: false, + trackPublications: [], + } as unknown as LocalParticipant, + }), + state$: constant({ + state: "ConnectedToLkRoom", + }), + transport: aTransport, + } as unknown as Connection, + [], + ); + connectionManagerData.add( + { + state$: constant({ + state: "ConnectedToLkRoom", + }), + transport: bTransport, + } as unknown as Connection, + [], + ); + + it("recreates publisher if new connection is used and ENDS always unpublish and end tracks", async () => { const scope = new ObservableScope(); - const aTransport = { - livekit_service_url: "a", - } as LivekitTransport; - const bTransport = { - livekit_service_url: "b", - } as LivekitTransport; const localTransport$ = new BehaviorSubject(aTransport); - const connectionManagerData = new ConnectionManagerData(); + const publishers: Publisher[] = []; - connectionManagerData.add( - { - livekitRoom: mockLivekitRoom({ - localParticipant: { - isScreenShareEnabled: false, - trackPublications: [], - } as unknown as LocalParticipant, - }), - state$: constant({ - state: "ConnectedToLkRoom", - }), - transport: aTransport, - } as unknown as Connection, - [], + defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation( + () => { + const p = { stopPublishing: vi.fn(), stopTracks: vi.fn() }; + publishers.push(p as unknown as Publisher); + return p; + }, ); - connectionManagerData.add( - { - state$: constant({ - state: "ConnectedToLkRoom", - }), - transport: bTransport, - } as unknown as Connection, - [], - ); - const publisherFactory = defaultCreateLocalMemberValues.createPublisherFactory as ReturnType< typeof vi.fn @@ -290,7 +301,182 @@ describe("LocalMembership", () => { localTransport$.next(bTransport); await flushPromises(); expect(publisherFactory).toHaveBeenCalledTimes(2); + expect(publishers.length).toBe(2); + // stop the first Publisher and let the second one life. + expect(publishers[0].stopTracks).toHaveBeenCalled(); + expect(publishers[1].stopTracks).not.toHaveBeenCalled(); + expect(publishers[0].stopPublishing).toHaveBeenCalled(); + expect(publishers[1].stopPublishing).not.toHaveBeenCalled(); expect(publisherFactory.mock.calls[0][0].transport).toBe(aTransport); expect(publisherFactory.mock.calls[1][0].transport).toBe(bTransport); + scope.end(); + await flushPromises(); + // stop all tracks after ending scopes + expect(publishers[1].stopPublishing).toHaveBeenCalled(); + expect(publishers[1].stopTracks).toHaveBeenCalled(); + + defaultCreateLocalMemberValues.createPublisherFactory.mockReset(); + }); + + it("only start tracks if requested", async () => { + const scope = new ObservableScope(); + + const localTransport$ = new BehaviorSubject(aTransport); + + const publishers: Publisher[] = []; + + const tracks$ = new BehaviorSubject([]); + const publishing$ = new BehaviorSubject(false); + defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation( + () => { + const p = { + stopPublishing: vi.fn(), + stopTracks: vi.fn(), + createAndSetupTracks: vi.fn().mockImplementation(async () => { + tracks$.next([{}, {}] as LocalTrack[]); + return Promise.resolve(); + }), + tracks$, + publishing$, + }; + publishers.push(p as unknown as Publisher); + return p; + }, + ); + const publisherFactory = + defaultCreateLocalMemberValues.createPublisherFactory as ReturnType< + typeof vi.fn + >; + + const localMembership = createLocalMembership$({ + scope, + ...defaultCreateLocalMemberValues, + connectionManager: { + connectionManagerData$: constant(new Epoch(connectionManagerData)), + }, + localTransport$, + }); + await flushPromises(); + expect(publisherFactory).toHaveBeenCalledOnce(); + expect(localMembership.tracks$.value.length).toBe(0); + localMembership.startTracks(); + await flushPromises(); + expect(localMembership.tracks$.value.length).toBe(2); + scope.end(); + await flushPromises(); + // stop all tracks after ending scopes + expect(publishers[0].stopPublishing).toHaveBeenCalled(); + expect(publishers[0].stopTracks).toHaveBeenCalled(); + }); + // TODO add an integration test combining publisher and localMembership + // + it("tracks livekit state correctly", async () => { + const scope = new ObservableScope(); + + const localTransport$ = new BehaviorSubject(null); + const connectionManagerData$ = new BehaviorSubject< + Epoch + >(new Epoch(new ConnectionManagerData())); + const publishers: Publisher[] = []; + + const tracks$ = new BehaviorSubject([]); + const publishing$ = new BehaviorSubject(false); + const createTrackResolver = Promise.withResolvers(); + const publishResolver = Promise.withResolvers(); + defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation( + () => { + const p = { + stopPublishing: vi.fn(), + stopTracks: vi.fn().mockImplementation(() => { + logger.info("stopTracks"); + tracks$.next([]); + }), + createAndSetupTracks: vi.fn().mockImplementation(async () => { + await createTrackResolver.promise; + tracks$.next([{}, {}] as LocalTrack[]); + }), + startPublishing: vi.fn().mockImplementation(async () => { + await publishResolver.promise; + publishing$.next(true); + }), + tracks$, + publishing$, + }; + publishers.push(p as unknown as Publisher); + return p; + }, + ); + + const publisherFactory = + defaultCreateLocalMemberValues.createPublisherFactory as ReturnType< + typeof vi.fn + >; + + const localMembership = createLocalMembership$({ + scope, + ...defaultCreateLocalMemberValues, + connectionManager: { + connectionManagerData$, + }, + localTransport$, + }); + + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.WaitingForTransport, + }); + localTransport$.next(aTransport); + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.WaitingForConnection, + }); + connectionManagerData$.next(new Epoch(connectionManagerData)); + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.Initialized, + }); + expect(publisherFactory).toHaveBeenCalledOnce(); + expect(localMembership.tracks$.value.length).toBe(0); + + // ------- + localMembership.startTracks(); + // ------- + + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.CreatingTracks, + }); + createTrackResolver.resolve(); + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.ReadyToPublish, + }); + + // ------- + localMembership.requestConnect(); + // ------- + + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.WaitingToPublish, + }); + + publishResolver.resolve(); + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.Connected, + }); + expect(publishers[0].stopPublishing).not.toHaveBeenCalled(); + + expect(localMembership.connectionState.livekit$.isStopped).toBe(false); + scope.end(); + await flushPromises(); + expect(localMembership.connectionState.livekit$.isStopped).toBe(true); + // stays in connected state because it is stopped before the update to tracks update the state. + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.Connected, + }); + // stop all tracks after ending scopes + expect(publishers[0].stopPublishing).toHaveBeenCalled(); + expect(publishers[0].stopTracks).toHaveBeenCalled(); }); }); diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index cfc715e0..706aeaca 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -22,10 +22,12 @@ import { catchError, combineLatest, distinctUntilChanged, + from, map, type Observable, of, scan, + startWith, switchMap, tap, } from "rxjs"; @@ -54,13 +56,13 @@ export enum LivekitState { Error = "error", /** Not even a transport is available to the LocalMembership */ WaitingForTransport = "waiting_for_transport", - /** A transport is and we are loading the connection based on the transport */ - Connecting = "connecting", - InitialisingPublisher = "uninitialized", + /** A connection appeared so we can initialise the publisher */ + WaitingForConnection = "waiting_for_connection", + /** Connection and transport arrived, publisher Initialized */ Initialized = "Initialized", CreatingTracks = "creating_tracks", ReadyToPublish = "ready_to_publish", - WaitingToPublish = "publishing", + WaitingToPublish = "waiting_to_publish", Connected = "connected", Disconnected = "disconnected", Disconnecting = "disconnecting", @@ -69,8 +71,7 @@ export enum LivekitState { type LocalMemberLivekitState = | { state: LivekitState.Error; error: ElementCallError } | { state: LivekitState.WaitingForTransport } - | { state: LivekitState.Connecting } - | { state: LivekitState.InitialisingPublisher } + | { state: LivekitState.WaitingForConnection } | { state: LivekitState.Initialized } | { state: LivekitState.CreatingTracks } | { state: LivekitState.ReadyToPublish } @@ -163,12 +164,10 @@ export const createLocalMembership$ = ({ * Callback to toggle screen sharing. If null, screen sharing is not possible. */ toggleScreenSharing: (() => void) | null; + tracks$: Behavior; participant$: Behavior; connection$: Behavior; homeserverConnected$: Behavior; - // deprecated fields - /** @deprecated use state instead*/ - connected$: Behavior; // this needs to be discussed /** @deprecated use state instead*/ reconnecting$: Behavior; @@ -217,20 +216,19 @@ export const createLocalMembership$ = ({ ), ); + const localConnectionState$ = localConnection$.pipe( + switchMap((connection) => (connection ? connection.state$ : of(null))), + ); + // /** // * Whether we are "fully" connected to the call. Accounts for both the // * connection to the MatrixRTC session and the LiveKit publish connection. // */ - // // TODO use this in combination with the MemberState. const connected$ = scope.behavior( and$( homeserverConnected$, - localConnection$.pipe( - switchMap((c) => - c - ? c.state$.pipe(map((state) => state.state === "ConnectedToLkRoom")) - : of(false), - ), + localConnectionState$.pipe( + map((state) => (state ? state.state === "ConnectedToLkRoom" : false)), ), ), ); @@ -259,7 +257,7 @@ export const createLocalMembership$ = ({ // This should be used in a combineLatest with publisher$ to connect. // to make it possible to call startTracks before the preferredTransport$ has resolved. - const trackStartRequested$ = new BehaviorSubject(false); + const trackStartRequested = Promise.withResolvers(); // This should be used in a combineLatest with publisher$ to connect. // to make it possible to call startTracks before the preferredTransport$ has resolved. @@ -273,19 +271,21 @@ export const createLocalMembership$ = ({ * Extract the tracks from the published. Also reacts to changing publishers. */ const tracks$ = scope.behavior( - publisher$.pipe(switchMap((p) => (p ? p.tracks$ : constant([])))), + publisher$.pipe(switchMap((p) => (p?.tracks$ ? p.tracks$ : constant([])))), ); const publishing$ = scope.behavior( - publisher$.pipe(switchMap((p) => (p ? p.publishing$ : constant(false)))), + publisher$.pipe( + switchMap((p) => (p?.publishing$ ? p.publishing$ : constant(false))), + ), ); const startTracks = (): Behavior => { - trackStartRequested$.next(true); + trackStartRequested.resolve(); return tracks$; }; const requestConnect = (): void => { - trackStartRequested$.next(true); + trackStartRequested.resolve(); connectRequested$.next(true); }; @@ -310,37 +310,18 @@ export const createLocalMembership$ = ({ }); }); - // const mutestate= publisher$.pipe(switchMap((publisher) => { - // return publisher.muteState$ - // }); - - // For each publisher create the descired tracks - // If we recreate a new publisher we remember the trackStartRequested$ value and immediately create the tracks - // THIS might be fine without a reconcile. There is no cleanup needed. We always get a working publisher - // track start request can than just toggle the tracks. - // TODO does this need `reconcile` to make sure we wait for createAndSetupTracks before we stop tracks? - combineLatest([publisher$, trackStartRequested$]).subscribe( - ([publisher, shouldStartTracks]) => { - if (publisher && shouldStartTracks) { - publisher.createAndSetupTracks().catch( - // TODO make this set some error state - (e) => logger.error(e), - ); - } else if (publisher) { - publisher.stopTracks(); - } - }, - ); - // Use reconcile here to not run concurrent createAndSetupTracks calls // `tracks$` will update once they are ready. scope.reconcile( - scope.behavior(combineLatest([publisher$, trackStartRequested$])), - async ([publisher, shouldStartTracks]) => { - if (publisher && shouldStartTracks) { + scope.behavior( + combineLatest([publisher$, tracks$, from(trackStartRequested.promise)]), + null, + ), + async (valueIfReady) => { + if (!valueIfReady) return; + const [publisher, tracks] = valueIfReady; + if (publisher && tracks.length === 0) { await publisher.createAndSetupTracks().catch((e) => logger.error(e)); - } else if (publisher) { - publisher.stopTracks(); } }, ); @@ -349,8 +330,7 @@ export const createLocalMembership$ = ({ scope.reconcile( scope.behavior(combineLatest([publisher$, tracks$, connectRequested$])), async ([publisher, tracks, shouldConnect]) => { - if (shouldConnect === publisher?.publishing$.value) - return Promise.resolve(); + if (shouldConnect === publisher?.publishing$.value) return; if (tracks.length !== 0 && shouldConnect) { try { await publisher?.startPublishing(); @@ -374,46 +354,53 @@ export const createLocalMembership$ = ({ logger.error("Multiple Livkit Errors:", e); else fatalLivekitError$.next(e); }; - const livekitState$: Observable = combineLatest([ - publisher$, - localTransport$, - localConnection$, - tracks$, - publishing$, - connectRequested$, - trackStartRequested$, - fatalLivekitError$, - ]).pipe( - map( - ([ - publisher, - localTransport, - localConnection, - tracks, - publishing, - shouldConnect, - shouldStartTracks, - error, - ]) => { - // read this: - // if(!) return {state: ...} - // if(!) return {state: } - // - // as: - // We do have but not yet so we are in - if (error !== null) return { state: LivekitState.Error, error }; - const hasTracks = tracks.length > 0; - if (!localTransport) return { state: LivekitState.WaitingForTransport }; - if (!localConnection) return { state: LivekitState.Connecting }; - if (!publisher) return { state: LivekitState.InitialisingPublisher }; - if (!shouldStartTracks) return { state: LivekitState.Initialized }; - if (!hasTracks) return { state: LivekitState.CreatingTracks }; - if (!shouldConnect) return { state: LivekitState.ReadyToPublish }; - if (!publishing) return { state: LivekitState.WaitingToPublish }; - return { state: LivekitState.Connected }; - }, + const livekitState$: Behavior = scope.behavior( + combineLatest([ + publisher$, + localTransport$, + tracks$.pipe( + tap((t) => { + logger.info("tracks$: ", t); + }), + ), + publishing$, + connectRequested$, + from(trackStartRequested.promise).pipe( + map(() => true), + startWith(false), + ), + fatalLivekitError$, + ]).pipe( + map( + ([ + publisher, + localTransport, + tracks, + publishing, + shouldConnect, + shouldStartTracks, + error, + ]) => { + // read this: + // if(!) return {state: ...} + // if(!) return {state: } + // + // as: + // We do have but not yet so we are in + if (error !== null) return { state: LivekitState.Error, error }; + const hasTracks = tracks.length > 0; + if (!localTransport) + return { state: LivekitState.WaitingForTransport }; + if (!publisher) return { state: LivekitState.WaitingForConnection }; + if (!shouldStartTracks) return { state: LivekitState.Initialized }; + if (!hasTracks) return { state: LivekitState.CreatingTracks }; + if (!shouldConnect) return { state: LivekitState.ReadyToPublish }; + if (!publishing) return { state: LivekitState.WaitingToPublish }; + return { state: LivekitState.Connected }; + }, + ), + distinctUntilChanged(deepCompare), ), - distinctUntilChanged(deepCompare), ); const fatalMatrixError$ = new BehaviorSubject(null); @@ -577,15 +564,15 @@ export const createLocalMembership$ = ({ requestConnect, requestDisconnect, connectionState: { - livekit$: scope.behavior(livekitState$), + livekit$: livekitState$, matrix$: matrixState$, }, + tracks$, + participant$, homeserverConnected$, - connected$, reconnecting$, sharingScreen$, toggleScreenSharing, - participant$, connection$: localConnection$, }; }; diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index df6addb8..14f44491 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -15,6 +15,7 @@ import { } from "livekit-client"; import { BehaviorSubject, + combineLatest, map, NEVER, type Observable, @@ -80,6 +81,23 @@ export class Publisher { ); void this.stopPublishing(); }); + + // TODO move mute state handling here using reconcile (instead of inside the mute state class) + // this.scope.reconcile( + // this.scope.behavior( + // combineLatest([this.muteStates.video.enabled$, this.tracks$]), + // ), + // async ([videoEnabled, tracks]) => { + // const track = tracks.find((t) => t.kind == Track.Kind.Video); + // if (!track) return; + + // if (videoEnabled) { + // await track.unmute(); + // } else { + // await track.mute(); + // } + // }, + // ); } private _tracks$ = new BehaviorSubject[]>([]); diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index 6d867414..812cfcd7 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -80,8 +80,11 @@ export class ObservableScope { error(err: unknown) { subject$.error(err); }, + complete() { + subject$.complete(); + }, }); - if (subject$.value === nothing) + if (subject$.value === nothing && !subject$.isStopped) throw new Error("Behavior failed to synchronously emit an initial value"); return subject$ as Behavior; } @@ -125,11 +128,11 @@ export class ObservableScope { let latestValue: T | typeof nothing = nothing; let reconcilePromise: Promise | undefined = undefined; let cleanUp: (() => Promise) | void = undefined; + let prevVal: T | typeof nothing = nothing; // While this loop runs it will process the latest from `value$` until it caught up with the updates. // It might skip updates from `value$` and only process the newest value after callback has resolved. const reconcileLoop = async (): Promise => { - let prevVal: T | typeof nothing = nothing; while (latestValue !== prevVal) { await cleanUp?.(); // Call the previous value's clean-up handler prevVal = latestValue; From 4b0f6e76c4f8df8b227fdbff5bf4800afef9e63c Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 27 Nov 2025 15:38:31 +0100 Subject: [PATCH 03/11] revert complete behavior check --- src/state/ObservableScope.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index 812cfcd7..e3fc644f 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -80,11 +80,8 @@ export class ObservableScope { error(err: unknown) { subject$.error(err); }, - complete() { - subject$.complete(); - }, }); - if (subject$.value === nothing && !subject$.isStopped) + if (subject$.value === nothing) throw new Error("Behavior failed to synchronously emit an initial value"); return subject$ as Behavior; } From 46f8fe4ec7362cf9e5b033563503be33d4e355b8 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 27 Nov 2025 16:43:03 +0100 Subject: [PATCH 04/11] fix test errors --- .../CallViewModel/localMember/LocalMembership.test.ts | 10 +++++++--- src/state/CallViewModel/localMember/Publisher.ts | 1 - 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMembership.test.ts index 6c6c3d6e..f5256005 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.test.ts @@ -225,7 +225,7 @@ describe("LocalMembership", () => { }); expectObservable(localMembership.connectionState.livekit$).toBe("ne", { - n: { state: LivekitState.Connecting }, + n: { state: LivekitState.WaitingForConnection }, e: { state: LivekitState.Error, error: expect.toSatisfy( @@ -279,7 +279,11 @@ describe("LocalMembership", () => { defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation( () => { - const p = { stopPublishing: vi.fn(), stopTracks: vi.fn() }; + const p = { + stopPublishing: vi.fn(), + stopTracks: vi.fn(), + publishing$: constant(false), + }; publishers.push(p as unknown as Publisher); return p; }, @@ -367,6 +371,7 @@ describe("LocalMembership", () => { // stop all tracks after ending scopes expect(publishers[0].stopPublishing).toHaveBeenCalled(); expect(publishers[0].stopTracks).toHaveBeenCalled(); + publisherFactory.mockClear(); }); // TODO add an integration test combining publisher and localMembership // @@ -470,7 +475,6 @@ describe("LocalMembership", () => { expect(localMembership.connectionState.livekit$.isStopped).toBe(false); scope.end(); await flushPromises(); - expect(localMembership.connectionState.livekit$.isStopped).toBe(true); // stays in connected state because it is stopped before the update to tracks update the state. expect(localMembership.connectionState.livekit$.value).toStrictEqual({ state: LivekitState.Connected, diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 14f44491..7fc7d924 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -15,7 +15,6 @@ import { } from "livekit-client"; import { BehaviorSubject, - combineLatest, map, NEVER, type Observable, From c0913b654612347282a0a0d8098d10dccff40303 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 27 Nov 2025 18:02:46 +0100 Subject: [PATCH 05/11] fix playwright test --- locales/en/app.json | 2 +- .../CallViewModel/localMember/LocalMembership.test.ts | 1 + src/state/CallViewModel/localMember/Publisher.ts | 11 +++++++++-- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/locales/en/app.json b/locales/en/app.json index 32d10663..1ff066ea 100644 --- a/locales/en/app.json +++ b/locales/en/app.json @@ -108,7 +108,7 @@ "connection_lost_description": "You were disconnected from the call.", "e2ee_unsupported": "Incompatible browser", "e2ee_unsupported_description": "Your web browser does not support encrypted calls. Supported browsers include Chrome, Safari, and Firefox 117+.", - "failed_to_start_livekit": "Failed to start Livekit", + "failed_to_start_livekit": "Failed to start Livekit connection", "generic": "Something went wrong", "generic_description": "Submitting debug logs will help us track down the problem.", "insufficient_capacity": "Insufficient capacity", diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMembership.test.ts index f5256005..e5b7cc4a 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.test.ts @@ -483,4 +483,5 @@ describe("LocalMembership", () => { expect(publishers[0].stopPublishing).toHaveBeenCalled(); expect(publishers[0].stopTracks).toHaveBeenCalled(); }); + // TODO add tests for matrix local matrix participation. }); diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 7fc7d924..2021d618 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -34,7 +34,10 @@ import { getUrlParams } from "../../../UrlParams.ts"; import { observeTrackReference$ } from "../../MediaViewModel.ts"; import { type Connection } from "../remoteMembers/Connection.ts"; import { type ObservableScope } from "../../ObservableScope.ts"; -import { FailToStartLivekitConnection } from "../../../utils/errors.ts"; +import { + ElementCallError, + FailToStartLivekitConnection, +} from "../../../utils/errors.ts"; /** * A wrapper for a Connection object. @@ -154,7 +157,11 @@ export class Publisher { resolve(); break; case "FailedToStart": - reject(new FailToStartLivekitConnection()); + reject( + s.error instanceof ElementCallError + ? s.error + : new FailToStartLivekitConnection(), + ); break; default: this.logger?.info("waiting for connection: ", s.state); From 2011aef116f21e8ad2aaaac004c141538c4ae29d Mon Sep 17 00:00:00 2001 From: Timo K Date: Fri, 28 Nov 2025 17:59:10 +0100 Subject: [PATCH 06/11] skip "Should show error screen if call creation is restricted" on ff --- playwright/errors.spec.ts | 5 +++++ src/state/CallViewModel/localMember/LocalMembership.ts | 1 - src/state/CallViewModel/localMember/Publisher.ts | 1 + src/state/CallViewModel/remoteMembers/Connection.ts | 1 - 4 files changed, 6 insertions(+), 2 deletions(-) diff --git a/playwright/errors.spec.ts b/playwright/errors.spec.ts index 851e448d..0dc9fa38 100644 --- a/playwright/errors.spec.ts +++ b/playwright/errors.spec.ts @@ -75,7 +75,12 @@ test("Should automatically retry non fatal JWT errors", async ({ test("Should show error screen if call creation is restricted", async ({ page, + browserName, }) => { + test.skip( + browserName === "firefox", + "The test to check the video visibility is not working in Firefox CI environment. looks like video is disabled?", + ); await page.goto("/"); // We need the socket connection to fail, but this cannot be done by using the websocket route. diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 706aeaca..a68738e1 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -335,7 +335,6 @@ export const createLocalMembership$ = ({ try { await publisher?.startPublishing(); } catch (error) { - // will take care of "FailedToStartLk" errors. setLivekitError(error as ElementCallError); } } else if (tracks.length !== 0 && !shouldConnect) { diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 2021d618..a93ef392 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -174,6 +174,7 @@ export class Publisher { } finally { sub.unsubscribe(); } + for (const track of this.tracks$.value) { // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally // with a timeout. diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 81bc9f29..afa519fb 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -110,7 +110,6 @@ export class Connection { * @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection. * @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created. */ - // TODO dont make this throw and instead store a connection error state in this class? // TODO consider an autostart pattern... public async start(): Promise { this.logger.debug("Starting Connection"); From 66dece98a5cb7b2ece7ec28e665edcf387916ce8 Mon Sep 17 00:00:00 2001 From: Timo K Date: Fri, 28 Nov 2025 21:50:22 +0100 Subject: [PATCH 07/11] add more test for publisher --- playwright/errors.spec.ts | 2 +- .../localMember/Publisher.test.ts | 138 ++++++++++++++++++ .../CallViewModel/localMember/Publisher.ts | 14 +- src/utils/errors.ts | 4 +- src/utils/test.ts | 6 +- 5 files changed, 154 insertions(+), 10 deletions(-) create mode 100644 src/state/CallViewModel/localMember/Publisher.test.ts diff --git a/playwright/errors.spec.ts b/playwright/errors.spec.ts index 0dc9fa38..0d36f7ab 100644 --- a/playwright/errors.spec.ts +++ b/playwright/errors.spec.ts @@ -79,7 +79,7 @@ test("Should show error screen if call creation is restricted", async ({ }) => { test.skip( browserName === "firefox", - "The test to check the video visibility is not working in Firefox CI environment. looks like video is disabled?", + "The is test is not working on firefox CI environment.", ); await page.goto("/"); diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts new file mode 100644 index 00000000..f45f7abe --- /dev/null +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -0,0 +1,138 @@ +/* +Copyright 2025 Element Creations Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { + afterEach, + beforeEach, + describe, + expect, + it, + type Mock, + vi, +} from "vitest"; +import { ConnectionState as LivekitConenctionState } from "livekit-client"; +import { type BehaviorSubject } from "rxjs"; + +import { ObservableScope } from "../../ObservableScope"; +import { constant } from "../../Behavior"; +import { + mockLivekitRoom, + mockLocalParticipant, + mockMediaDevices, +} from "../../../utils/test"; +import { Publisher } from "./Publisher"; +import { + type Connection, + type ConnectionState, +} from "../remoteMembers/Connection"; +import { type MuteStates } from "../../MuteStates"; +import { FailToStartLivekitConnection } from "../../../utils/errors"; + +describe("Publisher", () => { + let scope: ObservableScope; + let connection: Connection; + let muteStates: MuteStates; + beforeEach(() => { + muteStates = { + audio: { + enabled$: constant(false), + unsetHandler: vi.fn(), + setHandler: vi.fn(), + }, + video: { + enabled$: constant(false), + unsetHandler: vi.fn(), + setHandler: vi.fn(), + }, + } as unknown as MuteStates; + scope = new ObservableScope(); + connection = { + state$: constant({ + state: "ConnectedToLkRoom", + livekitConnectionState$: constant(LivekitConenctionState.Connected), + }), + livekitRoom: mockLivekitRoom({ + localParticipant: mockLocalParticipant({}), + }), + } as unknown as Connection; + }); + + afterEach(() => scope.end()); + + it("throws if livekit room could not publish", async () => { + const publisher = new Publisher( + scope, + connection, + mockMediaDevices({}), + muteStates, + constant({ supported: false, processor: undefined }), + ); + + // should do nothing if no tracks have been created yet. + await publisher.startPublishing(); + expect( + connection.livekitRoom.localParticipant.publishTrack, + ).not.toHaveBeenCalled(); + + await expect(publisher.createAndSetupTracks()).rejects.toThrow( + Error("audio and video is false"), + ); + + (muteStates.audio.enabled$ as BehaviorSubject).next(true); + + ( + connection.livekitRoom.localParticipant.createTracks as Mock + ).mockResolvedValue([{}, {}]); + + await expect(publisher.createAndSetupTracks()).resolves.not.toThrow(); + expect( + connection.livekitRoom.localParticipant.createTracks, + ).toHaveBeenCalledOnce(); + + // failiour due to localParticipant.publishTrack + ( + connection.livekitRoom.localParticipant.publishTrack as Mock + ).mockRejectedValue(Error("testError")); + + await expect(publisher.startPublishing()).rejects.toThrow( + new FailToStartLivekitConnection("testError"), + ); + + // does not try other conenction after the first one failed + expect( + connection.livekitRoom.localParticipant.publishTrack, + ).toHaveBeenCalledTimes(1); + + // failiour due to connection.state$ + const beforeState = connection.state$.value; + (connection.state$ as BehaviorSubject).next({ + state: "FailedToStart", + error: Error("testStartError"), + }); + + await expect(publisher.startPublishing()).rejects.toThrow( + new FailToStartLivekitConnection("testStartError"), + ); + (connection.state$ as BehaviorSubject).next(beforeState); + + // does not try other conenction after the first one failed + expect( + connection.livekitRoom.localParticipant.publishTrack, + ).toHaveBeenCalledTimes(1); + + // success case + ( + connection.livekitRoom.localParticipant.publishTrack as Mock + ).mockResolvedValue({}); + + await expect(publisher.startPublishing()).resolves.not.toThrow(); + + expect( + connection.livekitRoom.localParticipant.publishTrack, + ).toHaveBeenCalledTimes(3); + }); +}); diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index a93ef392..3f3192d1 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -56,7 +56,7 @@ export class Publisher { */ public constructor( private scope: ObservableScope, - private connection: Connection, + private connection: Pick, //setE2EEEnabled, devices: MediaDevices, private readonly muteStates: MuteStates, trackerProcessorState$: Behavior, @@ -160,7 +160,7 @@ export class Publisher { reject( s.error instanceof ElementCallError ? s.error - : new FailToStartLivekitConnection(), + : new FailToStartLivekitConnection(s.error.message), ); break; default: @@ -180,17 +180,16 @@ export class Publisher { // with a timeout. await lkRoom.localParticipant.publishTrack(track).catch((error) => { this.logger?.error("Failed to publish track", error); + throw new FailToStartLivekitConnection( + error instanceof Error ? error.message : error, + ); }); - - // TODO: check if the connection is still active? and break the loop if not? } this._publishing$.next(true); return this.tracks$.value; } public async stopPublishing(): Promise { - // TODO-MULTI-SFU: Move these calls back to ObservableScope.onEnd once scope - // actually has the right lifetime this.muteStates.audio.unsetHandler(); this.muteStates.video.unsetHandler(); @@ -246,6 +245,9 @@ export class Publisher { // the process of being restarted. activeMicTrack.mediaStreamTrack.readyState !== "ended" ) { + this.logger?.info( + "Restarting audio device track due to active media device changed (workaroundRestartAudioInputTrackChrome)", + ); // Restart the track, which will cause Livekit to do another // getUserMedia() call with deviceId: default to get the *new* default device. // Note that room.switchActiveDevice() won't work: Livekit will ignore it because diff --git a/src/utils/errors.ts b/src/utils/errors.ts index cdd0e75c..bb37754a 100644 --- a/src/utils/errors.ts +++ b/src/utils/errors.ts @@ -136,12 +136,12 @@ export class FailToGetOpenIdToken extends ElementCallError { } export class FailToStartLivekitConnection extends ElementCallError { - public constructor() { + public constructor(e?: string) { super( t("error.failed_to_start_livekit"), ErrorCode.FAILED_TO_START_LIVEKIT, ErrorCategory.NETWORK_CONNECTIVITY, - undefined, + e, ); } } diff --git a/src/utils/test.ts b/src/utils/test.ts index 471d35d8..d243b343 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -284,6 +284,8 @@ export function mockLivekitRoom( ): LivekitRoom { const livekitRoom = { options: {}, + setE2EEEnabled: vi.fn(), + ...mockEmitter(), ...room, } as Partial as LivekitRoom; @@ -306,7 +308,9 @@ export function mockLocalParticipant( return { isLocal: true, trackPublications: new Map(), - unpublishTracks: async () => Promise.resolve(), + publishTrack: vi.fn(), + unpublishTracks: vi.fn(), + createTracks: vi.fn(), getTrackPublication: () => ({}) as Partial as LocalTrackPublication, ...mockEmitter(), From 47c6a17d1e2c34f77f6e6de4877df2cf9aa19edd Mon Sep 17 00:00:00 2001 From: Timo K Date: Mon, 1 Dec 2025 14:42:15 +0100 Subject: [PATCH 08/11] review --- src/state/CallViewModel/CallViewModel.ts | 9 ++- .../localMember/LocalMembership.test.ts | 22 +++--- .../localMember/LocalMembership.ts | 71 +++++++++++-------- .../localMember/Publisher.test.ts | 2 + .../CallViewModel/localMember/Publisher.ts | 60 ++++++++++------ 5 files changed, 102 insertions(+), 62 deletions(-) diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 082da477..e48dd8c4 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -101,7 +101,7 @@ import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts import { createLocalMembership$, enterRTCSession, - LivekitState, + RTCBackendState, } from "./localMember/LocalMembership.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { @@ -473,6 +473,9 @@ export function createCallViewModel$( mediaDevices, muteStates, trackProcessorState$, + logger.getChild( + "[Publisher" + connection.transport.livekit_service_url + "]", + ), ); }, connectionManager: connectionManager, @@ -664,7 +667,7 @@ export function createCallViewModel$( { value: matrixLivekitMembers }, duplicateTiles, ]) { - let localParticipantId = undefined; + let localParticipantId: string | undefined = undefined; // add local member if available if (localMatrixLivekitMember) { const { userId, participant$, connection$, membership$ } = @@ -1452,7 +1455,7 @@ export function createCallViewModel$( fatalError$: scope.behavior( localMembership.connectionState.livekit$.pipe( - filter((v) => v.state === LivekitState.Error), + filter((v) => v.state === RTCBackendState.Error), map((s) => s.error), ), null, diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMembership.test.ts index e5b7cc4a..8d9df81d 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.test.ts @@ -27,7 +27,7 @@ import { import { createLocalMembership$, enterRTCSession, - LivekitState, + RTCBackendState, } from "./LocalMembership"; import { MatrixRTCTransportMissingError } from "../../../utils/errors"; import { Epoch, ObservableScope } from "../../ObservableScope"; @@ -225,9 +225,9 @@ describe("LocalMembership", () => { }); expectObservable(localMembership.connectionState.livekit$).toBe("ne", { - n: { state: LivekitState.WaitingForConnection }, + n: { state: RTCBackendState.WaitingForConnection }, e: { - state: LivekitState.Error, + state: RTCBackendState.Error, error: expect.toSatisfy( (e) => e instanceof MatrixRTCTransportMissingError, ), @@ -428,17 +428,17 @@ describe("LocalMembership", () => { await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.WaitingForTransport, + state: RTCBackendState.WaitingForTransport, }); localTransport$.next(aTransport); await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.WaitingForConnection, + state: RTCBackendState.WaitingForConnection, }); connectionManagerData$.next(new Epoch(connectionManagerData)); await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.Initialized, + state: RTCBackendState.Initialized, }); expect(publisherFactory).toHaveBeenCalledOnce(); expect(localMembership.tracks$.value.length).toBe(0); @@ -449,12 +449,12 @@ describe("LocalMembership", () => { await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.CreatingTracks, + state: RTCBackendState.CreatingTracks, }); createTrackResolver.resolve(); await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.ReadyToPublish, + state: RTCBackendState.ReadyToPublish, }); // ------- @@ -462,13 +462,13 @@ describe("LocalMembership", () => { // ------- expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.WaitingToPublish, + state: RTCBackendState.WaitingToPublish, }); publishResolver.resolve(); await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.Connected, + state: RTCBackendState.Connected, }); expect(publishers[0].stopPublishing).not.toHaveBeenCalled(); @@ -477,7 +477,7 @@ describe("LocalMembership", () => { await flushPromises(); // stays in connected state because it is stopped before the update to tracks update the state. expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.Connected, + state: RTCBackendState.Connected, }); // stop all tracks after ending scopes expect(publishers[0].stopPublishing).toHaveBeenCalled(); diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index a68738e1..3e9cd0cf 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -11,6 +11,7 @@ import { ParticipantEvent, type LocalParticipant, type ScreenShareCaptureOptions, + ConnectionState, } from "livekit-client"; import { observeParticipantEvents } from "@livekit/components-core"; import { @@ -52,7 +53,7 @@ import { MatrixRTCMode } from "../../../settings/settings.ts"; import { Config } from "../../../config/Config.ts"; import { type Connection } from "../remoteMembers/Connection.ts"; -export enum LivekitState { +export enum RTCBackendState { Error = "error", /** Not even a transport is available to the LocalMembership */ WaitingForTransport = "waiting_for_transport", @@ -68,17 +69,17 @@ export enum LivekitState { Disconnecting = "disconnecting", } -type LocalMemberLivekitState = - | { state: LivekitState.Error; error: ElementCallError } - | { state: LivekitState.WaitingForTransport } - | { state: LivekitState.WaitingForConnection } - | { state: LivekitState.Initialized } - | { state: LivekitState.CreatingTracks } - | { state: LivekitState.ReadyToPublish } - | { state: LivekitState.WaitingToPublish } - | { state: LivekitState.Connected } - | { state: LivekitState.Disconnected } - | { state: LivekitState.Disconnecting }; +type LocalMemberRtcBackendState = + | { state: RTCBackendState.Error; error: ElementCallError } + | { state: RTCBackendState.WaitingForTransport } + | { state: RTCBackendState.WaitingForConnection } + | { state: RTCBackendState.Initialized } + | { state: RTCBackendState.CreatingTracks } + | { state: RTCBackendState.ReadyToPublish } + | { state: RTCBackendState.WaitingToPublish } + | { state: RTCBackendState.Connected } + | { state: RTCBackendState.Disconnected } + | { state: RTCBackendState.Disconnecting }; export enum MatrixState { WaitingForTransport = "waiting_for_transport", @@ -98,7 +99,7 @@ type LocalMemberMatrixState = | { state: MatrixState.Error; error: Error }; export interface LocalMemberConnectionState { - livekit$: Behavior; + livekit$: Behavior; matrix$: Behavior; } @@ -155,8 +156,15 @@ export const createLocalMembership$ = ({ muteStates, matrixRTCSession, }: Props): { - requestConnect: () => void; + /** + * This starts audio and video tracks. They will be reused when calling `requestConnect`. + */ startTracks: () => Behavior; + /** + * This sets a inner state (shouldConnect) to true and instructs the js-sdk and livekit to keep the user + * connected to matrix and livekit. + */ + requestConnect: () => void; requestDisconnect: () => void; connectionState: LocalMemberConnectionState; sharingScreen$: Behavior; @@ -228,7 +236,15 @@ export const createLocalMembership$ = ({ and$( homeserverConnected$, localConnectionState$.pipe( - map((state) => (state ? state.state === "ConnectedToLkRoom" : false)), + switchMap((state) => { + if (!state) return of(false); + if (state.state === "ConnectedToLkRoom") { + state.livekitConnectionState$.pipe( + map((lkState) => lkState === ConnectionState.Connected), + ); + } + return of(false); + }), ), ), ); @@ -274,9 +290,7 @@ export const createLocalMembership$ = ({ publisher$.pipe(switchMap((p) => (p?.tracks$ ? p.tracks$ : constant([])))), ); const publishing$ = scope.behavior( - publisher$.pipe( - switchMap((p) => (p?.publishing$ ? p.publishing$ : constant(false))), - ), + publisher$.pipe(switchMap((p) => p?.publishing$ ?? constant(false))), ); const startTracks = (): Behavior => { @@ -353,7 +367,7 @@ export const createLocalMembership$ = ({ logger.error("Multiple Livkit Errors:", e); else fatalLivekitError$.next(e); }; - const livekitState$: Behavior = scope.behavior( + const livekitState$: Behavior = scope.behavior( combineLatest([ publisher$, localTransport$, @@ -386,16 +400,17 @@ export const createLocalMembership$ = ({ // // as: // We do have but not yet so we are in - if (error !== null) return { state: LivekitState.Error, error }; + if (error !== null) return { state: RTCBackendState.Error, error }; const hasTracks = tracks.length > 0; if (!localTransport) - return { state: LivekitState.WaitingForTransport }; - if (!publisher) return { state: LivekitState.WaitingForConnection }; - if (!shouldStartTracks) return { state: LivekitState.Initialized }; - if (!hasTracks) return { state: LivekitState.CreatingTracks }; - if (!shouldConnect) return { state: LivekitState.ReadyToPublish }; - if (!publishing) return { state: LivekitState.WaitingToPublish }; - return { state: LivekitState.Connected }; + return { state: RTCBackendState.WaitingForTransport }; + if (!publisher) + return { state: RTCBackendState.WaitingForConnection }; + if (!shouldStartTracks) return { state: RTCBackendState.Initialized }; + if (!hasTracks) return { state: RTCBackendState.CreatingTracks }; + if (!shouldConnect) return { state: RTCBackendState.ReadyToPublish }; + if (!publishing) return { state: RTCBackendState.WaitingToPublish }; + return { state: RTCBackendState.Connected }; }, ), distinctUntilChanged(deepCompare), @@ -526,7 +541,7 @@ export const createLocalMembership$ = ({ ), ); - let toggleScreenSharing = null; + let toggleScreenSharing: (() => void) | null = null; if ( "getDisplayMedia" in (navigator.mediaDevices ?? {}) && !getUrlParams().hideScreensharing diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index f45f7abe..9b3e5b2a 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -16,6 +16,7 @@ import { } from "vitest"; import { ConnectionState as LivekitConenctionState } from "livekit-client"; import { type BehaviorSubject } from "rxjs"; +import { logger } from "matrix-js-sdk/lib/logger"; import { ObservableScope } from "../../ObservableScope"; import { constant } from "../../Behavior"; @@ -70,6 +71,7 @@ describe("Publisher", () => { mockMediaDevices({}), muteStates, constant({ supported: false, processor: undefined }), + logger, ); // should do nothing if no tracks have been created yet. diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 3f3192d1..326dedaf 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -60,15 +60,15 @@ export class Publisher { devices: MediaDevices, private readonly muteStates: MuteStates, trackerProcessorState$: Behavior, - private logger?: Logger, + private logger: Logger, ) { - this.logger?.info("[PublishConnection] Create LiveKit room"); + this.logger.info("Create LiveKit room"); const { controlledAudioDevices } = getUrlParams(); const room = connection.livekitRoom; room.setE2EEEnabled(room.options.e2ee !== undefined)?.catch((e: Error) => { - this.logger?.error("Failed to set E2EE enabled on room", e); + this.logger.error("Failed to set E2EE enabled on room", e); }); // Setup track processor syncing (blur) @@ -78,9 +78,7 @@ export class Publisher { this.workaroundRestartAudioInputTrackChrome(devices, scope); this.scope.onEnd(() => { - this.logger?.info( - "[PublishConnection] Scope ended -> stop publishing all tracks", - ); + this.logger.info("Scope ended -> stop publishing all tracks"); void this.stopPublishing(); }); @@ -119,6 +117,7 @@ export class Publisher { * @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created. */ public async createAndSetupTracks(): Promise { + this.logger.debug("createAndSetupTracks called"); const lkRoom = this.connection.livekitRoom; // Observe mute state changes and update LiveKit microphone/camera states accordingly this.observeMuteStates(this.scope); @@ -135,7 +134,14 @@ export class Publisher { video, }) .then((tracks) => { + this.logger.info( + "created track", + tracks.map((t) => t.kind + ", " + t.id), + ); this._tracks$.next(tracks); + }) + .catch((error) => { + this.logger.error("Failed to create tracks", error); }); } throw Error("audio and video is false"); @@ -149,6 +155,7 @@ export class Publisher { * @throws ElementCallError */ public async startPublishing(): Promise { + this.logger.debug("startPublishing called"); const lkRoom = this.connection.livekitRoom; const { promise, resolve, reject } = Promise.withResolvers(); const sub = this.connection.state$.subscribe((s) => { @@ -164,7 +171,7 @@ export class Publisher { ); break; default: - this.logger?.info("waiting for connection: ", s.state); + this.logger.info("waiting for connection: ", s.state); } }); try { @@ -176,20 +183,27 @@ export class Publisher { } for (const track of this.tracks$.value) { + this.logger.info("publish ", this.tracks$.value.length, "tracks"); // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally // with a timeout. await lkRoom.localParticipant.publishTrack(track).catch((error) => { - this.logger?.error("Failed to publish track", error); + this.logger.error("Failed to publish track", error); throw new FailToStartLivekitConnection( error instanceof Error ? error.message : error, ); }); + this.logger.info("published track ", track.kind, track.id); + + // TODO: check if the connection is still active? and break the loop if not? } this._publishing$.next(true); return this.tracks$.value; } public async stopPublishing(): Promise { + this.logger.debug("stopPublishing called"); + // TODO-MULTI-SFU: Move these calls back to ObservableScope.onEnd once scope + // actually has the right lifetime this.muteStates.audio.unsetHandler(); this.muteStates.video.unsetHandler(); @@ -199,7 +213,19 @@ export class Publisher { if (p.track !== undefined) tracks.push(p.track); }; localParticipant.trackPublications.forEach(addToTracksIfDefined); - await localParticipant.unpublishTracks(tracks); + this.logger.debug( + "list of tracks to unpublish:", + tracks.map((t) => t.kind + ", " + t.id), + "start unpublishing now", + ); + await localParticipant.unpublishTracks(tracks).catch((error) => { + this.logger.error("Failed to unpublish tracks", error); + throw error; + }); + this.logger.debug( + "unpublished tracks", + tracks.map((t) => t.kind + ", " + t.id), + ); this._publishing$.next(false); } @@ -256,7 +282,7 @@ export class Publisher { .getTrackPublication(Track.Source.Microphone) ?.audioTrack?.restartTrack() .catch((e) => { - this.logger?.error(`Failed to restart audio device track`, e); + this.logger.error(`Failed to restart audio device track`, e); }); } }); @@ -276,7 +302,7 @@ export class Publisher { selected$.pipe(scope.bind()).subscribe((device) => { if (lkRoom.state != LivekitConnectionState.Connected) return; // if (this.connectionState$.value !== ConnectionState.Connected) return; - this.logger?.info( + this.logger.info( "[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :", lkRoom.getActiveDevice(kind), " !== ", @@ -289,7 +315,7 @@ export class Publisher { lkRoom .switchActiveDevice(kind, device.id) .catch((e: Error) => - this.logger?.error( + this.logger.error( `Failed to sync ${kind} device with LiveKit`, e, ), @@ -314,10 +340,7 @@ export class Publisher { try { await lkRoom.localParticipant.setMicrophoneEnabled(desired); } catch (e) { - this.logger?.error( - "Failed to update LiveKit audio input mute state", - e, - ); + this.logger.error("Failed to update LiveKit audio input mute state", e); } return lkRoom.localParticipant.isMicrophoneEnabled; }); @@ -325,10 +348,7 @@ export class Publisher { try { await lkRoom.localParticipant.setCameraEnabled(desired); } catch (e) { - this.logger?.error( - "Failed to update LiveKit video input mute state", - e, - ); + this.logger.error("Failed to update LiveKit video input mute state", e); } return lkRoom.localParticipant.isCameraEnabled; }); From f26aa8f9705f5b2573642f7f24e35d2854a5e296 Mon Sep 17 00:00:00 2001 From: Timo K Date: Mon, 1 Dec 2025 15:10:38 +0100 Subject: [PATCH 09/11] fix tests --- .../CallViewModel/localMember/LocalMembership.test.ts | 7 ++++++- src/utils/test.ts | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMembership.test.ts index 8d9df81d..cff5c06d 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.test.ts @@ -14,7 +14,11 @@ import { describe, expect, it, vi } from "vitest"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { BehaviorSubject, map, of } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; -import { type LocalParticipant, type LocalTrack } from "livekit-client"; +import { + ConnectionState as LivekitConnectionState, + type LocalParticipant, + type LocalTrack, +} from "livekit-client"; import { MatrixRTCMode } from "../../../settings/settings"; import { @@ -255,6 +259,7 @@ describe("LocalMembership", () => { }), state$: constant({ state: "ConnectedToLkRoom", + livekitConnectionState$: constant(LivekitConnectionState.Connected), }), transport: aTransport, } as unknown as Connection, diff --git a/src/utils/test.ts b/src/utils/test.ts index d243b343..50a9add0 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -309,7 +309,7 @@ export function mockLocalParticipant( isLocal: true, trackPublications: new Map(), publishTrack: vi.fn(), - unpublishTracks: vi.fn(), + unpublishTracks: vi.fn().mockResolvedValue([]), createTracks: vi.fn(), getTrackPublication: () => ({}) as Partial as LocalTrackPublication, From 63cd4f79dd76e5d1dc5613c4f75b1fa1605151ad Mon Sep 17 00:00:00 2001 From: Timo K Date: Mon, 1 Dec 2025 17:29:21 +0100 Subject: [PATCH 10/11] fix playwright test! (It caught an actual bug!!! so the right wording would be: fix implementation thanks to the playwright test! --- src/state/CallViewModel/CallViewModel.ts | 7 +++---- .../CallViewModel/localMember/LocalMembership.ts | 13 ++++++++++--- src/state/CallViewModel/remoteMembers/Connection.ts | 11 ++++++++--- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index e48dd8c4..cb933995 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -584,7 +584,6 @@ export function createCallViewModel$( // in a split-brained state. // DISCUSSION own membership manager ALSO this probably can be simplifis const reconnecting$ = localMembership.reconnecting$; - const pretendToBeDisconnected$ = reconnecting$; const audioParticipants$ = scope.behavior( matrixLivekitMembers$.pipe( @@ -633,7 +632,7 @@ export function createCallViewModel$( ); const handsRaised$ = scope.behavior( - handsRaisedSubject$.pipe(pauseWhen(pretendToBeDisconnected$)), + handsRaisedSubject$.pipe(pauseWhen(reconnecting$)), ); const reactions$ = scope.behavior( @@ -646,7 +645,7 @@ export function createCallViewModel$( ]), ), ), - pauseWhen(pretendToBeDisconnected$), + pauseWhen(reconnecting$), ), ); @@ -737,7 +736,7 @@ export function createCallViewModel$( livekitRoom$, focusUrl$, mediaDevices, - pretendToBeDisconnected$, + reconnecting$, displayName$, matrixMemberMetadataStore.createAvatarUrlBehavior$(userId), handsRaised$.pipe(map((v) => v[participantId]?.time ?? null)), diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 3e9cd0cf..21d9c3f9 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -234,19 +234,26 @@ export const createLocalMembership$ = ({ // */ const connected$ = scope.behavior( and$( - homeserverConnected$, + homeserverConnected$.pipe( + tap((v) => logger.info("matrix: Connected state changed", v)), + ), localConnectionState$.pipe( switchMap((state) => { + logger.info("livekit: Connected state changed", state); if (!state) return of(false); if (state.state === "ConnectedToLkRoom") { - state.livekitConnectionState$.pipe( + logger.info( + "livekit: Connected state changed (inner livekitConnectionState$)", + state.livekitConnectionState$.value, + ); + return state.livekitConnectionState$.pipe( map((lkState) => lkState === ConnectionState.Connected), ); } return of(false); }), ), - ), + ).pipe(tap((v) => logger.info("combined: Connected state changed", v))), ); // MATRIX RELATED diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index afa519fb..4f3bbda4 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -19,7 +19,7 @@ import { RoomEvent, } from "livekit-client"; import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; -import { BehaviorSubject, map, type Observable } from "rxjs"; +import { BehaviorSubject, map } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; import { @@ -54,7 +54,7 @@ export type ConnectionState = | { state: "ConnectingToLkRoom" } | { state: "ConnectedToLkRoom"; - livekitConnectionState$: Observable; + livekitConnectionState$: Behavior; } | { state: "FailedToStart"; error: Error } | { state: "Stopped" }; @@ -82,6 +82,8 @@ export class Connection { public readonly livekitRoom: LivekitRoom; + private scope: ObservableScope; + /** * An observable of the participants that are publishing on this connection. (Excluding our local participant) * This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`. @@ -154,7 +156,9 @@ export class Connection { this._state$.next({ state: "ConnectedToLkRoom", - livekitConnectionState$: connectionStateObserver(this.livekitRoom), + livekitConnectionState$: this.scope.behavior( + connectionStateObserver(this.livekitRoom), + ), }); } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); @@ -209,6 +213,7 @@ export class Connection { ); const { transport, client, scope } = opts; + this.scope = scope; this.livekitRoom = opts.livekitRoomFactory(); this.transport = transport; this.client = client; From 8cb8357398657bb173094abd9720b5412c328a77 Mon Sep 17 00:00:00 2001 From: Timo K Date: Mon, 1 Dec 2025 17:34:37 +0100 Subject: [PATCH 11/11] make logs just debug. (info -> debug) --- src/state/CallViewModel/localMember/LocalMembership.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 21d9c3f9..60ae79b8 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -235,14 +235,14 @@ export const createLocalMembership$ = ({ const connected$ = scope.behavior( and$( homeserverConnected$.pipe( - tap((v) => logger.info("matrix: Connected state changed", v)), + tap((v) => logger.debug("matrix: Connected state changed", v)), ), localConnectionState$.pipe( switchMap((state) => { - logger.info("livekit: Connected state changed", state); + logger.debug("livekit: Connected state changed", state); if (!state) return of(false); if (state.state === "ConnectedToLkRoom") { - logger.info( + logger.debug( "livekit: Connected state changed (inner livekitConnectionState$)", state.livekitConnectionState$.value, ); @@ -253,7 +253,7 @@ export const createLocalMembership$ = ({ return of(false); }), ), - ).pipe(tap((v) => logger.info("combined: Connected state changed", v))), + ).pipe(tap((v) => logger.debug("combined: Connected state changed", v))), ); // MATRIX RELATED