fix tests compilation peer session timo - wip

This commit is contained in:
Valere
2025-11-06 12:08:46 +01:00
parent d8e29467f6
commit 6e1a582265
11 changed files with 861 additions and 312 deletions

View File

@@ -113,7 +113,11 @@ import { type ObservableScope } from "./ObservableScope.ts";
import { createMatrixLivekitMembers$ } from "./remoteMembers/matrixLivekitMerger.ts";
import { createLocalMembership$ } from "./localMember/LocalMembership.ts";
import { createLocalTransport$ } from "./localMember/LocalTransport.ts";
import { createSessionMembershipsAndTransports$ } from "./SessionBehaviors.ts";
import {
createMemberships$,
createSessionMembershipsAndTransports$,
membershipsAndTransports$,
} from "./SessionBehaviors.ts";
import { ECConnectionFactory } from "./remoteMembers/ConnectionFactory.ts";
import { createConnectionManager$ } from "./remoteMembers/ConnectionManager.ts";
@@ -189,11 +193,14 @@ export class CallViewModel {
}
: undefined;
private sessionBehaviors = createSessionMembershipsAndTransports$({
private memberships$ = createMemberships$({
scope: this.scope,
matrixRTCSession: this.matrixRTCSession,
});
private memberships$ = this.sessionBehaviors.memberships$;
private membershipsAndTransports = membershipsAndTransports$(
this.scope,
this.memberships$,
);
private localTransport$ = createLocalTransport$({
scope: this.scope,

View File

@@ -22,23 +22,15 @@ interface Props {
matrixRTCSession: MatrixRTCSession;
}
export const createSessionMembershipsAndTransports$ = ({
scope,
matrixRTCSession,
}: Props): {
memberships$: Behavior<CallMembership[]>;
export const membershipsAndTransports$ = (
scope: ObservableScope,
memberships$: Behavior<CallMembership[]>,
): {
membershipsWithTransport$: Behavior<
{ membership: CallMembership; transport?: LivekitTransport }[]
>;
transports$: Behavior<LivekitTransport[]>;
} => {
const memberships$ = scope.behavior(
fromEvent(
matrixRTCSession,
MatrixRTCSessionEvent.MembershipsChanged,
(_, memberships: CallMembership[]) => memberships,
),
);
/**
* Lists the transports used by ourselves, plus all other MatrixRTC session
* members. For completeness this also lists the preferred transport and
@@ -47,9 +39,7 @@ export const createSessionMembershipsAndTransports$ = ({
* together when it might change together is what you have to do in RxJS to
* avoid reading inconsistent state or observing too many changes.)
*/
const membershipsWithTransport$: Behavior<
{ membership: CallMembership; transport?: LivekitTransport }[]
> = scope.behavior(
const membershipsWithTransport$ = scope.behavior(
memberships$.pipe(
map((memberships) => {
return memberships.map((membership) => {
@@ -69,9 +59,48 @@ export const createSessionMembershipsAndTransports$ = ({
map((mts) => mts.flatMap(({ transport: t }) => (t ? [t] : []))),
),
);
return {
memberships$,
membershipsWithTransport$,
transports$,
};
};
export const createMemberships$ = ({
scope,
matrixRTCSession,
}: Props): Behavior<CallMembership[]> => {
return scope.behavior(
fromEvent(
matrixRTCSession,
MatrixRTCSessionEvent.MembershipsChanged,
(_, memberships: CallMembership[]) => memberships,
),
);
};
export const createSessionMembershipsAndTransports$ = ({
scope,
matrixRTCSession,
}: Props): {
memberships$: Behavior<CallMembership[]>;
membershipsWithTransport$: Behavior<
{ membership: CallMembership; transport?: LivekitTransport }[]
>;
transports$: Behavior<LivekitTransport[]>;
} => {
const memberships$ = scope.behavior(
fromEvent(
matrixRTCSession,
MatrixRTCSessionEvent.MembershipsChanged,
(_, memberships: CallMembership[]) => memberships,
),
);
const memberAndTransport = membershipsAndTransports$(scope, memberships$);
return {
memberships$,
...memberAndTransport,
};
};

View File

@@ -28,7 +28,10 @@ import {
import { logger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "../Behavior";
import { type createConnectionManager$ } from "../remoteMembers/ConnectionManager";
import {
type ConnectionManagerReturn,
type createConnectionManager$,
} from "../remoteMembers/ConnectionManager";
import { ObservableScope } from "../ObservableScope";
import { Publisher } from "./Publisher";
import { type MuteStates } from "../MuteStates";
@@ -90,7 +93,7 @@ interface Props {
scope: ObservableScope;
mediaDevices: MediaDevices;
muteStates: MuteStates;
connectionManager: ReturnType<typeof createConnectionManager$>;
connectionManager: ConnectionManagerReturn;
matrixRTCSession: MatrixRTCSession;
matrixRoom: MatrixRoom;
localTransport$: Behavior<LivekitTransport | undefined>;

View File

@@ -11,11 +11,14 @@ import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { type Participant as LivekitParticipant } from "livekit-client";
import { ObservableScope } from "../ObservableScope.ts";
import { createConnectionManager$ } from "./ConnectionManager.ts";
import {
type ConnectionManagerReturn,
createConnectionManager$,
} from "./ConnectionManager.ts";
import { type ConnectionFactory } from "./ConnectionFactory.ts";
import { type Connection } from "./Connection.ts";
import { areLivekitTransportsEqual } from "./matrixLivekitMerger.ts";
import { flushPromises, withTestScheduler } from "../../utils/test.ts";
import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts";
// Some test constants
@@ -44,7 +47,7 @@ let connectionManagerInputs: {
connectionFactory: ConnectionFactory;
inputTransports$: BehaviorSubject<LivekitTransport[]>;
};
let manager: ReturnType<typeof createConnectionManager$>;
let manager: ConnectionManagerReturn;
beforeEach(() => {
testScope = new ObservableScope();

View File

@@ -21,7 +21,7 @@ import { type Behavior } from "../Behavior";
import { type Connection } from "./Connection";
import { type ObservableScope } from "../ObservableScope";
import { generateKeyed$ } from "../../utils/observable";
import { areLivekitTransportsEqual } from "./matrixLivekitMerger";
import { areLivekitTransportsEqual } from "./MatrixLivekitMembers";
import { type ConnectionFactory } from "./ConnectionFactory.ts";
export class ConnectionManagerData {
@@ -94,6 +94,12 @@ interface Props {
}
// TODO - write test for scopes (do we really need to bind scope)
export interface ConnectionManagerReturn {
deduplicatedTransports$: Behavior<LivekitTransport[]>;
connectionManagerData$: Behavior<ConnectionManagerData>;
connections$: Behavior<Connection[]>;
}
/**
* Crete a `ConnectionManager`
* @param scope the observable scope used by this object.
@@ -112,11 +118,7 @@ export function createConnectionManager$({
scope,
connectionFactory,
inputTransports$,
}: Props): {
transports$: Behavior<LivekitTransport[]>;
connectionManagerData$: Behavior<ConnectionManagerData>;
connections$: Behavior<Connection[]>;
} {
}: Props): ConnectionManagerReturn {
const logger = rootLogger.getChild("ConnectionManager");
const running$ = new BehaviorSubject(true);
@@ -131,7 +133,7 @@ export function createConnectionManager$({
* It is build based on the list of subscribed transports (`transportsSubscriptions$`).
* externally this is modified via `registerTransports()`.
*/
const transports$ = scope.behavior(
const deduplicatedTransports$ = scope.behavior(
combineLatest([running$, inputTransports$]).pipe(
map(([running, transports]) => (running ? transports : [])),
map(removeDuplicateTransports),
@@ -143,7 +145,7 @@ export function createConnectionManager$({
*/
const connections$ = scope.behavior(
generateKeyed$<LivekitTransport[], Connection, Connection[]>(
transports$,
deduplicatedTransports$,
(transports, createOrGet) => {
const createConnection =
(
@@ -202,7 +204,7 @@ export function createConnectionManager$({
// start empty
new ConnectionManagerData(),
);
return { transports$, connectionManagerData$, connections$ };
return { deduplicatedTransports$, connectionManagerData$, connections$ };
}
function removeDuplicateTransports(

View File

@@ -13,15 +13,14 @@ import {
type LivekitTransport,
type CallMembership,
} from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, map, startWith, type Observable } from "rxjs";
import { combineLatest, map, type Observable } from "rxjs";
// eslint-disable-next-line rxjs/no-internal
import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent";
import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk";
// import type { Logger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "../Behavior";
import { type ObservableScope } from "../ObservableScope";
import { type createConnectionManager$ } from "./ConnectionManager";
import type * as ConnectionManager from "./ConnectionManager";
import { getRoomMemberFromRtcMember, memberDisplaynames$ } from "./displayname";
import { type Connection } from "./Connection";
@@ -50,7 +49,7 @@ interface Props {
membershipsWithTransport$: Behavior<
{ membership: CallMembership; transport?: LivekitTransport }[]
>;
connectionManager: ReturnType<typeof createConnectionManager$>;
connectionManager: ConnectionManager.ConnectionManagerReturn;
// TODO this is too much information for that class,
// apparently needed to get a room member to later get the Avatar
// => Extract an AvatarService instead?
@@ -142,7 +141,7 @@ export function createMatrixLivekitMembers$({
);
}
return scope.behavior(createMatrixLivekitMember$().pipe(startWith([])));
return scope.behavior(createMatrixLivekitMember$(), []);
}
// TODO add back in the callviewmodel pauseWhen(this.pretendToBeDisconnected$)

View File

@@ -5,71 +5,51 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
describe,
test,
vi,
expect,
beforeEach,
afterEach,
type MockedObject,
} from "vitest";
import { BehaviorSubject, take } from "rxjs";
import { describe, test, vi, expect, beforeEach, afterEach } from "vitest";
import { BehaviorSubject } from "rxjs";
import {
type CallMembership,
type LivekitTransport,
} from "matrix-js-sdk/lib/matrixrtc";
import { type Room as MatrixRoom } from "matrix-js-sdk";
import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk";
import { getParticipantId } from "matrix-js-sdk/lib/matrixrtc/utils";
import { type ConnectionManagerReturn } from "./ConnectionManager.ts";
import {
type MatrixLivekitMember,
matrixLivekitMerger$,
} from "./matrixLivekitMerger";
createMatrixLivekitMembers$,
areLivekitTransportsEqual,
} from "./MatrixLivekitMembers";
import { ObservableScope } from "../ObservableScope";
import { ConnectionManagerData } from "./ConnectionManager";
import {
type ConnectionManager,
ConnectionManagerData,
} from "./ConnectionManager";
import { aliceRtcMember } from "../../utils/test-fixtures";
import { mockRemoteParticipant } from "../../utils/test.ts";
mockCallMembership,
mockRemoteParticipant,
type OurRunHelpers,
withTestScheduler,
} from "../../utils/test.ts";
import { type Connection } from "./Connection.ts";
let testScope: ObservableScope;
let fakeManagerData$: BehaviorSubject<ConnectionManagerData>;
let fakeMemberships$: BehaviorSubject<CallMembership[]>;
let mockConnectionManager: MockedObject<ConnectionManager>;
let mockMatrixRoom: MatrixRoom;
const userId = "@local:example.com";
const deviceId = "DEVICE000";
// The merger beeing tested
let matrixLivekitMerger: matrixLivekitMerger$;
beforeEach(() => {
testScope = new ObservableScope();
fakeMemberships$ = new BehaviorSubject<CallMembership[]>([]);
fakeManagerData$ = new BehaviorSubject<ConnectionManagerData>(
new ConnectionManagerData(),
);
mockConnectionManager = vi.mocked<ConnectionManager>({
registerTransports: vi.fn(),
connectionManagerData$: fakeManagerData$,
} as unknown as ConnectionManager);
mockMatrixRoom = vi.mocked<MatrixRoom>({
getMember: vi.fn().mockReturnValue(null),
getMember: vi.fn().mockImplementation((userId: string) => {
return {
userId,
rawDisplayName: userId.replace("@", "").replace(":example.org", ""),
getMxcAvatarUrl: vi.fn().mockReturnValue(null),
} as unknown as RoomMember;
}),
addEventListener: vi.fn(),
removeEventListener: vi.fn(),
} as unknown as MatrixRoom);
matrixLivekitMerger = new matrixLivekitMerger$(
testScope,
fakeMemberships$,
mockConnectionManager,
mockMatrixRoom,
userId,
deviceId,
);
});
afterEach(() => {
@@ -77,186 +57,357 @@ afterEach(() => {
});
test("should signal participant not yet connected to livekit", () => {
fakeMemberships$.next([aliceRtcMember]);
withTestScheduler(({ behavior, expectObservable }) => {
const bobMembership = {
userId: "@bob:example.org",
deviceId: "DEV000",
transports: [
{
type: "livekit",
livekit_service_url: "https://lk.example.org",
livekit_alias: "!alias:example.org",
},
],
} as unknown as CallMembership;
let items: MatrixLivekitMember[] = [];
matrixLivekitMerger.matrixLivekitMember$
.pipe(take(1))
.subscribe((emitted) => {
items = emitted;
const matrixLivekitMember$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: behavior("a", {
a: [
{
membership: bobMembership,
},
],
}),
connectionManager: {
connectionManagerData$: behavior("a", {
a: new ConnectionManagerData(),
}),
transports$: behavior("a", { a: [] }),
connections$: behavior("a", { a: [] }),
},
matrixRoom: mockMatrixRoom,
userId,
deviceId,
});
expect(items).toHaveLength(1);
const item = items[0];
// Assert the expected membership
expect(item.membership).toBe(aliceRtcMember);
// Assert participant & connection are absent (not just `undefined`)
expect(item.participant).not.toBeDefined();
expect(item.participant).not.toBeDefined();
expectObservable(matrixLivekitMember$).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
return (
data.length == 1 &&
data[0].membership === bobMembership &&
data[0].participant === undefined &&
data[0].connection === undefined
);
}),
});
});
});
function aConnectionManager(
data: ConnectionManagerData,
behavior: Pick<OurRunHelpers, "behavior">,
): ConnectionManagerReturn {
return {
connectionManagerData$: behavior("a", { a: data }),
transports$: behavior("a", {
a: [data.getConnections().map((connection) => connection.transport)],
}),
connections$: behavior("a", { a: [data.getConnections()] }),
};
}
test("should signal participant on a connection that is publishing", () => {
const fakeConnection = {
transport: aliceRtcMember.getTransport(aliceRtcMember) as LivekitTransport,
} as unknown as Connection;
withTestScheduler(({ behavior, expectObservable }) => {
const transport: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.example.org",
livekit_alias: "!alias:example.org",
};
fakeMemberships$.next([aliceRtcMember]);
const aliceParticipantId = getParticipantId(
aliceRtcMember.userId,
aliceRtcMember.deviceId,
);
const bobMembership = mockCallMembership(
"@bob:example.org",
"DEV000",
transport,
);
const managerData: ConnectionManagerData = new ConnectionManagerData();
managerData.add(fakeConnection, [
mockRemoteParticipant({ identity: aliceParticipantId }),
]);
fakeManagerData$.next(managerData);
let items: MatrixLivekitMember[] = [];
matrixLivekitMerger.matrixLivekitMember$
.pipe(take(1))
.subscribe((emitted) => {
items = emitted;
const connectionWithPublisher = new ConnectionManagerData();
const bobParticipantId = getParticipantId(
bobMembership.userId,
bobMembership.deviceId,
);
const connection = {
transport: transport,
} as unknown as Connection;
connectionWithPublisher.add(connection, [
mockRemoteParticipant({ identity: bobParticipantId }),
]);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: behavior("a", {
a: [
{
membership: bobMembership,
transport,
},
],
}),
connectionManager: aConnectionManager(connectionWithPublisher, behavior),
matrixRoom: mockMatrixRoom,
userId,
deviceId,
});
expect(items).toHaveLength(1);
const item = items[0];
// Assert the expected membership
expect(item.membership).toBe(aliceRtcMember);
expect(item.participant?.identity).toBe(aliceParticipantId);
expect(item.connection?.transport).toEqual(fakeConnection.transport);
expectObservable(matrixLivekitMember$).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expect(data[0].participant).toBeDefined();
expect(data[0].connection).toBeDefined();
expect(data[0].membership).toEqual(bobMembership);
expect(
areLivekitTransportsEqual(data[0].connection!.transport, transport),
).toBe(true);
return true;
}),
});
});
});
test("should signal participant on a connection that is not publishing", () => {
const fakeConnection = {
transport: aliceRtcMember.getTransport(aliceRtcMember) as LivekitTransport,
} as unknown as Connection;
withTestScheduler(({ behavior, expectObservable }) => {
const transport: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.example.org",
livekit_alias: "!alias:example.org",
};
fakeMemberships$.next([aliceRtcMember]);
const bobMembership = mockCallMembership(
"@bob:example.org",
"DEV000",
transport,
);
const managerData: ConnectionManagerData = new ConnectionManagerData();
managerData.add(fakeConnection, []);
fakeManagerData$.next(managerData);
const connectionWithPublisher = new ConnectionManagerData();
// const bobParticipantId = getParticipantId(bobMembership.userId, bobMembership.deviceId);
const connection = {
transport: transport,
} as unknown as Connection;
connectionWithPublisher.add(connection, []);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: behavior("a", {
a: [
{
membership: bobMembership,
transport,
},
],
}),
connectionManager: aConnectionManager(connectionWithPublisher, behavior),
matrixRoom: mockMatrixRoom,
userId,
deviceId,
});
matrixLivekitMerger.matrixLivekitMember$.pipe(take(1)).subscribe((items) => {
expect(items).toHaveLength(1);
const item = items[0];
// Assert the expected membership
expect(item.membership).toBe(aliceRtcMember);
expect(item.participant).not.toBeDefined();
// We have the connection
expect(item.connection?.transport).toEqual(fakeConnection.transport);
expectObservable(matrixLivekitMember$).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expect(data[0].participant).not.toBeDefined();
expect(data[0].connection).toBeDefined();
expect(data[0].membership).toEqual(bobMembership);
expect(
areLivekitTransportsEqual(data[0].connection!.transport, transport),
).toBe(true);
return true;
}),
});
});
});
describe("Publication edge case", () => {
const connectionA = {
transport: {
type: "livekit",
livekit_service_url: "https://lk.example.org",
livekit_alias: "!alias:example.org",
},
} as unknown as Connection;
const connectionB = {
transport: {
type: "livekit",
livekit_service_url: "https://lk.sample.com",
livekit_alias: "!alias:sample.com",
},
} as unknown as Connection;
const bobMembership = {
userId: "@bob:example.org",
deviceId: "DEV000",
transports: [connectionA.transport],
} as unknown as CallMembership;
const bobParticipantId = getParticipantId(
bobMembership.userId,
bobMembership.deviceId,
);
test("bob is publishing in several connections", () => {
let lastMatrixLkItems: MatrixLivekitMember[] = [];
matrixLivekitMerger.matrixLivekitMember$.subscribe((items) => {
lastMatrixLkItems = items;
withTestScheduler(({ behavior, expectObservable }) => {
const transportA: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.example.org",
livekit_alias: "!alias:example.org",
};
const transportB: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.sample.com",
livekit_alias: "!alias:sample.com",
};
const bobMembership = mockCallMembership(
"@bob:example.org",
"DEV000",
transportA,
);
const connectionWithPublisher = new ConnectionManagerData();
const bobParticipantId = getParticipantId(
bobMembership.userId,
bobMembership.deviceId,
);
const connectionA = {
transport: transportA,
} as unknown as Connection;
const connectionB = {
transport: transportB,
} as unknown as Connection;
connectionWithPublisher.add(connectionA, [
mockRemoteParticipant({ identity: bobParticipantId }),
]);
connectionWithPublisher.add(connectionB, [
mockRemoteParticipant({ identity: bobParticipantId }),
]);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: behavior("a", {
a: [
{
membership: bobMembership,
transport: transportA,
},
],
}),
connectionManager: aConnectionManager(
connectionWithPublisher,
behavior,
),
matrixRoom: mockMatrixRoom,
userId,
deviceId,
});
expectObservable(matrixLivekitMember$).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expect(data[0].participant).toBeDefined();
expect(data[0].participant!.identity).toEqual(bobParticipantId);
expect(data[0].connection).toBeDefined();
expect(data[0].membership).toEqual(bobMembership);
expect(
areLivekitTransportsEqual(
data[0].connection!.transport,
transportA,
),
).toBe(true);
return true;
}),
});
});
vi.mocked(bobMembership).getTransport = vi
.fn()
.mockReturnValue(connectionA.transport);
fakeMemberships$.next([bobMembership]);
const lkMap = new ConnectionManagerData();
lkMap.add(connectionA, [
mockRemoteParticipant({ identity: bobParticipantId }),
]);
lkMap.add(connectionB, [
mockRemoteParticipant({ identity: bobParticipantId }),
]);
fakeManagerData$.next(lkMap);
const items = lastMatrixLkItems;
expect(items).toHaveLength(1);
const item = items[0];
// Assert the expected membership
expect(item.membership.userId).toEqual(bobMembership.userId);
expect(item.membership.deviceId).toEqual(bobMembership.deviceId);
expect(item.participant?.identity).toEqual(bobParticipantId);
// The transport info should come from the membership transports and not only from the publishing connection
expect(item.connection?.transport?.livekit_service_url).toEqual(
bobMembership.transports[0]?.livekit_service_url,
);
expect(item.connection?.transport?.livekit_alias).toEqual(
bobMembership.transports[0]?.livekit_alias,
);
});
test("bob is publishing in the wrong connection", () => {
let lastMatrixLkItems: MatrixLivekitMember[] = [];
matrixLivekitMerger.matrixLivekitMember$.subscribe((items) => {
lastMatrixLkItems = items;
withTestScheduler(({ behavior, expectObservable }) => {
const transportA: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.example.org",
livekit_alias: "!alias:example.org",
};
const transportB: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.sample.com",
livekit_alias: "!alias:sample.com",
};
const bobMembership = mockCallMembership(
"@bob:example.org",
"DEV000",
transportA,
);
const connectionWithPublisher = new ConnectionManagerData();
const bobParticipantId = getParticipantId(
bobMembership.userId,
bobMembership.deviceId,
);
const connectionA = {
transport: transportA,
} as unknown as Connection;
const connectionB = {
transport: transportB,
} as unknown as Connection;
connectionWithPublisher.add(connectionA, []);
connectionWithPublisher.add(connectionB, [
mockRemoteParticipant({ identity: bobParticipantId }),
]);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: behavior("a", {
a: [
{
membership: bobMembership,
transport: transportA,
},
],
}),
connectionManager: aConnectionManager(
connectionWithPublisher,
behavior,
),
matrixRoom: mockMatrixRoom,
userId,
deviceId,
});
expectObservable(matrixLivekitMember$).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expect(data[0].participant).not.toBeDefined();
expect(data[0].connection).toBeDefined();
expect(data[0].membership).toEqual(bobMembership);
expect(
areLivekitTransportsEqual(
data[0].connection!.transport,
transportA,
),
).toBe(true);
return true;
}),
});
});
vi.mocked(bobMembership).getTransport = vi
.fn()
.mockReturnValue(connectionA.transport);
// let lastMatrixLkItems: MatrixLivekitMember[] = [];
// matrixLivekitMerger.matrixLivekitMember$.subscribe((items) => {
// lastMatrixLkItems = items;
// });
fakeMemberships$.next([bobMembership]);
// vi.mocked(bobMembership).getTransport = vi
// .fn()
// .mockReturnValue(connectionA.transport);
const lkMap = new ConnectionManagerData();
lkMap.add(connectionA, []);
lkMap.add(connectionB, [
mockRemoteParticipant({ identity: bobParticipantId }),
]);
// fakeMemberships$.next([bobMembership]);
fakeManagerData$.next(lkMap);
// const lkMap = new ConnectionManagerData();
// lkMap.add(connectionA, []);
// lkMap.add(connectionB, [
// mockRemoteParticipant({ identity: bobParticipantId })
// ]);
const items = lastMatrixLkItems;
expect(items).toHaveLength(1);
const item = items[0];
// fakeManagerData$.next(lkMap);
// Assert the expected membership
expect(item.membership.userId).toEqual(bobMembership.userId);
expect(item.membership.deviceId).toEqual(bobMembership.deviceId);
// const items = lastMatrixLkItems;
// expect(items).toHaveLength(1);
// const item = items[0];
expect(item.participant).not.toBeDefined();
// // Assert the expected membership
// expect(item.membership.userId).toEqual(bobMembership.userId);
// expect(item.membership.deviceId).toEqual(bobMembership.deviceId);
// The transport info should come from the membership transports and not only from the publishing connection
expect(item.connection?.transport?.livekit_service_url).toEqual(
bobMembership.transports[0]?.livekit_service_url,
);
expect(item.connection?.transport?.livekit_alias).toEqual(
bobMembership.transports[0]?.livekit_alias,
);
// expect(item.participant).not.toBeDefined();
// // The transport info should come from the membership transports and not only from the publishing connection
// expect(item.connection?.transport?.livekit_service_url).toEqual(
// bobMembership.transports[0]?.livekit_service_url
// );
// expect(item.connection?.transport?.livekit_alias).toEqual(
// bobMembership.transports[0]?.livekit_alias
// );
});
});

