Fix resource leaks when we stop using a connection

The execution of certain Observables related to a local or remote connection would continue even after we stopped caring about said connection because we were failing to give these state holders a proper ObservableScope of their own, separate from the CallViewModel's longer-lived scope. With this commit they now have scopes managed by generateKeyed$.
This commit is contained in:
Robin
2025-10-16 15:52:56 -04:00
parent 717c7420f9
commit d5efba285b
3 changed files with 66 additions and 70 deletions

View File

@@ -116,11 +116,7 @@ import {
} from "../rtcSessionHelpers";
import { E2eeType } from "../e2ee/e2eeType";
import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider";
import {
type Connection,
type ConnectionOpts,
RemoteConnection,
} from "./Connection";
import { type Connection, RemoteConnection } from "./Connection";
import { type MuteStates } from "./MuteStates";
import { getUrlParams } from "../UrlParams";
import { type ProcessorState } from "../livekit/TrackProcessorContext";
@@ -369,26 +365,36 @@ export class CallViewModel {
*/
private readonly localConnection$: Behavior<Async<PublishConnection> | null> =
this.scope.behavior(
this.localTransport$.pipe(
map(
(transport) =>
transport &&
mapAsync(transport, (transport) => {
const opts: ConnectionOpts = {
transport,
client: this.matrixRTCSession.room.client,
scope: this.scope,
remoteTransports$: this.remoteTransports$,
};
return new PublishConnection(
opts,
this.mediaDevices,
this.muteStates,
this.e2eeLivekitOptions(),
this.scope.behavior(this.trackProcessorState$),
);
}),
),
generateKeyed$<
Async<LivekitTransport> | null,
PublishConnection,
Async<PublishConnection> | null
>(
this.localTransport$,
(transport, createOrGet) =>
transport &&
mapAsync(transport, (transport) =>
createOrGet(
// Stable key that uniquely idenifies the transport
JSON.stringify({
url: transport.livekit_service_url,
alias: transport.livekit_alias,
}),
(scope) =>
new PublishConnection(
{
transport,
client: this.matrixRoom.client,
scope,
remoteTransports$: this.remoteTransports$,
},
this.mediaDevices,
this.muteStates,
this.e2eeLivekitOptions(),
this.scope.behavior(this.trackProcessorState$),
),
),
),
),
);
@@ -415,61 +421,47 @@ export class CallViewModel {
* is *distinct* from the local transport.
*/
private readonly remoteConnections$ = this.scope.behavior(
this.transports$.pipe(
accumulate(new Map<string, Connection>(), (prev, transports) => {
const next = new Map<string, Connection>();
generateKeyed$<typeof this.transports$.value, Connection, Connection[]>(
this.transports$,
(transports, createOrGet) => {
const connections: Connection[] = [];
// Until the local transport becomes ready we have no idea which
// transports will actually need a dedicated remote connection
if (transports?.local.state === "ready") {
const oldestMembership = this.matrixRTCSession.getOldestMembership();
// TODO: Handle custom transport.livekit_alias values here
const localServiceUrl = transports.local.value.livekit_service_url;
const remoteServiceUrls = new Set(
transports.remote.flatMap(({ membership, transport }) => {
const t = membership.getTransport(oldestMembership ?? membership);
return t &&
isLivekitTransport(t) &&
t.livekit_service_url !== localServiceUrl
? [t.livekit_service_url]
: [];
}),
transports.remote.map(
({ transport }) => transport.livekit_service_url,
),
);
remoteServiceUrls.delete(localServiceUrl);
for (const remoteServiceUrl of remoteServiceUrls) {
let nextConnection = prev.get(remoteServiceUrl);
if (!nextConnection) {
logger.log(
"SFU remoteConnections$ construct new connection: ",
for (const remoteServiceUrl of remoteServiceUrls)
connections.push(
createOrGet(
remoteServiceUrl,
);
const args: ConnectionOpts = {
transport: {
type: "livekit",
livekit_service_url: remoteServiceUrl,
livekit_alias: this.livekitAlias,
},
client: this.matrixRTCSession.room.client,
scope: this.scope,
remoteTransports$: this.remoteTransports$,
};
nextConnection = new RemoteConnection(
args,
this.e2eeLivekitOptions(),
);
} else {
logger.log(
"SFU remoteConnections$ use prev connection: ",
remoteServiceUrl,
);
}
next.set(remoteServiceUrl, nextConnection);
}
(scope) =>
new RemoteConnection(
{
transport: {
type: "livekit",
livekit_service_url: remoteServiceUrl,
livekit_alias: this.livekitAlias,
},
client: this.matrixRoom.client,
scope,
remoteTransports$: this.remoteTransports$,
},
this.e2eeLivekitOptions(),
),
),
);
}
return next;
}),
map((transports) => [...transports.values()]),
return connections;
},
),
);

View File

@@ -21,6 +21,7 @@ import {
type CallMembership,
type LivekitTransport,
} from "matrix-js-sdk/lib/matrixrtc";
import { logger } from "matrix-js-sdk/lib/logger";
import { BehaviorSubject, combineLatest, type Observable } from "rxjs";
import {
@@ -218,6 +219,9 @@ export class Connection {
public readonly livekitRoom: LivekitRoom,
opts: ConnectionOpts,
) {
logger.log(
`[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,
);
const { transport, client, scope, remoteTransports$ } = opts;
this.transport = transport;

View File

@@ -58,7 +58,7 @@ export class PublishConnection extends Connection {
trackerProcessorState$: Behavior<ProcessorState>,
) {
const { scope } = args;
logger.info("[LivekitRoom] Create LiveKit room");
logger.info("[PublishConnection] Create LiveKit room");
const { controlledAudioDevices } = getUrlParams();
const factory =