From e5117f705d78113e8c236ba0d6e1cd6e3c50a9cb Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 27 Nov 2025 14:42:23 +0100 Subject: [PATCH] More testing and cleaning up --- .../localMember/LocalMembership.test.ts | 252 +++++++++++++++--- .../localMember/LocalMembership.ts | 171 ++++++------ .../CallViewModel/localMember/Publisher.ts | 18 ++ src/state/ObservableScope.ts | 7 +- 4 files changed, 321 insertions(+), 127 deletions(-) diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMembership.test.ts index a3bfe158..6c6c3d6e 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.test.ts @@ -14,7 +14,7 @@ import { describe, expect, it, vi } from "vitest"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { BehaviorSubject, map, of } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; -import { type LocalParticipant } from "livekit-client"; +import { type LocalParticipant, type LocalTrack } from "livekit-client"; import { MatrixRTCMode } from "../../../settings/settings"; import { @@ -34,6 +34,7 @@ import { Epoch, ObservableScope } from "../../ObservableScope"; import { constant } from "../../Behavior"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; import { type Connection } from "../remoteMembers/Connection"; +import { type Publisher } from "./Publisher"; const MATRIX_RTC_MODE = MatrixRTCMode.Legacy; const getUrlParams = vi.hoisted(() => vi.fn(() => ({}))); @@ -235,44 +236,54 @@ describe("LocalMembership", () => { }); }); - it("recreates publisher if new connection is used", async () => { + const aTransport = { + livekit_service_url: "a", + } as LivekitTransport; + const bTransport = { + livekit_service_url: "b", + } as LivekitTransport; + + const connectionManagerData = new ConnectionManagerData(); + + connectionManagerData.add( + { + livekitRoom: mockLivekitRoom({ + localParticipant: { + isScreenShareEnabled: false, + trackPublications: [], + } as unknown as LocalParticipant, + }), + state$: constant({ + state: "ConnectedToLkRoom", + }), + transport: aTransport, + } as unknown as Connection, + [], + ); + connectionManagerData.add( + { + state$: constant({ + state: "ConnectedToLkRoom", + }), + transport: bTransport, + } as unknown as Connection, + [], + ); + + it("recreates publisher if new connection is used and ENDS always unpublish and end tracks", async () => { const scope = new ObservableScope(); - const aTransport = { - livekit_service_url: "a", - } as LivekitTransport; - const bTransport = { - livekit_service_url: "b", - } as LivekitTransport; const localTransport$ = new BehaviorSubject(aTransport); - const connectionManagerData = new ConnectionManagerData(); + const publishers: Publisher[] = []; - connectionManagerData.add( - { - livekitRoom: mockLivekitRoom({ - localParticipant: { - isScreenShareEnabled: false, - trackPublications: [], - } as unknown as LocalParticipant, - }), - state$: constant({ - state: "ConnectedToLkRoom", - }), - transport: aTransport, - } as unknown as Connection, - [], + defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation( + () => { + const p = { stopPublishing: vi.fn(), stopTracks: vi.fn() }; + publishers.push(p as unknown as Publisher); + return p; + }, ); - connectionManagerData.add( - { - state$: constant({ - state: "ConnectedToLkRoom", - }), - transport: bTransport, - } as unknown as Connection, - [], - ); - const publisherFactory = defaultCreateLocalMemberValues.createPublisherFactory as ReturnType< typeof vi.fn @@ -290,7 +301,182 @@ describe("LocalMembership", () => { localTransport$.next(bTransport); await flushPromises(); expect(publisherFactory).toHaveBeenCalledTimes(2); + expect(publishers.length).toBe(2); + // stop the first Publisher and let the second one life. + expect(publishers[0].stopTracks).toHaveBeenCalled(); + expect(publishers[1].stopTracks).not.toHaveBeenCalled(); + expect(publishers[0].stopPublishing).toHaveBeenCalled(); + expect(publishers[1].stopPublishing).not.toHaveBeenCalled(); expect(publisherFactory.mock.calls[0][0].transport).toBe(aTransport); expect(publisherFactory.mock.calls[1][0].transport).toBe(bTransport); + scope.end(); + await flushPromises(); + // stop all tracks after ending scopes + expect(publishers[1].stopPublishing).toHaveBeenCalled(); + expect(publishers[1].stopTracks).toHaveBeenCalled(); + + defaultCreateLocalMemberValues.createPublisherFactory.mockReset(); + }); + + it("only start tracks if requested", async () => { + const scope = new ObservableScope(); + + const localTransport$ = new BehaviorSubject(aTransport); + + const publishers: Publisher[] = []; + + const tracks$ = new BehaviorSubject([]); + const publishing$ = new BehaviorSubject(false); + defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation( + () => { + const p = { + stopPublishing: vi.fn(), + stopTracks: vi.fn(), + createAndSetupTracks: vi.fn().mockImplementation(async () => { + tracks$.next([{}, {}] as LocalTrack[]); + return Promise.resolve(); + }), + tracks$, + publishing$, + }; + publishers.push(p as unknown as Publisher); + return p; + }, + ); + const publisherFactory = + defaultCreateLocalMemberValues.createPublisherFactory as ReturnType< + typeof vi.fn + >; + + const localMembership = createLocalMembership$({ + scope, + ...defaultCreateLocalMemberValues, + connectionManager: { + connectionManagerData$: constant(new Epoch(connectionManagerData)), + }, + localTransport$, + }); + await flushPromises(); + expect(publisherFactory).toHaveBeenCalledOnce(); + expect(localMembership.tracks$.value.length).toBe(0); + localMembership.startTracks(); + await flushPromises(); + expect(localMembership.tracks$.value.length).toBe(2); + scope.end(); + await flushPromises(); + // stop all tracks after ending scopes + expect(publishers[0].stopPublishing).toHaveBeenCalled(); + expect(publishers[0].stopTracks).toHaveBeenCalled(); + }); + // TODO add an integration test combining publisher and localMembership + // + it("tracks livekit state correctly", async () => { + const scope = new ObservableScope(); + + const localTransport$ = new BehaviorSubject(null); + const connectionManagerData$ = new BehaviorSubject< + Epoch + >(new Epoch(new ConnectionManagerData())); + const publishers: Publisher[] = []; + + const tracks$ = new BehaviorSubject([]); + const publishing$ = new BehaviorSubject(false); + const createTrackResolver = Promise.withResolvers(); + const publishResolver = Promise.withResolvers(); + defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation( + () => { + const p = { + stopPublishing: vi.fn(), + stopTracks: vi.fn().mockImplementation(() => { + logger.info("stopTracks"); + tracks$.next([]); + }), + createAndSetupTracks: vi.fn().mockImplementation(async () => { + await createTrackResolver.promise; + tracks$.next([{}, {}] as LocalTrack[]); + }), + startPublishing: vi.fn().mockImplementation(async () => { + await publishResolver.promise; + publishing$.next(true); + }), + tracks$, + publishing$, + }; + publishers.push(p as unknown as Publisher); + return p; + }, + ); + + const publisherFactory = + defaultCreateLocalMemberValues.createPublisherFactory as ReturnType< + typeof vi.fn + >; + + const localMembership = createLocalMembership$({ + scope, + ...defaultCreateLocalMemberValues, + connectionManager: { + connectionManagerData$, + }, + localTransport$, + }); + + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.WaitingForTransport, + }); + localTransport$.next(aTransport); + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.WaitingForConnection, + }); + connectionManagerData$.next(new Epoch(connectionManagerData)); + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.Initialized, + }); + expect(publisherFactory).toHaveBeenCalledOnce(); + expect(localMembership.tracks$.value.length).toBe(0); + + // ------- + localMembership.startTracks(); + // ------- + + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.CreatingTracks, + }); + createTrackResolver.resolve(); + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.ReadyToPublish, + }); + + // ------- + localMembership.requestConnect(); + // ------- + + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.WaitingToPublish, + }); + + publishResolver.resolve(); + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.Connected, + }); + expect(publishers[0].stopPublishing).not.toHaveBeenCalled(); + + expect(localMembership.connectionState.livekit$.isStopped).toBe(false); + scope.end(); + await flushPromises(); + expect(localMembership.connectionState.livekit$.isStopped).toBe(true); + // stays in connected state because it is stopped before the update to tracks update the state. + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: LivekitState.Connected, + }); + // stop all tracks after ending scopes + expect(publishers[0].stopPublishing).toHaveBeenCalled(); + expect(publishers[0].stopTracks).toHaveBeenCalled(); }); }); diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index cfc715e0..706aeaca 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -22,10 +22,12 @@ import { catchError, combineLatest, distinctUntilChanged, + from, map, type Observable, of, scan, + startWith, switchMap, tap, } from "rxjs"; @@ -54,13 +56,13 @@ export enum LivekitState { Error = "error", /** Not even a transport is available to the LocalMembership */ WaitingForTransport = "waiting_for_transport", - /** A transport is and we are loading the connection based on the transport */ - Connecting = "connecting", - InitialisingPublisher = "uninitialized", + /** A connection appeared so we can initialise the publisher */ + WaitingForConnection = "waiting_for_connection", + /** Connection and transport arrived, publisher Initialized */ Initialized = "Initialized", CreatingTracks = "creating_tracks", ReadyToPublish = "ready_to_publish", - WaitingToPublish = "publishing", + WaitingToPublish = "waiting_to_publish", Connected = "connected", Disconnected = "disconnected", Disconnecting = "disconnecting", @@ -69,8 +71,7 @@ export enum LivekitState { type LocalMemberLivekitState = | { state: LivekitState.Error; error: ElementCallError } | { state: LivekitState.WaitingForTransport } - | { state: LivekitState.Connecting } - | { state: LivekitState.InitialisingPublisher } + | { state: LivekitState.WaitingForConnection } | { state: LivekitState.Initialized } | { state: LivekitState.CreatingTracks } | { state: LivekitState.ReadyToPublish } @@ -163,12 +164,10 @@ export const createLocalMembership$ = ({ * Callback to toggle screen sharing. If null, screen sharing is not possible. */ toggleScreenSharing: (() => void) | null; + tracks$: Behavior; participant$: Behavior; connection$: Behavior; homeserverConnected$: Behavior; - // deprecated fields - /** @deprecated use state instead*/ - connected$: Behavior; // this needs to be discussed /** @deprecated use state instead*/ reconnecting$: Behavior; @@ -217,20 +216,19 @@ export const createLocalMembership$ = ({ ), ); + const localConnectionState$ = localConnection$.pipe( + switchMap((connection) => (connection ? connection.state$ : of(null))), + ); + // /** // * Whether we are "fully" connected to the call. Accounts for both the // * connection to the MatrixRTC session and the LiveKit publish connection. // */ - // // TODO use this in combination with the MemberState. const connected$ = scope.behavior( and$( homeserverConnected$, - localConnection$.pipe( - switchMap((c) => - c - ? c.state$.pipe(map((state) => state.state === "ConnectedToLkRoom")) - : of(false), - ), + localConnectionState$.pipe( + map((state) => (state ? state.state === "ConnectedToLkRoom" : false)), ), ), ); @@ -259,7 +257,7 @@ export const createLocalMembership$ = ({ // This should be used in a combineLatest with publisher$ to connect. // to make it possible to call startTracks before the preferredTransport$ has resolved. - const trackStartRequested$ = new BehaviorSubject(false); + const trackStartRequested = Promise.withResolvers(); // This should be used in a combineLatest with publisher$ to connect. // to make it possible to call startTracks before the preferredTransport$ has resolved. @@ -273,19 +271,21 @@ export const createLocalMembership$ = ({ * Extract the tracks from the published. Also reacts to changing publishers. */ const tracks$ = scope.behavior( - publisher$.pipe(switchMap((p) => (p ? p.tracks$ : constant([])))), + publisher$.pipe(switchMap((p) => (p?.tracks$ ? p.tracks$ : constant([])))), ); const publishing$ = scope.behavior( - publisher$.pipe(switchMap((p) => (p ? p.publishing$ : constant(false)))), + publisher$.pipe( + switchMap((p) => (p?.publishing$ ? p.publishing$ : constant(false))), + ), ); const startTracks = (): Behavior => { - trackStartRequested$.next(true); + trackStartRequested.resolve(); return tracks$; }; const requestConnect = (): void => { - trackStartRequested$.next(true); + trackStartRequested.resolve(); connectRequested$.next(true); }; @@ -310,37 +310,18 @@ export const createLocalMembership$ = ({ }); }); - // const mutestate= publisher$.pipe(switchMap((publisher) => { - // return publisher.muteState$ - // }); - - // For each publisher create the descired tracks - // If we recreate a new publisher we remember the trackStartRequested$ value and immediately create the tracks - // THIS might be fine without a reconcile. There is no cleanup needed. We always get a working publisher - // track start request can than just toggle the tracks. - // TODO does this need `reconcile` to make sure we wait for createAndSetupTracks before we stop tracks? - combineLatest([publisher$, trackStartRequested$]).subscribe( - ([publisher, shouldStartTracks]) => { - if (publisher && shouldStartTracks) { - publisher.createAndSetupTracks().catch( - // TODO make this set some error state - (e) => logger.error(e), - ); - } else if (publisher) { - publisher.stopTracks(); - } - }, - ); - // Use reconcile here to not run concurrent createAndSetupTracks calls // `tracks$` will update once they are ready. scope.reconcile( - scope.behavior(combineLatest([publisher$, trackStartRequested$])), - async ([publisher, shouldStartTracks]) => { - if (publisher && shouldStartTracks) { + scope.behavior( + combineLatest([publisher$, tracks$, from(trackStartRequested.promise)]), + null, + ), + async (valueIfReady) => { + if (!valueIfReady) return; + const [publisher, tracks] = valueIfReady; + if (publisher && tracks.length === 0) { await publisher.createAndSetupTracks().catch((e) => logger.error(e)); - } else if (publisher) { - publisher.stopTracks(); } }, ); @@ -349,8 +330,7 @@ export const createLocalMembership$ = ({ scope.reconcile( scope.behavior(combineLatest([publisher$, tracks$, connectRequested$])), async ([publisher, tracks, shouldConnect]) => { - if (shouldConnect === publisher?.publishing$.value) - return Promise.resolve(); + if (shouldConnect === publisher?.publishing$.value) return; if (tracks.length !== 0 && shouldConnect) { try { await publisher?.startPublishing(); @@ -374,46 +354,53 @@ export const createLocalMembership$ = ({ logger.error("Multiple Livkit Errors:", e); else fatalLivekitError$.next(e); }; - const livekitState$: Observable = combineLatest([ - publisher$, - localTransport$, - localConnection$, - tracks$, - publishing$, - connectRequested$, - trackStartRequested$, - fatalLivekitError$, - ]).pipe( - map( - ([ - publisher, - localTransport, - localConnection, - tracks, - publishing, - shouldConnect, - shouldStartTracks, - error, - ]) => { - // read this: - // if(!) return {state: ...} - // if(!) return {state: } - // - // as: - // We do have but not yet so we are in - if (error !== null) return { state: LivekitState.Error, error }; - const hasTracks = tracks.length > 0; - if (!localTransport) return { state: LivekitState.WaitingForTransport }; - if (!localConnection) return { state: LivekitState.Connecting }; - if (!publisher) return { state: LivekitState.InitialisingPublisher }; - if (!shouldStartTracks) return { state: LivekitState.Initialized }; - if (!hasTracks) return { state: LivekitState.CreatingTracks }; - if (!shouldConnect) return { state: LivekitState.ReadyToPublish }; - if (!publishing) return { state: LivekitState.WaitingToPublish }; - return { state: LivekitState.Connected }; - }, + const livekitState$: Behavior = scope.behavior( + combineLatest([ + publisher$, + localTransport$, + tracks$.pipe( + tap((t) => { + logger.info("tracks$: ", t); + }), + ), + publishing$, + connectRequested$, + from(trackStartRequested.promise).pipe( + map(() => true), + startWith(false), + ), + fatalLivekitError$, + ]).pipe( + map( + ([ + publisher, + localTransport, + tracks, + publishing, + shouldConnect, + shouldStartTracks, + error, + ]) => { + // read this: + // if(!) return {state: ...} + // if(!) return {state: } + // + // as: + // We do have but not yet so we are in + if (error !== null) return { state: LivekitState.Error, error }; + const hasTracks = tracks.length > 0; + if (!localTransport) + return { state: LivekitState.WaitingForTransport }; + if (!publisher) return { state: LivekitState.WaitingForConnection }; + if (!shouldStartTracks) return { state: LivekitState.Initialized }; + if (!hasTracks) return { state: LivekitState.CreatingTracks }; + if (!shouldConnect) return { state: LivekitState.ReadyToPublish }; + if (!publishing) return { state: LivekitState.WaitingToPublish }; + return { state: LivekitState.Connected }; + }, + ), + distinctUntilChanged(deepCompare), ), - distinctUntilChanged(deepCompare), ); const fatalMatrixError$ = new BehaviorSubject(null); @@ -577,15 +564,15 @@ export const createLocalMembership$ = ({ requestConnect, requestDisconnect, connectionState: { - livekit$: scope.behavior(livekitState$), + livekit$: livekitState$, matrix$: matrixState$, }, + tracks$, + participant$, homeserverConnected$, - connected$, reconnecting$, sharingScreen$, toggleScreenSharing, - participant$, connection$: localConnection$, }; }; diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index df6addb8..14f44491 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -15,6 +15,7 @@ import { } from "livekit-client"; import { BehaviorSubject, + combineLatest, map, NEVER, type Observable, @@ -80,6 +81,23 @@ export class Publisher { ); void this.stopPublishing(); }); + + // TODO move mute state handling here using reconcile (instead of inside the mute state class) + // this.scope.reconcile( + // this.scope.behavior( + // combineLatest([this.muteStates.video.enabled$, this.tracks$]), + // ), + // async ([videoEnabled, tracks]) => { + // const track = tracks.find((t) => t.kind == Track.Kind.Video); + // if (!track) return; + + // if (videoEnabled) { + // await track.unmute(); + // } else { + // await track.mute(); + // } + // }, + // ); } private _tracks$ = new BehaviorSubject[]>([]); diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index 6d867414..812cfcd7 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -80,8 +80,11 @@ export class ObservableScope { error(err: unknown) { subject$.error(err); }, + complete() { + subject$.complete(); + }, }); - if (subject$.value === nothing) + if (subject$.value === nothing && !subject$.isStopped) throw new Error("Behavior failed to synchronously emit an initial value"); return subject$ as Behavior; } @@ -125,11 +128,11 @@ export class ObservableScope { let latestValue: T | typeof nothing = nothing; let reconcilePromise: Promise | undefined = undefined; let cleanUp: (() => Promise) | void = undefined; + let prevVal: T | typeof nothing = nothing; // While this loop runs it will process the latest from `value$` until it caught up with the updates. // It might skip updates from `value$` and only process the newest value after callback has resolved. const reconcileLoop = async (): Promise => { - let prevVal: T | typeof nothing = nothing; while (latestValue !== prevVal) { await cleanUp?.(); // Call the previous value's clean-up handler prevVal = latestValue;