Merge branch 'voip-team/multi-SFU' of github.com:element-hq/element-call into voip-team/multi-SFU

This commit is contained in:
Robin
2025-08-28 16:03:17 +02:00
2 changed files with 177 additions and 132 deletions

View File

@@ -6,7 +6,6 @@ Please see LICENSE in the repository root for full details.
*/
import {
connectedParticipantsObserver,
observeParticipantEvents,
observeParticipantMedia,
} from "@livekit/components-core";
@@ -21,7 +20,6 @@ import {
import E2EEWorker from "livekit-client/e2ee-worker?worker";
import {
ClientEvent,
type MatrixClient,
RoomStateEvent,
SyncState,
type Room as MatrixRoom,
@@ -54,6 +52,7 @@ import {
import { logger } from "matrix-js-sdk/lib/logger";
import {
type CallMembership,
isLivekitFocus,
isLivekitFocusConfig,
type LivekitFocusConfig,
type MatrixRTCSession,
@@ -105,7 +104,6 @@ import { shallowEquals } from "../utils/array";
import { calculateDisplayName, shouldDisambiguate } from "../utils/displayname";
import { type MediaDevices } from "./MediaDevices";
import { type Behavior } from "./Behavior";
import { getSFUConfigWithOpenID } from "../livekit/openIDSFU";
import { defaultLiveKitOptions } from "../livekit/options";
import {
enterRTCSession,
@@ -114,6 +112,7 @@ import {
} from "../rtcSessionHelpers";
import { E2eeType } from "../e2ee/e2eeType";
import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider";
import { Connection, PublishConnection } from "./Connection";
export interface CallViewModelOptions {
encryptionSystem: EncryptionSystem;
@@ -436,88 +435,6 @@ function getRoomMemberFromRtcMember(
return { id, member };
}
// TODO-MULTI-SFU Add all device syncing logic from useLivekit
class Connection {
private readonly sfuConfig = getSFUConfigWithOpenID(
this.client,
this.serviceUrl,
this.livekitAlias,
);
public async startSubscribing(): Promise<void> {
this.stopped = false;
const { url, jwt } = await this.sfuConfig;
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
await this.livekitRoom.localParticipant.publishTrack(tracks[0]);
}
public async startPublishing(): Promise<void> {
this.stopped = false;
const { url, jwt } = await this.sfuConfig;
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
if (!this.stopped) {
const tracks = await this.livekitRoom.localParticipant.createTracks({
audio: true,
video: true,
});
for (const track of tracks) {
await this.livekitRoom.localParticipant.publishTrack(track);
}
}
}
private stopped = false;
public stop(): void {
void this.livekitRoom.disconnect();
this.stopped = true;
}
public readonly participantsIncludingSubscribers$ = this.scope.behavior(
connectedParticipantsObserver(this.livekitRoom),
[],
);
public readonly publishingParticipants$ = (
memberships$: Behavior<CallMembership[]>,
): Observable<RemoteParticipant[]> =>
this.scope.behavior(
combineLatest([
connectedParticipantsObserver(this.livekitRoom),
memberships$,
]).pipe(
map(([participants, memberships]) => {
const publishingMembers = membershipsFocusUrl(
memberships,
this.matrixRTCSession,
)
.filter((f) => f.livekit_service_url === this.serviceUrl)
.map((f) => f.membership);
const publishingP = publishingMembers
.map((m) => {
return participants.find((p) => {
return p.identity === `${m.sender}:${m.deviceId}`;
});
})
.filter((p): p is RemoteParticipant => !!p);
return publishingP;
}),
),
[],
);
public constructor(
private readonly livekitRoom: LivekitRoom,
private readonly serviceUrl: string,
private readonly livekitAlias: string,
private readonly client: MatrixClient,
private readonly scope: ObservableScope,
private readonly matrixRTCSession: MatrixRTCSession,
) {}
}
export class CallViewModel extends ViewModel {
private readonly e2eeOptions = getE2eeOptions(
this.options.encryptionSystem,
@@ -535,13 +452,13 @@ export class CallViewModel extends ViewModel {
private readonly localConnection = this.localFocus.then(
(focus) =>
new Connection(
new PublishConnection(
this.localConnectionLivekitRoom,
focus.livekit_service_url,
focus,
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.matrixRTCSession,
this.membershipsAndFocusMap$,
),
);
@@ -555,53 +472,67 @@ export class CallViewModel extends ViewModel {
),
);
private readonly foci$ = this.memberships$.pipe(
map(
(memberships) =>
new Set(
membershipsFocusUrl(memberships, this.matrixRTCSession).map(
(f) => f.livekit_service_url,
),
),
private readonly membershipsAndFocusMap$ = this.scope.behavior(
this.memberships$.pipe(
map((memberships) =>
memberships.flatMap((m) => {
const f = this.matrixRTCSession.resolveActiveFocus(m);
return f && isLivekitFocus(f) ? [{ membership: m, focus: f }] : [];
}),
),
),
);
private readonly focusServiceUrls$ = this.membershipsAndFocusMap$.pipe(
map((v) => new Set(v.map(({ focus }) => focus.livekit_service_url))),
);
private readonly remoteConnections$ = this.scope.behavior(
combineLatest([this.localFocus, this.foci$]).pipe(
accumulate(new Map<string, Connection>(), (prev, [localFocus, foci]) => {
const stopped = new Map(prev);
const next = new Map<string, Connection>();
for (const focus of foci) {
if (focus !== localFocus.livekit_service_url) {
stopped.delete(focus);
combineLatest([this.localFocus, this.focusServiceUrls$]).pipe(
accumulate(
new Map<string, Connection>(),
(prev, [localFocus, focusUrls]) => {
const stopped = new Map(prev);
const next = new Map<string, Connection>();
for (const focusUrl of focusUrls) {
if (focusUrl !== localFocus.livekit_service_url) {
stopped.delete(focusUrl);
let nextConnection = prev.get(focus);
if (!nextConnection) {
logger.log(
"SFU remoteConnections$ construct new connection: ",
focus,
);
nextConnection = new Connection(
new LivekitRoom({
...defaultLiveKitOptions,
e2ee: this.e2eeOptions,
}),
focus,
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.matrixRTCSession,
);
} else {
logger.log("SFU remoteConnections$ use prev connection: ", focus);
let nextConnection = prev.get(focusUrl);
if (!nextConnection) {
logger.log(
"SFU remoteConnections$ construct new connection: ",
focusUrl,
);
nextConnection = new Connection(
new LivekitRoom({
...defaultLiveKitOptions,
e2ee: this.e2eeOptions,
}),
{
livekit_service_url: focusUrl,
livekit_alias: this.livekitAlias,
type: "livekit",
},
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.membershipsAndFocusMap$,
);
} else {
logger.log(
"SFU remoteConnections$ use prev connection: ",
focusUrl,
);
}
next.set(focusUrl, nextConnection);
}
next.set(focus, nextConnection);
}
}
for (const connection of stopped.values()) connection.stop();
return next;
}),
for (const connection of stopped.values()) connection.stop();
return next;
},
),
),
);
@@ -713,11 +644,11 @@ export class CallViewModel extends ViewModel {
(localConnection, remoteConnections) => {
const remoteConnectionsParticipants = [
...remoteConnections.values(),
].map((c) => c.publishingParticipants$(this.memberships$));
].map((c) => c.publishingParticipants$);
return combineLatest(
[
localConnection.publishingParticipants$(this.memberships$),
localConnection.publishingParticipants$,
...remoteConnectionsParticipants,
],
(...ps) => ps.flat(1),
@@ -1765,7 +1696,7 @@ export class CallViewModel extends ViewModel {
.subscribe(
(c) =>
void c
.startPublishing()
.start()
// eslint-disable-next-line no-console
.then(() => console.log("successfully started publishing"))
// eslint-disable-next-line no-console
@@ -1774,7 +1705,7 @@ export class CallViewModel extends ViewModel {
this.connectionInstructions$
.pipe(this.scope.bind())
.subscribe(({ start, stop }) => {
for (const connection of start) void connection.startSubscribing();
for (const connection of start) void connection.start();
for (const connection of stop) connection.stop();
});
combineLatest([this.localFocus, this.joined$])
@@ -1838,7 +1769,7 @@ export class CallViewModel extends ViewModel {
}
}
const membershipsFocusUrl = (
export const membershipsFocusUrl = (
memberships: CallMembership[],
matrixRTCSession: MatrixRTCSession,
): { livekit_service_url: string; membership: CallMembership }[] => {

114
src/state/Connection.ts Normal file
View File

@@ -0,0 +1,114 @@
// TODO-MULTI-SFU Add all device syncing logic from useLivekit
/*
Copyright 2025 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { connectedParticipantsObserver } from "@livekit/components-core";
import {
type Room as LivekitRoom,
type RemoteParticipant,
} from "livekit-client";
import { type MatrixClient } from "matrix-js-sdk";
import {
type LivekitFocus,
type CallMembership,
} from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, map, type Observable } from "rxjs";
import { getSFUConfigWithOpenID } from "../livekit/openIDSFU";
import { type Behavior } from "./Behavior";
import { type ObservableScope } from "./ObservableScope";
export class Connection {
protected readonly sfuConfig = getSFUConfigWithOpenID(
this.client,
this.focus.livekit_service_url,
this.livekitAlias,
);
public async start(): Promise<void> {
this.stopped = false;
const { url, jwt } = await this.sfuConfig;
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
}
protected stopped = false;
public stop(): void {
void this.livekitRoom.disconnect();
this.stopped = true;
}
public readonly participantsIncludingSubscribers$ = this.scope.behavior(
connectedParticipantsObserver(this.livekitRoom),
[],
);
public readonly publishingParticipants$: Observable<RemoteParticipant[]> =
this.scope.behavior(
combineLatest([
connectedParticipantsObserver(this.livekitRoom),
this.membershipsFocusMap$,
]).pipe(
map(([participants, membershipsFocusMap]) =>
membershipsFocusMap
// Find all members that claim to publish on this connection
.flatMap(({ membership, focus }) =>
focus.livekit_service_url === this.focus.livekit_service_url
? [membership]
: [],
)
// Find all associated publishing livekit participant objects
.flatMap(({ sender, deviceId }) => {
const participant = participants.find(
(p) => p.identity === `${sender}:${deviceId}`,
);
return participant ? [participant] : [];
}),
),
),
[],
);
public constructor(
protected readonly livekitRoom: LivekitRoom,
protected readonly focus: LivekitFocus,
protected readonly livekitAlias: string,
protected readonly client: MatrixClient,
protected readonly scope: ObservableScope,
protected readonly membershipsFocusMap$: Behavior<
{ membership: CallMembership; focus: LivekitFocus }[]
>,
) {}
}
export class PublishConnection extends Connection {
public async start(): Promise<void> {
this.stopped = false;
const { url, jwt } = await this.sfuConfig;
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
if (!this.stopped) {
const tracks = await this.livekitRoom.localParticipant.createTracks({
audio: true,
video: true,
});
for (const track of tracks) {
await this.livekitRoom.localParticipant.publishTrack(track);
}
}
}
public stop(): void {
void this.livekitRoom.disconnect();
this.stopped = true;
}
public readonly participantsIncludingSubscribers$ = this.scope.behavior(
connectedParticipantsObserver(this.livekitRoom),
[],
);
}