mirror of
https://github.com/vector-im/element-call.git
synced 2026-03-13 06:07:04 +00:00
Merge pull request #3718 from element-hq/toger5/backport-fix_remove_livekit_alias_signaling
[backport] The advertised livekit_alias in membership is deprecated #3706
This commit is contained in:
@@ -78,8 +78,8 @@ export type OpenIDClientParts = Pick<
|
||||
* @param membership Our own membership identity parts used to send to jwt service.
|
||||
* @param serviceUrl The URL of the livekit SFU service
|
||||
* @param roomId The room id used in the jwt request. This is NOT the livekit_alias. The jwt service will provide the alias. It maps matrix room ids <-> Livekit aliases.
|
||||
* @param opts Additional options to modify which endpoint with which data will be used to aquire the jwt token.
|
||||
* @param opts.forceJwtEndpoint This will use the old jwt endpoint which will create the rtc backend identity based on string concatination
|
||||
* @param opts Additional options to modify which endpoint with which data will be used to acquire the jwt token.
|
||||
* @param opts.forceJwtEndpoint This will use the old jwt endpoint which will create the rtc backend identity based on string concatenation
|
||||
* instead of a hash.
|
||||
* This function by default uses whatever is possible with the current jwt service installed next to the SFU.
|
||||
* For remote connections this does not matter, since we will not publish there we can rely on the newest option.
|
||||
|
||||
@@ -42,7 +42,7 @@ import {
|
||||
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
|
||||
import {
|
||||
MembershipManagerEvent,
|
||||
type LivekitTransport,
|
||||
type LivekitTransportConfig,
|
||||
type MatrixRTCSession,
|
||||
} from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { type IWidgetApiRequest } from "matrix-widget-api";
|
||||
@@ -103,7 +103,7 @@ import {
|
||||
type SpotlightPortraitLayoutMedia,
|
||||
} from "../layout-types.ts";
|
||||
import { ElementCallError, UnknownCallError } from "../../utils/errors.ts";
|
||||
import { type ObservableScope } from "../ObservableScope.ts";
|
||||
import { type Epoch, type ObservableScope } from "../ObservableScope.ts";
|
||||
import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts";
|
||||
import {
|
||||
createLocalMembership$,
|
||||
@@ -468,6 +468,7 @@ export function createCallViewModel$(
|
||||
|
||||
const connectionFactory = new ECConnectionFactory(
|
||||
client,
|
||||
matrixRoom.roomId,
|
||||
mediaDevices,
|
||||
trackProcessorState$,
|
||||
livekitKeyProvider,
|
||||
@@ -496,12 +497,13 @@ export function createCallViewModel$(
|
||||
ownMembershipIdentity,
|
||||
});
|
||||
|
||||
const matrixLivekitMembers$ = createMatrixLivekitMembers$({
|
||||
scope: scope,
|
||||
membershipsWithTransport$:
|
||||
membershipsAndTransports.membershipsWithTransport$,
|
||||
connectionManager: connectionManager,
|
||||
});
|
||||
const matrixLivekitMembers$: Behavior<Epoch<RemoteMatrixLivekitMember[]>> =
|
||||
createMatrixLivekitMembers$({
|
||||
scope: scope,
|
||||
membershipsWithTransport$:
|
||||
membershipsAndTransports.membershipsWithTransport$,
|
||||
connectionManager: connectionManager,
|
||||
});
|
||||
|
||||
const connectOptions$ = scope.behavior(
|
||||
matrixRTCMode$.pipe(
|
||||
@@ -521,7 +523,7 @@ export function createCallViewModel$(
|
||||
matrixRTCSession,
|
||||
),
|
||||
muteStates: muteStates,
|
||||
joinMatrixRTC: (transport: LivekitTransport) => {
|
||||
joinMatrixRTC: (transport: LivekitTransportConfig) => {
|
||||
return enterRTCSession(
|
||||
matrixRTCSession,
|
||||
ownMembershipIdentity,
|
||||
|
||||
@@ -8,7 +8,7 @@ Please see LICENSE in the repository root for full details.
|
||||
|
||||
import {
|
||||
Status as RTCMemberStatus,
|
||||
type LivekitTransport,
|
||||
type LivekitTransportConfig,
|
||||
type MatrixRTCSession,
|
||||
} from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
@@ -281,7 +281,7 @@ describe("LocalMembership", () => {
|
||||
const aTransport = {
|
||||
transport: {
|
||||
livekit_service_url: "a",
|
||||
} as LivekitTransport,
|
||||
} as LivekitTransportConfig,
|
||||
sfuConfig: {
|
||||
url: "sfu-url",
|
||||
jwt: "sfu-token",
|
||||
@@ -290,7 +290,7 @@ describe("LocalMembership", () => {
|
||||
const bTransport = {
|
||||
transport: {
|
||||
livekit_service_url: "b",
|
||||
} as LivekitTransport,
|
||||
} as LivekitTransportConfig,
|
||||
sfuConfig: {
|
||||
url: "sfu-url",
|
||||
jwt: "sfu-token",
|
||||
|
||||
@@ -17,6 +17,7 @@ import { observeParticipantEvents } from "@livekit/components-core";
|
||||
import {
|
||||
Status as RTCSessionStatus,
|
||||
type LivekitTransport,
|
||||
type LivekitTransportConfig,
|
||||
type MatrixRTCSession,
|
||||
} from "matrix-js-sdk/lib/matrixrtc";
|
||||
import {
|
||||
@@ -125,7 +126,7 @@ interface Props {
|
||||
muteStates: MuteStates;
|
||||
connectionManager: IConnectionManager;
|
||||
createPublisherFactory: (connection: Connection) => Publisher;
|
||||
joinMatrixRTC: (transport: LivekitTransport) => void;
|
||||
joinMatrixRTC: (transport: LivekitTransportConfig) => void;
|
||||
homeserverConnected: HomeserverConnected;
|
||||
localTransport$: Behavior<LocalTransportWithSFUConfig | null>;
|
||||
matrixRTCSession: Pick<
|
||||
@@ -717,7 +718,7 @@ interface EnterRTCSessionOptions {
|
||||
export function enterRTCSession(
|
||||
rtcSession: MatrixRTCSession,
|
||||
ownMembershipIdentity: CallMembershipIdentityParts,
|
||||
transport: LivekitTransport,
|
||||
transport: LivekitTransportConfig,
|
||||
options: EnterRTCSessionOptions,
|
||||
): void {
|
||||
const { encryptMedia, matrixRTCMode } = options;
|
||||
@@ -735,12 +736,26 @@ export function enterRTCSession(
|
||||
const multiSFU =
|
||||
matrixRTCMode === MatrixRTCMode.Compatibility ||
|
||||
matrixRTCMode === MatrixRTCMode.Matrix_2_0;
|
||||
|
||||
// For backwards compatibility with Element Call versions that do not do Matrix 2.0,
|
||||
// we add the livekit alias to the transport.
|
||||
let backwardCompatibleTransport: LivekitTransport | LivekitTransportConfig;
|
||||
if (matrixRTCMode === MatrixRTCMode.Matrix_2_0) {
|
||||
backwardCompatibleTransport = transport;
|
||||
} else {
|
||||
backwardCompatibleTransport = {
|
||||
livekit_alias: rtcSession.room.roomId,
|
||||
...transport,
|
||||
};
|
||||
}
|
||||
|
||||
// Multi-sfu does not need a preferred foci list. just the focus that is actually used.
|
||||
// TODO where/how do we track errors originating from the ongoing rtcSession?
|
||||
|
||||
rtcSession.joinRTCSession(
|
||||
ownMembershipIdentity,
|
||||
multiSFU ? [] : [transport],
|
||||
multiSFU ? transport : undefined,
|
||||
multiSFU ? [] : [backwardCompatibleTransport],
|
||||
multiSFU ? backwardCompatibleTransport : undefined,
|
||||
{
|
||||
notificationType,
|
||||
callIntent,
|
||||
|
||||
@@ -34,7 +34,7 @@ describe("LocalTransport", () => {
|
||||
const openIdResponse: openIDSFU.SFUConfig = {
|
||||
url: "https://lk.example.org",
|
||||
jwt: testJWTToken,
|
||||
livekitAlias: "!example_room_id",
|
||||
livekitAlias: "Akph4alDMhen",
|
||||
livekitIdentity: "@lk_user:ABCDEF",
|
||||
};
|
||||
|
||||
@@ -147,7 +147,7 @@ describe("LocalTransport", () => {
|
||||
openIdResolver.resolve?.({
|
||||
url: "https://lk.example.org",
|
||||
jwt: "jwt",
|
||||
livekitAlias: "!room:example.org",
|
||||
livekitAlias: "Akph4alDMhen",
|
||||
livekitIdentity: ownMemberMock.userId + ":" + ownMemberMock.deviceId,
|
||||
});
|
||||
expect(localTransport$.value).toBe(null);
|
||||
@@ -155,13 +155,12 @@ describe("LocalTransport", () => {
|
||||
// final
|
||||
expect(localTransport$.value).toStrictEqual({
|
||||
transport: {
|
||||
livekit_alias: "!room:example.org",
|
||||
livekit_service_url: "https://lk.example.org",
|
||||
type: "livekit",
|
||||
},
|
||||
sfuConfig: {
|
||||
jwt: "jwt",
|
||||
livekitAlias: "!room:example.org",
|
||||
livekitAlias: "Akph4alDMhen",
|
||||
livekitIdentity: "@alice:example.org:DEVICE",
|
||||
url: "https://lk.example.org",
|
||||
},
|
||||
@@ -204,13 +203,12 @@ describe("LocalTransport", () => {
|
||||
// final
|
||||
expect(localTransport$.value).toStrictEqual({
|
||||
transport: {
|
||||
livekit_alias: "!example_room_id",
|
||||
livekit_service_url: "https://lk.example.org",
|
||||
type: "livekit",
|
||||
},
|
||||
sfuConfig: {
|
||||
jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=",
|
||||
livekitAlias: "!example_room_id",
|
||||
livekitAlias: "Akph4alDMhen",
|
||||
livekitIdentity: "@lk_user:ABCDEF",
|
||||
url: "https://lk.example.org",
|
||||
},
|
||||
@@ -264,13 +262,12 @@ describe("LocalTransport", () => {
|
||||
await flushPromises();
|
||||
expect(localTransport$.value).toStrictEqual({
|
||||
transport: {
|
||||
livekit_alias: "!example_room_id",
|
||||
livekit_service_url: "https://lk.example.org",
|
||||
type: "livekit",
|
||||
},
|
||||
sfuConfig: {
|
||||
jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=",
|
||||
livekitAlias: "!example_room_id",
|
||||
livekitAlias: "Akph4alDMhen",
|
||||
livekitIdentity: "@lk_user:ABCDEF",
|
||||
url: "https://lk.example.org",
|
||||
},
|
||||
@@ -284,13 +281,12 @@ describe("LocalTransport", () => {
|
||||
await flushPromises();
|
||||
expect(localTransport$.value).toStrictEqual({
|
||||
transport: {
|
||||
livekit_alias: "!example_room_id",
|
||||
livekit_service_url: "https://lk.example.org",
|
||||
type: "livekit",
|
||||
},
|
||||
sfuConfig: {
|
||||
jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=",
|
||||
livekitAlias: "!example_room_id",
|
||||
livekitAlias: "Akph4alDMhen",
|
||||
livekitIdentity: "@lk_user:ABCDEF",
|
||||
url: "https://lk.example.org",
|
||||
},
|
||||
@@ -306,13 +302,12 @@ describe("LocalTransport", () => {
|
||||
await flushPromises();
|
||||
expect(localTransport$.value).toStrictEqual({
|
||||
transport: {
|
||||
livekit_alias: "!example_room_id",
|
||||
livekit_service_url: "https://lk.example.org",
|
||||
type: "livekit",
|
||||
},
|
||||
sfuConfig: {
|
||||
jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=",
|
||||
livekitAlias: "!example_room_id",
|
||||
livekitAlias: "Akph4alDMhen",
|
||||
livekitIdentity: "@lk_user:ABCDEF",
|
||||
url: "https://lk.example.org",
|
||||
},
|
||||
@@ -345,13 +340,12 @@ describe("LocalTransport", () => {
|
||||
await flushPromises();
|
||||
expect(localTransport$.value).toStrictEqual({
|
||||
transport: {
|
||||
livekit_alias: "!example_room_id",
|
||||
livekit_service_url: "https://lk.example.org",
|
||||
type: "livekit",
|
||||
},
|
||||
sfuConfig: {
|
||||
jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=",
|
||||
livekitAlias: "!example_room_id",
|
||||
livekitAlias: "Akph4alDMhen",
|
||||
livekitIdentity: "@lk_user:ABCDEF",
|
||||
url: "https://lk.example.org",
|
||||
},
|
||||
|
||||
@@ -7,10 +7,9 @@ Please see LICENSE in the repository root for full details.
|
||||
|
||||
import {
|
||||
type CallMembership,
|
||||
isLivekitTransport,
|
||||
type LivekitTransport,
|
||||
isLivekitTransportConfig,
|
||||
type Transport,
|
||||
type LivekitTransportConfig,
|
||||
} from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { MatrixError, type MatrixClient } from "matrix-js-sdk";
|
||||
import {
|
||||
@@ -57,6 +56,7 @@ interface Props {
|
||||
"getDomain" | "baseUrl" | "_unstable_getRTCTransports"
|
||||
> &
|
||||
OpenIDClientParts;
|
||||
// Used by the jwt service to create the livekit room and compute the livekit alias.
|
||||
roomId: string;
|
||||
useOldestMember$: Behavior<boolean>;
|
||||
forceJwtEndpoint$: Behavior<JwtEndpointVersion>;
|
||||
@@ -90,11 +90,11 @@ export enum JwtEndpointVersion {
|
||||
// 2.
|
||||
// We need to make sure we do not sent livekit_alias in sticky events and that we drop all code for sending state events!
|
||||
export interface LocalTransportWithSFUConfig {
|
||||
transport: LivekitTransport;
|
||||
transport: LivekitTransportConfig;
|
||||
sfuConfig: SFUConfig;
|
||||
}
|
||||
export function isLocalTransportWithSFUConfig(
|
||||
obj: LivekitTransport | LocalTransportWithSFUConfig,
|
||||
obj: LivekitTransportConfig | LocalTransportWithSFUConfig,
|
||||
): obj is LocalTransportWithSFUConfig {
|
||||
return "transport" in obj && "sfuConfig" in obj;
|
||||
}
|
||||
@@ -137,11 +137,10 @@ export const createLocalTransport$ = ({
|
||||
return transport;
|
||||
}),
|
||||
switchMap((transport) => {
|
||||
if (transport !== null && isLivekitTransport(transport)) {
|
||||
if (transport !== null && isLivekitTransportConfig(transport)) {
|
||||
// Get the open jwt token to connect to the sfu
|
||||
const computeLocalTransportWithSFUConfig =
|
||||
async (): Promise<LocalTransportWithSFUConfig> => {
|
||||
// await sleep(1000);
|
||||
return {
|
||||
transport,
|
||||
sfuConfig: await getSFUConfigWithOpenID(
|
||||
@@ -288,18 +287,6 @@ async function makeTransport(
|
||||
transport: {
|
||||
type: "livekit",
|
||||
livekit_service_url: url,
|
||||
// WARNING PLS READ ME!!!
|
||||
// This looks unintuitive especially considering that `sfuConfig.livekitAlias` exists.
|
||||
// Why do we not use: `livekit_alias: sfuConfig.livekitAlias`
|
||||
//
|
||||
// - This is going to be used for sending our state event transport (focus_preferred)
|
||||
// - In sticky events it is expected to NOT send this field at all. The transport is only the `type`, `livekit_service_url`
|
||||
// - If we set it to the hased alias we get from the jwt, we will end up using the hashed alias as the body.roomId field
|
||||
// in v0.16.0. (It will use oldest member transport. It is using the transport.livekit_alias as the body.roomId)
|
||||
//
|
||||
// TLDR this is a temporal field that allow for comaptibilty but the spec expects it to not exists. (but its existance also does not break anything)
|
||||
// It is just named poorly: It was intetended to be the actual alias. But now we do pseudonymys ids so we use a hashed alias.
|
||||
livekit_alias: roomId,
|
||||
},
|
||||
sfuConfig,
|
||||
};
|
||||
|
||||
@@ -26,7 +26,7 @@ import fetchMock from "fetch-mock";
|
||||
import EventEmitter from "events";
|
||||
import { type IOpenIDToken } from "matrix-js-sdk";
|
||||
import { logger } from "matrix-js-sdk/lib/logger";
|
||||
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc/LivekitTransport";
|
||||
import { type LivekitTransportConfig } from "matrix-js-sdk/lib/matrixrtc";
|
||||
|
||||
import {
|
||||
Connection,
|
||||
@@ -51,8 +51,9 @@ let fakeLivekitRoom: MockedObject<LivekitRoom>;
|
||||
let localParticipantEventEmiter: EventEmitter;
|
||||
let fakeLocalParticipant: MockedObject<LocalParticipant>;
|
||||
|
||||
const livekitFocus: LivekitTransport = {
|
||||
livekit_alias: "!roomID:example.org",
|
||||
const ROOM_ID = "!roomID:example.org";
|
||||
|
||||
const livekitFocus: LivekitTransportConfig = {
|
||||
livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt",
|
||||
type: "livekit",
|
||||
};
|
||||
@@ -112,6 +113,7 @@ function setupTest(): void {
|
||||
function setupRemoteConnection(): Connection {
|
||||
const opts: ConnectionOpts = {
|
||||
client: client,
|
||||
roomId: ROOM_ID,
|
||||
transport: livekitFocus,
|
||||
scope: testScope,
|
||||
ownMembershipIdentity: ownMemberMock,
|
||||
@@ -154,6 +156,7 @@ describe("Start connection states", () => {
|
||||
|
||||
const opts: ConnectionOpts = {
|
||||
client: client,
|
||||
roomId: ROOM_ID,
|
||||
transport: livekitFocus,
|
||||
scope: testScope,
|
||||
ownMembershipIdentity: ownMemberMock,
|
||||
@@ -170,6 +173,7 @@ describe("Start connection states", () => {
|
||||
|
||||
const opts: ConnectionOpts = {
|
||||
client: client,
|
||||
roomId: ROOM_ID,
|
||||
transport: livekitFocus,
|
||||
scope: testScope,
|
||||
ownMembershipIdentity: ownMemberMock,
|
||||
@@ -221,6 +225,7 @@ describe("Start connection states", () => {
|
||||
|
||||
const opts: ConnectionOpts = {
|
||||
client: client,
|
||||
roomId: ROOM_ID,
|
||||
transport: livekitFocus,
|
||||
scope: testScope,
|
||||
ownMembershipIdentity: ownMemberMock,
|
||||
@@ -279,6 +284,7 @@ describe("Start connection states", () => {
|
||||
|
||||
const opts: ConnectionOpts = {
|
||||
client: client,
|
||||
roomId: ROOM_ID,
|
||||
transport: livekitFocus,
|
||||
scope: testScope,
|
||||
ownMembershipIdentity: ownMemberMock,
|
||||
|
||||
@@ -15,7 +15,7 @@ import {
|
||||
type Room as LivekitRoom,
|
||||
type RemoteParticipant,
|
||||
} from "livekit-client";
|
||||
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { type LivekitTransportConfig } from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { BehaviorSubject, map } from "rxjs";
|
||||
import { type Logger } from "matrix-js-sdk/lib/logger";
|
||||
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";
|
||||
@@ -49,9 +49,11 @@ export interface ConnectionOpts {
|
||||
/** The identity parts to use on this connection */
|
||||
ownMembershipIdentity: CallMembershipIdentityParts;
|
||||
/** The media transport to connect to. */
|
||||
transport: LivekitTransport;
|
||||
transport: LivekitTransportConfig;
|
||||
/** The Matrix client to use for OpenID and SFU config requests. */
|
||||
client: OpenIDClientParts;
|
||||
/** The room ID this connection is associated with. */
|
||||
roomId: string;
|
||||
/** The observable scope to use for this connection. */
|
||||
scope: ObservableScope;
|
||||
|
||||
@@ -102,7 +104,7 @@ export class Connection {
|
||||
/**
|
||||
* The media transport to connect to.
|
||||
*/
|
||||
public readonly transport: LivekitTransport;
|
||||
public readonly transport: LivekitTransportConfig;
|
||||
|
||||
public readonly livekitRoom: LivekitRoom;
|
||||
|
||||
@@ -131,6 +133,47 @@ export class Connection {
|
||||
* */
|
||||
protected stopped = false;
|
||||
|
||||
// TODO: can we just keep the ConnectionOpts object instead of spreading?
|
||||
private readonly client: OpenIDClientParts;
|
||||
private readonly roomId: string;
|
||||
private readonly logger: Logger;
|
||||
private readonly ownMembershipIdentity: CallMembershipIdentityParts;
|
||||
private readonly existingSFUConfig?: SFUConfig;
|
||||
/**
|
||||
* Creates a new connection to a matrix RTC LiveKit backend.
|
||||
*
|
||||
* @param opts - Connection options {@link ConnectionOpts}.
|
||||
*
|
||||
* @param logger - The logger to use.
|
||||
*/
|
||||
public constructor(opts: ConnectionOpts, logger: Logger) {
|
||||
this.ownMembershipIdentity = opts.ownMembershipIdentity;
|
||||
this.existingSFUConfig = opts.existingSFUConfig;
|
||||
this.roomId = opts.roomId;
|
||||
this.logger = logger.getChild(
|
||||
"[Connection " + opts.transport.livekit_service_url + "]",
|
||||
);
|
||||
this.logger.info(
|
||||
`constructor: ${opts.transport.livekit_service_url} roomId: ${this.roomId} withSfuConfig?: ${opts.existingSFUConfig ? JSON.stringify(opts.existingSFUConfig) : "undefined"}`,
|
||||
);
|
||||
const { transport, client, scope } = opts;
|
||||
|
||||
this.scope = scope;
|
||||
this.livekitRoom = opts.livekitRoomFactory();
|
||||
this.transport = transport;
|
||||
this.client = client;
|
||||
|
||||
this.remoteParticipants$ = scope.behavior(
|
||||
// Only tracks remote participants
|
||||
connectedParticipantsObserver(this.livekitRoom),
|
||||
);
|
||||
|
||||
scope.onEnd(() => {
|
||||
this.logger.info(`Connection scope ended, stopping connection`);
|
||||
void this.stop();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the connection.
|
||||
*
|
||||
@@ -231,7 +274,7 @@ export class Connection {
|
||||
this.client,
|
||||
this.ownMembershipIdentity,
|
||||
this.transport.livekit_service_url,
|
||||
this.transport.livekit_alias,
|
||||
this.roomId,
|
||||
// dont pass any custom opts for the subscribe only connections
|
||||
{},
|
||||
this.logger,
|
||||
@@ -256,42 +299,4 @@ export class Connection {
|
||||
`stop: DONE disconnecing from lk room ${this.transport.livekit_service_url}`,
|
||||
);
|
||||
}
|
||||
|
||||
private readonly client: OpenIDClientParts;
|
||||
private readonly logger: Logger;
|
||||
private readonly ownMembershipIdentity: CallMembershipIdentityParts;
|
||||
private readonly existingSFUConfig?: SFUConfig;
|
||||
/**
|
||||
* Creates a new connection to a matrix RTC LiveKit backend.
|
||||
*
|
||||
* @param opts - Connection options {@link ConnectionOpts}.
|
||||
*
|
||||
* @param logger - The logger to use.
|
||||
*/
|
||||
public constructor(opts: ConnectionOpts, logger: Logger) {
|
||||
this.ownMembershipIdentity = opts.ownMembershipIdentity;
|
||||
this.existingSFUConfig = opts.existingSFUConfig;
|
||||
this.logger = logger.getChild(
|
||||
"[Connection " + opts.transport.livekit_service_url + "]",
|
||||
);
|
||||
this.logger.info(
|
||||
`constructor: ${opts.transport.livekit_service_url} alias: ${opts.transport.livekit_alias} withSfuConfig?: ${opts.existingSFUConfig ? JSON.stringify(opts.existingSFUConfig) : "undefined"}`,
|
||||
);
|
||||
const { transport, client, scope } = opts;
|
||||
|
||||
this.scope = scope;
|
||||
this.livekitRoom = opts.livekitRoomFactory();
|
||||
this.transport = transport;
|
||||
this.client = client;
|
||||
|
||||
this.remoteParticipants$ = scope.behavior(
|
||||
// Only tracks remote participants
|
||||
connectedParticipantsObserver(this.livekitRoom),
|
||||
);
|
||||
|
||||
scope.onEnd(() => {
|
||||
this.logger.info(`Connection scope ended, stopping connection`);
|
||||
void this.stop();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@ import { type Logger } from "matrix-js-sdk/lib/logger";
|
||||
// imported as inline to support worker when loaded from a cdn (cross domain)
|
||||
import E2EEWorker from "livekit-client/e2ee-worker?worker&inline";
|
||||
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";
|
||||
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc/LivekitTransport";
|
||||
import { type LivekitTransportConfig } from "matrix-js-sdk/lib/matrixrtc";
|
||||
|
||||
import { type ObservableScope } from "../../ObservableScope.ts";
|
||||
import { Connection } from "./Connection.ts";
|
||||
@@ -33,7 +33,7 @@ import { defaultLiveKitOptions } from "../../../livekit/options.ts";
|
||||
export interface ConnectionFactory {
|
||||
createConnection(
|
||||
scope: ObservableScope,
|
||||
transport: LivekitTransport,
|
||||
transport: LivekitTransportConfig,
|
||||
ownMembershipIdentity: CallMembershipIdentityParts,
|
||||
logger: Logger,
|
||||
sfuConfig?: SFUConfig,
|
||||
@@ -47,6 +47,7 @@ export class ECConnectionFactory implements ConnectionFactory {
|
||||
* Creates a ConnectionFactory for LiveKit connections.
|
||||
*
|
||||
* @param client - The OpenID client parts for authentication, needed to get openID and JWT tokens.
|
||||
* @param roomId - The current room ID.
|
||||
* @param devices - Used for video/audio out/in capture options.
|
||||
* @param processorState$ - Effects like background blur (only for publishing connection?)
|
||||
* @param livekitKeyProvider - Optional key provider for end-to-end encryption.
|
||||
@@ -57,6 +58,7 @@ export class ECConnectionFactory implements ConnectionFactory {
|
||||
*/
|
||||
public constructor(
|
||||
private client: OpenIDClientParts,
|
||||
private readonly roomId: string,
|
||||
private devices: MediaDevices,
|
||||
private processorState$: Behavior<ProcessorState>,
|
||||
livekitKeyProvider: BaseKeyProvider | undefined,
|
||||
@@ -95,7 +97,7 @@ export class ECConnectionFactory implements ConnectionFactory {
|
||||
*/
|
||||
public createConnection(
|
||||
scope: ObservableScope,
|
||||
transport: LivekitTransport,
|
||||
transport: LivekitTransportConfig,
|
||||
ownMembershipIdentity: CallMembershipIdentityParts,
|
||||
logger: Logger,
|
||||
sfuConfig?: SFUConfig,
|
||||
@@ -103,6 +105,7 @@ export class ECConnectionFactory implements ConnectionFactory {
|
||||
return new Connection(
|
||||
{
|
||||
existingSFUConfig: sfuConfig,
|
||||
roomId: this.roomId,
|
||||
transport,
|
||||
client: this.client,
|
||||
scope: scope,
|
||||
|
||||
@@ -7,7 +7,7 @@ Please see LICENSE in the repository root for full details.
|
||||
|
||||
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
|
||||
import { BehaviorSubject } from "rxjs";
|
||||
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { type LivekitTransportConfig } from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { type RemoteParticipant } from "livekit-client";
|
||||
import { logger } from "matrix-js-sdk/lib/logger";
|
||||
|
||||
@@ -24,16 +24,14 @@ import { constant, type Behavior } from "../../Behavior.ts";
|
||||
|
||||
// Some test constants
|
||||
|
||||
const TRANSPORT_1: LivekitTransport = {
|
||||
const TRANSPORT_1: LivekitTransportConfig = {
|
||||
type: "livekit",
|
||||
livekit_service_url: "https://lk.example.org",
|
||||
livekit_alias: "!alias:example.org",
|
||||
};
|
||||
|
||||
const TRANSPORT_2: LivekitTransport = {
|
||||
const TRANSPORT_2: LivekitTransportConfig = {
|
||||
type: "livekit",
|
||||
livekit_service_url: "https://lk.sample.com",
|
||||
livekit_alias: "!alias:sample.com",
|
||||
};
|
||||
|
||||
let fakeConnectionFactory: ConnectionFactory;
|
||||
@@ -49,7 +47,7 @@ beforeEach(() => {
|
||||
vi.mocked(fakeConnectionFactory).createConnection = vi
|
||||
.fn()
|
||||
.mockImplementation(
|
||||
(scope: ObservableScope, transport: LivekitTransport) => {
|
||||
(scope: ObservableScope, transport: LivekitTransportConfig) => {
|
||||
const mockConnection = {
|
||||
transport,
|
||||
remoteParticipants$: new BehaviorSubject([]),
|
||||
@@ -209,15 +207,15 @@ describe("connectionManagerData$ stream", () => {
|
||||
// Used in test to control fake connections' remoteParticipants$ streams
|
||||
let fakeRemoteParticipantsStreams: Map<string, Behavior<RemoteParticipant[]>>;
|
||||
|
||||
function keyForTransport(transport: LivekitTransport): string {
|
||||
return `${transport.livekit_service_url}|${transport.livekit_alias}`;
|
||||
function keyForTransport(transport: LivekitTransportConfig): string {
|
||||
return `${transport.livekit_service_url}`;
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
fakeRemoteParticipantsStreams = new Map();
|
||||
|
||||
function getRemoteParticipantsFor(
|
||||
transport: LivekitTransport,
|
||||
transport: LivekitTransportConfig,
|
||||
): Behavior<RemoteParticipant[]> {
|
||||
return (
|
||||
fakeRemoteParticipantsStreams.get(keyForTransport(transport)) ??
|
||||
@@ -229,7 +227,7 @@ describe("connectionManagerData$ stream", () => {
|
||||
vi.mocked(fakeConnectionFactory).createConnection = vi
|
||||
.fn()
|
||||
.mockImplementation(
|
||||
(scope: ObservableScope, transport: LivekitTransport) => {
|
||||
(scope: ObservableScope, transport: LivekitTransportConfig) => {
|
||||
const fakeRemoteParticipants$ = new BehaviorSubject<
|
||||
RemoteParticipant[]
|
||||
>([]);
|
||||
|
||||
@@ -6,7 +6,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
Please see LICENSE in the repository root for full details.
|
||||
*/
|
||||
|
||||
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { type LivekitTransportConfig } from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { combineLatest, map, of, switchMap } from "rxjs";
|
||||
import { type Logger } from "matrix-js-sdk/lib/logger";
|
||||
import { type RemoteParticipant } from "livekit-client";
|
||||
@@ -42,8 +42,10 @@ export class ConnectionManagerData {
|
||||
}
|
||||
}
|
||||
|
||||
private getKey(transport: LivekitTransport): string {
|
||||
return transport.livekit_service_url + "|" + transport.livekit_alias;
|
||||
private getKey(transport: LivekitTransportConfig): string {
|
||||
// This is enough as a key because the ConnectionManager is already scoped by room.
|
||||
// We also do not need to consider the slotId at this point since each `MatrixRTCSession` is already scoped by `slotDescription: {id, application}`.
|
||||
return transport.livekit_service_url;
|
||||
}
|
||||
|
||||
public getConnections(): Connection[] {
|
||||
@@ -51,15 +53,15 @@ export class ConnectionManagerData {
|
||||
}
|
||||
|
||||
public getConnectionForTransport(
|
||||
transport: LivekitTransport,
|
||||
transport: LivekitTransportConfig,
|
||||
): Connection | null {
|
||||
return this.store.get(this.getKey(transport))?.connection ?? null;
|
||||
}
|
||||
|
||||
public getParticipantsForTransport(
|
||||
transport: LivekitTransport,
|
||||
transport: LivekitTransportConfig,
|
||||
): RemoteParticipant[] {
|
||||
const key = transport.livekit_service_url + "|" + transport.livekit_alias;
|
||||
const key = this.getKey(transport);
|
||||
const existing = this.store.get(key);
|
||||
if (existing) {
|
||||
return existing.participants;
|
||||
@@ -72,7 +74,7 @@ interface Props {
|
||||
scope: ObservableScope;
|
||||
connectionFactory: ConnectionFactory;
|
||||
localTransport$: Behavior<LocalTransportWithSFUConfig | null>;
|
||||
remoteTransports$: Behavior<Epoch<LivekitTransport[]>>;
|
||||
remoteTransports$: Behavior<Epoch<LivekitTransportConfig[]>>;
|
||||
|
||||
logger: Logger;
|
||||
ownMembershipIdentity: CallMembershipIdentityParts;
|
||||
@@ -123,7 +125,7 @@ export function createConnectionManager$({
|
||||
* externally this is modified via `registerTransports()`.
|
||||
*/
|
||||
const localAndRemoteTransports$: Behavior<
|
||||
Epoch<(LivekitTransport | LocalTransportWithSFUConfig)[]>
|
||||
Epoch<(LivekitTransportConfig | LocalTransportWithSFUConfig)[]>
|
||||
> = scope.behavior(
|
||||
combineLatest([remoteTransports$, localTransport$]).pipe(
|
||||
// Combine local and remote transports into one transport array
|
||||
@@ -168,19 +170,13 @@ export function createConnectionManager$({
|
||||
// This is the local transport only the `LocalTransportWithSFUConfig` has a `sfuConfig` field
|
||||
const { transport, sfuConfig } = transportWithOrWithoutSfuConfig;
|
||||
yield {
|
||||
keys: [
|
||||
transport.livekit_service_url,
|
||||
transport.livekit_alias,
|
||||
sfuConfig,
|
||||
],
|
||||
keys: [transport.livekit_service_url, sfuConfig],
|
||||
data: undefined,
|
||||
};
|
||||
} else {
|
||||
const transport = transportWithOrWithoutSfuConfig;
|
||||
yield {
|
||||
keys: [
|
||||
transport.livekit_service_url,
|
||||
transport.livekit_alias,
|
||||
transportWithOrWithoutSfuConfig.livekit_service_url,
|
||||
undefined as undefined | SFUConfig,
|
||||
],
|
||||
data: undefined,
|
||||
@@ -188,13 +184,12 @@ export function createConnectionManager$({
|
||||
}
|
||||
}
|
||||
},
|
||||
(scope, _data$, serviceUrl, alias, sfuConfig) => {
|
||||
(scope, _data$, serviceUrl, sfuConfig) => {
|
||||
const connection = connectionFactory.createConnection(
|
||||
scope,
|
||||
{
|
||||
type: "livekit",
|
||||
livekit_service_url: serviceUrl,
|
||||
livekit_alias: alias,
|
||||
},
|
||||
ownMembershipIdentity,
|
||||
logger,
|
||||
@@ -254,7 +249,7 @@ export function createConnectionManager$({
|
||||
return { connectionManagerData$ };
|
||||
}
|
||||
|
||||
function removeDuplicateTransports<T extends LivekitTransport>(
|
||||
function removeDuplicateTransports<T extends LivekitTransportConfig>(
|
||||
transports: T[],
|
||||
): T[] {
|
||||
return transports.reduce((acc, transport) => {
|
||||
|
||||
@@ -65,6 +65,7 @@ describe("ECConnectionFactory - Audio inputs options", () => {
|
||||
|
||||
const ecConnectionFactory = new ECConnectionFactory(
|
||||
mockClient,
|
||||
"!roomid:example.org",
|
||||
mockMediaDevices({}),
|
||||
new BehaviorSubject<ProcessorState>({
|
||||
supported: true,
|
||||
@@ -105,6 +106,7 @@ describe("ECConnectionFactory - ControlledAudioDevice", () => {
|
||||
|
||||
const ecConnectionFactory = new ECConnectionFactory(
|
||||
mockClient,
|
||||
"!roomid:example.org",
|
||||
mockMediaDevices({
|
||||
audioOutput: {
|
||||
available$: constant(new Map<never, never>()),
|
||||
|
||||
@@ -7,8 +7,8 @@ Please see LICENSE in the repository root for full details.
|
||||
|
||||
import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
|
||||
import {
|
||||
type LivekitTransport,
|
||||
type CallMembership,
|
||||
type LivekitTransportConfig,
|
||||
} from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { combineLatest, filter, map } from "rxjs";
|
||||
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
|
||||
@@ -62,7 +62,7 @@ export interface RemoteMatrixLivekitMember extends MatrixLivekitMember {
|
||||
interface Props {
|
||||
scope: ObservableScope;
|
||||
membershipsWithTransport$: Behavior<
|
||||
Epoch<{ membership: CallMembership; transport?: LivekitTransport }[]>
|
||||
Epoch<{ membership: CallMembership; transport?: LivekitTransportConfig }[]>
|
||||
>;
|
||||
connectionManager: IConnectionManager;
|
||||
}
|
||||
@@ -147,18 +147,12 @@ export function createMatrixLivekitMembers$({
|
||||
// TODO add back in the callviewmodel pauseWhen(this.pretendToBeDisconnected$)
|
||||
|
||||
// TODO add this to the JS-SDK
|
||||
export function areLivekitTransportsEqual<T extends LivekitTransport>(
|
||||
export function areLivekitTransportsEqual<T extends LivekitTransportConfig>(
|
||||
t1: T | null,
|
||||
t2: T | null,
|
||||
): boolean {
|
||||
if (t1 && t2)
|
||||
return (
|
||||
t1.livekit_service_url === t2.livekit_service_url &&
|
||||
// In case we have different lk rooms in the same SFU (depends on the livekit authorization service)
|
||||
// It is only needed in case the livekit authorization service is not behaving as expected (or custom implementation)
|
||||
// Also LivekitTransport is planned to become a `ConnectionIdentifier` which moves this equal somewhere else.
|
||||
t1.livekit_alias === t2.livekit_alias
|
||||
);
|
||||
if (!t1 && !t2) return true;
|
||||
return false;
|
||||
if (t1 && t2) {
|
||||
return t1.livekit_service_url === t2.livekit_service_url;
|
||||
}
|
||||
return !t1 && !t2;
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ import { BehaviorSubject } from "rxjs";
|
||||
import { type Room as LivekitRoom } from "livekit-client";
|
||||
import EventEmitter from "events";
|
||||
import fetchMock from "fetch-mock";
|
||||
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { type LivekitTransportConfig } from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { logger } from "matrix-js-sdk/lib/logger";
|
||||
|
||||
import {
|
||||
@@ -71,6 +71,7 @@ beforeEach(() => {
|
||||
|
||||
ecConnectionFactory = new ECConnectionFactory(
|
||||
mockClient,
|
||||
"!roomid:example.org",
|
||||
mockMediaDevices({}),
|
||||
new BehaviorSubject<ProcessorState>({
|
||||
supported: true,
|
||||
@@ -148,7 +149,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
|
||||
a: expect.toSatisfy((co) =>
|
||||
areLivekitTransportsEqual(
|
||||
co.transport,
|
||||
bobMembership.transports[0]! as LivekitTransport,
|
||||
bobMembership.transports[0]! as LivekitTransportConfig,
|
||||
),
|
||||
),
|
||||
});
|
||||
@@ -185,7 +186,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
|
||||
expect(
|
||||
areLivekitTransportsEqual(
|
||||
connection.transport,
|
||||
carlMembership.transports[0]! as LivekitTransport,
|
||||
carlMembership.transports[0]! as LivekitTransportConfig,
|
||||
),
|
||||
).toBe(true);
|
||||
return true;
|
||||
@@ -215,7 +216,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
|
||||
expect(
|
||||
areLivekitTransportsEqual(
|
||||
connection.transport,
|
||||
daveMembership.transports[0]! as LivekitTransport,
|
||||
daveMembership.transports[0]! as LivekitTransportConfig,
|
||||
),
|
||||
).toBe(true);
|
||||
return true;
|
||||
|
||||
@@ -7,10 +7,10 @@ Please see LICENSE in the repository root for full details.
|
||||
|
||||
import {
|
||||
type CallMembership,
|
||||
isLivekitTransport,
|
||||
type LivekitTransport,
|
||||
type LivekitTransportConfig,
|
||||
type MatrixRTCSession,
|
||||
MatrixRTCSessionEvent,
|
||||
isLivekitTransportConfig,
|
||||
} from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { fromEvent } from "rxjs";
|
||||
|
||||
@@ -27,19 +27,26 @@ export const membershipsAndTransports$ = (
|
||||
memberships$: Behavior<Epoch<CallMembership[]>>,
|
||||
): {
|
||||
membershipsWithTransport$: Behavior<
|
||||
Epoch<{ membership: CallMembership; transport?: LivekitTransport }[]>
|
||||
Epoch<{ membership: CallMembership; transport?: LivekitTransportConfig }[]>
|
||||
>;
|
||||
transports$: Behavior<Epoch<LivekitTransport[]>>;
|
||||
transports$: Behavior<Epoch<LivekitTransportConfig[]>>;
|
||||
} => {
|
||||
/**
|
||||
* Lists the transports used by ourselves, plus all other MatrixRTC session
|
||||
* members. For completeness this also lists the preferred transport and
|
||||
* whether we are in multi-SFU mode or sticky events mode (because
|
||||
* advertisedTransport$ wants to read them at the same time, and bundling data
|
||||
* together when it might change together is what you have to do in RxJS to
|
||||
* avoid reading inconsistent state or observing too many changes.)
|
||||
* members.
|
||||
* For completeness this also lists the preferred transport and
|
||||
* whether we are in multi-SFU mode or sticky events mode.
|
||||
* `advertisedTransport$` reads these values together, so bundling them avoids inconsistent state or
|
||||
* excessive updates when using RxJS.
|
||||
*/
|
||||
const membershipsWithTransport$ = scope.behavior(
|
||||
const membershipsWithTransport$: Behavior<
|
||||
Epoch<
|
||||
{
|
||||
membership: CallMembership;
|
||||
transport: LivekitTransportConfig | undefined;
|
||||
}[]
|
||||
>
|
||||
> = scope.behavior(
|
||||
memberships$.pipe(
|
||||
mapEpoch((memberships) => {
|
||||
return memberships.map((membership) => {
|
||||
@@ -47,14 +54,16 @@ export const membershipsAndTransports$ = (
|
||||
const transport = membership.getTransport(oldestMembership);
|
||||
return {
|
||||
membership,
|
||||
transport: isLivekitTransport(transport) ? transport : undefined,
|
||||
transport: isLivekitTransportConfig(transport)
|
||||
? transport
|
||||
: undefined,
|
||||
};
|
||||
});
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
const transports$ = scope.behavior(
|
||||
const transports$: Behavior<Epoch<LivekitTransportConfig[]>> = scope.behavior(
|
||||
membershipsWithTransport$.pipe(
|
||||
mapEpoch((mts) => mts.flatMap(({ transport: t }) => (t ? [t] : []))),
|
||||
),
|
||||
|
||||
Reference in New Issue
Block a user