From 62ef49ca0561ae7fc509f84ab01484635b7df33a Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 29 Oct 2025 15:20:06 +0100 Subject: [PATCH] temp Co-authored-by: Valere --- .../remoteMembers/MatrixLivekitMerger.test.ts | 30 +++++ .../remoteMembers/matrixLivekitMerger.ts | 120 ++++++++++-------- 2 files changed, 96 insertions(+), 54 deletions(-) create mode 100644 src/state/remoteMembers/MatrixLivekitMerger.test.ts diff --git a/src/state/remoteMembers/MatrixLivekitMerger.test.ts b/src/state/remoteMembers/MatrixLivekitMerger.test.ts new file mode 100644 index 00000000..df7aca0d --- /dev/null +++ b/src/state/remoteMembers/MatrixLivekitMerger.test.ts @@ -0,0 +1,30 @@ +/* +Copyright 2025 Element Creations Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { + test, + vi, + onTestFinished, + it, + describe, + expect, + beforeEach, + afterEach, +} from "vitest"; + +import { MatrixLivekitMerger } from "./matrixLivekitMerger"; +import { ObservableScope } from "../ObservableScope"; + +let testScope: ObservableScope; + +beforeEach(() => { + testScope = new ObservableScope(); +}); + +afterEach(() => { + testScope.end(); +}); diff --git a/src/state/remoteMembers/matrixLivekitMerger.ts b/src/state/remoteMembers/matrixLivekitMerger.ts index 37a13f5f..4cd68663 100644 --- a/src/state/remoteMembers/matrixLivekitMerger.ts +++ b/src/state/remoteMembers/matrixLivekitMerger.ts @@ -1,5 +1,5 @@ /* -Copyright 2025 Element c. +Copyright 2025 Element Creations Ltd. SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. @@ -23,7 +23,6 @@ import { } from "matrix-js-sdk/lib/matrixrtc"; import { combineLatest, - fromEvent, map, startWith, switchMap, @@ -36,6 +35,7 @@ import { Behavior, constant } from "../Behavior"; import { Room as MatrixRoom, RoomMember } from "matrix-js-sdk"; import { getRoomMemberFromRtcMember } from "./displayname"; import { pauseWhen } from "../../utils/observable"; +import { Logger } from "matrix-js-sdk/lib/logger"; // TODOs: // - make ConnectionManager its own actual class @@ -44,16 +44,17 @@ class ConnectionManager { public setTansports(transports$: Behavior): void {} public readonly connections$: Observable = constant([]); // connection is used to find the transport (to find matching callmembership) & for the livekitRoom - public readonly participantsByMemberId$: Behavior< - Map< - ParticipantId, - // It can be an array because a bad behaving client could be publishingParticipants$ - // multiple times to several livekit rooms. - { participant: LivekitParticipant; connection: Connection }[] - > - > = constant(new Map()); + public readonly participantsByMemberId$: Behavior = + constant(new Map()); } +export type ParticipantByMemberIdMap = Map< + ParticipantId, + // It can be an array because a bad behaving client could be publishingParticipants$ + // multiple times to several livekit rooms. + { participant: LivekitParticipant; connection: Connection }[] +>; + /** * Represents participant publishing or expected to publish on the connection. * It is paired with its associated rtc membership. @@ -111,27 +112,20 @@ interface LivekitRoomWithParticipants { * - `remoteMatrixLivekitItems` an observable of MatrixLivekitItem[] to track the remote members and associated livekit data. */ export class MatrixLivekitMerger { - /** - * The MatrixRTC session participants. - */ - // Note that MatrixRTCSession already filters the call memberships by users - // that are joined to the room; we don't need to perform extra filtering here. - public readonly memberships$ = this.scope.behavior( - fromEvent( - this.matrixRTCSession, - MatrixRTCSessionEvent.MembershipsChanged, - ).pipe( - startWith(null), - map(() => this.matrixRTCSession.memberships), - ), - ); + private readonly logger: Logger; + public constructor( - private matrixRTCSession: MatrixRTCSession, + private memberships$: Observable, private connectionManager: ConnectionManager, private scope: ObservableScope, + // TODO this is too much information for that class, + // apparently needed to get a room member to later get the Avatar + // => Extract an AvatarService instead? private matrixRoom: MatrixRoom, + parentLogger: Logger, ) { + this.logger = parentLogger.createChildLogger("MatrixLivekitMerger"); connectionManager.setTansports(this.transports$); } @@ -146,11 +140,9 @@ export class MatrixLivekitMerger { private readonly membershipsWithTransport$ = this.scope.behavior( this.memberships$.pipe( map((memberships) => { - const oldestMembership = this.matrixRTCSession.getOldestMembership(); return memberships.map((membership) => { - const transport = membership.getTransport( - oldestMembership ?? membership, - ); + const oldestMembership = memberships[0] ?? membership; + const transport = membership.getTransport(oldestMembership); return { membership, transport: isLivekitTransport(transport) ? transport : undefined, @@ -205,45 +197,65 @@ export class MatrixLivekitMerger { // Filters the livekit partic private participantsByMemberId$ = this.participantsWithConnection$.pipe( map((participantsWithConnections) => { - const participantsByMemberId = new Map(); - participantsWithConnections.forEach(({ participant, connection }) => { - if (participant.getTrackPublications().length > 0) { - const currentVal = participantsByMemberId.get(participant.identity); - participantsByMemberId.set(participant.identity, { - connection, - participants: - currentVal === undefined - ? [participant] - : ([...currentVal, participant] as Participant[]), - }); - } - }); + const participantsByMemberId = participantsWithConnections.reduce( + (acc, test) => { + const { participant, connection } = test; + if (participant.getTrackPublications().length > 0) { + const currentVal = acc.get(participant.identity); + if (!currentVal) { + acc.set(participant.identity, [{ connection, participant }]); + } else { + // already known + // This is user is publishing on several SFUs + currentVal.push({ connection, participant }); + this.logger.info( + `Participant ${participant.identity} is publishing on several SFUs ${currentVal.join()}`, + ); + } + } + return acc; + }, + new Map() as ParticipantByMemberIdMap, + ); + return participantsByMemberId; }), ); public readonly matrixLivekitItems$ = this.scope .behavior( - this.allPublishingParticipants$.pipe( - map((participants) => { - const matrixLivekitItems: MatrixLivekitItem[] = participants.map( - ({ participant, membership }) => ({ - participant, + combineLatest([ + this.membershipsWithTransport$, + this.participantsByMemberId$, + ]).pipe( + map(([memberships, participantsByMemberId]) => { + const items = memberships.map(({ membership, transport }) => { + const participantsWithConnection = participantsByMemberId.get( + membership.membershipID, + ); + const participant = + transport && + participantsWithConnection?.find((p) => + areLivekitTransportsEqual(p.connection.transport, transport), + ); + return { + livekitParticipant: participant, membership, - id: `${membership.userId}:${membership.deviceId}`, // This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) member: - getRoomMemberFromRtcMember(membership, this.matrixRoom) - ?.member ?? memberError(), - }), - ); - return matrixLivekitItems; + // Why a member error? if we have a call membership there is a room member + getRoomMemberFromRtcMember(membership, this.matrixRoom)?.member, + } as MatrixLivekitItem; + }); + return items; }), ), ) - .pipe(startWith([]), pauseWhen(this.pretendToBeDisconnected$)); + .pipe(startWith([])); } +// TODO add back in the callviewmodel pauseWhen(this.pretendToBeDisconnected$) + // TODO add this to the JS-SDK function areLivekitTransportsEqual( t1: LivekitTransport,