diff --git a/src/livekit/MatrixAudioRenderer.tsx b/src/livekit/MatrixAudioRenderer.tsx index 24975509..4b28a733 100644 --- a/src/livekit/MatrixAudioRenderer.tsx +++ b/src/livekit/MatrixAudioRenderer.tsx @@ -78,6 +78,8 @@ export function MatrixAudioRenderer({ loggedInvalidIdentities.current.add(identity); }; + // TODO-MULTI-SFU this uses the livekit room form the context. We need to change it so it uses the + // livekit room explicitly so we can pass a list of rooms into the audio renderer and call useTracks for each room. const tracks = useTracks( [ Track.Source.Microphone, diff --git a/src/livekit/livekitSubscriptionRoom.ts b/src/livekit/livekitSubscriptionRoom.ts new file mode 100644 index 00000000..f92ff10e --- /dev/null +++ b/src/livekit/livekitSubscriptionRoom.ts @@ -0,0 +1,123 @@ +/* +Copyright 2023, 2024 New Vector Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { + ConnectionState, + type E2EEManagerOptions, + ExternalE2EEKeyProvider, + LocalVideoTrack, + Room, + type RoomOptions, +} from "livekit-client"; +import { useEffect, useRef } from "react"; +import E2EEWorker from "livekit-client/e2ee-worker?worker"; +import { logger } from "matrix-js-sdk/lib/logger"; +import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc"; + +import { defaultLiveKitOptions } from "./options"; +import { type SFUConfig } from "./openIDSFU"; +import { type MuteStates } from "../room/MuteStates"; +import { useMediaDevices } from "../MediaDevicesContext"; +import { + type ECConnectionState, + useECConnectionState, +} from "./useECConnectionState"; +import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; +import { E2eeType } from "../e2ee/e2eeType"; +import { type EncryptionSystem } from "../e2ee/sharedKeyManagement"; +import { + useTrackProcessor, + useTrackProcessorSync, +} from "./TrackProcessorContext"; +import { observeTrackReference$ } from "../state/MediaViewModel"; +import { useUrlParams } from "../UrlParams"; +import { useInitial } from "../useInitial"; +import { getValue } from "../utils/observable"; +import { type SelectedDevice } from "../state/MediaDevices"; + +interface UseLivekitResult { + livekitPublicationRoom?: Room; + connState: ECConnectionState; +} + +// TODO-MULTI-SFU This is all the logic we need in the subscription connection logic (sync output devices) +// This is not used! (but summarizes what we need) +export function livekitSubscriptionRoom( + rtcSession: MatrixRTCSession, + muteStates: MuteStates, + sfuConfig: SFUConfig | undefined, + e2eeSystem: EncryptionSystem, +): UseLivekitResult { + // Only ever create the room once via useInitial. + // The call can end up with multiple livekit rooms. This is the particular room in + // which this participant publishes their media. + const publicationRoom = useInitial(() => { + logger.info("[LivekitRoom] Create LiveKit room"); + + let e2ee: E2EEManagerOptions | undefined; + if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) { + logger.info("Created MatrixKeyProvider (per participant)"); + e2ee = { + keyProvider: new MatrixKeyProvider(), + worker: new E2EEWorker(), + }; + } else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) { + logger.info("Created ExternalE2EEKeyProvider (shared key)"); + e2ee = { + keyProvider: new ExternalE2EEKeyProvider(), + worker: new E2EEWorker(), + }; + } + + const roomOptions: RoomOptions = { + ...defaultLiveKitOptions, + audioOutput: { + // When using controlled audio devices, we don't want to set the + // deviceId here, because it will be set by the native app. + // (also the id does not need to match a browser device id) + deviceId: controlledAudioDevices + ? undefined + : getValue(devices.audioOutput.selected$)?.id, + }, + e2ee, + }; + // We have to create the room manually here due to a bug inside + // @livekit/components-react. JSON.stringify() is used in deps of a + // useEffect() with an argument that references itself, if E2EE is enabled + const room = new Room(roomOptions); + room.setE2EEEnabled(e2eeSystem.kind !== E2eeType.NONE).catch((e) => { + logger.error("Failed to set E2EE enabled on room", e); + }); + + return room; + }); + + // Setup and update the keyProvider which was create by `createRoom` + useEffect(() => { + const e2eeOptions = publicationRoom.options.e2ee; + if ( + e2eeSystem.kind === E2eeType.NONE || + !(e2eeOptions && "keyProvider" in e2eeOptions) + ) + return; + + if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) { + (e2eeOptions.keyProvider as MatrixKeyProvider).setRTCSession(rtcSession); + } else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) { + (e2eeOptions.keyProvider as ExternalE2EEKeyProvider) + .setKey(e2eeSystem.secret) + .catch((e) => { + logger.error("Failed to set shared key for E2EE", e); + }); + } + }, [publicationRoom.options.e2ee, e2eeSystem, rtcSession]); + + return { + connState: connectionState, + livekitPublicationRoom: publicationRoom, + }; +} diff --git a/src/livekit/openIDSFU.ts b/src/livekit/openIDSFU.ts index 2ebd6045..a288ec57 100644 --- a/src/livekit/openIDSFU.ts +++ b/src/livekit/openIDSFU.ts @@ -7,12 +7,7 @@ Please see LICENSE in the repository root for full details. import { type IOpenIDToken, type MatrixClient } from "matrix-js-sdk"; import { logger } from "matrix-js-sdk/lib/logger"; -import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc"; -import { useEffect, useState } from "react"; -import { type LivekitFocus } from "matrix-js-sdk/lib/matrixrtc"; -import { useActiveLivekitFocus } from "../room/useActiveFocus"; -import { useErrorBoundary } from "../useErrorBoundary"; import { FailToGetOpenIdToken } from "../utils/errors"; import { doNetworkOperationWithRetry } from "../utils/matrix"; @@ -34,38 +29,11 @@ export type OpenIDClientParts = Pick< "getOpenIdToken" | "getDeviceId" >; -export function useOpenIDSFU( - client: OpenIDClientParts, - rtcSession: MatrixRTCSession, -): SFUConfig | undefined { - const [sfuConfig, setSFUConfig] = useState(undefined); - - const activeFocus = useActiveLivekitFocus(rtcSession); - const { showErrorBoundary } = useErrorBoundary(); - - useEffect(() => { - if (activeFocus) { - getSFUConfigWithOpenID(client, activeFocus).then( - (sfuConfig) => { - setSFUConfig(sfuConfig); - }, - (e) => { - showErrorBoundary(new FailToGetOpenIdToken(e)); - logger.error("Failed to get SFU config", e); - }, - ); - } else { - setSFUConfig(undefined); - } - }, [client, activeFocus, showErrorBoundary]); - - return sfuConfig; -} - export async function getSFUConfigWithOpenID( client: OpenIDClientParts, - activeFocus: LivekitFocus, -): Promise { + serviceUrl: string, + livekitAlias: string, +): Promise { let openIdToken: IOpenIDToken; try { openIdToken = await doNetworkOperationWithRetry(async () => @@ -78,26 +46,16 @@ export async function getSFUConfigWithOpenID( } logger.debug("Got openID token", openIdToken); - try { - logger.info( - `Trying to get JWT from call's active focus URL of ${activeFocus.livekit_service_url}...`, - ); - const sfuConfig = await getLiveKitJWT( - client, - activeFocus.livekit_service_url, - activeFocus.livekit_alias, - openIdToken, - ); - logger.info(`Got JWT from call's active focus URL.`); + logger.info(`Trying to get JWT for focus ${serviceUrl}...`); + const sfuConfig = await getLiveKitJWT( + client, + serviceUrl, + livekitAlias, + openIdToken, + ); + logger.info(`Got JWT from call's active focus URL.`); - return sfuConfig; - } catch (e) { - logger.warn( - `Failed to get JWT from RTC session's active focus URL of ${activeFocus.livekit_service_url}.`, - e, - ); - return undefined; - } + return sfuConfig; } async function getLiveKitJWT( diff --git a/src/livekit/useLivekit.ts b/src/livekit/useLivekit.ts index 24e0ca29..42cb93db 100644 --- a/src/livekit/useLivekit.ts +++ b/src/livekit/useLivekit.ts @@ -49,11 +49,12 @@ import { getValue } from "../utils/observable"; import { type SelectedDevice } from "../state/MediaDevices"; interface UseLivekitResult { - livekitRoom?: Room; + livekitPublicationRoom?: Room; connState: ECConnectionState; } -export function useLivekit( +// TODO-MULTI-SFU This is not used anymore but the device syncing logic needs to be moved into the connection object. +export function useLivekitPublicationRoom( rtcSession: MatrixRTCSession, muteStates: MuteStates, sfuConfig: SFUConfig | undefined, @@ -82,7 +83,9 @@ export function useLivekit( const { processor } = useTrackProcessor(); // Only ever create the room once via useInitial. - const room = useInitial(() => { + // The call can end up with multiple livekit rooms. This is the particular room in + // which this participant publishes their media. + const publicationRoom = useInitial(() => { logger.info("[LivekitRoom] Create LiveKit room"); let e2ee: E2EEManagerOptions | undefined; @@ -134,7 +137,7 @@ export function useLivekit( // Setup and update the keyProvider which was create by `createRoom` useEffect(() => { - const e2eeOptions = room.options.e2ee; + const e2eeOptions = publicationRoom.options.e2ee; if ( e2eeSystem.kind === E2eeType.NONE || !(e2eeOptions && "keyProvider" in e2eeOptions) @@ -150,7 +153,7 @@ export function useLivekit( logger.error("Failed to set shared key for E2EE", e); }); } - }, [room.options.e2ee, e2eeSystem, rtcSession]); + }, [publicationRoom.options.e2ee, e2eeSystem, rtcSession]); // Sync the requested track processors with LiveKit useTrackProcessorSync( @@ -169,7 +172,7 @@ export function useLivekit( return track instanceof LocalVideoTrack ? track : null; }), ), - [room], + [publicationRoom], ), ), ); @@ -177,7 +180,7 @@ export function useLivekit( const connectionState = useECConnectionState( initialAudioInputId, initialMuteStates.audio.enabled, - room, + publicationRoom, sfuConfig, ); @@ -188,8 +191,11 @@ export function useLivekit( // It's important that we only do this in the connected state, because // LiveKit's internal mute states aren't consistent during connection setup, // and setting tracks to be enabled during this time causes errors. - if (room !== undefined && connectionState === ConnectionState.Connected) { - const participant = room.localParticipant; + if ( + publicationRoom !== undefined && + connectionState === ConnectionState.Connected + ) { + const participant = publicationRoom.localParticipant; // Always update the muteButtonState Ref so that we can read the current // state in awaited blocks. buttonEnabled.current = { @@ -247,7 +253,7 @@ export function useLivekit( audioMuteUpdating.current = true; trackPublication = await participant.setMicrophoneEnabled( buttonEnabled.current.audio, - room.options.audioCaptureDefaults, + publicationRoom.options.audioCaptureDefaults, ); audioMuteUpdating.current = false; break; @@ -255,7 +261,7 @@ export function useLivekit( videoMuteUpdating.current = true; trackPublication = await participant.setCameraEnabled( buttonEnabled.current.video, - room.options.videoCaptureDefaults, + publicationRoom.options.videoCaptureDefaults, ); videoMuteUpdating.current = false; break; @@ -319,11 +325,14 @@ export function useLivekit( logger.error("Failed to sync video mute state with LiveKit", e); }); } - }, [room, muteStates, connectionState]); + }, [publicationRoom, muteStates, connectionState]); useEffect(() => { // Sync the requested devices with LiveKit's devices - if (room !== undefined && connectionState === ConnectionState.Connected) { + if ( + publicationRoom !== undefined && + connectionState === ConnectionState.Connected + ) { const syncDevice = ( kind: MediaDeviceKind, selected$: Observable, @@ -331,15 +340,15 @@ export function useLivekit( selected$.subscribe((device) => { logger.info( "[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :", - room.getActiveDevice(kind), + publicationRoom.getActiveDevice(kind), " !== ", device?.id, ); if ( device !== undefined && - room.getActiveDevice(kind) !== device.id + publicationRoom.getActiveDevice(kind) !== device.id ) { - room + publicationRoom .switchActiveDevice(kind, device.id) .catch((e) => logger.error(`Failed to sync ${kind} device with LiveKit`, e), @@ -365,7 +374,7 @@ export function useLivekit( .pipe(switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER)) .subscribe(() => { const activeMicTrack = Array.from( - room.localParticipant.audioTrackPublications.values(), + publicationRoom.localParticipant.audioTrackPublications.values(), ).find((d) => d.source === Track.Source.Microphone)?.track; if ( @@ -380,7 +389,7 @@ export function useLivekit( // getUserMedia() call with deviceId: default to get the *new* default device. // Note that room.switchActiveDevice() won't work: Livekit will ignore it because // the deviceId hasn't changed (was & still is default). - room.localParticipant + publicationRoom.localParticipant .getTrackPublication(Track.Source.Microphone) ?.audioTrack?.restartTrack() .catch((e) => { @@ -394,10 +403,10 @@ export function useLivekit( for (const s of subscriptions) s?.unsubscribe(); }; } - }, [room, devices, connectionState, controlledAudioDevices]); + }, [publicationRoom, devices, connectionState, controlledAudioDevices]); return { connState: connectionState, - livekitRoom: room, + livekitPublicationRoom: publicationRoom, }; } diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index 4e3229a5..bc0de11b 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -5,9 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { RoomContext, useLocalParticipant } from "@livekit/components-react"; import { IconButton, Text, Tooltip } from "@vector-im/compound-web"; -import { ConnectionState, type Room as LivekitRoom } from "livekit-client"; import { type MatrixClient, type Room as MatrixRoom } from "matrix-js-sdk"; import { type FC, @@ -55,15 +53,12 @@ import { type OTelGroupCallMembership } from "../otel/OTelGroupCallMembership"; import { SettingsModal, defaultSettingsTab } from "../settings/SettingsModal"; import { useRageshakeRequestModal } from "../settings/submit-rageshake"; import { RageshakeRequestModal } from "./RageshakeRequestModal"; -import { useLivekit } from "../livekit/useLivekit.ts"; import { useWakeLock } from "../useWakeLock"; import { useMergedRefs } from "../useMergedRefs"; import { type MuteStates } from "./MuteStates"; import { type MatrixInfo } from "./VideoPreview"; import { InviteButton } from "../button/InviteButton"; import { LayoutToggle } from "./LayoutToggle"; -import { type ECConnectionState } from "../livekit/useECConnectionState"; -import { useOpenIDSFU } from "../livekit/openIDSFU"; import { CallViewModel, type GridMode, @@ -102,7 +97,6 @@ import { useSetting, } from "../settings/settings"; import { ReactionsReader } from "../reactions/ReactionsReader"; -import { ConnectionLostError } from "../utils/errors.ts"; import { useTypedEventEmitter } from "../useEvents.ts"; import { MatrixAudioRenderer } from "../livekit/MatrixAudioRenderer.tsx"; import { muteAllAudio$ } from "../state/MuteAllAudioModel.ts"; @@ -124,87 +118,42 @@ export interface ActiveCallProps export const ActiveCall: FC = (props) => { const mediaDevices = useMediaDevices(); - const sfuConfig = useOpenIDSFU(props.client, props.rtcSession); - const { livekitRoom, connState } = useLivekit( - props.rtcSession, - props.muteStates, - sfuConfig, - props.e2eeSystem, - ); - const connStateObservable$ = useObservable( - (inputs$) => inputs$.pipe(map(([connState]) => connState)), - [connState], - ); const [vm, setVm] = useState(null); - useEffect(() => { - logger.info( - `[Lifecycle] InCallView Component mounted, livekit room state ${livekitRoom?.state}`, - ); - return (): void => { - logger.info( - `[Lifecycle] InCallView Component unmounted, livekit room state ${livekitRoom?.state}`, - ); - livekitRoom - ?.disconnect() - .then(() => { - logger.info( - `[Lifecycle] Disconnected from livekit room, state:${livekitRoom?.state}`, - ); - }) - .catch((e) => { - logger.error("[Lifecycle] Failed to disconnect from livekit room", e); - }); - }; - }, [livekitRoom]); - const { autoLeaveWhenOthersLeft } = useUrlParams(); useEffect(() => { - if (livekitRoom !== undefined) { - const reactionsReader = new ReactionsReader(props.rtcSession); - const vm = new CallViewModel( - props.rtcSession, - props.matrixRoom, - livekitRoom, - mediaDevices, - { - encryptionSystem: props.e2eeSystem, - autoLeaveWhenOthersLeft, - }, - connStateObservable$, - reactionsReader.raisedHands$, - reactionsReader.reactions$, - ); - setVm(vm); - return (): void => { - vm.destroy(); - reactionsReader.destroy(); - }; - } + const reactionsReader = new ReactionsReader(props.rtcSession); + const vm = new CallViewModel( + props.rtcSession, + props.matrixRoom, + mediaDevices, + { + encryptionSystem: props.e2eeSystem, + autoLeaveWhenOthersLeft, + }, + reactionsReader.raisedHands$, + reactionsReader.reactions$, + ); + setVm(vm); + return (): void => { + vm.destroy(); + reactionsReader.destroy(); + }; }, [ props.rtcSession, props.matrixRoom, - livekitRoom, mediaDevices, props.e2eeSystem, - connStateObservable$, autoLeaveWhenOthersLeft, ]); - if (livekitRoom === undefined || vm === null) return null; + if (vm === null) return null; return ( - - - - - + + + ); }; @@ -214,14 +163,12 @@ export interface InCallViewProps { matrixInfo: MatrixInfo; rtcSession: MatrixRTCSession; matrixRoom: MatrixRoom; - livekitRoom: LivekitRoom; muteStates: MuteStates; participantCount: number; /** Function to call when the user explicitly ends the call */ onLeave: () => void; header: HeaderStyle; otelGroupCallMembership?: OTelGroupCallMembership; - connState: ECConnectionState; onShareClick: (() => void) | null; } @@ -231,12 +178,10 @@ export const InCallView: FC = ({ matrixInfo, rtcSession, matrixRoom, - livekitRoom, muteStates, participantCount, onLeave, header: headerStyle, - connState, onShareClick, }) => { const { t } = useTranslation(); @@ -245,11 +190,6 @@ export const InCallView: FC = ({ useWakeLock(); - // annoyingly we don't get the disconnection reason this way, - // only by listening for the emitted event - if (connState === ConnectionState.Disconnected) - throw new ConnectionLostError(); - const containerRef1 = useRef(null); const [containerRef2, bounds] = useMeasure(); // Merge the refs so they can attach to the same element @@ -257,10 +197,6 @@ export const InCallView: FC = ({ const { hideScreensharing, showControls } = useUrlParams(); - const { isScreenShareEnabled, localParticipant } = useLocalParticipant({ - room: livekitRoom, - }); - const muteAllAudio = useBehavior(muteAllAudio$); // This seems like it might be enough logic to use move it into the call view model? @@ -653,15 +589,16 @@ export const InCallView: FC = ({ ); const toggleScreensharing = useCallback(() => { - localParticipant - .setScreenShareEnabled(!isScreenShareEnabled, { - audio: true, - selfBrowserSurface: "include", - surfaceSwitching: "include", - systemAudio: "include", - }) - .catch(logger.error); - }, [localParticipant, isScreenShareEnabled]); + throw new Error("TODO-MULTI-SFU"); + // localParticipant + // .setScreenShareEnabled(!isScreenShareEnabled, { + // audio: true, + // selfBrowserSurface: "include", + // surfaceSwitching: "include", + // systemAudio: "include", + // }) + // .catch(logger.error); + }, []); const buttons: JSX.Element[] = []; @@ -688,7 +625,7 @@ export const InCallView: FC = ({ = ({ ) } - + {/* TODO-MULTI-SFU: */} {renderContent()} @@ -813,7 +750,7 @@ export const InCallView: FC = ({ onDismiss={closeSettings} tab={settingsTab} onTabChange={setSettingsTab} - livekitRoom={livekitRoom} + livekitRoom={undefined} // TODO-MULTI-SFU /> )} diff --git a/src/rtcSessionHelpers.ts b/src/rtcSessionHelpers.ts index 73f58cea..e5e567ef 100644 --- a/src/rtcSessionHelpers.ts +++ b/src/rtcSessionHelpers.ts @@ -8,6 +8,7 @@ Please see LICENSE in the repository root for full details. import { isLivekitFocus, isLivekitFocusConfig, + LivekitFocusConfig, type LivekitFocus, type LivekitFocusActive, type MatrixRTCSession, @@ -31,24 +32,16 @@ export function makeActiveFocus(): LivekitFocusActive { }; } -async function makePreferredLivekitFoci( +export function getLivekitAlias(rtcSession: MatrixRTCSession): string { + // For now we assume everything is a room-scoped call + return rtcSession.room.roomId; +} + +async function makeFocusInternal( rtcSession: MatrixRTCSession, - livekitAlias: string, -): Promise { - logger.log("Start building foci_preferred list: ", rtcSession.room.roomId); - - const preferredFoci: LivekitFocus[] = []; - - // Make the Focus from the running rtc session the highest priority one - // This minimizes how often we need to switch foci during a call. - const focusInUse = rtcSession.getFocusInUse(); - if (focusInUse && isLivekitFocus(focusInUse)) { - logger.log("Adding livekit focus from oldest member: ", focusInUse); - preferredFoci.push(focusInUse); - } - - // Warm up the first focus we owned, to ensure livekit room is created before any state event sent. - let toWarmUp: LivekitFocus | undefined; +): Promise { + logger.log("Searching for a preferred focus"); + const livekitAlias = getLivekitAlias(rtcSession); // Prioritize the .well-known/matrix/client, if available, over the configured SFU const domain = rtcSession.room.client.getDomain(); @@ -59,51 +52,42 @@ async function makePreferredLivekitFoci( FOCI_WK_KEY ]; if (Array.isArray(wellKnownFoci)) { - const validWellKnownFoci = wellKnownFoci - .filter((f) => !!f) - .filter(isLivekitFocusConfig) - .map((wellKnownFocus) => { - logger.log("Adding livekit focus from well known: ", wellKnownFocus); - return { ...wellKnownFocus, livekit_alias: livekitAlias }; - }); - if (validWellKnownFoci.length > 0) { - toWarmUp = validWellKnownFoci[0]; + const focus: LivekitFocusConfig | undefined = wellKnownFoci.find( + (f) => f && isLivekitFocusConfig(f), + ); + if (focus !== undefined) { + logger.log("Using LiveKit focus from .well-known: ", focus); + return { ...focus, livekit_alias: livekitAlias }; } - preferredFoci.push(...validWellKnownFoci); } } const urlFromConf = Config.get().livekit?.livekit_service_url; if (urlFromConf) { - const focusFormConf: LivekitFocus = { + const focusFromConf: LivekitFocus = { type: "livekit", livekit_service_url: urlFromConf, livekit_alias: livekitAlias, }; - toWarmUp = toWarmUp ?? focusFormConf; - logger.log("Adding livekit focus from config: ", focusFormConf); - preferredFoci.push(focusFormConf); + logger.log("Using LiveKit focus from config: ", focusFromConf); + return focusFromConf; } - if (toWarmUp) { - // this will call the jwt/sfu/get endpoint to pre create the livekit room. - await getSFUConfigWithOpenID(rtcSession.room.client, toWarmUp); - } - if (preferredFoci.length === 0) - throw new MatrixRTCFocusMissingError(domain ?? ""); - return Promise.resolve(preferredFoci); + throw new MatrixRTCFocusMissingError(domain ?? ""); +} - // TODO: we want to do something like this: - // - // const focusOtherMembers = await focusFromOtherMembers( - // rtcSession, - // livekitAlias, - // ); - // if (focusOtherMembers) preferredFoci.push(focusOtherMembers); +export async function makeFocus( + rtcSession: MatrixRTCSession, +): Promise { + const focus = await makeFocusInternal(rtcSession); + // this will call the jwt/sfu/get endpoint to pre create the livekit room. + await getSFUConfigWithOpenID(rtcSession.room.client, focus); + return focus; } export async function enterRTCSession( rtcSession: MatrixRTCSession, + focus: LivekitFocus, encryptMedia: boolean, useNewMembershipManager = true, useExperimentalToDeviceTransport = false, @@ -115,34 +99,27 @@ export async function enterRTCSession( // have started tracking by the time calls start getting created. // groupCallOTelMembership?.onJoinCall(); - // right now we assume everything is a room-scoped call - const livekitAlias = rtcSession.room.roomId; const { features, matrix_rtc_session: matrixRtcSessionConfig } = Config.get(); const useDeviceSessionMemberEvents = features?.feature_use_device_session_member_events; - rtcSession.joinRoomSession( - await makePreferredLivekitFoci(rtcSession, livekitAlias), - makeActiveFocus(), - { - notificationType: getUrlParams().sendNotificationType, - useNewMembershipManager, - manageMediaKeys: encryptMedia, - ...(useDeviceSessionMemberEvents !== undefined && { - useLegacyMemberEvents: !useDeviceSessionMemberEvents, - }), - delayedLeaveEventRestartMs: - matrixRtcSessionConfig?.delayed_leave_event_restart_ms, - delayedLeaveEventDelayMs: - matrixRtcSessionConfig?.delayed_leave_event_delay_ms, - delayedLeaveEventRestartLocalTimeoutMs: - matrixRtcSessionConfig?.delayed_leave_event_restart_local_timeout_ms, - networkErrorRetryMs: matrixRtcSessionConfig?.network_error_retry_ms, - makeKeyDelay: matrixRtcSessionConfig?.wait_for_key_rotation_ms, - membershipEventExpiryMs: - matrixRtcSessionConfig?.membership_event_expiry_ms, - useExperimentalToDeviceTransport, - }, - ); + rtcSession.joinRoomSession([focus], focus, { + notificationType: getUrlParams().sendNotificationType, + useNewMembershipManager, + manageMediaKeys: encryptMedia, + ...(useDeviceSessionMemberEvents !== undefined && { + useLegacyMemberEvents: !useDeviceSessionMemberEvents, + }), + delayedLeaveEventRestartMs: + matrixRtcSessionConfig?.delayed_leave_event_restart_ms, + delayedLeaveEventDelayMs: + matrixRtcSessionConfig?.delayed_leave_event_delay_ms, + delayedLeaveEventRestartLocalTimeoutMs: + matrixRtcSessionConfig?.delayed_leave_event_restart_local_timeout_ms, + networkErrorRetryMs: matrixRtcSessionConfig?.network_error_retry_ms, + makeKeyDelay: matrixRtcSessionConfig?.wait_for_key_rotation_ms, + membershipEventExpiryMs: matrixRtcSessionConfig?.membership_event_expiry_ms, + useExperimentalToDeviceTransport, + }); if (widget) { try { await widget.api.transport.send(ElementWidgetActions.JoinCall, {}); diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index d7bf1812..140c76a5 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -12,13 +12,16 @@ import { } from "@livekit/components-core"; import { ConnectionState, - type Room as LivekitRoom, + E2EEOptions, + ExternalE2EEKeyProvider, + Room as LivekitRoom, type LocalParticipant, ParticipantEvent, type RemoteParticipant, } from "livekit-client"; import { ClientEvent, + MatrixClient, RoomStateEvent, SyncState, type Room as MatrixRoom, @@ -39,6 +42,7 @@ import { merge, mergeMap, of, + pairwise, race, scan, skip, @@ -53,6 +57,7 @@ import { import { logger } from "matrix-js-sdk/lib/logger"; import { type CallMembership, + isLivekitFocusConfig, type MatrixRTCSession, MatrixRTCSessionEvent, MembershipManagerEvent, @@ -106,6 +111,15 @@ import { shallowEquals } from "../utils/array"; import { calculateDisplayName, shouldDisambiguate } from "../utils/displayname"; import { type MediaDevices } from "./MediaDevices"; import { type Behavior } from "./Behavior"; +import { getSFUConfigWithOpenID } from "../livekit/openIDSFU"; +import { defaultLiveKitOptions } from "../livekit/options"; +import { + enterRTCSession, + getLivekitAlias, + makeFocus, +} from "../rtcSessionHelpers"; +import { E2eeType } from "../e2ee/e2eeType"; +import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; export interface CallViewModelOptions { encryptionSystem: EncryptionSystem; @@ -383,6 +397,31 @@ class ScreenShare { type MediaItem = UserMedia | ScreenShare; +function getE2eeOptions( + e2eeSystem: EncryptionSystem, + rtcSession: MatrixRTCSession, +): E2EEOptions | undefined { + if (e2eeSystem.kind === E2eeType.NONE) return undefined; + + if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) { + const keyProvider = new MatrixKeyProvider(); + keyProvider.setRTCSession(rtcSession); + return { + keyProvider, + worker: new E2EEWorker(), + }; + } else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) { + const keyProvider = new ExternalE2EEKeyProvider(); + keyProvider + .setKey(e2eeSystem.secret) + .catch((e) => logger.error("Failed to set shared key for E2EE", e)); + return { + keyProvider, + worker: new E2EEWorker(), + }; + } +} + function getRoomMemberFromRtcMember( rtcMember: CallMembership, room: MatrixRoom, @@ -405,8 +444,151 @@ function getRoomMemberFromRtcMember( return { id, member }; } -// TODO: Move wayyyy more business logic from the call and lobby views into here +class Connection { + // TODO-MULTI-SFU Add all device syncing logic from useLivekit + private readonly sfuConfig = getSFUConfigWithOpenID( + this.client, + this.serviceUrl, + this.livekitAlias, + ); + + public async startSubscribing(): Promise { + this.stopped = false; + const { url, jwt } = await this.sfuConfig; + if (!this.stopped) await this.livekitRoom.connect(url, jwt); + } + + public async startPublishing(): Promise { + this.stopped = false; + const { url, jwt } = await this.sfuConfig; + if (!this.stopped) + // TODO-MULTI-SFU this should not create a track? + await this.livekitRoom.localParticipant.createTracks({ + audio: { deviceId: "default" }, + }); + if (!this.stopped) await this.livekitRoom.connect(url, jwt); + } + + private stopped = false; + + public stop(): void { + void this.livekitRoom.disconnect(); + this.stopped = true; + } + + public readonly participants$ = connectedParticipantsObserver( + this.livekitRoom, + ).pipe(this.scope.state()); + + public constructor( + private readonly livekitRoom: LivekitRoom, + private readonly serviceUrl: string, + private readonly livekitAlias: string, + private readonly client: MatrixClient, + private readonly scope: ObservableScope, + ) {} +} + export class CallViewModel extends ViewModel { + private readonly e2eeOptions = getE2eeOptions( + this.encryptionSystem, + this.matrixRTCSession, + ); + + private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession); + + private readonly livekitRoom = new LivekitRoom({ + ...defaultLiveKitOptions, + e2ee: this.e2eeOptions, + }); + + private readonly localFocus = makeFocus(this.matrixRTCSession); + + private readonly localConnection = this.localFocus.then( + (focus) => + new Connection( + this.livekitRoom, + focus.livekit_service_url, + this.livekitAlias, + this.matrixRTCSession.room.client, + this.scope, + ), + ); + + private readonly memberships$ = fromEvent( + this.matrixRTCSession, + MatrixRTCSessionEvent.MembershipsChanged, + ).pipe(map(() => this.matrixRTCSession.memberships)); + + private readonly foci$ = this.memberships$.pipe( + map( + (memberships) => + new Set( + memberships + .map((m) => this.matrixRTCSession.resolveActiveFocus(m)) + .filter((f) => f !== undefined && isLivekitFocusConfig(f)) + .map((f) => f.livekit_service_url), + ), + ), + ); + + private readonly remoteConnections$ = combineLatest([ + this.localFocus, + this.foci$, + ]).pipe( + accumulate(new Map(), (prev, [localFocus, foci]) => { + const stopped = new Map(prev); + const next = new Map(); + + for (const focus of foci) { + if (focus !== localFocus.livekit_service_url) { + stopped.delete(focus); + next.set( + focus, + prev.get(focus) ?? + new Connection( + new LivekitRoom({ + ...defaultLiveKitOptions, + e2ee: this.e2eeOptions, + }), + focus, + this.livekitAlias, + this.matrixRTCSession.room.client, + this.scope, + ), + ); + } + } + + for (const connection of stopped.values()) connection.stop(); + return next; + }), + ); + + private readonly joined$ = new Subject(); + + public join(): void { + this.joined$.next(); + } + + public leave(): void { + // TODO + } + + private readonly connectionInstructions$ = this.joined$.pipe( + switchMap(() => this.remoteConnections$), + startWith(new Map()), + pairwise(), + map(([prev, next]) => { + const start = new Set(next.values()); + for (const connection of prev.values()) start.delete(connection); + const stop = new Set(prev.values()); + for (const connection of next.values()) stop.delete(connection); + + return { start, stop }; + }), + ); + private readonly userId = this.matrixRoom.client.getUserId(); private readonly matrixConnected$ = this.scope.behavior( @@ -480,79 +662,13 @@ export class CallViewModel extends ViewModel { // in a split-brained state. private readonly pretendToBeDisconnected$ = this.reconnecting$; - /** - * The raw list of RemoteParticipants as reported by LiveKit - */ - private readonly rawRemoteParticipants$ = this.scope.behavior< - RemoteParticipant[] - >(connectedParticipantsObserver(this.livekitRoom), []); - - /** - * Lists of RemoteParticipants to "hold" on display, even if LiveKit claims that - * they've left - */ - private readonly remoteParticipantHolds$ = this.scope.behavior< - RemoteParticipant[][] - >( - this.livekitConnectionState$.pipe( - withLatestFrom(this.rawRemoteParticipants$), - mergeMap(([s, ps]) => { - // Whenever we switch focuses, we should retain all the previous - // participants for at least POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS ms to - // give their clients time to switch over and avoid jarring layout shifts - if (s === ECAddonConnectionState.ECSwitchingFocus) { - return concat( - // Hold these participants - of({ hold: ps }), - // Wait for time to pass and the connection state to have changed - forkJoin([ - timer(POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS), - this.livekitConnectionState$.pipe( - filter((s) => s !== ECAddonConnectionState.ECSwitchingFocus), - take(1), - ), - // Then unhold them - ]).pipe(map(() => ({ unhold: ps }))), - ); - } else { - return EMPTY; - } - }), - // Accumulate the hold instructions into a single list showing which - // participants are being held - accumulate([] as RemoteParticipant[][], (holds, instruction) => - "hold" in instruction - ? [instruction.hold, ...holds] - : holds.filter((h) => h !== instruction.unhold), - ), - ), - ); - /** * The RemoteParticipants including those that are being "held" on the screen */ private readonly remoteParticipants$ = this.scope - .behavior( - combineLatest( - [this.rawRemoteParticipants$, this.remoteParticipantHolds$], - (raw, holds) => { - const result = [...raw]; - const resultIds = new Set(result.map((p) => p.identity)); - - // Incorporate the held participants into the list - for (const hold of holds) { - for (const p of hold) { - if (!resultIds.has(p.identity)) { - result.push(p); - resultIds.add(p.identity); - } - } - } - - return result; - }, - ), - ) + .behavior< + RemoteParticipant[] + >(combineLatest([this.localConnection, this.remoteConnections$], (localConnection, remoteConnections) => combineLatest([localConnection.participants$, ...[...remoteConnections.values()].map((c) => c.participants$)], (...ps) => ps.flat(1))).pipe(switchAll(), startWith([]))) .pipe(pauseWhen(this.pretendToBeDisconnected$)); /** @@ -1574,22 +1690,38 @@ export class CallViewModel extends ViewModel { ); public constructor( - // A call is permanently tied to a single Matrix room and LiveKit room + // A call is permanently tied to a single Matrix room private readonly matrixRTCSession: MatrixRTCSession, private readonly matrixRoom: MatrixRoom, - private readonly livekitRoom: LivekitRoom, private readonly mediaDevices: MediaDevices, private readonly options: CallViewModelOptions, - private readonly livekitConnectionState$: Observable, private readonly handsRaisedSubject$: Observable< Record >, private readonly reactionsSubject$: Observable< Record >, + private readonly encryptionSystem: EncryptionSystem, ) { super(); + void this.localConnection.then((c) => c.startPublishing()); + this.connectionInstructions$ + .pipe(this.scope.bind()) + .subscribe(({ start, stop }) => { + for (const connection of start) connection.startSubscribing(); + for (const connection of stop) connection.stop(); + }); + combineLatest([this.localFocus, this.joined$]) + .pipe(this.scope.bind()) + .subscribe(([localFocus]) => { + enterRTCSession( + this.matrixRTCSession, + localFocus, + this.encryptionSystem.kind !== E2eeType.PER_PARTICIPANT, + ); + }); + // Pause upstream of all local media tracks when we're disconnected from // MatrixRTC, because it can be an unpleasant surprise for the app to say // 'reconnecting' and yet still be transmitting your media to others. diff --git a/yarn.lock b/yarn.lock index 94f8feaf..e923cf78 100644 --- a/yarn.lock +++ b/yarn.lock @@ -10278,9 +10278,9 @@ __metadata: languageName: node linkType: hard -"matrix-js-sdk@github:matrix-org/matrix-js-sdk#head=develop": - version: 37.13.0 - resolution: "matrix-js-sdk@https://github.com/matrix-org/matrix-js-sdk.git#commit=3a33c658bbcb8ce8791ec066db899f2571f5c52f" +"matrix-js-sdk@portal:/Users/timo/Projects/matrix-js-sdk::locator=element-call%40workspace%3A.": + version: 0.0.0-use.local + resolution: "matrix-js-sdk@portal:/Users/timo/Projects/matrix-js-sdk::locator=element-call%40workspace%3A." dependencies: "@babel/runtime": "npm:^7.12.5" "@matrix-org/matrix-sdk-crypto-wasm": "npm:^15.1.0" @@ -10296,9 +10296,8 @@ __metadata: sdp-transform: "npm:^2.14.1" unhomoglyph: "npm:^1.0.6" uuid: "npm:11" - checksum: 10c0/1db0d39cfbe4f1c69c8acda0ea7580a4819fc47a7d4bff057382e33e72d9a610f8c03043a6c00bc647dfdc2815aa643c69d25022fb759342a92b77e1841524f1 languageName: node - linkType: hard + linkType: soft "matrix-widget-api@npm:^1.10.0, matrix-widget-api@npm:^1.13.0": version: 1.13.1