diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index f26c4b3b..b508ff80 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -268,19 +268,15 @@ export class CallViewModel { }); // ------------------------------------------------------------------------ - // CallNotificationLifecycle - // consider inlining these!!! - private sentCallNotification$ = createSentCallNotification$( - this.scope, - this.matrixRTCSession, - ); - private receivedDecline$ = createReceivedDecline$(this.matrixRoom); private callLifecycle = createCallNotificationLifecycle$({ scope: this.scope, memberships$: this.memberships$, - sentCallNotification$: this.sentCallNotification$, - receivedDecline$: this.receivedDecline$, + sentCallNotification$: createSentCallNotification$( + this.scope, + this.matrixRTCSession, + ), + receivedDecline$: createReceivedDecline$(this.matrixRoom), options: this.options, localUser: { userId: this.userId, deviceId: this.deviceId }, }); @@ -331,24 +327,44 @@ export class CallViewModel { public readonly audioParticipants$ = this.scope.behavior( this.matrixLivekitMembers$.pipe( + switchMap((membersWithEpoch) => { + const members = membersWithEpoch.value; + const a$ = combineLatest( + members.map((member) => + combineLatest([member.connection$, member.participant$]).pipe( + map(([connection, participant]) => { + // do not render audio for local participant + if (!connection || !participant || participant.isLocal) + return null; + const livekitRoom = connection.livekitRoom; + const url = connection.transport.livekit_service_url; + + return { url, livekitRoom, participant: participant.identity }; + }), + ), + ), + ); + return a$; + }), map((members) => - members.value.reduce((acc, curr) => { - const url = curr.connection?.transport.livekit_service_url; - const livekitRoom = curr.connection?.livekitRoom; - const participant = curr.participant?.identity; + members.reduce((acc, curr) => { + if (!curr) return acc; - if (!url || !livekitRoom || !participant) return acc; - - const existing = acc.find((item) => item.url === url); + const existing = acc.find((item) => item.url === curr.url); if (existing) { - existing.participants.push(participant); + existing.participants.push(curr.participant); } else { - acc.push({ livekitRoom, participants: [participant], url }); + acc.push({ + livekitRoom: curr.livekitRoom, + participants: [curr.participant], + url: curr.url, + }); } return acc; }, []), ), ), + [], ); public readonly handsRaised$ = this.scope.behavior( diff --git a/src/state/CallViewModel/localMember/LocalTransport.ts b/src/state/CallViewModel/localMember/LocalTransport.ts index b1fd71e9..94c89deb 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.ts @@ -16,14 +16,9 @@ import { type MatrixClient } from "matrix-js-sdk"; import { combineLatest, distinctUntilChanged, first, from, map } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; -import { deepCompare } from "matrix-js-sdk/lib/utils"; import { type Behavior } from "../../Behavior.ts"; -import { - Epoch, - mapEpoch, - type ObservableScope, -} from "../../ObservableScope.ts"; +import { type Epoch, type ObservableScope } from "../../ObservableScope.ts"; import { Config } from "../../../config/Config.ts"; import { MatrixRTCTransportMissingError } from "../../../utils/errors.ts"; import { getSFUConfigWithOpenID } from "../../../livekit/openIDSFU.ts"; diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index cae45d4a..60251541 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -18,7 +18,7 @@ import { RoomEvent, } from "livekit-client"; import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; -import { BehaviorSubject, type Observable } from "rxjs"; +import { BehaviorSubject, map, type Observable } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; import { @@ -184,7 +184,7 @@ export class Connection { * It filters the participants to only those that are associated with a membership that claims to publish on this connection. */ - public readonly participantsWithTrack$: Behavior; + public readonly participants$: Behavior; /** * The media transport to connect to. @@ -211,13 +211,19 @@ export class Connection { this.transport = transport; this.client = client; - this.participantsWithTrack$ = scope.behavior( + this.participants$ = scope.behavior( + // only tracks remote participants connectedParticipantsObserver(this.livekitRoom, { additionalRoomEvents: [ RoomEvent.TrackPublished, RoomEvent.TrackUnpublished, ], - }), + }).pipe( + map((participants) => [ + this.livekitRoom.localParticipant, + ...participants, + ]), + ), [], ); diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index ce984aec..73ca3d16 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -181,7 +181,7 @@ export function createConnectionManager$({ // Map the connections to list of {connection, participants}[] const listOfConnectionsWithPublishingParticipants = connections.value.map((connection) => { - return connection.participantsWithTrack$.pipe( + return connection.participants$.pipe( map((participants) => ({ connection, participants, diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts index 764862f2..fbfd0563 100644 --- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts +++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts @@ -105,6 +105,9 @@ export function createMatrixLivekitMembers$({ new Epoch([membershipsWithTransports, managerData] as const, epoch), ), generateItemsWithEpoch( + // Generator function. + // creates an array of `{key, data}[]` + // Each change in the keys (new key, missing key) will result in a call to the factory function. function* ([membershipsWithTransports, managerData]) { for (const { membership, transport } of membershipsWithTransports) { // TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to @@ -125,8 +128,11 @@ export function createMatrixLivekitMembers$({ }; } }, + // Each update where the key of the generator array do not change will result in updates to the `data$` observable in the factory. (scope, data$, participantId, userId) => { const member = matrixRoom.getMember(userId); + // will only get called once per `participantId, userId` pair. + // updates to data$ and as a result to displayName$ and mxcAvatarUrl$ are more frequent. return { participantId, userId, @@ -134,11 +140,9 @@ export function createMatrixLivekitMembers$({ displayName$: scope.behavior( displaynameMap$.pipe( map((displayNames) => { - const name = displayNames.get(userId); - if (name === undefined) { + const name = displayNames.get(userId) ?? ""; + if (name === "") logger.warn(`No display name for user ${userId}`); - return ""; - } return name; }), ), diff --git a/src/state/ObservableScope.test.ts b/src/state/ObservableScope.test.ts index 4b0f3b4f..b41f47c2 100644 --- a/src/state/ObservableScope.test.ts +++ b/src/state/ObservableScope.test.ts @@ -6,7 +6,8 @@ Please see LICENSE in the repository root for full details. */ import { describe, expect, it } from "vitest"; -import { BehaviorSubject, timer } from "rxjs"; +import { BehaviorSubject, combineLatest, timer } from "rxjs"; +import { logger } from "matrix-js-sdk/lib/logger"; import { Epoch, @@ -72,4 +73,28 @@ describe("Epoch", () => { scope.behavior(a$, undefined); }); + + it("diamonds emits in a predictable order", () => { + const sb$ = new BehaviorSubject("initial"); + const root$ = sb$.pipe(trackEpoch()); + const derivedA$ = root$.pipe(mapEpoch((e) => e + "-A")); + const derivedB$ = root$.pipe(mapEpoch((e) => e + "-B")); + combineLatest([root$, derivedB$, derivedA$]).subscribe( + ([root, derivedA, derivedB]) => { + logger.log( + "combined" + + root.epoch + + root.value + + "\n" + + derivedA.epoch + + derivedA.value + + "\n" + + derivedB.epoch + + derivedB.value, + ); + }, + ); + sb$.next("updated"); + sb$.next("ANOTERUPDATE"); + }); });