This commit is contained in:
Timo K
2025-11-06 15:26:17 +01:00
parent 6e1a582265
commit a55ce19048
9 changed files with 122 additions and 161 deletions

View File

@@ -52,7 +52,7 @@ import {
throttleTime,
timer,
} from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import {
type MatrixRTCSession,
MatrixRTCSessionEvent,
@@ -110,17 +110,17 @@ import {
} from "./layout-types.ts";
import { type ElementCallError } from "../utils/errors.ts";
import { type ObservableScope } from "./ObservableScope.ts";
import { createMatrixLivekitMembers$ } from "./remoteMembers/matrixLivekitMerger.ts";
import { createLocalMembership$ } from "./localMember/LocalMembership.ts";
import { createLocalTransport$ } from "./localMember/LocalTransport.ts";
import {
createMemberships$,
createSessionMembershipsAndTransports$,
membershipsAndTransports$,
} from "./SessionBehaviors.ts";
import { ECConnectionFactory } from "./remoteMembers/ConnectionFactory.ts";
import { createConnectionManager$ } from "./remoteMembers/ConnectionManager.ts";
import { createMatrixLivekitMembers$ } from "./remoteMembers/MatrixLivekitMembers.ts";
const logger = rootLogger.getChild("[CallViewModel]");
//TODO
// Larger rename
// member,membership -> rtcMember
@@ -193,10 +193,8 @@ export class CallViewModel {
}
: undefined;
private memberships$ = createMemberships$({
scope: this.scope,
matrixRTCSession: this.matrixRTCSession,
});
private memberships$ = createMemberships$(this.scope, this.matrixRTCSession);
private membershipsAndTransports = membershipsAndTransports$(
this.scope,
this.memberships$,
@@ -225,7 +223,7 @@ export class CallViewModel {
// Can contain duplicates. The connection manager will take care of this.
private allTransports$ = this.scope.behavior(
combineLatest(
[this.localTransport$, this.sessionBehaviors.transports$],
[this.localTransport$, this.membershipsAndTransports.transports$],
(localTransport, transports) => {
const localTransportAsArray = localTransport ? [localTransport] : [];
return [...localTransportAsArray, ...transports];
@@ -243,11 +241,10 @@ export class CallViewModel {
private matrixLivekitMembers$ = createMatrixLivekitMembers$({
scope: this.scope,
membershipsWithTransport$: this.sessionBehaviors.membershipsWithTransport$,
membershipsWithTransport$:
this.membershipsAndTransports.membershipsWithTransport$,
connectionManager: this.connectionManager,
matrixRoom: this.matrixRoom,
userId: this.userId,
deviceId: this.deviceId,
});
private connectOptions$ = this.scope.behavior(
@@ -357,7 +354,7 @@ export class CallViewModel {
connection,
participant,
member,
displayName$,
displayName,
participantId,
} of matrixLivekitMembers) {
if (connection === undefined) {
@@ -368,7 +365,7 @@ export class CallViewModel {
const mediaId = `${participantId}:${i}`;
const lkRoom = connection?.livekitRoom;
const url = connection?.transport.livekit_service_url;
const dpName$ = displayName$.pipe(map((n) => n ?? "[👻]"));
const item = createOrGet(
mediaId,
(scope) =>
@@ -385,7 +382,7 @@ export class CallViewModel {
url,
this.mediaDevices,
this.pretendToBeDisconnected$,
dpName$,
constant(displayName ?? "[👻]"),
this.handsRaised$.pipe(
map((v) => v[participantId]?.time ?? null),
),
@@ -412,7 +409,7 @@ export class CallViewModel {
lkRoom,
url,
this.pretendToBeDisconnected$,
dpName$,
constant(displayName ?? "[👻]"),
),
),
);

View File

@@ -17,11 +17,6 @@ import { fromEvent, map } from "rxjs";
import { type ObservableScope } from "./ObservableScope";
import { type Behavior } from "./Behavior";
interface Props {
scope: ObservableScope;
matrixRTCSession: MatrixRTCSession;
}
export const membershipsAndTransports$ = (
scope: ObservableScope,
memberships$: Behavior<CallMembership[]>,
@@ -66,10 +61,10 @@ export const membershipsAndTransports$ = (
};
};
export const createMemberships$ = ({
scope,
matrixRTCSession,
}: Props): Behavior<CallMembership[]> => {
export const createMemberships$ = (
scope: ObservableScope,
matrixRTCSession: MatrixRTCSession,
): Behavior<CallMembership[]> => {
return scope.behavior(
fromEvent(
matrixRTCSession,
@@ -78,29 +73,3 @@ export const createMemberships$ = ({
),
);
};
export const createSessionMembershipsAndTransports$ = ({
scope,
matrixRTCSession,
}: Props): {
memberships$: Behavior<CallMembership[]>;
membershipsWithTransport$: Behavior<
{ membership: CallMembership; transport?: LivekitTransport }[]
>;
transports$: Behavior<LivekitTransport[]>;
} => {
const memberships$ = scope.behavior(
fromEvent(
matrixRTCSession,
MatrixRTCSessionEvent.MembershipsChanged,
(_, memberships: CallMembership[]) => memberships,
),
);
const memberAndTransport = membershipsAndTransports$(scope, memberships$);
return {
memberships$,
...memberAndTransport,
};
};

View File

@@ -12,7 +12,7 @@ import { type Participant as LivekitParticipant } from "livekit-client";
import { ObservableScope } from "../ObservableScope.ts";
import {
type ConnectionManagerReturn,
type IConnectionManager,
createConnectionManager$,
} from "./ConnectionManager.ts";
import { type ConnectionFactory } from "./ConnectionFactory.ts";
@@ -47,7 +47,7 @@ let connectionManagerInputs: {
connectionFactory: ConnectionFactory;
inputTransports$: BehaviorSubject<LivekitTransport[]>;
};
let manager: ConnectionManagerReturn;
let manager: IConnectionManager;
beforeEach(() => {
testScope = new ObservableScope();

View File

@@ -21,7 +21,7 @@ import { type Behavior } from "../Behavior";
import { type Connection } from "./Connection";
import { type ObservableScope } from "../ObservableScope";
import { generateKeyed$ } from "../../utils/observable";
import { areLivekitTransportsEqual } from "./MatrixLivekitMembers";
import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts";
import { type ConnectionFactory } from "./ConnectionFactory.ts";
export class ConnectionManagerData {
@@ -93,13 +93,11 @@ interface Props {
inputTransports$: Behavior<LivekitTransport[]>;
}
// TODO - write test for scopes (do we really need to bind scope)
export interface ConnectionManagerReturn {
deduplicatedTransports$: Behavior<LivekitTransport[]>;
export interface IConnectionManager {
transports$: Behavior<LivekitTransport[]>;
connectionManagerData$: Behavior<ConnectionManagerData>;
connections$: Behavior<Connection[]>;
}
/**
* Crete a `ConnectionManager`
* @param scope the observable scope used by this object.
@@ -118,7 +116,7 @@ export function createConnectionManager$({
scope,
connectionFactory,
inputTransports$,
}: Props): ConnectionManagerReturn {
}: Props): IConnectionManager {
const logger = rootLogger.getChild("ConnectionManager");
const running$ = new BehaviorSubject(true);
@@ -133,10 +131,13 @@ export function createConnectionManager$({
* It is build based on the list of subscribed transports (`transportsSubscriptions$`).
* externally this is modified via `registerTransports()`.
*/
const deduplicatedTransports$ = scope.behavior(
const transports$ = scope.behavior(
combineLatest([running$, inputTransports$]).pipe(
map(([running, transports]) => (running ? transports : [])),
map(removeDuplicateTransports),
map(([running, transports]) => ({
epoch: transports.epoch,
value: running ? transports.value : [],
})),
map((transports) => removeDuplicateTransports(transports.value)),
),
);
@@ -145,7 +146,7 @@ export function createConnectionManager$({
*/
const connections$ = scope.behavior(
generateKeyed$<LivekitTransport[], Connection, Connection[]>(
deduplicatedTransports$,
transports$,
(transports, createOrGet) => {
const createConnection =
(
@@ -204,7 +205,7 @@ export function createConnectionManager$({
// start empty
new ConnectionManagerData(),
);
return { deduplicatedTransports$, connectionManagerData$, connections$ };
return { transports$, connectionManagerData$, connections$ };
}
function removeDuplicateTransports(

View File

@@ -6,7 +6,6 @@ Please see LICENSE in the repository root for full details.
*/
import { describe, test, vi, expect, beforeEach, afterEach } from "vitest";
import { BehaviorSubject } from "rxjs";
import {
type CallMembership,
type LivekitTransport,
@@ -14,14 +13,14 @@ import {
import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk";
import { getParticipantId } from "matrix-js-sdk/lib/matrixrtc/utils";
import { type ConnectionManagerReturn } from "./ConnectionManager.ts";
import { type IConnectionManager } from "./ConnectionManager.ts";
import {
type MatrixLivekitMember,
createMatrixLivekitMembers$,
areLivekitTransportsEqual,
} from "./MatrixLivekitMembers";
import { ObservableScope } from "../ObservableScope";
import { ConnectionManagerData } from "./ConnectionManager";
} from "./MatrixLivekitMembers.ts";
import { ObservableScope } from "../ObservableScope.ts";
import { ConnectionManagerData } from "./ConnectionManager.ts";
import {
mockCallMembership,
mockRemoteParticipant,
@@ -32,8 +31,6 @@ import { type Connection } from "./Connection.ts";
let testScope: ObservableScope;
let mockMatrixRoom: MatrixRoom;
const userId = "@local:example.com";
const deviceId = "DEVICE000";
// The merger beeing tested
@@ -87,8 +84,6 @@ test("should signal participant not yet connected to livekit", () => {
connections$: behavior("a", { a: [] }),
},
matrixRoom: mockMatrixRoom,
userId,
deviceId,
});
expectObservable(matrixLivekitMember$).toBe("a", {
@@ -106,14 +101,14 @@ test("should signal participant not yet connected to livekit", () => {
function aConnectionManager(
data: ConnectionManagerData,
behavior: Pick<OurRunHelpers, "behavior">,
): ConnectionManagerReturn {
behavior: OurRunHelpers["behavior"],
): IConnectionManager {
return {
connectionManagerData$: behavior("a", { a: data }),
transports$: behavior("a", {
a: [data.getConnections().map((connection) => connection.transport)],
a: data.getConnections().map((connection) => connection.transport),
}),
connections$: behavior("a", { a: [data.getConnections()] }),
connections$: behavior("a", { a: data.getConnections() }),
};
}
@@ -154,8 +149,6 @@ test("should signal participant on a connection that is publishing", () => {
}),
connectionManager: aConnectionManager(connectionWithPublisher, behavior),
matrixRoom: mockMatrixRoom,
userId,
deviceId,
});
expectObservable(matrixLivekitMember$).toBe("a", {
@@ -205,8 +198,6 @@ test("should signal participant on a connection that is not publishing", () => {
}),
connectionManager: aConnectionManager(connectionWithPublisher, behavior),
matrixRoom: mockMatrixRoom,
userId,
deviceId,
});
expectObservable(matrixLivekitMember$).toBe("a", {
@@ -278,8 +269,6 @@ describe("Publication edge case", () => {
behavior,
),
matrixRoom: mockMatrixRoom,
userId,
deviceId,
});
expectObservable(matrixLivekitMember$).toBe("a", {
@@ -352,8 +341,6 @@ describe("Publication edge case", () => {
behavior,
),
matrixRoom: mockMatrixRoom,
userId,
deviceId,
});
expectObservable(matrixLivekitMember$).toBe("a", {

View File

@@ -13,14 +13,14 @@ import {
type LivekitTransport,
type CallMembership,
} from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, map, type Observable } from "rxjs";
import { combineLatest, filter, map, skipWhile, type Observable } from "rxjs";
// eslint-disable-next-line rxjs/no-internal
import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent";
import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk";
import { type Behavior } from "../Behavior";
import { type IConnectionManager } from "./ConnectionManager";
import { type ObservableScope } from "../ObservableScope";
import type * as ConnectionManager from "./ConnectionManager";
import { getRoomMemberFromRtcMember, memberDisplaynames$ } from "./displayname";
import { type Connection } from "./Connection";
@@ -31,7 +31,7 @@ import { type Connection } from "./Connection";
*/
export interface MatrixLivekitMember {
membership: CallMembership;
displayName$: Behavior<string>;
displayName?: string;
participant?: LocalLivekitParticipant | RemoteLivekitParticipant;
connection?: Connection;
/**
@@ -49,14 +49,12 @@ interface Props {
membershipsWithTransport$: Behavior<
{ membership: CallMembership; transport?: LivekitTransport }[]
>;
connectionManager: ConnectionManager.ConnectionManagerReturn;
connectionManager: IConnectionManager;
// TODO this is too much information for that class,
// apparently needed to get a room member to later get the Avatar
// => Extract an AvatarService instead?
// Better with just `getMember`
matrixRoom: Pick<MatrixRoom, "getMember"> & NodeStyleEventEmitter;
userId: string;
deviceId: string;
}
// Alternative structure idea:
// const livekitMatrixMember$ = (callMemberships$,connectionManager,scope): Observable<MatrixLivekitMember[]> => {
@@ -76,27 +74,30 @@ export function createMatrixLivekitMembers$({
membershipsWithTransport$,
connectionManager,
matrixRoom,
userId,
deviceId,
}: Props): Behavior<MatrixLivekitMember[]> {
/**
* Stream of all the call members and their associated livekit data (if available).
*/
function createMatrixLivekitMember$(): Observable<MatrixLivekitMember[]> {
const displaynameMap$ = memberDisplaynames$(
scope,
matrixRoom,
membershipsWithTransport$.pipe(map((v) => v.map((v) => v.membership))),
userId,
deviceId,
);
const displaynameMap$ = memberDisplaynames$(
scope,
matrixRoom,
membershipsWithTransport$.pipe(map((v) => v.map((v) => v.membership))),
);
return combineLatest([
return scope.behavior(
combineLatest([
membershipsWithTransport$,
connectionManager.connectionManagerData$,
displaynameMap$,
]).pipe(
map(([memberships, managerData]) => {
filter(
([membershipsWithTransports, managerData, displaynames]) =>
// for each change in
displaynames.size === membershipsWithTransports.length &&
displaynames.size === managerData.getConnections().length,
),
map(([memberships, managerData, displaynames]) => {
const items: MatrixLivekitMember[] = memberships.map(
({ membership, transport }) => {
// TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to
@@ -115,22 +116,15 @@ export function createMatrixLivekitMembers$({
const connection = transport
? managerData.getConnectionForTransport(transport)
: undefined;
const displayName$ = scope.behavior(
displaynameMap$.pipe(
map(
(displayNameMap) =>
displayNameMap.get(membership.membershipID) ?? "---",
),
),
);
const displayName = displaynames.get(participantId);
return {
participant,
membership,
connection,
// This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar)
// This makes sense to add to the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar)
// TODO Ugh this is hidign that it might be undefined!! best we remove the member entirely.
member: member as RoomMember,
displayName$,
displayName,
mxcAvatarUrl: member?.getMxcAvatarUrl(),
participantId,
};
@@ -138,10 +132,8 @@ export function createMatrixLivekitMembers$({
);
return items;
}),
);
}
return scope.behavior(createMatrixLivekitMember$(), []);
),
);
}
// TODO add back in the callviewmodel pauseWhen(this.pretendToBeDisconnected$)

View File

@@ -37,8 +37,6 @@ export const memberDisplaynames$ = (
scope: ObservableScope,
matrixRoom: Pick<MatrixRoom, "getMember"> & NodeStyleEventEmitter,
memberships$: Observable<CallMembership[]>,
userId: string,
deviceId: string,
): Behavior<Map<string, string>> =>
scope.behavior(
combineLatest([
@@ -49,12 +47,7 @@ export const memberDisplaynames$ = (
// TODO: do we need: pauseWhen(this.pretendToBeDisconnected$),
]).pipe(
map(([memberships, _displayNames]) => {
const displaynameMap = new Map<string, string>([
[
`${userId}:${deviceId}`,
matrixRoom.getMember(userId)?.rawDisplayName ?? userId,
],
]);
const displaynameMap = new Map<string, string>();
const room = matrixRoom;
// We only consider RTC members for disambiguation as they are the only visible members.

View File

@@ -26,8 +26,12 @@ import {
createMatrixLivekitMembers$,
type MatrixLivekitMember,
} from "./MatrixLivekitMembers.ts";
import { createConnectionManager$ } from "./ConnectionManager.ts";
import {
ConnectionManagerData,
createConnectionManager$,
} from "./ConnectionManager.ts";
import { membershipsAndTransports$ } from "../SessionBehaviors.ts";
import { Connection } from "./Connection.ts";
// Test the integration of ConnectionManager and MatrixLivekitMerger
@@ -109,61 +113,79 @@ test("example test 2", () => {
const bobMembership = mockCallMembership("@bob:example.com", "BDEV000");
const carlMembership = mockCallMembership("@carl:example.com", "CDEV000");
const daveMembership = mockCallMembership("@dave:foo.bar", "DDEV000");
const memberships$ = behavior("ab---c", {
const memberships$ = behavior("abc", {
a: [bobMembership],
b: [bobMembership, carlMembership],
c: [bobMembership, carlMembership, daveMembership],
});
const transports$ = testScope.behavior(
memberships$.pipe(
map((memberships) => {
return memberships.map((membership) => {
return membership.getTransport(memberships[0]) as LivekitTransport;
});
}),
),
const membershipsAndTransports = membershipsAndTransports$(
testScope,
memberships$,
);
const connectionManager = createConnectionManager$({
scope: testScope,
connectionFactory: ecConnectionFactory,
inputTransports$: transports$,
inputTransports$: membershipsAndTransports.transports$,
});
const marixLivekitItems$ = createMatrixLivekitMembers$({
const matrixLivekitItems$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: membershipsAndTransports$(
testScope,
memberships$,
).membershipsWithTransport$,
membershipsWithTransport$:
membershipsAndTransports.membershipsWithTransport$,
connectionManager,
matrixRoom: mockMatrixRoom,
userId: "local:example.org",
deviceId: "ME00",
});
expectObservable(marixLivekitItems$).toBe("a(bb)(cc)", {
expectObservable(membershipsAndTransports.transports$).toBe("abc", {
a: expect.toSatisfy((t: LivekitTransport[]) => t.length === 1),
b: expect.toSatisfy((t: LivekitTransport[]) => t.length === 2),
c: expect.toSatisfy((t: LivekitTransport[]) => t.length === 3),
});
expectObservable(membershipsAndTransports.membershipsWithTransport$).toBe(
"abc",
{
a: expect.toSatisfy((t: LivekitTransport[]) => t.length === 1),
b: expect.toSatisfy((t: LivekitTransport[]) => t.length === 2),
c: expect.toSatisfy((t: LivekitTransport[]) => t.length === 3),
},
);
expectObservable(connectionManager.transports$).toBe("abc", {
a: expect.toSatisfy((t: LivekitTransport[]) => t.length === 1),
b: expect.toSatisfy((t: LivekitTransport[]) => t.length === 1),
c: expect.toSatisfy((t: LivekitTransport[]) => t.length === 2),
});
expectObservable(connectionManager.connectionManagerData$).toBe("abc", {
a: expect.toSatisfy(
(d: ConnectionManagerData) => d.getConnections().length === 1,
),
b: expect.toSatisfy(
(d: ConnectionManagerData) => d.getConnections().length === 1,
),
c: expect.toSatisfy(
(d: ConnectionManagerData) => d.getConnections().length === 2,
),
});
expectObservable(connectionManager.connections$).toBe("abc", {
a: expect.toSatisfy((t: Connection[]) => t.length === 1),
b: expect.toSatisfy((t: Connection[]) => t.length === 1),
c: expect.toSatisfy((t: Connection[]) => t.length === 2),
});
expectObservable(matrixLivekitItems$).toBe("abc", {
a: expect.toSatisfy((items: MatrixLivekitMember[]) => {
expect(items.length).toBe(1);
const item = items[0]!;
expect(item.membership).toStrictEqual(bobMembership);
expect(item.participant).toBeUndefined();
return true;
}),
b: expect.toSatisfy((items: MatrixLivekitMember[]) => {
// TODO
// expect(items.length).toBe(2);
//
// expect(items.length).toBe(1);
// const item = items[0]!;
// expect(item.membership).toStrictEqual(bobMembership);
// expect(item.participant).toBeUndefined();
//
// {
// const item = items[1]!;
// expect(item.membership).toStrictEqual(carlMembership);
// expect(item.participant).toBeUndefined();
// }
return true;
}),
b: expect.toSatisfy((items: MatrixLivekitMember[]) => {
return true;
}),
c: expect.toSatisfy(() => true),

View File

@@ -78,11 +78,11 @@ export interface OurRunHelpers extends RunHelpers {
* diagram.
*/
schedule: (marbles: string, actions: Record<string, () => void>) => void;
behavior<T = string>(
behavior: <T>(
marbles: string,
values?: { [marble: string]: T },
error?: unknown,
): Behavior<T>;
) => Behavior<T>;
scope: ObservableScope;
}