use createSomething$ mathods instead of classes

Rename several classes/behaviors to factory-style creators and adapt
call wiring and tests accordingly:
- Replace ConnectionManager class with createConnectionManager$ which
  returns transports$, connectionManagerData$, connections$
- Convert MatrixLivekitMerger to createMatrixLivekitMembers$
  (matrixLivekitMerger$)
- Rename sessionBehaviors$, localMembership$, localTransport$ to
  createSessionMembershipsAndTransports$, createLocalMembership$,
  createLocalTransport$
- Adjust participant types and hook up connectOptions$; expose join via
  localMembership.requestConnect
- Update tests to use the new factory APIs
This commit is contained in:
Timo K
2025-11-05 18:57:24 +01:00
parent 4d0de2fb71
commit c19e2245c8
10 changed files with 167 additions and 163 deletions

View File

@@ -110,15 +110,12 @@ import {
} from "./layout-types.ts";
import { type ElementCallError } from "../utils/errors.ts";
import { type ObservableScope } from "./ObservableScope.ts";
import { ConnectionManager } from "./remoteMembers/ConnectionManager.ts";
import { MatrixLivekitMerger } from "./remoteMembers/matrixLivekitMerger.ts";
import {
localMembership$,
type LocalMemberState,
} from "./localMember/LocalMembership.ts";
import { localTransport$ as computeLocalTransport$ } from "./localMember/LocalTransport.ts";
import { sessionBehaviors$ } from "./SessionBehaviors.ts";
import { createMatrixLivekitMembers$ } from "./remoteMembers/matrixLivekitMerger.ts";
import { createLocalMembership$ } from "./localMember/LocalMembership.ts";
import { createLocalTransport$ } from "./localMember/LocalTransport.ts";
import { createSessionMembershipsAndTransports$ } from "./SessionBehaviors.ts";
import { ECConnectionFactory } from "./remoteMembers/ConnectionFactory.ts";
import { createConnectionManager$ } from "./remoteMembers/ConnectionManager.ts";
//TODO
// Larger rename
@@ -192,13 +189,13 @@ export class CallViewModel {
}
: undefined;
private sessionBehaviors = sessionBehaviors$({
private sessionBehaviors = createSessionMembershipsAndTransports$({
scope: this.scope,
matrixRTCSession: this.matrixRTCSession,
});
private memberships$ = this.sessionBehaviors.memberships$;
private localTransport$ = computeLocalTransport$({
private localTransport$ = createLocalTransport$({
scope: this.scope,
memberships$: this.memberships$,
client: this.matrixRoom.client,
@@ -229,25 +226,34 @@ export class CallViewModel {
),
);
private connectionManager = new ConnectionManager(
this.scope,
this.connectionFactory,
this.allTransports$,
);
private connectionManager = createConnectionManager$({
scope: this.scope,
connectionFactory: this.connectionFactory,
inputTransports$: this.allTransports$,
});
// ------------------------------------------------------------------------
private matrixLivekitMerger = new MatrixLivekitMerger(
this.scope,
this.sessionBehaviors.membershipsWithTransport$,
this.connectionManager,
this.matrixRoom,
this.userId,
this.deviceId,
);
private matrixLivekitMembers$ = this.matrixLivekitMerger.matrixLivekitMember$;
private matrixLivekitMembers$ = createMatrixLivekitMembers$({
scope: this.scope,
membershipsWithTransport$: this.sessionBehaviors.membershipsWithTransport$,
connectionManager: this.connectionManager,
matrixRoom: this.matrixRoom,
userId: this.userId,
deviceId: this.deviceId,
});
private localMembership = localMembership$({
private connectOptions$ = this.scope.behavior(
matrixRTCMode.value$.pipe(
map((mode) => ({
encryptMedia: this.e2eeLivekitOptions !== undefined,
// TODO. This might need to get called again on each cahnge of matrixRTCMode...
matrixRTCMode: mode,
})),
),
);
private localMembership = createLocalMembership$({
scope: this.scope,
muteStates: this.muteStates,
mediaDevices: this.mediaDevices,
@@ -258,6 +264,7 @@ export class CallViewModel {
e2eeLivekitOptions: this.e2eeLivekitOptions,
trackProcessorState$: this.trackProcessorState$,
widget,
options: this.connectOptions$,
});
/**
@@ -269,13 +276,7 @@ export class CallViewModel {
return this.localMembership.configError$;
}
public join(): LocalMemberState {
return this.localMembership.requestConnect({
encryptMedia: this.e2eeLivekitOptions !== undefined,
// TODO. This might need to get called again on each cahnge of matrixRTCMode...
matrixRTCMode: matrixRTCMode.getValue(),
});
}
public join = this.localMembership.requestConnect;
// CODESMELL?
// This is functionally the same Observable as leave$, except here it's

View File

@@ -22,24 +22,16 @@ interface Props {
matrixRTCSession: MatrixRTCSession;
}
/**
* Wraps behaviors that we extract from an matrixRTCSession.
*/
interface RxRtcSession {
/**
* some prop
*/
export const createSessionMembershipsAndTransports$ = ({
scope,
matrixRTCSession,
}: Props): {
memberships$: Behavior<CallMembership[]>;
membershipsWithTransport$: Behavior<
{ membership: CallMembership; transport?: LivekitTransport }[]
>;
transports$: Behavior<LivekitTransport[]>;
}
export const sessionBehaviors$ = ({
scope,
matrixRTCSession,
}: Props): RxRtcSession => {
} => {
const memberships$ = scope.behavior(
fromEvent(
matrixRTCSession,

View File

@@ -28,7 +28,7 @@ import {
import { logger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "../Behavior";
import { type ConnectionManager } from "../remoteMembers/ConnectionManager";
import { type createConnectionManager$ } from "../remoteMembers/ConnectionManager";
import { ObservableScope } from "../ObservableScope";
import { Publisher } from "./Publisher";
import { type MuteStates } from "../MuteStates";
@@ -90,7 +90,7 @@ interface Props {
scope: ObservableScope;
mediaDevices: MediaDevices;
muteStates: MuteStates;
connectionManager: ConnectionManager;
connectionManager: ReturnType<typeof createConnectionManager$>;
matrixRTCSession: MatrixRTCSession;
matrixRoom: MatrixRoom;
localTransport$: Behavior<LivekitTransport | undefined>;
@@ -111,7 +111,7 @@ interface Props {
* - transport$: the transport object the ownMembership$ ended up using.
*
*/
export const localMembership$ = ({
export const createLocalMembership$ = ({
scope,
options,
muteStates,
@@ -151,13 +151,14 @@ export const localMembership$ = ({
const tracks$ = new BehaviorSubject<LocalTrack[]>([]);
const connection$ = scope.behavior(
combineLatest([connectionManager.connections$, localTransport$]).pipe(
map(([connections, transport]) => {
combineLatest(
[connectionManager.connections$, localTransport$],
(connections, transport) => {
if (transport === undefined) return undefined;
return connections.find((connection) =>
areLivekitTransportsEqual(connection.transport, transport),
);
}),
},
),
);
/**

View File

@@ -50,7 +50,7 @@ interface Props {
* @prop useOldestMember Whether to use the same transport as the oldest member.
* This will only update once the first oldest member appears. Will not recompute if the oldest member leaves.
*/
export const localTransport$ = ({
export const createLocalTransport$ = ({
scope,
memberships$,
client,

View File

@@ -13,7 +13,8 @@ import {
ConnectionError,
type ConnectionState as LivekitConenctionState,
type Room as LivekitRoom,
type Participant,
type LocalParticipant,
type RemoteParticipant,
RoomEvent,
} from "livekit-client";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
@@ -32,7 +33,7 @@ import {
SFURoomCreationRestrictedError,
} from "../../utils/errors.ts";
export type PublishingParticipant = Participant;
export type PublishingParticipant = LocalParticipant | RemoteParticipant;
export interface ConnectionOpts {
/** The media transport to connect to. */

View File

@@ -6,13 +6,12 @@ Please see LICENSE in the repository root for full details.
*/
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { logger } from "matrix-js-sdk/lib/logger";
import { BehaviorSubject } from "rxjs";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { type Participant as LivekitParticipant } from "livekit-client";
import { ObservableScope } from "../ObservableScope.ts";
import { ConnectionManager } from "./ConnectionManager.ts";
import { createConnectionManager$ } from "./ConnectionManager.ts";
import { type ConnectionFactory } from "./ConnectionFactory.ts";
import { type Connection } from "./Connection.ts";
import { areLivekitTransportsEqual } from "./matrixLivekitMerger.ts";
@@ -37,15 +36,15 @@ const TRANSPORT_2: LivekitTransport = {
// livekit_service_url: "https://lk-other.sample.com",
// livekit_alias: "!alias:sample.com",
// };
let testScope: ObservableScope;
let fakeConnectionFactory: ConnectionFactory;
let testScope: ObservableScope;
let testTransportStream$: BehaviorSubject<LivekitTransport[]>;
// The connection manager under test
let manager: ConnectionManager;
let connectionManagerInputs: {
scope: ObservableScope;
connectionFactory: ConnectionFactory;
inputTransports$: BehaviorSubject<LivekitTransport[]>;
};
let manager: ReturnType<typeof createConnectionManager$>;
beforeEach(() => {
testScope = new ObservableScope();
@@ -68,9 +67,12 @@ beforeEach(() => {
);
testTransportStream$ = new BehaviorSubject<LivekitTransport[]>([]);
manager = new ConnectionManager(testScope, fakeConnectionFactory, logger);
manager.registerTransports(testTransportStream$);
connectionManagerInputs = {
scope: testScope,
connectionFactory: fakeConnectionFactory,
inputTransports$: testTransportStream$,
};
manager = createConnectionManager$(connectionManagerInputs);
});
afterEach(() => {
@@ -84,7 +86,7 @@ describe("connections$ stream", () => {
if (connections.length > 0) managedConnections.resolve(connections);
});
testTransportStream$.next([TRANSPORT_1, TRANSPORT_2]);
connectionManagerInputs.inputTransports$.next([TRANSPORT_1, TRANSPORT_2]);
const connections = await managedConnections.promise;
@@ -211,11 +213,13 @@ describe("connectionManagerData$ stream", () => {
test("Should report connections with the publishing participants", () => {
withTestScheduler(({ expectObservable, schedule, behavior }) => {
manager.registerTransports(
behavior("a", {
manager = createConnectionManager$({
...connectionManagerInputs,
inputTransports$: behavior("a", {
a: [TRANSPORT_1, TRANSPORT_2],
}),
);
});
const conn1Participants$ = fakePublishingParticipantsStreams.get(
keyForTransport(TRANSPORT_1),
)!;

View File

@@ -14,8 +14,8 @@ import {
type ParticipantId,
} from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject, combineLatest, map, switchMap } from "rxjs";
import { logger, type Logger } from "matrix-js-sdk/lib/logger";
import { type Participant as LivekitParticipant } from "livekit-client";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
import { type Behavior } from "../Behavior";
import { type Connection } from "./Connection";
@@ -25,12 +25,17 @@ import { areLivekitTransportsEqual } from "./matrixLivekitMerger";
import { type ConnectionFactory } from "./ConnectionFactory.ts";
export class ConnectionManagerData {
private readonly store: Map<string, [Connection, LivekitParticipant[]]> =
new Map();
private readonly store: Map<
string,
[Connection, (LocalParticipant | RemoteParticipant)[]]
> = new Map();
public constructor() {}
public add(connection: Connection, participants: LivekitParticipant[]): void {
public add(
connection: Connection,
participants: (LocalParticipant | RemoteParticipant)[],
): void {
const key = this.getKey(connection.transport);
const existing = this.store.get(key);
if (!existing) {
@@ -56,7 +61,7 @@ export class ConnectionManagerData {
public getParticipantForTransport(
transport: LivekitTransport,
): LivekitParticipant[] {
): (LocalParticipant | RemoteParticipant)[] {
const key = transport.livekit_service_url + "|" + transport.livekit_alias;
const existing = this.store.get(key);
if (existing) {
@@ -82,35 +87,41 @@ export class ConnectionManagerData {
return connections;
}
}
interface Props {
scope: ObservableScope;
connectionFactory: ConnectionFactory;
inputTransports$: Behavior<LivekitTransport[]>;
}
// TODO - write test for scopes (do we really need to bind scope)
export class ConnectionManager {
private readonly logger: Logger;
private running$ = new BehaviorSubject(true);
/**
* Crete a `ConnectionManager`
* @param scope the observable scope used by this object.
* @param connectionFactory used to create new connections.
* @param _transportsSubscriptions$ A list of Behaviors each containing a LIST of LivekitTransport.
* Each of these behaviors can be interpreted as subscribed list of transports.
*
* Using `registerTransports` independent external modules can control what connections
* are created by the ConnectionManager.
*
* The connection manager will remove all duplicate transports in each subscibed list.
*
* See `unregisterAllTransports` and `unregisterTransport` for details on how to unsubscribe.
*/
public constructor(
private readonly scope: ObservableScope,
private readonly connectionFactory: ConnectionFactory,
private readonly inputTransports$: Behavior<LivekitTransport[]>,
) {
// TODO logger: only construct one logger from the client and make it compatible via a EC specific sing
this.logger = logger.getChild("ConnectionManager");
scope.onEnd(() => this.running$.next(false));
}
/**
* Crete a `ConnectionManager`
* @param scope the observable scope used by this object.
* @param connectionFactory used to create new connections.
* @param _transportsSubscriptions$ A list of Behaviors each containing a LIST of LivekitTransport.
* Each of these behaviors can be interpreted as subscribed list of transports.
*
* Using `registerTransports` independent external modules can control what connections
* are created by the ConnectionManager.
*
* The connection manager will remove all duplicate transports in each subscibed list.
*
* See `unregisterAllTransports` and `unregisterTransport` for details on how to unsubscribe.
*/
export function createConnectionManager$({
scope,
connectionFactory,
inputTransports$,
}: Props): {
transports$: Behavior<LivekitTransport[]>;
connectionManagerData$: Behavior<ConnectionManagerData>;
connections$: Behavior<Connection[]>;
} {
const logger = rootLogger.getChild("ConnectionManager");
const running$ = new BehaviorSubject(true);
scope.onEnd(() => running$.next(false));
// TODO logger: only construct one logger from the client and make it compatible via a EC specific sing
/**
* All transports currently managed by the ConnectionManager.
@@ -120,8 +131,8 @@ export class ConnectionManager {
* It is build based on the list of subscribed transports (`transportsSubscriptions$`).
* externally this is modified via `registerTransports()`.
*/
private readonly transports$ = this.scope.behavior(
combineLatest([this.running$, this.inputTransports$]).pipe(
const transports$ = scope.behavior(
combineLatest([running$, inputTransports$]).pipe(
map(([running, transports]) => (running ? transports : [])),
map(removeDuplicateTransports),
),
@@ -130,19 +141,19 @@ export class ConnectionManager {
/**
* Connections for each transport in use by one or more session members.
*/
public readonly connections$ = this.scope.behavior(
const connections$ = scope.behavior(
generateKeyed$<LivekitTransport[], Connection, Connection[]>(
this.transports$,
transports$,
(transports, createOrGet) => {
const createConnection =
(
transport: LivekitTransport,
): ((scope: ObservableScope) => Connection) =>
(scope) => {
const connection = this.connectionFactory.createConnection(
const connection = connectionFactory.createConnection(
transport,
scope,
this.logger,
logger,
);
// Start the connection immediately
// Use connection state to track connection progress
@@ -160,9 +171,9 @@ export class ConnectionManager {
),
);
public connectionManagerData$: Behavior<ConnectionManagerData> =
this.scope.behavior(
this.connections$.pipe(
const connectionManagerData$: Behavior<ConnectionManagerData> =
scope.behavior(
connections$.pipe(
switchMap((connections) => {
// Map the connections to list of {connection, participants}[]
const listOfConnectionsWithPublishingParticipants = connections.map(
@@ -191,6 +202,7 @@ export class ConnectionManager {
// start empty
new ConnectionManagerData(),
);
return { transports$, connectionManagerData$, connections$ };
}
function removeDuplicateTransports(

View File

@@ -24,7 +24,7 @@ import { getParticipantId } from "matrix-js-sdk/lib/matrixrtc/utils";
import {
type MatrixLivekitMember,
MatrixLivekitMerger,
matrixLivekitMerger$,
} from "./matrixLivekitMerger";
import { ObservableScope } from "../ObservableScope";
import {
@@ -44,7 +44,7 @@ const userId = "@local:example.com";
const deviceId = "DEVICE000";
// The merger beeing tested
let matrixLivekitMerger: MatrixLivekitMerger;
let matrixLivekitMerger: matrixLivekitMerger$;
beforeEach(() => {
testScope = new ObservableScope();
@@ -62,7 +62,7 @@ beforeEach(() => {
removeEventListener: vi.fn(),
} as unknown as MatrixRoom);
matrixLivekitMerger = new MatrixLivekitMerger(
matrixLivekitMerger = new matrixLivekitMerger$(
testScope,
fakeMemberships$,
mockConnectionManager,

View File

@@ -18,7 +18,7 @@ import { ECConnectionFactory } from "./ConnectionFactory.ts";
import { type OpenIDClientParts } from "../../livekit/openIDSFU.ts";
import { mockMediaDevices, withTestScheduler } from "../../utils/test";
import { type ProcessorState } from "../../livekit/TrackProcessorContext.tsx";
import { MatrixLivekitMerger } from "./matrixLivekitMerger.ts";
import { matrixLivekitMerger$ } from "./matrixLivekitMerger.ts";
import type { CallMembership, Transport } from "matrix-js-sdk/lib/matrixrtc";
import { TRANSPORT_1 } from "./ConnectionManager.test.ts";
@@ -39,9 +39,9 @@ let connectionManager: ConnectionManager;
function createLkMerger(
memberships$: Observable<CallMembership[]>,
): MatrixLivekitMerger {
): matrixLivekitMerger$ {
const mockRoomEmitter = new EventEmitter();
return new MatrixLivekitMerger(
return new matrixLivekitMerger$(
testScope,
memberships$,
connectionManager,

View File

@@ -16,12 +16,12 @@ import {
import { combineLatest, map, startWith, type Observable } from "rxjs";
// eslint-disable-next-line rxjs/no-internal
import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent";
import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk";
import type { Room as MatrixRoom, RoomMember } from "matrix-js-sdk";
// import type { Logger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "../Behavior";
import { type ObservableScope } from "../ObservableScope";
import { type ConnectionManager } from "./ConnectionManager";
import { type createConnectionManager$ } from "./ConnectionManager";
import { getRoomMemberFromRtcMember, memberDisplaynames$ } from "./displayname";
import { type Connection } from "./Connection";
@@ -45,11 +45,25 @@ export interface MatrixLivekitMember {
participantId: string;
}
interface Props {
scope: ObservableScope;
membershipsWithTransport$: Behavior<
{ membership: CallMembership; transport?: LivekitTransport }[]
>;
connectionManager: ReturnType<typeof createConnectionManager$>;
// TODO this is too much information for that class,
// apparently needed to get a room member to later get the Avatar
// => Extract an AvatarService instead?
// Better with just `getMember`
matrixRoom: Pick<MatrixRoom, "getMember"> & NodeStyleEventEmitter;
userId: string;
deviceId: string;
}
// Alternative structure idea:
// const livekitMatrixMember$ = (callMemberships$,connectionManager,scope): Observable<MatrixLivekitMember[]> => {
/**
* Combines MatrixRtc and Livekit worlds.
* Combines MatrixRTC and Livekit worlds.
*
* It has a small public interface:
* - in (via constructor):
@@ -58,54 +72,30 @@ export interface MatrixLivekitMember {
* - out (via public Observable):
* - `remoteMatrixLivekitMember` an observable of MatrixLivekitMember[] to track the remote members and associated livekit data.
*/
export class MatrixLivekitMerger {
export function createMatrixLivekitMembers$({
scope,
membershipsWithTransport$,
connectionManager,
matrixRoom,
userId,
deviceId,
}: Props): Behavior<MatrixLivekitMember[]> {
/**
* Stream of all the call members and their associated livekit data (if available).
*/
public matrixLivekitMember$: Behavior<MatrixLivekitMember[]>;
// private readonly logger: Logger;
public constructor(
private scope: ObservableScope,
private membershipsWithTransport$: Behavior<
{ membership: CallMembership; transport?: LivekitTransport }[]
>,
private connectionManager: ConnectionManager,
// TODO this is too much information for that class,
// apparently needed to get a room member to later get the Avatar
// => Extract an AvatarService instead?
// Better with just `getMember`
private matrixRoom: Pick<MatrixRoom, "getMember"> & NodeStyleEventEmitter,
private userId: string,
private deviceId: string,
// parentLogger: Logger,
) {
// this.logger = parentLogger.getChild("MatrixLivekitMerger");
this.matrixLivekitMember$ = this.scope.behavior(
this.start$().pipe(startWith([])),
);
}
// =======================================
/// PRIVATES
// =======================================
private start$(): Observable<MatrixLivekitMember[]> {
function createMatrixLivekitMember$(): Observable<MatrixLivekitMember[]> {
const displaynameMap$ = memberDisplaynames$(
this.scope,
this.matrixRoom,
this.membershipsWithTransport$.pipe(
map((v) => v.map((v) => v.membership)),
),
this.userId,
this.deviceId,
scope,
matrixRoom,
membershipsWithTransport$.pipe(map((v) => v.map((v) => v.membership))),
userId,
deviceId,
);
const membershipsWithTransport$ = this.membershipsWithTransport$;
return combineLatest([
membershipsWithTransport$,
this.connectionManager.connectionManagerData$,
connectionManager.connectionManagerData$,
]).pipe(
map(([memberships, managerData]) => {
const items: MatrixLivekitMember[] = memberships.map(
@@ -121,12 +111,12 @@ export class MatrixLivekitMerger {
);
const member = getRoomMemberFromRtcMember(
membership,
this.matrixRoom,
matrixRoom,
)?.member;
const connection = transport
? managerData.getConnectionForTransport(transport)
: undefined;
const displayName$ = this.scope.behavior(
const displayName$ = scope.behavior(
displaynameMap$.pipe(
map(
(displayNameMap) =>
@@ -139,7 +129,8 @@ export class MatrixLivekitMerger {
membership,
connection,
// This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar)
member,
// TODO Ugh this is hidign that it might be undefined!! best we remove the member entirely.
member: member as RoomMember,
displayName$,
mxcAvatarUrl: member?.getMxcAvatarUrl(),
participantId,
@@ -150,6 +141,8 @@ export class MatrixLivekitMerger {
}),
);
}
return scope.behavior(createMatrixLivekitMember$().pipe(startWith([])));
}
// TODO add back in the callviewmodel pauseWhen(this.pretendToBeDisconnected$)