diff --git a/locales/en/app.json b/locales/en/app.json index dc027c92..704f68ac 100644 --- a/locales/en/app.json +++ b/locales/en/app.json @@ -72,6 +72,7 @@ "livekit_server_info": "LiveKit Server Info", "livekit_sfu": "LiveKit SFU: {{url}}", "matrix_id": "Matrix ID: {{id}}", + "multi_sfu": "Multi-SFU media transport", "mute_all_audio": "Mute all audio (participants, reactions, join sounds)", "show_connection_stats": "Show connection statistics", "url_params": "URL parameters", @@ -91,7 +92,7 @@ "generic_description": "Submitting debug logs will help us track down the problem.", "insufficient_capacity": "Insufficient capacity", "insufficient_capacity_description": "The server has reached its maximum capacity and you cannot join the call at this time. Try again later, or contact your server admin if the problem persists.", - "matrix_rtc_focus_missing": "The server is not configured to work with {{brand}}. Please contact your server admin (Domain: {{domain}}, Error Code: {{ errorCode }}).", + "matrix_rtc_transport_missing": "The server is not configured to work with {{brand}}. Please contact your server admin (Domain: {{domain}}, Error Code: {{ errorCode }}).", "open_elsewhere": "Opened in another tab", "open_elsewhere_description": "{{brand}} has been opened in another tab. If that doesn't sound right, try reloading the page.", "room_creation_restricted": "Failed to create call", diff --git a/src/room/GroupCallErrorBoundary.test.tsx b/src/room/GroupCallErrorBoundary.test.tsx index 51912956..22338924 100644 --- a/src/room/GroupCallErrorBoundary.test.tsx +++ b/src/room/GroupCallErrorBoundary.test.tsx @@ -26,7 +26,7 @@ import { E2EENotSupportedError, type ElementCallError, InsufficientCapacityError, - MatrixRTCFocusMissingError, + MatrixRTCTransportMissingError, UnknownCallError, } from "../utils/errors.ts"; import { mockConfig } from "../utils/test.ts"; @@ -34,7 +34,7 @@ import { ElementWidgetActions, type WidgetHelpers } from "../widget.ts"; test.each([ { - error: new MatrixRTCFocusMissingError("example.com"), + error: new MatrixRTCTransportMissingError("example.com"), expectedTitle: "Call is not supported", }, { @@ -85,7 +85,7 @@ test.each([ ); test("should render the error page with link back to home", async () => { - const error = new MatrixRTCFocusMissingError("example.com"); + const error = new MatrixRTCTransportMissingError("example.com"); const TestComponent = (): ReactNode => { throw error; }; @@ -213,7 +213,7 @@ describe("Rageshake button", () => { }); test("should have a close button in widget mode", async () => { - const error = new MatrixRTCFocusMissingError("example.com"); + const error = new MatrixRTCTransportMissingError("example.com"); const TestComponent = (): ReactNode => { throw error; }; diff --git a/src/room/GroupCallView.test.tsx b/src/room/GroupCallView.test.tsx index bf5d1fef..b8bc2f53 100644 --- a/src/room/GroupCallView.test.tsx +++ b/src/room/GroupCallView.test.tsx @@ -42,7 +42,7 @@ import { import { GroupCallView } from "./GroupCallView"; import { type WidgetHelpers } from "../widget"; import { LazyEventEmitter } from "../LazyEventEmitter"; -import { MatrixRTCFocusMissingError } from "../utils/errors"; +import { MatrixRTCTransportMissingError } from "../utils/errors"; import { ProcessorProvider } from "../livekit/TrackProcessorContext"; import { MediaDevicesContext } from "../MediaDevicesContext"; import { HeaderStyle } from "../UrlParams"; @@ -258,7 +258,7 @@ test("GroupCallView leaves the session when an error occurs", async () => { test("GroupCallView shows errors that occur during joining", async () => { const user = userEvent.setup(); - enterRTCSession.mockRejectedValue(new MatrixRTCFocusMissingError("")); + enterRTCSession.mockRejectedValue(new MatrixRTCTransportMissingError("")); onTestFinished(() => { enterRTCSession.mockReset(); }); diff --git a/src/room/useActiveFocus.ts b/src/room/useActiveFocus.ts deleted file mode 100644 index 7a8f4521..00000000 --- a/src/room/useActiveFocus.ts +++ /dev/null @@ -1,45 +0,0 @@ -/* -Copyright 2023, 2024 New Vector Ltd. - -SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial -Please see LICENSE in the repository root for full details. -*/ - -import { - type MatrixRTCSession, - MatrixRTCSessionEvent, -} from "matrix-js-sdk/lib/matrixrtc"; -import { useCallback, useRef } from "react"; -import { deepCompare } from "matrix-js-sdk/lib/utils"; -import { logger } from "matrix-js-sdk/lib/logger"; -import { type LivekitFocus, isLivekitFocus } from "matrix-js-sdk/lib/matrixrtc"; - -import { useTypedEventEmitterState } from "../useEvents"; - -/** - * Gets the currently active (livekit) focus for a MatrixRTC session - * This logic is specific to livekit foci where the whole call must use one - * and the same focus. - */ -export function useActiveLivekitFocus( - rtcSession: MatrixRTCSession, -): LivekitFocus | undefined { - const prevActiveFocus = useRef(undefined); - return useTypedEventEmitterState( - rtcSession, - MatrixRTCSessionEvent.MembershipsChanged, - useCallback(() => { - const f = rtcSession.getActiveFocus(); - // Only handle foci with type="livekit" for now. - if (f && isLivekitFocus(f) && !deepCompare(f, prevActiveFocus.current)) { - const oldestMembership = rtcSession.getOldestMembership(); - logger.info( - `Got new active focus from membership: ${oldestMembership?.sender}/${oldestMembership?.deviceId}. - Updated focus (focus switch) from ${JSON.stringify(prevActiveFocus.current)} to ${JSON.stringify(f)}`, - ); - prevActiveFocus.current = f; - } - return prevActiveFocus.current; - }, [rtcSession]), - ); -} diff --git a/src/rtcSessionHelpers.ts b/src/rtcSessionHelpers.ts index b6918f3a..3cdd82e7 100644 --- a/src/rtcSessionHelpers.ts +++ b/src/rtcSessionHelpers.ts @@ -6,11 +6,10 @@ Please see LICENSE in the repository root for full details. */ import { - isLivekitFocusConfig, - type LivekitFocusConfig, - type LivekitFocus, - type LivekitFocusSelection, type MatrixRTCSession, + isLivekitTransportConfig, + type LivekitTransportConfig, + type LivekitTransport, } from "matrix-js-sdk/lib/matrixrtc"; import { logger } from "matrix-js-sdk/lib/logger"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; @@ -18,53 +17,42 @@ import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { PosthogAnalytics } from "./analytics/PosthogAnalytics"; import { Config } from "./config/Config"; import { ElementWidgetActions, widget, type WidgetHelpers } from "./widget"; -import { MatrixRTCFocusMissingError } from "./utils/errors"; +import { MatrixRTCTransportMissingError } from "./utils/errors"; import { getUrlParams } from "./UrlParams"; import { getSFUConfigWithOpenID } from "./livekit/openIDSFU.ts"; const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci"; -export function makeActiveFocus(): LivekitFocusSelection { - return { - type: "livekit", - focus_selection: "oldest_membership", - }; -} - export function getLivekitAlias(rtcSession: MatrixRTCSession): string { // For now we assume everything is a room-scoped call return rtcSession.room.roomId; } -async function makeFocusInternal( +async function makeTransportInternal( rtcSession: MatrixRTCSession, -): Promise { - logger.log("Searching for a preferred focus"); +): Promise { + logger.log("Searching for a preferred transport"); const livekitAlias = getLivekitAlias(rtcSession); - const urlFromStorage = localStorage.getItem("robin-matrixrtc-auth"); + // TODO-MULTI-SFU: Either remove this dev tool or make it more official + const urlFromStorage = + localStorage.getItem("robin-matrixrtc-auth") ?? + localStorage.getItem("timo-focus-url"); if (urlFromStorage !== null) { - const focusFromStorage: LivekitFocus = { + const transportFromStorage: LivekitTransport = { type: "livekit", livekit_service_url: urlFromStorage, livekit_alias: livekitAlias, }; - logger.log("Using LiveKit focus from local storage: ", focusFromStorage); - return focusFromStorage; + logger.log( + "Using LiveKit transport from local storage: ", + transportFromStorage, + ); + return transportFromStorage; } // Prioritize the .well-known/matrix/client, if available, over the configured SFU const domain = rtcSession.room.client.getDomain(); - if (localStorage.getItem("timo-focus-url")) { - const timoFocusUrl = localStorage.getItem("timo-focus-url")!; - const focusFromUrl: LivekitFocus = { - type: "livekit", - livekit_service_url: timoFocusUrl, - livekit_alias: livekitAlias, - }; - logger.log("Using LiveKit focus from localStorage: ", timoFocusUrl); - return focusFromUrl; - } if (domain) { // we use AutoDiscovery instead of relying on the MatrixClient having already // been fully configured and started @@ -72,49 +60,50 @@ async function makeFocusInternal( FOCI_WK_KEY ]; if (Array.isArray(wellKnownFoci)) { - const focus: LivekitFocusConfig | undefined = wellKnownFoci.find( - (f) => f && isLivekitFocusConfig(f), + const transport: LivekitTransportConfig | undefined = wellKnownFoci.find( + (f) => f && isLivekitTransportConfig(f), ); - if (focus !== undefined) { - logger.log("Using LiveKit focus from .well-known: ", focus); - return { ...focus, livekit_alias: livekitAlias }; + if (transport !== undefined) { + logger.log("Using LiveKit transport from .well-known: ", transport); + return { ...transport, livekit_alias: livekitAlias }; } } } const urlFromConf = Config.get().livekit?.livekit_service_url; if (urlFromConf) { - const focusFromConf: LivekitFocus = { + const transportFromConf: LivekitTransport = { type: "livekit", livekit_service_url: urlFromConf, livekit_alias: livekitAlias, }; - logger.log("Using LiveKit focus from config: ", focusFromConf); - return focusFromConf; + logger.log("Using LiveKit transport from config: ", transportFromConf); + return transportFromConf; } - throw new MatrixRTCFocusMissingError(domain ?? ""); + throw new MatrixRTCTransportMissingError(domain ?? ""); } -export async function makeFocus( +export async function makeTransport( rtcSession: MatrixRTCSession, -): Promise { - const focus = await makeFocusInternal(rtcSession); +): Promise { + const transport = await makeTransportInternal(rtcSession); // this will call the jwt/sfu/get endpoint to pre create the livekit room. await getSFUConfigWithOpenID( rtcSession.room.client, - focus.livekit_service_url, - focus.livekit_alias, + transport.livekit_service_url, + transport.livekit_alias, ); - return focus; + return transport; } export async function enterRTCSession( rtcSession: MatrixRTCSession, - focus: LivekitFocus, + transport: LivekitTransport, encryptMedia: boolean, useNewMembershipManager = true, useExperimentalToDeviceTransport = false, + useMultiSfu = true, ): Promise { PosthogAnalytics.instance.eventCallEnded.cacheStartCall(new Date()); PosthogAnalytics.instance.eventCallStarted.track(rtcSession.room.roomId); @@ -127,25 +116,31 @@ export async function enterRTCSession( const useDeviceSessionMemberEvents = features?.feature_use_device_session_member_events; const { sendNotificationType: notificationType, callIntent } = getUrlParams(); - rtcSession.joinRoomSession([focus], focus, { - notificationType, - callIntent, - useNewMembershipManager, - manageMediaKeys: encryptMedia, - ...(useDeviceSessionMemberEvents !== undefined && { - useLegacyMemberEvents: !useDeviceSessionMemberEvents, - }), - delayedLeaveEventRestartMs: - matrixRtcSessionConfig?.delayed_leave_event_restart_ms, - delayedLeaveEventDelayMs: - matrixRtcSessionConfig?.delayed_leave_event_delay_ms, - delayedLeaveEventRestartLocalTimeoutMs: - matrixRtcSessionConfig?.delayed_leave_event_restart_local_timeout_ms, - networkErrorRetryMs: matrixRtcSessionConfig?.network_error_retry_ms, - makeKeyDelay: matrixRtcSessionConfig?.wait_for_key_rotation_ms, - membershipEventExpiryMs: matrixRtcSessionConfig?.membership_event_expiry_ms, - useExperimentalToDeviceTransport, - }); + // Multi-sfu does not need a preferred foci list. just the focus that is actually used. + rtcSession.joinRoomSession( + useMultiSfu ? [] : [transport], + useMultiSfu ? transport : undefined, + { + notificationType, + callIntent, + useNewMembershipManager, + manageMediaKeys: encryptMedia, + ...(useDeviceSessionMemberEvents !== undefined && { + useLegacyMemberEvents: !useDeviceSessionMemberEvents, + }), + delayedLeaveEventRestartMs: + matrixRtcSessionConfig?.delayed_leave_event_restart_ms, + delayedLeaveEventDelayMs: + matrixRtcSessionConfig?.delayed_leave_event_delay_ms, + delayedLeaveEventRestartLocalTimeoutMs: + matrixRtcSessionConfig?.delayed_leave_event_restart_local_timeout_ms, + networkErrorRetryMs: matrixRtcSessionConfig?.network_error_retry_ms, + makeKeyDelay: matrixRtcSessionConfig?.wait_for_key_rotation_ms, + membershipEventExpiryMs: + matrixRtcSessionConfig?.membership_event_expiry_ms, + useExperimentalToDeviceTransport, + }, + ); if (widget) { try { await widget.api.transport.send(ElementWidgetActions.JoinCall, {}); diff --git a/src/settings/DeveloperSettingsTab.tsx b/src/settings/DeveloperSettingsTab.tsx index 1949ecf7..36c8a2e6 100644 --- a/src/settings/DeveloperSettingsTab.tsx +++ b/src/settings/DeveloperSettingsTab.tsx @@ -16,6 +16,7 @@ import { showConnectionStats as showConnectionStatsSetting, useNewMembershipManager as useNewMembershipManagerSetting, useExperimentalToDeviceTransport as useExperimentalToDeviceTransportSetting, + multiSfu as multiSfuSetting, muteAllAudio as muteAllAudioSetting, alwaysShowIphoneEarpiece as alwaysShowIphoneEarpieceSetting, } from "./settings"; @@ -50,6 +51,7 @@ export const DeveloperSettingsTab: FC = ({ client, livekitRooms }) => { useExperimentalToDeviceTransport, setUseExperimentalToDeviceTransport, ] = useSetting(useExperimentalToDeviceTransportSetting); + const [multiSfu, setMultiSfu] = useSetting(multiSfuSetting); const [muteAllAudio, setMuteAllAudio] = useSetting(muteAllAudioSetting); @@ -166,6 +168,20 @@ export const DeveloperSettingsTab: FC = ({ client, livekitRooms }) => { )} /> + + ): void => { + setMultiSfu(event.target.checked); + }, + [setMultiSfu], + )} + /> + ( true, ); +export const multiSfu = new Setting("multi-sfu", false); + export const muteAllAudio = new Setting("mute-all-audio", false); export const alwaysShowSelf = new Setting("always-show-self", true); diff --git a/src/state/Async.ts b/src/state/Async.ts new file mode 100644 index 00000000..2baa674c --- /dev/null +++ b/src/state/Async.ts @@ -0,0 +1,44 @@ +/* +Copyright 2025 New Vector Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { + catchError, + from, + map, + Observable, + of, + startWith, + switchMap, +} from "rxjs"; + +export type Async = + | { state: "loading" } + | { state: "error"; value: Error } + | { state: "ready"; value: A }; + +export const loading: Async = { state: "loading" }; +export function error(value: Error): Async { + return { state: "error", value }; +} +export function ready(value: A): Async { + return { state: "ready", value }; +} + +export function async(promise: Promise): Observable> { + return from(promise).pipe( + map(ready), + startWith(loading), + catchError((e) => of(error(e))), + ); +} + +export function mapAsync( + async: Async, + project: (value: A) => B, +): Async { + return async.state === "ready" ? ready(project(async.value)) : async; +} diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 31a7e32d..2c02521e 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -28,6 +28,7 @@ import { EventType, RoomEvent, } from "matrix-js-sdk"; +import { deepCompare } from "matrix-js-sdk/lib/utils"; import { BehaviorSubject, EMPTY, @@ -36,7 +37,6 @@ import { Subject, combineLatest, concat, - concatMap, distinctUntilChanged, endWith, filter, @@ -48,6 +48,7 @@ import { of, pairwise, race, + repeat, scan, skip, skipWhile, @@ -57,6 +58,7 @@ import { switchScan, take, takeUntil, + takeWhile, tap, throttleTime, timer, @@ -64,7 +66,8 @@ import { import { logger } from "matrix-js-sdk/lib/logger"; import { type CallMembership, - isLivekitFocus, + isLivekitTransport, + type LivekitTransport, type MatrixRTCSession, MatrixRTCSessionEvent, type MatrixRTCSessionEventHandlerMap, @@ -90,6 +93,7 @@ import { import { ObservableScope } from "./ObservableScope"; import { duplicateTiles, + multiSfu, playReactionsSound, showReactions, } from "../settings/settings"; @@ -118,7 +122,7 @@ import { constant, type Behavior } from "./Behavior"; import { enterRTCSession, getLivekitAlias, - makeFocus, + makeTransport, } from "../rtcSessionHelpers"; import { E2eeType } from "../e2ee/e2eeType"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; @@ -128,6 +132,7 @@ import { getUrlParams } from "../UrlParams"; import { type ProcessorState } from "../livekit/TrackProcessorContext"; import { ElementWidgetActions, widget } from "../widget"; import { PublishConnection } from "./PublishConnection.ts"; +import { type Async, async, mapAsync, ready } from "./Async"; export interface CallViewModelOptions { encryptionSystem: EncryptionSystem; @@ -299,7 +304,7 @@ class UserMedia { public readonly presenter$: Behavior; public constructor( public readonly id: string, - member: RoomMember | undefined, + member: RoomMember, participant: LocalParticipant | RemoteParticipant | undefined, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, @@ -372,7 +377,7 @@ class ScreenShare { public constructor( id: string, - member: RoomMember | undefined, + member: RoomMember, participant: LocalParticipant | RemoteParticipant, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, @@ -450,42 +455,36 @@ export class CallViewModel extends ViewModel { } : undefined; - private readonly localFocus = makeFocus(this.matrixRTCSession); + private readonly join$ = new Subject(); - private readonly localConnection = this.localFocus.then( - (focus) => { - const args: ConnectionOpts = { - focus, - client: this.matrixRTCSession.room.client, - scope: this.scope, - membershipsFocusMap$: this.membershipsAndFocusMap$, - } - return new PublishConnection( - args, - this.mediaDevices, - this.muteStates, - this.e2eeLivekitOptions(), - this.scope.behavior(this.trackProcessorState$), - ) - } + public join(): void { + this.join$.next(); + } + + // This is functionally the same Observable as leave$, except here it's + // hoisted to the top of the class. This enables the cyclic dependency between + // leave$ -> autoLeave$ -> callPickupState$ -> livekitConnectionState$ -> + // localConnection$ -> transports$ -> joined$ -> leave$. + private readonly leaveHoisted$ = new Subject< + "user" | "timeout" | "decline" | "allOthersLeft" + >(); + + /** + * Whether we are joined to the call. This reflects our local state rather + * than whether all connections are truly up and running. + */ + private readonly joined$ = this.scope.behavior( + this.join$.pipe( + map(() => true), + // Using takeUntil with the repeat operator is perfectly valid. + // eslint-disable-next-line rxjs/no-unsafe-takeuntil + takeUntil(this.leaveHoisted$), + endWith(false), + repeat(), + startWith(false), + ), ); - public readonly livekitConnectionState$ = - this.scope.behavior( - from(this.localConnection).pipe( - switchMap((c) => - c.focusedConnectionState$.pipe( - map((s) => { - if (s.state === "ConnectedToLkRoom") return s.connectionState; - return ConnectionState.Disconnected - }), - distinctUntilChanged(), - ), - ), - startWith(ConnectionState.Disconnected), - ), - ) - /** * The MatrixRTC session participants. @@ -502,122 +501,224 @@ export class CallViewModel extends ViewModel { ), ); - private readonly membershipsAndFocusMap$ = this.scope.behavior( - this.memberships$.pipe( - map((memberships) => - memberships.flatMap((m) => { - const f = this.matrixRTCSession.resolveActiveFocus(m); - return f && isLivekitFocus(f) ? [{ membership: m, focus: f }] : []; - }), - ), - ), - ); + /** + * The transport that we would personally prefer to publish on (if not for the + * transport preferences of others, perhaps). + */ + private readonly preferredTransport = makeTransport(this.matrixRTCSession); - private readonly livekitServiceUrls$ = this.membershipsAndFocusMap$.pipe( - map((v) => new Set(v.map(({ focus }) => focus.livekit_service_url))), - ); - - private readonly remoteConnections$ = this.scope.behavior( - combineLatest([this.localFocus, this.livekitServiceUrls$]).pipe( - accumulate( - new Map(), - (prev, [localFocus, focusUrls]) => { - const stopped = new Map(prev); - const next = new Map(); - for (const focusUrl of focusUrls) { - if (focusUrl !== localFocus.livekit_service_url) { - stopped.delete(focusUrl); - - let nextConnection = prev.get(focusUrl); - if (!nextConnection) { - logger.log( - "SFU remoteConnections$ construct new connection: ", - focusUrl, - ); - const args: ConnectionOpts = { - focus: { - type: "livekit", - livekit_service_url: focusUrl, - livekit_alias: this.livekitAlias, - }, - client: this.matrixRTCSession.room.client, - scope: this.scope, - membershipsFocusMap$: this.membershipsAndFocusMap$, + /** + * Lists the transports used by ourselves, plus all other MatrixRTC session + * members. + */ + private readonly transports$: Behavior<{ + local: Async; + remote: { membership: CallMembership; transport: LivekitTransport }[]; + } | null> = this.scope.behavior( + this.joined$.pipe( + switchMap((joined) => + joined + ? combineLatest( + [ + async(this.preferredTransport), + this.memberships$, + multiSfu.value$, + ], + (preferred, memberships, multiSfu) => { + const remote = memberships.flatMap((m) => { + if (m.sender === this.userId && m.deviceId === this.deviceId) + return []; + const t = this.matrixRTCSession.resolveActiveFocus(m); + return t && isLivekitTransport(t) + ? [{ membership: m, transport: t }] + : []; + }); + let local = preferred; + if (!multiSfu) { + const oldest = this.matrixRTCSession.getOldestMembership(); + if (oldest !== undefined) { + const selection = oldest.getTransport(oldest); + if (isLivekitTransport(selection)) local = ready(selection); + } } - nextConnection = new RemoteConnection(args, this.e2eeLivekitOptions()); - } else { - logger.log( - "SFU remoteConnections$ use prev connection: ", - focusUrl, - ); - } - next.set(focusUrl, nextConnection); - } - } - - for (const connection of stopped.values()) connection.stop(); - return next; - }, + return { local, remote }; + }, + ) + : of(null), ), ), ); - private readonly join$ = new Subject(); + /** + * Lists the transports used by each MatrixRTC session member other than + * ourselves. + */ + private readonly remoteTransports$ = this.scope.behavior( + this.transports$.pipe(map((transports) => transports?.remote ?? [])), + ); - public join(): void { - this.join$.next(); - } + /** + * The transport over which we should be actively publishing our media. + */ + private readonly localTransport$: Behavior | null> = + this.scope.behavior( + this.transports$.pipe( + map((transports) => transports?.local ?? null), + distinctUntilChanged(deepCompare), + ), + ); - private readonly connectionInstructions$ = this.join$.pipe( - switchMap(() => this.remoteConnections$), - startWith(new Map()), + private readonly localConnectionAndTransport$ = this.scope.behavior( + this.localTransport$.pipe( + map( + (transport) => + transport && + mapAsync(transport, (transport) => { + + const opts: ConnectionOpts = { + transport, + client: this.matrixRTCSession.room.client, + scope: this.scope, + remoteTransports$: this.remoteTransports$, + }; + return { + connection: new PublishConnection( + opts, + this.mediaDevices, + this.muteStates, + this.e2eeLivekitOptions(), + this.scope.behavior(this.trackProcessorState$), + ), + transport, + }}), + ), + ), + ); + + private readonly localConnection$ = this.scope.behavior( + this.localConnectionAndTransport$.pipe( + map((value) => value && mapAsync(value, ({ connection }) => connection)), + ), + ); + + public readonly livekitConnectionState$ = this.scope.behavior( + this.localConnection$.pipe( + switchMap((c) => + c?.state === "ready" + // TODO mapping to ConnectionState for compatibility, but we should use the full state? + ? c.value.focusedConnectionState$.pipe( + map((s) => { + if (s.state === "ConnectedToLkRoom") return s.connectionState; + return ConnectionState.Disconnected + }), + distinctUntilChanged(), + ) + : of(ConnectionState.Disconnected), + ), + ), + ); + + /** + * Connections for each transport in use by one or more session members that + * is *distinct* from the local transport. + */ + private readonly remoteConnections$ = this.scope.behavior( + this.transports$.pipe( + accumulate(new Map(), (prev, transports) => { + const next = new Map(); + + // Until the local transport becomes ready we have no idea which + // transports will actually need a dedicated remote connection + if (transports?.local.state === "ready") { + const localServiceUrl = transports.local.value.livekit_service_url; + const remoteServiceUrls = new Set( + transports.remote.flatMap(({ membership, transport }) => { + const t = this.matrixRTCSession.resolveActiveFocus(membership); + return t && + isLivekitTransport(t) && + t.livekit_service_url !== localServiceUrl + ? [t.livekit_service_url] + : []; + }), + ); + + for (const remoteServiceUrl of remoteServiceUrls) { + let nextConnection = prev.get(remoteServiceUrl); + if (!nextConnection) { + logger.log( + "SFU remoteConnections$ construct new connection: ", + remoteServiceUrl, + ); + + const args: ConnectionOpts = { + transport: { + type: "livekit", + livekit_service_url: remoteServiceUrl, + livekit_alias: this.livekitAlias, + }, + client: this.matrixRTCSession.room.client, + scope: this.scope, + remoteTransports$: this.remoteTransports$, + } + nextConnection = new RemoteConnection(args, this.e2eeLivekitOptions()); + } else { + logger.log( + "SFU remoteConnections$ use prev connection: ", + remoteServiceUrl, + ); + } + next.set(remoteServiceUrl, nextConnection); + } + } + + return next; + }), + map((transports) => [...transports.values()]), + ), + ); + + /** + * A list of the connections that should be active at any given time. + */ + private readonly connections$ = this.scope.behavior( + combineLatest( + [this.localConnection$, this.remoteConnections$], + (local, remote) => [ + ...(local?.state === "ready" ? [local.value] : []), + ...remote.values(), + ], + ), + ); + + /** + * Emits with connections whenever they should be started or stopped. + */ + private readonly connectionInstructions$ = this.connections$.pipe( pairwise(), map(([prev, next]) => { const start = new Set(next.values()); - for (const connection of prev.values()) start.delete(connection); + for (const connection of prev) start.delete(connection); const stop = new Set(prev.values()); - for (const connection of next.values()) stop.delete(connection); + for (const connection of next) stop.delete(connection); return { start, stop }; }), - this.scope.share, - ); - - private readonly startConnection$ = this.connectionInstructions$.pipe( - concatMap(({ start }) => start), - ); - private readonly stopConnection$ = this.connectionInstructions$.pipe( - concatMap(({ stop }) => stop), ); public readonly allLivekitRooms$ = this.scope.behavior( - combineLatest([ - this.remoteConnections$, - this.localConnection, - this.localFocus, - ]).pipe( - map(([remoteConnections, localConnection, localFocus]) => - Array.from(remoteConnections.entries()) - .map( - ([index, c]) => - ({ - room: c.livekitRoom, - url: index, - }) as { room: LivekitRoom; url: string; isLocal?: boolean }, - ) - .concat([ - { - room: localConnection.livekitRoom, - url: localFocus.livekit_service_url, - isLocal: true, - }, - ]), + this.connections$.pipe( + map((connections) => + [...connections.values()].map((c) => ({ + room: c.livekitRoom, + url: c.localTransport.livekit_service_url, + isLocal: c instanceof PublishConnection, + })), ), - startWith([]), ), ); private readonly userId = this.matrixRoom.client.getUserId(); + private readonly deviceId = this.matrixRoom.client.getDeviceId(); private readonly matrixConnected$ = this.scope.behavior( // To consider ourselves connected to MatrixRTC, we check the following: @@ -690,41 +791,52 @@ export class CallViewModel extends ViewModel { // in a split-brained state. private readonly pretendToBeDisconnected$ = this.reconnecting$; + /** + * Lists, for each LiveKit room, the LiveKit participants whose media should + * be presented. + */ public readonly participantsByRoom$ = this.scope.behavior< { livekitRoom: LivekitRoom; url: string; participants: { - participant: LocalParticipant | RemoteParticipant; + id: string; + participant: LocalParticipant | RemoteParticipant | undefined; member: RoomMember; }[]; }[] >( - combineLatest([this.localConnection, this.localFocus]) + // TODO: Move this logic into Connection/PublishConnection if possible + this.localConnectionAndTransport$ .pipe( - switchMap(([localConnection, localFocus]) => { + switchMap((values) => { + if (values?.state !== "ready") return []; + const localConnection = values.value.connection; const memberError = (): never => { throw new Error("No room member for call membership"); }; const localParticipant = { + id: "local", participant: localConnection.livekitRoom.localParticipant, member: this.matrixRoom.getMember(this.userId ?? "") ?? memberError(), }; return this.remoteConnections$.pipe( - switchMap((connections) => + switchMap((remoteConnections) => combineLatest( - [ - [localFocus.livekit_service_url, localConnection] as const, - ...connections, - ].map(([url, c]) => + [localConnection, ...remoteConnections].map((c) => c.publishingParticipants$.pipe( map((ps) => { const participants: { - participant: LocalParticipant | RemoteParticipant; + id: string; + participant: + | LocalParticipant + | RemoteParticipant + | undefined; member: RoomMember; }[] = ps.map(({ participant, membership }) => ({ + id: `${membership.sender}:${membership.deviceId}`, participant, member: getRoomMemberFromRtcMember( @@ -737,7 +849,7 @@ export class CallViewModel extends ViewModel { return { livekitRoom: c.livekitRoom, - url, + url: c.localTransport.livekit_service_url, participants, }; }), @@ -820,34 +932,17 @@ export class CallViewModel extends ViewModel { * List of MediaItems that we want to display */ private readonly mediaItems$ = this.scope.behavior( - combineLatest([ - this.participantsByRoom$, - duplicateTiles.value$, - this.memberships$, - ]).pipe( - scan((prevItems, [participantsByRoom, duplicateTiles, memberships]) => { + combineLatest([this.participantsByRoom$, duplicateTiles.value$]).pipe( + scan((prevItems, [participantsByRoom, duplicateTiles]) => { const newItems: Map = new Map( function* (this: CallViewModel): Iterable<[string, MediaItem]> { for (const { livekitRoom, participants } of participantsByRoom) { - for (const { participant, member } of participants) { - const matrixId = participant.isLocal - ? "local" - : participant.identity; - + for (const { id, participant, member } of participants) { for (let i = 0; i < 1 + duplicateTiles; i++) { - const mediaId = `${matrixId}:${i}`; - let prevMedia = prevItems.get(mediaId); - if (prevMedia && prevMedia instanceof UserMedia) { + const mediaId = `${id}:${i}`; + const prevMedia = prevItems.get(mediaId); + if (prevMedia instanceof UserMedia) prevMedia.updateParticipant(participant); - if (prevMedia.vm.member === undefined) { - // We have a previous media created because of the `debugShowNonMember` flag. - // In this case we actually replace the media item. - // This "hack" never occurs if we do not use the `debugShowNonMember` debugging - // option and if we always find a room member for each rtc member (which also - // only fails if we have a fundamental problem) - prevMedia = undefined; - } - } yield [ mediaId, @@ -864,14 +959,10 @@ export class CallViewModel extends ViewModel { this.mediaDevices, this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( - map((m) => m.get(matrixId) ?? "[👻]"), - ), - this.handsRaised$.pipe( - map((v) => v[matrixId]?.time ?? null), - ), - this.reactions$.pipe( - map((v) => v[matrixId] ?? undefined), + map((m) => m.get(id) ?? "[👻]"), ), + this.handsRaised$.pipe(map((v) => v[id]?.time ?? null)), + this.reactions$.pipe(map((v) => v[id] ?? undefined)), ), ]; @@ -888,7 +979,7 @@ export class CallViewModel extends ViewModel { livekitRoom, this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( - map((m) => m.get(matrixId) ?? "[👻]"), + map((m) => m.get(id) ?? "[👻]"), ), ), ]; @@ -942,6 +1033,16 @@ export class CallViewModel extends ViewModel { this.memberships$.pipe(map((ms) => ms.length)), ); + private readonly allOthersLeft$ = this.memberships$.pipe( + pairwise(), + filter( + ([prev, current]) => + current.every((m) => m.sender === this.userId) && + prev.some((m) => m.sender !== this.userId), + ), + map(() => {}), + ); + private readonly didSendCallNotification$ = fromEvent( this.matrixRTCSession, MatrixRTCSessionEvent.DidSendCallNotification, @@ -1066,56 +1167,12 @@ export class CallViewModel extends ViewModel { map(() => {}), throttleTime(THROTTLE_SOUND_EFFECT_MS), ); - /** - * This observable tracks the matrix users that are currently in the call. - * There can be just one matrix user with multiple participants (see also participantChanges$) - */ - public readonly matrixUserChanges$ = this.userMedia$.pipe( - map( - (mediaItems) => - new Set( - mediaItems - .map((m) => m.vm.member?.userId) - .filter((id) => id !== undefined), - ), - ), - scan< - Set, - { - userIds: Set; - joinedUserIds: Set; - leftUserIds: Set; - } - >( - (prevState, userIds) => { - const left = new Set( - [...prevState.userIds].filter((id) => !userIds.has(id)), - ); - const joined = new Set( - [...userIds].filter((id) => !prevState.userIds.has(id)), - ); - return { userIds: userIds, joinedUserIds: joined, leftUserIds: left }; - }, - { userIds: new Set(), joinedUserIds: new Set(), leftUserIds: new Set() }, - ), - ); - - private readonly allOthersLeft$ = this.matrixUserChanges$.pipe( - filter(({ userIds, leftUserIds }) => { - if (!this.userId) { - logger.warn("Could not access user ID to compute allOthersLeft"); - return false; - } - return ( - userIds.size === 1 && userIds.has(this.userId) && leftUserIds.size > 0 - ); - }), - map(() => "allOthersLeft" as const), - ); // Public for testing public readonly autoLeave$ = merge( - this.options.autoLeaveWhenOthersLeft ? this.allOthersLeft$ : NEVER, + this.options.autoLeaveWhenOthersLeft + ? this.allOthersLeft$.pipe(map(() => "allOthersLeft" as const)) + : NEVER, this.callPickupState$.pipe( filter((state) => state === "timeout" || state === "decline"), ), @@ -1143,6 +1200,9 @@ export class CallViewModel extends ViewModel { merge(this.userHangup$, this.widgetHangup$).pipe( map(() => "user" as const), ), + ).pipe( + this.scope.share, + tap((reason) => this.leaveHoisted$.next(reason)), ); /** @@ -1831,9 +1891,12 @@ export class CallViewModel extends ViewModel { * Whether we are sharing our screen. */ public readonly sharingScreen$ = this.scope.behavior( - from(this.localConnection).pipe( - switchMap((c) => sharingScreen$(c.livekitRoom.localParticipant)), - startWith(false), + from(this.localConnection$).pipe( + switchMap((c) => + c?.state === "ready" + ? sharingScreen$(c.value.livekitRoom.localParticipant) + : of(false), + ), ), ); @@ -1845,17 +1908,26 @@ export class CallViewModel extends ViewModel { "getDisplayMedia" in (navigator.mediaDevices ?? {}) && !this.urlParams.hideScreensharing ? (): void => - void this.localConnection.then( - (c) => - void c.livekitRoom.localParticipant - .setScreenShareEnabled(!this.sharingScreen$.value, { - audio: true, - selfBrowserSurface: "include", - surfaceSwitching: "include", - systemAudio: "include", - }) - .catch(logger.error), - ) + // Once a connection is ready... + void this.localConnection$ + .pipe( + takeWhile((c) => c !== null && c.state !== "error"), + switchMap((c) => (c.state === "ready" ? of(c.value) : NEVER)), + take(1), + this.scope.bind(), + ) + // ...toggle screen sharing. + .subscribe( + (c) => + void c.livekitRoom.localParticipant + .setScreenShareEnabled(!this.sharingScreen$.value, { + audio: true, + selfBrowserSurface: "include", + surfaceSwitching: "include", + systemAudio: "include", + }) + .catch(logger.error), + ) : null; public constructor( @@ -1875,61 +1947,72 @@ export class CallViewModel extends ViewModel { ) { super(); - void from(this.localConnection) + // Start and stop local and remote connections as needed + this.connectionInstructions$ .pipe(this.scope.bind()) - .subscribe( - (c) => - void c - .start() - // eslint-disable-next-line no-console - .then(() => console.log("successfully started publishing")) - // eslint-disable-next-line no-console - .catch((e) => console.error("failed to start publishing", e)), - ); - - this.startConnection$ - .pipe(this.scope.bind()) - .subscribe((c) => void c.start()); - this.stopConnection$.pipe(this.scope.bind()).subscribe((c) => void c.stop()); - - combineLatest([this.localFocus, this.join$]) - .pipe(this.scope.bind()) - .subscribe(([localFocus]) => { - void enterRTCSession( - this.matrixRTCSession, - localFocus, - this.options.encryptionSystem.kind !== E2eeType.NONE, - true, - true, - ) - .catch((e) => logger.error("Error entering RTC session", e)) - .then(() => - // Update our member event when our mute state changes. - this.muteStates.video.enabled$ - .pipe(this.scope.bind(), takeUntil(this.leave$)) - // eslint-disable-next-line rxjs/no-nested-subscribe - .subscribe( - (videoEnabled) => - // TODO: Ensure that these calls are serialized in case of - // fast video toggling - void this.matrixRTCSession.updateCallIntent( - videoEnabled ? "video" : "audio", - ), + .subscribe(({ start, stop }) => { + for (const c of stop) { + logger.info(`Disconnecting from ${c.localTransport.livekit_service_url}`); + c.stop(); + } + for (const c of start) { + c.start().then( + () => + logger.info(`Connected to ${c.localTransport.livekit_service_url}`), + (e) => + logger.error( + `Failed to start connection to ${c.localTransport.livekit_service_url}`, + e, ), ); + } }); - this.leave$.pipe(this.scope.bind()).subscribe(() => { - // Only sends Matrix leave event. The LiveKit session will disconnect once, uh... - // (TODO-MULTI-SFU does anything actually cause it to disconnect?) - void this.matrixRTCSession - .leaveRoomSession() - .catch((e) => logger.error("Error leaving RTC session", e)) - .then(async () => - widget?.api.transport - .send(ElementWidgetActions.HangupCall, {}) - .catch((e) => logger.error("Failed to send hangup action", e)), + // Start and stop session membership as needed + this.scope.reconcile(this.localTransport$, async (localTransport) => { + if (localTransport?.state === "ready") { + try { + await enterRTCSession( + this.matrixRTCSession, + localTransport.value, + this.options.encryptionSystem.kind !== E2eeType.NONE, + true, + true, + multiSfu.value$.value, + ); + } catch (e) { + logger.error("Error entering RTC session", e); + } + // Update our member event when our mute state changes. + const muteSubscription = this.muteStates.video.enabled$.subscribe( + (videoEnabled) => + // TODO: Ensure that these calls are serialized in case of + // fast video toggling + void this.matrixRTCSession.updateCallIntent( + videoEnabled ? "video" : "audio", + ), ); + + return async (): Promise => { + muteSubscription.unsubscribe(); + // Only sends Matrix leave event. The LiveKit session will disconnect + // as soon as either the stopConnection$ handler above gets to it or + // the view model is destroyed. + try { + await this.matrixRTCSession.leaveRoomSession(); + } catch (e) { + logger.error("Error leaving RTC session", e); + } + try { + await widget?.api.transport.send( + ElementWidgetActions.HangupCall, + {}, + ); + } catch (e) { + logger.error("Failed to send hangup action", e); + } + }; + } }); // Pause upstream of all local media tracks when we're disconnected from @@ -1938,10 +2021,12 @@ export class CallViewModel extends ViewModel { // We use matrixConnected$ rather than reconnecting$ because we want to // pause tracks during the initial joining sequence too until we're sure // that our own media is displayed on screen. - void this.localConnection.then((localConnection) => - this.matrixConnected$.pipe(this.scope.bind()).subscribe((connected) => { + combineLatest([this.localConnection$, this.matrixConnected$]) + .pipe(this.scope.bind()) + .subscribe(([connection, connected]) => { + if (connection?.state !== "ready") return; const publications = - localConnection.livekitRoom.localParticipant.trackPublications.values(); + connection.value.livekitRoom.localParticipant.trackPublications.values(); if (connected) { for (const p of publications) { if (p.track?.isUpstreamPaused === true) { @@ -1977,8 +2062,7 @@ export class CallViewModel extends ViewModel { } } } - }), - ); + }); // Join automatically this.join(); // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked? diff --git a/src/state/Connection.test.ts b/src/state/Connection.test.ts index 5f1778b0..07a38d7d 100644 --- a/src/state/Connection.test.ts +++ b/src/state/Connection.test.ts @@ -5,8 +5,8 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { afterEach, describe, expect, it, type Mock, Mocked, type MockedObject, vi } from "vitest"; -import { type CallMembership, type LivekitFocus } from "matrix-js-sdk/lib/matrixrtc"; +import { afterEach, describe, expect, it, type Mock, type MockedObject, vi } from "vitest"; +import type { CallMembership, LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { BehaviorSubject, of } from "rxjs"; import { ConnectionState, @@ -28,7 +28,6 @@ import { PublishConnection } from "./PublishConnection.ts"; import { mockMediaDevices, mockMuteStates } from "../utils/test.ts"; import type { ProcessorState } from "../livekit/TrackProcessorContext.tsx"; import { type MuteStates } from "./MuteStates.ts"; -import { DeviceLabel, MediaDevice, SelectedDevice } from "./MediaDevices.ts"; let testScope: ObservableScope; @@ -41,9 +40,9 @@ let localParticipantEventEmiter: EventEmitter; let fakeLocalParticipant: MockedObject; let fakeRoomEventEmiter: EventEmitter; -let fakeMembershipsFocusMap$: BehaviorSubject<{ membership: CallMembership; focus: LivekitFocus }[]>; +let fakeMembershipsFocusMap$: BehaviorSubject<{ membership: CallMembership; transport: LivekitTransport }[]>; -const livekitFocus: LivekitFocus = { +const livekitFocus: LivekitTransport = { livekit_alias: "!roomID:example.org", livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt", type: "livekit" @@ -62,7 +61,7 @@ function setupTest(): void { ), getDeviceId: vi.fn().mockReturnValue("ABCDEF") } as unknown as OpenIDClientParts); - fakeMembershipsFocusMap$ = new BehaviorSubject<{ membership: CallMembership; focus: LivekitFocus }[]>([]); + fakeMembershipsFocusMap$ = new BehaviorSubject<{ membership: CallMembership; transport: LivekitTransport }[]>([]); localParticipantEventEmiter = new EventEmitter(); @@ -98,8 +97,8 @@ function setupRemoteConnection(): RemoteConnection { const opts: ConnectionOpts = { client: client, - focus: livekitFocus, - membershipsFocusMap$: fakeMembershipsFocusMap$, + transport: livekitFocus, + remoteTransports$: fakeMembershipsFocusMap$, scope: testScope, livekitRoomFactory: () => fakeLivekitRoom }; @@ -142,8 +141,8 @@ describe("Start connection states", () => { const opts: ConnectionOpts = { client: client, - focus: livekitFocus, - membershipsFocusMap$: fakeMembershipsFocusMap$, + transport: livekitFocus, + remoteTransports$: fakeMembershipsFocusMap$, scope: testScope, livekitRoomFactory: () => fakeLivekitRoom }; @@ -162,8 +161,8 @@ describe("Start connection states", () => { const opts: ConnectionOpts = { client: client, - focus: livekitFocus, - membershipsFocusMap$: fakeMembershipsFocusMap$, + transport: livekitFocus, + remoteTransports$: fakeMembershipsFocusMap$, scope: testScope, livekitRoomFactory: () => fakeLivekitRoom }; @@ -215,8 +214,8 @@ describe("Start connection states", () => { const opts: ConnectionOpts = { client: client, - focus: livekitFocus, - membershipsFocusMap$: fakeMembershipsFocusMap$, + transport: livekitFocus, + remoteTransports$: fakeMembershipsFocusMap$, scope: testScope, livekitRoomFactory: () => fakeLivekitRoom }; @@ -274,8 +273,8 @@ describe("Start connection states", () => { const opts: ConnectionOpts = { client: client, - focus: livekitFocus, - membershipsFocusMap$: fakeMembershipsFocusMap$, + transport: livekitFocus, + remoteTransports$: fakeMembershipsFocusMap$, scope: testScope, livekitRoomFactory: () => fakeLivekitRoom }; @@ -502,7 +501,7 @@ describe("Publishing participants observations", () => { expect(observedPublishers.pop()!.length).toEqual(0); - const otherFocus: LivekitFocus = { + const otherFocus: LivekitTransport = { livekit_alias: "!roomID:example.org", livekit_service_url: "https://other-matrix-rtc.example.org/livekit/jwt", type: "livekit" @@ -511,10 +510,10 @@ describe("Publishing participants observations", () => { const rtcMemberships = [ // Say bob is on the same focus - { membership: fakeRtcMemberShip("@bob:example.org", "DEV111"), focus: livekitFocus }, + { membership: fakeRtcMemberShip("@bob:example.org", "DEV111"), transport: livekitFocus }, // Alice and carol is on a different focus - { membership: fakeRtcMemberShip("@alice:example.org", "DEV000"), focus: otherFocus }, - { membership: fakeRtcMemberShip("@carol:example.org", "DEV222"), focus: otherFocus } + { membership: fakeRtcMemberShip("@alice:example.org", "DEV000"), transport: otherFocus }, + { membership: fakeRtcMemberShip("@carol:example.org", "DEV222"), transport: otherFocus } // NO DAVE YET ]; // signal this change in rtc memberships @@ -528,7 +527,7 @@ describe("Publishing participants observations", () => { // Now let's make dan join the rtc memberships rtcMemberships - .push({ membership: fakeRtcMemberShip("@dan:example.org", "DEV333"), focus: livekitFocus }); + .push({ membership: fakeRtcMemberShip("@dan:example.org", "DEV333"), transport: livekitFocus }); fakeMembershipsFocusMap$.next(rtcMemberships); // We should have bob and dan has publishers now @@ -581,7 +580,7 @@ describe("Publishing participants observations", () => { const rtcMemberships = [ // Say bob is on the same focus - { membership: fakeRtcMemberShip("@bob:example.org", "DEV111"), focus: livekitFocus } + { membership: fakeRtcMemberShip("@bob:example.org", "DEV111"), transport: livekitFocus } ]; // signal this change in rtc memberships fakeMembershipsFocusMap$.next(rtcMemberships); @@ -610,7 +609,7 @@ describe("Publishing participants observations", () => { describe("PublishConnection", () => { - let fakeBlurProcessor: ProcessorWrapper; + // let fakeBlurProcessor: ProcessorWrapper; let roomFactoryMock: Mock<() => LivekitRoom>; let muteStates: MockedObject; @@ -622,14 +621,13 @@ describe("PublishConnection", () => { muteStates = mockMuteStates(); - fakeBlurProcessor = vi.mocked>({ - name: "BackgroundBlur", - start: vi.fn().mockResolvedValue(undefined), - stop: vi.fn().mockResolvedValue(undefined), - setOptions: vi.fn().mockResolvedValue(undefined), - getOptions: vi.fn().mockReturnValue({ strength: 0.5 }), - isRunning: vi.fn().mockReturnValue(false) - }); + // fakeBlurProcessor = vi.mocked>({ + // name: "BackgroundBlur", + // restart: vi.fn().mockResolvedValue(undefined), + // setOptions: vi.fn().mockResolvedValue(undefined), + // getOptions: vi.fn().mockReturnValue({ strength: 0.5 }), + // isRunning: vi.fn().mockReturnValue(false) + // }); } @@ -638,7 +636,7 @@ describe("PublishConnection", () => { describe("Livekit room creation", () => { - function createSetup() { + function createSetup(): void { setUpPublishConnection(); const fakeTrackProcessorSubject$ = new BehaviorSubject({ @@ -648,8 +646,8 @@ describe("PublishConnection", () => { const opts: ConnectionOpts = { client: client, - focus: livekitFocus, - membershipsFocusMap$: fakeMembershipsFocusMap$, + transport: livekitFocus, + remoteTransports$: fakeMembershipsFocusMap$, scope: testScope, livekitRoomFactory: roomFactoryMock }; diff --git a/src/state/Connection.ts b/src/state/Connection.ts index 16dd2607..42423938 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -7,7 +7,7 @@ Please see LICENSE in the repository root for full details. import { connectedParticipantsObserver, connectionStateObserver } from "@livekit/components-core"; import { type ConnectionState, type E2EEOptions, Room as LivekitRoom, type RoomOptions } from "livekit-client"; -import { type CallMembership, type LivekitFocus } from "matrix-js-sdk/lib/matrixrtc"; +import { type CallMembership, type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { BehaviorSubject, combineLatest } from "rxjs"; import { getSFUConfigWithOpenID, type OpenIDClientParts, type SFUConfig } from "../livekit/openIDSFU"; @@ -17,13 +17,13 @@ import { defaultLiveKitOptions } from "../livekit/options"; export interface ConnectionOpts { /** The focus server to connect to. */ - focus: LivekitFocus; + transport: LivekitTransport; /** The Matrix client to use for OpenID and SFU config requests. */ client: OpenIDClientParts; /** The observable scope to use for this connection. */ scope: ObservableScope; /** An observable of the current RTC call memberships and their associated focus. */ - membershipsFocusMap$: Behavior<{ membership: CallMembership; focus: LivekitFocus }[]>; + remoteTransports$: Behavior<{ membership: CallMembership; transport: LivekitTransport }[]>; /** Optional factory to create the Livekit room, mainly for testing purposes. */ livekitRoomFactory?: (options?: RoomOptions) => LivekitRoom; @@ -31,12 +31,12 @@ export interface ConnectionOpts { export type FocusConnectionState = | { state: 'Initialized' } - | { state: 'FetchingConfig', focus: LivekitFocus } - | { state: 'ConnectingToLkRoom', focus: LivekitFocus } - | { state: 'PublishingTracks', focus: LivekitFocus } - | { state: 'FailedToStart', error: Error, focus: LivekitFocus } - | { state: 'ConnectedToLkRoom', connectionState: ConnectionState, focus: LivekitFocus } - | { state: 'Stopped', focus: LivekitFocus }; + | { state: 'FetchingConfig', focus: LivekitTransport } + | { state: 'ConnectingToLkRoom', focus: LivekitTransport } + | { state: 'PublishingTracks', focus: LivekitTransport } + | { state: 'FailedToStart', error: Error, focus: LivekitTransport } + | { state: 'ConnectedToLkRoom', connectionState: ConnectionState, focus: LivekitTransport } + | { state: 'Stopped', focus: LivekitTransport }; /** * A connection to a Matrix RTC LiveKit backend. @@ -71,20 +71,20 @@ export class Connection { public async start(): Promise { this.stopped = false; try { - this._focusedConnectionState$.next({ state: 'FetchingConfig', focus: this.targetFocus }); + this._focusedConnectionState$.next({ state: 'FetchingConfig', focus: this.localTransport }); // TODO could this be loaded earlier to save time? const { url, jwt } = await this.getSFUConfigWithOpenID(); // If we were stopped while fetching the config, don't proceed to connect if (this.stopped) return; - this._focusedConnectionState$.next({ state: 'ConnectingToLkRoom', focus: this.targetFocus }); + this._focusedConnectionState$.next({ state: 'ConnectingToLkRoom', focus: this.localTransport }); await this.livekitRoom.connect(url, jwt); // If we were stopped while connecting, don't proceed to update state. if (this.stopped) return; - this._focusedConnectionState$.next({ state: 'ConnectedToLkRoom', focus: this.targetFocus, connectionState: this.livekitRoom.state }); + this._focusedConnectionState$.next({ state: 'ConnectedToLkRoom', focus: this.localTransport, connectionState: this.livekitRoom.state }); } catch (error) { - this._focusedConnectionState$.next({ state: 'FailedToStart', error: error instanceof Error ? error : new Error(`${error}`), focus: this.targetFocus }); + this._focusedConnectionState$.next({ state: 'FailedToStart', error: error instanceof Error ? error : new Error(`${error}`), focus: this.localTransport }); throw error; } } @@ -93,8 +93,8 @@ export class Connection { protected async getSFUConfigWithOpenID(): Promise { return await getSFUConfigWithOpenID( this.client, - this.targetFocus.livekit_service_url, - this.targetFocus.livekit_alias + this.localTransport.livekit_service_url, + this.localTransport.livekit_alias ) } /** @@ -106,7 +106,7 @@ export class Connection { public async stop(): Promise { if (this.stopped) return; await this.livekitRoom.disconnect(); - this._focusedConnectionState$.next({ state: 'Stopped', focus: this.targetFocus }); + this._focusedConnectionState$.next({ state: 'Stopped', focus: this.localTransport }); this.stopped = true; } @@ -121,7 +121,7 @@ export class Connection { /** * The focus server to connect to. */ - protected readonly targetFocus: LivekitFocus; + public readonly localTransport: LivekitTransport; private readonly client: OpenIDClientParts; /** @@ -135,11 +135,11 @@ export class Connection { public readonly livekitRoom: LivekitRoom, opts: ConnectionOpts, ) { - const { focus, client, scope, membershipsFocusMap$ } = + const { transport, client, scope, remoteTransports$ } = opts; this.livekitRoom = livekitRoom - this.targetFocus = focus; + this.localTransport = transport; this.client = client; this.focusedConnectionState$ = scope.behavior( @@ -153,23 +153,23 @@ export class Connection { this.publishingParticipants$ = scope.behavior( combineLatest( - [participantsIncludingSubscribers$, membershipsFocusMap$], - (participants, membershipsFocusMap) => - membershipsFocusMap + [participantsIncludingSubscribers$, remoteTransports$], + (participants, remoteTransports) => + remoteTransports // Find all members that claim to publish on this connection - .flatMap(({ membership, focus }) => - focus.livekit_service_url === this.targetFocus.livekit_service_url + .flatMap(({ membership, transport }) => + transport.livekit_service_url === + this.localTransport.livekit_service_url ? [membership] : [] ) - // Find all associated publishing livekit participant objects + // Pair with their associated LiveKit participant (if any) + // Uses flatMap to filter out memberships with no associated rtc participant ([]) .flatMap((membership) => { - const participant = participants.find( - (p) => - p.identity === `${membership.sender}:${membership.deviceId}` - ); + const id = `${membership.sender}:${membership.deviceId}`; + const participant = participants.find((p) => p.identity === id); return participant ? [{ participant, membership }] : []; - }) + }), ), [] ); diff --git a/src/state/MediaViewModel.ts b/src/state/MediaViewModel.ts index dc2c135a..016c6a49 100644 --- a/src/state/MediaViewModel.ts +++ b/src/state/MediaViewModel.ts @@ -255,7 +255,7 @@ abstract class BaseMediaViewModel extends ViewModel { */ // TODO: Fully separate the data layer from the UI layer by keeping the // member object internal - public readonly member: RoomMember | undefined, + public readonly member: RoomMember, // We don't necessarily have a participant if a user connects via MatrixRTC but not (yet) through // livekit. protected readonly participant$: Observable< @@ -403,7 +403,7 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { public constructor( id: string, - member: RoomMember | undefined, + member: RoomMember, participant$: Observable, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, @@ -535,7 +535,7 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { public constructor( id: string, - member: RoomMember | undefined, + member: RoomMember, participant$: Behavior, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, @@ -641,7 +641,7 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { public constructor( id: string, - member: RoomMember | undefined, + member: RoomMember, participant$: Observable, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, @@ -736,7 +736,7 @@ export class ScreenShareViewModel extends BaseMediaViewModel { public constructor( id: string, - member: RoomMember | undefined, + member: RoomMember, participant$: Observable, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, diff --git a/src/state/MuteStates.ts b/src/state/MuteStates.ts index c93e88d8..07bc5665 100644 --- a/src/state/MuteStates.ts +++ b/src/state/MuteStates.ts @@ -137,7 +137,7 @@ export class MuteStates { this.scope, this.mediaDevices.audioInput, this.joined$, - Config.get().media_devices.enable_video, + Config.get().media_devices.enable_audio, ); public readonly video = new MuteState( this.scope, diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index fe99d89b..08a4b859 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -7,10 +7,14 @@ Please see LICENSE in the repository root for full details. import { BehaviorSubject, + catchError, distinctUntilChanged, + EMPTY, + endWith, + filter, type Observable, share, - Subject, + take, takeUntil, } from "rxjs"; @@ -24,9 +28,11 @@ const nothing = Symbol("nothing"); * A scope which limits the execution lifetime of its bound Observables. */ export class ObservableScope { - private readonly ended$ = new Subject(); + private readonly ended$ = new BehaviorSubject(false); - private readonly bindImpl: MonoTypeOperator = takeUntil(this.ended$); + private readonly bindImpl: MonoTypeOperator = takeUntil( + this.ended$.pipe(filter((ended) => ended)), + ); /** * Binds an Observable to this scope, so that it completes when the scope @@ -78,15 +84,54 @@ export class ObservableScope { * Ends the scope, causing any bound Observables to complete. */ public end(): void { - this.ended$.next(); - this.ended$.complete(); + this.ended$.next(true); } /** * Register a callback to be executed when the scope is ended. */ public onEnd(callback: () => void): void { - this.ended$.subscribe(callback); + this.ended$ + .pipe( + filter((ended) => ended), + take(1), + ) + .subscribe(callback); + } + + // TODO-MULTI-SFU Dear Future Robin, please document this. Love, Past Robin. + public reconcile( + value$: Behavior, + callback: (value: T) => Promise<(() => Promise) | undefined>, + ): void { + let latestValue: T | typeof nothing = nothing; + let reconciledValue: T | typeof nothing = nothing; + let cleanUp: (() => Promise) | undefined = undefined; + let callbackPromise: Promise<(() => Promise) | undefined>; + value$ + .pipe( + catchError(() => EMPTY), + this.bind(), + endWith(nothing), + ) + .subscribe((value) => { + void (async (): Promise => { + if (latestValue === nothing) { + latestValue = value; + while (latestValue !== reconciledValue) { + await cleanUp?.(); + reconciledValue = latestValue; + if (latestValue !== nothing) { + callbackPromise = callback(latestValue); + cleanUp = await callbackPromise; + } + } + latestValue = nothing; + } else { + latestValue = value; + } + })(); + }); } } diff --git a/src/tile/MediaView.tsx b/src/tile/MediaView.tsx index a4fd0402..8506a650 100644 --- a/src/tile/MediaView.tsx +++ b/src/tile/MediaView.tsx @@ -32,7 +32,7 @@ interface Props extends ComponentProps { video: TrackReferenceOrPlaceholder | undefined; videoFit: "cover" | "contain"; mirror: boolean; - member: RoomMember | undefined; + member: RoomMember; videoEnabled: boolean; unencryptedWarning: boolean; encryptionStatus: EncryptionStatus; diff --git a/src/tile/SpotlightTile.tsx b/src/tile/SpotlightTile.tsx index 663fb912..b1a15332 100644 --- a/src/tile/SpotlightTile.tsx +++ b/src/tile/SpotlightTile.tsx @@ -55,7 +55,7 @@ interface SpotlightItemBaseProps { targetHeight: number; video: TrackReferenceOrPlaceholder | undefined; videoEnabled: boolean; - member: RoomMember | undefined; + member: RoomMember; unencryptedWarning: boolean; encryptionStatus: EncryptionStatus; displayName: string; diff --git a/src/utils/errors.ts b/src/utils/errors.ts index 5cb0b450..b77c0ff0 100644 --- a/src/utils/errors.ts +++ b/src/utils/errors.ts @@ -11,7 +11,7 @@ export enum ErrorCode { /** * Configuration problem due to no MatrixRTC backend/SFU is exposed via .well-known and no fallback configured. */ - MISSING_MATRIX_RTC_FOCUS = "MISSING_MATRIX_RTC_FOCUS", + MISSING_MATRIX_RTC_TRANSPORT = "MISSING_MATRIX_RTC_TRANSPORT", CONNECTION_LOST_ERROR = "CONNECTION_LOST_ERROR", /** LiveKit indicates that the server has hit its track limits */ INSUFFICIENT_CAPACITY_ERROR = "INSUFFICIENT_CAPACITY_ERROR", @@ -54,18 +54,18 @@ export class ElementCallError extends Error { } } -export class MatrixRTCFocusMissingError extends ElementCallError { +export class MatrixRTCTransportMissingError extends ElementCallError { public domain: string; public constructor(domain: string) { super( t("error.call_is_not_supported"), - ErrorCode.MISSING_MATRIX_RTC_FOCUS, + ErrorCode.MISSING_MATRIX_RTC_TRANSPORT, ErrorCategory.CONFIGURATION_ISSUE, - t("error.matrix_rtc_focus_missing", { + t("error.matrix_rtc_transport_missing", { domain, brand: import.meta.env.VITE_PRODUCT_NAME || "Element Call", - errorCode: ErrorCode.MISSING_MATRIX_RTC_FOCUS, + errorCode: ErrorCode.MISSING_MATRIX_RTC_TRANSPORT, }), ); this.domain = domain; diff --git a/src/utils/test.ts b/src/utils/test.ts index 842ca008..519fdd50 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -16,12 +16,13 @@ import { } from "matrix-js-sdk"; import { CallMembership, - type Focus, + type Transport, MatrixRTCSessionEvent, type MatrixRTCSessionEventHandlerMap, MembershipManagerEvent, type SessionMembershipData, Status, + type LivekitFocusSelection, } from "matrix-js-sdk/lib/matrixrtc"; import { type MembershipManagerEventHandlerMap } from "matrix-js-sdk/lib/matrixrtc/IMembershipManager"; import { @@ -172,8 +173,11 @@ export function mockRtcMembership( user: string | RoomMember, deviceId: string, callId = "", - fociPreferred: Focus[] = [], - focusActive: Focus = { type: "oldest_membership" }, + fociPreferred: Transport[] = [], + focusActive: LivekitFocusSelection = { + type: "livekit", + focus_selection: "oldest_membership", + }, membership: Partial = {}, ): CallMembership { const data: SessionMembershipData = {