diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 7e3a5bdf..b6327cfa 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -122,11 +122,12 @@ import { } from "../rtcSessionHelpers"; import { E2eeType } from "../e2ee/e2eeType"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; -import { Connection, PublishConnection } from "./Connection"; +import { Connection } from "./Connection"; import { type MuteStates } from "./MuteStates"; import { getUrlParams } from "../UrlParams"; import { type ProcessorState } from "../livekit/TrackProcessorContext"; import { ElementWidgetActions, widget } from "../widget"; +import { PublishConnection } from "./PublishConnection.ts"; export interface CallViewModelOptions { encryptionSystem: EncryptionSystem; diff --git a/src/state/Connection.ts b/src/state/Connection.ts index db456ba0..f725ddda 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -5,55 +5,50 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { - connectedParticipantsObserver, - connectionStateObserver, -} from "@livekit/components-core"; -import { - ConnectionState, - Room as LivekitRoom, - type E2EEOptions, - Track, - LocalVideoTrack, -} from "livekit-client"; +import { connectedParticipantsObserver, connectionStateObserver } from "@livekit/components-core"; +import { type ConnectionState, type E2EEOptions, Room as LivekitRoom } from "livekit-client"; import { type MatrixClient } from "matrix-js-sdk"; -import { - type LivekitFocus, - type CallMembership, -} from "matrix-js-sdk/lib/matrixrtc"; -import { - combineLatest, - map, - NEVER, - type Observable, - type Subscription, - switchMap, -} from "rxjs"; -import { logger } from "matrix-js-sdk/lib/logger"; +import { type CallMembership, type LivekitFocus } from "matrix-js-sdk/lib/matrixrtc"; +import { combineLatest } from "rxjs"; -import { type SelectedDevice, type MediaDevices } from "./MediaDevices"; import { getSFUConfigWithOpenID } from "../livekit/openIDSFU"; import { type Behavior } from "./Behavior"; import { type ObservableScope } from "./ObservableScope"; import { defaultLiveKitOptions } from "../livekit/options"; -import { getValue } from "../utils/observable"; -import { getUrlParams } from "../UrlParams"; -import { type MuteStates } from "./MuteStates"; -import { - type ProcessorState, - trackProcessorSync, -} from "../livekit/TrackProcessorContext"; -import { observeTrackReference$ } from "./MediaViewModel"; +/** + * A connection to a Matrix RTC LiveKit backend. + * + * Expose observables for participants and connection state. + */ export class Connection { + + /** + * Whether the connection has been stopped. + * @see Connection.stop + * */ protected stopped = false; + /** + * Starts the connection. + * + * This will: + * 1. Request an OpenId token `request_token` (allows matrix users to verify their identity with a third-party service.) + * 2. Use this token to request the SFU config to the MatrixRtc authentication service. + * 3. Connect to the configured LiveKit room. + */ public async start(): Promise { this.stopped = false; const { url, jwt } = await this.sfuConfig; if (!this.stopped) await this.livekitRoom.connect(url, jwt); } + /** + * Stops the connection. + * + * This will disconnect from the LiveKit room. + * If the connection is already stopped, this is a no-op. + */ public stop(): void { if (this.stopped) return; void this.livekitRoom.disconnect(); @@ -63,16 +58,47 @@ export class Connection { protected readonly sfuConfig = getSFUConfigWithOpenID( this.client, this.focus.livekit_service_url, - this.livekitAlias, + this.focus.livekit_alias ); - public readonly participantsIncludingSubscribers$; + /* + * An observable of the participants in the livekit room, including subscribers. + * Converts the livekit room events ParticipantConnected/ParticipantDisconnected/StateChange to an observable. + */ + protected readonly participantsIncludingSubscribers$; + + /** + * An observable of the participants that are publishing on this connection. + * This is derived from `participantsIncludingSubscribers$` and `membershipsFocusMap$`. + * It filters the participants to only those that are associated with a membership that claims to publish on this connection. + */ public readonly publishingParticipants$; + + /** + * The LiveKit room instance. + */ public readonly livekitRoom: LivekitRoom; + /** + * An observable of the livekit connection state. + * Converts the livekit room events StateChange to an observable. + */ public connectionState$: Behavior; + + /** + * Creates a new connection to a matrix RTC LiveKit backend. + * + * @param livekitRoom - Optional LiveKit room instance to use. If not provided, a new instance will be created. + * @param focus - The focus server to connect to. + * @param livekitAlias - The livekit alias to use when connecting to the focus server. TODO duplicate of focus? + * @param client - The matrix client, used to fetch the OpenId token. TODO refactor to avoid passing the whole client + * @param scope - The observable scope to use for creating observables. + * @param membershipsFocusMap$ - The observable of the current call RTC memberships and their associated focus. + * @param e2eeLivekitOptions - The E2EE options to use for the LiveKit room. Use to share the same key provider across connections!. TODO refactor to avoid passing the whole options? + */ public constructor( protected readonly focus: LivekitFocus, + // TODO : remove livekitAlias, it's already in focus? protected readonly livekitAlias: string, protected readonly client: MatrixClient, protected readonly scope: ObservableScope, @@ -80,17 +106,17 @@ export class Connection { { membership: CallMembership; focus: LivekitFocus }[] >, e2eeLivekitOptions: E2EEOptions | undefined, - livekitRoom: LivekitRoom | undefined = undefined, + livekitRoom: LivekitRoom | undefined = undefined ) { this.livekitRoom = livekitRoom ?? new LivekitRoom({ ...defaultLiveKitOptions, - e2ee: e2eeLivekitOptions, + e2ee: e2eeLivekitOptions }); this.participantsIncludingSubscribers$ = this.scope.behavior( connectedParticipantsObserver(this.livekitRoom), - [], + [] ); this.publishingParticipants$ = this.scope.behavior( @@ -102,193 +128,24 @@ export class Connection { .flatMap(({ membership, focus }) => focus.livekit_service_url === this.focus.livekit_service_url ? [membership] - : [], + : [] ) // Find all associated publishing livekit participant objects .flatMap((membership) => { const participant = participants.find( (p) => - p.identity === `${membership.sender}:${membership.deviceId}`, + p.identity === `${membership.sender}:${membership.deviceId}` ); return participant ? [{ participant, membership }] : []; - }), + }) ), - [], + [] ); this.connectionState$ = this.scope.behavior( - connectionStateObserver(this.livekitRoom), + connectionStateObserver(this.livekitRoom) ); this.scope.onEnd(() => this.stop()); } } -export class PublishConnection extends Connection { - public async start(): Promise { - this.stopped = false; - const { url, jwt } = await this.sfuConfig; - if (!this.stopped) await this.livekitRoom.connect(url, jwt); - - if (!this.stopped) { - const tracks = await this.livekitRoom.localParticipant.createTracks({ - audio: this.muteStates.audio.enabled$.value, - video: this.muteStates.video.enabled$.value, - }); - for (const track of tracks) { - await this.livekitRoom.localParticipant.publishTrack(track); - } - } - } - - public constructor( - focus: LivekitFocus, - livekitAlias: string, - client: MatrixClient, - scope: ObservableScope, - membershipsFocusMap$: Behavior< - { membership: CallMembership; focus: LivekitFocus }[] - >, - devices: MediaDevices, - private readonly muteStates: MuteStates, - e2eeLivekitOptions: E2EEOptions | undefined, - trackerProcessorState$: Behavior, - ) { - logger.info("[LivekitRoom] Create LiveKit room"); - const { controlledAudioDevices } = getUrlParams(); - - const room = new LivekitRoom({ - ...defaultLiveKitOptions, - videoCaptureDefaults: { - ...defaultLiveKitOptions.videoCaptureDefaults, - deviceId: devices.videoInput.selected$.value?.id, - processor: trackerProcessorState$.value.processor, - }, - audioCaptureDefaults: { - ...defaultLiveKitOptions.audioCaptureDefaults, - deviceId: devices.audioInput.selected$.value?.id, - }, - audioOutput: { - // When using controlled audio devices, we don't want to set the - // deviceId here, because it will be set by the native app. - // (also the id does not need to match a browser device id) - deviceId: controlledAudioDevices - ? undefined - : getValue(devices.audioOutput.selected$)?.id, - }, - e2ee: e2eeLivekitOptions, - }); - room.setE2EEEnabled(e2eeLivekitOptions !== undefined).catch((e) => { - logger.error("Failed to set E2EE enabled on room", e); - }); - - super( - focus, - livekitAlias, - client, - scope, - membershipsFocusMap$, - e2eeLivekitOptions, - room, - ); - - // Setup track processor syncing (blur) - const track$ = this.scope.behavior( - observeTrackReference$(room.localParticipant, Track.Source.Camera).pipe( - map((trackRef) => { - const track = trackRef?.publication?.track; - return track instanceof LocalVideoTrack ? track : null; - }), - ), - ); - trackProcessorSync(track$, trackerProcessorState$); - - this.muteStates.audio.setHandler(async (desired) => { - try { - await this.livekitRoom.localParticipant.setMicrophoneEnabled(desired); - } catch (e) { - logger.error("Failed to update LiveKit audio input mute state", e); - } - return this.livekitRoom.localParticipant.isMicrophoneEnabled; - }); - this.muteStates.video.setHandler(async (desired) => { - try { - await this.livekitRoom.localParticipant.setCameraEnabled(desired); - } catch (e) { - logger.error("Failed to update LiveKit video input mute state", e); - } - return this.livekitRoom.localParticipant.isCameraEnabled; - }); - this.scope.onEnd(() => { - this.muteStates.audio.unsetHandler(); - this.muteStates.video.unsetHandler(); - }); - - const syncDevice = ( - kind: MediaDeviceKind, - selected$: Observable, - ): Subscription => - selected$.pipe(this.scope.bind()).subscribe((device) => { - if (this.connectionState$.value !== ConnectionState.Connected) return; - logger.info( - "[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :", - this.livekitRoom.getActiveDevice(kind), - " !== ", - device?.id, - ); - if ( - device !== undefined && - this.livekitRoom.getActiveDevice(kind) !== device.id - ) { - this.livekitRoom - .switchActiveDevice(kind, device.id) - .catch((e) => - logger.error(`Failed to sync ${kind} device with LiveKit`, e), - ); - } - }); - - syncDevice("audioinput", devices.audioInput.selected$); - if (!controlledAudioDevices) - syncDevice("audiooutput", devices.audioOutput.selected$); - syncDevice("videoinput", devices.videoInput.selected$); - // Restart the audio input track whenever we detect that the active media - // device has changed to refer to a different hardware device. We do this - // for the sake of Chrome, which provides a "default" device that is meant - // to match the system's default audio input, whatever that may be. - // This is special-cased for only audio inputs because we need to dig around - // in the LocalParticipant object for the track object and there's not a nice - // way to do that generically. There is usually no OS-level default video capture - // device anyway, and audio outputs work differently. - devices.audioInput.selected$ - .pipe( - switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER), - this.scope.bind(), - ) - .subscribe(() => { - if (this.connectionState$.value !== ConnectionState.Connected) return; - const activeMicTrack = Array.from( - this.livekitRoom.localParticipant.audioTrackPublications.values(), - ).find((d) => d.source === Track.Source.Microphone)?.track; - - if ( - activeMicTrack && - // only restart if the stream is still running: LiveKit will detect - // when a track stops & restart appropriately, so this is not our job. - // Plus, we need to avoid restarting again if the track is already in - // the process of being restarted. - activeMicTrack.mediaStreamTrack.readyState !== "ended" - ) { - // Restart the track, which will cause Livekit to do another - // getUserMedia() call with deviceId: default to get the *new* default device. - // Note that room.switchActiveDevice() won't work: Livekit will ignore it because - // the deviceId hasn't changed (was & still is default). - this.livekitRoom.localParticipant - .getTrackPublication(Track.Source.Microphone) - ?.audioTrack?.restartTrack() - .catch((e) => { - logger.error(`Failed to restart audio device track`, e); - }); - } - }); - } -} diff --git a/src/state/PublishConnection.ts b/src/state/PublishConnection.ts new file mode 100644 index 00000000..532be26c --- /dev/null +++ b/src/state/PublishConnection.ts @@ -0,0 +1,224 @@ +/* +Copyright 2025 New Vector Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ +import { ConnectionState, type E2EEOptions, LocalVideoTrack, Room as LivekitRoom, Track } from "livekit-client"; +import { map, NEVER, type Observable, type Subscription, switchMap } from "rxjs"; + +import type { CallMembership, LivekitFocus } from "../../../matrix-js-sdk/lib/matrixrtc"; +import type { MatrixClient } from "../../../matrix-js-sdk"; +import type { ObservableScope } from "./ObservableScope.ts"; +import type { Behavior } from "./Behavior.ts"; +import type { MediaDevices, SelectedDevice } from "./MediaDevices.ts"; +import type { MuteStates } from "./MuteStates.ts"; +import { type ProcessorState, trackProcessorSync } from "../livekit/TrackProcessorContext.tsx"; +import { logger } from "../../../matrix-js-sdk/lib/logger"; +import { getUrlParams } from "../UrlParams.ts"; +import { defaultLiveKitOptions } from "../livekit/options.ts"; +import { getValue } from "../utils/observable.ts"; +import { observeTrackReference$ } from "./MediaViewModel.ts"; +import { Connection } from "./Connection.ts"; + +/** + * A connection to the publishing LiveKit.e. the local livekit room, the one the user is publishing to. + * This connection will publish the local user's audio and video tracks. + */ +export class PublishConnection extends Connection { + + + /** + * Start the connection to LiveKit and publish local tracks. + * + * This will: + * 1. Request an OpenId token `request_token` (allows matrix users to verify their identity with a third-party service.) + * 2. Use this token to request the SFU config to the MatrixRtc authentication service. + * 3. Connect to the configured LiveKit room. + * 4. Create local audio and video tracks based on the current mute states and publish them to the room. + */ + public async start(): Promise { + this.stopped = false; + const { url, jwt } = await this.sfuConfig; + if (!this.stopped) await this.livekitRoom.connect(url, jwt); + + if (!this.stopped) { + // TODO this can throw errors? It will also prompt for permissions if not already granted + const tracks = await this.livekitRoom.localParticipant.createTracks({ + audio: this.muteStates.audio.enabled$.value, + video: this.muteStates.video.enabled$.value + }); + for (const track of tracks) { + // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally + // with a timeout. + await this.livekitRoom.localParticipant.publishTrack(track); + // TODO: check if the connection is still active? and break the loop if not? + } + } + }; + + + /** + * Creates a new PublishConnection. + * @param focus - The Livekit focus object containing the configuration for the connection. + * @param livekitAlias - TODO: remove, use focus.livekit_alias instead + * @param client - The Matrix client to use for authentication. TODO: remove only pick OpenIDClientParts + * @param scope - The observable scope to use for managing subscriptions. + * @param membershipsFocusMap$ - An observable of the current RTC call memberships and their associated focus. + * @param devices - The media devices to use for audio and video input. + * @param muteStates - The mute states for audio and video. + * @param e2eeLivekitOptions - The E2EE options to use for the LiveKit room. Use to share the same key provider across connections!. + * @param trackerProcessorState$ - The processor state for the video track processor (e.g. background blur). + */ + public constructor( + focus: LivekitFocus, + livekitAlias: string, + client: MatrixClient, + scope: ObservableScope, + membershipsFocusMap$: Behavior< + { membership: CallMembership; focus: LivekitFocus }[] + >, + devices: MediaDevices, + private readonly muteStates: MuteStates, + e2eeLivekitOptions: E2EEOptions | undefined, + trackerProcessorState$: Behavior + ) { + logger.info("[LivekitRoom] Create LiveKit room"); + const { controlledAudioDevices } = getUrlParams(); + + const room = new LivekitRoom({ + ...defaultLiveKitOptions, + videoCaptureDefaults: { + ...defaultLiveKitOptions.videoCaptureDefaults, + deviceId: devices.videoInput.selected$.value?.id, + processor: trackerProcessorState$.value.processor + }, + audioCaptureDefaults: { + ...defaultLiveKitOptions.audioCaptureDefaults, + deviceId: devices.audioInput.selected$.value?.id + }, + audioOutput: { + // When using controlled audio devices, we don't want to set the + // deviceId here, because it will be set by the native app. + // (also the id does not need to match a browser device id) + deviceId: controlledAudioDevices + ? undefined + : getValue(devices.audioOutput.selected$)?.id + }, + e2ee: e2eeLivekitOptions + }); + room.setE2EEEnabled(e2eeLivekitOptions !== undefined).catch((e) => { + logger.error("Failed to set E2EE enabled on room", e); + }); + + super( + focus, + livekitAlias, + client, + scope, + membershipsFocusMap$, + e2eeLivekitOptions, + room + ); + + // Setup track processor syncing (blur) + const track$ = this.scope.behavior( + observeTrackReference$(room.localParticipant, Track.Source.Camera).pipe( + map((trackRef) => { + const track = trackRef?.publication?.track; + return track instanceof LocalVideoTrack ? track : null; + }) + ) + ); + trackProcessorSync(track$, trackerProcessorState$); + + this.muteStates.audio.setHandler(async (desired) => { + try { + await this.livekitRoom.localParticipant.setMicrophoneEnabled(desired); + } catch (e) { + logger.error("Failed to update LiveKit audio input mute state", e); + } + return this.livekitRoom.localParticipant.isMicrophoneEnabled; + }); + this.muteStates.video.setHandler(async (desired) => { + try { + await this.livekitRoom.localParticipant.setCameraEnabled(desired); + } catch (e) { + logger.error("Failed to update LiveKit video input mute state", e); + } + return this.livekitRoom.localParticipant.isCameraEnabled; + }); + this.scope.onEnd(() => { + this.muteStates.audio.unsetHandler(); + this.muteStates.video.unsetHandler(); + }); + + const syncDevice = ( + kind: MediaDeviceKind, + selected$: Observable + ): Subscription => + selected$.pipe(this.scope.bind()).subscribe((device) => { + if (this.connectionState$.value !== ConnectionState.Connected) return; + logger.info( + "[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :", + this.livekitRoom.getActiveDevice(kind), + " !== ", + device?.id + ); + if ( + device !== undefined && + this.livekitRoom.getActiveDevice(kind) !== device.id + ) { + this.livekitRoom + .switchActiveDevice(kind, device.id) + .catch((e) => + logger.error(`Failed to sync ${kind} device with LiveKit`, e) + ); + } + }); + + syncDevice("audioinput", devices.audioInput.selected$); + if (!controlledAudioDevices) + syncDevice("audiooutput", devices.audioOutput.selected$); + syncDevice("videoinput", devices.videoInput.selected$); + // Restart the audio input track whenever we detect that the active media + // device has changed to refer to a different hardware device. We do this + // for the sake of Chrome, which provides a "default" device that is meant + // to match the system's default audio input, whatever that may be. + // This is special-cased for only audio inputs because we need to dig around + // in the LocalParticipant object for the track object and there's not a nice + // way to do that generically. There is usually no OS-level default video capture + // device anyway, and audio outputs work differently. + devices.audioInput.selected$ + .pipe( + switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER), + this.scope.bind() + ) + .subscribe(() => { + if (this.connectionState$.value !== ConnectionState.Connected) return; + const activeMicTrack = Array.from( + this.livekitRoom.localParticipant.audioTrackPublications.values() + ).find((d) => d.source === Track.Source.Microphone)?.track; + + if ( + activeMicTrack && + // only restart if the stream is still running: LiveKit will detect + // when a track stops & restart appropriately, so this is not our job. + // Plus, we need to avoid restarting again if the track is already in + // the process of being restarted. + activeMicTrack.mediaStreamTrack.readyState !== "ended" + ) { + // Restart the track, which will cause Livekit to do another + // getUserMedia() call with deviceId: default to get the *new* default device. + // Note that room.switchActiveDevice() won't work: Livekit will ignore it because + // the deviceId hasn't changed (was & still is default). + this.livekitRoom.localParticipant + .getTrackPublication(Track.Source.Microphone) + ?.audioTrack?.restartTrack() + .catch((e) => { + logger.error(`Failed to restart audio device track`, e); + }); + } + }); + } +}