diff --git a/src/state/ownMember/Publisher.ts b/src/state/ownMember/Publisher.ts index 3feb8a52..c37445b0 100644 --- a/src/state/ownMember/Publisher.ts +++ b/src/state/ownMember/Publisher.ts @@ -5,12 +5,13 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ import { - ConnectionState, type E2EEOptions, LocalVideoTrack, - Room as LivekitRoom, - type RoomOptions, + type Room as LivekitRoom, Track, + type LocalTrack, + type LocalTrackPublication, + ConnectionState as LivekitConnectionState, } from "livekit-client"; import { map, @@ -19,7 +20,7 @@ import { type Subscription, switchMap, } from "rxjs"; -import { logger } from "matrix-js-sdk/lib/logger"; +import { type Logger } from "matrix-js-sdk/lib/logger"; import type { Behavior } from "../Behavior.ts"; import type { MediaDevices, SelectedDevice } from "../MediaDevices.ts"; @@ -29,55 +30,43 @@ import { trackProcessorSync, } from "../../livekit/TrackProcessorContext.tsx"; import { getUrlParams } from "../../UrlParams.ts"; -import { defaultLiveKitOptions } from "../../livekit/options.ts"; -import { getValue } from "../../utils/observable.ts"; import { observeTrackReference$ } from "../MediaViewModel.ts"; -import { Connection, type ConnectionOpts } from "../remoteMembers/Connection.ts"; +import { type Connection } from "../remoteMembers/Connection.ts"; import { type ObservableScope } from "../ObservableScope.ts"; /** - * A connection to the local LiveKit room, the one the user is publishing to. - * This connection will publish the local user's audio and video tracks. + * A wrapper for a Connection object. + * This wrapper will manage the connection used to publish to the LiveKit room. + * The Publisher is also responsible for creating the media tracks. */ -export class PublishConnection extends Connection { - private readonly scope: ObservableScope; - +export class Publisher { + public tracks: LocalTrack[] = []; /** - * Creates a new PublishConnection. - * @param args - The connection options. {@link ConnectionOpts} + * Creates a new Publisher. + * @param scope - The observable scope to use for managing the publisher. + * @param connection - The connection to use for publishing. * @param devices - The media devices to use for audio and video input. * @param muteStates - The mute states for audio and video. * @param e2eeLivekitOptions - The E2EE options to use for the LiveKit room. Use to share the same key provider across connections!. * @param trackerProcessorState$ - The processor state for the video track processor (e.g. background blur). */ public constructor( - args: ConnectionOpts, + private scope: ObservableScope, + private connection: Connection, devices: MediaDevices, private readonly muteStates: MuteStates, e2eeLivekitOptions: E2EEOptions | undefined, trackerProcessorState$: Behavior, + private logger?: Logger, ) { - const { scope } = args; - logger.info("[PublishConnection] Create LiveKit room"); + this.logger?.info("[PublishConnection] Create LiveKit room"); const { controlledAudioDevices } = getUrlParams(); - const factory = - args.livekitRoomFactory ?? - ((options: RoomOptions): LivekitRoom => new LivekitRoom(options)); - const room = factory( - generateRoomOption( - devices, - trackerProcessorState$.value, - controlledAudioDevices, - e2eeLivekitOptions, - ), - ); - room.setE2EEEnabled(e2eeLivekitOptions !== undefined)?.catch((e) => { - logger.error("Failed to set E2EE enabled on room", e); - }); + const room = connection.livekitRoom; - super(room, args); - this.scope = scope; + room.setE2EEEnabled(e2eeLivekitOptions !== undefined)?.catch((e) => { + this.logger?.error("Failed to set E2EE enabled on room", e); + }); // Setup track processor syncing (blur) this.observeTrackProcessors(scope, room, trackerProcessorState$); @@ -91,55 +80,98 @@ export class PublishConnection extends Connection { * Start the connection to LiveKit and publish local tracks. * * This will: - * 1. Request an OpenId token `request_token` (allows matrix users to verify their identity with a third-party service.) - * 2. Use this token to request the SFU config to the MatrixRtc authentication service. - * 3. Connect to the configured LiveKit room. - * 4. Create local audio and video tracks based on the current mute states and publish them to the room. + * wait for the connection to be ready. + // * 1. Request an OpenId token `request_token` (allows matrix users to verify their identity with a third-party service.) + // * 2. Use this token to request the SFU config to the MatrixRtc authentication service. + // * 3. Connect to the configured LiveKit room. + // * 4. Create local audio and video tracks based on the current mute states and publish them to the room. * * @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection. * @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created. */ - public async start(): Promise { - this.stopped = false; - + public async createAndSetupTracks(): Promise { + const lkRoom = this.connection.livekitRoom; // Observe mute state changes and update LiveKit microphone/camera states accordingly this.observeMuteStates(this.scope); + // TODO: This should be an autostarted connection no need to start here. just check the connection state. // TODO: This will fetch the JWT token. Perhaps we could keep it preloaded // instead? This optimization would only be safe for a publish connection, // because we don't want to leak the user's intent to perhaps join a call to // remote servers before they actually commit to it. - await super.start(); - - if (this.stopped) return; - + const { promise, resolve, reject } = Promise.withResolvers(); + const sub = this.connection.state$.subscribe((s) => { + if (s.state !== "FailedToStart") { + reject(new Error("Disconnected from LiveKit server")); + } else { + resolve(); + } + }); + try { + await promise; + } catch (e) { + throw e; + } finally { + sub.unsubscribe(); + } // TODO-MULTI-SFU: Prepublish a microphone track const audio = this.muteStates.audio.enabled$.value; const video = this.muteStates.video.enabled$.value; // createTracks throws if called with audio=false and video=false if (audio || video) { // TODO this can still throw errors? It will also prompt for permissions if not already granted - const tracks = await this.livekitRoom.localParticipant.createTracks({ + this.tracks = await lkRoom.localParticipant.createTracks({ audio, video, }); - if (this.stopped) return; - for (const track of tracks) { - // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally - // with a timeout. - await this.livekitRoom.localParticipant.publishTrack(track); - if (this.stopped) return; - // TODO: check if the connection is still active? and break the loop if not? - } } } - public async stop(): Promise { + public async startPublishing(): Promise { + const lkRoom = this.connection.livekitRoom; + const { promise, resolve, reject } = Promise.withResolvers(); + const sub = this.connection.state$.subscribe((s) => { + switch (s.state) { + case "ConnectedToLkRoom": + resolve(); + break; + case "FailedToStart": + reject(new Error("Failed to connect to LiveKit server")); + break; + default: + this.logger?.info("waiting for connection: ", s.state); + } + }); + try { + await promise; + } catch (e) { + throw e; + } finally { + sub.unsubscribe(); + } + for (const track of this.tracks) { + // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally + // with a timeout. + await lkRoom.localParticipant.publishTrack(track); + + // TODO: check if the connection is still active? and break the loop if not? + } + return this.tracks; + } + + public async stopPublishing(): Promise { // TODO-MULTI-SFU: Move these calls back to ObservableScope.onEnd once scope // actually has the right lifetime this.muteStates.audio.unsetHandler(); this.muteStates.video.unsetHandler(); - await super.stop(); + + const localParticipant = this.connection.livekitRoom.localParticipant; + const tracks: LocalTrack[] = []; + const addToTracksIfDefined = (p: LocalTrackPublication): void => { + if (p.track !== undefined) tracks.push(p.track); + }; + localParticipant.trackPublications.forEach(addToTracksIfDefined); + await localParticipant.unpublishTracks(tracks); } /// Private methods @@ -156,15 +188,16 @@ export class PublishConnection extends Connection { devices: MediaDevices, scope: ObservableScope, ): void { + const lkRoom = this.connection.livekitRoom; devices.audioInput.selected$ .pipe( switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER), scope.bind(), ) .subscribe(() => { - if (this.livekitRoom.state != ConnectionState.Connected) return; + if (lkRoom.state != LivekitConnectionState.Connected) return; const activeMicTrack = Array.from( - this.livekitRoom.localParticipant.audioTrackPublications.values(), + lkRoom.localParticipant.audioTrackPublications.values(), ).find((d) => d.source === Track.Source.Microphone)?.track; if ( @@ -179,11 +212,11 @@ export class PublishConnection extends Connection { // getUserMedia() call with deviceId: default to get the *new* default device. // Note that room.switchActiveDevice() won't work: Livekit will ignore it because // the deviceId hasn't changed (was & still is default). - this.livekitRoom.localParticipant + lkRoom.localParticipant .getTrackPublication(Track.Source.Microphone) ?.audioTrack?.restartTrack() .catch((e) => { - logger.error(`Failed to restart audio device track`, e); + this.logger?.error(`Failed to restart audio device track`, e); }); } }); @@ -195,27 +228,31 @@ export class PublishConnection extends Connection { devices: MediaDevices, controlledAudioDevices: boolean, ): void { + const lkRoom = this.connection.livekitRoom; const syncDevice = ( kind: MediaDeviceKind, selected$: Observable, ): Subscription => selected$.pipe(scope.bind()).subscribe((device) => { - if (this.livekitRoom.state != ConnectionState.Connected) return; + if (lkRoom.state != LivekitConnectionState.Connected) return; // if (this.connectionState$.value !== ConnectionState.Connected) return; - logger.info( + this.logger?.info( "[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :", - this.livekitRoom.getActiveDevice(kind), + lkRoom.getActiveDevice(kind), " !== ", device?.id, ); if ( device !== undefined && - this.livekitRoom.getActiveDevice(kind) !== device.id + lkRoom.getActiveDevice(kind) !== device.id ) { - this.livekitRoom + lkRoom .switchActiveDevice(kind, device.id) .catch((e) => - logger.error(`Failed to sync ${kind} device with LiveKit`, e), + this.logger?.error( + `Failed to sync ${kind} device with LiveKit`, + e, + ), ); } }); @@ -232,21 +269,28 @@ export class PublishConnection extends Connection { * @private */ private observeMuteStates(scope: ObservableScope): void { + const lkRoom = this.connection.livekitRoom; this.muteStates.audio.setHandler(async (desired) => { try { - await this.livekitRoom.localParticipant.setMicrophoneEnabled(desired); + await lkRoom.localParticipant.setMicrophoneEnabled(desired); } catch (e) { - logger.error("Failed to update LiveKit audio input mute state", e); + this.logger?.error( + "Failed to update LiveKit audio input mute state", + e, + ); } - return this.livekitRoom.localParticipant.isMicrophoneEnabled; + return lkRoom.localParticipant.isMicrophoneEnabled; }); this.muteStates.video.setHandler(async (desired) => { try { - await this.livekitRoom.localParticipant.setCameraEnabled(desired); + await lkRoom.localParticipant.setCameraEnabled(desired); } catch (e) { - logger.error("Failed to update LiveKit video input mute state", e); + this.logger?.error( + "Failed to update LiveKit video input mute state", + e, + ); } - return this.livekitRoom.localParticipant.isCameraEnabled; + return lkRoom.localParticipant.isCameraEnabled; }); } @@ -266,33 +310,3 @@ export class PublishConnection extends Connection { trackProcessorSync(track$, trackerProcessorState$); } } - -// Generate the initial LiveKit RoomOptions based on the current media devices and processor state. -function generateRoomOption( - devices: MediaDevices, - processorState: ProcessorState, - controlledAudioDevices: boolean, - e2eeLivekitOptions: E2EEOptions | undefined, -): RoomOptions { - return { - ...defaultLiveKitOptions, - videoCaptureDefaults: { - ...defaultLiveKitOptions.videoCaptureDefaults, - deviceId: devices.videoInput.selected$.value?.id, - processor: processorState.processor, - }, - audioCaptureDefaults: { - ...defaultLiveKitOptions.audioCaptureDefaults, - deviceId: devices.audioInput.selected$.value?.id, - }, - audioOutput: { - // When using controlled audio devices, we don't want to set the - // deviceId here, because it will be set by the native app. - // (also the id does not need to match a browser device id) - deviceId: controlledAudioDevices - ? undefined - : getValue(devices.audioOutput.selected$)?.id, - }, - e2ee: e2eeLivekitOptions, - }; -} diff --git a/src/state/remoteMembers/Connection.test.ts b/src/state/remoteMembers/Connection.test.ts index 3b0f42ee..0719e2c5 100644 --- a/src/state/remoteMembers/Connection.test.ts +++ b/src/state/remoteMembers/Connection.test.ts @@ -41,7 +41,7 @@ import { import { ObservableScope } from "../ObservableScope.ts"; import { type OpenIDClientParts } from "../../livekit/openIDSFU.ts"; import { FailToGetOpenIdToken } from "../../utils/errors.ts"; -import { PublishConnection } from "../ownMember/PublishConnection.ts"; +import { PublishConnection } from "../ownMember/Publisher.ts"; import { mockMediaDevices, mockMuteStates } from "../../utils/test.ts"; import type { ProcessorState } from "../../livekit/TrackProcessorContext.tsx"; import { type MuteStates } from "../MuteStates.ts"; diff --git a/src/state/remoteMembers/Connection.ts b/src/state/remoteMembers/Connection.ts index e815ea55..67b2dc8e 100644 --- a/src/state/remoteMembers/Connection.ts +++ b/src/state/remoteMembers/Connection.ts @@ -12,14 +12,12 @@ import { import { ConnectionError, type ConnectionState as LivekitConenctionState, - type E2EEOptions, - Room as LivekitRoom, - type RoomOptions, + type Room as LivekitRoom, type Participant, RoomEvent, } from "livekit-client"; import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; -import { BehaviorSubject, combineLatest, type Observable } from "rxjs"; +import { BehaviorSubject, type Observable } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; import { @@ -29,7 +27,6 @@ import { } from "../../livekit/openIDSFU.ts"; import { type Behavior } from "../Behavior.ts"; import { type ObservableScope } from "../ObservableScope.ts"; -import { defaultLiveKitOptions } from "../../livekit/options.ts"; import { InsufficientCapacityError, SFURoomCreationRestrictedError, @@ -44,19 +41,9 @@ export interface ConnectionOpts { client: OpenIDClientParts; /** The observable scope to use for this connection. */ scope: ObservableScope; - /** - * An observable of the current RTC call memberships and their associated transports. - * Used to differentiate between publishing and subscribging participants on each connection. - * Used to find out which rtc member should upload to this connection (publishingParticipants$). - * The livekit room gives access to all the users subscribing to this connection, we need - * to filter out the ones that are uploading to this connection. - */ - // membershipsWithTransport$: Behavior< - // { membership: CallMembership; transport: LivekitTransport }[] - // >; /** Optional factory to create the LiveKit room, mainly for testing purposes. */ - livekitRoomFactory?: (options?: RoomOptions) => LivekitRoom; + livekitRoomFactory: () => LivekitRoom; } export type ConnectionState = @@ -173,6 +160,7 @@ export class Connection { this.transport.livekit_alias, ); } + /** * Stops the connection. * @@ -195,10 +183,7 @@ export class Connection { * It filters the participants to only those that are associated with a membership that claims to publish on this connection. */ - public readonly publishingParticipants$: Behavior; - public readonly participantsWithPublishTrack$: Behavior< - PublishingParticipant[] - >; + public readonly participantsWithTrack$: Behavior; /** * The media transport to connect to. @@ -206,6 +191,8 @@ export class Connection { public readonly transport: LivekitTransport; private readonly client: OpenIDClientParts; + public readonly livekitRoom: LivekitRoom; + /** * Creates a new connection to a matrix RTC LiveKit backend. * @@ -213,20 +200,17 @@ export class Connection { * @param opts - Connection options {@link ConnectionOpts}. * */ - protected constructor( - public readonly livekitRoom: LivekitRoom, - opts: ConnectionOpts, - logger?: Logger, - ) { + public constructor(opts: ConnectionOpts, logger?: Logger) { logger?.info( `[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`, ); const { transport, client, scope } = opts; + this.livekitRoom = opts.livekitRoomFactory(); this.transport = transport; this.client = client; - this.participantsWithPublishTrack$ = scope.behavior( + this.participantsWithTrack$ = scope.behavior( connectedParticipantsObserver( this.livekitRoom, // VALR: added that while I think about it @@ -243,30 +227,3 @@ export class Connection { scope.onEnd(() => void this.stop()); } } - -/** - * A remote connection to the Matrix RTC LiveKit backend. - * - * This connection is used for subscribing to remote participants. - * It does not publish any local tracks. - */ -export class RemoteConnection extends Connection { - /** - * Creates a new remote connection to a matrix RTC LiveKit backend. - * @param opts - * @param sharedE2eeOption - The shared E2EE options to use for the connection. - */ - public constructor( - opts: ConnectionOpts, - sharedE2eeOption: E2EEOptions | undefined, - ) { - const factory = - opts.livekitRoomFactory ?? - ((options: RoomOptions): LivekitRoom => new LivekitRoom(options)); - const livekitRoom = factory({ - ...defaultLiveKitOptions, - e2ee: sharedE2eeOption, - }); - super(livekitRoom, opts); - } -} diff --git a/src/state/remoteMembers/ConnectionManager.ts b/src/state/remoteMembers/ConnectionManager.ts index 311e621e..b7a37b11 100644 --- a/src/state/remoteMembers/ConnectionManager.ts +++ b/src/state/remoteMembers/ConnectionManager.ts @@ -16,16 +16,21 @@ import { BehaviorSubject, combineLatest, map, switchMap } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; import { type E2EEOptions, - type Room as LivekitRoom, + Room as LivekitRoom, type Participant as LivekitParticipant, + type RoomOptions, } from "livekit-client"; import { type MatrixClient } from "matrix-js-sdk"; import { type Behavior } from "../Behavior"; -import { type Connection, RemoteConnection } from "./Connection"; +import { Connection } from "./Connection"; import { type ObservableScope } from "../ObservableScope"; import { generateKeyed$ } from "../../utils/observable"; import { areLivekitTransportsEqual } from "./matrixLivekitMerger"; +import { getUrlParams } from "../../UrlParams"; +import { type ProcessorState } from "../../livekit/TrackProcessorContext"; +import { type MediaDevices } from "../MediaDevices"; +import { defaultLiveKitOptions } from "../../livekit/options"; export type ParticipantByMemberIdMap = Map< ParticipantId, @@ -33,25 +38,57 @@ export type ParticipantByMemberIdMap = Map< // multiple times to several livekit rooms. { participant: LivekitParticipant; connection: Connection }[] >; - -// - write test for scopes (do we really need to bind scope) +// TODO - write test for scopes (do we really need to bind scope) export class ConnectionManager { - /** - * The transport to use for publishing. - * This extends the list of tranports - */ - private publishTransport$ = new BehaviorSubject( - undefined, - ); + private livekitRoomFactory: () => LivekitRoom; + public constructor( + private client: MatrixClient, + private scope: ObservableScope, + private devices: MediaDevices, + private processorState: ProcessorState, + private e2eeLivekitOptions$: Behavior, + private logger?: Logger, + livekitRoomFactory?: () => LivekitRoom, + ) { + this.scope = scope; + const defaultFactory = (): LivekitRoom => + new LivekitRoom( + generateRoomOption( + this.devices, + this.processorState, + this.e2eeLivekitOptions$.value, + ), + ); + this.livekitRoomFactory = livekitRoomFactory ?? defaultFactory; + } - private transportSubscriptions$ = new BehaviorSubject< + /** + * A list of Behaviors each containing a LIST of LivekitTransport. + * Each of these behaviors can be interpreted as subscribed list of transports. + * + * Using `registerTransports` independent external modules can control what connections + * are created by the ConnectionManager. + * + * The connection manager will remove all duplicate transports in each subscibed list. + * + * See `unregisterAllTransports` and `unregisterTransport` for details on how to unsubscribe. + */ + private readonly transportsSubscriptions$ = new BehaviorSubject< Behavior[] >([]); - private transports$ = this.scope.behavior( - this.transportSubscriptions$.pipe( + /** + * All transports currently managed by the ConnectionManager. + * + * This list does not include duplicate transports. + * + * It is build based on the list of subscribed transports (`transportsSubscriptions$`). + * externally this is modified via `registerTransports()`. + */ + private readonly transports$ = this.scope.behavior( + this.transportsSubscriptions$.pipe( switchMap((subscriptions) => - combineLatest(subscriptions.map((s) => s.transports)).pipe( + combineLatest(subscriptions).pipe( map((transportsNested) => transportsNested.flat()), map(removeDuplicateTransports), ), @@ -59,24 +96,6 @@ export class ConnectionManager { ), ); - public constructor( - private client: MatrixClient, - private e2eeLivekitOptions: () => E2EEOptions | undefined, - private scope: ObservableScope, - private logger?: Logger, - private livekitRoomFactory?: () => LivekitRoom, - ) { - this.scope = scope; - } - - public getOrCreatePublishConnection( - transport: LivekitTransport, - ): Connection | undefined { - this.publishTransport$.next(transport); - const equalsRequestedTransport = (c: Connection): boolean => - areLivekitTransportsEqual(c.transport, transport); - return this.connections$.value.find(equalsRequestedTransport); - } /** * Connections for each transport in use by one or more session members. */ @@ -87,16 +106,16 @@ export class ConnectionManager { const createConnection = ( transport: LivekitTransport, - ): ((scope: ObservableScope) => RemoteConnection) => + ): ((scope: ObservableScope) => Connection) => (scope) => { - const connection = new RemoteConnection( + const connection = new Connection( { transport, client: this.client, scope: scope, livekitRoomFactory: this.livekitRoomFactory, }, - this.e2eeLivekitOptions(), + this.logger, ); void connection.start(); return connection; @@ -114,15 +133,23 @@ export class ConnectionManager { ); /** + * Add an a Behavior containing a list of transports to this ConnectionManager. * - * @param transports$ + * The intended usage is: + * - create a ConnectionManager + * - register one `transports$` behavior using registerTransports + * - add new connections to the `ConnectionManager` by updating the `transports$` behavior + * - remove a single connection by removing the transport. + * - remove this subscription by calling `unregisterTransports` and passing + * the same `transports$` behavior reference. + * @param transports$ The Behavior containing a list of transports to subscribe to. */ public registerTransports( transports$: Behavior, ): Connection[] { - if (!this.transportSubscriptions$.value.some((t$) => t$ === transports$)) { - this.transportSubscriptions$.next( - this.transportSubscriptions$.value.concat(transports$), + if (!this.transportsSubscriptions$.value.some((t$) => t$ === transports$)) { + this.transportsSubscriptions$.next( + this.transportsSubscriptions$.value.concat(transports$), ); } // After updating the subscriptions our connection list is also updated. @@ -135,22 +162,30 @@ export class ConnectionManager { .filter((c) => c !== undefined); } + /** + * Unsubscribe from the given transports. + * @param transports$ The behavior to unsubscribe from + * @returns + */ public unregisterTransports( transports$: Behavior, ): boolean { - const subscriptions = this.transportSubscriptions$.value; + const subscriptions = this.transportsSubscriptions$.value; const subscriptionsUnregistered = subscriptions.filter( (t$) => t$ !== transports$, ); const canUnregister = subscriptions.length !== subscriptionsUnregistered.length; if (canUnregister) - this.transportSubscriptions$.next(subscriptionsUnregistered); + this.transportsSubscriptions$.next(subscriptionsUnregistered); return canUnregister; } + /** + * Unsubscribe from all transports. + */ public unregisterAllTransports(): void { - this.transportSubscriptions$.next([]); + this.transportsSubscriptions$.next([]); } // We have a lost of connections, for each of these these @@ -161,7 +196,7 @@ export class ConnectionManager { switchMap((connections) => { const listsOfParticipantWithConnection = connections.map( (connection) => { - return connection.participantsWithPublishTrack$.pipe( + return connection.participantsWithTrack$.pipe( map((participants) => participants.map((p) => ({ participant: p, @@ -178,7 +213,13 @@ export class ConnectionManager { ), ); - // Filters the livekit participants + /** + * This field makes the connection manager to behave as close to a single SFU as possible. + * Each participant that is found on all connections managed by the manager will be listed. + * + * They are stored an a map keyed by `participant.identity` + * (which is equivalent to the `member.id` field in the `m.rtc.member` event) + */ public allParticipantsByMemberId$ = this.scope.behavior( this.allParticipantsWithConnection$.pipe( map((participantsWithConnections) => { @@ -191,10 +232,10 @@ export class ConnectionManager { acc.set(participant.identity, [{ connection, participant }]); } else { // already known - // This is user is publishing on several SFUs + // This is for users publishing on several SFUs currentVal.push({ connection, participant }); this.logger?.info( - `Participant ${participant.identity} is publishing on several SFUs ${currentVal.join()}`, + `Participant ${participant.identity} is publishing on several SFUs ${currentVal.map((v) => v.connection.transport.livekit_service_url).join(", ")}`, ); } } @@ -217,3 +258,37 @@ function removeDuplicateTransports( return acc; }, [] as LivekitTransport[]); } + +/** + * Generate the initial LiveKit RoomOptions based on the current media devices and processor state. + */ +function generateRoomOption( + devices: MediaDevices, + processorState: ProcessorState, + e2eeLivekitOptions: E2EEOptions | undefined, +): RoomOptions { + const { controlledAudioDevices } = getUrlParams(); + return { + ...defaultLiveKitOptions, + videoCaptureDefaults: { + ...defaultLiveKitOptions.videoCaptureDefaults, + deviceId: devices.videoInput.selected$.value?.id, + processor: processorState.processor, + }, + audioCaptureDefaults: { + ...defaultLiveKitOptions.audioCaptureDefaults, + deviceId: devices.audioInput.selected$.value?.id, + }, + audioOutput: { + // When using controlled audio devices, we don't want to set the + // deviceId here, because it will be set by the native app. + // (also the id does not need to match a browser device id) + deviceId: controlledAudioDevices + ? undefined + : devices.audioOutput.selected$.value?.id, + }, + e2ee: e2eeLivekitOptions, + // TODO test and consider this: + // webAudioMix: true, + }; +} diff --git a/src/state/remoteMembers/displayname.ts b/src/state/remoteMembers/displayname.ts index ec3231c6..f288e2d0 100644 --- a/src/state/remoteMembers/displayname.ts +++ b/src/state/remoteMembers/displayname.ts @@ -12,7 +12,11 @@ import { logger } from "matrix-js-sdk/lib/logger"; import { type Room as MatrixRoom } from "matrix-js-sdk/lib/matrix"; import { type ObservableScope } from "../ObservableScope"; -import { calculateDisplayName, shouldDisambiguate } from "../../utils/displayname"; +import { + calculateDisplayName, + shouldDisambiguate, +} from "../../utils/displayname"; +import { type Behavior } from "../Behavior"; /** * Displayname for each member of the call. This will disambiguate @@ -21,12 +25,12 @@ import { calculateDisplayName, shouldDisambiguate } from "../../utils/displaynam */ // don't do this work more times than we need to. This is achieved by converting to a behavior: export const memberDisplaynames$ = ( + scope: ObservableScope, matrixRoom: Room, memberships$: Observable, - scope: ObservableScope, userId: string, deviceId: string, -) => +): Behavior> => scope.behavior( combineLatest( [ diff --git a/src/state/remoteMembers/matrixLivekitMerger.ts b/src/state/remoteMembers/matrixLivekitMerger.ts index eb33f5a5..e77306c1 100644 --- a/src/state/remoteMembers/matrixLivekitMerger.ts +++ b/src/state/remoteMembers/matrixLivekitMerger.ts @@ -148,7 +148,7 @@ export class MatrixLivekitMerger { * together when it might change together is what you have to do in RxJS to * avoid reading inconsistent state or observing too many changes.) */ - private mapMembershipsToMembershipWithTransport$(): Observable< + private mapMembershipsToMembershipWithTransport$(): Behavior< { membership: CallMembership; transport?: LivekitTransport }[] > { return this.scope.behavior(