diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 6b2ee35a..cac4322e 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -470,12 +470,22 @@ export class CallViewModel extends ViewModel { } ); - public readonly livekitConnectionState$ = this.scope.behavior( - combineLatest([this.localConnection]).pipe( - switchMap(([c]) => c.connectionState$), - startWith(ConnectionState.Disconnected), - ), - ); + public readonly livekitConnectionState$ = + this.scope.behavior( + from(this.localConnection).pipe( + switchMap((c) => + c.focusedConnectionState$.pipe( + map((s) => { + if (s.state === "ConnectedToLkRoom") return s.connectionState; + return ConnectionState.Disconnected + }), + distinctUntilChanged(), + ), + ), + startWith(ConnectionState.Disconnected), + ), + ) + /** * The MatrixRTC session participants. diff --git a/src/state/Connection.ts b/src/state/Connection.ts index bc352adf..1e081b06 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -6,9 +6,9 @@ Please see LICENSE in the repository root for full details. */ import { connectedParticipantsObserver, connectionStateObserver } from "@livekit/components-core"; -import { type ConnectionState, type E2EEOptions, Room as LivekitRoom } from "livekit-client"; +import { type ConnectionState, type E2EEOptions, Room as LivekitRoom, type RoomOptions } from "livekit-client"; import { type CallMembership, type LivekitFocus } from "matrix-js-sdk/lib/matrixrtc"; -import { combineLatest } from "rxjs"; +import { BehaviorSubject, combineLatest } from "rxjs"; import { getSFUConfigWithOpenID, type OpenIDClientParts, type SFUConfig } from "../livekit/openIDSFU"; import { type Behavior } from "./Behavior"; @@ -24,7 +24,20 @@ export interface ConnectionOpts { scope: ObservableScope; /** An observable of the current RTC call memberships and their associated focus. */ membershipsFocusMap$: Behavior<{ membership: CallMembership; focus: LivekitFocus }[]>; + + /** Optional factory to create the Livekit room, mainly for testing purposes. */ + livekitRoomFactory?: (options?: RoomOptions) => LivekitRoom; } + +export type FocusConnectionState = + | { state: 'Initialized' } + | { state: 'FetchingConfig', focus: LivekitFocus } + | { state: 'ConnectingToLkRoom', focus: LivekitFocus } + | { state: 'PublishingTracks', focus: LivekitFocus } + | { state: 'FailedToStart', error: Error, focus: LivekitFocus } + | { state: 'ConnectedToLkRoom', connectionState: ConnectionState, focus: LivekitFocus } + | { state: 'Stopped', focus: LivekitFocus }; + /** * A connection to a Matrix RTC LiveKit backend. * @@ -32,6 +45,15 @@ export interface ConnectionOpts { */ export class Connection { + // Private Behavior + private readonly _focusedConnectionState$ = new BehaviorSubject({ state: 'Initialized' }); + + /** + * The current state of the connection to the focus server. + */ + public get focusedConnectionState$(): Behavior { + return this._focusedConnectionState$; + } /** * Whether the connection has been stopped. * @see Connection.stop @@ -48,10 +70,23 @@ export class Connection { */ public async start(): Promise { this.stopped = false; - // TODO could this be loaded earlier to save time? - const { url, jwt } = await this.getSFUConfigWithOpenID(); + try { + this._focusedConnectionState$.next({ state: 'FetchingConfig', focus: this.targetFocus }); + // TODO could this be loaded earlier to save time? + const { url, jwt } = await this.getSFUConfigWithOpenID(); + // If we were stopped while fetching the config, don't proceed to connect + if (this.stopped) return; - if (!this.stopped) await this.livekitRoom.connect(url, jwt); + this._focusedConnectionState$.next({ state: 'ConnectingToLkRoom', focus: this.targetFocus }); + await this.livekitRoom.connect(url, jwt); + // If we were stopped while connecting, don't proceed to update state. + if (this.stopped) return; + + this._focusedConnectionState$.next({ state: 'ConnectedToLkRoom', focus: this.targetFocus, connectionState: this.livekitRoom.state }); + } catch (error) { + this._focusedConnectionState$.next({ state: 'FailedToStart', error: error instanceof Error ? error : new Error(`${error}`), focus: this.targetFocus }); + throw error; + } } @@ -71,6 +106,7 @@ export class Connection { public stop(): void { if (this.stopped) return; void this.livekitRoom.disconnect(); + this._focusedConnectionState$.next({ state: 'Stopped', focus: this.targetFocus }); this.stopped = true; } @@ -87,13 +123,6 @@ export class Connection { */ protected readonly targetFocus: LivekitFocus; - /** - * An observable of the livekit connection state. - * Converts the livekit room events StateChange to an observable. - */ - public connectionState$: Behavior; - - private readonly client: OpenIDClientParts; /** * Creates a new connection to a matrix RTC LiveKit backend. @@ -140,9 +169,16 @@ export class Connection { ), [] ); - this.connectionState$ = scope.behavior( + + scope.behavior( connectionStateObserver(this.livekitRoom) - ); + ).subscribe((connectionState) => { + const current = this.focusedConnectionState$.value; + // Only update the state if we are already connected to the LiveKit room. + if (current.state === 'ConnectedToLkRoom') { + this.focusedConnectionState$.next({ state: 'ConnectedToLkRoom', connectionState, focus: current.focus }); + } + }); scope.onEnd(() => this.stop()); } @@ -162,7 +198,8 @@ export class RemoteConnection extends Connection { * @param sharedE2eeOption - The shared E2EE options to use for the connection. */ public constructor(opts: ConnectionOpts, sharedE2eeOption: E2EEOptions | undefined) { - const livekitRoom = new LivekitRoom({ + const factory = opts.livekitRoomFactory ?? ((options: RoomOptions): LivekitRoom => new LivekitRoom(options)); + const livekitRoom = factory({ ...defaultLiveKitOptions, e2ee: sharedE2eeOption }); diff --git a/src/state/PublishConnection.ts b/src/state/PublishConnection.ts index 724c6c5f..c7b9c6aa 100644 --- a/src/state/PublishConnection.ts +++ b/src/state/PublishConnection.ts @@ -4,7 +4,7 @@ Copyright 2025 New Vector Ltd. SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { ConnectionState, type E2EEOptions, LocalVideoTrack, Room as LivekitRoom, Track } from "livekit-client"; +import { ConnectionState, type E2EEOptions, LocalVideoTrack, Room as LivekitRoom, type RoomOptions, Track } from "livekit-client"; import { map, NEVER, type Observable, type Subscription, switchMap } from "rxjs"; import type { Behavior } from "./Behavior.ts"; @@ -39,18 +39,20 @@ export class PublishConnection extends Connection { await super.start() - if (!this.stopped) { - // TODO this can throw errors? It will also prompt for permissions if not already granted - const tracks = await this.livekitRoom.localParticipant.createTracks({ - audio: this.muteStates.audio.enabled$.value, - video: this.muteStates.video.enabled$.value - }); - for (const track of tracks) { - // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally - // with a timeout. - await this.livekitRoom.localParticipant.publishTrack(track); - // TODO: check if the connection is still active? and break the loop if not? - } + if (this.stopped) return; + + // TODO this can throw errors? It will also prompt for permissions if not already granted + const tracks = await this.livekitRoom.localParticipant.createTracks({ + audio: this.muteStates.audio.enabled$.value, + video: this.muteStates.video.enabled$.value + }); + if (this.stopped) return; + for (const track of tracks) { + // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally + // with a timeout. + await this.livekitRoom.localParticipant.publishTrack(track); + if (this.stopped) return; + // TODO: check if the connection is still active? and break the loop if not? } }; @@ -74,7 +76,8 @@ export class PublishConnection extends Connection { logger.info("[LivekitRoom] Create LiveKit room"); const { controlledAudioDevices } = getUrlParams(); - const room = new LivekitRoom({ + const factory = args.livekitRoomFactory ?? ((options: RoomOptions): LivekitRoom => new LivekitRoom(options)); + const room = factory({ ...defaultLiveKitOptions, videoCaptureDefaults: { ...defaultLiveKitOptions.videoCaptureDefaults, @@ -99,17 +102,7 @@ export class PublishConnection extends Connection { logger.error("Failed to set E2EE enabled on room", e); }); - super( - room, - args, - // focus, - // livekitAlias, - // client, - // scope, - // membershipsFocusMap$, - // e2eeLivekitOptions, - // room - ); + super(room, args); // Setup track processor syncing (blur) const track$ = scope.behavior( @@ -148,7 +141,8 @@ export class PublishConnection extends Connection { selected$: Observable ): Subscription => selected$.pipe(scope.bind()).subscribe((device) => { - if (this.connectionState$.value !== ConnectionState.Connected) return; + if (this.livekitRoom.state != ConnectionState.Connected) return; + // if (this.connectionState$.value !== ConnectionState.Connected) return; logger.info( "[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :", this.livekitRoom.getActiveDevice(kind), @@ -185,7 +179,7 @@ export class PublishConnection extends Connection { scope.bind() ) .subscribe(() => { - if (this.connectionState$.value !== ConnectionState.Connected) return; + if (this.livekitRoom.state != ConnectionState.Connected) return; const activeMicTrack = Array.from( this.livekitRoom.localParticipant.audioTrackPublications.values() ).find((d) => d.source === Track.Source.Microphone)?.track;