move HomeserverConnected

This commit is contained in:
Timo K
2025-11-21 13:04:28 +01:00
parent d34775fc47
commit 4099c4383d
9 changed files with 318 additions and 72 deletions

View File

@@ -122,7 +122,7 @@ export interface ConfigOptions {
delayed_leave_event_delay_ms?: number;
/**
* The time (in milliseconds) after which a we consider a delayed event restart http request to have failed.
* The time (in milliseconds) after which we consider a delayed event restart http request to have failed.
* Setting this to a lower value will result in more frequent retries but also a higher chance of failiour.
*
* In the presence of network packet loss (hurting TCP connections), the custom delayedEventRestartLocalTimeoutMs

View File

@@ -22,11 +22,13 @@ export type OpenIDClientParts = Pick<
"getOpenIdToken" | "getDeviceId"
>;
/**
*
* Gets a bearer token from the homeserver and then use it to authenticate
* to the matrix RTC backend in order to get acces to the SFU.
* It has built-in retry for calls to the homeserver with a backoff policy.
* @param client
* @param serviceUrl
* @param matrixRoomId
* @returns
* @returns Object containing the token information
* @throws FailToGetOpenIdToken
*/
export async function getSFUConfigWithOpenID(

View File

@@ -54,7 +54,7 @@ const ErrorPage: FC<ErrorPageProps> = ({
widget,
}: ErrorPageProps): ReactElement => {
const { t } = useTranslation();
logger.log("Error boundary caught:", error);
logger.error("Error boundary caught:", error);
let icon: ComponentType<SVGAttributes<SVGElement>>;
switch (error.category) {
case ErrorCategory.CONFIGURATION_ISSUE:

View File

@@ -97,8 +97,8 @@ import {
} from "../layout-types.ts";
import { type ElementCallError } from "../../utils/errors.ts";
import { type ObservableScope } from "../ObservableScope.ts";
import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts";
import {
createHomeserverConnected$,
createLocalMembership$,
enterRTCSession,
LivekitState,
@@ -365,9 +365,9 @@ export function createCallViewModel$(
reactionsSubject$: Observable<Record<string, ReactionInfo>>,
trackProcessorState$: Behavior<ProcessorState>,
): CallViewModel {
const userId = matrixRoom.client.getUserId()!;
const deviceId = matrixRoom.client.getDeviceId()!;
const client = matrixRoom.client;
const userId = client.getUserId()!;
const deviceId = client.getDeviceId()!;
const livekitKeyProvider = getE2eeKeyProvider(
options.encryptionSystem,
matrixRTCSession,
@@ -401,7 +401,7 @@ export function createCallViewModel$(
const localTransport$ = createLocalTransport$({
scope: scope,
memberships$: memberships$,
client: matrixRoom.client,
client,
roomId: matrixRoom.roomId,
useOldestMember$: scope.behavior(
matrixRTCMode.value$.pipe(map((v) => v === MatrixRTCMode.Legacy)),
@@ -409,7 +409,7 @@ export function createCallViewModel$(
});
const connectionFactory = new ECConnectionFactory(
matrixRoom.client,
client,
mediaDevices,
trackProcessorState$,
livekitKeyProvider,
@@ -456,7 +456,7 @@ export function createCallViewModel$(
scope: scope,
homeserverConnected$: createHomeserverConnected$(
scope,
matrixRoom,
client,
matrixRTCSession,
),
muteStates: muteStates,

View File

@@ -0,0 +1,202 @@
/*
Copyright 2025 Element Creations Ltd.
Copyright 2024 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { EventEmitter } from "events";
import { ClientEvent, SyncState } from "matrix-js-sdk";
import { MembershipManagerEvent, Status } from "matrix-js-sdk/lib/matrixrtc";
import { ObservableScope } from "../../ObservableScope";
import { createHomeserverConnected$ } from "./HomeserverConnected";
/**
* Minimal stub of a Matrix client sufficient for our tests:
```
createHomeserverConnected$(
scope: ObservableScope,
client: NodeStyleEventEmitter & Pick<MatrixClient, "getSyncState">,
matrixRTCSession: NodeStyleEventEmitter &
Pick<MatrixRTCSession, "membershipStatus" | "probablyLeft">,
)
```
*/
class MockMatrixClient extends EventEmitter {
private syncState: SyncState;
public constructor(initial: SyncState) {
super();
this.syncState = initial;
}
public setSyncState(state: SyncState): void {
this.syncState = state;
// Matrix's Sync event in createHomeserverConnected$ expects [SyncState]
this.emit(ClientEvent.Sync, [state]);
}
public getSyncState(): SyncState {
return this.syncState;
}
}
/**
* Minimal stub of MatrixRTCSession (membership manager):
```
createHomeserverConnected$(
scope: ObservableScope,
client: NodeStyleEventEmitter & Pick<MatrixClient, "getSyncState">,
matrixRTCSession: NodeStyleEventEmitter &
Pick<MatrixRTCSession, "membershipStatus" | "probablyLeft">,
)
```
*/
class MockMatrixRTCSession extends EventEmitter {
public membershipStatus: Status;
public probablyLeft: boolean;
public constructor(props: {
membershipStatus: Status;
probablyLeft: boolean;
}) {
super();
this.membershipStatus = props.membershipStatus;
this.probablyLeft = props.probablyLeft;
}
public setMembershipStatus(status: Status): void {
this.membershipStatus = status;
this.emit(MembershipManagerEvent.StatusChanged);
}
public setProbablyLeft(flag: boolean): void {
this.probablyLeft = flag;
this.emit(MembershipManagerEvent.ProbablyLeft);
}
}
describe("createHomeserverConnected$", () => {
let scope: ObservableScope;
let client: MockMatrixClient;
let session: MockMatrixRTCSession;
beforeEach(() => {
scope = new ObservableScope();
client = new MockMatrixClient(SyncState.Error); // start disconnected
session = new MockMatrixRTCSession({
membershipStatus: Status.Disconnected,
probablyLeft: false,
});
});
afterEach(() => {
scope.end();
});
// LLM generated test cases. They are a bit overkill but I improved the mocking so it is
// easy enough to read them so I think they can stay.
it("is false when sync state is not Syncing", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
expect(hsConnected$.value).toBe(false);
});
it("remains false while membership status is not Connected even if sync is Syncing", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
client.setSyncState(SyncState.Syncing);
expect(hsConnected$.value).toBe(false); // membership still disconnected
});
it("is false when membership status transitions to Connected but ProbablyLeft is true", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
// Make sync loop OK
client.setSyncState(SyncState.Syncing);
// Indicate probable leave before connection
session.setProbablyLeft(true);
session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(false);
});
it("becomes true only when all three conditions are satisfied", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
// 1. Sync loop connected
client.setSyncState(SyncState.Syncing);
expect(hsConnected$.value).toBe(false); // not yet membership connected
// 2. Membership connected
session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true); // probablyLeft is false
});
it("drops back to false when sync loop leaves Syncing", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
// Reach connected state
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true);
// Sync loop error => should flip false
client.setSyncState(SyncState.Error);
expect(hsConnected$.value).toBe(false);
});
it("drops back to false when membership status becomes disconnected", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true);
session.setMembershipStatus(Status.Disconnected);
expect(hsConnected$.value).toBe(false);
});
it("drops to false when ProbablyLeft is emitted after being true", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true);
session.setProbablyLeft(true);
expect(hsConnected$.value).toBe(false);
});
it("recovers to true if ProbablyLeft becomes false again while other conditions remain true", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true);
session.setProbablyLeft(true);
expect(hsConnected$.value).toBe(false);
// Simulate clearing the flag (in realistic scenario membership manager would update)
session.setProbablyLeft(false);
expect(hsConnected$.value).toBe(true);
});
it("composite sequence reflects each individual failure reason", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
// Initially false (sync error + disconnected + not probably left)
expect(hsConnected$.value).toBe(false);
// Fix sync only
client.setSyncState(SyncState.Syncing);
expect(hsConnected$.value).toBe(false);
// Fix membership
session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true);
// Introduce probablyLeft -> false
session.setProbablyLeft(true);
expect(hsConnected$.value).toBe(false);
// Restore notProbablyLeft -> true again
session.setProbablyLeft(false);
expect(hsConnected$.value).toBe(true);
// Drop sync -> false
client.setSyncState(SyncState.Error);
expect(hsConnected$.value).toBe(false);
});
});

