Turn multi-SFU media transport into a developer option

This commit is contained in:
Robin
2025-10-03 14:43:22 -04:00
parent 68aae4a8e3
commit 86fb026be8
12 changed files with 461 additions and 290 deletions

View File

@@ -72,6 +72,7 @@
"livekit_server_info": "LiveKit Server Info",
"livekit_sfu": "LiveKit SFU: {{url}}",
"matrix_id": "Matrix ID: {{id}}",
"multi_sfu": "Multi-SFU media transport",
"mute_all_audio": "Mute all audio (participants, reactions, join sounds)",
"show_connection_stats": "Show connection statistics",
"url_params": "URL parameters",
@@ -91,7 +92,7 @@
"generic_description": "Submitting debug logs will help us track down the problem.",
"insufficient_capacity": "Insufficient capacity",
"insufficient_capacity_description": "The server has reached its maximum capacity and you cannot join the call at this time. Try again later, or contact your server admin if the problem persists.",
"matrix_rtc_focus_missing": "The server is not configured to work with {{brand}}. Please contact your server admin (Domain: {{domain}}, Error Code: {{ errorCode }}).",
"matrix_rtc_transport_missing": "The server is not configured to work with {{brand}}. Please contact your server admin (Domain: {{domain}}, Error Code: {{ errorCode }}).",
"open_elsewhere": "Opened in another tab",
"open_elsewhere_description": "{{brand}} has been opened in another tab. If that doesn't sound right, try reloading the page.",
"room_creation_restricted": "Failed to create call",

View File

