refactor connection

Signed-off-by: Timo K <toger5@hotmail.de>
This commit is contained in:
Timo K
2025-08-28 15:32:46 +02:00
parent 1e8db09868
commit a190c4e491
2 changed files with 82 additions and 69 deletions

View File

@@ -52,6 +52,7 @@ import {
import { logger } from "matrix-js-sdk/lib/logger";
import {
type CallMembership,
isLivekitFocus,
isLivekitFocusConfig,
type LivekitFocusConfig,
type MatrixRTCSession,
@@ -453,11 +454,11 @@ export class CallViewModel extends ViewModel {
(focus) =>
new PublishConnection(
this.localConnectionLivekitRoom,
focus.livekit_service_url,
focus,
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.matrixRTCSession,
this.membershipsAndFocusMap$,
),
);
@@ -471,53 +472,67 @@ export class CallViewModel extends ViewModel {
),
);
private readonly foci$ = this.memberships$.pipe(
map(
(memberships) =>
new Set(
membershipsFocusUrl(memberships, this.matrixRTCSession).map(
(f) => f.livekit_service_url,
),
),
private readonly membershipsAndFocusMap$ = this.scope.behavior(
this.memberships$.pipe(
map((memberships) =>
memberships.flatMap((m) => {
const f = this.matrixRTCSession.resolveActiveFocus(m);
return f && isLivekitFocus(f) ? [{ membership: m, focus: f }] : [];
}),
),
),
);
private readonly focusServiceUrls$ = this.membershipsAndFocusMap$.pipe(
map((v) => new Set(v.map(({ focus }) => focus.livekit_service_url))),
);
private readonly remoteConnections$ = this.scope.behavior(
combineLatest([this.localFocus, this.foci$]).pipe(
accumulate(new Map<string, Connection>(), (prev, [localFocus, foci]) => {
const stopped = new Map(prev);
const next = new Map<string, Connection>();
for (const focus of foci) {
if (focus !== localFocus.livekit_service_url) {
stopped.delete(focus);
combineLatest([this.localFocus, this.focusServiceUrls$]).pipe(
accumulate(
new Map<string, Connection>(),
(prev, [localFocus, focusUrls]) => {
const stopped = new Map(prev);
const next = new Map<string, Connection>();
for (const focusUrl of focusUrls) {
if (focusUrl !== localFocus.livekit_service_url) {
stopped.delete(focusUrl);
let nextConnection = prev.get(focus);
if (!nextConnection) {
logger.log(
"SFU remoteConnections$ construct new connection: ",
focus,
);
nextConnection = new Connection(
new LivekitRoom({
...defaultLiveKitOptions,
e2ee: this.e2eeOptions,
}),
focus,
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.matrixRTCSession,
);
} else {
logger.log("SFU remoteConnections$ use prev connection: ", focus);
let nextConnection = prev.get(focusUrl);
if (!nextConnection) {
logger.log(
"SFU remoteConnections$ construct new connection: ",
focusUrl,
);
nextConnection = new Connection(
new LivekitRoom({
...defaultLiveKitOptions,
e2ee: this.e2eeOptions,
}),
{
livekit_service_url: focusUrl,
livekit_alias: this.livekitAlias,
type: "livekit",
},
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.membershipsAndFocusMap$,
);
} else {
logger.log(
"SFU remoteConnections$ use prev connection: ",
focusUrl,
);
}
next.set(focusUrl, nextConnection);
}
next.set(focus, nextConnection);
}
}
for (const connection of stopped.values()) connection.stop();
return next;
}),
for (const connection of stopped.values()) connection.stop();
return next;
},
),
),
);
@@ -629,11 +644,11 @@ export class CallViewModel extends ViewModel {
(localConnection, remoteConnections) => {
const remoteConnectionsParticipants = [
...remoteConnections.values(),
].map((c) => c.publishingParticipants$(this.memberships$));
].map((c) => c.publishingParticipants$);
return combineLatest(
[
localConnection.publishingParticipants$(this.memberships$),
localConnection.publishingParticipants$,
...remoteConnectionsParticipants,
],
(...ps) => ps.flat(1),