diff --git a/src/room/GroupCallView.tsx b/src/room/GroupCallView.tsx index 75438f7f..43602716 100644 --- a/src/room/GroupCallView.tsx +++ b/src/room/GroupCallView.tsx @@ -160,6 +160,7 @@ export const GroupCallView: FC = ({ }, [rtcSession]); // TODO move this into the callViewModel LocalMembership.ts + // We might actually not need this at all. Since we get into fatalError on those errors already? useTypedEventEmitter( rtcSession, MatrixRTCSessionEvent.MembershipManagerError, diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 3c15958a..9bfa979c 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -452,18 +452,14 @@ export function createCallViewModel$( const localMembership = createLocalMembership$({ scope: scope, - homeserverConnected$: createHomeserverConnected$( + homeserverConnected: createHomeserverConnected$( scope, client, matrixRTCSession, ), muteStates: muteStates, - joinMatrixRTC: async (transport: LivekitTransport) => { - return enterRTCSession( - matrixRTCSession, - transport, - connectOptions$.value, - ); + joinMatrixRTC: (transport: LivekitTransport) => { + enterRTCSession(matrixRTCSession, transport, connectOptions$.value); }, createPublisherFactory: (connection: Connection) => { return new Publisher( @@ -573,17 +569,6 @@ export function createCallViewModel$( ), ); - /** - * Whether various media/event sources should pretend to be disconnected from - * all network input, even if their connection still technically works. - */ - // We do this when the app is in the 'reconnecting' state, because it might be - // that the LiveKit connection is still functional while the homeserver is - // down, for example, and we want to avoid making people worry that the app is - // in a split-brained state. - // DISCUSSION own membership manager ALSO this probably can be simplifis - const reconnecting$ = localMembership.reconnecting$; - const audioParticipants$ = scope.behavior( matrixLivekitMembers$.pipe( switchMap((membersWithEpoch) => { @@ -631,7 +616,7 @@ export function createCallViewModel$( ); const handsRaised$ = scope.behavior( - handsRaisedSubject$.pipe(pauseWhen(reconnecting$)), + handsRaisedSubject$.pipe(pauseWhen(localMembership.reconnecting$)), ); const reactions$ = scope.behavior( @@ -644,7 +629,7 @@ export function createCallViewModel$( ]), ), ), - pauseWhen(reconnecting$), + pauseWhen(localMembership.reconnecting$), ), ); @@ -735,7 +720,7 @@ export function createCallViewModel$( livekitRoom$, focusUrl$, mediaDevices, - reconnecting$, + localMembership.reconnecting$, displayName$, matrixMemberMetadataStore.createAvatarUrlBehavior$(userId), handsRaised$.pipe(map((v) => v[participantId]?.time ?? null)), @@ -827,11 +812,17 @@ export function createCallViewModel$( }), ); - const leave$: Observable<"user" | "timeout" | "decline" | "allOthersLeft"> = - merge( - autoLeave$, - merge(userHangup$, widgetHangup$).pipe(map(() => "user" as const)), - ).pipe(scope.share); + const shouldLeave$: Observable< + "user" | "timeout" | "decline" | "allOthersLeft" + > = merge( + autoLeave$, + merge(userHangup$, widgetHangup$).pipe(map(() => "user" as const)), + ).pipe(scope.share); + + shouldLeave$.pipe(scope.bind()).subscribe((reason) => { + logger.info(`Call left due to ${reason}`); + localMembership.requestDisconnect(); + }); const spotlightSpeaker$ = scope.behavior( userMedia$.pipe( @@ -1453,7 +1444,7 @@ export function createCallViewModel$( autoLeave$: autoLeave$, callPickupState$: callPickupState$, ringOverlay$: ringOverlay$, - leave$: leave$, + leave$: shouldLeave$, hangup: (): void => userHangup$.next(), join: localMembership.requestConnect, toggleScreenSharing: toggleScreenSharing, @@ -1500,7 +1491,7 @@ export function createCallViewModel$( showFooter$: showFooter$, earpieceMode$: earpieceMode$, audioOutputSwitcher$: audioOutputSwitcher$, - reconnecting$: reconnecting$, + reconnecting$: localMembership.reconnecting$, }; } diff --git a/src/state/CallViewModel/localMember/HomeserverConnected.test.ts b/src/state/CallViewModel/localMember/HomeserverConnected.test.ts index 1f61e533..87ca35d0 100644 --- a/src/state/CallViewModel/localMember/HomeserverConnected.test.ts +++ b/src/state/CallViewModel/localMember/HomeserverConnected.test.ts @@ -97,106 +97,106 @@ describe("createHomeserverConnected$", () => { // LLM generated test cases. They are a bit overkill but I improved the mocking so it is // easy enough to read them so I think they can stay. it("is false when sync state is not Syncing", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); - expect(hsConnected$.value).toBe(false); + const hsConnected = createHomeserverConnected$(scope, client, session); + expect(hsConnected.combined$.value).toBe(false); }); it("remains false while membership status is not Connected even if sync is Syncing", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); client.setSyncState(SyncState.Syncing); - expect(hsConnected$.value).toBe(false); // membership still disconnected + expect(hsConnected.combined$.value).toBe(false); // membership still disconnected }); it("is false when membership status transitions to Connected but ProbablyLeft is true", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); // Make sync loop OK client.setSyncState(SyncState.Syncing); // Indicate probable leave before connection session.setProbablyLeft(true); session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); }); it("becomes true only when all three conditions are satisfied", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); // 1. Sync loop connected client.setSyncState(SyncState.Syncing); - expect(hsConnected$.value).toBe(false); // not yet membership connected + expect(hsConnected.combined$.value).toBe(false); // not yet membership connected // 2. Membership connected session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); // probablyLeft is false + expect(hsConnected.combined$.value).toBe(true); // probablyLeft is false }); it("drops back to false when sync loop leaves Syncing", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); // Reach connected state client.setSyncState(SyncState.Syncing); session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); // Sync loop error => should flip false client.setSyncState(SyncState.Error); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); }); it("drops back to false when membership status becomes disconnected", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); client.setSyncState(SyncState.Syncing); session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); session.setMembershipStatus(Status.Disconnected); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); }); it("drops to false when ProbablyLeft is emitted after being true", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); client.setSyncState(SyncState.Syncing); session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); session.setProbablyLeft(true); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); }); it("recovers to true if ProbablyLeft becomes false again while other conditions remain true", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); client.setSyncState(SyncState.Syncing); session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); session.setProbablyLeft(true); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); // Simulate clearing the flag (in realistic scenario membership manager would update) session.setProbablyLeft(false); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); }); it("composite sequence reflects each individual failure reason", () => { - const hsConnected$ = createHomeserverConnected$(scope, client, session); + const hsConnected = createHomeserverConnected$(scope, client, session); // Initially false (sync error + disconnected + not probably left) - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); // Fix sync only client.setSyncState(SyncState.Syncing); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); // Fix membership session.setMembershipStatus(Status.Connected); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); // Introduce probablyLeft -> false session.setProbablyLeft(true); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); // Restore notProbablyLeft -> true again session.setProbablyLeft(false); - expect(hsConnected$.value).toBe(true); + expect(hsConnected.combined$.value).toBe(true); // Drop sync -> false client.setSyncState(SyncState.Error); - expect(hsConnected$.value).toBe(false); + expect(hsConnected.combined$.value).toBe(false); }); }); diff --git a/src/state/CallViewModel/localMember/HomeserverConnected.ts b/src/state/CallViewModel/localMember/HomeserverConnected.ts index e1c28078..c8bcd021 100644 --- a/src/state/CallViewModel/localMember/HomeserverConnected.ts +++ b/src/state/CallViewModel/localMember/HomeserverConnected.ts @@ -25,6 +25,11 @@ import { type NodeStyleEventEmitter } from "../../../utils/test"; */ const logger = rootLogger.getChild("[HomeserverConnected]"); +export interface HomeserverConnected { + combined$: Behavior; + rtsSession$: Behavior; +} + /** * Behavior representing whether we consider ourselves connected to the Matrix homeserver * for the purposes of a MatrixRTC session. @@ -39,7 +44,7 @@ export function createHomeserverConnected$( client: NodeStyleEventEmitter & Pick, matrixRTCSession: NodeStyleEventEmitter & Pick, -): Behavior { +): HomeserverConnected { const syncing$ = ( fromEvent(client, ClientEvent.Sync) as Observable<[SyncState]> ).pipe( @@ -47,12 +52,15 @@ export function createHomeserverConnected$( map(([state]) => state === SyncState.Syncing), ); - const membershipConnected$ = fromEvent( - matrixRTCSession, - MembershipManagerEvent.StatusChanged, - ).pipe( - startWith(null), - map(() => matrixRTCSession.membershipStatus === Status.Connected), + const rtsSession$ = scope.behavior( + fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe( + map(() => matrixRTCSession.membershipStatus ?? Status.Unknown), + ), + Status.Unknown, + ); + + const membershipConnected$ = rtsSession$.pipe( + map((status) => status === Status.Connected), ); // This is basically notProbablyLeft$ @@ -71,15 +79,13 @@ export function createHomeserverConnected$( map(() => matrixRTCSession.probablyLeft !== true), ); - const connectedCombined$ = and$( - syncing$, - membershipConnected$, - certainlyConnected$, - ).pipe( - tap((connected) => { - logger.info(`Homeserver connected update: ${connected}`); - }), + const combined$ = scope.behavior( + and$(syncing$, membershipConnected$, certainlyConnected$).pipe( + tap((connected) => { + logger.info(`Homeserver connected update: ${connected}`); + }), + ), ); - return scope.behavior(connectedCombined$); + return { combined$, rtsSession$ }; } diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMembership.test.ts index cff5c06d..1ef7abd6 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.test.ts @@ -7,6 +7,7 @@ Please see LICENSE in the repository root for full details. */ import { + Status, type LivekitTransport, type MatrixRTCSession, } from "matrix-js-sdk/lib/matrixrtc"; @@ -51,7 +52,7 @@ vi.mock("@livekit/components-core", () => ({ describe("LocalMembership", () => { describe("enterRTCSession", () => { - it("It joins the correct Session", async () => { + it("It joins the correct Session", () => { const focusFromOlderMembership = { type: "livekit", livekit_service_url: "http://my-oldest-member-service-url.com", @@ -107,7 +108,7 @@ describe("LocalMembership", () => { joinRoomSession: vi.fn(), }) as unknown as MatrixRTCSession; - await enterRTCSession( + enterRTCSession( mockedSession, { livekit_alias: "roomId", @@ -136,7 +137,7 @@ describe("LocalMembership", () => { ); }); - it("It should not fail with configuration error if homeserver config has livekit url but not fallback", async () => { + it("It should not fail with configuration error if homeserver config has livekit url but not fallback", () => { mockConfig({}); vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({ "org.matrix.msc4143.rtc_foci": [ @@ -165,7 +166,7 @@ describe("LocalMembership", () => { joinRoomSession: vi.fn(), }) as unknown as MatrixRTCSession; - await enterRTCSession( + enterRTCSession( mockedSession, { livekit_alias: "roomId", @@ -190,7 +191,6 @@ describe("LocalMembership", () => { leaveRoomSession: () => {}, } as unknown as MatrixRTCSession, muteStates: mockMuteStates(), - isHomeserverConnected: constant(true), trackProcessorState$: constant({ supported: false, processor: undefined, @@ -198,7 +198,10 @@ describe("LocalMembership", () => { logger: logger, createPublisherFactory: vi.fn(), joinMatrixRTC: async (): Promise => {}, - homeserverConnected$: constant(true), + homeserverConnected: { + combined$: constant(true), + rtsSession$: constant(Status.Connected), + }, }; it("throws error on missing RTC config error", () => { @@ -258,8 +261,7 @@ describe("LocalMembership", () => { } as unknown as LocalParticipant, }), state$: constant({ - state: "ConnectedToLkRoom", - livekitConnectionState$: constant(LivekitConnectionState.Connected), + state: LivekitConnectionState.Connected, }), transport: aTransport, } as unknown as Connection, @@ -268,7 +270,7 @@ describe("LocalMembership", () => { connectionManagerData.add( { state$: constant({ - state: "ConnectedToLkRoom", + state: LivekitConnectionState.Connected, }), transport: bTransport, } as unknown as Connection, @@ -443,7 +445,7 @@ describe("LocalMembership", () => { connectionManagerData$.next(new Epoch(connectionManagerData)); await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.Initialized, + state: LivekitConnectionState.Connected, }); expect(publisherFactory).toHaveBeenCalledOnce(); expect(localMembership.tracks$.value.length).toBe(0); @@ -473,7 +475,7 @@ describe("LocalMembership", () => { publishResolver.resolve(); await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.Connected, + state: RTCBackendState.ConnectedAndPublishing, }); expect(publishers[0].stopPublishing).not.toHaveBeenCalled(); @@ -482,7 +484,7 @@ describe("LocalMembership", () => { await flushPromises(); // stays in connected state because it is stopped before the update to tracks update the state. expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.Connected, + state: RTCBackendState.ConnectedAndPublishing, }); // stop all tracks after ending scopes expect(publishers[0].stopPublishing).toHaveBeenCalled(); diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 60ae79b8..33af5192 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -11,10 +11,11 @@ import { ParticipantEvent, type LocalParticipant, type ScreenShareCaptureOptions, - ConnectionState, + ConnectionState as LivekitConnectionState, } from "livekit-client"; import { observeParticipantEvents } from "@livekit/components-core"; import { + Status as RTCSessionStatus, type LivekitTransport, type MatrixRTCSession, } from "matrix-js-sdk/lib/matrixrtc"; @@ -27,7 +28,7 @@ import { map, type Observable, of, - scan, + pairwise, startWith, switchMap, tap, @@ -37,10 +38,9 @@ import { deepCompare } from "matrix-js-sdk/lib/utils"; import { constant, type Behavior } from "../../Behavior"; import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; -import { ObservableScope } from "../../ObservableScope"; +import { type ObservableScope } from "../../ObservableScope"; import { type Publisher } from "./Publisher"; import { type MuteStates } from "../../MuteStates"; -import { and$ } from "../../../utils/observable"; import { ElementCallError, MembershipManagerError, @@ -51,7 +51,11 @@ import { getUrlParams } from "../../../UrlParams.ts"; import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts"; import { MatrixRTCMode } from "../../../settings/settings.ts"; import { Config } from "../../../config/Config.ts"; -import { type Connection } from "../remoteMembers/Connection.ts"; +import { + type ConnectionState, + type Connection, +} from "../remoteMembers/Connection.ts"; +import { type HomeserverConnected } from "./HomeserverConnected.ts"; export enum RTCBackendState { Error = "error", @@ -59,47 +63,32 @@ export enum RTCBackendState { WaitingForTransport = "waiting_for_transport", /** A connection appeared so we can initialise the publisher */ WaitingForConnection = "waiting_for_connection", - /** Connection and transport arrived, publisher Initialized */ - Initialized = "Initialized", + /** Implies lk connection is connected */ CreatingTracks = "creating_tracks", + /** Implies lk connection is connected */ ReadyToPublish = "ready_to_publish", + /** Implies lk connection is connected */ WaitingToPublish = "waiting_to_publish", - Connected = "connected", - Disconnected = "disconnected", - Disconnecting = "disconnecting", + /** Implies lk connection is connected */ + ConnectedAndPublishing = "fully_connected", } -type LocalMemberRtcBackendState = +type LocalMemberRTCBackendState = | { state: RTCBackendState.Error; error: ElementCallError } - | { state: RTCBackendState.WaitingForTransport } - | { state: RTCBackendState.WaitingForConnection } - | { state: RTCBackendState.Initialized } - | { state: RTCBackendState.CreatingTracks } - | { state: RTCBackendState.ReadyToPublish } - | { state: RTCBackendState.WaitingToPublish } - | { state: RTCBackendState.Connected } - | { state: RTCBackendState.Disconnected } - | { state: RTCBackendState.Disconnecting }; + | { state: Exclude } + | ConnectionState; -export enum MatrixState { +export enum MatrixAdditionalState { WaitingForTransport = "waiting_for_transport", - Ready = "ready", - Connecting = "connecting", - Connected = "connected", - Disconnected = "disconnected", - Error = "Error", } type LocalMemberMatrixState = - | { state: MatrixState.Connected } - | { state: MatrixState.WaitingForTransport } - | { state: MatrixState.Ready } - | { state: MatrixState.Connecting } - | { state: MatrixState.Disconnected } - | { state: MatrixState.Error; error: Error }; + | { state: MatrixAdditionalState.WaitingForTransport } + | { state: "Error"; error: Error } + | { state: RTCSessionStatus }; export interface LocalMemberConnectionState { - livekit$: Behavior; + livekit$: Behavior; matrix$: Behavior; } @@ -122,8 +111,8 @@ interface Props { muteStates: MuteStates; connectionManager: IConnectionManager; createPublisherFactory: (connection: Connection) => Publisher; - joinMatrixRTC: (transport: LivekitTransport) => Promise; - homeserverConnected$: Behavior; + joinMatrixRTC: (transport: LivekitTransport) => void; + homeserverConnected: HomeserverConnected; localTransport$: Behavior; matrixRTCSession: Pick< MatrixRTCSession, @@ -149,7 +138,7 @@ export const createLocalMembership$ = ({ scope, connectionManager, localTransport$: localTransportCanThrow$, - homeserverConnected$, + homeserverConnected, createPublisherFactory, joinMatrixRTC, logger: parentLogger, @@ -175,10 +164,14 @@ export const createLocalMembership$ = ({ tracks$: Behavior; participant$: Behavior; connection$: Behavior; - homeserverConnected$: Behavior; - // this needs to be discussed - /** @deprecated use state instead*/ + /** Shorthand for connectionState.matrix.state === Status.Reconnecting + * Direct translation to the js-sdk membership manager connection `Status`. + */ reconnecting$: Behavior; + /** Shorthand for connectionState.matrix.state === Status.Disconnected + * Direct translation to the js-sdk membership manager connection `Status`. + */ + disconnected$: Behavior; } => { const logger = parentLogger.getChild("[LocalMembership]"); logger.debug(`Creating local membership..`); @@ -232,49 +225,31 @@ export const createLocalMembership$ = ({ // * Whether we are "fully" connected to the call. Accounts for both the // * connection to the MatrixRTC session and the LiveKit publish connection. // */ - const connected$ = scope.behavior( - and$( - homeserverConnected$.pipe( - tap((v) => logger.debug("matrix: Connected state changed", v)), - ), - localConnectionState$.pipe( - switchMap((state) => { - logger.debug("livekit: Connected state changed", state); - if (!state) return of(false); - if (state.state === "ConnectedToLkRoom") { - logger.debug( - "livekit: Connected state changed (inner livekitConnectionState$)", - state.livekitConnectionState$.value, - ); - return state.livekitConnectionState$.pipe( - map((lkState) => lkState === ConnectionState.Connected), - ); - } - return of(false); - }), - ), - ).pipe(tap((v) => logger.debug("combined: Connected state changed", v))), - ); + // TODO remove this and just make it one single check of livekitConnectionState$ + // const connected$ = scope.behavior( + // localConnectionState$.pipe( + // switchMap((state) => { + // logger.debug("livekit: Connected state changed", state); + // if (!state) return of(false); + // if (state.state === "ConnectedToLkRoom") { + // logger.debug( + // "livekit: Connected state changed (inner livekitConnectionState$)", + // state.livekitConnectionState$.value, + // ); + // return state.livekitConnectionState$.pipe( + // map((lkState) => lkState === ConnectionState.Connected), + // ); + // } + // return of(false); + // }), + // ), + // ); // MATRIX RELATED - // /** - // * Whether we should tell the user that we're reconnecting to the call. - // */ - // DISCUSSION is there a better way to do this? - // sth that is more deriectly implied from the membership manager of the js sdk. (fromEvent(matrixRTCSession, Reconnecting)) ??? or similar const reconnecting$ = scope.behavior( - connected$.pipe( - // We are reconnecting if we previously had some successful initial - // connection but are now disconnected - scan( - ({ connectedPreviously }, connectedNow) => ({ - connectedPreviously: connectedPreviously || connectedNow, - reconnecting: connectedPreviously && !connectedNow, - }), - { connectedPreviously: false, reconnecting: false }, - ), - map(({ reconnecting }) => reconnecting), + homeserverConnected.rtsSession$.pipe( + map((sessionStatus) => sessionStatus === RTCSessionStatus.Reconnecting), ), ); @@ -374,8 +349,9 @@ export const createLocalMembership$ = ({ logger.error("Multiple Livkit Errors:", e); else fatalLivekitError$.next(e); }; - const livekitState$: Behavior = scope.behavior( + const livekitState$: Behavior = scope.behavior( combineLatest([ + localConnectionState$, publisher$, localTransport$, tracks$.pipe( @@ -389,10 +365,12 @@ export const createLocalMembership$ = ({ map(() => true), startWith(false), ), + // TODO use local connection state here to give the full pciture of the livekit state! fatalLivekitError$, ]).pipe( map( ([ + localConnectionState, publisher, localTransport, tracks, @@ -411,13 +389,21 @@ export const createLocalMembership$ = ({ const hasTracks = tracks.length > 0; if (!localTransport) return { state: RTCBackendState.WaitingForTransport }; - if (!publisher) + if (!localConnectionState) return { state: RTCBackendState.WaitingForConnection }; - if (!shouldStartTracks) return { state: RTCBackendState.Initialized }; + if ( + localConnectionState.state !== LivekitConnectionState.Connected || + !publisher + ) + // pass through the localConnectionState while we do not yet have a publisher or the state + // of the connection is not yet connected + return { state: localConnectionState.state }; + if (!shouldStartTracks) + return { state: LivekitConnectionState.Connected }; if (!hasTracks) return { state: RTCBackendState.CreatingTracks }; if (!shouldConnect) return { state: RTCBackendState.ReadyToPublish }; if (!publishing) return { state: RTCBackendState.WaitingToPublish }; - return { state: RTCBackendState.Connected }; + return { state: RTCBackendState.ConnectedAndPublishing }; }, ), distinctUntilChanged(deepCompare), @@ -431,58 +417,70 @@ export const createLocalMembership$ = ({ else fatalMatrixError$.next(e); }; const matrixState$: Behavior = scope.behavior( - combineLatest([ - localTransport$, - connectRequested$, - homeserverConnected$, - ]).pipe( - map(([localTransport, connectRequested, homeserverConnected]) => { - if (!localTransport) return { state: MatrixState.WaitingForTransport }; - if (!connectRequested) return { state: MatrixState.Ready }; - if (!homeserverConnected) return { state: MatrixState.Connecting }; - return { state: MatrixState.Connected }; + combineLatest([localTransport$, homeserverConnected.rtsSession$]).pipe( + map(([localTransport, rtcSessionStatus]) => { + if (!localTransport) + return { state: MatrixAdditionalState.WaitingForTransport }; + return { state: rtcSessionStatus }; }), ), ); - // Keep matrix rtc session in sync with localTransport$, connectRequested$ and muteStates.video.enabled$ + // inform the widget about the connect and disconnect intent from the user. + scope + .behavior(connectRequested$.pipe(pairwise(), scope.bind()), [ + undefined, + connectRequested$.value, + ]) + .subscribe(([prev, current]) => { + if (!widget) return; + if (!prev && current) { + try { + void widget.api.transport.send(ElementWidgetActions.JoinCall, {}); + } catch (e) { + logger.error("Failed to send join action", e); + } + } + if (prev && !current) { + try { + void widget?.api.transport.send(ElementWidgetActions.HangupCall, {}); + } catch (e) { + logger.error("Failed to send hangup action", e); + } + } + }); + + combineLatest([muteStates.video.enabled$, homeserverConnected.combined$]) + .pipe(scope.bind()) + .subscribe(([videoEnabled, connected]) => { + if (!connected) return; + void matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"); + }); + + // Keep matrix rtc session in sync with localTransport$, connectRequested$ scope.reconcile( scope.behavior(combineLatest([localTransport$, connectRequested$])), async ([transport, shouldConnect]) => { + if (!transport) return; + // if shouldConnect=false we will do the disconnect as the cleanup from the previous reconcile iteration. if (!shouldConnect) return; - if (!transport) return; try { - await joinMatrixRTC(transport); + joinMatrixRTC(transport); } catch (error) { logger.error("Error entering RTC session", error); if (error instanceof Error) setMatrixError(new MembershipManagerError(error)); } - // Update our member event when our mute state changes. - const callIntentScope = new ObservableScope(); - // because this uses its own scope, we can start another reconciliation for the duration of one connection. - callIntentScope.reconcile( - muteStates.video.enabled$, - async (videoEnabled) => - matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"), - ); - - return async (): Promise => { - callIntentScope.end(); + return Promise.resolve(async (): Promise => { try { - // Update matrixRTCSession to allow udpating the transport without leaving the session! - await matrixRTCSession.leaveRoomSession(); + // TODO Update matrixRTCSession to allow udpating the transport without leaving the session! + await matrixRTCSession.leaveRoomSession(1000); } catch (e) { logger.error("Error leaving RTC session", e); } - try { - await widget?.api.transport.send(ElementWidgetActions.HangupCall, {}); - } catch (e) { - logger.error("Failed to send hangup action", e); - } - }; + }); }, ); @@ -497,7 +495,7 @@ export const createLocalMembership$ = ({ // pause tracks during the initial joining sequence too until we're sure // that our own media is displayed on screen. // TODO refactor this based no livekitState$ - combineLatest([participant$, homeserverConnected$]) + combineLatest([participant$, homeserverConnected.combined$]) .pipe(scope.bind()) .subscribe(([participant, connected]) => { if (!participant) return; @@ -590,8 +588,15 @@ export const createLocalMembership$ = ({ }, tracks$, participant$, - homeserverConnected$, reconnecting$, + disconnected$: scope.behavior( + matrixState$.pipe( + map( + (sessionStatus) => + sessionStatus.state === RTCSessionStatus.Disconnected, + ), + ), + ), sharingScreen$, toggleScreenSharing, connection$: localConnection$, @@ -626,11 +631,11 @@ interface EnterRTCSessionOptions { * @throws If the widget could not send ElementWidgetActions.JoinCall action. */ // Exported for unit testing -export async function enterRTCSession( +export function enterRTCSession( rtcSession: MatrixRTCSession, transport: LivekitTransport, { encryptMedia, matrixRTCMode }: EnterRTCSessionOptions, -): Promise { +): void { PosthogAnalytics.instance.eventCallEnded.cacheStartCall(new Date()); PosthogAnalytics.instance.eventCallStarted.track(rtcSession.room.roomId); @@ -669,7 +674,4 @@ export async function enterRTCSession( unstableSendStickyEvents: matrixRTCMode === MatrixRTCMode.Matrix_2_0, }, ); - if (widget) { - await widget.api.transport.send(ElementWidgetActions.JoinCall, {}); - } } diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index 9b3e5b2a..5468d1ff 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -53,8 +53,7 @@ describe("Publisher", () => { scope = new ObservableScope(); connection = { state$: constant({ - state: "ConnectedToLkRoom", - livekitConnectionState$: constant(LivekitConenctionState.Connected), + state: LivekitConenctionState.Connected, }), livekitRoom: mockLivekitRoom({ localParticipant: mockLocalParticipant({}), diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 326dedaf..6e4a9b35 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -160,7 +160,7 @@ export class Publisher { const { promise, resolve, reject } = Promise.withResolvers(); const sub = this.connection.state$.subscribe((s) => { switch (s.state) { - case "ConnectedToLkRoom": + case LivekitConnectionState.Connected: resolve(); break; case "FailedToStart": diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index 2ead768b..efee1ccb 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -125,7 +125,10 @@ function setupRemoteConnection(): Connection { }; }); - fakeLivekitRoom.connect.mockResolvedValue(undefined); + fakeLivekitRoom.connect.mockImplementation(async (): Promise => { + fakeLivekitRoom.state = LivekitConnectionState.Connected; + return Promise.resolve(); + }); return new Connection(opts, logger); } @@ -309,7 +312,7 @@ describe("Start connection states", () => { capturedState = capturedStates.pop(); - if (capturedState && capturedState?.state === "FailedToStart") { + if (capturedState && capturedState.state === "FailedToStart") { expect(capturedState.error.message).toContain( "Failed to connect to livekit", ); @@ -345,7 +348,7 @@ describe("Start connection states", () => { const connectingState = capturedStates.shift(); expect(connectingState?.state).toEqual("ConnectingToLkRoom"); const connectedState = capturedStates.shift(); - expect(connectedState?.state).toEqual("ConnectedToLkRoom"); + expect(connectedState?.state).toEqual("connected"); }); it("shutting down the scope should stop the connection", async () => { diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 4f3bbda4..962f56d9 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -12,7 +12,7 @@ import { } from "@livekit/components-core"; import { ConnectionError, - type ConnectionState as LivekitConenctionState, + type ConnectionState as LivekitConnectionState, type Room as LivekitRoom, type LocalParticipant, type RemoteParticipant, @@ -47,17 +47,17 @@ export interface ConnectionOpts { /** Optional factory to create the LiveKit room, mainly for testing purposes. */ livekitRoomFactory: () => LivekitRoom; } - +export enum ConnectionAdditionalState { + Initialized = "Initialized", + FetchingConfig = "FetchingConfig", + // FailedToStart = "FailedToStart", + Stopped = "Stopped", + ConnectingToLkRoom = "ConnectingToLkRoom", +} export type ConnectionState = - | { state: "Initialized" } - | { state: "FetchingConfig" } - | { state: "ConnectingToLkRoom" } - | { - state: "ConnectedToLkRoom"; - livekitConnectionState$: Behavior; - } - | { state: "FailedToStart"; error: Error } - | { state: "Stopped" }; + | { state: ConnectionAdditionalState } + | { state: LivekitConnectionState } + | { state: "FailedToStart"; error: Error }; /** * A connection to a Matrix RTC LiveKit backend. @@ -67,7 +67,7 @@ export type ConnectionState = export class Connection { // Private Behavior private readonly _state$ = new BehaviorSubject({ - state: "Initialized", + state: ConnectionAdditionalState.Initialized, }); /** @@ -118,14 +118,14 @@ export class Connection { this.stopped = false; try { this._state$.next({ - state: "FetchingConfig", + state: ConnectionAdditionalState.FetchingConfig, }); const { url, jwt } = await this.getSFUConfigWithOpenID(); // If we were stopped while fetching the config, don't proceed to connect if (this.stopped) return; this._state$.next({ - state: "ConnectingToLkRoom", + state: ConnectionAdditionalState.ConnectingToLkRoom, }); try { await this.livekitRoom.connect(url, jwt); @@ -154,12 +154,13 @@ export class Connection { // If we were stopped while connecting, don't proceed to update state. if (this.stopped) return; - this._state$.next({ - state: "ConnectedToLkRoom", - livekitConnectionState$: this.scope.behavior( - connectionStateObserver(this.livekitRoom), - ), - }); + connectionStateObserver(this.livekitRoom) + .pipe(this.scope.bind()) + .subscribe((lkState) => { + this._state$.next({ + state: lkState, + }); + }); } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); this._state$.next({ @@ -191,7 +192,7 @@ export class Connection { if (this.stopped) return; await this.livekitRoom.disconnect(); this._state$.next({ - state: "Stopped", + state: ConnectionAdditionalState.Stopped, }); this.stopped = true; }