diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index dd1190b7..f26c4b3b 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -52,7 +52,7 @@ import { ScreenShareViewModel, type UserMediaViewModel, } from "../MediaViewModel"; -import { accumulate, generateKeyed$, pauseWhen } from "../../utils/observable"; +import { accumulate, generateItems, pauseWhen } from "../../utils/observable"; import { duplicateTiles, MatrixRTCMode, @@ -75,7 +75,7 @@ import { } from "../../reactions"; import { shallowEquals } from "../../utils/array"; import { type MediaDevices } from "../MediaDevices"; -import { type Behavior, constant } from "../Behavior"; +import { type Behavior } from "../Behavior"; import { E2eeType } from "../../e2ee/e2eeType"; import { MatrixKeyProvider } from "../../e2ee/matrixKeyProvider"; import { type MuteStates } from "../MuteStates"; @@ -370,103 +370,101 @@ export class CallViewModel { ); /** - * List of MediaItems that we want to have tiles for. + * List of user media (camera feeds) that we want tiles for. */ - // TODO KEEP THIS!! and adapt it to what our membershipManger returns // TODO this also needs the local participant to be added. - private readonly mediaItems$ = this.scope.behavior( - generateKeyed$< - [typeof this.matrixLivekitMembers$.value, number], - MediaItem, - MediaItem[] - >( + private readonly userMedia$ = this.scope.behavior( + combineLatest([this.matrixLivekitMembers$, duplicateTiles.value$]).pipe( // Generate a collection of MediaItems from the list of expected (whether // present or missing) LiveKit participants. - combineLatest([this.matrixLivekitMembers$, duplicateTiles.value$]), - ([{ value: matrixLivekitMembers }, duplicateTiles], createOrGet) => { - const items: MediaItem[] = []; - - for (const { - connection, - participant, - member, - displayName, + generateItems( + function* ([{ value: matrixLivekitMembers }, duplicateTiles]) { + for (const { + participantId, + userId, + participant$, + connection$, + displayName$, + mxcAvatarUrl$, + } of matrixLivekitMembers) + for (let dup = 0; dup < 1 + duplicateTiles; dup++) + yield { + keys: [ + dup, + participantId, + userId, + participant$, + connection$, + displayName$, + mxcAvatarUrl$, + ], + data: undefined, + }; + }, + ( + scope, + _data$, + dup, participantId, - } of matrixLivekitMembers) { - if (connection === undefined) { - logger.warn("connection is not yet initialised."); - continue; - } - for (let i = 0; i < 1 + duplicateTiles; i++) { - const mediaId = `${participantId}:${i}`; - const lkRoom = connection?.livekitRoom; - const url = connection?.transport.livekit_service_url; + userId, + participant$, + connection$, + displayName$, + mxcAvatarUrl$, + ) => { + const livekitRoom$ = scope.behavior( + connection$.pipe(map((c) => c?.livekitRoom)), + ); + const focusUrl$ = scope.behavior( + connection$.pipe(map((c) => c?.transport.livekit_service_url)), + ); - const item = createOrGet( - mediaId, - (scope) => - // We create UserMedia with or without a participant. - // This will be the initial value of a BehaviourSubject. - // Once a participant appears we will update the BehaviourSubject. (see below) - new UserMedia( - scope, - mediaId, - member, - participant, - this.options.encryptionSystem, - lkRoom, - url, - this.mediaDevices, - this.pretendToBeDisconnected$, - constant(displayName ?? "[👻]"), - this.handsRaised$.pipe( - map((v) => v[participantId]?.time ?? null), - ), - this.reactions$.pipe( - map((v) => v[participantId] ?? undefined), - ), - ), - ); - items.push(item); - (item as UserMedia).updateParticipant(participant); - - if (participant?.isScreenShareEnabled) { - const screenShareId = `${mediaId}:screen-share`; - items.push( - createOrGet( - screenShareId, - (scope) => - new ScreenShare( - scope, - screenShareId, - member, - participant, - this.options.encryptionSystem, - lkRoom, - url, - this.pretendToBeDisconnected$, - constant(displayName ?? "[👻]"), - ), - ), - ); - } - } - } - return items; - }, + return new UserMedia( + scope, + `${participantId}:${dup}`, + userId, + participant$, + this.options.encryptionSystem, + livekitRoom$, + focusUrl$, + this.mediaDevices, + this.pretendToBeDisconnected$, + displayName$, + mxcAvatarUrl$, + this.handsRaised$.pipe(map((v) => v[participantId]?.time ?? null)), + this.reactions$.pipe(map((v) => v[participantId] ?? undefined)), + ); + }, + ), ), ); /** - * List of MediaItems that we want to display, that are of type UserMedia + * List of all media items (user media and screen share media) that we want + * tiles for. */ - private readonly userMedia$ = this.scope.behavior( - this.mediaItems$.pipe( - map((mediaItems) => - mediaItems.filter((m): m is UserMedia => m instanceof UserMedia), + private readonly mediaItems$ = this.scope.behavior( + this.userMedia$.pipe( + switchMap((userMedia) => + userMedia.length === 0 + ? of([]) + : combineLatest( + userMedia.map((m) => m.screenShares$), + (...screenShares) => [...userMedia, ...screenShares.flat(1)], + ), + ), + ), + ); + + /** + * List of MediaItems that we want to display, that are of type ScreenShare + */ + private readonly screenShares$ = this.scope.behavior( + this.mediaItems$.pipe( + map((mediaItems) => + mediaItems.filter((m): m is ScreenShare => m instanceof ScreenShare), ), ), - [], ); public readonly joinSoundEffect$ = this.userMedia$.pipe( @@ -544,17 +542,6 @@ export class CallViewModel { tap((reason) => this.leaveHoisted$.next(reason)), ); - /** - * List of MediaItems that we want to display, that are of type ScreenShare - */ - private readonly screenShares$ = this.scope.behavior( - this.mediaItems$.pipe( - map((mediaItems) => - mediaItems.filter((m): m is ScreenShare => m instanceof ScreenShare), - ), - ), - ); - private readonly spotlightSpeaker$ = this.scope.behavior( this.userMedia$.pipe( diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 1773eca1..c6b8b170 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -5,7 +5,13 @@ SPDX-License-IdFentifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { type LocalTrack, type E2EEOptions } from "livekit-client"; +import { + type LocalTrack, + type E2EEOptions, + type Participant, + ParticipantEvent, +} from "livekit-client"; +import { observeParticipantEvents } from "@livekit/components-core"; import { type LivekitTransport, type MatrixRTCSession, @@ -26,11 +32,9 @@ import { switchMap, take, takeWhile, - tap, } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; -import { sharingScreen$ as observeSharingScreen$ } from "../../UserMedia.ts"; import { type Behavior } from "../../Behavior"; import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; import { ObservableScope } from "../../ObservableScope"; @@ -521,3 +525,13 @@ export const createLocalMembership$ = ({ toggleScreenSharing, }; }; + +export function observeSharingScreen$(p: Participant): Observable { + return observeParticipantEvents( + p, + ParticipantEvent.TrackPublished, + ParticipantEvent.TrackUnpublished, + ParticipantEvent.LocalTrackPublished, + ParticipantEvent.LocalTrackUnpublished, + ).pipe(map((p) => p.isScreenShareEnabled)); +} diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index 2859e49b..ce984aec 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -20,7 +20,7 @@ import { type LocalParticipant, type RemoteParticipant } from "livekit-client"; import { type Behavior } from "../../Behavior.ts"; import { type Connection } from "./Connection.ts"; import { Epoch, type ObservableScope } from "../../ObservableScope.ts"; -import { generateKeyed$ } from "../../../utils/observable.ts"; +import { generateItemsWithEpoch } from "../../../utils/observable.ts"; import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts"; import { type ConnectionFactory } from "./ConnectionFactory.ts"; @@ -144,34 +144,32 @@ export function createConnectionManager$({ * Connections for each transport in use by one or more session members. */ const connections$ = scope.behavior( - generateKeyed$, Connection, Epoch>( - transports$, - (transports, createOrGet) => { - const createConnection = - ( - transport: LivekitTransport, - ): ((scope: ObservableScope) => Connection) => - (scope) => { - const connection = connectionFactory.createConnection( - transport, - scope, - logger, - ); - // Start the connection immediately - // Use connection state to track connection progress - void connection.start(); - // TODO subscribe to connection state to retry or log issues? - return connection; - }; - - return transports.mapInner((transports) => { - return transports.map((transport) => { - const key = - transport.livekit_service_url + "|" + transport.livekit_alias; - return createOrGet(key, createConnection(transport)); - }); - }); - }, + transports$.pipe( + generateItemsWithEpoch( + function* (transports) { + for (const transport of transports) + yield { + keys: [transport.livekit_service_url, transport.livekit_alias], + data: undefined, + }; + }, + (scope, _data$, serviceUrl, alias) => { + const connection = connectionFactory.createConnection( + { + type: "livekit", + livekit_service_url: serviceUrl, + livekit_alias: alias, + }, + scope, + logger, + ); + // Start the connection immediately + // Use connection state to track connection progress + void connection.start(); + // TODO subscribe to connection state to retry or log issues? + return connection; + }, + ), ), ); diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts index 729ed547..4aaaadd4 100644 --- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts +++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts @@ -16,32 +16,31 @@ import { import { combineLatest, filter, map } from "rxjs"; // eslint-disable-next-line rxjs/no-internal import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent"; -import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk"; +import { type Room as MatrixRoom } from "matrix-js-sdk"; +import { logger } from "matrix-js-sdk/lib/logger"; import { type Behavior } from "../../Behavior"; import { type IConnectionManager } from "./ConnectionManager"; import { Epoch, mapEpoch, type ObservableScope } from "../../ObservableScope"; -import { getRoomMemberFromRtcMember, memberDisplaynames$ } from "./displayname"; +import { memberDisplaynames$ } from "./displayname"; import { type Connection } from "./Connection"; +import { generateItemsWithEpoch } from "../../../utils/observable"; /** - * Represent a matrix call member and his associated livekit participation. + * Represents a Matrix call member and their associated LiveKit participation. * `livekitParticipant` can be undefined if the member is not yet connected to the livekit room * or if it has no livekit transport at all. */ export interface MatrixLivekitMember { - membership: CallMembership; - displayName?: string; - participant?: LocalLivekitParticipant | RemoteLivekitParticipant; - connection?: Connection; - /** - * TODO Try to remove this! Its waaay to much information. - * Just get the member's avatar - * @deprecated - */ - member: RoomMember; - mxcAvatarUrl?: string; participantId: string; + userId: string; + membership$: Behavior; + participant$: Behavior< + LocalLivekitParticipant | RemoteLivekitParticipant | null + >; + connection$: Behavior; + displayName$: Behavior; + mxcAvatarUrl$: Behavior; } interface Props { @@ -100,44 +99,54 @@ export function createMatrixLivekitMembers$({ { value: membershipsWithTransports, epoch }, { value: managerData }, { value: displaynames }, - ]) => { - const items: MatrixLivekitMember[] = membershipsWithTransports.map( - ({ membership, transport }) => { - // TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to - const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`; + ]) => + new Epoch( + [membershipsWithTransports, managerData, displaynames] as const, + epoch, + ), + ), + generateItemsWithEpoch( + function* ([membershipsWithTransports, managerData, displaynames]) { + for (const { membership, transport } of membershipsWithTransports) { + // TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to + const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`; - const participants = transport - ? managerData.getParticipantForTransport(transport) - : []; - const participant = participants.find( - (p) => p.identity == participantId, - ); - const member = getRoomMemberFromRtcMember( + const participants = transport + ? managerData.getParticipantForTransport(transport) + : []; + const participant = + participants.find((p) => p.identity == participantId) ?? null; + // This makes sense to add to the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) + const member = matrixRoom.getMember(membership.userId); + const connection = transport + ? managerData.getConnectionForTransport(transport) + : undefined; + + let displayName = displaynames.get(membership.userId); + if (displayName === undefined) { + logger.warn(`No display name for user ${membership.userId}`); + displayName = ""; + } + + yield { + keys: [participantId, membership.userId], + data: { membership, - matrixRoom, - )?.member; - const connection = transport - ? managerData.getConnectionForTransport(transport) - : undefined; - const displayName = displaynames.get(participantId); - return { participant, - membership, connection, - // This makes sense to add to the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) - // TODO Ugh this is hidign that it might be undefined!! best we remove the member entirely. - member: member as RoomMember, displayName, mxcAvatarUrl: member?.getMxcAvatarUrl(), - participantId, - }; - }, - ); - return new Epoch(items, epoch); + }, + }; + } }, + (scope, data$, participantId, userId) => ({ + participantId, + userId, + ...scope.splitBehavior(data$), + }), ), ), - // new Epoch([]), ); } diff --git a/src/state/CallViewModel/remoteMembers/displayname.test.ts b/src/state/CallViewModel/remoteMembers/displayname.test.ts index 1bb376ba..efbe1235 100644 --- a/src/state/CallViewModel/remoteMembers/displayname.test.ts +++ b/src/state/CallViewModel/remoteMembers/displayname.test.ts @@ -97,7 +97,7 @@ test.skip("should always have our own user", () => { expectObservable(dn$.pipe(map((e) => e.value))).toBe("a", { a: new Map([ - ["@local:example.com:DEVICE000", "@local:example.com"], + ["@local:example.com", "@local:example.com"], ]), }); }); @@ -130,9 +130,9 @@ test("should get displayName for users", () => { expectObservable(dn$.pipe(map((e) => e.value))).toBe("a", { a: new Map([ - // ["@local:example.com:DEVICE000", "it's a me"], - ["@alice:example.com:DEVICE1", "Alice"], - ["@bob:example.com:DEVICE1", "Bob"], + // ["@local:example.com", "it's a me"], + ["@alice:example.com", "Alice"], + ["@bob:example.com", "Bob"], ]), }); }); @@ -152,8 +152,8 @@ test("should use userId if no display name", () => { expectObservable(dn$.pipe(map((e) => e.value))).toBe("a", { a: new Map([ - // ["@local:example.com:DEVICE000", "it's a me"], - ["@no-name:foo.bar:D000", "@no-name:foo.bar"], + // ["@local:example.com", "it's a me"], + ["@no-name:foo.bar", "@no-name:foo.bar"], ]), }); }); @@ -179,12 +179,12 @@ test("should disambiguate users with same display name", () => { expectObservable(dn$.pipe(map((e) => e.value))).toBe("a", { a: new Map([ - // ["@local:example.com:DEVICE000", "it's a me"], - ["@bob:example.com:DEVICE1", "Bob (@bob:example.com)"], - ["@bob:example.com:DEVICE2", "Bob (@bob:example.com)"], - ["@bob:foo.bar:BOB000", "Bob (@bob:foo.bar)"], - ["@carl:example.com:C000", "Carl (@carl:example.com)"], - ["@evil:example.com:E000", "Carl (@evil:example.com)"], + // ["@local:example.com", "it's a me"], + ["@bob:example.com", "Bob (@bob:example.com)"], + ["@bob:example.com", "Bob (@bob:example.com)"], + ["@bob:foo.bar", "Bob (@bob:foo.bar)"], + ["@carl:example.com", "Carl (@carl:example.com)"], + ["@evil:example.com", "Carl (@evil:example.com)"], ]), }); }); @@ -208,13 +208,13 @@ test("should disambiguate when needed", () => { expectObservable(dn$.pipe(map((e) => e.value))).toBe("ab", { a: new Map([ - // ["@local:example.com:DEVICE000", "it's a me"], - ["@bob:example.com:DEVICE1", "Bob"], + // ["@local:example.com", "it's a me"], + ["@bob:example.com", "Bob"], ]), b: new Map([ - // ["@local:example.com:DEVICE000", "it's a me"], - ["@bob:example.com:DEVICE1", "Bob (@bob:example.com)"], - ["@bob:foo.bar:BOB000", "Bob (@bob:foo.bar)"], + // ["@local:example.com", "it's a me"], + ["@bob:example.com", "Bob (@bob:example.com)"], + ["@bob:foo.bar", "Bob (@bob:foo.bar)"], ]), }); }); @@ -238,13 +238,13 @@ test.skip("should keep disambiguated name when other leave", () => { expectObservable(dn$.pipe(map((e) => e.value))).toBe("ab", { a: new Map([ - // ["@local:example.com:DEVICE000", "it's a me"], - ["@bob:example.com:DEVICE1", "Bob (@bob:example.com)"], - ["@bob:foo.bar:BOB000", "Bob (@bob:foo.bar)"], + // ["@local:example.com", "it's a me"], + ["@bob:example.com", "Bob (@bob:example.com)"], + ["@bob:foo.bar", "Bob (@bob:foo.bar)"], ]), b: new Map([ - // ["@local:example.com:DEVICE000", "it's a me"], - ["@bob:example.com:DEVICE1", "Bob (@bob:example.com)"], + // ["@local:example.com", "it's a me"], + ["@bob:example.com", "Bob (@bob:example.com)"], ]), }); }); @@ -273,14 +273,14 @@ test("should disambiguate on name change", () => { expectObservable(dn$.pipe(map((e) => e.value))).toBe("ab", { a: new Map([ - // ["@local:example.com:DEVICE000", "it's a me"], - ["@bob:example.com:B000", "Bob"], - ["@carl:example.com:C000", "Carl"], + // ["@local:example.com", "it's a me"], + ["@bob:example.com", "Bob"], + ["@carl:example.com", "Carl"], ]), b: new Map([ - // ["@local:example.com:DEVICE000", "it's a me"], - ["@bob:example.com:B000", "Bob (@bob:example.com)"], - ["@carl:example.com:C000", "Bob (@carl:example.com)"], + // ["@local:example.com", "it's a me"], + ["@bob:example.com", "Bob (@bob:example.com)"], + ["@carl:example.com", "Bob (@carl:example.com)"], ]), }); }); diff --git a/src/state/CallViewModel/remoteMembers/displayname.ts b/src/state/CallViewModel/remoteMembers/displayname.ts index c8484a9a..f56bc253 100644 --- a/src/state/CallViewModel/remoteMembers/displayname.ts +++ b/src/state/CallViewModel/remoteMembers/displayname.ts @@ -42,7 +42,7 @@ export function createRoomMembers$( * any displayname that clashes with another member. Only members * joined to the call are considered here. * - * @returns Map uses the rtc member idenitfier as the key. + * @returns Map uses the Matrix user ID as the key. */ // don't do this work more times than we need to. This is achieved by converting to a behavior: export const memberDisplaynames$ = ( @@ -66,19 +66,14 @@ export const memberDisplaynames$ = ( // We only consider RTC members for disambiguation as they are the only visible members. for (const rtcMember of memberships) { - // TODO a hard-coded participant ID ? should use rtcMember.membershipID instead? - const matrixIdentifier = `${rtcMember.userId}:${rtcMember.deviceId}`; - const { member } = getRoomMemberFromRtcMember(rtcMember, room); - if (!member) { - logger.error( - "Could not find member for participant id:", - matrixIdentifier, - ); + const member = room.getMember(rtcMember.userId); + if (member === null) { + logger.error(`Could not find member for user ${rtcMember.userId}`); continue; } const disambiguate = shouldDisambiguate(member, memberships, room); displaynameMap.set( - matrixIdentifier, + rtcMember.userId, calculateDisplayName(member, disambiguate), ); } @@ -87,13 +82,3 @@ export const memberDisplaynames$ = ( ), new Epoch(new Map()), ); - -export function getRoomMemberFromRtcMember( - rtcMember: CallMembership, - room: Pick, -): { id: string; member: RoomMember | undefined } { - return { - id: rtcMember.userId + ":" + rtcMember.deviceId, - member: room.getMember(rtcMember.userId) ?? undefined, - }; -} diff --git a/src/state/MediaViewModel.ts b/src/state/MediaViewModel.ts index b35f6112..74e64b93 100644 --- a/src/state/MediaViewModel.ts +++ b/src/state/MediaViewModel.ts @@ -27,7 +27,6 @@ import { RoomEvent as LivekitRoomEvent, RemoteTrack, } from "livekit-client"; -import { type RoomMember } from "matrix-js-sdk"; import { logger } from "matrix-js-sdk/lib/logger"; import { BehaviorSubject, @@ -44,6 +43,7 @@ import { startWith, switchMap, throttleTime, + distinctUntilChanged, } from "rxjs"; import { alwaysShowSelf } from "../settings/settings"; @@ -180,29 +180,35 @@ function observeRemoteTrackReceivingOkay$( } function encryptionErrorObservable$( - room: LivekitRoom, + room$: Behavior, participant: Participant, encryptionSystem: EncryptionSystem, criteria: string, ): Observable { - return roomEventSelector(room, LivekitRoomEvent.EncryptionError).pipe( - map((e) => { - const [err] = e; - if (encryptionSystem.kind === E2eeType.PER_PARTICIPANT) { - return ( - // Ideally we would pull the participant identity from the field on the error. - // However, it gets lost in the serialization process between workers. - // So, instead we do a string match - (err?.message.includes(participant.identity) && - err?.message.includes(criteria)) ?? - false - ); - } else if (encryptionSystem.kind === E2eeType.SHARED_KEY) { - return !!err?.message.includes(criteria); - } + return room$.pipe( + switchMap((room) => { + if (room === undefined) return of(false); + return roomEventSelector(room, LivekitRoomEvent.EncryptionError).pipe( + map((e) => { + const [err] = e; + if (encryptionSystem.kind === E2eeType.PER_PARTICIPANT) { + return ( + // Ideally we would pull the participant identity from the field on the error. + // However, it gets lost in the serialization process between workers. + // So, instead we do a string match + (err?.message.includes(participant.identity) && + err?.message.includes(criteria)) ?? + false + ); + } else if (encryptionSystem.kind === E2eeType.SHARED_KEY) { + return !!err?.message.includes(criteria); + } - return false; + return false; + }), + ); }), + distinctUntilChanged(), throttleTime(1000), // Throttle to avoid spamming the UI startWith(false), ); @@ -250,11 +256,9 @@ abstract class BaseMediaViewModel { */ public readonly id: string, /** - * The Matrix room member to which this media belongs. + * The Matrix user to which this media belongs. */ - // TODO: Fully separate the data layer from the UI layer by keeping the - // member object internal - public readonly member: RoomMember, + public readonly userId: string, // We don't necessarily have a participant if a user connects via MatrixRTC but not (yet) through // livekit. protected readonly participant$: Observable< @@ -264,9 +268,10 @@ abstract class BaseMediaViewModel { encryptionSystem: EncryptionSystem, audioSource: AudioSource, videoSource: VideoSource, - livekitRoom: LivekitRoom, - public readonly focusURL: string, + livekitRoom$: Behavior, + public readonly focusUrl$: Behavior, public readonly displayName$: Behavior, + public readonly mxcAvatarUrl$: Behavior, ) { const audio$ = this.observeTrackReference$(audioSource); this.video$ = this.observeTrackReference$(videoSource); @@ -294,13 +299,13 @@ abstract class BaseMediaViewModel { } else if (encryptionSystem.kind === E2eeType.PER_PARTICIPANT) { return combineLatest([ encryptionErrorObservable$( - livekitRoom, + livekitRoom$, participant, encryptionSystem, "MissingKey", ), encryptionErrorObservable$( - livekitRoom, + livekitRoom$, participant, encryptionSystem, "InvalidKey", @@ -320,7 +325,7 @@ abstract class BaseMediaViewModel { } else { return combineLatest([ encryptionErrorObservable$( - livekitRoom, + livekitRoom$, participant, encryptionSystem, "InvalidKey", @@ -402,26 +407,28 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { public constructor( scope: ObservableScope, id: string, - member: RoomMember, + userId: string, participant$: Observable, encryptionSystem: EncryptionSystem, - livekitRoom: LivekitRoom, - focusUrl: string, + livekitRoom$: Behavior, + focusUrl$: Behavior, displayName$: Behavior, + mxcAvatarUrl$: Behavior, public readonly handRaised$: Behavior, public readonly reaction$: Behavior, ) { super( scope, id, - member, + userId, participant$, encryptionSystem, Track.Source.Microphone, Track.Source.Camera, - livekitRoom, - focusUrl, + livekitRoom$, + focusUrl$, displayName$, + mxcAvatarUrl$, ); const media$ = this.scope.behavior( @@ -538,25 +545,27 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { public constructor( scope: ObservableScope, id: string, - member: RoomMember, + userId: string, participant$: Behavior, encryptionSystem: EncryptionSystem, - livekitRoom: LivekitRoom, - focusURL: string, + livekitRoom$: Behavior, + focusUrl$: Behavior, private readonly mediaDevices: MediaDevices, displayName$: Behavior, + mxcAvatarUrl$: Behavior, handRaised$: Behavior, reaction$: Behavior, ) { super( scope, id, - member, + userId, participant$, encryptionSystem, - livekitRoom, - focusURL, + livekitRoom$, + focusUrl$, displayName$, + mxcAvatarUrl$, handRaised$, reaction$, ); @@ -648,25 +657,27 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { public constructor( scope: ObservableScope, id: string, - member: RoomMember, + userId: string, participant$: Observable, encryptionSystem: EncryptionSystem, - livekitRoom: LivekitRoom, - focusUrl: string, + livekitRoom$: Behavior, + focusUrl$: Behavior, private readonly pretendToBeDisconnected$: Behavior, - displayname$: Behavior, + displayName$: Behavior, + mxcAvatarUrl$: Behavior, handRaised$: Behavior, reaction$: Behavior, ) { super( scope, id, - member, + userId, participant$, encryptionSystem, - livekitRoom, - focusUrl, - displayname$, + livekitRoom$, + focusUrl$, + displayName$, + mxcAvatarUrl$, handRaised$, reaction$, ); @@ -747,26 +758,28 @@ export class ScreenShareViewModel extends BaseMediaViewModel { public constructor( scope: ObservableScope, id: string, - member: RoomMember, + userId: string, participant$: Observable, encryptionSystem: EncryptionSystem, - livekitRoom: LivekitRoom, - focusUrl: string, + livekitRoom$: Behavior, + focusUrl$: Behavior, private readonly pretendToBeDisconnected$: Behavior, - displayname$: Behavior, + displayName$: Behavior, + mxcAvatarUrl$: Behavior, public readonly local: boolean, ) { super( scope, id, - member, + userId, participant$, encryptionSystem, Track.Source.ScreenShareAudio, Track.Source.ScreenShare, - livekitRoom, - focusUrl, - displayname$, + livekitRoom$, + focusUrl$, + displayName$, + mxcAvatarUrl$, ); } } diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index d1d6c297..ae46a242 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -24,7 +24,11 @@ import { type Behavior } from "./Behavior"; type MonoTypeOperator = (o: Observable) => Observable; -export const noInitialValue = Symbol("nothing"); +type SplitBehavior = keyof T extends string | number + ? { [K in keyof T as `${K}$`]: Behavior } + : never; + +const nothing = Symbol("nothing"); /** * A scope which limits the execution lifetime of its bound Observables. @@ -59,7 +63,10 @@ export class ObservableScope { * Converts an Observable to a Behavior. If no initial value is specified, the * Observable must synchronously emit an initial value. */ - public behavior(setValue$: Observable, initialValue?: T): Behavior { + public behavior( + setValue$: Observable, + initialValue: T | typeof nothing = nothing, + ): Behavior { const subject$ = new BehaviorSubject(initialValue); // Push values from the Observable into the BehaviorSubject. // BehaviorSubjects have an undesirable feature where if you call 'complete', @@ -74,7 +81,7 @@ export class ObservableScope { subject$.error(err); }, }); - if (subject$.value === noInitialValue) + if (subject$.value === nothing) throw new Error("Behavior failed to synchronously emit an initial value"); return subject$ as Behavior; } @@ -115,27 +122,27 @@ export class ObservableScope { value$: Behavior, callback: (value: T) => Promise<(() => Promise) | void>, ): void { - let latestValue: T | typeof noInitialValue = noInitialValue; - let reconciledValue: T | typeof noInitialValue = noInitialValue; + let latestValue: T | typeof nothing = nothing; + let reconciledValue: T | typeof nothing = nothing; let cleanUp: (() => Promise) | void = undefined; value$ .pipe( catchError(() => EMPTY), // Ignore errors this.bind(), // Limit to the duration of the scope - endWith(noInitialValue), // Clean up when the scope ends + endWith(nothing), // Clean up when the scope ends ) .subscribe((value) => { void (async (): Promise => { - if (latestValue === noInitialValue) { + if (latestValue === nothing) { latestValue = value; while (latestValue !== reconciledValue) { await cleanUp?.(); // Call the previous value's clean-up handler reconciledValue = latestValue; - if (latestValue !== noInitialValue) + if (latestValue !== nothing) cleanUp = await callback(latestValue); // Sync current value } // Reset to signal that reconciliation is done for now - latestValue = noInitialValue; + latestValue = nothing; } else { // There's already an instance of the above 'while' loop running // concurrently. Just update the latest value and let it be handled. @@ -144,6 +151,24 @@ export class ObservableScope { })(); }); } + + /** + * Splits a Behavior of objects with static properties into an object with + * Behavior properties. + * + * For example, splitting a Behavior<{ name: string, age: number }> results in + * an object of type { name$: Behavior age$: Behavior }. + */ + public splitBehavior( + input$: Behavior, + ): SplitBehavior { + return Object.fromEntries( + Object.keys(input$.value).map((key) => [ + `${key}$`, + this.behavior(input$.pipe(map((input) => input[key as keyof T]))), + ]), + ) as SplitBehavior; + } } /** diff --git a/src/state/ScreenShare.ts b/src/state/ScreenShare.ts index 9803a5f4..0a241cdf 100644 --- a/src/state/ScreenShare.ts +++ b/src/state/ScreenShare.ts @@ -4,7 +4,7 @@ Copyright 2025 New Vector Ltd. SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { of, type Observable } from "rxjs"; +import { of } from "rxjs"; import { type LocalParticipant, type RemoteParticipant, @@ -13,7 +13,6 @@ import { import { type ObservableScope } from "./ObservableScope.ts"; import { ScreenShareViewModel } from "./MediaViewModel.ts"; -import type { RoomMember } from "matrix-js-sdk"; import type { EncryptionSystem } from "../e2ee/sharedKeyManagement.ts"; import type { Behavior } from "./Behavior.ts"; @@ -28,24 +27,26 @@ export class ScreenShare { public constructor( private readonly scope: ObservableScope, id: string, - member: RoomMember, + userId: string, participant: LocalParticipant | RemoteParticipant, encryptionSystem: EncryptionSystem, - livekitRoom: LivekitRoom, - focusUrl: string, + livekitRoom$: Behavior, + focusUrl$: Behavior, pretendToBeDisconnected$: Behavior, - displayName$: Observable, + displayName$: Behavior, + mxcAvatarUrl$: Behavior, ) { this.vm = new ScreenShareViewModel( this.scope, id, - member, + userId, of(participant), encryptionSystem, - livekitRoom, - focusUrl, + livekitRoom$, + focusUrl$, pretendToBeDisconnected$, - this.scope.behavior(displayName$), + displayName$, + mxcAvatarUrl$, participant.isLocal, ); } diff --git a/src/state/TileStore.ts b/src/state/TileStore.ts index f6db0930..7b95bd8e 100644 --- a/src/state/TileStore.ts +++ b/src/state/TileStore.ts @@ -14,7 +14,7 @@ import { fillGaps } from "../utils/iter"; import { debugTileLayout } from "../settings/settings"; function debugEntries(entries: GridTileData[]): string[] { - return entries.map((e) => e.media.member?.rawDisplayName ?? "[👻]"); + return entries.map((e) => e.media.displayName$.value); } let DEBUG_ENABLED = false; @@ -156,7 +156,7 @@ export class TileStoreBuilder { public registerSpotlight(media: MediaViewModel[], maximised: boolean): void { if (DEBUG_ENABLED) logger.debug( - `[TileStore, ${this.generation}] register spotlight: ${media.map((m) => m.member?.rawDisplayName ?? "[👻]")}`, + `[TileStore, ${this.generation}] register spotlight: ${media.map((m) => m.displayName$.value)}`, ); if (this.spotlight !== null) throw new Error("Spotlight already set"); @@ -180,7 +180,7 @@ export class TileStoreBuilder { public registerGridTile(media: UserMediaViewModel): void { if (DEBUG_ENABLED) logger.debug( - `[TileStore, ${this.generation}] register grid tile: ${media.member?.rawDisplayName ?? "[👻]"}`, + `[TileStore, ${this.generation}] register grid tile: ${media.displayName$.value}`, ); if (this.spotlight !== null) { @@ -263,7 +263,7 @@ export class TileStoreBuilder { public registerPipTile(media: UserMediaViewModel): void { if (DEBUG_ENABLED) logger.debug( - `[TileStore, ${this.generation}] register PiP tile: ${media.member?.rawDisplayName ?? "[👻]"}`, + `[TileStore, ${this.generation}] register PiP tile: ${media.displayName$.value}`, ); // If there is a single grid tile that we can reuse diff --git a/src/state/UserMedia.ts b/src/state/UserMedia.ts index 9eec3967..38f22122 100644 --- a/src/state/UserMedia.ts +++ b/src/state/UserMedia.ts @@ -5,17 +5,9 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { - BehaviorSubject, - combineLatest, - map, - type Observable, - of, - switchMap, -} from "rxjs"; +import { combineLatest, map, type Observable, of, switchMap } from "rxjs"; import { type LocalParticipant, - type Participant, ParticipantEvent, type RemoteParticipant, type Room as LivekitRoom, @@ -29,11 +21,12 @@ import { type UserMediaViewModel, } from "./MediaViewModel.ts"; import type { Behavior } from "./Behavior.ts"; -import type { RoomMember } from "matrix-js-sdk"; import type { EncryptionSystem } from "../e2ee/sharedKeyManagement.ts"; import type { MediaDevices } from "./MediaDevices.ts"; import type { ReactionOption } from "../reactions"; import { observeSpeaker$ } from "./observeSpeaker.ts"; +import { generateItems } from "../utils/observable.ts"; +import { ScreenShare } from "./ScreenShare.ts"; /** * Sorting bins defining the order in which media tiles appear in the layout. @@ -72,35 +65,35 @@ enum SortingBin { /** * A user media item to be presented in a tile. This is a thin wrapper around * UserMediaViewModel which additionally determines the media item's sorting bin - * for inclusion in the call layout. + * for inclusion in the call layout and tracks associated screen shares. */ export class UserMedia { - private readonly participant$ = new BehaviorSubject(this.initialParticipant); - public readonly vm: UserMediaViewModel = this.participant$.value?.isLocal ? new LocalUserMediaViewModel( this.scope, this.id, - this.member, + this.userId, this.participant$ as Behavior, this.encryptionSystem, - this.livekitRoom, - this.focusURL, + this.livekitRoom$, + this.focusUrl$, this.mediaDevices, - this.scope.behavior(this.displayname$), + this.displayName$, + this.mxcAvatarUrl$, this.scope.behavior(this.handRaised$), this.scope.behavior(this.reaction$), ) : new RemoteUserMediaViewModel( this.scope, this.id, - this.member, + this.userId, this.participant$ as Behavior, this.encryptionSystem, - this.livekitRoom, - this.focusURL, + this.livekitRoom$, + this.focusUrl$, this.pretendToBeDisconnected$, - this.scope.behavior(this.displayname$), + this.displayName$, + this.mxcAvatarUrl$, this.scope.behavior(this.handRaised$), this.scope.behavior(this.reaction$), ); @@ -109,12 +102,55 @@ export class UserMedia { observeSpeaker$(this.vm.speaking$), ); - private readonly presenter$ = this.scope.behavior( + /** + * All screen share media associated with this user media. + */ + public readonly screenShares$ = this.scope.behavior( this.participant$.pipe( - switchMap((p) => (p === null ? of(false) : sharingScreen$(p))), + switchMap((p) => + p === null + ? of([]) + : observeParticipantEvents( + p, + ParticipantEvent.TrackPublished, + ParticipantEvent.TrackUnpublished, + ParticipantEvent.LocalTrackPublished, + ParticipantEvent.LocalTrackUnpublished, + ).pipe( + // Technically more than one screen share might be possible... our + // MediaViewModels don't support it though since they look for a unique + // track for the given source. So generateItems here is a bit overkill. + generateItems( + function* (p) { + if (p.isScreenShareEnabled) + yield { + keys: ["screen-share"], + data: undefined, + }; + }, + (scope, _data$, key) => + new ScreenShare( + scope, + `${this.id}:${key}`, + this.userId, + p, + this.encryptionSystem, + this.livekitRoom$, + this.focusUrl$, + this.pretendToBeDisconnected$, + this.displayName$, + this.mxcAvatarUrl$, + ), + ), + ), + ), ), ); + private readonly presenter$ = this.scope.behavior( + this.screenShares$.pipe(map((screenShares) => screenShares.length > 0)), + ); + /** * Which sorting bin the media item should be placed in. */ @@ -147,37 +183,18 @@ export class UserMedia { public constructor( private readonly scope: ObservableScope, public readonly id: string, - private readonly member: RoomMember, - private readonly initialParticipant: - | LocalParticipant - | RemoteParticipant - | null = null, + private readonly userId: string, + private readonly participant$: Behavior< + LocalParticipant | RemoteParticipant | null + >, private readonly encryptionSystem: EncryptionSystem, - private readonly livekitRoom: LivekitRoom, - private readonly focusURL: string, + private readonly livekitRoom$: Behavior, + private readonly focusUrl$: Behavior, private readonly mediaDevices: MediaDevices, private readonly pretendToBeDisconnected$: Behavior, - private readonly displayname$: Observable, + private readonly displayName$: Behavior, + private readonly mxcAvatarUrl$: Behavior, private readonly handRaised$: Observable, private readonly reaction$: Observable, ) {} - - public updateParticipant( - newParticipant: LocalParticipant | RemoteParticipant | null = null, - ): void { - if (this.participant$.value !== newParticipant) { - // Update the BehaviourSubject in the UserMedia. - this.participant$.next(newParticipant); - } - } -} - -export function sharingScreen$(p: Participant): Observable { - return observeParticipantEvents( - p, - ParticipantEvent.TrackPublished, - ParticipantEvent.TrackUnpublished, - ParticipantEvent.LocalTrackPublished, - ParticipantEvent.LocalTrackUnpublished, - ).pipe(map((p) => p.isScreenShareEnabled)); } diff --git a/src/tile/GridTile.tsx b/src/tile/GridTile.tsx index 1925eff6..57409869 100644 --- a/src/tile/GridTile.tsx +++ b/src/tile/GridTile.tsx @@ -58,7 +58,9 @@ interface TileProps { style?: ComponentProps["style"]; targetWidth: number; targetHeight: number; + focusUrl: string | undefined; displayName: string; + mxcAvatarUrl: string | undefined; showSpeakingIndicators: boolean; focusable: boolean; } @@ -81,7 +83,9 @@ const UserMediaTile: FC = ({ menuStart, menuEnd, className, + focusUrl, displayName, + mxcAvatarUrl, focusable, ...props }) => { @@ -145,7 +149,7 @@ const UserMediaTile: FC = ({ = ({ /> } displayName={displayName} + mxcAvatarUrl={mxcAvatarUrl} focusable={focusable} primaryButton={ primaryButton ?? ( @@ -190,7 +195,7 @@ const UserMediaTile: FC = ({ currentReaction={reaction ?? undefined} raisedHandOnClick={raisedHandOnClick} localParticipant={vm.local} - focusUrl={vm.focusURL} + focusUrl={focusUrl} audioStreamStats={audioStreamStats} videoStreamStats={videoStreamStats} {...props} @@ -359,7 +364,9 @@ export const GridTile: FC = ({ const ourRef = useRef(null); const ref = useMergedRefs(ourRef, theirRef); const media = useBehavior(vm.media$); + const focusUrl = useBehavior(media.focusUrl$); const displayName = useBehavior(media.displayName$); + const mxcAvatarUrl = useBehavior(media.mxcAvatarUrl$); if (media instanceof LocalUserMediaViewModel) { return ( @@ -367,7 +374,9 @@ export const GridTile: FC = ({ ref={ref} vm={media} onOpenProfile={onOpenProfile} + focusUrl={focusUrl} displayName={displayName} + mxcAvatarUrl={mxcAvatarUrl} {...props} /> ); @@ -376,7 +385,9 @@ export const GridTile: FC = ({ ); diff --git a/src/tile/MediaView.tsx b/src/tile/MediaView.tsx index 149b4177..e8a30cd4 100644 --- a/src/tile/MediaView.tsx +++ b/src/tile/MediaView.tsx @@ -7,7 +7,6 @@ Please see LICENSE in the repository root for full details. import { type TrackReferenceOrPlaceholder } from "@livekit/components-core"; import { animated } from "@react-spring/web"; -import { type RoomMember } from "matrix-js-sdk"; import { type FC, type ComponentProps, type ReactNode } from "react"; import { useTranslation } from "react-i18next"; import classNames from "classnames"; @@ -32,12 +31,13 @@ interface Props extends ComponentProps { video: TrackReferenceOrPlaceholder | undefined; videoFit: "cover" | "contain"; mirror: boolean; - member: RoomMember; + userId: string; videoEnabled: boolean; unencryptedWarning: boolean; encryptionStatus: EncryptionStatus; nameTagLeadingIcon?: ReactNode; displayName: string; + mxcAvatarUrl: string | undefined; focusable: boolean; primaryButton?: ReactNode; raisedHandTime?: Date; @@ -59,11 +59,12 @@ export const MediaView: FC = ({ video, videoFit, mirror, - member, + userId, videoEnabled, unencryptedWarning, nameTagLeadingIcon, displayName, + mxcAvatarUrl, focusable, primaryButton, encryptionStatus, @@ -94,10 +95,10 @@ export const MediaView: FC = ({ >
diff --git a/src/tile/SpotlightTile.tsx b/src/tile/SpotlightTile.tsx index 6034c846..48dd0f8c 100644 --- a/src/tile/SpotlightTile.tsx +++ b/src/tile/SpotlightTile.tsx @@ -27,7 +27,6 @@ import { useObservableRef } from "observable-hooks"; import { useTranslation } from "react-i18next"; import classNames from "classnames"; import { type TrackReferenceOrPlaceholder } from "@livekit/components-core"; -import { type RoomMember } from "matrix-js-sdk"; import FullScreenMaximiseIcon from "../icons/FullScreenMaximise.svg?react"; import FullScreenMinimiseIcon from "../icons/FullScreenMinimise.svg?react"; @@ -55,10 +54,12 @@ interface SpotlightItemBaseProps { targetHeight: number; video: TrackReferenceOrPlaceholder | undefined; videoEnabled: boolean; - member: RoomMember; + userId: string; unencryptedWarning: boolean; encryptionStatus: EncryptionStatus; + focusUrl: string | undefined; displayName: string; + mxcAvatarUrl: string | undefined; focusable: boolean; "aria-hidden"?: boolean; localParticipant: boolean; @@ -78,7 +79,7 @@ const SpotlightLocalUserMediaItem: FC = ({ ...props }) => { const mirror = useBehavior(vm.mirror$); - return ; + return ; }; SpotlightLocalUserMediaItem.displayName = "SpotlightLocalUserMediaItem"; @@ -134,7 +135,9 @@ const SpotlightItem: FC = ({ }) => { const ourRef = useRef(null); const ref = useMergedRefs(ourRef, theirRef); + const focusUrl = useBehavior(vm.focusUrl$); const displayName = useBehavior(vm.displayName$); + const mxcAvatarUrl = useBehavior(vm.mxcAvatarUrl$); const video = useBehavior(vm.video$); const videoEnabled = useBehavior(vm.videoEnabled$); const unencryptedWarning = useBehavior(vm.unencryptedWarning$); @@ -161,11 +164,13 @@ const SpotlightItem: FC = ({ className: classNames(styles.item, { [styles.snap]: snap }), targetWidth, targetHeight, - video, + video: video ?? undefined, videoEnabled, - member: vm.member, + userId: vm.userId, unencryptedWarning, + focusUrl, displayName, + mxcAvatarUrl, focusable, encryptionStatus, "aria-hidden": ariaHidden, diff --git a/src/utils/observable.test.ts b/src/utils/observable.test.ts index e039c846..d1034e7b 100644 --- a/src/utils/observable.test.ts +++ b/src/utils/observable.test.ts @@ -9,7 +9,7 @@ import { test } from "vitest"; import { Subject } from "rxjs"; import { withTestScheduler } from "./test"; -import { generateKeyed$, pauseWhen } from "./observable"; +import { generateItems, pauseWhen } from "./observable"; test("pauseWhen", () => { withTestScheduler(({ behavior, expectObservable }) => { @@ -24,7 +24,7 @@ test("pauseWhen", () => { }); }); -test("generateKeyed$ has the right output and ends scopes at the right times", () => { +test("generateItems", () => { const scope1$ = new Subject(); const scope2$ = new Subject(); const scope3$ = new Subject(); @@ -44,18 +44,27 @@ test("generateKeyed$ has the right output and ends scopes at the right times", ( const scope4Marbles = " ----yn"; expectObservable( - generateKeyed$(hot(inputMarbles), (input, createOrGet) => { - for (let i = 1; i <= +input; i++) { - createOrGet(i.toString(), (scope) => { + hot(inputMarbles).pipe( + generateItems( + function* (input) { + for (let i = 1; i <= +input; i++) { + yield { keys: [i], data: undefined }; + } + }, + (scope, data$, i) => { scopeSubjects[i - 1].next("y"); scope.onEnd(() => scopeSubjects[i - 1].next("n")); return i.toString(); - }); - } - return "abcd"[+input - 1]; - }), + }, + ), + ), subscriptionMarbles, - ).toBe(outputMarbles); + ).toBe(outputMarbles, { + a: ["1"], + b: ["1", "2"], + c: ["1", "2", "3"], + d: ["1", "2", "3", "4"], + }); expectObservable(scope1$).toBe(scope1Marbles); expectObservable(scope2$).toBe(scope2Marbles); diff --git a/src/utils/observable.ts b/src/utils/observable.ts index eb817991..053921cd 100644 --- a/src/utils/observable.ts +++ b/src/utils/observable.ts @@ -20,10 +20,12 @@ import { takeWhile, tap, withLatestFrom, + BehaviorSubject, + type OperatorFunction, } from "rxjs"; import { type Behavior } from "../state/Behavior"; -import { ObservableScope } from "../state/ObservableScope"; +import { Epoch, ObservableScope } from "../state/ObservableScope"; const nothing = Symbol("nothing"); @@ -119,70 +121,156 @@ export function pauseWhen(pause$: Behavior) { ); } +interface ItemHandle { + scope: ObservableScope; + data$: BehaviorSubject; + item: Item; +} + /** - * Maps a changing input value to an output value consisting of items that have - * automatically generated ObservableScopes tied to a key. Items will be - * automatically created when their key is requested for the first time, reused - * when the same key is requested at a later time, and destroyed (have their - * scope ended) when the key is no longer requested. + * Maps a changing input value to a collection of items that each capture some + * dynamic data and are tied to a key. Items will be automatically created when + * their key is requested for the first time, reused when the same key is + * requested at a later time, and destroyed (have their scope ended) when the + * key is no longer requested. * * @param input$ The input value to be mapped. - * @param project A function mapping input values to output values. This - * function receives an additional callback `createOrGet` which can be used - * within the function body to request that an item be generated for a certain - * key. The caller provides a factory which will be used to create the item if - * it is being requested for the first time. Otherwise, the item previously - * existing under that key will be returned. + * @param generator A generator function yielding a tuple of keys and the + * currently associated data for each item that it wants to exist. + * @param factory A function constructing an individual item, given the item's key, + * dynamic data, and an automatically managed ObservableScope for the item. */ -export function generateKeyed$( - input$: Observable, - project: ( - input: In, - createOrGet: ( - key: string, - factory: (scope: ObservableScope) => Item, - ) => Item, - ) => Out, -): Observable { - return input$.pipe( - // Keep track of the existing items over time, so we can reuse them - scan< - In, - { - items: Map; - output: Out; - }, - { items: Map } - >( - (state, data) => { - const nextItems = new Map< - string, - { item: Item; scope: ObservableScope } - >(); +export function generateItems< + Input, + Keys extends [unknown, ...unknown[]], + Data, + Item, +>( + generator: ( + input: Input, + ) => Generator<{ keys: readonly [...Keys]; data: Data }, void, void>, + factory: ( + scope: ObservableScope, + data$: Behavior, + ...keys: Keys + ) => Item, +): OperatorFunction { + return generateItemsInternal(generator, factory, (items) => items); +} - const output = project(data, (key, factory) => { - let item = state.items.get(key); - if (item === undefined) { - // First time requesting the key; create the item - const scope = new ObservableScope(); - item = { item: factory(scope), scope }; - } - nextItems.set(key, item); - return item.item; - }); - - // Destroy all items that are no longer being requested - for (const [key, { scope }] of state.items) - if (!nextItems.has(key)) scope.end(); - - return { items: nextItems, output }; - }, - { items: new Map() }, - ), - finalizeValue((state) => { - // Destroy all remaining items when no longer subscribed - for (const { scope } of state.items.values()) scope.end(); - }), - map(({ output }) => output), +/** + * Same as generateItems, but preserves epoch data. + */ +export function generateItemsWithEpoch< + Input, + Keys extends [unknown, ...unknown[]], + Data, + Item, +>( + generator: ( + input: Input, + ) => Generator<{ keys: readonly [...Keys]; data: Data }, void, void>, + factory: ( + scope: ObservableScope, + data$: Behavior, + ...keys: Keys + ) => Item, +): OperatorFunction, Epoch> { + return generateItemsInternal( + function* (input) { + yield* generator(input.value); + }, + factory, + (items, input) => new Epoch(items, input.epoch), ); } + +function generateItemsInternal< + Input, + Keys extends [unknown, ...unknown[]], + Data, + Item, + Output, +>( + generator: ( + input: Input, + ) => Generator<{ keys: readonly [...Keys]; data: Data }, void, void>, + factory: ( + scope: ObservableScope, + data$: Behavior, + ...keys: Keys + ) => Item, + project: (items: Item[], input: Input) => Output, +): OperatorFunction { + /* eslint-disable @typescript-eslint/no-explicit-any */ + return (input$) => + input$.pipe( + // Keep track of the existing items over time, so they can persist + scan< + Input, + { + map: Map; + items: Set>; + input: Input; + }, + { map: Map; items: Set> } + >( + ({ map: prevMap, items: prevItems }, input) => { + const nextMap = new Map(); + const nextItems = new Set>(); + + for (const { keys, data } of generator(input)) { + // Disable type checks for a second to grab the item out of a nested map + let i: any = prevMap; + for (const key of keys) i = i?.get(key); + let item = i as ItemHandle | undefined; + + if (item === undefined) { + // First time requesting the key; create the item + const scope = new ObservableScope(); + const data$ = new BehaviorSubject(data); + item = { scope, data$, item: factory(scope, data$, ...keys) }; + } else { + item.data$.next(data); + } + + // Likewise, disable type checks to insert the item in the nested map + let m: Map = nextMap; + for (let i = 0; i < keys.length - 1; i++) { + let inner = m.get(keys[i]); + if (inner === undefined) { + inner = new Map(); + m.set(keys[i], inner); + } + m = inner; + } + const finalKey = keys[keys.length - 1]; + if (m.has(finalKey)) + throw new Error( + `Keys must be unique (tried to generate multiple items for key ${keys})`, + ); + m.set(keys[keys.length - 1], item); + nextItems.add(item); + } + + // Destroy all items that are no longer being requested + for (const item of prevItems) + if (!nextItems.has(item)) item.scope.end(); + + return { map: nextMap, items: nextItems, input }; + }, + { map: new Map(), items: new Set() }, + ), + finalizeValue(({ items }) => { + // Destroy all remaining items when no longer subscribed + for (const { scope } of items) scope.end(); + }), + map(({ items, input }) => + project( + [...items].map(({ item }) => item), + input, + ), + ), + ); + /* eslint-enable @typescript-eslint/no-explicit-any */ +} diff --git a/src/utils/test-viewmodel.ts b/src/utils/test-viewmodel.ts index 5a0d7526..0e6c589a 100644 --- a/src/utils/test-viewmodel.ts +++ b/src/utils/test-viewmodel.ts @@ -6,7 +6,7 @@ Please see LICENSE in the repository root for full details. */ import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc"; -import { BehaviorSubject, of } from "rxjs"; +import { BehaviorSubject } from "rxjs"; import { vitest } from "vitest"; import { type RelationsContainer } from "matrix-js-sdk/lib/models/relations-container"; import EventEmitter from "events"; @@ -158,7 +158,7 @@ export function getBasicCallViewModelEnvironment( }, handRaisedSubject$, reactionsSubject$, - of({ processor: undefined, supported: false }), + constant({ processor: undefined, supported: false }), ); return { vm, diff --git a/src/utils/test.ts b/src/utils/test.ts index bb19f2b1..d0f09576 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -304,18 +304,20 @@ export function createLocalMedia( localParticipant: LocalParticipant, mediaDevices: MediaDevices, ): LocalUserMediaViewModel { + const member = mockMatrixRoomMember(localRtcMember, roomMember); return new LocalUserMediaViewModel( testScope(), "local", - mockMatrixRoomMember(localRtcMember, roomMember), + member.userId, constant(localParticipant), { kind: E2eeType.PER_PARTICIPANT, }, - mockLivekitRoom({ localParticipant }), - "https://rtc-example.org", + constant(mockLivekitRoom({ localParticipant })), + constant("https://rtc-example.org"), mediaDevices, - constant(roomMember.rawDisplayName ?? "nodisplayname"), + constant(member.rawDisplayName ?? "nodisplayname"), + constant(member.getMxcAvatarUrl()), constant(null), constant(null), ); @@ -339,19 +341,23 @@ export function createRemoteMedia( roomMember: Partial, participant: Partial, ): RemoteUserMediaViewModel { + const member = mockMatrixRoomMember(localRtcMember, roomMember); const remoteParticipant = mockRemoteParticipant(participant); return new RemoteUserMediaViewModel( testScope(), "remote", - mockMatrixRoomMember(localRtcMember, roomMember), + member.userId, of(remoteParticipant), { kind: E2eeType.PER_PARTICIPANT, }, - mockLivekitRoom({}, { remoteParticipants$: of([remoteParticipant]) }), - "https://rtc-example.org", + constant( + mockLivekitRoom({}, { remoteParticipants$: of([remoteParticipant]) }), + ), + constant("https://rtc-example.org"), constant(false), - constant(roomMember.rawDisplayName ?? "nodisplayname"), + constant(member.rawDisplayName ?? "nodisplayname"), + constant(member.getMxcAvatarUrl()), constant(null), constant(null), );