diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index e2cd6c55..d7f48dfa 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -67,7 +67,7 @@ import { ScreenShareViewModel, type UserMediaViewModel, } from "./MediaViewModel"; -import { accumulate, generateKeyed$, pauseWhen } from "../utils/observable"; +import { accumulate, generateMap$, pauseWhen } from "../utils/observable"; import { duplicateTiles, MatrixRTCMode, @@ -110,7 +110,7 @@ import { } from "./layout-types.ts"; import { type ElementCallError } from "../utils/errors.ts"; import { type ObservableScope } from "./ObservableScope.ts"; -import { createMatrixLivekitMembers$ } from "./remoteMembers/matrixLivekitMerger.ts"; +import { createMatrixLivekitMembers$ } from "./remoteMembers/MatrixLivekitMembers.ts"; import { createLocalMembership$ } from "./localMember/LocalMembership.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { createSessionMembershipsAndTransports$ } from "./SessionBehaviors.ts"; @@ -335,24 +335,17 @@ export class CallViewModel { */ // TODO KEEP THIS!! and adapt it to what our membershipManger returns private readonly mediaItems$ = this.scope.behavior( - generateKeyed$< - [typeof this.matrixLivekitMembers$.value, number], - MediaItem, - MediaItem[] - >( + generateMap$( // Generate a collection of MediaItems from the list of expected (whether // present or missing) LiveKit participants. combineLatest([this.matrixLivekitMembers$, duplicateTiles.value$]), - ([matrixLivekitMembers, duplicateTiles], createOrGet) => { + function* ([matrixLivekitMembers, duplicateTiles]) { const items: MediaItem[] = []; - for (const { - connection, - participant, - member, - displayName$, + for (const [ participantId, - } of matrixLivekitMembers) { + { connection$, participant$, member$, displayName$ }, + ] of matrixLivekitMembers) { if (connection === undefined) { logger.warn("connection is not yet initialised."); continue; @@ -361,7 +354,6 @@ export class CallViewModel { const mediaId = `${participantId}:${i}`; const lkRoom = connection?.livekitRoom; const url = connection?.transport.livekit_service_url; - const dpName$ = displayName$.pipe(map((n) => n ?? "[👻]")); const item = createOrGet( mediaId, (scope) => @@ -378,7 +370,7 @@ export class CallViewModel { url, this.mediaDevices, this.pretendToBeDisconnected$, - dpName$, + displayName$, this.handsRaised$.pipe( map((v) => v[participantId]?.time ?? null), ), @@ -405,7 +397,7 @@ export class CallViewModel { lkRoom, url, this.pretendToBeDisconnected$, - dpName$, + displayName$, ), ), ); diff --git a/src/state/remoteMembers/ConnectionManager.ts b/src/state/remoteMembers/ConnectionManager.ts index f596de2d..f5f3f99b 100644 --- a/src/state/remoteMembers/ConnectionManager.ts +++ b/src/state/remoteMembers/ConnectionManager.ts @@ -20,8 +20,8 @@ import { type LocalParticipant, type RemoteParticipant } from "livekit-client"; import { type Behavior } from "../Behavior"; import { type Connection } from "./Connection"; import { type ObservableScope } from "../ObservableScope"; -import { generateKeyed$ } from "../../utils/observable"; -import { areLivekitTransportsEqual } from "./matrixLivekitMerger"; +import { generateItems$ } from "../../utils/observable"; +import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts"; import { type ConnectionFactory } from "./ConnectionFactory.ts"; export class ConnectionManagerData { @@ -142,31 +142,28 @@ export function createConnectionManager$({ * Connections for each transport in use by one or more session members. */ const connections$ = scope.behavior( - generateKeyed$( + generateItems$( transports$, - (transports, createOrGet) => { - const createConnection = - ( - transport: LivekitTransport, - ): ((scope: ObservableScope) => Connection) => - (scope) => { - const connection = connectionFactory.createConnection( - transport, - scope, - logger, - ); - // Start the connection immediately - // Use connection state to track connection progress - void connection.start(); - // TODO subscribe to connection state to retry or log issues? - return connection; + function* (transports) { + for (const transport of transports) + yield { + // We need to serialize the transport to a string to properly use it + // as a Map key, but we also need the real transport object in order + // to construct the connection; pass it through as the item's data. + key: `${transport.livekit_service_url}|${transport.livekit_alias}`, + data: transport, }; - - return transports.map((transport) => { - const key = - transport.livekit_service_url + "|" + transport.livekit_alias; - return createOrGet(key, createConnection(transport)); - }); + }, + (scope, key, transport$) => { + const connection = connectionFactory.createConnection( + transport$.value, + scope, + logger, + ); + // Start the connection immediately + // Use connection state to track connection progress + void connection.start(); + return connection; }, ), ); diff --git a/src/state/remoteMembers/MatrixLivekitMembers.ts b/src/state/remoteMembers/MatrixLivekitMembers.ts index cb9f1709..c6d8ee1f 100644 --- a/src/state/remoteMembers/MatrixLivekitMembers.ts +++ b/src/state/remoteMembers/MatrixLivekitMembers.ts @@ -13,10 +13,23 @@ import { type LivekitTransport, type CallMembership, } from "matrix-js-sdk/lib/matrixrtc"; -import { combineLatest, map, startWith, type Observable } from "rxjs"; +import { logger } from "matrix-js-sdk/lib/logger"; +import { + combineLatest, + filter, + fromEvent, + map, + startWith, + switchMap, + type Observable, +} from "rxjs"; // eslint-disable-next-line rxjs/no-internal import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent"; -import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk"; +import { + RoomStateEvent, + type Room as MatrixRoom, + type RoomMember, +} from "matrix-js-sdk"; // import type { Logger } from "matrix-js-sdk/lib/logger"; import { type Behavior } from "../Behavior"; @@ -24,6 +37,7 @@ import { type ObservableScope } from "../ObservableScope"; import { type createConnectionManager$ } from "./ConnectionManager"; import { getRoomMemberFromRtcMember, memberDisplaynames$ } from "./displayname"; import { type Connection } from "./Connection"; +import { generateItems$ } from "../../utils/observable"; /** * Represent a matrix call member and his associated livekit participation. @@ -31,18 +45,20 @@ import { type Connection } from "./Connection"; * or if it has no livekit transport at all. */ export interface MatrixLivekitMember { - membership: CallMembership; + participantId: string; + membership$: Behavior; displayName$: Behavior; - participant?: LocalLivekitParticipant | RemoteLivekitParticipant; - connection?: Connection; + participant$: + | Behavior + | Behavior; + connection$: Behavior; + mxcAvatarUrl$: Behavior; /** * TODO Try to remove this! Its waaay to much information. * Just get the member's avatar * @deprecated */ - member: RoomMember; - mxcAvatarUrl?: string; - participantId: string; + member$: Behavior; } interface Props { @@ -85,7 +101,7 @@ export function createMatrixLivekitMembers$({ */ function createMatrixLivekitMember$(): Observable { - const displaynameMap$ = memberDisplaynames$( + const displayNameMap$ = memberDisplaynames$( scope, matrixRoom, membershipsWithTransport$.pipe(map((v) => v.map((v) => v.membership))), @@ -93,51 +109,75 @@ export function createMatrixLivekitMembers$({ deviceId, ); - return combineLatest([ - membershipsWithTransport$, - connectionManager.connectionManagerData$, - ]).pipe( - map(([memberships, managerData]) => { - const items: MatrixLivekitMember[] = memberships.map( - ({ membership, transport }) => { - // TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to - const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`; + return generateItems$( + combineLatest([ + membershipsWithTransport$, + connectionManager.connectionManagerData$, + ]), + function* ([memberships, managerData]) { + for (const { membership, transport } of memberships) { + // TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to + const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`; - const participants = transport - ? managerData.getParticipantForTransport(transport) - : []; - const participant = participants.find( - (p) => p.identity == participantId, - ); - const member = getRoomMemberFromRtcMember( - membership, - matrixRoom, - )?.member; - const connection = transport - ? managerData.getConnectionForTransport(transport) - : undefined; - const displayName$ = scope.behavior( - displaynameMap$.pipe( - map( - (displayNameMap) => - displayNameMap.get(membership.membershipID) ?? "---", - ), - ), - ); - return { + const participants = transport + ? managerData.getParticipantForTransport(transport) + : []; + const participant = participants.find( + (p) => p.identity == participantId, + ); + const member = getRoomMemberFromRtcMember( + membership, + matrixRoom, + )?.member; + if (member === undefined) { + logger.warn(`No room member for participant ${participantId}`); + continue; + } + + const connection = transport + ? managerData.getConnectionForTransport(transport) + : undefined; + + yield { + key: participantId, + data: { participant, membership, connection, // This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) - // TODO Ugh this is hidign that it might be undefined!! best we remove the member entirely. - member: member as RoomMember, - displayName$, - mxcAvatarUrl: member?.getMxcAvatarUrl(), - participantId, - }; - }, - ); - return items; + member, + }, + }; + } + }, + (scope, participantId, data$): MatrixLivekitMember => ({ + participantId, + membership$: scope.behavior(data$.pipe(map((data) => data.membership))), + displayName$: scope.behavior( + displayNameMap$.pipe( + map((displayNames) => displayNames.get(participantId)), + filter((name) => name !== undefined), + ), + "", + ), + participant$: scope.behavior( + data$.pipe(map((data) => data.participant)), + // Assert that a local participant will never become a remote + // participant or vice versa + ) as + | Behavior + | Behavior, + connection$: scope.behavior(data$.pipe(map((data) => data.connection))), + mxcAvatarUrl$: scope.behavior( + // React to avatar changes + fromEvent(matrixRoom, RoomStateEvent.Members).pipe( + startWith(null), + switchMap(() => + data$.pipe(map((data) => data.member.getMxcAvatarUrl())), + ), + ), + ), + member$: scope.behavior(data$.pipe(map((data) => data.member))), }), ); } diff --git a/src/utils/observable.ts b/src/utils/observable.ts index eb817991..9ee713dd 100644 --- a/src/utils/observable.ts +++ b/src/utils/observable.ts @@ -20,6 +20,7 @@ import { takeWhile, tap, withLatestFrom, + BehaviorSubject, } from "rxjs"; import { type Behavior } from "../state/Behavior"; @@ -120,69 +121,54 @@ export function pauseWhen(pause$: Behavior) { } /** - * Maps a changing input value to an output value consisting of items that have - * automatically generated ObservableScopes tied to a key. Items will be - * automatically created when their key is requested for the first time, reused - * when the same key is requested at a later time, and destroyed (have their - * scope ended) when the key is no longer requested. + * Maps a changing input value to a collection of items that each capture some + * dynamic data and are tied to a key. Items will be automatically created when + * their key is requested for the first time, reused when the same key is + * requested acy later time, and destroyed (have their scope ended) when the key + * is no longer requested. * * @param input$ The input value to be mapped. - * @param project A function mapping input values to output values. This - * function receives an additional callback `createOrGet` which can be used - * within the function body to request that an item be generated for a certain - * key. The caller provides a factory which will be used to create the item if - * it is being requested for the first time. Otherwise, the item previously - * existing under that key will be returned. + * @param generator A generator function yielding a key and the currently + * associated data for each item that it wants to exist. + * @param factory A function constructing an actual item, given the item's key, + * dynamic data, and an automatically managed ObservableScope for the item. */ -export function generateKeyed$( - input$: Observable, - project: ( - input: In, - createOrGet: ( - key: string, - factory: (scope: ObservableScope) => Item, - ) => Item, - ) => Out, -): Observable { +export function generateItems$( + input$: Observable, + generator: (input: Input) => Generator<{ key: Key; data: Data }, void, void>, + factory: (scope: ObservableScope, key: Key, data$: Behavior) => Item, +): Observable { return input$.pipe( // Keep track of the existing items over time, so we can reuse them - scan< - In, - { - items: Map; - output: Out; - }, - { items: Map } - >( - (state, data) => { - const nextItems = new Map< - string, - { item: Item; scope: ObservableScope } - >(); + scan((prevItems, input) => { + const nextItems = new Map< + Key, + { scope: ObservableScope; data$: BehaviorSubject; item: Item } + >(); - const output = project(data, (key, factory) => { - let item = state.items.get(key); - if (item === undefined) { - // First time requesting the key; create the item - const scope = new ObservableScope(); - item = { item: factory(scope), scope }; - } - nextItems.set(key, item); - return item.item; - }); + for (const { key, data } of generator(input)) { + let item = prevItems.get(key); + if (item === undefined) { + // First time requesting the key; create the item + const scope = new ObservableScope(); + const data$ = new BehaviorSubject(data); + item = { scope, data$, item: factory(scope, key, data$) }; + } else { + item.data$.next(data); + } + nextItems.set(key, item); + } - // Destroy all items that are no longer being requested - for (const [key, { scope }] of state.items) - if (!nextItems.has(key)) scope.end(); + // Destroy all items that are no longer being requested + for (const [key, { scope }] of prevItems) + if (!nextItems.has(key)) scope.end(); - return { items: nextItems, output }; - }, - { items: new Map() }, - ), - finalizeValue((state) => { + return nextItems; + }, new Map; item: Item }>()), + finalizeValue((items) => { // Destroy all remaining items when no longer subscribed - for (const { scope } of state.items.values()) scope.end(); + for (const { scope } of items.values()) scope.end(); }), - map(({ output }) => output), + map((items) => [...items.values()].map(({ item }) => item)), ); }