From 3de0bbcfc960a2c9c4b2a3328db1c4bc07ce0a1c Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 29 Oct 2025 12:37:14 +0100 Subject: [PATCH] temp Co-authored-by: Valere --- src/state/remoteMembers/Connection.ts | 32 ++--- .../remoteMembers/matrixLivekitMerger.ts | 124 +++++++++++++++--- 2 files changed, 118 insertions(+), 38 deletions(-) diff --git a/src/state/remoteMembers/Connection.ts b/src/state/remoteMembers/Connection.ts index 72239de0..97127a48 100644 --- a/src/state/remoteMembers/Connection.ts +++ b/src/state/remoteMembers/Connection.ts @@ -16,8 +16,10 @@ import { type RemoteParticipant, Room as LivekitRoom, type RoomOptions, + Participant, } from "livekit-client"; import { + ParticipantId, type CallMembership, type LivekitTransport, } from "matrix-js-sdk/lib/matrixrtc"; @@ -37,6 +39,8 @@ import { SFURoomCreationRestrictedError, } from "../../utils/errors.ts"; +export type PublishingParticipant = Participant; + export interface ConnectionOpts { /** The media transport to connect to. */ transport: LivekitTransport; @@ -72,21 +76,6 @@ export type ConnectionState = } | { state: "Stopped"; transport: LivekitTransport }; -/** - * Represents participant publishing or expected to publish on the connection. - * It is paired with its associated rtc membership. - */ -export type PublishingParticipant = { - /** - * The LiveKit participant publishing on this connection, or undefined if the participant is not currently (yet) connected to the livekit room. - */ - participant: RemoteParticipant | undefined; - /** - * The rtc call membership associated with this participant. - */ - membership: CallMembership; -}; - /** * A connection to a Matrix RTC LiveKit backend. * @@ -205,7 +194,11 @@ export class Connection { * This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`. * It filters the participants to only those that are associated with a membership that claims to publish on this connection. */ + public readonly publishingParticipants$: Behavior; + public readonly participantsWithPublishTrack$: Behavior< + PublishingParticipant[] + >; /** * The media transport to connect to. @@ -233,12 +226,15 @@ export class Connection { this.transport = transport; this.client = client; - const participantsIncludingSubscribers$: Behavior = - scope.behavior(connectedParticipantsObserver(this.livekitRoom), []); + this.participantsWithPublishTrack$ = scope.behavior( + connectedParticipantsObserver(this.livekitRoom), + [], + ); + // Legacy using callMemberships this.publishingParticipants$ = scope.behavior( combineLatest( - [participantsIncludingSubscribers$, membershipsWithTransport$], + [this.participantsIncludingSubscribers$, membershipsWithTransport$], (participants, remoteTransports) => remoteTransports // Find all members that claim to publish on this connection diff --git a/src/state/remoteMembers/matrixLivekitMerger.ts b/src/state/remoteMembers/matrixLivekitMerger.ts index ef2fb852..37a13f5f 100644 --- a/src/state/remoteMembers/matrixLivekitMerger.ts +++ b/src/state/remoteMembers/matrixLivekitMerger.ts @@ -19,6 +19,7 @@ import { type Transport, LivekitTransport, isLivekitTransport, + ParticipantId, } from "matrix-js-sdk/lib/matrixrtc"; import { combineLatest, @@ -40,20 +41,44 @@ import { pauseWhen } from "../../utils/observable"; // - make ConnectionManager its own actual class // - write test for scopes (do we really need to bind scope) class ConnectionManager { - constructor(transports$: Observable) {} - public startWithMemberships(memberships$: Behavior) {} + public setTansports(transports$: Behavior): void {} public readonly connections$: Observable = constant([]); + // connection is used to find the transport (to find matching callmembership) & for the livekitRoom + public readonly participantsByMemberId$: Behavior< + Map< + ParticipantId, + // It can be an array because a bad behaving client could be publishingParticipants$ + // multiple times to several livekit rooms. + { participant: LivekitParticipant; connection: Connection }[] + > + > = constant(new Map()); } +/** + * Represents participant publishing or expected to publish on the connection. + * It is paired with its associated rtc membership. + */ +export type PublishingParticipant = { + /** + * The LiveKit participant publishing on this connection, or undefined if the participant is not currently (yet) connected to the livekit room. + */ + participant: RemoteParticipant | undefined; + /** + * The rtc call membership associated with this participant. + */ + membership: CallMembership; +}; + /** * Represent a matrix call member and his associated livekit participation. * `livekitParticipant` can be undefined if the member is not yet connected to the livekit room * or if it has no livekit transport at all. */ - export interface MatrixLivekitItem { membership: CallMembership; livekitParticipant?: LivekitParticipant; + //TODO Try to remove this! Its waaay to much information + // Just use to get the member's avatar member?: RoomMember; } @@ -107,7 +132,7 @@ export class MatrixLivekitMerger { private scope: ObservableScope, private matrixRoom: MatrixRoom, ) { - connectionManager.startWithMemberships(this.memberships$); + connectionManager.setTansports(this.transports$); } /** @@ -118,17 +143,12 @@ export class MatrixLivekitMerger { * together when it might change together is what you have to do in RxJS to * avoid reading inconsistent state or observing too many changes.) */ - // TODO pass this over to our conncetions - private readonly membershipsWithTransport$: Behavior<{ - membership: CallMembership; - transport?: LivekitTransport; - } | null> = this.scope.behavior( + private readonly membershipsWithTransport$ = this.scope.behavior( this.memberships$.pipe( map((memberships) => { const oldestMembership = this.matrixRTCSession.getOldestMembership(); - - memberships.map((membership) => { - let transport = membership.getTransport( + return memberships.map((membership) => { + const transport = membership.getTransport( oldestMembership ?? membership, ); return { @@ -140,14 +160,65 @@ export class MatrixLivekitMerger { ), ); - private allPublishingParticipants$ = this.connectionManager.connections$.pipe( - switchMap((connections) => { - const listOfPublishingParticipants = connections.map( - (connection) => connection.publishingParticipants$, - ); - return combineLatest(listOfPublishingParticipants).pipe( - map((list) => list.flatMap((innerList) => innerList)), - ); + private readonly transports$ = this.scope.behavior( + this.membershipsWithTransport$.pipe( + map((membershipsWithTransport) => + membershipsWithTransport.reduce((acc, { transport }) => { + if ( + transport && + !acc.some((t) => areLivekitTransportsEqual(t, transport)) + ) { + acc.push(transport); + } + return acc; + }, [] as LivekitTransport[]), + ), + ), + ); + + // TODO move this over this the connection manager + // We have a lost of connections, for each of these these + // connection we create a stream of (participant, connection) tuples. + // Then we combine the several streams (1 per Connection) into a single stream of tuples. + private participantsWithConnection$ = + this.connectionManager.connections$.pipe( + switchMap((connections) => { + const listsOfParticipantWithConnection = connections.map( + (connection) => { + return connection.participantsWithPublishTrack$.pipe( + map((participants) => + participants.map((p) => ({ + participant: p, + connection, + })), + ), + ); + }, + ); + return combineLatest(listsOfParticipantWithConnection).pipe( + map((lists) => lists.flatMap((list) => list)), + ); + }), + ); + + // TODO move this over this the connection manager + // Filters the livekit partic + private participantsByMemberId$ = this.participantsWithConnection$.pipe( + map((participantsWithConnections) => { + const participantsByMemberId = new Map(); + participantsWithConnections.forEach(({ participant, connection }) => { + if (participant.getTrackPublications().length > 0) { + const currentVal = participantsByMemberId.get(participant.identity); + participantsByMemberId.set(participant.identity, { + connection, + participants: + currentVal === undefined + ? [participant] + : ([...currentVal, participant] as Participant[]), + }); + } + }); + return participantsByMemberId; }), ); @@ -172,3 +243,16 @@ export class MatrixLivekitMerger { ) .pipe(startWith([]), pauseWhen(this.pretendToBeDisconnected$)); } + +// TODO add this to the JS-SDK +function areLivekitTransportsEqual( + t1: LivekitTransport, + t2: LivekitTransport, +): boolean { + return ( + t1.livekit_service_url === t2.livekit_service_url && + // In case we have different lk rooms in the same SFU (depends on the livekit authorization service) + // It is only needed in case the livekit authorization service is not behaving as expected (or custom implementation) + t1.livekit_alias === t2.livekit_alias + ); +}