diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index db2c0f2a..57873b40 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -31,7 +31,6 @@ import { VolumeOnSolidIcon, } from "@vector-im/compound-design-tokens/assets/web/icons"; import { useTranslation } from "react-i18next"; -import { ConnectionState } from "livekit-client"; import LogoMark from "../icons/LogoMark.svg?react"; import LogoType from "../icons/LogoType.svg?react"; diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index b6327cfa..6b2ee35a 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -122,7 +122,7 @@ import { } from "../rtcSessionHelpers"; import { E2eeType } from "../e2ee/e2eeType"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; -import { Connection } from "./Connection"; +import { type Connection, type ConnectionOpts, RemoteConnection } from "./Connection"; import { type MuteStates } from "./MuteStates"; import { getUrlParams } from "../UrlParams"; import { type ProcessorState } from "../livekit/TrackProcessorContext"; @@ -453,18 +453,21 @@ export class CallViewModel extends ViewModel { private readonly localFocus = makeFocus(this.matrixRTCSession); private readonly localConnection = this.localFocus.then( - (focus) => - new PublishConnection( + (focus) => { + const args: ConnectionOpts = { focus, - this.livekitAlias, - this.matrixRTCSession.room.client, - this.scope, - this.membershipsAndFocusMap$, + client: this.matrixRTCSession.room.client, + scope: this.scope, + membershipsFocusMap$: this.membershipsAndFocusMap$, + } + return new PublishConnection( + args, this.mediaDevices, this.muteStates, this.e2eeLivekitOptions(), this.scope.behavior(this.trackProcessorState$), - ), + ) + } ); public readonly livekitConnectionState$ = this.scope.behavior( @@ -521,18 +524,17 @@ export class CallViewModel extends ViewModel { "SFU remoteConnections$ construct new connection: ", focusUrl, ); - nextConnection = new Connection( - { + const args: ConnectionOpts = { + focus: { + type: "livekit", livekit_service_url: focusUrl, livekit_alias: this.livekitAlias, - type: "livekit", }, - this.livekitAlias, - this.matrixRTCSession.room.client, - this.scope, - this.membershipsAndFocusMap$, - this.e2eeLivekitOptions(), - ); + client: this.matrixRTCSession.room.client, + scope: this.scope, + membershipsFocusMap$: this.membershipsAndFocusMap$, + } + nextConnection = new RemoteConnection(args, this.e2eeLivekitOptions()); } else { logger.log( "SFU remoteConnections$ use prev connection: ", diff --git a/src/state/Connection.ts b/src/state/Connection.ts index f725ddda..bc352adf 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -7,15 +7,24 @@ Please see LICENSE in the repository root for full details. import { connectedParticipantsObserver, connectionStateObserver } from "@livekit/components-core"; import { type ConnectionState, type E2EEOptions, Room as LivekitRoom } from "livekit-client"; -import { type MatrixClient } from "matrix-js-sdk"; import { type CallMembership, type LivekitFocus } from "matrix-js-sdk/lib/matrixrtc"; import { combineLatest } from "rxjs"; -import { getSFUConfigWithOpenID } from "../livekit/openIDSFU"; +import { getSFUConfigWithOpenID, type OpenIDClientParts, type SFUConfig } from "../livekit/openIDSFU"; import { type Behavior } from "./Behavior"; import { type ObservableScope } from "./ObservableScope"; import { defaultLiveKitOptions } from "../livekit/options"; +export interface ConnectionOpts { + /** The focus server to connect to. */ + focus: LivekitFocus; + /** The Matrix client to use for OpenID and SFU config requests. */ + client: OpenIDClientParts; + /** The observable scope to use for this connection. */ + scope: ObservableScope; + /** An observable of the current RTC call memberships and their associated focus. */ + membershipsFocusMap$: Behavior<{ membership: CallMembership; focus: LivekitFocus }[]>; +} /** * A connection to a Matrix RTC LiveKit backend. * @@ -39,10 +48,20 @@ export class Connection { */ public async start(): Promise { this.stopped = false; - const { url, jwt } = await this.sfuConfig; + // TODO could this be loaded earlier to save time? + const { url, jwt } = await this.getSFUConfigWithOpenID(); + if (!this.stopped) await this.livekitRoom.connect(url, jwt); } + + protected async getSFUConfigWithOpenID(): Promise { + return await getSFUConfigWithOpenID( + this.client, + this.targetFocus.livekit_service_url, + this.targetFocus.livekit_alias + ) + } /** * Stops the connection. * @@ -55,17 +74,6 @@ export class Connection { this.stopped = true; } - protected readonly sfuConfig = getSFUConfigWithOpenID( - this.client, - this.focus.livekit_service_url, - this.focus.livekit_alias - ); - - /* - * An observable of the participants in the livekit room, including subscribers. - * Converts the livekit room events ParticipantConnected/ParticipantDisconnected/StateChange to an observable. - */ - protected readonly participantsIncludingSubscribers$; /** * An observable of the participants that are publishing on this connection. @@ -75,9 +83,9 @@ export class Connection { public readonly publishingParticipants$; /** - * The LiveKit room instance. + * The focus server to connect to. */ - public readonly livekitRoom: LivekitRoom; + protected readonly targetFocus: LivekitFocus; /** * An observable of the livekit connection state. @@ -85,48 +93,39 @@ export class Connection { */ public connectionState$: Behavior; + + private readonly client: OpenIDClientParts; /** * Creates a new connection to a matrix RTC LiveKit backend. * - * @param livekitRoom - Optional LiveKit room instance to use. If not provided, a new instance will be created. - * @param focus - The focus server to connect to. - * @param livekitAlias - The livekit alias to use when connecting to the focus server. TODO duplicate of focus? - * @param client - The matrix client, used to fetch the OpenId token. TODO refactor to avoid passing the whole client - * @param scope - The observable scope to use for creating observables. - * @param membershipsFocusMap$ - The observable of the current call RTC memberships and their associated focus. - * @param e2eeLivekitOptions - The E2EE options to use for the LiveKit room. Use to share the same key provider across connections!. TODO refactor to avoid passing the whole options? + * @param livekitRoom - LiveKit room instance to use. + * @param opts - Connection options {@link ConnectionOpts}. + * */ - public constructor( - protected readonly focus: LivekitFocus, - // TODO : remove livekitAlias, it's already in focus? - protected readonly livekitAlias: string, - protected readonly client: MatrixClient, - protected readonly scope: ObservableScope, - protected readonly membershipsFocusMap$: Behavior< - { membership: CallMembership; focus: LivekitFocus }[] - >, - e2eeLivekitOptions: E2EEOptions | undefined, - livekitRoom: LivekitRoom | undefined = undefined + protected constructor( + public readonly livekitRoom: LivekitRoom, + opts: ConnectionOpts, ) { - this.livekitRoom = - livekitRoom ?? - new LivekitRoom({ - ...defaultLiveKitOptions, - e2ee: e2eeLivekitOptions - }); - this.participantsIncludingSubscribers$ = this.scope.behavior( + const { focus, client, scope, membershipsFocusMap$ } = + opts; + + this.livekitRoom = livekitRoom + this.targetFocus = focus; + this.client = client; + + const participantsIncludingSubscribers$ = scope.behavior( connectedParticipantsObserver(this.livekitRoom), [] ); - this.publishingParticipants$ = this.scope.behavior( + this.publishingParticipants$ = scope.behavior( combineLatest( - [this.participantsIncludingSubscribers$, this.membershipsFocusMap$], + [participantsIncludingSubscribers$, membershipsFocusMap$], (participants, membershipsFocusMap) => membershipsFocusMap // Find all members that claim to publish on this connection .flatMap(({ membership, focus }) => - focus.livekit_service_url === this.focus.livekit_service_url + focus.livekit_service_url === this.targetFocus.livekit_service_url ? [membership] : [] ) @@ -141,11 +140,32 @@ export class Connection { ), [] ); - this.connectionState$ = this.scope.behavior( + this.connectionState$ = scope.behavior( connectionStateObserver(this.livekitRoom) ); - this.scope.onEnd(() => this.stop()); + scope.onEnd(() => this.stop()); } } +/** + * A remote connection to the Matrix RTC LiveKit backend. + * + * This connection is used for subscribing to remote participants. + * It does not publish any local tracks. + */ +export class RemoteConnection extends Connection { + + /** + * Creates a new remote connection to a matrix RTC LiveKit backend. + * @param opts + * @param sharedE2eeOption - The shared E2EE options to use for the connection. + */ + public constructor(opts: ConnectionOpts, sharedE2eeOption: E2EEOptions | undefined) { + const livekitRoom = new LivekitRoom({ + ...defaultLiveKitOptions, + e2ee: sharedE2eeOption + }); + super(livekitRoom, opts); + } +} diff --git a/src/state/PublishConnection.ts b/src/state/PublishConnection.ts index 532be26c..724c6c5f 100644 --- a/src/state/PublishConnection.ts +++ b/src/state/PublishConnection.ts @@ -7,9 +7,6 @@ Please see LICENSE in the repository root for full details. import { ConnectionState, type E2EEOptions, LocalVideoTrack, Room as LivekitRoom, Track } from "livekit-client"; import { map, NEVER, type Observable, type Subscription, switchMap } from "rxjs"; -import type { CallMembership, LivekitFocus } from "../../../matrix-js-sdk/lib/matrixrtc"; -import type { MatrixClient } from "../../../matrix-js-sdk"; -import type { ObservableScope } from "./ObservableScope.ts"; import type { Behavior } from "./Behavior.ts"; import type { MediaDevices, SelectedDevice } from "./MediaDevices.ts"; import type { MuteStates } from "./MuteStates.ts"; @@ -19,7 +16,7 @@ import { getUrlParams } from "../UrlParams.ts"; import { defaultLiveKitOptions } from "../livekit/options.ts"; import { getValue } from "../utils/observable.ts"; import { observeTrackReference$ } from "./MediaViewModel.ts"; -import { Connection } from "./Connection.ts"; +import { Connection, type ConnectionOpts } from "./Connection.ts"; /** * A connection to the publishing LiveKit.e. the local livekit room, the one the user is publishing to. @@ -39,8 +36,8 @@ export class PublishConnection extends Connection { */ public async start(): Promise { this.stopped = false; - const { url, jwt } = await this.sfuConfig; - if (!this.stopped) await this.livekitRoom.connect(url, jwt); + + await super.start() if (!this.stopped) { // TODO this can throw errors? It will also prompt for permissions if not already granted @@ -60,29 +57,20 @@ export class PublishConnection extends Connection { /** * Creates a new PublishConnection. - * @param focus - The Livekit focus object containing the configuration for the connection. - * @param livekitAlias - TODO: remove, use focus.livekit_alias instead - * @param client - The Matrix client to use for authentication. TODO: remove only pick OpenIDClientParts - * @param scope - The observable scope to use for managing subscriptions. - * @param membershipsFocusMap$ - An observable of the current RTC call memberships and their associated focus. + * @param args - The connection options. {@link ConnectionOpts} * @param devices - The media devices to use for audio and video input. * @param muteStates - The mute states for audio and video. * @param e2eeLivekitOptions - The E2EE options to use for the LiveKit room. Use to share the same key provider across connections!. * @param trackerProcessorState$ - The processor state for the video track processor (e.g. background blur). */ public constructor( - focus: LivekitFocus, - livekitAlias: string, - client: MatrixClient, - scope: ObservableScope, - membershipsFocusMap$: Behavior< - { membership: CallMembership; focus: LivekitFocus }[] - >, + args: ConnectionOpts, devices: MediaDevices, private readonly muteStates: MuteStates, e2eeLivekitOptions: E2EEOptions | undefined, trackerProcessorState$: Behavior ) { + const { scope } = args; logger.info("[LivekitRoom] Create LiveKit room"); const { controlledAudioDevices } = getUrlParams(); @@ -112,17 +100,19 @@ export class PublishConnection extends Connection { }); super( - focus, - livekitAlias, - client, - scope, - membershipsFocusMap$, - e2eeLivekitOptions, - room + room, + args, + // focus, + // livekitAlias, + // client, + // scope, + // membershipsFocusMap$, + // e2eeLivekitOptions, + // room ); // Setup track processor syncing (blur) - const track$ = this.scope.behavior( + const track$ = scope.behavior( observeTrackReference$(room.localParticipant, Track.Source.Camera).pipe( map((trackRef) => { const track = trackRef?.publication?.track; @@ -148,7 +138,7 @@ export class PublishConnection extends Connection { } return this.livekitRoom.localParticipant.isCameraEnabled; }); - this.scope.onEnd(() => { + scope.onEnd(() => { this.muteStates.audio.unsetHandler(); this.muteStates.video.unsetHandler(); }); @@ -157,7 +147,7 @@ export class PublishConnection extends Connection { kind: MediaDeviceKind, selected$: Observable ): Subscription => - selected$.pipe(this.scope.bind()).subscribe((device) => { + selected$.pipe(scope.bind()).subscribe((device) => { if (this.connectionState$.value !== ConnectionState.Connected) return; logger.info( "[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :", @@ -192,7 +182,7 @@ export class PublishConnection extends Connection { devices.audioInput.selected$ .pipe( switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER), - this.scope.bind() + scope.bind() ) .subscribe(() => { if (this.connectionState$.value !== ConnectionState.Connected) return;