diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 8575497f..dcaa2a8a 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -70,7 +70,12 @@ import { ScreenShareViewModel, type UserMediaViewModel, } from "./MediaViewModel"; -import { accumulate, and$, finalizeValue } from "../utils/observable"; +import { + accumulate, + and$, + finalizeValue, + pauseWhen, +} from "../utils/observable"; import { ObservableScope } from "./ObservableScope"; import { duplicateTiles, @@ -269,6 +274,7 @@ class UserMedia { encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, mediaDevices: MediaDevices, + pretendToBeDisconnected$: Behavior, displayname$: Observable, handRaised$: Observable, reaction$: Observable, @@ -296,6 +302,7 @@ class UserMedia { >, encryptionSystem, livekitRoom, + pretendToBeDisconnected$, this.scope.behavior(displayname$), this.scope.behavior(handRaised$), this.scope.behavior(reaction$), @@ -349,7 +356,8 @@ class ScreenShare { member: RoomMember | undefined, participant: LocalParticipant | RemoteParticipant, encryptionSystem: EncryptionSystem, - liveKitRoom: LivekitRoom, + livekitRoom: LivekitRoom, + pretendToBeDisconnected$: Behavior, displayName$: Observable, ) { this.participant$ = new BehaviorSubject(participant); @@ -359,7 +367,8 @@ class ScreenShare { member, this.participant$.asObservable(), encryptionSystem, - liveKitRoom, + livekitRoom, + pretendToBeDisconnected$, this.scope.behavior(displayName$), participant.isLocal, ); @@ -400,81 +409,6 @@ export class CallViewModel extends ViewModel { private readonly userId = this.matrixRoom.client.getUserId(); private readonly deviceId = this.matrixRoom.client.getDeviceId(); - /** - * The raw list of RemoteParticipants as reported by LiveKit - */ - private readonly rawRemoteParticipants$ = this.scope.behavior< - RemoteParticipant[] - >(connectedParticipantsObserver(this.livekitRoom), []); - - /** - * Lists of RemoteParticipants to "hold" on display, even if LiveKit claims that - * they've left - */ - private readonly remoteParticipantHolds$ = this.scope.behavior< - RemoteParticipant[][] - >( - this.livekitConnectionState$.pipe( - withLatestFrom(this.rawRemoteParticipants$), - mergeMap(([s, ps]) => { - // Whenever we switch focuses, we should retain all the previous - // participants for at least POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS ms to - // give their clients time to switch over and avoid jarring layout shifts - if (s === ECAddonConnectionState.ECSwitchingFocus) { - return concat( - // Hold these participants - of({ hold: ps }), - // Wait for time to pass and the connection state to have changed - forkJoin([ - timer(POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS), - this.livekitConnectionState$.pipe( - filter((s) => s !== ECAddonConnectionState.ECSwitchingFocus), - take(1), - ), - // Then unhold them - ]).pipe(map(() => ({ unhold: ps }))), - ); - } else { - return EMPTY; - } - }), - // Accumulate the hold instructions into a single list showing which - // participants are being held - accumulate([] as RemoteParticipant[][], (holds, instruction) => - "hold" in instruction - ? [instruction.hold, ...holds] - : holds.filter((h) => h !== instruction.unhold), - ), - ), - ); - - /** - * The RemoteParticipants including those that are being "held" on the screen - */ - private readonly remoteParticipants$ = this.scope.behavior< - RemoteParticipant[] - >( - combineLatest( - [this.rawRemoteParticipants$, this.remoteParticipantHolds$], - (raw, holds) => { - const result = [...raw]; - const resultIds = new Set(result.map((p) => p.identity)); - - // Incorporate the held participants into the list - for (const hold of holds) { - for (const p of hold) { - if (!resultIds.has(p.identity)) { - result.push(p); - resultIds.add(p.identity); - } - } - } - - return result; - }, - ), - ); - private readonly memberships$: Observable = merge( // Handle call membership changes. fromEvent(this.matrixRTCSession, MatrixRTCSessionEvent.MembershipsChanged), @@ -548,35 +482,126 @@ export class CallViewModel extends ViewModel { ), ); + /** + * Whether various media/event sources should pretend to be disconnected from + * all network input, even if their connection still technically works. + */ + // We do this when the app is in the 'reconnecting' state, because it might be + // that the LiveKit connection is still functional while the homeserver is + // down, for example, and we want to avoid making people worry that the app is + // in a split-brained state. + private readonly pretendToBeDisconnected$ = this.reconnecting$; + + /** + * The raw list of RemoteParticipants as reported by LiveKit + */ + private readonly rawRemoteParticipants$ = this.scope.behavior< + RemoteParticipant[] + >(connectedParticipantsObserver(this.livekitRoom), []); + + /** + * Lists of RemoteParticipants to "hold" on display, even if LiveKit claims that + * they've left + */ + private readonly remoteParticipantHolds$ = this.scope.behavior< + RemoteParticipant[][] + >( + this.livekitConnectionState$.pipe( + withLatestFrom(this.rawRemoteParticipants$), + mergeMap(([s, ps]) => { + // Whenever we switch focuses, we should retain all the previous + // participants for at least POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS ms to + // give their clients time to switch over and avoid jarring layout shifts + if (s === ECAddonConnectionState.ECSwitchingFocus) { + return concat( + // Hold these participants + of({ hold: ps }), + // Wait for time to pass and the connection state to have changed + forkJoin([ + timer(POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS), + this.livekitConnectionState$.pipe( + filter((s) => s !== ECAddonConnectionState.ECSwitchingFocus), + take(1), + ), + // Then unhold them + ]).pipe(map(() => ({ unhold: ps }))), + ); + } else { + return EMPTY; + } + }), + // Accumulate the hold instructions into a single list showing which + // participants are being held + accumulate([] as RemoteParticipant[][], (holds, instruction) => + "hold" in instruction + ? [instruction.hold, ...holds] + : holds.filter((h) => h !== instruction.unhold), + ), + ), + ); + + /** + * The RemoteParticipants including those that are being "held" on the screen + */ + private readonly remoteParticipants$ = this.scope + .behavior( + combineLatest( + [this.rawRemoteParticipants$, this.remoteParticipantHolds$], + (raw, holds) => { + const result = [...raw]; + const resultIds = new Set(result.map((p) => p.identity)); + + // Incorporate the held participants into the list + for (const hold of holds) { + for (const p of hold) { + if (!resultIds.has(p.identity)) { + result.push(p); + resultIds.add(p.identity); + } + } + } + + return result; + }, + ), + ) + .pipe(pauseWhen(this.pretendToBeDisconnected$)); + /** * Displaynames for each member of the call. This will disambiguate * any displaynames that clashes with another member. Only members * joined to the call are considered here. */ - public readonly memberDisplaynames$ = this.memberships$.pipe( - map((memberships) => { - const displaynameMap = new Map(); - const room = this.matrixRoom; + // It turns out that doing the disambiguation above is rather expensive on Safari (10x slower + // than on Chrome/Firefox). This means it is important that we multicast the result so that we + // don't do this work more times than we need to. This is achieved by converting to a behavior: + public readonly memberDisplaynames$ = this.scope.behavior( + this.memberships$.pipe( + map((memberships) => { + const displaynameMap = new Map(); + const room = this.matrixRoom; - // We only consider RTC members for disambiguation as they are the only visible members. - for (const rtcMember of memberships) { - const matrixIdentifier = `${rtcMember.sender}:${rtcMember.deviceId}`; - const { member } = getRoomMemberFromRtcMember(rtcMember, room); - if (!member) { - logger.error("Could not find member for media id:", matrixIdentifier); - continue; + // We only consider RTC members for disambiguation as they are the only visible members. + for (const rtcMember of memberships) { + const matrixIdentifier = `${rtcMember.sender}:${rtcMember.deviceId}`; + const { member } = getRoomMemberFromRtcMember(rtcMember, room); + if (!member) { + logger.error( + "Could not find member for media id:", + matrixIdentifier, + ); + continue; + } + const disambiguate = shouldDisambiguate(member, memberships, room); + displaynameMap.set( + matrixIdentifier, + calculateDisplayName(member, disambiguate), + ); } - const disambiguate = shouldDisambiguate(member, memberships, room); - displaynameMap.set( - matrixIdentifier, - calculateDisplayName(member, disambiguate), - ); - } - return displaynameMap; - }), - // It turns out that doing the disambiguation above is rather expensive on Safari (10x slower - // than on Chrome/Firefox). This means it is important that we multicast the result so that we - // don't do this work more times than we need to. This is achieved by converting to a behavior: + return displaynameMap; + }), + pauseWhen(this.pretendToBeDisconnected$), + ), ); public readonly handsRaised$ = this.scope.behavior(this.handsRaisedSubject$); @@ -591,6 +616,7 @@ export class CallViewModel extends ViewModel { ]), ), ), + pauseWhen(this.pretendToBeDisconnected$), ), ); @@ -608,7 +634,7 @@ export class CallViewModel extends ViewModel { fromEvent( this.matrixRTCSession, MatrixRTCSessionEvent.MembershipsChanged, - ).pipe(startWith(null)), + ).pipe(startWith(null), pauseWhen(this.pretendToBeDisconnected$)), showNonMemberTiles.value$, ]).pipe( scan( @@ -676,6 +702,7 @@ export class CallViewModel extends ViewModel { this.options.encryptionSystem, this.livekitRoom, this.mediaDevices, + this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( map((m) => m.get(matrixIdentifier) ?? "[👻]"), ), @@ -699,6 +726,7 @@ export class CallViewModel extends ViewModel { participant, this.options.encryptionSystem, this.livekitRoom, + this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( map((m) => m.get(matrixIdentifier) ?? "[👻]"), ), @@ -741,6 +769,7 @@ export class CallViewModel extends ViewModel { this.options.encryptionSystem, this.livekitRoom, this.mediaDevices, + this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( map( (m) => m.get(participant.identity) ?? "[👻]", @@ -962,7 +991,7 @@ export class CallViewModel extends ViewModel { map((speaker) => (speaker ? [speaker] : [])), ); }), - distinctUntilChanged(shallowEquals), + distinctUntilChanged(shallowEquals), ), ); diff --git a/src/state/MediaViewModel.ts b/src/state/MediaViewModel.ts index 2e690226..48fbb733 100644 --- a/src/state/MediaViewModel.ts +++ b/src/state/MediaViewModel.ts @@ -361,10 +361,7 @@ export type UserMediaViewModel = * Some participant's user media. */ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { - /** - * Whether the participant is speaking. - */ - public readonly speaking$ = this.scope.behavior( + private readonly _speaking$ = this.scope.behavior( this.participant$.pipe( switchMap((p) => p @@ -376,15 +373,27 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { ), ), ); + /** + * Whether the participant is speaking. + */ + // Getter backed by a private field so that subclasses can override it + public get speaking$(): Behavior { + return this._speaking$; + } /** * Whether this participant is sending audio (i.e. is unmuted on their side). */ public readonly audioEnabled$: Behavior; + + private readonly _videoEnabled$: Behavior; /** * Whether this participant is sending video. */ - public readonly videoEnabled$: Behavior; + // Getter backed by a private field so that subclasses can override it + public get videoEnabled$(): Behavior { + return this._videoEnabled$; + } private readonly _cropVideo$ = new BehaviorSubject(true); /** @@ -421,7 +430,7 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { this.audioEnabled$ = this.scope.behavior( media$.pipe(map((m) => m?.microphoneTrack?.isMuted === false)), ); - this.videoEnabled$ = this.scope.behavior( + this._videoEnabled$ = this.scope.behavior( media$.pipe(map((m) => m?.cameraTrack?.isMuted === false)), ); } @@ -572,6 +581,12 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { * A remote participant's user media. */ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { + // This private field is used to override the value from the superclass + private __speaking$: Behavior; + public get speaking$(): Behavior { + return this.__speaking$; + } + private readonly locallyMutedToggle$ = new Subject(); private readonly localVolumeAdjustment$ = new Subject(); private readonly localVolumeCommit$ = new Subject(); @@ -611,6 +626,23 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { ), ); + /** + * The local volume, taking into account whether we're supposed to pretend + * that the audio stream is disconnected (since we don't necessarily want that + * to modify the UI state). + */ + private readonly actualLocalVolume$ = this.scope.behavior( + this.pretendToBeDisconnected$.pipe( + switchMap((disconnected) => (disconnected ? of(0) : this.localVolume$)), + ), + ); + + // This private field is used to override the value from the superclass + private __videoEnabled$: Behavior; + public get videoEnabled$(): Behavior { + return this.__videoEnabled$; + } + /** * Whether this participant's audio is disabled. */ @@ -624,6 +656,7 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { participant$: Observable, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, + private readonly pretendToBeDisconnected$: Behavior, displayname$: Behavior, handRaised$: Behavior, reaction$: Behavior, @@ -639,11 +672,27 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { reaction$, ); + this.__speaking$ = this.scope.behavior( + pretendToBeDisconnected$.pipe( + switchMap((disconnected) => + disconnected ? of(false) : super.speaking$, + ), + ), + ); + + this.__videoEnabled$ = this.scope.behavior( + pretendToBeDisconnected$.pipe( + switchMap((disconnected) => + disconnected ? of(false) : super.videoEnabled$, + ), + ), + ); + // Sync the local volume with LiveKit combineLatest([ participant$, - this.localVolume$.pipe(this.scope.bind()), - ]).subscribe(([p, volume]) => p && p.setVolume(volume)); + this.actualLocalVolume$.pipe(this.scope.bind()), + ]).subscribe(([p, volume]) => p?.setVolume(volume)); } public toggleLocallyMuted(): void { @@ -683,12 +732,20 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { * Some participant's screen share media. */ export class ScreenShareViewModel extends BaseMediaViewModel { + /** + * Whether this screen share's video should be displayed. + */ + public readonly videoEnabled$ = this.scope.behavior( + this.pretendToBeDisconnected$.pipe(map((disconnected) => !disconnected)), + ); + public constructor( id: string, member: RoomMember | undefined, participant$: Observable, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, + private readonly pretendToBeDisconnected$: Behavior, displayname$: Behavior, public readonly local: boolean, ) { diff --git a/src/tile/SpotlightTile.tsx b/src/tile/SpotlightTile.tsx index 8bc45a81..abdd27f6 100644 --- a/src/tile/SpotlightTile.tsx +++ b/src/tile/SpotlightTile.tsx @@ -54,6 +54,7 @@ interface SpotlightItemBaseProps { targetWidth: number; targetHeight: number; video: TrackReferenceOrPlaceholder | undefined; + videoEnabled: boolean; member: RoomMember | undefined; unencryptedWarning: boolean; encryptionStatus: EncryptionStatus; @@ -63,7 +64,6 @@ interface SpotlightItemBaseProps { } interface SpotlightUserMediaItemBaseProps extends SpotlightItemBaseProps { - videoEnabled: boolean; videoFit: "contain" | "cover"; } @@ -90,12 +90,10 @@ const SpotlightUserMediaItem: FC = ({ vm, ...props }) => { - const videoEnabled = useBehavior(vm.videoEnabled$); const cropVideo = useBehavior(vm.cropVideo$); const baseProps: SpotlightUserMediaItemBaseProps & RefAttributes = { - videoEnabled, videoFit: cropVideo ? "cover" : "contain", ...props, }; @@ -135,6 +133,7 @@ const SpotlightItem: FC = ({ const ref = useMergedRefs(ourRef, theirRef); const displayName = useBehavior(vm.displayName$); const video = useBehavior(vm.video$); + const videoEnabled = useBehavior(vm.videoEnabled$); const unencryptedWarning = useBehavior(vm.unencryptedWarning$); const encryptionStatus = useBehavior(vm.encryptionStatus$); @@ -160,6 +159,7 @@ const SpotlightItem: FC = ({ targetWidth, targetHeight, video, + videoEnabled, member: vm.member, unencryptedWarning, displayName, @@ -169,7 +169,7 @@ const SpotlightItem: FC = ({ }; return vm instanceof ScreenShareViewModel ? ( - + ) : ( ); diff --git a/src/utils/observable.test.ts b/src/utils/observable.test.ts new file mode 100644 index 00000000..5f488fb1 --- /dev/null +++ b/src/utils/observable.test.ts @@ -0,0 +1,24 @@ +/* +Copyright 2025 New Vector Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { test } from "vitest"; + +import { withTestScheduler } from "./test"; +import { pauseWhen } from "./observable"; + +test("pauseWhen", () => { + withTestScheduler(({ behavior, expectObservable }) => { + const inputMarbles = " abcdefgh-i-jk-"; + const pauseMarbles = " n-y--n-yn-y--n"; + const outputMarbles = "abc--fgh-i---k"; + expectObservable( + behavior(inputMarbles).pipe( + pauseWhen(behavior(pauseMarbles, { y: true, n: false })), + ), + ).toBe(outputMarbles); + }); +}); diff --git a/src/utils/observable.ts b/src/utils/observable.ts index 1c3a3be7..4ffcff46 100644 --- a/src/utils/observable.ts +++ b/src/utils/observable.ts @@ -7,16 +7,21 @@ Please see LICENSE in the repository root for full details. import { type Observable, + audit, combineLatest, concat, defer, + filter, finalize, map, + of, scan, startWith, takeWhile, tap, + withLatestFrom, } from "rxjs"; +import { Behavior } from "../state/Behavior"; const nothing = Symbol("nothing"); @@ -95,3 +100,19 @@ export function getValue(state$: Observable): T { export function and$(...inputs: Observable[]): Observable { return combineLatest(inputs, (...flags) => flags.every((flag) => flag)); } + +/** + * RxJS operator that pauses all changes in the input value whenever a Behavior + * is true. When the Behavior returns to being false, the most recently + * suppressed change is emitted as the most recent value. + */ +export function pauseWhen(pause$: Behavior) { + return (value$: Observable): Observable => + value$.pipe( + withLatestFrom(pause$), + audit(([, pause]) => + pause ? pause$.pipe(filter((pause) => !pause)) : of(null), + ), + map(([value]) => value), + ); +} diff --git a/src/utils/test.ts b/src/utils/test.ts index 5a301c00..a7a9e489 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -296,6 +296,7 @@ export async function withRemoteMedia( kind: E2eeType.PER_PARTICIPANT, }, mockLivekitRoom({}, { remoteParticipants$: of([remoteParticipant]) }), + constant(false), constant(roomMember.rawDisplayName ?? "nodisplayname"), constant(null), constant(null),