Merge branch 'livekit' into one-on-one-portrait

This commit is contained in:
Robin
2026-05-15 11:28:47 +02:00
11 changed files with 578 additions and 58 deletions

View File

@@ -79,6 +79,11 @@ export default defineConfig({
firefoxUserPrefs: {
"permissions.default.microphone": 1,
"permissions.default.camera": 1,
// Equivalent to Chromium's --use-fake-device-for-media-stream:
// feeds a synthetic media stream so getUserMedia and
// enumerateDevices work on CI runners without real hardware.
"media.navigator.streams.fake": true,
"media.navigator.permission.disabled": true,
},
},
},

View File

@@ -73,6 +73,9 @@ test("BugFix: When unmuting in lobby, you had to click twice to unmute in call",
const microphoneButton = page.getByTestId("incall_mute");
const cameraButton = page.getByTestId("incall_videomute");
// Wait for devices to enumerate before the button enables.
await expect(microphoneButton).toBeEnabled({ timeout: 10_000 });
await microphoneButton.click();
await cameraButton.click();

View File

@@ -26,6 +26,7 @@ import {
QualitySurveyEventTracker,
CallDisconnectedEventTracker,
CallConnectDurationTracker,
CallReconnectingTracker,
} from "./PosthogEvents";
import { Config } from "../config/Config";
import { getUrlParams } from "../UrlParams";
@@ -421,4 +422,5 @@ export class PosthogAnalytics {
public eventQualitySurvey = new QualitySurveyEventTracker();
public eventCallDisconnected = new CallDisconnectedEventTracker();
public eventCallConnectDuration = new CallConnectDurationTracker();
public eventCallReconnecting = new CallReconnectingTracker();
}

View File

@@ -18,7 +18,11 @@ import { logger } from "matrix-js-sdk/lib/logger";
import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc";
import { PosthogAnalytics } from "./PosthogAnalytics";
import { CallEndedTracker } from "./PosthogEvents";
import {
CallEndedTracker,
CallReconnectingTracker,
type CallReconnectingReason,
} from "./PosthogEvents";
import { mockConfig } from "../utils/test";
const defaultCounters = {
@@ -89,6 +93,11 @@ describe("CallEnded", () => {
roomEventEncryptionKeysSent: 10,
roomEventEncryptionKeysReceived: 5,
roomEventEncryptionKeysReceivedAverageAge: 100,
callReconnectingCount: 0,
callReconnectingCountSync: 0,
callReconnectingCountMembership: 0,
callReconnectingCountProbablyLeft: 0,
callReconnectingCountLivekit: 0,
},
{ send_instantly: true },
);
@@ -159,4 +168,70 @@ describe("CallEnded", () => {
{ send_instantly: false },
);
});
it("includes per-reason reconnecting counts in CallEnded", () => {
const tracker = new CallEndedTracker();
const mockSession = createMockRtcSession();
tracker.cacheStartCall(new Date());
tracker.cacheReconnecting("sync");
tracker.cacheReconnecting("sync");
tracker.cacheReconnecting("livekit");
tracker.cacheReconnecting("membership");
tracker.track("test-call-id", 1, false, mockSession);
expect(PosthogAnalytics.instance.trackEvent).toHaveBeenCalledWith(
expect.objectContaining({
callReconnectingCount: 4,
callReconnectingCountSync: 2,
callReconnectingCountMembership: 1,
callReconnectingCountProbablyLeft: 0,
callReconnectingCountLivekit: 1,
}),
expect.anything(),
);
});
});
describe("CallReconnecting", () => {
beforeAll(() => {
mockConfig();
});
beforeEach(() => {
vi.restoreAllMocks();
vi.spyOn(PosthogAnalytics.instance, "trackEvent").mockImplementation(
() => {},
);
});
afterAll(() => {
PosthogAnalytics.resetInstance();
});
it("tracks event with correct shape", () => {
const tracker = new CallReconnectingTracker();
tracker.track("!room:example.org", "sync", 3.5);
expect(PosthogAnalytics.instance.trackEvent).toHaveBeenCalledWith({
eventName: "CallReconnecting",
callId: "!room:example.org",
reason: "sync",
reconnectDuration: 3.5,
});
});
it.each([
"sync",
"membership",
"probablyLeft",
"livekit",
] as CallReconnectingReason[])("tracks reason %s correctly", (reason) => {
const tracker = new CallReconnectingTracker();
tracker.track("!room:example.org", reason, 1.0);
expect(PosthogAnalytics.instance.trackEvent).toHaveBeenCalledWith(
expect.objectContaining({ reason, reconnectDuration: 1.0 }),
);
});
});

