diff --git a/src/livekit/MatrixAudioRenderer.tsx b/src/livekit/MatrixAudioRenderer.tsx index 24975509..4b28a733 100644 --- a/src/livekit/MatrixAudioRenderer.tsx +++ b/src/livekit/MatrixAudioRenderer.tsx @@ -78,6 +78,8 @@ export function MatrixAudioRenderer({ loggedInvalidIdentities.current.add(identity); }; + // TODO-MULTI-SFU this uses the livekit room form the context. We need to change it so it uses the + // livekit room explicitly so we can pass a list of rooms into the audio renderer and call useTracks for each room. const tracks = useTracks( [ Track.Source.Microphone, diff --git a/src/livekit/livekitSubscriptionRoom.ts b/src/livekit/livekitSubscriptionRoom.ts new file mode 100644 index 00000000..f92ff10e --- /dev/null +++ b/src/livekit/livekitSubscriptionRoom.ts @@ -0,0 +1,123 @@ +/* +Copyright 2023, 2024 New Vector Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { + ConnectionState, + type E2EEManagerOptions, + ExternalE2EEKeyProvider, + LocalVideoTrack, + Room, + type RoomOptions, +} from "livekit-client"; +import { useEffect, useRef } from "react"; +import E2EEWorker from "livekit-client/e2ee-worker?worker"; +import { logger } from "matrix-js-sdk/lib/logger"; +import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc"; + +import { defaultLiveKitOptions } from "./options"; +import { type SFUConfig } from "./openIDSFU"; +import { type MuteStates } from "../room/MuteStates"; +import { useMediaDevices } from "../MediaDevicesContext"; +import { + type ECConnectionState, + useECConnectionState, +} from "./useECConnectionState"; +import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; +import { E2eeType } from "../e2ee/e2eeType"; +import { type EncryptionSystem } from "../e2ee/sharedKeyManagement"; +import { + useTrackProcessor, + useTrackProcessorSync, +} from "./TrackProcessorContext"; +import { observeTrackReference$ } from "../state/MediaViewModel"; +import { useUrlParams } from "../UrlParams"; +import { useInitial } from "../useInitial"; +import { getValue } from "../utils/observable"; +import { type SelectedDevice } from "../state/MediaDevices"; + +interface UseLivekitResult { + livekitPublicationRoom?: Room; + connState: ECConnectionState; +} + +// TODO-MULTI-SFU This is all the logic we need in the subscription connection logic (sync output devices) +// This is not used! (but summarizes what we need) +export function livekitSubscriptionRoom( + rtcSession: MatrixRTCSession, + muteStates: MuteStates, + sfuConfig: SFUConfig | undefined, + e2eeSystem: EncryptionSystem, +): UseLivekitResult { + // Only ever create the room once via useInitial. + // The call can end up with multiple livekit rooms. This is the particular room in + // which this participant publishes their media. + const publicationRoom = useInitial(() => { + logger.info("[LivekitRoom] Create LiveKit room"); + + let e2ee: E2EEManagerOptions | undefined; + if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) { + logger.info("Created MatrixKeyProvider (per participant)"); + e2ee = { + keyProvider: new MatrixKeyProvider(), + worker: new E2EEWorker(), + }; + } else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) { + logger.info("Created ExternalE2EEKeyProvider (shared key)"); + e2ee = { + keyProvider: new ExternalE2EEKeyProvider(), + worker: new E2EEWorker(), + }; + } + + const roomOptions: RoomOptions = { + ...defaultLiveKitOptions, + audioOutput: { + // When using controlled audio devices, we don't want to set the + // deviceId here, because it will be set by the native app. + // (also the id does not need to match a browser device id) + deviceId: controlledAudioDevices + ? undefined + : getValue(devices.audioOutput.selected$)?.id, + }, + e2ee, + }; + // We have to create the room manually here due to a bug inside + // @livekit/components-react. JSON.stringify() is used in deps of a + // useEffect() with an argument that references itself, if E2EE is enabled + const room = new Room(roomOptions); + room.setE2EEEnabled(e2eeSystem.kind !== E2eeType.NONE).catch((e) => { + logger.error("Failed to set E2EE enabled on room", e); + }); + + return room; + }); + + // Setup and update the keyProvider which was create by `createRoom` + useEffect(() => { + const e2eeOptions = publicationRoom.options.e2ee; + if ( + e2eeSystem.kind === E2eeType.NONE || + !(e2eeOptions && "keyProvider" in e2eeOptions) + ) + return; + + if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) { + (e2eeOptions.keyProvider as MatrixKeyProvider).setRTCSession(rtcSession); + } else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) { + (e2eeOptions.keyProvider as ExternalE2EEKeyProvider) + .setKey(e2eeSystem.secret) + .catch((e) => { + logger.error("Failed to set shared key for E2EE", e); + }); + } + }, [publicationRoom.options.e2ee, e2eeSystem, rtcSession]); + + return { + connState: connectionState, + livekitPublicationRoom: publicationRoom, + }; +} diff --git a/src/livekit/openIDSFU.ts b/src/livekit/openIDSFU.ts index 2ebd6045..a288ec57 100644 --- a/src/livekit/openIDSFU.ts +++ b/src/livekit/openIDSFU.ts @@ -7,12 +7,7 @@ Please see LICENSE in the repository root for full details. import { type IOpenIDToken, type MatrixClient } from "matrix-js-sdk"; import { logger } from "matrix-js-sdk/lib/logger"; -import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc"; -import { useEffect, useState } from "react"; -import { type LivekitFocus } from "matrix-js-sdk/lib/matrixrtc"; -import { useActiveLivekitFocus } from "../room/useActiveFocus"; -import { useErrorBoundary } from "../useErrorBoundary"; import { FailToGetOpenIdToken } from "../utils/errors"; import { doNetworkOperationWithRetry } from "../utils/matrix"; @@ -34,38 +29,11 @@ export type OpenIDClientParts = Pick< "getOpenIdToken" | "getDeviceId" >; -export function useOpenIDSFU( - client: OpenIDClientParts, - rtcSession: MatrixRTCSession, -): SFUConfig | undefined { - const [sfuConfig, setSFUConfig] = useState(undefined); - - const activeFocus = useActiveLivekitFocus(rtcSession); - const { showErrorBoundary } = useErrorBoundary(); - - useEffect(() => { - if (activeFocus) { - getSFUConfigWithOpenID(client, activeFocus).then( - (sfuConfig) => { - setSFUConfig(sfuConfig); - }, - (e) => { - showErrorBoundary(new FailToGetOpenIdToken(e)); - logger.error("Failed to get SFU config", e); - }, - ); - } else { - setSFUConfig(undefined); - } - }, [client, activeFocus, showErrorBoundary]); - - return sfuConfig; -} - export async function getSFUConfigWithOpenID( client: OpenIDClientParts, - activeFocus: LivekitFocus, -): Promise { + serviceUrl: string, + livekitAlias: string, +): Promise { let openIdToken: IOpenIDToken; try { openIdToken = await doNetworkOperationWithRetry(async () => @@ -78,26 +46,16 @@ export async function getSFUConfigWithOpenID( } logger.debug("Got openID token", openIdToken); - try { - logger.info( - `Trying to get JWT from call's active focus URL of ${activeFocus.livekit_service_url}...`, - ); - const sfuConfig = await getLiveKitJWT( - client, - activeFocus.livekit_service_url, - activeFocus.livekit_alias, - openIdToken, - ); - logger.info(`Got JWT from call's active focus URL.`); + logger.info(`Trying to get JWT for focus ${serviceUrl}...`); + const sfuConfig = await getLiveKitJWT( + client, + serviceUrl, + livekitAlias, + openIdToken, + ); + logger.info(`Got JWT from call's active focus URL.`); - return sfuConfig; - } catch (e) { - logger.warn( - `Failed to get JWT from RTC session's active focus URL of ${activeFocus.livekit_service_url}.`, - e, - ); - return undefined; - } + return sfuConfig; } async function getLiveKitJWT( diff --git a/src/livekit/useLivekit.ts b/src/livekit/useLivekit.ts index 4c669b47..0672a8eb 100644 --- a/src/livekit/useLivekit.ts +++ b/src/livekit/useLivekit.ts @@ -50,11 +50,12 @@ import { getValue } from "../utils/observable"; import { type SelectedDevice } from "../state/MediaDevices"; interface UseLivekitResult { - livekitRoom?: Room; + livekitPublicationRoom?: Room; connState: ECConnectionState; } -export function useLivekit( +// TODO-MULTI-SFU This is not used anymore but the device syncing logic needs to be moved into the connection object. +export function useLivekitPublicationRoom( rtcSession: MatrixRTCSession, muteStates: MuteStates, sfuConfig: SFUConfig | undefined, @@ -83,7 +84,9 @@ export function useLivekit( const { processor } = useTrackProcessor(); // Only ever create the room once via useInitial. - const room = useInitial(() => { + // The call can end up with multiple livekit rooms. This is the particular room in + // which this participant publishes their media. + const publicationRoom = useInitial(() => { logger.info("[LivekitRoom] Create LiveKit room"); let e2ee: E2EEManagerOptions | undefined; @@ -135,7 +138,7 @@ export function useLivekit( // Setup and update the keyProvider which was create by `createRoom` useEffect(() => { - const e2eeOptions = room.options.e2ee; + const e2eeOptions = publicationRoom.options.e2ee; if ( e2eeSystem.kind === E2eeType.NONE || !(e2eeOptions && "keyProvider" in e2eeOptions) @@ -151,7 +154,7 @@ export function useLivekit( logger.error("Failed to set shared key for E2EE", e); }); } - }, [room.options.e2ee, e2eeSystem, rtcSession]); + }, [publicationRoom.options.e2ee, e2eeSystem, rtcSession]); // Sync the requested track processors with LiveKit useTrackProcessorSync( @@ -170,7 +173,7 @@ export function useLivekit( return track instanceof LocalVideoTrack ? track : null; }), ), - [room], + [publicationRoom], ), ), ); @@ -178,7 +181,7 @@ export function useLivekit( const connectionState = useECConnectionState( initialAudioInputId, initialMuteStates.audio.enabled, - room, + publicationRoom, sfuConfig, ); @@ -216,8 +219,11 @@ export function useLivekit( // It's important that we only do this in the connected state, because // LiveKit's internal mute states aren't consistent during connection setup, // and setting tracks to be enabled during this time causes errors. - if (room !== undefined && connectionState === ConnectionState.Connected) { - const participant = room.localParticipant; + if ( + publicationRoom !== undefined && + connectionState === ConnectionState.Connected + ) { + const participant = publicationRoom.localParticipant; // Always update the muteButtonState Ref so that we can read the current // state in awaited blocks. buttonEnabled.current = { @@ -275,7 +281,7 @@ export function useLivekit( audioMuteUpdating.current = true; trackPublication = await participant.setMicrophoneEnabled( buttonEnabled.current.audio, - room.options.audioCaptureDefaults, + publicationRoom.options.audioCaptureDefaults, ); audioMuteUpdating.current = false; break; @@ -283,7 +289,7 @@ export function useLivekit( videoMuteUpdating.current = true; trackPublication = await participant.setCameraEnabled( buttonEnabled.current.video, - room.options.videoCaptureDefaults, + publicationRoom.options.videoCaptureDefaults, ); videoMuteUpdating.current = false; break; @@ -347,11 +353,14 @@ export function useLivekit( logger.error("Failed to sync video mute state with LiveKit", e); }); } - }, [room, muteStates, connectionState]); + }, [publicationRoom, muteStates, connectionState]); useEffect(() => { // Sync the requested devices with LiveKit's devices - if (room !== undefined && connectionState === ConnectionState.Connected) { + if ( + publicationRoom !== undefined && + connectionState === ConnectionState.Connected + ) { const syncDevice = ( kind: MediaDeviceKind, selected$: Observable, @@ -359,15 +368,15 @@ export function useLivekit( selected$.subscribe((device) => { logger.info( "[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :", - room.getActiveDevice(kind), + publicationRoom.getActiveDevice(kind), " !== ", device?.id, ); if ( device !== undefined && - room.getActiveDevice(kind) !== device.id + publicationRoom.getActiveDevice(kind) !== device.id ) { - room + publicationRoom .switchActiveDevice(kind, device.id) .catch((e) => logger.error(`Failed to sync ${kind} device with LiveKit`, e), @@ -393,7 +402,7 @@ export function useLivekit( .pipe(switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER)) .subscribe(() => { const activeMicTrack = Array.from( - room.localParticipant.audioTrackPublications.values(), + publicationRoom.localParticipant.audioTrackPublications.values(), ).find((d) => d.source === Track.Source.Microphone)?.track; if ( @@ -408,7 +417,7 @@ export function useLivekit( // getUserMedia() call with deviceId: default to get the *new* default device. // Note that room.switchActiveDevice() won't work: Livekit will ignore it because // the deviceId hasn't changed (was & still is default). - room.localParticipant + publicationRoom.localParticipant .getTrackPublication(Track.Source.Microphone) ?.audioTrack?.restartTrack() .catch((e) => { @@ -422,10 +431,10 @@ export function useLivekit( for (const s of subscriptions) s?.unsubscribe(); }; } - }, [room, devices, connectionState, controlledAudioDevices]); + }, [publicationRoom, devices, connectionState, controlledAudioDevices]); return { connState: connectionState, - livekitRoom: room, + livekitPublicationRoom: publicationRoom, }; } diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index 6cdbb75c..e12fc060 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -5,9 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { RoomContext, useLocalParticipant } from "@livekit/components-react"; import { IconButton, Text, Tooltip } from "@vector-im/compound-web"; -import { ConnectionState, type Room as LivekitRoom } from "livekit-client"; import { type MatrixClient, type Room as MatrixRoom } from "matrix-js-sdk"; import { type FC, @@ -37,6 +35,7 @@ import { VolumeOnSolidIcon, } from "@vector-im/compound-design-tokens/assets/web/icons"; import { useTranslation } from "react-i18next"; +import { ConnectionState } from "livekit-client"; import LogoMark from "../icons/LogoMark.svg?react"; import LogoType from "../icons/LogoType.svg?react"; @@ -59,14 +58,12 @@ import { type OTelGroupCallMembership } from "../otel/OTelGroupCallMembership"; import { SettingsModal, defaultSettingsTab } from "../settings/SettingsModal"; import { useRageshakeRequestModal } from "../settings/submit-rageshake"; import { RageshakeRequestModal } from "./RageshakeRequestModal"; -import { useLivekit } from "../livekit/useLivekit.ts"; import { useWakeLock } from "../useWakeLock"; import { useMergedRefs } from "../useMergedRefs"; import { type MuteStates } from "./MuteStates"; import { type MatrixInfo } from "./VideoPreview"; import { InviteButton } from "../button/InviteButton"; import { LayoutToggle } from "./LayoutToggle"; -import { useOpenIDSFU } from "../livekit/openIDSFU"; import { CallViewModel, type GridMode, @@ -108,9 +105,7 @@ import { useSetting, } from "../settings/settings"; import { ReactionsReader } from "../reactions/ReactionsReader"; -import { ConnectionLostError } from "../utils/errors.ts"; import { useTypedEventEmitter } from "../useEvents.ts"; -import { MatrixAudioRenderer } from "../livekit/MatrixAudioRenderer.tsx"; import { muteAllAudio$ } from "../state/MuteAllAudioModel.ts"; import { useMatrixRTCSessionMemberships } from "../useMatrixRTCSessionMemberships.ts"; import { useMediaDevices } from "../MediaDevicesContext.ts"; @@ -125,7 +120,7 @@ import { prefetchSounds } from "../soundUtils"; import { useAudioContext } from "../useAudioContext"; import ringtoneMp3 from "../sound/ringtone.mp3?url"; import ringtoneOgg from "../sound/ringtone.ogg?url"; -import { ObservableScope } from "../state/ObservableScope.ts"; +import { ConnectionLostError } from "../utils/errors.ts"; const canScreenshare = "getDisplayMedia" in (navigator.mediaDevices ?? {}); @@ -138,92 +133,47 @@ export interface ActiveCallProps export const ActiveCall: FC = (props) => { const mediaDevices = useMediaDevices(); - const sfuConfig = useOpenIDSFU(props.client, props.rtcSession); - const { livekitRoom, connState } = useLivekit( - props.rtcSession, - props.muteStates, - sfuConfig, - props.e2eeSystem, - ); - const observableScope = useInitial(() => new ObservableScope()); - const connStateBehavior$ = useObservable( - (inputs$) => - observableScope.behavior( - inputs$.pipe(map(([connState]) => connState)), - connState, - ), - [connState], - ); const [vm, setVm] = useState(null); - useEffect(() => { - logger.info( - `[Lifecycle] InCallView Component mounted, livekit room state ${livekitRoom?.state}`, - ); - return (): void => { - logger.info( - `[Lifecycle] InCallView Component unmounted, livekit room state ${livekitRoom?.state}`, - ); - livekitRoom - ?.disconnect() - .then(() => { - logger.info( - `[Lifecycle] Disconnected from livekit room, state:${livekitRoom?.state}`, - ); - }) - .catch((e) => { - logger.error("[Lifecycle] Failed to disconnect from livekit room", e); - }); - }; - }, [livekitRoom]); - - const { autoLeaveWhenOthersLeft, sendNotificationType, waitForCallPickup } = + const { autoLeaveWhenOthersLeft, waitForCallPickup, sendNotificationType } = useUrlParams(); useEffect(() => { - if (livekitRoom !== undefined) { - const reactionsReader = new ReactionsReader(props.rtcSession); - const vm = new CallViewModel( - props.rtcSession, - props.matrixRoom, - livekitRoom, - mediaDevices, - { - encryptionSystem: props.e2eeSystem, - autoLeaveWhenOthersLeft, - waitForCallPickup: - waitForCallPickup && sendNotificationType === "ring", - }, - connStateBehavior$, - reactionsReader.raisedHands$, - reactionsReader.reactions$, - ); - setVm(vm); - return (): void => { - vm.destroy(); - reactionsReader.destroy(); - }; - } + const reactionsReader = new ReactionsReader(props.rtcSession); + const vm = new CallViewModel( + props.rtcSession, + props.matrixRoom, + mediaDevices, + { + encryptionSystem: props.e2eeSystem, + autoLeaveWhenOthersLeft, + waitForCallPickup: waitForCallPickup && sendNotificationType === "ring", + }, + reactionsReader.raisedHands$, + reactionsReader.reactions$, + props.e2eeSystem, + ); + setVm(vm); + return (): void => { + vm.destroy(); + reactionsReader.destroy(); + }; }, [ props.rtcSession, props.matrixRoom, - livekitRoom, mediaDevices, props.e2eeSystem, - connStateBehavior$, autoLeaveWhenOthersLeft, sendNotificationType, waitForCallPickup, ]); - if (livekitRoom === undefined || vm === null) return null; + if (vm === null) return null; return ( - - - - - + + + ); }; @@ -233,7 +183,6 @@ export interface InCallViewProps { matrixInfo: MatrixInfo; rtcSession: MatrixRTCSession; matrixRoom: MatrixRoom; - livekitRoom: LivekitRoom; muteStates: MuteStates; /** Function to call when the user explicitly ends the call */ onLeave: (cause: "user", soundFile?: CallEventSounds) => void; @@ -248,7 +197,6 @@ export const InCallView: FC = ({ matrixInfo, rtcSession, matrixRoom, - livekitRoom, muteStates, onLeave, header: headerStyle, @@ -273,10 +221,6 @@ export const InCallView: FC = ({ const { hideScreensharing, showControls } = useUrlParams(); - const { isScreenShareEnabled, localParticipant } = useLocalParticipant({ - room: livekitRoom, - }); - const muteAllAudio = useBehavior(muteAllAudio$); // Call pickup state and display names are needed for waiting overlay/sounds const callPickupState = useBehavior(vm.callPickupState$); @@ -806,15 +750,16 @@ export const InCallView: FC = ({ ); const toggleScreensharing = useCallback(() => { - localParticipant - .setScreenShareEnabled(!isScreenShareEnabled, { - audio: true, - selfBrowserSurface: "include", - surfaceSwitching: "include", - systemAudio: "include", - }) - .catch(logger.error); - }, [localParticipant, isScreenShareEnabled]); + throw new Error("TODO-MULTI-SFU"); + // localParticipant + // .setScreenShareEnabled(!isScreenShareEnabled, { + // audio: true, + // selfBrowserSurface: "include", + // surfaceSwitching: "include", + // systemAudio: "include", + // }) + // .catch(logger.error); + }, []); const buttons: JSX.Element[] = []; @@ -841,7 +786,7 @@ export const InCallView: FC = ({ = ({ ) } - + {/* TODO-MULTI-SFU: */} {renderContent()} @@ -955,7 +900,7 @@ export const InCallView: FC = ({ onDismiss={closeSettings} tab={settingsTab} onTabChange={setSettingsTab} - livekitRoom={livekitRoom} + livekitRoom={undefined} // TODO-MULTI-SFU /> )} diff --git a/src/rtcSessionHelpers.ts b/src/rtcSessionHelpers.ts index 73f58cea..e5e567ef 100644 --- a/src/rtcSessionHelpers.ts +++ b/src/rtcSessionHelpers.ts @@ -8,6 +8,7 @@ Please see LICENSE in the repository root for full details. import { isLivekitFocus, isLivekitFocusConfig, + LivekitFocusConfig, type LivekitFocus, type LivekitFocusActive, type MatrixRTCSession, @@ -31,24 +32,16 @@ export function makeActiveFocus(): LivekitFocusActive { }; } -async function makePreferredLivekitFoci( +export function getLivekitAlias(rtcSession: MatrixRTCSession): string { + // For now we assume everything is a room-scoped call + return rtcSession.room.roomId; +} + +async function makeFocusInternal( rtcSession: MatrixRTCSession, - livekitAlias: string, -): Promise { - logger.log("Start building foci_preferred list: ", rtcSession.room.roomId); - - const preferredFoci: LivekitFocus[] = []; - - // Make the Focus from the running rtc session the highest priority one - // This minimizes how often we need to switch foci during a call. - const focusInUse = rtcSession.getFocusInUse(); - if (focusInUse && isLivekitFocus(focusInUse)) { - logger.log("Adding livekit focus from oldest member: ", focusInUse); - preferredFoci.push(focusInUse); - } - - // Warm up the first focus we owned, to ensure livekit room is created before any state event sent. - let toWarmUp: LivekitFocus | undefined; +): Promise { + logger.log("Searching for a preferred focus"); + const livekitAlias = getLivekitAlias(rtcSession); // Prioritize the .well-known/matrix/client, if available, over the configured SFU const domain = rtcSession.room.client.getDomain(); @@ -59,51 +52,42 @@ async function makePreferredLivekitFoci( FOCI_WK_KEY ]; if (Array.isArray(wellKnownFoci)) { - const validWellKnownFoci = wellKnownFoci - .filter((f) => !!f) - .filter(isLivekitFocusConfig) - .map((wellKnownFocus) => { - logger.log("Adding livekit focus from well known: ", wellKnownFocus); - return { ...wellKnownFocus, livekit_alias: livekitAlias }; - }); - if (validWellKnownFoci.length > 0) { - toWarmUp = validWellKnownFoci[0]; + const focus: LivekitFocusConfig | undefined = wellKnownFoci.find( + (f) => f && isLivekitFocusConfig(f), + ); + if (focus !== undefined) { + logger.log("Using LiveKit focus from .well-known: ", focus); + return { ...focus, livekit_alias: livekitAlias }; } - preferredFoci.push(...validWellKnownFoci); } } const urlFromConf = Config.get().livekit?.livekit_service_url; if (urlFromConf) { - const focusFormConf: LivekitFocus = { + const focusFromConf: LivekitFocus = { type: "livekit", livekit_service_url: urlFromConf, livekit_alias: livekitAlias, }; - toWarmUp = toWarmUp ?? focusFormConf; - logger.log("Adding livekit focus from config: ", focusFormConf); - preferredFoci.push(focusFormConf); + logger.log("Using LiveKit focus from config: ", focusFromConf); + return focusFromConf; } - if (toWarmUp) { - // this will call the jwt/sfu/get endpoint to pre create the livekit room. - await getSFUConfigWithOpenID(rtcSession.room.client, toWarmUp); - } - if (preferredFoci.length === 0) - throw new MatrixRTCFocusMissingError(domain ?? ""); - return Promise.resolve(preferredFoci); + throw new MatrixRTCFocusMissingError(domain ?? ""); +} - // TODO: we want to do something like this: - // - // const focusOtherMembers = await focusFromOtherMembers( - // rtcSession, - // livekitAlias, - // ); - // if (focusOtherMembers) preferredFoci.push(focusOtherMembers); +export async function makeFocus( + rtcSession: MatrixRTCSession, +): Promise { + const focus = await makeFocusInternal(rtcSession); + // this will call the jwt/sfu/get endpoint to pre create the livekit room. + await getSFUConfigWithOpenID(rtcSession.room.client, focus); + return focus; } export async function enterRTCSession( rtcSession: MatrixRTCSession, + focus: LivekitFocus, encryptMedia: boolean, useNewMembershipManager = true, useExperimentalToDeviceTransport = false, @@ -115,34 +99,27 @@ export async function enterRTCSession( // have started tracking by the time calls start getting created. // groupCallOTelMembership?.onJoinCall(); - // right now we assume everything is a room-scoped call - const livekitAlias = rtcSession.room.roomId; const { features, matrix_rtc_session: matrixRtcSessionConfig } = Config.get(); const useDeviceSessionMemberEvents = features?.feature_use_device_session_member_events; - rtcSession.joinRoomSession( - await makePreferredLivekitFoci(rtcSession, livekitAlias), - makeActiveFocus(), - { - notificationType: getUrlParams().sendNotificationType, - useNewMembershipManager, - manageMediaKeys: encryptMedia, - ...(useDeviceSessionMemberEvents !== undefined && { - useLegacyMemberEvents: !useDeviceSessionMemberEvents, - }), - delayedLeaveEventRestartMs: - matrixRtcSessionConfig?.delayed_leave_event_restart_ms, - delayedLeaveEventDelayMs: - matrixRtcSessionConfig?.delayed_leave_event_delay_ms, - delayedLeaveEventRestartLocalTimeoutMs: - matrixRtcSessionConfig?.delayed_leave_event_restart_local_timeout_ms, - networkErrorRetryMs: matrixRtcSessionConfig?.network_error_retry_ms, - makeKeyDelay: matrixRtcSessionConfig?.wait_for_key_rotation_ms, - membershipEventExpiryMs: - matrixRtcSessionConfig?.membership_event_expiry_ms, - useExperimentalToDeviceTransport, - }, - ); + rtcSession.joinRoomSession([focus], focus, { + notificationType: getUrlParams().sendNotificationType, + useNewMembershipManager, + manageMediaKeys: encryptMedia, + ...(useDeviceSessionMemberEvents !== undefined && { + useLegacyMemberEvents: !useDeviceSessionMemberEvents, + }), + delayedLeaveEventRestartMs: + matrixRtcSessionConfig?.delayed_leave_event_restart_ms, + delayedLeaveEventDelayMs: + matrixRtcSessionConfig?.delayed_leave_event_delay_ms, + delayedLeaveEventRestartLocalTimeoutMs: + matrixRtcSessionConfig?.delayed_leave_event_restart_local_timeout_ms, + networkErrorRetryMs: matrixRtcSessionConfig?.network_error_retry_ms, + makeKeyDelay: matrixRtcSessionConfig?.wait_for_key_rotation_ms, + membershipEventExpiryMs: matrixRtcSessionConfig?.membership_event_expiry_ms, + useExperimentalToDeviceTransport, + }); if (widget) { try { await widget.api.transport.send(ElementWidgetActions.JoinCall, {}); diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 462e4afc..f0d2c0b7 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -12,7 +12,9 @@ import { } from "@livekit/components-core"; import { ConnectionState, - type Room as LivekitRoom, + E2EEOptions, + ExternalE2EEKeyProvider, + Room as LivekitRoom, type LocalParticipant, ParticipantEvent, type RemoteParticipant, @@ -22,6 +24,7 @@ import { type EventTimelineSetHandlerMap, EventType, RoomEvent, + MatrixClient, RoomStateEvent, SyncState, type Room as MatrixRoom, @@ -63,6 +66,7 @@ import { import { logger } from "matrix-js-sdk/lib/logger"; import { type CallMembership, + isLivekitFocusConfig, type MatrixRTCSession, MatrixRTCSessionEvent, type MatrixRTCSessionEventHandlerMap, @@ -116,7 +120,16 @@ import { observeSpeaker$ } from "./observeSpeaker"; import { shallowEquals } from "../utils/array"; import { calculateDisplayName, shouldDisambiguate } from "../utils/displayname"; import { type MediaDevices } from "./MediaDevices"; -import { constant, type Behavior } from "./Behavior"; +import { type Behavior } from "./Behavior"; +import { getSFUConfigWithOpenID } from "../livekit/openIDSFU"; +import { defaultLiveKitOptions } from "../livekit/options"; +import { + enterRTCSession, + getLivekitAlias, + makeFocus, +} from "../rtcSessionHelpers"; +import { E2eeType } from "../e2ee/e2eeType"; +import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; export interface CallViewModelOptions { encryptionSystem: EncryptionSystem; @@ -405,6 +418,31 @@ class ScreenShare { type MediaItem = UserMedia | ScreenShare; +function getE2eeOptions( + e2eeSystem: EncryptionSystem, + rtcSession: MatrixRTCSession, +): E2EEOptions | undefined { + if (e2eeSystem.kind === E2eeType.NONE) return undefined; + + if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) { + const keyProvider = new MatrixKeyProvider(); + keyProvider.setRTCSession(rtcSession); + return { + keyProvider, + worker: new E2EEWorker(), + }; + } else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) { + const keyProvider = new ExternalE2EEKeyProvider(); + keyProvider + .setKey(e2eeSystem.secret) + .catch((e) => logger.error("Failed to set shared key for E2EE", e)); + return { + keyProvider, + worker: new E2EEWorker(), + }; + } +} + function getRoomMemberFromRtcMember( rtcMember: CallMembership, room: MatrixRoom, @@ -427,8 +465,151 @@ function getRoomMemberFromRtcMember( return { id, member }; } -// TODO: Move wayyyy more business logic from the call and lobby views into here +class Connection { + // TODO-MULTI-SFU Add all device syncing logic from useLivekit + private readonly sfuConfig = getSFUConfigWithOpenID( + this.client, + this.serviceUrl, + this.livekitAlias, + ); + + public async startSubscribing(): Promise { + this.stopped = false; + const { url, jwt } = await this.sfuConfig; + if (!this.stopped) await this.livekitRoom.connect(url, jwt); + } + + public async startPublishing(): Promise { + this.stopped = false; + const { url, jwt } = await this.sfuConfig; + if (!this.stopped) + // TODO-MULTI-SFU this should not create a track? + await this.livekitRoom.localParticipant.createTracks({ + audio: { deviceId: "default" }, + }); + if (!this.stopped) await this.livekitRoom.connect(url, jwt); + } + + private stopped = false; + + public stop(): void { + void this.livekitRoom.disconnect(); + this.stopped = true; + } + + public readonly participants$ = connectedParticipantsObserver( + this.livekitRoom, + ).pipe(this.scope.state()); + + public constructor( + private readonly livekitRoom: LivekitRoom, + private readonly serviceUrl: string, + private readonly livekitAlias: string, + private readonly client: MatrixClient, + private readonly scope: ObservableScope, + ) {} +} + export class CallViewModel extends ViewModel { + private readonly e2eeOptions = getE2eeOptions( + this.encryptionSystem, + this.matrixRTCSession, + ); + + private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession); + + private readonly livekitRoom = new LivekitRoom({ + ...defaultLiveKitOptions, + e2ee: this.e2eeOptions, + }); + + private readonly localFocus = makeFocus(this.matrixRTCSession); + + private readonly localConnection = this.localFocus.then( + (focus) => + new Connection( + this.livekitRoom, + focus.livekit_service_url, + this.livekitAlias, + this.matrixRTCSession.room.client, + this.scope, + ), + ); + + private readonly memberships$ = fromEvent( + this.matrixRTCSession, + MatrixRTCSessionEvent.MembershipsChanged, + ).pipe(map(() => this.matrixRTCSession.memberships)); + + private readonly foci$ = this.memberships$.pipe( + map( + (memberships) => + new Set( + memberships + .map((m) => this.matrixRTCSession.resolveActiveFocus(m)) + .filter((f) => f !== undefined && isLivekitFocusConfig(f)) + .map((f) => f.livekit_service_url), + ), + ), + ); + + private readonly remoteConnections$ = combineLatest([ + this.localFocus, + this.foci$, + ]).pipe( + accumulate(new Map(), (prev, [localFocus, foci]) => { + const stopped = new Map(prev); + const next = new Map(); + + for (const focus of foci) { + if (focus !== localFocus.livekit_service_url) { + stopped.delete(focus); + next.set( + focus, + prev.get(focus) ?? + new Connection( + new LivekitRoom({ + ...defaultLiveKitOptions, + e2ee: this.e2eeOptions, + }), + focus, + this.livekitAlias, + this.matrixRTCSession.room.client, + this.scope, + ), + ); + } + } + + for (const connection of stopped.values()) connection.stop(); + return next; + }), + ); + + private readonly joined$ = new Subject(); + + public join(): void { + this.joined$.next(); + } + + public leave(): void { + // TODO + } + + private readonly connectionInstructions$ = this.joined$.pipe( + switchMap(() => this.remoteConnections$), + startWith(new Map()), + pairwise(), + map(([prev, next]) => { + const start = new Set(next.values()); + for (const connection of prev.values()) start.delete(connection); + const stop = new Set(prev.values()); + for (const connection of next.values()) stop.delete(connection); + + return { start, stop }; + }), + ); + private readonly userId = this.matrixRoom.client.getUserId(); private readonly matrixConnected$ = this.scope.behavior( @@ -502,79 +683,13 @@ export class CallViewModel extends ViewModel { // in a split-brained state. private readonly pretendToBeDisconnected$ = this.reconnecting$; - /** - * The raw list of RemoteParticipants as reported by LiveKit - */ - private readonly rawRemoteParticipants$ = this.scope.behavior< - RemoteParticipant[] - >(connectedParticipantsObserver(this.livekitRoom), []); - - /** - * Lists of RemoteParticipants to "hold" on display, even if LiveKit claims that - * they've left - */ - private readonly remoteParticipantHolds$ = this.scope.behavior< - RemoteParticipant[][] - >( - this.livekitConnectionState$.pipe( - withLatestFrom(this.rawRemoteParticipants$), - mergeMap(([s, ps]) => { - // Whenever we switch focuses, we should retain all the previous - // participants for at least POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS ms to - // give their clients time to switch over and avoid jarring layout shifts - if (s === ECAddonConnectionState.ECSwitchingFocus) { - return concat( - // Hold these participants - of({ hold: ps }), - // Wait for time to pass and the connection state to have changed - forkJoin([ - timer(POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS), - this.livekitConnectionState$.pipe( - filter((s) => s !== ECAddonConnectionState.ECSwitchingFocus), - take(1), - ), - // Then unhold them - ]).pipe(map(() => ({ unhold: ps }))), - ); - } else { - return EMPTY; - } - }), - // Accumulate the hold instructions into a single list showing which - // participants are being held - accumulate([] as RemoteParticipant[][], (holds, instruction) => - "hold" in instruction - ? [instruction.hold, ...holds] - : holds.filter((h) => h !== instruction.unhold), - ), - ), - ); - /** * The RemoteParticipants including those that are being "held" on the screen */ private readonly remoteParticipants$ = this.scope - .behavior( - combineLatest( - [this.rawRemoteParticipants$, this.remoteParticipantHolds$], - (raw, holds) => { - const result = [...raw]; - const resultIds = new Set(result.map((p) => p.identity)); - - // Incorporate the held participants into the list - for (const hold of holds) { - for (const p of hold) { - if (!resultIds.has(p.identity)) { - result.push(p); - resultIds.add(p.identity); - } - } - } - - return result; - }, - ), - ) + .behavior< + RemoteParticipant[] + >(combineLatest([this.localConnection, this.remoteConnections$], (localConnection, remoteConnections) => combineLatest([localConnection.participants$, ...[...remoteConnections.values()].map((c) => c.participants$)], (...ps) => ps.flat(1))).pipe(switchAll(), startWith([]))) .pipe(pauseWhen(this.pretendToBeDisconnected$)); private readonly memberships$ = this.scope.behavior( @@ -1685,24 +1800,42 @@ export class CallViewModel extends ViewModel { ), filter((v) => v.playSounds), ); + // TODO-REBASE: expose connection state observable + public readonly livekitConnectionState$: Observable; public constructor( - // A call is permanently tied to a single Matrix room and LiveKit room + // A call is permanently tied to a single Matrix room private readonly matrixRTCSession: MatrixRTCSession, private readonly matrixRoom: MatrixRoom, - private readonly livekitRoom: LivekitRoom, private readonly mediaDevices: MediaDevices, private readonly options: CallViewModelOptions, - public readonly livekitConnectionState$: Behavior, private readonly handsRaisedSubject$: Observable< Record >, private readonly reactionsSubject$: Observable< Record >, + private readonly encryptionSystem: EncryptionSystem, ) { super(); + void this.localConnection.then((c) => c.startPublishing()); + this.connectionInstructions$ + .pipe(this.scope.bind()) + .subscribe(({ start, stop }) => { + for (const connection of start) connection.startSubscribing(); + for (const connection of stop) connection.stop(); + }); + combineLatest([this.localFocus, this.joined$]) + .pipe(this.scope.bind()) + .subscribe(([localFocus]) => { + enterRTCSession( + this.matrixRTCSession, + localFocus, + this.encryptionSystem.kind !== E2eeType.PER_PARTICIPANT, + ); + }); + // Pause upstream of all local media tracks when we're disconnected from // MatrixRTC, because it can be an unpleasant surprise for the app to say // 'reconnecting' and yet still be transmitting your media to others. diff --git a/yarn.lock b/yarn.lock index 5f224576..668706b4 100644 --- a/yarn.lock +++ b/yarn.lock @@ -10317,7 +10317,7 @@ __metadata: uuid: "npm:11" checksum: 10c0/ecd019c677c272c5598617dcde407dbe4b1b11460863b2a577e33f3fd8732c9d9073ec0221b471ec1eb24e2839eec20728db7f92c9348be83126547286e50805 languageName: node - linkType: hard + linkType: soft "matrix-widget-api@npm:^1.10.0, matrix-widget-api@npm:^1.13.0": version: 1.13.1