test: fixup MatrixLivekitMembers tests

This commit is contained in:
Valere
2025-11-10 10:43:53 +01:00
parent 92ddc4c797
commit 5c83e0dce1

View File

@@ -12,19 +12,23 @@ import {
} from "matrix-js-sdk/lib/matrixrtc";
import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk";
import { getParticipantId } from "matrix-js-sdk/lib/matrixrtc/utils";
import { combineLatest, map, type Observable } from "rxjs";
import { type IConnectionManager } from "./ConnectionManager.ts";
import {
type MatrixLivekitMember,
createMatrixLivekitMembers$,
areLivekitTransportsEqual,
} from "./MatrixLivekitMembers.ts";
import { ObservableScope } from "../../ObservableScope.ts";
import {
Epoch,
mapEpoch,
ObservableScope,
trackEpoch,
} from "../../ObservableScope.ts";
import { ConnectionManagerData } from "./ConnectionManager.ts";
import {
mockCallMembership,
mockRemoteParticipant,
type OurRunHelpers,
withTestScheduler,
} from "../../../utils/test.ts";
import { type Connection } from "./Connection.ts";
@@ -32,7 +36,28 @@ import { type Connection } from "./Connection.ts";
let testScope: ObservableScope;
let mockMatrixRoom: MatrixRoom;
// The merger beeing tested
const transportA: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.example.org",
livekit_alias: "!alias:example.org",
};
const transportB: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.sample.com",
livekit_alias: "!alias:sample.com",
};
const bobMembership = mockCallMembership(
"@bob:example.org",
"DEV000",
transportA,
);
const carlMembership = mockCallMembership(
"@carl:sample.com",
"DEV111",
transportB,
);
beforeEach(() => {
testScope = new ObservableScope();
@@ -53,113 +78,138 @@ afterEach(() => {
testScope.end();
});
function epochMeWith$<T, U>(
source$: Observable<Epoch<U>>,
me$: Observable<T>,
): Observable<Epoch<T>> {
return combineLatest([source$, me$]).pipe(
map(([ep, cd]) => {
return new Epoch(cd, ep.epoch);
}),
);
}
test("should signal participant not yet connected to livekit", () => {
withTestScheduler(({ behavior, expectObservable }) => {
const bobMembership = {
userId: "@bob:example.org",
deviceId: "DEV000",
transports: [
{
type: "livekit",
livekit_service_url: "https://lk.example.org",
livekit_alias: "!alias:example.org",
},
],
} as unknown as CallMembership;
const { memberships$, membershipsWithTransport$ } = fromMemberships$(
behavior("a", {
a: [bobMembership],
}),
);
const connectionManagerData$ = epochMeWith$(
memberships$,
behavior("a", {
a: new ConnectionManagerData(),
}),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: behavior("a", {
a: [
{
membership: bobMembership,
},
],
}),
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
connectionManagerData$: behavior("a", {
a: new ConnectionManagerData(),
}),
transports$: behavior("a", { a: [] }),
connections$: behavior("a", { a: [] }),
},
connectionManagerData$: connectionManagerData$,
} as unknown as IConnectionManager,
matrixRoom: mockMatrixRoom,
});
expectObservable(matrixLivekitMember$).toBe("a", {
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
return (
data.length == 1 &&
data[0].membership === bobMembership &&
data[0].participant === undefined &&
data[0].connection === undefined
);
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant$).toBe("a", {
a: null,
});
expectObservable(data[0].connection$).toBe("a", {
a: null,
});
return true;
}),
});
});
});
function aConnectionManager(
data: ConnectionManagerData,
behavior: OurRunHelpers["behavior"],
): IConnectionManager {
return {
connectionManagerData$: behavior("a", { a: data }),
transports$: behavior("a", {
a: data.getConnections().map((connection) => connection.transport),
// Helper to create epoch'ed memberships$ and membershipsWithTransport$ from memberships observable.
function fromMemberships$(m$: Observable<CallMembership[]>): {
memberships$: Observable<Epoch<CallMembership[]>>;
membershipsWithTransport$: Observable<
Epoch<{ membership: CallMembership; transport?: LivekitTransport }[]>
>;
} {
const memberships$ = m$.pipe(trackEpoch());
const membershipsWithTransport$ = memberships$.pipe(
mapEpoch((members) => {
return members.map((m) => {
const tr = m.getTransport(m);
return {
membership: m,
transport:
tr?.type === "livekit" ? (tr as LivekitTransport) : undefined,
};
});
}),
connections$: behavior("a", { a: data.getConnections() }),
);
return {
memberships$,
membershipsWithTransport$,
};
}
test("should signal participant on a connection that is publishing", () => {
withTestScheduler(({ behavior, expectObservable }) => {
const transport: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.example.org",
livekit_alias: "!alias:example.org",
};
const bobMembership = mockCallMembership(
"@bob:example.org",
"DEV000",
transport,
);
const connectionWithPublisher = new ConnectionManagerData();
const bobParticipantId = getParticipantId(
bobMembership.userId,
bobMembership.deviceId,
);
const { memberships$, membershipsWithTransport$ } = fromMemberships$(
behavior("a", {
a: [bobMembership],
}),
);
const connection = {
transport: transport,
transport: bobMembership.getTransport(bobMembership),
} as unknown as Connection;
connectionWithPublisher.add(connection, [
const dataWithPublisher = new ConnectionManagerData();
dataWithPublisher.add(connection, [
mockRemoteParticipant({ identity: bobParticipantId }),
]);
const connectionManagerData$ = epochMeWith$(
memberships$,
behavior("a", {
a: dataWithPublisher,
}),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: behavior("a", {
a: [
{
membership: bobMembership,
transport,
},
],
}),
connectionManager: aConnectionManager(connectionWithPublisher, behavior),
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
connectionManagerData$: connectionManagerData$,
} as unknown as IConnectionManager,
matrixRoom: mockMatrixRoom,
});
expectObservable(matrixLivekitMember$).toBe("a", {
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expect(data[0].participant).toBeDefined();
expect(data[0].connection).toBeDefined();
expect(data[0].membership).toEqual(bobMembership);
expect(
areLivekitTransportsEqual(data[0].connection!.transport, transport),
).toBe(true);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant$).toBe("a", {
a: expect.toSatisfy((participant) => {
expect(participant).toBeDefined();
expect(participant!.identity).toEqual(bobParticipantId);
return true;
}),
});
expectObservable(data[0].connection$).toBe("a", {
a: connection,
});
return true;
}),
});
@@ -168,47 +218,46 @@ test("should signal participant on a connection that is publishing", () => {
test("should signal participant on a connection that is not publishing", () => {
withTestScheduler(({ behavior, expectObservable }) => {
const transport: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.example.org",
livekit_alias: "!alias:example.org",
};
const bobMembership = mockCallMembership(
"@bob:example.org",
"DEV000",
transport,
const { memberships$, membershipsWithTransport$ } = fromMemberships$(
behavior("a", {
a: [bobMembership],
}),
);
const connectionWithPublisher = new ConnectionManagerData();
// const bobParticipantId = getParticipantId(bobMembership.userId, bobMembership.deviceId);
const connection = {
transport: transport,
transport: bobMembership.getTransport(bobMembership),
} as unknown as Connection;
connectionWithPublisher.add(connection, []);
const dataWithPublisher = new ConnectionManagerData();
dataWithPublisher.add(connection, []);
const connectionManagerData$ = epochMeWith$(
memberships$,
behavior("a", {
a: dataWithPublisher,
}),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: behavior("a", {
a: [
{
membership: bobMembership,
transport,
},
],
}),
connectionManager: aConnectionManager(connectionWithPublisher, behavior),
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
connectionManagerData$: connectionManagerData$,
} as unknown as IConnectionManager,
matrixRoom: mockMatrixRoom,
});
expectObservable(matrixLivekitMember$).toBe("a", {
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expect(data[0].participant).not.toBeDefined();
expect(data[0].connection).toBeDefined();
expect(data[0].membership).toEqual(bobMembership);
expect(
areLivekitTransportsEqual(data[0].connection!.transport, transport),
).toBe(true);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant$).toBe("a", {
a: null,
});
expectObservable(data[0].connection$).toBe("a", {
a: connection,
});
return true;
}),
});
@@ -218,22 +267,10 @@ test("should signal participant on a connection that is not publishing", () => {
describe("Publication edge case", () => {
test("bob is publishing in several connections", () => {
withTestScheduler(({ behavior, expectObservable }) => {
const transportA: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.example.org",
livekit_alias: "!alias:example.org",
};
const transportB: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.sample.com",
livekit_alias: "!alias:sample.com",
};
const bobMembership = mockCallMembership(
"@bob:example.org",
"DEV000",
transportA,
const { memberships$, membershipsWithTransport$ } = fromMemberships$(
behavior("a", {
a: [bobMembership, carlMembership],
}),
);
const connectionWithPublisher = new ConnectionManagerData();
@@ -254,60 +291,57 @@ describe("Publication edge case", () => {
connectionWithPublisher.add(connectionB, [
mockRemoteParticipant({ identity: bobParticipantId }),
]);
const connectionManagerData$ = epochMeWith$(
memberships$,
behavior("a", {
a: connectionWithPublisher,
}),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: behavior("a", {
a: [
{
membership: bobMembership,
transport: transportA,
},
],
}),
connectionManager: aConnectionManager(
connectionWithPublisher,
behavior,
membershipsWithTransport$: testScope.behavior(
membershipsWithTransport$,
),
connectionManager: {
connectionManagerData$: connectionManagerData$,
} as unknown as IConnectionManager,
matrixRoom: mockMatrixRoom,
});
expectObservable(matrixLivekitMember$).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expect(data[0].participant).toBeDefined();
expect(data[0].participant!.identity).toEqual(bobParticipantId);
expect(data[0].connection).toBeDefined();
expect(data[0].membership).toEqual(bobMembership);
expect(
areLivekitTransportsEqual(
data[0].connection!.transport,
transportA,
),
).toBe(true);
return true;
}),
});
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(2);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].connection$).toBe("a", {
// The real connection should be from transportA as per the membership
a: connectionA,
});
expectObservable(data[0].participant$).toBe("a", {
a: expect.toSatisfy((participant) => {
expect(participant).toBeDefined();
expect(participant!.identity).toEqual(bobParticipantId);
return true;
}),
});
return true;
}),
},
);
});
});
test("bob is publishing in the wrong connection", () => {
withTestScheduler(({ behavior, expectObservable }) => {
const transportA: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.example.org",
livekit_alias: "!alias:example.org",
};
const transportB: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.sample.com",
livekit_alias: "!alias:sample.com",
};
const bobMembership = mockCallMembership(
"@bob:example.org",
"DEV000",
transportA,
const { memberships$, membershipsWithTransport$ } = fromMemberships$(
behavior("a", {
a: [bobMembership, carlMembership],
}),
);
const connectionWithPublisher = new ConnectionManagerData();
@@ -315,86 +349,54 @@ describe("Publication edge case", () => {
bobMembership.userId,
bobMembership.deviceId,
);
const connectionA = {
transport: transportA,
} as unknown as Connection;
const connectionB = {
transport: transportB,
} as unknown as Connection;
const connectionA = { transport: transportA } as unknown as Connection;
const connectionB = { transport: transportB } as unknown as Connection;
// Bob is not publishing on A
connectionWithPublisher.add(connectionA, []);
// Bob is publishing on B but his membership says A
connectionWithPublisher.add(connectionB, [
mockRemoteParticipant({ identity: bobParticipantId }),
]);
const connectionManagerData$ = epochMeWith$(
memberships$,
behavior("a", {
a: connectionWithPublisher,
}),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: behavior("a", {
a: [
{
membership: bobMembership,
transport: transportA,
},
],
}),
connectionManager: aConnectionManager(
connectionWithPublisher,
behavior,
membershipsWithTransport$: testScope.behavior(
membershipsWithTransport$,
),
connectionManager: {
connectionManagerData$: connectionManagerData$,
} as unknown as IConnectionManager,
matrixRoom: mockMatrixRoom,
});
expectObservable(matrixLivekitMember$).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expect(data[0].participant).not.toBeDefined();
expect(data[0].connection).toBeDefined();
expect(data[0].membership).toEqual(bobMembership);
expect(
areLivekitTransportsEqual(
data[0].connection!.transport,
transportA,
),
).toBe(true);
return true;
}),
});
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(2);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].connection$).toBe("a", {
// The real connection should be from transportA as per the membership
a: connectionA,
});
expectObservable(data[0].participant$).toBe("a", {
// No participant as Bob is not publishing on his membership transport
a: null,
});
return true;
}),
},
);
});
// let lastMatrixLkItems: MatrixLivekitMember[] = [];
// matrixLivekitMerger.matrixLivekitMember$.subscribe((items) => {
// lastMatrixLkItems = items;
// });
// vi.mocked(bobMembership).getTransport = vi
// .fn()
// .mockReturnValue(connectionA.transport);
// fakeMemberships$.next([bobMembership]);
// const lkMap = new ConnectionManagerData();
// lkMap.add(connectionA, []);
// lkMap.add(connectionB, [
// mockRemoteParticipant({ identity: bobParticipantId })
// ]);
// fakeManagerData$.next(lkMap);
// const items = lastMatrixLkItems;
// expect(items).toHaveLength(1);
// const item = items[0];
// // Assert the expected membership
// expect(item.membership.userId).toEqual(bobMembership.userId);
// expect(item.membership.deviceId).toEqual(bobMembership.deviceId);
// expect(item.participant).not.toBeDefined();
// // The transport info should come from the membership transports and not only from the publishing connection
// expect(item.connection?.transport?.livekit_service_url).toEqual(
// bobMembership.transports[0]?.livekit_service_url
// );
// expect(item.connection?.transport?.livekit_alias).toEqual(
// bobMembership.transports[0]?.livekit_alias
// );
});
});