@@ -26,7 +26,7 @@ import {
E2EENotSupportedError,
type ElementCallError,
InsufficientCapacityError,
MatrixRTCFocusMissingError,
MatrixRTCTransportMissingError,
UnknownCallError,
} from "../utils/errors.ts";
import { mockConfig } from "../utils/test.ts";
@@ -34,7 +34,7 @@ import { ElementWidgetActions, type WidgetHelpers } from "../widget.ts";
test.each([
{
error: new MatrixRTCFocusMissingError("example.com"),
error: new MatrixRTCTransportMissingError("example.com"),
expectedTitle: "Call is not supported",
},
{
@@ -85,7 +85,7 @@ test.each([
);
test("should render the error page with link back to home", async () => {
const error = new MatrixRTCFocusMissingError("example.com");
const error = new MatrixRTCTransportMissingError("example.com");
const TestComponent = (): ReactNode => {
throw error;
};
@@ -213,7 +213,7 @@ describe("Rageshake button", () => {
});
test("should have a close button in widget mode", async () => {
const error = new MatrixRTCFocusMissingError("example.com");
const error = new MatrixRTCTransportMissingError("example.com");
const TestComponent = (): ReactNode => {
throw error;
};

View File

@@ -42,7 +42,7 @@ import {
import { GroupCallView } from "./GroupCallView";
import { type WidgetHelpers } from "../widget";
import { LazyEventEmitter } from "../LazyEventEmitter";
import { MatrixRTCFocusMissingError } from "../utils/errors";
import { MatrixRTCTransportMissingError } from "../utils/errors";
import { ProcessorProvider } from "../livekit/TrackProcessorContext";
import { MediaDevicesContext } from "../MediaDevicesContext";
import { HeaderStyle } from "../UrlParams";
@@ -258,7 +258,7 @@ test("GroupCallView leaves the session when an error occurs", async () => {
test("GroupCallView shows errors that occur during joining", async () => {
const user = userEvent.setup();
enterRTCSession.mockRejectedValue(new MatrixRTCFocusMissingError(""));
enterRTCSession.mockRejectedValue(new MatrixRTCTransportMissingError(""));
onTestFinished(() => {
enterRTCSession.mockReset();
});

View File

@@ -17,7 +17,7 @@ import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
import { PosthogAnalytics } from "./analytics/PosthogAnalytics";
import { Config } from "./config/Config";
import { ElementWidgetActions, widget, type WidgetHelpers } from "./widget";
import { MatrixRTCFocusMissingError } from "./utils/errors";
import { MatrixRTCTransportMissingError } from "./utils/errors";
import { getUrlParams } from "./UrlParams";
import { getSFUConfigWithOpenID } from "./livekit/openIDSFU.ts";
@@ -28,35 +28,31 @@ export function getLivekitAlias(rtcSession: MatrixRTCSession): string {
return rtcSession.room.roomId;
}
async function makeFocusInternal(
async function makeTransportInternal(
rtcSession: MatrixRTCSession,
): Promise<LivekitTransport> {
logger.log("Searching for a preferred focus");
logger.log("Searching for a preferred transport");
const livekitAlias = getLivekitAlias(rtcSession);
const urlFromStorage = localStorage.getItem("robin-matrixrtc-auth");
// TODO-MULTI-SFU: Either remove this dev tool or make it more official
const urlFromStorage =
localStorage.getItem("robin-matrixrtc-auth") ??
localStorage.getItem("timo-focus-url");
if (urlFromStorage !== null) {
const focusFromStorage: LivekitTransport = {
const transportFromStorage: LivekitTransport = {
type: "livekit",
livekit_service_url: urlFromStorage,
livekit_alias: livekitAlias,
};
logger.log("Using LiveKit focus from local storage: ", focusFromStorage);
return focusFromStorage;
logger.log(
"Using LiveKit transport from local storage: ",
transportFromStorage,
);
return transportFromStorage;
}
// Prioritize the .well-known/matrix/client, if available, over the configured SFU
const domain = rtcSession.room.client.getDomain();
if (localStorage.getItem("timo-focus-url")) {
const timoFocusUrl = localStorage.getItem("timo-focus-url")!;
const focusFromUrl: LivekitTransport = {
type: "livekit",
livekit_service_url: timoFocusUrl,
livekit_alias: livekitAlias,
};
logger.log("Using LiveKit focus from localStorage: ", timoFocusUrl);
return focusFromUrl;
}
if (domain) {
// we use AutoDiscovery instead of relying on the MatrixClient having already
// been fully configured and started
@@ -64,46 +60,46 @@ async function makeFocusInternal(
FOCI_WK_KEY
];
if (Array.isArray(wellKnownFoci)) {
const focus: LivekitTransportConfig | undefined = wellKnownFoci.find(
const transport: LivekitTransportConfig | undefined = wellKnownFoci.find(
(f) => f && isLivekitTransportConfig(f),
);
if (focus !== undefined) {
logger.log("Using LiveKit focus from .well-known: ", focus);
return { ...focus, livekit_alias: livekitAlias };
if (transport !== undefined) {
logger.log("Using LiveKit transport from .well-known: ", transport);
return { ...transport, livekit_alias: livekitAlias };
}
}
}
const urlFromConf = Config.get().livekit?.livekit_service_url;
if (urlFromConf) {
const focusFromConf: LivekitTransport = {
const transportFromConf: LivekitTransport = {
type: "livekit",
livekit_service_url: urlFromConf,
livekit_alias: livekitAlias,
};
logger.log("Using LiveKit focus from config: ", focusFromConf);
return focusFromConf;
logger.log("Using LiveKit transport from config: ", transportFromConf);
return transportFromConf;
}
throw new MatrixRTCFocusMissingError(domain ?? "");
throw new MatrixRTCTransportMissingError(domain ?? "");
}
export async function makeFocus(
export async function makeTransport(
rtcSession: MatrixRTCSession,
): Promise<LivekitTransport> {
const focus = await makeFocusInternal(rtcSession);
const transport = await makeTransportInternal(rtcSession);
// this will call the jwt/sfu/get endpoint to pre create the livekit room.
await getSFUConfigWithOpenID(
rtcSession.room.client,
focus.livekit_service_url,
focus.livekit_alias,
transport.livekit_service_url,
transport.livekit_alias,
);
return focus;
return transport;
}
export async function enterRTCSession(
rtcSession: MatrixRTCSession,
focus: LivekitTransport,
transport: LivekitTransport,
encryptMedia: boolean,
useNewMembershipManager = true,
useExperimentalToDeviceTransport = false,
@@ -120,10 +116,10 @@ export async function enterRTCSession(
const useDeviceSessionMemberEvents =
features?.feature_use_device_session_member_events;
const { sendNotificationType: notificationType, callIntent } = getUrlParams();
// Multi-sfu does not need a focus preferred list. just the focus that is actually used.
// Multi-sfu does not need a preferred foci list. just the focus that is actually used.
rtcSession.joinRoomSession(
useMultiSfu ? [focus] : [],
useMultiSfu ? focus : undefined,
useMultiSfu ? [] : [transport],
useMultiSfu ? transport : undefined,
{
notificationType,
callIntent,

View File

@@ -16,6 +16,7 @@ import {
showConnectionStats as showConnectionStatsSetting,
useNewMembershipManager as useNewMembershipManagerSetting,
useExperimentalToDeviceTransport as useExperimentalToDeviceTransportSetting,
multiSfu as multiSfuSetting,
muteAllAudio as muteAllAudioSetting,
alwaysShowIphoneEarpiece as alwaysShowIphoneEarpieceSetting,
} from "./settings";
@@ -50,6 +51,7 @@ export const DeveloperSettingsTab: FC<Props> = ({ client, livekitRooms }) => {
useExperimentalToDeviceTransport,
setUseExperimentalToDeviceTransport,
] = useSetting(useExperimentalToDeviceTransportSetting);
const [multiSfu, setMultiSfu] = useSetting(multiSfuSetting);
const [muteAllAudio, setMuteAllAudio] = useSetting(muteAllAudioSetting);
@@ -166,6 +168,20 @@ export const DeveloperSettingsTab: FC<Props> = ({ client, livekitRooms }) => {
)}
/>
</FieldRow>
<FieldRow>
<InputField
id="multiSfu"
type="checkbox"
label={t("developer_mode.multi_sfu")}
checked={multiSfu}
onChange={useCallback(
(event: ChangeEvent<HTMLInputElement>): void => {
setMultiSfu(event.target.checked);
},
[setMultiSfu],
)}
/>
</FieldRow>
<FieldRow>
<InputField
id="muteAllAudio"

View File

@@ -125,6 +125,8 @@ export const useExperimentalToDeviceTransport = new Setting<boolean>(
true,
);
export const multiSfu = new Setting<boolean>("multi-sfu", false);
export const muteAllAudio = new Setting<boolean>("mute-all-audio", false);
export const alwaysShowSelf = new Setting<boolean>("always-show-self", true);

44
src/state/Async.ts Normal file
View File

@@ -0,0 +1,44 @@
/*
Copyright 2025 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
catchError,
from,
map,
Observable,
of,
startWith,
switchMap,
} from "rxjs";
export type Async<A> =
| { state: "loading" }
| { state: "error"; value: Error }
| { state: "ready"; value: A };
export const loading: Async<never> = { state: "loading" };
export function error(value: Error): Async<never> {
return { state: "error", value };
}
export function ready<A>(value: A): Async<A> {
return { state: "ready", value };
}
export function async<A>(promise: Promise<A>): Observable<Async<A>> {
return from(promise).pipe(
map(ready),
startWith(loading),
catchError((e) => of(error(e))),
);
}
export function mapAsync<A, B>(
async: Async<A>,
project: (value: A) => B,
): Async<B> {
return async.state === "ready" ? ready(project(async.value)) : async;
}

View File

@@ -28,6 +28,7 @@ import {
EventType,
RoomEvent,
} from "matrix-js-sdk";
import { deepCompare } from "matrix-js-sdk/lib/utils";
import {
BehaviorSubject,
EMPTY,
@@ -48,6 +49,7 @@ import {
of,
pairwise,
race,
repeat,
scan,
skip,
skipWhile,
@@ -57,6 +59,7 @@ import {
switchScan,
take,
takeUntil,
takeWhile,
tap,
throttleTime,
timer,
@@ -65,6 +68,7 @@ import { logger } from "matrix-js-sdk/lib/logger";
import {
type CallMembership,
isLivekitTransport,
type LivekitTransport,
type MatrixRTCSession,
MatrixRTCSessionEvent,
type MatrixRTCSessionEventHandlerMap,
@@ -90,6 +94,7 @@ import {
import { ObservableScope } from "./ObservableScope";
import {
duplicateTiles,
multiSfu,
playReactionsSound,
showReactions,
} from "../settings/settings";
@@ -118,7 +123,7 @@ import { constant, type Behavior } from "./Behavior";
import {
enterRTCSession,
getLivekitAlias,
makeFocus,
makeTransport,
} from "../rtcSessionHelpers";
import { E2eeType } from "../e2ee/e2eeType";
import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider";
@@ -127,6 +132,7 @@ import { type MuteStates } from "./MuteStates";
import { getUrlParams } from "../UrlParams";
import { type ProcessorState } from "../livekit/TrackProcessorContext";
import { ElementWidgetActions, widget } from "../widget";
import { type Async, async, mapAsync, ready } from "./Async";
export interface CallViewModelOptions {
encryptionSystem: EncryptionSystem;
@@ -449,27 +455,33 @@ export class CallViewModel extends ViewModel {
}
: undefined;
private readonly localFocus = makeFocus(this.matrixRTCSession);
private readonly join$ = new Subject<void>();
private readonly localConnection = this.localFocus.then(
(focus) =>
new PublishConnection(
focus,
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.membershipsAndFocusMap$,
this.mediaDevices,
this.muteStates,
this.e2eeLivekitOptions(),
this.scope.behavior(this.trackProcessorState$),
),
);
public join(): void {
this.join$.next();
}
public readonly livekitConnectionState$ = this.scope.behavior(
combineLatest([this.localConnection]).pipe(
switchMap(([c]) => c.connectionState$),
startWith(ConnectionState.Disconnected),
// This is functionally the same Observable as leave$, except here it's
// hoisted to the top of the class. This enables the cyclic dependency between
// leave$ -> autoLeave$ -> callPickupState$ -> livekitConnectionState$ ->
// localConnection$ -> transports$ -> joined$ -> leave$.
private readonly leaveHoisted$ = new Subject<
"user" | "timeout" | "decline" | "allOthersLeft"
>();
/**
* Whether we are joined to the call. This reflects our local state rather
* than whether all connections are truly up and running.
*/
private readonly joined$ = this.scope.behavior(
this.join$.pipe(
map(() => true),
// Using takeUntil with the repeat operator is perfectly valid.
// eslint-disable-next-line rxjs/no-unsafe-takeuntil
takeUntil(this.leaveHoisted$),
endWith(false),
repeat(),
startWith(false),
),
);
@@ -488,125 +500,224 @@ export class CallViewModel extends ViewModel {
),
);
private readonly membershipsAndFocusMap$ = this.scope.behavior(
this.memberships$.pipe(
map((memberships) =>
memberships.flatMap((m) => {
const f = this.matrixRTCSession.resolveActiveFocus(m);
return f && isLivekitTransport(f)
? [{ membership: m, focus: f }]
: [];
}),
/**
* The transport that we would personally prefer to publish on (if not for the
* transport preferences of others, perhaps).
*/
private readonly preferredTransport = makeTransport(this.matrixRTCSession);
/**
* Lists the transports used by ourselves, plus all other MatrixRTC session
* members.
*/
private readonly transports$: Behavior<{
local: Async<LivekitTransport>;
remote: { membership: CallMembership; transport: LivekitTransport }[];
} | null> = this.scope.behavior(
this.joined$.pipe(
switchMap((joined) =>
joined
? combineLatest(
[
async(this.preferredTransport),
this.memberships$,
multiSfu.value$,
],
(preferred, memberships, multiSfu) => {
const remote = memberships.flatMap((m) => {
if (m.sender === this.userId && m.deviceId === this.deviceId)
return [];
const t = this.matrixRTCSession.resolveActiveFocus(m);
return t && isLivekitTransport(t)
? [{ membership: m, transport: t }]
: [];
});
let local = preferred;
if (!multiSfu) {
const oldest = this.matrixRTCSession.getOldestMembership();
if (oldest !== undefined) {
const selection = oldest.getTransport(oldest);
if (isLivekitTransport(selection)) local = ready(selection);
}
}
return { local, remote };
},
)
: of(null),
),
),
);
private readonly livekitServiceUrls$ = this.membershipsAndFocusMap$.pipe(
map((v) => new Set(v.map(({ focus }) => focus.livekit_service_url))),
/**
* Lists the transports used by each MatrixRTC session member other than
* ourselves.
*/
private readonly remoteTransports$ = this.scope.behavior(
this.transports$.pipe(map((transports) => transports?.remote ?? [])),
);
/**
* The transport over which we should be actively publishing our media.
*/
private readonly localTransport$: Behavior<Async<LivekitTransport> | null> =
this.scope.behavior(
this.transports$.pipe(
map((transports) => transports?.local ?? null),
distinctUntilChanged(deepCompare),
),
);
private readonly localConnectionAndTransport$ = this.scope.behavior(
this.localTransport$.pipe(
map(
(transport) =>
transport &&
mapAsync(transport, (transport) => ({
connection: new PublishConnection(
transport,
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.remoteTransports$,
this.mediaDevices,
this.muteStates,
this.e2eeLivekitOptions(),
this.scope.behavior(this.trackProcessorState$),
),
transport,
})),
),
),
);
private readonly localConnection$ = this.scope.behavior(
this.localConnectionAndTransport$.pipe(
map((value) => value && mapAsync(value, ({ connection }) => connection)),
),
);
public readonly livekitConnectionState$ = this.scope.behavior(
this.localConnection$.pipe(
switchMap((c) =>
c?.state === "ready"
? c.value.connectionState$
: of(ConnectionState.Disconnected),
),
),
);
/**
* Connections for each transport in use by one or more session members that
* is *distinct* from the local transport.
*/
private readonly remoteConnections$ = this.scope.behavior(
combineLatest([this.localFocus, this.livekitServiceUrls$]).pipe(
accumulate(
new Map<string, Connection>(),
(prev, [localFocus, focusUrls]) => {
const stopped = new Map(prev);
const next = new Map<string, Connection>();
for (const focusUrl of focusUrls) {
if (focusUrl !== localFocus.livekit_service_url) {
stopped.delete(focusUrl);
this.transports$.pipe(
accumulate(new Map<string, Connection>(), (prev, transports) => {
const next = new Map<string, Connection>();
let nextConnection = prev.get(focusUrl);
if (!nextConnection) {
logger.log(
"SFU remoteConnections$ construct new connection: ",
focusUrl,
);
nextConnection = new Connection(
{
livekit_service_url: focusUrl,
livekit_alias: this.livekitAlias,
type: "livekit",
},
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.membershipsAndFocusMap$,
this.e2eeLivekitOptions(),
);
} else {
logger.log(
"SFU remoteConnections$ use prev connection: ",
focusUrl,
);
}
next.set(focusUrl, nextConnection);
// Until the local transport becomes ready we have no idea which
// transports will actually need a dedicated remote connection
if (transports?.local.state === "ready") {
const localServiceUrl = transports.local.value.livekit_service_url;
const remoteServiceUrls = new Set(
transports.remote.flatMap(({ membership, transport }) => {
const t = this.matrixRTCSession.resolveActiveFocus(membership);
return t &&
isLivekitTransport(t) &&
t.livekit_service_url !== localServiceUrl
? [t.livekit_service_url]
: [];
}),
);
for (const remoteServiceUrl of remoteServiceUrls) {
let nextConnection = prev.get(remoteServiceUrl);
if (!nextConnection) {
logger.log(
"SFU remoteConnections$ construct new connection: ",
remoteServiceUrl,
);
nextConnection = new Connection(
{
livekit_service_url: remoteServiceUrl,
livekit_alias: this.livekitAlias,
type: "livekit",
},
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.remoteTransports$,
this.e2eeLivekitOptions(),
);
} else {
logger.log(
"SFU remoteConnections$ use prev connection: ",
remoteServiceUrl,
);
}
next.set(remoteServiceUrl, nextConnection);
}
}
for (const connection of stopped.values()) connection.stop();
return next;
},
),
return next;
}),
map((transports) => [...transports.values()]),
),
);
private readonly join$ = new Subject<void>();
/**
* A list of the connections that should be active at any given time.
*/
private readonly connections$ = this.scope.behavior<Connection[]>(
combineLatest(
[this.localConnection$, this.remoteConnections$],
(local, remote) => [
...(local?.state === "ready" ? [local.value] : []),
...remote.values(),
],
),
);
public join(): void {
this.join$.next();
}
private readonly connectionInstructions$ = this.join$.pipe(
switchMap(() => this.remoteConnections$),
startWith(new Map<string, Connection>()),
private readonly connectionInstructions$ = this.connections$.pipe(
pairwise(),
map(([prev, next]) => {
const start = new Set(next.values());
for (const connection of prev.values()) start.delete(connection);
for (const connection of prev) start.delete(connection);
const stop = new Set(prev.values());
for (const connection of next.values()) stop.delete(connection);
for (const connection of next) stop.delete(connection);
return { start, stop };
}),
this.scope.share,
);
/**
* Emits with a connection whenever it should be started.
*/
private readonly startConnection$ = this.connectionInstructions$.pipe(
concatMap(({ start }) => start),
);
/**
* Emits with a connection whenever it should be stopped.
*/
private readonly stopConnection$ = this.connectionInstructions$.pipe(
concatMap(({ stop }) => stop),
);
public readonly allLivekitRooms$ = this.scope.behavior(
combineLatest([
this.remoteConnections$,
this.localConnection,
this.localFocus,
]).pipe(
map(([remoteConnections, localConnection, localFocus]) =>
Array.from(remoteConnections.entries())
.map(
([index, c]) =>
({
room: c.livekitRoom,
url: index,
}) as { room: LivekitRoom; url: string; isLocal?: boolean },
)
.concat([
{
room: localConnection.livekitRoom,
url: localFocus.livekit_service_url,
isLocal: true,
},
]),
this.connections$.pipe(
map((connections) =>
[...connections.values()].map((c) => ({
room: c.livekitRoom,
url: c.transport.livekit_service_url,
isLocal: c instanceof PublishConnection,
})),
),
startWith([]),
),
);
private readonly userId = this.matrixRoom.client.getUserId();
private readonly deviceId = this.matrixRoom.client.getDeviceId();
private readonly matrixConnected$ = this.scope.behavior(
// To consider ourselves connected to MatrixRTC, we check the following:
@@ -679,6 +790,10 @@ export class CallViewModel extends ViewModel {
// in a split-brained state.
private readonly pretendToBeDisconnected$ = this.reconnecting$;
/**
* Lists, for each LiveKit room, the LiveKit participants whose media should
* be presented.
*/
public readonly participantsByRoom$ = this.scope.behavior<
{
livekitRoom: LivekitRoom;
@@ -689,9 +804,12 @@ export class CallViewModel extends ViewModel {
}[];
}[]
>(
combineLatest([this.localConnection, this.localFocus])
// TODO: Move this logic into Connection/PublishConnection if possible
this.localConnectionAndTransport$
.pipe(
switchMap(([localConnection, localFocus]) => {
switchMap((values) => {
if (values?.state !== "ready") return [];
const localConnection = values.value.connection;
const memberError = (): never => {
throw new Error("No room member for call membership");
};
@@ -702,12 +820,9 @@ export class CallViewModel extends ViewModel {
};
return this.remoteConnections$.pipe(
switchMap((connections) =>
switchMap((remoteConnections) =>
combineLatest(
[
[localFocus.livekit_service_url, localConnection] as const,
...connections,
].map(([url, c]) =>
[localConnection, ...remoteConnections].map((c) =>
c.publishingParticipants$.pipe(
map((ps) => {
const participants: {
@@ -726,7 +841,7 @@ export class CallViewModel extends ViewModel {
return {
livekitRoom: c.livekitRoom,
url,
url: c.transport.livekit_service_url,
participants,
};
}),
@@ -809,12 +924,8 @@ export class CallViewModel extends ViewModel {
* List of MediaItems that we want to display
*/
private readonly mediaItems$ = this.scope.behavior<MediaItem[]>(
combineLatest([
this.participantsByRoom$,
duplicateTiles.value$,
this.memberships$,
]).pipe(
scan((prevItems, [participantsByRoom, duplicateTiles, memberships]) => {
combineLatest([this.participantsByRoom$, duplicateTiles.value$]).pipe(
scan((prevItems, [participantsByRoom, duplicateTiles]) => {
const newItems: Map<string, UserMedia | ScreenShare> = new Map(
function* (this: CallViewModel): Iterable<[string, MediaItem]> {
for (const { livekitRoom, participants } of participantsByRoom) {
@@ -829,6 +940,7 @@ export class CallViewModel extends ViewModel {
if (prevMedia && prevMedia instanceof UserMedia) {
prevMedia.updateParticipant(participant);
if (prevMedia.vm.member === undefined) {
// TODO-MULTI-SFU: This is outdated.
// We have a previous media created because of the `debugShowNonMember` flag.
// In this case we actually replace the media item.
// This "hack" never occurs if we do not use the `debugShowNonMember` debugging
@@ -931,6 +1043,16 @@ export class CallViewModel extends ViewModel {
this.memberships$.pipe(map((ms) => ms.length)),
);
private readonly allOthersLeft$ = this.memberships$.pipe(
pairwise(),
filter(
([prev, current]) =>
current.every((m) => m.sender === this.userId) &&
prev.some((m) => m.sender !== this.userId),
),
map(() => {}),
);
private readonly didSendCallNotification$ = fromEvent(
this.matrixRTCSession,
MatrixRTCSessionEvent.DidSendCallNotification,
@@ -1055,56 +1177,12 @@ export class CallViewModel extends ViewModel {
map(() => {}),
throttleTime(THROTTLE_SOUND_EFFECT_MS),
);
/**
* This observable tracks the matrix users that are currently in the call.
* There can be just one matrix user with multiple participants (see also participantChanges$)
*/
public readonly matrixUserChanges$ = this.userMedia$.pipe(
map(
(mediaItems) =>
new Set(
mediaItems
.map((m) => m.vm.member?.userId)
.filter((id) => id !== undefined),
),
),
scan<
Set<string>,
{
userIds: Set<string>;
joinedUserIds: Set<string>;
leftUserIds: Set<string>;
}
>(
(prevState, userIds) => {
const left = new Set(
[...prevState.userIds].filter((id) => !userIds.has(id)),
);
const joined = new Set(
[...userIds].filter((id) => !prevState.userIds.has(id)),
);
return { userIds: userIds, joinedUserIds: joined, leftUserIds: left };
},
{ userIds: new Set(), joinedUserIds: new Set(), leftUserIds: new Set() },
),
);
private readonly allOthersLeft$ = this.matrixUserChanges$.pipe(
filter(({ userIds, leftUserIds }) => {
if (!this.userId) {
logger.warn("Could not access user ID to compute allOthersLeft");
return false;
}
return (
userIds.size === 1 && userIds.has(this.userId) && leftUserIds.size > 0
);
}),
map(() => "allOthersLeft" as const),
);
// Public for testing
public readonly autoLeave$ = merge(
this.options.autoLeaveWhenOthersLeft ? this.allOthersLeft$ : NEVER,
this.options.autoLeaveWhenOthersLeft
? this.allOthersLeft$.pipe(map(() => "allOthersLeft" as const))
: NEVER,
this.callPickupState$.pipe(
filter((state) => state === "timeout" || state === "decline"),
),
@@ -1132,6 +1210,9 @@ export class CallViewModel extends ViewModel {
merge(this.userHangup$, this.widgetHangup$).pipe(
map(() => "user" as const),
),
).pipe(
this.scope.share,
tap((reason) => this.leaveHoisted$.next(reason)),
);
/**
@@ -1820,9 +1901,12 @@ export class CallViewModel extends ViewModel {
* Whether we are sharing our screen.
*/
public readonly sharingScreen$ = this.scope.behavior(
from(this.localConnection).pipe(
switchMap((c) => sharingScreen$(c.livekitRoom.localParticipant)),
startWith(false),
from(this.localConnection$).pipe(
switchMap((c) =>
c?.state === "ready"
? sharingScreen$(c.value.livekitRoom.localParticipant)
: of(false),
),
),
);
@@ -1834,17 +1918,26 @@ export class CallViewModel extends ViewModel {
"getDisplayMedia" in (navigator.mediaDevices ?? {}) &&
!this.urlParams.hideScreensharing
? (): void =>
void this.localConnection.then(
(c) =>
void c.livekitRoom.localParticipant
.setScreenShareEnabled(!this.sharingScreen$.value, {
audio: true,
selfBrowserSurface: "include",
surfaceSwitching: "include",
systemAudio: "include",
})
.catch(logger.error),
)
// Once a connection is ready...
void this.localConnection$
.pipe(
takeWhile((c) => c !== null && c.state !== "error"),
switchMap((c) => (c.state === "ready" ? of(c.value) : NEVER)),
take(1),
this.scope.bind(),
)
// ...toggle screen sharing.
.subscribe(
(c) =>
void c.livekitRoom.localParticipant
.setScreenShareEnabled(!this.sharingScreen$.value, {
audio: true,
selfBrowserSurface: "include",
surfaceSwitching: "include",
systemAudio: "include",
})
.catch(logger.error),
)
: null;
public constructor(
@@ -1864,32 +1957,33 @@ export class CallViewModel extends ViewModel {
) {
super();
void from(this.localConnection)
.pipe(this.scope.bind())
.subscribe(
(c) =>
void c
.start()
// eslint-disable-next-line no-console
.then(() => console.log("successfully started publishing"))
// eslint-disable-next-line no-console
.catch((e) => console.error("failed to start publishing", e)),
);
// Start and stop local and remote connections as needed
this.startConnection$.pipe(this.scope.bind()).subscribe(
(c) =>
void c.start().then(
() => logger.info(`Connected to ${c.transport.livekit_service_url}`),
(e) =>
logger.error(
`Failed to start connection to ${c.transport.livekit_service_url}`,
e,
),
),
);
this.stopConnection$.pipe(this.scope.bind()).subscribe((c) => {
logger.info(`Disconnecting from ${c.transport.livekit_service_url}`);
c.stop();
});
this.startConnection$
.pipe(this.scope.bind())
.subscribe((c) => void c.start());
this.stopConnection$.pipe(this.scope.bind()).subscribe((c) => c.stop());
combineLatest([this.localFocus, this.join$])
.pipe(this.scope.bind())
.subscribe(([localFocus]) => {
// Start and stop session membership as needed
this.localTransport$.pipe(this.scope.bind()).subscribe((localTransport) => {
if (localTransport?.state === "ready") {
void enterRTCSession(
this.matrixRTCSession,
localFocus,
localTransport.value,
this.options.encryptionSystem.kind !== E2eeType.NONE,
true,
true,
multiSfu.value$.value,
)
.catch((e) => logger.error("Error entering RTC session", e))
.then(() =>
@@ -1906,19 +2000,20 @@ export class CallViewModel extends ViewModel {
),
),
);
});
this.leave$.pipe(this.scope.bind()).subscribe(() => {
// Only sends Matrix leave event. The LiveKit session will disconnect once, uh...
// (TODO-MULTI-SFU does anything actually cause it to disconnect?)
void this.matrixRTCSession
.leaveRoomSession()
.catch((e) => logger.error("Error leaving RTC session", e))
.then(async () =>
widget?.api.transport
.send(ElementWidgetActions.HangupCall, {})
.catch((e) => logger.error("Failed to send hangup action", e)),
);
return (): void =>
// Only sends Matrix leave event. The LiveKit session will disconnect
// as soon as either the stopConnection$ handler above gets to it or
// the view model is destroyed.
void this.matrixRTCSession
.leaveRoomSession()
.catch((e) => logger.error("Error leaving RTC session", e))
.then(async () =>
widget?.api.transport
.send(ElementWidgetActions.HangupCall, {})
.catch((e) => logger.error("Failed to send hangup action", e)),
);
}
});
// Pause upstream of all local media tracks when we're disconnected from
@@ -1927,10 +2022,12 @@ export class CallViewModel extends ViewModel {
// We use matrixConnected$ rather than reconnecting$ because we want to
// pause tracks during the initial joining sequence too until we're sure
// that our own media is displayed on screen.
void this.localConnection.then((localConnection) =>
this.matrixConnected$.pipe(this.scope.bind()).subscribe((connected) => {
combineLatest([this.localConnection$, this.matrixConnected$])
.pipe(this.scope.bind())
.subscribe(([connection, connected]) => {
if (connection?.state !== "ready") return;
const publications =
localConnection.livekitRoom.localParticipant.trackPublications.values();
connection.value.livekitRoom.localParticipant.trackPublications.values();
if (connected) {
for (const p of publications) {
if (p.track?.isUpstreamPaused === true) {
@@ -1966,8 +2063,7 @@ export class CallViewModel extends ViewModel {
}
}
}
}),
);
});
// Join automatically
this.join(); // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked?

View File

@@ -62,7 +62,7 @@ export class Connection {
protected readonly sfuConfig = getSFUConfigWithOpenID(
this.client,
this.focus.livekit_service_url,
this.transport.livekit_service_url,
this.livekitAlias,
);
@@ -72,12 +72,12 @@ export class Connection {
public connectionState$: Behavior<ConnectionState>;
public constructor(
protected readonly focus: LivekitTransport,
public readonly transport: LivekitTransport,
protected readonly livekitAlias: string,
protected readonly client: MatrixClient,
protected readonly scope: ObservableScope,
protected readonly membershipsFocusMap$: Behavior<
{ membership: CallMembership; focus: LivekitTransport }[]
protected readonly remoteTransports$: Behavior<
{ membership: CallMembership; transport: LivekitTransport }[]
>,
e2eeLivekitOptions: E2EEOptions | undefined,
livekitRoom: LivekitRoom | undefined = undefined,
@@ -95,12 +95,13 @@ export class Connection {
this.publishingParticipants$ = this.scope.behavior(
combineLatest(
[this.participantsIncludingSubscribers$, this.membershipsFocusMap$],
(participants, membershipsFocusMap) =>
membershipsFocusMap
[this.participantsIncludingSubscribers$, this.remoteTransports$],
(participants, remoteTransports) =>
remoteTransports
// Find all members that claim to publish on this connection
.flatMap(({ membership, focus }) =>
focus.livekit_service_url === this.focus.livekit_service_url
.flatMap(({ membership, transport }) =>
transport.livekit_service_url ===
this.transport.livekit_service_url
? [membership]
: [],
)
@@ -130,23 +131,35 @@ export class PublishConnection extends Connection {
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
if (!this.stopped) {
const tracks = await this.livekitRoom.localParticipant.createTracks({
audio: this.muteStates.audio.enabled$.value,
video: this.muteStates.video.enabled$.value,
});
for (const track of tracks) {
await this.livekitRoom.localParticipant.publishTrack(track);
// TODO-MULTI-SFU: Prepublish a microphone track
const audio = this.muteStates.audio.enabled$.value;
const video = this.muteStates.video.enabled$.value;
// createTracks throws if called with audio=false and video=false
if (audio || video) {
const tracks = await this.livekitRoom.localParticipant.createTracks({
audio,
video,
});
for (const track of tracks) {
await this.livekitRoom.localParticipant.publishTrack(track);
}
}
}
}
public stop(): void {
this.muteStates.audio.unsetHandler();
this.muteStates.video.unsetHandler();
super.stop();
}
public constructor(
focus: LivekitTransport,
transport: LivekitTransport,
livekitAlias: string,
client: MatrixClient,
scope: ObservableScope,
membershipsFocusMap$: Behavior<
{ membership: CallMembership; focus: LivekitTransport }[]
remoteTransports$: Behavior<
{ membership: CallMembership; transport: LivekitTransport }[]
>,
devices: MediaDevices,
private readonly muteStates: MuteStates,
@@ -182,11 +195,11 @@ export class PublishConnection extends Connection {
});
super(
focus,
transport,
livekitAlias,
client,
scope,
membershipsFocusMap$,
remoteTransports$,
e2eeLivekitOptions,
room,
);
@@ -218,10 +231,6 @@ export class PublishConnection extends Connection {
}
return this.livekitRoom.localParticipant.isCameraEnabled;
});
this.scope.onEnd(() => {
this.muteStates.audio.unsetHandler();
this.muteStates.video.unsetHandler();
});
const syncDevice = (
kind: MediaDeviceKind,

View File

@@ -137,7 +137,7 @@ export class MuteStates {
this.scope,
this.mediaDevices.audioInput,
this.joined$,
Config.get().media_devices.enable_video,
Config.get().media_devices.enable_audio,
);
public readonly video = new MuteState(
this.scope,

View File

@@ -8,9 +8,10 @@ Please see LICENSE in the repository root for full details.
import {
BehaviorSubject,
distinctUntilChanged,
filter,
type Observable,
share,
Subject,
take,
takeUntil,
} from "rxjs";
@@ -24,9 +25,11 @@ const nothing = Symbol("nothing");
* A scope which limits the execution lifetime of its bound Observables.
*/
export class ObservableScope {
private readonly ended$ = new Subject<void>();
private readonly ended$ = new BehaviorSubject(false);
private readonly bindImpl: MonoTypeOperator = takeUntil(this.ended$);
private readonly bindImpl: MonoTypeOperator = takeUntil(
this.ended$.pipe(filter((ended) => ended)),
);
/**
* Binds an Observable to this scope, so that it completes when the scope
@@ -78,15 +81,19 @@ export class ObservableScope {
* Ends the scope, causing any bound Observables to complete.
*/
public end(): void {
this.ended$.next();
this.ended$.complete();
this.ended$.next(true);
}
/**
* Register a callback to be executed when the scope is ended.
*/
public onEnd(callback: () => void): void {
this.ended$.subscribe(callback);
this.ended$
.pipe(
filter((ended) => ended),
take(1),
)
.subscribe(callback);
}
}

View File

@@ -11,7 +11,7 @@ export enum ErrorCode {
/**
* Configuration problem due to no MatrixRTC backend/SFU is exposed via .well-known and no fallback configured.
*/
MISSING_MATRIX_RTC_FOCUS = "MISSING_MATRIX_RTC_FOCUS",
MISSING_MATRIX_RTC_TRANSPORT = "MISSING_MATRIX_RTC_TRANSPORT",
CONNECTION_LOST_ERROR = "CONNECTION_LOST_ERROR",
/** LiveKit indicates that the server has hit its track limits */
INSUFFICIENT_CAPACITY_ERROR = "INSUFFICIENT_CAPACITY_ERROR",
@@ -54,18 +54,18 @@ export class ElementCallError extends Error {
}
}
export class MatrixRTCFocusMissingError extends ElementCallError {
export class MatrixRTCTransportMissingError extends ElementCallError {
public domain: string;
public constructor(domain: string) {
super(
t("error.call_is_not_supported"),
ErrorCode.MISSING_MATRIX_RTC_FOCUS,
ErrorCode.MISSING_MATRIX_RTC_TRANSPORT,
ErrorCategory.CONFIGURATION_ISSUE,
t("error.matrix_rtc_focus_missing", {
t("error.matrix_rtc_transport_missing", {
domain,
brand: import.meta.env.VITE_PRODUCT_NAME || "Element Call",
errorCode: ErrorCode.MISSING_MATRIX_RTC_FOCUS,
errorCode: ErrorCode.MISSING_MATRIX_RTC_TRANSPORT,
}),
);
this.domain = domain;