From efa9a9cc455271bfb2d37e281999cf7fafe2c522 Mon Sep 17 00:00:00 2001 From: Timo K Date: Wed, 27 Aug 2025 18:41:03 +0200 Subject: [PATCH] introduce publishingParticipants$ Signed-off-by: Timo K --- src/state/CallViewModel.ts | 76 +++++++++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 14 deletions(-) diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index ff0e0e1d..51bf2134 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -55,6 +55,7 @@ import { logger } from "matrix-js-sdk/lib/logger"; import { type CallMembership, isLivekitFocusConfig, + LivekitFocusConfig, type MatrixRTCSession, MatrixRTCSessionEvent, MembershipManagerEvent, @@ -467,7 +468,6 @@ class Connection { for (const track of tracks) { await this.livekitRoom.localParticipant.publishTrack(track); } - // await this.livekitRoom.localParticipant.enableCameraAndMicrophone(); } } @@ -478,17 +478,45 @@ class Connection { this.stopped = true; } - public readonly participants$ = this.scope.behavior( + public readonly participantsIncludingJustSubscribers$ = this.scope.behavior( connectedParticipantsObserver(this.livekitRoom), [], ); + public readonly publishingParticipants$ = ( + memberships$: Behavior, + ): Observable => + this.scope.behavior( + combineLatest([ + connectedParticipantsObserver(this.livekitRoom), + memberships$, + ]).pipe( + map(([participants, memberships]) => { + const publishingMembers = membershipsFocusUrl( + memberships, + this.matrixRTCSession, + ) + .filter((f) => f.livekit_service_url === this.serviceUrl) + .map((f) => f.membership); + return publishingMembers + .map((m) => + participants.find( + (p) => p.identity === `${m.sender}:${m.deviceId}`, + ), + ) + .filter((p): p is RemoteParticipant => !!p); + }), + ), + [], + ); + public constructor( private readonly livekitRoom: LivekitRoom, private readonly serviceUrl: string, private readonly livekitAlias: string, private readonly client: MatrixClient, private readonly scope: ObservableScope, + private readonly matrixRTCSession: MatrixRTCSession, ) {} } @@ -500,7 +528,7 @@ export class CallViewModel extends ViewModel { private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession); - private readonly livekitRoom = new LivekitRoom({ + private readonly localConnectionLivekitRoom = new LivekitRoom({ ...defaultLiveKitOptions, e2ee: this.e2eeOptions, }); @@ -510,11 +538,12 @@ export class CallViewModel extends ViewModel { private readonly localConnection = this.localFocus.then( (focus) => new Connection( - this.livekitRoom, + this.localConnectionLivekitRoom, focus.livekit_service_url, this.livekitAlias, this.matrixRTCSession.room.client, this.scope, + this.matrixRTCSession, ), ); @@ -530,10 +559,9 @@ export class CallViewModel extends ViewModel { map( (memberships) => new Set( - memberships - .map((m) => this.matrixRTCSession.resolveActiveFocus(m)) - .filter((f) => f !== undefined && isLivekitFocusConfig(f)) - .map((f) => f.livekit_service_url), + membershipsFocusUrl(memberships, this.matrixRTCSession).map( + (f) => f.livekit_service_url, + ), ), ), ); @@ -561,6 +589,7 @@ export class CallViewModel extends ViewModel { this.livekitAlias, this.matrixRTCSession.room.client, this.scope, + this.matrixRTCSession, ), ); } @@ -675,7 +704,7 @@ export class CallViewModel extends ViewModel { private readonly remoteParticipants$ = this.scope .behavior< RemoteParticipant[] - >(combineLatest([this.localConnection, this.remoteConnections$], (localConnection, remoteConnections) => combineLatest([localConnection.participants$, ...[...remoteConnections.values()].map((c) => c.participants$)], (...ps) => ps.flat(1))).pipe(switchAll(), startWith([]))) + >(combineLatest([this.localConnection, this.remoteConnections$], (localConnection, remoteConnections) => combineLatest([localConnection.participantsIncludingJustSubscribers$, ...[...remoteConnections.values()].map((c) => c.participantsIncludingJustSubscribers$)], (...ps) => ps.flat(1))).pipe(switchAll(), startWith([]))) .pipe(pauseWhen(this.pretendToBeDisconnected$)); /** @@ -749,7 +778,7 @@ export class CallViewModel extends ViewModel { private readonly mediaItems$ = this.scope.behavior( combineLatest([ this.remoteParticipants$, - observeParticipantMedia(this.livekitRoom.localParticipant), + observeParticipantMedia(this.localConnectionLivekitRoom.localParticipant), duplicateTiles.value$, // Also react to changes in the MatrixRTC session list. // The session list will also be update if a room membership changes. @@ -823,7 +852,7 @@ export class CallViewModel extends ViewModel { member, participant, this.options.encryptionSystem, - this.livekitRoom, + this.localConnectionLivekitRoom, this.mediaDevices, this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( @@ -848,7 +877,7 @@ export class CallViewModel extends ViewModel { member, participant, this.options.encryptionSystem, - this.livekitRoom, + this.localConnectionLivekitRoom, this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( map((m) => m.get(matrixIdentifier) ?? "[👻]"), @@ -890,7 +919,7 @@ export class CallViewModel extends ViewModel { undefined, participant, this.options.encryptionSystem, - this.livekitRoom, + this.localConnectionLivekitRoom, this.mediaDevices, this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( @@ -1746,7 +1775,7 @@ export class CallViewModel extends ViewModel { // that our own media is displayed on screen. this.matrixConnected$.pipe(this.scope.bind()).subscribe((connected) => { const publications = - this.livekitRoom.localParticipant.trackPublications.values(); + this.localConnectionLivekitRoom.localParticipant.trackPublications.values(); if (connected) { for (const p of publications) { if (p.track?.isUpstreamPaused === true) { @@ -1788,3 +1817,22 @@ export class CallViewModel extends ViewModel { this.join(); // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked? } } + +const membershipsFocusUrl = ( + memberships: CallMembership[], + matrixRTCSession: MatrixRTCSession, +): { livekit_service_url: string; membership: CallMembership }[] => { + return memberships + .map( + (m) => + [matrixRTCSession.resolveActiveFocus(m), m] as [ + LivekitFocusConfig | undefined, + CallMembership, + ], + ) + .filter(([f, _]) => f !== undefined && isLivekitFocusConfig(f)) + .map(([f, m]) => ({ + livekit_service_url: f!.livekit_service_url, + membership: m, + })); +};