From c8ef8d6a246beb97d1f8e6a80e54f25420752a33 Mon Sep 17 00:00:00 2001 From: Timo K Date: Thu, 30 Oct 2025 01:13:06 +0100 Subject: [PATCH] start moving over/removing things from the CallViewModel --- src/state/CallViewModel.ts | 418 +++--------------- src/state/ownMember/OwnMembership.ts | 160 ++++--- src/state/remoteMembers/Connection.test.ts | 5 +- src/state/remoteMembers/ConnectionManager.ts | 10 +- src/state/remoteMembers/displayname.ts | 10 +- .../remoteMembers/matrixLivekitMerger.ts | 95 ++-- src/utils/displayname.ts | 2 +- 7 files changed, 231 insertions(+), 469 deletions(-) diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 90a1f682..c8f68cbb 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -137,7 +137,16 @@ import { import { ElementCallError, UnknownCallError } from "../utils/errors.ts"; import { ObservableScope } from "./ObservableScope.ts"; import { memberDisplaynames$ } from "./remoteMembers/displayname.ts"; +import { ConnectionManager } from "./remoteMembers/ConnectionManager.ts"; +import { MatrixLivekitMerger } from "./remoteMembers/matrixLivekitMerger.ts"; +//TODO +// Larger rename +// member,membership -> rtcMember +// participant -> livekitParticipant +// matrixLivekitItem -> callMember +// js-sdk +// callMembership -> rtcMembership export interface CallViewModelOptions { encryptionSystem: EncryptionSystem; autoLeaveWhenOthersLeft?: boolean; @@ -205,6 +214,29 @@ export class CallViewModel { null, ); + private memberships$ = this.scope.behavior( + fromEvent( + this.matrixRTCSession, + MatrixRTCSessionEvent.MembershipsChanged, + (_, memberships: CallMembership[]) => memberships, + ), + ); + + private connectionManager = new ConnectionManager( + this.scope, + this.matrixRoom.client, + this.mediaDevices, + this.trackProcessorState$, + this.e2eeLivekitOptions(), + ); + + private matrixLivekitMerger = new MatrixLivekitMerger( + this.scope, + this.memberships$, + this.connectionManager, + this.matrixRoom, + ); + /** * If there is a configuration error with the call (e.g. misconfigured E2EE). * This is a fatal error that prevents the call from being created/joined. @@ -221,7 +253,7 @@ export class CallViewModel { this.join$.next(); } - // CODESMALL + // CODESMELL? // This is functionally the same Observable as leave$, except here it's // hoisted to the top of the class. This enables the cyclic dependency between // leave$ -> autoLeave$ -> callPickupState$ -> livekitConnectionState$ -> @@ -302,112 +334,28 @@ export class CallViewModel { ), ); - // DISCUSSION move to ConnectionManager - /** - * The local connection over which we will publish our media. It could - * possibly also have some remote users' media available on it. - * null when not joined. - */ - private readonly localConnection$: Behavior | null> = - this.scope.behavior( - generateKeyed$< - Async | null, - PublishConnection, - Async | null - >( - this.localTransport$, - (transport, createOrGet) => - transport && - mapAsync(transport, (transport) => - createOrGet( - // Stable key that uniquely idenifies the transport - JSON.stringify({ - url: transport.livekit_service_url, - alias: transport.livekit_alias, - }), - (scope) => - new PublishConnection( - { - transport, - client: this.matrixRoom.client, - scope, - remoteTransports$: this.remoteTransports$, - livekitRoomFactory: this.options.livekitRoomFactory, - }, - this.mediaDevices, - this.muteStates, - this.e2eeLivekitOptions(), - this.scope.behavior(this.trackProcessorState$), - ), - ), - ), - ), - ); - - // DISCUSSION move to ConnectionManager - public readonly livekitConnectionState$ = - // TODO: This options.connectionState$ behavior is a small hack inserted - // here to facilitate testing. This would likely be better served by - // breaking CallViewModel down into more naturally testable components. - this.options.connectionState$ ?? - this.scope.behavior( - this.localConnection$.pipe( - switchMap((c) => - c?.state === "ready" - ? // TODO mapping to ConnectionState for compatibility, but we should use the full state? - c.value.state$.pipe( - switchMap((s) => { - if (s.state === "ConnectedToLkRoom") - return s.connectionState$; - return of(ConnectionState.Disconnected); - }), - ) - : of(ConnectionState.Disconnected), - ), - ), - ); - - /** - * A list of the connections that should be active at any given time. - */ - // DISCUSSION move to ConnectionManager - private readonly connections$ = this.scope.behavior( - combineLatest( - [this.localConnection$, this.remoteConnections$], - (local, remote) => [ - ...(local?.state === "ready" ? [local.value] : []), - ...remote.values(), - ], - ), - ); - - /** - * Emits with connections whenever they should be started or stopped. - */ - // DISCUSSION move to ConnectionManager - private readonly connectionInstructions$ = this.connections$.pipe( - pairwise(), - map(([prev, next]) => { - const start = new Set(next.values()); - for (const connection of prev) start.delete(connection); - const stop = new Set(prev.values()); - for (const connection of next) stop.delete(connection); - - return { start, stop }; - }), - ); - - public readonly allLivekitRooms$ = this.scope.behavior( - this.connections$.pipe( - map((connections) => - [...connections.values()].map((c) => ({ - room: c.livekitRoom, - url: c.transport.livekit_service_url, - isLocal: c instanceof PublishConnection, - })), - ), - ), - ); + // // DISCUSSION move to ConnectionManager + // public readonly livekitConnectionState$ = + // // TODO: This options.connectionState$ behavior is a small hack inserted + // // here to facilitate testing. This would likely be better served by + // // breaking CallViewModel down into more naturally testable components. + // this.options.connectionState$ ?? + // this.scope.behavior( + // this.localConnection$.pipe( + // switchMap((c) => + // c?.state === "ready" + // ? // TODO mapping to ConnectionState for compatibility, but we should use the full state? + // c.value.state$.pipe( + // switchMap((s) => { + // if (s.state === "ConnectedToLkRoom") + // return s.connectionState$; + // return of(ConnectionState.Disconnected); + // }), + // ) + // : of(ConnectionState.Disconnected), + // ), + // ), + // ); private readonly userId = this.matrixRoom.client.getUserId()!; private readonly deviceId = this.matrixRoom.client.getDeviceId()!; @@ -450,114 +398,6 @@ export class CallViewModel { ), ); - /** - * Whether we are "fully" connected to the call. Accounts for both the - * connection to the MatrixRTC session and the LiveKit publish connection. - */ - // DISCUSSION own membership manager - private readonly connected$ = this.scope.behavior( - and$( - this.matrixConnected$, - this.livekitConnectionState$.pipe( - map((state) => state === ConnectionState.Connected), - ), - ), - ); - - /** - * Whether we should tell the user that we're reconnecting to the call. - */ - // DISCUSSION own membership manager - public readonly reconnecting$ = this.scope.behavior( - this.connected$.pipe( - // We are reconnecting if we previously had some successful initial - // connection but are now disconnected - scan( - ({ connectedPreviously }, connectedNow) => ({ - connectedPreviously: connectedPreviously || connectedNow, - reconnecting: connectedPreviously && !connectedNow, - }), - { connectedPreviously: false, reconnecting: false }, - ), - map(({ reconnecting }) => reconnecting), - ), - ); - - /** - * Lists the transports used by ourselves, plus all other MatrixRTC session - * members. For completeness this also lists the preferred transport and - * whether we are in multi-SFU mode or sticky events mode (because - * advertisedTransport$ wants to read them at the same time, and bundling data - * together when it might change together is what you have to do in RxJS to - * avoid reading inconsistent state or observing too many changes.) - */ - // TODO-MULTI-SFU find a better name for this. With the addition of sticky events it's no longer just about transports. - // DISCUSS move the local part to the own membership file - private readonly transports$: Behavior<{ - local: Async; - remote: { membership: CallMembership; transport: LivekitTransport }[]; - preferred: Async; - multiSfu: boolean; - preferStickyEvents: boolean; - } | null> = this.scope.behavior( - this.joined$.pipe( - switchMap((joined) => - joined - ? combineLatest( - [ - this.preferredTransport$, - this.memberships$, - multiSfu.value$, - preferStickyEvents.value$, - ], - (preferred, memberships, preferMultiSfu, preferStickyEvents) => { - // Multi-SFU must be implicitly enabled when using sticky events - const multiSfu = preferStickyEvents || preferMultiSfu; - - const oldestMembership = - this.matrixRTCSession.getOldestMembership(); - const remote = memberships.flatMap((m) => { - if (m.userId === this.userId && m.deviceId === this.deviceId) - return []; - const t = m.getTransport(oldestMembership ?? m); - return t && isLivekitTransport(t) - ? [{ membership: m, transport: t }] - : []; - }); - - let local = preferred; - if (!multiSfu) { - const oldest = this.matrixRTCSession.getOldestMembership(); - if (oldest !== undefined) { - const selection = oldest.getTransport(oldest); - // TODO selection can be null if no transport is configured should we report an error? - if (selection && isLivekitTransport(selection)) - local = ready(selection); - } - } - - if (local.state === "error") { - this._configError$.next( - local.value instanceof ElementCallError - ? local.value - : new UnknownCallError(local.value), - ); - } - - return { - local, - remote, - preferred, - multiSfu, - preferStickyEvents, - }; - }, - ) - : of(null), - ), - ), - ); - /** * Whether various media/event sources should pretend to be disconnected from * all network input, even if their connection still technically works. @@ -569,95 +409,7 @@ export class CallViewModel { // DISCUSSION own membership manager ALSO this probably can be simplifis private readonly pretendToBeDisconnected$ = this.reconnecting$; - /** - * Lists, for each LiveKit room, the LiveKit participants whose media should - * be presented. - */ - private readonly participantsByRoom$ = this.scope.behavior< - { - livekitRoom: LivekitRoom; - url: string; // Included for use as a React key - participants: { - id: string; - participant: LocalParticipant | RemoteParticipant | undefined; - member: RoomMember; - }[]; - }[] - >( - // TODO: Move this logic into Connection/PublishConnection if possible - this.localConnection$ - .pipe( - switchMap((localConnection) => { - if (localConnection?.state !== "ready") return []; - const memberError = (): never => { - throw new Error("No room member for call membership"); - }; - const localParticipant = { - id: `${this.userId}:${this.deviceId}`, - participant: localConnection.value.livekitRoom.localParticipant, - member: - this.matrixRoom.getMember(this.userId ?? "") ?? memberError(), - }; - - return this.remoteConnections$.pipe( - switchMap((remoteConnections) => - combineLatest( - [localConnection.value, ...remoteConnections].map((c) => - c.publishingParticipants$.pipe( - map((ps) => { - const participants: { - id: string; - participant: - | LocalParticipant - | RemoteParticipant - | undefined; - member: RoomMember; - }[] = ps.map(({ participant, membership }) => ({ - id: `${membership.userId}:${membership.deviceId}`, - participant, - member: - getRoomMemberFromRtcMember( - membership, - this.matrixRoom, - )?.member ?? memberError(), - })); - if (c === localConnection.value) - participants.push(localParticipant); - - return { - livekitRoom: c.livekitRoom, - url: c.transport.livekit_service_url, - participants, - }; - }), - ), - ), - ), - ), - ); - }), - ) - .pipe(startWith([]), pauseWhen(this.pretendToBeDisconnected$)), - ); - - /** - * Lists, for each LiveKit room, the LiveKit participants whose audio should - * be rendered. - */ - // (This is effectively just participantsByRoom$ with a stricter type) - public readonly audioParticipants$ = this.scope.behavior( - this.participantsByRoom$.pipe( - map((data) => - data.map(({ livekitRoom, url, participants }) => ({ - livekitRoom, - url, - participants: participants.flatMap(({ participant }) => - participant instanceof RemoteParticipant ? [participant] : [], - ), - })), - ), - ), - ); + public readonly audioParticipants$; // now will be created based on the connectionmanager public readonly handsRaised$ = this.scope.behavior( this.handsRaisedSubject$.pipe(pauseWhen(this.pretendToBeDisconnected$)), @@ -677,17 +429,19 @@ export class CallViewModel { ), ); - memberDisplaynames$ = memberDisplaynames$( - this.matrixRoom, - this.memberships$, - this.scope, - this.userId, - this.deviceId, - ); + // Now will be added to the matricLivekitMerger + // memberDisplaynames$ = memberDisplaynames$( + // this.matrixRoom, + // this.memberships$, + // this.scope, + // this.userId, + // this.deviceId, + // ); /** * List of MediaItems that we want to have tiles for. */ + // TODO KEEP THIS!! and adapt it to what our membershipManger returns private readonly mediaItems$ = this.scope.behavior( generateKeyed$< [typeof this.participantsByRoom$.value, number], @@ -790,10 +544,12 @@ export class CallViewModel { * - There can be multiple participants for one Matrix user if they join from * multiple devices. */ + // TODO KEEP THIS!! and adapt it to what our membershipManger returns public readonly participantCount$ = this.scope.behavior( this.memberships$.pipe(map((ms) => ms.length)), ); + // TODO convert all ring and all others left logic into one callLifecycleTracker$(didSendCallNotification$,matrixLivekitItem$): {autoLeave$,callPickupState$} private readonly allOthersLeft$ = this.memberships$.pipe( pairwise(), filter( @@ -1687,46 +1443,8 @@ export class CallViewModel { private readonly reactionsSubject$: Observable< Record >, - private readonly trackProcessorState$: Observable, + private readonly trackProcessorState$: Behavior, ) { - // Start and stop local and remote connections as needed - // DISCUSSION connection manager - this.connectionInstructions$ - .pipe(this.scope.bind()) - .subscribe(({ start, stop }) => { - for (const c of stop) { - logger.info(`Disconnecting from ${c.transport.livekit_service_url}`); - c.stop().catch((err) => { - // TODO: better error handling - logger.error( - `Fail to stop connection to ${c.transport.livekit_service_url}`, - err, - ); - }); - } - for (const c of start) { - c.start().then( - () => - logger.info(`Connected to ${c.transport.livekit_service_url}`), - (e) => { - // We only want to report fatal errors `_configError$` for the publish connection. - // If there is an error with another connection, it will not terminate the call and will be displayed - // on eacn tile. - if ( - c instanceof PublishConnection && - e instanceof ElementCallError - ) { - this._configError$.next(e); - } - logger.error( - `Failed to start connection to ${c.transport.livekit_service_url}`, - e, - ); - }, - ); - } - }); - // Start and stop session membership as needed this.scope.reconcile(this.advertisedTransport$, async (advertised) => { if (advertised !== null) { diff --git a/src/state/ownMember/OwnMembership.ts b/src/state/ownMember/OwnMembership.ts index 4ba4c380..56d40b3e 100644 --- a/src/state/ownMember/OwnMembership.ts +++ b/src/state/ownMember/OwnMembership.ts @@ -19,84 +19,116 @@ const ownMembership$ = ( connected: Behavior; transport: Behavior; } => { + const userId = this.matrixRoom.client.getUserId()!; + const deviceId = this.matrixRoom.client.getDeviceId()!; + const connection = connectionManager.registerTransports( constant([transport]), ); const publisher = new Publisher(connection); + // HOW IT WAS PREVIEOUSLY CREATED + // new PublishConnection( + // { + // transport, + // client: this.matrixRoom.client, + // scope, + // remoteTransports$: this.remoteTransports$, + // livekitRoomFactory: this.options.livekitRoomFactory, + // }, + // this.mediaDevices, + // this.muteStates, + // this.e2eeLivekitOptions(), + // this.scope.behavior(this.trackProcessorState$), + // ), /** - * Lists the transports used by ourselves, plus all other MatrixRTC session - * members. For completeness this also lists the preferred transport and - * whether we are in multi-SFU mode or sticky events mode (because - * advertisedTransport$ wants to read them at the same time, and bundling data - * together when it might change together is what you have to do in RxJS to - * avoid reading inconsistent state or observing too many changes.) + * The transport that we would personally prefer to publish on (if not for the + * transport preferences of others, perhaps). */ - // TODO-MULTI-SFU find a better name for this. With the addition of sticky events it's no longer just about transports. - // DISCUSS move to MatrixLivekitMerger - const transport$: Behavior<{ - local: Async; - preferred: Async; + // DISCUSS move to ownMembership + private readonly preferredTransport$ = this.scope.behavior( + async$(makeTransport(this.matrixRTCSession)), + ); + + /** + * The transport over which we should be actively publishing our media. + * null when not joined. + */ + // DISCUSSION ownMembershipManager + private readonly localTransport$: Behavior | null> = + this.scope.behavior( + this.transports$.pipe( + map((transports) => transports?.local ?? null), + distinctUntilChanged | null>(deepCompare), + ), + ); + + /** + * The transport we should advertise in our MatrixRTC membership (plus whether + * it is a multi-SFU transport and whether we should use sticky events). + */ + // DISCUSSION ownMembershipManager + private readonly advertisedTransport$: Behavior<{ multiSfu: boolean; preferStickyEvents: boolean; + transport: LivekitTransport; } | null> = this.scope.behavior( - this.joined$.pipe( - switchMap((joined) => - joined - ? combineLatest( - [ - this.preferredTransport$, - this.memberships$, - multiSfu.value$, - preferStickyEvents.value$, - ], - (preferred, memberships, preferMultiSfu, preferStickyEvents) => { - // Multi-SFU must be implicitly enabled when using sticky events - const multiSfu = preferStickyEvents || preferMultiSfu; + this.transports$.pipe( + map((transports) => + transports?.local.state === "ready" && + transports.preferred.state === "ready" + ? { + multiSfu: transports.multiSfu, + preferStickyEvents: transports.preferStickyEvents, + // In non-multi-SFU mode we should always advertise the preferred + // SFU to minimize the number of membership updates + transport: transports.multiSfu + ? transports.local.value + : transports.preferred.value, + } + : null, + ), + distinctUntilChanged<{ + multiSfu: boolean; + preferStickyEvents: boolean; + transport: LivekitTransport; + } | null>(deepCompare), + ), + ); - const oldestMembership = - this.matrixRTCSession.getOldestMembership(); - const remote = memberships.flatMap((m) => { - if (m.userId === this.userId && m.deviceId === this.deviceId) - return []; - const t = m.getTransport(oldestMembership ?? m); - return t && isLivekitTransport(t) - ? [{ membership: m, transport: t }] - : []; - }); - - let local = preferred; - if (!multiSfu) { - const oldest = this.matrixRTCSession.getOldestMembership(); - if (oldest !== undefined) { - const selection = oldest.getTransport(oldest); - // TODO selection can be null if no transport is configured should we report an error? - if (selection && isLivekitTransport(selection)) - local = ready(selection); - } - } - - if (local.state === "error") { - this._configError$.next( - local.value instanceof ElementCallError - ? local.value - : new UnknownCallError(local.value), - ); - } - - return { - local, - remote, - preferred, - multiSfu, - preferStickyEvents, - }; - }, - ) - : of(null), + // MATRIX RELATED + // + /** + * Whether we are "fully" connected to the call. Accounts for both the + * connection to the MatrixRTC session and the LiveKit publish connection. + */ + // DISCUSSION own membership manager + private readonly connected$ = this.scope.behavior( + and$( + this.matrixConnected$, + this.livekitConnectionState$.pipe( + map((state) => state === ConnectionState.Connected), ), ), ); + /** + * Whether we should tell the user that we're reconnecting to the call. + */ + // DISCUSSION own membership manager + public readonly reconnecting$ = this.scope.behavior( + this.connected$.pipe( + // We are reconnecting if we previously had some successful initial + // connection but are now disconnected + scan( + ({ connectedPreviously }, connectedNow) => ({ + connectedPreviously: connectedPreviously || connectedNow, + reconnecting: connectedPreviously && !connectedNow, + }), + { connectedPreviously: false, reconnecting: false }, + ), + map(({ reconnecting }) => reconnecting), + ), + ); return { connected: true, transport$ }; }; diff --git a/src/state/remoteMembers/Connection.test.ts b/src/state/remoteMembers/Connection.test.ts index 0719e2c5..4af64578 100644 --- a/src/state/remoteMembers/Connection.test.ts +++ b/src/state/remoteMembers/Connection.test.ts @@ -17,7 +17,7 @@ import { } from "vitest"; import { BehaviorSubject, of } from "rxjs"; import { - ConnectionState, + ConnectionState as LivekitConnectionState, type LocalParticipant, type RemoteParticipant, type Room as LivekitRoom, @@ -36,12 +36,11 @@ import { type ConnectionOpts, type ConnectionState, type PublishingParticipant, - RemoteConnection, + Connection, } from "./Connection.ts"; import { ObservableScope } from "../ObservableScope.ts"; import { type OpenIDClientParts } from "../../livekit/openIDSFU.ts"; import { FailToGetOpenIdToken } from "../../utils/errors.ts"; -import { PublishConnection } from "../ownMember/Publisher.ts"; import { mockMediaDevices, mockMuteStates } from "../../utils/test.ts"; import type { ProcessorState } from "../../livekit/TrackProcessorContext.tsx"; import { type MuteStates } from "../MuteStates.ts"; diff --git a/src/state/remoteMembers/ConnectionManager.ts b/src/state/remoteMembers/ConnectionManager.ts index b7a37b11..f15bee10 100644 --- a/src/state/remoteMembers/ConnectionManager.ts +++ b/src/state/remoteMembers/ConnectionManager.ts @@ -42,11 +42,11 @@ export type ParticipantByMemberIdMap = Map< export class ConnectionManager { private livekitRoomFactory: () => LivekitRoom; public constructor( - private client: MatrixClient, private scope: ObservableScope, + private client: MatrixClient, private devices: MediaDevices, - private processorState: ProcessorState, - private e2eeLivekitOptions$: Behavior, + private processorState$: Behavior, + private e2eeLivekitOptions: E2EEOptions | undefined, private logger?: Logger, livekitRoomFactory?: () => LivekitRoom, ) { @@ -55,8 +55,8 @@ export class ConnectionManager { new LivekitRoom( generateRoomOption( this.devices, - this.processorState, - this.e2eeLivekitOptions$.value, + this.processorState$.value, + this.e2eeLivekitOptions, ), ); this.livekitRoomFactory = livekitRoomFactory ?? defaultFactory; diff --git a/src/state/remoteMembers/displayname.ts b/src/state/remoteMembers/displayname.ts index f288e2d0..67e11f99 100644 --- a/src/state/remoteMembers/displayname.ts +++ b/src/state/remoteMembers/displayname.ts @@ -5,11 +5,13 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { type Room, type RoomMember, RoomStateEvent } from "matrix-js-sdk"; +import { type RoomMember, RoomStateEvent } from "matrix-js-sdk"; import { combineLatest, fromEvent, type Observable, startWith } from "rxjs"; import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc"; import { logger } from "matrix-js-sdk/lib/logger"; import { type Room as MatrixRoom } from "matrix-js-sdk/lib/matrix"; +// eslint-disable-next-line rxjs/no-internal +import { type HasEventTargetAddRemove } from "rxjs/internal/observable/fromEvent"; import { type ObservableScope } from "../ObservableScope"; import { @@ -22,11 +24,13 @@ import { type Behavior } from "../Behavior"; * Displayname for each member of the call. This will disambiguate * any displayname that clashes with another member. Only members * joined to the call are considered here. + * + * @returns Map uses the rtc member idenitfier as the key. */ // don't do this work more times than we need to. This is achieved by converting to a behavior: export const memberDisplaynames$ = ( scope: ObservableScope, - matrixRoom: Room, + matrixRoom: Pick & HasEventTargetAddRemove, memberships$: Observable, userId: string, deviceId: string, @@ -73,7 +77,7 @@ export const memberDisplaynames$ = ( export function getRoomMemberFromRtcMember( rtcMember: CallMembership, - room: MatrixRoom, + room: Pick, ): { id: string; member: RoomMember | undefined } { return { id: rtcMember.userId + ":" + rtcMember.deviceId, diff --git a/src/state/remoteMembers/matrixLivekitMerger.ts b/src/state/remoteMembers/matrixLivekitMerger.ts index e77306c1..0411a0ca 100644 --- a/src/state/remoteMembers/matrixLivekitMerger.ts +++ b/src/state/remoteMembers/matrixLivekitMerger.ts @@ -5,38 +5,23 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { - type RemoteParticipant, - type Participant as LivekitParticipant, -} from "livekit-client"; +import { type Participant as LivekitParticipant } from "livekit-client"; import { isLivekitTransport, type LivekitTransport, type CallMembership, } from "matrix-js-sdk/lib/matrixrtc"; import { combineLatest, map, startWith, type Observable } from "rxjs"; +// eslint-disable-next-line rxjs/no-internal +import { type HasEventTargetAddRemove } from "rxjs/internal/observable/fromEvent"; import type { Room as MatrixRoom, RoomMember } from "matrix-js-sdk"; // import type { Logger } from "matrix-js-sdk/lib/logger"; import { type Behavior } from "../Behavior"; import { type ObservableScope } from "../ObservableScope"; import { type ConnectionManager } from "./ConnectionManager"; -import { getRoomMemberFromRtcMember } from "./displayname"; - -/** - * Represents participant publishing or expected to publish on the connection. - * It is paired with its associated rtc membership. - */ -export type PublishingParticipant = { - /** - * The LiveKit participant publishing on this connection, or undefined if the participant is not currently (yet) connected to the livekit room. - */ - participant: RemoteParticipant | undefined; - /** - * The rtc call membership associated with this participant. - */ - membership: CallMembership; -}; +import { getRoomMemberFromRtcMember, memberDisplaynames$ } from "./displayname"; +import { type Connection } from "./Connection"; /** * Represent a matrix call member and his associated livekit participation. @@ -45,10 +30,16 @@ export type PublishingParticipant = { */ export interface MatrixLivekitItem { membership: CallMembership; - livekitParticipant?: LivekitParticipant; - //TODO Try to remove this! Its waaay to much information - // Just use to get the member's avatar + displayName: string; + participant?: LivekitParticipant; + connection?: Connection; + /** + * TODO Try to remove this! Its waaay to much information. + * Just get the member's avatar + * @deprecated + */ member?: RoomMember; + mxcAvatarUrl?: string; } // Alternative structure idea: @@ -73,13 +64,17 @@ export class MatrixLivekitMerger { // private readonly logger: Logger; public constructor( + private scope: ObservableScope, private memberships$: Observable, private connectionManager: ConnectionManager, - private scope: ObservableScope, // TODO this is too much information for that class, // apparently needed to get a room member to later get the Avatar // => Extract an AvatarService instead? - private matrixRoom: MatrixRoom, + // Better with just `getMember` + private matrixRoom: Pick & + HasEventTargetAddRemove, + private userId: string, + private deviceId: string, // parentLogger: Logger, ) { // this.logger = parentLogger.getChild("MatrixLivekitMerger"); @@ -93,6 +88,13 @@ export class MatrixLivekitMerger { /// PRIVATES // ======================================= private start$(): Observable { + const displaynameMap$ = memberDisplaynames$( + this.scope, + this.matrixRoom, + this.memberships$, + this.userId, + this.deviceId, + ); const membershipsWithTransport$ = this.mapMembershipsToMembershipWithTransport$(); @@ -101,26 +103,33 @@ export class MatrixLivekitMerger { return combineLatest([ membershipsWithTransport$, this.connectionManager.allParticipantsByMemberId$, + displaynameMap$, ]).pipe( - map(([memberships, participantsByMemberId]) => { - const items = memberships.map(({ membership, transport }) => { - const participantsWithConnection = participantsByMemberId.get( - membership.membershipID, - ); - const participant = - transport && - participantsWithConnection?.find((p) => - areLivekitTransportsEqual(p.connection.transport, transport), + map(([memberships, participantsByMemberId, displayNameMap]) => { + const items: MatrixLivekitItem[] = memberships.map( + ({ membership, transport }) => { + const participantsWithConnection = participantsByMemberId.get( + membership.membershipID, ); - return { - livekitParticipant: participant, - membership, - // This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) - member: - // Why a member error? if we have a call membership there is a room member - getRoomMemberFromRtcMember(membership, this.matrixRoom)?.member, - } as MatrixLivekitItem; - }); + const participant = + transport && + participantsWithConnection?.find((p) => + areLivekitTransportsEqual(p.connection.transport, transport), + ); + const member = getRoomMemberFromRtcMember( + membership, + this.matrixRoom, + )?.member; + return { + ...participant, + membership, + // This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) + member, + displayName: displayNameMap.get(membership.membershipID) ?? "---", + mxcAvatarUrl: member?.getMxcAvatarUrl(), + }; + }, + ); return items; }), ); diff --git a/src/utils/displayname.ts b/src/utils/displayname.ts index 8e989d3b..1e141255 100644 --- a/src/utils/displayname.ts +++ b/src/utils/displayname.ts @@ -41,7 +41,7 @@ function removeHiddenChars(str: string): string { export function shouldDisambiguate( member: { rawDisplayName?: string; userId: string }, memberships: CallMembership[], - room: Room, + room: Pick, ): boolean { const { rawDisplayName: displayName, userId } = member; if (!displayName || displayName === userId) return false;