From 2e646bfac163ea48b45de6fc8a22d52e8a0e01cd Mon Sep 17 00:00:00 2001 From: Timo K Date: Tue, 2 Dec 2025 19:40:08 +0100 Subject: [PATCH 01/20] Unify LiveKit and Matrix connection states --- src/room/GroupCallView.tsx | 1 + src/state/CallViewModel/CallViewModel.ts | 47 ++-- .../localMember/HomeserverConnected.test.ts | 58 ++--- .../localMember/HomeserverConnected.ts | 38 +-- .../localMember/LocalMembership.test.ts | 26 +- .../localMember/LocalMembership.ts | 244 +++++++++--------- .../localMember/Publisher.test.ts | 3 +- .../CallViewModel/localMember/Publisher.ts | 2 +- .../remoteMembers/Connection.test.ts | 9 +- .../CallViewModel/remoteMembers/Connection.ts | 43 +-- 10 files changed, 238 insertions(+), 233 deletions(-) diff --git a/src/room/GroupCallView.tsx b/src/room/GroupCallView.tsx index 75438f7f..43602716 100644 --- a/src/room/GroupCallView.tsx +++ b/src/room/GroupCallView.tsx @@ -160,6 +160,7 @@ export const GroupCallView: FC = ({ }, [rtcSession]); // TODO move this into the callViewModel LocalMembership.ts + // We might actually not need this at all. Since we get into fatalError on those errors already? useTypedEventEmitter( rtcSession, MatrixRTCSessionEvent.MembershipManagerError, diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 3c15958a..9bfa979c 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -452,18 +452,14 @@ export function createCallViewModel$( const localMembership = createLocalMembership$({ scope: scope, - homeserverConnected$: createHomeserverConnected$( + homeserverConnected: createHomeserverConnected$( scope, client, matrixRTCSession, ), muteStates: muteStates, - joinMatrixRTC: async (transport: LivekitTransport) => { - return enterRTCSession( - matrixRTCSession, - transport, - connectOptions$.value, - ); + joinMatrixRTC: (transport: LivekitTransport) => { + enterRTCSession(matrixRTCSession, transport, connectOptions$.value); }, createPublisherFactory: (connection: Connection) => { return new Publisher( @@ -573,17 +569,6 @@ export function createCallViewModel$( ), ); - /** - * Whether various media/event sources should pretend to be disconnected from - * all network input, even if their connection still technically works. - */ - // We do this when the app is in the 'reconnecting' state, because it might be - // that the LiveKit connection is still functional while the homeserver is - // down, for example, and we want to avoid making people worry that the app is - // in a split-brained state. - // DISCUSSION own membership manager ALSO this probably can be simplifis - const reconnecting$ = localMembership.reconnecting$; - const audioParticipants$ = scope.behavior( matrixLivekitMembers$.pipe( switchMap((membersWithEpoch) => { @@ -631,7 +616,7 @@ export function createCallViewModel$( ); const handsRaised$ = scope.behavior( - handsRaisedSubject$.pipe(pauseWhen(reconnecting$)), + handsRaisedSubject$.pipe(pauseWhen(localMembership.reconnecting$)), ); const reactions$ = scope.behavior( @@ -644,7 +629,7 @@ export function createCallViewModel$( ]), ), ), - pauseWhen(reconnecting$), + pauseWhen(localMembership.reconnecting$), ), ); @@ -735,7 +720,7 @@ export function createCallViewModel$( livekitRoom$, focusUrl$, mediaDevices, - reconnecting$, + localMembership.reconnecting$, displayName$, matrixMemberMetadataStore.createAvatarUrlBehavior$(userId), handsRaised$.pipe(map((v) => v[participantId]?.time ?? null)), @@ -827,11 +812,17 @@ export function createCallViewModel$( }), ); - const leave$: Observable<"user" | "timeout" | "decline" | "allOthersLeft"> = - merge( - autoLeave$, - merge(userHangup$, widgetHangup$).pipe(map(() => "user" as const)), - ).pipe(scope.share); + const shouldLeave$: Observable< + "user" | "timeout" | "decline" | "allOthersLeft" + > = merge( + autoLeave$, + merge(userHangup$, widgetHangup$).pipe(map(() => "user" as const)), + ).pipe(scope.share); + + shouldLeave$.pipe(scope.bind()).subscribe((reason) => { + logger.info(`Call left due to ${reason}`); + localMembership.requestDisconnect(); + }); const spotlightSpeaker$ = scope.behavior( userMedia$.pipe( @@ -1453,7 +1444,7 @@ export function createCallViewModel$( autoLeave$: autoLeave$, callPickupState$: callPickupState$, ringOverlay$: ringOverlay$, - leave$: leave$, + leave$: shouldLeave$, hangup: (): void => userHangup$.next(), join: localMembership.requestConnect, toggleScreenSharing: toggleScreenSharing, @@ -1500,7 +1491,7 @@ export function createCallViewModel$( showFooter$: showFooter$, earpieceMode$: earpieceMode$, audioOutputSwitcher$: audioOutputSwitcher$, - reconnecting$: reconnecting$, + reconnecting$: localMembership.reconnecting$, }; } diff --git a/src/state/CallViewModel/localMember/HomeserverConnected.test.ts b/src/state/CallViewModel/localMember/HomeserverConnected.test.ts index 1f61e533..87ca35d0 100644 --- a/src/state/CallViewModel/localMember/HomeserverConnected.test.ts +++ b/src/state/CallViewModel/localMember/HomeserverConnected.test.ts @@ -97,106 +97,106 @@ describe("createHomeserverConnected$", () => { // LLM generated test cases. They are a bit overkill but I improved the mocking so it is // easy enough to read them so I think they can stay. it("is false when sync state is not Syncing", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); - expect(hsConnected$.value).toBe(false); + const hsConnected = createHomeserverConnected$(scope, client, session); + expect(hsConnected.combined$.value).toBe(false); }); it("remains false while membership status is not Connected even if sync is Syncing", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); client.setSyncState(SyncState.Syncing); - expect(hsConnected$.value).toBe(false); // membership still disconnected + expect(hsConnected.combined$.value).toBe(false); // membership still disconnected }); it("is false when membership status transitions to Connected but ProbablyLeft is true", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); // Make sync loop OK client.setSyncState(SyncState.Syncing); // Indicate probable leave before connection session.setProbablyLeft(true); session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); }); it("becomes true only when all three conditions are satisfied", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); // 1. Sync loop connected client.setSyncState(SyncState.Syncing); - expect(hsConnected$.value).toBe(false); // not yet membership connected + expect(hsConnected.combined$.value).toBe(false); // not yet membership connected // 2. Membership connected session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); // probablyLeft is false + expect(hsConnected.combined$.value).toBe(true); // probablyLeft is false }); it("drops back to false when sync loop leaves Syncing", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); // Reach connected state client.setSyncState(SyncState.Syncing); session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); // Sync loop error => should flip false client.setSyncState(SyncState.Error); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); }); it("drops back to false when membership status becomes disconnected", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); client.setSyncState(SyncState.Syncing); session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); session.setMembershipStatus(Status.Disconnected); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); }); it("drops to false when ProbablyLeft is emitted after being true", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); client.setSyncState(SyncState.Syncing); session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); session.setProbablyLeft(true); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); }); it("recovers to true if ProbablyLeft becomes false again while other conditions remain true", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); client.setSyncState(SyncState.Syncing); session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); session.setProbablyLeft(true); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); // Simulate clearing the flag (in realistic scenario membership manager would update) session.setProbablyLeft(false); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); }); it("composite sequence reflects each individual failure reason", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); // Initially false (sync error + disconnected + not probably left) - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); // Fix sync only client.setSyncState(SyncState.Syncing); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); // Fix membership session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); // Introduce probablyLeft -> false session.setProbablyLeft(true); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); // Restore notProbablyLeft -> true again session.setProbablyLeft(false); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); // Drop sync -> false client.setSyncState(SyncState.Error); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); }); }); diff --git a/src/state/CallViewModel/localMember/HomeserverConnected.ts b/src/state/CallViewModel/localMember/HomeserverConnected.ts index e1c28078..c8bcd021 100644 --- a/src/state/CallViewModel/localMember/HomeserverConnected.ts +++ b/src/state/CallViewModel/localMember/HomeserverConnected.ts @@ -25,6 +25,11 @@ import { type NodeStyleEventEmitter } from "../../../utils/test"; */ const logger = rootLogger.getChild("[HomeserverConnected]"); +export interface HomeserverConnected { + combined$: Behavior; + rtsSession$: Behavior; +} + /** * Behavior representing whether we consider ourselves connected to the Matrix homeserver * for the purposes of a MatrixRTC session. @@ -39,7 +44,7 @@ export function createHomeserverConnected$( client: NodeStyleEventEmitter & Pick, matrixRTCSession: NodeStyleEventEmitter & Pick, -): Behavior { +): HomeserverConnected { const syncing$ = ( fromEvent(client, ClientEvent.Sync) as Observable<[SyncState]> ).pipe( @@ -47,12 +52,15 @@ export function createHomeserverConnected$( map(([state]) => state === SyncState.Syncing), ); - const membershipConnected$ = fromEvent( - matrixRTCSession, - MembershipManagerEvent.StatusChanged, - ).pipe( - startWith(null), - map(() => matrixRTCSession.membershipStatus === Status.Connected), + const rtsSession$ = scope.behavior( + fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe( + map(() => matrixRTCSession.membershipStatus ?? Status.Unknown), + ), + Status.Unknown, + ); + + const membershipConnected$ = rtsSession$.pipe( + map((status) => status === Status.Connected), ); // This is basically notProbablyLeft$ @@ -71,15 +79,13 @@ export function createHomeserverConnected$( map(() => matrixRTCSession.probablyLeft !== true), ); - const connectedCombined$ = and$( - syncing$, - membershipConnected$, - certainlyConnected$, - ).pipe( - tap((connected) => { - logger.info(`Homeserver connected update: ${connected}`); - }), + const combined$ = scope.behavior( + and$(syncing$, membershipConnected$, certainlyConnected$).pipe( + tap((connected) => { + logger.info(`Homeserver connected update: ${connected}`); + }), + ), ); - return scope.behavior(connectedCombined$); + return { combined$, rtsSession$ }; } diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMembership.test.ts index cff5c06d..1ef7abd6 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.test.ts @@ -7,6 +7,7 @@ Please see LICENSE in the repository root for full details. */ import { + Status, type LivekitTransport, type MatrixRTCSession, } from "matrix-js-sdk/lib/matrixrtc"; @@ -51,7 +52,7 @@ vi.mock("@livekit/components-core", () => ({ describe("LocalMembership", () => { describe("enterRTCSession", () => { - it("It joins the correct Session", async () => { + it("It joins the correct Session", () => { const focusFromOlderMembership = { type: "livekit", livekit_service_url: "http://my-oldest-member-service-url.com", @@ -107,7 +108,7 @@ describe("LocalMembership", () => { joinRoomSession: vi.fn(), }) as unknown as MatrixRTCSession; - await enterRTCSession( + enterRTCSession( mockedSession, { livekit_alias: "roomId", @@ -136,7 +137,7 @@ describe("LocalMembership", () => { ); }); - it("It should not fail with configuration error if homeserver config has livekit url but not fallback", async () => { + it("It should not fail with configuration error if homeserver config has livekit url but not fallback", () => { mockConfig({}); vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({ "org.matrix.msc4143.rtc_foci": [ @@ -165,7 +166,7 @@ describe("LocalMembership", () => { joinRoomSession: vi.fn(), }) as unknown as MatrixRTCSession; - await enterRTCSession( + enterRTCSession( mockedSession, { livekit_alias: "roomId", @@ -190,7 +191,6 @@ describe("LocalMembership", () => { leaveRoomSession: () => {}, } as unknown as MatrixRTCSession, muteStates: mockMuteStates(), - isHomeserverConnected: constant(true), trackProcessorState$: constant({ supported: false, processor: undefined, @@ -198,7 +198,10 @@ describe("LocalMembership", () => { logger: logger, createPublisherFactory: vi.fn(), joinMatrixRTC: async (): Promise => {}, - homeserverConnected$: constant(true), + homeserverConnected: { + combined$: constant(true), + rtsSession$: constant(Status.Connected), + }, }; it("throws error on missing RTC config error", () => { @@ -258,8 +261,7 @@ describe("LocalMembership", () => { } as unknown as LocalParticipant, }), state$: constant({ - state: "ConnectedToLkRoom", - livekitConnectionState$: constant(LivekitConnectionState.Connected), + state: LivekitConnectionState.Connected, }), transport: aTransport, } as unknown as Connection, @@ -268,7 +270,7 @@ describe("LocalMembership", () => { connectionManagerData.add( { state$: constant({ - state: "ConnectedToLkRoom", + state: LivekitConnectionState.Connected, }), transport: bTransport, } as unknown as Connection, @@ -443,7 +445,7 @@ describe("LocalMembership", () => { connectionManagerData$.next(new Epoch(connectionManagerData)); await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.Initialized, + state: LivekitConnectionState.Connected, }); expect(publisherFactory).toHaveBeenCalledOnce(); expect(localMembership.tracks$.value.length).toBe(0); @@ -473,7 +475,7 @@ describe("LocalMembership", () => { publishResolver.resolve(); await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.Connected, + state: RTCBackendState.ConnectedAndPublishing, }); expect(publishers[0].stopPublishing).not.toHaveBeenCalled(); @@ -482,7 +484,7 @@ describe("LocalMembership", () => { await flushPromises(); // stays in connected state because it is stopped before the update to tracks update the state. expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.Connected, + state: RTCBackendState.ConnectedAndPublishing, }); // stop all tracks after ending scopes expect(publishers[0].stopPublishing).toHaveBeenCalled(); diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 60ae79b8..33af5192 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -11,10 +11,11 @@ import { ParticipantEvent, type LocalParticipant, type ScreenShareCaptureOptions, - ConnectionState, + ConnectionState as LivekitConnectionState, } from "livekit-client"; import { observeParticipantEvents } from "@livekit/components-core"; import { + Status as RTCSessionStatus, type LivekitTransport, type MatrixRTCSession, } from "matrix-js-sdk/lib/matrixrtc"; @@ -27,7 +28,7 @@ import { map, type Observable, of, - scan, + pairwise, startWith, switchMap, tap, @@ -37,10 +38,9 @@ import { deepCompare } from "matrix-js-sdk/lib/utils"; import { constant, type Behavior } from "../../Behavior"; import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; -import { ObservableScope } from "../../ObservableScope"; +import { type ObservableScope } from "../../ObservableScope"; import { type Publisher } from "./Publisher"; import { type MuteStates } from "../../MuteStates"; -import { and$ } from "../../../utils/observable"; import { ElementCallError, MembershipManagerError, @@ -51,7 +51,11 @@ import { getUrlParams } from "../../../UrlParams.ts"; import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts"; import { MatrixRTCMode } from "../../../settings/settings.ts"; import { Config } from "../../../config/Config.ts"; -import { type Connection } from "../remoteMembers/Connection.ts"; +import { + type ConnectionState, + type Connection, +} from "../remoteMembers/Connection.ts"; +import { type HomeserverConnected } from "./HomeserverConnected.ts"; export enum RTCBackendState { Error = "error", @@ -59,47 +63,32 @@ export enum RTCBackendState { WaitingForTransport = "waiting_for_transport", /** A connection appeared so we can initialise the publisher */ WaitingForConnection = "waiting_for_connection", - /** Connection and transport arrived, publisher Initialized */ - Initialized = "Initialized", + /** Implies lk connection is connected */ CreatingTracks = "creating_tracks", + /** Implies lk connection is connected */ ReadyToPublish = "ready_to_publish", + /** Implies lk connection is connected */ WaitingToPublish = "waiting_to_publish", - Connected = "connected", - Disconnected = "disconnected", - Disconnecting = "disconnecting", + /** Implies lk connection is connected */ + ConnectedAndPublishing = "fully_connected", } -type LocalMemberRtcBackendState = +type LocalMemberRTCBackendState = | { state: RTCBackendState.Error; error: ElementCallError } - | { state: RTCBackendState.WaitingForTransport } - | { state: RTCBackendState.WaitingForConnection } - | { state: RTCBackendState.Initialized } - | { state: RTCBackendState.CreatingTracks } - | { state: RTCBackendState.ReadyToPublish } - | { state: RTCBackendState.WaitingToPublish } - | { state: RTCBackendState.Connected } - | { state: RTCBackendState.Disconnected } - | { state: RTCBackendState.Disconnecting }; + | { state: Exclude } + | ConnectionState; -export enum MatrixState { +export enum MatrixAdditionalState { WaitingForTransport = "waiting_for_transport", - Ready = "ready", - Connecting = "connecting", - Connected = "connected", - Disconnected = "disconnected", - Error = "Error", } type LocalMemberMatrixState = - | { state: MatrixState.Connected } - | { state: MatrixState.WaitingForTransport } - | { state: MatrixState.Ready } - | { state: MatrixState.Connecting } - | { state: MatrixState.Disconnected } - | { state: MatrixState.Error; error: Error }; + | { state: MatrixAdditionalState.WaitingForTransport } + | { state: "Error"; error: Error } + | { state: RTCSessionStatus }; export interface LocalMemberConnectionState { - livekit$: Behavior; + livekit$: Behavior; matrix$: Behavior; } @@ -122,8 +111,8 @@ interface Props { muteStates: MuteStates; connectionManager: IConnectionManager; createPublisherFactory: (connection: Connection) => Publisher; - joinMatrixRTC: (transport: LivekitTransport) => Promise; - homeserverConnected$: Behavior; + joinMatrixRTC: (transport: LivekitTransport) => void; + homeserverConnected: HomeserverConnected; localTransport$: Behavior; matrixRTCSession: Pick< MatrixRTCSession, @@ -149,7 +138,7 @@ export const createLocalMembership$ = ({ scope, connectionManager, localTransport$: localTransportCanThrow$, - homeserverConnected$, + homeserverConnected, createPublisherFactory, joinMatrixRTC, logger: parentLogger, @@ -175,10 +164,14 @@ export const createLocalMembership$ = ({ tracks$: Behavior; participant$: Behavior; connection$: Behavior; - homeserverConnected$: Behavior; - // this needs to be discussed - /** @deprecated use state instead*/ + /** Shorthand for connectionState.matrix.state === Status.Reconnecting + * Direct translation to the js-sdk membership manager connection `Status`. + */ reconnecting$: Behavior; + /** Shorthand for connectionState.matrix.state === Status.Disconnected + * Direct translation to the js-sdk membership manager connection `Status`. + */ + disconnected$: Behavior; } => { const logger = parentLogger.getChild("[LocalMembership]"); logger.debug(`Creating local membership..`); @@ -232,49 +225,31 @@ export const createLocalMembership$ = ({ // * Whether we are "fully" connected to the call. Accounts for both the // * connection to the MatrixRTC session and the LiveKit publish connection. // */ - const connected$ = scope.behavior( - and$( - homeserverConnected$.pipe( - tap((v) => logger.debug("matrix: Connected state changed", v)), - ), - localConnectionState$.pipe( - switchMap((state) => { - logger.debug("livekit: Connected state changed", state); - if (!state) return of(false); - if (state.state === "ConnectedToLkRoom") { - logger.debug( - "livekit: Connected state changed (inner livekitConnectionState$)", - state.livekitConnectionState$.value, - ); - return state.livekitConnectionState$.pipe( - map((lkState) => lkState === ConnectionState.Connected), - ); - } - return of(false); - }), - ), - ).pipe(tap((v) => logger.debug("combined: Connected state changed", v))), - ); + // TODO remove this and just make it one single check of livekitConnectionState$ + // const connected$ = scope.behavior( + // localConnectionState$.pipe( + // switchMap((state) => { + // logger.debug("livekit: Connected state changed", state); + // if (!state) return of(false); + // if (state.state === "ConnectedToLkRoom") { + // logger.debug( + // "livekit: Connected state changed (inner livekitConnectionState$)", + // state.livekitConnectionState$.value, + // ); + // return state.livekitConnectionState$.pipe( + // map((lkState) => lkState === ConnectionState.Connected), + // ); + // } + // return of(false); + // }), + // ), + // ); // MATRIX RELATED - // /** - // * Whether we should tell the user that we're reconnecting to the call. - // */ - // DISCUSSION is there a better way to do this? - // sth that is more deriectly implied from the membership manager of the js sdk. (fromEvent(matrixRTCSession, Reconnecting)) ??? or similar const reconnecting$ = scope.behavior( - connected$.pipe( - // We are reconnecting if we previously had some successful initial - // connection but are now disconnected - scan( - ({ connectedPreviously }, connectedNow) => ({ - connectedPreviously: connectedPreviously || connectedNow, - reconnecting: connectedPreviously && !connectedNow, - }), - { connectedPreviously: false, reconnecting: false }, - ), - map(({ reconnecting }) => reconnecting), + homeserverConnected.rtsSession$.pipe( + map((sessionStatus) => sessionStatus === RTCSessionStatus.Reconnecting), ), ); @@ -374,8 +349,9 @@ export const createLocalMembership$ = ({ logger.error("Multiple Livkit Errors:", e); else fatalLivekitError$.next(e); }; - const livekitState$: Behavior = scope.behavior( + const livekitState$: Behavior = scope.behavior( combineLatest([ + localConnectionState$, publisher$, localTransport$, tracks$.pipe( @@ -389,10 +365,12 @@ export const createLocalMembership$ = ({ map(() => true), startWith(false), ), + // TODO use local connection state here to give the full pciture of the livekit state! fatalLivekitError$, ]).pipe( map( ([ + localConnectionState, publisher, localTransport, tracks, @@ -411,13 +389,21 @@ export const createLocalMembership$ = ({ const hasTracks = tracks.length > 0; if (!localTransport) return { state: RTCBackendState.WaitingForTransport }; - if (!publisher) + if (!localConnectionState) return { state: RTCBackendState.WaitingForConnection }; - if (!shouldStartTracks) return { state: RTCBackendState.Initialized }; + if ( + localConnectionState.state !== LivekitConnectionState.Connected || + !publisher + ) + // pass through the localConnectionState while we do not yet have a publisher or the state + // of the connection is not yet connected + return { state: localConnectionState.state }; + if (!shouldStartTracks) + return { state: LivekitConnectionState.Connected }; if (!hasTracks) return { state: RTCBackendState.CreatingTracks }; if (!shouldConnect) return { state: RTCBackendState.ReadyToPublish }; if (!publishing) return { state: RTCBackendState.WaitingToPublish }; - return { state: RTCBackendState.Connected }; + return { state: RTCBackendState.ConnectedAndPublishing }; }, ), distinctUntilChanged(deepCompare), @@ -431,58 +417,70 @@ export const createLocalMembership$ = ({ else fatalMatrixError$.next(e); }; const matrixState$: Behavior = scope.behavior( - combineLatest([ - localTransport$, - connectRequested$, - homeserverConnected$, - ]).pipe( - map(([localTransport, connectRequested, homeserverConnected]) => { - if (!localTransport) return { state: MatrixState.WaitingForTransport }; - if (!connectRequested) return { state: MatrixState.Ready }; - if (!homeserverConnected) return { state: MatrixState.Connecting }; - return { state: MatrixState.Connected }; + combineLatest([localTransport$, homeserverConnected.rtsSession$]).pipe( + map(([localTransport, rtcSessionStatus]) => { + if (!localTransport) + return { state: MatrixAdditionalState.WaitingForTransport }; + return { state: rtcSessionStatus }; }), ), ); - // Keep matrix rtc session in sync with localTransport$, connectRequested$ and muteStates.video.enabled$ + // inform the widget about the connect and disconnect intent from the user. + scope + .behavior(connectRequested$.pipe(pairwise(), scope.bind()), [ + undefined, + connectRequested$.value, + ]) + .subscribe(([prev, current]) => { + if (!widget) return; + if (!prev && current) { + try { + void widget.api.transport.send(ElementWidgetActions.JoinCall, {}); + } catch (e) { + logger.error("Failed to send join action", e); + } + } + if (prev && !current) { + try { + void widget?.api.transport.send(ElementWidgetActions.HangupCall, {}); + } catch (e) { + logger.error("Failed to send hangup action", e); + } + } + }); + + combineLatest([muteStates.video.enabled$, homeserverConnected.combined$]) + .pipe(scope.bind()) + .subscribe(([videoEnabled, connected]) => { + if (!connected) return; + void matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"); + }); + + // Keep matrix rtc session in sync with localTransport$, connectRequested$ scope.reconcile( scope.behavior(combineLatest([localTransport$, connectRequested$])), async ([transport, shouldConnect]) => { + if (!transport) return; + // if shouldConnect=false we will do the disconnect as the cleanup from the previous reconcile iteration. if (!shouldConnect) return; - if (!transport) return; try { - await joinMatrixRTC(transport); + joinMatrixRTC(transport); } catch (error) { logger.error("Error entering RTC session", error); if (error instanceof Error) setMatrixError(new MembershipManagerError(error)); } - // Update our member event when our mute state changes. - const callIntentScope = new ObservableScope(); - // because this uses its own scope, we can start another reconciliation for the duration of one connection. - callIntentScope.reconcile( - muteStates.video.enabled$, - async (videoEnabled) => - matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"), - ); - - return async (): Promise => { - callIntentScope.end(); + return Promise.resolve(async (): Promise => { try { - // Update matrixRTCSession to allow udpating the transport without leaving the session! - await matrixRTCSession.leaveRoomSession(); + // TODO Update matrixRTCSession to allow udpating the transport without leaving the session! + await matrixRTCSession.leaveRoomSession(1000); } catch (e) { logger.error("Error leaving RTC session", e); } - try { - await widget?.api.transport.send(ElementWidgetActions.HangupCall, {}); - } catch (e) { - logger.error("Failed to send hangup action", e); - } - }; + }); }, ); @@ -497,7 +495,7 @@ export const createLocalMembership$ = ({ // pause tracks during the initial joining sequence too until we're sure // that our own media is displayed on screen. // TODO refactor this based no livekitState$ - combineLatest([participant$, homeserverConnected$]) + combineLatest([participant$, homeserverConnected.combined$]) .pipe(scope.bind()) .subscribe(([participant, connected]) => { if (!participant) return; @@ -590,8 +588,15 @@ export const createLocalMembership$ = ({ }, tracks$, participant$, - homeserverConnected$, reconnecting$, + disconnected$: scope.behavior( + matrixState$.pipe( + map( + (sessionStatus) => + sessionStatus.state === RTCSessionStatus.Disconnected, + ), + ), + ), sharingScreen$, toggleScreenSharing, connection$: localConnection$, @@ -626,11 +631,11 @@ interface EnterRTCSessionOptions { * @throws If the widget could not send ElementWidgetActions.JoinCall action. */ // Exported for unit testing -export async function enterRTCSession( +export function enterRTCSession( rtcSession: MatrixRTCSession, transport: LivekitTransport, { encryptMedia, matrixRTCMode }: EnterRTCSessionOptions, -): Promise { +): void { PosthogAnalytics.instance.eventCallEnded.cacheStartCall(new Date()); PosthogAnalytics.instance.eventCallStarted.track(rtcSession.room.roomId); @@ -669,7 +674,4 @@ export async function enterRTCSession( unstableSendStickyEvents: matrixRTCMode === MatrixRTCMode.Matrix_2_0, }, ); - if (widget) { - await widget.api.transport.send(ElementWidgetActions.JoinCall, {}); - } } diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index 9b3e5b2a..5468d1ff 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -53,8 +53,7 @@ describe("Publisher", () => { scope = new ObservableScope(); connection = { state$: constant({ - state: "ConnectedToLkRoom", - livekitConnectionState$: constant(LivekitConenctionState.Connected), + state: LivekitConenctionState.Connected, }), livekitRoom: mockLivekitRoom({ localParticipant: mockLocalParticipant({}), diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 326dedaf..6e4a9b35 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -160,7 +160,7 @@ export class Publisher { const { promise, resolve, reject } = Promise.withResolvers(); const sub = this.connection.state$.subscribe((s) => { switch (s.state) { - case "ConnectedToLkRoom": + case LivekitConnectionState.Connected: resolve(); break; case "FailedToStart": diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index 2ead768b..efee1ccb 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -125,7 +125,10 @@ function setupRemoteConnection(): Connection { }; }); - fakeLivekitRoom.connect.mockResolvedValue(undefined); + fakeLivekitRoom.connect.mockImplementation(async (): Promise => { + fakeLivekitRoom.state = LivekitConnectionState.Connected; + return Promise.resolve(); + }); return new Connection(opts, logger); } @@ -309,7 +312,7 @@ describe("Start connection states", () => { capturedState = capturedStates.pop(); - if (capturedState && capturedState?.state === "FailedToStart") { + if (capturedState && capturedState.state === "FailedToStart") { expect(capturedState.error.message).toContain( "Failed to connect to livekit", ); @@ -345,7 +348,7 @@ describe("Start connection states", () => { const connectingState = capturedStates.shift(); expect(connectingState?.state).toEqual("ConnectingToLkRoom"); const connectedState = capturedStates.shift(); - expect(connectedState?.state).toEqual("ConnectedToLkRoom"); + expect(connectedState?.state).toEqual("connected"); }); it("shutting down the scope should stop the connection", async () => { diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 4f3bbda4..962f56d9 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -12,7 +12,7 @@ import { } from "@livekit/components-core"; import { ConnectionError, - type ConnectionState as LivekitConenctionState, + type ConnectionState as LivekitConnectionState, type Room as LivekitRoom, type LocalParticipant, type RemoteParticipant, @@ -47,17 +47,17 @@ export interface ConnectionOpts { /** Optional factory to create the LiveKit room, mainly for testing purposes. */ livekitRoomFactory: () => LivekitRoom; } - +export enum ConnectionAdditionalState { + Initialized = "Initialized", + FetchingConfig = "FetchingConfig", + // FailedToStart = "FailedToStart", + Stopped = "Stopped", + ConnectingToLkRoom = "ConnectingToLkRoom", +} export type ConnectionState = - | { state: "Initialized" } - | { state: "FetchingConfig" } - | { state: "ConnectingToLkRoom" } - | { - state: "ConnectedToLkRoom"; - livekitConnectionState$: Behavior; - } - | { state: "FailedToStart"; error: Error } - | { state: "Stopped" }; + | { state: ConnectionAdditionalState } + | { state: LivekitConnectionState } + | { state: "FailedToStart"; error: Error }; /** * A connection to a Matrix RTC LiveKit backend. @@ -67,7 +67,7 @@ export type ConnectionState = export class Connection { // Private Behavior private readonly _state$ = new BehaviorSubject({ - state: "Initialized", + state: ConnectionAdditionalState.Initialized, }); /** @@ -118,14 +118,14 @@ export class Connection { this.stopped = false; try { this._state$.next({ - state: "FetchingConfig", + state: ConnectionAdditionalState.FetchingConfig, }); const { url, jwt } = await this.getSFUConfigWithOpenID(); // If we were stopped while fetching the config, don't proceed to connect if (this.stopped) return; this._state$.next({ - state: "ConnectingToLkRoom", + state: ConnectionAdditionalState.ConnectingToLkRoom, }); try { await this.livekitRoom.connect(url, jwt); @@ -154,12 +154,13 @@ export class Connection { // If we were stopped while connecting, don't proceed to update state. if (this.stopped) return; - this._state$.next({ - state: "ConnectedToLkRoom", - livekitConnectionState$: this.scope.behavior( - connectionStateObserver(this.livekitRoom), - ), - }); + connectionStateObserver(this.livekitRoom) + .pipe(this.scope.bind()) + .subscribe((lkState) => { + this._state$.next({ + state: lkState, + }); + }); } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); this._state$.next({ @@ -191,7 +192,7 @@ export class Connection { if (this.stopped) return; await this.livekitRoom.disconnect(); this._state$.next({ - state: "Stopped", + state: ConnectionAdditionalState.Stopped, }); this.stopped = true; } From 88721be9521843ab106fabc4e1c447bcabb2247a Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 3 Dec 2025 10:04:22 +0100 Subject: [PATCH 02/20] cleanup --- src/room/GroupCallView.tsx | 1 + src/room/InCallView.tsx | 1 + .../localMember/LocalMembership.ts | 25 ------------------- 3 files changed, 2 insertions(+), 25 deletions(-) diff --git a/src/room/GroupCallView.tsx b/src/room/GroupCallView.tsx index 43602716..dfd11ff3 100644 --- a/src/room/GroupCallView.tsx +++ b/src/room/GroupCallView.tsx @@ -314,6 +314,7 @@ export const GroupCallView: FC = ({ const navigate = useNavigate(); + // TODO split this into leave and onDisconnect const onLeft = useCallback( ( reason: "timeout" | "user" | "allOthersLeft" | "decline" | "error", diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index 6ae004d8..7ae3700c 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -151,6 +151,7 @@ export const ActiveCall: FC = (props) => { setVm(vm); vm.leave$.pipe(scope.bind()).subscribe(props.onLeft); + return (): void => { scope.end(); }; diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 33af5192..6a31ce4b 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -221,30 +221,6 @@ export const createLocalMembership$ = ({ switchMap((connection) => (connection ? connection.state$ : of(null))), ); - // /** - // * Whether we are "fully" connected to the call. Accounts for both the - // * connection to the MatrixRTC session and the LiveKit publish connection. - // */ - // TODO remove this and just make it one single check of livekitConnectionState$ - // const connected$ = scope.behavior( - // localConnectionState$.pipe( - // switchMap((state) => { - // logger.debug("livekit: Connected state changed", state); - // if (!state) return of(false); - // if (state.state === "ConnectedToLkRoom") { - // logger.debug( - // "livekit: Connected state changed (inner livekitConnectionState$)", - // state.livekitConnectionState$.value, - // ); - // return state.livekitConnectionState$.pipe( - // map((lkState) => lkState === ConnectionState.Connected), - // ); - // } - // return of(false); - // }), - // ), - // ); - // MATRIX RELATED const reconnecting$ = scope.behavior( @@ -365,7 +341,6 @@ export const createLocalMembership$ = ({ map(() => true), startWith(false), ), - // TODO use local connection state here to give the full pciture of the livekit state! fatalLivekitError$, ]).pipe( map( From 7c40b0e177fbbfbf14c7c28b71a22b58f6df94e4 Mon Sep 17 00:00:00 2001 From: Timo K Date: Fri, 5 Dec 2025 19:48:02 +0100 Subject: [PATCH 03/20] ideas --- src/state/CallViewModel/CallViewModel.ts | 47 +++++++++++-------- .../CallViewModel/remoteMembers/Connection.ts | 43 +++++++++-------- 2 files changed, 51 insertions(+), 39 deletions(-) diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 9bfa979c..3c15958a 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -452,14 +452,18 @@ export function createCallViewModel$( const localMembership = createLocalMembership$({ scope: scope, - homeserverConnected: createHomeserverConnected$( + homeserverConnected$: createHomeserverConnected$( scope, client, matrixRTCSession, ), muteStates: muteStates, - joinMatrixRTC: (transport: LivekitTransport) => { - enterRTCSession(matrixRTCSession, transport, connectOptions$.value); + joinMatrixRTC: async (transport: LivekitTransport) => { + return enterRTCSession( + matrixRTCSession, + transport, + connectOptions$.value, + ); }, createPublisherFactory: (connection: Connection) => { return new Publisher( @@ -569,6 +573,17 @@ export function createCallViewModel$( ), ); + /** + * Whether various media/event sources should pretend to be disconnected from + * all network input, even if their connection still technically works. + */ + // We do this when the app is in the 'reconnecting' state, because it might be + // that the LiveKit connection is still functional while the homeserver is + // down, for example, and we want to avoid making people worry that the app is + // in a split-brained state. + // DISCUSSION own membership manager ALSO this probably can be simplifis + const reconnecting$ = localMembership.reconnecting$; + const audioParticipants$ = scope.behavior( matrixLivekitMembers$.pipe( switchMap((membersWithEpoch) => { @@ -616,7 +631,7 @@ export function createCallViewModel$( ); const handsRaised$ = scope.behavior( - handsRaisedSubject$.pipe(pauseWhen(localMembership.reconnecting$)), + handsRaisedSubject$.pipe(pauseWhen(reconnecting$)), ); const reactions$ = scope.behavior( @@ -629,7 +644,7 @@ export function createCallViewModel$( ]), ), ), - pauseWhen(localMembership.reconnecting$), + pauseWhen(reconnecting$), ), ); @@ -720,7 +735,7 @@ export function createCallViewModel$( livekitRoom$, focusUrl$, mediaDevices, - localMembership.reconnecting$, + reconnecting$, displayName$, matrixMemberMetadataStore.createAvatarUrlBehavior$(userId), handsRaised$.pipe(map((v) => v[participantId]?.time ?? null)), @@ -812,17 +827,11 @@ export function createCallViewModel$( }), ); - const shouldLeave$: Observable< - "user" | "timeout" | "decline" | "allOthersLeft" - > = merge( - autoLeave$, - merge(userHangup$, widgetHangup$).pipe(map(() => "user" as const)), - ).pipe(scope.share); - - shouldLeave$.pipe(scope.bind()).subscribe((reason) => { - logger.info(`Call left due to ${reason}`); - localMembership.requestDisconnect(); - }); + const leave$: Observable<"user" | "timeout" | "decline" | "allOthersLeft"> = + merge( + autoLeave$, + merge(userHangup$, widgetHangup$).pipe(map(() => "user" as const)), + ).pipe(scope.share); const spotlightSpeaker$ = scope.behavior( userMedia$.pipe( @@ -1444,7 +1453,7 @@ export function createCallViewModel$( autoLeave$: autoLeave$, callPickupState$: callPickupState$, ringOverlay$: ringOverlay$, - leave$: shouldLeave$, + leave$: leave$, hangup: (): void => userHangup$.next(), join: localMembership.requestConnect, toggleScreenSharing: toggleScreenSharing, @@ -1491,7 +1500,7 @@ export function createCallViewModel$( showFooter$: showFooter$, earpieceMode$: earpieceMode$, audioOutputSwitcher$: audioOutputSwitcher$, - reconnecting$: localMembership.reconnecting$, + reconnecting$: reconnecting$, }; } diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 962f56d9..549777f9 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -12,7 +12,7 @@ import { } from "@livekit/components-core"; import { ConnectionError, - type ConnectionState as LivekitConnectionState, + ConnectionState as LivekitConnectionState, type Room as LivekitRoom, type LocalParticipant, type RemoteParticipant, @@ -47,17 +47,24 @@ export interface ConnectionOpts { /** Optional factory to create the LiveKit room, mainly for testing purposes. */ livekitRoomFactory: () => LivekitRoom; } -export enum ConnectionAdditionalState { +export class FailedToStartError extends Error { + public constructor(message: string) { + super(message); + this.name = "FailedToStartError"; + } +} + +export enum ConnectionState { Initialized = "Initialized", FetchingConfig = "FetchingConfig", - // FailedToStart = "FailedToStart", Stopped = "Stopped", ConnectingToLkRoom = "ConnectingToLkRoom", + LivekitDisconnected = "disconnected", + LivekitConnecting = "connecting", + LivekitConnected = "connected", + LivekitReconnecting = "reconnecting", + LivekitSignalReconnecting = "signalReconnecting", } -export type ConnectionState = - | { state: ConnectionAdditionalState } - | { state: LivekitConnectionState } - | { state: "FailedToStart"; error: Error }; /** * A connection to a Matrix RTC LiveKit backend. @@ -66,14 +73,15 @@ export type ConnectionState = */ export class Connection { // Private Behavior - private readonly _state$ = new BehaviorSubject({ - state: ConnectionAdditionalState.Initialized, - }); + private readonly _state$ = new BehaviorSubject< + ConnectionState | FailedToStartError + >(ConnectionState.Initialized); /** * The current state of the connection to the media transport. */ - public readonly state$: Behavior = this._state$; + public readonly state$: Behavior = + this._state$; /** * The media transport to connect to. @@ -117,16 +125,12 @@ export class Connection { this.logger.debug("Starting Connection"); this.stopped = false; try { - this._state$.next({ - state: ConnectionAdditionalState.FetchingConfig, - }); + this._state$.next(ConnectionState.FetchingConfig); const { url, jwt } = await this.getSFUConfigWithOpenID(); // If we were stopped while fetching the config, don't proceed to connect if (this.stopped) return; - this._state$.next({ - state: ConnectionAdditionalState.ConnectingToLkRoom, - }); + this._state$.next(ConnectionState.ConnectingToLkRoom); try { await this.livekitRoom.connect(url, jwt); } catch (e) { @@ -157,9 +161,8 @@ export class Connection { connectionStateObserver(this.livekitRoom) .pipe(this.scope.bind()) .subscribe((lkState) => { - this._state$.next({ - state: lkState, - }); + // It si save to cast lkState to ConnectionState as they are fully overlapping. + this._state$.next(lkState as unknown as ConnectionState); }); } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); From bf801364a68927e0cd67ccfe8d53876507333ff0 Mon Sep 17 00:00:00 2001 From: Timo K Date: Tue, 9 Dec 2025 15:23:30 +0100 Subject: [PATCH 04/20] cleanup and tests --- src/state/CallViewModel/CallViewModel.ts | 16 +- ...Membership.test.ts => LocalMember.test.ts} | 180 ++++++------ .../{LocalMembership.ts => LocalMember.ts} | 256 +++++++++--------- .../localMember/Publisher.test.ts | 13 +- .../CallViewModel/localMember/Publisher.ts | 30 +- .../remoteMembers/Connection.test.ts | 48 ++-- .../CallViewModel/remoteMembers/Connection.ts | 28 +- yarn.lock | 6 +- 8 files changed, 302 insertions(+), 275 deletions(-) rename src/state/CallViewModel/localMember/{LocalMembership.test.ts => LocalMember.test.ts} (74%) rename src/state/CallViewModel/localMember/{LocalMembership.ts => LocalMember.ts} (77%) diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 3c15958a..e06990b2 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -94,14 +94,13 @@ import { type SpotlightLandscapeLayoutMedia, type SpotlightPortraitLayoutMedia, } from "../layout-types.ts"; -import { type ElementCallError } from "../../utils/errors.ts"; +import { ElementCallError } from "../../utils/errors.ts"; import { type ObservableScope } from "../ObservableScope.ts"; import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts"; import { createLocalMembership$, enterRTCSession, - RTCBackendState, -} from "./localMember/LocalMembership.ts"; +} from "./localMember/LocalMember.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { createMemberships$, @@ -452,13 +451,13 @@ export function createCallViewModel$( const localMembership = createLocalMembership$({ scope: scope, - homeserverConnected$: createHomeserverConnected$( + homeserverConnected: createHomeserverConnected$( scope, client, matrixRTCSession, ), muteStates: muteStates, - joinMatrixRTC: async (transport: LivekitTransport) => { + joinMatrixRTC: (transport: LivekitTransport) => { return enterRTCSession( matrixRTCSession, transport, @@ -1455,7 +1454,7 @@ export function createCallViewModel$( ringOverlay$: ringOverlay$, leave$: leave$, hangup: (): void => userHangup$.next(), - join: localMembership.requestConnect, + join: localMembership.requestJoinAndPublish, toggleScreenSharing: toggleScreenSharing, sharingScreen$: sharingScreen$, @@ -1465,9 +1464,8 @@ export function createCallViewModel$( unhoverScreen: (): void => screenUnhover$.next(), fatalError$: scope.behavior( - localMembership.connectionState.livekit$.pipe( - filter((v) => v.state === RTCBackendState.Error), - map((s) => s.error), + localMembership.localMemberState$.pipe( + filter((v) => v instanceof ElementCallError), ), null, ), diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMember.test.ts similarity index 74% rename from src/state/CallViewModel/localMember/LocalMembership.test.ts rename to src/state/CallViewModel/localMember/LocalMember.test.ts index 1ef7abd6..2f8d11a5 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMember.test.ts @@ -7,7 +7,7 @@ Please see LICENSE in the repository root for full details. */ import { - Status, + Status as RTCMemberStatus, type LivekitTransport, type MatrixRTCSession, } from "matrix-js-sdk/lib/matrixrtc"; @@ -15,11 +15,7 @@ import { describe, expect, it, vi } from "vitest"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { BehaviorSubject, map, of } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; -import { - ConnectionState as LivekitConnectionState, - type LocalParticipant, - type LocalTrack, -} from "livekit-client"; +import { type LocalParticipant, type LocalTrack } from "livekit-client"; import { MatrixRTCMode } from "../../../settings/settings"; import { @@ -30,16 +26,19 @@ import { withTestScheduler, } from "../../../utils/test"; import { + TransportState, createLocalMembership$, enterRTCSession, - RTCBackendState, -} from "./LocalMembership"; + PublishState, + TrackState, +} from "./LocalMember"; import { MatrixRTCTransportMissingError } from "../../../utils/errors"; import { Epoch, ObservableScope } from "../../ObservableScope"; import { constant } from "../../Behavior"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; -import { type Connection } from "../remoteMembers/Connection"; +import { ConnectionState, type Connection } from "../remoteMembers/Connection"; import { type Publisher } from "./Publisher"; +import { C } from "vitest/dist/chunks/global.d.MAmajcmJ.js"; const MATRIX_RTC_MODE = MatrixRTCMode.Legacy; const getUrlParams = vi.hoisted(() => vi.fn(() => ({}))); @@ -200,21 +199,18 @@ describe("LocalMembership", () => { joinMatrixRTC: async (): Promise => {}, homeserverConnected: { combined$: constant(true), - rtsSession$: constant(Status.Connected), + rtsSession$: constant(RTCMemberStatus.Connected), }, }; it("throws error on missing RTC config error", () => { withTestScheduler(({ scope, hot, expectObservable }) => { - const goodTransport = { - livekit_service_url: "other", - } as LivekitTransport; - - const localTransport$ = scope.behavior( + const localTransport$ = scope.behavior( hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")), - goodTransport, + null, ); + // we do not need any connection data since we want to fail before reaching that. const mockConnectionManager = { transports$: scope.behavior( localTransport$.pipe(map((t) => new Epoch([t]))), @@ -230,15 +226,11 @@ describe("LocalMembership", () => { connectionManager: mockConnectionManager, localTransport$, }); + localMembership.requestJoinAndPublish(); - expectObservable(localMembership.connectionState.livekit$).toBe("ne", { - n: { state: RTCBackendState.WaitingForConnection }, - e: { - state: RTCBackendState.Error, - error: expect.toSatisfy( - (e) => e instanceof MatrixRTCTransportMissingError, - ), - }, + expectObservable(localMembership.localMemberState$).toBe("ne", { + n: TransportState.Waiting, + e: expect.toSatisfy((e) => e instanceof MatrixRTCTransportMissingError), }); }); }); @@ -250,32 +242,24 @@ describe("LocalMembership", () => { livekit_service_url: "b", } as LivekitTransport; - const connectionManagerData = new ConnectionManagerData(); - - connectionManagerData.add( - { - livekitRoom: mockLivekitRoom({ - localParticipant: { - isScreenShareEnabled: false, - trackPublications: [], - } as unknown as LocalParticipant, - }), - state$: constant({ - state: LivekitConnectionState.Connected, - }), - transport: aTransport, - } as unknown as Connection, - [], - ); - connectionManagerData.add( - { - state$: constant({ - state: LivekitConnectionState.Connected, - }), - transport: bTransport, - } as unknown as Connection, - [], - ); + const connectionTransportAConnected = { + livekitRoom: mockLivekitRoom({ + localParticipant: { + isScreenShareEnabled: false, + trackPublications: [], + } as unknown as LocalParticipant, + }), + state$: constant(ConnectionState.LivekitConnected), + transport: aTransport, + } as unknown as Connection; + const connectionTransportAConnecting = { + ...connectionTransportAConnected, + state$: constant(ConnectionState.LivekitConnecting), + } as unknown as Connection; + const connectionTransportBConnected = { + state$: constant(ConnectionState.LivekitConnected), + transport: bTransport, + } as unknown as Connection; it("recreates publisher if new connection is used and ENDS always unpublish and end tracks", async () => { const scope = new ObservableScope(); @@ -300,6 +284,9 @@ describe("LocalMembership", () => { typeof vi.fn >; + const connectionManagerData = new ConnectionManagerData(); + connectionManagerData.add(connectionTransportAConnected, []); + connectionManagerData.add(connectionTransportBConnected, []); createLocalMembership$({ scope, ...defaultCreateLocalMemberValues, @@ -359,6 +346,9 @@ describe("LocalMembership", () => { typeof vi.fn >; + const connectionManagerData = new ConnectionManagerData(); + connectionManagerData.add(connectionTransportAConnected, []); + // connectionManagerData.add(connectionTransportB, []); const localMembership = createLocalMembership$({ scope, ...defaultCreateLocalMemberValues, @@ -385,10 +375,11 @@ describe("LocalMembership", () => { it("tracks livekit state correctly", async () => { const scope = new ObservableScope(); + const connectionManagerData = new ConnectionManagerData(); const localTransport$ = new BehaviorSubject(null); - const connectionManagerData$ = new BehaviorSubject< - Epoch - >(new Epoch(new ConnectionManagerData())); + const connectionManagerData$ = new BehaviorSubject( + new Epoch(connectionManagerData), + ); const publishers: Publisher[] = []; const tracks$ = new BehaviorSubject([]); @@ -434,19 +425,45 @@ describe("LocalMembership", () => { }); await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.WaitingForTransport, - }); + expect(localMembership.localMemberState$.value).toStrictEqual( + TransportState.Waiting, + ); localTransport$.next(aTransport); await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.WaitingForConnection, + expect(localMembership.localMemberState$.value).toStrictEqual({ + matrix: RTCMemberStatus.Connected, + media: { connection: null, tracks: TrackState.WaitingForUser }, }); - connectionManagerData$.next(new Epoch(connectionManagerData)); + + const connectionManagerData2 = new ConnectionManagerData(); + connectionManagerData2.add( + // clone because we will mutate this later. + { ...connectionTransportAConnecting } as unknown as Connection, + [], + ); + + connectionManagerData$.next(new Epoch(connectionManagerData2)); await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitConnectionState.Connected, + expect(localMembership.localMemberState$.value).toStrictEqual({ + matrix: RTCMemberStatus.Connected, + media: { + connection: ConnectionState.LivekitConnecting, + tracks: TrackState.WaitingForUser, + }, }); + + ( + connectionManagerData2.getConnectionForTransport(aTransport)! + .state$ as BehaviorSubject + ).next(ConnectionState.LivekitConnected); + expect(localMembership.localMemberState$.value).toStrictEqual({ + matrix: RTCMemberStatus.Connected, + media: { + connection: ConnectionState.LivekitConnected, + tracks: TrackState.WaitingForUser, + }, + }); + expect(publisherFactory).toHaveBeenCalledOnce(); expect(localMembership.tracks$.value.length).toBe(0); @@ -455,37 +472,46 @@ describe("LocalMembership", () => { // ------- await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.CreatingTracks, + expect(localMembership.localMemberState$.value).toStrictEqual({ + matrix: RTCMemberStatus.Connected, + media: { + tracks: TrackState.Creating, + connection: ConnectionState.LivekitConnected, + }, }); createTrackResolver.resolve(); await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.ReadyToPublish, - }); + expect( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (localMembership.localMemberState$.value as any).media, + ).toStrictEqual(PublishState.WaitingForUser); // ------- - localMembership.requestConnect(); + localMembership.requestJoinAndPublish(); // ------- - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.WaitingToPublish, - }); + expect( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (localMembership.localMemberState$.value as any).media, + ).toStrictEqual(PublishState.Starting); publishResolver.resolve(); await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.ConnectedAndPublishing, - }); + expect( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (localMembership.localMemberState$.value as any).media, + ).toStrictEqual(PublishState.Publishing); + expect(publishers[0].stopPublishing).not.toHaveBeenCalled(); - expect(localMembership.connectionState.livekit$.isStopped).toBe(false); + expect(localMembership.localMemberState$.isStopped).toBe(false); scope.end(); await flushPromises(); // stays in connected state because it is stopped before the update to tracks update the state. - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.ConnectedAndPublishing, - }); + expect( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (localMembership.localMemberState$.value as any).media, + ).toStrictEqual(PublishState.Publishing); // stop all tracks after ending scopes expect(publishers[0].stopPublishing).toHaveBeenCalled(); expect(publishers[0].stopTracks).toHaveBeenCalled(); diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMember.ts similarity index 77% rename from src/state/CallViewModel/localMember/LocalMembership.ts rename to src/state/CallViewModel/localMember/LocalMember.ts index 6a31ce4b..e2fcc70e 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -11,7 +11,6 @@ import { ParticipantEvent, type LocalParticipant, type ScreenShareCaptureOptions, - ConnectionState as LivekitConnectionState, } from "livekit-client"; import { observeParticipantEvents } from "@livekit/components-core"; import { @@ -36,62 +35,66 @@ import { import { type Logger } from "matrix-js-sdk/lib/logger"; import { deepCompare } from "matrix-js-sdk/lib/utils"; -import { constant, type Behavior } from "../../Behavior"; -import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; -import { type ObservableScope } from "../../ObservableScope"; -import { type Publisher } from "./Publisher"; -import { type MuteStates } from "../../MuteStates"; +import { constant, type Behavior } from "../../Behavior.ts"; +import { type IConnectionManager } from "../remoteMembers/ConnectionManager.ts"; +import { type ObservableScope } from "../../ObservableScope.ts"; +import { type Publisher } from "./Publisher.ts"; +import { type MuteStates } from "../../MuteStates.ts"; import { ElementCallError, MembershipManagerError, UnknownCallError, -} from "../../../utils/errors"; -import { ElementWidgetActions, widget } from "../../../widget"; +} from "../../../utils/errors.ts"; +import { ElementWidgetActions, widget } from "../../../widget.ts"; import { getUrlParams } from "../../../UrlParams.ts"; import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts"; import { MatrixRTCMode } from "../../../settings/settings.ts"; import { Config } from "../../../config/Config.ts"; import { - type ConnectionState, + ConnectionState, type Connection, + type FailedToStartError, } from "../remoteMembers/Connection.ts"; import { type HomeserverConnected } from "./HomeserverConnected.ts"; -export enum RTCBackendState { - Error = "error", +export enum TransportState { /** Not even a transport is available to the LocalMembership */ - WaitingForTransport = "waiting_for_transport", - /** A connection appeared so we can initialise the publisher */ - WaitingForConnection = "waiting_for_connection", - /** Implies lk connection is connected */ - CreatingTracks = "creating_tracks", - /** Implies lk connection is connected */ - ReadyToPublish = "ready_to_publish", - /** Implies lk connection is connected */ - WaitingToPublish = "waiting_to_publish", - /** Implies lk connection is connected */ - ConnectedAndPublishing = "fully_connected", + Waiting = "transport_waiting", } -type LocalMemberRTCBackendState = - | { state: RTCBackendState.Error; error: ElementCallError } - | { state: Exclude } - | ConnectionState; - -export enum MatrixAdditionalState { - WaitingForTransport = "waiting_for_transport", +export enum PublishState { + WaitingForUser = "publish_waiting_for_user", + /** Implies lk connection is connected */ + Starting = "publish_start_publishing", + /** Implies lk connection is connected */ + Publishing = "publish_publishing", } -type LocalMemberMatrixState = - | { state: MatrixAdditionalState.WaitingForTransport } - | { state: "Error"; error: Error } - | { state: RTCSessionStatus }; - -export interface LocalMemberConnectionState { - livekit$: Behavior; - matrix$: Behavior; +export enum TrackState { + /** The track is waiting for user input to create tracks (waiting to call `startTracks()`) */ + WaitingForUser = "tracks_waiting_for_user", + /** Implies lk connection is connected */ + Creating = "tracks_creating", + /** Implies lk connection is connected */ + Ready = "tracks_ready", } +export type LocalMemberMediaState = + | { + tracks: TrackState; + connection: ConnectionState | FailedToStartError; + } + | PublishState + | ElementCallError; +export type LocalMemberMatrixState = Error | RTCSessionStatus; +export type LocalMemberState = + | ElementCallError + | TransportState.Waiting + | { + media: LocalMemberMediaState; + matrix: LocalMemberMatrixState; + }; + /* * - get well known * - get oldest membership @@ -146,16 +149,16 @@ export const createLocalMembership$ = ({ matrixRTCSession, }: Props): { /** - * This starts audio and video tracks. They will be reused when calling `requestConnect`. + * This starts audio and video tracks. They will be reused when calling `requestPublish`. */ startTracks: () => Behavior; /** - * This sets a inner state (shouldConnect) to true and instructs the js-sdk and livekit to keep the user + * This sets a inner state (shouldPublish) to true and instructs the js-sdk and livekit to keep the user * connected to matrix and livekit. */ - requestConnect: () => void; + requestJoinAndPublish: () => void; requestDisconnect: () => void; - connectionState: LocalMemberConnectionState; + localMemberState$: Behavior; sharingScreen$: Behavior; /** * Callback to toggle screen sharing. If null, screen sharing is not possible. @@ -164,11 +167,11 @@ export const createLocalMembership$ = ({ tracks$: Behavior; participant$: Behavior; connection$: Behavior; - /** Shorthand for connectionState.matrix.state === Status.Reconnecting + /** Shorthand for homeserverConnected.rtcSession === Status.Reconnecting * Direct translation to the js-sdk membership manager connection `Status`. */ reconnecting$: Behavior; - /** Shorthand for connectionState.matrix.state === Status.Disconnected + /** Shorthand for homeserverConnected.rtcSession === Status.Disconnected * Direct translation to the js-sdk membership manager connection `Status`. */ disconnected$: Behavior; @@ -190,7 +193,7 @@ export const createLocalMembership$ = ({ : new Error("Unknown error from localTransport"), ); } - setLivekitError(error); + setTransportError(error); return of(null); }), ), @@ -223,19 +226,13 @@ export const createLocalMembership$ = ({ // MATRIX RELATED - const reconnecting$ = scope.behavior( - homeserverConnected.rtsSession$.pipe( - map((sessionStatus) => sessionStatus === RTCSessionStatus.Reconnecting), - ), - ); - // This should be used in a combineLatest with publisher$ to connect. // to make it possible to call startTracks before the preferredTransport$ has resolved. const trackStartRequested = Promise.withResolvers(); // This should be used in a combineLatest with publisher$ to connect. // to make it possible to call startTracks before the preferredTransport$ has resolved. - const connectRequested$ = new BehaviorSubject(false); + const joinAndPublishRequested$ = new BehaviorSubject(false); /** * The publisher is stored in here an abstracts creating and publishing tracks. @@ -256,13 +253,13 @@ export const createLocalMembership$ = ({ return tracks$; }; - const requestConnect = (): void => { + const requestJoinAndPublish = (): void => { trackStartRequested.resolve(); - connectRequested$.next(true); + joinAndPublishRequested$.next(true); }; const requestDisconnect = (): void => { - connectRequested$.next(false); + joinAndPublishRequested$.next(false); }; // Take care of the publisher$ @@ -300,112 +297,129 @@ export const createLocalMembership$ = ({ // Based on `connectRequested$` we start publishing tracks. (once they are there!) scope.reconcile( - scope.behavior(combineLatest([publisher$, tracks$, connectRequested$])), - async ([publisher, tracks, shouldConnect]) => { - if (shouldConnect === publisher?.publishing$.value) return; - if (tracks.length !== 0 && shouldConnect) { + scope.behavior( + combineLatest([publisher$, tracks$, joinAndPublishRequested$]), + ), + async ([publisher, tracks, shouldJoinAndPublish]) => { + if (shouldJoinAndPublish === publisher?.publishing$.value) return; + if (tracks.length !== 0 && shouldJoinAndPublish) { try { await publisher?.startPublishing(); } catch (error) { - setLivekitError(error as ElementCallError); + setMediaError(error as ElementCallError); } - } else if (tracks.length !== 0 && !shouldConnect) { + } else if (tracks.length !== 0 && !shouldJoinAndPublish) { try { await publisher?.stopPublishing(); } catch (error) { - setLivekitError(new UnknownCallError(error as Error)); + setMediaError(new UnknownCallError(error as Error)); } } }, ); - const fatalLivekitError$ = new BehaviorSubject(null); - const setLivekitError = (e: ElementCallError): void => { - if (fatalLivekitError$.value !== null) - logger.error("Multiple Livkit Errors:", e); - else fatalLivekitError$.next(e); + const fatalMediaError$ = new BehaviorSubject(null); + const setMediaError = (e: ElementCallError): void => { + if (fatalMediaError$.value !== null) + logger.error("Multiple Media Errors:", e); + else fatalMediaError$.next(e); }; - const livekitState$: Behavior = scope.behavior( + + const fatalTransportError$ = new BehaviorSubject( + null, + ); + const setTransportError = (e: ElementCallError): void => { + if (fatalTransportError$.value !== null) + logger.error("Multiple Transport Errors:", e); + else fatalTransportError$.next(e); + }; + + const mediaState$: Behavior = scope.behavior( combineLatest([ localConnectionState$, - publisher$, localTransport$, - tracks$.pipe( - tap((t) => { - logger.info("tracks$: ", t); - }), - ), + tracks$, publishing$, - connectRequested$, + joinAndPublishRequested$, from(trackStartRequested.promise).pipe( map(() => true), startWith(false), ), - fatalLivekitError$, ]).pipe( map( ([ localConnectionState, - publisher, localTransport, tracks, publishing, - shouldConnect, + shouldPublish, shouldStartTracks, - error, ]) => { - // read this: - // if(!) return {state: ...} - // if(!) return {state: } - // - // as: - // We do have but not yet so we are in - if (error !== null) return { state: RTCBackendState.Error, error }; + if (!localTransport) return null; const hasTracks = tracks.length > 0; - if (!localTransport) - return { state: RTCBackendState.WaitingForTransport }; - if (!localConnectionState) - return { state: RTCBackendState.WaitingForConnection }; + let trackState: TrackState = TrackState.WaitingForUser; + if (hasTracks && shouldStartTracks) trackState = TrackState.Ready; + if (!hasTracks && shouldStartTracks) trackState = TrackState.Creating; + if ( - localConnectionState.state !== LivekitConnectionState.Connected || - !publisher + localConnectionState !== ConnectionState.LivekitConnected || + trackState !== TrackState.Ready ) - // pass through the localConnectionState while we do not yet have a publisher or the state - // of the connection is not yet connected - return { state: localConnectionState.state }; - if (!shouldStartTracks) - return { state: LivekitConnectionState.Connected }; - if (!hasTracks) return { state: RTCBackendState.CreatingTracks }; - if (!shouldConnect) return { state: RTCBackendState.ReadyToPublish }; - if (!publishing) return { state: RTCBackendState.WaitingToPublish }; - return { state: RTCBackendState.ConnectedAndPublishing }; + return { + connection: localConnectionState, + tracks: trackState, + }; + if (!shouldPublish) return PublishState.WaitingForUser; + if (!publishing) return PublishState.Starting; + return PublishState.Publishing; }, ), distinctUntilChanged(deepCompare), ), ); - const fatalMatrixError$ = new BehaviorSubject(null); const setMatrixError = (e: ElementCallError): void => { if (fatalMatrixError$.value !== null) logger.error("Multiple Matrix Errors:", e); else fatalMatrixError$.next(e); }; - const matrixState$: Behavior = scope.behavior( - combineLatest([localTransport$, homeserverConnected.rtsSession$]).pipe( - map(([localTransport, rtcSessionStatus]) => { - if (!localTransport) - return { state: MatrixAdditionalState.WaitingForTransport }; - return { state: rtcSessionStatus }; - }), + + const localMemberState$ = scope.behavior( + combineLatest([ + mediaState$, + homeserverConnected.rtsSession$, + fatalMatrixError$, + fatalTransportError$, + fatalMediaError$, + ]).pipe( + map( + ([ + mediaState, + rtcSessionStatus, + matrixError, + transportError, + mediaError, + ]) => { + if (transportError !== null) return transportError; + // `mediaState` will be 'null' until the transport appears. + if (mediaState && rtcSessionStatus) + return { + matrix: matrixError ?? rtcSessionStatus, + media: mediaError ?? mediaState, + }; + else { + return TransportState.Waiting; + } + }, + ), ), ); // inform the widget about the connect and disconnect intent from the user. scope - .behavior(connectRequested$.pipe(pairwise(), scope.bind()), [ + .behavior(joinAndPublishRequested$.pipe(pairwise(), scope.bind()), [ undefined, - connectRequested$.value, + joinAndPublishRequested$.value, ]) .subscribe(([prev, current]) => { if (!widget) return; @@ -434,7 +448,7 @@ export const createLocalMembership$ = ({ // Keep matrix rtc session in sync with localTransport$, connectRequested$ scope.reconcile( - scope.behavior(combineLatest([localTransport$, connectRequested$])), + scope.behavior(combineLatest([localTransport$, joinAndPublishRequested$])), async ([transport, shouldConnect]) => { if (!transport) return; // if shouldConnect=false we will do the disconnect as the cleanup from the previous reconcile iteration. @@ -555,21 +569,19 @@ export const createLocalMembership$ = ({ return { startTracks, - requestConnect, + requestJoinAndPublish, requestDisconnect, - connectionState: { - livekit$: livekitState$, - matrix$: matrixState$, - }, + localMemberState$, tracks$, participant$, - reconnecting$, + reconnecting$: scope.behavior( + homeserverConnected.rtsSession$.pipe( + map((sessionStatus) => sessionStatus === RTCSessionStatus.Reconnecting), + ), + ), disconnected$: scope.behavior( - matrixState$.pipe( - map( - (sessionStatus) => - sessionStatus.state === RTCSessionStatus.Disconnected, - ), + homeserverConnected.rtsSession$.pipe( + map((state) => state === RTCSessionStatus.Disconnected), ), ), sharingScreen$, diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index 5468d1ff..6d27c042 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -52,9 +52,7 @@ describe("Publisher", () => { } as unknown as MuteStates; scope = new ObservableScope(); connection = { - state$: constant({ - state: LivekitConenctionState.Connected, - }), + state$: constant(LivekitConenctionState.Connected), livekitRoom: mockLivekitRoom({ localParticipant: mockLocalParticipant({}), }), @@ -110,15 +108,14 @@ describe("Publisher", () => { // failiour due to connection.state$ const beforeState = connection.state$.value; - (connection.state$ as BehaviorSubject).next({ - state: "FailedToStart", - error: Error("testStartError"), - }); + (connection.state$ as BehaviorSubject).next(Error("testStartError")); await expect(publisher.startPublishing()).rejects.toThrow( new FailToStartLivekitConnection("testStartError"), ); - (connection.state$ as BehaviorSubject).next(beforeState); + (connection.state$ as BehaviorSubject).next( + beforeState, + ); // does not try other conenction after the first one failed expect( diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 6e4a9b35..b32e7e99 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -32,7 +32,10 @@ import { } from "../../../livekit/TrackProcessorContext.tsx"; import { getUrlParams } from "../../../UrlParams.ts"; import { observeTrackReference$ } from "../../MediaViewModel.ts"; -import { type Connection } from "../remoteMembers/Connection.ts"; +import { + ConnectionState, + type Connection, +} from "../remoteMembers/Connection.ts"; import { type ObservableScope } from "../../ObservableScope.ts"; import { ElementCallError, @@ -158,20 +161,17 @@ export class Publisher { this.logger.debug("startPublishing called"); const lkRoom = this.connection.livekitRoom; const { promise, resolve, reject } = Promise.withResolvers(); - const sub = this.connection.state$.subscribe((s) => { - switch (s.state) { - case LivekitConnectionState.Connected: - resolve(); - break; - case "FailedToStart": - reject( - s.error instanceof ElementCallError - ? s.error - : new FailToStartLivekitConnection(s.error.message), - ); - break; - default: - this.logger.info("waiting for connection: ", s.state); + const sub = this.connection.state$.subscribe((state) => { + if (state instanceof Error) { + const error = + state instanceof ElementCallError + ? state + : new FailToStartLivekitConnection(state.message); + reject(error); + } else if (state === ConnectionState.LivekitConnected) { + resolve(); + } else { + this.logger.info("waiting for connection: ", state); } }); try { diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index efee1ccb..a90f0aa2 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -30,8 +30,8 @@ import { logger } from "matrix-js-sdk/lib/logger"; import type { LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { Connection, + ConnectionState, type ConnectionOpts, - type ConnectionState, type PublishingParticipant, } from "./Connection.ts"; import { ObservableScope } from "../../ObservableScope.ts"; @@ -151,7 +151,7 @@ describe("Start connection states", () => { }; const connection = new Connection(opts, logger); - expect(connection.state$.getValue().state).toEqual("Initialized"); + expect(connection.state$.getValue()).toEqual("Initialized"); }); it("fail to getOpenId token then error state", async () => { @@ -167,7 +167,7 @@ describe("Start connection states", () => { const connection = new Connection(opts, logger); - const capturedStates: ConnectionState[] = []; + const capturedStates: (ConnectionState | Error)[] = []; const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); @@ -187,22 +187,20 @@ describe("Start connection states", () => { let capturedState = capturedStates.pop(); expect(capturedState).toBeDefined(); - expect(capturedState!.state).toEqual("FetchingConfig"); + expect(capturedState!).toEqual("FetchingConfig"); deferred.reject(new FailToGetOpenIdToken(new Error("Failed to get token"))); await vi.runAllTimersAsync(); capturedState = capturedStates.pop(); - if (capturedState!.state === "FailedToStart") { - expect(capturedState!.error.message).toEqual("Something went wrong"); + if (capturedState instanceof Error) { + expect(capturedState.message).toEqual("Something went wrong"); expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); } else { - expect.fail( - "Expected FailedToStart state but got " + capturedState?.state, - ); + expect.fail("Expected FailedToStart state but got " + capturedState); } }); @@ -219,7 +217,7 @@ describe("Start connection states", () => { const connection = new Connection(opts, logger); - const capturedStates: ConnectionState[] = []; + const capturedStates: (ConnectionState | Error)[] = []; const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); @@ -241,24 +239,22 @@ describe("Start connection states", () => { let capturedState = capturedStates.pop(); expect(capturedState).toBeDefined(); - expect(capturedState?.state).toEqual("FetchingConfig"); + expect(capturedState).toEqual(ConnectionState.FetchingConfig); deferredSFU.resolve(); await vi.runAllTimersAsync(); capturedState = capturedStates.pop(); - if (capturedState?.state === "FailedToStart") { - expect(capturedState?.error.message).toContain( + if (capturedState instanceof Error) { + expect(capturedState.message).toContain( "SFU Config fetch failed with exception Error", ); expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); } else { - expect.fail( - "Expected FailedToStart state but got " + capturedState?.state, - ); + expect.fail("Expected FailedToStart state but got " + capturedState); } }); @@ -275,7 +271,7 @@ describe("Start connection states", () => { const connection = new Connection(opts, logger); - const capturedStates: ConnectionState[] = []; + const capturedStates: (ConnectionState | Error)[] = []; const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); @@ -305,17 +301,15 @@ describe("Start connection states", () => { let capturedState = capturedStates.pop(); expect(capturedState).toBeDefined(); - expect(capturedState?.state).toEqual("FetchingConfig"); + expect(capturedState).toEqual(ConnectionState.FetchingConfig); deferredSFU.resolve(); await vi.runAllTimersAsync(); capturedState = capturedStates.pop(); - if (capturedState && capturedState.state === "FailedToStart") { - expect(capturedState.error.message).toContain( - "Failed to connect to livekit", - ); + if (capturedState instanceof Error) { + expect(capturedState.message).toContain("Failed to connect to livekit"); expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); @@ -332,7 +326,7 @@ describe("Start connection states", () => { const connection = setupRemoteConnection(); - const capturedStates: ConnectionState[] = []; + const capturedStates: (ConnectionState | Error)[] = []; const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); @@ -342,13 +336,13 @@ describe("Start connection states", () => { await vi.runAllTimersAsync(); const initialState = capturedStates.shift(); - expect(initialState?.state).toEqual("Initialized"); + expect(initialState).toEqual(ConnectionState.Initialized); const fetchingState = capturedStates.shift(); - expect(fetchingState?.state).toEqual("FetchingConfig"); + expect(fetchingState).toEqual(ConnectionState.FetchingConfig); const connectingState = capturedStates.shift(); - expect(connectingState?.state).toEqual("ConnectingToLkRoom"); + expect(connectingState).toEqual(ConnectionState.ConnectingToLkRoom); const connectedState = capturedStates.shift(); - expect(connectedState?.state).toEqual("connected"); + expect(connectedState).toEqual(ConnectionState.LivekitConnected); }); it("shutting down the scope should stop the connection", async () => { diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 549777f9..29ad7a8c 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -12,7 +12,6 @@ import { } from "@livekit/components-core"; import { ConnectionError, - ConnectionState as LivekitConnectionState, type Room as LivekitRoom, type LocalParticipant, type RemoteParticipant, @@ -55,14 +54,21 @@ export class FailedToStartError extends Error { } export enum ConnectionState { + /** The start state of a connection. It has been created but nothing has loaded yet. */ Initialized = "Initialized", + /** `start` has been called on the connection. It aquires the jwt info to conenct to the LK Room */ FetchingConfig = "FetchingConfig", Stopped = "Stopped", ConnectingToLkRoom = "ConnectingToLkRoom", + /** The same as ConnectionState.Disconnected from `livekit-client` */ LivekitDisconnected = "disconnected", + /** The same as ConnectionState.Connecting from `livekit-client` */ LivekitConnecting = "connecting", + /** The same as ConnectionState.Connected from `livekit-client` */ LivekitConnected = "connected", + /** The same as ConnectionState.Reconnecting from `livekit-client` */ LivekitReconnecting = "reconnecting", + /** The same as ConnectionState.SignalReconnecting from `livekit-client` */ LivekitSignalReconnecting = "signalReconnecting", } @@ -73,15 +79,14 @@ export enum ConnectionState { */ export class Connection { // Private Behavior - private readonly _state$ = new BehaviorSubject< - ConnectionState | FailedToStartError - >(ConnectionState.Initialized); + private readonly _state$ = new BehaviorSubject( + ConnectionState.Initialized, + ); /** * The current state of the connection to the media transport. */ - public readonly state$: Behavior = - this._state$; + public readonly state$: Behavior = this._state$; /** * The media transport to connect to. @@ -161,15 +166,12 @@ export class Connection { connectionStateObserver(this.livekitRoom) .pipe(this.scope.bind()) .subscribe((lkState) => { - // It si save to cast lkState to ConnectionState as they are fully overlapping. + // It is save to cast lkState to ConnectionState as they are fully overlapping. this._state$.next(lkState as unknown as ConnectionState); }); } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); - this._state$.next({ - state: "FailedToStart", - error: error instanceof Error ? error : new Error(`${error}`), - }); + this._state$.next(error instanceof Error ? error : new Error(`${error}`)); throw error; } } @@ -194,9 +196,7 @@ export class Connection { ); if (this.stopped) return; await this.livekitRoom.disconnect(); - this._state$.next({ - state: ConnectionAdditionalState.Stopped, - }); + this._state$.next(ConnectionState.Stopped); this.stopped = true; } diff --git a/yarn.lock b/yarn.lock index 94b73130..f0ca83a7 100644 --- a/yarn.lock +++ b/yarn.lock @@ -10353,8 +10353,8 @@ __metadata: linkType: hard "matrix-js-sdk@npm:^39.2.0": - version: 39.2.0 - resolution: "matrix-js-sdk@npm:39.2.0" + version: 39.3.0 + resolution: "matrix-js-sdk@npm:39.3.0" dependencies: "@babel/runtime": "npm:^7.12.5" "@matrix-org/matrix-sdk-crypto-wasm": "npm:^15.3.0" @@ -10370,7 +10370,7 @@ __metadata: sdp-transform: "npm:^3.0.0" unhomoglyph: "npm:^1.0.6" uuid: "npm:13" - checksum: 10c0/f8b5261de2744305330ba3952821ca9303698170bfd3a0ff8a767b9286d4e8d4ed5aaf6fbaf8a1e8ff9dbd859102a2a47d882787e2da3b3078965bec00157959 + checksum: 10c0/031c9ec042e00c32dc531f82fc59c64cc25fb665abfc642b1f0765c530d60684f8bd63daf0cdd0dbe96b4f87ea3f4148f9d3f024a59d57eceaec1ce5d0164755 languageName: node linkType: hard From 7af89b421693b58d06baf2f044bdd3e39d97a623 Mon Sep 17 00:00:00 2001 From: Timo K Date: Tue, 9 Dec 2025 17:36:56 +0100 Subject: [PATCH 05/20] fix lint --- src/state/CallViewModel/localMember/LocalMember.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/state/CallViewModel/localMember/LocalMember.test.ts b/src/state/CallViewModel/localMember/LocalMember.test.ts index 2f8d11a5..6a9f196e 100644 --- a/src/state/CallViewModel/localMember/LocalMember.test.ts +++ b/src/state/CallViewModel/localMember/LocalMember.test.ts @@ -38,7 +38,6 @@ import { constant } from "../../Behavior"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; import { ConnectionState, type Connection } from "../remoteMembers/Connection"; import { type Publisher } from "./Publisher"; -import { C } from "vitest/dist/chunks/global.d.MAmajcmJ.js"; const MATRIX_RTC_MODE = MatrixRTCMode.Legacy; const getUrlParams = vi.hoisted(() => vi.fn(() => ({}))); From 0ebc6078dd5cae4f8a2317e4ffb22f128ebd1e75 Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 12:08:59 +0100 Subject: [PATCH 06/20] Update LocalMember.ts --- .../CallViewModel/localMember/LocalMember.ts | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index e2fcc70e..193dd53c 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -407,9 +407,7 @@ export const createLocalMembership$ = ({ matrix: matrixError ?? rtcSessionStatus, media: mediaError ?? mediaState, }; - else { - return TransportState.Waiting; - } + return TransportState.Waiting; }, ), ), @@ -423,19 +421,21 @@ export const createLocalMembership$ = ({ ]) .subscribe(([prev, current]) => { if (!widget) return; + // JOIN prev=false (was left) => current-true (now joiend) if (!prev && current) { - try { - void widget.api.transport.send(ElementWidgetActions.JoinCall, {}); - } catch (e) { - logger.error("Failed to send join action", e); - } + widget.api.transport + .send(ElementWidgetActions.JoinCall, {}) + .catch((e) => { + logger.error("Failed to send join action", e); + }); } + // LEAVE prev=false (was joined) => current-true (now left) if (prev && !current) { - try { - void widget?.api.transport.send(ElementWidgetActions.HangupCall, {}); - } catch (e) { - logger.error("Failed to send hangup action", e); - } + widget.api.transport + .send(ElementWidgetActions.HangupCall, {}) + .catch((e) => { + logger.error("Failed to send hangup action", e); + }); } }); @@ -575,8 +575,12 @@ export const createLocalMembership$ = ({ tracks$, participant$, reconnecting$: scope.behavior( - homeserverConnected.rtsSession$.pipe( - map((sessionStatus) => sessionStatus === RTCSessionStatus.Reconnecting), + localMemberState$.pipe( + map((state) => { + if (typeof state === "object" && "matrix" in state) + return state.matrix === RTCSessionStatus.Reconnecting; + return false; + }), ), ), disconnected$: scope.behavior( From 6efce232f81528c4df4b61f56393dc62e0040549 Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 18:50:19 +0100 Subject: [PATCH 07/20] fix playwright tests --- src/state/CallViewModel/CallViewModel.ts | 63 ++++++++++---- .../CallViewModel/localMember/LocalMember.ts | 82 ++++++++++++------- .../CallViewModel/localMember/Publisher.ts | 60 +++++++------- .../CallViewModel/remoteMembers/Connection.ts | 4 +- 4 files changed, 129 insertions(+), 80 deletions(-) diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index e04f4698..35ab658b 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -99,6 +99,7 @@ import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts import { createLocalMembership$, enterRTCSession, + TransportState, } from "./localMember/LocalMember.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { @@ -577,17 +578,6 @@ export function createCallViewModel$( ), ); - /** - * Whether various media/event sources should pretend to be disconnected from - * all network input, even if their connection still technically works. - */ - // We do this when the app is in the 'reconnecting' state, because it might be - // that the LiveKit connection is still functional while the homeserver is - // down, for example, and we want to avoid making people worry that the app is - // in a split-brained state. - // DISCUSSION own membership manager ALSO this probably can be simplifis - const reconnecting$ = localMembership.reconnecting$; - const audioParticipants$ = scope.behavior( matrixLivekitMembers$.pipe( switchMap((membersWithEpoch) => { @@ -635,7 +625,7 @@ export function createCallViewModel$( ); const handsRaised$ = scope.behavior( - handsRaisedSubject$.pipe(pauseWhen(reconnecting$)), + handsRaisedSubject$.pipe(pauseWhen(localMembership.reconnecting$)), ); const reactions$ = scope.behavior( @@ -648,7 +638,7 @@ export function createCallViewModel$( ]), ), ), - pauseWhen(reconnecting$), + pauseWhen(localMembership.reconnecting$), ), ); @@ -739,7 +729,7 @@ export function createCallViewModel$( livekitRoom$, focusUrl$, mediaDevices, - reconnecting$, + localMembership.reconnecting$, displayName$, matrixMemberMetadataStore.createAvatarUrlBehavior$(userId), handsRaised$.pipe(map((v) => v[participantId]?.time ?? null)), @@ -1422,6 +1412,37 @@ export function createCallViewModel$( // reassigned here to make it publicly accessible const toggleScreenSharing = localMembership.toggleScreenSharing; + const errors$ = scope.behavior<{ + transportError?: ElementCallError; + matrixError?: ElementCallError; + connectionError?: ElementCallError; + publishError?: ElementCallError; + } | null>( + localMembership.localMemberState$.pipe( + map((value) => { + const returnObject: { + transportError?: ElementCallError; + matrixError?: ElementCallError; + connectionError?: ElementCallError; + publishError?: ElementCallError; + } = {}; + if (value instanceof ElementCallError) return { transportError: value }; + if (value === TransportState.Waiting) return null; + if (value.matrix instanceof ElementCallError) + returnObject.matrixError = value.matrix; + if (value.media instanceof ElementCallError) + returnObject.publishError = value.media; + else if ( + typeof value.media === "object" && + value.media.connection instanceof ElementCallError + ) + returnObject.connectionError = value.media.connection; + return returnObject; + }), + ), + null, + ); + return { autoLeave$: autoLeave$, callPickupState$: callPickupState$, @@ -1438,8 +1459,16 @@ export function createCallViewModel$( unhoverScreen: (): void => screenUnhover$.next(), fatalError$: scope.behavior( - localMembership.localMemberState$.pipe( - filter((v) => v instanceof ElementCallError), + errors$.pipe( + map((errors) => { + return ( + errors?.transportError ?? + errors?.matrixError ?? + errors?.connectionError ?? + null + ); + }), + filter((error) => error !== null), ), null, ), @@ -1472,7 +1501,7 @@ export function createCallViewModel$( showFooter$: showFooter$, earpieceMode$: earpieceMode$, audioOutputSwitcher$: audioOutputSwitcher$, - reconnecting$: reconnecting$, + reconnecting$: localMembership.reconnecting$, }; } diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index df42cba9..532d5d55 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -42,6 +42,7 @@ import { type Publisher } from "./Publisher.ts"; import { type MuteStates } from "../../MuteStates.ts"; import { ElementCallError, + FailToStartLivekitConnection, MembershipManagerError, UnknownCallError, } from "../../../utils/errors.ts"; @@ -56,6 +57,7 @@ import { type FailedToStartError, } from "../remoteMembers/Connection.ts"; import { type HomeserverConnected } from "./HomeserverConnected.ts"; +import { and$ } from "../../../utils/observable.ts"; export enum TransportState { /** Not even a transport is available to the LocalMembership */ @@ -86,13 +88,12 @@ export type LocalMemberMediaState = } | PublishState | ElementCallError; -export type LocalMemberMatrixState = Error | RTCSessionStatus; export type LocalMemberState = | ElementCallError | TransportState.Waiting | { media: LocalMemberMediaState; - matrix: LocalMemberMatrixState; + matrix: ElementCallError | RTCSessionStatus; }; /* @@ -220,10 +221,6 @@ export const createLocalMembership$ = ({ ), ); - const localConnectionState$ = localConnection$.pipe( - switchMap((connection) => (connection ? connection.state$ : of(null))), - ); - // MATRIX RELATED // This should be used in a combineLatest with publisher$ to connect. @@ -308,23 +305,27 @@ export const createLocalMembership$ = ({ try { await publisher?.startPublishing(); } catch (error) { - setMediaError(error as ElementCallError); + const message = + error instanceof Error ? error.message : String(error); + setPublishError(new FailToStartLivekitConnection(message)); } } else if (tracks.length !== 0 && !shouldJoinAndPublish) { try { await publisher?.stopPublishing(); } catch (error) { - setMediaError(new UnknownCallError(error as Error)); + setPublishError(new UnknownCallError(error as Error)); } } }, ); - const fatalMediaError$ = new BehaviorSubject(null); - const setMediaError = (e: ElementCallError): void => { - if (fatalMediaError$.value !== null) - logger.error("Multiple Media Errors:", e); - else fatalMediaError$.next(e); + // STATE COMPUTATION + + // These are non fatal since we can join a room and concume media even though publishing failed. + const publishError$ = new BehaviorSubject(null); + const setPublishError = (e: ElementCallError): void => { + if (publishError$.value !== null) logger.error("Multiple Media Errors:", e); + else publishError$.next(e); }; const fatalTransportError$ = new BehaviorSubject( @@ -336,6 +337,10 @@ export const createLocalMembership$ = ({ else fatalTransportError$.next(e); }; + const localConnectionState$ = localConnection$.pipe( + switchMap((connection) => (connection ? connection.state$ : of(null))), + ); + const mediaState$: Behavior = scope.behavior( combineLatest([ localConnectionState$, @@ -392,22 +397,22 @@ export const createLocalMembership$ = ({ homeserverConnected.rtsSession$, fatalMatrixError$, fatalTransportError$, - fatalMediaError$, + publishError$, ]).pipe( map( ([ mediaState, rtcSessionStatus, - matrixError, - transportError, - mediaError, + fatalMatrixError, + fatalTransportError, + publishError, ]) => { - if (transportError !== null) return transportError; - // `mediaState` will be 'null' until the transport appears. + if (fatalTransportError !== null) return fatalTransportError; + // `mediaState` will be 'null' until the transport/connection appears. if (mediaState && rtcSessionStatus) return { - matrix: matrixError ?? rtcSessionStatus, - media: mediaError ?? mediaState, + matrix: fatalMatrixError ?? rtcSessionStatus, + media: publishError ?? mediaState, }; return TransportState.Waiting; }, @@ -415,6 +420,31 @@ export const createLocalMembership$ = ({ ), ); + /** + * Whether we are "fully" connected to the call. Accounts for both the + * connection to the MatrixRTC session and the LiveKit publish connection. + */ + const matrixAndLivekitConnected$ = scope.behavior( + and$( + homeserverConnected.combined$, + localConnectionState$.pipe( + map((state) => state === ConnectionState.LivekitConnected), + ), + ).pipe( + tap((v) => logger.debug("livekit+matrix: Connected state changed", v)), + ), + ); + + /** + * Whether we should tell the user that we're reconnecting to the call. + */ + const reconnecting$ = scope.behavior( + matrixAndLivekitConnected$.pipe( + pairwise(), + map(([prev, current]) => prev === true && current === false), + ), + ); + // inform the widget about the connect and disconnect intent from the user. scope .behavior(joinAndPublishRequested$.pipe(pairwise(), scope.bind()), [ @@ -576,15 +606,7 @@ export const createLocalMembership$ = ({ localMemberState$, tracks$, participant$, - reconnecting$: scope.behavior( - localMemberState$.pipe( - map((state) => { - if (typeof state === "object" && "matrix" in state) - return state.matrix === RTCSessionStatus.Reconnecting; - return false; - }), - ), - ), + reconnecting$, disconnected$: scope.behavior( homeserverConnected.rtsSession$.pipe( map((state) => state === RTCSessionStatus.Disconnected), diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index b32e7e99..df67f179 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -32,15 +32,8 @@ import { } from "../../../livekit/TrackProcessorContext.tsx"; import { getUrlParams } from "../../../UrlParams.ts"; import { observeTrackReference$ } from "../../MediaViewModel.ts"; -import { - ConnectionState, - type Connection, -} from "../remoteMembers/Connection.ts"; +import { type Connection } from "../remoteMembers/Connection.ts"; import { type ObservableScope } from "../../ObservableScope.ts"; -import { - ElementCallError, - FailToStartLivekitConnection, -} from "../../../utils/errors.ts"; /** * A wrapper for a Connection object. @@ -160,27 +153,29 @@ export class Publisher { public async startPublishing(): Promise { this.logger.debug("startPublishing called"); const lkRoom = this.connection.livekitRoom; - const { promise, resolve, reject } = Promise.withResolvers(); - const sub = this.connection.state$.subscribe((state) => { - if (state instanceof Error) { - const error = - state instanceof ElementCallError - ? state - : new FailToStartLivekitConnection(state.message); - reject(error); - } else if (state === ConnectionState.LivekitConnected) { - resolve(); - } else { - this.logger.info("waiting for connection: ", state); - } - }); - try { - await promise; - } catch (e) { - throw e; - } finally { - sub.unsubscribe(); - } + + // we do not need to do this since lk will wait in `localParticipant.publishTrack` + // const { promise, resolve, reject } = Promise.withResolvers(); + // const sub = this.connection.state$.subscribe((state) => { + // if (state instanceof Error) { + // const error = + // state instanceof ElementCallError + // ? state + // : new FailToStartLivekitConnection(state.message); + // reject(error); + // } else if (state === ConnectionState.LivekitConnected) { + // resolve(); + // } else { + // this.logger.info("waiting for connection: ", state); + // } + // }); + // try { + // await promise; + // } catch (e) { + // throw e; + // } finally { + // sub.unsubscribe(); + // } for (const track of this.tracks$.value) { this.logger.info("publish ", this.tracks$.value.length, "tracks"); @@ -188,9 +183,10 @@ export class Publisher { // with a timeout. await lkRoom.localParticipant.publishTrack(track).catch((error) => { this.logger.error("Failed to publish track", error); - throw new FailToStartLivekitConnection( - error instanceof Error ? error.message : error, - ); + // throw new FailToStartLivekitConnection( + // error instanceof Error ? error.message : error, + // ); + throw error; }); this.logger.info("published track ", track.kind, track.id); diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 29ad7a8c..2fd9eaa8 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -150,7 +150,8 @@ export class Connection { throw new InsufficientCapacityError(); } if (e.status === 404) { - // error msg is "Could not establish signal connection: requested room does not exist" + // error msg is "Failed to create call" + // error description is "Call creation might be restricted to authorized users only. Try again later, or contact your server admin if the problem persists." // The room does not exist. There are two different modes of operation for the SFU: // - the room is created on the fly when connecting (livekit `auto_create` option) // - Only authorized users can create rooms, so the room must exist before connecting (done by the auth jwt service) @@ -172,6 +173,7 @@ export class Connection { } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); this._state$.next(error instanceof Error ? error : new Error(`${error}`)); + // Its okay to ignore the throw. The error is part of the state. throw error; } } From 1941fc9ca1cc17becdc170e5c5b691ae8edd6c0f Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 19:12:52 +0100 Subject: [PATCH 08/20] fix tests. --- src/state/CallViewModel/localMember/LocalMember.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index 532d5d55..73908fcb 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -443,6 +443,7 @@ export const createLocalMembership$ = ({ pairwise(), map(([prev, current]) => prev === true && current === false), ), + false, ); // inform the widget about the connect and disconnect intent from the user. From 667a3d0e3d911ad602b48ae4906ef0da6dc3f085 Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 19:18:16 +0100 Subject: [PATCH 09/20] fix test not checking for livekit connection state anymore. --- .../CallViewModel/localMember/Publisher.test.ts | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index 6d27c042..68845fa2 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -98,7 +98,7 @@ describe("Publisher", () => { ).mockRejectedValue(Error("testError")); await expect(publisher.startPublishing()).rejects.toThrow( - new FailToStartLivekitConnection("testError"), + new Error("testError"), ); // does not try other conenction after the first one failed @@ -106,17 +106,6 @@ describe("Publisher", () => { connection.livekitRoom.localParticipant.publishTrack, ).toHaveBeenCalledTimes(1); - // failiour due to connection.state$ - const beforeState = connection.state$.value; - (connection.state$ as BehaviorSubject).next(Error("testStartError")); - - await expect(publisher.startPublishing()).rejects.toThrow( - new FailToStartLivekitConnection("testStartError"), - ); - (connection.state$ as BehaviorSubject).next( - beforeState, - ); - // does not try other conenction after the first one failed expect( connection.livekitRoom.localParticipant.publishTrack, From b380532d3080275388810ec124d578dd57a787b5 Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 21:14:13 +0100 Subject: [PATCH 10/20] lots of error logging and fixing playwright --- src/room/GroupCallView.tsx | 1 + src/room/InCallView.tsx | 7 ++++++- src/room/LobbyView.tsx | 4 ++-- src/state/CallViewModel/CallViewModel.ts | 15 +++++++++++++- .../CallViewModel/localMember/LocalMember.ts | 20 +++++++++++++------ .../localMember/LocalTransport.ts | 2 +- .../CallViewModel/remoteMembers/Connection.ts | 14 +++++++++---- 7 files changed, 48 insertions(+), 15 deletions(-) diff --git a/src/room/GroupCallView.tsx b/src/room/GroupCallView.tsx index dfd11ff3..1542678e 100644 --- a/src/room/GroupCallView.tsx +++ b/src/room/GroupCallView.tsx @@ -446,6 +446,7 @@ export const GroupCallView: FC = ({ let body: ReactNode; if (externalError) { + logger.debug("External error occurred:", externalError); // If an error was recorded within this component but outside // GroupCallErrorBoundary, create a component that rethrows the error from // within the error boundary, so it can be handled uniformly diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index 7ae3700c..18acf843 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -127,6 +127,7 @@ export const ActiveCall: FC = (props) => { const mediaDevices = useMediaDevices(); const trackProcessorState$ = useTrackProcessorObservable$(); useEffect(() => { + logger.info("START CALL VIEW SCOPE"); const scope = new ObservableScope(); const reactionsReader = new ReactionsReader(scope, props.rtcSession); const { autoLeaveWhenOthersLeft, waitForCallPickup, sendNotificationType } = @@ -153,6 +154,7 @@ export const ActiveCall: FC = (props) => { vm.leave$.pipe(scope.bind()).subscribe(props.onLeft); return (): void => { + logger.info("END CALL VIEW SCOPE"); scope.end(); }; }, [ @@ -271,7 +273,10 @@ export const InCallView: FC = ({ const ringOverlay = useBehavior(vm.ringOverlay$); const fatalCallError = useBehavior(vm.fatalError$); // Stop the rendering and throw for the error boundary - if (fatalCallError) throw fatalCallError; + if (fatalCallError) { + logger.debug("fatalCallError stop rendering", fatalCallError); + throw fatalCallError; + } // We need to set the proper timings on the animation based upon the sound length. const ringDuration = pickupPhaseAudio?.soundDuration["waiting"] ?? 1; diff --git a/src/room/LobbyView.tsx b/src/room/LobbyView.tsx index ad4f30b3..10e098f1 100644 --- a/src/room/LobbyView.tsx +++ b/src/room/LobbyView.tsx @@ -79,9 +79,9 @@ export const LobbyView: FC = ({ waitingForInvite, }) => { useEffect(() => { - logger.info("[Lifecycle] GroupCallView Component mounted"); + logger.info("[Lifecycle] LobbyView Component mounted"); return (): void => { - logger.info("[Lifecycle] GroupCallView Component unmounted"); + logger.info("[Lifecycle] LobbyView Component unmounted"); }; }, []); diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 35ab658b..6a9eadea 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -15,6 +15,7 @@ import { } from "livekit-client"; import { type Room as MatrixRoom } from "matrix-js-sdk"; import { + catchError, combineLatest, distinctUntilChanged, filter, @@ -425,7 +426,18 @@ export function createCallViewModel$( connectionFactory: connectionFactory, inputTransports$: scope.behavior( combineLatest( - [localTransport$, membershipsAndTransports.transports$], + [ + localTransport$.pipe( + catchError((e) => { + logger.info( + "dont pass local transport to createConnectionManager$. localTransport$ threw an error", + e, + ); + return of(null); + }), + ), + membershipsAndTransports.transports$, + ], (localTransport, transports) => { const localTransportAsArray = localTransport ? [localTransport] : []; return transports.mapInner((transports) => [ @@ -1461,6 +1473,7 @@ export function createCallViewModel$( fatalError$: scope.behavior( errors$.pipe( map((errors) => { + logger.debug("errors$ to compute any fatal errors:", errors); return ( errors?.transportError ?? errors?.matrixError ?? diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index 73908fcb..40fb62d6 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -324,17 +324,23 @@ export const createLocalMembership$ = ({ // These are non fatal since we can join a room and concume media even though publishing failed. const publishError$ = new BehaviorSubject(null); const setPublishError = (e: ElementCallError): void => { - if (publishError$.value !== null) logger.error("Multiple Media Errors:", e); - else publishError$.next(e); + if (publishError$.value !== null) { + logger.error("Multiple Media Errors:", e); + } else { + publishError$.next(e); + } }; const fatalTransportError$ = new BehaviorSubject( null, ); + const setTransportError = (e: ElementCallError): void => { - if (fatalTransportError$.value !== null) + if (fatalTransportError$.value !== null) { logger.error("Multiple Transport Errors:", e); - else fatalTransportError$.next(e); + } else { + fatalTransportError$.next(e); + } }; const localConnectionState$ = localConnection$.pipe( @@ -386,9 +392,11 @@ export const createLocalMembership$ = ({ ); const fatalMatrixError$ = new BehaviorSubject(null); const setMatrixError = (e: ElementCallError): void => { - if (fatalMatrixError$.value !== null) + if (fatalMatrixError$.value !== null) { logger.error("Multiple Matrix Errors:", e); - else fatalMatrixError$.next(e); + } else { + fatalMatrixError$.next(e); + } }; const localMemberState$ = scope.behavior( diff --git a/src/state/CallViewModel/localMember/LocalTransport.ts b/src/state/CallViewModel/localMember/LocalTransport.ts index 0a85bbc1..1320b8c4 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.ts @@ -85,7 +85,7 @@ export const createLocalTransport$ = ({ * The transport that we would personally prefer to publish on (if not for the * transport preferences of others, perhaps). * - * @throws + * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken */ const preferredTransport$: Behavior = scope.behavior( customLivekitUrl.value$.pipe( diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 2fd9eaa8..c801b3ae 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -29,8 +29,10 @@ import { import { type Behavior } from "../../Behavior.ts"; import { type ObservableScope } from "../../ObservableScope.ts"; import { + ElementCallError, InsufficientCapacityError, SFURoomCreationRestrictedError, + UnknownCallError, } from "../../../utils/errors.ts"; export type PublishingParticipant = LocalParticipant | RemoteParticipant; @@ -79,9 +81,9 @@ export enum ConnectionState { */ export class Connection { // Private Behavior - private readonly _state$ = new BehaviorSubject( - ConnectionState.Initialized, - ); + private readonly _state$ = new BehaviorSubject< + ConnectionState | ElementCallError + >(ConnectionState.Initialized); /** * The current state of the connection to the media transport. @@ -131,6 +133,8 @@ export class Connection { this.stopped = false; try { this._state$.next(ConnectionState.FetchingConfig); + // We should already have this information after creating the localTransport. + // It would probably be better to forward this here. const { url, jwt } = await this.getSFUConfigWithOpenID(); // If we were stopped while fetching the config, don't proceed to connect if (this.stopped) return; @@ -172,7 +176,9 @@ export class Connection { }); } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); - this._state$.next(error instanceof Error ? error : new Error(`${error}`)); + this._state$.next( + error instanceof ElementCallError ? error : new UnknownCallError(error), + ); // Its okay to ignore the throw. The error is part of the state. throw error; } From 8dac0366b64d22da6ede6162dc3e2dcf7d57273b Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 21:17:33 +0100 Subject: [PATCH 11/20] fix lints --- src/state/CallViewModel/localMember/Publisher.test.ts | 6 +----- src/state/CallViewModel/remoteMembers/Connection.ts | 6 +++++- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index 68845fa2..40763a99 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -26,12 +26,8 @@ import { mockMediaDevices, } from "../../../utils/test"; import { Publisher } from "./Publisher"; -import { - type Connection, - type ConnectionState, -} from "../remoteMembers/Connection"; +import { type Connection } from "../remoteMembers/Connection"; import { type MuteStates } from "../../MuteStates"; -import { FailToStartLivekitConnection } from "../../../utils/errors"; describe("Publisher", () => { let scope: ObservableScope; diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index c801b3ae..6015bf01 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -177,7 +177,11 @@ export class Connection { } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); this._state$.next( - error instanceof ElementCallError ? error : new UnknownCallError(error), + error instanceof ElementCallError + ? error + : error instanceof Error + ? new UnknownCallError(error) + : new UnknownCallError(new Error(`${error}`)), ); // Its okay to ignore the throw. The error is part of the state. throw error; From e626698fda8dd87213ccda763d3f04f86b830478 Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 21:22:55 +0100 Subject: [PATCH 12/20] fix connection tests --- .../remoteMembers/Connection.test.ts | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index a90f0aa2..95ff931e 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -36,7 +36,10 @@ import { } from "./Connection.ts"; import { ObservableScope } from "../../ObservableScope.ts"; import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts"; -import { FailToGetOpenIdToken } from "../../../utils/errors.ts"; +import { + ElementCallError, + FailToGetOpenIdToken, +} from "../../../utils/errors.ts"; let testScope: ObservableScope; @@ -246,8 +249,11 @@ describe("Start connection states", () => { capturedState = capturedStates.pop(); - if (capturedState instanceof Error) { - expect(capturedState.message).toContain( + if ( + capturedState instanceof ElementCallError && + capturedState.cause instanceof Error + ) { + expect(capturedState.cause.message).toContain( "SFU Config fetch failed with exception Error", ); expect(connection.transport.livekit_alias).toEqual( @@ -308,8 +314,13 @@ describe("Start connection states", () => { capturedState = capturedStates.pop(); - if (capturedState instanceof Error) { - expect(capturedState.message).toContain("Failed to connect to livekit"); + if ( + capturedState instanceof ElementCallError && + capturedState.cause instanceof Error + ) { + expect(capturedState.cause.message).toContain( + "Failed to connect to livekit", + ); expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); From aabd76044b7a4d9bc259f5ff1c76ac1eab3d8682 Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 10 Dec 2025 21:25:35 +0100 Subject: [PATCH 13/20] fix lint --- src/state/CallViewModel/CallViewModel.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 6a9eadea..aac88a3b 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -428,7 +428,7 @@ export function createCallViewModel$( combineLatest( [ localTransport$.pipe( - catchError((e) => { + catchError((e: unknown) => { logger.info( "dont pass local transport to createConnectionManager$. localTransport$ threw an error", e, From 170a38c0bae050c752d99aed150f2dfd9605b86d Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 11 Dec 2025 11:30:14 +0100 Subject: [PATCH 14/20] fix playwright incompatible browser toast --- playwright/fixtures/widget-user.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/playwright/fixtures/widget-user.ts b/playwright/fixtures/widget-user.ts index 433c960b..b0c46788 100644 --- a/playwright/fixtures/widget-user.ts +++ b/playwright/fixtures/widget-user.ts @@ -113,7 +113,8 @@ async function registerUser( await page.getByRole("button", { name: "Register" }).click(); const continueButton = page.getByRole("button", { name: "Continue" }); try { - await expect(continueButton).toBeVisible({ timeout: 5000 }); + await expect(continueButton).toBeVisible({ timeout: 700 }); + // why do we need to put in the passwor if there is a continue button? await page .getByRole("textbox", { name: "Password", exact: true }) .fill(PASSWORD); @@ -124,6 +125,16 @@ async function registerUser( await expect( page.getByRole("heading", { name: `Welcome ${username}` }), ).toBeVisible(); + + // Dismiss incompatible browser toast + const dismissButton = page.getByRole("button", { name: "Dismiss" }); + try { + await expect(dismissButton).toBeVisible({ timeout: 700 }); + await dismissButton.click(); + } catch { + // dismissButton not visible, continue as normal + } + await setDevToolElementCallDevUrl(page); const clientHandle = await page.evaluateHandle(() => From 328cc7133a2377043061cf0e162547661a8eca55 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 11 Dec 2025 11:32:28 +0100 Subject: [PATCH 15/20] update playwright so that we do not even need the dismiss anymore. --- package.json | 2 +- yarn.lock | 30 +++++++++++++++--------------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/package.json b/package.json index 21c870ad..f65865e4 100644 --- a/package.json +++ b/package.json @@ -54,7 +54,7 @@ "@opentelemetry/sdk-trace-base": "^2.0.0", "@opentelemetry/sdk-trace-web": "^2.0.0", "@opentelemetry/semantic-conventions": "^1.25.1", - "@playwright/test": "^1.56.1", + "@playwright/test": "^1.57.0", "@radix-ui/react-dialog": "^1.0.4", "@radix-ui/react-slider": "^1.1.2", "@radix-ui/react-visually-hidden": "^1.0.3", diff --git a/yarn.lock b/yarn.lock index f0ca83a7..02a4a3ce 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3373,14 +3373,14 @@ __metadata: languageName: node linkType: hard -"@playwright/test@npm:^1.56.1": - version: 1.56.1 - resolution: "@playwright/test@npm:1.56.1" +"@playwright/test@npm:^1.57.0": + version: 1.57.0 + resolution: "@playwright/test@npm:1.57.0" dependencies: - playwright: "npm:1.56.1" + playwright: "npm:1.57.0" bin: playwright: cli.js - checksum: 10c0/2b5b0e1f2e6a18f6e5ce6897c7440ca78f64e0b004834e9808e93ad2b78b96366b562ae4366602669cf8ad793a43d85481b58541e74be71e905e732d833dd691 + checksum: 10c0/35ba4b28be72bf0a53e33dbb11c6cff848fb9a37f49e893ce63a90675b5291ec29a1ba82c8a3b043abaead129400f0589623e9ace2e6a1c8eaa409721ecc3774 languageName: node linkType: hard @@ -7492,7 +7492,7 @@ __metadata: "@opentelemetry/sdk-trace-base": "npm:^2.0.0" "@opentelemetry/sdk-trace-web": "npm:^2.0.0" "@opentelemetry/semantic-conventions": "npm:^1.25.1" - "@playwright/test": "npm:^1.56.1" + "@playwright/test": "npm:^1.57.0" "@radix-ui/react-dialog": "npm:^1.0.4" "@radix-ui/react-slider": "npm:^1.1.2" "@radix-ui/react-visually-hidden": "npm:^1.0.3" @@ -11177,27 +11177,27 @@ __metadata: languageName: node linkType: hard -"playwright-core@npm:1.56.1": - version: 1.56.1 - resolution: "playwright-core@npm:1.56.1" +"playwright-core@npm:1.57.0": + version: 1.57.0 + resolution: "playwright-core@npm:1.57.0" bin: playwright-core: cli.js - checksum: 10c0/ffd40142b99c68678b387445d5b42f1fee4ab0b65d983058c37f342e5629f9cdbdac0506ea80a0dfd41a8f9f13345bad54e9a8c35826ef66dc765f4eb3db8da7 + checksum: 10c0/798e35d83bf48419a8c73de20bb94d68be5dde68de23f95d80a0ebe401e3b83e29e3e84aea7894d67fa6c79d2d3d40cc5bcde3e166f657ce50987aaa2421b6a9 languageName: node linkType: hard -"playwright@npm:1.56.1": - version: 1.56.1 - resolution: "playwright@npm:1.56.1" +"playwright@npm:1.57.0": + version: 1.57.0 + resolution: "playwright@npm:1.57.0" dependencies: fsevents: "npm:2.3.2" - playwright-core: "npm:1.56.1" + playwright-core: "npm:1.57.0" dependenciesMeta: fsevents: optional: true bin: playwright: cli.js - checksum: 10c0/8e9965aede86df0f4722063385748498977b219630a40a10d1b82b8bd8d4d4e9b6b65ecbfa024331a30800163161aca292fb6dd7446c531a1ad25f4155625ab4 + checksum: 10c0/ab03c99a67b835bdea9059f516ad3b6e42c21025f9adaa161a4ef6bc7ca716dcba476d287140bb240d06126eb23f889a8933b8f5f1f1a56b80659d92d1358899 languageName: node linkType: hard From 08306d663a16ad6193d7bb395649c0a228ffe160 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 11 Dec 2025 16:04:12 +0100 Subject: [PATCH 16/20] remove duplicated connecting state and update Test setup --- .../remoteMembers/Connection.test.ts | 40 +++++++++++-------- .../CallViewModel/remoteMembers/Connection.ts | 22 +++++----- 2 files changed, 35 insertions(+), 27 deletions(-) diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index 95ff931e..8f9471d0 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -50,11 +50,6 @@ let fakeLivekitRoom: MockedObject; let localParticipantEventEmiter: EventEmitter; let fakeLocalParticipant: MockedObject; -let fakeRoomEventEmiter: EventEmitter; -// let fakeMembershipsFocusMap$: BehaviorSubject< -// { membership: CallMembership; transport: LivekitTransport }[] -// >; - const livekitFocus: LivekitTransport = { livekit_alias: "!roomID:example.org", livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt", @@ -91,22 +86,25 @@ function setupTest(): void { localParticipantEventEmiter, ), } as unknown as LocalParticipant); - fakeRoomEventEmiter = new EventEmitter(); + const fakeRoomEventEmitter = new EventEmitter(); fakeLivekitRoom = vi.mocked({ connect: vi.fn(), disconnect: vi.fn(), remoteParticipants: new Map(), localParticipant: fakeLocalParticipant, state: LivekitConnectionState.Disconnected, - on: fakeRoomEventEmiter.on.bind(fakeRoomEventEmiter), - off: fakeRoomEventEmiter.off.bind(fakeRoomEventEmiter), - addListener: fakeRoomEventEmiter.addListener.bind(fakeRoomEventEmiter), + on: fakeRoomEventEmitter.on.bind(fakeRoomEventEmitter), + off: fakeRoomEventEmitter.off.bind(fakeRoomEventEmitter), + addListener: fakeRoomEventEmitter.addListener.bind(fakeRoomEventEmitter), removeListener: - fakeRoomEventEmiter.removeListener.bind(fakeRoomEventEmiter), + fakeRoomEventEmitter.removeListener.bind(fakeRoomEventEmitter), removeAllListeners: - fakeRoomEventEmiter.removeAllListeners.bind(fakeRoomEventEmiter), + fakeRoomEventEmitter.removeAllListeners.bind(fakeRoomEventEmitter), setE2EEEnabled: vi.fn().mockResolvedValue(undefined), + emit: (eventName: string | symbol, ...args: unknown[]) => { + fakeRoomEventEmitter.emit(eventName, ...args); + }, } as unknown as LivekitRoom); } @@ -129,7 +127,13 @@ function setupRemoteConnection(): Connection { }); fakeLivekitRoom.connect.mockImplementation(async (): Promise => { + const changeEv = RoomEvent.ConnectionStateChanged; + + fakeLivekitRoom.state = LivekitConnectionState.Connecting; + fakeLivekitRoom.emit(changeEv, fakeLivekitRoom.state); fakeLivekitRoom.state = LivekitConnectionState.Connected; + fakeLivekitRoom.emit(changeEv, fakeLivekitRoom.state); + return Promise.resolve(); }); @@ -350,8 +354,10 @@ describe("Start connection states", () => { expect(initialState).toEqual(ConnectionState.Initialized); const fetchingState = capturedStates.shift(); expect(fetchingState).toEqual(ConnectionState.FetchingConfig); + const disconnectedState = capturedStates.shift(); + expect(disconnectedState).toEqual(ConnectionState.LivekitDisconnected); const connectingState = capturedStates.shift(); - expect(connectingState).toEqual(ConnectionState.ConnectingToLkRoom); + expect(connectingState).toEqual(ConnectionState.LivekitConnecting); const connectedState = capturedStates.shift(); expect(connectedState).toEqual(ConnectionState.LivekitConnected); }); @@ -419,7 +425,7 @@ describe("Publishing participants observations", () => { ); participants.forEach((p) => - fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, p), + fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, p), ); // At this point there should be no publishers @@ -432,7 +438,7 @@ describe("Publishing participants observations", () => { fakeRemoteLivekitParticipant("@dan:example.org:DEV333", 2), ]; participants.forEach((p) => - fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, p), + fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, p), ); // At this point there should be no publishers @@ -462,7 +468,7 @@ describe("Publishing participants observations", () => { ); for (const participant of participants) { - fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, participant); + fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, participant); } // At this point there should be no publishers @@ -471,7 +477,7 @@ describe("Publishing participants observations", () => { participants = [fakeRemoteLivekitParticipant("@bob:example.org:DEV111", 1)]; for (const participant of participants) { - fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, participant); + fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, participant); } // We should have bob has a publisher now @@ -488,7 +494,7 @@ describe("Publishing participants observations", () => { (p) => p.identity !== "@bob:example.org:DEV111", ); - fakeRoomEventEmiter.emit( + fakeLivekitRoom.emit( RoomEvent.ParticipantDisconnected, fakeRemoteLivekitParticipant("@bob:example.org:DEV111"), ); diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 6015bf01..8f8c0924 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -18,7 +18,7 @@ import { RoomEvent, } from "livekit-client"; import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; -import { BehaviorSubject, map } from "rxjs"; +import { BehaviorSubject, map, skip, skipWhile } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; import { @@ -61,7 +61,6 @@ export enum ConnectionState { /** `start` has been called on the connection. It aquires the jwt info to conenct to the LK Room */ FetchingConfig = "FetchingConfig", Stopped = "Stopped", - ConnectingToLkRoom = "ConnectingToLkRoom", /** The same as ConnectionState.Disconnected from `livekit-client` */ LivekitDisconnected = "disconnected", /** The same as ConnectionState.Connecting from `livekit-client` */ @@ -139,7 +138,17 @@ export class Connection { // If we were stopped while fetching the config, don't proceed to connect if (this.stopped) return; - this._state$.next(ConnectionState.ConnectingToLkRoom); + // Setup observer once we are done with getSFUConfigWithOpenID + connectionStateObserver(this.livekitRoom) + .pipe( + this.scope.bind(), + map((s) => s as unknown as ConnectionState), + ) + .subscribe((lkState) => { + // It is save to cast lkState to ConnectionState as they are fully overlapping. + this._state$.next(lkState); + }); + try { await this.livekitRoom.connect(url, jwt); } catch (e) { @@ -167,13 +176,6 @@ export class Connection { } // If we were stopped while connecting, don't proceed to update state. if (this.stopped) return; - - connectionStateObserver(this.livekitRoom) - .pipe(this.scope.bind()) - .subscribe((lkState) => { - // It is save to cast lkState to ConnectionState as they are fully overlapping. - this._state$.next(lkState as unknown as ConnectionState); - }); } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); this._state$.next( From 9a7e797af48da255682e6db017dafd0e9a4624f0 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 11 Dec 2025 16:17:45 +0100 Subject: [PATCH 17/20] fix lint --- src/state/CallViewModel/remoteMembers/Connection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 8f8c0924..8b4479e8 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -18,7 +18,7 @@ import { RoomEvent, } from "livekit-client"; import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; -import { BehaviorSubject, map, skip, skipWhile } from "rxjs"; +import { BehaviorSubject, map } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; import { From 207b161b3ba25a7eae243a741c0fb7c3dfaf8eb9 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 11 Dec 2025 17:17:56 +0100 Subject: [PATCH 18/20] fix logger and dismiss button presses --- playwright/fixtures/widget-user.ts | 10 +++++++++- src/room/GroupCallView.tsx | 1 - src/room/InCallView.tsx | 4 +++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/playwright/fixtures/widget-user.ts b/playwright/fixtures/widget-user.ts index b0c46788..51dffbc6 100644 --- a/playwright/fixtures/widget-user.ts +++ b/playwright/fixtures/widget-user.ts @@ -126,8 +126,16 @@ async function registerUser( page.getByRole("heading", { name: `Welcome ${username}` }), ).toBeVisible(); + await page.pause(); + const browserUnsupportedToast = page + .getByText("Element does not support this browser") + .locator("..") + .locator(".."); + // Dismiss incompatible browser toast - const dismissButton = page.getByRole("button", { name: "Dismiss" }); + const dismissButton = browserUnsupportedToast.getByRole("button", { + name: "Dismiss", + }); try { await expect(dismissButton).toBeVisible({ timeout: 700 }); await dismissButton.click(); diff --git a/src/room/GroupCallView.tsx b/src/room/GroupCallView.tsx index 1542678e..dfd11ff3 100644 --- a/src/room/GroupCallView.tsx +++ b/src/room/GroupCallView.tsx @@ -446,7 +446,6 @@ export const GroupCallView: FC = ({ let body: ReactNode; if (externalError) { - logger.debug("External error occurred:", externalError); // If an error was recorded within this component but outside // GroupCallErrorBoundary, create a component that rethrows the error from // within the error boundary, so it can be handled uniformly diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index 18acf843..add8154a 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -24,7 +24,7 @@ import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc"; import classNames from "classnames"; import { BehaviorSubject, map } from "rxjs"; import { useObservable } from "observable-hooks"; -import { logger } from "matrix-js-sdk/lib/logger"; +import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; import { VoiceCallSolidIcon, VolumeOnSolidIcon, @@ -109,6 +109,8 @@ import { useTrackProcessorObservable$ } from "../livekit/TrackProcessorContext.t import { type Layout } from "../state/layout-types.ts"; import { ObservableScope } from "../state/ObservableScope.ts"; +const logger = rootLogger.getChild("[InCallView]"); + const maxTapDurationMs = 400; export interface ActiveCallProps From 8225e4f2608a10e0662e5004df2ecddef1bea0b8 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 11 Dec 2025 17:22:02 +0100 Subject: [PATCH 19/20] remove page.pause --- playwright/fixtures/widget-user.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/playwright/fixtures/widget-user.ts b/playwright/fixtures/widget-user.ts index 51dffbc6..efff8a80 100644 --- a/playwright/fixtures/widget-user.ts +++ b/playwright/fixtures/widget-user.ts @@ -126,7 +126,6 @@ async function registerUser( page.getByRole("heading", { name: `Welcome ${username}` }), ).toBeVisible(); - await page.pause(); const browserUnsupportedToast = page .getByText("Element does not support this browser") .locator("..") From 7edc97b9175dddd27ee3f90acb48de5de13fb3f4 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 11 Dec 2025 17:24:35 +0100 Subject: [PATCH 20/20] remove continue button things --- playwright/fixtures/widget-user.ts | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/playwright/fixtures/widget-user.ts b/playwright/fixtures/widget-user.ts index efff8a80..6236928c 100644 --- a/playwright/fixtures/widget-user.ts +++ b/playwright/fixtures/widget-user.ts @@ -111,17 +111,7 @@ async function registerUser( await page.getByRole("textbox", { name: "Confirm password" }).click(); await page.getByRole("textbox", { name: "Confirm password" }).fill(PASSWORD); await page.getByRole("button", { name: "Register" }).click(); - const continueButton = page.getByRole("button", { name: "Continue" }); - try { - await expect(continueButton).toBeVisible({ timeout: 700 }); - // why do we need to put in the passwor if there is a continue button? - await page - .getByRole("textbox", { name: "Password", exact: true }) - .fill(PASSWORD); - await continueButton.click(); - } catch { - // continueButton not visible, continue as normal - } + await expect( page.getByRole("heading", { name: `Welcome ${username}` }), ).toBeVisible();