From 28047217b85e2e6f491c887ac7099499662fa46e Mon Sep 17 00:00:00 2001 From: Timo K Date: Fri, 7 Nov 2025 12:32:29 +0100 Subject: [PATCH] Almost running - NEVER use undefined as the default for behaviors (FOOTGUN) --- .../CallNotificationLifecycle.ts | 2 +- src/state/CallViewModel/CallViewModel.ts | 13 +++++++-- src/state/CallViewModel/callPickupState$.ts | 0 .../localMember/LocalMembership.ts | 5 ++-- .../localMember/LocalTransport.ts | 16 ++++++----- .../CallViewModel/localMember/Publisher.ts | 1 + .../remoteMembers/ConnectionManager.ts | 8 +++++- .../remoteMembers/MatrixLivekitMembers.ts | 2 +- .../remoteMembers/displayname.ts | 12 ++++++++ src/state/ObservableScope.test.ts | 21 +++++++++++++- src/state/ObservableScope.ts | 28 +++++++++---------- src/state/SessionBehaviors.ts | 3 +- src/state/UserMedia.ts | 6 ++-- 13 files changed, 83 insertions(+), 34 deletions(-) delete mode 100644 src/state/CallViewModel/callPickupState$.ts diff --git a/src/state/CallViewModel/CallNotificationLifecycle.ts b/src/state/CallViewModel/CallNotificationLifecycle.ts index baf2b665..40826d07 100644 --- a/src/state/CallViewModel/CallNotificationLifecycle.ts +++ b/src/state/CallViewModel/CallNotificationLifecycle.ts @@ -146,7 +146,7 @@ export function createCallNotificationLifecycle$({ newAndLegacyEvents?.[0].notification_type === "ring", ), map((e) => e as CallNotificationWrapper), - switchMap(([notificastionEvent]) => { + switchMap(([notificationEvent]) => { const lifetimeMs = notificationEvent?.lifetime ?? 0; return concat( lifetimeMs === 0 diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index d2931623..f3c1bca9 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -109,6 +109,7 @@ import { createReceivedDecline$, createSentCallNotification$, } from "./CallNotificationLifecycle.ts"; +import { createRoomMembers$ } from "./remoteMembers/displayname.ts"; const logger = rootLogger.getChild("[CallViewModel]"); //TODO @@ -266,6 +267,7 @@ export class CallViewModel { // ------------------------------------------------------------------------ // CallNotificationLifecycle + // consider inlining these!!! private sentCallNotification$ = createSentCallNotification$( this.scope, this.matrixRTCSession, @@ -281,6 +283,9 @@ export class CallViewModel { localUser: { userId: this.userId, deviceId: this.deviceId }, }); + // ------------------------------------------------------------------------ + // ROOM MEMBER tracking TODO + private roomMembers$ = createRoomMembers$(this.scope, this.matrixRoom); /** * If there is a configuration error with the call (e.g. misconfigured E2EE). * This is a fatal error that prevents the call from being created/joined. @@ -440,6 +445,7 @@ export class CallViewModel { mediaItems.filter((m): m is UserMedia => m instanceof UserMedia), ), ), + [], ); public readonly joinSoundEffect$ = this.userMedia$.pipe( @@ -465,6 +471,9 @@ export class CallViewModel { this.memberships$.pipe(map((ms) => ms.value.length)), ); + // only public to expose to the view. + public readonly callPickupState$ = this.callLifecycle.callPickupState$; + public readonly leaveSoundEffect$ = combineLatest([ this.callLifecycle.callPickupState$, this.userMedia$, @@ -645,7 +654,6 @@ export class CallViewModel { private readonly naturalWindowMode$ = this.scope.behavior( fromEvent(window, "resize").pipe( - startWith(null), map(() => { const height = window.innerHeight; const width = window.innerWidth; @@ -658,6 +666,7 @@ export class CallViewModel { return "normal"; }), ), + "normal", ); /** @@ -687,7 +696,6 @@ export class CallViewModel { // automatically switch to spotlight mode and reset when screen sharing ends this.scope.behavior( this.gridModeUserSelection$.pipe( - startWith(null), switchMap((userSelection) => (userSelection === "spotlight" ? EMPTY @@ -706,6 +714,7 @@ export class CallViewModel { ).pipe(startWith(userSelection ?? "grid")), ), ), + "grid", ); public setGridMode(value: GridMode): void { diff --git a/src/state/CallViewModel/callPickupState$.ts b/src/state/CallViewModel/callPickupState$.ts deleted file mode 100644 index e69de29b..00000000 diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 1bbbcb7d..96edd8da 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -288,7 +288,7 @@ export const createLocalMembership$ = ({ }; const requestConnect = (): LocalMemberConnectionState => { - if (state.livekit$.value === null) { + if (state.livekit$.value.state === LivekitState.Uninitialized) { startTracks(); state.livekit$.next({ state: LivekitState.Connecting }); combineLatest([publisher$, tracks$], (publisher, tracks) => { @@ -302,7 +302,7 @@ export const createLocalMembership$ = ({ }); }); } - if (state.matrix$.value.state !== MatrixState.Disconnected) { + if (state.matrix$.value.state === MatrixState.Disconnected) { state.matrix$.next({ state: MatrixState.Connecting }); localTransport$.pipe( tap((transport) => { @@ -438,6 +438,7 @@ export const createLocalMembership$ = ({ return of(false); }), ), + null, ); const toggleScreenSharing = diff --git a/src/state/CallViewModel/localMember/LocalTransport.ts b/src/state/CallViewModel/localMember/LocalTransport.ts index d4474897..69c9b934 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.ts @@ -67,18 +67,22 @@ export const createLocalTransport$ = ({ */ const oldestMemberTransport$ = scope.behavior( memberships$.pipe( - mapEpoch((memberships) => memberships[0].getTransport(memberships[0])), - first((t) => t != undefined && isLivekitTransport(t)), + mapEpoch( + (memberships) => memberships[0]?.getTransport(memberships[0]) ?? null, + ), + first((t) => t != null && isLivekitTransport(t)), ), - undefined, + null, ); /** * The transport that we would personally prefer to publish on (if not for the * transport preferences of others, perhaps). */ - const preferredTransport$: Behavior = - scope.behavior(from(makeTransport(client, roomId)), undefined); + const preferredTransport$: Behavior = scope.behavior( + from(makeTransport(client, roomId)), + null, + ); /** * The transport we should advertise in our MatrixRTC membership. @@ -89,7 +93,6 @@ export const createLocalTransport$ = ({ (useOldestMember, oldestMemberTransport, preferredTransport) => useOldestMember ? oldestMemberTransport : preferredTransport, ).pipe(distinctUntilChanged(deepCompare)), - undefined, ); return advertisedTransport$; }; @@ -103,7 +106,6 @@ async function makeTransportInternal( logger.log("Searching for a preferred transport"); //TODO refactor this to use the jwt service returned alias. const livekitAlias = roomId; - // TODO-MULTI-SFU: Either remove this dev tool or make it more official const urlFromStorage = localStorage.getItem("robin-matrixrtc-auth") ?? diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index f5a36e99..9be50bde 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -307,6 +307,7 @@ export class Publisher { return track instanceof LocalVideoTrack ? track : null; }), ), + null, ); trackProcessorSync(track$, trackerProcessorState$); } diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index 485fae1b..2859e49b 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -13,7 +13,7 @@ import { type LivekitTransport, type ParticipantId, } from "matrix-js-sdk/lib/matrixrtc"; -import { BehaviorSubject, combineLatest, map, switchMap } from "rxjs"; +import { BehaviorSubject, combineLatest, map, of, switchMap } from "rxjs"; import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; import { type LocalParticipant, type RemoteParticipant } from "livekit-client"; @@ -191,6 +191,11 @@ export function createConnectionManager$({ ); }); + // probably not required + if (listOfConnectionsWithPublishingParticipants.length === 0) { + return of(new Epoch(new ConnectionManagerData(), epoch)); + } + // combineLatest the several streams into a single stream with the ConnectionManagerData return combineLatest(listOfConnectionsWithPublishingParticipants).pipe( map( @@ -206,6 +211,7 @@ export function createConnectionManager$({ ); }), ), + new Epoch(new ConnectionManagerData()), ); return { transports$, connectionManagerData$, connections$ }; diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts index 5703fbd4..544f5241 100644 --- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts +++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts @@ -17,7 +17,6 @@ import { combineLatest, filter, map } from "rxjs"; // eslint-disable-next-line rxjs/no-internal import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent"; import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk"; -import { logger } from "matrix-js-sdk/lib/logger"; import { type Behavior } from "../../Behavior"; import { type IConnectionManager } from "./ConnectionManager"; @@ -56,6 +55,7 @@ interface Props { // => Extract an AvatarService instead? // Better with just `getMember` matrixRoom: Pick & NodeStyleEventEmitter; + roomMember$: Behavior>; } // Alternative structure idea: // const livekitMatrixMember$ = (callMemberships$,connectionManager,scope): Observable => { diff --git a/src/state/CallViewModel/remoteMembers/displayname.ts b/src/state/CallViewModel/remoteMembers/displayname.ts index 07ff3f59..c8484a9a 100644 --- a/src/state/CallViewModel/remoteMembers/displayname.ts +++ b/src/state/CallViewModel/remoteMembers/displayname.ts @@ -26,6 +26,17 @@ import { } from "../../../utils/displayname"; import { type Behavior } from "../../Behavior"; +export function createRoomMembers$( + scope: ObservableScope, + matrixRoom: MatrixRoom, +): Behavior[]> { + return scope.behavior( + fromEvent(matrixRoom, RoomStateEvent.Members).pipe( + map(() => matrixRoom.getMembers()), + ), + [], + ); +} /** * Displayname for each member of the call. This will disambiguate * any displayname that clashes with another member. Only members @@ -37,6 +48,7 @@ import { type Behavior } from "../../Behavior"; export const memberDisplaynames$ = ( scope: ObservableScope, matrixRoom: Pick & NodeStyleEventEmitter, + // roomMember$: Behavior>; memberships$: Observable>, ): Behavior>> => scope.behavior( diff --git a/src/state/ObservableScope.test.ts b/src/state/ObservableScope.test.ts index d53084da..19fea76c 100644 --- a/src/state/ObservableScope.test.ts +++ b/src/state/ObservableScope.test.ts @@ -7,8 +7,14 @@ Please see LICENSE in the repository root for full details. import { describe, expect, it } from "vitest"; -import { Epoch, mapEpoch, trackEpoch } from "./ObservableScope"; +import { + Epoch, + mapEpoch, + ObservableScope, + trackEpoch, +} from "./ObservableScope"; import { withTestScheduler } from "../utils/test"; +import { BehaviorSubject, timer } from "rxjs"; describe("Epoch", () => { it("should map the value correctly", () => { @@ -53,4 +59,17 @@ describe("Epoch", () => { }); }); }); + it("obs", () => { + const nothing = Symbol("nothing"); + const scope = new ObservableScope(); + const sb$ = new BehaviorSubject("initial"); + const su$ = new BehaviorSubject(undefined); + expect(sb$.value).toBe("initial"); + expect(su$.value).toBe(undefined); + expect(su$.value === nothing).toBe(false); + + const a$ = timer(10); + + scope.behavior(a$, undefined); + }); }); diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index fbf92ada..d1d6c297 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -24,7 +24,7 @@ import { type Behavior } from "./Behavior"; type MonoTypeOperator = (o: Observable) => Observable; -const nothing = Symbol("nothing"); +export const noInitialValue = Symbol("nothing"); /** * A scope which limits the execution lifetime of its bound Observables. @@ -59,10 +59,7 @@ export class ObservableScope { * Converts an Observable to a Behavior. If no initial value is specified, the * Observable must synchronously emit an initial value. */ - public behavior( - setValue$: Observable, - initialValue: T | typeof nothing = nothing, - ): Behavior { + public behavior(setValue$: Observable, initialValue?: T): Behavior { const subject$ = new BehaviorSubject(initialValue); // Push values from the Observable into the BehaviorSubject. // BehaviorSubjects have an undesirable feature where if you call 'complete', @@ -77,7 +74,7 @@ export class ObservableScope { subject$.error(err); }, }); - if (subject$.value === nothing) + if (subject$.value === noInitialValue) throw new Error("Behavior failed to synchronously emit an initial value"); return subject$ as Behavior; } @@ -118,27 +115,27 @@ export class ObservableScope { value$: Behavior, callback: (value: T) => Promise<(() => Promise) | void>, ): void { - let latestValue: T | typeof nothing = nothing; - let reconciledValue: T | typeof nothing = nothing; + let latestValue: T | typeof noInitialValue = noInitialValue; + let reconciledValue: T | typeof noInitialValue = noInitialValue; let cleanUp: (() => Promise) | void = undefined; value$ .pipe( catchError(() => EMPTY), // Ignore errors this.bind(), // Limit to the duration of the scope - endWith(nothing), // Clean up when the scope ends + endWith(noInitialValue), // Clean up when the scope ends ) .subscribe((value) => { void (async (): Promise => { - if (latestValue === nothing) { + if (latestValue === noInitialValue) { latestValue = value; while (latestValue !== reconciledValue) { await cleanUp?.(); // Call the previous value's clean-up handler reconciledValue = latestValue; - if (latestValue !== nothing) + if (latestValue !== noInitialValue) cleanUp = await callback(latestValue); // Sync current value } // Reset to signal that reconciliation is done for now - latestValue = nothing; + latestValue = noInitialValue; } else { // There's already an instance of the above 'while' loop running // concurrently. Just update the latest value and let it be handled. @@ -176,11 +173,11 @@ export const globalScope = new ObservableScope(); * * # Use Epoch * ``` - * const rootObs$ = of(1,2,3).pipe(trackEpoch()); - * const derivedObs$ = rootObs$.pipe( + * const ancestorObs$ = of(1,2,3).pipe(trackEpoch()); + * const derivedObs$ = ancestorObs$.pipe( * mapEpoch((v)=> "this number: " + v) * ); - * const otherDerivedObs$ = rootObs$.pipe( + * const otherDerivedObs$ = ancestorObs$.pipe( * mapEpoch((v)=> "multiplied by: " + v) * ); * const mergedObs$ = combineLatest([derivedObs$, otherDerivedObs$]).pipe( @@ -241,6 +238,7 @@ export function mapEpoch( ): OperatorFunction, Epoch> { return map((e) => e.mapInner(mapFn)); } + /** * # usage * ``` diff --git a/src/state/SessionBehaviors.ts b/src/state/SessionBehaviors.ts index d44ad33a..7d38ac3d 100644 --- a/src/state/SessionBehaviors.ts +++ b/src/state/SessionBehaviors.ts @@ -15,7 +15,7 @@ import { import { fromEvent } from "rxjs"; import { - type Epoch, + Epoch, mapEpoch, trackEpoch, type ObservableScope, @@ -76,5 +76,6 @@ export const createMemberships$ = ( MatrixRTCSessionEvent.MembershipsChanged, (_, memberships: CallMembership[]) => memberships, ).pipe(trackEpoch()), + new Epoch([]), ); }; diff --git a/src/state/UserMedia.ts b/src/state/UserMedia.ts index 65bd4e92..55de2061 100644 --- a/src/state/UserMedia.ts +++ b/src/state/UserMedia.ts @@ -111,7 +111,7 @@ export class UserMedia { private readonly presenter$ = this.scope.behavior( this.participant$.pipe( - switchMap((p) => (p === undefined ? of(false) : sharingScreen$(p))), + switchMap((p) => (p === null ? of(false) : sharingScreen$(p))), ), ); @@ -151,7 +151,7 @@ export class UserMedia { private readonly initialParticipant: | LocalParticipant | RemoteParticipant - | undefined, + | null = null, private readonly encryptionSystem: EncryptionSystem, private readonly livekitRoom: LivekitRoom, private readonly focusURL: string, @@ -163,7 +163,7 @@ export class UserMedia { ) {} public updateParticipant( - newParticipant: LocalParticipant | RemoteParticipant | undefined, + newParticipant: LocalParticipant | RemoteParticipant | null = null, ): void { if (this.participant$.value !== newParticipant) { // Update the BehaviourSubject in the UserMedia.