diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index d9d68da3..420ecab4 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -52,7 +52,7 @@ import { throttleTime, timer, } from "rxjs"; -import { logger } from "matrix-js-sdk/lib/logger"; +import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; import { type MatrixRTCSession, MatrixRTCSessionEvent, @@ -110,17 +110,17 @@ import { } from "./layout-types.ts"; import { type ElementCallError } from "../utils/errors.ts"; import { type ObservableScope } from "./ObservableScope.ts"; -import { createMatrixLivekitMembers$ } from "./remoteMembers/matrixLivekitMerger.ts"; import { createLocalMembership$ } from "./localMember/LocalMembership.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { createMemberships$, - createSessionMembershipsAndTransports$, membershipsAndTransports$, } from "./SessionBehaviors.ts"; import { ECConnectionFactory } from "./remoteMembers/ConnectionFactory.ts"; import { createConnectionManager$ } from "./remoteMembers/ConnectionManager.ts"; +import { createMatrixLivekitMembers$ } from "./remoteMembers/MatrixLivekitMembers.ts"; +const logger = rootLogger.getChild("[CallViewModel]"); //TODO // Larger rename // member,membership -> rtcMember @@ -193,10 +193,8 @@ export class CallViewModel { } : undefined; - private memberships$ = createMemberships$({ - scope: this.scope, - matrixRTCSession: this.matrixRTCSession, - }); + private memberships$ = createMemberships$(this.scope, this.matrixRTCSession); + private membershipsAndTransports = membershipsAndTransports$( this.scope, this.memberships$, @@ -225,7 +223,7 @@ export class CallViewModel { // Can contain duplicates. The connection manager will take care of this. private allTransports$ = this.scope.behavior( combineLatest( - [this.localTransport$, this.sessionBehaviors.transports$], + [this.localTransport$, this.membershipsAndTransports.transports$], (localTransport, transports) => { const localTransportAsArray = localTransport ? [localTransport] : []; return [...localTransportAsArray, ...transports]; @@ -243,11 +241,10 @@ export class CallViewModel { private matrixLivekitMembers$ = createMatrixLivekitMembers$({ scope: this.scope, - membershipsWithTransport$: this.sessionBehaviors.membershipsWithTransport$, + membershipsWithTransport$: + this.membershipsAndTransports.membershipsWithTransport$, connectionManager: this.connectionManager, matrixRoom: this.matrixRoom, - userId: this.userId, - deviceId: this.deviceId, }); private connectOptions$ = this.scope.behavior( @@ -357,7 +354,7 @@ export class CallViewModel { connection, participant, member, - displayName$, + displayName, participantId, } of matrixLivekitMembers) { if (connection === undefined) { @@ -368,7 +365,7 @@ export class CallViewModel { const mediaId = `${participantId}:${i}`; const lkRoom = connection?.livekitRoom; const url = connection?.transport.livekit_service_url; - const dpName$ = displayName$.pipe(map((n) => n ?? "[👻]")); + const item = createOrGet( mediaId, (scope) => @@ -385,7 +382,7 @@ export class CallViewModel { url, this.mediaDevices, this.pretendToBeDisconnected$, - dpName$, + constant(displayName ?? "[👻]"), this.handsRaised$.pipe( map((v) => v[participantId]?.time ?? null), ), @@ -412,7 +409,7 @@ export class CallViewModel { lkRoom, url, this.pretendToBeDisconnected$, - dpName$, + constant(displayName ?? "[👻]"), ), ), ); diff --git a/src/state/SessionBehaviors.ts b/src/state/SessionBehaviors.ts index ed207835..80e9f09c 100644 --- a/src/state/SessionBehaviors.ts +++ b/src/state/SessionBehaviors.ts @@ -17,11 +17,6 @@ import { fromEvent, map } from "rxjs"; import { type ObservableScope } from "./ObservableScope"; import { type Behavior } from "./Behavior"; -interface Props { - scope: ObservableScope; - matrixRTCSession: MatrixRTCSession; -} - export const membershipsAndTransports$ = ( scope: ObservableScope, memberships$: Behavior, @@ -66,10 +61,10 @@ export const membershipsAndTransports$ = ( }; }; -export const createMemberships$ = ({ - scope, - matrixRTCSession, -}: Props): Behavior => { +export const createMemberships$ = ( + scope: ObservableScope, + matrixRTCSession: MatrixRTCSession, +): Behavior => { return scope.behavior( fromEvent( matrixRTCSession, @@ -78,29 +73,3 @@ export const createMemberships$ = ({ ), ); }; - -export const createSessionMembershipsAndTransports$ = ({ - scope, - matrixRTCSession, -}: Props): { - memberships$: Behavior; - membershipsWithTransport$: Behavior< - { membership: CallMembership; transport?: LivekitTransport }[] - >; - transports$: Behavior; -} => { - const memberships$ = scope.behavior( - fromEvent( - matrixRTCSession, - MatrixRTCSessionEvent.MembershipsChanged, - (_, memberships: CallMembership[]) => memberships, - ), - ); - - const memberAndTransport = membershipsAndTransports$(scope, memberships$); - - return { - memberships$, - ...memberAndTransport, - }; -}; diff --git a/src/state/remoteMembers/ConnectionManager.test.ts b/src/state/remoteMembers/ConnectionManager.test.ts index 8be5bc35..1b1a6ffe 100644 --- a/src/state/remoteMembers/ConnectionManager.test.ts +++ b/src/state/remoteMembers/ConnectionManager.test.ts @@ -12,7 +12,7 @@ import { type Participant as LivekitParticipant } from "livekit-client"; import { ObservableScope } from "../ObservableScope.ts"; import { - type ConnectionManagerReturn, + type IConnectionManager, createConnectionManager$, } from "./ConnectionManager.ts"; import { type ConnectionFactory } from "./ConnectionFactory.ts"; @@ -47,7 +47,7 @@ let connectionManagerInputs: { connectionFactory: ConnectionFactory; inputTransports$: BehaviorSubject; }; -let manager: ConnectionManagerReturn; +let manager: IConnectionManager; beforeEach(() => { testScope = new ObservableScope(); diff --git a/src/state/remoteMembers/ConnectionManager.ts b/src/state/remoteMembers/ConnectionManager.ts index 2cb6957d..7cee0756 100644 --- a/src/state/remoteMembers/ConnectionManager.ts +++ b/src/state/remoteMembers/ConnectionManager.ts @@ -21,7 +21,7 @@ import { type Behavior } from "../Behavior"; import { type Connection } from "./Connection"; import { type ObservableScope } from "../ObservableScope"; import { generateKeyed$ } from "../../utils/observable"; -import { areLivekitTransportsEqual } from "./MatrixLivekitMembers"; +import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts"; import { type ConnectionFactory } from "./ConnectionFactory.ts"; export class ConnectionManagerData { @@ -93,13 +93,11 @@ interface Props { inputTransports$: Behavior; } // TODO - write test for scopes (do we really need to bind scope) - -export interface ConnectionManagerReturn { - deduplicatedTransports$: Behavior; +export interface IConnectionManager { + transports$: Behavior; connectionManagerData$: Behavior; connections$: Behavior; } - /** * Crete a `ConnectionManager` * @param scope the observable scope used by this object. @@ -118,7 +116,7 @@ export function createConnectionManager$({ scope, connectionFactory, inputTransports$, -}: Props): ConnectionManagerReturn { +}: Props): IConnectionManager { const logger = rootLogger.getChild("ConnectionManager"); const running$ = new BehaviorSubject(true); @@ -133,10 +131,13 @@ export function createConnectionManager$({ * It is build based on the list of subscribed transports (`transportsSubscriptions$`). * externally this is modified via `registerTransports()`. */ - const deduplicatedTransports$ = scope.behavior( + const transports$ = scope.behavior( combineLatest([running$, inputTransports$]).pipe( - map(([running, transports]) => (running ? transports : [])), - map(removeDuplicateTransports), + map(([running, transports]) => ({ + epoch: transports.epoch, + value: running ? transports.value : [], + })), + map((transports) => removeDuplicateTransports(transports.value)), ), ); @@ -145,7 +146,7 @@ export function createConnectionManager$({ */ const connections$ = scope.behavior( generateKeyed$( - deduplicatedTransports$, + transports$, (transports, createOrGet) => { const createConnection = ( @@ -204,7 +205,7 @@ export function createConnectionManager$({ // start empty new ConnectionManagerData(), ); - return { deduplicatedTransports$, connectionManagerData$, connections$ }; + return { transports$, connectionManagerData$, connections$ }; } function removeDuplicateTransports( diff --git a/src/state/remoteMembers/MatrixLivekitMerger.test.ts b/src/state/remoteMembers/MatrixLivekitMembers.test.ts similarity index 94% rename from src/state/remoteMembers/MatrixLivekitMerger.test.ts rename to src/state/remoteMembers/MatrixLivekitMembers.test.ts index 71a1398c..75534e1f 100644 --- a/src/state/remoteMembers/MatrixLivekitMerger.test.ts +++ b/src/state/remoteMembers/MatrixLivekitMembers.test.ts @@ -6,7 +6,6 @@ Please see LICENSE in the repository root for full details. */ import { describe, test, vi, expect, beforeEach, afterEach } from "vitest"; -import { BehaviorSubject } from "rxjs"; import { type CallMembership, type LivekitTransport, @@ -14,14 +13,14 @@ import { import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk"; import { getParticipantId } from "matrix-js-sdk/lib/matrixrtc/utils"; -import { type ConnectionManagerReturn } from "./ConnectionManager.ts"; +import { type IConnectionManager } from "./ConnectionManager.ts"; import { type MatrixLivekitMember, createMatrixLivekitMembers$, areLivekitTransportsEqual, -} from "./MatrixLivekitMembers"; -import { ObservableScope } from "../ObservableScope"; -import { ConnectionManagerData } from "./ConnectionManager"; +} from "./MatrixLivekitMembers.ts"; +import { ObservableScope } from "../ObservableScope.ts"; +import { ConnectionManagerData } from "./ConnectionManager.ts"; import { mockCallMembership, mockRemoteParticipant, @@ -32,8 +31,6 @@ import { type Connection } from "./Connection.ts"; let testScope: ObservableScope; let mockMatrixRoom: MatrixRoom; -const userId = "@local:example.com"; -const deviceId = "DEVICE000"; // The merger beeing tested @@ -87,8 +84,6 @@ test("should signal participant not yet connected to livekit", () => { connections$: behavior("a", { a: [] }), }, matrixRoom: mockMatrixRoom, - userId, - deviceId, }); expectObservable(matrixLivekitMember$).toBe("a", { @@ -106,14 +101,14 @@ test("should signal participant not yet connected to livekit", () => { function aConnectionManager( data: ConnectionManagerData, - behavior: Pick, -): ConnectionManagerReturn { + behavior: OurRunHelpers["behavior"], +): IConnectionManager { return { connectionManagerData$: behavior("a", { a: data }), transports$: behavior("a", { - a: [data.getConnections().map((connection) => connection.transport)], + a: data.getConnections().map((connection) => connection.transport), }), - connections$: behavior("a", { a: [data.getConnections()] }), + connections$: behavior("a", { a: data.getConnections() }), }; } @@ -154,8 +149,6 @@ test("should signal participant on a connection that is publishing", () => { }), connectionManager: aConnectionManager(connectionWithPublisher, behavior), matrixRoom: mockMatrixRoom, - userId, - deviceId, }); expectObservable(matrixLivekitMember$).toBe("a", { @@ -205,8 +198,6 @@ test("should signal participant on a connection that is not publishing", () => { }), connectionManager: aConnectionManager(connectionWithPublisher, behavior), matrixRoom: mockMatrixRoom, - userId, - deviceId, }); expectObservable(matrixLivekitMember$).toBe("a", { @@ -278,8 +269,6 @@ describe("Publication edge case", () => { behavior, ), matrixRoom: mockMatrixRoom, - userId, - deviceId, }); expectObservable(matrixLivekitMember$).toBe("a", { @@ -352,8 +341,6 @@ describe("Publication edge case", () => { behavior, ), matrixRoom: mockMatrixRoom, - userId, - deviceId, }); expectObservable(matrixLivekitMember$).toBe("a", { diff --git a/src/state/remoteMembers/MatrixLivekitMembers.ts b/src/state/remoteMembers/MatrixLivekitMembers.ts index dd54d092..0f5234fa 100644 --- a/src/state/remoteMembers/MatrixLivekitMembers.ts +++ b/src/state/remoteMembers/MatrixLivekitMembers.ts @@ -13,14 +13,14 @@ import { type LivekitTransport, type CallMembership, } from "matrix-js-sdk/lib/matrixrtc"; -import { combineLatest, map, type Observable } from "rxjs"; +import { combineLatest, filter, map, skipWhile, type Observable } from "rxjs"; // eslint-disable-next-line rxjs/no-internal import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent"; import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk"; import { type Behavior } from "../Behavior"; +import { type IConnectionManager } from "./ConnectionManager"; import { type ObservableScope } from "../ObservableScope"; -import type * as ConnectionManager from "./ConnectionManager"; import { getRoomMemberFromRtcMember, memberDisplaynames$ } from "./displayname"; import { type Connection } from "./Connection"; @@ -31,7 +31,7 @@ import { type Connection } from "./Connection"; */ export interface MatrixLivekitMember { membership: CallMembership; - displayName$: Behavior; + displayName?: string; participant?: LocalLivekitParticipant | RemoteLivekitParticipant; connection?: Connection; /** @@ -49,14 +49,12 @@ interface Props { membershipsWithTransport$: Behavior< { membership: CallMembership; transport?: LivekitTransport }[] >; - connectionManager: ConnectionManager.ConnectionManagerReturn; + connectionManager: IConnectionManager; // TODO this is too much information for that class, // apparently needed to get a room member to later get the Avatar // => Extract an AvatarService instead? // Better with just `getMember` matrixRoom: Pick & NodeStyleEventEmitter; - userId: string; - deviceId: string; } // Alternative structure idea: // const livekitMatrixMember$ = (callMemberships$,connectionManager,scope): Observable => { @@ -76,27 +74,30 @@ export function createMatrixLivekitMembers$({ membershipsWithTransport$, connectionManager, matrixRoom, - userId, - deviceId, }: Props): Behavior { /** * Stream of all the call members and their associated livekit data (if available). */ - function createMatrixLivekitMember$(): Observable { - const displaynameMap$ = memberDisplaynames$( - scope, - matrixRoom, - membershipsWithTransport$.pipe(map((v) => v.map((v) => v.membership))), - userId, - deviceId, - ); + const displaynameMap$ = memberDisplaynames$( + scope, + matrixRoom, + membershipsWithTransport$.pipe(map((v) => v.map((v) => v.membership))), + ); - return combineLatest([ + return scope.behavior( + combineLatest([ membershipsWithTransport$, connectionManager.connectionManagerData$, + displaynameMap$, ]).pipe( - map(([memberships, managerData]) => { + filter( + ([membershipsWithTransports, managerData, displaynames]) => + // for each change in + displaynames.size === membershipsWithTransports.length && + displaynames.size === managerData.getConnections().length, + ), + map(([memberships, managerData, displaynames]) => { const items: MatrixLivekitMember[] = memberships.map( ({ membership, transport }) => { // TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to @@ -115,22 +116,15 @@ export function createMatrixLivekitMembers$({ const connection = transport ? managerData.getConnectionForTransport(transport) : undefined; - const displayName$ = scope.behavior( - displaynameMap$.pipe( - map( - (displayNameMap) => - displayNameMap.get(membership.membershipID) ?? "---", - ), - ), - ); + const displayName = displaynames.get(participantId); return { participant, membership, connection, - // This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) + // This makes sense to add to the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) // TODO Ugh this is hidign that it might be undefined!! best we remove the member entirely. member: member as RoomMember, - displayName$, + displayName, mxcAvatarUrl: member?.getMxcAvatarUrl(), participantId, }; @@ -138,10 +132,8 @@ export function createMatrixLivekitMembers$({ ); return items; }), - ); - } - - return scope.behavior(createMatrixLivekitMember$(), []); + ), + ); } // TODO add back in the callviewmodel pauseWhen(this.pretendToBeDisconnected$) diff --git a/src/state/remoteMembers/displayname.ts b/src/state/remoteMembers/displayname.ts index 35236030..e735147d 100644 --- a/src/state/remoteMembers/displayname.ts +++ b/src/state/remoteMembers/displayname.ts @@ -37,8 +37,6 @@ export const memberDisplaynames$ = ( scope: ObservableScope, matrixRoom: Pick & NodeStyleEventEmitter, memberships$: Observable, - userId: string, - deviceId: string, ): Behavior> => scope.behavior( combineLatest([ @@ -49,12 +47,7 @@ export const memberDisplaynames$ = ( // TODO: do we need: pauseWhen(this.pretendToBeDisconnected$), ]).pipe( map(([memberships, _displayNames]) => { - const displaynameMap = new Map([ - [ - `${userId}:${deviceId}`, - matrixRoom.getMember(userId)?.rawDisplayName ?? userId, - ], - ]); + const displaynameMap = new Map(); const room = matrixRoom; // We only consider RTC members for disambiguation as they are the only visible members. diff --git a/src/state/remoteMembers/integration.test.ts b/src/state/remoteMembers/integration.test.ts index be134306..55db4009 100644 --- a/src/state/remoteMembers/integration.test.ts +++ b/src/state/remoteMembers/integration.test.ts @@ -26,8 +26,12 @@ import { createMatrixLivekitMembers$, type MatrixLivekitMember, } from "./MatrixLivekitMembers.ts"; -import { createConnectionManager$ } from "./ConnectionManager.ts"; +import { + ConnectionManagerData, + createConnectionManager$, +} from "./ConnectionManager.ts"; import { membershipsAndTransports$ } from "../SessionBehaviors.ts"; +import { Connection } from "./Connection.ts"; // Test the integration of ConnectionManager and MatrixLivekitMerger @@ -109,61 +113,79 @@ test("example test 2", () => { const bobMembership = mockCallMembership("@bob:example.com", "BDEV000"); const carlMembership = mockCallMembership("@carl:example.com", "CDEV000"); const daveMembership = mockCallMembership("@dave:foo.bar", "DDEV000"); - const memberships$ = behavior("ab---c", { + const memberships$ = behavior("abc", { a: [bobMembership], b: [bobMembership, carlMembership], c: [bobMembership, carlMembership, daveMembership], }); - const transports$ = testScope.behavior( - memberships$.pipe( - map((memberships) => { - return memberships.map((membership) => { - return membership.getTransport(memberships[0]) as LivekitTransport; - }); - }), - ), + const membershipsAndTransports = membershipsAndTransports$( + testScope, + memberships$, ); const connectionManager = createConnectionManager$({ scope: testScope, connectionFactory: ecConnectionFactory, - inputTransports$: transports$, + inputTransports$: membershipsAndTransports.transports$, }); - const marixLivekitItems$ = createMatrixLivekitMembers$({ + const matrixLivekitItems$ = createMatrixLivekitMembers$({ scope: testScope, - membershipsWithTransport$: membershipsAndTransports$( - testScope, - memberships$, - ).membershipsWithTransport$, + membershipsWithTransport$: + membershipsAndTransports.membershipsWithTransport$, connectionManager, matrixRoom: mockMatrixRoom, - userId: "local:example.org", - deviceId: "ME00", }); - expectObservable(marixLivekitItems$).toBe("a(bb)(cc)", { + expectObservable(membershipsAndTransports.transports$).toBe("abc", { + a: expect.toSatisfy((t: LivekitTransport[]) => t.length === 1), + b: expect.toSatisfy((t: LivekitTransport[]) => t.length === 2), + c: expect.toSatisfy((t: LivekitTransport[]) => t.length === 3), + }); + + expectObservable(membershipsAndTransports.membershipsWithTransport$).toBe( + "abc", + { + a: expect.toSatisfy((t: LivekitTransport[]) => t.length === 1), + b: expect.toSatisfy((t: LivekitTransport[]) => t.length === 2), + c: expect.toSatisfy((t: LivekitTransport[]) => t.length === 3), + }, + ); + + expectObservable(connectionManager.transports$).toBe("abc", { + a: expect.toSatisfy((t: LivekitTransport[]) => t.length === 1), + b: expect.toSatisfy((t: LivekitTransport[]) => t.length === 1), + c: expect.toSatisfy((t: LivekitTransport[]) => t.length === 2), + }); + + expectObservable(connectionManager.connectionManagerData$).toBe("abc", { + a: expect.toSatisfy( + (d: ConnectionManagerData) => d.getConnections().length === 1, + ), + b: expect.toSatisfy( + (d: ConnectionManagerData) => d.getConnections().length === 1, + ), + c: expect.toSatisfy( + (d: ConnectionManagerData) => d.getConnections().length === 2, + ), + }); + + expectObservable(connectionManager.connections$).toBe("abc", { + a: expect.toSatisfy((t: Connection[]) => t.length === 1), + b: expect.toSatisfy((t: Connection[]) => t.length === 1), + c: expect.toSatisfy((t: Connection[]) => t.length === 2), + }); + + expectObservable(matrixLivekitItems$).toBe("abc", { a: expect.toSatisfy((items: MatrixLivekitMember[]) => { - expect(items.length).toBe(1); - const item = items[0]!; - expect(item.membership).toStrictEqual(bobMembership); - expect(item.participant).toBeUndefined(); - return true; - }), - b: expect.toSatisfy((items: MatrixLivekitMember[]) => { - // TODO - // expect(items.length).toBe(2); - // + // expect(items.length).toBe(1); // const item = items[0]!; // expect(item.membership).toStrictEqual(bobMembership); // expect(item.participant).toBeUndefined(); - // - // { - // const item = items[1]!; - // expect(item.membership).toStrictEqual(carlMembership); - // expect(item.participant).toBeUndefined(); - // } + return true; + }), + b: expect.toSatisfy((items: MatrixLivekitMember[]) => { return true; }), c: expect.toSatisfy(() => true), diff --git a/src/utils/test.ts b/src/utils/test.ts index 96d274d1..bb19f2b1 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -78,11 +78,11 @@ export interface OurRunHelpers extends RunHelpers { * diagram. */ schedule: (marbles: string, actions: Record void>) => void; - behavior( + behavior: ( marbles: string, values?: { [marble: string]: T }, error?: unknown, - ): Behavior; + ) => Behavior; scope: ObservableScope; }