From 2819c7959013a6a287ebc9960f898a4b454cb4a1 Mon Sep 17 00:00:00 2001 From: Timo K Date: Tue, 30 Sep 2025 16:47:45 +0200 Subject: [PATCH 1/5] use updated multi sfu js-sdk Signed-off-by: Timo K --- src/room/useActiveFocus.ts | 45 ----------------------- src/rtcSessionHelpers.ts | 75 +++++++++++++++++++------------------- src/state/CallViewModel.ts | 6 ++- src/state/Connection.ts | 10 ++--- yarn.lock | 4 +- 5 files changed, 48 insertions(+), 92 deletions(-) delete mode 100644 src/room/useActiveFocus.ts diff --git a/src/room/useActiveFocus.ts b/src/room/useActiveFocus.ts deleted file mode 100644 index 7a8f4521..00000000 --- a/src/room/useActiveFocus.ts +++ /dev/null @@ -1,45 +0,0 @@ -/* -Copyright 2023, 2024 New Vector Ltd. - -SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial -Please see LICENSE in the repository root for full details. -*/ - -import { - type MatrixRTCSession, - MatrixRTCSessionEvent, -} from "matrix-js-sdk/lib/matrixrtc"; -import { useCallback, useRef } from "react"; -import { deepCompare } from "matrix-js-sdk/lib/utils"; -import { logger } from "matrix-js-sdk/lib/logger"; -import { type LivekitFocus, isLivekitFocus } from "matrix-js-sdk/lib/matrixrtc"; - -import { useTypedEventEmitterState } from "../useEvents"; - -/** - * Gets the currently active (livekit) focus for a MatrixRTC session - * This logic is specific to livekit foci where the whole call must use one - * and the same focus. - */ -export function useActiveLivekitFocus( - rtcSession: MatrixRTCSession, -): LivekitFocus | undefined { - const prevActiveFocus = useRef(undefined); - return useTypedEventEmitterState( - rtcSession, - MatrixRTCSessionEvent.MembershipsChanged, - useCallback(() => { - const f = rtcSession.getActiveFocus(); - // Only handle foci with type="livekit" for now. - if (f && isLivekitFocus(f) && !deepCompare(f, prevActiveFocus.current)) { - const oldestMembership = rtcSession.getOldestMembership(); - logger.info( - `Got new active focus from membership: ${oldestMembership?.sender}/${oldestMembership?.deviceId}. - Updated focus (focus switch) from ${JSON.stringify(prevActiveFocus.current)} to ${JSON.stringify(f)}`, - ); - prevActiveFocus.current = f; - } - return prevActiveFocus.current; - }, [rtcSession]), - ); -} diff --git a/src/rtcSessionHelpers.ts b/src/rtcSessionHelpers.ts index b6918f3a..175b35f4 100644 --- a/src/rtcSessionHelpers.ts +++ b/src/rtcSessionHelpers.ts @@ -6,11 +6,10 @@ Please see LICENSE in the repository root for full details. */ import { - isLivekitFocusConfig, - type LivekitFocusConfig, - type LivekitFocus, - type LivekitFocusSelection, type MatrixRTCSession, + isLivekitTransportConfig, + type LivekitTransportConfig, + type LivekitTransport, } from "matrix-js-sdk/lib/matrixrtc"; import { logger } from "matrix-js-sdk/lib/logger"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; @@ -24,13 +23,6 @@ import { getSFUConfigWithOpenID } from "./livekit/openIDSFU.ts"; const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci"; -export function makeActiveFocus(): LivekitFocusSelection { - return { - type: "livekit", - focus_selection: "oldest_membership", - }; -} - export function getLivekitAlias(rtcSession: MatrixRTCSession): string { // For now we assume everything is a room-scoped call return rtcSession.room.roomId; @@ -38,13 +30,13 @@ export function getLivekitAlias(rtcSession: MatrixRTCSession): string { async function makeFocusInternal( rtcSession: MatrixRTCSession, -): Promise { +): Promise { logger.log("Searching for a preferred focus"); const livekitAlias = getLivekitAlias(rtcSession); const urlFromStorage = localStorage.getItem("robin-matrixrtc-auth"); if (urlFromStorage !== null) { - const focusFromStorage: LivekitFocus = { + const focusFromStorage: LivekitTransport = { type: "livekit", livekit_service_url: urlFromStorage, livekit_alias: livekitAlias, @@ -57,7 +49,7 @@ async function makeFocusInternal( const domain = rtcSession.room.client.getDomain(); if (localStorage.getItem("timo-focus-url")) { const timoFocusUrl = localStorage.getItem("timo-focus-url")!; - const focusFromUrl: LivekitFocus = { + const focusFromUrl: LivekitTransport = { type: "livekit", livekit_service_url: timoFocusUrl, livekit_alias: livekitAlias, @@ -72,8 +64,8 @@ async function makeFocusInternal( FOCI_WK_KEY ]; if (Array.isArray(wellKnownFoci)) { - const focus: LivekitFocusConfig | undefined = wellKnownFoci.find( - (f) => f && isLivekitFocusConfig(f), + const focus: LivekitTransportConfig | undefined = wellKnownFoci.find( + (f) => f && isLivekitTransportConfig(f), ); if (focus !== undefined) { logger.log("Using LiveKit focus from .well-known: ", focus); @@ -84,7 +76,7 @@ async function makeFocusInternal( const urlFromConf = Config.get().livekit?.livekit_service_url; if (urlFromConf) { - const focusFromConf: LivekitFocus = { + const focusFromConf: LivekitTransport = { type: "livekit", livekit_service_url: urlFromConf, livekit_alias: livekitAlias, @@ -98,7 +90,7 @@ async function makeFocusInternal( export async function makeFocus( rtcSession: MatrixRTCSession, -): Promise { +): Promise { const focus = await makeFocusInternal(rtcSession); // this will call the jwt/sfu/get endpoint to pre create the livekit room. await getSFUConfigWithOpenID( @@ -111,10 +103,11 @@ export async function makeFocus( export async function enterRTCSession( rtcSession: MatrixRTCSession, - focus: LivekitFocus, + focus: LivekitTransport, encryptMedia: boolean, useNewMembershipManager = true, useExperimentalToDeviceTransport = false, + useMultiSfu = true, ): Promise { PosthogAnalytics.instance.eventCallEnded.cacheStartCall(new Date()); PosthogAnalytics.instance.eventCallStarted.track(rtcSession.room.roomId); @@ -127,25 +120,31 @@ export async function enterRTCSession( const useDeviceSessionMemberEvents = features?.feature_use_device_session_member_events; const { sendNotificationType: notificationType, callIntent } = getUrlParams(); - rtcSession.joinRoomSession([focus], focus, { - notificationType, - callIntent, - useNewMembershipManager, - manageMediaKeys: encryptMedia, - ...(useDeviceSessionMemberEvents !== undefined && { - useLegacyMemberEvents: !useDeviceSessionMemberEvents, - }), - delayedLeaveEventRestartMs: - matrixRtcSessionConfig?.delayed_leave_event_restart_ms, - delayedLeaveEventDelayMs: - matrixRtcSessionConfig?.delayed_leave_event_delay_ms, - delayedLeaveEventRestartLocalTimeoutMs: - matrixRtcSessionConfig?.delayed_leave_event_restart_local_timeout_ms, - networkErrorRetryMs: matrixRtcSessionConfig?.network_error_retry_ms, - makeKeyDelay: matrixRtcSessionConfig?.wait_for_key_rotation_ms, - membershipEventExpiryMs: matrixRtcSessionConfig?.membership_event_expiry_ms, - useExperimentalToDeviceTransport, - }); + // Multi-sfu does not need a focus preferred list. just the focus that is actually used. + rtcSession.joinRoomSession( + useMultiSfu ? [focus] : [], + useMultiSfu ? focus : undefined, + { + notificationType, + callIntent, + useNewMembershipManager, + manageMediaKeys: encryptMedia, + ...(useDeviceSessionMemberEvents !== undefined && { + useLegacyMemberEvents: !useDeviceSessionMemberEvents, + }), + delayedLeaveEventRestartMs: + matrixRtcSessionConfig?.delayed_leave_event_restart_ms, + delayedLeaveEventDelayMs: + matrixRtcSessionConfig?.delayed_leave_event_delay_ms, + delayedLeaveEventRestartLocalTimeoutMs: + matrixRtcSessionConfig?.delayed_leave_event_restart_local_timeout_ms, + networkErrorRetryMs: matrixRtcSessionConfig?.network_error_retry_ms, + makeKeyDelay: matrixRtcSessionConfig?.wait_for_key_rotation_ms, + membershipEventExpiryMs: + matrixRtcSessionConfig?.membership_event_expiry_ms, + useExperimentalToDeviceTransport, + }, + ); if (widget) { try { await widget.api.transport.send(ElementWidgetActions.JoinCall, {}); diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 7e3a5bdf..2f4bfa0c 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -64,7 +64,7 @@ import { import { logger } from "matrix-js-sdk/lib/logger"; import { type CallMembership, - isLivekitFocus, + isLivekitTransport, type MatrixRTCSession, MatrixRTCSessionEvent, type MatrixRTCSessionEventHandlerMap, @@ -493,7 +493,9 @@ export class CallViewModel extends ViewModel { map((memberships) => memberships.flatMap((m) => { const f = this.matrixRTCSession.resolveActiveFocus(m); - return f && isLivekitFocus(f) ? [{ membership: m, focus: f }] : []; + return f && isLivekitTransport(f) + ? [{ membership: m, focus: f }] + : []; }), ), ), diff --git a/src/state/Connection.ts b/src/state/Connection.ts index db456ba0..8eaed463 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -18,7 +18,7 @@ import { } from "livekit-client"; import { type MatrixClient } from "matrix-js-sdk"; import { - type LivekitFocus, + type LivekitTransport, type CallMembership, } from "matrix-js-sdk/lib/matrixrtc"; import { @@ -72,12 +72,12 @@ export class Connection { public connectionState$: Behavior; public constructor( - protected readonly focus: LivekitFocus, + protected readonly focus: LivekitTransport, protected readonly livekitAlias: string, protected readonly client: MatrixClient, protected readonly scope: ObservableScope, protected readonly membershipsFocusMap$: Behavior< - { membership: CallMembership; focus: LivekitFocus }[] + { membership: CallMembership; focus: LivekitTransport }[] >, e2eeLivekitOptions: E2EEOptions | undefined, livekitRoom: LivekitRoom | undefined = undefined, @@ -141,12 +141,12 @@ export class PublishConnection extends Connection { } public constructor( - focus: LivekitFocus, + focus: LivekitTransport, livekitAlias: string, client: MatrixClient, scope: ObservableScope, membershipsFocusMap$: Behavior< - { membership: CallMembership; focus: LivekitFocus }[] + { membership: CallMembership; focus: LivekitTransport }[] >, devices: MediaDevices, private readonly muteStates: MuteStates, diff --git a/yarn.lock b/yarn.lock index 4429b7d4..a149eaf5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -10299,7 +10299,7 @@ __metadata: "matrix-js-sdk@github:matrix-org/matrix-js-sdk#head=voip-team/multi-SFU": version: 38.3.0 - resolution: "matrix-js-sdk@https://github.com/matrix-org/matrix-js-sdk.git#commit=ca4a9c655537702daf9a69ed5d94831cebc49666" + resolution: "matrix-js-sdk@https://github.com/matrix-org/matrix-js-sdk.git#commit=d94d02d19b9f17c724b5919b185fea3413dbf7a2" dependencies: "@babel/runtime": "npm:^7.12.5" "@matrix-org/matrix-sdk-crypto-wasm": "npm:^15.3.0" @@ -10315,7 +10315,7 @@ __metadata: sdp-transform: "npm:^2.14.1" unhomoglyph: "npm:^1.0.6" uuid: "npm:13" - checksum: 10c0/1fb0933d0bb686b0f290b1a62f75eec290b7c52a410d5968c2ccfb527a64e78a58012e1bd8f90c874d385dace3228b9a8c80e114ee227fc8a60e7c9611112ceb + checksum: 10c0/dc43617a9398754275e2025af7d5fdee1f2e01b89241fc7881c1206d925e83ad6fe55f439501ae34e734cfbfa5479f6bee3167f4828c913f4f33817d95850189 languageName: node linkType: hard From 68aae4a8e3e2cd208db5ccc402b134b34159af5c Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 2 Oct 2025 11:23:11 +0200 Subject: [PATCH 2/5] fix another rename + another js-sdk bump Signed-off-by: Timo K --- src/utils/test.ts | 10 +++++++--- yarn.lock | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/utils/test.ts b/src/utils/test.ts index 842ca008..519fdd50 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -16,12 +16,13 @@ import { } from "matrix-js-sdk"; import { CallMembership, - type Focus, + type Transport, MatrixRTCSessionEvent, type MatrixRTCSessionEventHandlerMap, MembershipManagerEvent, type SessionMembershipData, Status, + type LivekitFocusSelection, } from "matrix-js-sdk/lib/matrixrtc"; import { type MembershipManagerEventHandlerMap } from "matrix-js-sdk/lib/matrixrtc/IMembershipManager"; import { @@ -172,8 +173,11 @@ export function mockRtcMembership( user: string | RoomMember, deviceId: string, callId = "", - fociPreferred: Focus[] = [], - focusActive: Focus = { type: "oldest_membership" }, + fociPreferred: Transport[] = [], + focusActive: LivekitFocusSelection = { + type: "livekit", + focus_selection: "oldest_membership", + }, membership: Partial = {}, ): CallMembership { const data: SessionMembershipData = { diff --git a/yarn.lock b/yarn.lock index a149eaf5..197cee3e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -10299,7 +10299,7 @@ __metadata: "matrix-js-sdk@github:matrix-org/matrix-js-sdk#head=voip-team/multi-SFU": version: 38.3.0 - resolution: "matrix-js-sdk@https://github.com/matrix-org/matrix-js-sdk.git#commit=d94d02d19b9f17c724b5919b185fea3413dbf7a2" + resolution: "matrix-js-sdk@https://github.com/matrix-org/matrix-js-sdk.git#commit=a343e8c92a5a37f419eb1b762db3a123e41ef66d" dependencies: "@babel/runtime": "npm:^7.12.5" "@matrix-org/matrix-sdk-crypto-wasm": "npm:^15.3.0" @@ -10315,7 +10315,7 @@ __metadata: sdp-transform: "npm:^2.14.1" unhomoglyph: "npm:^1.0.6" uuid: "npm:13" - checksum: 10c0/dc43617a9398754275e2025af7d5fdee1f2e01b89241fc7881c1206d925e83ad6fe55f439501ae34e734cfbfa5479f6bee3167f4828c913f4f33817d95850189 + checksum: 10c0/4893878f2fe07b06334bab4674a01569037d0f3e737fef3f0bb97a98b01d71fc304627921673f128821a17d824de9b63cc06456db15f9d45eb10bba1ceacd5c5 languageName: node linkType: hard From 86fb026be86d7ea0fa904babd920b6e1d0b9f1c0 Mon Sep 17 00:00:00 2001 From: Robin Date: Fri, 3 Oct 2025 14:43:22 -0400 Subject: [PATCH 3/5] Turn multi-SFU media transport into a developer option --- locales/en/app.json | 3 +- src/room/GroupCallErrorBoundary.test.tsx | 8 +- src/room/GroupCallView.test.tsx | 4 +- src/rtcSessionHelpers.ts | 64 ++- src/settings/DeveloperSettingsTab.tsx | 16 + src/settings/settings.ts | 2 + src/state/Async.ts | 44 ++ src/state/CallViewModel.ts | 522 ++++++++++++++--------- src/state/Connection.ts | 57 +-- src/state/MuteStates.ts | 2 +- src/state/ObservableScope.ts | 19 +- src/utils/errors.ts | 10 +- 12 files changed, 461 insertions(+), 290 deletions(-) create mode 100644 src/state/Async.ts diff --git a/locales/en/app.json b/locales/en/app.json index dc027c92..704f68ac 100644 --- a/locales/en/app.json +++ b/locales/en/app.json @@ -72,6 +72,7 @@ "livekit_server_info": "LiveKit Server Info", "livekit_sfu": "LiveKit SFU: {{url}}", "matrix_id": "Matrix ID: {{id}}", + "multi_sfu": "Multi-SFU media transport", "mute_all_audio": "Mute all audio (participants, reactions, join sounds)", "show_connection_stats": "Show connection statistics", "url_params": "URL parameters", @@ -91,7 +92,7 @@ "generic_description": "Submitting debug logs will help us track down the problem.", "insufficient_capacity": "Insufficient capacity", "insufficient_capacity_description": "The server has reached its maximum capacity and you cannot join the call at this time. Try again later, or contact your server admin if the problem persists.", - "matrix_rtc_focus_missing": "The server is not configured to work with {{brand}}. Please contact your server admin (Domain: {{domain}}, Error Code: {{ errorCode }}).", + "matrix_rtc_transport_missing": "The server is not configured to work with {{brand}}. Please contact your server admin (Domain: {{domain}}, Error Code: {{ errorCode }}).", "open_elsewhere": "Opened in another tab", "open_elsewhere_description": "{{brand}} has been opened in another tab. If that doesn't sound right, try reloading the page.", "room_creation_restricted": "Failed to create call", diff --git a/src/room/GroupCallErrorBoundary.test.tsx b/src/room/GroupCallErrorBoundary.test.tsx index 51912956..22338924 100644 --- a/src/room/GroupCallErrorBoundary.test.tsx +++ b/src/room/GroupCallErrorBoundary.test.tsx @@ -26,7 +26,7 @@ import { E2EENotSupportedError, type ElementCallError, InsufficientCapacityError, - MatrixRTCFocusMissingError, + MatrixRTCTransportMissingError, UnknownCallError, } from "../utils/errors.ts"; import { mockConfig } from "../utils/test.ts"; @@ -34,7 +34,7 @@ import { ElementWidgetActions, type WidgetHelpers } from "../widget.ts"; test.each([ { - error: new MatrixRTCFocusMissingError("example.com"), + error: new MatrixRTCTransportMissingError("example.com"), expectedTitle: "Call is not supported", }, { @@ -85,7 +85,7 @@ test.each([ ); test("should render the error page with link back to home", async () => { - const error = new MatrixRTCFocusMissingError("example.com"); + const error = new MatrixRTCTransportMissingError("example.com"); const TestComponent = (): ReactNode => { throw error; }; @@ -213,7 +213,7 @@ describe("Rageshake button", () => { }); test("should have a close button in widget mode", async () => { - const error = new MatrixRTCFocusMissingError("example.com"); + const error = new MatrixRTCTransportMissingError("example.com"); const TestComponent = (): ReactNode => { throw error; }; diff --git a/src/room/GroupCallView.test.tsx b/src/room/GroupCallView.test.tsx index bf5d1fef..b8bc2f53 100644 --- a/src/room/GroupCallView.test.tsx +++ b/src/room/GroupCallView.test.tsx @@ -42,7 +42,7 @@ import { import { GroupCallView } from "./GroupCallView"; import { type WidgetHelpers } from "../widget"; import { LazyEventEmitter } from "../LazyEventEmitter"; -import { MatrixRTCFocusMissingError } from "../utils/errors"; +import { MatrixRTCTransportMissingError } from "../utils/errors"; import { ProcessorProvider } from "../livekit/TrackProcessorContext"; import { MediaDevicesContext } from "../MediaDevicesContext"; import { HeaderStyle } from "../UrlParams"; @@ -258,7 +258,7 @@ test("GroupCallView leaves the session when an error occurs", async () => { test("GroupCallView shows errors that occur during joining", async () => { const user = userEvent.setup(); - enterRTCSession.mockRejectedValue(new MatrixRTCFocusMissingError("")); + enterRTCSession.mockRejectedValue(new MatrixRTCTransportMissingError("")); onTestFinished(() => { enterRTCSession.mockReset(); }); diff --git a/src/rtcSessionHelpers.ts b/src/rtcSessionHelpers.ts index 175b35f4..3cdd82e7 100644 --- a/src/rtcSessionHelpers.ts +++ b/src/rtcSessionHelpers.ts @@ -17,7 +17,7 @@ import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { PosthogAnalytics } from "./analytics/PosthogAnalytics"; import { Config } from "./config/Config"; import { ElementWidgetActions, widget, type WidgetHelpers } from "./widget"; -import { MatrixRTCFocusMissingError } from "./utils/errors"; +import { MatrixRTCTransportMissingError } from "./utils/errors"; import { getUrlParams } from "./UrlParams"; import { getSFUConfigWithOpenID } from "./livekit/openIDSFU.ts"; @@ -28,35 +28,31 @@ export function getLivekitAlias(rtcSession: MatrixRTCSession): string { return rtcSession.room.roomId; } -async function makeFocusInternal( +async function makeTransportInternal( rtcSession: MatrixRTCSession, ): Promise { - logger.log("Searching for a preferred focus"); + logger.log("Searching for a preferred transport"); const livekitAlias = getLivekitAlias(rtcSession); - const urlFromStorage = localStorage.getItem("robin-matrixrtc-auth"); + // TODO-MULTI-SFU: Either remove this dev tool or make it more official + const urlFromStorage = + localStorage.getItem("robin-matrixrtc-auth") ?? + localStorage.getItem("timo-focus-url"); if (urlFromStorage !== null) { - const focusFromStorage: LivekitTransport = { + const transportFromStorage: LivekitTransport = { type: "livekit", livekit_service_url: urlFromStorage, livekit_alias: livekitAlias, }; - logger.log("Using LiveKit focus from local storage: ", focusFromStorage); - return focusFromStorage; + logger.log( + "Using LiveKit transport from local storage: ", + transportFromStorage, + ); + return transportFromStorage; } // Prioritize the .well-known/matrix/client, if available, over the configured SFU const domain = rtcSession.room.client.getDomain(); - if (localStorage.getItem("timo-focus-url")) { - const timoFocusUrl = localStorage.getItem("timo-focus-url")!; - const focusFromUrl: LivekitTransport = { - type: "livekit", - livekit_service_url: timoFocusUrl, - livekit_alias: livekitAlias, - }; - logger.log("Using LiveKit focus from localStorage: ", timoFocusUrl); - return focusFromUrl; - } if (domain) { // we use AutoDiscovery instead of relying on the MatrixClient having already // been fully configured and started @@ -64,46 +60,46 @@ async function makeFocusInternal( FOCI_WK_KEY ]; if (Array.isArray(wellKnownFoci)) { - const focus: LivekitTransportConfig | undefined = wellKnownFoci.find( + const transport: LivekitTransportConfig | undefined = wellKnownFoci.find( (f) => f && isLivekitTransportConfig(f), ); - if (focus !== undefined) { - logger.log("Using LiveKit focus from .well-known: ", focus); - return { ...focus, livekit_alias: livekitAlias }; + if (transport !== undefined) { + logger.log("Using LiveKit transport from .well-known: ", transport); + return { ...transport, livekit_alias: livekitAlias }; } } } const urlFromConf = Config.get().livekit?.livekit_service_url; if (urlFromConf) { - const focusFromConf: LivekitTransport = { + const transportFromConf: LivekitTransport = { type: "livekit", livekit_service_url: urlFromConf, livekit_alias: livekitAlias, }; - logger.log("Using LiveKit focus from config: ", focusFromConf); - return focusFromConf; + logger.log("Using LiveKit transport from config: ", transportFromConf); + return transportFromConf; } - throw new MatrixRTCFocusMissingError(domain ?? ""); + throw new MatrixRTCTransportMissingError(domain ?? ""); } -export async function makeFocus( +export async function makeTransport( rtcSession: MatrixRTCSession, ): Promise { - const focus = await makeFocusInternal(rtcSession); + const transport = await makeTransportInternal(rtcSession); // this will call the jwt/sfu/get endpoint to pre create the livekit room. await getSFUConfigWithOpenID( rtcSession.room.client, - focus.livekit_service_url, - focus.livekit_alias, + transport.livekit_service_url, + transport.livekit_alias, ); - return focus; + return transport; } export async function enterRTCSession( rtcSession: MatrixRTCSession, - focus: LivekitTransport, + transport: LivekitTransport, encryptMedia: boolean, useNewMembershipManager = true, useExperimentalToDeviceTransport = false, @@ -120,10 +116,10 @@ export async function enterRTCSession( const useDeviceSessionMemberEvents = features?.feature_use_device_session_member_events; const { sendNotificationType: notificationType, callIntent } = getUrlParams(); - // Multi-sfu does not need a focus preferred list. just the focus that is actually used. + // Multi-sfu does not need a preferred foci list. just the focus that is actually used. rtcSession.joinRoomSession( - useMultiSfu ? [focus] : [], - useMultiSfu ? focus : undefined, + useMultiSfu ? [] : [transport], + useMultiSfu ? transport : undefined, { notificationType, callIntent, diff --git a/src/settings/DeveloperSettingsTab.tsx b/src/settings/DeveloperSettingsTab.tsx index 1949ecf7..36c8a2e6 100644 --- a/src/settings/DeveloperSettingsTab.tsx +++ b/src/settings/DeveloperSettingsTab.tsx @@ -16,6 +16,7 @@ import { showConnectionStats as showConnectionStatsSetting, useNewMembershipManager as useNewMembershipManagerSetting, useExperimentalToDeviceTransport as useExperimentalToDeviceTransportSetting, + multiSfu as multiSfuSetting, muteAllAudio as muteAllAudioSetting, alwaysShowIphoneEarpiece as alwaysShowIphoneEarpieceSetting, } from "./settings"; @@ -50,6 +51,7 @@ export const DeveloperSettingsTab: FC = ({ client, livekitRooms }) => { useExperimentalToDeviceTransport, setUseExperimentalToDeviceTransport, ] = useSetting(useExperimentalToDeviceTransportSetting); + const [multiSfu, setMultiSfu] = useSetting(multiSfuSetting); const [muteAllAudio, setMuteAllAudio] = useSetting(muteAllAudioSetting); @@ -166,6 +168,20 @@ export const DeveloperSettingsTab: FC = ({ client, livekitRooms }) => { )} /> + + ): void => { + setMultiSfu(event.target.checked); + }, + [setMultiSfu], + )} + /> + ( true, ); +export const multiSfu = new Setting("multi-sfu", false); + export const muteAllAudio = new Setting("mute-all-audio", false); export const alwaysShowSelf = new Setting("always-show-self", true); diff --git a/src/state/Async.ts b/src/state/Async.ts new file mode 100644 index 00000000..2baa674c --- /dev/null +++ b/src/state/Async.ts @@ -0,0 +1,44 @@ +/* +Copyright 2025 New Vector Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { + catchError, + from, + map, + Observable, + of, + startWith, + switchMap, +} from "rxjs"; + +export type Async = + | { state: "loading" } + | { state: "error"; value: Error } + | { state: "ready"; value: A }; + +export const loading: Async = { state: "loading" }; +export function error(value: Error): Async { + return { state: "error", value }; +} +export function ready(value: A): Async { + return { state: "ready", value }; +} + +export function async(promise: Promise): Observable> { + return from(promise).pipe( + map(ready), + startWith(loading), + catchError((e) => of(error(e))), + ); +} + +export function mapAsync( + async: Async, + project: (value: A) => B, +): Async { + return async.state === "ready" ? ready(project(async.value)) : async; +} diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 2f4bfa0c..8988e518 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -28,6 +28,7 @@ import { EventType, RoomEvent, } from "matrix-js-sdk"; +import { deepCompare } from "matrix-js-sdk/lib/utils"; import { BehaviorSubject, EMPTY, @@ -48,6 +49,7 @@ import { of, pairwise, race, + repeat, scan, skip, skipWhile, @@ -57,6 +59,7 @@ import { switchScan, take, takeUntil, + takeWhile, tap, throttleTime, timer, @@ -65,6 +68,7 @@ import { logger } from "matrix-js-sdk/lib/logger"; import { type CallMembership, isLivekitTransport, + type LivekitTransport, type MatrixRTCSession, MatrixRTCSessionEvent, type MatrixRTCSessionEventHandlerMap, @@ -90,6 +94,7 @@ import { import { ObservableScope } from "./ObservableScope"; import { duplicateTiles, + multiSfu, playReactionsSound, showReactions, } from "../settings/settings"; @@ -118,7 +123,7 @@ import { constant, type Behavior } from "./Behavior"; import { enterRTCSession, getLivekitAlias, - makeFocus, + makeTransport, } from "../rtcSessionHelpers"; import { E2eeType } from "../e2ee/e2eeType"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; @@ -127,6 +132,7 @@ import { type MuteStates } from "./MuteStates"; import { getUrlParams } from "../UrlParams"; import { type ProcessorState } from "../livekit/TrackProcessorContext"; import { ElementWidgetActions, widget } from "../widget"; +import { type Async, async, mapAsync, ready } from "./Async"; export interface CallViewModelOptions { encryptionSystem: EncryptionSystem; @@ -449,27 +455,33 @@ export class CallViewModel extends ViewModel { } : undefined; - private readonly localFocus = makeFocus(this.matrixRTCSession); + private readonly join$ = new Subject(); - private readonly localConnection = this.localFocus.then( - (focus) => - new PublishConnection( - focus, - this.livekitAlias, - this.matrixRTCSession.room.client, - this.scope, - this.membershipsAndFocusMap$, - this.mediaDevices, - this.muteStates, - this.e2eeLivekitOptions(), - this.scope.behavior(this.trackProcessorState$), - ), - ); + public join(): void { + this.join$.next(); + } - public readonly livekitConnectionState$ = this.scope.behavior( - combineLatest([this.localConnection]).pipe( - switchMap(([c]) => c.connectionState$), - startWith(ConnectionState.Disconnected), + // This is functionally the same Observable as leave$, except here it's + // hoisted to the top of the class. This enables the cyclic dependency between + // leave$ -> autoLeave$ -> callPickupState$ -> livekitConnectionState$ -> + // localConnection$ -> transports$ -> joined$ -> leave$. + private readonly leaveHoisted$ = new Subject< + "user" | "timeout" | "decline" | "allOthersLeft" + >(); + + /** + * Whether we are joined to the call. This reflects our local state rather + * than whether all connections are truly up and running. + */ + private readonly joined$ = this.scope.behavior( + this.join$.pipe( + map(() => true), + // Using takeUntil with the repeat operator is perfectly valid. + // eslint-disable-next-line rxjs/no-unsafe-takeuntil + takeUntil(this.leaveHoisted$), + endWith(false), + repeat(), + startWith(false), ), ); @@ -488,125 +500,224 @@ export class CallViewModel extends ViewModel { ), ); - private readonly membershipsAndFocusMap$ = this.scope.behavior( - this.memberships$.pipe( - map((memberships) => - memberships.flatMap((m) => { - const f = this.matrixRTCSession.resolveActiveFocus(m); - return f && isLivekitTransport(f) - ? [{ membership: m, focus: f }] - : []; - }), + /** + * The transport that we would personally prefer to publish on (if not for the + * transport preferences of others, perhaps). + */ + private readonly preferredTransport = makeTransport(this.matrixRTCSession); + + /** + * Lists the transports used by ourselves, plus all other MatrixRTC session + * members. + */ + private readonly transports$: Behavior<{ + local: Async; + remote: { membership: CallMembership; transport: LivekitTransport }[]; + } | null> = this.scope.behavior( + this.joined$.pipe( + switchMap((joined) => + joined + ? combineLatest( + [ + async(this.preferredTransport), + this.memberships$, + multiSfu.value$, + ], + (preferred, memberships, multiSfu) => { + const remote = memberships.flatMap((m) => { + if (m.sender === this.userId && m.deviceId === this.deviceId) + return []; + const t = this.matrixRTCSession.resolveActiveFocus(m); + return t && isLivekitTransport(t) + ? [{ membership: m, transport: t }] + : []; + }); + let local = preferred; + if (!multiSfu) { + const oldest = this.matrixRTCSession.getOldestMembership(); + if (oldest !== undefined) { + const selection = oldest.getTransport(oldest); + if (isLivekitTransport(selection)) local = ready(selection); + } + } + return { local, remote }; + }, + ) + : of(null), ), ), ); - private readonly livekitServiceUrls$ = this.membershipsAndFocusMap$.pipe( - map((v) => new Set(v.map(({ focus }) => focus.livekit_service_url))), + /** + * Lists the transports used by each MatrixRTC session member other than + * ourselves. + */ + private readonly remoteTransports$ = this.scope.behavior( + this.transports$.pipe(map((transports) => transports?.remote ?? [])), ); + /** + * The transport over which we should be actively publishing our media. + */ + private readonly localTransport$: Behavior | null> = + this.scope.behavior( + this.transports$.pipe( + map((transports) => transports?.local ?? null), + distinctUntilChanged(deepCompare), + ), + ); + + private readonly localConnectionAndTransport$ = this.scope.behavior( + this.localTransport$.pipe( + map( + (transport) => + transport && + mapAsync(transport, (transport) => ({ + connection: new PublishConnection( + transport, + this.livekitAlias, + this.matrixRTCSession.room.client, + this.scope, + this.remoteTransports$, + this.mediaDevices, + this.muteStates, + this.e2eeLivekitOptions(), + this.scope.behavior(this.trackProcessorState$), + ), + transport, + })), + ), + ), + ); + + private readonly localConnection$ = this.scope.behavior( + this.localConnectionAndTransport$.pipe( + map((value) => value && mapAsync(value, ({ connection }) => connection)), + ), + ); + + public readonly livekitConnectionState$ = this.scope.behavior( + this.localConnection$.pipe( + switchMap((c) => + c?.state === "ready" + ? c.value.connectionState$ + : of(ConnectionState.Disconnected), + ), + ), + ); + + /** + * Connections for each transport in use by one or more session members that + * is *distinct* from the local transport. + */ private readonly remoteConnections$ = this.scope.behavior( - combineLatest([this.localFocus, this.livekitServiceUrls$]).pipe( - accumulate( - new Map(), - (prev, [localFocus, focusUrls]) => { - const stopped = new Map(prev); - const next = new Map(); - for (const focusUrl of focusUrls) { - if (focusUrl !== localFocus.livekit_service_url) { - stopped.delete(focusUrl); + this.transports$.pipe( + accumulate(new Map(), (prev, transports) => { + const next = new Map(); - let nextConnection = prev.get(focusUrl); - if (!nextConnection) { - logger.log( - "SFU remoteConnections$ construct new connection: ", - focusUrl, - ); - nextConnection = new Connection( - { - livekit_service_url: focusUrl, - livekit_alias: this.livekitAlias, - type: "livekit", - }, - this.livekitAlias, - this.matrixRTCSession.room.client, - this.scope, - this.membershipsAndFocusMap$, - this.e2eeLivekitOptions(), - ); - } else { - logger.log( - "SFU remoteConnections$ use prev connection: ", - focusUrl, - ); - } - next.set(focusUrl, nextConnection); + // Until the local transport becomes ready we have no idea which + // transports will actually need a dedicated remote connection + if (transports?.local.state === "ready") { + const localServiceUrl = transports.local.value.livekit_service_url; + const remoteServiceUrls = new Set( + transports.remote.flatMap(({ membership, transport }) => { + const t = this.matrixRTCSession.resolveActiveFocus(membership); + return t && + isLivekitTransport(t) && + t.livekit_service_url !== localServiceUrl + ? [t.livekit_service_url] + : []; + }), + ); + + for (const remoteServiceUrl of remoteServiceUrls) { + let nextConnection = prev.get(remoteServiceUrl); + if (!nextConnection) { + logger.log( + "SFU remoteConnections$ construct new connection: ", + remoteServiceUrl, + ); + nextConnection = new Connection( + { + livekit_service_url: remoteServiceUrl, + livekit_alias: this.livekitAlias, + type: "livekit", + }, + this.livekitAlias, + this.matrixRTCSession.room.client, + this.scope, + this.remoteTransports$, + this.e2eeLivekitOptions(), + ); + } else { + logger.log( + "SFU remoteConnections$ use prev connection: ", + remoteServiceUrl, + ); } + next.set(remoteServiceUrl, nextConnection); } + } - for (const connection of stopped.values()) connection.stop(); - return next; - }, - ), + return next; + }), + map((transports) => [...transports.values()]), ), ); - private readonly join$ = new Subject(); + /** + * A list of the connections that should be active at any given time. + */ + private readonly connections$ = this.scope.behavior( + combineLatest( + [this.localConnection$, this.remoteConnections$], + (local, remote) => [ + ...(local?.state === "ready" ? [local.value] : []), + ...remote.values(), + ], + ), + ); - public join(): void { - this.join$.next(); - } - - private readonly connectionInstructions$ = this.join$.pipe( - switchMap(() => this.remoteConnections$), - startWith(new Map()), + private readonly connectionInstructions$ = this.connections$.pipe( pairwise(), map(([prev, next]) => { const start = new Set(next.values()); - for (const connection of prev.values()) start.delete(connection); + for (const connection of prev) start.delete(connection); const stop = new Set(prev.values()); - for (const connection of next.values()) stop.delete(connection); + for (const connection of next) stop.delete(connection); return { start, stop }; }), this.scope.share, ); + /** + * Emits with a connection whenever it should be started. + */ private readonly startConnection$ = this.connectionInstructions$.pipe( concatMap(({ start }) => start), ); + /** + * Emits with a connection whenever it should be stopped. + */ private readonly stopConnection$ = this.connectionInstructions$.pipe( concatMap(({ stop }) => stop), ); public readonly allLivekitRooms$ = this.scope.behavior( - combineLatest([ - this.remoteConnections$, - this.localConnection, - this.localFocus, - ]).pipe( - map(([remoteConnections, localConnection, localFocus]) => - Array.from(remoteConnections.entries()) - .map( - ([index, c]) => - ({ - room: c.livekitRoom, - url: index, - }) as { room: LivekitRoom; url: string; isLocal?: boolean }, - ) - .concat([ - { - room: localConnection.livekitRoom, - url: localFocus.livekit_service_url, - isLocal: true, - }, - ]), + this.connections$.pipe( + map((connections) => + [...connections.values()].map((c) => ({ + room: c.livekitRoom, + url: c.transport.livekit_service_url, + isLocal: c instanceof PublishConnection, + })), ), - startWith([]), ), ); private readonly userId = this.matrixRoom.client.getUserId(); + private readonly deviceId = this.matrixRoom.client.getDeviceId(); private readonly matrixConnected$ = this.scope.behavior( // To consider ourselves connected to MatrixRTC, we check the following: @@ -679,6 +790,10 @@ export class CallViewModel extends ViewModel { // in a split-brained state. private readonly pretendToBeDisconnected$ = this.reconnecting$; + /** + * Lists, for each LiveKit room, the LiveKit participants whose media should + * be presented. + */ public readonly participantsByRoom$ = this.scope.behavior< { livekitRoom: LivekitRoom; @@ -689,9 +804,12 @@ export class CallViewModel extends ViewModel { }[]; }[] >( - combineLatest([this.localConnection, this.localFocus]) + // TODO: Move this logic into Connection/PublishConnection if possible + this.localConnectionAndTransport$ .pipe( - switchMap(([localConnection, localFocus]) => { + switchMap((values) => { + if (values?.state !== "ready") return []; + const localConnection = values.value.connection; const memberError = (): never => { throw new Error("No room member for call membership"); }; @@ -702,12 +820,9 @@ export class CallViewModel extends ViewModel { }; return this.remoteConnections$.pipe( - switchMap((connections) => + switchMap((remoteConnections) => combineLatest( - [ - [localFocus.livekit_service_url, localConnection] as const, - ...connections, - ].map(([url, c]) => + [localConnection, ...remoteConnections].map((c) => c.publishingParticipants$.pipe( map((ps) => { const participants: { @@ -726,7 +841,7 @@ export class CallViewModel extends ViewModel { return { livekitRoom: c.livekitRoom, - url, + url: c.transport.livekit_service_url, participants, }; }), @@ -809,12 +924,8 @@ export class CallViewModel extends ViewModel { * List of MediaItems that we want to display */ private readonly mediaItems$ = this.scope.behavior( - combineLatest([ - this.participantsByRoom$, - duplicateTiles.value$, - this.memberships$, - ]).pipe( - scan((prevItems, [participantsByRoom, duplicateTiles, memberships]) => { + combineLatest([this.participantsByRoom$, duplicateTiles.value$]).pipe( + scan((prevItems, [participantsByRoom, duplicateTiles]) => { const newItems: Map = new Map( function* (this: CallViewModel): Iterable<[string, MediaItem]> { for (const { livekitRoom, participants } of participantsByRoom) { @@ -829,6 +940,7 @@ export class CallViewModel extends ViewModel { if (prevMedia && prevMedia instanceof UserMedia) { prevMedia.updateParticipant(participant); if (prevMedia.vm.member === undefined) { + // TODO-MULTI-SFU: This is outdated. // We have a previous media created because of the `debugShowNonMember` flag. // In this case we actually replace the media item. // This "hack" never occurs if we do not use the `debugShowNonMember` debugging @@ -931,6 +1043,16 @@ export class CallViewModel extends ViewModel { this.memberships$.pipe(map((ms) => ms.length)), ); + private readonly allOthersLeft$ = this.memberships$.pipe( + pairwise(), + filter( + ([prev, current]) => + current.every((m) => m.sender === this.userId) && + prev.some((m) => m.sender !== this.userId), + ), + map(() => {}), + ); + private readonly didSendCallNotification$ = fromEvent( this.matrixRTCSession, MatrixRTCSessionEvent.DidSendCallNotification, @@ -1055,56 +1177,12 @@ export class CallViewModel extends ViewModel { map(() => {}), throttleTime(THROTTLE_SOUND_EFFECT_MS), ); - /** - * This observable tracks the matrix users that are currently in the call. - * There can be just one matrix user with multiple participants (see also participantChanges$) - */ - public readonly matrixUserChanges$ = this.userMedia$.pipe( - map( - (mediaItems) => - new Set( - mediaItems - .map((m) => m.vm.member?.userId) - .filter((id) => id !== undefined), - ), - ), - scan< - Set, - { - userIds: Set; - joinedUserIds: Set; - leftUserIds: Set; - } - >( - (prevState, userIds) => { - const left = new Set( - [...prevState.userIds].filter((id) => !userIds.has(id)), - ); - const joined = new Set( - [...userIds].filter((id) => !prevState.userIds.has(id)), - ); - return { userIds: userIds, joinedUserIds: joined, leftUserIds: left }; - }, - { userIds: new Set(), joinedUserIds: new Set(), leftUserIds: new Set() }, - ), - ); - - private readonly allOthersLeft$ = this.matrixUserChanges$.pipe( - filter(({ userIds, leftUserIds }) => { - if (!this.userId) { - logger.warn("Could not access user ID to compute allOthersLeft"); - return false; - } - return ( - userIds.size === 1 && userIds.has(this.userId) && leftUserIds.size > 0 - ); - }), - map(() => "allOthersLeft" as const), - ); // Public for testing public readonly autoLeave$ = merge( - this.options.autoLeaveWhenOthersLeft ? this.allOthersLeft$ : NEVER, + this.options.autoLeaveWhenOthersLeft + ? this.allOthersLeft$.pipe(map(() => "allOthersLeft" as const)) + : NEVER, this.callPickupState$.pipe( filter((state) => state === "timeout" || state === "decline"), ), @@ -1132,6 +1210,9 @@ export class CallViewModel extends ViewModel { merge(this.userHangup$, this.widgetHangup$).pipe( map(() => "user" as const), ), + ).pipe( + this.scope.share, + tap((reason) => this.leaveHoisted$.next(reason)), ); /** @@ -1820,9 +1901,12 @@ export class CallViewModel extends ViewModel { * Whether we are sharing our screen. */ public readonly sharingScreen$ = this.scope.behavior( - from(this.localConnection).pipe( - switchMap((c) => sharingScreen$(c.livekitRoom.localParticipant)), - startWith(false), + from(this.localConnection$).pipe( + switchMap((c) => + c?.state === "ready" + ? sharingScreen$(c.value.livekitRoom.localParticipant) + : of(false), + ), ), ); @@ -1834,17 +1918,26 @@ export class CallViewModel extends ViewModel { "getDisplayMedia" in (navigator.mediaDevices ?? {}) && !this.urlParams.hideScreensharing ? (): void => - void this.localConnection.then( - (c) => - void c.livekitRoom.localParticipant - .setScreenShareEnabled(!this.sharingScreen$.value, { - audio: true, - selfBrowserSurface: "include", - surfaceSwitching: "include", - systemAudio: "include", - }) - .catch(logger.error), - ) + // Once a connection is ready... + void this.localConnection$ + .pipe( + takeWhile((c) => c !== null && c.state !== "error"), + switchMap((c) => (c.state === "ready" ? of(c.value) : NEVER)), + take(1), + this.scope.bind(), + ) + // ...toggle screen sharing. + .subscribe( + (c) => + void c.livekitRoom.localParticipant + .setScreenShareEnabled(!this.sharingScreen$.value, { + audio: true, + selfBrowserSurface: "include", + surfaceSwitching: "include", + systemAudio: "include", + }) + .catch(logger.error), + ) : null; public constructor( @@ -1864,32 +1957,33 @@ export class CallViewModel extends ViewModel { ) { super(); - void from(this.localConnection) - .pipe(this.scope.bind()) - .subscribe( - (c) => - void c - .start() - // eslint-disable-next-line no-console - .then(() => console.log("successfully started publishing")) - // eslint-disable-next-line no-console - .catch((e) => console.error("failed to start publishing", e)), - ); + // Start and stop local and remote connections as needed + this.startConnection$.pipe(this.scope.bind()).subscribe( + (c) => + void c.start().then( + () => logger.info(`Connected to ${c.transport.livekit_service_url}`), + (e) => + logger.error( + `Failed to start connection to ${c.transport.livekit_service_url}`, + e, + ), + ), + ); + this.stopConnection$.pipe(this.scope.bind()).subscribe((c) => { + logger.info(`Disconnecting from ${c.transport.livekit_service_url}`); + c.stop(); + }); - this.startConnection$ - .pipe(this.scope.bind()) - .subscribe((c) => void c.start()); - this.stopConnection$.pipe(this.scope.bind()).subscribe((c) => c.stop()); - - combineLatest([this.localFocus, this.join$]) - .pipe(this.scope.bind()) - .subscribe(([localFocus]) => { + // Start and stop session membership as needed + this.localTransport$.pipe(this.scope.bind()).subscribe((localTransport) => { + if (localTransport?.state === "ready") { void enterRTCSession( this.matrixRTCSession, - localFocus, + localTransport.value, this.options.encryptionSystem.kind !== E2eeType.NONE, true, true, + multiSfu.value$.value, ) .catch((e) => logger.error("Error entering RTC session", e)) .then(() => @@ -1906,19 +2000,20 @@ export class CallViewModel extends ViewModel { ), ), ); - }); - this.leave$.pipe(this.scope.bind()).subscribe(() => { - // Only sends Matrix leave event. The LiveKit session will disconnect once, uh... - // (TODO-MULTI-SFU does anything actually cause it to disconnect?) - void this.matrixRTCSession - .leaveRoomSession() - .catch((e) => logger.error("Error leaving RTC session", e)) - .then(async () => - widget?.api.transport - .send(ElementWidgetActions.HangupCall, {}) - .catch((e) => logger.error("Failed to send hangup action", e)), - ); + return (): void => + // Only sends Matrix leave event. The LiveKit session will disconnect + // as soon as either the stopConnection$ handler above gets to it or + // the view model is destroyed. + void this.matrixRTCSession + .leaveRoomSession() + .catch((e) => logger.error("Error leaving RTC session", e)) + .then(async () => + widget?.api.transport + .send(ElementWidgetActions.HangupCall, {}) + .catch((e) => logger.error("Failed to send hangup action", e)), + ); + } }); // Pause upstream of all local media tracks when we're disconnected from @@ -1927,10 +2022,12 @@ export class CallViewModel extends ViewModel { // We use matrixConnected$ rather than reconnecting$ because we want to // pause tracks during the initial joining sequence too until we're sure // that our own media is displayed on screen. - void this.localConnection.then((localConnection) => - this.matrixConnected$.pipe(this.scope.bind()).subscribe((connected) => { + combineLatest([this.localConnection$, this.matrixConnected$]) + .pipe(this.scope.bind()) + .subscribe(([connection, connected]) => { + if (connection?.state !== "ready") return; const publications = - localConnection.livekitRoom.localParticipant.trackPublications.values(); + connection.value.livekitRoom.localParticipant.trackPublications.values(); if (connected) { for (const p of publications) { if (p.track?.isUpstreamPaused === true) { @@ -1966,8 +2063,7 @@ export class CallViewModel extends ViewModel { } } } - }), - ); + }); // Join automatically this.join(); // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked? diff --git a/src/state/Connection.ts b/src/state/Connection.ts index 8eaed463..992d8840 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -62,7 +62,7 @@ export class Connection { protected readonly sfuConfig = getSFUConfigWithOpenID( this.client, - this.focus.livekit_service_url, + this.transport.livekit_service_url, this.livekitAlias, ); @@ -72,12 +72,12 @@ export class Connection { public connectionState$: Behavior; public constructor( - protected readonly focus: LivekitTransport, + public readonly transport: LivekitTransport, protected readonly livekitAlias: string, protected readonly client: MatrixClient, protected readonly scope: ObservableScope, - protected readonly membershipsFocusMap$: Behavior< - { membership: CallMembership; focus: LivekitTransport }[] + protected readonly remoteTransports$: Behavior< + { membership: CallMembership; transport: LivekitTransport }[] >, e2eeLivekitOptions: E2EEOptions | undefined, livekitRoom: LivekitRoom | undefined = undefined, @@ -95,12 +95,13 @@ export class Connection { this.publishingParticipants$ = this.scope.behavior( combineLatest( - [this.participantsIncludingSubscribers$, this.membershipsFocusMap$], - (participants, membershipsFocusMap) => - membershipsFocusMap + [this.participantsIncludingSubscribers$, this.remoteTransports$], + (participants, remoteTransports) => + remoteTransports // Find all members that claim to publish on this connection - .flatMap(({ membership, focus }) => - focus.livekit_service_url === this.focus.livekit_service_url + .flatMap(({ membership, transport }) => + transport.livekit_service_url === + this.transport.livekit_service_url ? [membership] : [], ) @@ -130,23 +131,35 @@ export class PublishConnection extends Connection { if (!this.stopped) await this.livekitRoom.connect(url, jwt); if (!this.stopped) { - const tracks = await this.livekitRoom.localParticipant.createTracks({ - audio: this.muteStates.audio.enabled$.value, - video: this.muteStates.video.enabled$.value, - }); - for (const track of tracks) { - await this.livekitRoom.localParticipant.publishTrack(track); + // TODO-MULTI-SFU: Prepublish a microphone track + const audio = this.muteStates.audio.enabled$.value; + const video = this.muteStates.video.enabled$.value; + // createTracks throws if called with audio=false and video=false + if (audio || video) { + const tracks = await this.livekitRoom.localParticipant.createTracks({ + audio, + video, + }); + for (const track of tracks) { + await this.livekitRoom.localParticipant.publishTrack(track); + } } } } + public stop(): void { + this.muteStates.audio.unsetHandler(); + this.muteStates.video.unsetHandler(); + super.stop(); + } + public constructor( - focus: LivekitTransport, + transport: LivekitTransport, livekitAlias: string, client: MatrixClient, scope: ObservableScope, - membershipsFocusMap$: Behavior< - { membership: CallMembership; focus: LivekitTransport }[] + remoteTransports$: Behavior< + { membership: CallMembership; transport: LivekitTransport }[] >, devices: MediaDevices, private readonly muteStates: MuteStates, @@ -182,11 +195,11 @@ export class PublishConnection extends Connection { }); super( - focus, + transport, livekitAlias, client, scope, - membershipsFocusMap$, + remoteTransports$, e2eeLivekitOptions, room, ); @@ -218,10 +231,6 @@ export class PublishConnection extends Connection { } return this.livekitRoom.localParticipant.isCameraEnabled; }); - this.scope.onEnd(() => { - this.muteStates.audio.unsetHandler(); - this.muteStates.video.unsetHandler(); - }); const syncDevice = ( kind: MediaDeviceKind, diff --git a/src/state/MuteStates.ts b/src/state/MuteStates.ts index c93e88d8..07bc5665 100644 --- a/src/state/MuteStates.ts +++ b/src/state/MuteStates.ts @@ -137,7 +137,7 @@ export class MuteStates { this.scope, this.mediaDevices.audioInput, this.joined$, - Config.get().media_devices.enable_video, + Config.get().media_devices.enable_audio, ); public readonly video = new MuteState( this.scope, diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index fe99d89b..8ac816ca 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -8,9 +8,10 @@ Please see LICENSE in the repository root for full details. import { BehaviorSubject, distinctUntilChanged, + filter, type Observable, share, - Subject, + take, takeUntil, } from "rxjs"; @@ -24,9 +25,11 @@ const nothing = Symbol("nothing"); * A scope which limits the execution lifetime of its bound Observables. */ export class ObservableScope { - private readonly ended$ = new Subject(); + private readonly ended$ = new BehaviorSubject(false); - private readonly bindImpl: MonoTypeOperator = takeUntil(this.ended$); + private readonly bindImpl: MonoTypeOperator = takeUntil( + this.ended$.pipe(filter((ended) => ended)), + ); /** * Binds an Observable to this scope, so that it completes when the scope @@ -78,15 +81,19 @@ export class ObservableScope { * Ends the scope, causing any bound Observables to complete. */ public end(): void { - this.ended$.next(); - this.ended$.complete(); + this.ended$.next(true); } /** * Register a callback to be executed when the scope is ended. */ public onEnd(callback: () => void): void { - this.ended$.subscribe(callback); + this.ended$ + .pipe( + filter((ended) => ended), + take(1), + ) + .subscribe(callback); } } diff --git a/src/utils/errors.ts b/src/utils/errors.ts index 5cb0b450..b77c0ff0 100644 --- a/src/utils/errors.ts +++ b/src/utils/errors.ts @@ -11,7 +11,7 @@ export enum ErrorCode { /** * Configuration problem due to no MatrixRTC backend/SFU is exposed via .well-known and no fallback configured. */ - MISSING_MATRIX_RTC_FOCUS = "MISSING_MATRIX_RTC_FOCUS", + MISSING_MATRIX_RTC_TRANSPORT = "MISSING_MATRIX_RTC_TRANSPORT", CONNECTION_LOST_ERROR = "CONNECTION_LOST_ERROR", /** LiveKit indicates that the server has hit its track limits */ INSUFFICIENT_CAPACITY_ERROR = "INSUFFICIENT_CAPACITY_ERROR", @@ -54,18 +54,18 @@ export class ElementCallError extends Error { } } -export class MatrixRTCFocusMissingError extends ElementCallError { +export class MatrixRTCTransportMissingError extends ElementCallError { public domain: string; public constructor(domain: string) { super( t("error.call_is_not_supported"), - ErrorCode.MISSING_MATRIX_RTC_FOCUS, + ErrorCode.MISSING_MATRIX_RTC_TRANSPORT, ErrorCategory.CONFIGURATION_ISSUE, - t("error.matrix_rtc_focus_missing", { + t("error.matrix_rtc_transport_missing", { domain, brand: import.meta.env.VITE_PRODUCT_NAME || "Element Call", - errorCode: ErrorCode.MISSING_MATRIX_RTC_FOCUS, + errorCode: ErrorCode.MISSING_MATRIX_RTC_TRANSPORT, }), ); this.domain = domain; From 1820cac3f66e9b9801209990d5ad5469ad21e97a Mon Sep 17 00:00:00 2001 From: Robin Date: Fri, 3 Oct 2025 19:14:48 -0400 Subject: [PATCH 4/5] Create media items for session members not joined to LiveKit --- src/state/CallViewModel.ts | 49 ++++++++++++++----------------------- src/state/Connection.ts | 14 +++++------ src/state/MediaViewModel.ts | 10 ++++---- src/tile/MediaView.tsx | 2 +- src/tile/SpotlightTile.tsx | 2 +- 5 files changed, 32 insertions(+), 45 deletions(-) diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 8988e518..6e333bec 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -304,7 +304,7 @@ class UserMedia { public readonly presenter$: Behavior; public constructor( public readonly id: string, - member: RoomMember | undefined, + member: RoomMember, participant: LocalParticipant | RemoteParticipant | undefined, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, @@ -377,7 +377,7 @@ class ScreenShare { public constructor( id: string, - member: RoomMember | undefined, + member: RoomMember, participant: LocalParticipant | RemoteParticipant, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, @@ -799,7 +799,8 @@ export class CallViewModel extends ViewModel { livekitRoom: LivekitRoom; url: string; participants: { - participant: LocalParticipant | RemoteParticipant; + id: string; + participant: LocalParticipant | RemoteParticipant | undefined; member: RoomMember; }[]; }[] @@ -814,6 +815,7 @@ export class CallViewModel extends ViewModel { throw new Error("No room member for call membership"); }; const localParticipant = { + id: "local", participant: localConnection.livekitRoom.localParticipant, member: this.matrixRoom.getMember(this.userId ?? "") ?? memberError(), @@ -826,9 +828,14 @@ export class CallViewModel extends ViewModel { c.publishingParticipants$.pipe( map((ps) => { const participants: { - participant: LocalParticipant | RemoteParticipant; + id: string; + participant: + | LocalParticipant + | RemoteParticipant + | undefined; member: RoomMember; }[] = ps.map(({ participant, membership }) => ({ + id: `${membership.sender}:${membership.deviceId}`, participant, member: getRoomMemberFromRtcMember( @@ -929,26 +936,12 @@ export class CallViewModel extends ViewModel { const newItems: Map = new Map( function* (this: CallViewModel): Iterable<[string, MediaItem]> { for (const { livekitRoom, participants } of participantsByRoom) { - for (const { participant, member } of participants) { - const matrixId = participant.isLocal - ? "local" - : participant.identity; - + for (const { id, participant, member } of participants) { for (let i = 0; i < 1 + duplicateTiles; i++) { - const mediaId = `${matrixId}:${i}`; - let prevMedia = prevItems.get(mediaId); - if (prevMedia && prevMedia instanceof UserMedia) { + const mediaId = `${id}:${i}`; + const prevMedia = prevItems.get(mediaId); + if (prevMedia instanceof UserMedia) prevMedia.updateParticipant(participant); - if (prevMedia.vm.member === undefined) { - // TODO-MULTI-SFU: This is outdated. - // We have a previous media created because of the `debugShowNonMember` flag. - // In this case we actually replace the media item. - // This "hack" never occurs if we do not use the `debugShowNonMember` debugging - // option and if we always find a room member for each rtc member (which also - // only fails if we have a fundamental problem) - prevMedia = undefined; - } - } yield [ mediaId, @@ -965,14 +958,10 @@ export class CallViewModel extends ViewModel { this.mediaDevices, this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( - map((m) => m.get(matrixId) ?? "[👻]"), - ), - this.handsRaised$.pipe( - map((v) => v[matrixId]?.time ?? null), - ), - this.reactions$.pipe( - map((v) => v[matrixId] ?? undefined), + map((m) => m.get(id) ?? "[👻]"), ), + this.handsRaised$.pipe(map((v) => v[id]?.time ?? null)), + this.reactions$.pipe(map((v) => v[id] ?? undefined)), ), ]; @@ -989,7 +978,7 @@ export class CallViewModel extends ViewModel { livekitRoom, this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( - map((m) => m.get(matrixId) ?? "[👻]"), + map((m) => m.get(id) ?? "[👻]"), ), ), ]; diff --git a/src/state/Connection.ts b/src/state/Connection.ts index 992d8840..4908e42f 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -66,7 +66,7 @@ export class Connection { this.livekitAlias, ); - public readonly participantsIncludingSubscribers$; + private readonly participantsIncludingSubscribers$; public readonly publishingParticipants$; public readonly livekitRoom: LivekitRoom; @@ -105,13 +105,11 @@ export class Connection { ? [membership] : [], ) - // Find all associated publishing livekit participant objects - .flatMap((membership) => { - const participant = participants.find( - (p) => - p.identity === `${membership.sender}:${membership.deviceId}`, - ); - return participant ? [{ participant, membership }] : []; + // Pair with their associated LiveKit participant (if any) + .map((membership) => { + const id = `${membership.sender}:${membership.deviceId}`; + const participant = participants.find((p) => p.identity === id); + return { participant, membership }; }), ), [], diff --git a/src/state/MediaViewModel.ts b/src/state/MediaViewModel.ts index dc2c135a..016c6a49 100644 --- a/src/state/MediaViewModel.ts +++ b/src/state/MediaViewModel.ts @@ -255,7 +255,7 @@ abstract class BaseMediaViewModel extends ViewModel { */ // TODO: Fully separate the data layer from the UI layer by keeping the // member object internal - public readonly member: RoomMember | undefined, + public readonly member: RoomMember, // We don't necessarily have a participant if a user connects via MatrixRTC but not (yet) through // livekit. protected readonly participant$: Observable< @@ -403,7 +403,7 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { public constructor( id: string, - member: RoomMember | undefined, + member: RoomMember, participant$: Observable, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, @@ -535,7 +535,7 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { public constructor( id: string, - member: RoomMember | undefined, + member: RoomMember, participant$: Behavior, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, @@ -641,7 +641,7 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { public constructor( id: string, - member: RoomMember | undefined, + member: RoomMember, participant$: Observable, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, @@ -736,7 +736,7 @@ export class ScreenShareViewModel extends BaseMediaViewModel { public constructor( id: string, - member: RoomMember | undefined, + member: RoomMember, participant$: Observable, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, diff --git a/src/tile/MediaView.tsx b/src/tile/MediaView.tsx index a4fd0402..8506a650 100644 --- a/src/tile/MediaView.tsx +++ b/src/tile/MediaView.tsx @@ -32,7 +32,7 @@ interface Props extends ComponentProps { video: TrackReferenceOrPlaceholder | undefined; videoFit: "cover" | "contain"; mirror: boolean; - member: RoomMember | undefined; + member: RoomMember; videoEnabled: boolean; unencryptedWarning: boolean; encryptionStatus: EncryptionStatus; diff --git a/src/tile/SpotlightTile.tsx b/src/tile/SpotlightTile.tsx index 663fb912..b1a15332 100644 --- a/src/tile/SpotlightTile.tsx +++ b/src/tile/SpotlightTile.tsx @@ -55,7 +55,7 @@ interface SpotlightItemBaseProps { targetHeight: number; video: TrackReferenceOrPlaceholder | undefined; videoEnabled: boolean; - member: RoomMember | undefined; + member: RoomMember; unencryptedWarning: boolean; encryptionStatus: EncryptionStatus; displayName: string; From 1fff71ace1f11dcc869fdfc301114f9f77f67d9a Mon Sep 17 00:00:00 2001 From: Robin Date: Fri, 3 Oct 2025 21:00:45 -0400 Subject: [PATCH 5/5] Actually leave the MatrixRTC session again --- src/state/CallViewModel.ts | 123 +++++++++++++++++------------------ src/state/Connection.ts | 35 +++++----- src/state/ObservableScope.ts | 38 +++++++++++ 3 files changed, 116 insertions(+), 80 deletions(-) diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 6e333bec..1a20589c 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -37,7 +37,6 @@ import { Subject, combineLatest, concat, - concatMap, distinctUntilChanged, endWith, filter, @@ -678,6 +677,9 @@ export class CallViewModel extends ViewModel { ), ); + /** + * Emits with connections whenever they should be started or stopped. + */ private readonly connectionInstructions$ = this.connections$.pipe( pairwise(), map(([prev, next]) => { @@ -688,20 +690,6 @@ export class CallViewModel extends ViewModel { return { start, stop }; }), - this.scope.share, - ); - - /** - * Emits with a connection whenever it should be started. - */ - private readonly startConnection$ = this.connectionInstructions$.pipe( - concatMap(({ start }) => start), - ); - /** - * Emits with a connection whenever it should be stopped. - */ - private readonly stopConnection$ = this.connectionInstructions$.pipe( - concatMap(({ stop }) => stop), ); public readonly allLivekitRooms$ = this.scope.behavior( @@ -1947,61 +1935,70 @@ export class CallViewModel extends ViewModel { super(); // Start and stop local and remote connections as needed - this.startConnection$.pipe(this.scope.bind()).subscribe( - (c) => - void c.start().then( - () => logger.info(`Connected to ${c.transport.livekit_service_url}`), - (e) => - logger.error( - `Failed to start connection to ${c.transport.livekit_service_url}`, - e, - ), - ), - ); - this.stopConnection$.pipe(this.scope.bind()).subscribe((c) => { - logger.info(`Disconnecting from ${c.transport.livekit_service_url}`); - c.stop(); - }); - - // Start and stop session membership as needed - this.localTransport$.pipe(this.scope.bind()).subscribe((localTransport) => { - if (localTransport?.state === "ready") { - void enterRTCSession( - this.matrixRTCSession, - localTransport.value, - this.options.encryptionSystem.kind !== E2eeType.NONE, - true, - true, - multiSfu.value$.value, - ) - .catch((e) => logger.error("Error entering RTC session", e)) - .then(() => - // Update our member event when our mute state changes. - this.muteStates.video.enabled$ - .pipe(this.scope.bind(), takeUntil(this.leave$)) - // eslint-disable-next-line rxjs/no-nested-subscribe - .subscribe( - (videoEnabled) => - // TODO: Ensure that these calls are serialized in case of - // fast video toggling - void this.matrixRTCSession.updateCallIntent( - videoEnabled ? "video" : "audio", - ), + this.connectionInstructions$ + .pipe(this.scope.bind()) + .subscribe(({ start, stop }) => { + for (const c of stop) { + logger.info(`Disconnecting from ${c.transport.livekit_service_url}`); + c.stop(); + } + for (const c of start) { + c.start().then( + () => + logger.info(`Connected to ${c.transport.livekit_service_url}`), + (e) => + logger.error( + `Failed to start connection to ${c.transport.livekit_service_url}`, + e, ), ); + } + }); - return (): void => + // Start and stop session membership as needed + this.scope.reconcile(this.localTransport$, async (localTransport) => { + if (localTransport?.state === "ready") { + try { + await enterRTCSession( + this.matrixRTCSession, + localTransport.value, + this.options.encryptionSystem.kind !== E2eeType.NONE, + true, + true, + multiSfu.value$.value, + ); + } catch (e) { + logger.error("Error entering RTC session", e); + } + // Update our member event when our mute state changes. + const muteSubscription = this.muteStates.video.enabled$.subscribe( + (videoEnabled) => + // TODO: Ensure that these calls are serialized in case of + // fast video toggling + void this.matrixRTCSession.updateCallIntent( + videoEnabled ? "video" : "audio", + ), + ); + + return async (): Promise => { + muteSubscription.unsubscribe(); // Only sends Matrix leave event. The LiveKit session will disconnect // as soon as either the stopConnection$ handler above gets to it or // the view model is destroyed. - void this.matrixRTCSession - .leaveRoomSession() - .catch((e) => logger.error("Error leaving RTC session", e)) - .then(async () => - widget?.api.transport - .send(ElementWidgetActions.HangupCall, {}) - .catch((e) => logger.error("Failed to send hangup action", e)), + try { + await this.matrixRTCSession.leaveRoomSession(); + } catch (e) { + logger.error("Error leaving RTC session", e); + } + try { + await widget?.api.transport.send( + ElementWidgetActions.HangupCall, + {}, ); + } catch (e) { + logger.error("Failed to send hangup action", e); + } + }; } }); diff --git a/src/state/Connection.ts b/src/state/Connection.ts index 4908e42f..55afdacf 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -125,6 +125,24 @@ export class Connection { export class PublishConnection extends Connection { public async start(): Promise { this.stopped = false; + + this.muteStates.audio.setHandler(async (desired) => { + try { + await this.livekitRoom.localParticipant.setMicrophoneEnabled(desired); + } catch (e) { + logger.error("Failed to update LiveKit audio input mute state", e); + } + return this.livekitRoom.localParticipant.isMicrophoneEnabled; + }); + this.muteStates.video.setHandler(async (desired) => { + try { + await this.livekitRoom.localParticipant.setCameraEnabled(desired); + } catch (e) { + logger.error("Failed to update LiveKit video input mute state", e); + } + return this.livekitRoom.localParticipant.isCameraEnabled; + }); + const { url, jwt } = await this.sfuConfig; if (!this.stopped) await this.livekitRoom.connect(url, jwt); @@ -213,23 +231,6 @@ export class PublishConnection extends Connection { ); trackProcessorSync(track$, trackerProcessorState$); - this.muteStates.audio.setHandler(async (desired) => { - try { - await this.livekitRoom.localParticipant.setMicrophoneEnabled(desired); - } catch (e) { - logger.error("Failed to update LiveKit audio input mute state", e); - } - return this.livekitRoom.localParticipant.isMicrophoneEnabled; - }); - this.muteStates.video.setHandler(async (desired) => { - try { - await this.livekitRoom.localParticipant.setCameraEnabled(desired); - } catch (e) { - logger.error("Failed to update LiveKit video input mute state", e); - } - return this.livekitRoom.localParticipant.isCameraEnabled; - }); - const syncDevice = ( kind: MediaDeviceKind, selected$: Observable, diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index 8ac816ca..08a4b859 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -7,7 +7,10 @@ Please see LICENSE in the repository root for full details. import { BehaviorSubject, + catchError, distinctUntilChanged, + EMPTY, + endWith, filter, type Observable, share, @@ -95,6 +98,41 @@ export class ObservableScope { ) .subscribe(callback); } + + // TODO-MULTI-SFU Dear Future Robin, please document this. Love, Past Robin. + public reconcile( + value$: Behavior, + callback: (value: T) => Promise<(() => Promise) | undefined>, + ): void { + let latestValue: T | typeof nothing = nothing; + let reconciledValue: T | typeof nothing = nothing; + let cleanUp: (() => Promise) | undefined = undefined; + let callbackPromise: Promise<(() => Promise) | undefined>; + value$ + .pipe( + catchError(() => EMPTY), + this.bind(), + endWith(nothing), + ) + .subscribe((value) => { + void (async (): Promise => { + if (latestValue === nothing) { + latestValue = value; + while (latestValue !== reconciledValue) { + await cleanUp?.(); + reconciledValue = latestValue; + if (latestValue !== nothing) { + callbackPromise = callback(latestValue); + cleanUp = await callbackPromise; + } + } + latestValue = nothing; + } else { + latestValue = value; + } + })(); + }); + } } /**