diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index aa7f32be..d1dbc7b4 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -227,6 +227,7 @@ export class CallViewModel { scope: this.scope, connectionFactory: this.connectionFactory, inputTransports$: this.allTransports$, + logger: logger, }); // ------------------------------------------------------------------------ @@ -259,6 +260,7 @@ export class CallViewModel { trackProcessorState$: this.trackProcessorState$, widget, options: this.connectOptions$, + logger: logger.getChild(`[${Date.now()}]`), }); private localRtcMembership$ = this.scope.behavior( diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 1d517643..5f74577e 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -22,6 +22,7 @@ import { ClientEvent, SyncState, type Room as MatrixRoom } from "matrix-js-sdk"; import { BehaviorSubject, combineLatest, + distinctUntilChanged, fromEvent, map, type Observable, @@ -29,8 +30,9 @@ import { scan, startWith, switchMap, + tap, } from "rxjs"; -import { logger } from "matrix-js-sdk/lib/logger"; +import { type Logger, logger } from "matrix-js-sdk/lib/logger"; import { type Behavior } from "../../Behavior"; import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; @@ -61,6 +63,7 @@ export enum LivekitState { Disconnected = "disconnected", Disconnecting = "disconnecting", } + type LocalMemberLivekitState = | { state: LivekitState.Error; error: string } | { state: LivekitState.Connected } @@ -74,6 +77,7 @@ export enum MatrixState { Disconnected = "disconnected", Connecting = "connecting", } + type LocalMemberMatrixState = | { state: MatrixState.Connected } | { state: MatrixState.Connecting } @@ -106,6 +110,7 @@ interface Props { localTransport$: Behavior; trackProcessorState$: Behavior; widget: WidgetHelpers | null; + logger: Logger; } /** @@ -132,6 +137,7 @@ export const createLocalMembership$ = ({ matrixRoom, trackProcessorState$, widget, + logger, }: Props): { // publisher: Publisher requestConnect: () => LocalMemberConnectionState; @@ -157,6 +163,8 @@ export const createLocalMembership$ = ({ /** @deprecated use state instead*/ configError$: Behavior; } => { + const prefixLogger = logger.getChild("[LocalMembership]"); + prefixLogger.debug(`Creating local membership..`); const state = { livekit$: new BehaviorSubject({ state: LivekitState.Uninitialized, @@ -178,49 +186,73 @@ export const createLocalMembership$ = ({ const tracks$ = new BehaviorSubject([]); // Drop Epoch data here since we will not combine this anymore - const connection$ = scope.behavior( - combineLatest( - [connectionManager.connections$, localTransport$], - (connections, transport) => { - if (transport === null) return null; - return ( - connections.value.find((connection) => - areLivekitTransportsEqual(connection.transport, transport), - ) ?? null - ); - }, - ), + const localConnection$ = scope.behavior( + combineLatest([connectionManager.connections$, localTransport$]) + .pipe( + map(([connections, localTransport]) => { + if (localTransport === null) { + return null; + } + return ( + connections.value.find((connection) => + areLivekitTransportsEqual(connection.transport, localTransport), + ) ?? null + ); + }), + ) + .pipe( + distinctUntilChanged((a, b) => { + const eq = a === b; + logger.debug( + `distinctUntilChanged: Local connection equality check: ${eq}`, + ); + return eq; + }), + ) + .pipe( + tap((connection) => { + prefixLogger.info( + `Local connection updated: ${connection?.transport?.livekit_service_url}`, + ); + }), + ), ); /** * Whether we are connected to the MatrixRTC session. */ - const homeserverConnected$ = scope.behavior( - // To consider ourselves connected to MatrixRTC, we check the following: - and$( - // The client is connected to the sync loop - ( - fromEvent(matrixRoom.client, ClientEvent.Sync) as Observable< - [SyncState] - > - ).pipe( - startWith([matrixRoom.client.getSyncState()]), - map(([state]) => state === SyncState.Syncing), + const homeserverConnected$ = scope + .behavior( + // To consider ourselves connected to MatrixRTC, we check the following: + and$( + // The client is connected to the sync loop + ( + fromEvent(matrixRoom.client, ClientEvent.Sync) as Observable< + [SyncState] + > + ).pipe( + startWith([matrixRoom.client.getSyncState()]), + map(([state]) => state === SyncState.Syncing), + ), + // Room state observed by session says we're connected + fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe( + startWith(null), + map(() => matrixRTCSession.membershipStatus === Status.Connected), + ), + // Also watch out for warnings that we've likely hit a timeout and our + // delayed leave event is being sent (this condition is here because it + // provides an earlier warning than the sync loop timeout, and we wouldn't + // see the actual leave event until we reconnect to the sync loop) + fromEvent(matrixRTCSession, MembershipManagerEvent.ProbablyLeft).pipe( + startWith(null), + map(() => matrixRTCSession.probablyLeft !== true), + ), ), - // Room state observed by session says we're connected - fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe( - startWith(null), - map(() => matrixRTCSession.membershipStatus === Status.Connected), - ), - // Also watch out for warnings that we've likely hit a timeout and our - // delayed leave event is being sent (this condition is here because it - // provides an earlier warning than the sync loop timeout, and we wouldn't - // see the actual leave event until we reconnect to the sync loop) - fromEvent(matrixRTCSession, MembershipManagerEvent.ProbablyLeft).pipe( - startWith(null), - map(() => matrixRTCSession.probablyLeft !== true), - ), - ), - ); + ) + .pipe( + tap((connected) => { + prefixLogger.info(`Homeserver connected update: ${connected}`); + }), + ); // /** // * Whether we are "fully" connected to the call. Accounts for both the @@ -230,7 +262,7 @@ export const createLocalMembership$ = ({ const connected$ = scope.behavior( and$( homeserverConnected$, - connection$.pipe( + localConnection$.pipe( switchMap((c) => c ? c.state$.pipe(map((state) => state.state === "ConnectedToLkRoom")) @@ -241,8 +273,9 @@ export const createLocalMembership$ = ({ ); const publisher$ = new BehaviorSubject(null); - connection$.subscribe((connection) => { + localConnection$.subscribe((connection) => { if (connection !== null && publisher$.value === null) { + // TODO looks strange to not change publisher if connection changes. publisher$.next( new Publisher( scope, @@ -366,7 +399,7 @@ export const createLocalMembership$ = ({ // We use matrixConnected$ rather than reconnecting$ because we want to // pause tracks during the initial joining sequence too until we're sure // that our own media is displayed on screen. - combineLatest([connection$, homeserverConnected$]) + combineLatest([localConnection$, homeserverConnected$]) .pipe(scope.bind()) .subscribe(([connection, connected]) => { if (connection?.state$.value.state !== "ConnectedToLkRoom") return; @@ -451,7 +484,7 @@ export const createLocalMembership$ = ({ * Whether the user is currently sharing their screen. */ const sharingScreen$ = scope.behavior( - connection$.pipe( + localConnection$.pipe( switchMap((c) => c === null ? of(false) @@ -472,7 +505,7 @@ export const createLocalMembership$ = ({ // We also allow screen sharing to be toggled even if the connection // is still initializing or publishing tracks, because there's no // technical reason to disallow this. LiveKit will publish if it can. - void connection$.value?.livekitRoom.localParticipant + void localConnection$.value?.livekitRoom.localParticipant .setScreenShareEnabled(!sharingScreen$.value, { audio: true, selfBrowserSurface: "include", @@ -483,7 +516,7 @@ export const createLocalMembership$ = ({ : null; const participant$ = scope.behavior( - connection$.pipe(map((c) => c?.livekitRoom.localParticipant ?? null)), + localConnection$.pipe(map((c) => c?.livekitRoom.localParticipant ?? null)), ); return { startTracks, @@ -497,7 +530,7 @@ export const createLocalMembership$ = ({ sharingScreen$, toggleScreenSharing, participant$, - connection$, + connection$: localConnection$, }; }; diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index ec7906aa..2480a832 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -14,7 +14,6 @@ import { onTestFinished, vi, } from "vitest"; -import { BehaviorSubject } from "rxjs"; import { type LocalParticipant, type RemoteParticipant, @@ -25,11 +24,9 @@ import { import fetchMock from "fetch-mock"; import EventEmitter from "events"; import { type IOpenIDToken } from "matrix-js-sdk"; +import { logger } from "matrix-js-sdk/lib/logger"; -import type { - CallMembership, - LivekitTransport, -} from "matrix-js-sdk/lib/matrixrtc"; +import type { LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { Connection, type ConnectionOpts, @@ -39,6 +36,7 @@ import { import { ObservableScope } from "../../ObservableScope.ts"; import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts"; import { FailToGetOpenIdToken } from "../../../utils/errors.ts"; + let testScope: ObservableScope; let client: MockedObject; @@ -49,9 +47,9 @@ let localParticipantEventEmiter: EventEmitter; let fakeLocalParticipant: MockedObject; let fakeRoomEventEmiter: EventEmitter; -let fakeMembershipsFocusMap$: BehaviorSubject< - { membership: CallMembership; transport: LivekitTransport }[] ->; +// let fakeMembershipsFocusMap$: BehaviorSubject< +// { membership: CallMembership; transport: LivekitTransport }[] +// >; const livekitFocus: LivekitTransport = { livekit_alias: "!roomID:example.org", @@ -70,9 +68,9 @@ function setupTest(): void { }), getDeviceId: vi.fn().mockReturnValue("ABCDEF"), } as unknown as OpenIDClientParts); - fakeMembershipsFocusMap$ = new BehaviorSubject< - { membership: CallMembership; transport: LivekitTransport }[] - >([]); + // fakeMembershipsFocusMap$ = new BehaviorSubject< + // { membership: CallMembership; transport: LivekitTransport }[] + // >([]); localParticipantEventEmiter = new EventEmitter(); @@ -131,7 +129,7 @@ function setupRemoteConnection(): Connection { fakeLivekitRoom.connect.mockResolvedValue(undefined); - return new Connection(opts); + return new Connection(opts, logger); } afterEach(() => { @@ -150,7 +148,7 @@ describe("Start connection states", () => { scope: testScope, livekitRoomFactory: () => fakeLivekitRoom, }; - const connection = new Connection(opts); + const connection = new Connection(opts, logger); expect(connection.state$.getValue().state).toEqual("Initialized"); }); @@ -166,7 +164,7 @@ describe("Start connection states", () => { livekitRoomFactory: () => fakeLivekitRoom, }; - const connection = new Connection(opts, undefined); + const connection = new Connection(opts, logger); const capturedStates: ConnectionState[] = []; const s = connection.state$.subscribe((value) => { @@ -218,7 +216,7 @@ describe("Start connection states", () => { livekitRoomFactory: () => fakeLivekitRoom, }; - const connection = new Connection(opts, undefined); + const connection = new Connection(opts, logger); const capturedStates: ConnectionState[] = []; const s = connection.state$.subscribe((value) => { @@ -274,7 +272,7 @@ describe("Start connection states", () => { livekitRoomFactory: () => fakeLivekitRoom, }; - const connection = new Connection(opts, undefined); + const connection = new Connection(opts, logger); const capturedStates: ConnectionState[] = []; const s = connection.state$.subscribe((value) => { diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 454ae9fe..44ce4972 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -98,6 +98,7 @@ export class Connection { // TODO dont make this throw and instead store a connection error state in this class? // TODO consider an autostart pattern... public async start(): Promise { + this.logger.debug("Starting Connection"); this.stopped = false; try { this._state$.next({ @@ -145,6 +146,7 @@ export class Connection { livekitConnectionState$: connectionStateObserver(this.livekitRoom), }); } catch (error) { + this.logger.debug(`Failed to connect to LiveKit room: ${error}`); this._state$.next({ state: "FailedToStart", error: error instanceof Error ? error : new Error(`${error}`), @@ -169,6 +171,9 @@ export class Connection { * If the connection is already stopped, this is a no-op. */ public async stop(): Promise { + this.logger.debug( + `Stopping connection to ${this.transport.livekit_service_url}`, + ); if (this.stopped) return; await this.livekitRoom.disconnect(); this._state$.next({ @@ -195,15 +200,18 @@ export class Connection { private readonly client: OpenIDClientParts; public readonly livekitRoom: LivekitRoom; + private readonly logger: Logger; + /** * Creates a new connection to a matrix RTC LiveKit backend. * - * @param livekitRoom - LiveKit room instance to use. * @param opts - Connection options {@link ConnectionOpts}. * + * @param logger */ - public constructor(opts: ConnectionOpts, logger?: Logger) { - logger?.info( + public constructor(opts: ConnectionOpts, logger: Logger) { + this.logger = logger.getChild("[Connection]"); + this.logger.info( `[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`, ); const { transport, client, scope } = opts; @@ -223,15 +231,17 @@ export class Connection { ], }).pipe( map((participants) => { - const partsFiltered = participants.filter( + return participants.filter( (participant) => participant.getTrackPublications().length > 0, ); - return partsFiltered; }), ), [], ); - scope.onEnd(() => void this.stop()); + scope.onEnd(() => { + this.logger.info(`Connection scope ended, stopping connection`); + void this.stop(); + }); } } diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts index 97b6c2fd..5887442c 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts @@ -9,6 +9,7 @@ import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; import { BehaviorSubject } from "rxjs"; import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { type Participant as LivekitParticipant } from "livekit-client"; +import { logger } from "matrix-js-sdk/lib/logger"; import { Epoch, ObservableScope } from "../../ObservableScope.ts"; import { @@ -78,6 +79,7 @@ describe("connections$ stream", () => { inputTransports$: behavior("a", { a: new Epoch([TRANSPORT_1, TRANSPORT_2], 0), }), + logger: logger, }); expectObservable(connections$).toBe("a", { @@ -119,6 +121,7 @@ describe("connections$ stream", () => { e: new Epoch([TRANSPORT_1], 4), f: new Epoch([TRANSPORT_1, TRANSPORT_2], 5), }), + logger: logger, }); expectObservable(connections$).toBe("xxxxxa", { @@ -158,6 +161,7 @@ describe("connections$ stream", () => { b: new Epoch([TRANSPORT_1, TRANSPORT_2], 1), c: new Epoch([TRANSPORT_1], 2), }), + logger: logger, }); expectObservable(connections$).toBe("xab", { @@ -272,6 +276,7 @@ describe("connectionManagerData$ stream", () => { inputTransports$: behavior("a", { a: new Epoch([TRANSPORT_1, TRANSPORT_2], 0), }), + logger, }); expectObservable(connectionManagerData$).toBe("abcd", { diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index 32d42d75..cc661678 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -13,8 +13,8 @@ import { type LivekitTransport, type ParticipantId, } from "matrix-js-sdk/lib/matrixrtc"; -import { BehaviorSubject, combineLatest, map, of, switchMap } from "rxjs"; -import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; +import { BehaviorSubject, combineLatest, map, of, switchMap, tap } from "rxjs"; +import { type Logger } from "matrix-js-sdk/lib/logger"; import { type LocalParticipant, type RemoteParticipant } from "livekit-client"; import { type Behavior } from "../../Behavior.ts"; @@ -91,6 +91,7 @@ interface Props { scope: ObservableScope; connectionFactory: ConnectionFactory; inputTransports$: Behavior>; + logger: Logger; } // TODO - write test for scopes (do we really need to bind scope) export interface IConnectionManager { @@ -116,8 +117,9 @@ export function createConnectionManager$({ scope, connectionFactory, inputTransports$, + logger: _logger, }: Props): IConnectionManager { - const logger = rootLogger.getChild("ConnectionManager"); + const logger = _logger.getChild("[ConnectionManager]"); const running$ = new BehaviorSubject(true); scope.onEnd(() => running$.next(false)); @@ -137,6 +139,11 @@ export function createConnectionManager$({ transports.mapInner((transport) => (running ? transport : [])), ), map((transports) => transports.mapInner(removeDuplicateTransports)), + tap(({ value: transports }) => { + logger.trace( + `Managing transports: ${transports.map((t) => t.livekit_service_url).join(", ")}`, + ); + }), ), ); @@ -154,6 +161,7 @@ export function createConnectionManager$({ }; }, (scope, _data$, serviceUrl, alias) => { + logger.debug(`Creating connection to ${serviceUrl} (${alias})`); const connection = connectionFactory.createConnection( { type: "livekit", diff --git a/src/state/CallViewModel/remoteMembers/integration.test.ts b/src/state/CallViewModel/remoteMembers/integration.test.ts index 6115694d..e3aa6be8 100644 --- a/src/state/CallViewModel/remoteMembers/integration.test.ts +++ b/src/state/CallViewModel/remoteMembers/integration.test.ts @@ -11,6 +11,7 @@ import { type Room as LivekitRoom } from "livekit-client"; import EventEmitter from "events"; import fetchMock from "fetch-mock"; import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; +import { logger } from "matrix-js-sdk/lib/logger"; import { type Epoch, @@ -120,6 +121,7 @@ test("bob, carl, then bob joining no tracks yet", () => { scope: testScope, connectionFactory: ecConnectionFactory, inputTransports$: membershipsAndTransports.transports$, + logger: logger, }); const matrixLivekitItems$ = createMatrixLivekitMembers$({ diff --git a/src/state/ObservableScope.test.ts b/src/state/ObservableScope.test.ts index 36f6f308..e6c1249d 100644 --- a/src/state/ObservableScope.test.ts +++ b/src/state/ObservableScope.test.ts @@ -6,7 +6,7 @@ Please see LICENSE in the repository root for full details. */ import { describe, expect, it } from "vitest"; -import { BehaviorSubject, combineLatest, of, Subject } from "rxjs"; +import { BehaviorSubject, combineLatest, Subject } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; import { diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index 5372246c..b9fdc8fd 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -19,9 +19,9 @@ import { take, takeUntil, } from "rxjs"; +import { logger } from "matrix-js-sdk/lib/logger"; import { type Behavior } from "./Behavior"; -import { logger } from "matrix-js-sdk/lib/logger"; type MonoTypeOperator = (o: Observable) => Observable;