From 33ba746f2b9dba2509667a11e4aacea053a5fa0b Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 28 Aug 2025 17:45:14 +0200 Subject: [PATCH] lots of work. noone knows if it works. Signed-off-by: Timo K --- src/room/GroupCallView.tsx | 2 +- src/state/CallViewModel.ts | 379 ++++++++++++++++++------------------- src/state/Connection.ts | 364 ++++++++++++++++++++++++++++++++--- 3 files changed, 516 insertions(+), 229 deletions(-) diff --git a/src/room/GroupCallView.tsx b/src/room/GroupCallView.tsx index 75648f53..26004323 100644 --- a/src/room/GroupCallView.tsx +++ b/src/room/GroupCallView.tsx @@ -41,7 +41,7 @@ import { ActiveCall } from "./InCallView"; import { MUTE_PARTICIPANT_COUNT, type MuteStates } from "./MuteStates"; import { useMediaDevices } from "../MediaDevicesContext"; import { useMatrixRTCSessionMemberships } from "../useMatrixRTCSessionMemberships"; -import { enterRTCSession, leaveRTCSession } from "../rtcSessionHelpers"; +import { leaveRTCSession } from "../rtcSessionHelpers"; import { saveKeyForRoom, useRoomEncryptionSystem, diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 1a5281c8..7e16689f 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -20,10 +20,10 @@ import { import E2EEWorker from "livekit-client/e2ee-worker?worker"; import { ClientEvent, + RoomMember, RoomStateEvent, SyncState, type Room as MatrixRoom, - type RoomMember, } from "matrix-js-sdk"; import { BehaviorSubject, @@ -104,7 +104,7 @@ import { shallowEquals } from "../utils/array"; import { calculateDisplayName, shouldDisambiguate } from "../utils/displayname"; import { type MediaDevices } from "./MediaDevices"; import { type Behavior } from "./Behavior"; -import { defaultLiveKitOptions } from "../livekit/options"; + import { enterRTCSession, getLivekitAlias, @@ -388,31 +388,6 @@ class ScreenShare { type MediaItem = UserMedia | ScreenShare; -function getE2eeOptions( - e2eeSystem: EncryptionSystem, - rtcSession: MatrixRTCSession, -): E2EEOptions | undefined { - if (e2eeSystem.kind === E2eeType.NONE) return undefined; - - if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) { - const keyProvider = new MatrixKeyProvider(); - keyProvider.setRTCSession(rtcSession); - return { - keyProvider, - worker: new E2EEWorker(), - }; - } else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) { - const keyProvider = new ExternalE2EEKeyProvider(); - keyProvider - .setKey(e2eeSystem.secret) - .catch((e) => logger.error("Failed to set shared key for E2EE", e)); - return { - keyProvider, - worker: new E2EEWorker(), - }; - } -} - function getRoomMemberFromRtcMember( rtcMember: CallMembership, room: MatrixRoom, @@ -436,29 +411,25 @@ function getRoomMemberFromRtcMember( } export class CallViewModel extends ViewModel { - private readonly e2eeOptions = getE2eeOptions( + private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession); + + private readonly livekitE2EERoomOptions = getE2eeOptions( this.options.encryptionSystem, this.matrixRTCSession, ); - private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession); - - private readonly localConnectionLivekitRoom = new LivekitRoom({ - ...defaultLiveKitOptions, - e2ee: this.e2eeOptions, - }); - private readonly localFocus = makeFocus(this.matrixRTCSession); private readonly localConnection = this.localFocus.then( (focus) => new PublishConnection( - this.localConnectionLivekitRoom, focus, this.livekitAlias, this.matrixRTCSession.room.client, this.scope, this.membershipsAndFocusMap$, + this.mediaDevices, + this.livekitE2EERoomOptions, ), ); @@ -483,12 +454,12 @@ export class CallViewModel extends ViewModel { ), ); - private readonly focusServiceUrls$ = this.membershipsAndFocusMap$.pipe( + private readonly livekitServiceUrls$ = this.membershipsAndFocusMap$.pipe( map((v) => new Set(v.map(({ focus }) => focus.livekit_service_url))), ); private readonly remoteConnections$ = this.scope.behavior( - combineLatest([this.localFocus, this.focusServiceUrls$]).pipe( + combineLatest([this.localFocus, this.livekitServiceUrls$]).pipe( accumulate( new Map(), (prev, [localFocus, focusUrls]) => { @@ -505,10 +476,6 @@ export class CallViewModel extends ViewModel { focusUrl, ); nextConnection = new Connection( - new LivekitRoom({ - ...defaultLiveKitOptions, - e2ee: this.e2eeOptions, - }), { livekit_service_url: focusUrl, livekit_alias: this.livekitAlias, @@ -518,6 +485,7 @@ export class CallViewModel extends ViewModel { this.matrixRTCSession.room.client, this.scope, this.membershipsAndFocusMap$, + this.livekitE2EERoomOptions, ); } else { logger.log( @@ -634,29 +602,54 @@ export class CallViewModel extends ViewModel { // in a split-brained state. private readonly pretendToBeDisconnected$ = this.reconnecting$; - /** - * The RemoteParticipants including those that are being "held" on the screen - */ - private readonly remoteParticipants$ = this.scope - .behavior( - combineLatest( - [this.localConnection, this.remoteConnections$], - (localConnection, remoteConnections) => { - const remoteConnectionsParticipants = [ - ...remoteConnections.values(), - ].map((c) => c.publishingParticipants$); - - return combineLatest( - [ - localConnection.publishingParticipants$, - ...remoteConnectionsParticipants, - ], - (...ps) => ps.flat(1), + private readonly participants$ = this.scope + .behavior< + { + participant: LocalParticipant | RemoteParticipant; + member: RoomMember; + livekitRoom: LivekitRoom; + }[] + >( + from(this.localConnection).pipe( + switchMap((localConnection) => { + const memberError = (): never => { + throw new Error("No room member for call membership"); + }; + const localParticipant = { + participant: localConnection.livekitRoom.localParticipant, + member: + this.matrixRoom.getMember(this.userId ?? "") ?? memberError(), + livekitRoom: localConnection.livekitRoom, + }; + return this.remoteConnections$.pipe( + switchMap((connections) => + combineLatest( + [...connections.values()].map((c) => + c.publishingParticipants$.pipe( + map((ps) => + ps.map(({ participant, membership }) => ({ + participant, + member: + getRoomMemberFromRtcMember( + membership, + this.matrixRoom, + )?.member ?? memberError(), + livekitRoom: c.livekitRoom, + })), + ), + ), + ), + ), + ), + map((remoteParticipants) => [ + ...remoteParticipants.flat(1), + localParticipant, + ]), ); - }, - ).pipe(switchAll(), startWith([])), + }), + ), ) - .pipe(pauseWhen(this.pretendToBeDisconnected$)); + .pipe(startWith([]), pauseWhen(this.pretendToBeDisconnected$)); /** * Displaynames for each member of the call. This will disambiguate @@ -728,8 +721,7 @@ export class CallViewModel extends ViewModel { */ private readonly mediaItems$ = this.scope.behavior( combineLatest([ - this.remoteParticipants$, - observeParticipantMedia(this.localConnectionLivekitRoom.localParticipant), + this.participants$, duplicateTiles.value$, // Also react to changes in the MatrixRTC session list. // The session list will also be update if a room membership changes. @@ -744,43 +736,21 @@ export class CallViewModel extends ViewModel { ( prevItems, [ - remoteParticipants, - { participant: localParticipant }, + participants, duplicateTiles, _membershipsChanged, showNonMemberTiles, ], ) => { - const newItems = new Map( + const newItems: Map = new Map( function* (this: CallViewModel): Iterable<[string, MediaItem]> { - const room = this.matrixRoom; - // m.rtc.members are the basis for calculating what is visible in the call - for (const rtcMember of this.matrixRTCSession.memberships) { - const { member, id: livekitParticipantId } = - getRoomMemberFromRtcMember(rtcMember, room); - const matrixIdentifier = `${rtcMember.sender}:${rtcMember.deviceId}`; - - let participant: - | LocalParticipant - | RemoteParticipant - | undefined = undefined; - if (livekitParticipantId === "local") { - participant = localParticipant; - } else { - participant = remoteParticipants.find( - (p) => p.identity === livekitParticipantId, - ); - } - - if (!member) { - logger.error( - "Could not find member for media id: ", - livekitParticipantId, - ); - } + for (const { participant, member, livekitRoom } of participants) { + const matrixId = participant.isLocal + ? "local" + : participant.identity; for (let i = 0; i < 1 + duplicateTiles; i++) { - const indexedMediaId = `${livekitParticipantId}:${i}`; - let prevMedia = prevItems.get(indexedMediaId); + const mediaId = `${matrixId}:${i}`; + let prevMedia = prevItems.get(mediaId); if (prevMedia && prevMedia instanceof UserMedia) { prevMedia.updateParticipant(participant); if (prevMedia.vm.member === undefined) { @@ -793,33 +763,33 @@ export class CallViewModel extends ViewModel { } } yield [ - indexedMediaId, + mediaId, // We create UserMedia with or without a participant. // This will be the initial value of a BehaviourSubject. // Once a participant appears we will update the BehaviourSubject. (see above) prevMedia ?? new UserMedia( - indexedMediaId, + mediaId, member, participant, this.options.encryptionSystem, - this.localConnectionLivekitRoom, + livekitRoom, this.mediaDevices, this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( - map((m) => m.get(matrixIdentifier) ?? "[👻]"), + map((m) => m.get(matrixId) ?? "[👻]"), ), this.handsRaised$.pipe( - map((v) => v[matrixIdentifier]?.time ?? null), + map((v) => v[matrixId]?.time ?? null), ), this.reactions$.pipe( - map((v) => v[matrixIdentifier] ?? undefined), + map((v) => v[matrixId] ?? undefined), ), ), ]; if (participant?.isScreenShareEnabled) { - const screenShareId = `${indexedMediaId}:screen-share`; + const screenShareId = `${mediaId}:screen-share`; yield [ screenShareId, prevItems.get(screenShareId) ?? @@ -828,10 +798,10 @@ export class CallViewModel extends ViewModel { member, participant, this.options.encryptionSystem, - this.localConnectionLivekitRoom, + livekitRoom, this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( - map((m) => m.get(matrixIdentifier) ?? "[👻]"), + map((m) => m.get(matrixId) ?? "[👻]"), ), ), ]; @@ -853,47 +823,51 @@ export class CallViewModel extends ViewModel { // - If one wants to test scalability using the LiveKit CLI. // - If an experimental project does not yet do the MatrixRTC bits. // - If someone wants to debug if the LiveKit connection works but MatrixRTC room state failed to arrive. - const newNonMemberItems = showNonMemberTiles - ? new Map( - function* (this: CallViewModel): Iterable<[string, MediaItem]> { - for (const participant of remoteParticipants) { - for (let i = 0; i < 1 + duplicateTiles; i++) { - const maybeNonMemberParticipantId = - participant.identity + ":" + i; - if (!newItems.has(maybeNonMemberParticipantId)) { - const nonMemberId = maybeNonMemberParticipantId; - yield [ - nonMemberId, - prevItems.get(nonMemberId) ?? - new UserMedia( - nonMemberId, - undefined, - participant, - this.options.encryptionSystem, - this.localConnectionLivekitRoom, - this.mediaDevices, - this.pretendToBeDisconnected$, - this.memberDisplaynames$.pipe( - map( - (m) => m.get(participant.identity) ?? "[👻]", - ), - ), - of(null), - of(null), - ), - ]; - } - } - } - }.bind(this)(), - ) - : new Map(); - if (newNonMemberItems.size > 0) { - logger.debug("Added NonMember items: ", newNonMemberItems); - } + // TODO-MULTI-SFU + // const newNonMemberItems = showNonMemberTiles + // ? new Map( + // function* ( + // this: CallViewModel, + // ): Iterable<[string, MediaItem]> { + // for (const participant of remoteParticipants) { + // for (let i = 0; i < 1 + duplicateTiles; i++) { + // const maybeNonMemberParticipantId = + // participant.identity + ":" + i; + // if (!newItems.has(maybeNonMemberParticipantId)) { + // const nonMemberId = maybeNonMemberParticipantId; + // yield [ + // nonMemberId, + // prevItems.get(nonMemberId) ?? + // new UserMedia( + // nonMemberId, + // undefined, + // participant, + // this.options.encryptionSystem, + // localConnection.livekitRoom, + // this.mediaDevices, + // this.pretendToBeDisconnected$, + // this.memberDisplaynames$.pipe( + // map( + // (m) => + // m.get(participant.identity) ?? "[👻]", + // ), + // ), + // of(null), + // of(null), + // ), + // ]; + // } + // } + // } + // }.bind(this)(), + // ) + // : new Map(); + // if (newNonMemberItems.size > 0) { + // logger.debug("Added NonMember items: ", newNonMemberItems); + // } const combinedNew = new Map([ - ...newNonMemberItems.entries(), + // ...newNonMemberItems.entries(), ...newItems.entries(), ]); @@ -1724,66 +1698,77 @@ export class CallViewModel extends ViewModel { // We use matrixConnected$ rather than reconnecting$ because we want to // pause tracks during the initial joining sequence too until we're sure // that our own media is displayed on screen. - this.matrixConnected$.pipe(this.scope.bind()).subscribe((connected) => { - const publications = - this.localConnectionLivekitRoom.localParticipant.trackPublications.values(); - if (connected) { - for (const p of publications) { - if (p.track?.isUpstreamPaused === true) { - const kind = p.track.kind; - logger.log( - `Resumming ${kind} track (MatrixRTC connection present)`, - ); - p.track - .resumeUpstream() - .catch((e) => - logger.error( - `Failed to resume ${kind} track after MatrixRTC reconnection`, - e, - ), + void this.localConnection.then((localConnection) => + this.matrixConnected$.pipe(this.scope.bind()).subscribe((connected) => { + const publications = + localConnection.livekitRoom.localParticipant.trackPublications.values(); + if (connected) { + for (const p of publications) { + if (p.track?.isUpstreamPaused === true) { + const kind = p.track.kind; + logger.log( + `Resuming ${kind} track (MatrixRTC connection present)`, ); + p.track + .resumeUpstream() + .catch((e) => + logger.error( + `Failed to resume ${kind} track after MatrixRTC reconnection`, + e, + ), + ); + } + } + } else { + for (const p of publications) { + if (p.track?.isUpstreamPaused === false) { + const kind = p.track.kind; + logger.log( + `Pausing ${kind} track (uncertain MatrixRTC connection)`, + ); + p.track + .pauseUpstream() + .catch((e) => + logger.error( + `Failed to pause ${kind} track after entering uncertain MatrixRTC connection`, + e, + ), + ); + } } } - } else { - for (const p of publications) { - if (p.track?.isUpstreamPaused === false) { - const kind = p.track.kind; - logger.log( - `Pausing ${kind} track (uncertain MatrixRTC connection)`, - ); - p.track - .pauseUpstream() - .catch((e) => - logger.error( - `Failed to pause ${kind} track after entering uncertain MatrixRTC connection`, - e, - ), - ); - } - } - } - }); + }), + ); // Join automatically this.join(); // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked? } } -export const membershipsFocusUrl = ( - memberships: CallMembership[], - matrixRTCSession: MatrixRTCSession, -): { livekit_service_url: string; membership: CallMembership }[] => { - return memberships - .map( - (m) => - [matrixRTCSession.resolveActiveFocus(m), m] as [ - LivekitFocusConfig | undefined, - CallMembership, - ], - ) - .filter(([f, _]) => f !== undefined && isLivekitFocusConfig(f)) - .map(([f, m]) => ({ - livekit_service_url: f!.livekit_service_url, - membership: m, - })); -}; +// TODO-MULTI-SFU // Setup and update the keyProvider which was create by `createRoom` was a thing before. Now we never update if the E2EEsystem changes +// do we need this? + +function getE2eeOptions( + e2eeSystem: EncryptionSystem, + rtcSession: MatrixRTCSession, +): E2EEOptions | undefined { + if (e2eeSystem.kind === E2eeType.NONE) return undefined; + + if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) { + const keyProvider = new MatrixKeyProvider(); + keyProvider.setRTCSession(rtcSession); + return { + keyProvider, + worker: new E2EEWorker(), + }; + } else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) { + const keyProvider = new ExternalE2EEKeyProvider(); + keyProvider + .setKey(e2eeSystem.secret) + .catch((e) => logger.error("Failed to set shared key for E2EE", e)); + return { + keyProvider, + worker: new E2EEWorker(), + }; + } +} diff --git a/src/state/Connection.ts b/src/state/Connection.ts index 6e114603..700ee4ef 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -8,26 +8,42 @@ Please see LICENSE in the repository root for full details. import { connectedParticipantsObserver } from "@livekit/components-core"; import { - type Room as LivekitRoom, - type RemoteParticipant, + ConnectionState, + Room as LivekitRoom, + type RoomOptions, + type E2EEOptions, + RoomEvent, + Track, } from "livekit-client"; import { type MatrixClient } from "matrix-js-sdk"; import { type LivekitFocus, type CallMembership, } from "matrix-js-sdk/lib/matrixrtc"; -import { combineLatest, map, type Observable } from "rxjs"; +import { + BehaviorSubject, + combineLatest, + filter, + fromEvent, + map, + NEVER, + type Observable, + type Subscription, + switchMap, +} from "rxjs"; +import { logger } from "matrix-js-sdk/lib/logger"; +import { type SelectedDevice, type MediaDevices } from "./MediaDevices"; import { getSFUConfigWithOpenID } from "../livekit/openIDSFU"; -import { type Behavior } from "./Behavior"; +import { constant, type Behavior } from "./Behavior"; import { type ObservableScope } from "./ObservableScope"; +import { defaultLiveKitOptions } from "../livekit/options"; +import { getValue } from "../utils/observable"; +import { getUrlParams } from "../UrlParams"; +import { type MuteStates } from "../room/MuteStates"; export class Connection { - protected readonly sfuConfig = getSFUConfigWithOpenID( - this.client, - this.focus.livekit_service_url, - this.livekitAlias, - ); + protected stopped = false; public async start(): Promise { this.stopped = false; @@ -35,22 +51,44 @@ export class Connection { if (!this.stopped) await this.livekitRoom.connect(url, jwt); } - protected stopped = false; - public stop(): void { void this.livekitRoom.disconnect(); this.stopped = true; } - public readonly participantsIncludingSubscribers$ = this.scope.behavior( - connectedParticipantsObserver(this.livekitRoom), - [], + protected readonly sfuConfig = getSFUConfigWithOpenID( + this.client, + this.focus.livekit_service_url, + this.livekitAlias, ); - public readonly publishingParticipants$: Observable = - this.scope.behavior( + public readonly participantsIncludingSubscribers$; + public readonly publishingParticipants$; + public livekitRoom: LivekitRoom; + + public connectionState$: Behavior; + public constructor( + protected readonly focus: LivekitFocus, + protected readonly livekitAlias: string, + protected readonly client: MatrixClient, + protected readonly scope: ObservableScope, + protected readonly membershipsFocusMap$: Behavior< + { membership: CallMembership; focus: LivekitFocus }[] + >, + e2eeLivekitOptions: E2EEOptions | undefined, + ) { + this.livekitRoom = new LivekitRoom({ + ...defaultLiveKitOptions, + e2ee: e2eeLivekitOptions, + }); + this.participantsIncludingSubscribers$ = this.scope.behavior( + connectedParticipantsObserver(this.livekitRoom), + [], + ); + + this.publishingParticipants$ = this.scope.behavior( combineLatest([ - connectedParticipantsObserver(this.livekitRoom), + this.participantsIncludingSubscribers$, this.membershipsFocusMap$, ]).pipe( map(([participants, membershipsFocusMap]) => @@ -62,27 +100,24 @@ export class Connection { : [], ) // Find all associated publishing livekit participant objects - .flatMap(({ sender, deviceId }) => { + .flatMap((membership) => { const participant = participants.find( - (p) => p.identity === `${sender}:${deviceId}`, + (p) => + p.identity === `${membership.sender}:${membership.deviceId}`, ); - return participant ? [participant] : []; + return participant ? [{ participant, membership }] : []; }), ), ), [], ); - - public constructor( - protected readonly livekitRoom: LivekitRoom, - protected readonly focus: LivekitFocus, - protected readonly livekitAlias: string, - protected readonly client: MatrixClient, - protected readonly scope: ObservableScope, - protected readonly membershipsFocusMap$: Behavior< - { membership: CallMembership; focus: LivekitFocus }[] - >, - ) {} + this.connectionState$ = this.scope.behavior( + fromEvent( + this.livekitRoom, + RoomEvent.ConnectionStateChanged, + ), + ); + } } export class PublishConnection extends Connection { @@ -111,4 +146,271 @@ export class PublishConnection extends Connection { connectedParticipantsObserver(this.livekitRoom), [], ); + private readonly muteStates$: Behavior; + private updatingMuteStates$ = new BehaviorSubject(false); + + public constructor( + protected readonly focus: LivekitFocus, + protected readonly livekitAlias: string, + protected readonly client: MatrixClient, + protected readonly scope: ObservableScope, + protected readonly membershipsFocusMap$: Behavior< + { membership: CallMembership; focus: LivekitFocus }[] + >, + protected readonly devices: MediaDevices, + e2eeLivekitOptions: E2EEOptions | undefined, + ) { + super( + focus, + livekitAlias, + client, + scope, + membershipsFocusMap$, + e2eeLivekitOptions, + ); + + // TODO-MULTI-SFU use actual mute states + this.muteStates$ = constant({ + audio: { enabled: true, setEnabled: (enabled) => {} }, + video: { enabled: true, setEnabled: (enabled) => {} }, + }); + + logger.info("[LivekitRoom] Create LiveKit room"); + const { controlledAudioDevices } = getUrlParams(); + + const roomOptions: RoomOptions = { + ...defaultLiveKitOptions, + videoCaptureDefaults: { + ...defaultLiveKitOptions.videoCaptureDefaults, + deviceId: getValue(this.devices.videoInput.selected$)?.id, + // TODO-MULTI-SFU add processor support back + // processor, + }, + audioCaptureDefaults: { + ...defaultLiveKitOptions.audioCaptureDefaults, + deviceId: getValue(devices.audioInput.selected$)?.id, + }, + audioOutput: { + // When using controlled audio devices, we don't want to set the + // deviceId here, because it will be set by the native app. + // (also the id does not need to match a browser device id) + deviceId: controlledAudioDevices + ? undefined + : getValue(devices.audioOutput.selected$)?.id, + }, + e2ee: e2eeLivekitOptions, + }; + // We have to create the room manually here due to a bug inside + // @livekit/components-react. JSON.stringify() is used in deps of a + // useEffect() with an argument that references itself, if E2EE is enabled + const room = new LivekitRoom(roomOptions); + room.setE2EEEnabled(e2eeLivekitOptions !== undefined).catch((e) => { + logger.error("Failed to set E2EE enabled on room", e); + }); + this.livekitRoom = room; + + // sync mute states TODO-MULTI_SFU This possibly can be simplified quite a bit. + combineLatest([ + this.connectionState$, + this.muteStates$, + this.updatingMuteStates$, + ]) + .pipe( + filter(([_c, _m, updating]) => !updating), + this.scope.bind(), + ) + .subscribe(([connectionState, muteStates, _]) => { + // Sync the requested mute states with LiveKit's mute states. We do it this + // way around rather than using LiveKit as the source of truth, so that the + // states can be consistent throughout the lobby and loading screens. + // It's important that we only do this in the connected state, because + // LiveKit's internal mute states aren't consistent during connection setup, + // and setting tracks to be enabled during this time causes errors. + if ( + this.livekitRoom !== undefined && + connectionState === ConnectionState.Connected + ) { + const participant = this.livekitRoom.localParticipant; + + enum MuteDevice { + Microphone, + Camera, + } + + const syncMuteState = async ( + iterCount: number, + type: MuteDevice, + ): Promise => { + // The approach for muting is to always bring the actual livekit state in sync with the button + // This allows for a very predictable and reactive behavior for the user. + // (the new state is the old state when pressing the button n times (where n is even)) + // (the new state is different to the old state when pressing the button n times (where n is uneven)) + // In case there are issues with the device there might be situations where setMicrophoneEnabled/setCameraEnabled + // return immediately. This should be caught with the Error("track with new mute state could not be published"). + // For now we are still using an iterCount to limit the recursion loop to 10. + // This could happen if the device just really does not want to turn on (hardware based issue) + // but the mute button is in unmute state. + // For now our fail mode is to just stay in this state. + // TODO: decide for a UX on how that fail mode should be treated (disable button, hide button, sync button back to muted without user input) + + if (iterCount > 10) { + logger.error( + "Stop trying to sync the input device with current mute state after 10 failed tries", + ); + return; + } + let devEnabled; + let btnEnabled; + switch (type) { + case MuteDevice.Microphone: + devEnabled = participant.isMicrophoneEnabled; + btnEnabled = muteStates.audio.enabled; + break; + case MuteDevice.Camera: + devEnabled = participant.isCameraEnabled; + btnEnabled = muteStates.video.enabled; + break; + } + if (devEnabled !== btnEnabled && !this.updatingMuteStates$.value) { + this.updatingMuteStates$.next(true); + + try { + let trackPublication; + switch (type) { + case MuteDevice.Microphone: + trackPublication = await participant.setMicrophoneEnabled( + btnEnabled, + this.livekitRoom.options.audioCaptureDefaults, + ); + break; + case MuteDevice.Camera: + trackPublication = await participant.setCameraEnabled( + btnEnabled, + this.livekitRoom.options.videoCaptureDefaults, + ); + break; + } + + if (trackPublication) { + // await participant.setMicrophoneEnabled can return immediately in some instances, + // so that participant.isMicrophoneEnabled !== buttonEnabled.current.audio still holds true. + // This happens if the device is still in a pending state + // "sleeping" here makes sure we let react do its thing so that participant.isMicrophoneEnabled is updated, + // so we do not end up in a recursion loop. + await new Promise((r) => setTimeout(r, 100)); + + // track got successfully changed to mute/unmute + // Run the check again after the change is done. Because the user + // can update the state (presses mute button) while the device is enabling + // itself we need might need to update the mute state right away. + // This async recursion makes sure that setCamera/MicrophoneEnabled is + // called as little times as possible. + await syncMuteState(iterCount + 1, type); + } else { + throw new Error( + "track with new mute state could not be published", + ); + } + } catch (e) { + if ((e as DOMException).name === "NotAllowedError") { + logger.error( + "Fatal error while syncing mute state: resetting", + e, + ); + if (type === MuteDevice.Microphone) { + muteStates.audio.setEnabled?.(false); + } else { + muteStates.video.setEnabled?.(false); + } + } else { + logger.error( + "Failed to sync audio mute state with LiveKit (will retry to sync in 1s):", + e, + ); + setTimeout(() => { + this.updatingMuteStates$.next(false); + }, 1000); + } + } + } + }; + + syncMuteState(0, MuteDevice.Microphone).catch((e) => { + logger.error("Failed to sync audio mute state with LiveKit", e); + }); + syncMuteState(0, MuteDevice.Camera).catch((e) => { + logger.error("Failed to sync video mute state with LiveKit", e); + }); + } + }); + + const syncDevice = ( + kind: MediaDeviceKind, + selected$: Observable, + ): Subscription => + selected$.pipe(this.scope.bind()).subscribe((device) => { + if (this.connectionState$.value !== ConnectionState.Connected) return; + logger.info( + "[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :", + this.livekitRoom.getActiveDevice(kind), + " !== ", + device?.id, + ); + if ( + device !== undefined && + this.livekitRoom.getActiveDevice(kind) !== device.id + ) { + this.livekitRoom + .switchActiveDevice(kind, device.id) + .catch((e) => + logger.error(`Failed to sync ${kind} device with LiveKit`, e), + ); + } + }); + + syncDevice("audioinput", devices.audioInput.selected$); + if (!controlledAudioDevices) + syncDevice("audiooutput", devices.audioOutput.selected$); + syncDevice("videoinput", devices.videoInput.selected$); + // Restart the audio input track whenever we detect that the active media + // device has changed to refer to a different hardware device. We do this + // for the sake of Chrome, which provides a "default" device that is meant + // to match the system's default audio input, whatever that may be. + // This is special-cased for only audio inputs because we need to dig around + // in the LocalParticipant object for the track object and there's not a nice + // way to do that generically. There is usually no OS-level default video capture + // device anyway, and audio outputs work differently. + devices.audioInput.selected$ + .pipe( + switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER), + this.scope.bind(), + ) + .subscribe(() => { + if (this.connectionState$.value !== ConnectionState.Connected) return; + const activeMicTrack = Array.from( + this.livekitRoom.localParticipant.audioTrackPublications.values(), + ).find((d) => d.source === Track.Source.Microphone)?.track; + + if ( + activeMicTrack && + // only restart if the stream is still running: LiveKit will detect + // when a track stops & restart appropriately, so this is not our job. + // Plus, we need to avoid restarting again if the track is already in + // the process of being restarted. + activeMicTrack.mediaStreamTrack.readyState !== "ended" + ) { + // Restart the track, which will cause Livekit to do another + // getUserMedia() call with deviceId: default to get the *new* default device. + // Note that room.switchActiveDevice() won't work: Livekit will ignore it because + // the deviceId hasn't changed (was & still is default). + this.livekitRoom.localParticipant + .getTrackPublication(Track.Source.Microphone) + ?.audioTrack?.restartTrack() + .catch((e) => { + logger.error(`Failed to restart audio device track`, e); + }); + } + }); + } + // TODO-MULTI-SFU Sync the requested track processors with LiveKit }