diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index d7735b26..1977bf4a 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -118,12 +118,12 @@ import { } from "../rtcSessionHelpers"; import { E2eeType } from "../e2ee/e2eeType"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; -import { type Connection, RemoteConnection } from "./Connection"; +import { type Connection, RemoteConnection } from "./remoteMembers/Connection.ts"; import { type MuteStates } from "./MuteStates"; import { getUrlParams } from "../UrlParams"; import { type ProcessorState } from "../livekit/TrackProcessorContext"; import { ElementWidgetActions, widget } from "../widget"; -import { PublishConnection } from "./PublishConnection.ts"; +import { PublishConnection } from "./ownMember/PublishConnection.ts"; import { type Async, async$, mapAsync, ready } from "./Async"; import { sharingScreen$, UserMedia } from "./UserMedia.ts"; import { ScreenShare } from "./ScreenShare.ts"; @@ -138,6 +138,7 @@ import { } from "./layout-types.ts"; import { ElementCallError, UnknownCallError } from "../utils/errors.ts"; import { ObservableScope } from "./ObservableScope.ts"; +import { memberDisplaynames$ } from "./remoteMembers/displayname.ts"; export interface CallViewModelOptions { encryptionSystem: EncryptionSystem; @@ -217,10 +218,12 @@ export class CallViewModel { private readonly join$ = new Subject(); + // DISCUSS BAD ? public join(): void { this.join$.next(); } + // CODESMALL // This is functionally the same Observable as leave$, except here it's // hoisted to the top of the class. This enables the cyclic dependency between // leave$ -> autoLeave$ -> callPickupState$ -> livekitConnectionState$ -> @@ -233,6 +236,7 @@ export class CallViewModel { * Whether we are joined to the call. This reflects our local state rather * than whether all connections are truly up and running. */ + // DISCUSS ? lets think why we need joined and how to do it better private readonly joined$ = this.scope.behavior( this.join$.pipe( map(() => true), @@ -246,26 +250,290 @@ export class CallViewModel { ); /** - * The MatrixRTC session participants. + * The transport that we would personally prefer to publish on (if not for the + * transport preferences of others, perhaps). */ - // Note that MatrixRTCSession already filters the call memberships by users - // that are joined to the room; we don't need to perform extra filtering here. - private readonly memberships$ = this.scope.behavior( - fromEvent( - this.matrixRTCSession, - MatrixRTCSessionEvent.MembershipsChanged, - ).pipe( - startWith(null), - map(() => this.matrixRTCSession.memberships), + // DISCUSS move to ownMembership + private readonly preferredTransport$ = this.scope.behavior( + async$(makeTransport(this.matrixRTCSession)), + ); + + /** + * The transport over which we should be actively publishing our media. + * null when not joined. + */ + // DISCUSSION ownMembershipManager + private readonly localTransport$: Behavior | null> = + this.scope.behavior( + this.transports$.pipe( + map((transports) => transports?.local ?? null), + distinctUntilChanged | null>(deepCompare), + ), + ); + + /** + * The transport we should advertise in our MatrixRTC membership (plus whether + * it is a multi-SFU transport and whether we should use sticky events). + */ + // DISCUSSION ownMembershipManager + private readonly advertisedTransport$: Behavior<{ + multiSfu: boolean; + preferStickyEvents: boolean; + transport: LivekitTransport; + } | null> = this.scope.behavior( + this.transports$.pipe( + map((transports) => + transports?.local.state === "ready" && + transports.preferred.state === "ready" + ? { + multiSfu: transports.multiSfu, + preferStickyEvents: transports.preferStickyEvents, + // In non-multi-SFU mode we should always advertise the preferred + // SFU to minimize the number of membership updates + transport: transports.multiSfu + ? transports.local.value + : transports.preferred.value, + } + : null, + ), + distinctUntilChanged<{ + multiSfu: boolean; + preferStickyEvents: boolean; + transport: LivekitTransport; + } | null>(deepCompare), + ), + ); + + // DISCUSSION move to ConnectionManager + /** + * The local connection over which we will publish our media. It could + * possibly also have some remote users' media available on it. + * null when not joined. + */ + private readonly localConnection$: Behavior | null> = + this.scope.behavior( + generateKeyed$< + Async | null, + PublishConnection, + Async | null + >( + this.localTransport$, + (transport, createOrGet) => + transport && + mapAsync(transport, (transport) => + createOrGet( + // Stable key that uniquely idenifies the transport + JSON.stringify({ + url: transport.livekit_service_url, + alias: transport.livekit_alias, + }), + (scope) => + new PublishConnection( + { + transport, + client: this.matrixRoom.client, + scope, + remoteTransports$: this.remoteTransports$, + livekitRoomFactory: this.options.livekitRoomFactory, + }, + this.mediaDevices, + this.muteStates, + this.e2eeLivekitOptions(), + this.scope.behavior(this.trackProcessorState$), + ), + ), + ), + ), + ); + + // DISCUSSION move to ConnectionManager + public readonly livekitConnectionState$ = + // TODO: This options.connectionState$ behavior is a small hack inserted + // here to facilitate testing. This would likely be better served by + // breaking CallViewModel down into more naturally testable components. + this.options.connectionState$ ?? + this.scope.behavior( + this.localConnection$.pipe( + switchMap((c) => + c?.state === "ready" + ? // TODO mapping to ConnectionState for compatibility, but we should use the full state? + c.value.state$.pipe( + switchMap((s) => { + if (s.state === "ConnectedToLkRoom") + return s.connectionState$; + return of(ConnectionState.Disconnected); + }), + ) + : of(ConnectionState.Disconnected), + ), + ), + ); + + /** + * Connections for each transport in use by one or more session members that + * is *distinct* from the local transport. + */ + // DISCUSSION move to ConnectionManager + private readonly remoteConnections$ = this.scope.behavior( + generateKeyed$( + this.transports$, + (transports, createOrGet) => { + const connections: Connection[] = []; + + // Until the local transport becomes ready we have no idea which + // transports will actually need a dedicated remote connection + if (transports?.local.state === "ready") { + // TODO: Handle custom transport.livekit_alias values here + const localServiceUrl = transports.local.value.livekit_service_url; + const remoteServiceUrls = new Set( + transports.remote.map( + ({ transport }) => transport.livekit_service_url, + ), + ); + remoteServiceUrls.delete(localServiceUrl); + + for (const remoteServiceUrl of remoteServiceUrls) + connections.push( + createOrGet( + remoteServiceUrl, + (scope) => + new RemoteConnection( + { + transport: { + type: "livekit", + livekit_service_url: remoteServiceUrl, + livekit_alias: this.livekitAlias, + }, + client: this.matrixRoom.client, + scope, + remoteTransports$: this.remoteTransports$, + livekitRoomFactory: this.options.livekitRoomFactory, + }, + this.e2eeLivekitOptions(), + ), + ), + ); + } + + return connections; + }, ), ); /** - * The transport that we would personally prefer to publish on (if not for the - * transport preferences of others, perhaps). + * A list of the connections that should be active at any given time. */ - private readonly preferredTransport$ = this.scope.behavior( - async$(makeTransport(this.matrixRTCSession)), + // DISCUSSION move to ConnectionManager + private readonly connections$ = this.scope.behavior( + combineLatest( + [this.localConnection$, this.remoteConnections$], + (local, remote) => [ + ...(local?.state === "ready" ? [local.value] : []), + ...remote.values(), + ], + ), + ); + + /** + * Emits with connections whenever they should be started or stopped. + */ + // DISCUSSION move to ConnectionManager + private readonly connectionInstructions$ = this.connections$.pipe( + pairwise(), + map(([prev, next]) => { + const start = new Set(next.values()); + for (const connection of prev) start.delete(connection); + const stop = new Set(prev.values()); + for (const connection of next) stop.delete(connection); + + return { start, stop }; + }), + ); + + public readonly allLivekitRooms$ = this.scope.behavior( + this.connections$.pipe( + map((connections) => + [...connections.values()].map((c) => ({ + room: c.livekitRoom, + url: c.transport.livekit_service_url, + isLocal: c instanceof PublishConnection, + })), + ), + ), + ); + + private readonly userId = this.matrixRoom.client.getUserId()!; + private readonly deviceId = this.matrixRoom.client.getDeviceId()!; + + /** + * Whether we are connected to the MatrixRTC session. + */ + // DISCUSSION own membership manager + private readonly matrixConnected$ = this.scope.behavior( + // To consider ourselves connected to MatrixRTC, we check the following: + and$( + // The client is connected to the sync loop + ( + fromEvent(this.matrixRoom.client, ClientEvent.Sync) as Observable< + [SyncState] + > + ).pipe( + startWith([this.matrixRoom.client.getSyncState()]), + map(([state]) => state === SyncState.Syncing), + ), + // Room state observed by session says we're connected + fromEvent( + this.matrixRTCSession, + MembershipManagerEvent.StatusChanged, + ).pipe( + startWith(null), + map(() => this.matrixRTCSession.membershipStatus === Status.Connected), + ), + // Also watch out for warnings that we've likely hit a timeout and our + // delayed leave event is being sent (this condition is here because it + // provides an earlier warning than the sync loop timeout, and we wouldn't + // see the actual leave event until we reconnect to the sync loop) + fromEvent( + this.matrixRTCSession, + MembershipManagerEvent.ProbablyLeft, + ).pipe( + startWith(null), + map(() => this.matrixRTCSession.probablyLeft !== true), + ), + ), + ); + + /** + * Whether we are "fully" connected to the call. Accounts for both the + * connection to the MatrixRTC session and the LiveKit publish connection. + */ + // DISCUSSION own membership manager + private readonly connected$ = this.scope.behavior( + and$( + this.matrixConnected$, + this.livekitConnectionState$.pipe( + map((state) => state === ConnectionState.Connected), + ), + ), + ); + + /** + * Whether we should tell the user that we're reconnecting to the call. + */ + // DISCUSSION own membership manager + public readonly reconnecting$ = this.scope.behavior( + this.connected$.pipe( + // We are reconnecting if we previously had some successful initial + // connection but are now disconnected + scan( + ({ connectedPreviously }, connectedNow) => ({ + connectedPreviously: connectedPreviously || connectedNow, + reconnecting: connectedPreviously && !connectedNow, + }), + { connectedPreviously: false, reconnecting: false }, + ), + map(({ reconnecting }) => reconnecting), + ), ); /** @@ -276,7 +544,8 @@ export class CallViewModel { * together when it might change together is what you have to do in RxJS to * avoid reading inconsistent state or observing too many changes.) */ - // TODO-MULTI-SFU find a better name for this. with the addition of sticky events it's no longer just about transports. + // TODO-MULTI-SFU find a better name for this. With the addition of sticky events it's no longer just about transports. + // DISCUSS move the local part to the own membership file private readonly transports$: Behavior<{ local: Async; remote: { membership: CallMembership; transport: LivekitTransport }[]; @@ -342,282 +611,6 @@ export class CallViewModel { ), ); - /** - * Lists the transports used by each MatrixRTC session member other than - * ourselves. - */ - private readonly remoteTransports$ = this.scope.behavior( - this.transports$.pipe(map((transports) => transports?.remote ?? [])), - ); - - /** - * The transport over which we should be actively publishing our media. - * null when not joined. - */ - private readonly localTransport$: Behavior | null> = - this.scope.behavior( - this.transports$.pipe( - map((transports) => transports?.local ?? null), - distinctUntilChanged | null>(deepCompare), - ), - ); - - /** - * The transport we should advertise in our MatrixRTC membership (plus whether - * it is a multi-SFU transport and whether we should use sticky events). - */ - private readonly advertisedTransport$: Behavior<{ - multiSfu: boolean; - preferStickyEvents: boolean; - transport: LivekitTransport; - } | null> = this.scope.behavior( - this.transports$.pipe( - map((transports) => - transports?.local.state === "ready" && - transports.preferred.state === "ready" - ? { - multiSfu: transports.multiSfu, - preferStickyEvents: transports.preferStickyEvents, - // In non-multi-SFU mode we should always advertise the preferred - // SFU to minimize the number of membership updates - transport: transports.multiSfu - ? transports.local.value - : transports.preferred.value, - } - : null, - ), - distinctUntilChanged<{ - multiSfu: boolean; - preferStickyEvents: boolean; - transport: LivekitTransport; - } | null>(deepCompare), - ), - ); - - /** - * The local connection over which we will publish our media. It could - * possibly also have some remote users' media available on it. - * null when not joined. - */ - private readonly localConnection$: Behavior | null> = - this.scope.behavior( - generateKeyed$< - Async | null, - PublishConnection, - Async | null - >( - this.localTransport$, - (transport, createOrGet) => - transport && - mapAsync(transport, (transport) => - createOrGet( - // Stable key that uniquely idenifies the transport - JSON.stringify({ - url: transport.livekit_service_url, - alias: transport.livekit_alias, - }), - (scope) => - new PublishConnection( - { - transport, - client: this.matrixRoom.client, - scope, - remoteTransports$: this.remoteTransports$, - livekitRoomFactory: this.options.livekitRoomFactory, - }, - this.mediaDevices, - this.muteStates, - this.e2eeLivekitOptions(), - this.scope.behavior(this.trackProcessorState$), - ), - ), - ), - ), - ); - - public readonly livekitConnectionState$ = - // TODO: This options.connectionState$ behavior is a small hack inserted - // here to facilitate testing. This would likely be better served by - // breaking CallViewModel down into more naturally testable components. - this.options.connectionState$ ?? - this.scope.behavior( - this.localConnection$.pipe( - switchMap((c) => - c?.state === "ready" - ? // TODO mapping to ConnectionState for compatibility, but we should use the full state? - c.value.transportState$.pipe( - switchMap((s) => { - if (s.state === "ConnectedToLkRoom") - return s.connectionState$; - return of(ConnectionState.Disconnected); - }), - ) - : of(ConnectionState.Disconnected), - ), - ), - ); - - /** - * Connections for each transport in use by one or more session members that - * is *distinct* from the local transport. - */ - private readonly remoteConnections$ = this.scope.behavior( - generateKeyed$( - this.transports$, - (transports, createOrGet) => { - const connections: Connection[] = []; - - // Until the local transport becomes ready we have no idea which - // transports will actually need a dedicated remote connection - if (transports?.local.state === "ready") { - // TODO: Handle custom transport.livekit_alias values here - const localServiceUrl = transports.local.value.livekit_service_url; - const remoteServiceUrls = new Set( - transports.remote.map( - ({ transport }) => transport.livekit_service_url, - ), - ); - remoteServiceUrls.delete(localServiceUrl); - - for (const remoteServiceUrl of remoteServiceUrls) - connections.push( - createOrGet( - remoteServiceUrl, - (scope) => - new RemoteConnection( - { - transport: { - type: "livekit", - livekit_service_url: remoteServiceUrl, - livekit_alias: this.livekitAlias, - }, - client: this.matrixRoom.client, - scope, - remoteTransports$: this.remoteTransports$, - livekitRoomFactory: this.options.livekitRoomFactory, - }, - this.e2eeLivekitOptions(), - ), - ), - ); - } - - return connections; - }, - ), - ); - - /** - * A list of the connections that should be active at any given time. - */ - private readonly connections$ = this.scope.behavior( - combineLatest( - [this.localConnection$, this.remoteConnections$], - (local, remote) => [ - ...(local?.state === "ready" ? [local.value] : []), - ...remote.values(), - ], - ), - ); - - /** - * Emits with connections whenever they should be started or stopped. - */ - private readonly connectionInstructions$ = this.connections$.pipe( - pairwise(), - map(([prev, next]) => { - const start = new Set(next.values()); - for (const connection of prev) start.delete(connection); - const stop = new Set(prev.values()); - for (const connection of next) stop.delete(connection); - - return { start, stop }; - }), - ); - - public readonly allLivekitRooms$ = this.scope.behavior( - this.connections$.pipe( - map((connections) => - [...connections.values()].map((c) => ({ - room: c.livekitRoom, - url: c.transport.livekit_service_url, - isLocal: c instanceof PublishConnection, - })), - ), - ), - ); - - private readonly userId = this.matrixRoom.client.getUserId()!; - private readonly deviceId = this.matrixRoom.client.getDeviceId()!; - - /** - * Whether we are connected to the MatrixRTC session. - */ - private readonly matrixConnected$ = this.scope.behavior( - // To consider ourselves connected to MatrixRTC, we check the following: - and$( - // The client is connected to the sync loop - ( - fromEvent(this.matrixRoom.client, ClientEvent.Sync) as Observable< - [SyncState] - > - ).pipe( - startWith([this.matrixRoom.client.getSyncState()]), - map(([state]) => state === SyncState.Syncing), - ), - // Room state observed by session says we're connected - fromEvent( - this.matrixRTCSession, - MembershipManagerEvent.StatusChanged, - ).pipe( - startWith(null), - map(() => this.matrixRTCSession.membershipStatus === Status.Connected), - ), - // Also watch out for warnings that we've likely hit a timeout and our - // delayed leave event is being sent (this condition is here because it - // provides an earlier warning than the sync loop timeout, and we wouldn't - // see the actual leave event until we reconnect to the sync loop) - fromEvent( - this.matrixRTCSession, - MembershipManagerEvent.ProbablyLeft, - ).pipe( - startWith(null), - map(() => this.matrixRTCSession.probablyLeft !== true), - ), - ), - ); - - /** - * Whether we are "fully" connected to the call. Accounts for both the - * connection to the MatrixRTC session and the LiveKit publish connection. - */ - private readonly connected$ = this.scope.behavior( - and$( - this.matrixConnected$, - this.livekitConnectionState$.pipe( - map((state) => state === ConnectionState.Connected), - ), - ), - ); - - /** - * Whether we should tell the user that we're reconnecting to the call. - */ - public readonly reconnecting$ = this.scope.behavior( - this.connected$.pipe( - // We are reconnecting if we previously had some successful initial - // connection but are now disconnected - scan( - ({ connectedPreviously }, connectedNow) => ({ - connectedPreviously: connectedPreviously || connectedNow, - reconnecting: connectedPreviously && !connectedNow, - }), - { connectedPreviously: false, reconnecting: false }, - ), - map(({ reconnecting }) => reconnecting), - ), - ); - /** * Whether various media/event sources should pretend to be disconnected from * all network input, even if their connection still technically works. @@ -626,6 +619,7 @@ export class CallViewModel { // that the LiveKit connection is still functional while the homeserver is // down, for example, and we want to avoid making people worry that the app is // in a split-brained state. + // DISCUSSION own membership manager ALSO this probably can be simplifis private readonly pretendToBeDisconnected$ = this.reconnecting$; /** @@ -718,57 +712,6 @@ export class CallViewModel { ), ); - /** - * Displaynames for each member of the call. This will disambiguate - * any displaynames that clashes with another member. Only members - * joined to the call are considered here. - */ - // It turns out that doing the disambiguation above is rather expensive on Safari (10x slower - // than on Chrome/Firefox). This means it is important that we multicast the result so that we - // don't do this work more times than we need to. This is achieved by converting to a behavior: - public readonly memberDisplaynames$ = this.scope.behavior( - combineLatest( - [ - // Handle call membership changes - this.memberships$, - // Additionally handle display name changes (implicitly reacting to them) - fromEvent(this.matrixRoom, RoomStateEvent.Members).pipe( - startWith(null), - ), - // TODO: do we need: pauseWhen(this.pretendToBeDisconnected$), - ], - (memberships, _displaynames) => { - const displaynameMap = new Map([ - [ - `${this.userId}:${this.deviceId}`, - this.matrixRoom.getMember(this.userId)?.rawDisplayName ?? - this.userId, - ], - ]); - const room = this.matrixRoom; - - // We only consider RTC members for disambiguation as they are the only visible members. - for (const rtcMember of memberships) { - const matrixIdentifier = `${rtcMember.userId}:${rtcMember.deviceId}`; - const { member } = getRoomMemberFromRtcMember(rtcMember, room); - if (!member) { - logger.error( - "Could not find member for media id:", - matrixIdentifier, - ); - continue; - } - const disambiguate = shouldDisambiguate(member, memberships, room); - displaynameMap.set( - matrixIdentifier, - calculateDisplayName(member, disambiguate), - ); - } - return displaynameMap; - }, - ), - ); - public readonly handsRaised$ = this.scope.behavior( this.handsRaisedSubject$.pipe(pauseWhen(this.pretendToBeDisconnected$)), ); @@ -787,6 +730,14 @@ export class CallViewModel { ), ); + memberDisplaynames$ = memberDisplaynames$( + this.matrixRoom, + this.memberships$, + this.scope, + this.userId, + this.deviceId, + ); + /** * List of MediaItems that we want to have tiles for. */ @@ -1655,6 +1606,8 @@ export class CallViewModel { /** * Emits an array of reactions that should be visible on the screen. */ + // DISCUSSION move this into a reaction file + // const {visibleReactions$, audibleReactions$} = reactionsObservables$(showReactionSetting$, ) public readonly visibleReactions$ = this.scope.behavior( showReactions.value$.pipe( switchMap((show) => (show ? this.reactions$ : of({}))), @@ -1790,6 +1743,7 @@ export class CallViewModel { private readonly trackProcessorState$: Observable, ) { // Start and stop local and remote connections as needed + // DISCUSSION connection manager this.connectionInstructions$ .pipe(this.scope.bind()) .subscribe(({ start, stop }) => { @@ -1947,13 +1901,3 @@ function getE2eeKeyProvider( return keyProvider; } } - -function getRoomMemberFromRtcMember( - rtcMember: CallMembership, - room: MatrixRoom, -): { id: string; member: RoomMember | undefined } { - return { - id: rtcMember.userId + ":" + rtcMember.deviceId, - member: room.getMember(rtcMember.userId) ?? undefined, - }; -} diff --git a/src/state/ownMember/OwnMembership.ts b/src/state/ownMember/OwnMembership.ts new file mode 100644 index 00000000..1a5c1b24 --- /dev/null +++ b/src/state/ownMember/OwnMembership.ts @@ -0,0 +1,85 @@ +import { Behavior } from "../Behavior"; + +const ownMembership$ = ( + multiSfu: boolean, + preferStickyEvents: boolean, +): { + connected: Behavior; + transport: Behavior; +} => { + /** + * Lists the transports used by ourselves, plus all other MatrixRTC session + * members. For completeness this also lists the preferred transport and + * whether we are in multi-SFU mode or sticky events mode (because + * advertisedTransport$ wants to read them at the same time, and bundling data + * together when it might change together is what you have to do in RxJS to + * avoid reading inconsistent state or observing too many changes.) + */ + // TODO-MULTI-SFU find a better name for this. With the addition of sticky events it's no longer just about transports. + // DISCUSS move to MatrixLivekitMerger + const transport$: Behavior<{ + local: Async; + preferred: Async; + multiSfu: boolean; + preferStickyEvents: boolean; + } | null> = this.scope.behavior( + this.joined$.pipe( + switchMap((joined) => + joined + ? combineLatest( + [ + this.preferredTransport$, + this.memberships$, + multiSfu.value$, + preferStickyEvents.value$, + ], + (preferred, memberships, preferMultiSfu, preferStickyEvents) => { + // Multi-SFU must be implicitly enabled when using sticky events + const multiSfu = preferStickyEvents || preferMultiSfu; + + const oldestMembership = + this.matrixRTCSession.getOldestMembership(); + const remote = memberships.flatMap((m) => { + if (m.userId === this.userId && m.deviceId === this.deviceId) + return []; + const t = m.getTransport(oldestMembership ?? m); + return t && isLivekitTransport(t) + ? [{ membership: m, transport: t }] + : []; + }); + + let local = preferred; + if (!multiSfu) { + const oldest = this.matrixRTCSession.getOldestMembership(); + if (oldest !== undefined) { + const selection = oldest.getTransport(oldest); + // TODO selection can be null if no transport is configured should we report an error? + if (selection && isLivekitTransport(selection)) + local = ready(selection); + } + } + + if (local.state === "error") { + this._configError$.next( + local.value instanceof ElementCallError + ? local.value + : new UnknownCallError(local.value), + ); + } + + return { + local, + remote, + preferred, + multiSfu, + preferStickyEvents, + }; + }, + ) + : of(null), + ), + ), + ); + + return { connected: true, transport$ }; +}; diff --git a/src/state/PublishConnection.ts b/src/state/ownMember/PublishConnection.ts similarity index 94% rename from src/state/PublishConnection.ts rename to src/state/ownMember/PublishConnection.ts index cfbcba90..3feb8a52 100644 --- a/src/state/PublishConnection.ts +++ b/src/state/ownMember/PublishConnection.ts @@ -21,19 +21,19 @@ import { } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; -import type { Behavior } from "./Behavior.ts"; -import type { MediaDevices, SelectedDevice } from "./MediaDevices.ts"; -import type { MuteStates } from "./MuteStates.ts"; +import type { Behavior } from "../Behavior.ts"; +import type { MediaDevices, SelectedDevice } from "../MediaDevices.ts"; +import type { MuteStates } from "../MuteStates.ts"; import { type ProcessorState, trackProcessorSync, -} from "../livekit/TrackProcessorContext.tsx"; -import { getUrlParams } from "../UrlParams.ts"; -import { defaultLiveKitOptions } from "../livekit/options.ts"; -import { getValue } from "../utils/observable.ts"; -import { observeTrackReference$ } from "./MediaViewModel.ts"; -import { Connection, type ConnectionOpts } from "./Connection.ts"; -import { type ObservableScope } from "./ObservableScope.ts"; +} from "../../livekit/TrackProcessorContext.tsx"; +import { getUrlParams } from "../../UrlParams.ts"; +import { defaultLiveKitOptions } from "../../livekit/options.ts"; +import { getValue } from "../../utils/observable.ts"; +import { observeTrackReference$ } from "../MediaViewModel.ts"; +import { Connection, type ConnectionOpts } from "../remoteMembers/Connection.ts"; +import { type ObservableScope } from "../ObservableScope.ts"; /** * A connection to the local LiveKit room, the one the user is publishing to. diff --git a/src/state/Connection.test.ts b/src/state/remoteMembers/Connection.test.ts similarity index 94% rename from src/state/Connection.test.ts rename to src/state/remoteMembers/Connection.test.ts index b5389db4..3b0f42ee 100644 --- a/src/state/Connection.test.ts +++ b/src/state/remoteMembers/Connection.test.ts @@ -34,17 +34,17 @@ import type { } from "matrix-js-sdk/lib/matrixrtc"; import { type ConnectionOpts, - type TransportState, + type ConnectionState, type PublishingParticipant, RemoteConnection, } from "./Connection.ts"; -import { ObservableScope } from "./ObservableScope.ts"; -import { type OpenIDClientParts } from "../livekit/openIDSFU.ts"; -import { FailToGetOpenIdToken } from "../utils/errors.ts"; -import { PublishConnection } from "./PublishConnection.ts"; -import { mockMediaDevices, mockMuteStates } from "../utils/test.ts"; -import type { ProcessorState } from "../livekit/TrackProcessorContext.tsx"; -import { type MuteStates } from "./MuteStates.ts"; +import { ObservableScope } from "../ObservableScope.ts"; +import { type OpenIDClientParts } from "../../livekit/openIDSFU.ts"; +import { FailToGetOpenIdToken } from "../../utils/errors.ts"; +import { PublishConnection } from "../ownMember/PublishConnection.ts"; +import { mockMediaDevices, mockMuteStates } from "../../utils/test.ts"; +import type { ProcessorState } from "../../livekit/TrackProcessorContext.tsx"; +import { type MuteStates } from "../MuteStates.ts"; let testScope: ObservableScope; @@ -161,7 +161,7 @@ describe("Start connection states", () => { }; const connection = new RemoteConnection(opts, undefined); - expect(connection.transportState$.getValue().state).toEqual("Initialized"); + expect(connection.state$.getValue().state).toEqual("Initialized"); }); it("fail to getOpenId token then error state", async () => { @@ -178,8 +178,8 @@ describe("Start connection states", () => { const connection = new RemoteConnection(opts, undefined); - const capturedStates: TransportState[] = []; - const s = connection.transportState$.subscribe((value) => { + const capturedStates: ConnectionState[] = []; + const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); onTestFinished(() => s.unsubscribe()); @@ -231,8 +231,8 @@ describe("Start connection states", () => { const connection = new RemoteConnection(opts, undefined); - const capturedStates: TransportState[] = []; - const s = connection.transportState$.subscribe((value) => { + const capturedStates: ConnectionState[] = []; + const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); onTestFinished(() => s.unsubscribe()); @@ -288,8 +288,8 @@ describe("Start connection states", () => { const connection = new RemoteConnection(opts, undefined); - const capturedStates: TransportState[] = []; - const s = connection.transportState$.subscribe((value) => { + const capturedStates: ConnectionState[] = []; + const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); onTestFinished(() => s.unsubscribe()); @@ -345,8 +345,8 @@ describe("Start connection states", () => { const connection = setupRemoteConnection(); - const capturedStates: TransportState[] = []; - const s = connection.transportState$.subscribe((value) => { + const capturedStates: ConnectionState[] = []; + const s = connection.state$.subscribe((value) => { capturedStates.push(value); }); onTestFinished(() => s.unsubscribe()); @@ -401,7 +401,7 @@ describe("Publishing participants observations", () => { const bobIsAPublisher = Promise.withResolvers(); const danIsAPublisher = Promise.withResolvers(); const observedPublishers: PublishingParticipant[][] = []; - const s = connection.publishingParticipants$.subscribe((publishers) => { + const s = connection.allLivekitParticipants$.subscribe((publishers) => { observedPublishers.push(publishers); if ( publishers.some( @@ -538,7 +538,7 @@ describe("Publishing participants observations", () => { const connection = setupRemoteConnection(); let observedPublishers: PublishingParticipant[][] = []; - const s = connection.publishingParticipants$.subscribe((publishers) => { + const s = connection.allLivekitParticipants$.subscribe((publishers) => { observedPublishers.push(publishers); }); onTestFinished(() => s.unsubscribe()); diff --git a/src/state/Connection.ts b/src/state/remoteMembers/Connection.ts similarity index 84% rename from src/state/Connection.ts rename to src/state/remoteMembers/Connection.ts index 005c1359..72239de0 100644 --- a/src/state/Connection.ts +++ b/src/state/remoteMembers/Connection.ts @@ -11,7 +11,7 @@ import { } from "@livekit/components-core"; import { ConnectionError, - type ConnectionState, + type ConnectionState as LivekitConenctionState, type E2EEOptions, type RemoteParticipant, Room as LivekitRoom, @@ -21,21 +21,21 @@ import { type CallMembership, type LivekitTransport, } from "matrix-js-sdk/lib/matrixrtc"; -import { logger } from "matrix-js-sdk/lib/logger"; import { BehaviorSubject, combineLatest, type Observable } from "rxjs"; +import { type Logger } from "matrix-js-sdk/lib/logger"; import { getSFUConfigWithOpenID, type OpenIDClientParts, type SFUConfig, -} from "../livekit/openIDSFU"; -import { type Behavior } from "./Behavior"; -import { type ObservableScope } from "./ObservableScope"; -import { defaultLiveKitOptions } from "../livekit/options"; +} from "../../livekit/openIDSFU.ts"; +import { type Behavior } from "../Behavior.ts"; +import { type ObservableScope } from "../ObservableScope.ts"; +import { defaultLiveKitOptions } from "../../livekit/options.ts"; import { InsufficientCapacityError, SFURoomCreationRestrictedError, -} from "../utils/errors.ts"; +} from "../../utils/errors.ts"; export interface ConnectionOpts { /** The media transport to connect to. */ @@ -44,8 +44,14 @@ export interface ConnectionOpts { client: OpenIDClientParts; /** The observable scope to use for this connection. */ scope: ObservableScope; - /** An observable of the current RTC call memberships and their associated transports. */ - remoteTransports$: Behavior< + /** + * An observable of the current RTC call memberships and their associated transports. + * Used to differentiate between publishing and subscribging participants on each connection. + * Used to find out which rtc member should upload to this connection (publishingParticipants$). + * The livekit room gives access to all the users subscribing to this connection, we need + * to filter out the ones that are uploading to this connection. + */ + membershipsWithTransport$: Behavior< { membership: CallMembership; transport: LivekitTransport }[] >; @@ -53,7 +59,7 @@ export interface ConnectionOpts { livekitRoomFactory?: (options?: RoomOptions) => LivekitRoom; } -export type TransportState = +export type ConnectionState = | { state: "Initialized" } | { state: "FetchingConfig"; transport: LivekitTransport } | { state: "ConnectingToLkRoom"; transport: LivekitTransport } @@ -61,7 +67,7 @@ export type TransportState = | { state: "FailedToStart"; error: Error; transport: LivekitTransport } | { state: "ConnectedToLkRoom"; - connectionState$: Observable; + livekitConnectionState$: Observable; transport: LivekitTransport; } | { state: "Stopped"; transport: LivekitTransport }; @@ -88,15 +94,14 @@ export type PublishingParticipant = { */ export class Connection { // Private Behavior - private readonly _transportState$ = new BehaviorSubject({ + private readonly _state$ = new BehaviorSubject({ state: "Initialized", }); /** * The current state of the connection to the media transport. */ - public readonly transportState$: Behavior = - this._transportState$; + public readonly state$: Behavior = this._state$; /** * Whether the connection has been stopped. @@ -118,7 +123,7 @@ export class Connection { public async start(): Promise { this.stopped = false; try { - this._transportState$.next({ + this._state$.next({ state: "FetchingConfig", transport: this.transport, }); @@ -126,7 +131,7 @@ export class Connection { // If we were stopped while fetching the config, don't proceed to connect if (this.stopped) return; - this._transportState$.next({ + this._state$.next({ state: "ConnectingToLkRoom", transport: this.transport, }); @@ -157,13 +162,13 @@ export class Connection { // If we were stopped while connecting, don't proceed to update state. if (this.stopped) return; - this._transportState$.next({ + this._state$.next({ state: "ConnectedToLkRoom", transport: this.transport, - connectionState$: connectionStateObserver(this.livekitRoom), + livekitConnectionState$: connectionStateObserver(this.livekitRoom), }); } catch (error) { - this._transportState$.next({ + this._state$.next({ state: "FailedToStart", error: error instanceof Error ? error : new Error(`${error}`), transport: this.transport, @@ -188,7 +193,7 @@ export class Connection { public async stop(): Promise { if (this.stopped) return; await this.livekitRoom.disconnect(); - this._transportState$.next({ + this._state$.next({ state: "Stopped", transport: this.transport, }); @@ -218,23 +223,22 @@ export class Connection { protected constructor( public readonly livekitRoom: LivekitRoom, opts: ConnectionOpts, + logger?: Logger, ) { - logger.log( + logger?.info( `[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`, ); - const { transport, client, scope, remoteTransports$ } = opts; + const { transport, client, scope, membershipsWithTransport$ } = opts; this.transport = transport; this.client = client; - const participantsIncludingSubscribers$ = scope.behavior( - connectedParticipantsObserver(this.livekitRoom), - [], - ); + const participantsIncludingSubscribers$: Behavior = + scope.behavior(connectedParticipantsObserver(this.livekitRoom), []); this.publishingParticipants$ = scope.behavior( combineLatest( - [participantsIncludingSubscribers$, remoteTransports$], + [participantsIncludingSubscribers$, membershipsWithTransport$], (participants, remoteTransports) => remoteTransports // Find all members that claim to publish on this connection diff --git a/src/state/remoteMembers/displayname.ts b/src/state/remoteMembers/displayname.ts new file mode 100644 index 00000000..ec3231c6 --- /dev/null +++ b/src/state/remoteMembers/displayname.ts @@ -0,0 +1,78 @@ +/* +Copyright 2025 New Vector Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { type Room, type RoomMember, RoomStateEvent } from "matrix-js-sdk"; +import { combineLatest, fromEvent, type Observable, startWith } from "rxjs"; +import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc"; +import { logger } from "matrix-js-sdk/lib/logger"; +import { type Room as MatrixRoom } from "matrix-js-sdk/lib/matrix"; + +import { type ObservableScope } from "../ObservableScope"; +import { calculateDisplayName, shouldDisambiguate } from "../../utils/displayname"; + +/** + * Displayname for each member of the call. This will disambiguate + * any displayname that clashes with another member. Only members + * joined to the call are considered here. + */ +// don't do this work more times than we need to. This is achieved by converting to a behavior: +export const memberDisplaynames$ = ( + matrixRoom: Room, + memberships$: Observable, + scope: ObservableScope, + userId: string, + deviceId: string, +) => + scope.behavior( + combineLatest( + [ + // Handle call membership changes + memberships$, + // Additionally handle display name changes (implicitly reacting to them) + fromEvent(matrixRoom, RoomStateEvent.Members).pipe(startWith(null)), + // TODO: do we need: pauseWhen(this.pretendToBeDisconnected$), + ], + (memberships, _displaynames) => { + const displaynameMap = new Map([ + [ + `${userId}:${deviceId}`, + matrixRoom.getMember(userId)?.rawDisplayName ?? userId, + ], + ]); + const room = matrixRoom; + + // We only consider RTC members for disambiguation as they are the only visible members. + for (const rtcMember of memberships) { + const matrixIdentifier = `${rtcMember.userId}:${rtcMember.deviceId}`; + const { member } = getRoomMemberFromRtcMember(rtcMember, room); + if (!member) { + logger.error( + "Could not find member for media id:", + matrixIdentifier, + ); + continue; + } + const disambiguate = shouldDisambiguate(member, memberships, room); + displaynameMap.set( + matrixIdentifier, + calculateDisplayName(member, disambiguate), + ); + } + return displaynameMap; + }, + ), + ); + +export function getRoomMemberFromRtcMember( + rtcMember: CallMembership, + room: MatrixRoom, +): { id: string; member: RoomMember | undefined } { + return { + id: rtcMember.userId + ":" + rtcMember.deviceId, + member: room.getMember(rtcMember.userId) ?? undefined, + }; +} diff --git a/src/state/remoteMembers/matrixLivekitMerger.ts b/src/state/remoteMembers/matrixLivekitMerger.ts new file mode 100644 index 00000000..935f36cb --- /dev/null +++ b/src/state/remoteMembers/matrixLivekitMerger.ts @@ -0,0 +1,199 @@ +/* +Copyright 2025 Element c. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { + LocalParticipant, + Participant, + RemoteParticipant, + type Participant as LivekitParticipant, + type Room as LivekitRoom, +} from "livekit-client"; +import { + type MatrixRTCSession, + MatrixRTCSessionEvent, + type CallMembership, + type Transport, + LivekitTransport, + isLivekitTransport, +} from "matrix-js-sdk/lib/matrixrtc"; +import { + combineLatest, + fromEvent, + map, + startWith, + switchMap, + type Observable, +} from "rxjs"; + +import { type ObservableScope } from "../ObservableScope"; +import { type Connection } from "./Connection"; +import { Behavior } from "../Behavior"; +import { RoomMember } from "matrix-js-sdk"; +import { getRoomMemberFromRtcMember } from "./displayname"; + + +// TODOs: +// - make ConnectionManager its own actual class +// - write test for scopes (do we really need to bind scope) +class ConnectionManager { + constructor(transports$: Observable) {} + public readonly connections$: Observable; +} + +/** + * Represent a matrix call member and his associated livekit participation. + * `livekitParticipant` can be undefined if the member is not yet connected to the livekit room + * or if it has no livekit transport at all. + */ + +export interface MatrixLivekitItem { + callMembership: CallMembership; + livekitParticipant?: LivekitParticipant; +} + +// Alternative structure idea: +// const livekitMatrixItems$ = (callMemberships$,connectionManager,scope): Observable => { + + // Map of Connection -> to (callMembership, LivekitParticipant?)) +type participants = {participant: LocalParticipant | RemoteParticipant}[] + +interface LivekitRoomWithParticipants { + livekitRoom: LivekitRoom; + url: string; // Included for use as a React key + participants: { + // What id is that?? + // Looks like it userId:Deviceid? + id: string; + participant: LocalParticipant | RemoteParticipant | undefined; + // Why do we fetch a full room member here? + // looks like it is only for avatars? + // TODO: Remove that. have some Avatar Provider that can fetch avatar for user ids. + member: RoomMember; + }[]; +} + +/** + * Combines MatrixRtc and Livekit worlds. + * + * It has a small public interface: + * - in (via constructor): + * - an observable of CallMembership[] to track the call members (The matrix side) + * - a `ConnectionManager` for the lk rooms (The livekit side) + * - out (via public Observable): + * - `remoteMatrixLivekitItems` an observable of MatrixLivekitItem[] to track the remote members and associated livekit data. + */ +export class MatrixLivekitMerger { + public remoteMatrixLivekitItems$: Observable; + + /** + * The MatrixRTC session participants. + */ + // Note that MatrixRTCSession already filters the call memberships by users + // that are joined to the room; we don't need to perform extra filtering here. + private readonly memberships$ = this.scope.behavior( + fromEvent( + this.matrixRTCSession, + MatrixRTCSessionEvent.MembershipsChanged, + ).pipe( + startWith(null), + map(() => this.matrixRTCSession.memberships), + ), + ); + + public constructor( + private matrixRTCSession: MatrixRTCSession, + private connectionManager: ConnectionManager, + private scope: ObservableScope, + ) { + const publishingParticipants$ = combineLatest([ + this.memberships$, + connectionManager.connections$, + ]).pipe(map(), this.scope.bind()); + this.remoteMatrixLivekitItems$ = combineLatest([ + callMemberships$, + connectionManager.connections$, + ]).pipe(this.scope.bind()); + // Implementation goes here + } + + /** + * Lists the transports used by ourselves, plus all other MatrixRTC session + * members. For completeness this also lists the preferred transport and + * whether we are in multi-SFU mode or sticky events mode (because + * advertisedTransport$ wants to read them at the same time, and bundling data + * together when it might change together is what you have to do in RxJS to + * avoid reading inconsistent state or observing too many changes.) + */ + private readonly membershipsWithTransport$: Behavior<{ + membership: CallMembership; + transport?: LivekitTransport; + } | null> = this.scope.behavior( + this.memberships$.pipe( + map((memberships) => { + const oldestMembership = this.matrixRTCSession.getOldestMembership(); + + memberships.map((membership) => { + let transport = membership.getTransport(oldestMembership ?? membership) + return { membership, transport: isLivekitTransport(transport) ? transport : undefined }; + }) + }), + ), + ); + + /** + * Lists the transports used by each MatrixRTC session member other than + * ourselves. + */ + // private readonly remoteTransports$ = this.scope.behavior( + // this.membershipsWithTransport$.pipe( + // map((transports) => transports?.remote ?? []), + // ), + // ); + + /** + * Lists, for each LiveKit room, the LiveKit participants whose media should + * be presented. + */ + private readonly participantsByRoom$ = this.scope.behavior( + // TODO: Move this logic into Connection/PublishConnection if possible + + this.connectionManager.connections$.pipe( + switchMap((connections) => { + connections.map((c)=>c.publishingParticipants$.pipe( + map((publishingParticipants) => { + const participants: { + id: string; + participant: LivekitParticipant | undefined; + member: RoomMember; + }[] = publishingParticipants.map(({ participant, membership }) => ({ + // TODO update to UUID + id: `${membership.userId}:${membership.deviceId}`, + participant, + // This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) + member: + getRoomMemberFromRtcMember( + membership, + this.matrixRoom, + )?.member ?? memberError(), + })); + + return { + livekitRoom: c.livekitRoom, + url: c.transport.livekit_service_url, + participants, + }; + }), + ), + ), + ), + ), + ); + }), + ) + .pipe(startWith([]), pauseWhen(this.pretendToBeDisconnected$)), + ); +}