From 802ebf828d5d6f19a00000675442d039df12ac14 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 28 Aug 2025 15:32:46 +0200 Subject: [PATCH] refactor connection Signed-off-by: Timo K --- src/state/CallViewModel.ts | 101 +++++++++++++++++++++---------------- src/state/Connection.ts | 50 +++++++++--------- 2 files changed, 82 insertions(+), 69 deletions(-) diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 03463141..2cbaf738 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -63,6 +63,7 @@ import { import { logger } from "matrix-js-sdk/lib/logger"; import { type CallMembership, + isLivekitFocus, isLivekitFocusConfig, type LivekitFocusConfig, type MatrixRTCSession, @@ -476,11 +477,11 @@ export class CallViewModel extends ViewModel { (focus) => new PublishConnection( this.localConnectionLivekitRoom, - focus.livekit_service_url, + focus, this.livekitAlias, this.matrixRTCSession.room.client, this.scope, - this.matrixRTCSession, + this.membershipsAndFocusMap$, ), ); @@ -494,53 +495,67 @@ export class CallViewModel extends ViewModel { ), ); - private readonly foci$ = this.memberships$.pipe( - map( - (memberships) => - new Set( - membershipsFocusUrl(memberships, this.matrixRTCSession).map( - (f) => f.livekit_service_url, - ), - ), + private readonly membershipsAndFocusMap$ = this.scope.behavior( + this.memberships$.pipe( + map((memberships) => + memberships.flatMap((m) => { + const f = this.matrixRTCSession.resolveActiveFocus(m); + return f && isLivekitFocus(f) ? [{ membership: m, focus: f }] : []; + }), + ), ), ); + private readonly focusServiceUrls$ = this.membershipsAndFocusMap$.pipe( + map((v) => new Set(v.map(({ focus }) => focus.livekit_service_url))), + ); + private readonly remoteConnections$ = this.scope.behavior( - combineLatest([this.localFocus, this.foci$]).pipe( - accumulate(new Map(), (prev, [localFocus, foci]) => { - const stopped = new Map(prev); - const next = new Map(); - for (const focus of foci) { - if (focus !== localFocus.livekit_service_url) { - stopped.delete(focus); + combineLatest([this.localFocus, this.focusServiceUrls$]).pipe( + accumulate( + new Map(), + (prev, [localFocus, focusUrls]) => { + const stopped = new Map(prev); + const next = new Map(); + for (const focusUrl of focusUrls) { + if (focusUrl !== localFocus.livekit_service_url) { + stopped.delete(focusUrl); - let nextConnection = prev.get(focus); - if (!nextConnection) { - logger.log( - "SFU remoteConnections$ construct new connection: ", - focus, - ); - nextConnection = new Connection( - new LivekitRoom({ - ...defaultLiveKitOptions, - e2ee: this.e2eeOptions, - }), - focus, - this.livekitAlias, - this.matrixRTCSession.room.client, - this.scope, - this.matrixRTCSession, - ); - } else { - logger.log("SFU remoteConnections$ use prev connection: ", focus); + let nextConnection = prev.get(focusUrl); + if (!nextConnection) { + logger.log( + "SFU remoteConnections$ construct new connection: ", + focusUrl, + ); + nextConnection = new Connection( + new LivekitRoom({ + ...defaultLiveKitOptions, + e2ee: this.e2eeOptions, + }), + { + livekit_service_url: focusUrl, + livekit_alias: this.livekitAlias, + type: "livekit", + }, + this.livekitAlias, + this.matrixRTCSession.room.client, + this.scope, + this.membershipsAndFocusMap$, + ); + } else { + logger.log( + "SFU remoteConnections$ use prev connection: ", + focusUrl, + ); + } + next.set(focusUrl, nextConnection); } - next.set(focus, nextConnection); } - } - for (const connection of stopped.values()) connection.stop(); - return next; - }), + for (const connection of stopped.values()) connection.stop(); + return next; + }, + ), ), ); @@ -652,11 +667,11 @@ export class CallViewModel extends ViewModel { (localConnection, remoteConnections) => { const remoteConnectionsParticipants = [ ...remoteConnections.values(), - ].map((c) => c.publishingParticipants$(this.memberships$)); + ].map((c) => c.publishingParticipants$); return combineLatest( [ - localConnection.publishingParticipants$(this.memberships$), + localConnection.publishingParticipants$, ...remoteConnectionsParticipants, ], (...ps) => ps.flat(1), diff --git a/src/state/Connection.ts b/src/state/Connection.ts index ff5ebb64..6e114603 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -13,20 +13,19 @@ import { } from "livekit-client"; import { type MatrixClient } from "matrix-js-sdk"; import { + type LivekitFocus, type CallMembership, - type MatrixRTCSession, } from "matrix-js-sdk/lib/matrixrtc"; import { combineLatest, map, type Observable } from "rxjs"; import { getSFUConfigWithOpenID } from "../livekit/openIDSFU"; import { type Behavior } from "./Behavior"; -import { membershipsFocusUrl } from "./CallViewModel"; import { type ObservableScope } from "./ObservableScope"; export class Connection { protected readonly sfuConfig = getSFUConfigWithOpenID( this.client, - this.serviceUrl, + this.focus.livekit_service_url, this.livekitAlias, ); @@ -48,42 +47,41 @@ export class Connection { [], ); - public readonly publishingParticipants$ = ( - memberships$: Behavior, - ): Observable => + public readonly publishingParticipants$: Observable = this.scope.behavior( combineLatest([ connectedParticipantsObserver(this.livekitRoom), - memberships$, + this.membershipsFocusMap$, ]).pipe( - map(([participants, memberships]) => { - const publishingMembers = membershipsFocusUrl( - memberships, - this.matrixRTCSession, - ) - .filter((f) => f.livekit_service_url === this.serviceUrl) - .map((f) => f.membership); - - const publishingP = publishingMembers - .map((m) => { - return participants.find((p) => { - return p.identity === `${m.sender}:${m.deviceId}`; - }); - }) - .filter((p): p is RemoteParticipant => !!p); - return publishingP; - }), + map(([participants, membershipsFocusMap]) => + membershipsFocusMap + // Find all members that claim to publish on this connection + .flatMap(({ membership, focus }) => + focus.livekit_service_url === this.focus.livekit_service_url + ? [membership] + : [], + ) + // Find all associated publishing livekit participant objects + .flatMap(({ sender, deviceId }) => { + const participant = participants.find( + (p) => p.identity === `${sender}:${deviceId}`, + ); + return participant ? [participant] : []; + }), + ), ), [], ); public constructor( protected readonly livekitRoom: LivekitRoom, - protected readonly serviceUrl: string, + protected readonly focus: LivekitFocus, protected readonly livekitAlias: string, protected readonly client: MatrixClient, protected readonly scope: ObservableScope, - protected readonly matrixRTCSession: MatrixRTCSession, + protected readonly membershipsFocusMap$: Behavior< + { membership: CallMembership; focus: LivekitFocus }[] + >, ) {} }