mirror of
https://github.com/vector-im/element-call.git
synced 2026-01-18 02:32:27 +00:00
Disable a bunch of media/event sources when reconnecting
This commit is contained in:
@@ -70,7 +70,12 @@ import {
|
||||
ScreenShareViewModel,
|
||||
type UserMediaViewModel,
|
||||
} from "./MediaViewModel";
|
||||
import { accumulate, and$, finalizeValue } from "../utils/observable";
|
||||
import {
|
||||
accumulate,
|
||||
and$,
|
||||
finalizeValue,
|
||||
pauseWhen,
|
||||
} from "../utils/observable";
|
||||
import { ObservableScope } from "./ObservableScope";
|
||||
import {
|
||||
duplicateTiles,
|
||||
@@ -269,6 +274,7 @@ class UserMedia {
|
||||
encryptionSystem: EncryptionSystem,
|
||||
livekitRoom: LivekitRoom,
|
||||
mediaDevices: MediaDevices,
|
||||
pretendToBeDisconnected$: Behavior<boolean>,
|
||||
displayname$: Observable<string>,
|
||||
handRaised$: Observable<Date | null>,
|
||||
reaction$: Observable<ReactionOption | null>,
|
||||
@@ -296,6 +302,7 @@ class UserMedia {
|
||||
>,
|
||||
encryptionSystem,
|
||||
livekitRoom,
|
||||
pretendToBeDisconnected$,
|
||||
this.scope.behavior(displayname$),
|
||||
this.scope.behavior(handRaised$),
|
||||
this.scope.behavior(reaction$),
|
||||
@@ -349,7 +356,8 @@ class ScreenShare {
|
||||
member: RoomMember | undefined,
|
||||
participant: LocalParticipant | RemoteParticipant,
|
||||
encryptionSystem: EncryptionSystem,
|
||||
liveKitRoom: LivekitRoom,
|
||||
livekitRoom: LivekitRoom,
|
||||
pretendToBeDisconnected$: Behavior<boolean>,
|
||||
displayName$: Observable<string>,
|
||||
) {
|
||||
this.participant$ = new BehaviorSubject(participant);
|
||||
@@ -359,7 +367,8 @@ class ScreenShare {
|
||||
member,
|
||||
this.participant$.asObservable(),
|
||||
encryptionSystem,
|
||||
liveKitRoom,
|
||||
livekitRoom,
|
||||
pretendToBeDisconnected$,
|
||||
this.scope.behavior(displayName$),
|
||||
participant.isLocal,
|
||||
);
|
||||
@@ -400,81 +409,6 @@ export class CallViewModel extends ViewModel {
|
||||
private readonly userId = this.matrixRoom.client.getUserId();
|
||||
private readonly deviceId = this.matrixRoom.client.getDeviceId();
|
||||
|
||||
/**
|
||||
* The raw list of RemoteParticipants as reported by LiveKit
|
||||
*/
|
||||
private readonly rawRemoteParticipants$ = this.scope.behavior<
|
||||
RemoteParticipant[]
|
||||
>(connectedParticipantsObserver(this.livekitRoom), []);
|
||||
|
||||
/**
|
||||
* Lists of RemoteParticipants to "hold" on display, even if LiveKit claims that
|
||||
* they've left
|
||||
*/
|
||||
private readonly remoteParticipantHolds$ = this.scope.behavior<
|
||||
RemoteParticipant[][]
|
||||
>(
|
||||
this.livekitConnectionState$.pipe(
|
||||
withLatestFrom(this.rawRemoteParticipants$),
|
||||
mergeMap(([s, ps]) => {
|
||||
// Whenever we switch focuses, we should retain all the previous
|
||||
// participants for at least POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS ms to
|
||||
// give their clients time to switch over and avoid jarring layout shifts
|
||||
if (s === ECAddonConnectionState.ECSwitchingFocus) {
|
||||
return concat(
|
||||
// Hold these participants
|
||||
of({ hold: ps }),
|
||||
// Wait for time to pass and the connection state to have changed
|
||||
forkJoin([
|
||||
timer(POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS),
|
||||
this.livekitConnectionState$.pipe(
|
||||
filter((s) => s !== ECAddonConnectionState.ECSwitchingFocus),
|
||||
take(1),
|
||||
),
|
||||
// Then unhold them
|
||||
]).pipe(map(() => ({ unhold: ps }))),
|
||||
);
|
||||
} else {
|
||||
return EMPTY;
|
||||
}
|
||||
}),
|
||||
// Accumulate the hold instructions into a single list showing which
|
||||
// participants are being held
|
||||
accumulate([] as RemoteParticipant[][], (holds, instruction) =>
|
||||
"hold" in instruction
|
||||
? [instruction.hold, ...holds]
|
||||
: holds.filter((h) => h !== instruction.unhold),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
/**
|
||||
* The RemoteParticipants including those that are being "held" on the screen
|
||||
*/
|
||||
private readonly remoteParticipants$ = this.scope.behavior<
|
||||
RemoteParticipant[]
|
||||
>(
|
||||
combineLatest(
|
||||
[this.rawRemoteParticipants$, this.remoteParticipantHolds$],
|
||||
(raw, holds) => {
|
||||
const result = [...raw];
|
||||
const resultIds = new Set(result.map((p) => p.identity));
|
||||
|
||||
// Incorporate the held participants into the list
|
||||
for (const hold of holds) {
|
||||
for (const p of hold) {
|
||||
if (!resultIds.has(p.identity)) {
|
||||
result.push(p);
|
||||
resultIds.add(p.identity);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
},
|
||||
),
|
||||
);
|
||||
|
||||
private readonly memberships$: Observable<CallMembership[]> = merge(
|
||||
// Handle call membership changes.
|
||||
fromEvent(this.matrixRTCSession, MatrixRTCSessionEvent.MembershipsChanged),
|
||||
@@ -548,35 +482,126 @@ export class CallViewModel extends ViewModel {
|
||||
),
|
||||
);
|
||||
|
||||
/**
|
||||
* Whether various media/event sources should pretend to be disconnected from
|
||||
* all network input, even if their connection still technically works.
|
||||
*/
|
||||
// We do this when the app is in the 'reconnecting' state, because it might be
|
||||
// that the LiveKit connection is still functional while the homeserver is
|
||||
// down, for example, and we want to avoid making people worry that the app is
|
||||
// in a split-brained state.
|
||||
private readonly pretendToBeDisconnected$ = this.reconnecting$;
|
||||
|
||||
/**
|
||||
* The raw list of RemoteParticipants as reported by LiveKit
|
||||
*/
|
||||
private readonly rawRemoteParticipants$ = this.scope.behavior<
|
||||
RemoteParticipant[]
|
||||
>(connectedParticipantsObserver(this.livekitRoom), []);
|
||||
|
||||
/**
|
||||
* Lists of RemoteParticipants to "hold" on display, even if LiveKit claims that
|
||||
* they've left
|
||||
*/
|
||||
private readonly remoteParticipantHolds$ = this.scope.behavior<
|
||||
RemoteParticipant[][]
|
||||
>(
|
||||
this.livekitConnectionState$.pipe(
|
||||
withLatestFrom(this.rawRemoteParticipants$),
|
||||
mergeMap(([s, ps]) => {
|
||||
// Whenever we switch focuses, we should retain all the previous
|
||||
// participants for at least POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS ms to
|
||||
// give their clients time to switch over and avoid jarring layout shifts
|
||||
if (s === ECAddonConnectionState.ECSwitchingFocus) {
|
||||
return concat(
|
||||
// Hold these participants
|
||||
of({ hold: ps }),
|
||||
// Wait for time to pass and the connection state to have changed
|
||||
forkJoin([
|
||||
timer(POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS),
|
||||
this.livekitConnectionState$.pipe(
|
||||
filter((s) => s !== ECAddonConnectionState.ECSwitchingFocus),
|
||||
take(1),
|
||||
),
|
||||
// Then unhold them
|
||||
]).pipe(map(() => ({ unhold: ps }))),
|
||||
);
|
||||
} else {
|
||||
return EMPTY;
|
||||
}
|
||||
}),
|
||||
// Accumulate the hold instructions into a single list showing which
|
||||
// participants are being held
|
||||
accumulate([] as RemoteParticipant[][], (holds, instruction) =>
|
||||
"hold" in instruction
|
||||
? [instruction.hold, ...holds]
|
||||
: holds.filter((h) => h !== instruction.unhold),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
/**
|
||||
* The RemoteParticipants including those that are being "held" on the screen
|
||||
*/
|
||||
private readonly remoteParticipants$ = this.scope
|
||||
.behavior<RemoteParticipant[]>(
|
||||
combineLatest(
|
||||
[this.rawRemoteParticipants$, this.remoteParticipantHolds$],
|
||||
(raw, holds) => {
|
||||
const result = [...raw];
|
||||
const resultIds = new Set(result.map((p) => p.identity));
|
||||
|
||||
// Incorporate the held participants into the list
|
||||
for (const hold of holds) {
|
||||
for (const p of hold) {
|
||||
if (!resultIds.has(p.identity)) {
|
||||
result.push(p);
|
||||
resultIds.add(p.identity);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
},
|
||||
),
|
||||
)
|
||||
.pipe(pauseWhen(this.pretendToBeDisconnected$));
|
||||
|
||||
/**
|
||||
* Displaynames for each member of the call. This will disambiguate
|
||||
* any displaynames that clashes with another member. Only members
|
||||
* joined to the call are considered here.
|
||||
*/
|
||||
public readonly memberDisplaynames$ = this.memberships$.pipe(
|
||||
map((memberships) => {
|
||||
const displaynameMap = new Map<string, string>();
|
||||
const room = this.matrixRoom;
|
||||
// It turns out that doing the disambiguation above is rather expensive on Safari (10x slower
|
||||
// than on Chrome/Firefox). This means it is important that we multicast the result so that we
|
||||
// don't do this work more times than we need to. This is achieved by converting to a behavior:
|
||||
public readonly memberDisplaynames$ = this.scope.behavior(
|
||||
this.memberships$.pipe(
|
||||
map((memberships) => {
|
||||
const displaynameMap = new Map<string, string>();
|
||||
const room = this.matrixRoom;
|
||||
|
||||
// We only consider RTC members for disambiguation as they are the only visible members.
|
||||
for (const rtcMember of memberships) {
|
||||
const matrixIdentifier = `${rtcMember.sender}:${rtcMember.deviceId}`;
|
||||
const { member } = getRoomMemberFromRtcMember(rtcMember, room);
|
||||
if (!member) {
|
||||
logger.error("Could not find member for media id:", matrixIdentifier);
|
||||
continue;
|
||||
// We only consider RTC members for disambiguation as they are the only visible members.
|
||||
for (const rtcMember of memberships) {
|
||||
const matrixIdentifier = `${rtcMember.sender}:${rtcMember.deviceId}`;
|
||||
const { member } = getRoomMemberFromRtcMember(rtcMember, room);
|
||||
if (!member) {
|
||||
logger.error(
|
||||
"Could not find member for media id:",
|
||||
matrixIdentifier,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
const disambiguate = shouldDisambiguate(member, memberships, room);
|
||||
displaynameMap.set(
|
||||
matrixIdentifier,
|
||||
calculateDisplayName(member, disambiguate),
|
||||
);
|
||||
}
|
||||
const disambiguate = shouldDisambiguate(member, memberships, room);
|
||||
displaynameMap.set(
|
||||
matrixIdentifier,
|
||||
calculateDisplayName(member, disambiguate),
|
||||
);
|
||||
}
|
||||
return displaynameMap;
|
||||
}),
|
||||
// It turns out that doing the disambiguation above is rather expensive on Safari (10x slower
|
||||
// than on Chrome/Firefox). This means it is important that we multicast the result so that we
|
||||
// don't do this work more times than we need to. This is achieved by converting to a behavior:
|
||||
return displaynameMap;
|
||||
}),
|
||||
pauseWhen(this.pretendToBeDisconnected$),
|
||||
),
|
||||
);
|
||||
|
||||
public readonly handsRaised$ = this.scope.behavior(this.handsRaisedSubject$);
|
||||
@@ -591,6 +616,7 @@ export class CallViewModel extends ViewModel {
|
||||
]),
|
||||
),
|
||||
),
|
||||
pauseWhen(this.pretendToBeDisconnected$),
|
||||
),
|
||||
);
|
||||
|
||||
@@ -608,7 +634,7 @@ export class CallViewModel extends ViewModel {
|
||||
fromEvent(
|
||||
this.matrixRTCSession,
|
||||
MatrixRTCSessionEvent.MembershipsChanged,
|
||||
).pipe(startWith(null)),
|
||||
).pipe(startWith(null), pauseWhen(this.pretendToBeDisconnected$)),
|
||||
showNonMemberTiles.value$,
|
||||
]).pipe(
|
||||
scan(
|
||||
@@ -676,6 +702,7 @@ export class CallViewModel extends ViewModel {
|
||||
this.options.encryptionSystem,
|
||||
this.livekitRoom,
|
||||
this.mediaDevices,
|
||||
this.pretendToBeDisconnected$,
|
||||
this.memberDisplaynames$.pipe(
|
||||
map((m) => m.get(matrixIdentifier) ?? "[👻]"),
|
||||
),
|
||||
@@ -699,6 +726,7 @@ export class CallViewModel extends ViewModel {
|
||||
participant,
|
||||
this.options.encryptionSystem,
|
||||
this.livekitRoom,
|
||||
this.pretendToBeDisconnected$,
|
||||
this.memberDisplaynames$.pipe(
|
||||
map((m) => m.get(matrixIdentifier) ?? "[👻]"),
|
||||
),
|
||||
@@ -741,6 +769,7 @@ export class CallViewModel extends ViewModel {
|
||||
this.options.encryptionSystem,
|
||||
this.livekitRoom,
|
||||
this.mediaDevices,
|
||||
this.pretendToBeDisconnected$,
|
||||
this.memberDisplaynames$.pipe(
|
||||
map(
|
||||
(m) => m.get(participant.identity) ?? "[👻]",
|
||||
@@ -962,7 +991,7 @@ export class CallViewModel extends ViewModel {
|
||||
map((speaker) => (speaker ? [speaker] : [])),
|
||||
);
|
||||
}),
|
||||
distinctUntilChanged(shallowEquals),
|
||||
distinctUntilChanged<MediaViewModel[]>(shallowEquals),
|
||||
),
|
||||
);
|
||||
|
||||
|
||||
@@ -361,10 +361,7 @@ export type UserMediaViewModel =
|
||||
* Some participant's user media.
|
||||
*/
|
||||
abstract class BaseUserMediaViewModel extends BaseMediaViewModel {
|
||||
/**
|
||||
* Whether the participant is speaking.
|
||||
*/
|
||||
public readonly speaking$ = this.scope.behavior(
|
||||
private readonly _speaking$ = this.scope.behavior(
|
||||
this.participant$.pipe(
|
||||
switchMap((p) =>
|
||||
p
|
||||
@@ -376,15 +373,27 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel {
|
||||
),
|
||||
),
|
||||
);
|
||||
/**
|
||||
* Whether the participant is speaking.
|
||||
*/
|
||||
// Getter backed by a private field so that subclasses can override it
|
||||
public get speaking$(): Behavior<boolean> {
|
||||
return this._speaking$;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether this participant is sending audio (i.e. is unmuted on their side).
|
||||
*/
|
||||
public readonly audioEnabled$: Behavior<boolean>;
|
||||
|
||||
private readonly _videoEnabled$: Behavior<boolean>;
|
||||
/**
|
||||
* Whether this participant is sending video.
|
||||
*/
|
||||
public readonly videoEnabled$: Behavior<boolean>;
|
||||
// Getter backed by a private field so that subclasses can override it
|
||||
public get videoEnabled$(): Behavior<boolean> {
|
||||
return this._videoEnabled$;
|
||||
}
|
||||
|
||||
private readonly _cropVideo$ = new BehaviorSubject(true);
|
||||
/**
|
||||
@@ -421,7 +430,7 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel {
|
||||
this.audioEnabled$ = this.scope.behavior(
|
||||
media$.pipe(map((m) => m?.microphoneTrack?.isMuted === false)),
|
||||
);
|
||||
this.videoEnabled$ = this.scope.behavior(
|
||||
this._videoEnabled$ = this.scope.behavior(
|
||||
media$.pipe(map((m) => m?.cameraTrack?.isMuted === false)),
|
||||
);
|
||||
}
|
||||
@@ -572,6 +581,12 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel {
|
||||
* A remote participant's user media.
|
||||
*/
|
||||
export class RemoteUserMediaViewModel extends BaseUserMediaViewModel {
|
||||
// This private field is used to override the value from the superclass
|
||||
private __speaking$: Behavior<boolean>;
|
||||
public get speaking$(): Behavior<boolean> {
|
||||
return this.__speaking$;
|
||||
}
|
||||
|
||||
private readonly locallyMutedToggle$ = new Subject<void>();
|
||||
private readonly localVolumeAdjustment$ = new Subject<number>();
|
||||
private readonly localVolumeCommit$ = new Subject<void>();
|
||||
@@ -611,6 +626,23 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel {
|
||||
),
|
||||
);
|
||||
|
||||
/**
|
||||
* The local volume, taking into account whether we're supposed to pretend
|
||||
* that the audio stream is disconnected (since we don't necessarily want that
|
||||
* to modify the UI state).
|
||||
*/
|
||||
private readonly actualLocalVolume$ = this.scope.behavior(
|
||||
this.pretendToBeDisconnected$.pipe(
|
||||
switchMap((disconnected) => (disconnected ? of(0) : this.localVolume$)),
|
||||
),
|
||||
);
|
||||
|
||||
// This private field is used to override the value from the superclass
|
||||
private __videoEnabled$: Behavior<boolean>;
|
||||
public get videoEnabled$(): Behavior<boolean> {
|
||||
return this.__videoEnabled$;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether this participant's audio is disabled.
|
||||
*/
|
||||
@@ -624,6 +656,7 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel {
|
||||
participant$: Observable<RemoteParticipant | undefined>,
|
||||
encryptionSystem: EncryptionSystem,
|
||||
livekitRoom: LivekitRoom,
|
||||
private readonly pretendToBeDisconnected$: Behavior<boolean>,
|
||||
displayname$: Behavior<string>,
|
||||
handRaised$: Behavior<Date | null>,
|
||||
reaction$: Behavior<ReactionOption | null>,
|
||||
@@ -639,11 +672,27 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel {
|
||||
reaction$,
|
||||
);
|
||||
|
||||
this.__speaking$ = this.scope.behavior(
|
||||
pretendToBeDisconnected$.pipe(
|
||||
switchMap((disconnected) =>
|
||||
disconnected ? of(false) : super.speaking$,
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
this.__videoEnabled$ = this.scope.behavior(
|
||||
pretendToBeDisconnected$.pipe(
|
||||
switchMap((disconnected) =>
|
||||
disconnected ? of(false) : super.videoEnabled$,
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
// Sync the local volume with LiveKit
|
||||
combineLatest([
|
||||
participant$,
|
||||
this.localVolume$.pipe(this.scope.bind()),
|
||||
]).subscribe(([p, volume]) => p && p.setVolume(volume));
|
||||
this.actualLocalVolume$.pipe(this.scope.bind()),
|
||||
]).subscribe(([p, volume]) => p?.setVolume(volume));
|
||||
}
|
||||
|
||||
public toggleLocallyMuted(): void {
|
||||
@@ -683,12 +732,20 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel {
|
||||
* Some participant's screen share media.
|
||||
*/
|
||||
export class ScreenShareViewModel extends BaseMediaViewModel {
|
||||
/**
|
||||
* Whether this screen share's video should be displayed.
|
||||
*/
|
||||
public readonly videoEnabled$ = this.scope.behavior(
|
||||
this.pretendToBeDisconnected$.pipe(map((disconnected) => !disconnected)),
|
||||
);
|
||||
|
||||
public constructor(
|
||||
id: string,
|
||||
member: RoomMember | undefined,
|
||||
participant$: Observable<LocalParticipant | RemoteParticipant>,
|
||||
encryptionSystem: EncryptionSystem,
|
||||
livekitRoom: LivekitRoom,
|
||||
private readonly pretendToBeDisconnected$: Behavior<boolean>,
|
||||
displayname$: Behavior<string>,
|
||||
public readonly local: boolean,
|
||||
) {
|
||||
|
||||
@@ -54,6 +54,7 @@ interface SpotlightItemBaseProps {
|
||||
targetWidth: number;
|
||||
targetHeight: number;
|
||||
video: TrackReferenceOrPlaceholder | undefined;
|
||||
videoEnabled: boolean;
|
||||
member: RoomMember | undefined;
|
||||
unencryptedWarning: boolean;
|
||||
encryptionStatus: EncryptionStatus;
|
||||
@@ -63,7 +64,6 @@ interface SpotlightItemBaseProps {
|
||||
}
|
||||
|
||||
interface SpotlightUserMediaItemBaseProps extends SpotlightItemBaseProps {
|
||||
videoEnabled: boolean;
|
||||
videoFit: "contain" | "cover";
|
||||
}
|
||||
|
||||
@@ -90,12 +90,10 @@ const SpotlightUserMediaItem: FC<SpotlightUserMediaItemProps> = ({
|
||||
vm,
|
||||
...props
|
||||
}) => {
|
||||
const videoEnabled = useBehavior(vm.videoEnabled$);
|
||||
const cropVideo = useBehavior(vm.cropVideo$);
|
||||
|
||||
const baseProps: SpotlightUserMediaItemBaseProps &
|
||||
RefAttributes<HTMLDivElement> = {
|
||||
videoEnabled,
|
||||
videoFit: cropVideo ? "cover" : "contain",
|
||||
...props,
|
||||
};
|
||||
@@ -135,6 +133,7 @@ const SpotlightItem: FC<SpotlightItemProps> = ({
|
||||
const ref = useMergedRefs(ourRef, theirRef);
|
||||
const displayName = useBehavior(vm.displayName$);
|
||||
const video = useBehavior(vm.video$);
|
||||
const videoEnabled = useBehavior(vm.videoEnabled$);
|
||||
const unencryptedWarning = useBehavior(vm.unencryptedWarning$);
|
||||
const encryptionStatus = useBehavior(vm.encryptionStatus$);
|
||||
|
||||
@@ -160,6 +159,7 @@ const SpotlightItem: FC<SpotlightItemProps> = ({
|
||||
targetWidth,
|
||||
targetHeight,
|
||||
video,
|
||||
videoEnabled,
|
||||
member: vm.member,
|
||||
unencryptedWarning,
|
||||
displayName,
|
||||
@@ -169,7 +169,7 @@ const SpotlightItem: FC<SpotlightItemProps> = ({
|
||||
};
|
||||
|
||||
return vm instanceof ScreenShareViewModel ? (
|
||||
<MediaView videoEnabled videoFit="contain" mirror={false} {...baseProps} />
|
||||
<MediaView videoFit="contain" mirror={false} {...baseProps} />
|
||||
) : (
|
||||
<SpotlightUserMediaItem vm={vm} {...baseProps} />
|
||||
);
|
||||
|
||||
24
src/utils/observable.test.ts
Normal file
24
src/utils/observable.test.ts
Normal file
@@ -0,0 +1,24 @@
|
||||
/*
|
||||
Copyright 2025 New Vector Ltd.
|
||||
|
||||
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
Please see LICENSE in the repository root for full details.
|
||||
*/
|
||||
|
||||
import { test } from "vitest";
|
||||
|
||||
import { withTestScheduler } from "./test";
|
||||
import { pauseWhen } from "./observable";
|
||||
|
||||
test("pauseWhen", () => {
|
||||
withTestScheduler(({ behavior, expectObservable }) => {
|
||||
const inputMarbles = " abcdefgh-i-jk-";
|
||||
const pauseMarbles = " n-y--n-yn-y--n";
|
||||
const outputMarbles = "abc--fgh-i---k";
|
||||
expectObservable(
|
||||
behavior(inputMarbles).pipe(
|
||||
pauseWhen(behavior(pauseMarbles, { y: true, n: false })),
|
||||
),
|
||||
).toBe(outputMarbles);
|
||||
});
|
||||
});
|
||||
@@ -7,16 +7,21 @@ Please see LICENSE in the repository root for full details.
|
||||
|
||||
import {
|
||||
type Observable,
|
||||
audit,
|
||||
combineLatest,
|
||||
concat,
|
||||
defer,
|
||||
filter,
|
||||
finalize,
|
||||
map,
|
||||
of,
|
||||
scan,
|
||||
startWith,
|
||||
takeWhile,
|
||||
tap,
|
||||
withLatestFrom,
|
||||
} from "rxjs";
|
||||
import { Behavior } from "../state/Behavior";
|
||||
|
||||
const nothing = Symbol("nothing");
|
||||
|
||||
@@ -95,3 +100,19 @@ export function getValue<T>(state$: Observable<T>): T {
|
||||
export function and$(...inputs: Observable<boolean>[]): Observable<boolean> {
|
||||
return combineLatest(inputs, (...flags) => flags.every((flag) => flag));
|
||||
}
|
||||
|
||||
/**
|
||||
* RxJS operator that pauses all changes in the input value whenever a Behavior
|
||||
* is true. When the Behavior returns to being false, the most recently
|
||||
* suppressed change is emitted as the most recent value.
|
||||
*/
|
||||
export function pauseWhen<T>(pause$: Behavior<boolean>) {
|
||||
return (value$: Observable<T>): Observable<T> =>
|
||||
value$.pipe(
|
||||
withLatestFrom(pause$),
|
||||
audit(([, pause]) =>
|
||||
pause ? pause$.pipe(filter((pause) => !pause)) : of(null),
|
||||
),
|
||||
map(([value]) => value),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -296,6 +296,7 @@ export async function withRemoteMedia(
|
||||
kind: E2eeType.PER_PARTICIPANT,
|
||||
},
|
||||
mockLivekitRoom({}, { remoteParticipants$: of([remoteParticipant]) }),
|
||||
constant(false),
|
||||
constant(roomMember.rawDisplayName ?? "nodisplayname"),
|
||||
constant(null),
|
||||
constant(null),
|
||||
|
||||
Reference in New Issue
Block a user