View File

@@ -0,0 +1,85 @@
/*
Copyright 2025 Element Creations Ltd.
Copyright 2024 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
MembershipManagerEvent,
Status,
type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc";
import { ClientEvent, type MatrixClient, SyncState } from "matrix-js-sdk";
import { fromEvent, startWith, map, tap, type Observable } from "rxjs";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { type ObservableScope } from "../../ObservableScope";
import { type Behavior } from "../../Behavior";
import { and$ } from "../../../utils/observable";
import { type NodeStyleEventEmitter } from "../../../utils/test";
/**
* Logger instance (scoped child) for homeserver connection updates.
*/
const logger = rootLogger.getChild("[HomeserverConnected]");
/**
* Behavior representing whether we consider ourselves connected to the Matrix homeserver
* for the purposes of a MatrixRTC session.
*
* Becomes FALSE if ANY sub-condition is fulfilled:
* 1. Sync loop is not in SyncState.Syncing
* 2. membershipStatus !== Status.Connected
* 3. probablyLeft === true
*/
export function createHomeserverConnected$(
scope: ObservableScope,
client: NodeStyleEventEmitter & Pick<MatrixClient, "getSyncState">,
matrixRTCSession: NodeStyleEventEmitter &
Pick<MatrixRTCSession, "membershipStatus" | "probablyLeft">,
): Behavior<boolean> {
const syncing$ = (
fromEvent(client, ClientEvent.Sync) as Observable<[SyncState]>
).pipe(
startWith([client.getSyncState()]),
map(([state]) => state === SyncState.Syncing),
);
const membershipConnected$ = fromEvent(
matrixRTCSession,
MembershipManagerEvent.StatusChanged,
).pipe(
startWith(null),
map(() => matrixRTCSession.membershipStatus === Status.Connected),
);
// This is basically notProbablyLeft$
//
// probablyLeft is computed by a local timer that mimics the server delayed event.
// If we locally predict our server event timed out. We consider ourselves as probablyLeft
// even though we might not yet have received the delayed event leave.
//
// If that is not the case we certainly still have a valid membership on the matrix network
// independet if the sync currently works.
const certainlyConnected$ = fromEvent(
matrixRTCSession,
MembershipManagerEvent.ProbablyLeft,
).pipe(
startWith(null),
map(() => matrixRTCSession.probablyLeft !== true),
);
const connectedCombined$ = and$(
syncing$,
membershipConnected$,
certainlyConnected$,
).pipe(
tap((connected) => {
logger.info(`Homeserver connected update: ${connected}`);
}),
);
return scope.behavior(connectedCombined$);
}

