From 2e646bfac163ea48b45de6fc8a22d52e8a0e01cd Mon Sep 17 00:00:00 2001 From: Timo K Date: Tue, 2 Dec 2025 19:40:08 +0100 Subject: [PATCH 01/24] Unify LiveKit and Matrix connection states --- src/room/GroupCallView.tsx | 1 + src/state/CallViewModel/CallViewModel.ts | 47 ++-- .../localMember/HomeserverConnected.test.ts | 58 ++--- .../localMember/HomeserverConnected.ts | 38 +-- .../localMember/LocalMembership.test.ts | 26 +- .../localMember/LocalMembership.ts | 244 +++++++++--------- .../localMember/Publisher.test.ts | 3 +- .../CallViewModel/localMember/Publisher.ts | 2 +- .../remoteMembers/Connection.test.ts | 9 +- .../CallViewModel/remoteMembers/Connection.ts | 43 +-- 10 files changed, 238 insertions(+), 233 deletions(-) diff --git a/src/room/GroupCallView.tsx b/src/room/GroupCallView.tsx index 75438f7f..43602716 100644 --- a/src/room/GroupCallView.tsx +++ b/src/room/GroupCallView.tsx @@ -160,6 +160,7 @@ export const GroupCallView: FC = ({ }, [rtcSession]); // TODO move this into the callViewModel LocalMembership.ts + // We might actually not need this at all. Since we get into fatalError on those errors already? useTypedEventEmitter( rtcSession, MatrixRTCSessionEvent.MembershipManagerError, diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 3c15958a..9bfa979c 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -452,18 +452,14 @@ export function createCallViewModel$( const localMembership = createLocalMembership$({ scope: scope, - homeserverConnected$: createHomeserverConnected$( + homeserverConnected: createHomeserverConnected$( scope, client, matrixRTCSession, ), muteStates: muteStates, - joinMatrixRTC: async (transport: LivekitTransport) => { - return enterRTCSession( - matrixRTCSession, - transport, - connectOptions$.value, - ); + joinMatrixRTC: (transport: LivekitTransport) => { + enterRTCSession(matrixRTCSession, transport, connectOptions$.value); }, createPublisherFactory: (connection: Connection) => { return new Publisher( @@ -573,17 +569,6 @@ export function createCallViewModel$( ), ); - /** - * Whether various media/event sources should pretend to be disconnected from - * all network input, even if their connection still technically works. - */ - // We do this when the app is in the 'reconnecting' state, because it might be - // that the LiveKit connection is still functional while the homeserver is - // down, for example, and we want to avoid making people worry that the app is - // in a split-brained state. - // DISCUSSION own membership manager ALSO this probably can be simplifis - const reconnecting$ = localMembership.reconnecting$; - const audioParticipants$ = scope.behavior( matrixLivekitMembers$.pipe( switchMap((membersWithEpoch) => { @@ -631,7 +616,7 @@ export function createCallViewModel$( ); const handsRaised$ = scope.behavior( - handsRaisedSubject$.pipe(pauseWhen(reconnecting$)), + handsRaisedSubject$.pipe(pauseWhen(localMembership.reconnecting$)), ); const reactions$ = scope.behavior( @@ -644,7 +629,7 @@ export function createCallViewModel$( ]), ), ), - pauseWhen(reconnecting$), + pauseWhen(localMembership.reconnecting$), ), ); @@ -735,7 +720,7 @@ export function createCallViewModel$( livekitRoom$, focusUrl$, mediaDevices, - reconnecting$, + localMembership.reconnecting$, displayName$, matrixMemberMetadataStore.createAvatarUrlBehavior$(userId), handsRaised$.pipe(map((v) => v[participantId]?.time ?? null)), @@ -827,11 +812,17 @@ export function createCallViewModel$( }), ); - const leave$: Observable<"user" | "timeout" | "decline" | "allOthersLeft"> = - merge( - autoLeave$, - merge(userHangup$, widgetHangup$).pipe(map(() => "user" as const)), - ).pipe(scope.share); + const shouldLeave$: Observable< + "user" | "timeout" | "decline" | "allOthersLeft" + > = merge( + autoLeave$, + merge(userHangup$, widgetHangup$).pipe(map(() => "user" as const)), + ).pipe(scope.share); + + shouldLeave$.pipe(scope.bind()).subscribe((reason) => { + logger.info(`Call left due to ${reason}`); + localMembership.requestDisconnect(); + }); const spotlightSpeaker$ = scope.behavior( userMedia$.pipe( @@ -1453,7 +1444,7 @@ export function createCallViewModel$( autoLeave$: autoLeave$, callPickupState$: callPickupState$, ringOverlay$: ringOverlay$, - leave$: leave$, + leave$: shouldLeave$, hangup: (): void => userHangup$.next(), join: localMembership.requestConnect, toggleScreenSharing: toggleScreenSharing, @@ -1500,7 +1491,7 @@ export function createCallViewModel$( showFooter$: showFooter$, earpieceMode$: earpieceMode$, audioOutputSwitcher$: audioOutputSwitcher$, - reconnecting$: reconnecting$, + reconnecting$: localMembership.reconnecting$, }; } diff --git a/src/state/CallViewModel/localMember/HomeserverConnected.test.ts b/src/state/CallViewModel/localMember/HomeserverConnected.test.ts index 1f61e533..87ca35d0 100644 --- a/src/state/CallViewModel/localMember/HomeserverConnected.test.ts +++ b/src/state/CallViewModel/localMember/HomeserverConnected.test.ts @@ -97,106 +97,106 @@ describe("createHomeserverConnected$", () => { // LLM generated test cases. They are a bit overkill but I improved the mocking so it is // easy enough to read them so I think they can stay. it("is false when sync state is not Syncing", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); - expect(hsConnected$.value).toBe(false); + const hsConnected = createHomeserverConnected$(scope, client, session); + expect(hsConnected.combined$.value).toBe(false); }); it("remains false while membership status is not Connected even if sync is Syncing", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); client.setSyncState(SyncState.Syncing); - expect(hsConnected$.value).toBe(false); // membership still disconnected + expect(hsConnected.combined$.value).toBe(false); // membership still disconnected }); it("is false when membership status transitions to Connected but ProbablyLeft is true", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); // Make sync loop OK client.setSyncState(SyncState.Syncing); // Indicate probable leave before connection session.setProbablyLeft(true); session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); }); it("becomes true only when all three conditions are satisfied", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); // 1. Sync loop connected client.setSyncState(SyncState.Syncing); - expect(hsConnected$.value).toBe(false); // not yet membership connected + expect(hsConnected.combined$.value).toBe(false); // not yet membership connected // 2. Membership connected session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); // probablyLeft is false + expect(hsConnected.combined$.value).toBe(true); // probablyLeft is false }); it("drops back to false when sync loop leaves Syncing", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); // Reach connected state client.setSyncState(SyncState.Syncing); session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); // Sync loop error => should flip false client.setSyncState(SyncState.Error); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); }); it("drops back to false when membership status becomes disconnected", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); client.setSyncState(SyncState.Syncing); session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); session.setMembershipStatus(Status.Disconnected); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); }); it("drops to false when ProbablyLeft is emitted after being true", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); client.setSyncState(SyncState.Syncing); session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); session.setProbablyLeft(true); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); }); it("recovers to true if ProbablyLeft becomes false again while other conditions remain true", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); client.setSyncState(SyncState.Syncing); session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); session.setProbablyLeft(true); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); // Simulate clearing the flag (in realistic scenario membership manager would update) session.setProbablyLeft(false); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); }); it("composite sequence reflects each individual failure reason", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); // Initially false (sync error + disconnected + not probably left) - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); // Fix sync only client.setSyncState(SyncState.Syncing); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); // Fix membership session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); // Introduce probablyLeft -> false session.setProbablyLeft(true); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); // Restore notProbablyLeft -> true again session.setProbablyLeft(false); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); // Drop sync -> false client.setSyncState(SyncState.Error); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); }); }); diff --git a/src/state/CallViewModel/localMember/HomeserverConnected.ts b/src/state/CallViewModel/localMember/HomeserverConnected.ts index e1c28078..c8bcd021 100644 --- a/src/state/CallViewModel/localMember/HomeserverConnected.ts +++ b/src/state/CallViewModel/localMember/HomeserverConnected.ts @@ -25,6 +25,11 @@ import { type NodeStyleEventEmitter } from "../../../utils/test"; */ const logger = rootLogger.getChild("[HomeserverConnected]"); +export interface HomeserverConnected { + combined$: Behavior; + rtsSession$: Behavior; +} + /** * Behavior representing whether we consider ourselves connected to the Matrix homeserver * for the purposes of a MatrixRTC session. @@ -39,7 +44,7 @@ export function createHomeserverConnected$( client: NodeStyleEventEmitter & Pick, matrixRTCSession: NodeStyleEventEmitter & Pick, -): Behavior { +): HomeserverConnected { const syncing$ = ( fromEvent(client, ClientEvent.Sync) as Observable<[SyncState]> ).pipe( @@ -47,12 +52,15 @@ export function createHomeserverConnected$( map(([state]) => state === SyncState.Syncing), ); - const membershipConnected$ = fromEvent( - matrixRTCSession, - MembershipManagerEvent.StatusChanged, - ).pipe( - startWith(null), - map(() => matrixRTCSession.membershipStatus === Status.Connected), + const rtsSession$ = scope.behavior( + fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe( + map(() => matrixRTCSession.membershipStatus ?? Status.Unknown), + ), + Status.Unknown, + ); + + const membershipConnected$ = rtsSession$.pipe( + map((status) => status === Status.Connected), ); // This is basically notProbablyLeft$ @@ -71,15 +79,13 @@ export function createHomeserverConnected$( map(() => matrixRTCSession.probablyLeft !== true), ); - const connectedCombined$ = and$( - syncing$, - membershipConnected$, - certainlyConnected$, - ).pipe( - tap((connected) => { - logger.info(`Homeserver connected update: ${connected}`); - }), + const combined$ = scope.behavior( + and$(syncing$, membershipConnected$, certainlyConnected$).pipe( + tap((connected) => { + logger.info(`Homeserver connected update: ${connected}`); + }), + ), ); - return scope.behavior(connectedCombined$); + return { combined$, rtsSession$ }; } diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMembership.test.ts index cff5c06d..1ef7abd6 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.test.ts @@ -7,6 +7,7 @@ Please see LICENSE in the repository root for full details. */ import { + Status, type LivekitTransport, type MatrixRTCSession, } from "matrix-js-sdk/lib/matrixrtc"; @@ -51,7 +52,7 @@ vi.mock("@livekit/components-core", () => ({ describe("LocalMembership", () => { describe("enterRTCSession", () => { - it("It joins the correct Session", async () => { + it("It joins the correct Session", () => { const focusFromOlderMembership = { type: "livekit", livekit_service_url: "http://my-oldest-member-service-url.com", @@ -107,7 +108,7 @@ describe("LocalMembership", () => { joinRoomSession: vi.fn(), }) as unknown as MatrixRTCSession; - await enterRTCSession( + enterRTCSession( mockedSession, { livekit_alias: "roomId", @@ -136,7 +137,7 @@ describe("LocalMembership", () => { ); }); - it("It should not fail with configuration error if homeserver config has livekit url but not fallback", async () => { + it("It should not fail with configuration error if homeserver config has livekit url but not fallback", () => { mockConfig({}); vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({ "org.matrix.msc4143.rtc_foci": [ @@ -165,7 +166,7 @@ describe("LocalMembership", () => { joinRoomSession: vi.fn(), }) as unknown as MatrixRTCSession; - await enterRTCSession( + enterRTCSession( mockedSession, { livekit_alias: "roomId", @@ -190,7 +191,6 @@ describe("LocalMembership", () => { leaveRoomSession: () => {}, } as unknown as MatrixRTCSession, muteStates: mockMuteStates(), - isHomeserverConnected: constant(true), trackProcessorState$: constant({ supported: false, processor: undefined, @@ -198,7 +198,10 @@ describe("LocalMembership", () => { logger: logger, createPublisherFactory: vi.fn(), joinMatrixRTC: async (): Promise => {}, - homeserverConnected$: constant(true), + homeserverConnected: { + combined$: constant(true), + rtsSession$: constant(Status.Connected), + }, }; it("throws error on missing RTC config error", () => { @@ -258,8 +261,7 @@ describe("LocalMembership", () => { } as unknown as LocalParticipant, }), state$: constant({ - state: "ConnectedToLkRoom", - livekitConnectionState$: constant(LivekitConnectionState.Connected), + state: LivekitConnectionState.Connected, }), transport: aTransport, } as unknown as Connection, @@ -268,7 +270,7 @@ describe("LocalMembership", () => { connectionManagerData.add( { state$: constant({ - state: "ConnectedToLkRoom", + state: LivekitConnectionState.Connected, }), transport: bTransport, } as unknown as Connection, @@ -443,7 +445,7 @@ describe("LocalMembership", () => { connectionManagerData$.next(new Epoch(connectionManagerData)); await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.Initialized, + state: LivekitConnectionState.Connected, }); expect(publisherFactory).toHaveBeenCalledOnce(); expect(localMembership.tracks$.value.length).toBe(0); @@ -473,7 +475,7 @@ describe("LocalMembership", () => { publishResolver.resolve(); await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.Connected, + state: RTCBackendState.ConnectedAndPublishing, }); expect(publishers[0].stopPublishing).not.toHaveBeenCalled(); @@ -482,7 +484,7 @@ describe("LocalMembership", () => { await flushPromises(); // stays in connected state because it is stopped before the update to tracks update the state. expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.Connected, + state: RTCBackendState.ConnectedAndPublishing, }); // stop all tracks after ending scopes expect(publishers[0].stopPublishing).toHaveBeenCalled(); diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 60ae79b8..33af5192 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -11,10 +11,11 @@ import { ParticipantEvent, type LocalParticipant, type ScreenShareCaptureOptions, - ConnectionState, + ConnectionState as LivekitConnectionState, } from "livekit-client"; import { observeParticipantEvents } from "@livekit/components-core"; import { + Status as RTCSessionStatus, type LivekitTransport, type MatrixRTCSession, } from "matrix-js-sdk/lib/matrixrtc"; @@ -27,7 +28,7 @@ import { map, type Observable, of, - scan, + pairwise, startWith, switchMap, tap, @@ -37,10 +38,9 @@ import { deepCompare } from "matrix-js-sdk/lib/utils"; import { constant, type Behavior } from "../../Behavior"; import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; -import { ObservableScope } from "../../ObservableScope"; +import { type ObservableScope } from "../../ObservableScope"; import { type Publisher } from "./Publisher"; import { type MuteStates } from "../../MuteStates"; -import { and$ } from "../../../utils/observable"; import { ElementCallError, MembershipManagerError, @@ -51,7 +51,11 @@ import { getUrlParams } from "../../../UrlParams.ts"; import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts"; import { MatrixRTCMode } from "../../../settings/settings.ts"; import { Config } from "../../../config/Config.ts"; -import { type Connection } from "../remoteMembers/Connection.ts"; +import { + type ConnectionState, + type Connection, +} from "../remoteMembers/Connection.ts"; +import { type HomeserverConnected } from "./HomeserverConnected.ts"; export enum RTCBackendState { Error = "error", @@ -59,47 +63,32 @@ export enum RTCBackendState { WaitingForTransport = "waiting_for_transport", /** A connection appeared so we can initialise the publisher */ WaitingForConnection = "waiting_for_connection", - /** Connection and transport arrived, publisher Initialized */ - Initialized = "Initialized", + /** Implies lk connection is connected */ CreatingTracks = "creating_tracks", + /** Implies lk connection is connected */ ReadyToPublish = "ready_to_publish", + /** Implies lk connection is connected */ WaitingToPublish = "waiting_to_publish", - Connected = "connected", - Disconnected = "disconnected", - Disconnecting = "disconnecting", + /** Implies lk connection is connected */ + ConnectedAndPublishing = "fully_connected", } -type LocalMemberRtcBackendState = +type LocalMemberRTCBackendState = | { state: RTCBackendState.Error; error: ElementCallError } - | { state: RTCBackendState.WaitingForTransport } - | { state: RTCBackendState.WaitingForConnection } - | { state: RTCBackendState.Initialized } - | { state: RTCBackendState.CreatingTracks } - | { state: RTCBackendState.ReadyToPublish } - | { state: RTCBackendState.WaitingToPublish } - | { state: RTCBackendState.Connected } - | { state: RTCBackendState.Disconnected } - | { state: RTCBackendState.Disconnecting }; + | { state: Exclude } + | ConnectionState; -export enum MatrixState { +export enum MatrixAdditionalState { WaitingForTransport = "waiting_for_transport", - Ready = "ready", - Connecting = "connecting", - Connected = "connected", - Disconnected = "disconnected", - Error = "Error", } type LocalMemberMatrixState = - | { state: MatrixState.Connected } - | { state: MatrixState.WaitingForTransport } - | { state: MatrixState.Ready } - | { state: MatrixState.Connecting } - | { state: MatrixState.Disconnected } - | { state: MatrixState.Error; error: Error }; + | { state: MatrixAdditionalState.WaitingForTransport } + | { state: "Error"; error: Error } + | { state: RTCSessionStatus }; export interface LocalMemberConnectionState { - livekit$: Behavior; + livekit$: Behavior; matrix$: Behavior; } @@ -122,8 +111,8 @@ interface Props { muteStates: MuteStates; connectionManager: IConnectionManager; createPublisherFactory: (connection: Connection) => Publisher; - joinMatrixRTC: (transport: LivekitTransport) => Promise; - homeserverConnected$: Behavior; + joinMatrixRTC: (transport: LivekitTransport) => void; + homeserverConnected: HomeserverConnected; localTransport$: Behavior; matrixRTCSession: Pick< MatrixRTCSession, @@ -149,7 +138,7 @@ export const createLocalMembership$ = ({ scope, connectionManager, localTransport$: localTransportCanThrow$, - homeserverConnected$, + homeserverConnected, createPublisherFactory, joinMatrixRTC, logger: parentLogger, @@ -175,10 +164,14 @@ export const createLocalMembership$ = ({ tracks$: Behavior; participant$: Behavior; connection$: Behavior; - homeserverConnected$: Behavior; - // this needs to be discussed - /** @deprecated use state instead*/ + /** Shorthand for connectionState.matrix.state === Status.Reconnecting + * Direct translation to the js-sdk membership manager connection `Status`. + */ reconnecting$: Behavior; + /** Shorthand for connectionState.matrix.state === Status.Disconnected + * Direct translation to the js-sdk membership manager connection `Status`. + */ + disconnected$: Behavior; } => { const logger = parentLogger.getChild("[LocalMembership]"); logger.debug(`Creating local membership..`); @@ -232,49 +225,31 @@ export const createLocalMembership$ = ({ // * Whether we are "fully" connected to the call. Accounts for both the // * connection to the MatrixRTC session and the LiveKit publish connection. // */ - const connected$ = scope.behavior( - and$( - homeserverConnected$.pipe( - tap((v) => logger.debug("matrix: Connected state changed", v)), - ), - localConnectionState$.pipe( - switchMap((state) => { - logger.debug("livekit: Connected state changed", state); - if (!state) return of(false); - if (state.state === "ConnectedToLkRoom") { - logger.debug( - "livekit: Connected state changed (inner livekitConnectionState$)", - state.livekitConnectionState$.value, - ); - return state.livekitConnectionState$.pipe( - map((lkState) => lkState === ConnectionState.Connected), - ); - } - return of(false); - }), - ), - ).pipe(tap((v) => logger.debug("combined: Connected state changed", v))), - ); + // TODO remove this and just make it one single check of livekitConnectionState$ + // const connected$ = scope.behavior( + // localConnectionState$.pipe( + // switchMap((state) => { + // logger.debug("livekit: Connected state changed", state); + // if (!state) return of(false); + // if (state.state === "ConnectedToLkRoom") { + // logger.debug( + // "livekit: Connected state changed (inner livekitConnectionState$)", + // state.livekitConnectionState$.value, + // ); + // return state.livekitConnectionState$.pipe( + // map((lkState) => lkState === ConnectionState.Connected), + // ); + // } + // return of(false); + // }), + // ), + // ); // MATRIX RELATED - // /** - // * Whether we should tell the user that we're reconnecting to the call. - // */ - // DISCUSSION is there a better way to do this? - // sth that is more deriectly implied from the membership manager of the js sdk. (fromEvent(matrixRTCSession, Reconnecting)) ??? or similar const reconnecting$ = scope.behavior( - connected$.pipe( - // We are reconnecting if we previously had some successful initial - // connection but are now disconnected - scan( - ({ connectedPreviously }, connectedNow) => ({ - connectedPreviously: connectedPreviously || connectedNow, - reconnecting: connectedPreviously && !connectedNow, - }), - { connectedPreviously: false, reconnecting: false }, - ), - map(({ reconnecting }) => reconnecting), + homeserverConnected.rtsSession$.pipe( + map((sessionStatus) => sessionStatus === RTCSessionStatus.Reconnecting), ), ); @@ -374,8 +349,9 @@ export const createLocalMembership$ = ({ logger.error("Multiple Livkit Errors:", e); else fatalLivekitError$.next(e); }; - const livekitState$: Behavior = scope.behavior( + const livekitState$: Behavior = scope.behavior( combineLatest([ + localConnectionState$, publisher$, localTransport$, tracks$.pipe( @@ -389,10 +365,12 @@ export const createLocalMembership$ = ({ map(() => true), startWith(false), ), + // TODO use local connection state here to give the full pciture of the livekit state! fatalLivekitError$, ]).pipe( map( ([ + localConnectionState, publisher, localTransport, tracks, @@ -411,13 +389,21 @@ export const createLocalMembership$ = ({ const hasTracks = tracks.length > 0; if (!localTransport) return { state: RTCBackendState.WaitingForTransport }; - if (!publisher) + if (!localConnectionState) return { state: RTCBackendState.WaitingForConnection }; - if (!shouldStartTracks) return { state: RTCBackendState.Initialized }; + if ( + localConnectionState.state !== LivekitConnectionState.Connected || + !publisher + ) + // pass through the localConnectionState while we do not yet have a publisher or the state + // of the connection is not yet connected + return { state: localConnectionState.state }; + if (!shouldStartTracks) + return { state: LivekitConnectionState.Connected }; if (!hasTracks) return { state: RTCBackendState.CreatingTracks }; if (!shouldConnect) return { state: RTCBackendState.ReadyToPublish }; if (!publishing) return { state: RTCBackendState.WaitingToPublish }; - return { state: RTCBackendState.Connected }; + return { state: RTCBackendState.ConnectedAndPublishing }; }, ), distinctUntilChanged(deepCompare), @@ -431,58 +417,70 @@ export const createLocalMembership$ = ({ else fatalMatrixError$.next(e); }; const matrixState$: Behavior = scope.behavior( - combineLatest([ - localTransport$, - connectRequested$, - homeserverConnected$, - ]).pipe( - map(([localTransport, connectRequested, homeserverConnected]) => { - if (!localTransport) return { state: MatrixState.WaitingForTransport }; - if (!connectRequested) return { state: MatrixState.Ready }; - if (!homeserverConnected) return { state: MatrixState.Connecting }; - return { state: MatrixState.Connected }; + combineLatest([localTransport$, homeserverConnected.rtsSession$]).pipe( + map(([localTransport, rtcSessionStatus]) => { + if (!localTransport) + return { state: MatrixAdditionalState.WaitingForTransport }; + return { state: rtcSessionStatus }; }), ), ); - // Keep matrix rtc session in sync with localTransport$, connectRequested$ and muteStates.video.enabled$ + // inform the widget about the connect and disconnect intent from the user. + scope + .behavior(connectRequested$.pipe(pairwise(), scope.bind()), [ + undefined, + connectRequested$.value, + ]) + .subscribe(([prev, current]) => { + if (!widget) return; + if (!prev && current) { + try { + void widget.api.transport.send(ElementWidgetActions.JoinCall, {}); + } catch (e) { + logger.error("Failed to send join action", e); + } + } + if (prev && !current) { + try { + void widget?.api.transport.send(ElementWidgetActions.HangupCall, {}); + } catch (e) { + logger.error("Failed to send hangup action", e); + } + } + }); + + combineLatest([muteStates.video.enabled$, homeserverConnected.combined$]) + .pipe(scope.bind()) + .subscribe(([videoEnabled, connected]) => { + if (!connected) return; + void matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"); + }); + + // Keep matrix rtc session in sync with localTransport$, connectRequested$ scope.reconcile( scope.behavior(combineLatest([localTransport$, connectRequested$])), async ([transport, shouldConnect]) => { + if (!transport) return; + // if shouldConnect=false we will do the disconnect as the cleanup from the previous reconcile iteration. if (!shouldConnect) return; - if (!transport) return; try { - await joinMatrixRTC(transport); + joinMatrixRTC(transport); } catch (error) { logger.error("Error entering RTC session", error); if (error instanceof Error) setMatrixError(new MembershipManagerError(error)); } - // Update our member event when our mute state changes. - const callIntentScope = new ObservableScope(); - // because this uses its own scope, we can start another reconciliation for the duration of one connection. - callIntentScope.reconcile( - muteStates.video.enabled$, - async (videoEnabled) => - matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"), - ); - - return async (): Promise => { - callIntentScope.end(); + return Promise.resolve(async (): Promise => { try { - // Update matrixRTCSession to allow udpating the transport without leaving the session! - await matrixRTCSession.leaveRoomSession(); + // TODO Update matrixRTCSession to allow udpating the transport without leaving the session! + await matrixRTCSession.leaveRoomSession(1000); } catch (e) { logger.error("Error leaving RTC session", e); } - try { - await widget?.api.transport.send(ElementWidgetActions.HangupCall, {}); - } catch (e) { - logger.error("Failed to send hangup action", e); - } - }; + }); }, ); @@ -497,7 +495,7 @@ export const createLocalMembership$ = ({ // pause tracks during the initial joining sequence too until we're sure // that our own media is displayed on screen. // TODO refactor this based no livekitState$ - combineLatest([participant$, homeserverConnected$]) + combineLatest([participant$, homeserverConnected.combined$]) .pipe(scope.bind()) .subscribe(([participant, connected]) => { if (!participant) return; @@ -590,8 +588,15 @@ export const createLocalMembership$ = ({ }, tracks$, participant$, - homeserverConnected$, reconnecting$, + disconnected$: scope.behavior( + matrixState$.pipe( + map( + (sessionStatus) => + sessionStatus.state === RTCSessionStatus.Disconnected, + ), + ), + ), sharingScreen$, toggleScreenSharing, connection$: localConnection$, @@ -626,11 +631,11 @@ interface EnterRTCSessionOptions { * @throws If the widget could not send ElementWidgetActions.JoinCall action. */ // Exported for unit testing -export async function enterRTCSession( +export function enterRTCSession( rtcSession: MatrixRTCSession, transport: LivekitTransport, { encryptMedia, matrixRTCMode }: EnterRTCSessionOptions, -): Promise { +): void { PosthogAnalytics.instance.eventCallEnded.cacheStartCall(new Date()); PosthogAnalytics.instance.eventCallStarted.track(rtcSession.room.roomId); @@ -669,7 +674,4 @@ export async function enterRTCSession( unstableSendStickyEvents: matrixRTCMode === MatrixRTCMode.Matrix_2_0, }, ); - if (widget) { - await widget.api.transport.send(ElementWidgetActions.JoinCall, {}); - } } diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index 9b3e5b2a..5468d1ff 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -53,8 +53,7 @@ describe("Publisher", () => { scope = new ObservableScope(); connection = { state$: constant({ - state: "ConnectedToLkRoom", - livekitConnectionState$: constant(LivekitConenctionState.Connected), + state: LivekitConenctionState.Connected, }), livekitRoom: mockLivekitRoom({ localParticipant: mockLocalParticipant({}), diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 326dedaf..6e4a9b35 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -160,7 +160,7 @@ export class Publisher { const { promise, resolve, reject } = Promise.withResolvers(); const sub = this.connection.state$.subscribe((s) => { switch (s.state) { - case "ConnectedToLkRoom": + case LivekitConnectionState.Connected: resolve(); break; case "FailedToStart": diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index 2ead768b..efee1ccb 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -125,7 +125,10 @@ function setupRemoteConnection(): Connection { }; }); - fakeLivekitRoom.connect.mockResolvedValue(undefined); + fakeLivekitRoom.connect.mockImplementation(async (): Promise => { + fakeLivekitRoom.state = LivekitConnectionState.Connected; + return Promise.resolve(); + }); return new Connection(opts, logger); } @@ -309,7 +312,7 @@ describe("Start connection states", () => { capturedState = capturedStates.pop(); - if (capturedState && capturedState?.state === "FailedToStart") { + if (capturedState && capturedState.state === "FailedToStart") { expect(capturedState.error.message).toContain( "Failed to connect to livekit", ); @@ -345,7 +348,7 @@ describe("Start connection states", () => { const connectingState = capturedStates.shift(); expect(connectingState?.state).toEqual("ConnectingToLkRoom"); const connectedState = capturedStates.shift(); - expect(connectedState?.state).toEqual("ConnectedToLkRoom"); + expect(connectedState?.state).toEqual("connected"); }); it("shutting down the scope should stop the connection", async () => { diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 4f3bbda4..962f56d9 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -12,7 +12,7 @@ import { } from "@livekit/components-core"; import { ConnectionError, - type ConnectionState as LivekitConenctionState, + type ConnectionState as LivekitConnectionState, type Room as LivekitRoom, type LocalParticipant, type RemoteParticipant, @@ -47,17 +47,17 @@ export interface ConnectionOpts { /** Optional factory to create the LiveKit room, mainly for testing purposes. */ livekitRoomFactory: () => LivekitRoom; } - +export enum ConnectionAdditionalState { + Initialized = "Initialized", + FetchingConfig = "FetchingConfig", + // FailedToStart = "FailedToStart", + Stopped = "Stopped", + ConnectingToLkRoom = "ConnectingToLkRoom", +} export type ConnectionState = - | { state: "Initialized" } - | { state: "FetchingConfig" } - | { state: "ConnectingToLkRoom" } - | { - state: "ConnectedToLkRoom"; - livekitConnectionState$: Behavior; - } - | { state: "FailedToStart"; error: Error } - | { state: "Stopped" }; + | { state: ConnectionAdditionalState } + | { state: LivekitConnectionState } + | { state: "FailedToStart"; error: Error }; /** * A connection to a Matrix RTC LiveKit backend. @@ -67,7 +67,7 @@ export type ConnectionState = export class Connection { // Private Behavior private readonly _state$ = new BehaviorSubject({ - state: "Initialized", + state: ConnectionAdditionalState.Initialized, }); /** @@ -118,14 +118,14 @@ export class Connection { this.stopped = false; try { this._state$.next({ - state: "FetchingConfig", + state: ConnectionAdditionalState.FetchingConfig, }); const { url, jwt } = await this.getSFUConfigWithOpenID(); // If we were stopped while fetching the config, don't proceed to connect if (this.stopped) return; this._state$.next({ - state: "ConnectingToLkRoom", + state: ConnectionAdditionalState.ConnectingToLkRoom, }); try { await this.livekitRoom.connect(url, jwt); @@ -154,12 +154,13 @@ export class Connection { // If we were stopped while connecting, don't proceed to update state. if (this.stopped) return; - this._state$.next({ - state: "ConnectedToLkRoom", - livekitConnectionState$: this.scope.behavior( - connectionStateObserver(this.livekitRoom), - ), - }); + connectionStateObserver(this.livekitRoom) + .pipe(this.scope.bind()) + .subscribe((lkState) => { + this._state$.next({ + state: lkState, + }); + }); } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); this._state$.next({ @@ -191,7 +192,7 @@ export class Connection { if (this.stopped) return; await this.livekitRoom.disconnect(); this._state$.next({ - state: "Stopped", + state: ConnectionAdditionalState.Stopped, }); this.stopped = true; } From 88721be9521843ab106fabc4e1c447bcabb2247a Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 3 Dec 2025 10:04:22 +0100 Subject: [PATCH 02/24] cleanup --- src/room/GroupCallView.tsx | 1 + src/room/InCallView.tsx | 1 + .../localMember/LocalMembership.ts | 25 ------------------- 3 files changed, 2 insertions(+), 25 deletions(-) diff --git a/src/room/GroupCallView.tsx b/src/room/GroupCallView.tsx index 43602716..dfd11ff3 100644 --- a/src/room/GroupCallView.tsx +++ b/src/room/GroupCallView.tsx @@ -314,6 +314,7 @@ export const GroupCallView: FC = ({ const navigate = useNavigate(); + // TODO split this into leave and onDisconnect const onLeft = useCallback( ( reason: "timeout" | "user" | "allOthersLeft" | "decline" | "error", diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index 6ae004d8..7ae3700c 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -151,6 +151,7 @@ export const ActiveCall: FC = (props) => { setVm(vm); vm.leave$.pipe(scope.bind()).subscribe(props.onLeft); + return (): void => { scope.end(); }; diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 33af5192..6a31ce4b 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -221,30 +221,6 @@ export const createLocalMembership$ = ({ switchMap((connection) => (connection ? connection.state$ : of(null))), ); - // /** - // * Whether we are "fully" connected to the call. Accounts for both the - // * connection to the MatrixRTC session and the LiveKit publish connection. - // */ - // TODO remove this and just make it one single check of livekitConnectionState$ - // const connected$ = scope.behavior( - // localConnectionState$.pipe( - // switchMap((state) => { - // logger.debug("livekit: Connected state changed", state); - // if (!state) return of(false); - // if (state.state === "ConnectedToLkRoom") { - // logger.debug( - // "livekit: Connected state changed (inner livekitConnectionState$)", - // state.livekitConnectionState$.value, - // ); - // return state.livekitConnectionState$.pipe( - // map((lkState) => lkState === ConnectionState.Connected), - // ); - // } - // return of(false); - // }), - // ), - // ); - // MATRIX RELATED const reconnecting$ = scope.behavior( @@ -365,7 +341,6 @@ export const createLocalMembership$ = ({ map(() => true), startWith(false), ), - // TODO use local connection state here to give the full pciture of the livekit state! fatalLivekitError$, ]).pipe( map( From 7c40b0e177fbbfbf14c7c28b71a22b58f6df94e4 Mon Sep 17 00:00:00 2001 From: Timo K Date: Fri, 5 Dec 2025 19:48:02 +0100 Subject: [PATCH 03/24] ideas --- src/state/CallViewModel/CallViewModel.ts | 47 +++++++++++-------- .../CallViewModel/remoteMembers/Connection.ts | 43 +++++++++-------- 2 files changed, 51 insertions(+), 39 deletions(-) diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 9bfa979c..3c15958a 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -452,14 +452,18 @@ export function createCallViewModel$( const localMembership = createLocalMembership$({ scope: scope, - homeserverConnected: createHomeserverConnected$( + homeserverConnected$: createHomeserverConnected$( scope, client, matrixRTCSession, ), muteStates: muteStates, - joinMatrixRTC: (transport: LivekitTransport) => { - enterRTCSession(matrixRTCSession, transport, connectOptions$.value); + joinMatrixRTC: async (transport: LivekitTransport) => { + return enterRTCSession( + matrixRTCSession, + transport, + connectOptions$.value, + ); }, createPublisherFactory: (connection: Connection) => { return new Publisher( @@ -569,6 +573,17 @@ export function createCallViewModel$( ), ); + /** + * Whether various media/event sources should pretend to be disconnected from + * all network input, even if their connection still technically works. + */ + // We do this when the app is in the 'reconnecting' state, because it might be + // that the LiveKit connection is still functional while the homeserver is + // down, for example, and we want to avoid making people worry that the app is + // in a split-brained state. + // DISCUSSION own membership manager ALSO this probably can be simplifis + const reconnecting$ = localMembership.reconnecting$; + const audioParticipants$ = scope.behavior( matrixLivekitMembers$.pipe( switchMap((membersWithEpoch) => { @@ -616,7 +631,7 @@ export function createCallViewModel$( ); const handsRaised$ = scope.behavior( - handsRaisedSubject$.pipe(pauseWhen(localMembership.reconnecting$)), + handsRaisedSubject$.pipe(pauseWhen(reconnecting$)), ); const reactions$ = scope.behavior( @@ -629,7 +644,7 @@ export function createCallViewModel$( ]), ), ), - pauseWhen(localMembership.reconnecting$), + pauseWhen(reconnecting$), ), ); @@ -720,7 +735,7 @@ export function createCallViewModel$( livekitRoom$, focusUrl$, mediaDevices, - localMembership.reconnecting$, + reconnecting$, displayName$, matrixMemberMetadataStore.createAvatarUrlBehavior$(userId), handsRaised$.pipe(map((v) => v[participantId]?.time ?? null)), @@ -812,17 +827,11 @@ export function createCallViewModel$( }), ); - const shouldLeave$: Observable< - "user" | "timeout" | "decline" | "allOthersLeft" - > = merge( - autoLeave$, - merge(userHangup$, widgetHangup$).pipe(map(() => "user" as const)), - ).pipe(scope.share); - - shouldLeave$.pipe(scope.bind()).subscribe((reason) => { - logger.info(`Call left due to ${reason}`); - localMembership.requestDisconnect(); - }); + const leave$: Observable<"user" | "timeout" | "decline" | "allOthersLeft"> = + merge( + autoLeave$, + merge(userHangup$, widgetHangup$).pipe(map(() => "user" as const)), + ).pipe(scope.share); const spotlightSpeaker$ = scope.behavior( userMedia$.pipe( @@ -1444,7 +1453,7 @@ export function createCallViewModel$( autoLeave$: autoLeave$, callPickupState$: callPickupState$, ringOverlay$: ringOverlay$, - leave$: shouldLeave$, + leave$: leave$, hangup: (): void => userHangup$.next(), join: localMembership.requestConnect, toggleScreenSharing: toggleScreenSharing, @@ -1491,7 +1500,7 @@ export function createCallViewModel$( showFooter$: showFooter$, earpieceMode$: earpieceMode$, audioOutputSwitcher$: audioOutputSwitcher$, - reconnecting$: localMembership.reconnecting$, + reconnecting$: reconnecting$, }; } diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 962f56d9..549777f9 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -12,7 +12,7 @@ import { } from "@livekit/components-core"; import { ConnectionError, - type ConnectionState as LivekitConnectionState, + ConnectionState as LivekitConnectionState, type Room as LivekitRoom, type LocalParticipant, type RemoteParticipant, @@ -47,17 +47,24 @@ export interface ConnectionOpts { /** Optional factory to create the LiveKit room, mainly for testing purposes. */ livekitRoomFactory: () => LivekitRoom; } -export enum ConnectionAdditionalState { +export class FailedToStartError extends Error { + public constructor(message: string) { + super(message); + this.name = "FailedToStartError"; + } +} + +export enum ConnectionState { Initialized = "Initialized", FetchingConfig = "FetchingConfig", - // FailedToStart = "FailedToStart", Stopped = "Stopped", ConnectingToLkRoom = "ConnectingToLkRoom", + LivekitDisconnected = "disconnected", + LivekitConnecting = "connecting", + LivekitConnected = "connected", + LivekitReconnecting = "reconnecting", + LivekitSignalReconnecting = "signalReconnecting", } -export type ConnectionState = - | { state: ConnectionAdditionalState } - | { state: LivekitConnectionState } - | { state: "FailedToStart"; error: Error }; /** * A connection to a Matrix RTC LiveKit backend. @@ -66,14 +73,15 @@ export type ConnectionState = */ export class Connection { // Private Behavior - private readonly _state$ = new BehaviorSubject({ - state: ConnectionAdditionalState.Initialized, - }); + private readonly _state$ = new BehaviorSubject< + ConnectionState | FailedToStartError + >(ConnectionState.Initialized); /** * The current state of the connection to the media transport. */ - public readonly state$: Behavior = this._state$; + public readonly state$: Behavior = + this._state$; /** * The media transport to connect to. @@ -117,16 +125,12 @@ export class Connection { this.logger.debug("Starting Connection"); this.stopped = false; try { - this._state$.next({ - state: ConnectionAdditionalState.FetchingConfig, - }); + this._state$.next(ConnectionState.FetchingConfig); const { url, jwt } = await this.getSFUConfigWithOpenID(); // If we were stopped while fetching the config, don't proceed to connect if (this.stopped) return; - this._state$.next({ - state: ConnectionAdditionalState.ConnectingToLkRoom, - }); + this._state$.next(ConnectionState.ConnectingToLkRoom); try { await this.livekitRoom.connect(url, jwt); } catch (e) { @@ -157,9 +161,8 @@ export class Connection { connectionStateObserver(this.livekitRoom) .pipe(this.scope.bind()) .subscribe((lkState) => { - this._state$.next({ - state: lkState, - }); + // It si save to cast lkState to ConnectionState as they are fully overlapping. + this._state$.next(lkState as unknown as ConnectionState); }); } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); From d8b9568400eb9267a2dc5f3efdff09cfec770831 Mon Sep 17 00:00:00 2001 From: Robin Date: Mon, 8 Dec 2025 23:33:41 -0500 Subject: [PATCH 04/24] Stop publisher in a less brittle way --- .../CallViewModel/localMember/LocalMembership.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 60ae79b8..71261d37 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -323,12 +323,14 @@ export const createLocalMembership$ = ({ // - overwrite current publisher scope.reconcile(localConnection$, async (connection) => { if (connection !== null) { - publisher$.next(createPublisherFactory(connection)); + const publisher = createPublisherFactory(connection); + publisher$.next(publisher); + // Clean-up callback + return Promise.resolve(async (): Promise => { + await publisher.stopPublishing(); + publisher.stopTracks(); + }); } - return Promise.resolve(async (): Promise => { - await publisher$?.value?.stopPublishing(); - publisher$?.value?.stopTracks(); - }); }); // Use reconcile here to not run concurrent createAndSetupTracks calls From 9481dc401c6c06c32085560431552247a56ddb16 Mon Sep 17 00:00:00 2001 From: Robin Date: Mon, 8 Dec 2025 23:34:42 -0500 Subject: [PATCH 05/24] Remove extraneous 'scope running' check Semantically, behaviors are only meaningful for as long as their scope is running. Setting a behavior's value to an empty array once its scope ends is not guaranteed to work (as it depends on execution order of how the scope is ended), and subscribers should be robust enough to handle clean-up of all connections at the end of the scope either way. --- .../CallViewModel/remoteMembers/ConnectionManager.ts | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index 0b9f939c..09a8e79f 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -10,7 +10,7 @@ import { type LivekitTransport, type ParticipantId, } from "matrix-js-sdk/lib/matrixrtc"; -import { BehaviorSubject, combineLatest, map, of, switchMap, tap } from "rxjs"; +import { combineLatest, map, of, switchMap, tap } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; import { type LocalParticipant, type RemoteParticipant } from "livekit-client"; @@ -115,9 +115,6 @@ export function createConnectionManager$({ logger: parentLogger, }: Props): IConnectionManager { const logger = parentLogger.getChild("[ConnectionManager]"); - - const running$ = new BehaviorSubject(true); - scope.onEnd(() => running$.next(false)); // TODO logger: only construct one logger from the client and make it compatible via a EC specific sing /** @@ -129,10 +126,7 @@ export function createConnectionManager$({ * externally this is modified via `registerTransports()`. */ const transports$ = scope.behavior( - combineLatest([running$, inputTransports$]).pipe( - map(([running, transports]) => - transports.mapInner((transport) => (running ? transport : [])), - ), + inputTransports$.pipe( map((transports) => transports.mapInner(removeDuplicateTransports)), tap(({ value: transports }) => { logger.trace( From 2f3f9f95eb6ed5961ff7769c246b0a29a97d181c Mon Sep 17 00:00:00 2001 From: Robin Date: Mon, 8 Dec 2025 23:38:15 -0500 Subject: [PATCH 06/24] Use more compact optional chaining and coalescing notation --- src/state/CallViewModel/remoteMembers/ConnectionManager.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index 09a8e79f..755ba3dd 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -60,11 +60,7 @@ export class ConnectionManagerData { transport: LivekitTransport, ): (LocalParticipant | RemoteParticipant)[] { const key = transport.livekit_service_url + "|" + transport.livekit_alias; - const existing = this.store.get(key); - if (existing) { - return existing[1]; - } - return []; + return this.store.get(key)?.[1] ?? []; } /** * Get all connections where the given participant is publishing. From 6ee3ef27954d1148ad4e3d7854d84431fb6c349b Mon Sep 17 00:00:00 2001 From: Robin Date: Mon, 8 Dec 2025 23:38:54 -0500 Subject: [PATCH 07/24] Edit a misleading log line The factory function is called once per item to construct the item. It is not called on future updates to the item's data. --- src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts index 2f152630..79ad933c 100644 --- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts +++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts @@ -108,7 +108,7 @@ export function createMatrixLivekitMembers$({ // Each update where the key of the generator array do not change will result in updates to the `data$` observable in the factory. (scope, data$, participantId, userId) => { logger.debug( - `Updating data$ for participantId: ${participantId}, userId: ${userId}`, + `Generating member for participantId: ${participantId}, userId: ${userId}`, ); // will only get called once per `participantId, userId` pair. // updates to data$ and as a result to displayName$ and mxcAvatarUrl$ are more frequent. From bf801364a68927e0cd67ccfe8d53876507333ff0 Mon Sep 17 00:00:00 2001 From: Timo K Date: Tue, 9 Dec 2025 15:23:30 +0100 Subject: [PATCH 08/24] cleanup and tests --- src/state/CallViewModel/CallViewModel.ts | 16 +- ...Membership.test.ts => LocalMember.test.ts} | 180 ++++++------ .../{LocalMembership.ts => LocalMember.ts} | 256 +++++++++--------- .../localMember/Publisher.test.ts | 13 +- .../CallViewModel/localMember/Publisher.ts | 30 +- .../remoteMembers/Connection.test.ts | 48 ++-- .../CallViewModel/remoteMembers/Connection.ts | 28 +- yarn.lock | 6 +- 8 files changed, 302 insertions(+), 275 deletions(-) rename src/state/CallViewModel/localMember/{LocalMembership.test.ts => LocalMember.test.ts} (74%) rename src/state/CallViewModel/localMember/{LocalMembership.ts => LocalMember.ts} (77%) diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 3c15958a..e06990b2 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -94,14 +94,13 @@ import { type SpotlightLandscapeLayoutMedia, type SpotlightPortraitLayoutMedia, } from "../layout-types.ts"; -import { type ElementCallError } from "../../utils/errors.ts"; +import { ElementCallError } from "../../utils/errors.ts"; import { type ObservableScope } from "../ObservableScope.ts"; import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts"; import { createLocalMembership$, enterRTCSession, - RTCBackendState, -} from "./localMember/LocalMembership.ts"; +} from "./localMember/LocalMember.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { createMemberships$, @@ -452,13 +451,13 @@ export function createCallViewModel$( const localMembership = createLocalMembership$({ scope: scope, - homeserverConnected$: createHomeserverConnected$( + homeserverConnected: createHomeserverConnected$( scope, client, matrixRTCSession, ), muteStates: muteStates, - joinMatrixRTC: async (transport: LivekitTransport) => { + joinMatrixRTC: (transport: LivekitTransport) => { return enterRTCSession( matrixRTCSession, transport, @@ -1455,7 +1454,7 @@ export function createCallViewModel$( ringOverlay$: ringOverlay$, leave$: leave$, hangup: (): void => userHangup$.next(), - join: localMembership.requestConnect, + join: localMembership.requestJoinAndPublish, toggleScreenSharing: toggleScreenSharing, sharingScreen$: sharingScreen$, @@ -1465,9 +1464,8 @@ export function createCallViewModel$( unhoverScreen: (): void => screenUnhover$.next(), fatalError$: scope.behavior( - localMembership.connectionState.livekit$.pipe( - filter((v) => v.state === RTCBackendState.Error), - map((s) => s.error), + localMembership.localMemberState$.pipe( + filter((v) => v instanceof ElementCallError), ), null, ), diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMember.test.ts similarity index 74% rename from src/state/CallViewModel/localMember/LocalMembership.test.ts rename to src/state/CallViewModel/localMember/LocalMember.test.ts index 1ef7abd6..2f8d11a5 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMember.test.ts @@ -7,7 +7,7 @@ Please see LICENSE in the repository root for full details. */ import { - Status, + Status as RTCMemberStatus, type LivekitTransport, type MatrixRTCSession, } from "matrix-js-sdk/lib/matrixrtc"; @@ -15,11 +15,7 @@ import { describe, expect, it, vi } from "vitest"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { BehaviorSubject, map, of } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; -import { - ConnectionState as LivekitConnectionState, - type LocalParticipant, - type LocalTrack, -} from "livekit-client"; +import { type LocalParticipant, type LocalTrack } from "livekit-client"; import { MatrixRTCMode } from "../../../settings/settings"; import { @@ -30,16 +26,19 @@ import { withTestScheduler, } from "../../../utils/test"; import { + TransportState, createLocalMembership$, enterRTCSession, - RTCBackendState, -} from "./LocalMembership"; + PublishState, + TrackState, +} from "./LocalMember"; import { MatrixRTCTransportMissingError } from "../../../utils/errors"; import { Epoch, ObservableScope } from "../../ObservableScope"; import { constant } from "../../Behavior"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; -import { type Connection } from "../remoteMembers/Connection"; +import { ConnectionState, type Connection } from "../remoteMembers/Connection"; import { type Publisher } from "./Publisher"; +import { C } from "vitest/dist/chunks/global.d.MAmajcmJ.js"; const MATRIX_RTC_MODE = MatrixRTCMode.Legacy; const getUrlParams = vi.hoisted(() => vi.fn(() => ({}))); @@ -200,21 +199,18 @@ describe("LocalMembership", () => { joinMatrixRTC: async (): Promise => {}, homeserverConnected: { combined$: constant(true), - rtsSession$: constant(Status.Connected), + rtsSession$: constant(RTCMemberStatus.Connected), }, }; it("throws error on missing RTC config error", () => { withTestScheduler(({ scope, hot, expectObservable }) => { - const goodTransport = { - livekit_service_url: "other", - } as LivekitTransport; - - const localTransport$ = scope.behavior( + const localTransport$ = scope.behavior( hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")), - goodTransport, + null, ); + // we do not need any connection data since we want to fail before reaching that. const mockConnectionManager = { transports$: scope.behavior( localTransport$.pipe(map((t) => new Epoch([t]))), @@ -230,15 +226,11 @@ describe("LocalMembership", () => { connectionManager: mockConnectionManager, localTransport$, }); + localMembership.requestJoinAndPublish(); - expectObservable(localMembership.connectionState.livekit$).toBe("ne", { - n: { state: RTCBackendState.WaitingForConnection }, - e: { - state: RTCBackendState.Error, - error: expect.toSatisfy( - (e) => e instanceof MatrixRTCTransportMissingError, - ), - }, + expectObservable(localMembership.localMemberState$).toBe("ne", { + n: TransportState.Waiting, + e: expect.toSatisfy((e) => e instanceof MatrixRTCTransportMissingError), }); }); }); @@ -250,32 +242,24 @@ describe("LocalMembership", () => { livekit_service_url: "b", } as LivekitTransport; - const connectionManagerData = new ConnectionManagerData(); - - connectionManagerData.add( - { - livekitRoom: mockLivekitRoom({ - localParticipant: { - isScreenShareEnabled: false, - trackPublications: [], - } as unknown as LocalParticipant, - }), - state$: constant({ - state: LivekitConnectionState.Connected, - }), - transport: aTransport, - } as unknown as Connection, - [], - ); - connectionManagerData.add( - { - state$: constant({ - state: LivekitConnectionState.Connected, - }), - transport: bTransport, - } as unknown as Connection, - [], - ); + const connectionTransportAConnected = { + livekitRoom: mockLivekitRoom({ + localParticipant: { + isScreenShareEnabled: false, + trackPublications: [], + } as unknown as LocalParticipant, + }), + state$: constant(ConnectionState.LivekitConnected), + transport: aTransport, + } as unknown as Connection; + const connectionTransportAConnecting = { + ...connectionTransportAConnected, + state$: constant(ConnectionState.LivekitConnecting), + } as unknown as Connection; + const connectionTransportBConnected = { + state$: constant(ConnectionState.LivekitConnected), + transport: bTransport, + } as unknown as Connection; it("recreates publisher if new connection is used and ENDS always unpublish and end tracks", async () => { const scope = new ObservableScope(); @@ -300,6 +284,9 @@ describe("LocalMembership", () => { typeof vi.fn >; + const connectionManagerData = new ConnectionManagerData(); + connectionManagerData.add(connectionTransportAConnected, []); + connectionManagerData.add(connectionTransportBConnected, []); createLocalMembership$({ scope, ...defaultCreateLocalMemberValues, @@ -359,6 +346,9 @@ describe("LocalMembership", () => { typeof vi.fn >; + const connectionManagerData = new ConnectionManagerData(); + connectionManagerData.add(connectionTransportAConnected, []); + // connectionManagerData.add(connectionTransportB, []); const localMembership = createLocalMembership$({ scope, ...defaultCreateLocalMemberValues, @@ -385,10 +375,11 @@ describe("LocalMembership", () => { it("tracks livekit state correctly", async () => { const scope = new ObservableScope(); + const connectionManagerData = new ConnectionManagerData(); const localTransport$ = new BehaviorSubject(null); - const connectionManagerData$ = new BehaviorSubject< - Epoch - >(new Epoch(new ConnectionManagerData())); + const connectionManagerData$ = new BehaviorSubject( + new Epoch(connectionManagerData), + ); const publishers: Publisher[] = []; const tracks$ = new BehaviorSubject([]); @@ -434,19 +425,45 @@ describe("LocalMembership", () => { }); await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.WaitingForTransport, - }); + expect(localMembership.localMemberState$.value).toStrictEqual( + TransportState.Waiting, + ); localTransport$.next(aTransport); await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.WaitingForConnection, + expect(localMembership.localMemberState$.value).toStrictEqual({ + matrix: RTCMemberStatus.Connected, + media: { connection: null, tracks: TrackState.WaitingForUser }, }); - connectionManagerData$.next(new Epoch(connectionManagerData)); + + const connectionManagerData2 = new ConnectionManagerData(); + connectionManagerData2.add( + // clone because we will mutate this later. + { ...connectionTransportAConnecting } as unknown as Connection, + [], + ); + + connectionManagerData$.next(new Epoch(connectionManagerData2)); await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitConnectionState.Connected, + expect(localMembership.localMemberState$.value).toStrictEqual({ + matrix: RTCMemberStatus.Connected, + media: { + connection: ConnectionState.LivekitConnecting, + tracks: TrackState.WaitingForUser, + }, }); + + ( + connectionManagerData2.getConnectionForTransport(aTransport)! + .state$ as BehaviorSubject + ).next(ConnectionState.LivekitConnected); + expect(localMembership.localMemberState$.value).toStrictEqual({ + matrix: RTCMemberStatus.Connected, + media: { + connection: ConnectionState.LivekitConnected, + tracks: TrackState.WaitingForUser, + }, + }); + expect(publisherFactory).toHaveBeenCalledOnce(); expect(localMembership.tracks$.value.length).toBe(0); @@ -455,37 +472,46 @@ describe("LocalMembership", () => { // ------- await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.CreatingTracks, + expect(localMembership.localMemberState$.value).toStrictEqual({ + matrix: RTCMemberStatus.Connected, + media: { + tracks: TrackState.Creating, + connection: ConnectionState.LivekitConnected, + }, }); createTrackResolver.resolve(); await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.ReadyToPublish, - }); + expect( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (localMembership.localMemberState$.value as any).media, + ).toStrictEqual(PublishState.WaitingForUser); // ------- - localMembership.requestConnect(); + localMembership.requestJoinAndPublish(); // ------- - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.WaitingToPublish, - }); + expect( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (localMembership.localMemberState$.value as any).media, + ).toStrictEqual(PublishState.Starting); publishResolver.resolve(); await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.ConnectedAndPublishing, - }); + expect( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (localMembership.localMemberState$.value as any).media, + ).toStrictEqual(PublishState.Publishing); + expect(publishers[0].stopPublishing).not.toHaveBeenCalled(); - expect(localMembership.connectionState.livekit$.isStopped).toBe(false); + expect(localMembership.localMemberState$.isStopped).toBe(false); scope.end(); await flushPromises(); // stays in connected state because it is stopped before the update to tracks update the state. - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.ConnectedAndPublishing, - }); + expect( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (localMembership.localMemberState$.value as any).media, + ).toStrictEqual(PublishState.Publishing); // stop all tracks after ending scopes expect(publishers[0].stopPublishing).toHaveBeenCalled(); expect(publishers[0].stopTracks).toHaveBeenCalled(); diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMember.ts similarity index 77% rename from src/state/CallViewModel/localMember/LocalMembership.ts rename to src/state/CallViewModel/localMember/LocalMember.ts index 6a31ce4b..e2fcc70e 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -11,7 +11,6 @@ import { ParticipantEvent, type LocalParticipant, type ScreenShareCaptureOptions, - ConnectionState as LivekitConnectionState, } from "livekit-client"; import { observeParticipantEvents } from "@livekit/components-core"; import { @@ -36,62 +35,66 @@ import { import { type Logger } from "matrix-js-sdk/lib/logger"; import { deepCompare } from "matrix-js-sdk/lib/utils"; -import { constant, type Behavior } from "../../Behavior"; -import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; -import { type ObservableScope } from "../../ObservableScope"; -import { type Publisher } from "./Publisher"; -import { type MuteStates } from "../../MuteStates"; +import { constant, type Behavior } from "../../Behavior.ts"; +import { type IConnectionManager } from "../remoteMembers/ConnectionManager.ts"; +import { type ObservableScope } from "../../ObservableScope.ts"; +import { type Publisher } from "./Publisher.ts"; +import { type MuteStates } from "../../MuteStates.ts"; import { ElementCallError, MembershipManagerError, UnknownCallError, -} from "../../../utils/errors"; -import { ElementWidgetActions, widget } from "../../../widget"; +} from "../../../utils/errors.ts"; +import { ElementWidgetActions, widget } from "../../../widget.ts"; import { getUrlParams } from "../../../UrlParams.ts"; import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts"; import { MatrixRTCMode } from "../../../settings/settings.ts"; import { Config } from "../../../config/Config.ts"; import { - type ConnectionState, + ConnectionState, type Connection, + type FailedToStartError, } from "../remoteMembers/Connection.ts"; import { type HomeserverConnected } from "./HomeserverConnected.ts"; -export enum RTCBackendState { - Error = "error", +export enum TransportState { /** Not even a transport is available to the LocalMembership */ - WaitingForTransport = "waiting_for_transport", - /** A connection appeared so we can initialise the publisher */ - WaitingForConnection = "waiting_for_connection", - /** Implies lk connection is connected */ - CreatingTracks = "creating_tracks", - /** Implies lk connection is connected */ - ReadyToPublish = "ready_to_publish", - /** Implies lk connection is connected */ - WaitingToPublish = "waiting_to_publish", - /** Implies lk connection is connected */ - ConnectedAndPublishing = "fully_connected", + Waiting = "transport_waiting", } -type LocalMemberRTCBackendState = - | { state: RTCBackendState.Error; error: ElementCallError } - | { state: Exclude } - | ConnectionState; - -export enum MatrixAdditionalState { - WaitingForTransport = "waiting_for_transport", +export enum PublishState { + WaitingForUser = "publish_waiting_for_user", + /** Implies lk connection is connected */ + Starting = "publish_start_publishing", + /** Implies lk connection is connected */ + Publishing = "publish_publishing", } -type LocalMemberMatrixState = - | { state: MatrixAdditionalState.WaitingForTransport } - | { state: "Error"; error: Error } - | { state: RTCSessionStatus }; - -export interface LocalMemberConnectionState { - livekit$: Behavior; - matrix$: Behavior; +export enum TrackState { + /** The track is waiting for user input to create tracks (waiting to call `startTracks()`) */ + WaitingForUser = "tracks_waiting_for_user", + /** Implies lk connection is connected */ + Creating = "tracks_creating", + /** Implies lk connection is connected */ + Ready = "tracks_ready", } +export type LocalMemberMediaState = + | { + tracks: TrackState; + connection: ConnectionState | FailedToStartError; + } + | PublishState + | ElementCallError; +export type LocalMemberMatrixState = Error | RTCSessionStatus; +export type LocalMemberState = + | ElementCallError + | TransportState.Waiting + | { + media: LocalMemberMediaState; + matrix: LocalMemberMatrixState; + }; + /* * - get well known * - get oldest membership @@ -146,16 +149,16 @@ export const createLocalMembership$ = ({ matrixRTCSession, }: Props): { /** - * This starts audio and video tracks. They will be reused when calling `requestConnect`. + * This starts audio and video tracks. They will be reused when calling `requestPublish`. */ startTracks: () => Behavior; /** - * This sets a inner state (shouldConnect) to true and instructs the js-sdk and livekit to keep the user + * This sets a inner state (shouldPublish) to true and instructs the js-sdk and livekit to keep the user * connected to matrix and livekit. */ - requestConnect: () => void; + requestJoinAndPublish: () => void; requestDisconnect: () => void; - connectionState: LocalMemberConnectionState; + localMemberState$: Behavior; sharingScreen$: Behavior; /** * Callback to toggle screen sharing. If null, screen sharing is not possible. @@ -164,11 +167,11 @@ export const createLocalMembership$ = ({ tracks$: Behavior; participant$: Behavior; connection$: Behavior; - /** Shorthand for connectionState.matrix.state === Status.Reconnecting + /** Shorthand for homeserverConnected.rtcSession === Status.Reconnecting * Direct translation to the js-sdk membership manager connection `Status`. */ reconnecting$: Behavior; - /** Shorthand for connectionState.matrix.state === Status.Disconnected + /** Shorthand for homeserverConnected.rtcSession === Status.Disconnected * Direct translation to the js-sdk membership manager connection `Status`. */ disconnected$: Behavior; @@ -190,7 +193,7 @@ export const createLocalMembership$ = ({ : new Error("Unknown error from localTransport"), ); } - setLivekitError(error); + setTransportError(error); return of(null); }), ), @@ -223,19 +226,13 @@ export const createLocalMembership$ = ({ // MATRIX RELATED - const reconnecting$ = scope.behavior( - homeserverConnected.rtsSession$.pipe( - map((sessionStatus) => sessionStatus === RTCSessionStatus.Reconnecting), - ), - ); - // This should be used in a combineLatest with publisher$ to connect. // to make it possible to call startTracks before the preferredTransport$ has resolved. const trackStartRequested = Promise.withResolvers(); // This should be used in a combineLatest with publisher$ to connect. // to make it possible to call startTracks before the preferredTransport$ has resolved. - const connectRequested$ = new BehaviorSubject(false); + const joinAndPublishRequested$ = new BehaviorSubject(false); /** * The publisher is stored in here an abstracts creating and publishing tracks. @@ -256,13 +253,13 @@ export const createLocalMembership$ = ({ return tracks$; }; - const requestConnect = (): void => { + const requestJoinAndPublish = (): void => { trackStartRequested.resolve(); - connectRequested$.next(true); + joinAndPublishRequested$.next(true); }; const requestDisconnect = (): void => { - connectRequested$.next(false); + joinAndPublishRequested$.next(false); }; // Take care of the publisher$ @@ -300,112 +297,129 @@ export const createLocalMembership$ = ({ // Based on `connectRequested$` we start publishing tracks. (once they are there!) scope.reconcile( - scope.behavior(combineLatest([publisher$, tracks$, connectRequested$])), - async ([publisher, tracks, shouldConnect]) => { - if (shouldConnect === publisher?.publishing$.value) return; - if (tracks.length !== 0 && shouldConnect) { + scope.behavior( + combineLatest([publisher$, tracks$, joinAndPublishRequested$]), + ), + async ([publisher, tracks, shouldJoinAndPublish]) => { + if (shouldJoinAndPublish === publisher?.publishing$.value) return; + if (tracks.length !== 0 && shouldJoinAndPublish) { try { await publisher?.startPublishing(); } catch (error) { - setLivekitError(error as ElementCallError); + setMediaError(error as ElementCallError); } - } else if (tracks.length !== 0 && !shouldConnect) { + } else if (tracks.length !== 0 && !shouldJoinAndPublish) { try { await publisher?.stopPublishing(); } catch (error) { - setLivekitError(new UnknownCallError(error as Error)); + setMediaError(new UnknownCallError(error as Error)); } } }, ); - const fatalLivekitError$ = new BehaviorSubject(null); - const setLivekitError = (e: ElementCallError): void => { - if (fatalLivekitError$.value !== null) - logger.error("Multiple Livkit Errors:", e); - else fatalLivekitError$.next(e); + const fatalMediaError$ = new BehaviorSubject(null); + const setMediaError = (e: ElementCallError): void => { + if (fatalMediaError$.value !== null) + logger.error("Multiple Media Errors:", e); + else fatalMediaError$.next(e); }; - const livekitState$: Behavior = scope.behavior( + + const fatalTransportError$ = new BehaviorSubject( + null, + ); + const setTransportError = (e: ElementCallError): void => { + if (fatalTransportError$.value !== null) + logger.error("Multiple Transport Errors:", e); + else fatalTransportError$.next(e); + }; + + const mediaState$: Behavior = scope.behavior( combineLatest([ localConnectionState$, - publisher$, localTransport$, - tracks$.pipe( - tap((t) => { - logger.info("tracks$: ", t); - }), - ), + tracks$, publishing$, - connectRequested$, + joinAndPublishRequested$, from(trackStartRequested.promise).pipe( map(() => true), startWith(false), ), - fatalLivekitError$, ]).pipe( map( ([ localConnectionState, - publisher, localTransport, tracks, publishing, - shouldConnect, + shouldPublish, shouldStartTracks, - error, ]) => { - // read this: - // if(!) return {state: ...} - // if(!) return {state: } - // - // as: - // We do have but not yet so we are in - if (error !== null) return { state: RTCBackendState.Error, error }; + if (!localTransport) return null; const hasTracks = tracks.length > 0; - if (!localTransport) - return { state: RTCBackendState.WaitingForTransport }; - if (!localConnectionState) - return { state: RTCBackendState.WaitingForConnection }; + let trackState: TrackState = TrackState.WaitingForUser; + if (hasTracks && shouldStartTracks) trackState = TrackState.Ready; + if (!hasTracks && shouldStartTracks) trackState = TrackState.Creating; + if ( - localConnectionState.state !== LivekitConnectionState.Connected || - !publisher + localConnectionState !== ConnectionState.LivekitConnected || + trackState !== TrackState.Ready ) - // pass through the localConnectionState while we do not yet have a publisher or the state - // of the connection is not yet connected - return { state: localConnectionState.state }; - if (!shouldStartTracks) - return { state: LivekitConnectionState.Connected }; - if (!hasTracks) return { state: RTCBackendState.CreatingTracks }; - if (!shouldConnect) return { state: RTCBackendState.ReadyToPublish }; - if (!publishing) return { state: RTCBackendState.WaitingToPublish }; - return { state: RTCBackendState.ConnectedAndPublishing }; + return { + connection: localConnectionState, + tracks: trackState, + }; + if (!shouldPublish) return PublishState.WaitingForUser; + if (!publishing) return PublishState.Starting; + return PublishState.Publishing; }, ), distinctUntilChanged(deepCompare), ), ); - const fatalMatrixError$ = new BehaviorSubject(null); const setMatrixError = (e: ElementCallError): void => { if (fatalMatrixError$.value !== null) logger.error("Multiple Matrix Errors:", e); else fatalMatrixError$.next(e); }; - const matrixState$: Behavior = scope.behavior( - combineLatest([localTransport$, homeserverConnected.rtsSession$]).pipe( - map(([localTransport, rtcSessionStatus]) => { - if (!localTransport) - return { state: MatrixAdditionalState.WaitingForTransport }; - return { state: rtcSessionStatus }; - }), + + const localMemberState$ = scope.behavior( + combineLatest([ + mediaState$, + homeserverConnected.rtsSession$, + fatalMatrixError$, + fatalTransportError$, + fatalMediaError$, + ]).pipe( + map( + ([ + mediaState, + rtcSessionStatus, + matrixError, + transportError, + mediaError, + ]) => { + if (transportError !== null) return transportError; + // `mediaState` will be 'null' until the transport appears. + if (mediaState && rtcSessionStatus) + return { + matrix: matrixError ?? rtcSessionStatus, + media: mediaError ?? mediaState, + }; + else { + return TransportState.Waiting; + } + }, + ), ), ); // inform the widget about the connect and disconnect intent from the user. scope - .behavior(connectRequested$.pipe(pairwise(), scope.bind()), [ + .behavior(joinAndPublishRequested$.pipe(pairwise(), scope.bind()), [ undefined, - connectRequested$.value, + joinAndPublishRequested$.value, ]) .subscribe(([prev, current]) => { if (!widget) return; @@ -434,7 +448,7 @@ export const createLocalMembership$ = ({ // Keep matrix rtc session in sync with localTransport$, connectRequested$ scope.reconcile( - scope.behavior(combineLatest([localTransport$, connectRequested$])), + scope.behavior(combineLatest([localTransport$, joinAndPublishRequested$])), async ([transport, shouldConnect]) => { if (!transport) return; // if shouldConnect=false we will do the disconnect as the cleanup from the previous reconcile iteration. @@ -555,21 +569,19 @@ export const createLocalMembership$ = ({ return { startTracks, - requestConnect, + requestJoinAndPublish, requestDisconnect, - connectionState: { - livekit$: livekitState$, - matrix$: matrixState$, - }, + localMemberState$, tracks$, participant$, - reconnecting$, + reconnecting$: scope.behavior( + homeserverConnected.rtsSession$.pipe( + map((sessionStatus) => sessionStatus === RTCSessionStatus.Reconnecting), + ), + ), disconnected$: scope.behavior( - matrixState$.pipe( - map( - (sessionStatus) => - sessionStatus.state === RTCSessionStatus.Disconnected, - ), + homeserverConnected.rtsSession$.pipe( + map((state) => state === RTCSessionStatus.Disconnected), ), ), sharingScreen$, diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index 5468d1ff..6d27c042 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -52,9 +52,7 @@ describe("Publisher", () => { } as unknown as MuteStates; scope = new ObservableScope(); connection = { - state$: constant({ - state: LivekitConenctionState.Connected, - }), + state$: constant(LivekitConenctionState.Connected), livekitRoom: mockLivekitRoom({ localParticipant: mockLocalParticipant({}), }), @@ -110,15 +108,14 @@ describe("Publisher", () => { // failiour due to connection.state$ const beforeState = connection.state$.value; - (connection.state$ as BehaviorSubject).next({ - state: "FailedToStart", - error: Error("testStartError"), - }); + (connection.state$ as BehaviorSubject).next(Error("testStartError")); await expect(publisher.startPublishing()).rejects.toThrow( new FailToStartLivekitConnection("testStartError"), ); - (connection.state$ as BehaviorSubject).next(beforeState); + (connection.state$ as BehaviorSubject).next( + beforeState, + ); // does not try other conenction after the first one failed expect( diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 6e4a9b35..b32e7e99 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -32,7 +32,10 @@ import { } from "../../../livekit/TrackProcessorContext.tsx"; import { getUrlParams } from "../../../UrlParams.ts"; import { observeTrackReference$ } from "../../MediaViewModel.ts"; -import { type Connection } from "../remoteMembers/Connection.ts"; +import { + ConnectionState, + type Connection, +} from "../remoteMembers/Connection.ts"; import { type ObservableScope } from "../../ObservableScope.ts"; import { ElementCallError, @@ -158,20 +161,17 @@ export class Publisher { this.logger.debug("startPublishing called"); const lkRoom = this.connection.livekitRoom; const { promise, resolve, reject } = Promise.withResolvers(); - const sub = this.connection.state$.subscribe((s) => { - switch (s.state) { - case LivekitConnectionState.Connected: - resolve(); - break; - case "FailedToStart": - reject( - s.error instanceof ElementCallError - ? s.error - : new FailToStartLivekitConnection(s.error.message), - ); - break; - default: - this.logger.info("waiting for connection: ", s.state); + const sub = this.connection.state$.subscribe((state) => { + if (state instanceof Error) { + const error = + state instanceof ElementCallError + ? state + : new FailToStartLivekitConnection(state.message); + reject(error); + } else if (state === ConnectionState.LivekitConnected) { + resolve(); + } else { + this.logger.info("waiting for connection: ", state); } }); try { diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index efee1ccb..a90f0aa2 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -30,8 +30,8 @@ import { logger } from "matrix-js-sdk/lib/logger"; import type { LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { Connection, + ConnectionState, type ConnectionOpts, - type ConnectionState, type PublishingParticipant, } from "./Connection.ts"; import { ObservableScope } from "../../ObservableScope.ts"; @@ -151,7 +151,7 @@ describe("Start connection states", () => { }; const connection = new Connection(opts, logger); - expect(connection.state$.getValue().state).toEqual("Initialized"); + expect(connection.state$.getValue()).toEqual("Initialized"); }); it("fail to getOpenId token then error state", async () => { @@ -167,7 +167,7 @@ describe("Start connection states", () => { const connection = new Connection(opts, logger); - const capturedStates: ConnectionState[] = []; + const capturedStates: (ConnectionState | Error)[] = []; const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); @@ -187,22 +187,20 @@ describe("Start connection states", () => { let capturedState = capturedStates.pop(); expect(capturedState).toBeDefined(); - expect(capturedState!.state).toEqual("FetchingConfig"); + expect(capturedState!).toEqual("FetchingConfig"); deferred.reject(new FailToGetOpenIdToken(new Error("Failed to get token"))); await vi.runAllTimersAsync(); capturedState = capturedStates.pop(); - if (capturedState!.state === "FailedToStart") { - expect(capturedState!.error.message).toEqual("Something went wrong"); + if (capturedState instanceof Error) { + expect(capturedState.message).toEqual("Something went wrong"); expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); } else { - expect.fail( - "Expected FailedToStart state but got " + capturedState?.state, - ); + expect.fail("Expected FailedToStart state but got " + capturedState); } }); @@ -219,7 +217,7 @@ describe("Start connection states", () => { const connection = new Connection(opts, logger); - const capturedStates: ConnectionState[] = []; + const capturedStates: (ConnectionState | Error)[] = []; const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); @@ -241,24 +239,22 @@ describe("Start connection states", () => { let capturedState = capturedStates.pop(); expect(capturedState).toBeDefined(); - expect(capturedState?.state).toEqual("FetchingConfig"); + expect(capturedState).toEqual(ConnectionState.FetchingConfig); deferredSFU.resolve(); await vi.runAllTimersAsync(); capturedState = capturedStates.pop(); - if (capturedState?.state === "FailedToStart") { - expect(capturedState?.error.message).toContain( + if (capturedState instanceof Error) { + expect(capturedState.message).toContain( "SFU Config fetch failed with exception Error", ); expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); } else { - expect.fail( - "Expected FailedToStart state but got " + capturedState?.state, - ); + expect.fail("Expected FailedToStart state but got " + capturedState); } }); @@ -275,7 +271,7 @@ describe("Start connection states", () => { const connection = new Connection(opts, logger); - const capturedStates: ConnectionState[] = []; + const capturedStates: (ConnectionState | Error)[] = []; const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); @@ -305,17 +301,15 @@ describe("Start connection states", () => { let capturedState = capturedStates.pop(); expect(capturedState).toBeDefined(); - expect(capturedState?.state).toEqual("FetchingConfig"); + expect(capturedState).toEqual(ConnectionState.FetchingConfig); deferredSFU.resolve(); await vi.runAllTimersAsync(); capturedState = capturedStates.pop(); - if (capturedState && capturedState.state === "FailedToStart") { - expect(capturedState.error.message).toContain( - "Failed to connect to livekit", - ); + if (capturedState instanceof Error) { + expect(capturedState.message).toContain("Failed to connect to livekit"); expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); @@ -332,7 +326,7 @@ describe("Start connection states", () => { const connection = setupRemoteConnection(); - const capturedStates: ConnectionState[] = []; + const capturedStates: (ConnectionState | Error)[] = []; const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); @@ -342,13 +336,13 @@ describe("Start connection states", () => { await vi.runAllTimersAsync(); const initialState = capturedStates.shift(); - expect(initialState?.state).toEqual("Initialized"); + expect(initialState).toEqual(ConnectionState.Initialized); const fetchingState = capturedStates.shift(); - expect(fetchingState?.state).toEqual("FetchingConfig"); + expect(fetchingState).toEqual(ConnectionState.FetchingConfig); const connectingState = capturedStates.shift(); - expect(connectingState?.state).toEqual("ConnectingToLkRoom"); + expect(connectingState).toEqual(ConnectionState.ConnectingToLkRoom); const connectedState = capturedStates.shift(); - expect(connectedState?.state).toEqual("connected"); + expect(connectedState).toEqual(ConnectionState.LivekitConnected); }); it("shutting down the scope should stop the connection", async () => { diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 549777f9..29ad7a8c 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -12,7 +12,6 @@ import { } from "@livekit/components-core"; import { ConnectionError, - ConnectionState as LivekitConnectionState, type Room as LivekitRoom, type LocalParticipant, type RemoteParticipant, @@ -55,14 +54,21 @@ export class FailedToStartError extends Error { } export enum ConnectionState { + /** The start state of a connection. It has been created but nothing has loaded yet. */ Initialized = "Initialized", + /** `start` has been called on the connection. It aquires the jwt info to conenct to the LK Room */ FetchingConfig = "FetchingConfig", Stopped = "Stopped", ConnectingToLkRoom = "ConnectingToLkRoom", + /** The same as ConnectionState.Disconnected from `livekit-client` */ LivekitDisconnected = "disconnected", + /** The same as ConnectionState.Connecting from `livekit-client` */ LivekitConnecting = "connecting", + /** The same as ConnectionState.Connected from `livekit-client` */ LivekitConnected = "connected", + /** The same as ConnectionState.Reconnecting from `livekit-client` */ LivekitReconnecting = "reconnecting", + /** The same as ConnectionState.SignalReconnecting from `livekit-client` */ LivekitSignalReconnecting = "signalReconnecting", } @@ -73,15 +79,14 @@ export enum ConnectionState { */ export class Connection { // Private Behavior - private readonly _state$ = new BehaviorSubject< - ConnectionState | FailedToStartError - >(ConnectionState.Initialized); + private readonly _state$ = new BehaviorSubject( + ConnectionState.Initialized, + ); /** * The current state of the connection to the media transport. */ - public readonly state$: Behavior = - this._state$; + public readonly state$: Behavior = this._state$; /** * The media transport to connect to. @@ -161,15 +166,12 @@ export class Connection { connectionStateObserver(this.livekitRoom) .pipe(this.scope.bind()) .subscribe((lkState) => { - // It si save to cast lkState to ConnectionState as they are fully overlapping. + // It is save to cast lkState to ConnectionState as they are fully overlapping. this._state$.next(lkState as unknown as ConnectionState); }); } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); - this._state$.next({ - state: "FailedToStart", - error: error instanceof Error ? error : new Error(`${error}`), - }); + this._state$.next(error instanceof Error ? error : new Error(`${error}`)); throw error; } } @@ -194,9 +196,7 @@ export class Connection { ); if (this.stopped) return; await this.livekitRoom.disconnect(); - this._state$.next({ - state: ConnectionAdditionalState.Stopped, - }); + this._state$.next(ConnectionState.Stopped); this.stopped = true; } diff --git a/yarn.lock b/yarn.lock index 94b73130..f0ca83a7 100644 --- a/yarn.lock +++ b/yarn.lock @@ -10353,8 +10353,8 @@ __metadata: linkType: hard "matrix-js-sdk@npm:^39.2.0": - version: 39.2.0 - resolution: "matrix-js-sdk@npm:39.2.0" + version: 39.3.0 + resolution: "matrix-js-sdk@npm:39.3.0" dependencies: "@babel/runtime": "npm:^7.12.5" "@matrix-org/matrix-sdk-crypto-wasm": "npm:^15.3.0" @@ -10370,7 +10370,7 @@ __metadata: sdp-transform: "npm:^3.0.0" unhomoglyph: "npm:^1.0.6" uuid: "npm:13" - checksum: 10c0/f8b5261de2744305330ba3952821ca9303698170bfd3a0ff8a767b9286d4e8d4ed5aaf6fbaf8a1e8ff9dbd859102a2a47d882787e2da3b3078965bec00157959 + checksum: 10c0/031c9ec042e00c32dc531f82fc59c64cc25fb665abfc642b1f0765c530d60684f8bd63daf0cdd0dbe96b4f87ea3f4148f9d3f024a59d57eceaec1ce5d0164755 languageName: node linkType: hard From 7af89b421693b58d06baf2f044bdd3e39d97a623 Mon Sep 17 00:00:00 2001 From: Timo K Date: Tue, 9 Dec 2025 17:36:56 +0100 Subject: [PATCH 09/24] fix lint --- src/state/CallViewModel/localMember/LocalMember.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/state/CallViewModel/localMember/LocalMember.test.ts b/src/state/CallViewModel/localMember/LocalMember.test.ts index 2f8d11a5..6a9f196e 100644 --- a/src/state/CallViewModel/localMember/LocalMember.test.ts +++ b/src/state/CallViewModel/localMember/LocalMember.test.ts @@ -38,7 +38,6 @@ import { constant } from "../../Behavior"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; import { ConnectionState, type Connection } from "../remoteMembers/Connection"; import { type Publisher } from "./Publisher"; -import { C } from "vitest/dist/chunks/global.d.MAmajcmJ.js"; const MATRIX_RTC_MODE = MatrixRTCMode.Legacy; const getUrlParams = vi.hoisted(() => vi.fn(() => ({}))); From 0ebc6078dd5cae4f8a2317e4ffb22f128ebd1e75 Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 12:08:59 +0100 Subject: [PATCH 10/24] Update LocalMember.ts --- .../CallViewModel/localMember/LocalMember.ts | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index e2fcc70e..193dd53c 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -407,9 +407,7 @@ export const createLocalMembership$ = ({ matrix: matrixError ?? rtcSessionStatus, media: mediaError ?? mediaState, }; - else { - return TransportState.Waiting; - } + return TransportState.Waiting; }, ), ), @@ -423,19 +421,21 @@ export const createLocalMembership$ = ({ ]) .subscribe(([prev, current]) => { if (!widget) return; + // JOIN prev=false (was left) => current-true (now joiend) if (!prev && current) { - try { - void widget.api.transport.send(ElementWidgetActions.JoinCall, {}); - } catch (e) { - logger.error("Failed to send join action", e); - } + widget.api.transport + .send(ElementWidgetActions.JoinCall, {}) + .catch((e) => { + logger.error("Failed to send join action", e); + }); } + // LEAVE prev=false (was joined) => current-true (now left) if (prev && !current) { - try { - void widget?.api.transport.send(ElementWidgetActions.HangupCall, {}); - } catch (e) { - logger.error("Failed to send hangup action", e); - } + widget.api.transport + .send(ElementWidgetActions.HangupCall, {}) + .catch((e) => { + logger.error("Failed to send hangup action", e); + }); } }); @@ -575,8 +575,12 @@ export const createLocalMembership$ = ({ tracks$, participant$, reconnecting$: scope.behavior( - homeserverConnected.rtsSession$.pipe( - map((sessionStatus) => sessionStatus === RTCSessionStatus.Reconnecting), + localMemberState$.pipe( + map((state) => { + if (typeof state === "object" && "matrix" in state) + return state.matrix === RTCSessionStatus.Reconnecting; + return false; + }), ), ), disconnected$: scope.behavior( From 6efce232f81528c4df4b61f56393dc62e0040549 Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 18:50:19 +0100 Subject: [PATCH 11/24] fix playwright tests --- src/state/CallViewModel/CallViewModel.ts | 63 ++++++++++---- .../CallViewModel/localMember/LocalMember.ts | 82 ++++++++++++------- .../CallViewModel/localMember/Publisher.ts | 60 +++++++------- .../CallViewModel/remoteMembers/Connection.ts | 4 +- 4 files changed, 129 insertions(+), 80 deletions(-) diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index e04f4698..35ab658b 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -99,6 +99,7 @@ import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts import { createLocalMembership$, enterRTCSession, + TransportState, } from "./localMember/LocalMember.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { @@ -577,17 +578,6 @@ export function createCallViewModel$( ), ); - /** - * Whether various media/event sources should pretend to be disconnected from - * all network input, even if their connection still technically works. - */ - // We do this when the app is in the 'reconnecting' state, because it might be - // that the LiveKit connection is still functional while the homeserver is - // down, for example, and we want to avoid making people worry that the app is - // in a split-brained state. - // DISCUSSION own membership manager ALSO this probably can be simplifis - const reconnecting$ = localMembership.reconnecting$; - const audioParticipants$ = scope.behavior( matrixLivekitMembers$.pipe( switchMap((membersWithEpoch) => { @@ -635,7 +625,7 @@ export function createCallViewModel$( ); const handsRaised$ = scope.behavior( - handsRaisedSubject$.pipe(pauseWhen(reconnecting$)), + handsRaisedSubject$.pipe(pauseWhen(localMembership.reconnecting$)), ); const reactions$ = scope.behavior( @@ -648,7 +638,7 @@ export function createCallViewModel$( ]), ), ), - pauseWhen(reconnecting$), + pauseWhen(localMembership.reconnecting$), ), ); @@ -739,7 +729,7 @@ export function createCallViewModel$( livekitRoom$, focusUrl$, mediaDevices, - reconnecting$, + localMembership.reconnecting$, displayName$, matrixMemberMetadataStore.createAvatarUrlBehavior$(userId), handsRaised$.pipe(map((v) => v[participantId]?.time ?? null)), @@ -1422,6 +1412,37 @@ export function createCallViewModel$( // reassigned here to make it publicly accessible const toggleScreenSharing = localMembership.toggleScreenSharing; + const errors$ = scope.behavior<{ + transportError?: ElementCallError; + matrixError?: ElementCallError; + connectionError?: ElementCallError; + publishError?: ElementCallError; + } | null>( + localMembership.localMemberState$.pipe( + map((value) => { + const returnObject: { + transportError?: ElementCallError; + matrixError?: ElementCallError; + connectionError?: ElementCallError; + publishError?: ElementCallError; + } = {}; + if (value instanceof ElementCallError) return { transportError: value }; + if (value === TransportState.Waiting) return null; + if (value.matrix instanceof ElementCallError) + returnObject.matrixError = value.matrix; + if (value.media instanceof ElementCallError) + returnObject.publishError = value.media; + else if ( + typeof value.media === "object" && + value.media.connection instanceof ElementCallError + ) + returnObject.connectionError = value.media.connection; + return returnObject; + }), + ), + null, + ); + return { autoLeave$: autoLeave$, callPickupState$: callPickupState$, @@ -1438,8 +1459,16 @@ export function createCallViewModel$( unhoverScreen: (): void => screenUnhover$.next(), fatalError$: scope.behavior( - localMembership.localMemberState$.pipe( - filter((v) => v instanceof ElementCallError), + errors$.pipe( + map((errors) => { + return ( + errors?.transportError ?? + errors?.matrixError ?? + errors?.connectionError ?? + null + ); + }), + filter((error) => error !== null), ), null, ), @@ -1472,7 +1501,7 @@ export function createCallViewModel$( showFooter$: showFooter$, earpieceMode$: earpieceMode$, audioOutputSwitcher$: audioOutputSwitcher$, - reconnecting$: reconnecting$, + reconnecting$: localMembership.reconnecting$, }; } diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index df42cba9..532d5d55 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -42,6 +42,7 @@ import { type Publisher } from "./Publisher.ts"; import { type MuteStates } from "../../MuteStates.ts"; import { ElementCallError, + FailToStartLivekitConnection, MembershipManagerError, UnknownCallError, } from "../../../utils/errors.ts"; @@ -56,6 +57,7 @@ import { type FailedToStartError, } from "../remoteMembers/Connection.ts"; import { type HomeserverConnected } from "./HomeserverConnected.ts"; +import { and$ } from "../../../utils/observable.ts"; export enum TransportState { /** Not even a transport is available to the LocalMembership */ @@ -86,13 +88,12 @@ export type LocalMemberMediaState = } | PublishState | ElementCallError; -export type LocalMemberMatrixState = Error | RTCSessionStatus; export type LocalMemberState = | ElementCallError | TransportState.Waiting | { media: LocalMemberMediaState; - matrix: LocalMemberMatrixState; + matrix: ElementCallError | RTCSessionStatus; }; /* @@ -220,10 +221,6 @@ export const createLocalMembership$ = ({ ), ); - const localConnectionState$ = localConnection$.pipe( - switchMap((connection) => (connection ? connection.state$ : of(null))), - ); - // MATRIX RELATED // This should be used in a combineLatest with publisher$ to connect. @@ -308,23 +305,27 @@ export const createLocalMembership$ = ({ try { await publisher?.startPublishing(); } catch (error) { - setMediaError(error as ElementCallError); + const message = + error instanceof Error ? error.message : String(error); + setPublishError(new FailToStartLivekitConnection(message)); } } else if (tracks.length !== 0 && !shouldJoinAndPublish) { try { await publisher?.stopPublishing(); } catch (error) { - setMediaError(new UnknownCallError(error as Error)); + setPublishError(new UnknownCallError(error as Error)); } } }, ); - const fatalMediaError$ = new BehaviorSubject(null); - const setMediaError = (e: ElementCallError): void => { - if (fatalMediaError$.value !== null) - logger.error("Multiple Media Errors:", e); - else fatalMediaError$.next(e); + // STATE COMPUTATION + + // These are non fatal since we can join a room and concume media even though publishing failed. + const publishError$ = new BehaviorSubject(null); + const setPublishError = (e: ElementCallError): void => { + if (publishError$.value !== null) logger.error("Multiple Media Errors:", e); + else publishError$.next(e); }; const fatalTransportError$ = new BehaviorSubject( @@ -336,6 +337,10 @@ export const createLocalMembership$ = ({ else fatalTransportError$.next(e); }; + const localConnectionState$ = localConnection$.pipe( + switchMap((connection) => (connection ? connection.state$ : of(null))), + ); + const mediaState$: Behavior = scope.behavior( combineLatest([ localConnectionState$, @@ -392,22 +397,22 @@ export const createLocalMembership$ = ({ homeserverConnected.rtsSession$, fatalMatrixError$, fatalTransportError$, - fatalMediaError$, + publishError$, ]).pipe( map( ([ mediaState, rtcSessionStatus, - matrixError, - transportError, - mediaError, + fatalMatrixError, + fatalTransportError, + publishError, ]) => { - if (transportError !== null) return transportError; - // `mediaState` will be 'null' until the transport appears. + if (fatalTransportError !== null) return fatalTransportError; + // `mediaState` will be 'null' until the transport/connection appears. if (mediaState && rtcSessionStatus) return { - matrix: matrixError ?? rtcSessionStatus, - media: mediaError ?? mediaState, + matrix: fatalMatrixError ?? rtcSessionStatus, + media: publishError ?? mediaState, }; return TransportState.Waiting; }, @@ -415,6 +420,31 @@ export const createLocalMembership$ = ({ ), ); + /** + * Whether we are "fully" connected to the call. Accounts for both the + * connection to the MatrixRTC session and the LiveKit publish connection. + */ + const matrixAndLivekitConnected$ = scope.behavior( + and$( + homeserverConnected.combined$, + localConnectionState$.pipe( + map((state) => state === ConnectionState.LivekitConnected), + ), + ).pipe( + tap((v) => logger.debug("livekit+matrix: Connected state changed", v)), + ), + ); + + /** + * Whether we should tell the user that we're reconnecting to the call. + */ + const reconnecting$ = scope.behavior( + matrixAndLivekitConnected$.pipe( + pairwise(), + map(([prev, current]) => prev === true && current === false), + ), + ); + // inform the widget about the connect and disconnect intent from the user. scope .behavior(joinAndPublishRequested$.pipe(pairwise(), scope.bind()), [ @@ -576,15 +606,7 @@ export const createLocalMembership$ = ({ localMemberState$, tracks$, participant$, - reconnecting$: scope.behavior( - localMemberState$.pipe( - map((state) => { - if (typeof state === "object" && "matrix" in state) - return state.matrix === RTCSessionStatus.Reconnecting; - return false; - }), - ), - ), + reconnecting$, disconnected$: scope.behavior( homeserverConnected.rtsSession$.pipe( map((state) => state === RTCSessionStatus.Disconnected), diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index b32e7e99..df67f179 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -32,15 +32,8 @@ import { } from "../../../livekit/TrackProcessorContext.tsx"; import { getUrlParams } from "../../../UrlParams.ts"; import { observeTrackReference$ } from "../../MediaViewModel.ts"; -import { - ConnectionState, - type Connection, -} from "../remoteMembers/Connection.ts"; +import { type Connection } from "../remoteMembers/Connection.ts"; import { type ObservableScope } from "../../ObservableScope.ts"; -import { - ElementCallError, - FailToStartLivekitConnection, -} from "../../../utils/errors.ts"; /** * A wrapper for a Connection object. @@ -160,27 +153,29 @@ export class Publisher { public async startPublishing(): Promise { this.logger.debug("startPublishing called"); const lkRoom = this.connection.livekitRoom; - const { promise, resolve, reject } = Promise.withResolvers(); - const sub = this.connection.state$.subscribe((state) => { - if (state instanceof Error) { - const error = - state instanceof ElementCallError - ? state - : new FailToStartLivekitConnection(state.message); - reject(error); - } else if (state === ConnectionState.LivekitConnected) { - resolve(); - } else { - this.logger.info("waiting for connection: ", state); - } - }); - try { - await promise; - } catch (e) { - throw e; - } finally { - sub.unsubscribe(); - } + + // we do not need to do this since lk will wait in `localParticipant.publishTrack` + // const { promise, resolve, reject } = Promise.withResolvers(); + // const sub = this.connection.state$.subscribe((state) => { + // if (state instanceof Error) { + // const error = + // state instanceof ElementCallError + // ? state + // : new FailToStartLivekitConnection(state.message); + // reject(error); + // } else if (state === ConnectionState.LivekitConnected) { + // resolve(); + // } else { + // this.logger.info("waiting for connection: ", state); + // } + // }); + // try { + // await promise; + // } catch (e) { + // throw e; + // } finally { + // sub.unsubscribe(); + // } for (const track of this.tracks$.value) { this.logger.info("publish ", this.tracks$.value.length, "tracks"); @@ -188,9 +183,10 @@ export class Publisher { // with a timeout. await lkRoom.localParticipant.publishTrack(track).catch((error) => { this.logger.error("Failed to publish track", error); - throw new FailToStartLivekitConnection( - error instanceof Error ? error.message : error, - ); + // throw new FailToStartLivekitConnection( + // error instanceof Error ? error.message : error, + // ); + throw error; }); this.logger.info("published track ", track.kind, track.id); diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 29ad7a8c..2fd9eaa8 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -150,7 +150,8 @@ export class Connection { throw new InsufficientCapacityError(); } if (e.status === 404) { - // error msg is "Could not establish signal connection: requested room does not exist" + // error msg is "Failed to create call" + // error description is "Call creation might be restricted to authorized users only. Try again later, or contact your server admin if the problem persists." // The room does not exist. There are two different modes of operation for the SFU: // - the room is created on the fly when connecting (livekit `auto_create` option) // - Only authorized users can create rooms, so the room must exist before connecting (done by the auth jwt service) @@ -172,6 +173,7 @@ export class Connection { } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); this._state$.next(error instanceof Error ? error : new Error(`${error}`)); + // Its okay to ignore the throw. The error is part of the state. throw error; } } From 1941fc9ca1cc17becdc170e5c5b691ae8edd6c0f Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 19:12:52 +0100 Subject: [PATCH 12/24] fix tests. --- src/state/CallViewModel/localMember/LocalMember.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index 532d5d55..73908fcb 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -443,6 +443,7 @@ export const createLocalMembership$ = ({ pairwise(), map(([prev, current]) => prev === true && current === false), ), + false, ); // inform the widget about the connect and disconnect intent from the user. From 667a3d0e3d911ad602b48ae4906ef0da6dc3f085 Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 19:18:16 +0100 Subject: [PATCH 13/24] fix test not checking for livekit connection state anymore. --- .../CallViewModel/localMember/Publisher.test.ts | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index 6d27c042..68845fa2 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -98,7 +98,7 @@ describe("Publisher", () => { ).mockRejectedValue(Error("testError")); await expect(publisher.startPublishing()).rejects.toThrow( - new FailToStartLivekitConnection("testError"), + new Error("testError"), ); // does not try other conenction after the first one failed @@ -106,17 +106,6 @@ describe("Publisher", () => { connection.livekitRoom.localParticipant.publishTrack, ).toHaveBeenCalledTimes(1); - // failiour due to connection.state$ - const beforeState = connection.state$.value; - (connection.state$ as BehaviorSubject).next(Error("testStartError")); - - await expect(publisher.startPublishing()).rejects.toThrow( - new FailToStartLivekitConnection("testStartError"), - ); - (connection.state$ as BehaviorSubject).next( - beforeState, - ); - // does not try other conenction after the first one failed expect( connection.livekitRoom.localParticipant.publishTrack, From b380532d3080275388810ec124d578dd57a787b5 Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 21:14:13 +0100 Subject: [PATCH 14/24] lots of error logging and fixing playwright --- src/room/GroupCallView.tsx | 1 + src/room/InCallView.tsx | 7 ++++++- src/room/LobbyView.tsx | 4 ++-- src/state/CallViewModel/CallViewModel.ts | 15 +++++++++++++- .../CallViewModel/localMember/LocalMember.ts | 20 +++++++++++++------ .../localMember/LocalTransport.ts | 2 +- .../CallViewModel/remoteMembers/Connection.ts | 14 +++++++++---- 7 files changed, 48 insertions(+), 15 deletions(-) diff --git a/src/room/GroupCallView.tsx b/src/room/GroupCallView.tsx index dfd11ff3..1542678e 100644 --- a/src/room/GroupCallView.tsx +++ b/src/room/GroupCallView.tsx @@ -446,6 +446,7 @@ export const GroupCallView: FC = ({ let body: ReactNode; if (externalError) { + logger.debug("External error occurred:", externalError); // If an error was recorded within this component but outside // GroupCallErrorBoundary, create a component that rethrows the error from // within the error boundary, so it can be handled uniformly diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index 7ae3700c..18acf843 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -127,6 +127,7 @@ export const ActiveCall: FC = (props) => { const mediaDevices = useMediaDevices(); const trackProcessorState$ = useTrackProcessorObservable$(); useEffect(() => { + logger.info("START CALL VIEW SCOPE"); const scope = new ObservableScope(); const reactionsReader = new ReactionsReader(scope, props.rtcSession); const { autoLeaveWhenOthersLeft, waitForCallPickup, sendNotificationType } = @@ -153,6 +154,7 @@ export const ActiveCall: FC = (props) => { vm.leave$.pipe(scope.bind()).subscribe(props.onLeft); return (): void => { + logger.info("END CALL VIEW SCOPE"); scope.end(); }; }, [ @@ -271,7 +273,10 @@ export const InCallView: FC = ({ const ringOverlay = useBehavior(vm.ringOverlay$); const fatalCallError = useBehavior(vm.fatalError$); // Stop the rendering and throw for the error boundary - if (fatalCallError) throw fatalCallError; + if (fatalCallError) { + logger.debug("fatalCallError stop rendering", fatalCallError); + throw fatalCallError; + } // We need to set the proper timings on the animation based upon the sound length. const ringDuration = pickupPhaseAudio?.soundDuration["waiting"] ?? 1; diff --git a/src/room/LobbyView.tsx b/src/room/LobbyView.tsx index ad4f30b3..10e098f1 100644 --- a/src/room/LobbyView.tsx +++ b/src/room/LobbyView.tsx @@ -79,9 +79,9 @@ export const LobbyView: FC = ({ waitingForInvite, }) => { useEffect(() => { - logger.info("[Lifecycle] GroupCallView Component mounted"); + logger.info("[Lifecycle] LobbyView Component mounted"); return (): void => { - logger.info("[Lifecycle] GroupCallView Component unmounted"); + logger.info("[Lifecycle] LobbyView Component unmounted"); }; }, []); diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 35ab658b..6a9eadea 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -15,6 +15,7 @@ import { } from "livekit-client"; import { type Room as MatrixRoom } from "matrix-js-sdk"; import { + catchError, combineLatest, distinctUntilChanged, filter, @@ -425,7 +426,18 @@ export function createCallViewModel$( connectionFactory: connectionFactory, inputTransports$: scope.behavior( combineLatest( - [localTransport$, membershipsAndTransports.transports$], + [ + localTransport$.pipe( + catchError((e) => { + logger.info( + "dont pass local transport to createConnectionManager$. localTransport$ threw an error", + e, + ); + return of(null); + }), + ), + membershipsAndTransports.transports$, + ], (localTransport, transports) => { const localTransportAsArray = localTransport ? [localTransport] : []; return transports.mapInner((transports) => [ @@ -1461,6 +1473,7 @@ export function createCallViewModel$( fatalError$: scope.behavior( errors$.pipe( map((errors) => { + logger.debug("errors$ to compute any fatal errors:", errors); return ( errors?.transportError ?? errors?.matrixError ?? diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index 73908fcb..40fb62d6 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -324,17 +324,23 @@ export const createLocalMembership$ = ({ // These are non fatal since we can join a room and concume media even though publishing failed. const publishError$ = new BehaviorSubject(null); const setPublishError = (e: ElementCallError): void => { - if (publishError$.value !== null) logger.error("Multiple Media Errors:", e); - else publishError$.next(e); + if (publishError$.value !== null) { + logger.error("Multiple Media Errors:", e); + } else { + publishError$.next(e); + } }; const fatalTransportError$ = new BehaviorSubject( null, ); + const setTransportError = (e: ElementCallError): void => { - if (fatalTransportError$.value !== null) + if (fatalTransportError$.value !== null) { logger.error("Multiple Transport Errors:", e); - else fatalTransportError$.next(e); + } else { + fatalTransportError$.next(e); + } }; const localConnectionState$ = localConnection$.pipe( @@ -386,9 +392,11 @@ export const createLocalMembership$ = ({ ); const fatalMatrixError$ = new BehaviorSubject(null); const setMatrixError = (e: ElementCallError): void => { - if (fatalMatrixError$.value !== null) + if (fatalMatrixError$.value !== null) { logger.error("Multiple Matrix Errors:", e); - else fatalMatrixError$.next(e); + } else { + fatalMatrixError$.next(e); + } }; const localMemberState$ = scope.behavior( diff --git a/src/state/CallViewModel/localMember/LocalTransport.ts b/src/state/CallViewModel/localMember/LocalTransport.ts index 0a85bbc1..1320b8c4 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.ts @@ -85,7 +85,7 @@ export const createLocalTransport$ = ({ * The transport that we would personally prefer to publish on (if not for the * transport preferences of others, perhaps). * - * @throws + * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken */ const preferredTransport$: Behavior = scope.behavior( customLivekitUrl.value$.pipe( diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 2fd9eaa8..c801b3ae 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -29,8 +29,10 @@ import { import { type Behavior } from "../../Behavior.ts"; import { type ObservableScope } from "../../ObservableScope.ts"; import { + ElementCallError, InsufficientCapacityError, SFURoomCreationRestrictedError, + UnknownCallError, } from "../../../utils/errors.ts"; export type PublishingParticipant = LocalParticipant | RemoteParticipant; @@ -79,9 +81,9 @@ export enum ConnectionState { */ export class Connection { // Private Behavior - private readonly _state$ = new BehaviorSubject( - ConnectionState.Initialized, - ); + private readonly _state$ = new BehaviorSubject< + ConnectionState | ElementCallError + >(ConnectionState.Initialized); /** * The current state of the connection to the media transport. @@ -131,6 +133,8 @@ export class Connection { this.stopped = false; try { this._state$.next(ConnectionState.FetchingConfig); + // We should already have this information after creating the localTransport. + // It would probably be better to forward this here. const { url, jwt } = await this.getSFUConfigWithOpenID(); // If we were stopped while fetching the config, don't proceed to connect if (this.stopped) return; @@ -172,7 +176,9 @@ export class Connection { }); } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); - this._state$.next(error instanceof Error ? error : new Error(`${error}`)); + this._state$.next( + error instanceof ElementCallError ? error : new UnknownCallError(error), + ); // Its okay to ignore the throw. The error is part of the state. throw error; } From 8dac0366b64d22da6ede6162dc3e2dcf7d57273b Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 21:17:33 +0100 Subject: [PATCH 15/24] fix lints --- src/state/CallViewModel/localMember/Publisher.test.ts | 6 +----- src/state/CallViewModel/remoteMembers/Connection.ts | 6 +++++- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index 68845fa2..40763a99 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -26,12 +26,8 @@ import { mockMediaDevices, } from "../../../utils/test"; import { Publisher } from "./Publisher"; -import { - type Connection, - type ConnectionState, -} from "../remoteMembers/Connection"; +import { type Connection } from "../remoteMembers/Connection"; import { type MuteStates } from "../../MuteStates"; -import { FailToStartLivekitConnection } from "../../../utils/errors"; describe("Publisher", () => { let scope: ObservableScope; diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index c801b3ae..6015bf01 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -177,7 +177,11 @@ export class Connection { } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); this._state$.next( - error instanceof ElementCallError ? error : new UnknownCallError(error), + error instanceof ElementCallError + ? error + : error instanceof Error + ? new UnknownCallError(error) + : new UnknownCallError(new Error(`${error}`)), ); // Its okay to ignore the throw. The error is part of the state. throw error; From e626698fda8dd87213ccda763d3f04f86b830478 Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 21:22:55 +0100 Subject: [PATCH 16/24] fix connection tests --- .../remoteMembers/Connection.test.ts | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index a90f0aa2..95ff931e 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -36,7 +36,10 @@ import { } from "./Connection.ts"; import { ObservableScope } from "../../ObservableScope.ts"; import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts"; -import { FailToGetOpenIdToken } from "../../../utils/errors.ts"; +import { + ElementCallError, + FailToGetOpenIdToken, +} from "../../../utils/errors.ts"; let testScope: ObservableScope; @@ -246,8 +249,11 @@ describe("Start connection states", () => { capturedState = capturedStates.pop(); - if (capturedState instanceof Error) { - expect(capturedState.message).toContain( + if ( + capturedState instanceof ElementCallError && + capturedState.cause instanceof Error + ) { + expect(capturedState.cause.message).toContain( "SFU Config fetch failed with exception Error", ); expect(connection.transport.livekit_alias).toEqual( @@ -308,8 +314,13 @@ describe("Start connection states", () => { capturedState = capturedStates.pop(); - if (capturedState instanceof Error) { - expect(capturedState.message).toContain("Failed to connect to livekit"); + if ( + capturedState instanceof ElementCallError && + capturedState.cause instanceof Error + ) { + expect(capturedState.cause.message).toContain( + "Failed to connect to livekit", + ); expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); From aabd76044b7a4d9bc259f5ff1c76ac1eab3d8682 Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 21:25:35 +0100 Subject: [PATCH 17/24] fix lint --- src/state/CallViewModel/CallViewModel.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 6a9eadea..aac88a3b 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -428,7 +428,7 @@ export function createCallViewModel$( combineLatest( [ localTransport$.pipe( - catchError((e) => { + catchError((e: unknown) => { logger.info( "dont pass local transport to createConnectionManager$. localTransport$ threw an error", e, From 170a38c0bae050c752d99aed150f2dfd9605b86d Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 11 Dec 2025 11:30:14 +0100 Subject: [PATCH 18/24] fix playwright incompatible browser toast --- playwright/fixtures/widget-user.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/playwright/fixtures/widget-user.ts b/playwright/fixtures/widget-user.ts index 433c960b..b0c46788 100644 --- a/playwright/fixtures/widget-user.ts +++ b/playwright/fixtures/widget-user.ts @@ -113,7 +113,8 @@ async function registerUser( await page.getByRole("button", { name: "Register" }).click(); const continueButton = page.getByRole("button", { name: "Continue" }); try { - await expect(continueButton).toBeVisible({ timeout: 5000 }); + await expect(continueButton).toBeVisible({ timeout: 700 }); + // why do we need to put in the passwor if there is a continue button? await page .getByRole("textbox", { name: "Password", exact: true }) .fill(PASSWORD); @@ -124,6 +125,16 @@ async function registerUser( await expect( page.getByRole("heading", { name: `Welcome ${username}` }), ).toBeVisible(); + + // Dismiss incompatible browser toast + const dismissButton = page.getByRole("button", { name: "Dismiss" }); + try { + await expect(dismissButton).toBeVisible({ timeout: 700 }); + await dismissButton.click(); + } catch { + // dismissButton not visible, continue as normal + } + await setDevToolElementCallDevUrl(page); const clientHandle = await page.evaluateHandle(() => From 328cc7133a2377043061cf0e162547661a8eca55 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 11 Dec 2025 11:32:28 +0100 Subject: [PATCH 19/24] update playwright so that we do not even need the dismiss anymore. --- package.json | 2 +- yarn.lock | 30 +++++++++++++++--------------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/package.json b/package.json index 21c870ad..f65865e4 100644 --- a/package.json +++ b/package.json @@ -54,7 +54,7 @@ "@opentelemetry/sdk-trace-base": "^2.0.0", "@opentelemetry/sdk-trace-web": "^2.0.0", "@opentelemetry/semantic-conventions": "^1.25.1", - "@playwright/test": "^1.56.1", + "@playwright/test": "^1.57.0", "@radix-ui/react-dialog": "^1.0.4", "@radix-ui/react-slider": "^1.1.2", "@radix-ui/react-visually-hidden": "^1.0.3", diff --git a/yarn.lock b/yarn.lock index f0ca83a7..02a4a3ce 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3373,14 +3373,14 @@ __metadata: languageName: node linkType: hard -"@playwright/test@npm:^1.56.1": - version: 1.56.1 - resolution: "@playwright/test@npm:1.56.1" +"@playwright/test@npm:^1.57.0": + version: 1.57.0 + resolution: "@playwright/test@npm:1.57.0" dependencies: - playwright: "npm:1.56.1" + playwright: "npm:1.57.0" bin: playwright: cli.js - checksum: 10c0/2b5b0e1f2e6a18f6e5ce6897c7440ca78f64e0b004834e9808e93ad2b78b96366b562ae4366602669cf8ad793a43d85481b58541e74be71e905e732d833dd691 + checksum: 10c0/35ba4b28be72bf0a53e33dbb11c6cff848fb9a37f49e893ce63a90675b5291ec29a1ba82c8a3b043abaead129400f0589623e9ace2e6a1c8eaa409721ecc3774 languageName: node linkType: hard @@ -7492,7 +7492,7 @@ __metadata: "@opentelemetry/sdk-trace-base": "npm:^2.0.0" "@opentelemetry/sdk-trace-web": "npm:^2.0.0" "@opentelemetry/semantic-conventions": "npm:^1.25.1" - "@playwright/test": "npm:^1.56.1" + "@playwright/test": "npm:^1.57.0" "@radix-ui/react-dialog": "npm:^1.0.4" "@radix-ui/react-slider": "npm:^1.1.2" "@radix-ui/react-visually-hidden": "npm:^1.0.3" @@ -11177,27 +11177,27 @@ __metadata: languageName: node linkType: hard -"playwright-core@npm:1.56.1": - version: 1.56.1 - resolution: "playwright-core@npm:1.56.1" +"playwright-core@npm:1.57.0": + version: 1.57.0 + resolution: "playwright-core@npm:1.57.0" bin: playwright-core: cli.js - checksum: 10c0/ffd40142b99c68678b387445d5b42f1fee4ab0b65d983058c37f342e5629f9cdbdac0506ea80a0dfd41a8f9f13345bad54e9a8c35826ef66dc765f4eb3db8da7 + checksum: 10c0/798e35d83bf48419a8c73de20bb94d68be5dde68de23f95d80a0ebe401e3b83e29e3e84aea7894d67fa6c79d2d3d40cc5bcde3e166f657ce50987aaa2421b6a9 languageName: node linkType: hard -"playwright@npm:1.56.1": - version: 1.56.1 - resolution: "playwright@npm:1.56.1" +"playwright@npm:1.57.0": + version: 1.57.0 + resolution: "playwright@npm:1.57.0" dependencies: fsevents: "npm:2.3.2" - playwright-core: "npm:1.56.1" + playwright-core: "npm:1.57.0" dependenciesMeta: fsevents: optional: true bin: playwright: cli.js - checksum: 10c0/8e9965aede86df0f4722063385748498977b219630a40a10d1b82b8bd8d4d4e9b6b65ecbfa024331a30800163161aca292fb6dd7446c531a1ad25f4155625ab4 + checksum: 10c0/ab03c99a67b835bdea9059f516ad3b6e42c21025f9adaa161a4ef6bc7ca716dcba476d287140bb240d06126eb23f889a8933b8f5f1f1a56b80659d92d1358899 languageName: node linkType: hard From 08306d663a16ad6193d7bb395649c0a228ffe160 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 11 Dec 2025 16:04:12 +0100 Subject: [PATCH 20/24] remove duplicated connecting state and update Test setup --- .../remoteMembers/Connection.test.ts | 40 +++++++++++-------- .../CallViewModel/remoteMembers/Connection.ts | 22 +++++----- 2 files changed, 35 insertions(+), 27 deletions(-) diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index 95ff931e..8f9471d0 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -50,11 +50,6 @@ let fakeLivekitRoom: MockedObject; let localParticipantEventEmiter: EventEmitter; let fakeLocalParticipant: MockedObject; -let fakeRoomEventEmiter: EventEmitter; -// let fakeMembershipsFocusMap$: BehaviorSubject< -// { membership: CallMembership; transport: LivekitTransport }[] -// >; - const livekitFocus: LivekitTransport = { livekit_alias: "!roomID:example.org", livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt", @@ -91,22 +86,25 @@ function setupTest(): void { localParticipantEventEmiter, ), } as unknown as LocalParticipant); - fakeRoomEventEmiter = new EventEmitter(); + const fakeRoomEventEmitter = new EventEmitter(); fakeLivekitRoom = vi.mocked({ connect: vi.fn(), disconnect: vi.fn(), remoteParticipants: new Map(), localParticipant: fakeLocalParticipant, state: LivekitConnectionState.Disconnected, - on: fakeRoomEventEmiter.on.bind(fakeRoomEventEmiter), - off: fakeRoomEventEmiter.off.bind(fakeRoomEventEmiter), - addListener: fakeRoomEventEmiter.addListener.bind(fakeRoomEventEmiter), + on: fakeRoomEventEmitter.on.bind(fakeRoomEventEmitter), + off: fakeRoomEventEmitter.off.bind(fakeRoomEventEmitter), + addListener: fakeRoomEventEmitter.addListener.bind(fakeRoomEventEmitter), removeListener: - fakeRoomEventEmiter.removeListener.bind(fakeRoomEventEmiter), + fakeRoomEventEmitter.removeListener.bind(fakeRoomEventEmitter), removeAllListeners: - fakeRoomEventEmiter.removeAllListeners.bind(fakeRoomEventEmiter), + fakeRoomEventEmitter.removeAllListeners.bind(fakeRoomEventEmitter), setE2EEEnabled: vi.fn().mockResolvedValue(undefined), + emit: (eventName: string | symbol, ...args: unknown[]) => { + fakeRoomEventEmitter.emit(eventName, ...args); + }, } as unknown as LivekitRoom); } @@ -129,7 +127,13 @@ function setupRemoteConnection(): Connection { }); fakeLivekitRoom.connect.mockImplementation(async (): Promise => { + const changeEv = RoomEvent.ConnectionStateChanged; + + fakeLivekitRoom.state = LivekitConnectionState.Connecting; + fakeLivekitRoom.emit(changeEv, fakeLivekitRoom.state); fakeLivekitRoom.state = LivekitConnectionState.Connected; + fakeLivekitRoom.emit(changeEv, fakeLivekitRoom.state); + return Promise.resolve(); }); @@ -350,8 +354,10 @@ describe("Start connection states", () => { expect(initialState).toEqual(ConnectionState.Initialized); const fetchingState = capturedStates.shift(); expect(fetchingState).toEqual(ConnectionState.FetchingConfig); + const disconnectedState = capturedStates.shift(); + expect(disconnectedState).toEqual(ConnectionState.LivekitDisconnected); const connectingState = capturedStates.shift(); - expect(connectingState).toEqual(ConnectionState.ConnectingToLkRoom); + expect(connectingState).toEqual(ConnectionState.LivekitConnecting); const connectedState = capturedStates.shift(); expect(connectedState).toEqual(ConnectionState.LivekitConnected); }); @@ -419,7 +425,7 @@ describe("Publishing participants observations", () => { ); participants.forEach((p) => - fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, p), + fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, p), ); // At this point there should be no publishers @@ -432,7 +438,7 @@ describe("Publishing participants observations", () => { fakeRemoteLivekitParticipant("@dan:example.org:DEV333", 2), ]; participants.forEach((p) => - fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, p), + fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, p), ); // At this point there should be no publishers @@ -462,7 +468,7 @@ describe("Publishing participants observations", () => { ); for (const participant of participants) { - fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, participant); + fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, participant); } // At this point there should be no publishers @@ -471,7 +477,7 @@ describe("Publishing participants observations", () => { participants = [fakeRemoteLivekitParticipant("@bob:example.org:DEV111", 1)]; for (const participant of participants) { - fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, participant); + fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, participant); } // We should have bob has a publisher now @@ -488,7 +494,7 @@ describe("Publishing participants observations", () => { (p) => p.identity !== "@bob:example.org:DEV111", ); - fakeRoomEventEmiter.emit( + fakeLivekitRoom.emit( RoomEvent.ParticipantDisconnected, fakeRemoteLivekitParticipant("@bob:example.org:DEV111"), ); diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 6015bf01..8f8c0924 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -18,7 +18,7 @@ import { RoomEvent, } from "livekit-client"; import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; -import { BehaviorSubject, map } from "rxjs"; +import { BehaviorSubject, map, skip, skipWhile } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; import { @@ -61,7 +61,6 @@ export enum ConnectionState { /** `start` has been called on the connection. It aquires the jwt info to conenct to the LK Room */ FetchingConfig = "FetchingConfig", Stopped = "Stopped", - ConnectingToLkRoom = "ConnectingToLkRoom", /** The same as ConnectionState.Disconnected from `livekit-client` */ LivekitDisconnected = "disconnected", /** The same as ConnectionState.Connecting from `livekit-client` */ @@ -139,7 +138,17 @@ export class Connection { // If we were stopped while fetching the config, don't proceed to connect if (this.stopped) return; - this._state$.next(ConnectionState.ConnectingToLkRoom); + // Setup observer once we are done with getSFUConfigWithOpenID + connectionStateObserver(this.livekitRoom) + .pipe( + this.scope.bind(), + map((s) => s as unknown as ConnectionState), + ) + .subscribe((lkState) => { + // It is save to cast lkState to ConnectionState as they are fully overlapping. + this._state$.next(lkState); + }); + try { await this.livekitRoom.connect(url, jwt); } catch (e) { @@ -167,13 +176,6 @@ export class Connection { } // If we were stopped while connecting, don't proceed to update state. if (this.stopped) return; - - connectionStateObserver(this.livekitRoom) - .pipe(this.scope.bind()) - .subscribe((lkState) => { - // It is save to cast lkState to ConnectionState as they are fully overlapping. - this._state$.next(lkState as unknown as ConnectionState); - }); } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); this._state$.next( From 9a7e797af48da255682e6db017dafd0e9a4624f0 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 11 Dec 2025 16:17:45 +0100 Subject: [PATCH 21/24] fix lint --- src/state/CallViewModel/remoteMembers/Connection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 8f8c0924..8b4479e8 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -18,7 +18,7 @@ import { RoomEvent, } from "livekit-client"; import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; -import { BehaviorSubject, map, skip, skipWhile } from "rxjs"; +import { BehaviorSubject, map } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; import { From 207b161b3ba25a7eae243a741c0fb7c3dfaf8eb9 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 11 Dec 2025 17:17:56 +0100 Subject: [PATCH 22/24] fix logger and dismiss button presses --- playwright/fixtures/widget-user.ts | 10 +++++++++- src/room/GroupCallView.tsx | 1 - src/room/InCallView.tsx | 4 +++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/playwright/fixtures/widget-user.ts b/playwright/fixtures/widget-user.ts index b0c46788..51dffbc6 100644 --- a/playwright/fixtures/widget-user.ts +++ b/playwright/fixtures/widget-user.ts @@ -126,8 +126,16 @@ async function registerUser( page.getByRole("heading", { name: `Welcome ${username}` }), ).toBeVisible(); + await page.pause(); + const browserUnsupportedToast = page + .getByText("Element does not support this browser") + .locator("..") + .locator(".."); + // Dismiss incompatible browser toast - const dismissButton = page.getByRole("button", { name: "Dismiss" }); + const dismissButton = browserUnsupportedToast.getByRole("button", { + name: "Dismiss", + }); try { await expect(dismissButton).toBeVisible({ timeout: 700 }); await dismissButton.click(); diff --git a/src/room/GroupCallView.tsx b/src/room/GroupCallView.tsx index 1542678e..dfd11ff3 100644 --- a/src/room/GroupCallView.tsx +++ b/src/room/GroupCallView.tsx @@ -446,7 +446,6 @@ export const GroupCallView: FC = ({ let body: ReactNode; if (externalError) { - logger.debug("External error occurred:", externalError); // If an error was recorded within this component but outside // GroupCallErrorBoundary, create a component that rethrows the error from // within the error boundary, so it can be handled uniformly diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index 18acf843..add8154a 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -24,7 +24,7 @@ import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc"; import classNames from "classnames"; import { BehaviorSubject, map } from "rxjs"; import { useObservable } from "observable-hooks"; -import { logger } from "matrix-js-sdk/lib/logger"; +import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; import { VoiceCallSolidIcon, VolumeOnSolidIcon, @@ -109,6 +109,8 @@ import { useTrackProcessorObservable$ } from "../livekit/TrackProcessorContext.t import { type Layout } from "../state/layout-types.ts"; import { ObservableScope } from "../state/ObservableScope.ts"; +const logger = rootLogger.getChild("[InCallView]"); + const maxTapDurationMs = 400; export interface ActiveCallProps From 8225e4f2608a10e0662e5004df2ecddef1bea0b8 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 11 Dec 2025 17:22:02 +0100 Subject: [PATCH 23/24] remove page.pause --- playwright/fixtures/widget-user.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/playwright/fixtures/widget-user.ts b/playwright/fixtures/widget-user.ts index 51dffbc6..efff8a80 100644 --- a/playwright/fixtures/widget-user.ts +++ b/playwright/fixtures/widget-user.ts @@ -126,7 +126,6 @@ async function registerUser( page.getByRole("heading", { name: `Welcome ${username}` }), ).toBeVisible(); - await page.pause(); const browserUnsupportedToast = page .getByText("Element does not support this browser") .locator("..") From 7edc97b9175dddd27ee3f90acb48de5de13fb3f4 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 11 Dec 2025 17:24:35 +0100 Subject: [PATCH 24/24] remove continue button things --- playwright/fixtures/widget-user.ts | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/playwright/fixtures/widget-user.ts b/playwright/fixtures/widget-user.ts index efff8a80..6236928c 100644 --- a/playwright/fixtures/widget-user.ts +++ b/playwright/fixtures/widget-user.ts @@ -111,17 +111,7 @@ async function registerUser( await page.getByRole("textbox", { name: "Confirm password" }).click(); await page.getByRole("textbox", { name: "Confirm password" }).fill(PASSWORD); await page.getByRole("button", { name: "Register" }).click(); - const continueButton = page.getByRole("button", { name: "Continue" }); - try { - await expect(continueButton).toBeVisible({ timeout: 700 }); - // why do we need to put in the passwor if there is a continue button? - await page - .getByRole("textbox", { name: "Password", exact: true }) - .fill(PASSWORD); - await continueButton.click(); - } catch { - // continueButton not visible, continue as normal - } + await expect( page.getByRole("heading", { name: `Welcome ${username}` }), ).toBeVisible();