From 35ed313577bcf5e1e4247812c1a97f92e2e9a46d Mon Sep 17 00:00:00 2001 From: Robin Date: Wed, 18 Jun 2025 17:14:21 -0400 Subject: [PATCH] Replace ObservableScope.state with Observable.behavior --- src/state/CallViewModel.ts | 1008 +++++++++++++++++----------------- src/state/MediaViewModel.ts | 286 +++++----- src/state/ObservableScope.ts | 24 +- src/utils/test.ts | 9 +- 4 files changed, 670 insertions(+), 657 deletions(-) diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index fc1222c4..a9bf5413 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -94,6 +94,7 @@ import { observeSpeaker$ } from "./observeSpeaker"; import { shallowEquals } from "../utils/array"; import { calculateDisplayName, shouldDisambiguate } from "../utils/displayname"; import { type MediaDevices } from "./MediaDevices"; +import { type Behavior } from "./Behavior"; // How long we wait after a focus switch before showing the real participant // list again @@ -271,9 +272,9 @@ class UserMedia { this.participant$.asObservable() as Observable, encryptionSystem, livekitRoom, - displayname$, - handRaised$, - reaction$, + displayname$.behavior(this.scope), + handRaised$.behavior(this.scope), + reaction$.behavior(this.scope), ); } else { this.vm = new RemoteUserMediaViewModel( @@ -284,29 +285,30 @@ class UserMedia { >, encryptionSystem, livekitRoom, - displayname$, - handRaised$, - reaction$, + displayname$.behavior(this.scope), + handRaised$.behavior(this.scope), + reaction$.behavior(this.scope), ); } - this.speaker$ = observeSpeaker$(this.vm.speaking$).pipe(this.scope.state()); + this.speaker$ = observeSpeaker$(this.vm.speaking$).behavior(this.scope); - this.presenter$ = this.participant$.pipe( - switchMap( - (p) => - (p && - observeParticipantEvents( - p, - ParticipantEvent.TrackPublished, - ParticipantEvent.TrackUnpublished, - ParticipantEvent.LocalTrackPublished, - ParticipantEvent.LocalTrackUnpublished, - ).pipe(map((p) => p.isScreenShareEnabled))) ?? - of(false), - ), - this.scope.state(), - ); + this.presenter$ = this.participant$ + .pipe( + switchMap( + (p) => + (p && + observeParticipantEvents( + p, + ParticipantEvent.TrackPublished, + ParticipantEvent.TrackUnpublished, + ParticipantEvent.LocalTrackPublished, + ParticipantEvent.LocalTrackUnpublished, + ).pipe(map((p) => p.isScreenShareEnabled))) ?? + of(false), + ), + ) + .behavior(this.scope); } public updateParticipant( @@ -325,6 +327,7 @@ class UserMedia { } class ScreenShare { + private readonly scope = new ObservableScope(); public readonly vm: ScreenShareViewModel; private readonly participant$: BehaviorSubject< LocalParticipant | RemoteParticipant @@ -346,12 +349,13 @@ class ScreenShare { this.participant$.asObservable(), encryptionSystem, liveKitRoom, - displayname$, + displayname$.behavior(this.scope), participant.isLocal, ); } public destroy(): void { + this.scope.end(); this.vm.destroy(); } } @@ -397,7 +401,7 @@ export class CallViewModel extends ViewModel { * The raw list of RemoteParticipants as reported by LiveKit */ private readonly rawRemoteParticipants$: Observable = - connectedParticipantsObserver(this.livekitRoom).pipe(this.scope.state()); + connectedParticipantsObserver(this.livekitRoom).behavior(this.scope); /** * Lists of RemoteParticipants to "hold" on display, even if LiveKit claims that @@ -471,38 +475,42 @@ export class CallViewModel extends ViewModel { fromEvent(this.matrixRTCSession, MatrixRTCSessionEvent.MembershipsChanged), // Handle room membership changes (and displayname updates) fromEvent(this.matrixRTCSession.room, RoomStateEvent.Members), - ).pipe( - startWith(null), - map(() => { - const displaynameMap = new Map(); - const { room, memberships } = this.matrixRTCSession; + ) + .pipe( + startWith(null), + map(() => { + const displaynameMap = new Map(); + const { room, memberships } = this.matrixRTCSession; - // We only consider RTC members for disambiguation as they are the only visible members. - for (const rtcMember of memberships) { - const matrixIdentifier = `${rtcMember.sender}:${rtcMember.deviceId}`; - const { member } = getRoomMemberFromRtcMember(rtcMember, room); - if (!member) { - logger.error("Could not find member for media id:", matrixIdentifier); - continue; + // We only consider RTC members for disambiguation as they are the only visible members. + for (const rtcMember of memberships) { + const matrixIdentifier = `${rtcMember.sender}:${rtcMember.deviceId}`; + const { member } = getRoomMemberFromRtcMember(rtcMember, room); + if (!member) { + logger.error( + "Could not find member for media id:", + matrixIdentifier, + ); + continue; + } + const disambiguate = shouldDisambiguate(member, memberships, room); + displaynameMap.set( + matrixIdentifier, + calculateDisplayName(member, disambiguate), + ); } - const disambiguate = shouldDisambiguate(member, memberships, room); - displaynameMap.set( - matrixIdentifier, - calculateDisplayName(member, disambiguate), - ); - } - return displaynameMap; - }), - // It turns out that doing the disambiguation above is rather expensive on Safari (10x slower - // than on Chrome/Firefox). This means it is important that we share() the result so that we - // don't do this work more times than we need to. This is achieve through the state() operator: - this.scope.state(), - ); + return displaynameMap; + }), + // It turns out that doing the disambiguation above is rather expensive on Safari (10x slower + // than on Chrome/Firefox). This means it is important that we multicast the result so that we + // don't do this work more times than we need to. This is achieved by converting to a behavior: + ) + .behavior(this.scope); /** * List of MediaItems that we want to display */ - private readonly mediaItems$: Observable = combineLatest([ + private readonly mediaItems$: Behavior = combineLatest([ this.remoteParticipants$, observeParticipantMedia(this.livekitRoom.localParticipant), duplicateTiles.value$, @@ -514,90 +522,68 @@ export class CallViewModel extends ViewModel { MatrixRTCSessionEvent.MembershipsChanged, ).pipe(startWith(null)), showNonMemberTiles.value$, - ]).pipe( - scan( - ( - prevItems, - [ - remoteParticipants, - { participant: localParticipant }, - duplicateTiles, - _membershipsChanged, - showNonMemberTiles, - ], - ) => { - const newItems = new Map( - function* (this: CallViewModel): Iterable<[string, MediaItem]> { - const room = this.matrixRTCSession.room; - // m.rtc.members are the basis for calculating what is visible in the call - for (const rtcMember of this.matrixRTCSession.memberships) { - const { member, id: livekitParticipantId } = - getRoomMemberFromRtcMember(rtcMember, room); - const matrixIdentifier = `${rtcMember.sender}:${rtcMember.deviceId}`; + ]) + .pipe( + scan( + ( + prevItems, + [ + remoteParticipants, + { participant: localParticipant }, + duplicateTiles, + _membershipsChanged, + showNonMemberTiles, + ], + ) => { + const newItems = new Map( + function* (this: CallViewModel): Iterable<[string, MediaItem]> { + const room = this.matrixRTCSession.room; + // m.rtc.members are the basis for calculating what is visible in the call + for (const rtcMember of this.matrixRTCSession.memberships) { + const { member, id: livekitParticipantId } = + getRoomMemberFromRtcMember(rtcMember, room); + const matrixIdentifier = `${rtcMember.sender}:${rtcMember.deviceId}`; - let participant: - | LocalParticipant - | RemoteParticipant - | undefined = undefined; - if (livekitParticipantId === "local") { - participant = localParticipant; - } else { - participant = remoteParticipants.find( - (p) => p.identity === livekitParticipantId, - ); - } - - if (!member) { - logger.error( - "Could not find member for media id: ", - livekitParticipantId, - ); - } - for (let i = 0; i < 1 + duplicateTiles; i++) { - const indexedMediaId = `${livekitParticipantId}:${i}`; - let prevMedia = prevItems.get(indexedMediaId); - if (prevMedia && prevMedia instanceof UserMedia) { - prevMedia.updateParticipant(participant); - if (prevMedia.vm.member === undefined) { - // We have a previous media created because of the `debugShowNonMember` flag. - // In this case we actually replace the media item. - // This "hack" never occurs if we do not use the `debugShowNonMember` debugging - // option and if we always find a room member for each rtc member (which also - // only fails if we have a fundamental problem) - prevMedia = undefined; - } + let participant: + | LocalParticipant + | RemoteParticipant + | undefined = undefined; + if (livekitParticipantId === "local") { + participant = localParticipant; + } else { + participant = remoteParticipants.find( + (p) => p.identity === livekitParticipantId, + ); } - yield [ - indexedMediaId, - // We create UserMedia with or without a participant. - // This will be the initial value of a BehaviourSubject. - // Once a participant appears we will update the BehaviourSubject. (see above) - prevMedia ?? - new UserMedia( - indexedMediaId, - member, - participant, - this.encryptionSystem, - this.livekitRoom, - this.memberDisplaynames$.pipe( - map((m) => m.get(matrixIdentifier) ?? "[👻]"), - ), - this.handsRaised$.pipe( - map((v) => v[matrixIdentifier]?.time ?? null), - ), - this.reactions$.pipe( - map((v) => v[matrixIdentifier] ?? undefined), - ), - ), - ]; - if (participant?.isScreenShareEnabled) { - const screenShareId = `${indexedMediaId}:screen-share`; + if (!member) { + logger.error( + "Could not find member for media id: ", + livekitParticipantId, + ); + } + for (let i = 0; i < 1 + duplicateTiles; i++) { + const indexedMediaId = `${livekitParticipantId}:${i}`; + let prevMedia = prevItems.get(indexedMediaId); + if (prevMedia && prevMedia instanceof UserMedia) { + prevMedia.updateParticipant(participant); + if (prevMedia.vm.member === undefined) { + // We have a previous media created because of the `debugShowNonMember` flag. + // In this case we actually replace the media item. + // This "hack" never occurs if we do not use the `debugShowNonMember` debugging + // option and if we always find a room member for each rtc member (which also + // only fails if we have a fundamental problem) + prevMedia = undefined; + } + } yield [ - screenShareId, - prevItems.get(screenShareId) ?? - new ScreenShare( - screenShareId, + indexedMediaId, + // We create UserMedia with or without a participant. + // This will be the initial value of a BehaviourSubject. + // Once a participant appears we will update the BehaviourSubject. (see above) + prevMedia ?? + new UserMedia( + indexedMediaId, member, participant, this.encryptionSystem, @@ -605,77 +591,103 @@ export class CallViewModel extends ViewModel { this.memberDisplaynames$.pipe( map((m) => m.get(matrixIdentifier) ?? "[👻]"), ), + this.handsRaised$.pipe( + map((v) => v[matrixIdentifier]?.time ?? null), + ), + this.reactions$.pipe( + map((v) => v[matrixIdentifier] ?? undefined), + ), ), ]; - } - } - } - }.bind(this)(), - ); - // Generate non member items (items without a corresponding MatrixRTC member) - // Those items should not be rendered, they are participants in LiveKit that do not have a corresponding - // MatrixRTC members. This cannot be any good: - // - A malicious user impersonates someone - // - Someone injects abusive content - // - The user cannot have encryption keys so it makes no sense to participate - // We can only trust users that have a MatrixRTC member event. - // - // This is still available as a debug option. This can be useful - // - If one wants to test scalability using the LiveKit CLI. - // - If an experimental project does not yet do the MatrixRTC bits. - // - If someone wants to debug if the LiveKit connection works but MatrixRTC room state failed to arrive. - const newNonMemberItems = showNonMemberTiles - ? new Map( - function* (this: CallViewModel): Iterable<[string, MediaItem]> { - for (const participant of remoteParticipants) { - for (let i = 0; i < 1 + duplicateTiles; i++) { - const maybeNonMemberParticipantId = - participant.identity + ":" + i; - if (!newItems.has(maybeNonMemberParticipantId)) { - const nonMemberId = maybeNonMemberParticipantId; - yield [ - nonMemberId, - prevItems.get(nonMemberId) ?? - new UserMedia( - nonMemberId, - undefined, - participant, - this.encryptionSystem, - this.livekitRoom, - this.memberDisplaynames$.pipe( - map((m) => m.get(participant.identity) ?? "[👻]"), - ), - of(null), - of(null), + if (participant?.isScreenShareEnabled) { + const screenShareId = `${indexedMediaId}:screen-share`; + yield [ + screenShareId, + prevItems.get(screenShareId) ?? + new ScreenShare( + screenShareId, + member, + participant, + this.encryptionSystem, + this.livekitRoom, + this.memberDisplaynames$.pipe( + map((m) => m.get(matrixIdentifier) ?? "[👻]"), ), - ]; - } + ), + ]; } } - }.bind(this)(), - ) - : new Map(); - if (newNonMemberItems.size > 0) { - logger.debug("Added NonMember items: ", newNonMemberItems); - } + } + }.bind(this)(), + ); - const combinedNew = new Map([ - ...newNonMemberItems.entries(), - ...newItems.entries(), - ]); + // Generate non member items (items without a corresponding MatrixRTC member) + // Those items should not be rendered, they are participants in LiveKit that do not have a corresponding + // MatrixRTC members. This cannot be any good: + // - A malicious user impersonates someone + // - Someone injects abusive content + // - The user cannot have encryption keys so it makes no sense to participate + // We can only trust users that have a MatrixRTC member event. + // + // This is still available as a debug option. This can be useful + // - If one wants to test scalability using the LiveKit CLI. + // - If an experimental project does not yet do the MatrixRTC bits. + // - If someone wants to debug if the LiveKit connection works but MatrixRTC room state failed to arrive. + const newNonMemberItems = showNonMemberTiles + ? new Map( + function* (this: CallViewModel): Iterable<[string, MediaItem]> { + for (const participant of remoteParticipants) { + for (let i = 0; i < 1 + duplicateTiles; i++) { + const maybeNonMemberParticipantId = + participant.identity + ":" + i; + if (!newItems.has(maybeNonMemberParticipantId)) { + const nonMemberId = maybeNonMemberParticipantId; + yield [ + nonMemberId, + prevItems.get(nonMemberId) ?? + new UserMedia( + nonMemberId, + undefined, + participant, + this.encryptionSystem, + this.livekitRoom, + this.memberDisplaynames$.pipe( + map( + (m) => m.get(participant.identity) ?? "[👻]", + ), + ), + of(null), + of(null), + ), + ]; + } + } + } + }.bind(this)(), + ) + : new Map(); + if (newNonMemberItems.size > 0) { + logger.debug("Added NonMember items: ", newNonMemberItems); + } - for (const [id, t] of prevItems) if (!combinedNew.has(id)) t.destroy(); - return combinedNew; - }, - new Map(), - ), - map((mediaItems) => [...mediaItems.values()]), - finalizeValue((ts) => { - for (const t of ts) t.destroy(); - }), - this.scope.state(), - ); + const combinedNew = new Map([ + ...newNonMemberItems.entries(), + ...newItems.entries(), + ]); + + for (const [id, t] of prevItems) + if (!combinedNew.has(id)) t.destroy(); + return combinedNew; + }, + new Map(), + ), + map((mediaItems) => [...mediaItems.values()]), + finalizeValue((ts) => { + for (const t of ts) t.destroy(); + }), + ) + .behavior(this.scope); /** * List of MediaItems that we want to display, that are of type UserMedia @@ -702,52 +714,53 @@ export class CallViewModel extends ViewModel { /** * List of MediaItems that we want to display, that are of type ScreenShare */ - private readonly screenShares$: Observable = - this.mediaItems$.pipe( + private readonly screenShares$: Behavior = this.mediaItems$ + .pipe( map((mediaItems) => mediaItems.filter((m): m is ScreenShare => m instanceof ScreenShare), ), - this.scope.state(), - ); + ) + .behavior(this.scope); - private readonly spotlightSpeaker$: Observable = - this.userMedia$.pipe( - switchMap((mediaItems) => - mediaItems.length === 0 - ? of([]) - : combineLatest( - mediaItems.map((m) => - m.vm.speaking$.pipe(map((s) => [m, s] as const)), + private readonly spotlightSpeaker$: Behavior = + this.userMedia$ + .pipe( + switchMap((mediaItems) => + mediaItems.length === 0 + ? of([]) + : combineLatest( + mediaItems.map((m) => + m.vm.speaking$.pipe(map((s) => [m, s] as const)), + ), ), - ), - ), - scan<(readonly [UserMedia, boolean])[], UserMedia | undefined, null>( - (prev, mediaItems) => { - // Only remote users that are still in the call should be sticky - const [stickyMedia, stickySpeaking] = - (!prev?.vm.local && mediaItems.find(([m]) => m === prev)) || []; - // Decide who to spotlight: - // If the previous speaker is still speaking, stick with them rather - // than switching eagerly to someone else - return stickySpeaking - ? stickyMedia! - : // Otherwise, select any remote user who is speaking - (mediaItems.find(([m, s]) => !m.vm.local && s)?.[0] ?? - // Otherwise, stick with the person who was last speaking - stickyMedia ?? - // Otherwise, spotlight an arbitrary remote user - mediaItems.find(([m]) => !m.vm.local)?.[0] ?? - // Otherwise, spotlight the local user - mediaItems.find(([m]) => m.vm.local)?.[0]); - }, - null, - ), - map((speaker) => speaker?.vm ?? null), - this.scope.state(), - ); + ), + scan<(readonly [UserMedia, boolean])[], UserMedia | undefined, null>( + (prev, mediaItems) => { + // Only remote users that are still in the call should be sticky + const [stickyMedia, stickySpeaking] = + (!prev?.vm.local && mediaItems.find(([m]) => m === prev)) || []; + // Decide who to spotlight: + // If the previous speaker is still speaking, stick with them rather + // than switching eagerly to someone else + return stickySpeaking + ? stickyMedia! + : // Otherwise, select any remote user who is speaking + (mediaItems.find(([m, s]) => !m.vm.local && s)?.[0] ?? + // Otherwise, stick with the person who was last speaking + stickyMedia ?? + // Otherwise, spotlight an arbitrary remote user + mediaItems.find(([m]) => !m.vm.local)?.[0] ?? + // Otherwise, spotlight the local user + mediaItems.find(([m]) => m.vm.local)?.[0]); + }, + null, + ), + map((speaker) => speaker?.vm ?? null), + ) + .behavior(this.scope); - private readonly grid$: Observable = - this.userMedia$.pipe( + private readonly grid$: Behavior = this.userMedia$ + .pipe( switchMap((mediaItems) => { const bins = mediaItems.map((m) => combineLatest( @@ -784,11 +797,11 @@ export class CallViewModel extends ViewModel { ); }), distinctUntilChanged(shallowEquals), - this.scope.state(), - ); + ) + .behavior(this.scope); - private readonly spotlight$: Observable = - this.screenShares$.pipe( + private readonly spotlight$: Behavior = this.screenShares$ + .pipe( switchMap((screenShares) => { if (screenShares.length > 0) { return of(screenShares.map((m) => m.vm)); @@ -799,45 +812,46 @@ export class CallViewModel extends ViewModel { ); }), distinctUntilChanged(shallowEquals), - this.scope.state(), - ); + ) + .behavior(this.scope); - private readonly pip$: Observable = combineLatest([ + private readonly pip$: Behavior = combineLatest([ this.screenShares$, this.spotlightSpeaker$, this.mediaItems$, - ]).pipe( - switchMap(([screenShares, spotlight, mediaItems]) => { - if (screenShares.length > 0) { - return this.spotlightSpeaker$; - } - if (!spotlight || spotlight.local) { - return of(null); - } + ]) + .pipe( + switchMap(([screenShares, spotlight, mediaItems]) => { + if (screenShares.length > 0) { + return this.spotlightSpeaker$; + } + if (!spotlight || spotlight.local) { + return of(null); + } - const localUserMedia = mediaItems.find( - (m) => m.vm instanceof LocalUserMediaViewModel, - ) as UserMedia | undefined; + const localUserMedia = mediaItems.find( + (m) => m.vm instanceof LocalUserMediaViewModel, + ) as UserMedia | undefined; - const localUserMediaViewModel = localUserMedia?.vm as - | LocalUserMediaViewModel - | undefined; + const localUserMediaViewModel = localUserMedia?.vm as + | LocalUserMediaViewModel + | undefined; - if (!localUserMediaViewModel) { - return of(null); - } - return localUserMediaViewModel.alwaysShow$.pipe( - map((alwaysShow) => { - if (alwaysShow) { - return localUserMediaViewModel; - } + if (!localUserMediaViewModel) { + return of(null); + } + return localUserMediaViewModel.alwaysShow$.pipe( + map((alwaysShow) => { + if (alwaysShow) { + return localUserMediaViewModel; + } - return null; - }), - ); - }), - this.scope.state(), - ); + return null; + }), + ); + }), + ) + .behavior(this.scope); private readonly hasRemoteScreenShares$: Observable = this.spotlight$.pipe( @@ -851,64 +865,72 @@ export class CallViewModel extends ViewModel { startWith(false), ); - private readonly naturalWindowMode$: Observable = fromEvent( + private readonly naturalWindowMode$: Behavior = fromEvent( window, "resize", - ).pipe( - startWith(null), - map(() => { - const height = window.innerHeight; - const width = window.innerWidth; - if (height <= 400 && width <= 340) return "pip"; - // Our layouts for flat windows are better at adapting to a small width - // than our layouts for narrow windows are at adapting to a small height, - // so we give "flat" precedence here - if (height <= 600) return "flat"; - if (width <= 600) return "narrow"; - return "normal"; - }), - this.scope.state(), - ); + ) + .pipe( + startWith(null), + map(() => { + const height = window.innerHeight; + const width = window.innerWidth; + if (height <= 400 && width <= 340) return "pip"; + // Our layouts for flat windows are better at adapting to a small width + // than our layouts for narrow windows are at adapting to a small height, + // so we give "flat" precedence here + if (height <= 600) return "flat"; + if (width <= 600) return "narrow"; + return "normal"; + }), + ) + .behavior(this.scope); /** * The general shape of the window. */ - public readonly windowMode$: Observable = this.pipEnabled$.pipe( - switchMap((pip) => (pip ? of("pip") : this.naturalWindowMode$)), - ); + public readonly windowMode$: Behavior = this.pipEnabled$ + .pipe( + switchMap((pip) => + pip ? of("pip") : this.naturalWindowMode$, + ), + ) + .behavior(this.scope); private readonly spotlightExpandedToggle$ = new Subject(); - public readonly spotlightExpanded$: Observable = - this.spotlightExpandedToggle$.pipe( - accumulate(false, (expanded) => !expanded), - this.scope.state(), - ); + public readonly spotlightExpanded$: Behavior = + this.spotlightExpandedToggle$ + .pipe(accumulate(false, (expanded) => !expanded)) + .behavior(this.scope); private readonly gridModeUserSelection$ = new Subject(); /** * The layout mode of the media tile grid. */ - public readonly gridMode$: Observable = + public readonly gridMode$: Behavior = // If the user hasn't selected spotlight and somebody starts screen sharing, // automatically switch to spotlight mode and reset when screen sharing ends - this.gridModeUserSelection$.pipe( - startWith(null), - switchMap((userSelection) => - (userSelection === "spotlight" - ? EMPTY - : combineLatest([this.hasRemoteScreenShares$, this.windowMode$]).pipe( - skip(userSelection === null ? 0 : 1), - map( - ([hasScreenShares, windowMode]): GridMode => - hasScreenShares || windowMode === "flat" - ? "spotlight" - : "grid", - ), - ) - ).pipe(startWith(userSelection ?? "grid")), - ), - this.scope.state(), - ); + this.gridModeUserSelection$ + .pipe( + startWith(null), + switchMap((userSelection) => + (userSelection === "spotlight" + ? EMPTY + : combineLatest([ + this.hasRemoteScreenShares$, + this.windowMode$, + ]).pipe( + skip(userSelection === null ? 0 : 1), + map( + ([hasScreenShares, windowMode]): GridMode => + hasScreenShares || windowMode === "flat" + ? "spotlight" + : "grid", + ), + ) + ).pipe(startWith(userSelection ?? "grid")), + ), + ) + .behavior(this.scope); public setGridMode(value: GridMode): void { this.gridModeUserSelection$.next(value); @@ -969,8 +991,8 @@ export class CallViewModel extends ViewModel { /** * The media to be used to produce a layout. */ - private readonly layoutMedia$: Observable = - this.windowMode$.pipe( + private readonly layoutMedia$: Behavior = this.windowMode$ + .pipe( switchMap((windowMode) => { switch (windowMode) { case "normal": @@ -1032,8 +1054,8 @@ export class CallViewModel extends ViewModel { return this.pipLayoutMedia$; } }), - this.scope.state(), - ); + ) + .behavior(this.scope); // There is a cyclical dependency here: the layout algorithms want to know // which tiles are on screen, but to know which tiles are on screen we have to @@ -1043,117 +1065,116 @@ export class CallViewModel extends ViewModel { private readonly setVisibleTiles = (value: number): void => this.visibleTiles$.next(value); - public readonly layoutInternals$: Observable< + private readonly layoutInternals$: Behavior< LayoutScanState & { layout: Layout } > = combineLatest([ this.layoutMedia$, this.visibleTiles$.pipe(startWith(0), distinctUntilChanged()), - ]).pipe( - scan< - [LayoutMedia, number], - LayoutScanState & { layout: Layout }, - LayoutScanState - >( - ({ tiles: prevTiles }, [media, visibleTiles]) => { - let layout: Layout; - let newTiles: TileStore; - switch (media.type) { - case "grid": - case "spotlight-landscape": - case "spotlight-portrait": - [layout, newTiles] = gridLikeLayout( - media, - visibleTiles, - this.setVisibleTiles, - prevTiles, - ); - break; - case "spotlight-expanded": - [layout, newTiles] = spotlightExpandedLayout(media, prevTiles); - break; - case "one-on-one": - [layout, newTiles] = oneOnOneLayout(media, prevTiles); - break; - case "pip": - [layout, newTiles] = pipLayout(media, prevTiles); - break; - } + ]) + .pipe( + scan< + [LayoutMedia, number], + LayoutScanState & { layout: Layout }, + LayoutScanState + >( + ({ tiles: prevTiles }, [media, visibleTiles]) => { + let layout: Layout; + let newTiles: TileStore; + switch (media.type) { + case "grid": + case "spotlight-landscape": + case "spotlight-portrait": + [layout, newTiles] = gridLikeLayout( + media, + visibleTiles, + this.setVisibleTiles, + prevTiles, + ); + break; + case "spotlight-expanded": + [layout, newTiles] = spotlightExpandedLayout(media, prevTiles); + break; + case "one-on-one": + [layout, newTiles] = oneOnOneLayout(media, prevTiles); + break; + case "pip": + [layout, newTiles] = pipLayout(media, prevTiles); + break; + } - return { layout, tiles: newTiles }; - }, - { layout: null, tiles: TileStore.empty() }, - ), - this.scope.state(), - ); + return { layout, tiles: newTiles }; + }, + { layout: null, tiles: TileStore.empty() }, + ), + ) + .behavior(this.scope); /** * The layout of tiles in the call interface. */ - public readonly layout$: Observable = this.layoutInternals$.pipe( - map(({ layout }) => layout), - this.scope.state(), - ); + public readonly layout$: Behavior = this.layoutInternals$ + .pipe(map(({ layout }) => layout)) + .behavior(this.scope); /** * The current generation of the tile store, exposed for debugging purposes. */ - public readonly tileStoreGeneration$: Observable = - this.layoutInternals$.pipe( - map(({ tiles }) => tiles.generation), - this.scope.state(), - ); + public readonly tileStoreGeneration$: Behavior = this.layoutInternals$ + .pipe(map(({ tiles }) => tiles.generation)) + .behavior(this.scope); - public showSpotlightIndicators$: Observable = this.layout$.pipe( - map((l) => l.type !== "grid"), - this.scope.state(), - ); + public showSpotlightIndicators$: Behavior = this.layout$ + .pipe(map((l) => l.type !== "grid")) + .behavior(this.scope); - public showSpeakingIndicators$: Observable = this.layout$.pipe( - switchMap((l) => { - switch (l.type) { - case "spotlight-landscape": - case "spotlight-portrait": - // If the spotlight is showing the active speaker, we can do without - // speaking indicators as they're a redundant visual cue. But if - // screen sharing feeds are in the spotlight we still need them. - return l.spotlight.media$.pipe( - map((models: MediaViewModel[]) => - models.some((m) => m instanceof ScreenShareViewModel), - ), - ); - // In expanded spotlight layout, the active speaker is always shown in - // the picture-in-picture tile so there is no need for speaking - // indicators. And in one-on-one layout there's no question as to who is - // speaking. - case "spotlight-expanded": - case "one-on-one": - return of(false); - default: - return of(true); - } - }), - this.scope.state(), - ); - - public readonly toggleSpotlightExpanded$: Observable<(() => void) | null> = - this.windowMode$.pipe( - switchMap((mode) => - mode === "normal" - ? this.layout$.pipe( - map( - (l) => - l.type === "spotlight-landscape" || - l.type === "spotlight-expanded", + public showSpeakingIndicators$: Behavior = this.layout$ + .pipe( + switchMap((l) => { + switch (l.type) { + case "spotlight-landscape": + case "spotlight-portrait": + // If the spotlight is showing the active speaker, we can do without + // speaking indicators as they're a redundant visual cue. But if + // screen sharing feeds are in the spotlight we still need them. + return l.spotlight.media$.pipe( + map((models: MediaViewModel[]) => + models.some((m) => m instanceof ScreenShareViewModel), ), - ) - : of(false), - ), - distinctUntilChanged(), - map((enabled) => - enabled ? (): void => this.spotlightExpandedToggle$.next() : null, - ), - this.scope.state(), - ); + ); + // In expanded spotlight layout, the active speaker is always shown in + // the picture-in-picture tile so there is no need for speaking + // indicators. And in one-on-one layout there's no question as to who is + // speaking. + case "spotlight-expanded": + case "one-on-one": + return of(false); + default: + return of(true); + } + }), + ) + .behavior(this.scope); + + public readonly toggleSpotlightExpanded$: Behavior<(() => void) | null> = + this.windowMode$ + .pipe( + switchMap((mode) => + mode === "normal" + ? this.layout$.pipe( + map( + (l) => + l.type === "spotlight-landscape" || + l.type === "spotlight-expanded", + ), + ) + : of(false), + ), + distinctUntilChanged(), + map((enabled) => + enabled ? (): void => this.spotlightExpandedToggle$.next() : null, + ), + ) + .behavior(this.scope); private readonly screenTap$ = new Subject(); private readonly controlsTap$ = new Subject(); @@ -1188,64 +1209,64 @@ export class CallViewModel extends ViewModel { this.screenUnhover$.next(); } - public readonly showHeader$: Observable = this.windowMode$.pipe( - map((mode) => mode !== "pip" && mode !== "flat"), - this.scope.state(), - ); + public readonly showHeader$: Behavior = this.windowMode$ + .pipe(map((mode) => mode !== "pip" && mode !== "flat")) + .behavior(this.scope); - public readonly showFooter$: Observable = this.windowMode$.pipe( - switchMap((mode) => { - switch (mode) { - case "pip": - return of(false); - case "normal": - case "narrow": - return of(true); - case "flat": - // Sadly Firefox has some layering glitches that prevent the footer - // from appearing properly. They happen less often if we never hide - // the footer. - if (isFirefox()) return of(true); - // Show/hide the footer in response to interactions - return merge( - this.screenTap$.pipe(map(() => "tap screen" as const)), - this.controlsTap$.pipe(map(() => "tap controls" as const)), - this.screenHover$.pipe(map(() => "hover" as const)), - ).pipe( - switchScan((state, interaction) => { - switch (interaction) { - case "tap screen": - return state - ? // Toggle visibility on tap - of(false) - : // Hide after a timeout - timer(showFooterMs).pipe( - map(() => false), - startWith(true), - ); - case "tap controls": - // The user is interacting with things, so reset the timeout - return timer(showFooterMs).pipe( - map(() => false), - startWith(true), - ); - case "hover": - // Show on hover and hide after a timeout - return race( - timer(showFooterMs), - this.screenUnhover$.pipe(take(1)), - ).pipe( - map(() => false), - startWith(true), - ); - } - }, false), - startWith(false), - ); - } - }), - this.scope.state(), - ); + public readonly showFooter$: Behavior = this.windowMode$ + .pipe( + switchMap((mode) => { + switch (mode) { + case "pip": + return of(false); + case "normal": + case "narrow": + return of(true); + case "flat": + // Sadly Firefox has some layering glitches that prevent the footer + // from appearing properly. They happen less often if we never hide + // the footer. + if (isFirefox()) return of(true); + // Show/hide the footer in response to interactions + return merge( + this.screenTap$.pipe(map(() => "tap screen" as const)), + this.controlsTap$.pipe(map(() => "tap controls" as const)), + this.screenHover$.pipe(map(() => "hover" as const)), + ).pipe( + switchScan((state, interaction) => { + switch (interaction) { + case "tap screen": + return state + ? // Toggle visibility on tap + of(false) + : // Hide after a timeout + timer(showFooterMs).pipe( + map(() => false), + startWith(true), + ); + case "tap controls": + // The user is interacting with things, so reset the timeout + return timer(showFooterMs).pipe( + map(() => false), + startWith(true), + ); + case "hover": + // Show on hover and hide after a timeout + return race( + timer(showFooterMs), + this.screenUnhover$.pipe(take(1)), + ).pipe( + map(() => false), + startWith(true), + ); + } + }, false), + startWith(false), + ); + } + }), + ) + .behavior(this.scope); /** * Whether audio is currently being output through the earpiece. @@ -1292,35 +1313,42 @@ export class CallViewModel extends ViewModel { }, ); - public readonly reactions$ = this.reactionsSubject$.pipe( - map((v) => - Object.fromEntries( - Object.entries(v).map(([a, { reactionOption }]) => [a, reactionOption]), + public readonly reactions$ = this.reactionsSubject$ + .pipe( + map((v) => + Object.fromEntries( + Object.entries(v).map(([a, { reactionOption }]) => [ + a, + reactionOption, + ]), + ), ), - ), - ); + ) + .behavior(this.scope); - public readonly handsRaised$ = this.handsRaisedSubject$.pipe(); + public readonly handsRaised$ = this.handsRaisedSubject$.behavior(this.scope); /** * Emits an array of reactions that should be visible on the screen. */ - public readonly visibleReactions$ = showReactions.value$.pipe( - switchMap((show) => (show ? this.reactions$ : of({}))), - scan< - Record, - { sender: string; emoji: string; startX: number }[] - >((acc, latest) => { - const newSet: { sender: string; emoji: string; startX: number }[] = []; - for (const [sender, reaction] of Object.entries(latest)) { - const startX = - acc.find((v) => v.sender === sender && v.emoji)?.startX ?? - Math.ceil(Math.random() * 80) + 10; - newSet.push({ sender, emoji: reaction.emoji, startX }); - } - return newSet; - }, []), - ); + public readonly visibleReactions$ = showReactions.value$ + .pipe( + switchMap((show) => (show ? this.reactions$ : of({}))), + scan< + Record, + { sender: string; emoji: string; startX: number }[] + >((acc, latest) => { + const newSet: { sender: string; emoji: string; startX: number }[] = []; + for (const [sender, reaction] of Object.entries(latest)) { + const startX = + acc.find((v) => v.sender === sender && v.emoji)?.startX ?? + Math.ceil(Math.random() * 80) + 10; + newSet.push({ sender, emoji: reaction.emoji, startX }); + } + return newSet; + }, []), + ) + .behavior(this.scope); /** * Emits an array of reactions that should be played. diff --git a/src/state/MediaViewModel.ts b/src/state/MediaViewModel.ts index 424d003e..8c7136aa 100644 --- a/src/state/MediaViewModel.ts +++ b/src/state/MediaViewModel.ts @@ -51,6 +51,7 @@ import { accumulate } from "../utils/observable"; import { type EncryptionSystem } from "../e2ee/sharedKeyManagement"; import { E2eeType } from "../e2ee/e2eeType"; import { type ReactionOption } from "../reactions"; +import { type Behavior } from "./Behavior"; export function observeTrackReference$( participant$: Observable, @@ -223,13 +224,13 @@ abstract class BaseMediaViewModel extends ViewModel { /** * The LiveKit video track for this media. */ - public readonly video$: Observable; + public readonly video$: Behavior; /** * Whether there should be a warning that this media is unencrypted. */ - public readonly unencryptedWarning$: Observable; + public readonly unencryptedWarning$: Behavior; - public readonly encryptionStatus$: Observable; + public readonly encryptionStatus$: Behavior; /** * Whether this media corresponds to the local participant. @@ -260,11 +261,11 @@ abstract class BaseMediaViewModel extends ViewModel { public readonly displayname$: Observable, ) { super(); - const audio$ = observeTrackReference$(participant$, audioSource).pipe( - this.scope.state(), + const audio$ = observeTrackReference$(participant$, audioSource).behavior( + this.scope, ); - this.video$ = observeTrackReference$(participant$, videoSource).pipe( - this.scope.state(), + this.video$ = observeTrackReference$(participant$, videoSource).behavior( + this.scope, ); this.unencryptedWarning$ = combineLatest( [audio$, this.video$], @@ -272,70 +273,71 @@ abstract class BaseMediaViewModel extends ViewModel { encryptionSystem.kind !== E2eeType.NONE && (a?.publication?.isEncrypted === false || v?.publication?.isEncrypted === false), - ).pipe(this.scope.state()); + ).behavior(this.scope); - this.encryptionStatus$ = this.participant$.pipe( - switchMap((participant): Observable => { - if (!participant) { - return of(EncryptionStatus.Connecting); - } else if ( - participant.isLocal || - encryptionSystem.kind === E2eeType.NONE - ) { - return of(EncryptionStatus.Okay); - } else if (encryptionSystem.kind === E2eeType.PER_PARTICIPANT) { - return combineLatest([ - encryptionErrorObservable$( - livekitRoom, - participant, - encryptionSystem, - "MissingKey", - ), - encryptionErrorObservable$( - livekitRoom, - participant, - encryptionSystem, - "InvalidKey", - ), - observeRemoteTrackReceivingOkay$(participant, audioSource), - observeRemoteTrackReceivingOkay$(participant, videoSource), - ]).pipe( - map(([keyMissing, keyInvalid, audioOkay, videoOkay]) => { - if (keyMissing) return EncryptionStatus.KeyMissing; - if (keyInvalid) return EncryptionStatus.KeyInvalid; - if (audioOkay || videoOkay) return EncryptionStatus.Okay; - return undefined; // no change - }), - filter((x) => !!x), - startWith(EncryptionStatus.Connecting), - ); - } else { - return combineLatest([ - encryptionErrorObservable$( - livekitRoom, - participant, - encryptionSystem, - "InvalidKey", - ), - observeRemoteTrackReceivingOkay$(participant, audioSource), - observeRemoteTrackReceivingOkay$(participant, videoSource), - ]).pipe( - map( - ([keyInvalid, audioOkay, videoOkay]): - | EncryptionStatus - | undefined => { - if (keyInvalid) return EncryptionStatus.PasswordInvalid; + this.encryptionStatus$ = this.participant$ + .pipe( + switchMap((participant): Observable => { + if (!participant) { + return of(EncryptionStatus.Connecting); + } else if ( + participant.isLocal || + encryptionSystem.kind === E2eeType.NONE + ) { + return of(EncryptionStatus.Okay); + } else if (encryptionSystem.kind === E2eeType.PER_PARTICIPANT) { + return combineLatest([ + encryptionErrorObservable$( + livekitRoom, + participant, + encryptionSystem, + "MissingKey", + ), + encryptionErrorObservable$( + livekitRoom, + participant, + encryptionSystem, + "InvalidKey", + ), + observeRemoteTrackReceivingOkay$(participant, audioSource), + observeRemoteTrackReceivingOkay$(participant, videoSource), + ]).pipe( + map(([keyMissing, keyInvalid, audioOkay, videoOkay]) => { + if (keyMissing) return EncryptionStatus.KeyMissing; + if (keyInvalid) return EncryptionStatus.KeyInvalid; if (audioOkay || videoOkay) return EncryptionStatus.Okay; return undefined; // no change - }, - ), - filter((x) => !!x), - startWith(EncryptionStatus.Connecting), - ); - } - }), - this.scope.state(), - ); + }), + filter((x) => !!x), + startWith(EncryptionStatus.Connecting), + ); + } else { + return combineLatest([ + encryptionErrorObservable$( + livekitRoom, + participant, + encryptionSystem, + "InvalidKey", + ), + observeRemoteTrackReceivingOkay$(participant, audioSource), + observeRemoteTrackReceivingOkay$(participant, videoSource), + ]).pipe( + map( + ([keyInvalid, audioOkay, videoOkay]): + | EncryptionStatus + | undefined => { + if (keyInvalid) return EncryptionStatus.PasswordInvalid; + if (audioOkay || videoOkay) return EncryptionStatus.Okay; + return undefined; // no change + }, + ), + filter((x) => !!x), + startWith(EncryptionStatus.Connecting), + ); + } + }), + ) + .behavior(this.scope); } } @@ -354,31 +356,33 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { /** * Whether the participant is speaking. */ - public readonly speaking$ = this.participant$.pipe( - switchMap((p) => - p - ? observeParticipantEvents(p, ParticipantEvent.IsSpeakingChanged).pipe( - map((p) => p.isSpeaking), - ) - : of(false), - ), - this.scope.state(), - ); + public readonly speaking$ = this.participant$ + .pipe( + switchMap((p) => + p + ? observeParticipantEvents( + p, + ParticipantEvent.IsSpeakingChanged, + ).pipe(map((p) => p.isSpeaking)) + : of(false), + ), + ) + .behavior(this.scope); /** * Whether this participant is sending audio (i.e. is unmuted on their side). */ - public readonly audioEnabled$: Observable; + public readonly audioEnabled$: Behavior; /** * Whether this participant is sending video. */ - public readonly videoEnabled$: Observable; + public readonly videoEnabled$: Behavior; private readonly _cropVideo$ = new BehaviorSubject(true); /** * Whether the tile video should be contained inside the tile or be cropped to fit. */ - public readonly cropVideo$: Observable = this._cropVideo$; + public readonly cropVideo$: Behavior = this._cropVideo$; public constructor( id: string, @@ -387,8 +391,8 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, displayname$: Observable, - public readonly handRaised$: Observable, - public readonly reaction$: Observable, + public readonly handRaised$: Behavior, + public readonly reaction$: Behavior, ) { super( id, @@ -401,16 +405,17 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { displayname$, ); - const media$ = participant$.pipe( - switchMap((p) => (p && observeParticipantMedia(p)) ?? of(undefined)), - this.scope.state(), - ); - this.audioEnabled$ = media$.pipe( - map((m) => m?.microphoneTrack?.isMuted === false), - ); - this.videoEnabled$ = media$.pipe( - map((m) => m?.cameraTrack?.isMuted === false), - ); + const media$ = participant$ + .pipe( + switchMap((p) => (p && observeParticipantMedia(p)) ?? of(undefined)), + ) + .behavior(this.scope); + this.audioEnabled$ = media$ + .pipe(map((m) => m?.microphoneTrack?.isMuted === false)) + .behavior(this.scope); + this.videoEnabled$ = media$ + .pipe(map((m) => m?.cameraTrack?.isMuted === false)) + .behavior(this.scope); } public toggleFitContain(): void { @@ -436,19 +441,20 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { /** * Whether the video should be mirrored. */ - public readonly mirror$ = this.video$.pipe( - switchMap((v) => { - const track = v?.publication?.track; - if (!(track instanceof LocalTrack)) return of(false); - // Watch for track restarts, because they indicate a camera switch - return fromEvent(track, TrackEvent.Restarted).pipe( - startWith(null), - // Mirror only front-facing cameras (those that face the user) - map(() => facingModeFromLocalTrack(track).facingMode === "user"), - ); - }), - this.scope.state(), - ); + public readonly mirror$ = this.video$ + .pipe( + switchMap((v) => { + const track = v?.publication?.track; + if (!(track instanceof LocalTrack)) return of(false); + // Watch for track restarts, because they indicate a camera switch + return fromEvent(track, TrackEvent.Restarted).pipe( + startWith(null), + // Mirror only front-facing cameras (those that face the user) + map(() => facingModeFromLocalTrack(track).facingMode === "user"), + ); + }), + ) + .behavior(this.scope); /** * Whether to show this tile in a highly visible location near the start of @@ -464,8 +470,8 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, displayname$: Observable, - handRaised$: Observable, - reaction$: Observable, + handRaised$: Behavior, + reaction$: Behavior, ) { super( id, @@ -512,43 +518,43 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { * The volume to which this participant's audio is set, as a scalar * multiplier. */ - public readonly localVolume$: Observable = merge( + public readonly localVolume$: Behavior = merge( this.locallyMutedToggle$.pipe(map(() => "toggle mute" as const)), this.localVolumeAdjustment$, this.localVolumeCommit$.pipe(map(() => "commit" as const)), - ).pipe( - accumulate({ volume: 1, committedVolume: 1 }, (state, event) => { - switch (event) { - case "toggle mute": - return { - ...state, - volume: state.volume === 0 ? state.committedVolume : 0, - }; - case "commit": - // Dragging the slider to zero should have the same effect as - // muting: keep the original committed volume, as if it were never - // dragged - return { - ...state, - committedVolume: - state.volume === 0 ? state.committedVolume : state.volume, - }; - default: - // Volume adjustment - return { ...state, volume: event }; - } - }), - map(({ volume }) => volume), - this.scope.state(), - ); + ) + .pipe( + accumulate({ volume: 1, committedVolume: 1 }, (state, event) => { + switch (event) { + case "toggle mute": + return { + ...state, + volume: state.volume === 0 ? state.committedVolume : 0, + }; + case "commit": + // Dragging the slider to zero should have the same effect as + // muting: keep the original committed volume, as if it were never + // dragged + return { + ...state, + committedVolume: + state.volume === 0 ? state.committedVolume : state.volume, + }; + default: + // Volume adjustment + return { ...state, volume: event }; + } + }), + map(({ volume }) => volume), + ) + .behavior(this.scope); /** * Whether this participant's audio is disabled. */ - public readonly locallyMuted$: Observable = this.localVolume$.pipe( - map((volume) => volume === 0), - this.scope.state(), - ); + public readonly locallyMuted$: Behavior = this.localVolume$ + .pipe(map((volume) => volume === 0)) + .behavior(this.scope); public constructor( id: string, @@ -557,8 +563,8 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, displayname$: Observable, - handRaised$: Observable, - reaction$: Observable, + handRaised$: Behavior, + reaction$: Behavior, ) { super( id, diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index 727082cf..d475e0af 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -5,13 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { - distinctUntilChanged, - type Observable, - shareReplay, - Subject, - takeUntil, -} from "rxjs"; +import { type Observable, Subject, takeUntil } from "rxjs"; type MonoTypeOperator = (o: Observable) => Observable; @@ -31,22 +25,6 @@ export class ObservableScope { return this.bindImpl; } - private readonly stateImpl: MonoTypeOperator = (o$) => - o$.pipe( - this.bind(), - distinctUntilChanged(), - shareReplay({ bufferSize: 1, refCount: false }), - ); - - /** - * Transforms an Observable into a hot state Observable which replays its - * latest value upon subscription, skips updates with identical values, and - * is bound to this scope. - */ - public state(): MonoTypeOperator { - return this.stateImpl; - } - /** * Ends the scope, causing any bound Observables to complete. */ diff --git a/src/utils/test.ts b/src/utils/test.ts index 8f8b19a3..92c091e6 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -47,6 +47,7 @@ import { } from "../config/ConfigOptions"; import { Config } from "../config/Config"; import { type MediaDevices } from "../state/MediaDevices"; +import { constant } from "../state/Behavior"; export function withFakeTimers(continuation: () => void): void { vi.useFakeTimers(); @@ -217,8 +218,8 @@ export async function withLocalMedia( }, mockLivekitRoom({ localParticipant }), of(roomMember.rawDisplayName ?? "nodisplayname"), - of(null), - of(null), + constant(null), + constant(null), ); try { await continuation(vm); @@ -256,8 +257,8 @@ export async function withRemoteMedia( }, mockLivekitRoom({}, { remoteParticipants$: of([remoteParticipant]) }), of(roomMember.rawDisplayName ?? "nodisplayname"), - of(null), - of(null), + constant(null), + constant(null), ); try { await continuation(vm);