diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 3c15958a..e06990b2 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -94,14 +94,13 @@ import { type SpotlightLandscapeLayoutMedia, type SpotlightPortraitLayoutMedia, } from "../layout-types.ts"; -import { type ElementCallError } from "../../utils/errors.ts"; +import { ElementCallError } from "../../utils/errors.ts"; import { type ObservableScope } from "../ObservableScope.ts"; import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts"; import { createLocalMembership$, enterRTCSession, - RTCBackendState, -} from "./localMember/LocalMembership.ts"; +} from "./localMember/LocalMember.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { createMemberships$, @@ -452,13 +451,13 @@ export function createCallViewModel$( const localMembership = createLocalMembership$({ scope: scope, - homeserverConnected$: createHomeserverConnected$( + homeserverConnected: createHomeserverConnected$( scope, client, matrixRTCSession, ), muteStates: muteStates, - joinMatrixRTC: async (transport: LivekitTransport) => { + joinMatrixRTC: (transport: LivekitTransport) => { return enterRTCSession( matrixRTCSession, transport, @@ -1455,7 +1454,7 @@ export function createCallViewModel$( ringOverlay$: ringOverlay$, leave$: leave$, hangup: (): void => userHangup$.next(), - join: localMembership.requestConnect, + join: localMembership.requestJoinAndPublish, toggleScreenSharing: toggleScreenSharing, sharingScreen$: sharingScreen$, @@ -1465,9 +1464,8 @@ export function createCallViewModel$( unhoverScreen: (): void => screenUnhover$.next(), fatalError$: scope.behavior( - localMembership.connectionState.livekit$.pipe( - filter((v) => v.state === RTCBackendState.Error), - map((s) => s.error), + localMembership.localMemberState$.pipe( + filter((v) => v instanceof ElementCallError), ), null, ), diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMember.test.ts similarity index 74% rename from src/state/CallViewModel/localMember/LocalMembership.test.ts rename to src/state/CallViewModel/localMember/LocalMember.test.ts index 1ef7abd6..2f8d11a5 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMember.test.ts @@ -7,7 +7,7 @@ Please see LICENSE in the repository root for full details. */ import { - Status, + Status as RTCMemberStatus, type LivekitTransport, type MatrixRTCSession, } from "matrix-js-sdk/lib/matrixrtc"; @@ -15,11 +15,7 @@ import { describe, expect, it, vi } from "vitest"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { BehaviorSubject, map, of } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; -import { - ConnectionState as LivekitConnectionState, - type LocalParticipant, - type LocalTrack, -} from "livekit-client"; +import { type LocalParticipant, type LocalTrack } from "livekit-client"; import { MatrixRTCMode } from "../../../settings/settings"; import { @@ -30,16 +26,19 @@ import { withTestScheduler, } from "../../../utils/test"; import { + TransportState, createLocalMembership$, enterRTCSession, - RTCBackendState, -} from "./LocalMembership"; + PublishState, + TrackState, +} from "./LocalMember"; import { MatrixRTCTransportMissingError } from "../../../utils/errors"; import { Epoch, ObservableScope } from "../../ObservableScope"; import { constant } from "../../Behavior"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; -import { type Connection } from "../remoteMembers/Connection"; +import { ConnectionState, type Connection } from "../remoteMembers/Connection"; import { type Publisher } from "./Publisher"; +import { C } from "vitest/dist/chunks/global.d.MAmajcmJ.js"; const MATRIX_RTC_MODE = MatrixRTCMode.Legacy; const getUrlParams = vi.hoisted(() => vi.fn(() => ({}))); @@ -200,21 +199,18 @@ describe("LocalMembership", () => { joinMatrixRTC: async (): Promise => {}, homeserverConnected: { combined$: constant(true), - rtsSession$: constant(Status.Connected), + rtsSession$: constant(RTCMemberStatus.Connected), }, }; it("throws error on missing RTC config error", () => { withTestScheduler(({ scope, hot, expectObservable }) => { - const goodTransport = { - livekit_service_url: "other", - } as LivekitTransport; - - const localTransport$ = scope.behavior( + const localTransport$ = scope.behavior( hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")), - goodTransport, + null, ); + // we do not need any connection data since we want to fail before reaching that. const mockConnectionManager = { transports$: scope.behavior( localTransport$.pipe(map((t) => new Epoch([t]))), @@ -230,15 +226,11 @@ describe("LocalMembership", () => { connectionManager: mockConnectionManager, localTransport$, }); + localMembership.requestJoinAndPublish(); - expectObservable(localMembership.connectionState.livekit$).toBe("ne", { - n: { state: RTCBackendState.WaitingForConnection }, - e: { - state: RTCBackendState.Error, - error: expect.toSatisfy( - (e) => e instanceof MatrixRTCTransportMissingError, - ), - }, + expectObservable(localMembership.localMemberState$).toBe("ne", { + n: TransportState.Waiting, + e: expect.toSatisfy((e) => e instanceof MatrixRTCTransportMissingError), }); }); }); @@ -250,32 +242,24 @@ describe("LocalMembership", () => { livekit_service_url: "b", } as LivekitTransport; - const connectionManagerData = new ConnectionManagerData(); - - connectionManagerData.add( - { - livekitRoom: mockLivekitRoom({ - localParticipant: { - isScreenShareEnabled: false, - trackPublications: [], - } as unknown as LocalParticipant, - }), - state$: constant({ - state: LivekitConnectionState.Connected, - }), - transport: aTransport, - } as unknown as Connection, - [], - ); - connectionManagerData.add( - { - state$: constant({ - state: LivekitConnectionState.Connected, - }), - transport: bTransport, - } as unknown as Connection, - [], - ); + const connectionTransportAConnected = { + livekitRoom: mockLivekitRoom({ + localParticipant: { + isScreenShareEnabled: false, + trackPublications: [], + } as unknown as LocalParticipant, + }), + state$: constant(ConnectionState.LivekitConnected), + transport: aTransport, + } as unknown as Connection; + const connectionTransportAConnecting = { + ...connectionTransportAConnected, + state$: constant(ConnectionState.LivekitConnecting), + } as unknown as Connection; + const connectionTransportBConnected = { + state$: constant(ConnectionState.LivekitConnected), + transport: bTransport, + } as unknown as Connection; it("recreates publisher if new connection is used and ENDS always unpublish and end tracks", async () => { const scope = new ObservableScope(); @@ -300,6 +284,9 @@ describe("LocalMembership", () => { typeof vi.fn >; + const connectionManagerData = new ConnectionManagerData(); + connectionManagerData.add(connectionTransportAConnected, []); + connectionManagerData.add(connectionTransportBConnected, []); createLocalMembership$({ scope, ...defaultCreateLocalMemberValues, @@ -359,6 +346,9 @@ describe("LocalMembership", () => { typeof vi.fn >; + const connectionManagerData = new ConnectionManagerData(); + connectionManagerData.add(connectionTransportAConnected, []); + // connectionManagerData.add(connectionTransportB, []); const localMembership = createLocalMembership$({ scope, ...defaultCreateLocalMemberValues, @@ -385,10 +375,11 @@ describe("LocalMembership", () => { it("tracks livekit state correctly", async () => { const scope = new ObservableScope(); + const connectionManagerData = new ConnectionManagerData(); const localTransport$ = new BehaviorSubject(null); - const connectionManagerData$ = new BehaviorSubject< - Epoch - >(new Epoch(new ConnectionManagerData())); + const connectionManagerData$ = new BehaviorSubject( + new Epoch(connectionManagerData), + ); const publishers: Publisher[] = []; const tracks$ = new BehaviorSubject([]); @@ -434,19 +425,45 @@ describe("LocalMembership", () => { }); await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.WaitingForTransport, - }); + expect(localMembership.localMemberState$.value).toStrictEqual( + TransportState.Waiting, + ); localTransport$.next(aTransport); await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.WaitingForConnection, + expect(localMembership.localMemberState$.value).toStrictEqual({ + matrix: RTCMemberStatus.Connected, + media: { connection: null, tracks: TrackState.WaitingForUser }, }); - connectionManagerData$.next(new Epoch(connectionManagerData)); + + const connectionManagerData2 = new ConnectionManagerData(); + connectionManagerData2.add( + // clone because we will mutate this later. + { ...connectionTransportAConnecting } as unknown as Connection, + [], + ); + + connectionManagerData$.next(new Epoch(connectionManagerData2)); await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitConnectionState.Connected, + expect(localMembership.localMemberState$.value).toStrictEqual({ + matrix: RTCMemberStatus.Connected, + media: { + connection: ConnectionState.LivekitConnecting, + tracks: TrackState.WaitingForUser, + }, }); + + ( + connectionManagerData2.getConnectionForTransport(aTransport)! + .state$ as BehaviorSubject + ).next(ConnectionState.LivekitConnected); + expect(localMembership.localMemberState$.value).toStrictEqual({ + matrix: RTCMemberStatus.Connected, + media: { + connection: ConnectionState.LivekitConnected, + tracks: TrackState.WaitingForUser, + }, + }); + expect(publisherFactory).toHaveBeenCalledOnce(); expect(localMembership.tracks$.value.length).toBe(0); @@ -455,37 +472,46 @@ describe("LocalMembership", () => { // ------- await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.CreatingTracks, + expect(localMembership.localMemberState$.value).toStrictEqual({ + matrix: RTCMemberStatus.Connected, + media: { + tracks: TrackState.Creating, + connection: ConnectionState.LivekitConnected, + }, }); createTrackResolver.resolve(); await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.ReadyToPublish, - }); + expect( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (localMembership.localMemberState$.value as any).media, + ).toStrictEqual(PublishState.WaitingForUser); // ------- - localMembership.requestConnect(); + localMembership.requestJoinAndPublish(); // ------- - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.WaitingToPublish, - }); + expect( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (localMembership.localMemberState$.value as any).media, + ).toStrictEqual(PublishState.Starting); publishResolver.resolve(); await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.ConnectedAndPublishing, - }); + expect( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (localMembership.localMemberState$.value as any).media, + ).toStrictEqual(PublishState.Publishing); + expect(publishers[0].stopPublishing).not.toHaveBeenCalled(); - expect(localMembership.connectionState.livekit$.isStopped).toBe(false); + expect(localMembership.localMemberState$.isStopped).toBe(false); scope.end(); await flushPromises(); // stays in connected state because it is stopped before the update to tracks update the state. - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.ConnectedAndPublishing, - }); + expect( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (localMembership.localMemberState$.value as any).media, + ).toStrictEqual(PublishState.Publishing); // stop all tracks after ending scopes expect(publishers[0].stopPublishing).toHaveBeenCalled(); expect(publishers[0].stopTracks).toHaveBeenCalled(); diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMember.ts similarity index 77% rename from src/state/CallViewModel/localMember/LocalMembership.ts rename to src/state/CallViewModel/localMember/LocalMember.ts index 6a31ce4b..e2fcc70e 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -11,7 +11,6 @@ import { ParticipantEvent, type LocalParticipant, type ScreenShareCaptureOptions, - ConnectionState as LivekitConnectionState, } from "livekit-client"; import { observeParticipantEvents } from "@livekit/components-core"; import { @@ -36,62 +35,66 @@ import { import { type Logger } from "matrix-js-sdk/lib/logger"; import { deepCompare } from "matrix-js-sdk/lib/utils"; -import { constant, type Behavior } from "../../Behavior"; -import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; -import { type ObservableScope } from "../../ObservableScope"; -import { type Publisher } from "./Publisher"; -import { type MuteStates } from "../../MuteStates"; +import { constant, type Behavior } from "../../Behavior.ts"; +import { type IConnectionManager } from "../remoteMembers/ConnectionManager.ts"; +import { type ObservableScope } from "../../ObservableScope.ts"; +import { type Publisher } from "./Publisher.ts"; +import { type MuteStates } from "../../MuteStates.ts"; import { ElementCallError, MembershipManagerError, UnknownCallError, -} from "../../../utils/errors"; -import { ElementWidgetActions, widget } from "../../../widget"; +} from "../../../utils/errors.ts"; +import { ElementWidgetActions, widget } from "../../../widget.ts"; import { getUrlParams } from "../../../UrlParams.ts"; import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts"; import { MatrixRTCMode } from "../../../settings/settings.ts"; import { Config } from "../../../config/Config.ts"; import { - type ConnectionState, + ConnectionState, type Connection, + type FailedToStartError, } from "../remoteMembers/Connection.ts"; import { type HomeserverConnected } from "./HomeserverConnected.ts"; -export enum RTCBackendState { - Error = "error", +export enum TransportState { /** Not even a transport is available to the LocalMembership */ - WaitingForTransport = "waiting_for_transport", - /** A connection appeared so we can initialise the publisher */ - WaitingForConnection = "waiting_for_connection", - /** Implies lk connection is connected */ - CreatingTracks = "creating_tracks", - /** Implies lk connection is connected */ - ReadyToPublish = "ready_to_publish", - /** Implies lk connection is connected */ - WaitingToPublish = "waiting_to_publish", - /** Implies lk connection is connected */ - ConnectedAndPublishing = "fully_connected", + Waiting = "transport_waiting", } -type LocalMemberRTCBackendState = - | { state: RTCBackendState.Error; error: ElementCallError } - | { state: Exclude } - | ConnectionState; - -export enum MatrixAdditionalState { - WaitingForTransport = "waiting_for_transport", +export enum PublishState { + WaitingForUser = "publish_waiting_for_user", + /** Implies lk connection is connected */ + Starting = "publish_start_publishing", + /** Implies lk connection is connected */ + Publishing = "publish_publishing", } -type LocalMemberMatrixState = - | { state: MatrixAdditionalState.WaitingForTransport } - | { state: "Error"; error: Error } - | { state: RTCSessionStatus }; - -export interface LocalMemberConnectionState { - livekit$: Behavior; - matrix$: Behavior; +export enum TrackState { + /** The track is waiting for user input to create tracks (waiting to call `startTracks()`) */ + WaitingForUser = "tracks_waiting_for_user", + /** Implies lk connection is connected */ + Creating = "tracks_creating", + /** Implies lk connection is connected */ + Ready = "tracks_ready", } +export type LocalMemberMediaState = + | { + tracks: TrackState; + connection: ConnectionState | FailedToStartError; + } + | PublishState + | ElementCallError; +export type LocalMemberMatrixState = Error | RTCSessionStatus; +export type LocalMemberState = + | ElementCallError + | TransportState.Waiting + | { + media: LocalMemberMediaState; + matrix: LocalMemberMatrixState; + }; + /* * - get well known * - get oldest membership @@ -146,16 +149,16 @@ export const createLocalMembership$ = ({ matrixRTCSession, }: Props): { /** - * This starts audio and video tracks. They will be reused when calling `requestConnect`. + * This starts audio and video tracks. They will be reused when calling `requestPublish`. */ startTracks: () => Behavior; /** - * This sets a inner state (shouldConnect) to true and instructs the js-sdk and livekit to keep the user + * This sets a inner state (shouldPublish) to true and instructs the js-sdk and livekit to keep the user * connected to matrix and livekit. */ - requestConnect: () => void; + requestJoinAndPublish: () => void; requestDisconnect: () => void; - connectionState: LocalMemberConnectionState; + localMemberState$: Behavior; sharingScreen$: Behavior; /** * Callback to toggle screen sharing. If null, screen sharing is not possible. @@ -164,11 +167,11 @@ export const createLocalMembership$ = ({ tracks$: Behavior; participant$: Behavior; connection$: Behavior; - /** Shorthand for connectionState.matrix.state === Status.Reconnecting + /** Shorthand for homeserverConnected.rtcSession === Status.Reconnecting * Direct translation to the js-sdk membership manager connection `Status`. */ reconnecting$: Behavior; - /** Shorthand for connectionState.matrix.state === Status.Disconnected + /** Shorthand for homeserverConnected.rtcSession === Status.Disconnected * Direct translation to the js-sdk membership manager connection `Status`. */ disconnected$: Behavior; @@ -190,7 +193,7 @@ export const createLocalMembership$ = ({ : new Error("Unknown error from localTransport"), ); } - setLivekitError(error); + setTransportError(error); return of(null); }), ), @@ -223,19 +226,13 @@ export const createLocalMembership$ = ({ // MATRIX RELATED - const reconnecting$ = scope.behavior( - homeserverConnected.rtsSession$.pipe( - map((sessionStatus) => sessionStatus === RTCSessionStatus.Reconnecting), - ), - ); - // This should be used in a combineLatest with publisher$ to connect. // to make it possible to call startTracks before the preferredTransport$ has resolved. const trackStartRequested = Promise.withResolvers(); // This should be used in a combineLatest with publisher$ to connect. // to make it possible to call startTracks before the preferredTransport$ has resolved. - const connectRequested$ = new BehaviorSubject(false); + const joinAndPublishRequested$ = new BehaviorSubject(false); /** * The publisher is stored in here an abstracts creating and publishing tracks. @@ -256,13 +253,13 @@ export const createLocalMembership$ = ({ return tracks$; }; - const requestConnect = (): void => { + const requestJoinAndPublish = (): void => { trackStartRequested.resolve(); - connectRequested$.next(true); + joinAndPublishRequested$.next(true); }; const requestDisconnect = (): void => { - connectRequested$.next(false); + joinAndPublishRequested$.next(false); }; // Take care of the publisher$ @@ -300,112 +297,129 @@ export const createLocalMembership$ = ({ // Based on `connectRequested$` we start publishing tracks. (once they are there!) scope.reconcile( - scope.behavior(combineLatest([publisher$, tracks$, connectRequested$])), - async ([publisher, tracks, shouldConnect]) => { - if (shouldConnect === publisher?.publishing$.value) return; - if (tracks.length !== 0 && shouldConnect) { + scope.behavior( + combineLatest([publisher$, tracks$, joinAndPublishRequested$]), + ), + async ([publisher, tracks, shouldJoinAndPublish]) => { + if (shouldJoinAndPublish === publisher?.publishing$.value) return; + if (tracks.length !== 0 && shouldJoinAndPublish) { try { await publisher?.startPublishing(); } catch (error) { - setLivekitError(error as ElementCallError); + setMediaError(error as ElementCallError); } - } else if (tracks.length !== 0 && !shouldConnect) { + } else if (tracks.length !== 0 && !shouldJoinAndPublish) { try { await publisher?.stopPublishing(); } catch (error) { - setLivekitError(new UnknownCallError(error as Error)); + setMediaError(new UnknownCallError(error as Error)); } } }, ); - const fatalLivekitError$ = new BehaviorSubject(null); - const setLivekitError = (e: ElementCallError): void => { - if (fatalLivekitError$.value !== null) - logger.error("Multiple Livkit Errors:", e); - else fatalLivekitError$.next(e); + const fatalMediaError$ = new BehaviorSubject(null); + const setMediaError = (e: ElementCallError): void => { + if (fatalMediaError$.value !== null) + logger.error("Multiple Media Errors:", e); + else fatalMediaError$.next(e); }; - const livekitState$: Behavior = scope.behavior( + + const fatalTransportError$ = new BehaviorSubject( + null, + ); + const setTransportError = (e: ElementCallError): void => { + if (fatalTransportError$.value !== null) + logger.error("Multiple Transport Errors:", e); + else fatalTransportError$.next(e); + }; + + const mediaState$: Behavior = scope.behavior( combineLatest([ localConnectionState$, - publisher$, localTransport$, - tracks$.pipe( - tap((t) => { - logger.info("tracks$: ", t); - }), - ), + tracks$, publishing$, - connectRequested$, + joinAndPublishRequested$, from(trackStartRequested.promise).pipe( map(() => true), startWith(false), ), - fatalLivekitError$, ]).pipe( map( ([ localConnectionState, - publisher, localTransport, tracks, publishing, - shouldConnect, + shouldPublish, shouldStartTracks, - error, ]) => { - // read this: - // if(!) return {state: ...} - // if(!) return {state: } - // - // as: - // We do have but not yet so we are in - if (error !== null) return { state: RTCBackendState.Error, error }; + if (!localTransport) return null; const hasTracks = tracks.length > 0; - if (!localTransport) - return { state: RTCBackendState.WaitingForTransport }; - if (!localConnectionState) - return { state: RTCBackendState.WaitingForConnection }; + let trackState: TrackState = TrackState.WaitingForUser; + if (hasTracks && shouldStartTracks) trackState = TrackState.Ready; + if (!hasTracks && shouldStartTracks) trackState = TrackState.Creating; + if ( - localConnectionState.state !== LivekitConnectionState.Connected || - !publisher + localConnectionState !== ConnectionState.LivekitConnected || + trackState !== TrackState.Ready ) - // pass through the localConnectionState while we do not yet have a publisher or the state - // of the connection is not yet connected - return { state: localConnectionState.state }; - if (!shouldStartTracks) - return { state: LivekitConnectionState.Connected }; - if (!hasTracks) return { state: RTCBackendState.CreatingTracks }; - if (!shouldConnect) return { state: RTCBackendState.ReadyToPublish }; - if (!publishing) return { state: RTCBackendState.WaitingToPublish }; - return { state: RTCBackendState.ConnectedAndPublishing }; + return { + connection: localConnectionState, + tracks: trackState, + }; + if (!shouldPublish) return PublishState.WaitingForUser; + if (!publishing) return PublishState.Starting; + return PublishState.Publishing; }, ), distinctUntilChanged(deepCompare), ), ); - const fatalMatrixError$ = new BehaviorSubject(null); const setMatrixError = (e: ElementCallError): void => { if (fatalMatrixError$.value !== null) logger.error("Multiple Matrix Errors:", e); else fatalMatrixError$.next(e); }; - const matrixState$: Behavior = scope.behavior( - combineLatest([localTransport$, homeserverConnected.rtsSession$]).pipe( - map(([localTransport, rtcSessionStatus]) => { - if (!localTransport) - return { state: MatrixAdditionalState.WaitingForTransport }; - return { state: rtcSessionStatus }; - }), + + const localMemberState$ = scope.behavior( + combineLatest([ + mediaState$, + homeserverConnected.rtsSession$, + fatalMatrixError$, + fatalTransportError$, + fatalMediaError$, + ]).pipe( + map( + ([ + mediaState, + rtcSessionStatus, + matrixError, + transportError, + mediaError, + ]) => { + if (transportError !== null) return transportError; + // `mediaState` will be 'null' until the transport appears. + if (mediaState && rtcSessionStatus) + return { + matrix: matrixError ?? rtcSessionStatus, + media: mediaError ?? mediaState, + }; + else { + return TransportState.Waiting; + } + }, + ), ), ); // inform the widget about the connect and disconnect intent from the user. scope - .behavior(connectRequested$.pipe(pairwise(), scope.bind()), [ + .behavior(joinAndPublishRequested$.pipe(pairwise(), scope.bind()), [ undefined, - connectRequested$.value, + joinAndPublishRequested$.value, ]) .subscribe(([prev, current]) => { if (!widget) return; @@ -434,7 +448,7 @@ export const createLocalMembership$ = ({ // Keep matrix rtc session in sync with localTransport$, connectRequested$ scope.reconcile( - scope.behavior(combineLatest([localTransport$, connectRequested$])), + scope.behavior(combineLatest([localTransport$, joinAndPublishRequested$])), async ([transport, shouldConnect]) => { if (!transport) return; // if shouldConnect=false we will do the disconnect as the cleanup from the previous reconcile iteration. @@ -555,21 +569,19 @@ export const createLocalMembership$ = ({ return { startTracks, - requestConnect, + requestJoinAndPublish, requestDisconnect, - connectionState: { - livekit$: livekitState$, - matrix$: matrixState$, - }, + localMemberState$, tracks$, participant$, - reconnecting$, + reconnecting$: scope.behavior( + homeserverConnected.rtsSession$.pipe( + map((sessionStatus) => sessionStatus === RTCSessionStatus.Reconnecting), + ), + ), disconnected$: scope.behavior( - matrixState$.pipe( - map( - (sessionStatus) => - sessionStatus.state === RTCSessionStatus.Disconnected, - ), + homeserverConnected.rtsSession$.pipe( + map((state) => state === RTCSessionStatus.Disconnected), ), ), sharingScreen$, diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index 5468d1ff..6d27c042 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -52,9 +52,7 @@ describe("Publisher", () => { } as unknown as MuteStates; scope = new ObservableScope(); connection = { - state$: constant({ - state: LivekitConenctionState.Connected, - }), + state$: constant(LivekitConenctionState.Connected), livekitRoom: mockLivekitRoom({ localParticipant: mockLocalParticipant({}), }), @@ -110,15 +108,14 @@ describe("Publisher", () => { // failiour due to connection.state$ const beforeState = connection.state$.value; - (connection.state$ as BehaviorSubject).next({ - state: "FailedToStart", - error: Error("testStartError"), - }); + (connection.state$ as BehaviorSubject).next(Error("testStartError")); await expect(publisher.startPublishing()).rejects.toThrow( new FailToStartLivekitConnection("testStartError"), ); - (connection.state$ as BehaviorSubject).next(beforeState); + (connection.state$ as BehaviorSubject).next( + beforeState, + ); // does not try other conenction after the first one failed expect( diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 6e4a9b35..b32e7e99 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -32,7 +32,10 @@ import { } from "../../../livekit/TrackProcessorContext.tsx"; import { getUrlParams } from "../../../UrlParams.ts"; import { observeTrackReference$ } from "../../MediaViewModel.ts"; -import { type Connection } from "../remoteMembers/Connection.ts"; +import { + ConnectionState, + type Connection, +} from "../remoteMembers/Connection.ts"; import { type ObservableScope } from "../../ObservableScope.ts"; import { ElementCallError, @@ -158,20 +161,17 @@ export class Publisher { this.logger.debug("startPublishing called"); const lkRoom = this.connection.livekitRoom; const { promise, resolve, reject } = Promise.withResolvers(); - const sub = this.connection.state$.subscribe((s) => { - switch (s.state) { - case LivekitConnectionState.Connected: - resolve(); - break; - case "FailedToStart": - reject( - s.error instanceof ElementCallError - ? s.error - : new FailToStartLivekitConnection(s.error.message), - ); - break; - default: - this.logger.info("waiting for connection: ", s.state); + const sub = this.connection.state$.subscribe((state) => { + if (state instanceof Error) { + const error = + state instanceof ElementCallError + ? state + : new FailToStartLivekitConnection(state.message); + reject(error); + } else if (state === ConnectionState.LivekitConnected) { + resolve(); + } else { + this.logger.info("waiting for connection: ", state); } }); try { diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index efee1ccb..a90f0aa2 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -30,8 +30,8 @@ import { logger } from "matrix-js-sdk/lib/logger"; import type { LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { Connection, + ConnectionState, type ConnectionOpts, - type ConnectionState, type PublishingParticipant, } from "./Connection.ts"; import { ObservableScope } from "../../ObservableScope.ts"; @@ -151,7 +151,7 @@ describe("Start connection states", () => { }; const connection = new Connection(opts, logger); - expect(connection.state$.getValue().state).toEqual("Initialized"); + expect(connection.state$.getValue()).toEqual("Initialized"); }); it("fail to getOpenId token then error state", async () => { @@ -167,7 +167,7 @@ describe("Start connection states", () => { const connection = new Connection(opts, logger); - const capturedStates: ConnectionState[] = []; + const capturedStates: (ConnectionState | Error)[] = []; const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); @@ -187,22 +187,20 @@ describe("Start connection states", () => { let capturedState = capturedStates.pop(); expect(capturedState).toBeDefined(); - expect(capturedState!.state).toEqual("FetchingConfig"); + expect(capturedState!).toEqual("FetchingConfig"); deferred.reject(new FailToGetOpenIdToken(new Error("Failed to get token"))); await vi.runAllTimersAsync(); capturedState = capturedStates.pop(); - if (capturedState!.state === "FailedToStart") { - expect(capturedState!.error.message).toEqual("Something went wrong"); + if (capturedState instanceof Error) { + expect(capturedState.message).toEqual("Something went wrong"); expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); } else { - expect.fail( - "Expected FailedToStart state but got " + capturedState?.state, - ); + expect.fail("Expected FailedToStart state but got " + capturedState); } }); @@ -219,7 +217,7 @@ describe("Start connection states", () => { const connection = new Connection(opts, logger); - const capturedStates: ConnectionState[] = []; + const capturedStates: (ConnectionState | Error)[] = []; const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); @@ -241,24 +239,22 @@ describe("Start connection states", () => { let capturedState = capturedStates.pop(); expect(capturedState).toBeDefined(); - expect(capturedState?.state).toEqual("FetchingConfig"); + expect(capturedState).toEqual(ConnectionState.FetchingConfig); deferredSFU.resolve(); await vi.runAllTimersAsync(); capturedState = capturedStates.pop(); - if (capturedState?.state === "FailedToStart") { - expect(capturedState?.error.message).toContain( + if (capturedState instanceof Error) { + expect(capturedState.message).toContain( "SFU Config fetch failed with exception Error", ); expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); } else { - expect.fail( - "Expected FailedToStart state but got " + capturedState?.state, - ); + expect.fail("Expected FailedToStart state but got " + capturedState); } }); @@ -275,7 +271,7 @@ describe("Start connection states", () => { const connection = new Connection(opts, logger); - const capturedStates: ConnectionState[] = []; + const capturedStates: (ConnectionState | Error)[] = []; const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); @@ -305,17 +301,15 @@ describe("Start connection states", () => { let capturedState = capturedStates.pop(); expect(capturedState).toBeDefined(); - expect(capturedState?.state).toEqual("FetchingConfig"); + expect(capturedState).toEqual(ConnectionState.FetchingConfig); deferredSFU.resolve(); await vi.runAllTimersAsync(); capturedState = capturedStates.pop(); - if (capturedState && capturedState.state === "FailedToStart") { - expect(capturedState.error.message).toContain( - "Failed to connect to livekit", - ); + if (capturedState instanceof Error) { + expect(capturedState.message).toContain("Failed to connect to livekit"); expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); @@ -332,7 +326,7 @@ describe("Start connection states", () => { const connection = setupRemoteConnection(); - const capturedStates: ConnectionState[] = []; + const capturedStates: (ConnectionState | Error)[] = []; const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); @@ -342,13 +336,13 @@ describe("Start connection states", () => { await vi.runAllTimersAsync(); const initialState = capturedStates.shift(); - expect(initialState?.state).toEqual("Initialized"); + expect(initialState).toEqual(ConnectionState.Initialized); const fetchingState = capturedStates.shift(); - expect(fetchingState?.state).toEqual("FetchingConfig"); + expect(fetchingState).toEqual(ConnectionState.FetchingConfig); const connectingState = capturedStates.shift(); - expect(connectingState?.state).toEqual("ConnectingToLkRoom"); + expect(connectingState).toEqual(ConnectionState.ConnectingToLkRoom); const connectedState = capturedStates.shift(); - expect(connectedState?.state).toEqual("connected"); + expect(connectedState).toEqual(ConnectionState.LivekitConnected); }); it("shutting down the scope should stop the connection", async () => { diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 549777f9..29ad7a8c 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -12,7 +12,6 @@ import { } from "@livekit/components-core"; import { ConnectionError, - ConnectionState as LivekitConnectionState, type Room as LivekitRoom, type LocalParticipant, type RemoteParticipant, @@ -55,14 +54,21 @@ export class FailedToStartError extends Error { } export enum ConnectionState { + /** The start state of a connection. It has been created but nothing has loaded yet. */ Initialized = "Initialized", + /** `start` has been called on the connection. It aquires the jwt info to conenct to the LK Room */ FetchingConfig = "FetchingConfig", Stopped = "Stopped", ConnectingToLkRoom = "ConnectingToLkRoom", + /** The same as ConnectionState.Disconnected from `livekit-client` */ LivekitDisconnected = "disconnected", + /** The same as ConnectionState.Connecting from `livekit-client` */ LivekitConnecting = "connecting", + /** The same as ConnectionState.Connected from `livekit-client` */ LivekitConnected = "connected", + /** The same as ConnectionState.Reconnecting from `livekit-client` */ LivekitReconnecting = "reconnecting", + /** The same as ConnectionState.SignalReconnecting from `livekit-client` */ LivekitSignalReconnecting = "signalReconnecting", } @@ -73,15 +79,14 @@ export enum ConnectionState { */ export class Connection { // Private Behavior - private readonly _state$ = new BehaviorSubject< - ConnectionState | FailedToStartError - >(ConnectionState.Initialized); + private readonly _state$ = new BehaviorSubject( + ConnectionState.Initialized, + ); /** * The current state of the connection to the media transport. */ - public readonly state$: Behavior = - this._state$; + public readonly state$: Behavior = this._state$; /** * The media transport to connect to. @@ -161,15 +166,12 @@ export class Connection { connectionStateObserver(this.livekitRoom) .pipe(this.scope.bind()) .subscribe((lkState) => { - // It si save to cast lkState to ConnectionState as they are fully overlapping. + // It is save to cast lkState to ConnectionState as they are fully overlapping. this._state$.next(lkState as unknown as ConnectionState); }); } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); - this._state$.next({ - state: "FailedToStart", - error: error instanceof Error ? error : new Error(`${error}`), - }); + this._state$.next(error instanceof Error ? error : new Error(`${error}`)); throw error; } } @@ -194,9 +196,7 @@ export class Connection { ); if (this.stopped) return; await this.livekitRoom.disconnect(); - this._state$.next({ - state: ConnectionAdditionalState.Stopped, - }); + this._state$.next(ConnectionState.Stopped); this.stopped = true; } diff --git a/yarn.lock b/yarn.lock index 94b73130..f0ca83a7 100644 --- a/yarn.lock +++ b/yarn.lock @@ -10353,8 +10353,8 @@ __metadata: linkType: hard "matrix-js-sdk@npm:^39.2.0": - version: 39.2.0 - resolution: "matrix-js-sdk@npm:39.2.0" + version: 39.3.0 + resolution: "matrix-js-sdk@npm:39.3.0" dependencies: "@babel/runtime": "npm:^7.12.5" "@matrix-org/matrix-sdk-crypto-wasm": "npm:^15.3.0" @@ -10370,7 +10370,7 @@ __metadata: sdp-transform: "npm:^3.0.0" unhomoglyph: "npm:^1.0.6" uuid: "npm:13" - checksum: 10c0/f8b5261de2744305330ba3952821ca9303698170bfd3a0ff8a767b9286d4e8d4ed5aaf6fbaf8a1e8ff9dbd859102a2a47d882787e2da3b3078965bec00157959 + checksum: 10c0/031c9ec042e00c32dc531f82fc59c64cc25fb665abfc642b1f0765c530d60684f8bd63daf0cdd0dbe96b4f87ea3f4148f9d3f024a59d57eceaec1ce5d0164755 languageName: node linkType: hard