View File

@@ -0,0 +1,299 @@
/*
Copyright 2025 Element Creations Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { afterEach, beforeEach, test, vi } from "vitest";
import {
type MatrixEvent,
type RoomMember,
type RoomState,
RoomStateEvent,
} from "matrix-js-sdk";
import EventEmitter from "events";
import { ObservableScope } from "../ObservableScope.ts";
import type { Room as MatrixRoom } from "matrix-js-sdk/lib/models/room";
import { mockCallMembership, withTestScheduler } from "../../utils/test.ts";
import { memberDisplaynames$ } from "./displayname.ts";
let testScope: ObservableScope;
let mockMatrixRoom: MatrixRoom;
/*
* To be populated in the test setup.
* Maps userId to a partial/mock RoomMember object.
*/
let fakeMembersMap: Map<string, Partial<RoomMember>>;
beforeEach(() => {
testScope = new ObservableScope();
fakeMembersMap = new Map<string, Partial<RoomMember>>();
const roomEmitter = new EventEmitter();
mockMatrixRoom = {
on: roomEmitter.on.bind(roomEmitter),
off: roomEmitter.off.bind(roomEmitter),
emit: roomEmitter.emit.bind(roomEmitter),
// addListener: roomEmitter.addListener.bind(roomEmitter),
// removeListener: roomEmitter.removeListener.bind(roomEmitter),
getMember: vi.fn().mockImplementation((userId: string) => {
const member = fakeMembersMap.get(userId);
if (member) {
return member as RoomMember;
}
return null;
}),
} as unknown as MatrixRoom;
});
function fakeMemberWith(data: Partial<RoomMember>): void {
const userId = data.userId || "@alice:example.com";
const member: Partial<RoomMember> = {
userId: userId,
rawDisplayName: data.rawDisplayName ?? userId,
...data,
} as unknown as RoomMember;
fakeMembersMap.set(userId, member);
// return member as RoomMember;
}
function updateDisplayName(
userId: `@${string}:${string}`,
newDisplayName: string,
): void {
const member = fakeMembersMap.get(userId);
if (member) {
member.rawDisplayName = newDisplayName;
// Emit the event to notify listeners
mockMatrixRoom.emit(
RoomStateEvent.Members,
{} as unknown as MatrixEvent,
{} as unknown as RoomState,
member as RoomMember,
);
} else {
throw new Error(`No member found with userId: ${userId}`);
}
}
afterEach(() => {
fakeMembersMap.clear();
});
test("should always have our own user", () => {
withTestScheduler(({ cold, schedule, expectObservable }) => {
const dn$ = memberDisplaynames$(
testScope,
mockMatrixRoom,
cold("a", {
a: [],
}),
"@local:example.com",
"DEVICE000",
);
expectObservable(dn$).toBe("a", {
a: new Map<string, string>([
["@local:example.com:DEVICE000", "@local:example.com"],
]),
});
});
});
function setUpBasicRoom(): void {
fakeMemberWith({ userId: "@local:example.com", rawDisplayName: "it's a me" });
fakeMemberWith({ userId: "@alice:example.com", rawDisplayName: "Alice" });
fakeMemberWith({ userId: "@bob:example.com", rawDisplayName: "Bob" });
fakeMemberWith({ userId: "@carl:example.com", rawDisplayName: "Carl" });
fakeMemberWith({ userId: "@evil:example.com", rawDisplayName: "Carl" });
fakeMemberWith({ userId: "@bob:foo.bar", rawDisplayName: "Bob" });
fakeMemberWith({ userId: "@no-name:foo.bar" });
}
test("should get displayName for users", () => {
setUpBasicRoom();
withTestScheduler(({ cold, schedule, expectObservable }) => {
const dn$ = memberDisplaynames$(
testScope,
mockMatrixRoom,
cold("a", {
a: [
mockCallMembership("@alice:example.com", "DEVICE1"),
mockCallMembership("@bob:example.com", "DEVICE1"),
],
}),
"@local:example.com",
"DEVICE000",
);
expectObservable(dn$).toBe("a", {
a: new Map<string, string>([
["@local:example.com:DEVICE000", "it's a me"],
["@alice:example.com:DEVICE1", "Alice"],
["@bob:example.com:DEVICE1", "Bob"],
]),
});
});
});
test("should use userId if no display name", () => {
withTestScheduler(({ cold, schedule, expectObservable }) => {
setUpBasicRoom();
const dn$ = memberDisplaynames$(
testScope,
mockMatrixRoom,
cold("a", {
a: [mockCallMembership("@no-name:foo.bar", "D000")],
}),
"@local:example.com",
"DEVICE000",
);
expectObservable(dn$).toBe("a", {
a: new Map<string, string>([
["@local:example.com:DEVICE000", "it's a me"],
["@no-name:foo.bar:D000", "@no-name:foo.bar"],
]),
});
});
});
test("should disambiguate users with same display name", () => {
withTestScheduler(({ cold, schedule, expectObservable }) => {
setUpBasicRoom();
const dn$ = memberDisplaynames$(
testScope,
mockMatrixRoom,
cold("a", {
a: [
mockCallMembership("@bob:example.com", "DEVICE1"),
mockCallMembership("@bob:example.com", "DEVICE2"),
mockCallMembership("@bob:foo.bar", "BOB000"),
mockCallMembership("@carl:example.com", "C000"),
mockCallMembership("@evil:example.com", "E000"),
],
}),
"@local:example.com",
"DEVICE000",
);
expectObservable(dn$).toBe("a", {
a: new Map<string, string>([
["@local:example.com:DEVICE000", "it's a me"],
["@bob:example.com:DEVICE1", "Bob (@bob:example.com)"],
["@bob:example.com:DEVICE2", "Bob (@bob:example.com)"],
["@bob:foo.bar:BOB000", "Bob (@bob:foo.bar)"],
["@carl:example.com:C000", "Carl (@carl:example.com)"],
["@evil:example.com:E000", "Carl (@evil:example.com)"],
]),
});
});
});
test("should disambiguate when needed", () => {
withTestScheduler(({ cold, schedule, expectObservable }) => {
setUpBasicRoom();
const dn$ = memberDisplaynames$(
testScope,
mockMatrixRoom,
cold("ab", {
a: [mockCallMembership("@bob:example.com", "DEVICE1")],
b: [
mockCallMembership("@bob:example.com", "DEVICE1"),
mockCallMembership("@bob:foo.bar", "BOB000"),
],
}),
"@local:example.com",
"DEVICE000",
);
expectObservable(dn$).toBe("ab", {
a: new Map<string, string>([
["@local:example.com:DEVICE000", "it's a me"],
["@bob:example.com:DEVICE1", "Bob"],
]),
b: new Map<string, string>([
["@local:example.com:DEVICE000", "it's a me"],
["@bob:example.com:DEVICE1", "Bob (@bob:example.com)"],
["@bob:foo.bar:BOB000", "Bob (@bob:foo.bar)"],
]),
});
});
});
test.skip("should keep disambiguated name when other leave", () => {
withTestScheduler(({ cold, schedule, expectObservable }) => {
setUpBasicRoom();
const dn$ = memberDisplaynames$(
testScope,
mockMatrixRoom,
cold("ab", {
a: [
mockCallMembership("@bob:example.com", "DEVICE1"),
mockCallMembership("@bob:foo.bar", "BOB000"),
],
b: [mockCallMembership("@bob:example.com", "DEVICE1")],
}),
"@local:example.com",
"DEVICE000",
);
expectObservable(dn$).toBe("ab", {
a: new Map<string, string>([
["@local:example.com:DEVICE000", "it's a me"],
["@bob:example.com:DEVICE1", "Bob (@bob:example.com)"],
["@bob:foo.bar:BOB000", "Bob (@bob:foo.bar)"],
]),
b: new Map<string, string>([
["@local:example.com:DEVICE000", "it's a me"],
["@bob:example.com:DEVICE1", "Bob (@bob:example.com)"],
]),
});
});
});
test("should disambiguate on name change", () => {
withTestScheduler(({ cold, schedule, expectObservable }) => {
setUpBasicRoom();
const dn$ = memberDisplaynames$(
testScope,
mockMatrixRoom,
cold("a", {
a: [
mockCallMembership("@bob:example.com", "B000"),
mockCallMembership("@carl:example.com", "C000"),
],
}),
"@local:example.com",
"DEVICE000",
);
schedule("-a", {
a: () => {
updateDisplayName("@carl:example.com", "Bob");
},
});
expectObservable(dn$).toBe("ab", {
a: new Map<string, string>([
["@local:example.com:DEVICE000", "it's a me"],
["@bob:example.com:B000", "Bob"],
["@carl:example.com:C000", "Carl"],
]),
b: new Map<string, string>([
["@local:example.com:DEVICE000", "it's a me"],
["@bob:example.com:B000", "Bob (@bob:example.com)"],
["@carl:example.com:C000", "Bob (@carl:example.com)"],
]),
});
});
});

