Refactor Matrix/LiveKit session merging

- Replace MatrixLivekitItem with MatrixLivekitMember, add displayName$
  and participantId, and use explicit LiveKit participant types
- Make sessionBehaviors$ accept a props object and return a typed
  RxRtcSession
- Update CallViewModel to use the new session behaviors, rebuild media
  items from matrixLivekitMembers, handle missing connections and use
  participantId-based keys
- Change localMembership/localTransport to accept Behavior-based
  options, read options.value for enterRTCSession, and fix advertised
  transport selection order
- Update tests and minor UI adjustments (settings modal livekitRooms
  stubbed) and fix JSON formatting in locales
This commit is contained in:
Timo K
2025-11-05 17:55:36 +01:00
parent 107ef16d94
commit 4d0de2fb71
10 changed files with 172 additions and 130 deletions

View File

@@ -74,16 +74,16 @@
"matrix_id": "Matrix ID: {{id}}",
"matrixRTCMode": {
"Comptibility": {
"label": "Compatibility: state events & multi SFU"
"description": "Compatible with homeservers that do not support sticky events (but all other EC clients are v0.17.0 or later)",
"label": "Compatibility: state events & multi SFU",
"description": "Compatible with homeservers that do not support sticky events (but all other EC clients are v0.17.0 or later)"
},
"Legacy": {
"label": "Legacy: state events & oldest membership SFU"
"description": "Compatible with old versions of EC that do not support multi SFU",
"label": "Legacy: state events & oldest membership SFU",
"description": "Compatible with old versions of EC that do not support multi SFU"
},
"Matrix_2_0": {
"label": "Matrix 2.0: sticky events & multi SFU"
"description": "Compatible only with homservers supporting sticky events and all EC clients v0.17.0 or later",
"label": "Matrix 2.0: sticky events & multi SFU",
"description": "Compatible only with homservers supporting sticky events and all EC clients v0.17.0 or later"
}
},
"mute_all_audio": "Mute all audio (participants, reactions, join sounds)",

View File

@@ -138,7 +138,7 @@ export const ActiveCall: FC<ActiveCallProps> = (props) => {
},
reactionsReader.raisedHands$,
reactionsReader.reactions$,
trackProcessorState$,
scope.behavior(trackProcessorState$),
);
setVm(vm);
@@ -247,7 +247,7 @@ export const InCallView: FC<InCallViewProps> = ({
() => void toggleRaisedHand(),
);
const allLivekitRooms = useBehavior(vm.allLivekitRooms$);
// const allLivekitRooms = useBehavior(vm.allLivekitRooms$);
const audioParticipants = useBehavior(vm.audioParticipants$);
const participantCount = useBehavior(vm.participantCount$);
const reconnecting = useBehavior(vm.reconnecting$);
@@ -841,7 +841,8 @@ export const InCallView: FC<InCallViewProps> = ({
onDismiss={closeSettings}
tab={settingsTab}
onTabChange={setSettingsTab}
livekitRooms={allLivekitRooms}
// TODO expose correct data to setttings modal
livekitRooms={[]}
/>
</>
)}

View File

@@ -12,8 +12,9 @@ import EventEmitter from "events";
import { enterRTCSession } from "../src/rtcSessionHelpers";
import { mockConfig } from "./utils/test";
import { MatrixRTCMode } from "./settings/settings";
const USE_MUTI_SFU = false;
const MATRIX_RTC_MODE = MatrixRTCMode.Legacy;
const getUrlParams = vi.hoisted(() => vi.fn(() => ({})));
vi.mock("./UrlParams", () => ({ getUrlParams }));
@@ -94,8 +95,7 @@ test("It joins the correct Session", async () => {
},
{
encryptMedia: true,
useMultiSfu: USE_MUTI_SFU,
preferStickyEvents: false,
matrixRTCMode: MATRIX_RTC_MODE,
},
);
@@ -153,8 +153,7 @@ test("It should not fail with configuration error if homeserver config has livek
},
{
encryptMedia: true,
useMultiSfu: USE_MUTI_SFU,
preferStickyEvents: false,
matrixRTCMode: MATRIX_RTC_MODE,
},
);
});

