From d5efba285b328ab80bc594e175c247f2496370c0 Mon Sep 17 00:00:00 2001 From: Robin Date: Thu, 16 Oct 2025 15:52:56 -0400 Subject: [PATCH] Fix resource leaks when we stop using a connection The execution of certain Observables related to a local or remote connection would continue even after we stopped caring about said connection because we were failing to give these state holders a proper ObservableScope of their own, separate from the CallViewModel's longer-lived scope. With this commit they now have scopes managed by generateKeyed$. --- src/state/CallViewModel.ts | 130 ++++++++++++++++----------------- src/state/Connection.ts | 4 + src/state/PublishConnection.ts | 2 +- 3 files changed, 66 insertions(+), 70 deletions(-) diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 6d7937de..e003ce3b 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -116,11 +116,7 @@ import { } from "../rtcSessionHelpers"; import { E2eeType } from "../e2ee/e2eeType"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; -import { - type Connection, - type ConnectionOpts, - RemoteConnection, -} from "./Connection"; +import { type Connection, RemoteConnection } from "./Connection"; import { type MuteStates } from "./MuteStates"; import { getUrlParams } from "../UrlParams"; import { type ProcessorState } from "../livekit/TrackProcessorContext"; @@ -369,26 +365,36 @@ export class CallViewModel { */ private readonly localConnection$: Behavior | null> = this.scope.behavior( - this.localTransport$.pipe( - map( - (transport) => - transport && - mapAsync(transport, (transport) => { - const opts: ConnectionOpts = { - transport, - client: this.matrixRTCSession.room.client, - scope: this.scope, - remoteTransports$: this.remoteTransports$, - }; - return new PublishConnection( - opts, - this.mediaDevices, - this.muteStates, - this.e2eeLivekitOptions(), - this.scope.behavior(this.trackProcessorState$), - ); - }), - ), + generateKeyed$< + Async | null, + PublishConnection, + Async | null + >( + this.localTransport$, + (transport, createOrGet) => + transport && + mapAsync(transport, (transport) => + createOrGet( + // Stable key that uniquely idenifies the transport + JSON.stringify({ + url: transport.livekit_service_url, + alias: transport.livekit_alias, + }), + (scope) => + new PublishConnection( + { + transport, + client: this.matrixRoom.client, + scope, + remoteTransports$: this.remoteTransports$, + }, + this.mediaDevices, + this.muteStates, + this.e2eeLivekitOptions(), + this.scope.behavior(this.trackProcessorState$), + ), + ), + ), ), ); @@ -415,61 +421,47 @@ export class CallViewModel { * is *distinct* from the local transport. */ private readonly remoteConnections$ = this.scope.behavior( - this.transports$.pipe( - accumulate(new Map(), (prev, transports) => { - const next = new Map(); + generateKeyed$( + this.transports$, + (transports, createOrGet) => { + const connections: Connection[] = []; // Until the local transport becomes ready we have no idea which // transports will actually need a dedicated remote connection if (transports?.local.state === "ready") { - const oldestMembership = this.matrixRTCSession.getOldestMembership(); + // TODO: Handle custom transport.livekit_alias values here const localServiceUrl = transports.local.value.livekit_service_url; const remoteServiceUrls = new Set( - transports.remote.flatMap(({ membership, transport }) => { - const t = membership.getTransport(oldestMembership ?? membership); - return t && - isLivekitTransport(t) && - t.livekit_service_url !== localServiceUrl - ? [t.livekit_service_url] - : []; - }), + transports.remote.map( + ({ transport }) => transport.livekit_service_url, + ), ); + remoteServiceUrls.delete(localServiceUrl); - for (const remoteServiceUrl of remoteServiceUrls) { - let nextConnection = prev.get(remoteServiceUrl); - if (!nextConnection) { - logger.log( - "SFU remoteConnections$ construct new connection: ", + for (const remoteServiceUrl of remoteServiceUrls) + connections.push( + createOrGet( remoteServiceUrl, - ); - - const args: ConnectionOpts = { - transport: { - type: "livekit", - livekit_service_url: remoteServiceUrl, - livekit_alias: this.livekitAlias, - }, - client: this.matrixRTCSession.room.client, - scope: this.scope, - remoteTransports$: this.remoteTransports$, - }; - nextConnection = new RemoteConnection( - args, - this.e2eeLivekitOptions(), - ); - } else { - logger.log( - "SFU remoteConnections$ use prev connection: ", - remoteServiceUrl, - ); - } - next.set(remoteServiceUrl, nextConnection); - } + (scope) => + new RemoteConnection( + { + transport: { + type: "livekit", + livekit_service_url: remoteServiceUrl, + livekit_alias: this.livekitAlias, + }, + client: this.matrixRoom.client, + scope, + remoteTransports$: this.remoteTransports$, + }, + this.e2eeLivekitOptions(), + ), + ), + ); } - return next; - }), - map((transports) => [...transports.values()]), + return connections; + }, ), ); diff --git a/src/state/Connection.ts b/src/state/Connection.ts index 7b044e1d..17a5c4c7 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -21,6 +21,7 @@ import { type CallMembership, type LivekitTransport, } from "matrix-js-sdk/lib/matrixrtc"; +import { logger } from "matrix-js-sdk/lib/logger"; import { BehaviorSubject, combineLatest, type Observable } from "rxjs"; import { @@ -218,6 +219,9 @@ export class Connection { public readonly livekitRoom: LivekitRoom, opts: ConnectionOpts, ) { + logger.log( + `[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`, + ); const { transport, client, scope, remoteTransports$ } = opts; this.transport = transport; diff --git a/src/state/PublishConnection.ts b/src/state/PublishConnection.ts index 1bb79211..3f01073f 100644 --- a/src/state/PublishConnection.ts +++ b/src/state/PublishConnection.ts @@ -58,7 +58,7 @@ export class PublishConnection extends Connection { trackerProcessorState$: Behavior, ) { const { scope } = args; - logger.info("[LivekitRoom] Create LiveKit room"); + logger.info("[PublishConnection] Create LiveKit room"); const { controlledAudioDevices } = getUrlParams(); const factory =