From f8310b46112c1207fb05a36c987014dd630c9119 Mon Sep 17 00:00:00 2001 From: Valere Date: Fri, 12 Dec 2025 10:31:08 +0100 Subject: [PATCH 1/7] publisher: only use highlevel participant APIs --- .../localMember/LocalMembership.test.ts | 37 +- .../localMember/LocalMembership.ts | 115 +++-- .../localMember/Publisher.test.ts | 426 +++++++++++++----- .../CallViewModel/localMember/Publisher.ts | 324 +++++++------ src/utils/test.ts | 2 + 5 files changed, 608 insertions(+), 296 deletions(-) diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMembership.test.ts index cff5c06d..247e8ed1 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.test.ts @@ -271,6 +271,7 @@ describe("LocalMembership", () => { state: "ConnectedToLkRoom", }), transport: bTransport, + livekitRoom: mockLivekitRoom({}), } as unknown as Connection, [], ); @@ -281,13 +282,17 @@ describe("LocalMembership", () => { const localTransport$ = new BehaviorSubject(aTransport); const publishers: Publisher[] = []; - + let seed = 0; defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation( () => { + const a = seed; + seed += 1; + logger.info(`creating [${a}]`); const p = { - stopPublishing: vi.fn(), + stopPublishing: vi.fn().mockImplementation(() => { + logger.info(`stopPublishing [${a}]`); + }), stopTracks: vi.fn(), - publishing$: constant(false), }; publishers.push(p as unknown as Publisher); return p; @@ -322,7 +327,7 @@ describe("LocalMembership", () => { await flushPromises(); // stop all tracks after ending scopes expect(publishers[1].stopPublishing).toHaveBeenCalled(); - expect(publishers[1].stopTracks).toHaveBeenCalled(); + // expect(publishers[1].stopTracks).toHaveBeenCalled(); defaultCreateLocalMemberValues.createPublisherFactory.mockReset(); }); @@ -367,15 +372,17 @@ describe("LocalMembership", () => { }); await flushPromises(); expect(publisherFactory).toHaveBeenCalledOnce(); - expect(localMembership.tracks$.value.length).toBe(0); + // expect(localMembership.tracks$.value.length).toBe(0); + expect(publishers[0].createAndSetupTracks).not.toHaveBeenCalled(); localMembership.startTracks(); await flushPromises(); - expect(localMembership.tracks$.value.length).toBe(2); + expect(publishers[0].createAndSetupTracks).toHaveBeenCalled(); + // expect(localMembership.tracks$.value.length).toBe(2); scope.end(); await flushPromises(); // stop all tracks after ending scopes expect(publishers[0].stopPublishing).toHaveBeenCalled(); - expect(publishers[0].stopTracks).toHaveBeenCalled(); + // expect(publishers[0].stopTracks).toHaveBeenCalled(); publisherFactory.mockClear(); }); // TODO add an integration test combining publisher and localMembership @@ -446,16 +453,16 @@ describe("LocalMembership", () => { state: RTCBackendState.Initialized, }); expect(publisherFactory).toHaveBeenCalledOnce(); - expect(localMembership.tracks$.value.length).toBe(0); + // expect(localMembership.tracks$.value.length).toBe(0); // ------- localMembership.startTracks(); // ------- await flushPromises(); - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.CreatingTracks, - }); + // expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + // state: RTCBackendState.CreatingTracks, + // }); createTrackResolver.resolve(); await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ @@ -466,9 +473,9 @@ describe("LocalMembership", () => { localMembership.requestConnect(); // ------- - expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: RTCBackendState.WaitingToPublish, - }); + // expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + // state: RTCBackendState.WaitingToPublish, + // }); publishResolver.resolve(); await flushPromises(); @@ -486,7 +493,7 @@ describe("LocalMembership", () => { }); // stop all tracks after ending scopes expect(publishers[0].stopPublishing).toHaveBeenCalled(); - expect(publishers[0].stopTracks).toHaveBeenCalled(); + // expect(publishers[0].stopTracks).toHaveBeenCalled(); }); // TODO add tests for matrix local matrix participation. }); diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 60ae79b8..de36a098 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -6,12 +6,13 @@ Please see LICENSE in the repository root for full details. */ import { - type LocalTrack, type Participant, ParticipantEvent, type LocalParticipant, type ScreenShareCaptureOptions, ConnectionState, + RoomEvent, + MediaDeviceFailure, } from "livekit-client"; import { observeParticipantEvents } from "@livekit/components-core"; import { @@ -24,6 +25,7 @@ import { combineLatest, distinctUntilChanged, from, + fromEvent, map, type Observable, of, @@ -61,9 +63,9 @@ export enum RTCBackendState { WaitingForConnection = "waiting_for_connection", /** Connection and transport arrived, publisher Initialized */ Initialized = "Initialized", - CreatingTracks = "creating_tracks", + // CreatingTracks = "creating_tracks", ReadyToPublish = "ready_to_publish", - WaitingToPublish = "waiting_to_publish", + // WaitingToPublish = "waiting_to_publish", Connected = "connected", Disconnected = "disconnected", Disconnecting = "disconnecting", @@ -74,9 +76,9 @@ type LocalMemberRtcBackendState = | { state: RTCBackendState.WaitingForTransport } | { state: RTCBackendState.WaitingForConnection } | { state: RTCBackendState.Initialized } - | { state: RTCBackendState.CreatingTracks } + // | { state: RTCBackendState.CreatingTracks } | { state: RTCBackendState.ReadyToPublish } - | { state: RTCBackendState.WaitingToPublish } + // | { state: RTCBackendState.WaitingToPublish } | { state: RTCBackendState.Connected } | { state: RTCBackendState.Disconnected } | { state: RTCBackendState.Disconnecting }; @@ -159,7 +161,7 @@ export const createLocalMembership$ = ({ /** * This starts audio and video tracks. They will be reused when calling `requestConnect`. */ - startTracks: () => Behavior; + startTracks: () => Behavior; /** * This sets a inner state (shouldConnect) to true and instructs the js-sdk and livekit to keep the user * connected to matrix and livekit. @@ -172,7 +174,7 @@ export const createLocalMembership$ = ({ * Callback to toggle screen sharing. If null, screen sharing is not possible. */ toggleScreenSharing: (() => void) | null; - tracks$: Behavior; + // tracks$: Behavior; participant$: Behavior; connection$: Behavior; homeserverConnected$: Behavior; @@ -224,6 +226,33 @@ export const createLocalMembership$ = ({ ), ); + // Tracks error that happen when creating the local tracks. + const mediaErrors$ = localConnection$.pipe( + switchMap((connection) => { + if (!connection) { + return of(null); + } else { + return fromEvent( + connection.livekitRoom, + RoomEvent.MediaDevicesError, + (error: Error) => { + return MediaDeviceFailure.getFailure(error) ?? null; + }, + ); + } + }), + ); + + mediaErrors$.pipe(scope.bind()).subscribe((error) => { + if (error) { + logger.error(`Failed to create local tracks:`, error); + // TODO is it fatal? Do we need to create a new Specialized Error? + setMatrixError( + new UnknownCallError(new Error(`Media device error: ${error}`)), + ); + } + }); + const localConnectionState$ = localConnection$.pipe( switchMap((connection) => (connection ? connection.state$ : of(null))), ); @@ -293,16 +322,16 @@ export const createLocalMembership$ = ({ /** * Extract the tracks from the published. Also reacts to changing publishers. */ - const tracks$ = scope.behavior( - publisher$.pipe(switchMap((p) => (p?.tracks$ ? p.tracks$ : constant([])))), - ); - const publishing$ = scope.behavior( - publisher$.pipe(switchMap((p) => p?.publishing$ ?? constant(false))), - ); + // const tracks$ = scope.behavior( + // publisher$.pipe(switchMap((p) => (p?.tracks$ ? p.tracks$ : constant([])))), + // ); + // const publishing$ = scope.behavior( + // publisher$.pipe(switchMap((p) => p?.publishing$ ?? constant(false))), + // ); - const startTracks = (): Behavior => { + const startTracks = (): Behavior => { trackStartRequested.resolve(); - return tracks$; + return constant(undefined); }; const requestConnect = (): void => { @@ -327,7 +356,7 @@ export const createLocalMembership$ = ({ } return Promise.resolve(async (): Promise => { await publisher$?.value?.stopPublishing(); - publisher$?.value?.stopTracks(); + await publisher$?.value?.stopTracks(); }); }); @@ -335,13 +364,16 @@ export const createLocalMembership$ = ({ // `tracks$` will update once they are ready. scope.reconcile( scope.behavior( - combineLatest([publisher$, tracks$, from(trackStartRequested.promise)]), + combineLatest([ + publisher$ /*, tracks$*/, + from(trackStartRequested.promise), + ]), null, ), async (valueIfReady) => { if (!valueIfReady) return; - const [publisher, tracks] = valueIfReady; - if (publisher && tracks.length === 0) { + const [publisher] = valueIfReady; + if (publisher) { await publisher.createAndSetupTracks().catch((e) => logger.error(e)); } }, @@ -349,22 +381,23 @@ export const createLocalMembership$ = ({ // Based on `connectRequested$` we start publishing tracks. (once they are there!) scope.reconcile( - scope.behavior(combineLatest([publisher$, tracks$, connectRequested$])), - async ([publisher, tracks, shouldConnect]) => { - if (shouldConnect === publisher?.publishing$.value) return; - if (tracks.length !== 0 && shouldConnect) { + scope.behavior(combineLatest([publisher$, connectRequested$])), + async ([publisher, shouldConnect]) => { + if (shouldConnect) { try { await publisher?.startPublishing(); } catch (error) { setLivekitError(error as ElementCallError); } - } else if (tracks.length !== 0 && !shouldConnect) { - try { - await publisher?.stopPublishing(); - } catch (error) { - setLivekitError(new UnknownCallError(error as Error)); - } } + // XXX Why is that? + // else { + // try { + // await publisher?.stopPublishing(); + // } catch (error) { + // setLivekitError(new UnknownCallError(error as Error)); + // } + // } }, ); @@ -378,12 +411,12 @@ export const createLocalMembership$ = ({ combineLatest([ publisher$, localTransport$, - tracks$.pipe( - tap((t) => { - logger.info("tracks$: ", t); - }), - ), - publishing$, + // tracks$.pipe( + // tap((t) => { + // logger.info("tracks$: ", t); + // }), + // ), + // publishing$, connectRequested$, from(trackStartRequested.promise).pipe( map(() => true), @@ -395,8 +428,8 @@ export const createLocalMembership$ = ({ ([ publisher, localTransport, - tracks, - publishing, + // tracks, + // publishing, shouldConnect, shouldStartTracks, error, @@ -408,15 +441,15 @@ export const createLocalMembership$ = ({ // as: // We do have but not yet so we are in if (error !== null) return { state: RTCBackendState.Error, error }; - const hasTracks = tracks.length > 0; + // const hasTracks = tracks.length > 0; if (!localTransport) return { state: RTCBackendState.WaitingForTransport }; if (!publisher) return { state: RTCBackendState.WaitingForConnection }; if (!shouldStartTracks) return { state: RTCBackendState.Initialized }; - if (!hasTracks) return { state: RTCBackendState.CreatingTracks }; + // if (!hasTracks) return { state: RTCBackendState.CreatingTracks }; if (!shouldConnect) return { state: RTCBackendState.ReadyToPublish }; - if (!publishing) return { state: RTCBackendState.WaitingToPublish }; + // if (!publishing) return { state: RTCBackendState.WaitingToPublish }; return { state: RTCBackendState.Connected }; }, ), @@ -588,7 +621,7 @@ export const createLocalMembership$ = ({ livekit$: livekitState$, matrix$: matrixState$, }, - tracks$, + // tracks$, participant$, homeserverConnected$, reconnecting$, diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index 9b3e5b2a..3ab3d48c 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -5,66 +5,320 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ +import { afterEach, beforeEach, describe, expect, it, test, vi } from "vitest"; import { - afterEach, - beforeEach, - describe, - expect, - it, - type Mock, - vi, -} from "vitest"; -import { ConnectionState as LivekitConenctionState } from "livekit-client"; -import { type BehaviorSubject } from "rxjs"; + ConnectionState as LivekitConnectionState, + LocalParticipant, + type LocalTrack, + type LocalTrackPublication, + ParticipantEvent, + Track, +} from "livekit-client"; +import { BehaviorSubject } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; import { ObservableScope } from "../../ObservableScope"; import { constant } from "../../Behavior"; import { + flushPromises, mockLivekitRoom, - mockLocalParticipant, - mockMediaDevices, + mockMediaDevices } from "../../../utils/test"; import { Publisher } from "./Publisher"; -import { - type Connection, - type ConnectionState, -} from "../remoteMembers/Connection"; +import { type Connection } from "../remoteMembers/Connection"; import { type MuteStates } from "../../MuteStates"; -import { FailToStartLivekitConnection } from "../../../utils/errors"; -describe("Publisher", () => { - let scope: ObservableScope; - let connection: Connection; - let muteStates: MuteStates; - beforeEach(() => { - muteStates = { - audio: { - enabled$: constant(false), - unsetHandler: vi.fn(), - setHandler: vi.fn(), - }, - video: { - enabled$: constant(false), - unsetHandler: vi.fn(), - setHandler: vi.fn(), - }, - } as unknown as MuteStates; - scope = new ObservableScope(); - connection = { - state$: constant({ - state: "ConnectedToLkRoom", - livekitConnectionState$: constant(LivekitConenctionState.Connected), - }), - livekitRoom: mockLivekitRoom({ - localParticipant: mockLocalParticipant({}), - }), - } as unknown as Connection; +let scope: ObservableScope; + +beforeEach(() => { + scope = new ObservableScope(); +}); + +// afterEach(() => scope.end()); + +function createMockLocalTrack(source: Track.Source): LocalTrack { + const track = { + source, + isMuted: false, + isUpstreamPaused: false, + } as Partial as LocalTrack; + + vi.mocked(track).mute = vi.fn().mockImplementation(() => { + track.isMuted = true; + }); + vi.mocked(track).unmute = vi.fn().mockImplementation(() => { + track.isMuted = false; + }); + vi.mocked(track).pauseUpstream = vi.fn().mockImplementation(() => { + // @ts-expect-error - for that test we want to set isUpstreamPaused directly + track.isUpstreamPaused = true; + }); + vi.mocked(track).resumeUpstream = vi.fn().mockImplementation(() => { + // @ts-expect-error - for that test we want to set isUpstreamPaused directly + track.isUpstreamPaused = false; }); - afterEach(() => scope.end()); + return track; +} - it("throws if livekit room could not publish", async () => { +function createMockMuteState(enabled$: BehaviorSubject): { + enabled$: BehaviorSubject; + setHandler: (h: (enabled: boolean) => void) => void; + unsetHandler: () => void; +} { + let currentHandler = (enabled: boolean): void => {}; + + const ms = { + enabled$, + setHandler: vi.fn().mockImplementation((h: (enabled: boolean) => void) => { + currentHandler = h; + }), + unsetHandler: vi.fn().mockImplementation(() => { + currentHandler = (enabled: boolean): void => {}; + }), + }; + // forward enabled$ emissions to the current handler + enabled$.subscribe((enabled) => { + logger.info(`MockMuteState: enabled changed to ${enabled}`); + currentHandler(enabled); + }); + + return ms; +} + +let connection: Connection; +let muteStates: MuteStates; +let localParticipant: LocalParticipant; +let audioEnabled$: BehaviorSubject; +let videoEnabled$: BehaviorSubject; +let trackPublications: LocalTrackPublication[]; +// use it to control when track creation resolves, default to resolved +let createTrackLock: Promise; + +beforeEach(() => { + trackPublications = []; + audioEnabled$ = new BehaviorSubject(false); + videoEnabled$ = new BehaviorSubject(false); + createTrackLock = Promise.resolve(); + + muteStates = { + audio: createMockMuteState(audioEnabled$), + video: createMockMuteState(videoEnabled$), + } as unknown as MuteStates; + + const mockSendDataPacket = vi.fn(); + const mockEngine = { + client: { + sendUpdateLocalMetadata: vi.fn(), + }, + on: vi.fn().mockReturnThis(), + sendDataPacket: mockSendDataPacket, + }; + + localParticipant = new LocalParticipant( + "local-sid", + "local-identity", + // @ts-expect-error - for that test we want a real LocalParticipant to have the pending publications logic + mockEngine, + { + adaptiveStream: true, + dynacase: false, + audioCaptureDefaults: {}, + videoCaptureDefaults: {}, + stopLocalTrackOnUnpublish: true, + reconnectPolicy: "always", + disconnectOnPageLeave: true, + }, + new Map(), + {}, + ); + + vi.mocked(localParticipant).createTracks = vi + .fn() + .mockImplementation(async (opts) => { + const tracks: LocalTrack[] = []; + if (opts.audio) { + tracks.push(createMockLocalTrack(Track.Source.Microphone)); + } + if (opts.video) { + tracks.push(createMockLocalTrack(Track.Source.Camera)); + } + await createTrackLock; + return tracks; + }); + + vi.mocked(localParticipant).publishTrack = vi + .fn() + .mockImplementation(async (track: LocalTrack) => { + const pub = { + track, + source: track.source, + mute: track.mute, + unmute: track.unmute, + } as Partial as LocalTrackPublication; + trackPublications.push(pub); + localParticipant.emit(ParticipantEvent.LocalTrackPublished, pub); + return Promise.resolve(pub); + }); + + vi.mocked(localParticipant).getTrackPublication = vi + .fn() + .mockImplementation((source: Track.Source) => { + return trackPublications.find((pub) => pub.track?.source === source); + }); + + connection = { + state$: constant({ + state: "ConnectedToLkRoom", + livekitConnectionState$: constant(LivekitConnectionState.Connected), + }), + livekitRoom: mockLivekitRoom({ + localParticipant: localParticipant, + }), + } as unknown as Connection; +}); + +describe("Publisher", () => { + let publisher: Publisher; + + beforeEach(() => { + publisher = new Publisher( + scope, + connection, + mockMediaDevices({}), + muteStates, + constant({ supported: false, processor: undefined }), + logger, + ); + }); + + afterEach(() => {}); + + it("Should not create tracks if started muted to avoid unneeded permission requests", async () => { + const createTracksSpy = vi.spyOn( + connection.livekitRoom.localParticipant, + "createTracks", + ); + + audioEnabled$.next(false); + videoEnabled$.next(false); + await publisher.createAndSetupTracks(); + + expect(createTracksSpy).not.toHaveBeenCalled(); + }); + + it("Should minimize permission request by querying create at once", async () => { + const enableCameraAndMicrophoneSpy = vi.spyOn( + localParticipant, + "enableCameraAndMicrophone", + ); + const createTracksSpy = vi.spyOn(localParticipant, "createTracks"); + + audioEnabled$.next(true); + videoEnabled$.next(true); + await publisher.createAndSetupTracks(); + await flushPromises(); + + expect(enableCameraAndMicrophoneSpy).toHaveBeenCalled(); + + // It should create both at once + expect(createTracksSpy).toHaveBeenCalledWith({ + audio: true, + video: true, + }); + }); + + it("Ensure no data is streamed until publish has been called", async () => { + audioEnabled$.next(true); + await publisher.createAndSetupTracks(); + + // The track should be created and paused + expect(localParticipant.createTracks).toHaveBeenCalledWith({ + audio: true, + video: undefined, + }); + await flushPromises(); + expect(localParticipant.publishTrack).toHaveBeenCalled(); + + await flushPromises(); + const track = localParticipant.getTrackPublication( + Track.Source.Microphone, + )?.track; + expect(track).toBeDefined(); + expect(track!.pauseUpstream).toHaveBeenCalled(); + expect(track!.isUpstreamPaused).toBe(true); + }); + + it("Ensure resume upstream when published is called", async () => { + videoEnabled$.next(true); + await publisher.createAndSetupTracks(); + // await flushPromises(); + await publisher.startPublishing(); + + const track = localParticipant.getTrackPublication( + Track.Source.Camera, + )?.track; + expect(track).toBeDefined(); + // expect(track.pauseUpstream).toHaveBeenCalled(); + expect(track!.isUpstreamPaused).toBe(false); + }); + + describe("Mute states", () => { + let publisher: Publisher; + beforeEach(() => { + publisher = new Publisher( + scope, + connection, + mockMediaDevices({}), + muteStates, + constant({ supported: false, processor: undefined }), + logger, + ); + }); + + test.each([ + { mutes: { audioEnabled: true, videoEnabled: false } }, + { mutes: { audioEnabled: true, videoEnabled: false } }, + ])("only create the tracks that are unmuted $mutes", async ({ mutes }) => { + // Ensure all muted + audioEnabled$.next(mutes.audioEnabled); + videoEnabled$.next(mutes.videoEnabled); + + vi.mocked(connection.livekitRoom.localParticipant).createTracks = vi + .fn() + .mockResolvedValue([]); + + await publisher.createAndSetupTracks(); + + expect( + connection.livekitRoom.localParticipant.createTracks, + ).toHaveBeenCalledOnce(); + + expect( + connection.livekitRoom.localParticipant.createTracks, + ).toHaveBeenCalledWith({ + audio: mutes.audioEnabled ? true : undefined, + video: mutes.videoEnabled ? true : undefined, + }); + }); + }); + + it("does mute unmute audio", async () => {}); +}); + +describe("Bug fix", () => { + // There is a race condition when creating and publishing tracks while the mute state changes. + // This race condition could cause tracks to be published even though they are muted at the + // beginning of a call coming from lobby. + // This is caused by our stack using manually the low level API to create and publish tracks, + // but also using the higher level setMicrophoneEnabled and setCameraEnabled functions that also create + // and publish tracks, and managing pending publications. + // Race is as follow, on creation of the Publisher we create the tracks then publish them. + // If in the middle of that process the mute state changes: + // - the `setMicrophoneEnabled` will be no-op because it is not aware of our created track and can't see any pending publication + // - If start publication is requested it will publish the track even though there was a mute request. + it("wrongly publish tracks while muted", async () => { + // setLogLevel(`debug`); const publisher = new Publisher( scope, connection, @@ -73,68 +327,34 @@ describe("Publisher", () => { constant({ supported: false, processor: undefined }), logger, ); + audioEnabled$.next(true); - // should do nothing if no tracks have been created yet. - await publisher.startPublishing(); - expect( - connection.livekitRoom.localParticipant.publishTrack, - ).not.toHaveBeenCalled(); + const resolvers = Promise.withResolvers(); + createTrackLock = resolvers.promise; - await expect(publisher.createAndSetupTracks()).rejects.toThrow( - Error("audio and video is false"), - ); - - (muteStates.audio.enabled$ as BehaviorSubject).next(true); - - ( - connection.livekitRoom.localParticipant.createTracks as Mock - ).mockResolvedValue([{}, {}]); - - await expect(publisher.createAndSetupTracks()).resolves.not.toThrow(); - expect( - connection.livekitRoom.localParticipant.createTracks, - ).toHaveBeenCalledOnce(); - - // failiour due to localParticipant.publishTrack - ( - connection.livekitRoom.localParticipant.publishTrack as Mock - ).mockRejectedValue(Error("testError")); - - await expect(publisher.startPublishing()).rejects.toThrow( - new FailToStartLivekitConnection("testError"), - ); - - // does not try other conenction after the first one failed - expect( - connection.livekitRoom.localParticipant.publishTrack, - ).toHaveBeenCalledTimes(1); - - // failiour due to connection.state$ - const beforeState = connection.state$.value; - (connection.state$ as BehaviorSubject).next({ - state: "FailedToStart", - error: Error("testStartError"), + // Initially the audio is unmuted, so creating tracks should publish the audio track + const createTracks = publisher.createAndSetupTracks(); + void publisher.startPublishing(); + void createTracks.then(() => { + void publisher.startPublishing(); }); + // now mute the audio before allowing track creation to complete + audioEnabled$.next(false); + resolvers.resolve(undefined); + await createTracks; - await expect(publisher.startPublishing()).rejects.toThrow( - new FailToStartLivekitConnection("testStartError"), - ); - (connection.state$ as BehaviorSubject).next(beforeState); + await flushPromises(); - // does not try other conenction after the first one failed - expect( - connection.livekitRoom.localParticipant.publishTrack, - ).toHaveBeenCalledTimes(1); + const track = localParticipant.getTrackPublication( + Track.Source.Microphone, + )?.track; + expect(track).toBeDefined(); - // success case - ( - connection.livekitRoom.localParticipant.publishTrack as Mock - ).mockResolvedValue({}); - - await expect(publisher.startPublishing()).resolves.not.toThrow(); - - expect( - connection.livekitRoom.localParticipant.publishTrack, - ).toHaveBeenCalledTimes(3); + try { + expect(localParticipant.publishTrack).not.toHaveBeenCalled(); + } catch { + expect(track!.mute).toHaveBeenCalled(); + expect(track!.isMuted).toBe(true); + } }); }); diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 326dedaf..5da6d899 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -6,15 +6,14 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ import { + ConnectionState as LivekitConnectionState, + type LocalTrackPublication, LocalVideoTrack, + ParticipantEvent, type Room as LivekitRoom, Track, - type LocalTrack, - type LocalTrackPublication, - ConnectionState as LivekitConnectionState, } from "livekit-client"; import { - BehaviorSubject, map, NEVER, type Observable, @@ -34,10 +33,6 @@ import { getUrlParams } from "../../../UrlParams.ts"; import { observeTrackReference$ } from "../../MediaViewModel.ts"; import { type Connection } from "../remoteMembers/Connection.ts"; import { type ObservableScope } from "../../ObservableScope.ts"; -import { - ElementCallError, - FailToStartLivekitConnection, -} from "../../../utils/errors.ts"; /** * A wrapper for a Connection object. @@ -45,14 +40,21 @@ import { * The Publisher is also responsible for creating the media tracks. */ export class Publisher { + /** + * By default, livekit will start publishing tracks as soon as they are created. + * In the matrix RTC world, we want to control when tracks are published based + * on whether the user is part of the RTC session or not. + */ + public shouldPublish = false; + /** * Creates a new Publisher. * @param scope - The observable scope to use for managing the publisher. * @param connection - The connection to use for publishing. * @param devices - The media devices to use for audio and video input. * @param muteStates - The mute states for audio and video. - * @param e2eeLivekitOptions - The E2EE options to use for the LiveKit room. Use to share the same key provider across connections!. * @param trackerProcessorState$ - The processor state for the video track processor (e.g. background blur). + * @param logger - The logger to use for logging :D. */ public constructor( private scope: ObservableScope, @@ -62,7 +64,6 @@ export class Publisher { trackerProcessorState$: Behavior, private logger: Logger, ) { - this.logger.info("Create LiveKit room"); const { controlledAudioDevices } = getUrlParams(); const room = connection.livekitRoom; @@ -80,41 +81,54 @@ export class Publisher { this.scope.onEnd(() => { this.logger.info("Scope ended -> stop publishing all tracks"); void this.stopPublishing(); + muteStates.audio.unsetHandler(); + muteStates.video.unsetHandler(); }); - // TODO move mute state handling here using reconcile (instead of inside the mute state class) - // this.scope.reconcile( - // this.scope.behavior( - // combineLatest([this.muteStates.video.enabled$, this.tracks$]), - // ), - // async ([videoEnabled, tracks]) => { - // const track = tracks.find((t) => t.kind == Track.Kind.Video); - // if (!track) return; - - // if (videoEnabled) { - // await track.unmute(); - // } else { - // await track.mute(); - // } - // }, - // ); + this.connection.livekitRoom.localParticipant.on( + ParticipantEvent.LocalTrackPublished, + this.onLocalTrackPublished.bind(this), + ); } - private _tracks$ = new BehaviorSubject[]>([]); - public tracks$ = this._tracks$ as Behavior[]>; - + // LiveKit will publish the tracks as soon as they are created + // but we want to control when tracks are published. + // We cannot just mute the tracks, even if this will effectively stop the publishing, + // it would also prevent the user from seeing their own video/audio preview. + // So for that we use pauseUpStream(): Stops sending media to the server by replacing + // the sender track with null, but keeps the local MediaStreamTrack active. + // The user can still see/hear themselves locally, but remote participants see nothing + private onLocalTrackPublished( + localTrackPublication: LocalTrackPublication, + ): void { + this.logger.info("Local track published", localTrackPublication); + const lkRoom = this.connection.livekitRoom; + if (!this.shouldPublish) { + this.pauseUpstreams(lkRoom, [localTrackPublication.source]).catch((e) => { + this.logger.error(`Failed to pause upstreams`, e); + }); + } + // also check the mute state and apply it + if (localTrackPublication.source === Track.Source.Microphone) { + const enabled = this.muteStates.audio.enabled$.value; + lkRoom.localParticipant.setMicrophoneEnabled(enabled).catch((e) => { + this.logger.error( + `Failed to enable microphone track, enabled:${enabled}`, + e, + ); + }); + } else if (localTrackPublication.source === Track.Source.Camera) { + const enabled = this.muteStates.video.enabled$.value; + lkRoom.localParticipant.setCameraEnabled(enabled).catch((e) => { + this.logger.error( + `Failed to enable camera track, enabled:${enabled}`, + e, + ); + }); + } + } /** - * Start the connection to LiveKit and publish local tracks. * - * This will: - * wait for the connection to be ready. - // * 1. Request an OpenId token `request_token` (allows matrix users to verify their identity with a third-party service.) - // * 2. Use this token to request the SFU config to the MatrixRtc authentication service. - // * 3. Connect to the configured LiveKit room. - // * 4. Create local audio and video tracks based on the current mute states and publish them to the room. - * - * @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection. - * @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created. */ public async createAndSetupTracks(): Promise { this.logger.debug("createAndSetupTracks called"); @@ -122,119 +136,141 @@ export class Publisher { // Observe mute state changes and update LiveKit microphone/camera states accordingly this.observeMuteStates(this.scope); - // TODO-MULTI-SFU: Prepublish a microphone track + // Check if audio and/or video is enabled. We only create tracks if enabled, + // because it could prompt for permission, and we don't want to do that unnecessarily. const audio = this.muteStates.audio.enabled$.value; const video = this.muteStates.video.enabled$.value; - // createTracks throws if called with audio=false and video=false - if (audio || video) { - // TODO this can still throw errors? It will also prompt for permissions if not already granted - return lkRoom.localParticipant - .createTracks({ - audio, - video, - }) - .then((tracks) => { - this.logger.info( - "created track", - tracks.map((t) => t.kind + ", " + t.id), - ); - this._tracks$.next(tracks); - }) - .catch((error) => { - this.logger.error("Failed to create tracks", error); - }); - } - throw Error("audio and video is false"); + + const enableTracks = async (): Promise => { + if (audio && video) { + // Enable both at once in order to have a single permission prompt! + await lkRoom.localParticipant.enableCameraAndMicrophone(); + } else if (audio) { + await lkRoom.localParticipant.setMicrophoneEnabled(true); + } else if (video) { + await lkRoom.localParticipant.setCameraEnabled(true); + } + return; + }; + + // We cannot just wait because this call could wait for the track to be + // published (and not just created), which we don't want yet. + // Notice it will wait for that only the first time, when tracks are created, the + // later calls will be instant :/ + enableTracks() + .then(() => { + // At this point, LiveKit will have created & published the tracks as soon as possible + // but we want to control when tracks are published. + // We cannot just mute the tracks, even if this will effectively stop the publishing, + // it would also prevent the user from seeing their own video/audio preview. + // So for that we use pauseUpStream(): Stops sending media to the server by replacing + // the sender track with null, but keeps the local MediaStreamTrack active. + // The user can still see/hear themselves locally, but remote participants see nothing + if (!this.shouldPublish) { + this.pauseUpstreams(lkRoom, [ + Track.Source.Microphone, + Track.Source.Camera, + ]).catch((e) => { + this.logger.error(`Failed to pause upstreams`, e); + }); + } + }) + .catch((e: Error) => { + this.logger.error(`Failed to enable camera and microphone`, e); + }); + + return Promise.resolve(); + } + + private async pauseUpstreams( + lkRoom: LivekitRoom, + sources: Track.Source[], + ): Promise { + for (const source of sources) { + const track = lkRoom.localParticipant.getTrackPublication(source)?.track; + if (track) { + await track.pauseUpstream(); + } else { + this.logger.warn( + `No track found for source ${source} to pause upstream`, + ); + } + } + } + + private async resumeUpstreams( + lkRoom: LivekitRoom, + sources: Track.Source[], + ): Promise { + for (const source of sources) { + const track = lkRoom.localParticipant.getTrackPublication(source)?.track; + if (track) { + await track.resumeUpstream(); + } else { + this.logger.warn( + `No track found for source ${source} to resume upstream`, + ); + } + } } - private _publishing$ = new BehaviorSubject(false); - public publishing$ = this.scope.behavior(this._publishing$); /** + * + * Request to publish local tracks to the LiveKit room. + * This will wait for the connection to be ready before publishing. + * Livekit also have some local retry logic for publishing tracks. + * Can be called multiple times, localparticipant manages the state of published tracks (or pending publications). * * @returns - * @throws ElementCallError */ - public async startPublishing(): Promise { + public async startPublishing(): Promise { + if (this.shouldPublish) { + this.logger.debug(`Already publishing, ignoring startPublishing call`); + return; + } + this.shouldPublish = true; this.logger.debug("startPublishing called"); + const lkRoom = this.connection.livekitRoom; - const { promise, resolve, reject } = Promise.withResolvers(); - const sub = this.connection.state$.subscribe((s) => { - switch (s.state) { - case "ConnectedToLkRoom": - resolve(); - break; - case "FailedToStart": - reject( - s.error instanceof ElementCallError - ? s.error - : new FailToStartLivekitConnection(s.error.message), - ); - break; - default: - this.logger.info("waiting for connection: ", s.state); - } - }); + + // Resume upstream for both audio and video tracks + // We need to call it explicitly because call setTrackEnabled does not always + // resume upstream. It will only if you switch the track from disabled to enabled, + // but if the track is already enabled but upstream is paused, it won't resume it. + // TODO what about screen share? try { - await promise; + await this.resumeUpstreams(lkRoom, [ + Track.Source.Microphone, + Track.Source.Camera, + ]); } catch (e) { - throw e; - } finally { - sub.unsubscribe(); + this.logger.error(`Failed to resume upstreams`, e); } - - for (const track of this.tracks$.value) { - this.logger.info("publish ", this.tracks$.value.length, "tracks"); - // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally - // with a timeout. - await lkRoom.localParticipant.publishTrack(track).catch((error) => { - this.logger.error("Failed to publish track", error); - throw new FailToStartLivekitConnection( - error instanceof Error ? error.message : error, - ); - }); - this.logger.info("published track ", track.kind, track.id); - - // TODO: check if the connection is still active? and break the loop if not? - } - this._publishing$.next(true); - return this.tracks$.value; } public async stopPublishing(): Promise { this.logger.debug("stopPublishing called"); - // TODO-MULTI-SFU: Move these calls back to ObservableScope.onEnd once scope - // actually has the right lifetime - this.muteStates.audio.unsetHandler(); - this.muteStates.video.unsetHandler(); - - const localParticipant = this.connection.livekitRoom.localParticipant; - const tracks: LocalTrack[] = []; - const addToTracksIfDefined = (p: LocalTrackPublication): void => { - if (p.track !== undefined) tracks.push(p.track); - }; - localParticipant.trackPublications.forEach(addToTracksIfDefined); - this.logger.debug( - "list of tracks to unpublish:", - tracks.map((t) => t.kind + ", " + t.id), - "start unpublishing now", - ); - await localParticipant.unpublishTracks(tracks).catch((error) => { - this.logger.error("Failed to unpublish tracks", error); - throw error; - }); - this.logger.debug( - "unpublished tracks", - tracks.map((t) => t.kind + ", " + t.id), - ); - this._publishing$.next(false); + this.shouldPublish = false; + await this.pauseUpstreams(this.connection.livekitRoom, [ + Track.Source.Microphone, + Track.Source.Camera, + Track.Source.ScreenShare, + ]); } - /** - * Stops all tracks that are currently running - */ - public stopTracks(): void { - this.tracks$.value.forEach((t) => t.stop()); - this._tracks$.next([]); + public async stopTracks(): Promise { + const lkRoom = this.connection.livekitRoom; + for (const source of [ + Track.Source.Microphone, + Track.Source.Camera, + Track.Source.ScreenShare, + ]) { + const localPub = lkRoom.localParticipant.getTrackPublication(source); + if (localPub?.track) { + // stops and unpublishes the track + await lkRoom.localParticipant.unpublishTrack(localPub!.track, true); + } + } } /// Private methods @@ -336,17 +372,31 @@ export class Publisher { */ private observeMuteStates(scope: ObservableScope): void { const lkRoom = this.connection.livekitRoom; - this.muteStates.audio.setHandler(async (desired) => { + this.muteStates.audio.setHandler(async (enable) => { try { - await lkRoom.localParticipant.setMicrophoneEnabled(desired); + this.logger.debug( + `handler: Setting LiveKit microphone enabled: ${enable}`, + ); + await lkRoom.localParticipant.setMicrophoneEnabled(enable); + // Unmute will restart the track if it was paused upstream, + // but until explicitly requested, we want to keep it paused. + if (!this.shouldPublish && enable) { + await this.pauseUpstreams(lkRoom, [Track.Source.Microphone]); + } } catch (e) { this.logger.error("Failed to update LiveKit audio input mute state", e); } return lkRoom.localParticipant.isMicrophoneEnabled; }); - this.muteStates.video.setHandler(async (desired) => { + this.muteStates.video.setHandler(async (enable) => { try { - await lkRoom.localParticipant.setCameraEnabled(desired); + this.logger.debug(`handler: Setting LiveKit camera enabled: ${enable}`); + await lkRoom.localParticipant.setCameraEnabled(enable); + // Unmute will restart the track if it was paused upstream, + // but until explicitly requested, we want to keep it paused. + if (!this.shouldPublish && enable) { + await this.pauseUpstreams(lkRoom, [Track.Source.Camera]); + } } catch (e) { this.logger.error("Failed to update LiveKit video input mute state", e); } diff --git a/src/utils/test.ts b/src/utils/test.ts index bd7dcd6f..24b9bef0 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -311,6 +311,8 @@ export function mockLocalParticipant( publishTrack: vi.fn(), unpublishTracks: vi.fn().mockResolvedValue([]), createTracks: vi.fn(), + setMicrophoneEnabled: vi.fn(), + setCameraEnabled: vi.fn(), getTrackPublication: () => ({}) as Partial as LocalTrackPublication, ...mockEmitter(), From 610af439a8fbf371cc310af84fe1e96775b1a86e Mon Sep 17 00:00:00 2001 From: Valere Date: Fri, 12 Dec 2025 10:37:37 +0100 Subject: [PATCH 2/7] cleaning: just use `LocalTrackPublished` event to pause/unpause --- .../CallViewModel/localMember/Publisher.ts | 34 +++++-------------- 1 file changed, 8 insertions(+), 26 deletions(-) diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 5da6d899..d4ad656c 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -152,32 +152,14 @@ export class Publisher { } return; }; - - // We cannot just wait because this call could wait for the track to be - // published (and not just created), which we don't want yet. - // Notice it will wait for that only the first time, when tracks are created, the - // later calls will be instant :/ - enableTracks() - .then(() => { - // At this point, LiveKit will have created & published the tracks as soon as possible - // but we want to control when tracks are published. - // We cannot just mute the tracks, even if this will effectively stop the publishing, - // it would also prevent the user from seeing their own video/audio preview. - // So for that we use pauseUpStream(): Stops sending media to the server by replacing - // the sender track with null, but keeps the local MediaStreamTrack active. - // The user can still see/hear themselves locally, but remote participants see nothing - if (!this.shouldPublish) { - this.pauseUpstreams(lkRoom, [ - Track.Source.Microphone, - Track.Source.Camera, - ]).catch((e) => { - this.logger.error(`Failed to pause upstreams`, e); - }); - } - }) - .catch((e: Error) => { - this.logger.error(`Failed to enable camera and microphone`, e); - }); + // We don't await enableTracks, because livekit could block until the tracks + // are fully published, and not only that they are created. + // We don't have control on that, localParticipant creates and publishes the tracks + // asap. + // We are using the `ParticipantEvent.LocalTrackPublished` to be notified + // when tracks are actually published, and at that point + // we can pause upstream if needed (depending on if startPublishing has been called). + void enableTracks(); return Promise.resolve(); } From b3b76d8b3d26e367c325233bf4c86745b41b7763 Mon Sep 17 00:00:00 2001 From: Valere Date: Fri, 12 Dec 2025 11:54:43 +0100 Subject: [PATCH 3/7] post merge --- .../CallViewModel/localMember/LocalMember.ts | 35 +++++++------------ 1 file changed, 12 insertions(+), 23 deletions(-) diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index 5898a3da..241cb633 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -10,7 +10,6 @@ import { ParticipantEvent, type LocalParticipant, type ScreenShareCaptureOptions, - ConnectionState, RoomEvent, MediaDeviceFailure, } from "livekit-client"; @@ -329,11 +328,10 @@ export const createLocalMembership$ = ({ // Based on `connectRequested$` we start publishing tracks. (once they are there!) scope.reconcile( scope.behavior( - combineLatest([publisher$, tracks$, joinAndPublishRequested$]), + combineLatest([publisher$, joinAndPublishRequested$]), ), - async ([publisher, tracks, shouldJoinAndPublish]) => { - if (shouldJoinAndPublish === publisher?.publishing$.value) return; - if (tracks.length !== 0 && shouldJoinAndPublish) { + async ([publisher, shouldJoinAndPublish]) => { + if (shouldJoinAndPublish) { try { await publisher?.startPublishing(); } catch (error) { @@ -341,7 +339,7 @@ export const createLocalMembership$ = ({ error instanceof Error ? error.message : String(error); setPublishError(new FailToStartLivekitConnection(message)); } - } else if (tracks.length !== 0 && !shouldJoinAndPublish) { + } else { try { await publisher?.stopPublishing(); } catch (error) { @@ -391,15 +389,6 @@ export const createLocalMembership$ = ({ combineLatest([ localConnectionState$, localTransport$, - // tracks$.pipe( - // tap((t) => { - // logger.info("tracks$: ", t); - // }), - // ), - // publishing$, - connectRequested$, - tracks$, - publishing$, joinAndPublishRequested$, from(trackStartRequested.promise).pipe( map(() => true), @@ -410,16 +399,17 @@ export const createLocalMembership$ = ({ ([ localConnectionState, localTransport, - tracks, - publishing, + // tracks, + // publishing, shouldPublish, shouldStartTracks, ]) => { if (!localTransport) return null; - const hasTracks = tracks.length > 0; - let trackState: TrackState = TrackState.WaitingForUser; - if (hasTracks && shouldStartTracks) trackState = TrackState.Ready; - if (!hasTracks && shouldStartTracks) trackState = TrackState.Creating; + // const hasTracks = tracks.length > 0; + // let trackState: TrackState = TrackState.WaitingForUser; + // if (hasTracks && shouldStartTracks) trackState = TrackState.Ready; + // if (!hasTracks && shouldStartTracks) trackState = TrackState.Creating; + const trackState: TrackState = shouldStartTracks ? TrackState.Ready : TrackState.WaitingForUser; if ( localConnectionState !== ConnectionState.LivekitConnected || @@ -430,7 +420,7 @@ export const createLocalMembership$ = ({ tracks: trackState, }; if (!shouldPublish) return PublishState.WaitingForUser; - if (!publishing) return PublishState.Starting; + // if (!publishing) return PublishState.Starting; return PublishState.Publishing; }, ), @@ -660,7 +650,6 @@ export const createLocalMembership$ = ({ requestJoinAndPublish, requestDisconnect, localMemberState$, - tracks$, participant$, reconnecting$, disconnected$: scope.behavior( From 93da69983dfaa6aa7850429d7aa09bbe4af2224b Mon Sep 17 00:00:00 2001 From: Valere Date: Fri, 12 Dec 2025 14:40:45 +0100 Subject: [PATCH 4/7] post merge: partial mapping of tracks/publish states --- .../localMember/LocalMember.test.ts | 18 ++++++++++-------- .../CallViewModel/localMember/LocalMember.ts | 18 +++++++++++------- .../localMember/Publisher.test.ts | 2 +- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/src/state/CallViewModel/localMember/LocalMember.test.ts b/src/state/CallViewModel/localMember/LocalMember.test.ts index 892bbb3d..0d77611b 100644 --- a/src/state/CallViewModel/localMember/LocalMember.test.ts +++ b/src/state/CallViewModel/localMember/LocalMember.test.ts @@ -254,10 +254,12 @@ describe("LocalMembership", () => { const connectionTransportAConnecting = { ...connectionTransportAConnected, state$: constant(ConnectionState.LivekitConnecting), + livekitRoom: mockLivekitRoom({}), } as unknown as Connection; const connectionTransportBConnected = { state$: constant(ConnectionState.LivekitConnected), transport: bTransport, + livekitRoom: mockLivekitRoom({}), } as unknown as Connection; it("recreates publisher if new connection is used and ENDS always unpublish and end tracks", async () => { @@ -477,13 +479,13 @@ describe("LocalMembership", () => { // ------- await flushPromises(); - expect(localMembership.localMemberState$.value).toStrictEqual({ - matrix: RTCMemberStatus.Connected, - media: { - tracks: TrackState.Creating, - connection: ConnectionState.LivekitConnected, - }, - }); + // expect(localMembership.localMemberState$.value).toStrictEqual({ + // matrix: RTCMemberStatus.Connected, + // media: { + // tracks: TrackState.Creating, + // connection: ConnectionState.LivekitConnected, + // }, + // }); createTrackResolver.resolve(); await flushPromises(); expect( @@ -498,7 +500,7 @@ describe("LocalMembership", () => { expect( // eslint-disable-next-line @typescript-eslint/no-explicit-any (localMembership.localMemberState$.value as any).media, - ).toStrictEqual(PublishState.Starting); + ).toStrictEqual(PublishState.Publishing); publishResolver.resolve(); await flushPromises(); diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index 241cb633..21386fcd 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -74,6 +74,8 @@ export enum PublishState { Publishing = "publish_publishing", } +// TODO not sure how to map that correctly with the +// new publisher that does not manage tracks itself anymore export enum TrackState { /** The track is waiting for user input to create tracks (waiting to call `startTracks()`) */ WaitingForUser = "tracks_waiting_for_user", @@ -244,7 +246,7 @@ export const createLocalMembership$ = ({ if (error) { logger.error(`Failed to create local tracks:`, error); setMatrixError( - // TODO is it fatal? Do we need to create a new Specialized Error? + // TODO is it fatal? Do we need to create a new Specialized Error? new UnknownCallError(new Error(`Media device error: ${error}`)), ); } @@ -327,11 +329,11 @@ export const createLocalMembership$ = ({ // Based on `connectRequested$` we start publishing tracks. (once they are there!) scope.reconcile( - scope.behavior( - combineLatest([publisher$, joinAndPublishRequested$]), - ), + scope.behavior(combineLatest([publisher$, joinAndPublishRequested$])), async ([publisher, shouldJoinAndPublish]) => { - if (shouldJoinAndPublish) { + // Get the current publishing state to avoid redundant calls. + const isPublishing = publisher?.shouldPublish === true; + if (shouldJoinAndPublish && !isPublishing) { try { await publisher?.startPublishing(); } catch (error) { @@ -339,7 +341,7 @@ export const createLocalMembership$ = ({ error instanceof Error ? error.message : String(error); setPublishError(new FailToStartLivekitConnection(message)); } - } else { + } else if (isPublishing) { try { await publisher?.stopPublishing(); } catch (error) { @@ -409,7 +411,9 @@ export const createLocalMembership$ = ({ // let trackState: TrackState = TrackState.WaitingForUser; // if (hasTracks && shouldStartTracks) trackState = TrackState.Ready; // if (!hasTracks && shouldStartTracks) trackState = TrackState.Creating; - const trackState: TrackState = shouldStartTracks ? TrackState.Ready : TrackState.WaitingForUser; + const trackState: TrackState = shouldStartTracks + ? TrackState.Ready + : TrackState.WaitingForUser; if ( localConnectionState !== ConnectionState.LivekitConnected || diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index 3ab3d48c..3cc96bc2 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -22,7 +22,7 @@ import { constant } from "../../Behavior"; import { flushPromises, mockLivekitRoom, - mockMediaDevices + mockMediaDevices, } from "../../../utils/test"; import { Publisher } from "./Publisher"; import { type Connection } from "../remoteMembers/Connection"; From 8f2055b4f473313cf5ab8e2f16952dc256128633 Mon Sep 17 00:00:00 2001 From: Valere Date: Fri, 12 Dec 2025 14:46:13 +0100 Subject: [PATCH 5/7] eslint fix --- src/state/CallViewModel/localMember/LocalMember.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index 21386fcd..b75bf522 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -303,7 +303,7 @@ export const createLocalMembership$ = ({ // Clean-up callback return Promise.resolve(async (): Promise => { await publisher.stopPublishing(); - publisher.stopTracks(); + await publisher.stopTracks(); }); } }); From 190cdfcb6030edbe2b2cc08ae8765908a380eeb5 Mon Sep 17 00:00:00 2001 From: Valere Date: Fri, 12 Dec 2025 17:03:16 +0100 Subject: [PATCH 6/7] comment now dead state variant --- src/state/CallViewModel/localMember/LocalMember.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index b75bf522..daadbe7c 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -68,8 +68,8 @@ export enum TransportState { export enum PublishState { WaitingForUser = "publish_waiting_for_user", - /** Implies lk connection is connected */ - Starting = "publish_start_publishing", + // /** Implies lk connection is connected */ + // Starting = "publish_start_publishing", /** Implies lk connection is connected */ Publishing = "publish_publishing", } @@ -79,8 +79,8 @@ export enum PublishState { export enum TrackState { /** The track is waiting for user input to create tracks (waiting to call `startTracks()`) */ WaitingForUser = "tracks_waiting_for_user", - /** Implies lk connection is connected */ - Creating = "tracks_creating", + // /** Implies lk connection is connected */ + // Creating = "tracks_creating", /** Implies lk connection is connected */ Ready = "tracks_ready", } From 80e760ca55d7e454f73d6fb36f33d3f036b36d67 Mon Sep 17 00:00:00 2001 From: Valere Date: Tue, 16 Dec 2025 13:40:06 +0100 Subject: [PATCH 7/7] review --- .../CallViewModel/localMember/LocalMember.ts | 38 +++++-------------- .../localMember/Publisher.test.ts | 2 +- .../CallViewModel/localMember/Publisher.ts | 35 ++++++++++------- 3 files changed, 32 insertions(+), 43 deletions(-) diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index daadbe7c..9ef94fe4 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -37,7 +37,7 @@ import { import { type Logger } from "matrix-js-sdk/lib/logger"; import { deepCompare } from "matrix-js-sdk/lib/utils"; -import { constant, type Behavior } from "../../Behavior.ts"; +import { type Behavior } from "../../Behavior.ts"; import { type IConnectionManager } from "../remoteMembers/ConnectionManager.ts"; import { type ObservableScope } from "../../ObservableScope.ts"; import { type Publisher } from "./Publisher.ts"; @@ -68,6 +68,8 @@ export enum TransportState { export enum PublishState { WaitingForUser = "publish_waiting_for_user", + // XXX: This state is removed for now since we do not have full control over + // track publication anymore with the publisher abstraction, might come back in the future? // /** Implies lk connection is connected */ // Starting = "publish_start_publishing", /** Implies lk connection is connected */ @@ -79,6 +81,8 @@ export enum PublishState { export enum TrackState { /** The track is waiting for user input to create tracks (waiting to call `startTracks()`) */ WaitingForUser = "tracks_waiting_for_user", + // XXX: This state is removed for now since we do not have full control over + // track creation anymore with the publisher abstraction, might come back in the future? // /** Implies lk connection is connected */ // Creating = "tracks_creating", /** Implies lk connection is connected */ @@ -154,9 +158,10 @@ export const createLocalMembership$ = ({ matrixRTCSession, }: Props): { /** - * This starts audio and video tracks. They will be reused when calling `requestPublish`. + * This request to start audio and video tracks. + * Can be called early to pre-emptively get media permissions and start devices. */ - startTracks: () => Behavior; + startTracks: () => void; /** * This sets a inner state (shouldPublish) to true and instructs the js-sdk and livekit to keep the user * connected to matrix and livekit. @@ -265,19 +270,10 @@ export const createLocalMembership$ = ({ * The publisher is stored in here an abstracts creating and publishing tracks. */ const publisher$ = new BehaviorSubject(null); - /** - * Extract the tracks from the published. Also reacts to changing publishers. - */ - // const tracks$ = scope.behavior( - // publisher$.pipe(switchMap((p) => (p?.tracks$ ? p.tracks$ : constant([])))), - // ); - // const publishing$ = scope.behavior( - // publisher$.pipe(switchMap((p) => p?.publishing$ ?? constant(false))), - // ); - const startTracks = (): Behavior => { + const startTracks = (): void => { trackStartRequested.resolve(); - return constant(undefined); + // This used to return the tracks, but now they are only accessible via the publisher. }; const requestJoinAndPublish = (): void => { @@ -348,14 +344,6 @@ export const createLocalMembership$ = ({ setPublishError(new UnknownCallError(error as Error)); } } - // XXX Why is that? - // else { - // try { - // await publisher?.stopPublishing(); - // } catch (error) { - // setLivekitError(new UnknownCallError(error as Error)); - // } - // } }, ); @@ -401,16 +389,10 @@ export const createLocalMembership$ = ({ ([ localConnectionState, localTransport, - // tracks, - // publishing, shouldPublish, shouldStartTracks, ]) => { if (!localTransport) return null; - // const hasTracks = tracks.length > 0; - // let trackState: TrackState = TrackState.WaitingForUser; - // if (hasTracks && shouldStartTracks) trackState = TrackState.Ready; - // if (!hasTracks && shouldStartTracks) trackState = TrackState.Creating; const trackState: TrackState = shouldStartTracks ? TrackState.Ready : TrackState.WaitingForUser; diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index 3cc96bc2..38a80bed 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -34,7 +34,7 @@ beforeEach(() => { scope = new ObservableScope(); }); -// afterEach(() => scope.end()); +afterEach(() => scope.end()); function createMockLocalTrack(source: Track.Source): LocalTrack { const track = { diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index d4ad656c..4428d845 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -97,7 +97,7 @@ export class Publisher { // it would also prevent the user from seeing their own video/audio preview. // So for that we use pauseUpStream(): Stops sending media to the server by replacing // the sender track with null, but keeps the local MediaStreamTrack active. - // The user can still see/hear themselves locally, but remote participants see nothing + // The user can still see/hear themselves locally, but remote participants see nothing. private onLocalTrackPublished( localTrackPublication: LocalTrackPublication, ): void { @@ -128,6 +128,15 @@ export class Publisher { } } /** + * Create and setup local audio and video tracks based on the current mute states. + * It creates the tracks only if audio and/or video is enabled, to avoid unnecessary + * permission prompts. + * + * It also observes mute state changes to update LiveKit microphone/camera states accordingly. + * If a track is not created initially because disabled, it will be created when unmuting. + * + * This call is not blocking anymore, instead callers can listen to the + * `RoomEvent.MediaDevicesError` event in the LiveKit room to be notified of any errors. * */ public async createAndSetupTracks(): Promise { @@ -141,25 +150,21 @@ export class Publisher { const audio = this.muteStates.audio.enabled$.value; const video = this.muteStates.video.enabled$.value; - const enableTracks = async (): Promise => { - if (audio && video) { - // Enable both at once in order to have a single permission prompt! - await lkRoom.localParticipant.enableCameraAndMicrophone(); - } else if (audio) { - await lkRoom.localParticipant.setMicrophoneEnabled(true); - } else if (video) { - await lkRoom.localParticipant.setCameraEnabled(true); - } - return; - }; - // We don't await enableTracks, because livekit could block until the tracks + // We don't await the creation, because livekit could block until the tracks // are fully published, and not only that they are created. // We don't have control on that, localParticipant creates and publishes the tracks // asap. // We are using the `ParticipantEvent.LocalTrackPublished` to be notified // when tracks are actually published, and at that point // we can pause upstream if needed (depending on if startPublishing has been called). - void enableTracks(); + if (audio && video) { + // Enable both at once in order to have a single permission prompt! + void lkRoom.localParticipant.enableCameraAndMicrophone(); + } else if (audio) { + void lkRoom.localParticipant.setMicrophoneEnabled(true); + } else if (video) { + void lkRoom.localParticipant.setCameraEnabled(true); + } return Promise.resolve(); } @@ -233,6 +238,8 @@ export class Publisher { public async stopPublishing(): Promise { this.logger.debug("stopPublishing called"); this.shouldPublish = false; + // Pause upstream will stop sending media to the server, while keeping + // the local MediaStreamTrack active, so the user can still see themselves. await this.pauseUpstreams(this.connection.livekitRoom, [ Track.Source.Microphone, Track.Source.Camera,