diff --git a/src/state/Behavior.ts b/src/state/Behavior.ts index 4a48494b..3c88dc00 100644 --- a/src/state/Behavior.ts +++ b/src/state/Behavior.ts @@ -5,9 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { BehaviorSubject, distinctUntilChanged, Observable } from "rxjs"; - -import { type ObservableScope } from "./ObservableScope"; +import { BehaviorSubject } from "rxjs"; /** * A stateful, read-only reactive value. As an Observable, it is "hot" and @@ -26,38 +24,3 @@ export type Behavior = Omit, "next" | "observers">; export function constant(value: T): Behavior { return new BehaviorSubject(value); } - -declare module "rxjs" { - interface Observable { - /** - * Converts this Observable into a Behavior. This requires the Observable to - * synchronously emit an initial value. - */ - behavior(scope: ObservableScope): Behavior; - } -} - -const nothing = Symbol("nothing"); - -Observable.prototype.behavior = function ( - this: Observable, - scope: ObservableScope, -): Behavior { - const subject$ = new BehaviorSubject(nothing); - // Push values from the Observable into the BehaviorSubject. - // BehaviorSubjects have an undesirable feature where if you call 'complete', - // they will no longer re-emit their current value upon subscription. We want - // to support Observables that complete (for example `of({})`), so we have to - // take care to not propagate the completion event. - this.pipe(scope.bind(), distinctUntilChanged()).subscribe({ - next(value) { - subject$.next(value); - }, - error(err) { - subject$.error(err); - }, - }); - if (subject$.value === nothing) - throw new Error("Behavior failed to synchronously emit an initial value"); - return subject$ as Behavior; -}; diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index de90cb16..021f8f77 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -272,9 +272,9 @@ class UserMedia { this.participant$ as Behavior, encryptionSystem, livekitRoom, - displayname$.behavior(this.scope), - handRaised$.behavior(this.scope), - reaction$.behavior(this.scope), + this.scope.behavior(displayname$), + this.scope.behavior(handRaised$), + this.scope.behavior(reaction$), ); } else { this.vm = new RemoteUserMediaViewModel( @@ -285,16 +285,16 @@ class UserMedia { >, encryptionSystem, livekitRoom, - displayname$.behavior(this.scope), - handRaised$.behavior(this.scope), - reaction$.behavior(this.scope), + this.scope.behavior(displayname$), + this.scope.behavior(handRaised$), + this.scope.behavior(reaction$), ); } - this.speaker$ = observeSpeaker$(this.vm.speaking$).behavior(this.scope); + this.speaker$ = this.scope.behavior(observeSpeaker$(this.vm.speaking$)); - this.presenter$ = this.participant$ - .pipe( + this.presenter$ = this.scope.behavior( + this.participant$.pipe( switchMap( (p) => (p && @@ -307,8 +307,8 @@ class UserMedia { ).pipe(map((p) => p.isScreenShareEnabled))) ?? of(false), ), - ) - .behavior(this.scope); + ), + ); } public updateParticipant( @@ -349,7 +349,7 @@ class ScreenShare { this.participant$.asObservable(), encryptionSystem, liveKitRoom, - displayName$.behavior(this.scope), + this.scope.behavior(displayName$), participant.isLocal, ); } @@ -386,71 +386,72 @@ function getRoomMemberFromRtcMember( // TODO: Move wayyyy more business logic from the call and lobby views into here export class CallViewModel extends ViewModel { - public readonly localVideo$: Behavior = + public readonly localVideo$ = this.scope.behavior( observeTrackReference$( this.livekitRoom.localParticipant, Track.Source.Camera, - ) - .pipe( - map((trackRef) => { - const track = trackRef?.publication?.track; - return track instanceof LocalVideoTrack ? track : null; - }), - ) - .behavior(this.scope); + ).pipe( + map((trackRef) => { + const track = trackRef?.publication?.track; + return track instanceof LocalVideoTrack ? track : null; + }), + ), + ); /** * The raw list of RemoteParticipants as reported by LiveKit */ - private readonly rawRemoteParticipants$: Behavior = - connectedParticipantsObserver(this.livekitRoom) - .pipe(startWith([])) - .behavior(this.scope); + private readonly rawRemoteParticipants$ = this.scope.behavior< + RemoteParticipant[] + >(connectedParticipantsObserver(this.livekitRoom).pipe(startWith([]))); /** * Lists of RemoteParticipants to "hold" on display, even if LiveKit claims that * they've left */ - private readonly remoteParticipantHolds$: Behavior = - this.connectionState$ - .pipe( - withLatestFrom(this.rawRemoteParticipants$), - mergeMap(([s, ps]) => { - // Whenever we switch focuses, we should retain all the previous - // participants for at least POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS ms to - // give their clients time to switch over and avoid jarring layout shifts - if (s === ECAddonConnectionState.ECSwitchingFocus) { - return concat( - // Hold these participants - of({ hold: ps }), - // Wait for time to pass and the connection state to have changed - forkJoin([ - timer(POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS), - this.connectionState$.pipe( - filter((s) => s !== ECAddonConnectionState.ECSwitchingFocus), - take(1), - ), - // Then unhold them - ]).pipe(map(() => ({ unhold: ps }))), - ); - } else { - return EMPTY; - } - }), - // Accumulate the hold instructions into a single list showing which - // participants are being held - accumulate([] as RemoteParticipant[][], (holds, instruction) => - "hold" in instruction - ? [instruction.hold, ...holds] - : holds.filter((h) => h !== instruction.unhold), - ), - ) - .behavior(this.scope); + private readonly remoteParticipantHolds$ = this.scope.behavior< + RemoteParticipant[][] + >( + this.connectionState$.pipe( + withLatestFrom(this.rawRemoteParticipants$), + mergeMap(([s, ps]) => { + // Whenever we switch focuses, we should retain all the previous + // participants for at least POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS ms to + // give their clients time to switch over and avoid jarring layout shifts + if (s === ECAddonConnectionState.ECSwitchingFocus) { + return concat( + // Hold these participants + of({ hold: ps }), + // Wait for time to pass and the connection state to have changed + forkJoin([ + timer(POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS), + this.connectionState$.pipe( + filter((s) => s !== ECAddonConnectionState.ECSwitchingFocus), + take(1), + ), + // Then unhold them + ]).pipe(map(() => ({ unhold: ps }))), + ); + } else { + return EMPTY; + } + }), + // Accumulate the hold instructions into a single list showing which + // participants are being held + accumulate([] as RemoteParticipant[][], (holds, instruction) => + "hold" in instruction + ? [instruction.hold, ...holds] + : holds.filter((h) => h !== instruction.unhold), + ), + ), + ); /** * The RemoteParticipants including those that are being "held" on the screen */ - private readonly remoteParticipants$: Behavior = + private readonly remoteParticipants$ = this.scope.behavior< + RemoteParticipant[] + >( combineLatest( [this.rawRemoteParticipants$, this.remoteParticipantHolds$], (raw, holds) => { @@ -469,20 +470,24 @@ export class CallViewModel extends ViewModel { return result; }, - ).behavior(this.scope); + ), + ); /** * Displaynames for each member of the call. This will disambiguate * any displaynames that clashes with another member. Only members * joined to the call are considered here. */ - public readonly memberDisplaynames$ = merge( - // Handle call membership changes. - fromEvent(this.matrixRTCSession, MatrixRTCSessionEvent.MembershipsChanged), - // Handle room membership changes (and displayname updates) - fromEvent(this.matrixRTCSession.room, RoomStateEvent.Members), - ) - .pipe( + public readonly memberDisplaynames$ = this.scope.behavior( + merge( + // Handle call membership changes. + fromEvent( + this.matrixRTCSession, + MatrixRTCSessionEvent.MembershipsChanged, + ), + // Handle room membership changes (and displayname updates) + fromEvent(this.matrixRTCSession.room, RoomStateEvent.Members), + ).pipe( startWith(null), map(() => { const displaynameMap = new Map(); @@ -510,13 +515,13 @@ export class CallViewModel extends ViewModel { // It turns out that doing the disambiguation above is rather expensive on Safari (10x slower // than on Chrome/Firefox). This means it is important that we multicast the result so that we // don't do this work more times than we need to. This is achieved by converting to a behavior: - ) - .behavior(this.scope); + ), + ); - public readonly handsRaised$ = this.handsRaisedSubject$.behavior(this.scope); + public readonly handsRaised$ = this.scope.behavior(this.handsRaisedSubject$); - public readonly reactions$ = this.reactionsSubject$ - .pipe( + public readonly reactions$ = this.scope.behavior( + this.reactionsSubject$.pipe( map((v) => Object.fromEntries( Object.entries(v).map(([a, { reactionOption }]) => [ @@ -525,26 +530,26 @@ export class CallViewModel extends ViewModel { ]), ), ), - ) - .behavior(this.scope); + ), + ); /** * List of MediaItems that we want to display */ - private readonly mediaItems$: Behavior = combineLatest([ - this.remoteParticipants$, - observeParticipantMedia(this.livekitRoom.localParticipant), - duplicateTiles.value$, - // Also react to changes in the MatrixRTC session list. - // The session list will also be update if a room membership changes. - // No additional RoomState event listener needs to be set up. - fromEvent( - this.matrixRTCSession, - MatrixRTCSessionEvent.MembershipsChanged, - ).pipe(startWith(null)), - showNonMemberTiles.value$, - ]) - .pipe( + private readonly mediaItems$ = this.scope.behavior( + combineLatest([ + this.remoteParticipants$, + observeParticipantMedia(this.livekitRoom.localParticipant), + duplicateTiles.value$, + // Also react to changes in the MatrixRTC session list. + // The session list will also be update if a room membership changes. + // No additional RoomState event listener needs to be set up. + fromEvent( + this.matrixRTCSession, + MatrixRTCSessionEvent.MembershipsChanged, + ).pipe(startWith(null)), + showNonMemberTiles.value$, + ]).pipe( scan( ( prevItems, @@ -707,19 +712,19 @@ export class CallViewModel extends ViewModel { finalizeValue((ts) => { for (const t of ts) t.destroy(); }), - ) - .behavior(this.scope); + ), + ); /** * List of MediaItems that we want to display, that are of type UserMedia */ - private readonly userMedia$: Behavior = this.mediaItems$ - .pipe( + private readonly userMedia$ = this.scope.behavior( + this.mediaItems$.pipe( map((mediaItems) => mediaItems.filter((m): m is UserMedia => m instanceof UserMedia), ), - ) - .behavior(this.scope); + ), + ); public readonly memberChanges$ = this.userMedia$ .pipe(map((mediaItems) => mediaItems.map((m) => m.id))) @@ -737,17 +742,17 @@ export class CallViewModel extends ViewModel { /** * List of MediaItems that we want to display, that are of type ScreenShare */ - private readonly screenShares$: Behavior = this.mediaItems$ - .pipe( + private readonly screenShares$ = this.scope.behavior( + this.mediaItems$.pipe( map((mediaItems) => mediaItems.filter((m): m is ScreenShare => m instanceof ScreenShare), ), - ) - .behavior(this.scope); + ), + ); - private readonly spotlightSpeaker$: Behavior = - this.userMedia$ - .pipe( + private readonly spotlightSpeaker$ = + this.scope.behavior( + this.userMedia$.pipe( switchMap((mediaItems) => mediaItems.length === 0 ? of([]) @@ -779,11 +784,11 @@ export class CallViewModel extends ViewModel { null, ), map((speaker) => speaker?.vm ?? null), - ) - .behavior(this.scope); + ), + ); - private readonly grid$: Behavior = this.userMedia$ - .pipe( + private readonly grid$ = this.scope.behavior( + this.userMedia$.pipe( switchMap((mediaItems) => { const bins = mediaItems.map((m) => combineLatest( @@ -820,11 +825,11 @@ export class CallViewModel extends ViewModel { ); }), distinctUntilChanged(shallowEquals), - ) - .behavior(this.scope); + ), + ); - private readonly spotlight$: Behavior = this.screenShares$ - .pipe( + private readonly spotlight$ = this.scope.behavior( + this.screenShares$.pipe( switchMap((screenShares) => { if (screenShares.length > 0) { return of(screenShares.map((m) => m.vm)); @@ -835,15 +840,15 @@ export class CallViewModel extends ViewModel { ); }), distinctUntilChanged(shallowEquals), - ) - .behavior(this.scope); + ), + ); - private readonly pip$: Behavior = combineLatest([ - this.screenShares$, - this.spotlightSpeaker$, - this.mediaItems$, - ]) - .pipe( + private readonly pip$ = this.scope.behavior( + combineLatest([ + this.screenShares$, + this.spotlightSpeaker$, + this.mediaItems$, + ]).pipe( switchMap(([screenShares, spotlight, mediaItems]) => { if (screenShares.length > 0) { return this.spotlightSpeaker$; @@ -873,8 +878,8 @@ export class CallViewModel extends ViewModel { }), ); }), - ) - .behavior(this.scope); + ), + ); private readonly hasRemoteScreenShares$: Observable = this.spotlight$.pipe( @@ -888,11 +893,8 @@ export class CallViewModel extends ViewModel { startWith(false), ); - private readonly naturalWindowMode$: Behavior = fromEvent( - window, - "resize", - ) - .pipe( + private readonly naturalWindowMode$ = this.scope.behavior( + fromEvent(window, "resize").pipe( startWith(null), map(() => { const height = window.innerHeight; @@ -905,35 +907,36 @@ export class CallViewModel extends ViewModel { if (width <= 600) return "narrow"; return "normal"; }), - ) - .behavior(this.scope); + ), + ); /** * The general shape of the window. */ - public readonly windowMode$: Behavior = this.pipEnabled$ - .pipe( + public readonly windowMode$ = this.scope.behavior( + this.pipEnabled$.pipe( switchMap((pip) => pip ? of("pip") : this.naturalWindowMode$, ), - ) - .behavior(this.scope); + ), + ); private readonly spotlightExpandedToggle$ = new Subject(); - public readonly spotlightExpanded$: Behavior = - this.spotlightExpandedToggle$ - .pipe(accumulate(false, (expanded) => !expanded)) - .behavior(this.scope); + public readonly spotlightExpanded$ = this.scope.behavior( + this.spotlightExpandedToggle$.pipe( + accumulate(false, (expanded) => !expanded), + ), + ); private readonly gridModeUserSelection$ = new Subject(); /** * The layout mode of the media tile grid. */ - public readonly gridMode$: Behavior = + public readonly gridMode$ = // If the user hasn't selected spotlight and somebody starts screen sharing, // automatically switch to spotlight mode and reset when screen sharing ends - this.gridModeUserSelection$ - .pipe( + this.scope.behavior( + this.gridModeUserSelection$.pipe( startWith(null), switchMap((userSelection) => (userSelection === "spotlight" @@ -952,8 +955,8 @@ export class CallViewModel extends ViewModel { ) ).pipe(startWith(userSelection ?? "grid")), ), - ) - .behavior(this.scope); + ), + ); public setGridMode(value: GridMode): void { this.gridModeUserSelection$.next(value); @@ -1014,8 +1017,8 @@ export class CallViewModel extends ViewModel { /** * The media to be used to produce a layout. */ - private readonly layoutMedia$: Behavior = this.windowMode$ - .pipe( + private readonly layoutMedia$ = this.scope.behavior( + this.windowMode$.pipe( switchMap((windowMode) => { switch (windowMode) { case "normal": @@ -1077,8 +1080,8 @@ export class CallViewModel extends ViewModel { return this.pipLayoutMedia$; } }), - ) - .behavior(this.scope); + ), + ); // There is a cyclical dependency here: the layout algorithms want to know // which tiles are on screen, but to know which tiles are on screen we have to @@ -1088,13 +1091,13 @@ export class CallViewModel extends ViewModel { private readonly setVisibleTiles = (value: number): void => this.visibleTiles$.next(value); - private readonly layoutInternals$: Behavior< + private readonly layoutInternals$ = this.scope.behavior< LayoutScanState & { layout: Layout } - > = combineLatest([ - this.layoutMedia$, - this.visibleTiles$.pipe(startWith(0), distinctUntilChanged()), - ]) - .pipe( + >( + combineLatest([ + this.layoutMedia$, + this.visibleTiles$.pipe(startWith(0), distinctUntilChanged()), + ]).pipe( scan< [LayoutMedia, number], LayoutScanState & { layout: Layout }, @@ -1129,29 +1132,29 @@ export class CallViewModel extends ViewModel { }, { layout: null, tiles: TileStore.empty() }, ), - ) - .behavior(this.scope); + ), + ); /** * The layout of tiles in the call interface. */ - public readonly layout$: Behavior = this.layoutInternals$ - .pipe(map(({ layout }) => layout)) - .behavior(this.scope); + public readonly layout$ = this.scope.behavior( + this.layoutInternals$.pipe(map(({ layout }) => layout)), + ); /** * The current generation of the tile store, exposed for debugging purposes. */ - public readonly tileStoreGeneration$: Behavior = this.layoutInternals$ - .pipe(map(({ tiles }) => tiles.generation)) - .behavior(this.scope); + public readonly tileStoreGeneration$ = this.scope.behavior( + this.layoutInternals$.pipe(map(({ tiles }) => tiles.generation)), + ); - public showSpotlightIndicators$: Behavior = this.layout$ - .pipe(map((l) => l.type !== "grid")) - .behavior(this.scope); + public showSpotlightIndicators$ = this.scope.behavior( + this.layout$.pipe(map((l) => l.type !== "grid")), + ); - public showSpeakingIndicators$: Behavior = this.layout$ - .pipe( + public showSpeakingIndicators$ = this.scope.behavior( + this.layout$.pipe( switchMap((l) => { switch (l.type) { case "spotlight-landscape": @@ -1175,29 +1178,30 @@ export class CallViewModel extends ViewModel { return of(true); } }), - ) - .behavior(this.scope); + ), + ); - public readonly toggleSpotlightExpanded$: Behavior<(() => void) | null> = - this.windowMode$ - .pipe( - switchMap((mode) => - mode === "normal" - ? this.layout$.pipe( - map( - (l) => - l.type === "spotlight-landscape" || - l.type === "spotlight-expanded", - ), - ) - : of(false), - ), - distinctUntilChanged(), - map((enabled) => - enabled ? (): void => this.spotlightExpandedToggle$.next() : null, - ), - ) - .behavior(this.scope); + public readonly toggleSpotlightExpanded$ = this.scope.behavior< + (() => void) | null + >( + this.windowMode$.pipe( + switchMap((mode) => + mode === "normal" + ? this.layout$.pipe( + map( + (l) => + l.type === "spotlight-landscape" || + l.type === "spotlight-expanded", + ), + ) + : of(false), + ), + distinctUntilChanged(), + map((enabled) => + enabled ? (): void => this.spotlightExpandedToggle$.next() : null, + ), + ), + ); private readonly screenTap$ = new Subject(); private readonly controlsTap$ = new Subject(); @@ -1232,12 +1236,12 @@ export class CallViewModel extends ViewModel { this.screenUnhover$.next(); } - public readonly showHeader$: Behavior = this.windowMode$ - .pipe(map((mode) => mode !== "pip" && mode !== "flat")) - .behavior(this.scope); + public readonly showHeader$ = this.scope.behavior( + this.windowMode$.pipe(map((mode) => mode !== "pip" && mode !== "flat")), + ); - public readonly showFooter$: Behavior = this.windowMode$ - .pipe( + public readonly showFooter$ = this.scope.behavior( + this.windowMode$.pipe( switchMap((mode) => { switch (mode) { case "pip": @@ -1288,20 +1292,23 @@ export class CallViewModel extends ViewModel { ); } }), - ) - .behavior(this.scope); + ), + ); /** * Whether audio is currently being output through the earpiece. */ - public readonly earpieceMode$: Behavior = combineLatest( - [ - this.mediaDevices.audioOutput.available$, - this.mediaDevices.audioOutput.selected$, - ], - (available, selected) => - selected !== undefined && available.get(selected.id)?.type === "earpiece", - ).behavior(this.scope); + public readonly earpieceMode$ = this.scope.behavior( + combineLatest( + [ + this.mediaDevices.audioOutput.available$, + this.mediaDevices.audioOutput.selected$, + ], + (available, selected) => + selected !== undefined && + available.get(selected.id)?.type === "earpiece", + ), + ); /** * Callback to toggle between the earpiece and the loudspeaker. @@ -1309,38 +1316,40 @@ export class CallViewModel extends ViewModel { * This will be `null` in case the target does not exist in the list * of available audio outputs. */ - public readonly audioOutputSwitcher$: Behavior<{ + public readonly audioOutputSwitcher$ = this.scope.behavior<{ targetOutput: "earpiece" | "speaker"; switch: () => void; - } | null> = combineLatest( - [ - this.mediaDevices.audioOutput.available$, - this.mediaDevices.audioOutput.selected$, - ], - (available, selected) => { - const selectionType = selected && available.get(selected.id)?.type; + } | null>( + combineLatest( + [ + this.mediaDevices.audioOutput.available$, + this.mediaDevices.audioOutput.selected$, + ], + (available, selected) => { + const selectionType = selected && available.get(selected.id)?.type; - // If we are in any output mode other than spaeker switch to speaker. - const newSelectionType: "earpiece" | "speaker" = - selectionType === "speaker" ? "earpiece" : "speaker"; - const newSelection = [...available].find( - ([, d]) => d.type === newSelectionType, - ); - if (newSelection === undefined) return null; + // If we are in any output mode other than spaeker switch to speaker. + const newSelectionType: "earpiece" | "speaker" = + selectionType === "speaker" ? "earpiece" : "speaker"; + const newSelection = [...available].find( + ([, d]) => d.type === newSelectionType, + ); + if (newSelection === undefined) return null; - const [id] = newSelection; - return { - targetOutput: newSelectionType, - switch: () => this.mediaDevices.audioOutput.select(id), - }; - }, - ).behavior(this.scope); + const [id] = newSelection; + return { + targetOutput: newSelectionType, + switch: (): void => this.mediaDevices.audioOutput.select(id), + }; + }, + ), + ); /** * Emits an array of reactions that should be visible on the screen. */ - public readonly visibleReactions$ = showReactions.value$ - .pipe( + public readonly visibleReactions$ = this.scope.behavior( + showReactions.value$.pipe( switchMap((show) => (show ? this.reactions$ : of({}))), scan< Record, @@ -1355,8 +1364,8 @@ export class CallViewModel extends ViewModel { } return newSet; }, []), - ) - .behavior(this.scope); + ), + ); /** * Emits an array of reactions that should be played. diff --git a/src/state/MediaDevices.ts b/src/state/MediaDevices.ts index b27120b5..a3a2dc1e 100644 --- a/src/state/MediaDevices.ts +++ b/src/state/MediaDevices.ts @@ -106,8 +106,8 @@ function availableRawDevices$( const devices$ = createMediaDeviceObserver(kind, logError, false); const devicesWithNames$ = createMediaDeviceObserver(kind, logError, true); - return usingNames$ - .pipe( + return scope.behavior( + usingNames$.pipe( switchMap((withNames) => withNames ? // It might be that there is already a media stream running somewhere, @@ -123,8 +123,8 @@ function availableRawDevices$( : devices$, ), startWith([]), - ) - .behavior(scope); + ), + ); } function buildDeviceMap( @@ -165,15 +165,12 @@ class AudioInput implements MediaDevice { private readonly availableRaw$: Behavior = availableRawDevices$("audioinput", this.usingNames$, this.scope); - public readonly available$ = this.availableRaw$ - .pipe(map(buildDeviceMap)) - .behavior(this.scope); + public readonly available$ = this.scope.behavior( + this.availableRaw$.pipe(map(buildDeviceMap)), + ); - public readonly selected$ = selectDevice$( - this.available$, - audioInputSetting.value$, - ) - .pipe( + public readonly selected$ = this.scope.behavior( + selectDevice$(this.available$, audioInputSetting.value$).pipe( map((id) => id === undefined ? undefined @@ -191,8 +188,8 @@ class AudioInput implements MediaDevice { ), }, ), - ) - .behavior(this.scope); + ), + ); public select(id: string): void { audioInputSetting.setValue(id); @@ -211,12 +208,8 @@ class AudioInput implements MediaDevice { class AudioOutput implements MediaDevice { - public readonly available$ = availableRawDevices$( - "audiooutput", - this.usingNames$, - this.scope, - ) - .pipe( + public readonly available$ = this.scope.behavior( + availableRawDevices$("audiooutput", this.usingNames$, this.scope).pipe( map((availableRaw) => { const available: Map = buildDeviceMap(availableRaw); @@ -233,14 +226,11 @@ class AudioOutput // automatically track the default device. return available; }), - ) - .behavior(this.scope); + ), + ); - public readonly selected$ = selectDevice$( - this.available$, - audioOutputSetting.value$, - ) - .pipe( + public readonly selected$ = this.scope.behavior( + selectDevice$(this.available$, audioOutputSetting.value$).pipe( map((id) => id === undefined ? undefined @@ -249,8 +239,8 @@ class AudioOutput virtualEarpiece: false, }, ), - ) - .behavior(this.scope); + ), + ); public select(id: string): void { audioOutputSetting.setValue(id); } @@ -268,30 +258,32 @@ class AudioOutput class ControlledAudioOutput implements MediaDevice { - public readonly available$ = combineLatest( - [controlledAvailableOutputDevices$.pipe(startWith([])), iosDeviceMenu$], - (availableRaw, iosDeviceMenu) => { - const available = new Map( - availableRaw.map( - ({ id, name, isEarpiece, isSpeaker /*,isExternalHeadset*/ }) => { - let deviceLabel: AudioOutputDeviceLabel; - // if (isExternalHeadset) // Do we want this? - if (isEarpiece) deviceLabel = { type: "earpiece" }; - else if (isSpeaker) deviceLabel = { type: "speaker" }; - else deviceLabel = { type: "name", name }; - return [id, deviceLabel]; - }, - ), - ); + public readonly available$ = this.scope.behavior( + combineLatest( + [controlledAvailableOutputDevices$.pipe(startWith([])), iosDeviceMenu$], + (availableRaw, iosDeviceMenu) => { + const available = new Map( + availableRaw.map( + ({ id, name, isEarpiece, isSpeaker /*,isExternalHeadset*/ }) => { + let deviceLabel: AudioOutputDeviceLabel; + // if (isExternalHeadset) // Do we want this? + if (isEarpiece) deviceLabel = { type: "earpiece" }; + else if (isSpeaker) deviceLabel = { type: "speaker" }; + else deviceLabel = { type: "name", name }; + return [id, deviceLabel]; + }, + ), + ); - // Create a virtual earpiece device in case a non-earpiece device is - // designated for this purpose - if (iosDeviceMenu && availableRaw.some((d) => d.forEarpiece)) - available.set(EARPIECE_CONFIG_ID, { type: "earpiece" }); + // Create a virtual earpiece device in case a non-earpiece device is + // designated for this purpose + if (iosDeviceMenu && availableRaw.some((d) => d.forEarpiece)) + available.set(EARPIECE_CONFIG_ID, { type: "earpiece" }); - return available; - }, - ).behavior(this.scope); + return available; + }, + ), + ); private readonly deviceSelection$ = new Subject(); @@ -299,21 +291,23 @@ class ControlledAudioOutput this.deviceSelection$.next(id); } - public readonly selected$ = combineLatest( - [ - this.available$, - merge( - controlledOutputSelection$.pipe(startWith(undefined)), - this.deviceSelection$, - ), - ], - (available, preferredId) => { - const id = preferredId ?? available.keys().next().value; - return id === undefined - ? undefined - : { id, virtualEarpiece: id === EARPIECE_CONFIG_ID }; - }, - ).behavior(this.scope); + public readonly selected$ = this.scope.behavior( + combineLatest( + [ + this.available$, + merge( + controlledOutputSelection$.pipe(startWith(undefined)), + this.deviceSelection$, + ), + ], + (available, preferredId) => { + const id = preferredId ?? available.keys().next().value; + return id === undefined + ? undefined + : { id, virtualEarpiece: id === EARPIECE_CONFIG_ID }; + }, + ), + ); public constructor(private readonly scope: ObservableScope) { this.selected$.subscribe((device) => { @@ -335,19 +329,16 @@ class ControlledAudioOutput } class VideoInput implements MediaDevice { - public readonly available$ = availableRawDevices$( - "videoinput", - this.usingNames$, - this.scope, - ) - .pipe(map(buildDeviceMap)) - .behavior(this.scope); - public readonly selected$ = selectDevice$( - this.available$, - videoInputSetting.value$, - ) - .pipe(map((id) => (id === undefined ? undefined : { id }))) - .behavior(this.scope); + public readonly available$ = this.scope.behavior( + availableRawDevices$("videoinput", this.usingNames$, this.scope).pipe( + map(buildDeviceMap), + ), + ); + public readonly selected$ = this.scope.behavior( + selectDevice$(this.available$, videoInputSetting.value$).pipe( + map((id) => (id === undefined ? undefined : { id })), + ), + ); public select(id: string): void { videoInputSetting.setValue(id); } @@ -381,12 +372,12 @@ export class MediaDevices { // you to do to receive device names in lieu of a more explicit permissions // API. This flag never resets to false, because once permissions are granted // the first time, the user won't be prompted again until reload of the page. - private readonly usingNames$ = this.deviceNamesRequest$ - .pipe( + private readonly usingNames$ = this.scope.behavior( + this.deviceNamesRequest$.pipe( map(() => true), startWith(false), - ) - .behavior(this.scope); + ), + ); public readonly audioInput: MediaDevice< DeviceLabel, SelectedAudioInputDevice diff --git a/src/state/MediaViewModel.ts b/src/state/MediaViewModel.ts index def56633..f5953887 100644 --- a/src/state/MediaViewModel.ts +++ b/src/state/MediaViewModel.ts @@ -232,13 +232,13 @@ abstract class BaseMediaViewModel extends ViewModel { private observeTrackReference$( source: Track.Source, ): Behavior { - return this.participant$ - .pipe( + return this.scope.behavior( + this.participant$.pipe( switchMap((p) => p === undefined ? of(undefined) : observeTrackReference$(p, source), ), - ) - .behavior(this.scope); + ), + ); } public constructor( @@ -269,16 +269,18 @@ abstract class BaseMediaViewModel extends ViewModel { const audio$ = this.observeTrackReference$(audioSource); this.video$ = this.observeTrackReference$(videoSource); - this.unencryptedWarning$ = combineLatest( - [audio$, this.video$], - (a, v) => - encryptionSystem.kind !== E2eeType.NONE && - (a?.publication?.isEncrypted === false || - v?.publication?.isEncrypted === false), - ).behavior(this.scope); + this.unencryptedWarning$ = this.scope.behavior( + combineLatest( + [audio$, this.video$], + (a, v) => + encryptionSystem.kind !== E2eeType.NONE && + (a?.publication?.isEncrypted === false || + v?.publication?.isEncrypted === false), + ), + ); - this.encryptionStatus$ = this.participant$ - .pipe( + this.encryptionStatus$ = this.scope.behavior( + this.participant$.pipe( switchMap((participant): Observable => { if (!participant) { return of(EncryptionStatus.Connecting); @@ -338,8 +340,8 @@ abstract class BaseMediaViewModel extends ViewModel { ); } }), - ) - .behavior(this.scope); + ), + ); } } @@ -358,8 +360,8 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { /** * Whether the participant is speaking. */ - public readonly speaking$ = this.participant$ - .pipe( + public readonly speaking$ = this.scope.behavior( + this.participant$.pipe( switchMap((p) => p ? observeParticipantEvents( @@ -368,8 +370,8 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { ).pipe(map((p) => p.isSpeaking)) : of(false), ), - ) - .behavior(this.scope); + ), + ); /** * Whether this participant is sending audio (i.e. is unmuted on their side). @@ -407,17 +409,17 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { displayName$, ); - const media$ = participant$ - .pipe( + const media$ = this.scope.behavior( + participant$.pipe( switchMap((p) => (p && observeParticipantMedia(p)) ?? of(undefined)), - ) - .behavior(this.scope); - this.audioEnabled$ = media$ - .pipe(map((m) => m?.microphoneTrack?.isMuted === false)) - .behavior(this.scope); - this.videoEnabled$ = media$ - .pipe(map((m) => m?.cameraTrack?.isMuted === false)) - .behavior(this.scope); + ), + ); + this.audioEnabled$ = this.scope.behavior( + media$.pipe(map((m) => m?.microphoneTrack?.isMuted === false)), + ); + this.videoEnabled$ = this.scope.behavior( + media$.pipe(map((m) => m?.cameraTrack?.isMuted === false)), + ); } public toggleFitContain(): void { @@ -443,8 +445,8 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { /** * Whether the video should be mirrored. */ - public readonly mirror$ = this.video$ - .pipe( + public readonly mirror$ = this.scope.behavior( + this.video$.pipe( switchMap((v) => { const track = v?.publication?.track; if (!(track instanceof LocalTrack)) return of(false); @@ -455,8 +457,8 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { map(() => facingModeFromLocalTrack(track).facingMode === "user"), ); }), - ) - .behavior(this.scope); + ), + ); /** * Whether to show this tile in a highly visible location near the start of @@ -520,12 +522,12 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { * The volume to which this participant's audio is set, as a scalar * multiplier. */ - public readonly localVolume$: Behavior = merge( - this.locallyMutedToggle$.pipe(map(() => "toggle mute" as const)), - this.localVolumeAdjustment$, - this.localVolumeCommit$.pipe(map(() => "commit" as const)), - ) - .pipe( + public readonly localVolume$ = this.scope.behavior( + merge( + this.locallyMutedToggle$.pipe(map(() => "toggle mute" as const)), + this.localVolumeAdjustment$, + this.localVolumeCommit$.pipe(map(() => "commit" as const)), + ).pipe( accumulate({ volume: 1, committedVolume: 1 }, (state, event) => { switch (event) { case "toggle mute": @@ -548,15 +550,15 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { } }), map(({ volume }) => volume), - ) - .behavior(this.scope); + ), + ); /** * Whether this participant's audio is disabled. */ - public readonly locallyMuted$: Behavior = this.localVolume$ - .pipe(map((volume) => volume === 0)) - .behavior(this.scope); + public readonly locallyMuted$ = this.scope.behavior( + this.localVolume$.pipe(map((volume) => volume === 0)), + ); public constructor( id: string, diff --git a/src/state/MuteAllAudioModel.ts b/src/state/MuteAllAudioModel.ts index 00efca98..46c29815 100644 --- a/src/state/MuteAllAudioModel.ts +++ b/src/state/MuteAllAudioModel.ts @@ -10,12 +10,13 @@ import { combineLatest, startWith } from "rxjs"; import { setAudioEnabled$ } from "../controls"; import { muteAllAudio as muteAllAudioSetting } from "../settings/settings"; import { globalScope } from "./ObservableScope"; -import "../state/Behavior"; // Patches in the Observable.behavior method /** * This can transition into sth more complete: `GroupCallViewModel.ts` */ -export const muteAllAudio$ = combineLatest( - [setAudioEnabled$.pipe(startWith(true)), muteAllAudioSetting.value$], - (outputEnabled, settingsMute) => !outputEnabled || settingsMute, -).behavior(globalScope); +export const muteAllAudio$ = globalScope.behavior( + combineLatest( + [setAudioEnabled$.pipe(startWith(true)), muteAllAudioSetting.value$], + (outputEnabled, settingsMute) => !outputEnabled || settingsMute, + ), +); diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index b44301b8..14a470b5 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -5,10 +5,20 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { type Observable, Subject, takeUntil } from "rxjs"; +import { + BehaviorSubject, + distinctUntilChanged, + type Observable, + Subject, + takeUntil, +} from "rxjs"; + +import { type Behavior } from "./Behavior"; type MonoTypeOperator = (o: Observable) => Observable; +const nothing = Symbol("nothing"); + /** * A scope which limits the execution lifetime of its bound Observables. */ @@ -25,6 +35,33 @@ export class ObservableScope { return this.bindImpl; } + /** + * Converts an Observable to a Behavior. If no initial value is specified, the + * Observable must synchronously emit an initial value. + */ + public behavior( + setValue$: Observable, + initialValue: T | typeof nothing = nothing, + ): Behavior { + const subject$ = new BehaviorSubject(initialValue); + // Push values from the Observable into the BehaviorSubject. + // BehaviorSubjects have an undesirable feature where if you call 'complete', + // they will no longer re-emit their current value upon subscription. We want + // to support Observables that complete (for example `of({})`), so we have to + // take care to not propagate the completion event. + setValue$.pipe(this.bind(), distinctUntilChanged()).subscribe({ + next(value) { + subject$.next(value); + }, + error(err: unknown) { + subject$.error(err); + }, + }); + if (subject$.value === nothing) + throw new Error("Behavior failed to synchronously emit an initial value"); + return subject$ as Behavior; + } + /** * Ends the scope, causing any bound Observables to complete. */ diff --git a/src/utils/test.ts b/src/utils/test.ts index 3bcfa45f..01cadd85 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -124,10 +124,11 @@ export function withTestScheduler( const initialValue = values === undefined ? (initialMarble as T) : values[initialMarble]; // The remainder of the marble diagram should start on frame 1 - return helpers - .hot(`-${marbles.slice(initialMarbleIndex + 1)}`, values, error) - .pipe(startWith(initialValue)) - .behavior(scope); + return scope.behavior( + helpers + .hot(`-${marbles.slice(initialMarbleIndex + 1)}`, values, error) + .pipe(startWith(initialValue)), + ); }, }), );