diff --git a/locales/en/app.json b/locales/en/app.json index dc027c92..704f68ac 100644 --- a/locales/en/app.json +++ b/locales/en/app.json @@ -72,6 +72,7 @@ "livekit_server_info": "LiveKit Server Info", "livekit_sfu": "LiveKit SFU: {{url}}", "matrix_id": "Matrix ID: {{id}}", + "multi_sfu": "Multi-SFU media transport", "mute_all_audio": "Mute all audio (participants, reactions, join sounds)", "show_connection_stats": "Show connection statistics", "url_params": "URL parameters", @@ -91,7 +92,7 @@ "generic_description": "Submitting debug logs will help us track down the problem.", "insufficient_capacity": "Insufficient capacity", "insufficient_capacity_description": "The server has reached its maximum capacity and you cannot join the call at this time. Try again later, or contact your server admin if the problem persists.", - "matrix_rtc_focus_missing": "The server is not configured to work with {{brand}}. Please contact your server admin (Domain: {{domain}}, Error Code: {{ errorCode }}).", + "matrix_rtc_transport_missing": "The server is not configured to work with {{brand}}. Please contact your server admin (Domain: {{domain}}, Error Code: {{ errorCode }}).", "open_elsewhere": "Opened in another tab", "open_elsewhere_description": "{{brand}} has been opened in another tab. If that doesn't sound right, try reloading the page.", "room_creation_restricted": "Failed to create call", diff --git a/src/room/GroupCallErrorBoundary.test.tsx b/src/room/GroupCallErrorBoundary.test.tsx index 51912956..22338924 100644 --- a/src/room/GroupCallErrorBoundary.test.tsx +++ b/src/room/GroupCallErrorBoundary.test.tsx @@ -26,7 +26,7 @@ import { E2EENotSupportedError, type ElementCallError, InsufficientCapacityError, - MatrixRTCFocusMissingError, + MatrixRTCTransportMissingError, UnknownCallError, } from "../utils/errors.ts"; import { mockConfig } from "../utils/test.ts"; @@ -34,7 +34,7 @@ import { ElementWidgetActions, type WidgetHelpers } from "../widget.ts"; test.each([ { - error: new MatrixRTCFocusMissingError("example.com"), + error: new MatrixRTCTransportMissingError("example.com"), expectedTitle: "Call is not supported", }, { @@ -85,7 +85,7 @@ test.each([ ); test("should render the error page with link back to home", async () => { - const error = new MatrixRTCFocusMissingError("example.com"); + const error = new MatrixRTCTransportMissingError("example.com"); const TestComponent = (): ReactNode => { throw error; }; @@ -213,7 +213,7 @@ describe("Rageshake button", () => { }); test("should have a close button in widget mode", async () => { - const error = new MatrixRTCFocusMissingError("example.com"); + const error = new MatrixRTCTransportMissingError("example.com"); const TestComponent = (): ReactNode => { throw error; }; diff --git a/src/room/GroupCallView.test.tsx b/src/room/GroupCallView.test.tsx index bf5d1fef..b8bc2f53 100644 --- a/src/room/GroupCallView.test.tsx +++ b/src/room/GroupCallView.test.tsx @@ -42,7 +42,7 @@ import { import { GroupCallView } from "./GroupCallView"; import { type WidgetHelpers } from "../widget"; import { LazyEventEmitter } from "../LazyEventEmitter"; -import { MatrixRTCFocusMissingError } from "../utils/errors"; +import { MatrixRTCTransportMissingError } from "../utils/errors"; import { ProcessorProvider } from "../livekit/TrackProcessorContext"; import { MediaDevicesContext } from "../MediaDevicesContext"; import { HeaderStyle } from "../UrlParams"; @@ -258,7 +258,7 @@ test("GroupCallView leaves the session when an error occurs", async () => { test("GroupCallView shows errors that occur during joining", async () => { const user = userEvent.setup(); - enterRTCSession.mockRejectedValue(new MatrixRTCFocusMissingError("")); + enterRTCSession.mockRejectedValue(new MatrixRTCTransportMissingError("")); onTestFinished(() => { enterRTCSession.mockReset(); }); diff --git a/src/rtcSessionHelpers.ts b/src/rtcSessionHelpers.ts index 175b35f4..3cdd82e7 100644 --- a/src/rtcSessionHelpers.ts +++ b/src/rtcSessionHelpers.ts @@ -17,7 +17,7 @@ import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { PosthogAnalytics } from "./analytics/PosthogAnalytics"; import { Config } from "./config/Config"; import { ElementWidgetActions, widget, type WidgetHelpers } from "./widget"; -import { MatrixRTCFocusMissingError } from "./utils/errors"; +import { MatrixRTCTransportMissingError } from "./utils/errors"; import { getUrlParams } from "./UrlParams"; import { getSFUConfigWithOpenID } from "./livekit/openIDSFU.ts"; @@ -28,35 +28,31 @@ export function getLivekitAlias(rtcSession: MatrixRTCSession): string { return rtcSession.room.roomId; } -async function makeFocusInternal( +async function makeTransportInternal( rtcSession: MatrixRTCSession, ): Promise { - logger.log("Searching for a preferred focus"); + logger.log("Searching for a preferred transport"); const livekitAlias = getLivekitAlias(rtcSession); - const urlFromStorage = localStorage.getItem("robin-matrixrtc-auth"); + // TODO-MULTI-SFU: Either remove this dev tool or make it more official + const urlFromStorage = + localStorage.getItem("robin-matrixrtc-auth") ?? + localStorage.getItem("timo-focus-url"); if (urlFromStorage !== null) { - const focusFromStorage: LivekitTransport = { + const transportFromStorage: LivekitTransport = { type: "livekit", livekit_service_url: urlFromStorage, livekit_alias: livekitAlias, }; - logger.log("Using LiveKit focus from local storage: ", focusFromStorage); - return focusFromStorage; + logger.log( + "Using LiveKit transport from local storage: ", + transportFromStorage, + ); + return transportFromStorage; } // Prioritize the .well-known/matrix/client, if available, over the configured SFU const domain = rtcSession.room.client.getDomain(); - if (localStorage.getItem("timo-focus-url")) { - const timoFocusUrl = localStorage.getItem("timo-focus-url")!; - const focusFromUrl: LivekitTransport = { - type: "livekit", - livekit_service_url: timoFocusUrl, - livekit_alias: livekitAlias, - }; - logger.log("Using LiveKit focus from localStorage: ", timoFocusUrl); - return focusFromUrl; - } if (domain) { // we use AutoDiscovery instead of relying on the MatrixClient having already // been fully configured and started @@ -64,46 +60,46 @@ async function makeFocusInternal( FOCI_WK_KEY ]; if (Array.isArray(wellKnownFoci)) { - const focus: LivekitTransportConfig | undefined = wellKnownFoci.find( + const transport: LivekitTransportConfig | undefined = wellKnownFoci.find( (f) => f && isLivekitTransportConfig(f), ); - if (focus !== undefined) { - logger.log("Using LiveKit focus from .well-known: ", focus); - return { ...focus, livekit_alias: livekitAlias }; + if (transport !== undefined) { + logger.log("Using LiveKit transport from .well-known: ", transport); + return { ...transport, livekit_alias: livekitAlias }; } } } const urlFromConf = Config.get().livekit?.livekit_service_url; if (urlFromConf) { - const focusFromConf: LivekitTransport = { + const transportFromConf: LivekitTransport = { type: "livekit", livekit_service_url: urlFromConf, livekit_alias: livekitAlias, }; - logger.log("Using LiveKit focus from config: ", focusFromConf); - return focusFromConf; + logger.log("Using LiveKit transport from config: ", transportFromConf); + return transportFromConf; } - throw new MatrixRTCFocusMissingError(domain ?? ""); + throw new MatrixRTCTransportMissingError(domain ?? ""); } -export async function makeFocus( +export async function makeTransport( rtcSession: MatrixRTCSession, ): Promise { - const focus = await makeFocusInternal(rtcSession); + const transport = await makeTransportInternal(rtcSession); // this will call the jwt/sfu/get endpoint to pre create the livekit room. await getSFUConfigWithOpenID( rtcSession.room.client, - focus.livekit_service_url, - focus.livekit_alias, + transport.livekit_service_url, + transport.livekit_alias, ); - return focus; + return transport; } export async function enterRTCSession( rtcSession: MatrixRTCSession, - focus: LivekitTransport, + transport: LivekitTransport, encryptMedia: boolean, useNewMembershipManager = true, useExperimentalToDeviceTransport = false, @@ -120,10 +116,10 @@ export async function enterRTCSession( const useDeviceSessionMemberEvents = features?.feature_use_device_session_member_events; const { sendNotificationType: notificationType, callIntent } = getUrlParams(); - // Multi-sfu does not need a focus preferred list. just the focus that is actually used. + // Multi-sfu does not need a preferred foci list. just the focus that is actually used. rtcSession.joinRoomSession( - useMultiSfu ? [focus] : [], - useMultiSfu ? focus : undefined, + useMultiSfu ? [] : [transport], + useMultiSfu ? transport : undefined, { notificationType, callIntent, diff --git a/src/settings/DeveloperSettingsTab.tsx b/src/settings/DeveloperSettingsTab.tsx index 1949ecf7..36c8a2e6 100644 --- a/src/settings/DeveloperSettingsTab.tsx +++ b/src/settings/DeveloperSettingsTab.tsx @@ -16,6 +16,7 @@ import { showConnectionStats as showConnectionStatsSetting, useNewMembershipManager as useNewMembershipManagerSetting, useExperimentalToDeviceTransport as useExperimentalToDeviceTransportSetting, + multiSfu as multiSfuSetting, muteAllAudio as muteAllAudioSetting, alwaysShowIphoneEarpiece as alwaysShowIphoneEarpieceSetting, } from "./settings"; @@ -50,6 +51,7 @@ export const DeveloperSettingsTab: FC = ({ client, livekitRooms }) => { useExperimentalToDeviceTransport, setUseExperimentalToDeviceTransport, ] = useSetting(useExperimentalToDeviceTransportSetting); + const [multiSfu, setMultiSfu] = useSetting(multiSfuSetting); const [muteAllAudio, setMuteAllAudio] = useSetting(muteAllAudioSetting); @@ -166,6 +168,20 @@ export const DeveloperSettingsTab: FC = ({ client, livekitRooms }) => { )} /> + + ): void => { + setMultiSfu(event.target.checked); + }, + [setMultiSfu], + )} + /> + ( true, ); +export const multiSfu = new Setting("multi-sfu", false); + export const muteAllAudio = new Setting("mute-all-audio", false); export const alwaysShowSelf = new Setting("always-show-self", true); diff --git a/src/state/Async.ts b/src/state/Async.ts new file mode 100644 index 00000000..2baa674c --- /dev/null +++ b/src/state/Async.ts @@ -0,0 +1,44 @@ +/* +Copyright 2025 New Vector Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { + catchError, + from, + map, + Observable, + of, + startWith, + switchMap, +} from "rxjs"; + +export type Async = + | { state: "loading" } + | { state: "error"; value: Error } + | { state: "ready"; value: A }; + +export const loading: Async = { state: "loading" }; +export function error(value: Error): Async { + return { state: "error", value }; +} +export function ready(value: A): Async { + return { state: "ready", value }; +} + +export function async(promise: Promise): Observable> { + return from(promise).pipe( + map(ready), + startWith(loading), + catchError((e) => of(error(e))), + ); +} + +export function mapAsync( + async: Async, + project: (value: A) => B, +): Async { + return async.state === "ready" ? ready(project(async.value)) : async; +} diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 2f4bfa0c..8988e518 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -28,6 +28,7 @@ import { EventType, RoomEvent, } from "matrix-js-sdk"; +import { deepCompare } from "matrix-js-sdk/lib/utils"; import { BehaviorSubject, EMPTY, @@ -48,6 +49,7 @@ import { of, pairwise, race, + repeat, scan, skip, skipWhile, @@ -57,6 +59,7 @@ import { switchScan, take, takeUntil, + takeWhile, tap, throttleTime, timer, @@ -65,6 +68,7 @@ import { logger } from "matrix-js-sdk/lib/logger"; import { type CallMembership, isLivekitTransport, + type LivekitTransport, type MatrixRTCSession, MatrixRTCSessionEvent, type MatrixRTCSessionEventHandlerMap, @@ -90,6 +94,7 @@ import { import { ObservableScope } from "./ObservableScope"; import { duplicateTiles, + multiSfu, playReactionsSound, showReactions, } from "../settings/settings"; @@ -118,7 +123,7 @@ import { constant, type Behavior } from "./Behavior"; import { enterRTCSession, getLivekitAlias, - makeFocus, + makeTransport, } from "../rtcSessionHelpers"; import { E2eeType } from "../e2ee/e2eeType"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; @@ -127,6 +132,7 @@ import { type MuteStates } from "./MuteStates"; import { getUrlParams } from "../UrlParams"; import { type ProcessorState } from "../livekit/TrackProcessorContext"; import { ElementWidgetActions, widget } from "../widget"; +import { type Async, async, mapAsync, ready } from "./Async"; export interface CallViewModelOptions { encryptionSystem: EncryptionSystem; @@ -449,27 +455,33 @@ export class CallViewModel extends ViewModel { } : undefined; - private readonly localFocus = makeFocus(this.matrixRTCSession); + private readonly join$ = new Subject(); - private readonly localConnection = this.localFocus.then( - (focus) => - new PublishConnection( - focus, - this.livekitAlias, - this.matrixRTCSession.room.client, - this.scope, - this.membershipsAndFocusMap$, - this.mediaDevices, - this.muteStates, - this.e2eeLivekitOptions(), - this.scope.behavior(this.trackProcessorState$), - ), - ); + public join(): void { + this.join$.next(); + } - public readonly livekitConnectionState$ = this.scope.behavior( - combineLatest([this.localConnection]).pipe( - switchMap(([c]) => c.connectionState$), - startWith(ConnectionState.Disconnected), + // This is functionally the same Observable as leave$, except here it's + // hoisted to the top of the class. This enables the cyclic dependency between + // leave$ -> autoLeave$ -> callPickupState$ -> livekitConnectionState$ -> + // localConnection$ -> transports$ -> joined$ -> leave$. + private readonly leaveHoisted$ = new Subject< + "user" | "timeout" | "decline" | "allOthersLeft" + >(); + + /** + * Whether we are joined to the call. This reflects our local state rather + * than whether all connections are truly up and running. + */ + private readonly joined$ = this.scope.behavior( + this.join$.pipe( + map(() => true), + // Using takeUntil with the repeat operator is perfectly valid. + // eslint-disable-next-line rxjs/no-unsafe-takeuntil + takeUntil(this.leaveHoisted$), + endWith(false), + repeat(), + startWith(false), ), ); @@ -488,125 +500,224 @@ export class CallViewModel extends ViewModel { ), ); - private readonly membershipsAndFocusMap$ = this.scope.behavior( - this.memberships$.pipe( - map((memberships) => - memberships.flatMap((m) => { - const f = this.matrixRTCSession.resolveActiveFocus(m); - return f && isLivekitTransport(f) - ? [{ membership: m, focus: f }] - : []; - }), + /** + * The transport that we would personally prefer to publish on (if not for the + * transport preferences of others, perhaps). + */ + private readonly preferredTransport = makeTransport(this.matrixRTCSession); + + /** + * Lists the transports used by ourselves, plus all other MatrixRTC session + * members. + */ + private readonly transports$: Behavior<{ + local: Async; + remote: { membership: CallMembership; transport: LivekitTransport }[]; + } | null> = this.scope.behavior( + this.joined$.pipe( + switchMap((joined) => + joined + ? combineLatest( + [ + async(this.preferredTransport), + this.memberships$, + multiSfu.value$, + ], + (preferred, memberships, multiSfu) => { + const remote = memberships.flatMap((m) => { + if (m.sender === this.userId && m.deviceId === this.deviceId) + return []; + const t = this.matrixRTCSession.resolveActiveFocus(m); + return t && isLivekitTransport(t) + ? [{ membership: m, transport: t }] + : []; + }); + let local = preferred; + if (!multiSfu) { + const oldest = this.matrixRTCSession.getOldestMembership(); + if (oldest !== undefined) { + const selection = oldest.getTransport(oldest); + if (isLivekitTransport(selection)) local = ready(selection); + } + } + return { local, remote }; + }, + ) + : of(null), ), ), ); - private readonly livekitServiceUrls$ = this.membershipsAndFocusMap$.pipe( - map((v) => new Set(v.map(({ focus }) => focus.livekit_service_url))), + /** + * Lists the transports used by each MatrixRTC session member other than + * ourselves. + */ + private readonly remoteTransports$ = this.scope.behavior( + this.transports$.pipe(map((transports) => transports?.remote ?? [])), ); + /** + * The transport over which we should be actively publishing our media. + */ + private readonly localTransport$: Behavior | null> = + this.scope.behavior( + this.transports$.pipe( + map((transports) => transports?.local ?? null), + distinctUntilChanged(deepCompare), + ), + ); + + private readonly localConnectionAndTransport$ = this.scope.behavior( + this.localTransport$.pipe( + map( + (transport) => + transport && + mapAsync(transport, (transport) => ({ + connection: new PublishConnection( + transport, + this.livekitAlias, + this.matrixRTCSession.room.client, + this.scope, + this.remoteTransports$, + this.mediaDevices, + this.muteStates, + this.e2eeLivekitOptions(), + this.scope.behavior(this.trackProcessorState$), + ), + transport, + })), + ), + ), + ); + + private readonly localConnection$ = this.scope.behavior( + this.localConnectionAndTransport$.pipe( + map((value) => value && mapAsync(value, ({ connection }) => connection)), + ), + ); + + public readonly livekitConnectionState$ = this.scope.behavior( + this.localConnection$.pipe( + switchMap((c) => + c?.state === "ready" + ? c.value.connectionState$ + : of(ConnectionState.Disconnected), + ), + ), + ); + + /** + * Connections for each transport in use by one or more session members that + * is *distinct* from the local transport. + */ private readonly remoteConnections$ = this.scope.behavior( - combineLatest([this.localFocus, this.livekitServiceUrls$]).pipe( - accumulate( - new Map(), - (prev, [localFocus, focusUrls]) => { - const stopped = new Map(prev); - const next = new Map(); - for (const focusUrl of focusUrls) { - if (focusUrl !== localFocus.livekit_service_url) { - stopped.delete(focusUrl); + this.transports$.pipe( + accumulate(new Map(), (prev, transports) => { + const next = new Map(); - let nextConnection = prev.get(focusUrl); - if (!nextConnection) { - logger.log( - "SFU remoteConnections$ construct new connection: ", - focusUrl, - ); - nextConnection = new Connection( - { - livekit_service_url: focusUrl, - livekit_alias: this.livekitAlias, - type: "livekit", - }, - this.livekitAlias, - this.matrixRTCSession.room.client, - this.scope, - this.membershipsAndFocusMap$, - this.e2eeLivekitOptions(), - ); - } else { - logger.log( - "SFU remoteConnections$ use prev connection: ", - focusUrl, - ); - } - next.set(focusUrl, nextConnection); + // Until the local transport becomes ready we have no idea which + // transports will actually need a dedicated remote connection + if (transports?.local.state === "ready") { + const localServiceUrl = transports.local.value.livekit_service_url; + const remoteServiceUrls = new Set( + transports.remote.flatMap(({ membership, transport }) => { + const t = this.matrixRTCSession.resolveActiveFocus(membership); + return t && + isLivekitTransport(t) && + t.livekit_service_url !== localServiceUrl + ? [t.livekit_service_url] + : []; + }), + ); + + for (const remoteServiceUrl of remoteServiceUrls) { + let nextConnection = prev.get(remoteServiceUrl); + if (!nextConnection) { + logger.log( + "SFU remoteConnections$ construct new connection: ", + remoteServiceUrl, + ); + nextConnection = new Connection( + { + livekit_service_url: remoteServiceUrl, + livekit_alias: this.livekitAlias, + type: "livekit", + }, + this.livekitAlias, + this.matrixRTCSession.room.client, + this.scope, + this.remoteTransports$, + this.e2eeLivekitOptions(), + ); + } else { + logger.log( + "SFU remoteConnections$ use prev connection: ", + remoteServiceUrl, + ); } + next.set(remoteServiceUrl, nextConnection); } + } - for (const connection of stopped.values()) connection.stop(); - return next; - }, - ), + return next; + }), + map((transports) => [...transports.values()]), ), ); - private readonly join$ = new Subject(); + /** + * A list of the connections that should be active at any given time. + */ + private readonly connections$ = this.scope.behavior( + combineLatest( + [this.localConnection$, this.remoteConnections$], + (local, remote) => [ + ...(local?.state === "ready" ? [local.value] : []), + ...remote.values(), + ], + ), + ); - public join(): void { - this.join$.next(); - } - - private readonly connectionInstructions$ = this.join$.pipe( - switchMap(() => this.remoteConnections$), - startWith(new Map()), + private readonly connectionInstructions$ = this.connections$.pipe( pairwise(), map(([prev, next]) => { const start = new Set(next.values()); - for (const connection of prev.values()) start.delete(connection); + for (const connection of prev) start.delete(connection); const stop = new Set(prev.values()); - for (const connection of next.values()) stop.delete(connection); + for (const connection of next) stop.delete(connection); return { start, stop }; }), this.scope.share, ); + /** + * Emits with a connection whenever it should be started. + */ private readonly startConnection$ = this.connectionInstructions$.pipe( concatMap(({ start }) => start), ); + /** + * Emits with a connection whenever it should be stopped. + */ private readonly stopConnection$ = this.connectionInstructions$.pipe( concatMap(({ stop }) => stop), ); public readonly allLivekitRooms$ = this.scope.behavior( - combineLatest([ - this.remoteConnections$, - this.localConnection, - this.localFocus, - ]).pipe( - map(([remoteConnections, localConnection, localFocus]) => - Array.from(remoteConnections.entries()) - .map( - ([index, c]) => - ({ - room: c.livekitRoom, - url: index, - }) as { room: LivekitRoom; url: string; isLocal?: boolean }, - ) - .concat([ - { - room: localConnection.livekitRoom, - url: localFocus.livekit_service_url, - isLocal: true, - }, - ]), + this.connections$.pipe( + map((connections) => + [...connections.values()].map((c) => ({ + room: c.livekitRoom, + url: c.transport.livekit_service_url, + isLocal: c instanceof PublishConnection, + })), ), - startWith([]), ), ); private readonly userId = this.matrixRoom.client.getUserId(); + private readonly deviceId = this.matrixRoom.client.getDeviceId(); private readonly matrixConnected$ = this.scope.behavior( // To consider ourselves connected to MatrixRTC, we check the following: @@ -679,6 +790,10 @@ export class CallViewModel extends ViewModel { // in a split-brained state. private readonly pretendToBeDisconnected$ = this.reconnecting$; + /** + * Lists, for each LiveKit room, the LiveKit participants whose media should + * be presented. + */ public readonly participantsByRoom$ = this.scope.behavior< { livekitRoom: LivekitRoom; @@ -689,9 +804,12 @@ export class CallViewModel extends ViewModel { }[]; }[] >( - combineLatest([this.localConnection, this.localFocus]) + // TODO: Move this logic into Connection/PublishConnection if possible + this.localConnectionAndTransport$ .pipe( - switchMap(([localConnection, localFocus]) => { + switchMap((values) => { + if (values?.state !== "ready") return []; + const localConnection = values.value.connection; const memberError = (): never => { throw new Error("No room member for call membership"); }; @@ -702,12 +820,9 @@ export class CallViewModel extends ViewModel { }; return this.remoteConnections$.pipe( - switchMap((connections) => + switchMap((remoteConnections) => combineLatest( - [ - [localFocus.livekit_service_url, localConnection] as const, - ...connections, - ].map(([url, c]) => + [localConnection, ...remoteConnections].map((c) => c.publishingParticipants$.pipe( map((ps) => { const participants: { @@ -726,7 +841,7 @@ export class CallViewModel extends ViewModel { return { livekitRoom: c.livekitRoom, - url, + url: c.transport.livekit_service_url, participants, }; }), @@ -809,12 +924,8 @@ export class CallViewModel extends ViewModel { * List of MediaItems that we want to display */ private readonly mediaItems$ = this.scope.behavior( - combineLatest([ - this.participantsByRoom$, - duplicateTiles.value$, - this.memberships$, - ]).pipe( - scan((prevItems, [participantsByRoom, duplicateTiles, memberships]) => { + combineLatest([this.participantsByRoom$, duplicateTiles.value$]).pipe( + scan((prevItems, [participantsByRoom, duplicateTiles]) => { const newItems: Map = new Map( function* (this: CallViewModel): Iterable<[string, MediaItem]> { for (const { livekitRoom, participants } of participantsByRoom) { @@ -829,6 +940,7 @@ export class CallViewModel extends ViewModel { if (prevMedia && prevMedia instanceof UserMedia) { prevMedia.updateParticipant(participant); if (prevMedia.vm.member === undefined) { + // TODO-MULTI-SFU: This is outdated. // We have a previous media created because of the `debugShowNonMember` flag. // In this case we actually replace the media item. // This "hack" never occurs if we do not use the `debugShowNonMember` debugging @@ -931,6 +1043,16 @@ export class CallViewModel extends ViewModel { this.memberships$.pipe(map((ms) => ms.length)), ); + private readonly allOthersLeft$ = this.memberships$.pipe( + pairwise(), + filter( + ([prev, current]) => + current.every((m) => m.sender === this.userId) && + prev.some((m) => m.sender !== this.userId), + ), + map(() => {}), + ); + private readonly didSendCallNotification$ = fromEvent( this.matrixRTCSession, MatrixRTCSessionEvent.DidSendCallNotification, @@ -1055,56 +1177,12 @@ export class CallViewModel extends ViewModel { map(() => {}), throttleTime(THROTTLE_SOUND_EFFECT_MS), ); - /** - * This observable tracks the matrix users that are currently in the call. - * There can be just one matrix user with multiple participants (see also participantChanges$) - */ - public readonly matrixUserChanges$ = this.userMedia$.pipe( - map( - (mediaItems) => - new Set( - mediaItems - .map((m) => m.vm.member?.userId) - .filter((id) => id !== undefined), - ), - ), - scan< - Set, - { - userIds: Set; - joinedUserIds: Set; - leftUserIds: Set; - } - >( - (prevState, userIds) => { - const left = new Set( - [...prevState.userIds].filter((id) => !userIds.has(id)), - ); - const joined = new Set( - [...userIds].filter((id) => !prevState.userIds.has(id)), - ); - return { userIds: userIds, joinedUserIds: joined, leftUserIds: left }; - }, - { userIds: new Set(), joinedUserIds: new Set(), leftUserIds: new Set() }, - ), - ); - - private readonly allOthersLeft$ = this.matrixUserChanges$.pipe( - filter(({ userIds, leftUserIds }) => { - if (!this.userId) { - logger.warn("Could not access user ID to compute allOthersLeft"); - return false; - } - return ( - userIds.size === 1 && userIds.has(this.userId) && leftUserIds.size > 0 - ); - }), - map(() => "allOthersLeft" as const), - ); // Public for testing public readonly autoLeave$ = merge( - this.options.autoLeaveWhenOthersLeft ? this.allOthersLeft$ : NEVER, + this.options.autoLeaveWhenOthersLeft + ? this.allOthersLeft$.pipe(map(() => "allOthersLeft" as const)) + : NEVER, this.callPickupState$.pipe( filter((state) => state === "timeout" || state === "decline"), ), @@ -1132,6 +1210,9 @@ export class CallViewModel extends ViewModel { merge(this.userHangup$, this.widgetHangup$).pipe( map(() => "user" as const), ), + ).pipe( + this.scope.share, + tap((reason) => this.leaveHoisted$.next(reason)), ); /** @@ -1820,9 +1901,12 @@ export class CallViewModel extends ViewModel { * Whether we are sharing our screen. */ public readonly sharingScreen$ = this.scope.behavior( - from(this.localConnection).pipe( - switchMap((c) => sharingScreen$(c.livekitRoom.localParticipant)), - startWith(false), + from(this.localConnection$).pipe( + switchMap((c) => + c?.state === "ready" + ? sharingScreen$(c.value.livekitRoom.localParticipant) + : of(false), + ), ), ); @@ -1834,17 +1918,26 @@ export class CallViewModel extends ViewModel { "getDisplayMedia" in (navigator.mediaDevices ?? {}) && !this.urlParams.hideScreensharing ? (): void => - void this.localConnection.then( - (c) => - void c.livekitRoom.localParticipant - .setScreenShareEnabled(!this.sharingScreen$.value, { - audio: true, - selfBrowserSurface: "include", - surfaceSwitching: "include", - systemAudio: "include", - }) - .catch(logger.error), - ) + // Once a connection is ready... + void this.localConnection$ + .pipe( + takeWhile((c) => c !== null && c.state !== "error"), + switchMap((c) => (c.state === "ready" ? of(c.value) : NEVER)), + take(1), + this.scope.bind(), + ) + // ...toggle screen sharing. + .subscribe( + (c) => + void c.livekitRoom.localParticipant + .setScreenShareEnabled(!this.sharingScreen$.value, { + audio: true, + selfBrowserSurface: "include", + surfaceSwitching: "include", + systemAudio: "include", + }) + .catch(logger.error), + ) : null; public constructor( @@ -1864,32 +1957,33 @@ export class CallViewModel extends ViewModel { ) { super(); - void from(this.localConnection) - .pipe(this.scope.bind()) - .subscribe( - (c) => - void c - .start() - // eslint-disable-next-line no-console - .then(() => console.log("successfully started publishing")) - // eslint-disable-next-line no-console - .catch((e) => console.error("failed to start publishing", e)), - ); + // Start and stop local and remote connections as needed + this.startConnection$.pipe(this.scope.bind()).subscribe( + (c) => + void c.start().then( + () => logger.info(`Connected to ${c.transport.livekit_service_url}`), + (e) => + logger.error( + `Failed to start connection to ${c.transport.livekit_service_url}`, + e, + ), + ), + ); + this.stopConnection$.pipe(this.scope.bind()).subscribe((c) => { + logger.info(`Disconnecting from ${c.transport.livekit_service_url}`); + c.stop(); + }); - this.startConnection$ - .pipe(this.scope.bind()) - .subscribe((c) => void c.start()); - this.stopConnection$.pipe(this.scope.bind()).subscribe((c) => c.stop()); - - combineLatest([this.localFocus, this.join$]) - .pipe(this.scope.bind()) - .subscribe(([localFocus]) => { + // Start and stop session membership as needed + this.localTransport$.pipe(this.scope.bind()).subscribe((localTransport) => { + if (localTransport?.state === "ready") { void enterRTCSession( this.matrixRTCSession, - localFocus, + localTransport.value, this.options.encryptionSystem.kind !== E2eeType.NONE, true, true, + multiSfu.value$.value, ) .catch((e) => logger.error("Error entering RTC session", e)) .then(() => @@ -1906,19 +2000,20 @@ export class CallViewModel extends ViewModel { ), ), ); - }); - this.leave$.pipe(this.scope.bind()).subscribe(() => { - // Only sends Matrix leave event. The LiveKit session will disconnect once, uh... - // (TODO-MULTI-SFU does anything actually cause it to disconnect?) - void this.matrixRTCSession - .leaveRoomSession() - .catch((e) => logger.error("Error leaving RTC session", e)) - .then(async () => - widget?.api.transport - .send(ElementWidgetActions.HangupCall, {}) - .catch((e) => logger.error("Failed to send hangup action", e)), - ); + return (): void => + // Only sends Matrix leave event. The LiveKit session will disconnect + // as soon as either the stopConnection$ handler above gets to it or + // the view model is destroyed. + void this.matrixRTCSession + .leaveRoomSession() + .catch((e) => logger.error("Error leaving RTC session", e)) + .then(async () => + widget?.api.transport + .send(ElementWidgetActions.HangupCall, {}) + .catch((e) => logger.error("Failed to send hangup action", e)), + ); + } }); // Pause upstream of all local media tracks when we're disconnected from @@ -1927,10 +2022,12 @@ export class CallViewModel extends ViewModel { // We use matrixConnected$ rather than reconnecting$ because we want to // pause tracks during the initial joining sequence too until we're sure // that our own media is displayed on screen. - void this.localConnection.then((localConnection) => - this.matrixConnected$.pipe(this.scope.bind()).subscribe((connected) => { + combineLatest([this.localConnection$, this.matrixConnected$]) + .pipe(this.scope.bind()) + .subscribe(([connection, connected]) => { + if (connection?.state !== "ready") return; const publications = - localConnection.livekitRoom.localParticipant.trackPublications.values(); + connection.value.livekitRoom.localParticipant.trackPublications.values(); if (connected) { for (const p of publications) { if (p.track?.isUpstreamPaused === true) { @@ -1966,8 +2063,7 @@ export class CallViewModel extends ViewModel { } } } - }), - ); + }); // Join automatically this.join(); // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked? diff --git a/src/state/Connection.ts b/src/state/Connection.ts index 8eaed463..992d8840 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -62,7 +62,7 @@ export class Connection { protected readonly sfuConfig = getSFUConfigWithOpenID( this.client, - this.focus.livekit_service_url, + this.transport.livekit_service_url, this.livekitAlias, ); @@ -72,12 +72,12 @@ export class Connection { public connectionState$: Behavior; public constructor( - protected readonly focus: LivekitTransport, + public readonly transport: LivekitTransport, protected readonly livekitAlias: string, protected readonly client: MatrixClient, protected readonly scope: ObservableScope, - protected readonly membershipsFocusMap$: Behavior< - { membership: CallMembership; focus: LivekitTransport }[] + protected readonly remoteTransports$: Behavior< + { membership: CallMembership; transport: LivekitTransport }[] >, e2eeLivekitOptions: E2EEOptions | undefined, livekitRoom: LivekitRoom | undefined = undefined, @@ -95,12 +95,13 @@ export class Connection { this.publishingParticipants$ = this.scope.behavior( combineLatest( - [this.participantsIncludingSubscribers$, this.membershipsFocusMap$], - (participants, membershipsFocusMap) => - membershipsFocusMap + [this.participantsIncludingSubscribers$, this.remoteTransports$], + (participants, remoteTransports) => + remoteTransports // Find all members that claim to publish on this connection - .flatMap(({ membership, focus }) => - focus.livekit_service_url === this.focus.livekit_service_url + .flatMap(({ membership, transport }) => + transport.livekit_service_url === + this.transport.livekit_service_url ? [membership] : [], ) @@ -130,23 +131,35 @@ export class PublishConnection extends Connection { if (!this.stopped) await this.livekitRoom.connect(url, jwt); if (!this.stopped) { - const tracks = await this.livekitRoom.localParticipant.createTracks({ - audio: this.muteStates.audio.enabled$.value, - video: this.muteStates.video.enabled$.value, - }); - for (const track of tracks) { - await this.livekitRoom.localParticipant.publishTrack(track); + // TODO-MULTI-SFU: Prepublish a microphone track + const audio = this.muteStates.audio.enabled$.value; + const video = this.muteStates.video.enabled$.value; + // createTracks throws if called with audio=false and video=false + if (audio || video) { + const tracks = await this.livekitRoom.localParticipant.createTracks({ + audio, + video, + }); + for (const track of tracks) { + await this.livekitRoom.localParticipant.publishTrack(track); + } } } } + public stop(): void { + this.muteStates.audio.unsetHandler(); + this.muteStates.video.unsetHandler(); + super.stop(); + } + public constructor( - focus: LivekitTransport, + transport: LivekitTransport, livekitAlias: string, client: MatrixClient, scope: ObservableScope, - membershipsFocusMap$: Behavior< - { membership: CallMembership; focus: LivekitTransport }[] + remoteTransports$: Behavior< + { membership: CallMembership; transport: LivekitTransport }[] >, devices: MediaDevices, private readonly muteStates: MuteStates, @@ -182,11 +195,11 @@ export class PublishConnection extends Connection { }); super( - focus, + transport, livekitAlias, client, scope, - membershipsFocusMap$, + remoteTransports$, e2eeLivekitOptions, room, ); @@ -218,10 +231,6 @@ export class PublishConnection extends Connection { } return this.livekitRoom.localParticipant.isCameraEnabled; }); - this.scope.onEnd(() => { - this.muteStates.audio.unsetHandler(); - this.muteStates.video.unsetHandler(); - }); const syncDevice = ( kind: MediaDeviceKind, diff --git a/src/state/MuteStates.ts b/src/state/MuteStates.ts index c93e88d8..07bc5665 100644 --- a/src/state/MuteStates.ts +++ b/src/state/MuteStates.ts @@ -137,7 +137,7 @@ export class MuteStates { this.scope, this.mediaDevices.audioInput, this.joined$, - Config.get().media_devices.enable_video, + Config.get().media_devices.enable_audio, ); public readonly video = new MuteState( this.scope, diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index fe99d89b..8ac816ca 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -8,9 +8,10 @@ Please see LICENSE in the repository root for full details. import { BehaviorSubject, distinctUntilChanged, + filter, type Observable, share, - Subject, + take, takeUntil, } from "rxjs"; @@ -24,9 +25,11 @@ const nothing = Symbol("nothing"); * A scope which limits the execution lifetime of its bound Observables. */ export class ObservableScope { - private readonly ended$ = new Subject(); + private readonly ended$ = new BehaviorSubject(false); - private readonly bindImpl: MonoTypeOperator = takeUntil(this.ended$); + private readonly bindImpl: MonoTypeOperator = takeUntil( + this.ended$.pipe(filter((ended) => ended)), + ); /** * Binds an Observable to this scope, so that it completes when the scope @@ -78,15 +81,19 @@ export class ObservableScope { * Ends the scope, causing any bound Observables to complete. */ public end(): void { - this.ended$.next(); - this.ended$.complete(); + this.ended$.next(true); } /** * Register a callback to be executed when the scope is ended. */ public onEnd(callback: () => void): void { - this.ended$.subscribe(callback); + this.ended$ + .pipe( + filter((ended) => ended), + take(1), + ) + .subscribe(callback); } } diff --git a/src/utils/errors.ts b/src/utils/errors.ts index 5cb0b450..b77c0ff0 100644 --- a/src/utils/errors.ts +++ b/src/utils/errors.ts @@ -11,7 +11,7 @@ export enum ErrorCode { /** * Configuration problem due to no MatrixRTC backend/SFU is exposed via .well-known and no fallback configured. */ - MISSING_MATRIX_RTC_FOCUS = "MISSING_MATRIX_RTC_FOCUS", + MISSING_MATRIX_RTC_TRANSPORT = "MISSING_MATRIX_RTC_TRANSPORT", CONNECTION_LOST_ERROR = "CONNECTION_LOST_ERROR", /** LiveKit indicates that the server has hit its track limits */ INSUFFICIENT_CAPACITY_ERROR = "INSUFFICIENT_CAPACITY_ERROR", @@ -54,18 +54,18 @@ export class ElementCallError extends Error { } } -export class MatrixRTCFocusMissingError extends ElementCallError { +export class MatrixRTCTransportMissingError extends ElementCallError { public domain: string; public constructor(domain: string) { super( t("error.call_is_not_supported"), - ErrorCode.MISSING_MATRIX_RTC_FOCUS, + ErrorCode.MISSING_MATRIX_RTC_TRANSPORT, ErrorCategory.CONFIGURATION_ISSUE, - t("error.matrix_rtc_focus_missing", { + t("error.matrix_rtc_transport_missing", { domain, brand: import.meta.env.VITE_PRODUCT_NAME || "Element Call", - errorCode: ErrorCode.MISSING_MATRIX_RTC_FOCUS, + errorCode: ErrorCode.MISSING_MATRIX_RTC_TRANSPORT, }), ); this.domain = domain;