Adding more logs

This commit is contained in:
Valere
2025-11-13 16:41:32 +01:00
parent 0115242a2b
commit 03bec6f904
9 changed files with 131 additions and 73 deletions

View File

@@ -227,6 +227,7 @@ export class CallViewModel {
scope: this.scope,
connectionFactory: this.connectionFactory,
inputTransports$: this.allTransports$,
logger: logger,
});
// ------------------------------------------------------------------------
@@ -259,6 +260,7 @@ export class CallViewModel {
trackProcessorState$: this.trackProcessorState$,
widget,
options: this.connectOptions$,
logger: logger.getChild(`[${Date.now()}]`),
});
private localRtcMembership$ = this.scope.behavior(

View File

@@ -22,6 +22,7 @@ import { ClientEvent, SyncState, type Room as MatrixRoom } from "matrix-js-sdk";
import {
BehaviorSubject,
combineLatest,
distinctUntilChanged,
fromEvent,
map,
type Observable,
@@ -29,8 +30,9 @@ import {
scan,
startWith,
switchMap,
tap,
} from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger";
import { type Logger, logger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "../../Behavior";
import { type IConnectionManager } from "../remoteMembers/ConnectionManager";
@@ -61,6 +63,7 @@ export enum LivekitState {
Disconnected = "disconnected",
Disconnecting = "disconnecting",
}
type LocalMemberLivekitState =
| { state: LivekitState.Error; error: string }
| { state: LivekitState.Connected }
@@ -74,6 +77,7 @@ export enum MatrixState {
Disconnected = "disconnected",
Connecting = "connecting",
}
type LocalMemberMatrixState =
| { state: MatrixState.Connected }
| { state: MatrixState.Connecting }
@@ -106,6 +110,7 @@ interface Props {
localTransport$: Behavior<LivekitTransport | null>;
trackProcessorState$: Behavior<ProcessorState>;
widget: WidgetHelpers | null;
logger: Logger;
}
/**
@@ -132,6 +137,7 @@ export const createLocalMembership$ = ({
matrixRoom,
trackProcessorState$,
widget,
logger,
}: Props): {
// publisher: Publisher
requestConnect: () => LocalMemberConnectionState;
@@ -157,6 +163,8 @@ export const createLocalMembership$ = ({
/** @deprecated use state instead*/
configError$: Behavior<ElementCallError | null>;
} => {
const prefixLogger = logger.getChild("[LocalMembership]");
prefixLogger.debug(`Creating local membership..`);
const state = {
livekit$: new BehaviorSubject<LocalMemberLivekitState>({
state: LivekitState.Uninitialized,
@@ -178,49 +186,73 @@ export const createLocalMembership$ = ({
const tracks$ = new BehaviorSubject<LocalTrack[]>([]);
// Drop Epoch data here since we will not combine this anymore
const connection$ = scope.behavior(
combineLatest(
[connectionManager.connections$, localTransport$],
(connections, transport) => {
if (transport === null) return null;
return (
connections.value.find((connection) =>
areLivekitTransportsEqual(connection.transport, transport),
) ?? null
);
},
),
const localConnection$ = scope.behavior(
combineLatest([connectionManager.connections$, localTransport$])
.pipe(
map(([connections, localTransport]) => {
if (localTransport === null) {
return null;
}
return (
connections.value.find((connection) =>
areLivekitTransportsEqual(connection.transport, localTransport),
) ?? null
);
}),
)
.pipe(
distinctUntilChanged((a, b) => {
const eq = a === b;
logger.debug(
`distinctUntilChanged: Local connection equality check: ${eq}`,
);
return eq;
}),
)
.pipe(
tap((connection) => {
prefixLogger.info(
`Local connection updated: ${connection?.transport?.livekit_service_url}`,
);
}),
),
);
/**
* Whether we are connected to the MatrixRTC session.
*/
const homeserverConnected$ = scope.behavior(
// To consider ourselves connected to MatrixRTC, we check the following:
and$(
// The client is connected to the sync loop
(
fromEvent(matrixRoom.client, ClientEvent.Sync) as Observable<
[SyncState]
>
).pipe(
startWith([matrixRoom.client.getSyncState()]),
map(([state]) => state === SyncState.Syncing),
const homeserverConnected$ = scope
.behavior(
// To consider ourselves connected to MatrixRTC, we check the following:
and$(
// The client is connected to the sync loop
(
fromEvent(matrixRoom.client, ClientEvent.Sync) as Observable<
[SyncState]
>
).pipe(
startWith([matrixRoom.client.getSyncState()]),
map(([state]) => state === SyncState.Syncing),
),
// Room state observed by session says we're connected
fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe(
startWith(null),
map(() => matrixRTCSession.membershipStatus === Status.Connected),
),
// Also watch out for warnings that we've likely hit a timeout and our
// delayed leave event is being sent (this condition is here because it
// provides an earlier warning than the sync loop timeout, and we wouldn't
// see the actual leave event until we reconnect to the sync loop)
fromEvent(matrixRTCSession, MembershipManagerEvent.ProbablyLeft).pipe(
startWith(null),
map(() => matrixRTCSession.probablyLeft !== true),
),
),
// Room state observed by session says we're connected
fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe(
startWith(null),
map(() => matrixRTCSession.membershipStatus === Status.Connected),
),
// Also watch out for warnings that we've likely hit a timeout and our
// delayed leave event is being sent (this condition is here because it
// provides an earlier warning than the sync loop timeout, and we wouldn't
// see the actual leave event until we reconnect to the sync loop)
fromEvent(matrixRTCSession, MembershipManagerEvent.ProbablyLeft).pipe(
startWith(null),
map(() => matrixRTCSession.probablyLeft !== true),
),
),
);
)
.pipe(
tap((connected) => {
prefixLogger.info(`Homeserver connected update: ${connected}`);
}),
);
// /**
// * Whether we are "fully" connected to the call. Accounts for both the
@@ -230,7 +262,7 @@ export const createLocalMembership$ = ({
const connected$ = scope.behavior(
and$(
homeserverConnected$,
connection$.pipe(
localConnection$.pipe(
switchMap((c) =>
c
? c.state$.pipe(map((state) => state.state === "ConnectedToLkRoom"))
@@ -241,8 +273,9 @@ export const createLocalMembership$ = ({
);
const publisher$ = new BehaviorSubject<Publisher | null>(null);
connection$.subscribe((connection) => {
localConnection$.subscribe((connection) => {
if (connection !== null && publisher$.value === null) {
// TODO looks strange to not change publisher if connection changes.
publisher$.next(
new Publisher(
scope,
@@ -366,7 +399,7 @@ export const createLocalMembership$ = ({
// We use matrixConnected$ rather than reconnecting$ because we want to
// pause tracks during the initial joining sequence too until we're sure
// that our own media is displayed on screen.
combineLatest([connection$, homeserverConnected$])
combineLatest([localConnection$, homeserverConnected$])
.pipe(scope.bind())
.subscribe(([connection, connected]) => {
if (connection?.state$.value.state !== "ConnectedToLkRoom") return;
@@ -451,7 +484,7 @@ export const createLocalMembership$ = ({
* Whether the user is currently sharing their screen.
*/
const sharingScreen$ = scope.behavior(
connection$.pipe(
localConnection$.pipe(
switchMap((c) =>
c === null
? of(false)
@@ -472,7 +505,7 @@ export const createLocalMembership$ = ({
// We also allow screen sharing to be toggled even if the connection
// is still initializing or publishing tracks, because there's no
// technical reason to disallow this. LiveKit will publish if it can.
void connection$.value?.livekitRoom.localParticipant
void localConnection$.value?.livekitRoom.localParticipant
.setScreenShareEnabled(!sharingScreen$.value, {
audio: true,
selfBrowserSurface: "include",
@@ -483,7 +516,7 @@ export const createLocalMembership$ = ({
: null;
const participant$ = scope.behavior(
connection$.pipe(map((c) => c?.livekitRoom.localParticipant ?? null)),
localConnection$.pipe(map((c) => c?.livekitRoom.localParticipant ?? null)),
);
return {
startTracks,
@@ -497,7 +530,7 @@ export const createLocalMembership$ = ({
sharingScreen$,
toggleScreenSharing,
participant$,
connection$,
connection$: localConnection$,
};
};

View File

@@ -14,7 +14,6 @@ import {
onTestFinished,
vi,
} from "vitest";
import { BehaviorSubject } from "rxjs";
import {
type LocalParticipant,
type RemoteParticipant,
@@ -25,11 +24,9 @@ import {
import fetchMock from "fetch-mock";
import EventEmitter from "events";
import { type IOpenIDToken } from "matrix-js-sdk";
import { logger } from "matrix-js-sdk/lib/logger";
import type {
CallMembership,
LivekitTransport,
} from "matrix-js-sdk/lib/matrixrtc";
import type { LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import {
Connection,
type ConnectionOpts,
@@ -39,6 +36,7 @@ import {
import { ObservableScope } from "../../ObservableScope.ts";
import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts";
import { FailToGetOpenIdToken } from "../../../utils/errors.ts";
let testScope: ObservableScope;
let client: MockedObject<OpenIDClientParts>;
@@ -49,9 +47,9 @@ let localParticipantEventEmiter: EventEmitter;
let fakeLocalParticipant: MockedObject<LocalParticipant>;
let fakeRoomEventEmiter: EventEmitter;
let fakeMembershipsFocusMap$: BehaviorSubject<
{ membership: CallMembership; transport: LivekitTransport }[]
>;
// let fakeMembershipsFocusMap$: BehaviorSubject<
// { membership: CallMembership; transport: LivekitTransport }[]
// >;
const livekitFocus: LivekitTransport = {
livekit_alias: "!roomID:example.org",
@@ -70,9 +68,9 @@ function setupTest(): void {
}),
getDeviceId: vi.fn().mockReturnValue("ABCDEF"),
} as unknown as OpenIDClientParts);
fakeMembershipsFocusMap$ = new BehaviorSubject<
{ membership: CallMembership; transport: LivekitTransport }[]
>([]);
// fakeMembershipsFocusMap$ = new BehaviorSubject<
// { membership: CallMembership; transport: LivekitTransport }[]
// >([]);
localParticipantEventEmiter = new EventEmitter();
@@ -131,7 +129,7 @@ function setupRemoteConnection(): Connection {
fakeLivekitRoom.connect.mockResolvedValue(undefined);
return new Connection(opts);
return new Connection(opts, logger);
}
afterEach(() => {
@@ -150,7 +148,7 @@ describe("Start connection states", () => {
scope: testScope,
livekitRoomFactory: () => fakeLivekitRoom,
};
const connection = new Connection(opts);
const connection = new Connection(opts, logger);
expect(connection.state$.getValue().state).toEqual("Initialized");
});
@@ -166,7 +164,7 @@ describe("Start connection states", () => {
livekitRoomFactory: () => fakeLivekitRoom,
};
const connection = new Connection(opts, undefined);
const connection = new Connection(opts, logger);
const capturedStates: ConnectionState[] = [];
const s = connection.state$.subscribe((value) => {
@@ -218,7 +216,7 @@ describe("Start connection states", () => {
livekitRoomFactory: () => fakeLivekitRoom,
};
const connection = new Connection(opts, undefined);
const connection = new Connection(opts, logger);
const capturedStates: ConnectionState[] = [];
const s = connection.state$.subscribe((value) => {
@@ -274,7 +272,7 @@ describe("Start connection states", () => {
livekitRoomFactory: () => fakeLivekitRoom,
};
const connection = new Connection(opts, undefined);
const connection = new Connection(opts, logger);
const capturedStates: ConnectionState[] = [];
const s = connection.state$.subscribe((value) => {

View File

@@ -98,6 +98,7 @@ export class Connection {
// TODO dont make this throw and instead store a connection error state in this class?
// TODO consider an autostart pattern...
public async start(): Promise<void> {
this.logger.debug("Starting Connection");
this.stopped = false;
try {
this._state$.next({
@@ -145,6 +146,7 @@ export class Connection {
livekitConnectionState$: connectionStateObserver(this.livekitRoom),
});
} catch (error) {
this.logger.debug(`Failed to connect to LiveKit room: ${error}`);
this._state$.next({
state: "FailedToStart",
error: error instanceof Error ? error : new Error(`${error}`),
@@ -169,6 +171,9 @@ export class Connection {
* If the connection is already stopped, this is a no-op.
*/
public async stop(): Promise<void> {
this.logger.debug(
`Stopping connection to ${this.transport.livekit_service_url}`,
);
if (this.stopped) return;
await this.livekitRoom.disconnect();
this._state$.next({
@@ -195,15 +200,18 @@ export class Connection {
private readonly client: OpenIDClientParts;
public readonly livekitRoom: LivekitRoom;
private readonly logger: Logger;
/**
* Creates a new connection to a matrix RTC LiveKit backend.
*
* @param livekitRoom - LiveKit room instance to use.
* @param opts - Connection options {@link ConnectionOpts}.
*
* @param logger
*/
public constructor(opts: ConnectionOpts, logger?: Logger) {
logger?.info(
public constructor(opts: ConnectionOpts, logger: Logger) {
this.logger = logger.getChild("[Connection]");
this.logger.info(
`[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,
);
const { transport, client, scope } = opts;
@@ -223,15 +231,17 @@ export class Connection {
],
}).pipe(
map((participants) => {
const partsFiltered = participants.filter(
return participants.filter(
(participant) => participant.getTrackPublications().length > 0,
);
return partsFiltered;
}),
),
[],
);
scope.onEnd(() => void this.stop());
scope.onEnd(() => {
this.logger.info(`Connection scope ended, stopping connection`);
void this.stop();
});
}
}

View File

@@ -9,6 +9,7 @@ import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { BehaviorSubject } from "rxjs";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { type Participant as LivekitParticipant } from "livekit-client";
import { logger } from "matrix-js-sdk/lib/logger";
import { Epoch, ObservableScope } from "../../ObservableScope.ts";
import {
@@ -78,6 +79,7 @@ describe("connections$ stream", () => {
inputTransports$: behavior("a", {
a: new Epoch([TRANSPORT_1, TRANSPORT_2], 0),
}),
logger: logger,
});
expectObservable(connections$).toBe("a", {
@@ -119,6 +121,7 @@ describe("connections$ stream", () => {
e: new Epoch([TRANSPORT_1], 4),
f: new Epoch([TRANSPORT_1, TRANSPORT_2], 5),
}),
logger: logger,
});
expectObservable(connections$).toBe("xxxxxa", {
@@ -158,6 +161,7 @@ describe("connections$ stream", () => {
b: new Epoch([TRANSPORT_1, TRANSPORT_2], 1),
c: new Epoch([TRANSPORT_1], 2),
}),
logger: logger,
});
expectObservable(connections$).toBe("xab", {
@@ -272,6 +276,7 @@ describe("connectionManagerData$ stream", () => {
inputTransports$: behavior("a", {
a: new Epoch([TRANSPORT_1, TRANSPORT_2], 0),
}),
logger,
});
expectObservable(connectionManagerData$).toBe("abcd", {

View File

@@ -13,8 +13,8 @@ import {
type LivekitTransport,
type ParticipantId,
} from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject, combineLatest, map, of, switchMap } from "rxjs";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { BehaviorSubject, combineLatest, map, of, switchMap, tap } from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger";
import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
import { type Behavior } from "../../Behavior.ts";
@@ -91,6 +91,7 @@ interface Props {
scope: ObservableScope;
connectionFactory: ConnectionFactory;
inputTransports$: Behavior<Epoch<LivekitTransport[]>>;
logger: Logger;
}
// TODO - write test for scopes (do we really need to bind scope)
export interface IConnectionManager {
@@ -116,8 +117,9 @@ export function createConnectionManager$({
scope,
connectionFactory,
inputTransports$,
logger: _logger,
}: Props): IConnectionManager {
const logger = rootLogger.getChild("ConnectionManager");
const logger = _logger.getChild("[ConnectionManager]");
const running$ = new BehaviorSubject(true);
scope.onEnd(() => running$.next(false));
@@ -137,6 +139,11 @@ export function createConnectionManager$({
transports.mapInner((transport) => (running ? transport : [])),
),
map((transports) => transports.mapInner(removeDuplicateTransports)),
tap(({ value: transports }) => {
logger.trace(
`Managing transports: ${transports.map((t) => t.livekit_service_url).join(", ")}`,
);
}),
),
);
@@ -154,6 +161,7 @@ export function createConnectionManager$({
};
},
(scope, _data$, serviceUrl, alias) => {
logger.debug(`Creating connection to ${serviceUrl} (${alias})`);
const connection = connectionFactory.createConnection(
{
type: "livekit",

View File

@@ -11,6 +11,7 @@ import { type Room as LivekitRoom } from "livekit-client";
import EventEmitter from "events";
import fetchMock from "fetch-mock";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { logger } from "matrix-js-sdk/lib/logger";
import {
type Epoch,
@@ -120,6 +121,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
scope: testScope,
connectionFactory: ecConnectionFactory,
inputTransports$: membershipsAndTransports.transports$,
logger: logger,
});
const matrixLivekitItems$ = createMatrixLivekitMembers$({

View File

@@ -6,7 +6,7 @@ Please see LICENSE in the repository root for full details.
*/
import { describe, expect, it } from "vitest";
import { BehaviorSubject, combineLatest, of, Subject } from "rxjs";
import { BehaviorSubject, combineLatest, Subject } from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger";
import {

View File

@@ -19,9 +19,9 @@ import {
take,
takeUntil,
} from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "./Behavior";
import { logger } from "matrix-js-sdk/lib/logger";
type MonoTypeOperator = <T>(o: Observable<T>) => Observable<T>;