diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 082da477..e48dd8c4 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -101,7 +101,7 @@ import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts import { createLocalMembership$, enterRTCSession, - LivekitState, + RTCBackendState, } from "./localMember/LocalMembership.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { @@ -473,6 +473,9 @@ export function createCallViewModel$( mediaDevices, muteStates, trackProcessorState$, + logger.getChild( + "[Publisher" + connection.transport.livekit_service_url + "]", + ), ); }, connectionManager: connectionManager, @@ -664,7 +667,7 @@ export function createCallViewModel$( { value: matrixLivekitMembers }, duplicateTiles, ]) { - let localParticipantId = undefined; + let localParticipantId: string | undefined = undefined; // add local member if available if (localMatrixLivekitMember) { const { userId, participant$, connection$, membership$ } = @@ -1452,7 +1455,7 @@ export function createCallViewModel$( fatalError$: scope.behavior( localMembership.connectionState.livekit$.pipe( - filter((v) => v.state === LivekitState.Error), + filter((v) => v.state === RTCBackendState.Error), map((s) => s.error), ), null, diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMembership.test.ts index e5b7cc4a..8d9df81d 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.test.ts @@ -27,7 +27,7 @@ import { import { createLocalMembership$, enterRTCSession, - LivekitState, + RTCBackendState, } from "./LocalMembership"; import { MatrixRTCTransportMissingError } from "../../../utils/errors"; import { Epoch, ObservableScope } from "../../ObservableScope"; @@ -225,9 +225,9 @@ describe("LocalMembership", () => { }); expectObservable(localMembership.connectionState.livekit$).toBe("ne", { - n: { state: LivekitState.WaitingForConnection }, + n: { state: RTCBackendState.WaitingForConnection }, e: { - state: LivekitState.Error, + state: RTCBackendState.Error, error: expect.toSatisfy( (e) => e instanceof MatrixRTCTransportMissingError, ), @@ -428,17 +428,17 @@ describe("LocalMembership", () => { await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.WaitingForTransport, + state: RTCBackendState.WaitingForTransport, }); localTransport$.next(aTransport); await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.WaitingForConnection, + state: RTCBackendState.WaitingForConnection, }); connectionManagerData$.next(new Epoch(connectionManagerData)); await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.Initialized, + state: RTCBackendState.Initialized, }); expect(publisherFactory).toHaveBeenCalledOnce(); expect(localMembership.tracks$.value.length).toBe(0); @@ -449,12 +449,12 @@ describe("LocalMembership", () => { await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.CreatingTracks, + state: RTCBackendState.CreatingTracks, }); createTrackResolver.resolve(); await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.ReadyToPublish, + state: RTCBackendState.ReadyToPublish, }); // ------- @@ -462,13 +462,13 @@ describe("LocalMembership", () => { // ------- expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.WaitingToPublish, + state: RTCBackendState.WaitingToPublish, }); publishResolver.resolve(); await flushPromises(); expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.Connected, + state: RTCBackendState.Connected, }); expect(publishers[0].stopPublishing).not.toHaveBeenCalled(); @@ -477,7 +477,7 @@ describe("LocalMembership", () => { await flushPromises(); // stays in connected state because it is stopped before the update to tracks update the state. expect(localMembership.connectionState.livekit$.value).toStrictEqual({ - state: LivekitState.Connected, + state: RTCBackendState.Connected, }); // stop all tracks after ending scopes expect(publishers[0].stopPublishing).toHaveBeenCalled(); diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index a68738e1..3e9cd0cf 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -11,6 +11,7 @@ import { ParticipantEvent, type LocalParticipant, type ScreenShareCaptureOptions, + ConnectionState, } from "livekit-client"; import { observeParticipantEvents } from "@livekit/components-core"; import { @@ -52,7 +53,7 @@ import { MatrixRTCMode } from "../../../settings/settings.ts"; import { Config } from "../../../config/Config.ts"; import { type Connection } from "../remoteMembers/Connection.ts"; -export enum LivekitState { +export enum RTCBackendState { Error = "error", /** Not even a transport is available to the LocalMembership */ WaitingForTransport = "waiting_for_transport", @@ -68,17 +69,17 @@ export enum LivekitState { Disconnecting = "disconnecting", } -type LocalMemberLivekitState = - | { state: LivekitState.Error; error: ElementCallError } - | { state: LivekitState.WaitingForTransport } - | { state: LivekitState.WaitingForConnection } - | { state: LivekitState.Initialized } - | { state: LivekitState.CreatingTracks } - | { state: LivekitState.ReadyToPublish } - | { state: LivekitState.WaitingToPublish } - | { state: LivekitState.Connected } - | { state: LivekitState.Disconnected } - | { state: LivekitState.Disconnecting }; +type LocalMemberRtcBackendState = + | { state: RTCBackendState.Error; error: ElementCallError } + | { state: RTCBackendState.WaitingForTransport } + | { state: RTCBackendState.WaitingForConnection } + | { state: RTCBackendState.Initialized } + | { state: RTCBackendState.CreatingTracks } + | { state: RTCBackendState.ReadyToPublish } + | { state: RTCBackendState.WaitingToPublish } + | { state: RTCBackendState.Connected } + | { state: RTCBackendState.Disconnected } + | { state: RTCBackendState.Disconnecting }; export enum MatrixState { WaitingForTransport = "waiting_for_transport", @@ -98,7 +99,7 @@ type LocalMemberMatrixState = | { state: MatrixState.Error; error: Error }; export interface LocalMemberConnectionState { - livekit$: Behavior; + livekit$: Behavior; matrix$: Behavior; } @@ -155,8 +156,15 @@ export const createLocalMembership$ = ({ muteStates, matrixRTCSession, }: Props): { - requestConnect: () => void; + /** + * This starts audio and video tracks. They will be reused when calling `requestConnect`. + */ startTracks: () => Behavior; + /** + * This sets a inner state (shouldConnect) to true and instructs the js-sdk and livekit to keep the user + * connected to matrix and livekit. + */ + requestConnect: () => void; requestDisconnect: () => void; connectionState: LocalMemberConnectionState; sharingScreen$: Behavior; @@ -228,7 +236,15 @@ export const createLocalMembership$ = ({ and$( homeserverConnected$, localConnectionState$.pipe( - map((state) => (state ? state.state === "ConnectedToLkRoom" : false)), + switchMap((state) => { + if (!state) return of(false); + if (state.state === "ConnectedToLkRoom") { + state.livekitConnectionState$.pipe( + map((lkState) => lkState === ConnectionState.Connected), + ); + } + return of(false); + }), ), ), ); @@ -274,9 +290,7 @@ export const createLocalMembership$ = ({ publisher$.pipe(switchMap((p) => (p?.tracks$ ? p.tracks$ : constant([])))), ); const publishing$ = scope.behavior( - publisher$.pipe( - switchMap((p) => (p?.publishing$ ? p.publishing$ : constant(false))), - ), + publisher$.pipe(switchMap((p) => p?.publishing$ ?? constant(false))), ); const startTracks = (): Behavior => { @@ -353,7 +367,7 @@ export const createLocalMembership$ = ({ logger.error("Multiple Livkit Errors:", e); else fatalLivekitError$.next(e); }; - const livekitState$: Behavior = scope.behavior( + const livekitState$: Behavior = scope.behavior( combineLatest([ publisher$, localTransport$, @@ -386,16 +400,17 @@ export const createLocalMembership$ = ({ // // as: // We do have but not yet so we are in - if (error !== null) return { state: LivekitState.Error, error }; + if (error !== null) return { state: RTCBackendState.Error, error }; const hasTracks = tracks.length > 0; if (!localTransport) - return { state: LivekitState.WaitingForTransport }; - if (!publisher) return { state: LivekitState.WaitingForConnection }; - if (!shouldStartTracks) return { state: LivekitState.Initialized }; - if (!hasTracks) return { state: LivekitState.CreatingTracks }; - if (!shouldConnect) return { state: LivekitState.ReadyToPublish }; - if (!publishing) return { state: LivekitState.WaitingToPublish }; - return { state: LivekitState.Connected }; + return { state: RTCBackendState.WaitingForTransport }; + if (!publisher) + return { state: RTCBackendState.WaitingForConnection }; + if (!shouldStartTracks) return { state: RTCBackendState.Initialized }; + if (!hasTracks) return { state: RTCBackendState.CreatingTracks }; + if (!shouldConnect) return { state: RTCBackendState.ReadyToPublish }; + if (!publishing) return { state: RTCBackendState.WaitingToPublish }; + return { state: RTCBackendState.Connected }; }, ), distinctUntilChanged(deepCompare), @@ -526,7 +541,7 @@ export const createLocalMembership$ = ({ ), ); - let toggleScreenSharing = null; + let toggleScreenSharing: (() => void) | null = null; if ( "getDisplayMedia" in (navigator.mediaDevices ?? {}) && !getUrlParams().hideScreensharing diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index f45f7abe..9b3e5b2a 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -16,6 +16,7 @@ import { } from "vitest"; import { ConnectionState as LivekitConenctionState } from "livekit-client"; import { type BehaviorSubject } from "rxjs"; +import { logger } from "matrix-js-sdk/lib/logger"; import { ObservableScope } from "../../ObservableScope"; import { constant } from "../../Behavior"; @@ -70,6 +71,7 @@ describe("Publisher", () => { mockMediaDevices({}), muteStates, constant({ supported: false, processor: undefined }), + logger, ); // should do nothing if no tracks have been created yet. diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 3f3192d1..326dedaf 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -60,15 +60,15 @@ export class Publisher { devices: MediaDevices, private readonly muteStates: MuteStates, trackerProcessorState$: Behavior, - private logger?: Logger, + private logger: Logger, ) { - this.logger?.info("[PublishConnection] Create LiveKit room"); + this.logger.info("Create LiveKit room"); const { controlledAudioDevices } = getUrlParams(); const room = connection.livekitRoom; room.setE2EEEnabled(room.options.e2ee !== undefined)?.catch((e: Error) => { - this.logger?.error("Failed to set E2EE enabled on room", e); + this.logger.error("Failed to set E2EE enabled on room", e); }); // Setup track processor syncing (blur) @@ -78,9 +78,7 @@ export class Publisher { this.workaroundRestartAudioInputTrackChrome(devices, scope); this.scope.onEnd(() => { - this.logger?.info( - "[PublishConnection] Scope ended -> stop publishing all tracks", - ); + this.logger.info("Scope ended -> stop publishing all tracks"); void this.stopPublishing(); }); @@ -119,6 +117,7 @@ export class Publisher { * @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created. */ public async createAndSetupTracks(): Promise { + this.logger.debug("createAndSetupTracks called"); const lkRoom = this.connection.livekitRoom; // Observe mute state changes and update LiveKit microphone/camera states accordingly this.observeMuteStates(this.scope); @@ -135,7 +134,14 @@ export class Publisher { video, }) .then((tracks) => { + this.logger.info( + "created track", + tracks.map((t) => t.kind + ", " + t.id), + ); this._tracks$.next(tracks); + }) + .catch((error) => { + this.logger.error("Failed to create tracks", error); }); } throw Error("audio and video is false"); @@ -149,6 +155,7 @@ export class Publisher { * @throws ElementCallError */ public async startPublishing(): Promise { + this.logger.debug("startPublishing called"); const lkRoom = this.connection.livekitRoom; const { promise, resolve, reject } = Promise.withResolvers(); const sub = this.connection.state$.subscribe((s) => { @@ -164,7 +171,7 @@ export class Publisher { ); break; default: - this.logger?.info("waiting for connection: ", s.state); + this.logger.info("waiting for connection: ", s.state); } }); try { @@ -176,20 +183,27 @@ export class Publisher { } for (const track of this.tracks$.value) { + this.logger.info("publish ", this.tracks$.value.length, "tracks"); // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally // with a timeout. await lkRoom.localParticipant.publishTrack(track).catch((error) => { - this.logger?.error("Failed to publish track", error); + this.logger.error("Failed to publish track", error); throw new FailToStartLivekitConnection( error instanceof Error ? error.message : error, ); }); + this.logger.info("published track ", track.kind, track.id); + + // TODO: check if the connection is still active? and break the loop if not? } this._publishing$.next(true); return this.tracks$.value; } public async stopPublishing(): Promise { + this.logger.debug("stopPublishing called"); + // TODO-MULTI-SFU: Move these calls back to ObservableScope.onEnd once scope + // actually has the right lifetime this.muteStates.audio.unsetHandler(); this.muteStates.video.unsetHandler(); @@ -199,7 +213,19 @@ export class Publisher { if (p.track !== undefined) tracks.push(p.track); }; localParticipant.trackPublications.forEach(addToTracksIfDefined); - await localParticipant.unpublishTracks(tracks); + this.logger.debug( + "list of tracks to unpublish:", + tracks.map((t) => t.kind + ", " + t.id), + "start unpublishing now", + ); + await localParticipant.unpublishTracks(tracks).catch((error) => { + this.logger.error("Failed to unpublish tracks", error); + throw error; + }); + this.logger.debug( + "unpublished tracks", + tracks.map((t) => t.kind + ", " + t.id), + ); this._publishing$.next(false); } @@ -256,7 +282,7 @@ export class Publisher { .getTrackPublication(Track.Source.Microphone) ?.audioTrack?.restartTrack() .catch((e) => { - this.logger?.error(`Failed to restart audio device track`, e); + this.logger.error(`Failed to restart audio device track`, e); }); } }); @@ -276,7 +302,7 @@ export class Publisher { selected$.pipe(scope.bind()).subscribe((device) => { if (lkRoom.state != LivekitConnectionState.Connected) return; // if (this.connectionState$.value !== ConnectionState.Connected) return; - this.logger?.info( + this.logger.info( "[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :", lkRoom.getActiveDevice(kind), " !== ", @@ -289,7 +315,7 @@ export class Publisher { lkRoom .switchActiveDevice(kind, device.id) .catch((e: Error) => - this.logger?.error( + this.logger.error( `Failed to sync ${kind} device with LiveKit`, e, ), @@ -314,10 +340,7 @@ export class Publisher { try { await lkRoom.localParticipant.setMicrophoneEnabled(desired); } catch (e) { - this.logger?.error( - "Failed to update LiveKit audio input mute state", - e, - ); + this.logger.error("Failed to update LiveKit audio input mute state", e); } return lkRoom.localParticipant.isMicrophoneEnabled; }); @@ -325,10 +348,7 @@ export class Publisher { try { await lkRoom.localParticipant.setCameraEnabled(desired); } catch (e) { - this.logger?.error( - "Failed to update LiveKit video input mute state", - e, - ); + this.logger.error("Failed to update LiveKit video input mute state", e); } return lkRoom.localParticipant.isCameraEnabled; });