View File

@@ -17,6 +17,7 @@ import {
interface CallEnded extends IPosthogEvent {
eventName: "CallEnded";
// the callId posthog key is essentially a Matrix roomId
callId: string;
callParticipantsOnLeave: number;
callParticipantsMax: number;
@@ -24,16 +25,43 @@ interface CallEnded extends IPosthogEvent {
roomEventEncryptionKeysSent: number;
roomEventEncryptionKeysReceived: number;
roomEventEncryptionKeysReceivedAverageAge: number;
callReconnectingCount: number;
callReconnectingCountSync: number;
callReconnectingCountMembership: number;
callReconnectingCountProbablyLeft: number;
callReconnectingCountLivekit: number;
}
export class CallEndedTracker {
private cache: { startTime?: Date; maxParticipantsCount: number } = {
private cache: {
startTime?: Date;
maxParticipantsCount: number;
reconnectingCount: number;
reconnectingCountByReason: Record<CallReconnectingReason, number>;
} = {
startTime: undefined,
maxParticipantsCount: 0,
reconnectingCount: 0,
reconnectingCountByReason: {
sync: 0,
membership: 0,
probablyLeft: 0,
livekit: 0,
},
};
public cacheStartCall(time: Date): void {
this.cache.startTime = time;
this.cache = {
startTime: time,
maxParticipantsCount: 0,
reconnectingCount: 0,
reconnectingCountByReason: {
sync: 0,
membership: 0,
probablyLeft: 0,
livekit: 0,
},
};
}
public cacheParticipantCountChanged(count: number): void {
@@ -43,6 +71,11 @@ export class CallEndedTracker {
);
}
public cacheReconnecting(reason: CallReconnectingReason): void {
this.cache.reconnectingCount++;
this.cache.reconnectingCountByReason[reason]++;
}
public track(
callId: string,
callParticipantsNow: number,
@@ -67,6 +100,14 @@ export class CallEndedTracker {
.roomEventEncryptionKeysReceivedTotalAge /
rtcSession.statistics.counters.roomEventEncryptionKeysReceived
: 0,
callReconnectingCount: this.cache.reconnectingCount,
callReconnectingCountSync: this.cache.reconnectingCountByReason.sync,
callReconnectingCountMembership:
this.cache.reconnectingCountByReason.membership,
callReconnectingCountProbablyLeft:
this.cache.reconnectingCountByReason.probablyLeft,
callReconnectingCountLivekit:
this.cache.reconnectingCountByReason.livekit,
},
{ send_instantly: sendInstantly },
);
@@ -80,6 +121,7 @@ export class CallEndedTracker {
interface CallStarted extends IPosthogEvent {
eventName: "CallStarted";
// the callId posthog key is essentially a Matrix roomId
callId: string;
}
@@ -140,6 +182,7 @@ export class LoginTracker {
interface MuteMicrophone {
eventName: "MuteMicrophone";
targetMuteState: "mute" | "unmute";
// the callId posthog key is essentially a Matrix roomId
callId: string;
}
@@ -156,6 +199,7 @@ export class MuteMicrophoneTracker {
interface MuteCamera {
eventName: "MuteCamera";
targetMuteState: "mute" | "unmute";
// the callId posthog key is essentially a Matrix roomId
callId: string;
}
@@ -171,6 +215,7 @@ export class MuteCameraTracker {
interface UndecryptableToDeviceEvent {
eventName: "UndecryptableToDeviceEvent";
// the callId posthog key is essentially a Matrix roomId
callId: string;
}
@@ -185,6 +230,7 @@ export class UndecryptableToDeviceEventTracker {
interface QualitySurveyEvent {
eventName: "QualitySurvey";
// the callId posthog key is essentially a Matrix roomId
callId: string;
feedbackText: string;
stars: number;
@@ -249,3 +295,32 @@ export class CallConnectDurationTracker {
);
}
}
export type CallReconnectingReason =
| "sync"
| "membership"
| "probablyLeft"
| "livekit";
interface CallReconnecting extends IPosthogEvent {
eventName: "CallReconnecting";
// the callId posthog key is essentially a Matrix roomId
callId: string;
reason: CallReconnectingReason;
reconnectDuration: number;
}
export class CallReconnectingTracker {
public track(
callId: string,
reason: CallReconnectingReason,
reconnectDuration: number,
): void {
PosthogAnalytics.instance.trackEvent<CallReconnecting>({
eventName: "CallReconnecting",
callId,
reason,
reconnectDuration,
});
}
}

View File

@@ -564,6 +564,7 @@ export function createCallViewModel$(
connectionManager,
matrixRTCSession,
localTransport$,
roomId: matrixRoom.roomId,
logger: logger.getChild(`[${Date.now()}]`),
});

View File

@@ -98,108 +98,181 @@ describe("createHomeserverConnected$", () => {
// LLM generated test cases. They are a bit overkill but I improved the mocking so it is
// easy enough to read them so I think they can stay.
// Note: gracePeriodMs is set to 0 to avoid debouncing delays in tests
it("is false when sync state is not Syncing", () => {
it("reports syncing reason when sync state is not Syncing", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
expect(hsConnected.combined$.value).toBe(false);
expect(hsConnected.combined$.value).toEqual([false, "sync"]);
});
it("remains false while membership status is not Connected even if sync is Syncing", () => {
it("reports membership reason when sync is Syncing but membership is not Connected", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
client.setSyncState(SyncState.Syncing);
expect(hsConnected.combined$.value).toBe(false); // membership still disconnected
expect(hsConnected.combined$.value).toEqual([false, "membership"]);
});
it("is false when membership status transitions to Connected but ProbablyLeft is true", () => {
it("reports probablyLeft reason when membership transitions to Connected but ProbablyLeft is true", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
// Make sync loop OK
client.setSyncState(SyncState.Syncing);
// Indicate probable leave before connection
session.setProbablyLeft(true);
session.setMembershipStatus(Status.Connected);
expect(hsConnected.combined$.value).toBe(false);
expect(hsConnected.combined$.value).toEqual([false, "probablyLeft"]);
});
it("becomes true only when all three conditions are satisfied", () => {
it("becomes null (connected) only when all three conditions are satisfied", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
// 1. Sync loop connected
client.setSyncState(SyncState.Syncing);
expect(hsConnected.combined$.value).toBe(false); // not yet membership connected
expect(hsConnected.combined$.value).toEqual([false, "membership"]); // not yet membership connected
// 2. Membership connected
session.setMembershipStatus(Status.Connected);
expect(hsConnected.combined$.value).toBe(true); // probablyLeft is false
expect(hsConnected.combined$.value).toEqual([true, null]); // probablyLeft is false
});
it("drops back to false when sync loop leaves Syncing", () => {
it("returns syncing reason when sync loop leaves Syncing", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
// Reach connected state
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected.combined$.value).toBe(true);
expect(hsConnected.combined$.value).toEqual([true, null]);
// Sync loop error => should flip false
// Sync loop error => should report syncing reason
client.setSyncState(SyncState.Error);
expect(hsConnected.combined$.value).toBe(false);
expect(hsConnected.combined$.value).toEqual([false, "sync"]);
});
it("drops back to false when membership status becomes disconnected", () => {
it("returns membershipConnected reason when membership status becomes disconnected", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected.combined$.value).toBe(true);
expect(hsConnected.combined$.value).toEqual([true, null]);
session.setMembershipStatus(Status.Disconnected);
expect(hsConnected.combined$.value).toBe(false);
expect(hsConnected.combined$.value).toEqual([false, "membership"]);
});
it("drops to false when ProbablyLeft is emitted after being true", () => {
it("returns certainlyConnected reason when ProbablyLeft is emitted", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected.combined$.value).toBe(true);
expect(hsConnected.combined$.value).toEqual([true, null]);
session.setProbablyLeft(true);
expect(hsConnected.combined$.value).toBe(false);
expect(hsConnected.combined$.value).toEqual([false, "probablyLeft"]);
});
it("recovers to true if ProbablyLeft becomes false again while other conditions remain true", () => {
it("recovers to null (connected) if ProbablyLeft becomes false again while other conditions remain true", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected.combined$.value).toBe(true);
expect(hsConnected.combined$.value).toEqual([true, null]);
session.setProbablyLeft(true);
expect(hsConnected.combined$.value).toBe(false);
expect(hsConnected.combined$.value).toEqual([false, "probablyLeft"]);
// Simulate clearing the flag (in realistic scenario membership manager would update)
session.setProbablyLeft(false);
expect(hsConnected.combined$.value).toBe(true);
expect(hsConnected.combined$.value).toEqual([true, null]);
});
it("composite sequence reflects each individual failure reason", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
// Initially false (sync error + disconnected + not probably left)
expect(hsConnected.combined$.value).toBe(false);
// Initially: sync error + membership disconnected → syncing wins (highest priority)
expect(hsConnected.combined$.value).toEqual([false, "sync"]);
// Fix sync only
// Fix sync only → membershipConnected is now the blocker
client.setSyncState(SyncState.Syncing);
expect(hsConnected.combined$.value).toBe(false);
expect(hsConnected.combined$.value).toEqual([false, "membership"]);
// Fix membership
// Fix membership → all conditions satisfied
session.setMembershipStatus(Status.Connected);
expect(hsConnected.combined$.value).toBe(true);
expect(hsConnected.combined$.value).toEqual([true, null]);
// Introduce probablyLeft -> false
// Introduce probablyLeft → certainlyConnected
session.setProbablyLeft(true);
expect(hsConnected.combined$.value).toBe(false);
expect(hsConnected.combined$.value).toEqual([false, "probablyLeft"]);
// Restore notProbablyLeft -> true again
// Restore notProbablyLeft → connected again
session.setProbablyLeft(false);
expect(hsConnected.combined$.value).toBe(true);
expect(hsConnected.combined$.value).toEqual([true, null]);
// Drop sync -> false
// Drop sync → syncing reason
client.setSyncState(SyncState.Error);
expect(hsConnected.combined$.value).toBe(false);
expect(hsConnected.combined$.value).toEqual([false, "sync"]);
});
});
describe("createHomeserverConnected$ - combined$ reason values", () => {
let scope: ObservableScope;
let client: MockMatrixClient;
let session: MockMatrixRTCSession;
beforeEach(() => {
scope = new ObservableScope();
// Start with sync failing and membership disconnected
client = new MockMatrixClient(SyncState.Error);
session = new MockMatrixRTCSession({
membershipStatus: Status.Disconnected,
probablyLeft: false,
});
});
afterEach(() => {
scope.end();
});
it("is [true, null] when all three conditions are satisfied", () => {
const { combined$ } = createHomeserverConnected$(scope, client, session, 0);
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(combined$.value).toEqual([true, null]);
});
it("reports syncing when sync loop is not Syncing", () => {
const { combined$ } = createHomeserverConnected$(scope, client, session, 0);
// client starts with SyncState.Error, membership also disconnected
expect(combined$.value).toEqual([false, "sync"]);
});
it("reports membershipConnected when sync is fine but membership is not Connected", () => {
const { combined$ } = createHomeserverConnected$(scope, client, session, 0);
client.setSyncState(SyncState.Syncing);
// session still Status.Disconnected
expect(combined$.value).toEqual([false, "membership"]);
});
it("reports certainlyConnected when probablyLeft is true", () => {
const { combined$ } = createHomeserverConnected$(scope, client, session, 0);
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
session.setProbablyLeft(true);
expect(combined$.value).toEqual([false, "probablyLeft"]);
});
it("prioritises syncing over membershipConnected when both fail", () => {
const { combined$ } = createHomeserverConnected$(scope, client, session, 0);
// Both sync (Error) and membership (Disconnected) are failing
expect(combined$.value).toEqual([false, "sync"]);
});
it("updates reason as conditions change", () => {
const { combined$ } = createHomeserverConnected$(scope, client, session, 0);
// Initially: syncing fails
expect(combined$.value).toEqual([false, "sync"]);
// Fix sync → membershipConnected is now the blocker
client.setSyncState(SyncState.Syncing);
expect(combined$.value).toEqual([false, "membership"]);
// Fix membership → probablyLeft makes certainlyConnected fail
session.setProbablyLeft(true);
session.setMembershipStatus(Status.Connected);
expect(combined$.value).toEqual([false, "probablyLeft"]);
// Clear probablyLeft → all conditions satisfied
session.setProbablyLeft(false);
expect(combined$.value).toEqual([true, null]);
});
});
@@ -231,8 +304,8 @@ describe("createHomeserverConnected$ - Grace Period", () => {
GRACE_PERIOD,
);
expectObservable(hsConnected.combined$).toBe(expectedConnectedMarbles, {
y: true,
n: false,
y: [true, null],
n: [false, "sync"],
});
});
}

View File

@@ -22,13 +22,13 @@ import {
switchMap,
of,
delay,
combineLatest,
} from "rxjs";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { Config } from "../../../config/Config";
import { type ObservableScope } from "../../ObservableScope";
import { type Behavior } from "../../Behavior";
import { and$ } from "../../../utils/observable";
import { type NodeStyleEventEmitter } from "../../../utils/test";
/**
@@ -36,8 +36,14 @@ import { type NodeStyleEventEmitter } from "../../../utils/test";
*/
const logger = rootLogger.getChild("[HomeserverConnected]");
export type HomeserverDisconnectReason = "sync" | "membership" | "probablyLeft";
export interface HomeserverConnected {
combined$: Behavior<boolean>;
/**
* Emits `[true, null]` when the homeserver connection is healthy, or
* `[false, reason]` when one of the three sub-conditions fails.
*/
combined$: Behavior<[boolean, HomeserverDisconnectReason | null]>;
rtsSession$: Behavior<Status>;
}
@@ -45,10 +51,11 @@ export interface HomeserverConnected {
* Behavior representing whether we consider ourselves connected to the Matrix homeserver
* for the purposes of a MatrixRTC session.
*
* Becomes FALSE if ANY sub-condition is fulfilled:
* 1. Sync loop is not in SyncState.Syncing (after grace period)
* 2. membershipStatus !== Status.Connected
* 3. probablyLeft === true
* `combined$` emits `null` when all conditions are satisfied, or the first failing
* reason (priority: syncing > membershipConnected > certainlyConnected):
* 1. Sync loop is not in SyncState.Syncing (after grace period) → "sync"
* 2. membershipStatus !== Status.Connected → "membership"
* 3. probablyLeft === true → "probablyLeft"
*
* @param scope - The observable scope for lifecycle management.
* @param client - The Matrix client to monitor sync state.
@@ -109,9 +116,22 @@ export function createHomeserverConnected$(
);
const combined$ = scope.behavior(
and$(syncing$, membershipConnected$, certainlyConnected$).pipe(
tap((connected) => {
logger.info(`Homeserver connected update: ${connected}`);
combineLatest([syncing$, membershipConnected$, certainlyConnected$]).pipe(
map(
([syncing, membership, certainly]): [
boolean,
HomeserverDisconnectReason | null,
] => {
if (!syncing) return [false, "sync"];
if (!membership) return [false, "membership"];
if (!certainly) return [false, "probablyLeft"];
return [true, null];
},
),
tap(([connected, reason]) => {
logger.info(
`Homeserver connected update: ${connected ? "connected" : reason}`,
);
}),
),
);

View File

@@ -11,13 +11,23 @@ import {
type LivekitTransportConfig,
type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc";
import { describe, expect, it, vi } from "vitest";
import {
describe,
expect,
it,
vi,
beforeAll,
afterAll,
beforeEach,
} from "vitest";
import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
import { BehaviorSubject, map, of } from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger";
import { type LocalParticipant, type LocalTrack } from "livekit-client";
import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics";
import { MatrixRTCMode } from "../../../settings/settings";
import { type HomeserverDisconnectReason } from "./HomeserverConnected";
import {
flushPromises,
mockConfig,
@@ -215,9 +225,13 @@ describe("LocalMembership", () => {
createPublisherFactory: vi.fn(),
joinMatrixRTC: async (): Promise<void> => {},
homeserverConnected: {
combined$: constant(true),
combined$: constant<[boolean, HomeserverDisconnectReason | null]>([
true,
null,
]),
rtsSession$: constant(RTCMemberStatus.Connected),
},
roomId: "!test-room-id:example.org",
};
it("throws error on missing RTC config error", () => {
@@ -667,4 +681,210 @@ describe("LocalMembership", () => {
// expect(publishers[0].stopTracks).toHaveBeenCalled();
});
// TODO add tests for matrix local matrix participation.
describe("reconnecting analytics", () => {
beforeAll(() => {
mockConfig();
});
beforeEach(() => {
vi.restoreAllMocks();
});
afterAll(() => {
PosthogAnalytics.resetInstance();
});
it("does not fire CallReconnecting for the initial non-connected state at startup", async () => {
const scope = new ObservableScope();
const trackSpy = vi.spyOn(
PosthogAnalytics.instance.eventCallReconnecting,
"track",
);
// Simulate startup where membership isn't established yet
const hsReason$ = new BehaviorSubject<
[boolean, HomeserverDisconnectReason | null]
>([false, "membership"]);
const connectionManagerData = new ConnectionManagerData();
connectionManagerData.add(connectionTransportAConnected, []);
createLocalMembership$({
scope,
...defaultCreateLocalMemberValues,
homeserverConnected: {
combined$: hsReason$,
rtsSession$: constant(RTCMemberStatus.Connected),
},
connectionManager: {
connectionManagerData$: constant(new Epoch(connectionManagerData)),
},
localTransport$: new BehaviorSubject({
advertised$: new BehaviorSubject(aTransport),
active$: new BehaviorSubject(aTransportWithSFUConfig),
}),
});
await flushPromises();
// Membership is established — call is now connected
hsReason$.next([true, null]);
expect(trackSpy).not.toHaveBeenCalled();
scope.end();
});
it("fires CallReconnecting with homeserver reason and duration when reconnected", async () => {
const scope = new ObservableScope();
const trackSpy = vi.spyOn(
PosthogAnalytics.instance.eventCallReconnecting,
"track",
);
const hsReason$ = new BehaviorSubject<
[boolean, HomeserverDisconnectReason | null]
>([true, null]);
const connectionManagerData = new ConnectionManagerData();
connectionManagerData.add(connectionTransportAConnected, []);
createLocalMembership$({
scope,
...defaultCreateLocalMemberValues,
homeserverConnected: {
combined$: hsReason$,
rtsSession$: constant(RTCMemberStatus.Connected),
},
connectionManager: {
connectionManagerData$: constant(new Epoch(connectionManagerData)),
},
localTransport$: new BehaviorSubject({
advertised$: new BehaviorSubject(aTransport),
active$: new BehaviorSubject(aTransportWithSFUConfig),
}),
});
await flushPromises();
hsReason$.next([false, "sync"]);
hsReason$.next([true, null]);
expect(trackSpy).toHaveBeenCalledWith(
defaultCreateLocalMemberValues.roomId,
"sync",
expect.any(Number),
);
scope.end();
});
it("reports livekit reason when livekit disconnects then reconnects", async () => {
const scope = new ObservableScope();
const trackSpy = vi.spyOn(
PosthogAnalytics.instance.eventCallReconnecting,
"track",
);
const connectionState$ = new BehaviorSubject<ConnectionState>(
ConnectionState.LivekitConnected,
);
const mutableConnection = {
...connectionTransportAConnected,
state$: connectionState$,
} as unknown as Connection;
const connectionManagerData = new ConnectionManagerData();
connectionManagerData.add(mutableConnection, []);
createLocalMembership$({
scope,
...defaultCreateLocalMemberValues,
homeserverConnected: {
combined$: new BehaviorSubject<
[boolean, HomeserverDisconnectReason | null]
>([true, null]),
rtsSession$: constant(RTCMemberStatus.Connected),
},
connectionManager: {
connectionManagerData$: constant(new Epoch(connectionManagerData)),
},
localTransport$: new BehaviorSubject({
advertised$: new BehaviorSubject(aTransport),
active$: new BehaviorSubject(aTransportWithSFUConfig),
}),
});
await flushPromises();
connectionState$.next(ConnectionState.LivekitDisconnected);
connectionState$.next(ConnectionState.LivekitConnected);
expect(trackSpy).toHaveBeenCalledWith(
defaultCreateLocalMemberValues.roomId,
"livekit",
expect.any(Number),
);
scope.end();
});
it("fires one event per completed reconnection cycle", async () => {
const scope = new ObservableScope();
const trackSpy = vi.spyOn(
PosthogAnalytics.instance.eventCallReconnecting,
"track",
);
const hsReason$ = new BehaviorSubject<
[boolean, HomeserverDisconnectReason | null]
>([true, null]);
const connectionManagerData = new ConnectionManagerData();
connectionManagerData.add(connectionTransportAConnected, []);
createLocalMembership$({
scope,
...defaultCreateLocalMemberValues,
homeserverConnected: {
combined$: hsReason$,
rtsSession$: constant(RTCMemberStatus.Connected),
},
connectionManager: {
connectionManagerData$: constant(new Epoch(connectionManagerData)),
},
localTransport$: new BehaviorSubject({
advertised$: new BehaviorSubject(aTransport),
active$: new BehaviorSubject(aTransportWithSFUConfig),
}),
});
await flushPromises();
hsReason$.next([false, "membership"]);
hsReason$.next([true, null]);
hsReason$.next([false, "probablyLeft"]);
hsReason$.next([false, "sync"]);
hsReason$.next([false, "membership"]);
hsReason$.next([true, null]);
expect(trackSpy).toHaveBeenCalledTimes(2);
expect(trackSpy).toHaveBeenNthCalledWith(
1,
defaultCreateLocalMemberValues.roomId,
"membership",
expect.any(Number),
);
expect(trackSpy).toHaveBeenNthCalledWith(
2,
defaultCreateLocalMemberValues.roomId,
"probablyLeft",
expect.any(Number),
);
scope.end();
});
});
});

View File

@@ -61,7 +61,6 @@ import {
type FailedToStartError,
} from "../remoteMembers/Connection.ts";
import { type HomeserverConnected } from "./HomeserverConnected.ts";
import { and$ } from "../../../utils/observable.ts";
import { type LocalTransport } from "./LocalTransport.ts";
import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers.ts";
@@ -129,6 +128,7 @@ interface Props {
createPublisherFactory: (connection: Connection) => Publisher;
joinMatrixRTC: (transport: LivekitTransportConfig) => void;
homeserverConnected: HomeserverConnected;
roomId: string;
localTransport$: Behavior<LocalTransport>;
matrixRTCSession: Pick<
MatrixRTCSession,
@@ -152,6 +152,7 @@ interface Props {
* @param props.logger The logger to use.
* @param props.muteStates The mute states for video and audio.
* @param props.matrixRTCSession The matrix RTC session to join.
* @param props.roomId The room ID used as the call identifier in analytics events.
* @returns
* - publisher: The handle to create tracks and publish them to the room.
* - connected$: the current connection state. Including matrix server and livekit server connection. (only considering the livekit server we are using for our own media publication)
@@ -169,6 +170,7 @@ export const createLocalMembership$ = ({
logger: parentLogger,
muteStates,
matrixRTCSession,
roomId: roomId,
}: Props): {
/**
* This request to start audio and video tracks.
@@ -494,20 +496,35 @@ export const createLocalMembership$ = ({
);
/**
* Whether we are "fully" connected to the call. Accounts for both the
* connection to the MatrixRTC session and the LiveKit publish connection.
* The disconnect reason for the combined Matrix + LiveKit connection, or null
* when fully connected. Homeserver reasons take priority over livekit.
* Both connectivity state and reason come from the same combineLatest emission,
* avoiding any race between the two.
*/
const matrixAndLivekitConnected$ = scope.behavior(
and$(
const connectionDisconnectReason$ = scope.behavior(
combineLatest([
homeserverConnected.combined$,
localConnectionState$.pipe(
map((state) => state === ConnectionState.LivekitConnected),
),
).pipe(
]).pipe(
map(([[hsConnected, hsReason], livekitConnected]) => {
if (!hsConnected) return hsReason!;
if (!livekitConnected) return "livekit" as const;
return null;
}),
tap((v) => logger.debug("livekit+matrix: Connected state changed", v)),
),
);
/**
* Whether we are "fully" connected to the call. Accounts for both the
* connection to the MatrixRTC session and the LiveKit publish connection.
*/
const matrixAndLivekitConnected$ = scope.behavior(
connectionDisconnectReason$.pipe(map((reason) => reason === null)),
);
/**
* Whether we should tell the user that we're reconnecting to the call.
*/
@@ -519,6 +536,33 @@ export const createLocalMembership$ = ({
false,
);
let reconnectStart: {
time: number;
reason: NonNullable<(typeof connectionDisconnectReason$)["value"]>;
} | null = null;
connectionDisconnectReason$
.pipe(distinctUntilChanged(), pairwise(), scope.bind())
.subscribe(([prev, reason]) => {
if (reason !== null) {
// Only begin tracking when transitioning FROM connected (null → non-null).
// This prevents the initial startup phase — where we may be non-null before
// the first real connection — from being counted as a reconnect.
if (prev === null) {
reconnectStart ??= { time: Date.now(), reason };
}
} else if (reconnectStart !== null) {
PosthogAnalytics.instance.eventCallReconnecting.track(
roomId,
reconnectStart.reason,
(Date.now() - reconnectStart.time) / 1000,
);
PosthogAnalytics.instance.eventCallEnded.cacheReconnecting(
reconnectStart.reason,
);
reconnectStart = null;
}
});
// inform the widget about the connect and disconnect intent from the user.
scope
.behavior(joinAndPublishRequested$.pipe(pairwise(), scope.bind()), [
@@ -606,7 +650,7 @@ export const createLocalMembership$ = ({
// TODO refactor this based no livekitState$
combineLatest([participant$, homeserverConnected.combined$])
.pipe(scope.bind())
.subscribe(([participant, connected]) => {
.subscribe(([participant, [connected]]) => {
if (!participant) return;
const publications = participant.trackPublications.values();
if (connected) {

View File

@@ -116,6 +116,8 @@ export function getValue<T>(state$: Observable<T>): T {
/**
* Creates an Observable that has a value of true whenever all its inputs are
* true.
*
* @public
*/
export function and$(...inputs: Observable<boolean>[]): Observable<boolean> {
return combineLatest(inputs, (...flags) => flags.every((flag) => flag));