mirror of
https://github.com/vector-im/element-call.git
synced 2026-03-16 06:17:10 +00:00
Merge branch 'livekit' into toger5/pseudonomous-identities
This commit is contained in:
@@ -32,7 +32,6 @@ import {
|
||||
Connection,
|
||||
ConnectionState,
|
||||
type ConnectionOpts,
|
||||
type PublishingParticipant,
|
||||
} from "./Connection.ts";
|
||||
import { ObservableScope } from "../../ObservableScope.ts";
|
||||
import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts";
|
||||
@@ -40,6 +39,7 @@ import {
|
||||
ElementCallError,
|
||||
FailToGetOpenIdToken,
|
||||
} from "../../../utils/errors.ts";
|
||||
import { mockRemoteParticipant } from "../../../utils/test.ts";
|
||||
|
||||
let testScope: ObservableScope;
|
||||
|
||||
@@ -377,46 +377,32 @@ describe("Start connection states", () => {
|
||||
});
|
||||
});
|
||||
|
||||
function fakeRemoteLivekitParticipant(
|
||||
id: string,
|
||||
publications: number = 1,
|
||||
): RemoteParticipant {
|
||||
return {
|
||||
identity: id,
|
||||
getTrackPublications: () => Array(publications),
|
||||
} as unknown as RemoteParticipant;
|
||||
}
|
||||
|
||||
describe("Publishing participants observations", () => {
|
||||
it("should emit the list of publishing participants", () => {
|
||||
describe("remote participants", () => {
|
||||
it("emits the list of remote participants", () => {
|
||||
setupTest();
|
||||
|
||||
const connection = setupRemoteConnection();
|
||||
|
||||
const bobIsAPublisher = Promise.withResolvers<void>();
|
||||
const danIsAPublisher = Promise.withResolvers<void>();
|
||||
const observedPublishers: PublishingParticipant[][] = [];
|
||||
const s = connection.remoteParticipantsWithTracks$.subscribe(
|
||||
(publishers) => {
|
||||
observedPublishers.push(publishers);
|
||||
if (publishers.some((p) => p.identity === "@bob:example.org:DEV111")) {
|
||||
bobIsAPublisher.resolve();
|
||||
}
|
||||
if (publishers.some((p) => p.identity === "@dan:example.org:DEV333")) {
|
||||
danIsAPublisher.resolve();
|
||||
}
|
||||
},
|
||||
);
|
||||
const observedParticipants: RemoteParticipant[][] = [];
|
||||
const s = connection.remoteParticipants$.subscribe((participants) => {
|
||||
observedParticipants.push(participants);
|
||||
});
|
||||
onTestFinished(() => s.unsubscribe());
|
||||
// The publishingParticipants$ observable is derived from the current members of the
|
||||
// The remoteParticipants$ observable is derived from the current members of the
|
||||
// livekitRoom and the rtc membership in order to publish the members that are publishing
|
||||
// on this connection.
|
||||
|
||||
let participants: RemoteParticipant[] = [
|
||||
fakeRemoteLivekitParticipant("@alice:example.org:DEV000", 0),
|
||||
fakeRemoteLivekitParticipant("@bob:example.org:DEV111", 0),
|
||||
fakeRemoteLivekitParticipant("@carol:example.org:DEV222", 0),
|
||||
fakeRemoteLivekitParticipant("@dan:example.org:DEV333", 0),
|
||||
const participants: RemoteParticipant[] = [
|
||||
mockRemoteParticipant({ identity: "@alice:example.org:DEV000" }),
|
||||
mockRemoteParticipant({ identity: "@bob:example.org:DEV111" }),
|
||||
mockRemoteParticipant({ identity: "@carol:example.org:DEV222" }),
|
||||
// Mock Dan to have no published tracks. We want him to still show show up
|
||||
// in the participants list.
|
||||
mockRemoteParticipant({
|
||||
identity: "@dan:example.org:DEV333",
|
||||
getTrackPublication: () => undefined,
|
||||
getTrackPublications: () => [],
|
||||
}),
|
||||
];
|
||||
|
||||
// Let's simulate 3 members on the livekitRoom
|
||||
@@ -428,21 +414,8 @@ describe("Publishing participants observations", () => {
|
||||
fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, p),
|
||||
);
|
||||
|
||||
// At this point there should be no publishers
|
||||
expect(observedPublishers.pop()!.length).toEqual(0);
|
||||
|
||||
participants = [
|
||||
fakeRemoteLivekitParticipant("@alice:example.org:DEV000", 1),
|
||||
fakeRemoteLivekitParticipant("@bob:example.org:DEV111", 1),
|
||||
fakeRemoteLivekitParticipant("@carol:example.org:DEV222", 1),
|
||||
fakeRemoteLivekitParticipant("@dan:example.org:DEV333", 2),
|
||||
];
|
||||
participants.forEach((p) =>
|
||||
fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, p),
|
||||
);
|
||||
|
||||
// At this point there should be no publishers
|
||||
expect(observedPublishers.pop()!.length).toEqual(4);
|
||||
// All remote participants should be present
|
||||
expect(observedParticipants.pop()!.length).toEqual(4);
|
||||
});
|
||||
|
||||
it("should be scoped to parent scope", (): void => {
|
||||
@@ -450,16 +423,14 @@ describe("Publishing participants observations", () => {
|
||||
|
||||
const connection = setupRemoteConnection();
|
||||
|
||||
let observedPublishers: PublishingParticipant[][] = [];
|
||||
const s = connection.remoteParticipantsWithTracks$.subscribe(
|
||||
(publishers) => {
|
||||
observedPublishers.push(publishers);
|
||||
},
|
||||
);
|
||||
let observedParticipants: RemoteParticipant[][] = [];
|
||||
const s = connection.remoteParticipants$.subscribe((participants) => {
|
||||
observedParticipants.push(participants);
|
||||
});
|
||||
onTestFinished(() => s.unsubscribe());
|
||||
|
||||
let participants: RemoteParticipant[] = [
|
||||
fakeRemoteLivekitParticipant("@bob:example.org:DEV111", 0),
|
||||
mockRemoteParticipant({ identity: "@bob:example.org:DEV111" }),
|
||||
];
|
||||
|
||||
// Let's simulate 3 members on the livekitRoom
|
||||
@@ -471,35 +442,26 @@ describe("Publishing participants observations", () => {
|
||||
fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, participant);
|
||||
}
|
||||
|
||||
// At this point there should be no publishers
|
||||
expect(observedPublishers.pop()!.length).toEqual(0);
|
||||
|
||||
participants = [fakeRemoteLivekitParticipant("@bob:example.org:DEV111", 1)];
|
||||
|
||||
for (const participant of participants) {
|
||||
fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, participant);
|
||||
}
|
||||
|
||||
// We should have bob has a publisher now
|
||||
const publishers = observedPublishers.pop();
|
||||
expect(publishers?.length).toEqual(1);
|
||||
expect(publishers?.[0]?.identity).toEqual("@bob:example.org:DEV111");
|
||||
// We should have bob as a participant now
|
||||
const ps = observedParticipants.pop();
|
||||
expect(ps?.length).toEqual(1);
|
||||
expect(ps?.[0]?.identity).toEqual("@bob:example.org:DEV111");
|
||||
|
||||
// end the parent scope
|
||||
testScope.end();
|
||||
observedPublishers = [];
|
||||
observedParticipants = [];
|
||||
|
||||
// SHOULD NOT emit any more publishers as the scope is ended
|
||||
// SHOULD NOT emit any more participants as the scope is ended
|
||||
participants = participants.filter(
|
||||
(p) => p.identity !== "@bob:example.org:DEV111",
|
||||
);
|
||||
|
||||
fakeLivekitRoom.emit(
|
||||
RoomEvent.ParticipantDisconnected,
|
||||
fakeRemoteLivekitParticipant("@bob:example.org:DEV111"),
|
||||
mockRemoteParticipant({ identity: "@bob:example.org:DEV111" }),
|
||||
);
|
||||
|
||||
expect(observedPublishers.length).toEqual(0);
|
||||
expect(observedParticipants.length).toEqual(0);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -13,9 +13,7 @@ import {
|
||||
import {
|
||||
ConnectionError,
|
||||
type Room as LivekitRoom,
|
||||
type LocalParticipant,
|
||||
type RemoteParticipant,
|
||||
RoomEvent,
|
||||
} from "livekit-client";
|
||||
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { BehaviorSubject, map } from "rxjs";
|
||||
@@ -35,8 +33,6 @@ import {
|
||||
UnknownCallError,
|
||||
} from "../../../utils/errors.ts";
|
||||
|
||||
export type PublishingParticipant = LocalParticipant | RemoteParticipant;
|
||||
|
||||
export interface ConnectionOpts {
|
||||
/** The media transport to connect to. */
|
||||
transport: LivekitTransport;
|
||||
@@ -99,13 +95,13 @@ export class Connection {
|
||||
private scope: ObservableScope;
|
||||
|
||||
/**
|
||||
* An observable of the participants that are publishing on this connection. (Excluding our local participant)
|
||||
* This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`.
|
||||
* It filters the participants to only those that are associated with a membership that claims to publish on this connection.
|
||||
* The remote LiveKit participants that are visible on this connection.
|
||||
*
|
||||
* Note that this may include participants that are connected only to
|
||||
* subscribe, or publishers that are otherwise unattested in MatrixRTC state.
|
||||
* It is therefore more low-level than what should be presented to the user.
|
||||
*/
|
||||
public readonly remoteParticipantsWithTracks$: Behavior<
|
||||
PublishingParticipant[]
|
||||
>;
|
||||
public readonly remoteParticipants$: Behavior<RemoteParticipant[]>;
|
||||
|
||||
/**
|
||||
* Whether the connection has been stopped.
|
||||
@@ -236,23 +232,9 @@ export class Connection {
|
||||
this.transport = transport;
|
||||
this.client = client;
|
||||
|
||||
// REMOTE participants with track!!!
|
||||
// this.remoteParticipantsWithTracks$
|
||||
this.remoteParticipantsWithTracks$ = scope.behavior(
|
||||
// only tracks remote participants
|
||||
connectedParticipantsObserver(this.livekitRoom, {
|
||||
additionalRoomEvents: [
|
||||
RoomEvent.TrackPublished,
|
||||
RoomEvent.TrackUnpublished,
|
||||
],
|
||||
}).pipe(
|
||||
map((participants) => {
|
||||
return participants.filter(
|
||||
(participant) => participant.getTrackPublications().length > 0,
|
||||
);
|
||||
}),
|
||||
),
|
||||
[],
|
||||
this.remoteParticipants$ = scope.behavior(
|
||||
// Only tracks remote participants
|
||||
connectedParticipantsObserver(this.livekitRoom),
|
||||
);
|
||||
|
||||
scope.onEnd(() => {
|
||||
|
||||
@@ -8,7 +8,7 @@ Please see LICENSE in the repository root for full details.
|
||||
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
|
||||
import { BehaviorSubject } from "rxjs";
|
||||
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { type Participant as LivekitParticipant } from "livekit-client";
|
||||
import { type RemoteParticipant } from "livekit-client";
|
||||
import { logger } from "matrix-js-sdk/lib/logger";
|
||||
|
||||
import { Epoch, mapEpoch, ObservableScope } from "../../ObservableScope.ts";
|
||||
@@ -52,7 +52,7 @@ beforeEach(() => {
|
||||
(transport: LivekitTransport, scope: ObservableScope) => {
|
||||
const mockConnection = {
|
||||
transport,
|
||||
remoteParticipantsWithTracks$: new BehaviorSubject([]),
|
||||
remoteParticipants$: new BehaviorSubject([]),
|
||||
} as unknown as Connection;
|
||||
vi.mocked(mockConnection).start = vi.fn();
|
||||
vi.mocked(mockConnection).stop = vi.fn();
|
||||
@@ -200,24 +200,21 @@ describe("connections$ stream", () => {
|
||||
});
|
||||
|
||||
describe("connectionManagerData$ stream", () => {
|
||||
// Used in test to control fake connections' remoteParticipantsWithTracks$ streams
|
||||
let fakePublishingParticipantsStreams: Map<
|
||||
string,
|
||||
Behavior<LivekitParticipant[]>
|
||||
>;
|
||||
// Used in test to control fake connections' remoteParticipants$ streams
|
||||
let fakeRemoteParticipantsStreams: Map<string, Behavior<RemoteParticipant[]>>;
|
||||
|
||||
function keyForTransport(transport: LivekitTransport): string {
|
||||
return `${transport.livekit_service_url}|${transport.livekit_alias}`;
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
fakePublishingParticipantsStreams = new Map();
|
||||
fakeRemoteParticipantsStreams = new Map();
|
||||
|
||||
function getPublishingParticipantsFor(
|
||||
function getRemoteParticipantsFor(
|
||||
transport: LivekitTransport,
|
||||
): Behavior<LivekitParticipant[]> {
|
||||
): Behavior<RemoteParticipant[]> {
|
||||
return (
|
||||
fakePublishingParticipantsStreams.get(keyForTransport(transport)) ??
|
||||
fakeRemoteParticipantsStreams.get(keyForTransport(transport)) ??
|
||||
new BehaviorSubject([])
|
||||
);
|
||||
}
|
||||
@@ -227,13 +224,12 @@ describe("connectionManagerData$ stream", () => {
|
||||
.fn()
|
||||
.mockImplementation(
|
||||
(transport: LivekitTransport, scope: ObservableScope) => {
|
||||
const fakePublishingParticipants$ = new BehaviorSubject<
|
||||
LivekitParticipant[]
|
||||
const fakeRemoteParticipants$ = new BehaviorSubject<
|
||||
RemoteParticipant[]
|
||||
>([]);
|
||||
const mockConnection = {
|
||||
transport,
|
||||
remoteParticipantsWithTracks$:
|
||||
getPublishingParticipantsFor(transport),
|
||||
remoteParticipants$: getRemoteParticipantsFor(transport),
|
||||
} as unknown as Connection;
|
||||
vi.mocked(mockConnection).start = vi.fn();
|
||||
vi.mocked(mockConnection).stop = vi.fn();
|
||||
@@ -242,36 +238,36 @@ describe("connectionManagerData$ stream", () => {
|
||||
void mockConnection.stop();
|
||||
});
|
||||
|
||||
fakePublishingParticipantsStreams.set(
|
||||
fakeRemoteParticipantsStreams.set(
|
||||
keyForTransport(transport),
|
||||
fakePublishingParticipants$,
|
||||
fakeRemoteParticipants$,
|
||||
);
|
||||
return mockConnection;
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
test("Should report connections with the publishing participants", () => {
|
||||
test("Should report connections with the remote participants", () => {
|
||||
withTestScheduler(({ expectObservable, schedule, behavior }) => {
|
||||
// Setup the fake participants streams behavior
|
||||
// ==============================
|
||||
fakePublishingParticipantsStreams.set(
|
||||
fakeRemoteParticipantsStreams.set(
|
||||
keyForTransport(TRANSPORT_1),
|
||||
behavior("oa-b", {
|
||||
o: [],
|
||||
a: [{ identity: "user1A" } as LivekitParticipant],
|
||||
a: [{ identity: "user1A" } as RemoteParticipant],
|
||||
b: [
|
||||
{ identity: "user1A" } as LivekitParticipant,
|
||||
{ identity: "user1B" } as LivekitParticipant,
|
||||
{ identity: "user1A" } as RemoteParticipant,
|
||||
{ identity: "user1B" } as RemoteParticipant,
|
||||
],
|
||||
}),
|
||||
);
|
||||
|
||||
fakePublishingParticipantsStreams.set(
|
||||
fakeRemoteParticipantsStreams.set(
|
||||
keyForTransport(TRANSPORT_2),
|
||||
behavior("o-a", {
|
||||
o: [],
|
||||
a: [{ identity: "user2A" } as LivekitParticipant],
|
||||
a: [{ identity: "user2A" } as RemoteParticipant],
|
||||
}),
|
||||
);
|
||||
// ==============================
|
||||
|
||||
@@ -9,7 +9,7 @@ Please see LICENSE in the repository root for full details.
|
||||
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { combineLatest, map, of, switchMap, tap } from "rxjs";
|
||||
import { type Logger } from "matrix-js-sdk/lib/logger";
|
||||
import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
|
||||
import { type RemoteParticipant } from "livekit-client";
|
||||
|
||||
import { type Behavior } from "../../Behavior.ts";
|
||||
import { type Connection } from "./Connection.ts";
|
||||
@@ -19,17 +19,12 @@ import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts";
|
||||
import { type ConnectionFactory } from "./ConnectionFactory.ts";
|
||||
|
||||
export class ConnectionManagerData {
|
||||
private readonly store: Map<
|
||||
string,
|
||||
[Connection, (LocalParticipant | RemoteParticipant)[]]
|
||||
> = new Map();
|
||||
private readonly store: Map<string, [Connection, RemoteParticipant[]]> =
|
||||
new Map();
|
||||
|
||||
public constructor() {}
|
||||
|
||||
public add(
|
||||
connection: Connection,
|
||||
participants: (LocalParticipant | RemoteParticipant)[],
|
||||
): void {
|
||||
public add(connection: Connection, participants: RemoteParticipant[]): void {
|
||||
const key = this.getKey(connection.transport);
|
||||
const existing = this.store.get(key);
|
||||
if (!existing) {
|
||||
@@ -55,7 +50,7 @@ export class ConnectionManagerData {
|
||||
|
||||
public getParticipantForTransport(
|
||||
transport: LivekitTransport,
|
||||
): (LocalParticipant | RemoteParticipant)[] {
|
||||
): RemoteParticipant[] {
|
||||
const key = transport.livekit_service_url + "|" + transport.livekit_alias;
|
||||
return this.store.get(key)?.[1] ?? [];
|
||||
}
|
||||
@@ -67,10 +62,12 @@ interface Props {
|
||||
inputTransports$: Behavior<Epoch<LivekitTransport[]>>;
|
||||
logger: Logger;
|
||||
}
|
||||
|
||||
// TODO - write test for scopes (do we really need to bind scope)
|
||||
export interface IConnectionManager {
|
||||
connectionManagerData$: Behavior<Epoch<ConnectionManagerData>>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Crete a `ConnectionManager`
|
||||
* @param scope the observable scope used by this object.
|
||||
@@ -153,23 +150,24 @@ export function createConnectionManager$({
|
||||
const epoch = connections.epoch;
|
||||
|
||||
// Map the connections to list of {connection, participants}[]
|
||||
const listOfConnectionsWithPublishingParticipants =
|
||||
connections.value.map((connection) => {
|
||||
return connection.remoteParticipantsWithTracks$.pipe(
|
||||
const listOfConnectionsWithRemoteParticipants = connections.value.map(
|
||||
(connection) => {
|
||||
return connection.remoteParticipants$.pipe(
|
||||
map((participants) => ({
|
||||
connection,
|
||||
participants,
|
||||
})),
|
||||
);
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
// probably not required
|
||||
if (listOfConnectionsWithPublishingParticipants.length === 0) {
|
||||
if (listOfConnectionsWithRemoteParticipants.length === 0) {
|
||||
return of(new Epoch(new ConnectionManagerData(), epoch));
|
||||
}
|
||||
|
||||
// combineLatest the several streams into a single stream with the ConnectionManagerData
|
||||
return combineLatest(listOfConnectionsWithPublishingParticipants).pipe(
|
||||
return combineLatest(listOfConnectionsWithRemoteParticipants).pipe(
|
||||
map(
|
||||
(lists) =>
|
||||
new Epoch(
|
||||
|
||||
@@ -14,7 +14,7 @@ import { BehaviorSubject, combineLatest, map, type Observable } from "rxjs";
|
||||
|
||||
import { type IConnectionManager } from "./ConnectionManager.ts";
|
||||
import {
|
||||
type MatrixLivekitMember,
|
||||
type RemoteMatrixLivekitMember,
|
||||
createMatrixLivekitMembers$,
|
||||
} from "./MatrixLivekitMembers.ts";
|
||||
import {
|
||||
@@ -102,10 +102,10 @@ test("should signal participant not yet connected to livekit", async () => {
|
||||
|
||||
await flushPromises();
|
||||
expect(matrixLivekitMember$.value.value).toSatisfy(
|
||||
(data: MatrixLivekitMember[]) => {
|
||||
(data: RemoteMatrixLivekitMember[]) => {
|
||||
expect(data.length).toEqual(1);
|
||||
expect(data[0].membership$.value).toBe(bobMembership);
|
||||
expect(data[0].participant$.value).toBe(null);
|
||||
expect(data[0].participant.value$.value).toBe(null);
|
||||
expect(data[0].connection$.value).toBe(null);
|
||||
return true;
|
||||
},
|
||||
@@ -171,10 +171,10 @@ test("should signal participant on a connection that is publishing", async () =>
|
||||
|
||||
await flushPromises();
|
||||
expect(matrixLivekitMember$.value.value).toSatisfy(
|
||||
(data: MatrixLivekitMember[]) => {
|
||||
(data: RemoteMatrixLivekitMember[]) => {
|
||||
expect(data.length).toEqual(1);
|
||||
expect(data[0].membership$.value).toBe(bobMembership);
|
||||
expect(data[0].participant$.value).toSatisfy((participant) => {
|
||||
expect(data[0].participant.value$.value).toSatisfy((participant) => {
|
||||
expect(participant).toBeDefined();
|
||||
expect(participant!.identity).toEqual(bobParticipantId);
|
||||
return true;
|
||||
@@ -210,10 +210,10 @@ test("should signal participant on a connection that is not publishing", async (
|
||||
});
|
||||
await flushPromises();
|
||||
expect(matrixLivekitMember$.value.value).toSatisfy(
|
||||
(data: MatrixLivekitMember[]) => {
|
||||
(data: RemoteMatrixLivekitMember[]) => {
|
||||
expect(data.length).toEqual(1);
|
||||
expect(data[0].membership$.value).toBe(bobMembership);
|
||||
expect(data[0].participant$.value).toBe(null);
|
||||
expect(data[0].participant.value$.value).toBe(null);
|
||||
expect(data[0].connection$.value).toBe(connection);
|
||||
return true;
|
||||
},
|
||||
@@ -258,11 +258,11 @@ describe("Publication edge case", () => {
|
||||
});
|
||||
await flushPromises();
|
||||
expect(matrixLivekitMember$.value.value).toSatisfy(
|
||||
(data: MatrixLivekitMember[]) => {
|
||||
(data: RemoteMatrixLivekitMember[]) => {
|
||||
expect(data.length).toEqual(2);
|
||||
expect(data[0].membership$.value).toBe(bobMembership);
|
||||
expect(data[0].connection$.value).toBe(connectionA);
|
||||
expect(data[0].participant$.value).toSatisfy((participant) => {
|
||||
expect(data[0].participant.value$.value).toSatisfy((participant) => {
|
||||
expect(participant).toBeDefined();
|
||||
expect(participant!.identity).toEqual(bobParticipantId);
|
||||
return true;
|
||||
@@ -317,11 +317,11 @@ test("bob is publishing in the wrong connection", async () => {
|
||||
|
||||
await flushPromises();
|
||||
expect(matrixLivekitMember$.value.value).toSatisfy(
|
||||
(data: MatrixLivekitMember[]) => {
|
||||
(data: RemoteMatrixLivekitMember[]) => {
|
||||
expect(data.length).toEqual(2);
|
||||
expect(data[0].membership$.value).toBe(bobMembership);
|
||||
expect(data[0].connection$.value).toBe(connectionA);
|
||||
expect(data[0].participant$.value).toBe(null);
|
||||
expect(data[0].participant.value$.value).toBe(null);
|
||||
return true;
|
||||
},
|
||||
);
|
||||
|
||||
@@ -5,10 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
Please see LICENSE in the repository root for full details.
|
||||
*/
|
||||
|
||||
import {
|
||||
type LocalParticipant as LocalLivekitParticipant,
|
||||
type RemoteParticipant as RemoteLivekitParticipant,
|
||||
} from "livekit-client";
|
||||
import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
|
||||
import {
|
||||
type LivekitTransport,
|
||||
type CallMembership,
|
||||
@@ -25,22 +22,44 @@ import { computeLivekitParticipantIdentity$ } from "./LivekitParticipantIdentity
|
||||
|
||||
const logger = rootLogger.getChild("[MatrixLivekitMembers]");
|
||||
|
||||
/**
|
||||
* Represents a Matrix call member and their associated LiveKit participation.
|
||||
* `livekitParticipant` can be undefined if the member is not yet connected to the livekit room
|
||||
* or if it has no livekit transport at all.
|
||||
*/
|
||||
export interface MatrixLivekitMember {
|
||||
interface LocalTaggedParticipant {
|
||||
type: "local";
|
||||
value$: Behavior<LocalParticipant | null>;
|
||||
}
|
||||
interface RemoteTaggedParticipant {
|
||||
type: "remote";
|
||||
value$: Behavior<RemoteParticipant | null>;
|
||||
}
|
||||
export type TaggedParticipant =
|
||||
| LocalTaggedParticipant
|
||||
| RemoteTaggedParticipant;
|
||||
|
||||
interface MatrixLivekitMember {
|
||||
membership$: Behavior<CallMembership>;
|
||||
participant$: Behavior<
|
||||
LocalLivekitParticipant | RemoteLivekitParticipant | null
|
||||
>;
|
||||
connection$: Behavior<Connection | null>;
|
||||
// participantId: string; We do not want a participantId here since it will be generated by the jwt
|
||||
// TODO decide if we can also drop the userId. Its in the matrix membership anyways.
|
||||
userId: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents the local Matrix call member and their associated LiveKit participation.
|
||||
* `livekitParticipant` can be null if the member is not yet connected to the livekit room
|
||||
* or if it has no livekit transport at all.
|
||||
*/
|
||||
export interface LocalMatrixLivekitMember extends MatrixLivekitMember {
|
||||
participant: LocalTaggedParticipant;
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a remote Matrix call member and their associated LiveKit participation.
|
||||
* `livekitParticipant` can be null if the member is not yet connected to the livekit room
|
||||
* or if it has no livekit transport at all.
|
||||
*/
|
||||
export interface RemoteMatrixLivekitMember extends MatrixLivekitMember {
|
||||
participant: RemoteTaggedParticipant;
|
||||
}
|
||||
|
||||
interface Props {
|
||||
scope: ObservableScope;
|
||||
membershipsWithTransport$: Behavior<
|
||||
@@ -62,7 +81,7 @@ export function createMatrixLivekitMembers$({
|
||||
scope,
|
||||
membershipsWithTransport$,
|
||||
connectionManager,
|
||||
}: Props): Behavior<Epoch<MatrixLivekitMember[]>> {
|
||||
}: Props): Behavior<Epoch<RemoteMatrixLivekitMember[]>> {
|
||||
/**
|
||||
* This internal observable is used to compute the async sha256 hash of the user's identity.
|
||||
* a promise is treated like an observable. So we can switchMap on the promise from the identity computation.
|
||||
@@ -138,12 +157,14 @@ export function createMatrixLivekitMembers$({
|
||||
logger.debug(
|
||||
`Generating member for livekitIdentity: ${identity}, userId:deviceId: ${userId}${deviceId}`,
|
||||
);
|
||||
const { participant$, ...rest } = scope.splitBehavior(data$);
|
||||
// will only get called once per `participantId, userId` pair.
|
||||
// updates to data$ and as a result to displayName$ and mxcAvatarUrl$ are more frequent.
|
||||
return {
|
||||
identity,
|
||||
userId,
|
||||
...scope.splitBehavior(data$),
|
||||
participant: { type: "remote" as const, value$: participant$ },
|
||||
...rest,
|
||||
};
|
||||
},
|
||||
),
|
||||
|
||||
@@ -30,7 +30,7 @@ import { type ProcessorState } from "../../../livekit/TrackProcessorContext.tsx"
|
||||
import {
|
||||
areLivekitTransportsEqual,
|
||||
createMatrixLivekitMembers$,
|
||||
type MatrixLivekitMember,
|
||||
type RemoteMatrixLivekitMember,
|
||||
} from "./MatrixLivekitMembers.ts";
|
||||
import { createConnectionManager$ } from "./ConnectionManager.ts";
|
||||
import { membershipsAndTransports$ } from "../../SessionBehaviors.ts";
|
||||
@@ -138,7 +138,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
|
||||
});
|
||||
|
||||
expectObservable(matrixLivekitItems$).toBe(vMarble, {
|
||||
a: expect.toSatisfy((e: Epoch<MatrixLivekitMember[]>) => {
|
||||
a: expect.toSatisfy((e: Epoch<RemoteMatrixLivekitMember[]>) => {
|
||||
const items = e.value;
|
||||
expect(items.length).toBe(1);
|
||||
const item = items[0]!;
|
||||
@@ -153,12 +153,12 @@ test("bob, carl, then bob joining no tracks yet", () => {
|
||||
),
|
||||
),
|
||||
});
|
||||
expectObservable(item.participant$).toBe("a", {
|
||||
expectObservable(item.participant.value$).toBe("a", {
|
||||
a: null,
|
||||
});
|
||||
return true;
|
||||
}),
|
||||
b: expect.toSatisfy((e: Epoch<MatrixLivekitMember[]>) => {
|
||||
b: expect.toSatisfy((e: Epoch<RemoteMatrixLivekitMember[]>) => {
|
||||
const items = e.value;
|
||||
expect(items.length).toBe(2);
|
||||
|
||||
@@ -167,7 +167,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
|
||||
expectObservable(item.membership$).toBe("a", {
|
||||
a: bobMembership,
|
||||
});
|
||||
expectObservable(item.participant$).toBe("a", {
|
||||
expectObservable(item.participant.value$).toBe("a", {
|
||||
a: null,
|
||||
});
|
||||
}
|
||||
@@ -178,7 +178,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
|
||||
expectObservable(item.membership$).toBe("a", {
|
||||
a: carlMembership,
|
||||
});
|
||||
expectObservable(item.participant$).toBe("a", {
|
||||
expectObservable(item.participant.value$).toBe("a", {
|
||||
a: null,
|
||||
});
|
||||
expectObservable(item.connection$).toBe("a", {
|
||||
@@ -195,7 +195,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
|
||||
}
|
||||
return true;
|
||||
}),
|
||||
c: expect.toSatisfy((e: Epoch<MatrixLivekitMember[]>) => {
|
||||
c: expect.toSatisfy((e: Epoch<RemoteMatrixLivekitMember[]>) => {
|
||||
const items = e.value;
|
||||
expect(items.length).toBe(3);
|
||||
|
||||
@@ -222,7 +222,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
|
||||
return true;
|
||||
}),
|
||||
});
|
||||
expectObservable(item.participant$).toBe("a", {
|
||||
expectObservable(item.participant.value$).toBe("a", {
|
||||
a: null,
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user