cleanup based on new js-sdk impl

This commit is contained in:
Timo K
2025-12-29 17:38:54 +01:00
parent 7591e2bda1
commit 0f5c5d8be5
18 changed files with 191 additions and 156 deletions

View File

@@ -26,6 +26,7 @@ import fetchMock from "fetch-mock";
import EventEmitter from "events";
import { type IOpenIDToken } from "matrix-js-sdk";
import { logger } from "matrix-js-sdk/lib/logger";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc/LivekitTransport";
import {
Connection,
@@ -39,7 +40,6 @@ import {
FailToGetOpenIdToken,
} from "../../../utils/errors.ts";
import { mockRemoteParticipant, ownMemberMock } from "../../../utils/test.ts";
import { type LivekitTransportWithVersion } from "./ConnectionManager.ts";
let testScope: ObservableScope;
@@ -50,7 +50,7 @@ let fakeLivekitRoom: MockedObject<LivekitRoom>;
let localParticipantEventEmiter: EventEmitter;
let fakeLocalParticipant: MockedObject<LocalParticipant>;
const livekitFocus: LivekitTransportWithVersion = {
const livekitFocus: LivekitTransport = {
livekit_alias: "!roomID:example.org",
livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt",
type: "livekit",

View File

@@ -35,8 +35,10 @@ import {
} from "../../../utils/errors.ts";
export interface ConnectionOpts {
/** Whether we always try to connect to this connection via the legacy jwt endpoint. (no hash identity) */
forceOldJwtEndpoint?: boolean;
/** The media transport to connect to. */
transport: LivekitTransport & { useMatrix2: boolean };
transport: LivekitTransport;
/** The Matrix client to use for OpenID and SFU config requests. */
client: OpenIDClientParts;
/** The observable scope to use for this connection. */
@@ -89,7 +91,7 @@ export class Connection {
/**
* The media transport to connect to.
*/
public readonly transport: LivekitTransport & { useMatrix2: boolean };
public readonly transport: LivekitTransport;
public readonly livekitRoom: LivekitRoom;
@@ -192,16 +194,14 @@ export class Connection {
this.client,
this.ownMembershipIdentity,
this.transport.livekit_service_url,
this.forceOldJwtEndpoint,
this.transport.livekit_alias,
this.transport.useMatrix2,
// For the remote members we intentionally do not pass a delayEndpointBaseUrl.
undefined,
// and no delayId.
undefined,
this.logger,
);
// client: OpenIDClientParts,
// membership: CallMembershipIdentityParts,
// serviceUrl: string,
// livekitRoomAlias: string,
// matrix2jwt: boolean,
// delayEndpointBaseUrl?: string,
// delayId?: string,
}
/**
@@ -222,7 +222,7 @@ export class Connection {
private readonly client: OpenIDClientParts;
private readonly logger: Logger;
private readonly forceOldJwtEndpoint: boolean;
/**
* Creates a new connection to a matrix RTC LiveKit backend.
*
@@ -235,6 +235,7 @@ export class Connection {
logger: Logger,
private ownMembershipIdentity: CallMembershipIdentityParts,
) {
this.forceOldJwtEndpoint = opts.forceOldJwtEndpoint ?? false;
this.logger = logger.getChild("[Connection]");
this.logger.info(
`[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,

View File

@@ -15,6 +15,7 @@ import {
import { type Logger } from "matrix-js-sdk/lib/logger";
import E2EEWorker from "livekit-client/e2ee-worker?worker";
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc/LivekitTransport";
import { type ObservableScope } from "../../ObservableScope.ts";
import { Connection } from "./Connection.ts";
@@ -23,15 +24,15 @@ import type { MediaDevices } from "../../MediaDevices.ts";
import type { Behavior } from "../../Behavior.ts";
import type { ProcessorState } from "../../../livekit/TrackProcessorContext.tsx";
import { defaultLiveKitOptions } from "../../../livekit/options.ts";
import { type LivekitTransportWithVersion } from "./ConnectionManager.ts";
// TODO evaluate if this should be done like the Publisher Factory
export interface ConnectionFactory {
createConnection(
transport: LivekitTransportWithVersion,
transport: LivekitTransport,
scope: ObservableScope,
logger: Logger,
ownMembershipIdentity: CallMembershipIdentityParts,
forceOldJwtEndpoint?: boolean,
): Connection;
}
@@ -88,10 +89,11 @@ export class ECConnectionFactory implements ConnectionFactory {
* @returns
*/
public createConnection(
transport: LivekitTransportWithVersion,
transport: LivekitTransport,
scope: ObservableScope,
logger: Logger,
ownMembershipIdentity: CallMembershipIdentityParts,
forceOldJwtEndpoint?: boolean,
): Connection {
return new Connection(
{
@@ -99,6 +101,7 @@ export class ECConnectionFactory implements ConnectionFactory {
client: this.client,
scope: scope,
livekitRoomFactory: this.livekitRoomFactory,
forceOldJwtEndpoint,
},
logger,
ownMembershipIdentity,

View File

@@ -14,29 +14,26 @@ import { logger } from "matrix-js-sdk/lib/logger";
import { Epoch, mapEpoch, ObservableScope } from "../../ObservableScope.ts";
import {
createConnectionManager$,
type LivekitTransportWithVersion,
type ConnectionManagerData,
} from "./ConnectionManager.ts";
import { type ConnectionFactory } from "./ConnectionFactory.ts";
import { type Connection } from "./Connection.ts";
import { ownMemberMock, withTestScheduler } from "../../../utils/test.ts";
import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts";
import { type Behavior } from "../../Behavior.ts";
import { constant, type Behavior } from "../../Behavior.ts";
// Some test constants
const TRANSPORT_1: LivekitTransportWithVersion = {
const TRANSPORT_1: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.example.org",
livekit_alias: "!alias:example.org",
useMatrix2: false,
};
const TRANSPORT_2: LivekitTransportWithVersion = {
const TRANSPORT_2: LivekitTransport = {
type: "livekit",
livekit_service_url: "https://lk.sample.com",
livekit_alias: "!alias:sample.com",
useMatrix2: false,
};
let fakeConnectionFactory: ConnectionFactory;
@@ -79,7 +76,8 @@ describe("connections$ stream", () => {
const { connectionManagerData$ } = createConnectionManager$({
scope: testScope,
connectionFactory: fakeConnectionFactory,
inputTransports$: behavior("a", {
localTransport$: constant(null),
remoteTransports$: behavior("a", {
a: new Epoch([TRANSPORT_1, TRANSPORT_2], 0),
}),
logger: logger,
@@ -119,7 +117,8 @@ describe("connections$ stream", () => {
const { connectionManagerData$ } = createConnectionManager$({
scope: testScope,
connectionFactory: fakeConnectionFactory,
inputTransports$: behavior("abcdef", {
localTransport$: constant(null),
remoteTransports$: behavior("abcdef", {
a: new Epoch([TRANSPORT_1], 0),
b: new Epoch([TRANSPORT_1], 1),
c: new Epoch([TRANSPORT_1], 2),
@@ -165,7 +164,8 @@ describe("connections$ stream", () => {
const { connectionManagerData$ } = createConnectionManager$({
scope: testScope,
connectionFactory: fakeConnectionFactory,
inputTransports$: behavior("abc", {
localTransport$: constant(null),
remoteTransports$: behavior("abc", {
a: new Epoch([TRANSPORT_1], 0),
b: new Epoch([TRANSPORT_1, TRANSPORT_2], 1),
c: new Epoch([TRANSPORT_1], 2),
@@ -281,7 +281,8 @@ describe("connectionManagerData$ stream", () => {
const { connectionManagerData$ } = createConnectionManager$({
scope: testScope,
connectionFactory: fakeConnectionFactory,
inputTransports$: behavior("a", {
localTransport$: constant(null),
remoteTransports$: behavior("a", {
a: new Epoch([TRANSPORT_1, TRANSPORT_2], 0),
}),
logger,

View File

@@ -7,22 +7,18 @@ Please see LICENSE in the repository root for full details.
*/
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, map, of, switchMap, tap } from "rxjs";
import { combineLatest, map, of, switchMap } from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger";
import { type RemoteParticipant } from "livekit-client";
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";
import { type Behavior } from "../../Behavior.ts";
import { constant, type Behavior } from "../../Behavior.ts";
import { type Connection } from "./Connection.ts";
import { Epoch, type ObservableScope } from "../../ObservableScope.ts";
import { generateItemsWithEpoch } from "../../../utils/observable.ts";
import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts";
import { type ConnectionFactory } from "./ConnectionFactory.ts";
export type LivekitTransportWithVersion = LivekitTransport & {
useMatrix2: boolean;
};
export class ConnectionManagerData {
private readonly store: Map<string, [Connection, RemoteParticipant[]]> =
new Map();
@@ -64,7 +60,9 @@ export class ConnectionManagerData {
interface Props {
scope: ObservableScope;
connectionFactory: ConnectionFactory;
inputTransports$: Behavior<Epoch<LivekitTransportWithVersion[]>>;
localTransport$: Behavior<LivekitTransport | null>;
remoteTransports$: Behavior<Epoch<LivekitTransport[]>>;
forceOldJwtEndpointForLocalTransport$?: Behavior<boolean>;
logger: Logger;
ownMembershipIdentity: CallMembershipIdentityParts;
}
@@ -91,13 +89,29 @@ export interface IConnectionManager {
export function createConnectionManager$({
scope,
connectionFactory,
inputTransports$,
localTransport$,
remoteTransports$,
forceOldJwtEndpointForLocalTransport$ = constant(false),
logger: parentLogger,
ownMembershipIdentity,
}: Props): IConnectionManager {
const logger = parentLogger.getChild("[ConnectionManager]");
// TODO logger: only construct one logger from the client and make it compatible via a EC specific sing
const allInputTransports$ = combineLatest([
localTransport$,
remoteTransports$,
]).pipe(
map(([localTransport, transports]) => {
const localTransportAsArray = localTransport ? [localTransport] : [];
return transports.mapInner((transports) => [
...localTransportAsArray,
...transports,
]);
}),
map((transports) => transports.mapInner(removeDuplicateTransports)),
);
/**
* All transports currently managed by the ConnectionManager.
*
@@ -106,14 +120,32 @@ export function createConnectionManager$({
* It is build based on the list of subscribed transports (`transportsSubscriptions$`).
* externally this is modified via `registerTransports()`.
*/
const transports$ = scope.behavior(
inputTransports$.pipe(
map((transports) => transports.mapInner(removeDuplicateTransports)),
tap(({ value: transports }) => {
logger.trace(
`Managing transports: ${transports.map((t) => t.livekit_service_url).join(", ")}`,
);
}),
const transportsWithJwtTag$ = scope.behavior(
combineLatest([
allInputTransports$,
localTransport$,
forceOldJwtEndpointForLocalTransport$,
]).pipe(
map(
([
transports,
localTransport,
forceOldJwtEndpointForLocalTransport,
]) => {
// nmodify only the local transport with forceOldJwtEndpointForLocalTransport
const index = transports.value.findIndex((t) =>
areLivekitTransportsEqual(localTransport, t),
);
transports.value[index].forceOldJwtEndpoint =
forceOldJwtEndpointForLocalTransport;
logger.trace(
`Managing transports: ${transports.value.map((t) => t.livekit_service_url).join(", ")}`,
);
return transports as Epoch<
(LivekitTransport & { forceOldJwtEndpoint?: boolean })[]
>;
},
),
),
);
@@ -121,7 +153,7 @@ export function createConnectionManager$({
* Connections for each transport in use by one or more session members.
*/
const connections$ = scope.behavior(
transports$.pipe(
transportsWithJwtTag$.pipe(
generateItemsWithEpoch(
function* (transports) {
for (const transport of transports)
@@ -129,23 +161,23 @@ export function createConnectionManager$({
keys: [
transport.livekit_service_url,
transport.livekit_alias,
transport.useMatrix2,
transport.forceOldJwtEndpoint,
],
data: undefined,
};
},
(scope, _data$, serviceUrl, alias, useMatrix2) => {
(scope, _data$, serviceUrl, alias, forceOldJwtEndpoint) => {
logger.debug(`Creating connection to ${serviceUrl} (${alias})`);
const connection = connectionFactory.createConnection(
{
type: "livekit",
livekit_service_url: serviceUrl,
livekit_alias: alias,
useMatrix2,
},
scope,
logger,
ownMembershipIdentity,
forceOldJwtEndpoint,
);
// Start the connection immediately
// Use connection state to track connection progress

View File

@@ -13,11 +13,7 @@ import fetchMock from "fetch-mock";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { logger } from "matrix-js-sdk/lib/logger";
import {
type Epoch,
ObservableScope,
trackEpoch,
} from "../../ObservableScope.ts";
import { type Epoch, ObservableScope, trackEpoch } from "../../ObservableScope.ts";
import { ECConnectionFactory } from "./ConnectionFactory.ts";
import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts";
import {
@@ -34,6 +30,7 @@ import {
} from "./MatrixLivekitMembers.ts";
import { createConnectionManager$ } from "./ConnectionManager.ts";
import { membershipsAndTransports$ } from "../../SessionBehaviors.ts";
import { constant } from "../../Behavior.ts";
// Test the integration of ConnectionManager and MatrixLivekitMerger
@@ -121,7 +118,8 @@ test("bob, carl, then bob joining no tracks yet", () => {
const connectionManager = createConnectionManager$({
scope: testScope,
connectionFactory: ecConnectionFactory,
inputTransports$: membershipsAndTransports.transports$,
localTransport$: constant(null),
remoteTransports$: membershipsAndTransports.transports$,
logger: logger,
ownMembershipIdentity: ownMemberMock,
});