diff --git a/src/main.tsx b/src/main.tsx index e6a102c6..e41aaff8 100644 --- a/src/main.tsx +++ b/src/main.tsx @@ -24,6 +24,7 @@ import { App } from "./App"; import { init as initRageshake } from "./settings/rageshake"; import { Initializer } from "./initializer"; import { AppViewModel } from "./state/AppViewModel"; +import { globalScope } from "./state/ObservableScope"; window.setLKLogLevel = setLKLogLevel; @@ -61,7 +62,7 @@ Initializer.initBeforeReact() .then(() => { root.render( - , + , , ); }) diff --git a/src/reactions/ReactionsReader.test.tsx b/src/reactions/ReactionsReader.test.tsx index 76ea45be..dd82a718 100644 --- a/src/reactions/ReactionsReader.test.tsx +++ b/src/reactions/ReactionsReader.test.tsx @@ -23,7 +23,7 @@ import { localRtcMember, } from "../utils/test-fixtures"; import { getBasicRTCSession } from "../utils/test-viewmodel"; -import { withTestScheduler } from "../utils/test"; +import { testScope, withTestScheduler } from "../utils/test"; import { ElementCallReactionEventType, ReactionSet } from "."; afterEach(() => { @@ -37,6 +37,7 @@ test("handles a hand raised reaction", () => { withTestScheduler(({ schedule, expectObservable }) => { renderHook(() => { const { raisedHands$ } = new ReactionsReader( + testScope(), rtcSession.asMockedSession(), ); schedule("ab", { @@ -85,6 +86,7 @@ test("handles a redaction", () => { withTestScheduler(({ schedule, expectObservable }) => { renderHook(() => { const { raisedHands$ } = new ReactionsReader( + testScope(), rtcSession.asMockedSession(), ); schedule("abc", { @@ -148,6 +150,7 @@ test("handles waiting for event decryption", () => { withTestScheduler(({ schedule, expectObservable }) => { renderHook(() => { const { raisedHands$ } = new ReactionsReader( + testScope(), rtcSession.asMockedSession(), ); schedule("abc", { @@ -217,6 +220,7 @@ test("hands rejecting events without a proper membership", () => { withTestScheduler(({ schedule, expectObservable }) => { renderHook(() => { const { raisedHands$ } = new ReactionsReader( + testScope(), rtcSession.asMockedSession(), ); schedule("ab", { @@ -261,7 +265,10 @@ test("handles a reaction", () => { withTestScheduler(({ schedule, time, expectObservable }) => { renderHook(() => { - const { reactions$ } = new ReactionsReader(rtcSession.asMockedSession()); + const { reactions$ } = new ReactionsReader( + testScope(), + rtcSession.asMockedSession(), + ); schedule(`abc`, { a: () => {}, b: () => { @@ -317,7 +324,10 @@ test("ignores bad reaction events", () => { withTestScheduler(({ schedule, expectObservable }) => { renderHook(() => { - const { reactions$ } = new ReactionsReader(rtcSession.asMockedSession()); + const { reactions$ } = new ReactionsReader( + testScope(), + rtcSession.asMockedSession(), + ); schedule("ab", { a: () => {}, b: () => { @@ -439,7 +449,10 @@ test("that reactions cannot be spammed", () => { withTestScheduler(({ schedule, expectObservable }) => { renderHook(() => { - const { reactions$ } = new ReactionsReader(rtcSession.asMockedSession()); + const { reactions$ } = new ReactionsReader( + testScope(), + rtcSession.asMockedSession(), + ); schedule("abcd", { a: () => {}, b: () => { diff --git a/src/reactions/ReactionsReader.ts b/src/reactions/ReactionsReader.ts index 9e30eff5..c1f78b51 100644 --- a/src/reactions/ReactionsReader.ts +++ b/src/reactions/ReactionsReader.ts @@ -18,7 +18,7 @@ import { EventType, RoomEvent as MatrixRoomEvent, } from "matrix-js-sdk"; -import { BehaviorSubject, delay, type Subscription } from "rxjs"; +import { BehaviorSubject, delay } from "rxjs"; import { ElementCallReactionEventType, @@ -28,6 +28,7 @@ import { type RaisedHandInfo, type ReactionInfo, } from "."; +import { type ObservableScope } from "../state/ObservableScope"; export const REACTION_ACTIVE_TIME_MS = 3000; @@ -54,12 +55,13 @@ export class ReactionsReader { */ public readonly reactions$ = this.reactionsSubject$.asObservable(); - private readonly reactionsSub: Subscription; - - public constructor(private readonly rtcSession: MatrixRTCSession) { + public constructor( + private readonly scope: ObservableScope, + private readonly rtcSession: MatrixRTCSession, + ) { // Hide reactions after a given time. - this.reactionsSub = this.reactionsSubject$ - .pipe(delay(REACTION_ACTIVE_TIME_MS)) + this.reactionsSubject$ + .pipe(delay(REACTION_ACTIVE_TIME_MS), this.scope.bind()) .subscribe((reactions) => { const date = new Date(); const nextEntries = Object.fromEntries( @@ -71,15 +73,38 @@ export class ReactionsReader { this.reactionsSubject$.next(nextEntries); }); + // TODO: Convert this class to the functional reactive style and get rid of + // all this manual setup and teardown for event listeners + this.rtcSession.room.on(MatrixRoomEvent.Timeline, this.handleReactionEvent); + this.scope.onEnd(() => + this.rtcSession.room.off( + MatrixRoomEvent.Timeline, + this.handleReactionEvent, + ), + ); + this.rtcSession.room.on( MatrixRoomEvent.Redaction, this.handleReactionEvent, ); + this.scope.onEnd(() => + this.rtcSession.room.off( + MatrixRoomEvent.Redaction, + this.handleReactionEvent, + ), + ); + this.rtcSession.room.client.on( MatrixEventEvent.Decrypted, this.handleReactionEvent, ); + this.scope.onEnd(() => + this.rtcSession.room.client.off( + MatrixEventEvent.Decrypted, + this.handleReactionEvent, + ), + ); // We listen for a local echo to get the real event ID, as timeline events // may still be sending. @@ -87,11 +112,23 @@ export class ReactionsReader { MatrixRoomEvent.LocalEchoUpdated, this.handleReactionEvent, ); + this.scope.onEnd(() => + this.rtcSession.room.off( + MatrixRoomEvent.LocalEchoUpdated, + this.handleReactionEvent, + ), + ); - rtcSession.on( + this.rtcSession.on( MatrixRTCSessionEvent.MembershipsChanged, this.onMembershipsChanged, ); + this.scope.onEnd(() => + this.rtcSession.off( + MatrixRTCSessionEvent.MembershipsChanged, + this.onMembershipsChanged, + ), + ); // Run this once to ensure we have fetched the state from the call. this.onMembershipsChanged([]); @@ -309,31 +346,4 @@ export class ReactionsReader { this.removeRaisedHand(targetUser); } }; - - /** - * Stop listening for events. - */ - public destroy(): void { - this.rtcSession.off( - MatrixRTCSessionEvent.MembershipsChanged, - this.onMembershipsChanged, - ); - this.rtcSession.room.off( - MatrixRoomEvent.Timeline, - this.handleReactionEvent, - ); - this.rtcSession.room.off( - MatrixRoomEvent.Redaction, - this.handleReactionEvent, - ); - this.rtcSession.room.client.off( - MatrixEventEvent.Decrypted, - this.handleReactionEvent, - ); - this.rtcSession.room.off( - MatrixRoomEvent.LocalEchoUpdated, - this.handleReactionEvent, - ); - this.reactionsSub.unsubscribe(); - } } diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index 6b07dacc..bca7c964 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -110,6 +110,7 @@ import ringtoneMp3 from "../sound/ringtone.mp3?url"; import ringtoneOgg from "../sound/ringtone.ogg?url"; import { useTrackProcessorObservable$ } from "../livekit/TrackProcessorContext.tsx"; import { type Layout } from "../state/layout-types.ts"; +import { ObservableScope } from "../state/ObservableScope.ts"; const maxTapDurationMs = 400; @@ -129,8 +130,10 @@ export const ActiveCall: FC = (props) => { const trackProcessorState$ = useTrackProcessorObservable$(); useEffect(() => { - const reactionsReader = new ReactionsReader(props.rtcSession); + const scope = new ObservableScope(); + const reactionsReader = new ReactionsReader(scope, props.rtcSession); const vm = new CallViewModel( + scope, props.rtcSession, props.matrixRoom, mediaDevices, @@ -146,11 +149,9 @@ export const ActiveCall: FC = (props) => { ); setVm(vm); - const sub = vm.leave$.subscribe(props.onLeft); + vm.leave$.pipe(scope.bind()).subscribe(props.onLeft); return (): void => { - vm.destroy(); - sub.unsubscribe(); - reactionsReader.destroy(); + scope.end(); }; }, [ props.rtcSession, diff --git a/src/room/MuteStates.test.tsx b/src/room/MuteStates.test.tsx index d34f4d39..530b5050 100644 --- a/src/room/MuteStates.test.tsx +++ b/src/room/MuteStates.test.tsx @@ -108,9 +108,7 @@ function mockMediaDevices( throw new Error("Unimplemented"); } }); - const scope = new ObservableScope(); - onTestFinished(() => scope.end()); - return new MediaDevices(scope); + return new MediaDevices(testScope()); } describe("useMuteStates VITE_PACKAGE='full' (SPA) mode", () => { diff --git a/src/state/AppViewModel.ts b/src/state/AppViewModel.ts index 5f65c226..7ad91e9d 100644 --- a/src/state/AppViewModel.ts +++ b/src/state/AppViewModel.ts @@ -6,14 +6,16 @@ Please see LICENSE in the repository root for full details. */ import { MediaDevices } from "./MediaDevices"; -import { ViewModel } from "./ViewModel"; +import { type ObservableScope } from "./ObservableScope"; /** * The top-level state holder for the application. */ -export class AppViewModel extends ViewModel { +export class AppViewModel { public readonly mediaDevices = new MediaDevices(this.scope); // TODO: Move more application logic here. The CallViewModel, at the very // least, ought to be accessible from this object. + + public constructor(private readonly scope: ObservableScope) {} } diff --git a/src/state/CallViewModel.test.ts b/src/state/CallViewModel.test.ts index 52d13ca4..035f545a 100644 --- a/src/state/CallViewModel.test.ts +++ b/src/state/CallViewModel.test.ts @@ -60,6 +60,7 @@ import { mockMediaDevices, mockMuteStates, mockConfig, + testScope, } from "../utils/test"; import { ECAddonConnectionState, @@ -89,7 +90,6 @@ import { localRtcMember, localRtcMemberDevice2, } from "../utils/test-fixtures"; -import { ObservableScope } from "./ObservableScope"; import { MediaDevices } from "./MediaDevices"; import { getValue } from "../utils/observable"; import { type Behavior, constant } from "./Behavior"; @@ -347,6 +347,7 @@ function withCallViewModel( const reactions$ = new BehaviorSubject>({}); const vm = new CallViewModel( + testScope(), rtcSession.asMockedSession(), room, mediaDevices, @@ -361,7 +362,6 @@ function withCallViewModel( ); onTestFinished(() => { - vm!.destroy(); participantsSpy!.mockRestore(); mediaSpy!.mockRestore(); eventsSpy!.mockRestore(); @@ -402,6 +402,7 @@ test("test missing RTC config error", async () => { vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({}); const callVM = new CallViewModel( + testScope(), fakeRtcSession.asMockedSession(), matrixRoom, mockMediaDevices({}), @@ -1630,9 +1631,7 @@ test("audio output changes when toggling earpiece mode", () => { getUrlParams.mockReturnValue({ controlledAudioDevices: true }); vi.mocked(ComponentsCore.createMediaDeviceObserver).mockReturnValue(of([])); - const scope = new ObservableScope(); - onTestFinished(() => scope.end()); - const devices = new MediaDevices(scope); + const devices = new MediaDevices(testScope()); window.controls.setAvailableAudioDevices([ { id: "speaker", name: "Speaker", isSpeaker: true }, diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 6b046b28..6d7937de 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -73,7 +73,6 @@ import { } from "matrix-js-sdk/lib/matrixrtc"; import { type IWidgetApiRequest } from "matrix-widget-api"; -import { ViewModel } from "./ViewModel"; import { LocalUserMediaViewModel, type MediaViewModel, @@ -84,7 +83,7 @@ import { import { accumulate, and$, - finalizeValue, + generateKeyed$, pauseWhen, } from "../utils/observable"; import { @@ -176,7 +175,7 @@ interface LayoutScanState { type MediaItem = UserMedia | ScreenShare; -export class CallViewModel extends ViewModel { +export class CallViewModel { private readonly urlParams = getUrlParams(); private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession); @@ -755,80 +754,76 @@ export class CallViewModel extends ViewModel { ); /** - * List of MediaItems that we want to display + * List of MediaItems that we want to have tiles for. */ private readonly mediaItems$ = this.scope.behavior( - combineLatest([this.participantsByRoom$, duplicateTiles.value$]).pipe( - scan((prevItems, [participantsByRoom, duplicateTiles]) => { - const newItems: Map = new Map( - function* (this: CallViewModel): Iterable<[string, MediaItem]> { - for (const { - livekitRoom, - participants, - url, - } of participantsByRoom) { - for (const { id, participant, member } of participants) { - for (let i = 0; i < 1 + duplicateTiles; i++) { - const mediaId = `${id}:${i}`; - const prevMedia = prevItems.get(mediaId); - if (prevMedia instanceof UserMedia) - prevMedia.updateParticipant(participant); + generateKeyed$< + [typeof this.participantsByRoom$.value, number], + MediaItem, + MediaItem[] + >( + combineLatest([this.participantsByRoom$, duplicateTiles.value$]), + ([participantsByRoom, duplicateTiles], createOrGet) => { + const items: MediaItem[] = []; - yield [ + for (const { livekitRoom, participants, url } of participantsByRoom) { + for (const { id, participant, member } of participants) { + for (let i = 0; i < 1 + duplicateTiles; i++) { + const mediaId = `${id}:${i}`; + const item = createOrGet( + mediaId, + (scope) => + // We create UserMedia with or without a participant. + // This will be the initial value of a BehaviourSubject. + // Once a participant appears we will update the BehaviourSubject. (see below) + new UserMedia( + scope, mediaId, - // We create UserMedia with or without a participant. - // This will be the initial value of a BehaviourSubject. - // Once a participant appears we will update the BehaviourSubject. (see above) - prevMedia ?? - new UserMedia( - mediaId, + member, + participant, + this.options.encryptionSystem, + livekitRoom, + url, + this.mediaDevices, + this.pretendToBeDisconnected$, + this.memberDisplaynames$.pipe( + map((m) => m.get(id) ?? "[👻]"), + ), + this.handsRaised$.pipe(map((v) => v[id]?.time ?? null)), + this.reactions$.pipe(map((v) => v[id] ?? undefined)), + ), + ); + items.push(item); + (item as UserMedia).updateParticipant(participant); + + if (participant?.isScreenShareEnabled) { + const screenShareId = `${mediaId}:screen-share`; + items.push( + createOrGet( + screenShareId, + (scope) => + new ScreenShare( + scope, + screenShareId, member, participant, this.options.encryptionSystem, livekitRoom, url, - this.mediaDevices, this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( map((m) => m.get(id) ?? "[👻]"), ), - this.handsRaised$.pipe(map((v) => v[id]?.time ?? null)), - this.reactions$.pipe(map((v) => v[id] ?? undefined)), ), - ]; - - if (participant?.isScreenShareEnabled) { - const screenShareId = `${mediaId}:screen-share`; - yield [ - screenShareId, - prevItems.get(screenShareId) ?? - new ScreenShare( - screenShareId, - member, - participant, - this.options.encryptionSystem, - livekitRoom, - url, - this.pretendToBeDisconnected$, - this.memberDisplaynames$.pipe( - map((m) => m.get(id) ?? "[👻]"), - ), - ), - ]; - } - } + ), + ); } } - }.bind(this)(), - ); + } + } - for (const [id, t] of prevItems) if (!newItems.has(id)) t.destroy(); - return newItems; - }, new Map()), - map((mediaItems) => [...mediaItems.values()]), - finalizeValue((ts) => { - for (const t of ts) t.destroy(); - }), + return items; + }, ), ); @@ -1739,6 +1734,7 @@ export class CallViewModel extends ViewModel { : null; public constructor( + private readonly scope: ObservableScope, // A call is permanently tied to a single Matrix room private readonly matrixRTCSession: MatrixRTCSession, private readonly matrixRoom: MatrixRoom, @@ -1753,8 +1749,6 @@ export class CallViewModel extends ViewModel { >, private readonly trackProcessorState$: Observable, ) { - super(); - // Start and stop local and remote connections as needed this.connectionInstructions$ .pipe(this.scope.bind()) diff --git a/src/state/MediaViewModel.ts b/src/state/MediaViewModel.ts index de09e24d..0b79183e 100644 --- a/src/state/MediaViewModel.ts +++ b/src/state/MediaViewModel.ts @@ -46,7 +46,6 @@ import { throttleTime, } from "rxjs"; -import { ViewModel } from "./ViewModel"; import { alwaysShowSelf } from "../settings/settings"; import { showConnectionStats } from "../settings/settings"; import { accumulate } from "../utils/observable"; @@ -56,6 +55,7 @@ import { type ReactionOption } from "../reactions"; import { platform } from "../Platform"; import { type MediaDevices } from "./MediaDevices"; import { type Behavior } from "./Behavior"; +import { type ObservableScope } from "./ObservableScope"; export function observeTrackReference$( participant: Participant, @@ -216,7 +216,7 @@ export enum EncryptionStatus { PasswordInvalid, } -abstract class BaseMediaViewModel extends ViewModel { +abstract class BaseMediaViewModel { /** * The LiveKit video track for this media. */ @@ -246,6 +246,7 @@ abstract class BaseMediaViewModel extends ViewModel { } public constructor( + protected readonly scope: ObservableScope, /** * An opaque identifier for this media. */ @@ -269,8 +270,6 @@ abstract class BaseMediaViewModel extends ViewModel { public readonly focusURL: string, public readonly displayName$: Behavior, ) { - super(); - const audio$ = this.observeTrackReference$(audioSource); this.video$ = this.observeTrackReference$(videoSource); @@ -403,6 +402,7 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { public readonly cropVideo$: Behavior = this._cropVideo$; public constructor( + scope: ObservableScope, id: string, member: RoomMember, participant$: Observable, @@ -414,6 +414,7 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { public readonly reaction$: Behavior, ) { super( + scope, id, member, participant$, @@ -537,6 +538,7 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { ); public constructor( + scope: ObservableScope, id: string, member: RoomMember, participant$: Behavior, @@ -549,6 +551,7 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { reaction$: Behavior, ) { super( + scope, id, member, participant$, @@ -645,6 +648,7 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { ); public constructor( + scope: ObservableScope, id: string, member: RoomMember, participant$: Observable, @@ -657,6 +661,7 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { reaction$: Behavior, ) { super( + scope, id, member, participant$, @@ -742,6 +747,7 @@ export class ScreenShareViewModel extends BaseMediaViewModel { ); public constructor( + scope: ObservableScope, id: string, member: RoomMember, participant$: Observable, @@ -753,6 +759,7 @@ export class ScreenShareViewModel extends BaseMediaViewModel { public readonly local: boolean, ) { super( + scope, id, member, participant$, diff --git a/src/state/ScreenShare.ts b/src/state/ScreenShare.ts index e6fa81ec..9803a5f4 100644 --- a/src/state/ScreenShare.ts +++ b/src/state/ScreenShare.ts @@ -11,7 +11,7 @@ import { type Room as LivekitRoom, } from "livekit-client"; -import { ObservableScope } from "./ObservableScope.ts"; +import { type ObservableScope } from "./ObservableScope.ts"; import { ScreenShareViewModel } from "./MediaViewModel.ts"; import type { RoomMember } from "matrix-js-sdk"; import type { EncryptionSystem } from "../e2ee/sharedKeyManagement.ts"; @@ -23,10 +23,10 @@ import type { Behavior } from "./Behavior.ts"; * ObservableScope for behaviors that the view model depends on. */ export class ScreenShare { - private readonly scope = new ObservableScope(); public readonly vm: ScreenShareViewModel; public constructor( + private readonly scope: ObservableScope, id: string, member: RoomMember, participant: LocalParticipant | RemoteParticipant, @@ -37,6 +37,7 @@ export class ScreenShare { displayName$: Observable, ) { this.vm = new ScreenShareViewModel( + this.scope, id, member, of(participant), @@ -48,9 +49,4 @@ export class ScreenShare { participant.isLocal, ); } - - public destroy(): void { - this.scope.end(); - this.vm.destroy(); - } } diff --git a/src/state/TileStore.ts b/src/state/TileStore.ts index 85bf8bc7..04633fb9 100644 --- a/src/state/TileStore.ts +++ b/src/state/TileStore.ts @@ -44,10 +44,6 @@ class SpotlightTileData { this.maximised$ = new BehaviorSubject(maximised); this.vm = new SpotlightTileViewModel(this.media$, this.maximised$); } - - public destroy(): void { - this.vm.destroy(); - } } class GridTileData { @@ -65,14 +61,10 @@ class GridTileData { this.media$ = new BehaviorSubject(media); this.vm = new GridTileViewModel(this.media$); } - - public destroy(): void { - this.vm.destroy(); - } } /** - * A collection of tiles to be mapped to a layout. + * An immutable collection of tiles to be mapped to a layout. */ export class TileStore { private constructor( @@ -288,13 +280,6 @@ export class TileStoreBuilder { ); } - // Destroy unused tiles - if (this.spotlight === null && this.prevSpotlight !== null) - this.prevSpotlight.destroy(); - const gridEntries = new Set(grid); - for (const entry of this.prevGrid) - if (!gridEntries.has(entry)) entry.destroy(); - return this.construct(this.spotlight, grid); } } diff --git a/src/state/TileViewModel.ts b/src/state/TileViewModel.ts index 5e5eb29c..a645a0d1 100644 --- a/src/state/TileViewModel.ts +++ b/src/state/TileViewModel.ts @@ -5,7 +5,6 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { ViewModel } from "./ViewModel"; import { type MediaViewModel, type UserMediaViewModel } from "./MediaViewModel"; import { type Behavior } from "./Behavior"; @@ -14,21 +13,17 @@ function createId(): string { return (nextId++).toString(); } -export class GridTileViewModel extends ViewModel { +export class GridTileViewModel { public readonly id = createId(); - public constructor(public readonly media$: Behavior) { - super(); - } + public constructor(public readonly media$: Behavior) {} } -export class SpotlightTileViewModel extends ViewModel { +export class SpotlightTileViewModel { public constructor( public readonly media$: Behavior, public readonly maximised$: Behavior, - ) { - super(); - } + ) {} } export type TileViewModel = GridTileViewModel | SpotlightTileViewModel; diff --git a/src/state/UserMedia.ts b/src/state/UserMedia.ts index 42016f7c..65bd4e92 100644 --- a/src/state/UserMedia.ts +++ b/src/state/UserMedia.ts @@ -22,7 +22,7 @@ import { } from "livekit-client"; import { observeParticipantEvents } from "@livekit/components-core"; -import { ObservableScope } from "./ObservableScope.ts"; +import { type ObservableScope } from "./ObservableScope.ts"; import { LocalUserMediaViewModel, RemoteUserMediaViewModel, @@ -75,11 +75,11 @@ enum SortingBin { * for inclusion in the call layout. */ export class UserMedia { - private readonly scope = new ObservableScope(); private readonly participant$ = new BehaviorSubject(this.initialParticipant); public readonly vm: UserMediaViewModel = this.participant$.value?.isLocal ? new LocalUserMediaViewModel( + this.scope, this.id, this.member, this.participant$ as Behavior, @@ -92,6 +92,7 @@ export class UserMedia { this.scope.behavior(this.reaction$), ) : new RemoteUserMediaViewModel( + this.scope, this.id, this.member, this.participant$ as Observable, @@ -144,6 +145,7 @@ export class UserMedia { ); public constructor( + private readonly scope: ObservableScope, public readonly id: string, private readonly member: RoomMember, private readonly initialParticipant: @@ -168,11 +170,6 @@ export class UserMedia { this.participant$.next(newParticipant); } } - - public destroy(): void { - this.scope.end(); - this.vm.destroy(); - } } export function sharingScreen$(p: Participant): Observable { diff --git a/src/state/ViewModel.ts b/src/state/ViewModel.ts deleted file mode 100644 index e83ae82b..00000000 --- a/src/state/ViewModel.ts +++ /dev/null @@ -1,23 +0,0 @@ -/* -Copyright 2023, 2024 New Vector Ltd. - -SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial -Please see LICENSE in the repository root for full details. -*/ - -import { ObservableScope } from "./ObservableScope"; - -/** - * An MVVM view model. - */ -export abstract class ViewModel { - protected readonly scope = new ObservableScope(); - - /** - * Instructs the ViewModel to clean up its resources. If you forget to call - * this, there may be memory leaks! - */ - public destroy(): void { - this.scope.end(); - } -} diff --git a/src/utils/observable.test.ts b/src/utils/observable.test.ts index 5f488fb1..e039c846 100644 --- a/src/utils/observable.test.ts +++ b/src/utils/observable.test.ts @@ -6,9 +6,10 @@ Please see LICENSE in the repository root for full details. */ import { test } from "vitest"; +import { Subject } from "rxjs"; import { withTestScheduler } from "./test"; -import { pauseWhen } from "./observable"; +import { generateKeyed$, pauseWhen } from "./observable"; test("pauseWhen", () => { withTestScheduler(({ behavior, expectObservable }) => { @@ -22,3 +23,43 @@ test("pauseWhen", () => { ).toBe(outputMarbles); }); }); + +test("generateKeyed$ has the right output and ends scopes at the right times", () => { + const scope1$ = new Subject(); + const scope2$ = new Subject(); + const scope3$ = new Subject(); + const scope4$ = new Subject(); + const scopeSubjects = [scope1$, scope2$, scope3$, scope4$]; + + withTestScheduler(({ hot, expectObservable }) => { + // Each scope should start when the input number reaches or surpasses their + // number and end when the input number drops back below their number. + // At the very end, unsubscribing should end all remaining scopes. + const inputMarbles = " 123242"; + const outputMarbles = " abcbdb"; + const subscriptionMarbles = "^-----!"; + const scope1Marbles = " y-----n"; + const scope2Marbles = " -y----n"; + const scope3Marbles = " --ynyn"; + const scope4Marbles = " ----yn"; + + expectObservable( + generateKeyed$(hot(inputMarbles), (input, createOrGet) => { + for (let i = 1; i <= +input; i++) { + createOrGet(i.toString(), (scope) => { + scopeSubjects[i - 1].next("y"); + scope.onEnd(() => scopeSubjects[i - 1].next("n")); + return i.toString(); + }); + } + return "abcd"[+input - 1]; + }), + subscriptionMarbles, + ).toBe(outputMarbles); + + expectObservable(scope1$).toBe(scope1Marbles); + expectObservable(scope2$).toBe(scope2Marbles); + expectObservable(scope3$).toBe(scope3Marbles); + expectObservable(scope4$).toBe(scope4Marbles); + }); +}); diff --git a/src/utils/observable.ts b/src/utils/observable.ts index 74acfaf2..8b141ef4 100644 --- a/src/utils/observable.ts +++ b/src/utils/observable.ts @@ -23,6 +23,7 @@ import { } from "rxjs"; import { type Behavior } from "../state/Behavior"; +import { ObservableScope } from "../state/ObservableScope"; const nothing = Symbol("nothing"); @@ -117,3 +118,56 @@ export function pauseWhen(pause$: Behavior) { map(([value]) => value), ); } + +/** + * Maps a changing input value to an output value consisting of items that have + * automatically generated ObservableScopes tied to a key. Items will be + * automatically created when their key is requested for the first time, reused + * when the same key is requested at a later time, and destroyed (have their + * scope ended) when the key is no longer requested. + */ +export function generateKeyed$( + input$: Observable, + project: ( + input: In, + createOrGet: ( + key: string, + factory: (scope: ObservableScope) => Item, + ) => Item, + ) => Out, +): Observable { + return input$.pipe( + scan< + In, + { + items: Map; + output: Out; + }, + { items: Map } + >( + (state, data) => { + const nextItems = new Map< + string, + { item: Item; scope: ObservableScope } + >(); + const output = project(data, (key, factory) => { + let item = state.items.get(key); + if (item === undefined) { + const scope = new ObservableScope(); + item = { item: factory(scope), scope }; + } + nextItems.set(key, item); + return item.item; + }); + for (const [key, { scope }] of state.items) + if (!nextItems.has(key)) scope.end(); + return { items: nextItems, output }; + }, + { items: new Map() }, + ), + finalizeValue((state) => { + for (const { scope } of state.items.values()) scope.end(); + }), + map(({ output }) => output), + ); +} diff --git a/src/utils/test-viewmodel.ts b/src/utils/test-viewmodel.ts index d6a4bb10..fa6bb691 100644 --- a/src/utils/test-viewmodel.ts +++ b/src/utils/test-viewmodel.ts @@ -27,6 +27,7 @@ import { mockMediaDevices, mockMuteStates, MockRTCSession, + testScope, } from "./test"; import { aliceRtcMember, localRtcMember } from "./test-fixtures"; import { type RaisedHandInfo, type ReactionInfo } from "../reactions"; @@ -134,6 +135,7 @@ export function getBasicCallViewModelEnvironment( // const remoteParticipants$ = of([aliceParticipant]); const vm = new CallViewModel( + testScope(), rtcSession.asMockedSession(), matrixRoom, mockMediaDevices({}), diff --git a/src/utils/test.ts b/src/utils/test.ts index d50c1af4..49a9800a 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -6,7 +6,7 @@ Please see LICENSE in the repository root for full details. */ import { map, type Observable, of, type SchedulerLike } from "rxjs"; import { type RunHelpers, TestScheduler } from "rxjs/testing"; -import { expect, type MockedObject, vi, vitest } from "vitest"; +import { expect, type MockedObject, onTestFinished, vi, vitest } from "vitest"; import { type RoomMember, type Room as MatrixRoom, @@ -89,6 +89,15 @@ interface TestRunnerGlobal { rxjsTestScheduler?: SchedulerLike; } +/** + * Create a new ObservableScope which ends when the current test ends. + */ +export function testScope(): ObservableScope { + const scope = new ObservableScope(); + onTestFinished(() => scope.end()); + return scope; +} + /** * Run Observables with a scheduler that virtualizes time, for testing purposes. */ @@ -267,6 +276,7 @@ export async function withLocalMedia( continuation: (vm: LocalUserMediaViewModel) => void | Promise, ): Promise { const vm = new LocalUserMediaViewModel( + testScope(), "local", mockMatrixRoomMember(localRtcMember, roomMember), constant(localParticipant), @@ -280,11 +290,8 @@ export async function withLocalMedia( constant(null), constant(null), ); - try { - await continuation(vm); - } finally { - vm.destroy(); - } + // TODO: Simplify to just return the view model + await continuation(vm); } export function mockRemoteParticipant( @@ -308,6 +315,7 @@ export async function withRemoteMedia( ): Promise { const remoteParticipant = mockRemoteParticipant(participant); const vm = new RemoteUserMediaViewModel( + testScope(), "remote", mockMatrixRoomMember(localRtcMember, roomMember), of(remoteParticipant), @@ -321,11 +329,8 @@ export async function withRemoteMedia( constant(null), constant(null), ); - try { - await continuation(vm); - } finally { - vm.destroy(); - } + // TODO: Simplify to just return the view model + await continuation(vm); } export function mockConfig(config: Partial = {}): void {