Merge branch 'livekit' into toger5/lib-ec-version

This commit is contained in:
Timo K
2025-12-22 12:43:09 +01:00
39 changed files with 1978 additions and 1496 deletions

View File

@@ -30,13 +30,16 @@ import { logger } from "matrix-js-sdk/lib/logger";
import type { LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import {
Connection,
ConnectionState,
type ConnectionOpts,
type ConnectionState,
type PublishingParticipant,
} from "./Connection.ts";
import { ObservableScope } from "../../ObservableScope.ts";
import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts";
import { FailToGetOpenIdToken } from "../../../utils/errors.ts";
import {
ElementCallError,
FailToGetOpenIdToken,
} from "../../../utils/errors.ts";
import { mockRemoteParticipant } from "../../../utils/test.ts";
let testScope: ObservableScope;
@@ -47,11 +50,6 @@ let fakeLivekitRoom: MockedObject<LivekitRoom>;
let localParticipantEventEmiter: EventEmitter;
let fakeLocalParticipant: MockedObject<LocalParticipant>;
let fakeRoomEventEmiter: EventEmitter;
// let fakeMembershipsFocusMap$: BehaviorSubject<
// { membership: CallMembership; transport: LivekitTransport }[]
// >;
const livekitFocus: LivekitTransport = {
livekit_alias: "!roomID:example.org",
livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt",
@@ -88,22 +86,25 @@ function setupTest(): void {
localParticipantEventEmiter,
),
} as unknown as LocalParticipant);
fakeRoomEventEmiter = new EventEmitter();
const fakeRoomEventEmitter = new EventEmitter();
fakeLivekitRoom = vi.mocked<LivekitRoom>({
connect: vi.fn(),
disconnect: vi.fn(),
remoteParticipants: new Map(),
localParticipant: fakeLocalParticipant,
state: LivekitConnectionState.Disconnected,
on: fakeRoomEventEmiter.on.bind(fakeRoomEventEmiter),
off: fakeRoomEventEmiter.off.bind(fakeRoomEventEmiter),
addListener: fakeRoomEventEmiter.addListener.bind(fakeRoomEventEmiter),
on: fakeRoomEventEmitter.on.bind(fakeRoomEventEmitter),
off: fakeRoomEventEmitter.off.bind(fakeRoomEventEmitter),
addListener: fakeRoomEventEmitter.addListener.bind(fakeRoomEventEmitter),
removeListener:
fakeRoomEventEmiter.removeListener.bind(fakeRoomEventEmiter),
fakeRoomEventEmitter.removeListener.bind(fakeRoomEventEmitter),
removeAllListeners:
fakeRoomEventEmiter.removeAllListeners.bind(fakeRoomEventEmiter),
fakeRoomEventEmitter.removeAllListeners.bind(fakeRoomEventEmitter),
setE2EEEnabled: vi.fn().mockResolvedValue(undefined),
emit: (eventName: string | symbol, ...args: unknown[]) => {
fakeRoomEventEmitter.emit(eventName, ...args);
},
} as unknown as LivekitRoom);
}
@@ -125,7 +126,16 @@ function setupRemoteConnection(): Connection {
};
});
fakeLivekitRoom.connect.mockResolvedValue(undefined);
fakeLivekitRoom.connect.mockImplementation(async (): Promise<void> => {
const changeEv = RoomEvent.ConnectionStateChanged;
fakeLivekitRoom.state = LivekitConnectionState.Connecting;
fakeLivekitRoom.emit(changeEv, fakeLivekitRoom.state);
fakeLivekitRoom.state = LivekitConnectionState.Connected;
fakeLivekitRoom.emit(changeEv, fakeLivekitRoom.state);
return Promise.resolve();
});
return new Connection(opts, logger);
}
@@ -148,7 +158,7 @@ describe("Start connection states", () => {
};
const connection = new Connection(opts, logger);
expect(connection.state$.getValue().state).toEqual("Initialized");
expect(connection.state$.getValue()).toEqual("Initialized");
});
it("fail to getOpenId token then error state", async () => {
@@ -164,7 +174,7 @@ describe("Start connection states", () => {
const connection = new Connection(opts, logger);
const capturedStates: ConnectionState[] = [];
const capturedStates: (ConnectionState | Error)[] = [];
const s = connection.state$.subscribe((value) => {
capturedStates.push(value);
});
@@ -184,22 +194,20 @@ describe("Start connection states", () => {
let capturedState = capturedStates.pop();
expect(capturedState).toBeDefined();
expect(capturedState!.state).toEqual("FetchingConfig");
expect(capturedState!).toEqual("FetchingConfig");
deferred.reject(new FailToGetOpenIdToken(new Error("Failed to get token")));
await vi.runAllTimersAsync();
capturedState = capturedStates.pop();
if (capturedState!.state === "FailedToStart") {
expect(capturedState!.error.message).toEqual("Something went wrong");
if (capturedState instanceof Error) {
expect(capturedState.message).toEqual("Something went wrong");
expect(connection.transport.livekit_alias).toEqual(
livekitFocus.livekit_alias,
);
} else {
expect.fail(
"Expected FailedToStart state but got " + capturedState?.state,
);
expect.fail("Expected FailedToStart state but got " + capturedState);
}
});
@@ -216,7 +224,7 @@ describe("Start connection states", () => {
const connection = new Connection(opts, logger);
const capturedStates: ConnectionState[] = [];
const capturedStates: (ConnectionState | Error)[] = [];
const s = connection.state$.subscribe((value) => {
capturedStates.push(value);
});
@@ -238,24 +246,25 @@ describe("Start connection states", () => {
let capturedState = capturedStates.pop();
expect(capturedState).toBeDefined();
expect(capturedState?.state).toEqual("FetchingConfig");
expect(capturedState).toEqual(ConnectionState.FetchingConfig);
deferredSFU.resolve();
await vi.runAllTimersAsync();
capturedState = capturedStates.pop();
if (capturedState?.state === "FailedToStart") {
expect(capturedState?.error.message).toContain(
if (
capturedState instanceof ElementCallError &&
capturedState.cause instanceof Error
) {
expect(capturedState.cause.message).toContain(
"SFU Config fetch failed with exception Error",
);
expect(connection.transport.livekit_alias).toEqual(
livekitFocus.livekit_alias,
);
} else {
expect.fail(
"Expected FailedToStart state but got " + capturedState?.state,
);
expect.fail("Expected FailedToStart state but got " + capturedState);
}
});
@@ -272,7 +281,7 @@ describe("Start connection states", () => {
const connection = new Connection(opts, logger);
const capturedStates: ConnectionState[] = [];
const capturedStates: (ConnectionState | Error)[] = [];
const s = connection.state$.subscribe((value) => {
capturedStates.push(value);
});
@@ -302,15 +311,18 @@ describe("Start connection states", () => {
let capturedState = capturedStates.pop();
expect(capturedState).toBeDefined();
expect(capturedState?.state).toEqual("FetchingConfig");
expect(capturedState).toEqual(ConnectionState.FetchingConfig);
deferredSFU.resolve();
await vi.runAllTimersAsync();
capturedState = capturedStates.pop();
if (capturedState && capturedState?.state === "FailedToStart") {
expect(capturedState.error.message).toContain(
if (
capturedState instanceof ElementCallError &&
capturedState.cause instanceof Error
) {
expect(capturedState.cause.message).toContain(
"Failed to connect to livekit",
);
expect(connection.transport.livekit_alias).toEqual(
@@ -329,7 +341,7 @@ describe("Start connection states", () => {
const connection = setupRemoteConnection();
const capturedStates: ConnectionState[] = [];
const capturedStates: (ConnectionState | Error)[] = [];
const s = connection.state$.subscribe((value) => {
capturedStates.push(value);
});
@@ -339,13 +351,15 @@ describe("Start connection states", () => {
await vi.runAllTimersAsync();
const initialState = capturedStates.shift();
expect(initialState?.state).toEqual("Initialized");
expect(initialState).toEqual(ConnectionState.Initialized);
const fetchingState = capturedStates.shift();
expect(fetchingState?.state).toEqual("FetchingConfig");
expect(fetchingState).toEqual(ConnectionState.FetchingConfig);
const disconnectedState = capturedStates.shift();
expect(disconnectedState).toEqual(ConnectionState.LivekitDisconnected);
const connectingState = capturedStates.shift();
expect(connectingState?.state).toEqual("ConnectingToLkRoom");
expect(connectingState).toEqual(ConnectionState.LivekitConnecting);
const connectedState = capturedStates.shift();
expect(connectedState?.state).toEqual("ConnectedToLkRoom");
expect(connectedState).toEqual(ConnectionState.LivekitConnected);
});
it("shutting down the scope should stop the connection", async () => {
@@ -363,44 +377,32 @@ describe("Start connection states", () => {
});
});
function fakeRemoteLivekitParticipant(
id: string,
publications: number = 1,
): RemoteParticipant {
return {
identity: id,
getTrackPublications: () => Array(publications),
} as unknown as RemoteParticipant;
}
describe("Publishing participants observations", () => {
it("should emit the list of publishing participants", () => {
describe("remote participants", () => {
it("emits the list of remote participants", () => {
setupTest();
const connection = setupRemoteConnection();
const bobIsAPublisher = Promise.withResolvers<void>();
const danIsAPublisher = Promise.withResolvers<void>();
const observedPublishers: PublishingParticipant[][] = [];
const s = connection.remoteParticipants$.subscribe((publishers) => {
observedPublishers.push(publishers);
if (publishers.some((p) => p.identity === "@bob:example.org:DEV111")) {
bobIsAPublisher.resolve();
}
if (publishers.some((p) => p.identity === "@dan:example.org:DEV333")) {
danIsAPublisher.resolve();
}
const observedParticipants: RemoteParticipant[][] = [];
const s = connection.remoteParticipants$.subscribe((participants) => {
observedParticipants.push(participants);
});
onTestFinished(() => s.unsubscribe());
// The publishingParticipants$ observable is derived from the current members of the
// The remoteParticipants$ observable is derived from the current members of the
// livekitRoom and the rtc membership in order to publish the members that are publishing
// on this connection.
let participants: RemoteParticipant[] = [
fakeRemoteLivekitParticipant("@alice:example.org:DEV000", 0),
fakeRemoteLivekitParticipant("@bob:example.org:DEV111", 0),
fakeRemoteLivekitParticipant("@carol:example.org:DEV222", 0),
fakeRemoteLivekitParticipant("@dan:example.org:DEV333", 0),
const participants: RemoteParticipant[] = [
mockRemoteParticipant({ identity: "@alice:example.org:DEV000" }),
mockRemoteParticipant({ identity: "@bob:example.org:DEV111" }),
mockRemoteParticipant({ identity: "@carol:example.org:DEV222" }),
// Mock Dan to have no published tracks. We want him to still show show up
// in the participants list.
mockRemoteParticipant({
identity: "@dan:example.org:DEV333",
getTrackPublication: () => undefined,
getTrackPublications: () => [],
}),
];
// Let's simulate 3 members on the livekitRoom
@@ -409,9 +411,10 @@ describe("Publishing participants observations", () => {
);
participants.forEach((p) =>
fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, p),
fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, p),
);
<<<<<<< HEAD
// At this point there should be ~~no~~ publishers
// We do have publisher now, since we do not filter for publishers anymore (to also have participants with only data tracks)
// The filtering we do is just based on the matrixRTC member events.
@@ -429,6 +432,10 @@ describe("Publishing participants observations", () => {
// At this point there should be no publishers
expect(observedPublishers.pop()!.length).toEqual(4);
=======
// All remote participants should be present
expect(observedParticipants.pop()!.length).toEqual(4);
>>>>>>> livekit
});
it("should be scoped to parent scope", (): void => {
@@ -436,14 +443,20 @@ describe("Publishing participants observations", () => {
const connection = setupRemoteConnection();
<<<<<<< HEAD
let observedPublishers: PublishingParticipant[][] = [];
const s = connection.remoteParticipants$.subscribe((publishers) => {
observedPublishers.push(publishers);
=======
let observedParticipants: RemoteParticipant[][] = [];
const s = connection.remoteParticipants$.subscribe((participants) => {
observedParticipants.push(participants);
>>>>>>> livekit
});
onTestFinished(() => s.unsubscribe());
let participants: RemoteParticipant[] = [
fakeRemoteLivekitParticipant("@bob:example.org:DEV111", 0),
mockRemoteParticipant({ identity: "@bob:example.org:DEV111" }),
];
// Let's simulate 3 members on the livekitRoom
@@ -452,9 +465,10 @@ describe("Publishing participants observations", () => {
);
for (const participant of participants) {
fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, participant);
fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, participant);
}
<<<<<<< HEAD
// At this point there should be ~~no~~ publishers
// We do have publisher now, since we do not filter for publishers anymore (to also have participants with only data tracks)
// The filtering we do is just based on the matrixRTC member events.
@@ -470,22 +484,28 @@ describe("Publishing participants observations", () => {
const publishers = observedPublishers.pop();
expect(publishers?.length).toEqual(1);
expect(publishers?.[0]?.identity).toEqual("@bob:example.org:DEV111");
=======
// We should have bob as a participant now
const ps = observedParticipants.pop();
expect(ps?.length).toEqual(1);
expect(ps?.[0]?.identity).toEqual("@bob:example.org:DEV111");
>>>>>>> livekit
// end the parent scope
testScope.end();
observedPublishers = [];
observedParticipants = [];
// SHOULD NOT emit any more publishers as the scope is ended
// SHOULD NOT emit any more participants as the scope is ended
participants = participants.filter(
(p) => p.identity !== "@bob:example.org:DEV111",
);
fakeRoomEventEmiter.emit(
fakeLivekitRoom.emit(
RoomEvent.ParticipantDisconnected,
fakeRemoteLivekitParticipant("@bob:example.org:DEV111"),
mockRemoteParticipant({ identity: "@bob:example.org:DEV111" }),
);
expect(observedPublishers.length).toEqual(0);
expect(observedParticipants.length).toEqual(0);
});
});

View File

@@ -12,11 +12,8 @@ import {
} from "@livekit/components-core";
import {
ConnectionError,
type ConnectionState as LivekitConenctionState,
type Room as LivekitRoom,
type LocalParticipant,
type RemoteParticipant,
RoomEvent,
} from "livekit-client";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject } from "rxjs";
@@ -30,12 +27,12 @@ import {
import { type Behavior } from "../../Behavior.ts";
import { type ObservableScope } from "../../ObservableScope.ts";
import {
ElementCallError,
InsufficientCapacityError,
SFURoomCreationRestrictedError,
UnknownCallError,
} from "../../../utils/errors.ts";
export type PublishingParticipant = LocalParticipant | RemoteParticipant;
export interface ConnectionOpts {
/** The media transport to connect to. */
transport: LivekitTransport;
@@ -47,17 +44,30 @@ export interface ConnectionOpts {
/** Optional factory to create the LiveKit room, mainly for testing purposes. */
livekitRoomFactory: () => LivekitRoom;
}
export class FailedToStartError extends Error {
public constructor(message: string) {
super(message);
this.name = "FailedToStartError";
}
}
export type ConnectionState =
| { state: "Initialized" }
| { state: "FetchingConfig" }
| { state: "ConnectingToLkRoom" }
| {
state: "ConnectedToLkRoom";
livekitConnectionState$: Behavior<LivekitConenctionState>;
}
| { state: "FailedToStart"; error: Error }
| { state: "Stopped" };
export enum ConnectionState {
/** The start state of a connection. It has been created but nothing has loaded yet. */
Initialized = "Initialized",
/** `start` has been called on the connection. It aquires the jwt info to conenct to the LK Room */
FetchingConfig = "FetchingConfig",
Stopped = "Stopped",
/** The same as ConnectionState.Disconnected from `livekit-client` */
LivekitDisconnected = "disconnected",
/** The same as ConnectionState.Connecting from `livekit-client` */
LivekitConnecting = "connecting",
/** The same as ConnectionState.Connected from `livekit-client` */
LivekitConnected = "connected",
/** The same as ConnectionState.Reconnecting from `livekit-client` */
LivekitReconnecting = "reconnecting",
/** The same as ConnectionState.SignalReconnecting from `livekit-client` */
LivekitSignalReconnecting = "signalReconnecting",
}
/**
* A connection to a Matrix RTC LiveKit backend.
@@ -66,14 +76,14 @@ export type ConnectionState =
*/
export class Connection {
// Private Behavior
private readonly _state$ = new BehaviorSubject<ConnectionState>({
state: "Initialized",
});
private readonly _state$ = new BehaviorSubject<
ConnectionState | ElementCallError
>(ConnectionState.Initialized);
/**
* The current state of the connection to the media transport.
*/
public readonly state$: Behavior<ConnectionState> = this._state$;
public readonly state$: Behavior<ConnectionState | Error> = this._state$;
/**
* The media transport to connect to.
@@ -85,11 +95,13 @@ export class Connection {
private scope: ObservableScope;
/**
* An observable of the participants that are publishing on this connection. (Excluding our local participant)
* This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`.
* It filters the participants to only those that are associated with a membership that claims to publish on this connection.
* The remote LiveKit participants that are visible on this connection.
*
* Note that this may include participants that are connected only to
* subscribe, or publishers that are otherwise unattested in MatrixRTC state.
* It is therefore more low-level than what should be presented to the user.
*/
public readonly remoteParticipants$: Behavior<PublishingParticipant[]>;
public readonly remoteParticipants$: Behavior<RemoteParticipant[]>;
/**
* Whether the connection has been stopped.
@@ -115,16 +127,24 @@ export class Connection {
this.logger.debug("Starting Connection");
this.stopped = false;
try {
this._state$.next({
state: "FetchingConfig",
});
this._state$.next(ConnectionState.FetchingConfig);
// We should already have this information after creating the localTransport.
// It would probably be better to forward this here.
const { url, jwt } = await this.getSFUConfigWithOpenID();
// If we were stopped while fetching the config, don't proceed to connect
if (this.stopped) return;
this._state$.next({
state: "ConnectingToLkRoom",
});
// Setup observer once we are done with getSFUConfigWithOpenID
connectionStateObserver(this.livekitRoom)
.pipe(
this.scope.bind(),
map((s) => s as unknown as ConnectionState),
)
.subscribe((lkState) => {
// It is save to cast lkState to ConnectionState as they are fully overlapping.
this._state$.next(lkState);
});
try {
await this.livekitRoom.connect(url, jwt);
} catch (e) {
@@ -139,7 +159,8 @@ export class Connection {
throw new InsufficientCapacityError();
}
if (e.status === 404) {
// error msg is "Could not establish signal connection: requested room does not exist"
// error msg is "Failed to create call"
// error description is "Call creation might be restricted to authorized users only. Try again later, or contact your server admin if the problem persists."
// The room does not exist. There are two different modes of operation for the SFU:
// - the room is created on the fly when connecting (livekit `auto_create` option)
// - Only authorized users can create rooms, so the room must exist before connecting (done by the auth jwt service)
@@ -151,23 +172,16 @@ export class Connection {
}
// If we were stopped while connecting, don't proceed to update state.
if (this.stopped) return;
this._state$.next({
state: "ConnectedToLkRoom",
livekitConnectionState$: this.scope.behavior(
connectionStateObserver(this.livekitRoom),
),
});
this.logger.info(
"Connected to LiveKit room",
this.transport.livekit_service_url,
);
} catch (error) {
this.logger.debug(`Failed to connect to LiveKit room: ${error}`);
this._state$.next({
state: "FailedToStart",
error: error instanceof Error ? error : new Error(`${error}`),
});
this._state$.next(
error instanceof ElementCallError
? error
: error instanceof Error
? new UnknownCallError(error)
: new UnknownCallError(new Error(`${error}`)),
);
// Its okay to ignore the throw. The error is part of the state.
throw error;
}
}
@@ -192,9 +206,7 @@ export class Connection {
);
if (this.stopped) return;
await this.livekitRoom.disconnect();
this._state$.next({
state: "Stopped",
});
this._state$.next(ConnectionState.Stopped);
this.stopped = true;
}
@@ -220,24 +232,9 @@ export class Connection {
this.transport = transport;
this.client = client;
// REMOTE participants with track!!!
// this.remoteParticipants$
this.remoteParticipants$ = scope.behavior(
// only tracks remote participants
connectedParticipantsObserver(this.livekitRoom, {
additionalRoomEvents: [
RoomEvent.TrackPublished,
RoomEvent.TrackUnpublished,
],
}),
// .pipe(
// map((participants) => {
// return participants.filter(
// (participant) => participant.getTrackPublications().length > 0,
// );
// }),
// )
[],
// Only tracks remote participants
connectedParticipantsObserver(this.livekitRoom),
);
scope.onEnd(() => {

View File

@@ -8,7 +8,7 @@ Please see LICENSE in the repository root for full details.
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { BehaviorSubject } from "rxjs";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { type Participant as LivekitParticipant } from "livekit-client";
import { type RemoteParticipant } from "livekit-client";
import { logger } from "matrix-js-sdk/lib/logger";
import { Epoch, mapEpoch, ObservableScope } from "../../ObservableScope.ts";
@@ -201,23 +201,20 @@ describe("connections$ stream", () => {
describe("connectionManagerData$ stream", () => {
// Used in test to control fake connections' remoteParticipants$ streams
let fakePublishingParticipantsStreams: Map<
string,
Behavior<LivekitParticipant[]>
>;
let fakeRemoteParticipantsStreams: Map<string, Behavior<RemoteParticipant[]>>;
function keyForTransport(transport: LivekitTransport): string {
return `${transport.livekit_service_url}|${transport.livekit_alias}`;
}
beforeEach(() => {
fakePublishingParticipantsStreams = new Map();
fakeRemoteParticipantsStreams = new Map();
function getPublishingParticipantsFor(
function getRemoteParticipantsFor(
transport: LivekitTransport,
): Behavior<LivekitParticipant[]> {
): Behavior<RemoteParticipant[]> {
return (
fakePublishingParticipantsStreams.get(keyForTransport(transport)) ??
fakeRemoteParticipantsStreams.get(keyForTransport(transport)) ??
new BehaviorSubject([])
);
}
@@ -227,12 +224,12 @@ describe("connectionManagerData$ stream", () => {
.fn()
.mockImplementation(
(transport: LivekitTransport, scope: ObservableScope) => {
const fakePublishingParticipants$ = new BehaviorSubject<
LivekitParticipant[]
const fakeRemoteParticipants$ = new BehaviorSubject<
RemoteParticipant[]
>([]);
const mockConnection = {
transport,
remoteParticipants$: getPublishingParticipantsFor(transport),
remoteParticipants$: getRemoteParticipantsFor(transport),
} as unknown as Connection;
vi.mocked(mockConnection).start = vi.fn();
vi.mocked(mockConnection).stop = vi.fn();
@@ -241,36 +238,36 @@ describe("connectionManagerData$ stream", () => {
void mockConnection.stop();
});
fakePublishingParticipantsStreams.set(
fakeRemoteParticipantsStreams.set(
keyForTransport(transport),
fakePublishingParticipants$,
fakeRemoteParticipants$,
);
return mockConnection;
},
);
});
test("Should report connections with the publishing participants", () => {
test("Should report connections with the remote participants", () => {
withTestScheduler(({ expectObservable, schedule, behavior }) => {
// Setup the fake participants streams behavior
// ==============================
fakePublishingParticipantsStreams.set(
fakeRemoteParticipantsStreams.set(
keyForTransport(TRANSPORT_1),
behavior("oa-b", {
o: [],
a: [{ identity: "user1A" } as LivekitParticipant],
a: [{ identity: "user1A" } as RemoteParticipant],
b: [
{ identity: "user1A" } as LivekitParticipant,
{ identity: "user1B" } as LivekitParticipant,
{ identity: "user1A" } as RemoteParticipant,
{ identity: "user1B" } as RemoteParticipant,
],
}),
);
fakePublishingParticipantsStreams.set(
fakeRemoteParticipantsStreams.set(
keyForTransport(TRANSPORT_2),
behavior("o-a", {
o: [],
a: [{ identity: "user2A" } as LivekitParticipant],
a: [{ identity: "user2A" } as RemoteParticipant],
}),
);
// ==============================

View File

@@ -6,13 +6,10 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
type LivekitTransport,
type ParticipantId,
} from "matrix-js-sdk/lib/matrixrtc";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, map, of, switchMap, tap } from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger";
import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
import { type RemoteParticipant } from "livekit-client";
import { type Behavior } from "../../Behavior.ts";
import { type Connection } from "./Connection.ts";
@@ -22,20 +19,12 @@ import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts";
import { type ConnectionFactory } from "./ConnectionFactory.ts";
export class ConnectionManagerData {
private readonly store: Map<
string,
{
connection: Connection;
participants: (LocalParticipant | RemoteParticipant)[];
}
> = new Map();
private readonly store: Map<string, [Connection, RemoteParticipant[]]> =
new Map();
public constructor() {}
public add(
connection: Connection,
participants: (LocalParticipant | RemoteParticipant)[],
): void {
public add(connection: Connection, participants: RemoteParticipant[]): void {
const key = this.getKey(connection.transport);
const existing = this.store.get(key);
if (!existing) {
@@ -61,7 +50,7 @@ export class ConnectionManagerData {
public getParticipantsForTransport(
transport: LivekitTransport,
): (LocalParticipant | RemoteParticipant)[] {
): RemoteParticipant[] {
const key = transport.livekit_service_url + "|" + transport.livekit_alias;
const existing = this.store.get(key);
if (existing) {
@@ -69,39 +58,20 @@ export class ConnectionManagerData {
}
return [];
}
/**
* Get all connections where the given participant is publishing.
* In theory, there could be several connections where the same participant is publishing but with
* only well behaving clients a participant should only be publishing on a single connection.
* @param participantId
*/
public getConnectionsForParticipant(
participantId: ParticipantId,
): Connection[] {
const connections: Connection[] = [];
for (const { connection, participants } of this.store.values()) {
if (
participants.some(
(participant) => participant?.identity === participantId,
)
) {
connections.push(connection);
}
}
return connections;
}
}
interface Props {
scope: ObservableScope;
connectionFactory: ConnectionFactory;
inputTransports$: Behavior<Epoch<LivekitTransport[]>>;
logger: Logger;
}
// TODO - write test for scopes (do we really need to bind scope)
export interface IConnectionManager {
connectionManagerData$: Behavior<Epoch<ConnectionManagerData>>;
}
/**
* Crete a `ConnectionManager`
* @param scope the observable scope used by this object.
@@ -184,7 +154,7 @@ export function createConnectionManager$({
const epoch = connections.epoch;
// Map the connections to list of {connection, participants}[]
const listOfConnectionsWithParticipants = connections.value.map(
const listOfConnectionsWithRemoteParticipants = connections.value.map(
(connection) => {
return connection.remoteParticipants$.pipe(
map((participants) => ({
@@ -196,12 +166,16 @@ export function createConnectionManager$({
);
// probably not required
<<<<<<< HEAD
if (listOfConnectionsWithParticipants.length === 0) {
=======
if (listOfConnectionsWithRemoteParticipants.length === 0) {
>>>>>>> livekit
return of(new Epoch(new ConnectionManagerData(), epoch));
}
// combineLatest the several streams into a single stream with the ConnectionManagerData
return combineLatest(listOfConnectionsWithParticipants).pipe(
return combineLatest(listOfConnectionsWithRemoteParticipants).pipe(
map(
(lists) =>
new Epoch(

View File

@@ -15,7 +15,7 @@ import { combineLatest, map, type Observable } from "rxjs";
import { type IConnectionManager } from "./ConnectionManager.ts";
import {
type MatrixLivekitMember,
type RemoteMatrixLivekitMember,
createMatrixLivekitMembers$,
} from "./MatrixLivekitMembers.ts";
import {
@@ -99,24 +99,21 @@ test("should signal participant not yet connected to livekit", () => {
} as unknown as IConnectionManager,
});
expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant$).toBe("a", {
a: null,
});
expectObservable(data[0].connection$).toBe("a", {
a: null,
});
return true;
}),
},
);
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant.value$).toBe("a", {
a: null,
});
expectObservable(data[0].connection$).toBe("a", {
a: null,
});
return true;
}),
});
});
});
@@ -182,28 +179,25 @@ test("should signal participant on a connection that is publishing", () => {
} as unknown as IConnectionManager,
});
expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant$).toBe("a", {
a: expect.toSatisfy((participant) => {
expect(participant).toBeDefined();
expect(participant!.identity).toEqual(bobParticipantId);
return true;
}),
});
expectObservable(data[0].connection$).toBe("a", {
a: connection,
});
return true;
}),
},
);
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant.value$).toBe("a", {
a: expect.toSatisfy((participant) => {
expect(participant).toBeDefined();
expect(participant!.identity).toEqual(bobParticipantId);
return true;
}),
});
expectObservable(data[0].connection$).toBe("a", {
a: connection,
});
return true;
}),
});
});
});
@@ -236,24 +230,21 @@ test("should signal participant on a connection that is not publishing", () => {
} as unknown as IConnectionManager,
});
expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant$).toBe("a", {
a: null,
});
expectObservable(data[0].connection$).toBe("a", {
a: connection,
});
return true;
}),
},
);
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant.value$).toBe("a", {
a: null,
});
expectObservable(data[0].connection$).toBe("a", {
a: connection,
});
return true;
}),
});
});
});
@@ -305,7 +296,7 @@ describe("Publication edge case", () => {
expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(2);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
@@ -314,7 +305,7 @@ describe("Publication edge case", () => {
// The real connection should be from transportA as per the membership
a: connectionA,
});
expectObservable(data[0].participant$).toBe("a", {
expectObservable(data[0].participant.value$).toBe("a", {
a: expect.toSatisfy((participant) => {
expect(participant).toBeDefined();
expect(participant!.identity).toEqual(bobParticipantId);
@@ -371,7 +362,7 @@ describe("Publication edge case", () => {
expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(2);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
@@ -380,7 +371,7 @@ describe("Publication edge case", () => {
// The real connection should be from transportA as per the membership
a: connectionA,
});
expectObservable(data[0].participant$).toBe("a", {
expectObservable(data[0].participant.value$).toBe("a", {
// No participant as Bob is not publishing on his membership transport
a: null,
});

View File

@@ -5,10 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
type LocalParticipant as LocalLivekitParticipant,
type RemoteParticipant as RemoteLivekitParticipant,
} from "livekit-client";
import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
import {
type LivekitTransport,
type CallMembership,
@@ -24,22 +21,44 @@ import { generateItemsWithEpoch } from "../../../utils/observable";
const logger = rootLogger.getChild("[MatrixLivekitMembers]");
/**
* Represents a Matrix call member and their associated LiveKit participation.
* `livekitParticipant` can be undefined if the member is not yet connected to the livekit room
* or if it has no livekit transport at all.
*/
export interface MatrixLivekitMember {
interface LocalTaggedParticipant {
type: "local";
value$: Behavior<LocalParticipant | null>;
}
interface RemoteTaggedParticipant {
type: "remote";
value$: Behavior<RemoteParticipant | null>;
}
export type TaggedParticipant =
| LocalTaggedParticipant
| RemoteTaggedParticipant;
interface MatrixLivekitMember {
membership$: Behavior<CallMembership>;
participant$: Behavior<
LocalLivekitParticipant | RemoteLivekitParticipant | null
>;
connection$: Behavior<Connection | null>;
// participantId: string; We do not want a participantId here since it will be generated by the jwt
// TODO decide if we can also drop the userId. Its in the matrix membership anyways.
userId: string;
}
/**
* Represents the local Matrix call member and their associated LiveKit participation.
* `livekitParticipant` can be null if the member is not yet connected to the livekit room
* or if it has no livekit transport at all.
*/
export interface LocalMatrixLivekitMember extends MatrixLivekitMember {
participant: LocalTaggedParticipant;
}
/**
* Represents a remote Matrix call member and their associated LiveKit participation.
* `livekitParticipant` can be null if the member is not yet connected to the livekit room
* or if it has no livekit transport at all.
*/
export interface RemoteMatrixLivekitMember extends MatrixLivekitMember {
participant: RemoteTaggedParticipant;
}
interface Props {
scope: ObservableScope;
membershipsWithTransport$: Behavior<
@@ -61,7 +80,7 @@ export function createMatrixLivekitMembers$({
scope,
membershipsWithTransport$,
connectionManager,
}: Props): { matrixLivekitMembers$: Behavior<Epoch<MatrixLivekitMember[]>> } {
}: Props): Behavior<Epoch<RemoteMatrixLivekitMember[]>> {
/**
* Stream of all the call members and their associated livekit data (if available).
*/
@@ -110,12 +129,14 @@ export function createMatrixLivekitMembers$({
logger.debug(
`Generating member for participantId: ${participantId}, userId: ${userId}`,
);
const { participant$, ...rest } = scope.splitBehavior(data$);
// will only get called once per `participantId, userId` pair.
// updates to data$ and as a result to displayName$ and mxcAvatarUrl$ are more frequent.
return {
participantId,
userId,
...scope.splitBehavior(data$),
participant: { type: "remote" as const, value$: participant$ },
...rest,
};
},
),

View File

@@ -29,7 +29,7 @@ import { type ProcessorState } from "../../../livekit/TrackProcessorContext.tsx"
import {
areLivekitTransportsEqual,
createMatrixLivekitMembers$,
type MatrixLivekitMember,
type RemoteMatrixLivekitMember,
} from "./MatrixLivekitMembers.ts";
import { createConnectionManager$ } from "./ConnectionManager.ts";
import { membershipsAndTransports$ } from "../../SessionBehaviors.ts";
@@ -131,8 +131,8 @@ test("bob, carl, then bob joining no tracks yet", () => {
connectionManager,
});
expectObservable(matrixLivekitMembers$).toBe(vMarble, {
a: expect.toSatisfy((e: Epoch<MatrixLivekitMember[]>) => {
expectObservable(matrixLivekitItems$).toBe(vMarble, {
a: expect.toSatisfy((e: Epoch<RemoteMatrixLivekitMember[]>) => {
const items = e.value;
expect(items.length).toBe(1);
const item = items[0]!;
@@ -147,12 +147,12 @@ test("bob, carl, then bob joining no tracks yet", () => {
),
),
});
expectObservable(item.participant$).toBe("a", {
expectObservable(item.participant.value$).toBe("a", {
a: null,
});
return true;
}),
b: expect.toSatisfy((e: Epoch<MatrixLivekitMember[]>) => {
b: expect.toSatisfy((e: Epoch<RemoteMatrixLivekitMember[]>) => {
const items = e.value;
expect(items.length).toBe(2);
@@ -161,7 +161,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
expectObservable(item.membership$).toBe("a", {
a: bobMembership,
});
expectObservable(item.participant$).toBe("a", {
expectObservable(item.participant.value$).toBe("a", {
a: null,
});
}
@@ -172,7 +172,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
expectObservable(item.membership$).toBe("a", {
a: carlMembership,
});
expectObservable(item.participant$).toBe("a", {
expectObservable(item.participant.value$).toBe("a", {
a: null,
});
expectObservable(item.connection$).toBe("a", {
@@ -189,7 +189,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
}
return true;
}),
c: expect.toSatisfy((e: Epoch<MatrixLivekitMember[]>) => {
c: expect.toSatisfy((e: Epoch<RemoteMatrixLivekitMember[]>) => {
const items = e.value;
expect(items.length).toBe(3);
@@ -216,7 +216,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
return true;
}),
});
expectObservable(item.participant$).toBe("a", {
expectObservable(item.participant.value$).toBe("a", {
a: null,
});
}