View File

@@ -21,7 +21,6 @@ import {
RoomEvent,
} from "matrix-js-sdk";
import {
BehaviorSubject,
combineLatest,
concat,
distinctUntilChanged,
@@ -38,7 +37,6 @@ import {
of,
pairwise,
race,
repeat,
scan,
skip,
skipWhile,
@@ -93,7 +91,6 @@ import {
import { shallowEquals } from "../utils/array";
import { type MediaDevices } from "./MediaDevices";
import { type Behavior, constant } from "./Behavior";
import { enterRTCSession } from "../rtcSessionHelpers";
import { E2eeType } from "../e2ee/e2eeType";
import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider";
import { type MuteStates } from "./MuteStates";
@@ -112,12 +109,12 @@ import {
type SpotlightPortraitLayoutMedia,
} from "./layout-types.ts";
import { type ElementCallError } from "../utils/errors.ts";
import { ObservableScope } from "./ObservableScope.ts";
import { type ObservableScope } from "./ObservableScope.ts";
import { ConnectionManager } from "./remoteMembers/ConnectionManager.ts";
import { MatrixLivekitMerger } from "./remoteMembers/matrixLivekitMerger.ts";
import {
localMembership$,
LocalMemberState,
type LocalMemberState,
} from "./localMember/LocalMembership.ts";
import { localTransport$ as computeLocalTransport$ } from "./localMember/LocalTransport.ts";
import { sessionBehaviors$ } from "./SessionBehaviors.ts";
@@ -195,10 +192,10 @@ export class CallViewModel {
}
: undefined;
private sessionBehaviors = sessionBehaviors$(
this.scope,
this.matrixRTCSession,
);
private sessionBehaviors = sessionBehaviors$({
scope: this.scope,
matrixRTCSession: this.matrixRTCSession,
});
private memberships$ = this.sessionBehaviors.memberships$;
private localTransport$ = computeLocalTransport$({
@@ -211,6 +208,8 @@ export class CallViewModel {
),
});
// ------------------------------------------------------------------------
private connectionFactory = new ECConnectionFactory(
this.matrixRoom.client,
this.mediaDevices,
@@ -219,10 +218,14 @@ export class CallViewModel {
getUrlParams().controlledAudioDevices,
);
// Can contain duplicates. The connection manager will take care of this.
private allTransports$ = this.scope.behavior(
combineLatest(
[this.localTransport$, this.sessionBehaviors.transports$],
(l, t) => [...(l ? [l] : []), ...t],
(localTransport, transports) => {
const localTransportAsArray = localTransport ? [localTransport] : [];
return [...localTransportAsArray, ...transports];
},
),
);
@@ -232,6 +235,8 @@ export class CallViewModel {
this.allTransports$,
);
// ------------------------------------------------------------------------
private matrixLivekitMerger = new MatrixLivekitMerger(
this.scope,
this.sessionBehaviors.membershipsWithTransport$,
@@ -240,7 +245,7 @@ export class CallViewModel {
this.userId,
this.deviceId,
);
private matrixLivekitItems$ = this.matrixLivekitMerger.matrixLivekitItems$;
private matrixLivekitMembers$ = this.matrixLivekitMerger.matrixLivekitMember$;
private localMembership = localMembership$({
scope: this.scope,
@@ -297,12 +302,12 @@ export class CallViewModel {
// down, for example, and we want to avoid making people worry that the app is
// in a split-brained state.
// DISCUSSION own membership manager ALSO this probably can be simplifis
private readonly pretendToBeDisconnected$ =
this.localMembership.reconnecting$;
public reconnecting$ = this.localMembership.reconnecting$;
private readonly pretendToBeDisconnected$ = this.reconnecting$;
public readonly audioParticipants$ = this.scope.behavior(
this.matrixLivekitItems$.pipe(
map((items) => items.map((item) => item.participant)),
this.matrixLivekitMembers$.pipe(
map((members) => members.map((m) => m.participant)),
),
);
@@ -330,72 +335,82 @@ export class CallViewModel {
// TODO KEEP THIS!! and adapt it to what our membershipManger returns
private readonly mediaItems$ = this.scope.behavior<MediaItem[]>(
generateKeyed$<
[typeof this.participantsByRoom$.value, number],
[typeof this.matrixLivekitMembers$.value, number],
MediaItem,
MediaItem[]
>(
// Generate a collection of MediaItems from the list of expected (whether
// present or missing) LiveKit participants.
combineLatest([this.participantsByRoom$, duplicateTiles.value$]),
([participantsByRoom, duplicateTiles], createOrGet) => {
combineLatest([this.matrixLivekitMembers$, duplicateTiles.value$]),
([matrixLivekitMembers, duplicateTiles], createOrGet) => {
const items: MediaItem[] = [];
for (const { livekitRoom, participants, url } of participantsByRoom) {
for (const { id, participant, member } of participants) {
for (let i = 0; i < 1 + duplicateTiles; i++) {
const mediaId = `${id}:${i}`;
const item = createOrGet(
mediaId,
(scope) =>
// We create UserMedia with or without a participant.
// This will be the initial value of a BehaviourSubject.
// Once a participant appears we will update the BehaviourSubject. (see below)
new UserMedia(
scope,
mediaId,
member,
participant,
this.options.encryptionSystem,
livekitRoom,
url,
this.mediaDevices,
this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe(
map((m) => m.get(id) ?? "[👻]"),
),
this.handsRaised$.pipe(map((v) => v[id]?.time ?? null)),
this.reactions$.pipe(map((v) => v[id] ?? undefined)),
for (const {
connection,
participant,
member,
displayName$,
participantId,
} of matrixLivekitMembers) {
if (connection === undefined) {
logger.warn("connection is not yet initialised.");
continue;
}
for (let i = 0; i < 1 + duplicateTiles; i++) {
const mediaId = `${participantId}:${i}`;
const lkRoom = connection?.livekitRoom;
const url = connection?.transport.livekit_service_url;
const dpName$ = displayName$.pipe(map((n) => n ?? "[👻]"));
const item = createOrGet(
mediaId,
(scope) =>
// We create UserMedia with or without a participant.
// This will be the initial value of a BehaviourSubject.
// Once a participant appears we will update the BehaviourSubject. (see below)
new UserMedia(
scope,
mediaId,
member,
participant,
this.options.encryptionSystem,
lkRoom,
url,
this.mediaDevices,
this.pretendToBeDisconnected$,
dpName$,
this.handsRaised$.pipe(
map((v) => v[participantId]?.time ?? null),
),
);
items.push(item);
(item as UserMedia).updateParticipant(participant);
this.reactions$.pipe(
map((v) => v[participantId] ?? undefined),
),
),
);
items.push(item);
(item as UserMedia).updateParticipant(participant);
if (participant?.isScreenShareEnabled) {
const screenShareId = `${mediaId}:screen-share`;
items.push(
createOrGet(
screenShareId,
(scope) =>
new ScreenShare(
scope,
screenShareId,
member,
participant,
this.options.encryptionSystem,
livekitRoom,
url,
this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe(
map((m) => m.get(id) ?? "[👻]"),
),
),
),
);
}
if (participant?.isScreenShareEnabled) {
const screenShareId = `${mediaId}:screen-share`;
items.push(
createOrGet(
screenShareId,
(scope) =>
new ScreenShare(
scope,
screenShareId,
member,
participant,
this.options.encryptionSystem,
lkRoom,
url,
this.pretendToBeDisconnected$,
dpName$,
),
),
);
}
}
}
return items;
},
),

View File

@@ -17,16 +17,29 @@ import { fromEvent, map } from "rxjs";
import { type ObservableScope } from "./ObservableScope";
import { type Behavior } from "./Behavior";
export const sessionBehaviors$ = (
scope: ObservableScope,
matrixRTCSession: MatrixRTCSession,
): {
interface Props {
scope: ObservableScope;
matrixRTCSession: MatrixRTCSession;
}
/**
* Wraps behaviors that we extract from an matrixRTCSession.
*/
interface RxRtcSession {
/**
* some prop
*/
memberships$: Behavior<CallMembership[]>;
membershipsWithTransport$: Behavior<
{ membership: CallMembership; transport?: LivekitTransport }[]
>;
transports$: Behavior<LivekitTransport[]>;
} => {
}
export const sessionBehaviors$ = ({
scope,
matrixRTCSession,
}: Props): RxRtcSession => {
const memberships$ = scope.behavior(
fromEvent(
matrixRTCSession,

View File

@@ -40,9 +40,8 @@ import {
enterRTCSession,
type EnterRTCSessionOptions,
} from "../../rtcSessionHelpers";
import { ElementCallError } from "../../utils/errors";
import { Widget } from "matrix-widget-api";
import { ElementWidgetActions, WidgetHelpers } from "../../widget";
import { type ElementCallError } from "../../utils/errors";
import { ElementWidgetActions, type WidgetHelpers } from "../../widget";
enum LivekitState {
UNINITIALIZED = "uninitialized",
@@ -87,6 +86,7 @@ export interface LocalMemberState {
* - send join state/sticky event
*/
interface Props {
options: Behavior<EnterRTCSessionOptions>;
scope: ObservableScope;
mediaDevices: MediaDevices;
muteStates: MuteStates;
@@ -113,6 +113,7 @@ interface Props {
*/
export const localMembership$ = ({
scope,
options,
muteStates,
mediaDevices,
connectionManager,
@@ -124,7 +125,7 @@ export const localMembership$ = ({
widget,
}: Props): {
// publisher: Publisher
requestConnect: (options: EnterRTCSessionOptions) => LocalMemberState;
requestConnect: () => LocalMemberState;
startTracks: () => Behavior<LocalTrack[]>;
requestDisconnect: () => Observable<LocalMemberLivekitState> | null;
state: LocalMemberState; // TODO this is probably superseeded by joinState$
@@ -268,9 +269,7 @@ export const localMembership$ = ({
return tracks$;
};
const requestConnect = (
options: EnterRTCSessionOptions,
): LocalMemberState => {
const requestConnect = (): LocalMemberState => {
if (state.livekit$.value === null) {
startTracks();
state.livekit$.next({ state: LivekitState.CONNECTING });
@@ -290,7 +289,7 @@ export const localMembership$ = ({
localTransport$.pipe(
tap((transport) => {
if (transport !== undefined) {
enterRTCSession(matrixRTCSession, transport, options).catch(
enterRTCSession(matrixRTCSession, transport, options.value).catch(
(error) => {
logger.error(error);
},
@@ -379,7 +378,7 @@ export const localMembership$ = ({
if (advertised !== null && advertised !== undefined) {
try {
configError$.next(null);
await enterRTCSession(matrixRTCSession, advertised, options);
await enterRTCSession(matrixRTCSession, advertised, options.value);
} catch (e) {
logger.error("Error entering RTC session", e);
}

View File

@@ -77,13 +77,12 @@ export const localTransport$ = ({
scope.behavior(from(makeTransport(client, roomId)), undefined);
/**
* The transport we should advertise in our MatrixRTC membership (plus whether
* it is a multi-SFU transport and whether we should use sticky events).
* The transport we should advertise in our MatrixRTC membership.
*/
const advertisedTransport$ = scope.behavior(
combineLatest(
[useOldestMember$, preferredTransport$, oldestMemberTransport$],
(useOldestMember, preferredTransport, oldestMemberTransport) =>
[useOldestMember$, oldestMemberTransport$, preferredTransport$],
(useOldestMember, oldestMemberTransport, preferredTransport) =>
useOldestMember ? oldestMemberTransport : preferredTransport,
).pipe<LivekitTransport>(distinctUntilChanged(deepCompare)),
undefined,

View File

@@ -107,7 +107,7 @@ export class ConnectionManager {
private readonly connectionFactory: ConnectionFactory,
private readonly inputTransports$: Behavior<LivekitTransport[]>,
) {
// TODO logger: only construct one logger from the client and make it compatible via a EC specific singleton.
// TODO logger: only construct one logger from the client and make it compatible via a EC specific sing
this.logger = logger.getChild("ConnectionManager");
scope.onEnd(() => this.running$.next(false));
}

View File

@@ -23,7 +23,7 @@ import { type Room as MatrixRoom } from "matrix-js-sdk";
import { getParticipantId } from "matrix-js-sdk/lib/matrixrtc/utils";
import {
type MatrixLivekitItem,
type MatrixLivekitMember,
MatrixLivekitMerger,
} from "./matrixLivekitMerger";
import { ObservableScope } from "../ObservableScope";
@@ -79,10 +79,12 @@ afterEach(() => {
test("should signal participant not yet connected to livekit", () => {
fakeMemberships$.next([aliceRtcMember]);
let items: MatrixLivekitItem[] = [];
matrixLivekitMerger.matrixLivekitItems$.pipe(take(1)).subscribe((emitted) => {
items = emitted;
});
let items: MatrixLivekitMember[] = [];
matrixLivekitMerger.matrixLivekitMember$
.pipe(take(1))
.subscribe((emitted) => {
items = emitted;
});
expect(items).toHaveLength(1);
const item = items[0];
@@ -112,10 +114,12 @@ test("should signal participant on a connection that is publishing", () => {
]);
fakeManagerData$.next(managerData);
let items: MatrixLivekitItem[] = [];
matrixLivekitMerger.matrixLivekitItems$.pipe(take(1)).subscribe((emitted) => {
items = emitted;
});
let items: MatrixLivekitMember[] = [];
matrixLivekitMerger.matrixLivekitMember$
.pipe(take(1))
.subscribe((emitted) => {
items = emitted;
});
expect(items).toHaveLength(1);
const item = items[0];
@@ -136,7 +140,7 @@ test("should signal participant on a connection that is not publishing", () => {
managerData.add(fakeConnection, []);
fakeManagerData$.next(managerData);
matrixLivekitMerger.matrixLivekitItems$.pipe(take(1)).subscribe((items) => {
matrixLivekitMerger.matrixLivekitMember$.pipe(take(1)).subscribe((items) => {
expect(items).toHaveLength(1);
const item = items[0];
@@ -177,8 +181,8 @@ describe("Publication edge case", () => {
);
test("bob is publishing in several connections", () => {
let lastMatrixLkItems: MatrixLivekitItem[] = [];
matrixLivekitMerger.matrixLivekitItems$.subscribe((items) => {
let lastMatrixLkItems: MatrixLivekitMember[] = [];
matrixLivekitMerger.matrixLivekitMember$.subscribe((items) => {
lastMatrixLkItems = items;
});
@@ -218,8 +222,8 @@ describe("Publication edge case", () => {
});
test("bob is publishing in the wrong connection", () => {
let lastMatrixLkItems: MatrixLivekitItem[] = [];
matrixLivekitMerger.matrixLivekitItems$.subscribe((items) => {
let lastMatrixLkItems: MatrixLivekitMember[] = [];
matrixLivekitMerger.matrixLivekitMember$.subscribe((items) => {
lastMatrixLkItems = items;
});

View File

@@ -5,7 +5,10 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { type Participant as LivekitParticipant } from "livekit-client";
import {
type LocalParticipant as LocalLivekitParticipant,
type RemoteParticipant as RemoteLivekitParticipant,
} from "livekit-client";
import {
type LivekitTransport,
type CallMembership,
@@ -27,22 +30,23 @@ import { type Connection } from "./Connection";
* `livekitParticipant` can be undefined if the member is not yet connected to the livekit room
* or if it has no livekit transport at all.
*/
export interface MatrixLivekitItem {
export interface MatrixLivekitMember {
membership: CallMembership;
displayName: string;
participant?: LivekitParticipant;
displayName$: Behavior<string>;
participant?: LocalLivekitParticipant | RemoteLivekitParticipant;
connection?: Connection;
/**
* TODO Try to remove this! Its waaay to much information.
* Just get the member's avatar
* @deprecated
*/
member?: RoomMember;
member: RoomMember;
mxcAvatarUrl?: string;
participantId: string;
}
// Alternative structure idea:
// const livekitMatrixItems$ = (callMemberships$,connectionManager,scope): Observable<MatrixLivekitItem[]> => {
// const livekitMatrixMember$ = (callMemberships$,connectionManager,scope): Observable<MatrixLivekitMember[]> => {
/**
* Combines MatrixRtc and Livekit worlds.
@@ -52,13 +56,13 @@ export interface MatrixLivekitItem {
* - an observable of CallMembership[] to track the call members (The matrix side)
* - a `ConnectionManager` for the lk rooms (The livekit side)
* - out (via public Observable):
* - `remoteMatrixLivekitItems` an observable of MatrixLivekitItem[] to track the remote members and associated livekit data.
* - `remoteMatrixLivekitMember` an observable of MatrixLivekitMember[] to track the remote members and associated livekit data.
*/
export class MatrixLivekitMerger {
/**
* Stream of all the call members and their associated livekit data (if available).
*/
public matrixLivekitItems$: Behavior<MatrixLivekitItem[]>;
public matrixLivekitMember$: Behavior<MatrixLivekitMember[]>;
// private readonly logger: Logger;
@@ -79,7 +83,7 @@ export class MatrixLivekitMerger {
) {
// this.logger = parentLogger.getChild("MatrixLivekitMerger");
this.matrixLivekitItems$ = this.scope.behavior(
this.matrixLivekitMember$ = this.scope.behavior(
this.start$().pipe(startWith([])),
);
}
@@ -87,7 +91,7 @@ export class MatrixLivekitMerger {
// =======================================
/// PRIVATES
// =======================================
private start$(): Observable<MatrixLivekitItem[]> {
private start$(): Observable<MatrixLivekitMember[]> {
const displaynameMap$ = memberDisplaynames$(
this.scope,
this.matrixRoom,
@@ -102,10 +106,9 @@ export class MatrixLivekitMerger {
return combineLatest([
membershipsWithTransport$,
this.connectionManager.connectionManagerData$,
displaynameMap$,
]).pipe(
map(([memberships, managerData, displayNameMap]) => {
const items: MatrixLivekitItem[] = memberships.map(
map(([memberships, managerData]) => {
const items: MatrixLivekitMember[] = memberships.map(
({ membership, transport }) => {
// TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to
const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`;
@@ -123,14 +126,23 @@ export class MatrixLivekitMerger {
const connection = transport
? managerData.getConnectionForTransport(transport)
: undefined;
const displayName$ = this.scope.behavior(
displaynameMap$.pipe(
map(
(displayNameMap) =>
displayNameMap.get(membership.membershipID) ?? "---",
),
),
);
return {
participant,
membership,
connection,
// This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar)
member,
displayName: displayNameMap.get(membership.membershipID) ?? "---",
displayName$,
mxcAvatarUrl: member?.getMxcAvatarUrl(),
participantId,
};
},
);