Signed-off-by: Timo K <toger5@hotmail.de>
This commit is contained in:
Timo K
2025-08-27 14:01:01 +02:00
parent 38e60208ef
commit 1ee35066d8
8 changed files with 457 additions and 320 deletions

View File

@@ -78,6 +78,8 @@ export function MatrixAudioRenderer({
loggedInvalidIdentities.current.add(identity);
};
// TODO-MULTI-SFU this uses the livekit room form the context. We need to change it so it uses the
// livekit room explicitly so we can pass a list of rooms into the audio renderer and call useTracks for each room.
const tracks = useTracks(
[
Track.Source.Microphone,

View File

@@ -0,0 +1,123 @@
/*
Copyright 2023, 2024 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
ConnectionState,
type E2EEManagerOptions,
ExternalE2EEKeyProvider,
LocalVideoTrack,
Room,
type RoomOptions,
} from "livekit-client";
import { useEffect, useRef } from "react";
import E2EEWorker from "livekit-client/e2ee-worker?worker";
import { logger } from "matrix-js-sdk/lib/logger";
import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc";
import { defaultLiveKitOptions } from "./options";
import { type SFUConfig } from "./openIDSFU";
import { type MuteStates } from "../room/MuteStates";
import { useMediaDevices } from "../MediaDevicesContext";
import {
type ECConnectionState,
useECConnectionState,
} from "./useECConnectionState";
import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider";
import { E2eeType } from "../e2ee/e2eeType";
import { type EncryptionSystem } from "../e2ee/sharedKeyManagement";
import {
useTrackProcessor,
useTrackProcessorSync,
} from "./TrackProcessorContext";
import { observeTrackReference$ } from "../state/MediaViewModel";
import { useUrlParams } from "../UrlParams";
import { useInitial } from "../useInitial";
import { getValue } from "../utils/observable";
import { type SelectedDevice } from "../state/MediaDevices";
interface UseLivekitResult {
livekitPublicationRoom?: Room;
connState: ECConnectionState;
}
// TODO-MULTI-SFU This is all the logic we need in the subscription connection logic (sync output devices)
// This is not used! (but summarizes what we need)
export function livekitSubscriptionRoom(
rtcSession: MatrixRTCSession,
muteStates: MuteStates,
sfuConfig: SFUConfig | undefined,
e2eeSystem: EncryptionSystem,
): UseLivekitResult {
// Only ever create the room once via useInitial.
// The call can end up with multiple livekit rooms. This is the particular room in
// which this participant publishes their media.
const publicationRoom = useInitial(() => {
logger.info("[LivekitRoom] Create LiveKit room");
let e2ee: E2EEManagerOptions | undefined;
if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) {
logger.info("Created MatrixKeyProvider (per participant)");
e2ee = {
keyProvider: new MatrixKeyProvider(),
worker: new E2EEWorker(),
};
} else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) {
logger.info("Created ExternalE2EEKeyProvider (shared key)");
e2ee = {
keyProvider: new ExternalE2EEKeyProvider(),
worker: new E2EEWorker(),
};
}
const roomOptions: RoomOptions = {
...defaultLiveKitOptions,
audioOutput: {
// When using controlled audio devices, we don't want to set the
// deviceId here, because it will be set by the native app.
// (also the id does not need to match a browser device id)
deviceId: controlledAudioDevices
? undefined
: getValue(devices.audioOutput.selected$)?.id,
},
e2ee,
};
// We have to create the room manually here due to a bug inside
// @livekit/components-react. JSON.stringify() is used in deps of a
// useEffect() with an argument that references itself, if E2EE is enabled
const room = new Room(roomOptions);
room.setE2EEEnabled(e2eeSystem.kind !== E2eeType.NONE).catch((e) => {
logger.error("Failed to set E2EE enabled on room", e);
});
return room;
});
// Setup and update the keyProvider which was create by `createRoom`
useEffect(() => {
const e2eeOptions = publicationRoom.options.e2ee;
if (
e2eeSystem.kind === E2eeType.NONE ||
!(e2eeOptions && "keyProvider" in e2eeOptions)
)
return;
if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) {
(e2eeOptions.keyProvider as MatrixKeyProvider).setRTCSession(rtcSession);
} else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) {
(e2eeOptions.keyProvider as ExternalE2EEKeyProvider)
.setKey(e2eeSystem.secret)
.catch((e) => {
logger.error("Failed to set shared key for E2EE", e);
});
}
}, [publicationRoom.options.e2ee, e2eeSystem, rtcSession]);
return {
connState: connectionState,
livekitPublicationRoom: publicationRoom,
};
}

View File

@@ -7,12 +7,7 @@ Please see LICENSE in the repository root for full details.
import { type IOpenIDToken, type MatrixClient } from "matrix-js-sdk";
import { logger } from "matrix-js-sdk/lib/logger";
import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc";
import { useEffect, useState } from "react";
import { type LivekitFocus } from "matrix-js-sdk/lib/matrixrtc";
import { useActiveLivekitFocus } from "../room/useActiveFocus";
import { useErrorBoundary } from "../useErrorBoundary";
import { FailToGetOpenIdToken } from "../utils/errors";
import { doNetworkOperationWithRetry } from "../utils/matrix";
@@ -34,38 +29,11 @@ export type OpenIDClientParts = Pick<
"getOpenIdToken" | "getDeviceId"
>;
export function useOpenIDSFU(
client: OpenIDClientParts,
rtcSession: MatrixRTCSession,
): SFUConfig | undefined {
const [sfuConfig, setSFUConfig] = useState<SFUConfig | undefined>(undefined);
const activeFocus = useActiveLivekitFocus(rtcSession);
const { showErrorBoundary } = useErrorBoundary();
useEffect(() => {
if (activeFocus) {
getSFUConfigWithOpenID(client, activeFocus).then(
(sfuConfig) => {
setSFUConfig(sfuConfig);
},
(e) => {
showErrorBoundary(new FailToGetOpenIdToken(e));
logger.error("Failed to get SFU config", e);
},
);
} else {
setSFUConfig(undefined);
}
}, [client, activeFocus, showErrorBoundary]);
return sfuConfig;
}
export async function getSFUConfigWithOpenID(
client: OpenIDClientParts,
activeFocus: LivekitFocus,
): Promise<SFUConfig | undefined> {
serviceUrl: string,
livekitAlias: string,
): Promise<SFUConfig> {
let openIdToken: IOpenIDToken;
try {
openIdToken = await doNetworkOperationWithRetry(async () =>
@@ -78,26 +46,16 @@ export async function getSFUConfigWithOpenID(
}
logger.debug("Got openID token", openIdToken);
try {
logger.info(
`Trying to get JWT from call's active focus URL of ${activeFocus.livekit_service_url}...`,
);
const sfuConfig = await getLiveKitJWT(
client,
activeFocus.livekit_service_url,
activeFocus.livekit_alias,
openIdToken,
);
logger.info(`Got JWT from call's active focus URL.`);
logger.info(`Trying to get JWT for focus ${serviceUrl}...`);
const sfuConfig = await getLiveKitJWT(
client,
serviceUrl,
livekitAlias,
openIdToken,
);
logger.info(`Got JWT from call's active focus URL.`);
return sfuConfig;
} catch (e) {
logger.warn(
`Failed to get JWT from RTC session's active focus URL of ${activeFocus.livekit_service_url}.`,
e,
);
return undefined;
}
return sfuConfig;
}
async function getLiveKitJWT(

View File

@@ -49,11 +49,12 @@ import { getValue } from "../utils/observable";
import { type SelectedDevice } from "../state/MediaDevices";
interface UseLivekitResult {
livekitRoom?: Room;
livekitPublicationRoom?: Room;
connState: ECConnectionState;
}
export function useLivekit(
// TODO-MULTI-SFU This is not used anymore but the device syncing logic needs to be moved into the connection object.
export function useLivekitPublicationRoom(
rtcSession: MatrixRTCSession,
muteStates: MuteStates,
sfuConfig: SFUConfig | undefined,
@@ -82,7 +83,9 @@ export function useLivekit(
const { processor } = useTrackProcessor();
// Only ever create the room once via useInitial.
const room = useInitial(() => {
// The call can end up with multiple livekit rooms. This is the particular room in
// which this participant publishes their media.
const publicationRoom = useInitial(() => {
logger.info("[LivekitRoom] Create LiveKit room");
let e2ee: E2EEManagerOptions | undefined;
@@ -134,7 +137,7 @@ export function useLivekit(
// Setup and update the keyProvider which was create by `createRoom`
useEffect(() => {
const e2eeOptions = room.options.e2ee;
const e2eeOptions = publicationRoom.options.e2ee;
if (
e2eeSystem.kind === E2eeType.NONE ||
!(e2eeOptions && "keyProvider" in e2eeOptions)
@@ -150,7 +153,7 @@ export function useLivekit(
logger.error("Failed to set shared key for E2EE", e);
});
}
}, [room.options.e2ee, e2eeSystem, rtcSession]);
}, [publicationRoom.options.e2ee, e2eeSystem, rtcSession]);
// Sync the requested track processors with LiveKit
useTrackProcessorSync(
@@ -169,7 +172,7 @@ export function useLivekit(
return track instanceof LocalVideoTrack ? track : null;
}),
),
[room],
[publicationRoom],
),
),
);
@@ -177,7 +180,7 @@ export function useLivekit(
const connectionState = useECConnectionState(
initialAudioInputId,
initialMuteStates.audio.enabled,
room,
publicationRoom,
sfuConfig,
);
@@ -188,8 +191,11 @@ export function useLivekit(
// It's important that we only do this in the connected state, because
// LiveKit's internal mute states aren't consistent during connection setup,
// and setting tracks to be enabled during this time causes errors.
if (room !== undefined && connectionState === ConnectionState.Connected) {
const participant = room.localParticipant;
if (
publicationRoom !== undefined &&
connectionState === ConnectionState.Connected
) {
const participant = publicationRoom.localParticipant;
// Always update the muteButtonState Ref so that we can read the current
// state in awaited blocks.
buttonEnabled.current = {
@@ -247,7 +253,7 @@ export function useLivekit(
audioMuteUpdating.current = true;
trackPublication = await participant.setMicrophoneEnabled(
buttonEnabled.current.audio,
room.options.audioCaptureDefaults,
publicationRoom.options.audioCaptureDefaults,
);
audioMuteUpdating.current = false;
break;
@@ -255,7 +261,7 @@ export function useLivekit(
videoMuteUpdating.current = true;
trackPublication = await participant.setCameraEnabled(
buttonEnabled.current.video,
room.options.videoCaptureDefaults,
publicationRoom.options.videoCaptureDefaults,
);
videoMuteUpdating.current = false;
break;
@@ -319,11 +325,14 @@ export function useLivekit(
logger.error("Failed to sync video mute state with LiveKit", e);
});
}
}, [room, muteStates, connectionState]);
}, [publicationRoom, muteStates, connectionState]);
useEffect(() => {
// Sync the requested devices with LiveKit's devices
if (room !== undefined && connectionState === ConnectionState.Connected) {
if (
publicationRoom !== undefined &&
connectionState === ConnectionState.Connected
) {
const syncDevice = (
kind: MediaDeviceKind,
selected$: Observable<SelectedDevice | undefined>,
@@ -331,15 +340,15 @@ export function useLivekit(
selected$.subscribe((device) => {
logger.info(
"[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :",
room.getActiveDevice(kind),
publicationRoom.getActiveDevice(kind),
" !== ",
device?.id,
);
if (
device !== undefined &&
room.getActiveDevice(kind) !== device.id
publicationRoom.getActiveDevice(kind) !== device.id
) {
room
publicationRoom
.switchActiveDevice(kind, device.id)
.catch((e) =>
logger.error(`Failed to sync ${kind} device with LiveKit`, e),
@@ -365,7 +374,7 @@ export function useLivekit(
.pipe(switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER))
.subscribe(() => {
const activeMicTrack = Array.from(
room.localParticipant.audioTrackPublications.values(),
publicationRoom.localParticipant.audioTrackPublications.values(),
).find((d) => d.source === Track.Source.Microphone)?.track;
if (
@@ -380,7 +389,7 @@ export function useLivekit(
// getUserMedia() call with deviceId: default to get the *new* default device.
// Note that room.switchActiveDevice() won't work: Livekit will ignore it because
// the deviceId hasn't changed (was & still is default).
room.localParticipant
publicationRoom.localParticipant
.getTrackPublication(Track.Source.Microphone)
?.audioTrack?.restartTrack()
.catch((e) => {
@@ -394,10 +403,10 @@ export function useLivekit(
for (const s of subscriptions) s?.unsubscribe();
};
}
}, [room, devices, connectionState, controlledAudioDevices]);
}, [publicationRoom, devices, connectionState, controlledAudioDevices]);
return {
connState: connectionState,
livekitRoom: room,
livekitPublicationRoom: publicationRoom,
};
}

View File

@@ -5,9 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { RoomContext, useLocalParticipant } from "@livekit/components-react";
import { IconButton, Text, Tooltip } from "@vector-im/compound-web";
import { ConnectionState, type Room as LivekitRoom } from "livekit-client";
import { type MatrixClient, type Room as MatrixRoom } from "matrix-js-sdk";
import {
type FC,
@@ -55,15 +53,12 @@ import { type OTelGroupCallMembership } from "../otel/OTelGroupCallMembership";
import { SettingsModal, defaultSettingsTab } from "../settings/SettingsModal";
import { useRageshakeRequestModal } from "../settings/submit-rageshake";
import { RageshakeRequestModal } from "./RageshakeRequestModal";
import { useLivekit } from "../livekit/useLivekit.ts";
import { useWakeLock } from "../useWakeLock";
import { useMergedRefs } from "../useMergedRefs";
import { type MuteStates } from "./MuteStates";
import { type MatrixInfo } from "./VideoPreview";
import { InviteButton } from "../button/InviteButton";
import { LayoutToggle } from "./LayoutToggle";
import { type ECConnectionState } from "../livekit/useECConnectionState";
import { useOpenIDSFU } from "../livekit/openIDSFU";
import {
CallViewModel,
type GridMode,
@@ -102,7 +97,6 @@ import {
useSetting,
} from "../settings/settings";
import { ReactionsReader } from "../reactions/ReactionsReader";
import { ConnectionLostError } from "../utils/errors.ts";
import { useTypedEventEmitter } from "../useEvents.ts";
import { MatrixAudioRenderer } from "../livekit/MatrixAudioRenderer.tsx";
import { muteAllAudio$ } from "../state/MuteAllAudioModel.ts";
@@ -124,87 +118,42 @@ export interface ActiveCallProps
export const ActiveCall: FC<ActiveCallProps> = (props) => {
const mediaDevices = useMediaDevices();
const sfuConfig = useOpenIDSFU(props.client, props.rtcSession);
const { livekitRoom, connState } = useLivekit(
props.rtcSession,
props.muteStates,
sfuConfig,
props.e2eeSystem,
);
const connStateObservable$ = useObservable(
(inputs$) => inputs$.pipe(map(([connState]) => connState)),
[connState],
);
const [vm, setVm] = useState<CallViewModel | null>(null);
useEffect(() => {
logger.info(
`[Lifecycle] InCallView Component mounted, livekit room state ${livekitRoom?.state}`,
);
return (): void => {
logger.info(
`[Lifecycle] InCallView Component unmounted, livekit room state ${livekitRoom?.state}`,
);
livekitRoom
?.disconnect()
.then(() => {
logger.info(
`[Lifecycle] Disconnected from livekit room, state:${livekitRoom?.state}`,
);
})
.catch((e) => {
logger.error("[Lifecycle] Failed to disconnect from livekit room", e);
});
};
}, [livekitRoom]);
const { autoLeaveWhenOthersLeft } = useUrlParams();
useEffect(() => {
if (livekitRoom !== undefined) {
const reactionsReader = new ReactionsReader(props.rtcSession);
const vm = new CallViewModel(
props.rtcSession,
props.matrixRoom,
livekitRoom,
mediaDevices,
{
encryptionSystem: props.e2eeSystem,
autoLeaveWhenOthersLeft,
},
connStateObservable$,
reactionsReader.raisedHands$,
reactionsReader.reactions$,
);
setVm(vm);
return (): void => {
vm.destroy();
reactionsReader.destroy();
};
}
const reactionsReader = new ReactionsReader(props.rtcSession);
const vm = new CallViewModel(
props.rtcSession,
props.matrixRoom,
mediaDevices,
{
encryptionSystem: props.e2eeSystem,
autoLeaveWhenOthersLeft,
},
reactionsReader.raisedHands$,
reactionsReader.reactions$,
);
setVm(vm);
return (): void => {
vm.destroy();
reactionsReader.destroy();
};
}, [
props.rtcSession,
props.matrixRoom,
livekitRoom,
mediaDevices,
props.e2eeSystem,
connStateObservable$,
autoLeaveWhenOthersLeft,
]);
if (livekitRoom === undefined || vm === null) return null;
if (vm === null) return null;
return (
<RoomContext value={livekitRoom}>
<ReactionsSenderProvider vm={vm} rtcSession={props.rtcSession}>
<InCallView
{...props}
vm={vm}
livekitRoom={livekitRoom}
connState={connState}
/>
</ReactionsSenderProvider>
</RoomContext>
<ReactionsSenderProvider vm={vm} rtcSession={props.rtcSession}>
<InCallView {...props} vm={vm} />
</ReactionsSenderProvider>
);
};
@@ -214,14 +163,12 @@ export interface InCallViewProps {
matrixInfo: MatrixInfo;
rtcSession: MatrixRTCSession;
matrixRoom: MatrixRoom;
livekitRoom: LivekitRoom;
muteStates: MuteStates;
participantCount: number;
/** Function to call when the user explicitly ends the call */
onLeave: () => void;
header: HeaderStyle;
otelGroupCallMembership?: OTelGroupCallMembership;
connState: ECConnectionState;
onShareClick: (() => void) | null;
}
@@ -231,12 +178,10 @@ export const InCallView: FC<InCallViewProps> = ({
matrixInfo,
rtcSession,
matrixRoom,
livekitRoom,
muteStates,
participantCount,
onLeave,
header: headerStyle,
connState,
onShareClick,
}) => {
const { t } = useTranslation();
@@ -245,11 +190,6 @@ export const InCallView: FC<InCallViewProps> = ({
useWakeLock();
// annoyingly we don't get the disconnection reason this way,
// only by listening for the emitted event
if (connState === ConnectionState.Disconnected)
throw new ConnectionLostError();
const containerRef1 = useRef<HTMLDivElement | null>(null);
const [containerRef2, bounds] = useMeasure();
// Merge the refs so they can attach to the same element
@@ -257,10 +197,6 @@ export const InCallView: FC<InCallViewProps> = ({
const { hideScreensharing, showControls } = useUrlParams();
const { isScreenShareEnabled, localParticipant } = useLocalParticipant({
room: livekitRoom,
});
const muteAllAudio = useBehavior(muteAllAudio$);
// This seems like it might be enough logic to use move it into the call view model?
@@ -653,15 +589,16 @@ export const InCallView: FC<InCallViewProps> = ({
);
const toggleScreensharing = useCallback(() => {
localParticipant
.setScreenShareEnabled(!isScreenShareEnabled, {
audio: true,
selfBrowserSurface: "include",
surfaceSwitching: "include",
systemAudio: "include",
})
.catch(logger.error);
}, [localParticipant, isScreenShareEnabled]);
throw new Error("TODO-MULTI-SFU");
// localParticipant
// .setScreenShareEnabled(!isScreenShareEnabled, {
// audio: true,
// selfBrowserSurface: "include",
// surfaceSwitching: "include",
// systemAudio: "include",
// })
// .catch(logger.error);
}, []);
const buttons: JSX.Element[] = [];
@@ -688,7 +625,7 @@ export const InCallView: FC<InCallViewProps> = ({
<ShareScreenButton
key="share_screen"
className={styles.shareScreen}
enabled={isScreenShareEnabled}
enabled={false} // TODO-MULTI-SFU
onClick={toggleScreensharing}
onTouchEnd={onControlsTouchEnd}
data-testid="incall_screenshare"
@@ -786,7 +723,7 @@ export const InCallView: FC<InCallViewProps> = ({
</Text>
)
}
<MatrixAudioRenderer members={memberships} muted={muteAllAudio} />
{/* TODO-MULTI-SFU: <MatrixAudioRenderer members={memberships} muted={muteAllAudio} /> */}
{renderContent()}
<CallEventAudioRenderer vm={vm} muted={muteAllAudio} />
<ReactionsAudioRenderer vm={vm} muted={muteAllAudio} />
@@ -813,7 +750,7 @@ export const InCallView: FC<InCallViewProps> = ({
onDismiss={closeSettings}
tab={settingsTab}
onTabChange={setSettingsTab}
livekitRoom={livekitRoom}
livekitRoom={undefined} // TODO-MULTI-SFU
/>
</>
)}

View File

@@ -8,6 +8,7 @@ Please see LICENSE in the repository root for full details.
import {
isLivekitFocus,
isLivekitFocusConfig,
LivekitFocusConfig,
type LivekitFocus,
type LivekitFocusActive,
type MatrixRTCSession,
@@ -31,24 +32,16 @@ export function makeActiveFocus(): LivekitFocusActive {
};
}
async function makePreferredLivekitFoci(
export function getLivekitAlias(rtcSession: MatrixRTCSession): string {
// For now we assume everything is a room-scoped call
return rtcSession.room.roomId;
}
async function makeFocusInternal(
rtcSession: MatrixRTCSession,
livekitAlias: string,
): Promise<LivekitFocus[]> {
logger.log("Start building foci_preferred list: ", rtcSession.room.roomId);
const preferredFoci: LivekitFocus[] = [];
// Make the Focus from the running rtc session the highest priority one
// This minimizes how often we need to switch foci during a call.
const focusInUse = rtcSession.getFocusInUse();
if (focusInUse && isLivekitFocus(focusInUse)) {
logger.log("Adding livekit focus from oldest member: ", focusInUse);
preferredFoci.push(focusInUse);
}
// Warm up the first focus we owned, to ensure livekit room is created before any state event sent.
let toWarmUp: LivekitFocus | undefined;
): Promise<LivekitFocus> {
logger.log("Searching for a preferred focus");
const livekitAlias = getLivekitAlias(rtcSession);
// Prioritize the .well-known/matrix/client, if available, over the configured SFU
const domain = rtcSession.room.client.getDomain();
@@ -59,51 +52,42 @@ async function makePreferredLivekitFoci(
FOCI_WK_KEY
];
if (Array.isArray(wellKnownFoci)) {
const validWellKnownFoci = wellKnownFoci
.filter((f) => !!f)
.filter(isLivekitFocusConfig)
.map((wellKnownFocus) => {
logger.log("Adding livekit focus from well known: ", wellKnownFocus);
return { ...wellKnownFocus, livekit_alias: livekitAlias };
});
if (validWellKnownFoci.length > 0) {
toWarmUp = validWellKnownFoci[0];
const focus: LivekitFocusConfig | undefined = wellKnownFoci.find(
(f) => f && isLivekitFocusConfig(f),
);
if (focus !== undefined) {
logger.log("Using LiveKit focus from .well-known: ", focus);
return { ...focus, livekit_alias: livekitAlias };
}
preferredFoci.push(...validWellKnownFoci);
}
}
const urlFromConf = Config.get().livekit?.livekit_service_url;
if (urlFromConf) {
const focusFormConf: LivekitFocus = {
const focusFromConf: LivekitFocus = {
type: "livekit",
livekit_service_url: urlFromConf,
livekit_alias: livekitAlias,
};
toWarmUp = toWarmUp ?? focusFormConf;
logger.log("Adding livekit focus from config: ", focusFormConf);
preferredFoci.push(focusFormConf);
logger.log("Using LiveKit focus from config: ", focusFromConf);
return focusFromConf;
}
if (toWarmUp) {
// this will call the jwt/sfu/get endpoint to pre create the livekit room.
await getSFUConfigWithOpenID(rtcSession.room.client, toWarmUp);
}
if (preferredFoci.length === 0)
throw new MatrixRTCFocusMissingError(domain ?? "");
return Promise.resolve(preferredFoci);
throw new MatrixRTCFocusMissingError(domain ?? "");
}
// TODO: we want to do something like this:
//
// const focusOtherMembers = await focusFromOtherMembers(
// rtcSession,
// livekitAlias,
// );
// if (focusOtherMembers) preferredFoci.push(focusOtherMembers);
export async function makeFocus(
rtcSession: MatrixRTCSession,
): Promise<LivekitFocus> {
const focus = await makeFocusInternal(rtcSession);
// this will call the jwt/sfu/get endpoint to pre create the livekit room.
await getSFUConfigWithOpenID(rtcSession.room.client, focus);
return focus;
}
export async function enterRTCSession(
rtcSession: MatrixRTCSession,
focus: LivekitFocus,
encryptMedia: boolean,
useNewMembershipManager = true,
useExperimentalToDeviceTransport = false,
@@ -115,34 +99,27 @@ export async function enterRTCSession(
// have started tracking by the time calls start getting created.
// groupCallOTelMembership?.onJoinCall();
// right now we assume everything is a room-scoped call
const livekitAlias = rtcSession.room.roomId;
const { features, matrix_rtc_session: matrixRtcSessionConfig } = Config.get();
const useDeviceSessionMemberEvents =
features?.feature_use_device_session_member_events;
rtcSession.joinRoomSession(
await makePreferredLivekitFoci(rtcSession, livekitAlias),
makeActiveFocus(),
{
notificationType: getUrlParams().sendNotificationType,
useNewMembershipManager,
manageMediaKeys: encryptMedia,
...(useDeviceSessionMemberEvents !== undefined && {
useLegacyMemberEvents: !useDeviceSessionMemberEvents,
}),
delayedLeaveEventRestartMs:
matrixRtcSessionConfig?.delayed_leave_event_restart_ms,
delayedLeaveEventDelayMs:
matrixRtcSessionConfig?.delayed_leave_event_delay_ms,
delayedLeaveEventRestartLocalTimeoutMs:
matrixRtcSessionConfig?.delayed_leave_event_restart_local_timeout_ms,
networkErrorRetryMs: matrixRtcSessionConfig?.network_error_retry_ms,
makeKeyDelay: matrixRtcSessionConfig?.wait_for_key_rotation_ms,
membershipEventExpiryMs:
matrixRtcSessionConfig?.membership_event_expiry_ms,
useExperimentalToDeviceTransport,
},
);
rtcSession.joinRoomSession([focus], focus, {
notificationType: getUrlParams().sendNotificationType,
useNewMembershipManager,
manageMediaKeys: encryptMedia,
...(useDeviceSessionMemberEvents !== undefined && {
useLegacyMemberEvents: !useDeviceSessionMemberEvents,
}),
delayedLeaveEventRestartMs:
matrixRtcSessionConfig?.delayed_leave_event_restart_ms,
delayedLeaveEventDelayMs:
matrixRtcSessionConfig?.delayed_leave_event_delay_ms,
delayedLeaveEventRestartLocalTimeoutMs:
matrixRtcSessionConfig?.delayed_leave_event_restart_local_timeout_ms,
networkErrorRetryMs: matrixRtcSessionConfig?.network_error_retry_ms,
makeKeyDelay: matrixRtcSessionConfig?.wait_for_key_rotation_ms,
membershipEventExpiryMs: matrixRtcSessionConfig?.membership_event_expiry_ms,
useExperimentalToDeviceTransport,
});
if (widget) {
try {
await widget.api.transport.send(ElementWidgetActions.JoinCall, {});

View File

@@ -12,13 +12,16 @@ import {
} from "@livekit/components-core";
import {
ConnectionState,
type Room as LivekitRoom,
E2EEOptions,
ExternalE2EEKeyProvider,
Room as LivekitRoom,
type LocalParticipant,
ParticipantEvent,
type RemoteParticipant,
} from "livekit-client";
import {
ClientEvent,
MatrixClient,
RoomStateEvent,
SyncState,
type Room as MatrixRoom,
@@ -39,6 +42,7 @@ import {
merge,
mergeMap,
of,
pairwise,
race,
scan,
skip,
@@ -53,6 +57,7 @@ import {
import { logger } from "matrix-js-sdk/lib/logger";
import {
type CallMembership,
isLivekitFocusConfig,
type MatrixRTCSession,
MatrixRTCSessionEvent,
MembershipManagerEvent,
@@ -106,6 +111,15 @@ import { shallowEquals } from "../utils/array";
import { calculateDisplayName, shouldDisambiguate } from "../utils/displayname";
import { type MediaDevices } from "./MediaDevices";
import { type Behavior } from "./Behavior";
import { getSFUConfigWithOpenID } from "../livekit/openIDSFU";
import { defaultLiveKitOptions } from "../livekit/options";
import {
enterRTCSession,
getLivekitAlias,
makeFocus,
} from "../rtcSessionHelpers";
import { E2eeType } from "../e2ee/e2eeType";
import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider";
export interface CallViewModelOptions {
encryptionSystem: EncryptionSystem;
@@ -383,6 +397,31 @@ class ScreenShare {
type MediaItem = UserMedia | ScreenShare;
function getE2eeOptions(
e2eeSystem: EncryptionSystem,
rtcSession: MatrixRTCSession,
): E2EEOptions | undefined {
if (e2eeSystem.kind === E2eeType.NONE) return undefined;
if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) {
const keyProvider = new MatrixKeyProvider();
keyProvider.setRTCSession(rtcSession);
return {
keyProvider,
worker: new E2EEWorker(),
};
} else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) {
const keyProvider = new ExternalE2EEKeyProvider();
keyProvider
.setKey(e2eeSystem.secret)
.catch((e) => logger.error("Failed to set shared key for E2EE", e));
return {
keyProvider,
worker: new E2EEWorker(),
};
}
}
function getRoomMemberFromRtcMember(
rtcMember: CallMembership,
room: MatrixRoom,
@@ -405,8 +444,151 @@ function getRoomMemberFromRtcMember(
return { id, member };
}
// TODO: Move wayyyy more business logic from the call and lobby views into here
class Connection {
// TODO-MULTI-SFU Add all device syncing logic from useLivekit
private readonly sfuConfig = getSFUConfigWithOpenID(
this.client,
this.serviceUrl,
this.livekitAlias,
);
public async startSubscribing(): Promise<void> {
this.stopped = false;
const { url, jwt } = await this.sfuConfig;
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
}
public async startPublishing(): Promise<void> {
this.stopped = false;
const { url, jwt } = await this.sfuConfig;
if (!this.stopped)
// TODO-MULTI-SFU this should not create a track?
await this.livekitRoom.localParticipant.createTracks({
audio: { deviceId: "default" },
});
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
}
private stopped = false;
public stop(): void {
void this.livekitRoom.disconnect();
this.stopped = true;
}
public readonly participants$ = connectedParticipantsObserver(
this.livekitRoom,
).pipe(this.scope.state());
public constructor(
private readonly livekitRoom: LivekitRoom,
private readonly serviceUrl: string,
private readonly livekitAlias: string,
private readonly client: MatrixClient,
private readonly scope: ObservableScope,
) {}
}
export class CallViewModel extends ViewModel {
private readonly e2eeOptions = getE2eeOptions(
this.encryptionSystem,
this.matrixRTCSession,
);
private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession);
private readonly livekitRoom = new LivekitRoom({
...defaultLiveKitOptions,
e2ee: this.e2eeOptions,
});
private readonly localFocus = makeFocus(this.matrixRTCSession);
private readonly localConnection = this.localFocus.then(
(focus) =>
new Connection(
this.livekitRoom,
focus.livekit_service_url,
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
),
);
private readonly memberships$ = fromEvent(
this.matrixRTCSession,
MatrixRTCSessionEvent.MembershipsChanged,
).pipe(map(() => this.matrixRTCSession.memberships));
private readonly foci$ = this.memberships$.pipe(
map(
(memberships) =>
new Set(
memberships
.map((m) => this.matrixRTCSession.resolveActiveFocus(m))
.filter((f) => f !== undefined && isLivekitFocusConfig(f))
.map((f) => f.livekit_service_url),
),
),
);
private readonly remoteConnections$ = combineLatest([
this.localFocus,
this.foci$,
]).pipe(
accumulate(new Map<string, Connection>(), (prev, [localFocus, foci]) => {
const stopped = new Map(prev);
const next = new Map<string, Connection>();
for (const focus of foci) {
if (focus !== localFocus.livekit_service_url) {
stopped.delete(focus);
next.set(
focus,
prev.get(focus) ??
new Connection(
new LivekitRoom({
...defaultLiveKitOptions,
e2ee: this.e2eeOptions,
}),
focus,
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
),
);
}
}
for (const connection of stopped.values()) connection.stop();
return next;
}),
);
private readonly joined$ = new Subject<void>();
public join(): void {
this.joined$.next();
}
public leave(): void {
// TODO
}
private readonly connectionInstructions$ = this.joined$.pipe(
switchMap(() => this.remoteConnections$),
startWith(new Map<string, Connection>()),
pairwise(),
map(([prev, next]) => {
const start = new Set(next.values());
for (const connection of prev.values()) start.delete(connection);
const stop = new Set(prev.values());
for (const connection of next.values()) stop.delete(connection);
return { start, stop };
}),
);
private readonly userId = this.matrixRoom.client.getUserId();
private readonly matrixConnected$ = this.scope.behavior(
@@ -480,79 +662,13 @@ export class CallViewModel extends ViewModel {
// in a split-brained state.
private readonly pretendToBeDisconnected$ = this.reconnecting$;
/**
* The raw list of RemoteParticipants as reported by LiveKit
*/
private readonly rawRemoteParticipants$ = this.scope.behavior<
RemoteParticipant[]
>(connectedParticipantsObserver(this.livekitRoom), []);
/**
* Lists of RemoteParticipants to "hold" on display, even if LiveKit claims that
* they've left
*/
private readonly remoteParticipantHolds$ = this.scope.behavior<
RemoteParticipant[][]
>(
this.livekitConnectionState$.pipe(
withLatestFrom(this.rawRemoteParticipants$),
mergeMap(([s, ps]) => {
// Whenever we switch focuses, we should retain all the previous
// participants for at least POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS ms to
// give their clients time to switch over and avoid jarring layout shifts
if (s === ECAddonConnectionState.ECSwitchingFocus) {
return concat(
// Hold these participants
of({ hold: ps }),
// Wait for time to pass and the connection state to have changed
forkJoin([
timer(POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS),
this.livekitConnectionState$.pipe(
filter((s) => s !== ECAddonConnectionState.ECSwitchingFocus),
take(1),
),
// Then unhold them
]).pipe(map(() => ({ unhold: ps }))),
);
} else {
return EMPTY;
}
}),
// Accumulate the hold instructions into a single list showing which
// participants are being held
accumulate([] as RemoteParticipant[][], (holds, instruction) =>
"hold" in instruction
? [instruction.hold, ...holds]
: holds.filter((h) => h !== instruction.unhold),
),
),
);
/**
* The RemoteParticipants including those that are being "held" on the screen
*/
private readonly remoteParticipants$ = this.scope
.behavior<RemoteParticipant[]>(
combineLatest(
[this.rawRemoteParticipants$, this.remoteParticipantHolds$],
(raw, holds) => {
const result = [...raw];
const resultIds = new Set(result.map((p) => p.identity));
// Incorporate the held participants into the list
for (const hold of holds) {
for (const p of hold) {
if (!resultIds.has(p.identity)) {
result.push(p);
resultIds.add(p.identity);
}
}
}
return result;
},
),
)
.behavior<
RemoteParticipant[]
>(combineLatest([this.localConnection, this.remoteConnections$], (localConnection, remoteConnections) => combineLatest([localConnection.participants$, ...[...remoteConnections.values()].map((c) => c.participants$)], (...ps) => ps.flat(1))).pipe(switchAll(), startWith([])))
.pipe(pauseWhen(this.pretendToBeDisconnected$));
/**
@@ -1574,22 +1690,38 @@ export class CallViewModel extends ViewModel {
);
public constructor(
// A call is permanently tied to a single Matrix room and LiveKit room
// A call is permanently tied to a single Matrix room
private readonly matrixRTCSession: MatrixRTCSession,
private readonly matrixRoom: MatrixRoom,
private readonly livekitRoom: LivekitRoom,
private readonly mediaDevices: MediaDevices,
private readonly options: CallViewModelOptions,
private readonly livekitConnectionState$: Observable<ECConnectionState>,
private readonly handsRaisedSubject$: Observable<
Record<string, RaisedHandInfo>
>,
private readonly reactionsSubject$: Observable<
Record<string, ReactionInfo>
>,
private readonly encryptionSystem: EncryptionSystem,
) {
super();
void this.localConnection.then((c) => c.startPublishing());
this.connectionInstructions$
.pipe(this.scope.bind())
.subscribe(({ start, stop }) => {
for (const connection of start) connection.startSubscribing();
for (const connection of stop) connection.stop();
});
combineLatest([this.localFocus, this.joined$])
.pipe(this.scope.bind())
.subscribe(([localFocus]) => {
enterRTCSession(
this.matrixRTCSession,
localFocus,
this.encryptionSystem.kind !== E2eeType.PER_PARTICIPANT,
);
});
// Pause upstream of all local media tracks when we're disconnected from
// MatrixRTC, because it can be an unpleasant surprise for the app to say
// 'reconnecting' and yet still be transmitting your media to others.

View File

@@ -10278,9 +10278,9 @@ __metadata:
languageName: node
linkType: hard
"matrix-js-sdk@github:matrix-org/matrix-js-sdk#head=develop":
version: 37.13.0
resolution: "matrix-js-sdk@https://github.com/matrix-org/matrix-js-sdk.git#commit=3a33c658bbcb8ce8791ec066db899f2571f5c52f"
"matrix-js-sdk@portal:/Users/timo/Projects/matrix-js-sdk::locator=element-call%40workspace%3A.":
version: 0.0.0-use.local
resolution: "matrix-js-sdk@portal:/Users/timo/Projects/matrix-js-sdk::locator=element-call%40workspace%3A."
dependencies:
"@babel/runtime": "npm:^7.12.5"
"@matrix-org/matrix-sdk-crypto-wasm": "npm:^15.1.0"
@@ -10296,9 +10296,8 @@ __metadata:
sdp-transform: "npm:^2.14.1"
unhomoglyph: "npm:^1.0.6"
uuid: "npm:11"
checksum: 10c0/1db0d39cfbe4f1c69c8acda0ea7580a4819fc47a7d4bff057382e33e72d9a610f8c03043a6c00bc647dfdc2815aa643c69d25022fb759342a92b77e1841524f1
languageName: node
linkType: hard
linkType: soft
"matrix-widget-api@npm:^1.10.0, matrix-widget-api@npm:^1.13.0":
version: 1.13.1