View File

@@ -36,16 +36,6 @@ const MATRIX_RTC_MODE = MatrixRTCMode.Legacy;
const getUrlParams = vi.hoisted(() => vi.fn(() => ({})));
vi.mock("../../../UrlParams", () => ({ getUrlParams }));
// vi.mock("../../../widget", async (importOriginal) => ({
// ...(await importOriginal()),
// widget: {
// api: {
// setAlwaysOnScreen: (): void => {},
// transport: { send: vi.fn(), reply: vi.fn(), stop: vi.fn() },
// },
// lazyActions: new EventEmitter(),
// },
// }));
describe("LocalMembership", () => {
describe("enterRTCSession", () => {
it("It joins the correct Session", async () => {

View File

@@ -13,8 +13,6 @@ import {
} from "livekit-client";
import { observeParticipantEvents } from "@livekit/components-core";
import {
MembershipManagerEvent,
Status,
type LivekitTransport,
type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc";
@@ -23,17 +21,14 @@ import {
catchError,
combineLatest,
distinctUntilChanged,
fromEvent,
map,
type Observable,
of,
scan,
startWith,
switchMap,
tap,
} from "rxjs";
import { logger, type Logger } from "matrix-js-sdk/lib/logger";
import { ClientEvent, type Room, SyncState } from "matrix-js-sdk";
import { type Logger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "../../Behavior";
import { type IConnectionManager } from "../remoteMembers/ConnectionManager";
@@ -185,11 +180,17 @@ export const createLocalMembership$ = ({
const localTransport$ = scope.behavior(
localTransportCanThrow$.pipe(
catchError((e: unknown) => {
let error: ElementCallError;
if (e instanceof ElementCallError) {
state.livekit$.next({ state: LivekitState.Error, error: e });
error = e;
} else {
logger.error("Unknown error from localTransport$", e);
error = new UnknownCallError(
e instanceof Error
? e
: new Error("Unknown error from localTransport"),
);
}
state.livekit$.next({ state: LivekitState.Error, error });
return of(null);
}),
),
@@ -238,7 +239,7 @@ export const createLocalMembership$ = ({
localConnection$.pipe(scope.bind()).subscribe((connection) => {
if (connection !== null && publisher$.value === null) {
// TODO looks strange to not change publisher if connection changes.
// @valere will take care of this!
// @toger5 will take care of this!
publisher$.next(createPublisherFactory(connection));
}
});
@@ -492,7 +493,7 @@ export const createLocalMembership$ = ({
const sharingScreen$ = scope.behavior(
localConnection$.pipe(
switchMap((c) =>
c !== null && c.livekitRoom
c !== null
? observeSharingScreen$(c.livekitRoom.localParticipant)
: of(false),
),
@@ -615,44 +616,3 @@ export async function enterRTCSession(
await widget.api.transport.send(ElementWidgetActions.JoinCall, {});
}
}
/**
* Whether we are connected to the MatrixRTC session.
*/
export function createHomeserverConnected$(
scope: ObservableScope,
matrixRoom: Room,
matrixRTCSession: MatrixRTCSession,
): Behavior<boolean> {
return scope.behavior(
// To consider ourselves connected to MatrixRTC, we check the following:
and$(
// The client is connected to the sync loop
(
fromEvent(matrixRoom.client, ClientEvent.Sync) as Observable<
[SyncState]
>
).pipe(
startWith([matrixRoom.client.getSyncState()]),
map(([state]) => state === SyncState.Syncing),
),
// Room state observed by session says we're connected
fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe(
startWith(null),
map(() => matrixRTCSession.membershipStatus === Status.Connected),
),
// Also watch out for warnings that we've likely hit a timeout and our
// delayed leave event is being sent (this condition is here because it
// provides an earlier warning than the sync loop timeout, and we wouldn't
// see the actual leave event until we reconnect to the sync loop)
fromEvent(matrixRTCSession, MembershipManagerEvent.ProbablyLeft).pipe(
startWith(null),
map(() => matrixRTCSession.probablyLeft !== true),
),
).pipe(
tap((connected) => {
logger.info(`Homeserver connected update: ${connected}`);
}),
),
);
}

View File

@@ -79,6 +79,13 @@ export async function flushPromises(): Promise<void> {
await new Promise<void>((resolve) => window.setTimeout(resolve));
}
export type NodeEventHandler = (...args: unknown[]) => void;
export interface NodeStyleEventEmitter {
addListener(eventName: string | symbol, handler: NodeEventHandler): this;
removeListener(eventName: string | symbol, handler: NodeEventHandler): this;
}
export interface OurRunHelpers extends RunHelpers {
/**
* Schedules a sequence of actions to happen, as described by a marble