View File

@@ -6,12 +6,16 @@ Please see LICENSE in the repository root for full details.
*/
import { type RoomMember, RoomStateEvent } from "matrix-js-sdk";
import { combineLatest, fromEvent, type Observable, startWith } from "rxjs";
import {
combineLatest,
fromEvent,
map,
type Observable,
startWith,
} from "rxjs";
import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc";
import { logger } from "matrix-js-sdk/lib/logger";
import { type Room as MatrixRoom } from "matrix-js-sdk/lib/matrix";
// eslint-disable-next-line rxjs/no-internal
import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent";
import { type ObservableScope } from "../ObservableScope";
import {
@@ -19,6 +23,7 @@ import {
shouldDisambiguate,
} from "../../utils/displayname";
import { type Behavior } from "../Behavior";
import type { NodeStyleEventEmitter } from "rxjs/src/internal/observable/fromEvent.ts";
/**
* Displayname for each member of the call. This will disambiguate
@@ -36,15 +41,14 @@ export const memberDisplaynames$ = (
deviceId: string,
): Behavior<Map<string, string>> =>
scope.behavior(
combineLatest(
[
// Handle call membership changes
memberships$,
// Additionally handle display name changes (implicitly reacting to them)
fromEvent(matrixRoom, RoomStateEvent.Members).pipe(startWith(null)),
// TODO: do we need: pauseWhen(this.pretendToBeDisconnected$),
],
(memberships, _displaynames) => {
combineLatest([
// Handle call membership changes
memberships$,
// Additionally handle display name changes (implicitly reacting to them)
fromEvent(matrixRoom, RoomStateEvent.Members).pipe(startWith(null)),
// TODO: do we need: pauseWhen(this.pretendToBeDisconnected$),
]).pipe(
map(([memberships, _displayNames]) => {
const displaynameMap = new Map<string, string>([
[
`${userId}:${deviceId}`,
@@ -55,11 +59,12 @@ export const memberDisplaynames$ = (
// We only consider RTC members for disambiguation as they are the only visible members.
for (const rtcMember of memberships) {
// TODO a hard-coded participant ID ? should use rtcMember.membershipID instead?
const matrixIdentifier = `${rtcMember.userId}:${rtcMember.deviceId}`;
const { member } = getRoomMemberFromRtcMember(rtcMember, room);
if (!member) {
logger.error(
"Could not find member for media id:",
"Could not find member for participant id:",
matrixIdentifier,
);
continue;
@@ -71,7 +76,7 @@ export const memberDisplaynames$ = (
);
}
return displaynameMap;
},
}),
),
new Map<string, string>(),
);

View File

@@ -5,22 +5,29 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { test, vi, beforeEach, afterEach } from "vitest";
import { BehaviorSubject, type Observable } from "rxjs";
import { test, vi, expect, beforeEach, afterEach } from "vitest";
import { BehaviorSubject, map } from "rxjs";
import { type Room as LivekitRoom } from "livekit-client";
import { logger } from "matrix-js-sdk/lib/logger";
import EventEmitter from "events";
import fetchMock from "fetch-mock";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk";
import { ConnectionManager } from "./ConnectionManager.ts";
import { ObservableScope } from "../ObservableScope.ts";
import { ECConnectionFactory } from "./ConnectionFactory.ts";
import { type OpenIDClientParts } from "../../livekit/openIDSFU.ts";
import { mockMediaDevices, withTestScheduler } from "../../utils/test";
import {
mockCallMembership,
mockMediaDevices,
withTestScheduler,
} from "../../utils/test";
import { type ProcessorState } from "../../livekit/TrackProcessorContext.tsx";
import { matrixLivekitMerger$ } from "./matrixLivekitMerger.ts";
import type { CallMembership, Transport } from "matrix-js-sdk/lib/matrixrtc";
import { TRANSPORT_1 } from "./ConnectionManager.test.ts";
import {
createMatrixLivekitMembers$,
type MatrixLivekitMember,
} from "./MatrixLivekitMembers.ts";
import { createConnectionManager$ } from "./ConnectionManager.ts";
import { membershipsAndTransports$ } from "../SessionBehaviors.ts";
// Test the integration of ConnectionManager and MatrixLivekitMerger
@@ -28,33 +35,10 @@ let testScope: ObservableScope;
let ecConnectionFactory: ECConnectionFactory;
let mockClient: OpenIDClientParts;
let lkRoomFactory: () => LivekitRoom;
let mockMatrixRoom: MatrixRoom;
const createdMockLivekitRooms: Map<string, LivekitRoom> = new Map();
// Main test input
const memberships$ = new BehaviorSubject<CallMembership[]>([]);
// under test
let connectionManager: ConnectionManager;
function createLkMerger(
memberships$: Observable<CallMembership[]>,
): matrixLivekitMerger$ {
const mockRoomEmitter = new EventEmitter();
return new matrixLivekitMerger$(
testScope,
memberships$,
connectionManager,
{
on: mockRoomEmitter.on.bind(mockRoomEmitter),
off: mockRoomEmitter.off.bind(mockRoomEmitter),
getMember: vi.fn().mockReturnValue(undefined),
},
"@user:example.com",
"DEV000",
);
}
beforeEach(() => {
testScope = new ObservableScope();
mockClient = {
@@ -90,16 +74,9 @@ beforeEach(() => {
lkRoomFactory,
);
connectionManager = new ConnectionManager(
testScope,
ecConnectionFactory,
logger,
);
//TODO a bit annoying to have to do a http mock?
fetchMock.post(`**/sfu/get`, (url) => {
fetchMock.post(`path:/sfu/get`, (url) => {
const domain = new URL(url).hostname; // Extract the domain from the URL
return {
status: 200,
body: {
@@ -108,6 +85,18 @@ beforeEach(() => {
},
};
});
mockMatrixRoom = vi.mocked<MatrixRoom>({
getMember: vi.fn().mockImplementation((userId: string) => {
return {
userId,
rawDisplayName: userId.replace("@", "").replace(":example.org", ""),
getMxcAvatarUrl: vi.fn().mockReturnValue(null),
} as unknown as RoomMember;
}),
addEventListener: vi.fn(),
removeEventListener: vi.fn(),
} as unknown as MatrixRoom);
});
afterEach(() => {
@@ -115,43 +104,82 @@ afterEach(() => {
fetchMock.reset();
});
test("example test", () => {
withTestScheduler(({ schedule, expectObservable, cold }) => {
connectionManager.connections$.subscribe((connections) => {
// console.log(
// "Connections updated:",
// connections.map((c) => c.transport),
// );
test("example test 2", () => {
withTestScheduler(({ schedule, expectObservable, behavior, cold }) => {
const bobMembership = mockCallMembership("@bob:example.com", "BDEV000");
const carlMembership = mockCallMembership("@carl:example.com", "CDEV000");
const daveMembership = mockCallMembership("@dave:foo.bar", "DDEV000");
const memberships$ = behavior("ab---c", {
a: [bobMembership],
b: [bobMembership, carlMembership],
c: [bobMembership, carlMembership, daveMembership],
});
const memberships$ = cold("-a-b-c", {
a: [mockCallmembership("@bob:example.com", "BDEV000")],
b: [
mockCallmembership("@bob:example.com", "BDEV000"),
mockCallmembership("@carl:example.com", "CDEV000"),
],
c: [
mockCallmembership("@bob:example.com", "BDEV000"),
mockCallmembership("@carl:example.com", "CDEV000"),
mockCallmembership("@dave:foo.bar", "DDEV000"),
],
const transports$ = testScope.behavior(
memberships$.pipe(
map((memberships) => {
return memberships.map((membership) => {
return membership.getTransport(memberships[0]) as LivekitTransport;
});
}),
),
);
const connectionManager = createConnectionManager$({
scope: testScope,
connectionFactory: ecConnectionFactory,
inputTransports$: transports$,
});
// TODO IN PROGRESS
const merger = createLkMerger(memberships$);
const marixLivekitItems$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: membershipsAndTransports$(
testScope,
memberships$,
).membershipsWithTransport$,
connectionManager,
matrixRoom: mockMatrixRoom,
userId: "local:example.org",
deviceId: "ME00",
});
expectObservable(marixLivekitItems$).toBe("a(bb)(cc)", {
a: expect.toSatisfy((items: MatrixLivekitMember[]) => {
expect(items.length).toBe(1);
const item = items[0]!;
expect(item.membership).toStrictEqual(bobMembership);
expect(item.participant).toBeUndefined();
return true;
}),
b: expect.toSatisfy((items: MatrixLivekitMember[]) => {
// TODO
// expect(items.length).toBe(2);
//
// const item = items[0]!;
// expect(item.membership).toStrictEqual(bobMembership);
// expect(item.participant).toBeUndefined();
//
// {
// const item = items[1]!;
// expect(item.membership).toStrictEqual(carlMembership);
// expect(item.participant).toBeUndefined();
// }
return true;
}),
c: expect.toSatisfy(() => true),
});
});
});
function mockCallmembership(
userId: string,
deviceId: string,
transport?: Transport,
): CallMembership {
const t = transport ?? TRANSPORT_1;
return {
userId: userId,
deviceId: deviceId,
getTransport: vi.fn().mockReturnValue(t),
transports: [t],
} as unknown as CallMembership;
}
// test("Tryng", () => {
//
// withTestScheduler(({ schedule, expectObservable, behavior, cold }) => {
// const one = cold("a-b-c", { a: 1, b: 2, c: 3 });
// const a = one.pipe(map(() => 1));
// const b = one.pipe(map(() => 2));
// const combined = combineLatest([a,b])
// .pipe(map(([a,b])=>`${a}${b}`));
// expectObservable(combined).toBe("a-b-c", { a: 1, b: expect.anything(), c: 3 });
//
// })
// })

View File

@@ -187,6 +187,29 @@ export const exampleTransport: LivekitTransport = {
livekit_alias: "!alias:example.org",
};
export function mockCallMembership(
userId: string,
deviceId: string,
transport?: Transport,
): CallMembership {
const t = transport ?? transportForUser(userId);
return {
userId: userId,
deviceId: deviceId,
getTransport: vi.fn().mockReturnValue(t),
transports: [t],
} as unknown as CallMembership;
}
function transportForUser(userId: string): Transport {
const domain = userId.split(":")[1];
return {
type: "livekit",
livekit_service_url: `https://lk.${domain}`,
livekit_alias: `!alias:${domain}`,
};
}
export function mockRtcMembership(
user: string | RoomMember,
deviceId: string,