diff --git a/src/state/CallViewModel/localMember/LocalMember.test.ts b/src/state/CallViewModel/localMember/LocalMember.test.ts index 6a9f196e..0d77611b 100644 --- a/src/state/CallViewModel/localMember/LocalMember.test.ts +++ b/src/state/CallViewModel/localMember/LocalMember.test.ts @@ -254,10 +254,12 @@ describe("LocalMembership", () => { const connectionTransportAConnecting = { ...connectionTransportAConnected, state$: constant(ConnectionState.LivekitConnecting), + livekitRoom: mockLivekitRoom({}), } as unknown as Connection; const connectionTransportBConnected = { state$: constant(ConnectionState.LivekitConnected), transport: bTransport, + livekitRoom: mockLivekitRoom({}), } as unknown as Connection; it("recreates publisher if new connection is used and ENDS always unpublish and end tracks", async () => { @@ -266,13 +268,17 @@ describe("LocalMembership", () => { const localTransport$ = new BehaviorSubject(aTransport); const publishers: Publisher[] = []; - + let seed = 0; defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation( () => { + const a = seed; + seed += 1; + logger.info(`creating [${a}]`); const p = { - stopPublishing: vi.fn(), + stopPublishing: vi.fn().mockImplementation(() => { + logger.info(`stopPublishing [${a}]`); + }), stopTracks: vi.fn(), - publishing$: constant(false), }; publishers.push(p as unknown as Publisher); return p; @@ -310,7 +316,7 @@ describe("LocalMembership", () => { await flushPromises(); // stop all tracks after ending scopes expect(publishers[1].stopPublishing).toHaveBeenCalled(); - expect(publishers[1].stopTracks).toHaveBeenCalled(); + // expect(publishers[1].stopTracks).toHaveBeenCalled(); defaultCreateLocalMemberValues.createPublisherFactory.mockReset(); }); @@ -358,15 +364,17 @@ describe("LocalMembership", () => { }); await flushPromises(); expect(publisherFactory).toHaveBeenCalledOnce(); - expect(localMembership.tracks$.value.length).toBe(0); + // expect(localMembership.tracks$.value.length).toBe(0); + expect(publishers[0].createAndSetupTracks).not.toHaveBeenCalled(); localMembership.startTracks(); await flushPromises(); - expect(localMembership.tracks$.value.length).toBe(2); + expect(publishers[0].createAndSetupTracks).toHaveBeenCalled(); + // expect(localMembership.tracks$.value.length).toBe(2); scope.end(); await flushPromises(); // stop all tracks after ending scopes expect(publishers[0].stopPublishing).toHaveBeenCalled(); - expect(publishers[0].stopTracks).toHaveBeenCalled(); + // expect(publishers[0].stopTracks).toHaveBeenCalled(); publisherFactory.mockClear(); }); // TODO add an integration test combining publisher and localMembership @@ -464,20 +472,20 @@ describe("LocalMembership", () => { }); expect(publisherFactory).toHaveBeenCalledOnce(); - expect(localMembership.tracks$.value.length).toBe(0); + // expect(localMembership.tracks$.value.length).toBe(0); // ------- localMembership.startTracks(); // ------- await flushPromises(); - expect(localMembership.localMemberState$.value).toStrictEqual({ - matrix: RTCMemberStatus.Connected, - media: { - tracks: TrackState.Creating, - connection: ConnectionState.LivekitConnected, - }, - }); + // expect(localMembership.localMemberState$.value).toStrictEqual({ + // matrix: RTCMemberStatus.Connected, + // media: { + // tracks: TrackState.Creating, + // connection: ConnectionState.LivekitConnected, + // }, + // }); createTrackResolver.resolve(); await flushPromises(); expect( @@ -492,7 +500,7 @@ describe("LocalMembership", () => { expect( // eslint-disable-next-line 
@typescript-eslint/no-explicit-any (localMembership.localMemberState$.value as any).media, - ).toStrictEqual(PublishState.Starting); + ).toStrictEqual(PublishState.Publishing); publishResolver.resolve(); await flushPromises(); @@ -513,7 +521,7 @@ describe("LocalMembership", () => { ).toStrictEqual(PublishState.Publishing); // stop all tracks after ending scopes expect(publishers[0].stopPublishing).toHaveBeenCalled(); - expect(publishers[0].stopTracks).toHaveBeenCalled(); + // expect(publishers[0].stopTracks).toHaveBeenCalled(); }); // TODO add tests for matrix local matrix participation. }); diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index 40fb62d6..9ef94fe4 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -6,11 +6,12 @@ Please see LICENSE in the repository root for full details. */ import { - type LocalTrack, type Participant, ParticipantEvent, type LocalParticipant, type ScreenShareCaptureOptions, + RoomEvent, + MediaDeviceFailure, } from "livekit-client"; import { observeParticipantEvents } from "@livekit/components-core"; import { @@ -24,6 +25,7 @@ import { combineLatest, distinctUntilChanged, from, + fromEvent, map, type Observable, of, @@ -35,7 +37,7 @@ import { import { type Logger } from "matrix-js-sdk/lib/logger"; import { deepCompare } from "matrix-js-sdk/lib/utils"; -import { constant, type Behavior } from "../../Behavior.ts"; +import { type Behavior } from "../../Behavior.ts"; import { type IConnectionManager } from "../remoteMembers/ConnectionManager.ts"; import { type ObservableScope } from "../../ObservableScope.ts"; import { type Publisher } from "./Publisher.ts"; @@ -66,17 +68,23 @@ export enum TransportState { export enum PublishState { WaitingForUser = "publish_waiting_for_user", - /** Implies lk connection is connected */ - Starting = "publish_start_publishing", + // XXX: This state is removed for now since we do not have full control over + // track publication anymore with the publisher abstraction, might come back in the future? + // /** Implies lk connection is connected */ + // Starting = "publish_start_publishing", /** Implies lk connection is connected */ Publishing = "publish_publishing", } +// TODO not sure how to map that correctly with the +// new publisher that does not manage tracks itself anymore export enum TrackState { /** The track is waiting for user input to create tracks (waiting to call `startTracks()`) */ WaitingForUser = "tracks_waiting_for_user", - /** Implies lk connection is connected */ - Creating = "tracks_creating", + // XXX: This state is removed for now since we do not have full control over + // track creation anymore with the publisher abstraction, might come back in the future? + // /** Implies lk connection is connected */ + // Creating = "tracks_creating", /** Implies lk connection is connected */ Ready = "tracks_ready", } @@ -150,9 +158,10 @@ export const createLocalMembership$ = ({ matrixRTCSession, }: Props): { /** - * This starts audio and video tracks. They will be reused when calling `requestPublish`. + * This request to start audio and video tracks. + * Can be called early to pre-emptively get media permissions and start devices. */ - startTracks: () => Behavior; + startTracks: () => void; /** * This sets a inner state (shouldPublish) to true and instructs the js-sdk and livekit to keep the user * connected to matrix and livekit. 
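// Illustrative sketch (editor annotation, not part of the patch) of the
// consumer-side flow implied by the return type above: startTracks() no longer
// returns the tracks, it only pre-warms devices/permissions, and joining stays
// a separate explicit step. `props` is a placeholder for the real Props value.
declare const props: Props;
const membership = createLocalMembership$(props);
membership.startTracks(); // e.g. from the lobby, to prompt for permissions early
// later, when the user actually joins the call:
membership.requestJoinAndPublish();
// and when leaving:
membership.requestDisconnect();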
@@ -165,7 +174,7 @@ export const createLocalMembership$ = ({ * Callback to toggle screen sharing. If null, screen sharing is not possible. */ toggleScreenSharing: (() => void) | null; - tracks$: Behavior; + // tracks$: Behavior; participant$: Behavior; connection$: Behavior; /** Shorthand for homeserverConnected.rtcSession === Status.Reconnecting @@ -221,6 +230,32 @@ export const createLocalMembership$ = ({ ), ); + // Tracks error that happen when creating the local tracks. + const mediaErrors$ = localConnection$.pipe( + switchMap((connection) => { + if (!connection) { + return of(null); + } else { + return fromEvent( + connection.livekitRoom, + RoomEvent.MediaDevicesError, + (error: Error) => { + return MediaDeviceFailure.getFailure(error) ?? null; + }, + ); + } + }), + ); + + mediaErrors$.pipe(scope.bind()).subscribe((error) => { + if (error) { + logger.error(`Failed to create local tracks:`, error); + setMatrixError( + // TODO is it fatal? Do we need to create a new Specialized Error? + new UnknownCallError(new Error(`Media device error: ${error}`)), + ); + } + }); // MATRIX RELATED // This should be used in a combineLatest with publisher$ to connect. @@ -235,19 +270,10 @@ export const createLocalMembership$ = ({ * The publisher is stored in here an abstracts creating and publishing tracks. */ const publisher$ = new BehaviorSubject(null); - /** - * Extract the tracks from the published. Also reacts to changing publishers. - */ - const tracks$ = scope.behavior( - publisher$.pipe(switchMap((p) => (p?.tracks$ ? p.tracks$ : constant([])))), - ); - const publishing$ = scope.behavior( - publisher$.pipe(switchMap((p) => p?.publishing$ ?? constant(false))), - ); - const startTracks = (): Behavior => { + const startTracks = (): void => { trackStartRequested.resolve(); - return tracks$; + // This used to return the tracks, but now they are only accessible via the publisher. }; const requestJoinAndPublish = (): void => { @@ -273,7 +299,7 @@ export const createLocalMembership$ = ({ // Clean-up callback return Promise.resolve(async (): Promise => { await publisher.stopPublishing(); - publisher.stopTracks(); + await publisher.stopTracks(); }); } }); @@ -282,13 +308,16 @@ export const createLocalMembership$ = ({ // `tracks$` will update once they are ready. scope.reconcile( scope.behavior( - combineLatest([publisher$, tracks$, from(trackStartRequested.promise)]), + combineLatest([ + publisher$ /*, tracks$*/, + from(trackStartRequested.promise), + ]), null, ), async (valueIfReady) => { if (!valueIfReady) return; - const [publisher, tracks] = valueIfReady; - if (publisher && tracks.length === 0) { + const [publisher] = valueIfReady; + if (publisher) { await publisher.createAndSetupTracks().catch((e) => logger.error(e)); } }, @@ -296,12 +325,11 @@ export const createLocalMembership$ = ({ // Based on `connectRequested$` we start publishing tracks. (once they are there!) scope.reconcile( - scope.behavior( - combineLatest([publisher$, tracks$, joinAndPublishRequested$]), - ), - async ([publisher, tracks, shouldJoinAndPublish]) => { - if (shouldJoinAndPublish === publisher?.publishing$.value) return; - if (tracks.length !== 0 && shouldJoinAndPublish) { + scope.behavior(combineLatest([publisher$, joinAndPublishRequested$])), + async ([publisher, shouldJoinAndPublish]) => { + // Get the current publishing state to avoid redundant calls. 
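// Illustrative sketch for the TODO above (editor annotation, not part of the
// patch): the failure kinds returned by MediaDeviceFailure.getFailure() could
// be mapped to dedicated errors instead of a generic UnknownCallError. The
// switch assumes the MediaDeviceFailure members exported by livekit-client.
import { MediaDeviceFailure } from "livekit-client";

function describeMediaFailure(failure: MediaDeviceFailure): string {
  switch (failure) {
    case MediaDeviceFailure.PermissionDenied:
      return "Permission to use the camera/microphone was denied";
    case MediaDeviceFailure.NotFound:
      return "No matching capture device was found";
    case MediaDeviceFailure.DeviceInUse:
      return "The capture device is already in use";
    default:
      return "Unknown media device error";
  }
}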
+ const isPublishing = publisher?.shouldPublish === true; + if (shouldJoinAndPublish && !isPublishing) { try { await publisher?.startPublishing(); } catch (error) { @@ -309,7 +337,7 @@ export const createLocalMembership$ = ({ error instanceof Error ? error.message : String(error); setPublishError(new FailToStartLivekitConnection(message)); } - } else if (tracks.length !== 0 && !shouldJoinAndPublish) { + } else if (isPublishing) { try { await publisher?.stopPublishing(); } catch (error) { @@ -351,8 +379,6 @@ export const createLocalMembership$ = ({ combineLatest([ localConnectionState$, localTransport$, - tracks$, - publishing$, joinAndPublishRequested$, from(trackStartRequested.promise).pipe( map(() => true), @@ -363,16 +389,13 @@ export const createLocalMembership$ = ({ ([ localConnectionState, localTransport, - tracks, - publishing, shouldPublish, shouldStartTracks, ]) => { if (!localTransport) return null; - const hasTracks = tracks.length > 0; - let trackState: TrackState = TrackState.WaitingForUser; - if (hasTracks && shouldStartTracks) trackState = TrackState.Ready; - if (!hasTracks && shouldStartTracks) trackState = TrackState.Creating; + const trackState: TrackState = shouldStartTracks + ? TrackState.Ready + : TrackState.WaitingForUser; if ( localConnectionState !== ConnectionState.LivekitConnected || @@ -383,7 +406,7 @@ export const createLocalMembership$ = ({ tracks: trackState, }; if (!shouldPublish) return PublishState.WaitingForUser; - if (!publishing) return PublishState.Starting; + // if (!publishing) return PublishState.Starting; return PublishState.Publishing; }, ), @@ -613,7 +636,6 @@ export const createLocalMembership$ = ({ requestJoinAndPublish, requestDisconnect, localMemberState$, - tracks$, participant$, reconnecting$, disconnected$: scope.behavior( diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index 40763a99..38a80bed 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -5,59 +5,320 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. 
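// Illustrative sketch (editor annotation, not part of the patch): the
// simplified media-state mapping from the LocalMember.ts hunk above, pulled out
// as a standalone helper for readability. `deriveMediaState` is a hypothetical
// name; TrackState, PublishState and ConnectionState are the enums used there.
type MediaState =
  | PublishState
  | { connection: ConnectionState; tracks: TrackState };

function deriveMediaState(
  connection: ConnectionState,
  shouldPublish: boolean,
  shouldStartTracks: boolean,
): MediaState {
  // The Creating/Starting intermediate states are gone: tracks are either
  // requested (Ready) or not (WaitingForUser).
  const tracks = shouldStartTracks
    ? TrackState.Ready
    : TrackState.WaitingForUser;
  if (
    connection !== ConnectionState.LivekitConnected ||
    tracks !== TrackState.Ready
  )
    return { connection, tracks };
  if (!shouldPublish) return PublishState.WaitingForUser;
  return PublishState.Publishing;
}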
*/ +import { afterEach, beforeEach, describe, expect, it, test, vi } from "vitest"; import { - afterEach, - beforeEach, - describe, - expect, - it, - type Mock, - vi, -} from "vitest"; -import { ConnectionState as LivekitConenctionState } from "livekit-client"; -import { type BehaviorSubject } from "rxjs"; + ConnectionState as LivekitConnectionState, + LocalParticipant, + type LocalTrack, + type LocalTrackPublication, + ParticipantEvent, + Track, +} from "livekit-client"; +import { BehaviorSubject } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; import { ObservableScope } from "../../ObservableScope"; import { constant } from "../../Behavior"; import { + flushPromises, mockLivekitRoom, - mockLocalParticipant, mockMediaDevices, } from "../../../utils/test"; import { Publisher } from "./Publisher"; import { type Connection } from "../remoteMembers/Connection"; import { type MuteStates } from "../../MuteStates"; -describe("Publisher", () => { - let scope: ObservableScope; - let connection: Connection; - let muteStates: MuteStates; - beforeEach(() => { - muteStates = { - audio: { - enabled$: constant(false), - unsetHandler: vi.fn(), - setHandler: vi.fn(), - }, - video: { - enabled$: constant(false), - unsetHandler: vi.fn(), - setHandler: vi.fn(), - }, - } as unknown as MuteStates; - scope = new ObservableScope(); - connection = { - state$: constant(LivekitConenctionState.Connected), - livekitRoom: mockLivekitRoom({ - localParticipant: mockLocalParticipant({}), - }), - } as unknown as Connection; +let scope: ObservableScope; + +beforeEach(() => { + scope = new ObservableScope(); +}); + +afterEach(() => scope.end()); + +function createMockLocalTrack(source: Track.Source): LocalTrack { + const track = { + source, + isMuted: false, + isUpstreamPaused: false, + } as Partial as LocalTrack; + + vi.mocked(track).mute = vi.fn().mockImplementation(() => { + track.isMuted = true; + }); + vi.mocked(track).unmute = vi.fn().mockImplementation(() => { + track.isMuted = false; + }); + vi.mocked(track).pauseUpstream = vi.fn().mockImplementation(() => { + // @ts-expect-error - for that test we want to set isUpstreamPaused directly + track.isUpstreamPaused = true; + }); + vi.mocked(track).resumeUpstream = vi.fn().mockImplementation(() => { + // @ts-expect-error - for that test we want to set isUpstreamPaused directly + track.isUpstreamPaused = false; }); - afterEach(() => scope.end()); + return track; +} - it("throws if livekit room could not publish", async () => { +function createMockMuteState(enabled$: BehaviorSubject): { + enabled$: BehaviorSubject; + setHandler: (h: (enabled: boolean) => void) => void; + unsetHandler: () => void; +} { + let currentHandler = (enabled: boolean): void => {}; + + const ms = { + enabled$, + setHandler: vi.fn().mockImplementation((h: (enabled: boolean) => void) => { + currentHandler = h; + }), + unsetHandler: vi.fn().mockImplementation(() => { + currentHandler = (enabled: boolean): void => {}; + }), + }; + // forward enabled$ emissions to the current handler + enabled$.subscribe((enabled) => { + logger.info(`MockMuteState: enabled changed to ${enabled}`); + currentHandler(enabled); + }); + + return ms; +} + +let connection: Connection; +let muteStates: MuteStates; +let localParticipant: LocalParticipant; +let audioEnabled$: BehaviorSubject; +let videoEnabled$: BehaviorSubject; +let trackPublications: LocalTrackPublication[]; +// use it to control when track creation resolves, default to resolved +let createTrackLock: Promise; + +beforeEach(() => { + 
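// Illustrative sketch (editor annotation, not part of the patch) of the
// forwarding pattern used by createMockMuteState above: a BehaviorSubject
// drives whatever handler the code under test registered, so a test can flip
// the mute state with `subject.next()`.
import { BehaviorSubject } from "rxjs";

const enabled$ = new BehaviorSubject<boolean>(false);
let handler: (enabled: boolean) => void = () => {};
enabled$.subscribe((enabled) => handler(enabled));

// The Publisher registers its handler via setHandler()...
handler = (enabled) => console.log(`setMicrophoneEnabled(${enabled})`);
// ...and the test drives it:
enabled$.next(true); // logs: setMicrophoneEnabled(true)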
trackPublications = []; + audioEnabled$ = new BehaviorSubject(false); + videoEnabled$ = new BehaviorSubject(false); + createTrackLock = Promise.resolve(); + + muteStates = { + audio: createMockMuteState(audioEnabled$), + video: createMockMuteState(videoEnabled$), + } as unknown as MuteStates; + + const mockSendDataPacket = vi.fn(); + const mockEngine = { + client: { + sendUpdateLocalMetadata: vi.fn(), + }, + on: vi.fn().mockReturnThis(), + sendDataPacket: mockSendDataPacket, + }; + + localParticipant = new LocalParticipant( + "local-sid", + "local-identity", + // @ts-expect-error - for that test we want a real LocalParticipant to have the pending publications logic + mockEngine, + { + adaptiveStream: true, + dynacase: false, + audioCaptureDefaults: {}, + videoCaptureDefaults: {}, + stopLocalTrackOnUnpublish: true, + reconnectPolicy: "always", + disconnectOnPageLeave: true, + }, + new Map(), + {}, + ); + + vi.mocked(localParticipant).createTracks = vi + .fn() + .mockImplementation(async (opts) => { + const tracks: LocalTrack[] = []; + if (opts.audio) { + tracks.push(createMockLocalTrack(Track.Source.Microphone)); + } + if (opts.video) { + tracks.push(createMockLocalTrack(Track.Source.Camera)); + } + await createTrackLock; + return tracks; + }); + + vi.mocked(localParticipant).publishTrack = vi + .fn() + .mockImplementation(async (track: LocalTrack) => { + const pub = { + track, + source: track.source, + mute: track.mute, + unmute: track.unmute, + } as Partial as LocalTrackPublication; + trackPublications.push(pub); + localParticipant.emit(ParticipantEvent.LocalTrackPublished, pub); + return Promise.resolve(pub); + }); + + vi.mocked(localParticipant).getTrackPublication = vi + .fn() + .mockImplementation((source: Track.Source) => { + return trackPublications.find((pub) => pub.track?.source === source); + }); + + connection = { + state$: constant({ + state: "ConnectedToLkRoom", + livekitConnectionState$: constant(LivekitConnectionState.Connected), + }), + livekitRoom: mockLivekitRoom({ + localParticipant: localParticipant, + }), + } as unknown as Connection; +}); + +describe("Publisher", () => { + let publisher: Publisher; + + beforeEach(() => { + publisher = new Publisher( + scope, + connection, + mockMediaDevices({}), + muteStates, + constant({ supported: false, processor: undefined }), + logger, + ); + }); + + afterEach(() => {}); + + it("Should not create tracks if started muted to avoid unneeded permission requests", async () => { + const createTracksSpy = vi.spyOn( + connection.livekitRoom.localParticipant, + "createTracks", + ); + + audioEnabled$.next(false); + videoEnabled$.next(false); + await publisher.createAndSetupTracks(); + + expect(createTracksSpy).not.toHaveBeenCalled(); + }); + + it("Should minimize permission request by querying create at once", async () => { + const enableCameraAndMicrophoneSpy = vi.spyOn( + localParticipant, + "enableCameraAndMicrophone", + ); + const createTracksSpy = vi.spyOn(localParticipant, "createTracks"); + + audioEnabled$.next(true); + videoEnabled$.next(true); + await publisher.createAndSetupTracks(); + await flushPromises(); + + expect(enableCameraAndMicrophoneSpy).toHaveBeenCalled(); + + // It should create both at once + expect(createTracksSpy).toHaveBeenCalledWith({ + audio: true, + video: true, + }); + }); + + it("Ensure no data is streamed until publish has been called", async () => { + audioEnabled$.next(true); + await publisher.createAndSetupTracks(); + + // The track should be created and paused + 
expect(localParticipant.createTracks).toHaveBeenCalledWith({ + audio: true, + video: undefined, + }); + await flushPromises(); + expect(localParticipant.publishTrack).toHaveBeenCalled(); + + await flushPromises(); + const track = localParticipant.getTrackPublication( + Track.Source.Microphone, + )?.track; + expect(track).toBeDefined(); + expect(track!.pauseUpstream).toHaveBeenCalled(); + expect(track!.isUpstreamPaused).toBe(true); + }); + + it("Ensure resume upstream when published is called", async () => { + videoEnabled$.next(true); + await publisher.createAndSetupTracks(); + // await flushPromises(); + await publisher.startPublishing(); + + const track = localParticipant.getTrackPublication( + Track.Source.Camera, + )?.track; + expect(track).toBeDefined(); + // expect(track.pauseUpstream).toHaveBeenCalled(); + expect(track!.isUpstreamPaused).toBe(false); + }); + + describe("Mute states", () => { + let publisher: Publisher; + beforeEach(() => { + publisher = new Publisher( + scope, + connection, + mockMediaDevices({}), + muteStates, + constant({ supported: false, processor: undefined }), + logger, + ); + }); + + test.each([ + { mutes: { audioEnabled: true, videoEnabled: false } }, + { mutes: { audioEnabled: true, videoEnabled: false } }, + ])("only create the tracks that are unmuted $mutes", async ({ mutes }) => { + // Ensure all muted + audioEnabled$.next(mutes.audioEnabled); + videoEnabled$.next(mutes.videoEnabled); + + vi.mocked(connection.livekitRoom.localParticipant).createTracks = vi + .fn() + .mockResolvedValue([]); + + await publisher.createAndSetupTracks(); + + expect( + connection.livekitRoom.localParticipant.createTracks, + ).toHaveBeenCalledOnce(); + + expect( + connection.livekitRoom.localParticipant.createTracks, + ).toHaveBeenCalledWith({ + audio: mutes.audioEnabled ? true : undefined, + video: mutes.videoEnabled ? true : undefined, + }); + }); + }); + + it("does mute unmute audio", async () => {}); +}); + +describe("Bug fix", () => { + // There is a race condition when creating and publishing tracks while the mute state changes. + // This race condition could cause tracks to be published even though they are muted at the + // beginning of a call coming from lobby. + // This is caused by our stack using manually the low level API to create and publish tracks, + // but also using the higher level setMicrophoneEnabled and setCameraEnabled functions that also create + // and publish tracks, and managing pending publications. + // Race is as follow, on creation of the Publisher we create the tracks then publish them. + // If in the middle of that process the mute state changes: + // - the `setMicrophoneEnabled` will be no-op because it is not aware of our created track and can't see any pending publication + // - If start publication is requested it will publish the track even though there was a mute request. + it("wrongly publish tracks while muted", async () => { + // setLogLevel(`debug`); const publisher = new Publisher( scope, connection, @@ -66,56 +327,34 @@ describe("Publisher", () => { constant({ supported: false, processor: undefined }), logger, ); + audioEnabled$.next(true); - // should do nothing if no tracks have been created yet. 
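// Illustrative timeline of the race described above (editor annotation); the
// test reproduces it by blocking createTracks() on `createTrackLock`:
//   t0: audio unmuted            -> createAndSetupTracks() starts creating a mic track
//   t1: user mutes in the lobby  -> setMicrophoneEnabled(false) is a no-op, it cannot
//                                   see the still-pending publication
//   t2: track creation resolves  -> LiveKit publishes the now-unwanted track
//   t3: startPublishing()        -> the upstream resumes even though the user is muted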
- await publisher.startPublishing(); - expect( - connection.livekitRoom.localParticipant.publishTrack, - ).not.toHaveBeenCalled(); + const resolvers = Promise.withResolvers(); + createTrackLock = resolvers.promise; - await expect(publisher.createAndSetupTracks()).rejects.toThrow( - Error("audio and video is false"), - ); + // Initially the audio is unmuted, so creating tracks should publish the audio track + const createTracks = publisher.createAndSetupTracks(); + void publisher.startPublishing(); + void createTracks.then(() => { + void publisher.startPublishing(); + }); + // now mute the audio before allowing track creation to complete + audioEnabled$.next(false); + resolvers.resolve(undefined); + await createTracks; - (muteStates.audio.enabled$ as BehaviorSubject).next(true); + await flushPromises(); - ( - connection.livekitRoom.localParticipant.createTracks as Mock - ).mockResolvedValue([{}, {}]); + const track = localParticipant.getTrackPublication( + Track.Source.Microphone, + )?.track; + expect(track).toBeDefined(); - await expect(publisher.createAndSetupTracks()).resolves.not.toThrow(); - expect( - connection.livekitRoom.localParticipant.createTracks, - ).toHaveBeenCalledOnce(); - - // failiour due to localParticipant.publishTrack - ( - connection.livekitRoom.localParticipant.publishTrack as Mock - ).mockRejectedValue(Error("testError")); - - await expect(publisher.startPublishing()).rejects.toThrow( - new Error("testError"), - ); - - // does not try other conenction after the first one failed - expect( - connection.livekitRoom.localParticipant.publishTrack, - ).toHaveBeenCalledTimes(1); - - // does not try other conenction after the first one failed - expect( - connection.livekitRoom.localParticipant.publishTrack, - ).toHaveBeenCalledTimes(1); - - // success case - ( - connection.livekitRoom.localParticipant.publishTrack as Mock - ).mockResolvedValue({}); - - await expect(publisher.startPublishing()).resolves.not.toThrow(); - - expect( - connection.livekitRoom.localParticipant.publishTrack, - ).toHaveBeenCalledTimes(3); + try { + expect(localParticipant.publishTrack).not.toHaveBeenCalled(); + } catch { + expect(track!.mute).toHaveBeenCalled(); + expect(track!.isMuted).toBe(true); + } }); }); diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 21c5d801..3cb3bd04 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -6,15 +6,14 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ import { + ConnectionState as LivekitConnectionState, + type LocalTrackPublication, LocalVideoTrack, + ParticipantEvent, type Room as LivekitRoom, Track, - type LocalTrack, - type LocalTrackPublication, - ConnectionState as LivekitConnectionState, } from "livekit-client"; import { - BehaviorSubject, map, NEVER, type Observable, @@ -41,14 +40,21 @@ import { type ObservableScope } from "../../ObservableScope.ts"; * The Publisher is also responsible for creating the media tracks. */ export class Publisher { + /** + * By default, livekit will start publishing tracks as soon as they are created. + * In the matrix RTC world, we want to control when tracks are published based + * on whether the user is part of the RTC session or not. + */ + public shouldPublish = false; + /** * Creates a new Publisher. * @param scope - The observable scope to use for managing the publisher. 
* @param connection - The connection to use for publishing. * @param devices - The media devices to use for audio and video input. * @param muteStates - The mute states for audio and video. - * @param e2eeLivekitOptions - The E2EE options to use for the LiveKit room. Use to share the same key provider across connections!. * @param trackerProcessorState$ - The processor state for the video track processor (e.g. background blur). + * @param logger - The logger to use for logging :D. */ public constructor( private scope: ObservableScope, @@ -58,7 +64,6 @@ export class Publisher { trackerProcessorState$: Behavior, private logger: Logger, ) { - this.logger.info("Create LiveKit room"); const { controlledAudioDevices } = getUrlParams(); const room = connection.livekitRoom; @@ -76,41 +81,63 @@ export class Publisher { this.scope.onEnd(() => { this.logger.info("Scope ended -> stop publishing all tracks"); void this.stopPublishing(); + muteStates.audio.unsetHandler(); + muteStates.video.unsetHandler(); }); - // TODO move mute state handling here using reconcile (instead of inside the mute state class) - // this.scope.reconcile( - // this.scope.behavior( - // combineLatest([this.muteStates.video.enabled$, this.tracks$]), - // ), - // async ([videoEnabled, tracks]) => { - // const track = tracks.find((t) => t.kind == Track.Kind.Video); - // if (!track) return; - - // if (videoEnabled) { - // await track.unmute(); - // } else { - // await track.mute(); - // } - // }, - // ); + this.connection.livekitRoom.localParticipant.on( + ParticipantEvent.LocalTrackPublished, + this.onLocalTrackPublished.bind(this), + ); } - private _tracks$ = new BehaviorSubject[]>([]); - public tracks$ = this._tracks$ as Behavior[]>; - + // LiveKit will publish the tracks as soon as they are created + // but we want to control when tracks are published. + // We cannot just mute the tracks, even if this will effectively stop the publishing, + // it would also prevent the user from seeing their own video/audio preview. + // So for that we use pauseUpStream(): Stops sending media to the server by replacing + // the sender track with null, but keeps the local MediaStreamTrack active. + // The user can still see/hear themselves locally, but remote participants see nothing. + private onLocalTrackPublished( + localTrackPublication: LocalTrackPublication, + ): void { + this.logger.info("Local track published", localTrackPublication); + const lkRoom = this.connection.livekitRoom; + if (!this.shouldPublish) { + this.pauseUpstreams(lkRoom, [localTrackPublication.source]).catch((e) => { + this.logger.error(`Failed to pause upstreams`, e); + }); + } + // also check the mute state and apply it + if (localTrackPublication.source === Track.Source.Microphone) { + const enabled = this.muteStates.audio.enabled$.value; + lkRoom.localParticipant.setMicrophoneEnabled(enabled).catch((e) => { + this.logger.error( + `Failed to enable microphone track, enabled:${enabled}`, + e, + ); + }); + } else if (localTrackPublication.source === Track.Source.Camera) { + const enabled = this.muteStates.video.enabled$.value; + lkRoom.localParticipant.setCameraEnabled(enabled).catch((e) => { + this.logger.error( + `Failed to enable camera track, enabled:${enabled}`, + e, + ); + }); + } + } /** - * Start the connection to LiveKit and publish local tracks. + * Create and setup local audio and video tracks based on the current mute states. + * It creates the tracks only if audio and/or video is enabled, to avoid unnecessary + * permission prompts. 
* - * This will: - * wait for the connection to be ready. - // * 1. Request an OpenId token `request_token` (allows matrix users to verify their identity with a third-party service.) - // * 2. Use this token to request the SFU config to the MatrixRtc authentication service. - // * 3. Connect to the configured LiveKit room. - // * 4. Create local audio and video tracks based on the current mute states and publish them to the room. + * It also observes mute state changes to update LiveKit microphone/camera states accordingly. + * If a track is not created initially because disabled, it will be created when unmuting. + * + * This call is not blocking anymore, instead callers can listen to the + * `RoomEvent.MediaDevicesError` event in the LiveKit room to be notified of any errors. * - * @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection. - * @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created. */ public async createAndSetupTracks(): Promise { this.logger.debug("createAndSetupTracks called"); @@ -118,119 +145,121 @@ export class Publisher { // Observe mute state changes and update LiveKit microphone/camera states accordingly this.observeMuteStates(this.scope); - // TODO-MULTI-SFU: Prepublish a microphone track + // Check if audio and/or video is enabled. We only create tracks if enabled, + // because it could prompt for permission, and we don't want to do that unnecessarily. const audio = this.muteStates.audio.enabled$.value; const video = this.muteStates.video.enabled$.value; - // createTracks throws if called with audio=false and video=false - if (audio || video) { - // TODO this can still throw errors? It will also prompt for permissions if not already granted - return lkRoom.localParticipant - .createTracks({ - audio, - video, - }) - .then((tracks) => { - this.logger.info( - "created track", - tracks.map((t) => t.kind + ", " + t.id), - ); - this._tracks$.next(tracks); - }) - .catch((error) => { - this.logger.error("Failed to create tracks", error); - }); + + // We don't await the creation, because livekit could block until the tracks + // are fully published, and not only that they are created. + // We don't have control on that, localParticipant creates and publishes the tracks + // asap. + // We are using the `ParticipantEvent.LocalTrackPublished` to be notified + // when tracks are actually published, and at that point + // we can pause upstream if needed (depending on if startPublishing has been called). + if (audio && video) { + // Enable both at once in order to have a single permission prompt! 
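// Illustrative sketch (editor annotation, browser-API level): the reason for
// enabling both kinds in one call is that a combined getUserMedia() request
// typically produces a single permission prompt, while two separate requests
// may prompt twice.
async function requestBothAtOnce(): Promise<MediaStream> {
  // One combined request -> (usually) one permission dialog.
  return navigator.mediaDevices.getUserMedia({ audio: true, video: true });
}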
+ void lkRoom.localParticipant.enableCameraAndMicrophone(); + } else if (audio) { + void lkRoom.localParticipant.setMicrophoneEnabled(true); + } else if (video) { + void lkRoom.localParticipant.setCameraEnabled(true); + } + + return Promise.resolve(); + } + + private async pauseUpstreams( + lkRoom: LivekitRoom, + sources: Track.Source[], + ): Promise { + for (const source of sources) { + const track = lkRoom.localParticipant.getTrackPublication(source)?.track; + if (track) { + await track.pauseUpstream(); + } else { + this.logger.warn( + `No track found for source ${source} to pause upstream`, + ); + } + } + } + + private async resumeUpstreams( + lkRoom: LivekitRoom, + sources: Track.Source[], + ): Promise { + for (const source of sources) { + const track = lkRoom.localParticipant.getTrackPublication(source)?.track; + if (track) { + await track.resumeUpstream(); + } else { + this.logger.warn( + `No track found for source ${source} to resume upstream`, + ); + } } - throw Error("audio and video is false"); } - private _publishing$ = new BehaviorSubject(false); - public publishing$ = this.scope.behavior(this._publishing$); /** + * + * Request to publish local tracks to the LiveKit room. + * This will wait for the connection to be ready before publishing. + * Livekit also have some local retry logic for publishing tracks. + * Can be called multiple times, localparticipant manages the state of published tracks (or pending publications). * * @returns - * @throws ElementCallError */ - public async startPublishing(): Promise { + public async startPublishing(): Promise { + if (this.shouldPublish) { + this.logger.debug(`Already publishing, ignoring startPublishing call`); + return; + } + this.shouldPublish = true; this.logger.debug("startPublishing called"); + const lkRoom = this.connection.livekitRoom; - // we do not need to do this since lk will wait in `localParticipant.publishTrack` - // const { promise, resolve, reject } = Promise.withResolvers(); - // const sub = this.connection.state$.subscribe((state) => { - // if (state instanceof Error) { - // const error = - // state instanceof ElementCallError - // ? state - // : new FailToStartLivekitConnection(state.message); - // reject(error); - // } else if (state === ConnectionState.LivekitConnected) { - // resolve(); - // } else { - // this.logger.info("waiting for connection: ", state); - // } - // }); - // try { - // await promise; - // } catch (e) { - // throw e; - // } finally { - // sub.unsubscribe(); - // } - - for (const track of this.tracks$.value) { - this.logger.info("publish ", this.tracks$.value.length, "tracks"); - // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally - // with a timeout. - await lkRoom.localParticipant.publishTrack(track).catch((error) => { - this.logger.error("Failed to publish track", error); - // throw new FailToStartLivekitConnection( - // error instanceof Error ? error.message : error, - // ); - throw error; - }); - this.logger.info("published track ", track.kind, track.id); - - // TODO: check if the connection is still active? and break the loop if not? + // Resume upstream for both audio and video tracks + // We need to call it explicitly because call setTrackEnabled does not always + // resume upstream. It will only if you switch the track from disabled to enabled, + // but if the track is already enabled but upstream is paused, it won't resume it. + // TODO what about screen share? 
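// Illustrative sketch (editor annotation): in raw WebRTC terms, the
// pauseUpstreams/resumeUpstreams helpers above roughly amount to swapping the
// sender's track out and back in, which stops media towards the SFU while the
// local MediaStreamTrack keeps running for self-preview.
async function pauseUpstream(sender: RTCRtpSender): Promise<void> {
  await sender.replaceTrack(null); // nothing is sent, capture stays live
}

async function resumeUpstream(
  sender: RTCRtpSender,
  mediaStreamTrack: MediaStreamTrack,
): Promise<void> {
  await sender.replaceTrack(mediaStreamTrack);
}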
+ try { + await this.resumeUpstreams(lkRoom, [ + Track.Source.Microphone, + Track.Source.Camera, + ]); + } catch (e) { + this.logger.error(`Failed to resume upstreams`, e); } - this._publishing$.next(true); - return this.tracks$.value; } public async stopPublishing(): Promise { this.logger.debug("stopPublishing called"); - // TODO-MULTI-SFU: Move these calls back to ObservableScope.onEnd once scope - // actually has the right lifetime - this.muteStates.audio.unsetHandler(); - this.muteStates.video.unsetHandler(); - - const localParticipant = this.connection.livekitRoom.localParticipant; - const tracks: LocalTrack[] = []; - const addToTracksIfDefined = (p: LocalTrackPublication): void => { - if (p.track !== undefined) tracks.push(p.track); - }; - localParticipant.trackPublications.forEach(addToTracksIfDefined); - this.logger.debug( - "list of tracks to unpublish:", - tracks.map((t) => t.kind + ", " + t.id), - "start unpublishing now", - ); - await localParticipant.unpublishTracks(tracks).catch((error) => { - this.logger.error("Failed to unpublish tracks", error); - throw error; - }); - this.logger.debug( - "unpublished tracks", - tracks.map((t) => t.kind + ", " + t.id), - ); - this._publishing$.next(false); + this.shouldPublish = false; + // Pause upstream will stop sending media to the server, while keeping + // the local MediaStreamTrack active, so the user can still see themselves. + await this.pauseUpstreams(this.connection.livekitRoom, [ + Track.Source.Microphone, + Track.Source.Camera, + Track.Source.ScreenShare, + ]); } - /** - * Stops all tracks that are currently running - */ - public stopTracks(): void { - this.tracks$.value.forEach((t) => t.stop()); - this._tracks$.next([]); + public async stopTracks(): Promise { + const lkRoom = this.connection.livekitRoom; + for (const source of [ + Track.Source.Microphone, + Track.Source.Camera, + Track.Source.ScreenShare, + ]) { + const localPub = lkRoom.localParticipant.getTrackPublication(source); + if (localPub?.track) { + // stops and unpublishes the track + await lkRoom.localParticipant.unpublishTrack(localPub!.track, true); + } + } } /// Private methods @@ -332,17 +361,31 @@ export class Publisher { */ private observeMuteStates(scope: ObservableScope): void { const lkRoom = this.connection.livekitRoom; - this.muteStates.audio.setHandler(async (desired) => { + this.muteStates.audio.setHandler(async (enable) => { try { - await lkRoom.localParticipant.setMicrophoneEnabled(desired); + this.logger.debug( + `handler: Setting LiveKit microphone enabled: ${enable}`, + ); + await lkRoom.localParticipant.setMicrophoneEnabled(enable); + // Unmute will restart the track if it was paused upstream, + // but until explicitly requested, we want to keep it paused. + if (!this.shouldPublish && enable) { + await this.pauseUpstreams(lkRoom, [Track.Source.Microphone]); + } } catch (e) { this.logger.error("Failed to update LiveKit audio input mute state", e); } return lkRoom.localParticipant.isMicrophoneEnabled; }); - this.muteStates.video.setHandler(async (desired) => { + this.muteStates.video.setHandler(async (enable) => { try { - await lkRoom.localParticipant.setCameraEnabled(desired); + this.logger.debug(`handler: Setting LiveKit camera enabled: ${enable}`); + await lkRoom.localParticipant.setCameraEnabled(enable); + // Unmute will restart the track if it was paused upstream, + // but until explicitly requested, we want to keep it paused. 
+ if (!this.shouldPublish && enable) { + await this.pauseUpstreams(lkRoom, [Track.Source.Camera]); + } } catch (e) { this.logger.error("Failed to update LiveKit video input mute state", e); } } diff --git a/src/utils/test.ts index b900d801..9a845908 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -311,6 +311,8 @@ export function mockLocalParticipant( publishTrack: vi.fn(), unpublishTracks: vi.fn().mockResolvedValue([]), createTracks: vi.fn(), + setMicrophoneEnabled: vi.fn(), + setCameraEnabled: vi.fn(), getTrackPublication: () => ({}) as Partial<LocalTrackPublication> as LocalTrackPublication, ...mockEmitter(),
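// Illustrative end-to-end sketch (editor annotation, not part of the patch) of
// the new Publisher flow from Publisher.ts above; the constructor inputs are
// the same names used in Publisher.test.ts and are assumed to exist here.
async function examplePublisherLifecycle(): Promise<void> {
  const publisher = new Publisher(
    scope,
    connection,
    mockMediaDevices({}),
    muteStates,
    constant({ supported: false, processor: undefined }),
    logger,
  );
  // Lobby/preview: tracks are created and auto-published by LiveKit, then
  // immediately paused upstream in onLocalTrackPublished, so nothing reaches the SFU.
  await publisher.createAndSetupTracks();
  // Joining the call: resume the microphone and camera upstreams.
  await publisher.startPublishing();
  // Leaving: pause the upstreams again (the local preview stays alive)...
  await publisher.stopPublishing();
  // ...and finally stop and unpublish the tracks.
  await publisher.stopTracks();
}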