ErrorHandling: publish connection error handling

This commit is contained in:
Valere
2025-10-14 10:46:57 +02:00
parent a5aba928dd
commit a9db9c8b59
12 changed files with 183 additions and 97 deletions

View File

@@ -141,8 +141,8 @@ export function useGroupCallRooms(client: MatrixClient): GroupCallRoom[] {
.filter(roomIsJoinable);
const sortedRooms = sortRooms(client, rooms);
Promise.all(
sortedRooms.map(async (room) => {
const session = await client.matrixRTC.getRoomSession(room);
sortedRooms.map((room) => {
const session = client.matrixRTC.getRoomSession(room);
return {
roomAlias: room.getCanonicalAlias() ?? undefined,
roomName: room.name,

View File

@@ -59,7 +59,7 @@ import { type MuteStates } from "../state/MuteStates";
import { type MatrixInfo } from "./VideoPreview";
import { InviteButton } from "../button/InviteButton";
import { LayoutToggle } from "./LayoutToggle";
import { CallViewModel, GridMode } from "../state/CallViewModel";
import { CallViewModel, type GridMode } from "../state/CallViewModel";
import { Grid, type TileProps } from "../grid/Grid";
import { useInitial } from "../useInitial";
import { SpotlightTile } from "../tile/SpotlightTile";
@@ -109,7 +109,7 @@ import { useAudioContext } from "../useAudioContext";
import ringtoneMp3 from "../sound/ringtone.mp3?url";
import ringtoneOgg from "../sound/ringtone.ogg?url";
import { useTrackProcessorObservable$ } from "../livekit/TrackProcessorContext.tsx";
import { Layout } from "../state/layout-types.ts";
import { type Layout } from "../state/layout-types.ts";
const maxTapDurationMs = 400;
@@ -297,6 +297,10 @@ export const InCallView: FC<InCallViewProps> = ({
const audioOutputSwitcher = useBehavior(vm.audioOutputSwitcher$);
const sharingScreen = useBehavior(vm.sharingScreen$);
const fatalCallError = useBehavior(vm.configError$);
// Stop the rendering and throw for the error boundary
if (fatalCallError) throw fatalCallError;
// We need to set the proper timings on the animation based upon the sound length.
const ringDuration = pickupPhaseAudio?.soundDuration["waiting"] ?? 1;
useEffect((): (() => void) => {

View File

@@ -13,7 +13,6 @@ import EventEmitter from "events";
import { enterRTCSession, leaveRTCSession } from "../src/rtcSessionHelpers";
import { mockConfig } from "./utils/test";
import { ElementWidgetActions, widget } from "./widget";
import { ErrorCode } from "./utils/errors.ts";
const USE_MUTI_SFU = false;
const getUrlParams = vi.hoisted(() => vi.fn(() => ({})));
@@ -97,38 +96,20 @@ test("It joins the correct Session", async () => {
{
encryptMedia: true,
useMultiSfu: USE_MUTI_SFU,
}
},
);
expect(mockedSession.joinRoomSession).toHaveBeenLastCalledWith(
[
{
livekit_alias: "my-oldest-member-service-alias",
livekit_service_url: "http://my-oldest-member-service-url.com",
type: "livekit",
},
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit",
},
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url2.com",
type: "livekit",
},
{
livekit_alias: "roomId",
livekit_service_url: "http://my-default-service-url.com",
type: "livekit",
},
],
{
focus_selection: "oldest_membership",
type: "livekit",
},
undefined,
expect.objectContaining({
manageMediaKeys: false,
manageMediaKeys: true,
useLegacyMemberEvents: false,
useNewMembershipManager: true,
useExperimentalToDeviceTransport: false,
@@ -177,40 +158,6 @@ test("leaveRTCSession doesn't close the widget when returning to lobby", async (
await testLeaveRTCSession("user", false);
});
test("It fails with configuration error if no live kit url config is set in fallback", async () => {
mockConfig({});
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({});
const mockedSession = vi.mocked({
room: {
roomId: "roomId",
client: {
getDomain: vi.fn().mockReturnValue("example.org"),
},
},
memberships: [],
getFocusInUse: vi.fn(),
joinRoomSession: vi.fn(),
}) as unknown as MatrixRTCSession;
await expect(
enterRTCSession(
mockedSession,
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit",
},
{
encryptMedia: true,
useMultiSfu: USE_MUTI_SFU,
}
),
).rejects.toThrowError(
expect.objectContaining({ code: ErrorCode.MISSING_MATRIX_RTC_TRANSPORT }),
);
});
test("It should not fail with configuration error if homeserver config has livekit url but not fallback", async () => {
mockConfig({});
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({
@@ -250,6 +197,6 @@ test("It should not fail with configuration error if homeserver config has livek
{
encryptMedia: true,
useMultiSfu: USE_MUTI_SFU,
}
},
);
});

View File

@@ -123,7 +123,6 @@ export async function enterRTCSession(
useMultiSfu: true,
},
): Promise<void> {
const {
encryptMedia,
useNewMembershipManager = true,

View File

@@ -5,7 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { test, vi, onTestFinished, it, describe } from "vitest";
import { test, vi, onTestFinished, it, describe, expect } from "vitest";
import EventEmitter from "events";
import {
BehaviorSubject,
@@ -45,6 +45,7 @@ import {
MatrixRTCSessionEvent,
} from "matrix-js-sdk/lib/matrixrtc";
import { deepCompare } from "matrix-js-sdk/lib/utils";
import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
import { CallViewModel, type CallViewModelOptions } from "./CallViewModel";
import { type Layout } from "./layout-types";
@@ -58,6 +59,7 @@ import {
MockRTCSession,
mockMediaDevices,
mockMuteStates,
mockConfig,
} from "../utils/test";
import {
ECAddonConnectionState,
@@ -92,6 +94,10 @@ import { MediaDevices } from "./MediaDevices";
import { getValue } from "../utils/observable";
import { type Behavior, constant } from "./Behavior";
import type { ProcessorState } from "../livekit/TrackProcessorContext.tsx";
import {
type ElementCallError,
MatrixRTCTransportMissingError,
} from "../utils/errors.ts";
const getUrlParams = vi.hoisted(() => vi.fn(() => ({})));
vi.mock("../UrlParams", () => ({ getUrlParams }));
@@ -365,6 +371,61 @@ function withCallViewModel(
continuation(vm, rtcSession, { raisedHands$: raisedHands$ }, setSyncState);
}
test("test missing RTC config error", async () => {
const rtcMemberships$ = new BehaviorSubject<CallMembership[]>([]);
const emitter = new EventEmitter();
const client = vi.mocked<MatrixClient>({
on: emitter.on.bind(emitter),
off: emitter.off.bind(emitter),
getSyncState: vi.fn().mockReturnValue(SyncState.Syncing),
getUserId: vi.fn().mockReturnValue("@user:localhost"),
getUser: vi.fn().mockReturnValue(null),
getDeviceId: vi.fn().mockReturnValue("DEVICE"),
credentials: {
userId: "@user:localhost",
},
getCrypto: vi.fn().mockReturnValue(undefined),
getDomain: vi.fn().mockReturnValue("example.org"),
} as unknown as MatrixClient);
const matrixRoom = mockMatrixRoom({
roomId: "!myRoomId:example.com",
client,
getMember: vi.fn().mockReturnValue(undefined),
});
const fakeRtcSession = new MockRTCSession(matrixRoom).withMemberships(
rtcMemberships$,
);
mockConfig({});
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({});
const callVM = new CallViewModel(
fakeRtcSession.asMockedSession(),
matrixRoom,
mockMediaDevices({}),
mockMuteStates(),
{
encryptionSystem: { kind: E2eeType.PER_PARTICIPANT },
autoLeaveWhenOthersLeft: false,
},
new BehaviorSubject({} as Record<string, RaisedHandInfo>),
new BehaviorSubject({} as Record<string, ReactionInfo>),
of({ processor: undefined, supported: false }),
);
const failPromise = Promise.withResolvers<ElementCallError>();
callVM.configError$.subscribe((error) => {
if (error) {
failPromise.resolve(error);
}
});
const error = await failPromise.promise;
expect(error).toBeInstanceOf(MatrixRTCTransportMissingError);
});
test("participants are retained during a focus switch", () => {
withTestScheduler(({ behavior, expectObservable }) => {
// Participants disappear on frame 2 and come back on frame 3

View File

@@ -27,6 +27,7 @@ import {
} from "matrix-js-sdk";
import { deepCompare } from "matrix-js-sdk/lib/utils";
import {
BehaviorSubject,
combineLatest,
concat,
distinctUntilChanged,
@@ -76,7 +77,7 @@ import { ViewModel } from "./ViewModel";
import {
LocalUserMediaViewModel,
type MediaViewModel,
RemoteUserMediaViewModel,
type RemoteUserMediaViewModel,
ScreenShareViewModel,
type UserMediaViewModel,
} from "./MediaViewModel";
@@ -130,14 +131,15 @@ import { type Async, async$, mapAsync, ready } from "./Async";
import { sharingScreen$, UserMedia } from "./UserMedia.ts";
import { ScreenShare } from "./ScreenShare.ts";
import {
GridLayoutMedia,
Layout,
LayoutMedia,
OneOnOneLayoutMedia,
SpotlightExpandedLayoutMedia,
SpotlightLandscapeLayoutMedia,
SpotlightPortraitLayoutMedia,
type GridLayoutMedia,
type Layout,
type LayoutMedia,
type OneOnOneLayoutMedia,
type SpotlightExpandedLayoutMedia,
type SpotlightLandscapeLayoutMedia,
type SpotlightPortraitLayoutMedia,
} from "./layout-types.ts";
import { ElementCallError, UnknownCallError } from "../utils/errors.ts";
export interface CallViewModelOptions {
encryptionSystem: EncryptionSystem;
@@ -224,6 +226,19 @@ export class CallViewModel extends ViewModel {
}
: undefined;
private readonly _configError$ = new BehaviorSubject<ElementCallError | null>(
null,
);
/**
* If there is a configuration error with the call (e.g. misconfigured E2EE).
* This is a fatal error that prevents the call from being created/joined.
* Should render a blocking error screen.
*/
public get configError$(): Behavior<ElementCallError | null> {
return this._configError$;
}
private readonly join$ = new Subject<void>();
public join(): void {
@@ -273,7 +288,7 @@ export class CallViewModel extends ViewModel {
* The transport that we would personally prefer to publish on (if not for the
* transport preferences of others, perhaps).
*/
private readonly preferredTransport = makeTransport(this.matrixRTCSession);
private readonly preferredTransport$: Observable<Async<LivekitTransport>>;
/**
* Lists the transports used by ourselves, plus all other MatrixRTC session
@@ -287,11 +302,7 @@ export class CallViewModel extends ViewModel {
switchMap((joined) =>
joined
? combineLatest(
[
async$(this.preferredTransport),
this.memberships$,
multiSfu.value$,
],
[this.preferredTransport$, this.memberships$, multiSfu.value$],
(preferred, memberships, multiSfu) => {
const oldestMembership =
this.matrixRTCSession.getOldestMembership();
@@ -313,6 +324,13 @@ export class CallViewModel extends ViewModel {
local = ready(selection);
}
}
if (local.state === "error") {
this._configError$.next(
local.value instanceof ElementCallError
? local.value
: new UnknownCallError(local.value),
);
}
return { local, remote };
},
)
@@ -1743,6 +1761,10 @@ export class CallViewModel extends ViewModel {
) {
super();
this.preferredTransport$ = async$(
makeTransport(this.matrixRTCSession),
).pipe(this.scope.bind());
// Start and stop local and remote connections as needed
this.connectionInstructions$
.pipe(this.scope.bind())
@@ -1765,11 +1787,21 @@ export class CallViewModel extends ViewModel {
logger.info(
`Connected to ${c.localTransport.livekit_service_url}`,
),
(e) =>
(e) => {
// We only want to report fatal errors `_configError$` for the publish connection.
// If there is an error with another connection, it will not terminate the call and will be displayed
// on eacn tile.
if (
c instanceof PublishConnection &&
e instanceof ElementCallError
) {
this._configError$.next(e);
}
logger.error(
`Failed to start connection to ${c.localTransport.livekit_service_url}`,
e,
),
);
},
);
}
});
@@ -1778,6 +1810,7 @@ export class CallViewModel extends ViewModel {
this.scope.reconcile(this.localTransport$, async (localTransport) => {
if (localTransport?.state === "ready") {
try {
this._configError$.next(null);
await enterRTCSession(this.matrixRTCSession, localTransport.value, {
encryptMedia: this.options.encryptionSystem.kind !== E2eeType.NONE,
useExperimentalToDeviceTransport: true,

View File

@@ -10,6 +10,7 @@ import {
connectionStateObserver,
} from "@livekit/components-core";
import {
ConnectionError,
type ConnectionState,
type E2EEOptions,
Room as LivekitRoom,
@@ -29,6 +30,10 @@ import {
import { type Behavior } from "./Behavior";
import { type ObservableScope } from "./ObservableScope";
import { defaultLiveKitOptions } from "../livekit/options";
import {
InsufficientCapacityError,
SFURoomCreationRestrictedError,
} from "../utils/errors.ts";
export interface ConnectionOpts {
/** The focus server to connect to. */
@@ -88,6 +93,9 @@ export class Connection {
* 1. Request an OpenId token `request_token` (allows matrix users to verify their identity with a third-party service.)
* 2. Use this token to request the SFU config to the MatrixRtc authentication service.
* 3. Connect to the configured LiveKit room.
*
* @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection.
* @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created.
*/
public async start(): Promise<void> {
this.stopped = false;
@@ -105,7 +113,30 @@ export class Connection {
state: "ConnectingToLkRoom",
focus: this.localTransport,
});
await this.livekitRoom.connect(url, jwt);
try {
await this.livekitRoom.connect(url, jwt);
} catch (e) {
// LiveKit uses 503 to indicate that the server has hit its track limits.
// https://github.com/livekit/livekit/blob/fcb05e97c5a31812ecf0ca6f7efa57c485cea9fb/pkg/service/rtcservice.go#L171
// It also errors with a status code of 200 (yes, really) for room
// participant limits.
// LiveKit Cloud uses 429 for connection limits.
// Either way, all these errors can be explained as "insufficient capacity".
if (e instanceof ConnectionError) {
if (e.status === 503 || e.status === 200 || e.status === 429) {
throw new InsufficientCapacityError();
}
if (e.status === 404) {
// error msg is "Could not establish signal connection: requested room does not exist"
// The room does not exist. There are two different modes of operation for the SFU:
// - the room is created on the fly when connecting (livekit `auto_create` option)
// - Only authorized users can create rooms, so the room must exist before connecting (done by the auth jwt service)
// In the first case there will not be a 404, so we are in the second case.
throw new SFURoomCreationRestrictedError();
}
}
throw e;
}
// If we were stopped while connecting, don't proceed to update state.
if (this.stopped) return;

View File

@@ -94,6 +94,9 @@ export class PublishConnection extends Connection {
* 2. Use this token to request the SFU config to the MatrixRtc authentication service.
* 3. Connect to the configured LiveKit room.
* 4. Create local audio and video tracks based on the current mute states and publish them to the room.
*
* @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection.
* @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created.
*/
public async start(): Promise<void> {
this.stopped = false;

View File

@@ -4,14 +4,15 @@ Copyright 2025 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { ObservableScope } from "./ObservableScope.ts";
import { ScreenShareViewModel } from "./MediaViewModel.ts";
import { BehaviorSubject, type Observable } from "rxjs";
import {
LocalParticipant,
RemoteParticipant,
type LocalParticipant,
type RemoteParticipant,
type Room as LivekitRoom,
} from "livekit-client";
import { ObservableScope } from "./ObservableScope.ts";
import { ScreenShareViewModel } from "./MediaViewModel.ts";
import type { RoomMember } from "matrix-js-sdk";
import type { EncryptionSystem } from "../e2ee/sharedKeyManagement.ts";
import type { Behavior } from "./Behavior.ts";

View File

@@ -5,27 +5,28 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { BehaviorSubject, map, type Observable, of, switchMap } from "rxjs";
import {
type LocalParticipant,
type Participant,
ParticipantEvent,
type RemoteParticipant,
type Room as LivekitRoom,
} from "livekit-client";
import { observeParticipantEvents } from "@livekit/components-core";
import { ObservableScope } from "./ObservableScope.ts";
import {
LocalUserMediaViewModel,
RemoteUserMediaViewModel,
UserMediaViewModel,
type UserMediaViewModel,
} from "./MediaViewModel.ts";
import { BehaviorSubject, map, type Observable, of, switchMap } from "rxjs";
import {
LocalParticipant,
Participant,
ParticipantEvent,
RemoteParticipant,
type Room as LivekitRoom,
} from "livekit-client";
import type { Behavior } from "./Behavior.ts";
import type { RoomMember } from "matrix-js-sdk";
import type { EncryptionSystem } from "../e2ee/sharedKeyManagement.ts";
import type { MediaDevices } from "./MediaDevices.ts";
import type { ReactionOption } from "../reactions";
import { observeSpeaker$ } from "./observeSpeaker.ts";
import { observeParticipantEvents } from "@livekit/components-core";
/**
* TODO Document this

View File

@@ -5,8 +5,14 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { GridTileViewModel, SpotlightTileViewModel } from "./TileViewModel.ts";
import { MediaViewModel, UserMediaViewModel } from "./MediaViewModel.ts";
import {
type GridTileViewModel,
type SpotlightTileViewModel,
} from "./TileViewModel.ts";
import {
type MediaViewModel,
type UserMediaViewModel,
} from "./MediaViewModel.ts";
export interface GridLayoutMedia {
type: "grid";

View File

@@ -196,7 +196,7 @@ export function mockRtcMembership(
content: data,
});
const cms = new CallMembership(event);
const cms = new CallMembership(event, data);
vi.mocked(cms).getTransport = vi.fn().mockReturnValue(fociPreferred[0]);
return cms;
}