From 32bf1c30d23a950a8d83b7f1567034b6ac01cc8e Mon Sep 17 00:00:00 2001 From: Robin Date: Fri, 11 Jul 2025 23:53:59 -0400 Subject: [PATCH] Use Behaviors even more consistently --- src/livekit/useLivekit.ts | 11 ++-- src/state/CallViewModel.ts | 110 +++++++++++++++++++----------------- src/state/MediaViewModel.ts | 52 +++++++++-------- src/utils/test.ts | 4 +- 4 files changed, 94 insertions(+), 83 deletions(-) diff --git a/src/livekit/useLivekit.ts b/src/livekit/useLivekit.ts index 58f088f6..b7032645 100644 --- a/src/livekit/useLivekit.ts +++ b/src/livekit/useLivekit.ts @@ -157,10 +157,13 @@ export function useLivekit( useObservableEagerState( useObservable( (room$) => - observeTrackReference$( - room$.pipe(map(([room]) => room.localParticipant)), - Track.Source.Camera, - ).pipe( + room$.pipe( + switchMap(([room]) => + observeTrackReference$( + room.localParticipant, + Track.Source.Camera, + ), + ), map((trackRef) => { const track = trackRef?.publication?.track; return track instanceof LocalVideoTrack ? track : null; diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 5ff3d8d3..de90cb16 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -251,8 +251,8 @@ class UserMedia { LocalParticipant | RemoteParticipant | undefined >; - public readonly speaker$: Observable; - public readonly presenter$: Observable; + public readonly speaker$: Behavior; + public readonly presenter$: Behavior; public constructor( public readonly id: string, member: RoomMember | undefined, @@ -269,7 +269,7 @@ class UserMedia { this.vm = new LocalUserMediaViewModel( this.id, member, - this.participant$.asObservable() as Observable, + this.participant$ as Behavior, encryptionSystem, livekitRoom, displayname$.behavior(this.scope), @@ -386,21 +386,23 @@ function getRoomMemberFromRtcMember( // TODO: Move wayyyy more business logic from the call and lobby views into here export class CallViewModel extends ViewModel { - public readonly localVideo$: Observable = + public readonly localVideo$: Behavior = observeTrackReference$( - of(this.livekitRoom.localParticipant), + this.livekitRoom.localParticipant, Track.Source.Camera, - ).pipe( - map((trackRef) => { - const track = trackRef?.publication?.track; - return track instanceof LocalVideoTrack ? track : null; - }), - ); + ) + .pipe( + map((trackRef) => { + const track = trackRef?.publication?.track; + return track instanceof LocalVideoTrack ? track : null; + }), + ) + .behavior(this.scope); /** * The raw list of RemoteParticipants as reported by LiveKit */ - private readonly rawRemoteParticipants$: Observable = + private readonly rawRemoteParticipants$: Behavior = connectedParticipantsObserver(this.livekitRoom) .pipe(startWith([])) .behavior(this.scope); @@ -409,44 +411,46 @@ export class CallViewModel extends ViewModel { * Lists of RemoteParticipants to "hold" on display, even if LiveKit claims that * they've left */ - private readonly remoteParticipantHolds$: Observable = - this.connectionState$.pipe( - withLatestFrom(this.rawRemoteParticipants$), - mergeMap(([s, ps]) => { - // Whenever we switch focuses, we should retain all the previous - // participants for at least POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS ms to - // give their clients time to switch over and avoid jarring layout shifts - if (s === ECAddonConnectionState.ECSwitchingFocus) { - return concat( - // Hold these participants - of({ hold: ps }), - // Wait for time to pass and the connection state to have changed - forkJoin([ - timer(POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS), - this.connectionState$.pipe( - filter((s) => s !== ECAddonConnectionState.ECSwitchingFocus), - take(1), - ), - // Then unhold them - ]).pipe(map(() => ({ unhold: ps }))), - ); - } else { - return EMPTY; - } - }), - // Accumulate the hold instructions into a single list showing which - // participants are being held - accumulate([] as RemoteParticipant[][], (holds, instruction) => - "hold" in instruction - ? [instruction.hold, ...holds] - : holds.filter((h) => h !== instruction.unhold), - ), - ); + private readonly remoteParticipantHolds$: Behavior = + this.connectionState$ + .pipe( + withLatestFrom(this.rawRemoteParticipants$), + mergeMap(([s, ps]) => { + // Whenever we switch focuses, we should retain all the previous + // participants for at least POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS ms to + // give their clients time to switch over and avoid jarring layout shifts + if (s === ECAddonConnectionState.ECSwitchingFocus) { + return concat( + // Hold these participants + of({ hold: ps }), + // Wait for time to pass and the connection state to have changed + forkJoin([ + timer(POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS), + this.connectionState$.pipe( + filter((s) => s !== ECAddonConnectionState.ECSwitchingFocus), + take(1), + ), + // Then unhold them + ]).pipe(map(() => ({ unhold: ps }))), + ); + } else { + return EMPTY; + } + }), + // Accumulate the hold instructions into a single list showing which + // participants are being held + accumulate([] as RemoteParticipant[][], (holds, instruction) => + "hold" in instruction + ? [instruction.hold, ...holds] + : holds.filter((h) => h !== instruction.unhold), + ), + ) + .behavior(this.scope); /** * The RemoteParticipants including those that are being "held" on the screen */ - private readonly remoteParticipants$: Observable = + private readonly remoteParticipants$: Behavior = combineLatest( [this.rawRemoteParticipants$, this.remoteParticipantHolds$], (raw, holds) => { @@ -465,7 +469,7 @@ export class CallViewModel extends ViewModel { return result; }, - ); + ).behavior(this.scope); /** * Displaynames for each member of the call. This will disambiguate @@ -709,11 +713,13 @@ export class CallViewModel extends ViewModel { /** * List of MediaItems that we want to display, that are of type UserMedia */ - private readonly userMedia$: Observable = this.mediaItems$.pipe( - map((mediaItems) => - mediaItems.filter((m): m is UserMedia => m instanceof UserMedia), - ), - ); + private readonly userMedia$: Behavior = this.mediaItems$ + .pipe( + map((mediaItems) => + mediaItems.filter((m): m is UserMedia => m instanceof UserMedia), + ), + ) + .behavior(this.scope); public readonly memberChanges$ = this.userMedia$ .pipe(map((mediaItems) => mediaItems.map((m) => m.id))) diff --git a/src/state/MediaViewModel.ts b/src/state/MediaViewModel.ts index 00ec45c9..def56633 100644 --- a/src/state/MediaViewModel.ts +++ b/src/state/MediaViewModel.ts @@ -54,24 +54,16 @@ import { type ReactionOption } from "../reactions"; import { type Behavior } from "./Behavior"; export function observeTrackReference$( - participant$: Observable, + participant: Participant, source: Track.Source, -): Observable { - return participant$.pipe( - switchMap((p) => { - if (p) { - return observeParticipantMedia(p).pipe( - map(() => ({ - participant: p, - publication: p.getTrackPublication(source), - source, - })), - distinctUntilKeyChanged("publication"), - ); - } else { - return of(undefined); - } - }), +): Observable { + return observeParticipantMedia(participant).pipe( + map(() => ({ + participant: participant, + publication: participant.getTrackPublication(source), + source, + })), + distinctUntilKeyChanged("publication"), ); } @@ -83,7 +75,7 @@ export function observeRtpStreamStats$( RTCInboundRtpStreamStats | RTCOutboundRtpStreamStats | undefined > { return combineLatest([ - observeTrackReference$(of(participant), source), + observeTrackReference$(participant, source), interval(1000).pipe(startWith(0)), ]).pipe( switchMap(async ([trackReference]) => { @@ -237,6 +229,18 @@ abstract class BaseMediaViewModel extends ViewModel { */ public abstract readonly local: boolean; + private observeTrackReference$( + source: Track.Source, + ): Behavior { + return this.participant$ + .pipe( + switchMap((p) => + p === undefined ? of(undefined) : observeTrackReference$(p, source), + ), + ) + .behavior(this.scope); + } + public constructor( /** * An opaque identifier for this media. @@ -261,12 +265,10 @@ abstract class BaseMediaViewModel extends ViewModel { public readonly displayName$: Behavior, ) { super(); - const audio$ = observeTrackReference$(participant$, audioSource).behavior( - this.scope, - ); - this.video$ = observeTrackReference$(participant$, videoSource).behavior( - this.scope, - ); + + const audio$ = this.observeTrackReference$(audioSource); + this.video$ = this.observeTrackReference$(videoSource); + this.unencryptedWarning$ = combineLatest( [audio$, this.video$], (a, v) => @@ -466,7 +468,7 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { public constructor( id: string, member: RoomMember | undefined, - participant$: Observable, + participant$: Behavior, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, displayName$: Behavior, diff --git a/src/utils/test.ts b/src/utils/test.ts index 3ebb1335..3bcfa45f 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -243,7 +243,7 @@ export async function withLocalMedia( const vm = new LocalUserMediaViewModel( "local", mockMatrixRoomMember(localRtcMember, roomMember), - of(localParticipant), + constant(localParticipant), { kind: E2eeType.PER_PARTICIPANT, }, @@ -331,7 +331,7 @@ export class MockRTCSession extends TypedEventEmitter< } public withMemberships( - rtcMembers$: Observable[]>, + rtcMembers$: Behavior[]>, ): MockRTCSession { rtcMembers$.subscribe((m) => { const old = this.memberships;