refactor: grug down the connection manager

This commit is contained in:
Valere
2026-04-14 10:18:55 +02:00
parent 23eadfcc9f
commit a95d77b90a
2 changed files with 241 additions and 136 deletions

View File

@@ -52,7 +52,7 @@ beforeEach(() => {
transport,
remoteParticipants$: new BehaviorSubject([]),
} as unknown as Connection;
vi.mocked(mockConnection).start = vi.fn();
vi.mocked(mockConnection).start = vi.fn().mockResolvedValue(undefined);
vi.mocked(mockConnection).stop = vi.fn();
// Tie the connection's lifecycle to the scope to test scope lifecycle management
scope.onEnd(() => {
@@ -235,7 +235,9 @@ describe("connectionManagerData$ stream", () => {
transport,
remoteParticipants$: getRemoteParticipantsFor(transport),
} as unknown as Connection;
vi.mocked(mockConnection).start = vi.fn();
vi.mocked(mockConnection).start = vi
.fn()
.mockResolvedValue(undefined);
vi.mocked(mockConnection).stop = vi.fn();
// Tie the connection's lifecycle to the scope to test scope lifecycle management
scope.onEnd(() => {

View File

@@ -7,7 +7,14 @@ Please see LICENSE in the repository root for full details.
*/
import { type LivekitTransportConfig } from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, map, of, switchMap } from "rxjs";
import {
combineLatest,
map,
type Observable,
of,
scan,
switchMap,
} from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger";
import { type RemoteParticipant } from "livekit-client";
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";
@@ -15,14 +22,11 @@ import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/En
import { type Behavior } from "../../Behavior.ts";
import { type Connection } from "./Connection.ts";
import { Epoch, type ObservableScope } from "../../ObservableScope.ts";
import { generateItemsWithEpoch } from "../../../utils/observable.ts";
import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts";
import { type ConnectionFactory } from "./ConnectionFactory.ts";
import {
isLocalTransportWithSFUConfig,
type LocalTransportWithSFUConfig,
} from "../localMember/LocalTransport.ts";
import { type SFUConfig } from "../../../livekit/openIDSFU.ts";
export class ConnectionManagerData {
private readonly store: Map<
@@ -80,11 +84,19 @@ interface Props {
ownMembershipIdentity: CallMembershipIdentityParts;
}
// TODO - write test for scopes (do we really need to bind scope)
export interface IConnectionManager {
connectionManagerData$: Behavior<Epoch<ConnectionManagerData>>;
}
/**
* Incremental state based on prev/current transports and connections.
*/
interface ScannedState {
managedTransports: LivekitTransportConfig[];
managedConnections: Connection[];
epoch: number;
}
/**
* Crete a `ConnectionManager`
* @param props - Configuration object
@@ -114,144 +126,66 @@ export function createConnectionManager$({
ownMembershipIdentity,
}: Props): IConnectionManager {
const logger = parentLogger.getChild("[ConnectionManager]");
// TODO logger: only construct one logger from the client and make it compatible via a EC specific sing
/**
* All transports currently managed by the ConnectionManager.
*
* This list does not include duplicate transports.
*
* It is build based on the list of subscribed transports (`transportsSubscriptions$`).
* externally this is modified via `registerTransports()`.
*/
const localAndRemoteTransports$: Behavior<
Epoch<(LivekitTransportConfig | LocalTransportWithSFUConfig)[]>
> = scope.behavior(
combineLatest([remoteTransports$, localTransport$]).pipe(
// Combine local and remote transports into one transport array
// and set the forceOldJwtEndpoint property on the local transport
map(([remoteTransports, localTransport]) => {
let localTransportAsArray: LocalTransportWithSFUConfig[] = [];
if (localTransport) {
localTransportAsArray = [localTransport];
}
const dedupedRemote = removeDuplicateTransports(remoteTransports.value);
const remoteWithoutLocal = dedupedRemote.filter(
(transport) =>
!localTransportAsArray.find((l) =>
areLivekitTransportsEqual(l.transport, transport),
),
);
logger.debug(
"remoteWithoutLocal",
remoteWithoutLocal,
"localTransportAsArray",
localTransportAsArray,
);
return new Epoch(
[...localTransportAsArray, ...remoteWithoutLocal],
remoteTransports.epoch,
);
}),
),
// De-duplicate the list of transports and flatten it into a single list.
// The connection manager should only create one connection per unique transport config,
// even if multiple session members are using the same transport.
const localAndRemoteTransports$ = getLocalAndRemoteTransports$(
scope,
remoteTransports$,
localTransport$,
);
/**
* Connections for each transport in use by one or more session members.
*/
const connections$ = scope.behavior(
localAndRemoteTransports$.pipe(
generateItemsWithEpoch(
"ConnectionManager connections$",
function* (transports) {
for (const transport of transports) {
if (isLocalTransportWithSFUConfig(transport)) {
// This is the local transport; only the `LocalTransportWithSFUConfig` has a `sfuConfig` field.
yield {
keys: [
transport.transport.livekit_service_url,
transport.sfuConfig,
],
data: undefined,
};
} else {
yield {
keys: [
transport.livekit_service_url,
undefined as SFUConfig | undefined,
],
data: undefined,
};
}
}
},
(scope, _data$, serviceUrl, sfuConfig) => {
const connection = connectionFactory.createConnection(
scope,
{
type: "livekit",
livekit_service_url: serviceUrl,
},
ownMembershipIdentity,
logger,
// TODO: This whole optional SFUConfig parameter is not particularly elegant.
// I would like it if connections always fetched the SFUConfig by themselves.
sfuConfig,
// Create and start connections for each transport.
// Incrementally checks for new and removed transports and stop and remove connections accordingly.
const state$ = scanInternalState$(
scope,
localAndRemoteTransports$,
ownMembershipIdentity,
connectionFactory,
logger,
);
const connectionManagerData$ = state$.pipe(
switchMap((state) => {
// Map each connection to a stream of {connection, participants}
const connectionWithParticipants$ = state.managedConnections.map(
(connection) => {
return connection.remoteParticipants$.pipe(
map((participants) => ({
connection,
participants,
})),
);
// Start the connection immediately
// Use connection state to track connection progress
void connection.start();
// TODO subscribe to connection state to retry or log issues?
return connection;
},
),
),
);
// Handle empty case
if (connectionWithParticipants$.length === 0) {
return of(new Epoch(new ConnectionManagerData(), state.epoch));
}
// Combine all the streams and reduce into ConnectionManagerData
return combineLatest(connectionWithParticipants$).pipe(
map((items) => {
const data = new ConnectionManagerData();
items.forEach(({ connection, participants }) => {
data.add(connection, participants);
});
return new Epoch(data, state.epoch);
}),
);
}),
);
const connectionManagerData$ = scope.behavior(
connections$.pipe(
switchMap((connections) => {
const epoch = connections.epoch;
// Map the connections to list of {connection, participants}[]
const listOfConnectionsWithRemoteParticipants = connections.value.map(
(connection) => {
return connection.remoteParticipants$.pipe(
map((participants) => ({
connection,
participants,
})),
);
},
);
// probably not required
if (listOfConnectionsWithRemoteParticipants.length === 0) {
return of(new Epoch(new ConnectionManagerData(), epoch));
}
// combineLatest the several streams into a single stream with the ConnectionManagerData
return combineLatest(listOfConnectionsWithRemoteParticipants).pipe(
map(
(lists) =>
new Epoch(
lists.reduce((data, { connection, participants }) => {
data.add(connection, participants);
return data;
}, new ConnectionManagerData()),
epoch,
),
),
);
}),
),
new Epoch(new ConnectionManagerData(), -1),
);
return { connectionManagerData$ };
return { connectionManagerData$: scope.behavior(connectionManagerData$) };
}
/*
Each member sends its transport as part of the MatrixRTC membership.
The connection manager will create a connection for each unique transport,
even if multiple session members are using the same transport.
*/
function removeDuplicateTransports<T extends LivekitTransportConfig>(
transports: T[],
): T[] {
@@ -261,3 +195,172 @@ function removeDuplicateTransports<T extends LivekitTransportConfig>(
return acc;
}, [] as T[]);
}
type TransportsData = {
local: LocalTransportWithSFUConfig | null;
remotes: LivekitTransportConfig[];
};
/**
* All transports currently managed by the ConnectionManager.
*
* This list does not include duplicate transports.
*
* It is build based on the list of subscribed transports (`transportsSubscriptions$`).
* externally this is modified via `registerTransports()`.
*/
function getLocalAndRemoteTransports$(
scope: ObservableScope,
remoteTransports$: Behavior<Epoch<LivekitTransportConfig[]>>,
localTransport$: Behavior<LocalTransportWithSFUConfig | null>,
): Behavior<Epoch<TransportsData>> {
return scope.behavior(
combineLatest([remoteTransports$, localTransport$]).pipe(
map(([remoteTransports, localTransport]) => {
// Get the unique transports we have to connect to
const dedupedRemote = removeDuplicateTransports(remoteTransports.value);
// For clarity do not include the local transport in the remote list.
const remoteWithoutLocal = dedupedRemote.filter(
(transport) =>
!areLivekitTransportsEqual(
localTransport?.transport ?? null,
transport,
),
);
return new Epoch(
{
local: localTransport,
remotes: remoteWithoutLocal,
},
remoteTransports.epoch,
);
}),
),
);
}
/**
* Monitors the list of transports and creates and stops connections accordingly.
*
* It will automatically:
* - Creates new connections when transports are added
* - Removes and stops connections when transports are removed;
*
* Returns a state object that contains the list of managed transports and connections.
*/
function scanInternalState$(
scope: ObservableScope,
localAndRemoteTransports$: Behavior<Epoch<TransportsData>>,
ownMembershipIdentity: CallMembershipIdentityParts,
connectionFactory: ConnectionFactory,
logger: Logger,
): Observable<ScannedState> {
const initialState: ScannedState = {
managedTransports: [],
managedConnections: [],
epoch: -1,
};
return localAndRemoteTransports$.pipe(
scan((state: ScannedState, transportsEpoch) => {
const transports = transportsEpoch.value;
// XXX do we need to handle the case where a remote transport is promoted to local?
// If so, we could add more info to the state and use that to decide whether to create a new connection or not.
// Combine local and remote transports into one transport array
const currentTransports = [
...(transports.local ? [transports.local.transport] : []),
...transports.remotes,
];
// Find new and removed transports
const { addedTransports, removedTransports } = computeTransportDiff(
currentTransports,
state.managedTransports,
);
if (removedTransports.length > 0) {
logger.debug("Removed transports detected :", removedTransports);
// stop connections for removed transports
removedTransports.forEach((transport) => {
const removedCo = state.managedConnections.find((connection) =>
areLivekitTransportsEqual(connection.transport, transport),
);
if (removedCo) {
void removedCo.stop();
}
});
}
// Remove all connections for removed transports
const remainingConnections = state.managedConnections.filter(
(connection) => {
return !removedTransports.some((transport) =>
areLivekitTransportsEqual(connection.transport, transport),
);
},
);
let addedConnections: Connection[] = [];
if (addedTransports.length > 0) {
logger.debug("New transports detected", addedTransports);
addedConnections = addedTransports.map((transport) => {
// let's create a connection for each transport
const connection = connectionFactory.createConnection(
scope,
transport,
ownMembershipIdentity,
logger,
transports.local?.transport?.livekit_service_url ===
transport.livekit_service_url
? transports.local?.sfuConfig
: undefined,
);
// start the connection immediately
connection.start().catch((e) => {
logger.error("Failed to start connection", e);
});
// TODO subscribe to connection state to retry or log issues?
return connection;
});
}
return {
managedTransports: currentTransports,
managedConnections: [...remainingConnections, ...addedConnections],
epoch: transportsEpoch.epoch,
};
}, initialState),
);
}
/**
* Utility function to compute the difference between two lists of transports.
* It returns the transports that are in the current list but not in the previous list (addedTransports)
* and the transports that are in the previous list but not in the current list (removedTransports).
* @param currentTransports - The current list of transports.
* @param prevTransports - The previous list of transports.
*/
function computeTransportDiff(
currentTransports: LivekitTransportConfig[],
prevTransports: LivekitTransportConfig[],
): {
addedTransports: LivekitTransportConfig[];
removedTransports: LivekitTransportConfig[];
} {
const newTransports = currentTransports.filter(
(current) =>
!prevTransports.some((prev) => areLivekitTransportsEqual(prev, current)),
);
const removedTransports = prevTransports.filter(
(prev) =>
!currentTransports.some((current) =>
areLivekitTransportsEqual(prev, current),
),
);
return { addedTransports: newTransports, removedTransports };
}