From 284a52c23cbbed7b43f9aa4a46836751f675d1e8 Mon Sep 17 00:00:00 2001 From: Timo K Date: Mon, 1 Dec 2025 12:43:17 +0100 Subject: [PATCH] mvp --- godot/index.html | 6 +- godot/main.ts | 198 ++++++++++-------- src/state/CallViewModel/CallViewModel.ts | 24 ++- .../CallViewModel/localMember/Publisher.ts | 33 ++- .../remoteMembers/Connection.test.ts | 28 ++- .../CallViewModel/remoteMembers/Connection.ts | 29 +-- .../remoteMembers/ConnectionFactory.ts | 2 +- .../remoteMembers/ConnectionManager.test.ts | 52 ++--- .../remoteMembers/ConnectionManager.ts | 39 ++-- .../MatrixLivekitMembers.test.ts | 121 ++++++----- .../remoteMembers/MatrixLivekitMembers.ts | 11 +- .../remoteMembers/integration.test.ts | 4 +- 12 files changed, 296 insertions(+), 251 deletions(-) diff --git a/godot/index.html b/godot/index.html index ff654748..7d5f96c0 100644 --- a/godot/index.html +++ b/godot/index.html @@ -20,7 +20,7 @@ await window.matrixRTCSdk.join(); console.info("matrixRTCSdk joined "); - // sdk.data$.subscribe((data) => { + // window.matrixRTCSdk.data$.subscribe((data) => { // console.log(data); // const div = document.getElementById("data"); // div.appendChild(document.createTextNode(data)); @@ -36,9 +36,9 @@ - +
diff --git a/godot/main.ts b/godot/main.ts index 98bb4972..ede612cb 100644 --- a/godot/main.ts +++ b/godot/main.ts @@ -6,7 +6,7 @@ Please see LICENSE in the repository root for full details. */ // import { type InitResult } from "../src/ClientContext"; -import { map, type Observable, of, Subject, switchMap } from "rxjs"; +import { map, type Observable, of, Subject, switchMap, tap } from "rxjs"; import { MatrixRTCSessionEvent } from "matrix-js-sdk/lib/matrixrtc"; import { type TextStreamInfo } from "livekit-client/dist/src/room/types"; import { @@ -36,7 +36,7 @@ interface MatrixRTCSdk { /** @throws on leave errors */ leave: () => void; data$: Observable<{ sender: string; data: string }>; - sendData?: (data: Record) => Promise; + sendData?: (data: unknown) => Promise; } export async function createMatrixRTCSdk(): Promise { logger.info("Hello"); @@ -67,97 +67,115 @@ export async function createMatrixRTCSdk(): Promise { // create data listener const data$ = new Subject<{ sender: string; data: string }>(); - // const lkTextStreamHandlerFunction = async ( - // reader: TextStreamReader, - // participantInfo: { identity: string }, - // livekitRoom: LivekitRoom, - // ): Promise => { - // const info = reader.info; - // console.log( - // `Received text stream from ${participantInfo.identity}\n` + - // ` Topic: ${info.topic}\n` + - // ` Timestamp: ${info.timestamp}\n` + - // ` ID: ${info.id}\n` + - // ` Size: ${info.size}`, // Optional, only available if the stream was sent with `sendText` - // ); + const lkTextStreamHandlerFunction = async ( + reader: TextStreamReader, + participantInfo: { identity: string }, + livekitRoom: LivekitRoom, + ): Promise => { + const info = reader.info; + logger.info( + `Received text stream from ${participantInfo.identity}\n` + + ` Topic: ${info.topic}\n` + + ` Timestamp: ${info.timestamp}\n` + + ` ID: ${info.id}\n` + + ` Size: ${info.size}`, // Optional, only available if the stream was sent with `sendText` + ); - // const participants = callViewModel.livekitRoomItems$.value.find( - // (i) => i.livekitRoom === livekitRoom, - // )?.participants; - // if (participants && participants.includes(participantInfo.identity)) { - // const text = await reader.readAll(); - // console.log(`Received text: ${text}`); - // data$.next({ sender: participantInfo.identity, data: text }); - // } else { - // logger.warn( - // "Received text from unknown participant", - // participantInfo.identity, - // ); - // } - // }; + const participants = callViewModel.livekitRoomItems$.value.find( + (i) => i.livekitRoom === livekitRoom, + )?.participants; + if (participants && participants.includes(participantInfo.identity)) { + const text = await reader.readAll(); + logger.info(`Received text: ${text}`); + data$.next({ sender: participantInfo.identity, data: text }); + } else { + logger.warn( + "Received text from unknown participant", + participantInfo.identity, + ); + } + }; - // const livekitRoomItemsSub = callViewModel.livekitRoomItems$ - // .pipe(currentAndPrev) - // .subscribe({ - // next: ({ prev, current }) => { - // const prevRooms = prev.map((i) => i.livekitRoom); - // const currentRooms = current.map((i) => i.livekitRoom); - // const addedRooms = currentRooms.filter((r) => !prevRooms.includes(r)); - // const removedRooms = prevRooms.filter((r) => !currentRooms.includes(r)); - // addedRooms.forEach((r) => - // r.registerTextStreamHandler( - // TEXT_LK_TOPIC, - // (reader, participantInfo) => - // void lkTextStreamHandlerFunction(reader, participantInfo, r), - // ), - // ); - // removedRooms.forEach((r) => - // r.unregisterTextStreamHandler(TEXT_LK_TOPIC), - // ); - // }, - // complete: () => { - // logger.info("Livekit room items subscription completed"); - // for (const item of callViewModel.livekitRoomItems$.value) { - // logger.info("unregistering room item from room", item.url); - // item.livekitRoom.unregisterTextStreamHandler(TEXT_LK_TOPIC); - // } - // }, - // }); + const livekitRoomItemsSub = callViewModel.livekitRoomItems$ + .pipe( + tap((beforecurrentAndPrev) => { + logger.info( + `LiveKit room items updated: ${beforecurrentAndPrev.length}`, + beforecurrentAndPrev, + ); + }), + currentAndPrev, + tap((aftercurrentAndPrev) => { + logger.info( + `LiveKit room items updated: ${aftercurrentAndPrev.current.length}, ${aftercurrentAndPrev.prev.length}`, + aftercurrentAndPrev, + ); + }), + ) + .subscribe({ + next: ({ prev, current }) => { + const prevRooms = prev.map((i) => i.livekitRoom); + const currentRooms = current.map((i) => i.livekitRoom); + const addedRooms = currentRooms.filter((r) => !prevRooms.includes(r)); + const removedRooms = prevRooms.filter((r) => !currentRooms.includes(r)); + addedRooms.forEach((r) => { + logger.info(`Registering text stream handler for room `); + r.registerTextStreamHandler( + TEXT_LK_TOPIC, + (reader, participantInfo) => + void lkTextStreamHandlerFunction(reader, participantInfo, r), + ); + }); + removedRooms.forEach((r) => { + logger.info(`Unregistering text stream handler for room `); + r.unregisterTextStreamHandler(TEXT_LK_TOPIC); + }); + }, + complete: () => { + logger.info("Livekit room items subscription completed"); + for (const item of callViewModel.livekitRoomItems$.value) { + logger.info("unregistering room item from room", item.url); + item.livekitRoom.unregisterTextStreamHandler(TEXT_LK_TOPIC); + } + }, + }); // create sendData function - // const sendFn: Behavior<(data: string) => Promise> = - // scope.behavior( - // callViewModel.localMatrixLivekitMember$.pipe( - // switchMap((m) => { - // if (!m) - // return of((data: string): never => { - // throw Error("local membership not yet ready."); - // }); - // return m.participant$.pipe( - // map((p) => { - // if (p === null) { - // return (data: string): never => { - // throw Error("local participant not yet ready to send data."); - // }; - // } else { - // return async (data: string): Promise => - // p.sendText(data, { topic: TEXT_LK_TOPIC }); - // } - // }), - // ); - // }), - // ), - // ); + const sendFn: Behavior<(data: string) => Promise> = + scope.behavior( + callViewModel.localmatrixLivekitMembers$.pipe( + switchMap((m) => { + if (!m) + return of((data: string): never => { + throw Error("local membership not yet ready."); + }); + return m.participant$.pipe( + map((p) => { + if (p === null) { + return (data: string): never => { + throw Error("local participant not yet ready to send data."); + }; + } else { + return async (data: string): Promise => + p.sendText(data, { topic: TEXT_LK_TOPIC }); + } + }), + ); + }), + ), + ); - // const sendData = async (data: Record): Promise => { - // const dataString = JSON.stringify(data); - // try { - // const info = await sendFn.value(dataString); - // logger.info(`Sent text with stream ID: ${info.id}`); - // } catch (e) { - // console.error("failed sending: ", dataString, e); - // } - // }; + const sendData = async (data: unknown): Promise => { + const dataString = JSON.stringify(data); + logger.info("try sending: ", dataString); + try { + await Promise.resolve(); + const info = await sendFn.value(dataString); + logger.info(`Sent text with stream ID: ${info.id}`); + } catch (e) { + logger.error("failed sending: ", dataString, e); + } + }; // after hangup gets called const leaveSubs = callViewModel.leave$.subscribe(() => { @@ -202,9 +220,9 @@ export async function createMatrixRTCSdk(): Promise { leave: (): void => { callViewModel.hangup(); leaveSubs.unsubscribe(); - // livekitRoomItemsSub.unsubscribe(); + livekitRoomItemsSub.unsubscribe(); }, data$, - // sendData, + sendData, }; } diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 253eb05e..a4738f77 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -264,7 +264,7 @@ export interface CallViewModel { livekitRoomItems$: Behavior; /** use the layout instead, this is just for the godot export. */ userMedia$: Behavior; - localMatrixLivekitMember$: Behavior; + localmatrixLivekitMembers$: Behavior; /** List of participants raising their hand */ handsRaised$: Behavior>; /** List of reactions. Keys are: membership.membershipId (currently predefined as: `${membershipEvent.userId}:${membershipEvent.deviceId}`)*/ @@ -449,7 +449,7 @@ export function createCallViewModel$( logger: logger, }); - const matrixLivekitMembers$ = createMatrixLivekitMembers$({ + const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({ scope: scope, membershipsWithTransport$: membershipsAndTransports.membershipsWithTransport$, @@ -515,7 +515,7 @@ export function createCallViewModel$( userId: userId, }; - const localMatrixLivekitMember$: Behavior = + const localmatrixLivekitMembers$: Behavior = scope.behavior( localRtcMembership$.pipe( switchMap((membership) => { @@ -607,8 +607,11 @@ export function createCallViewModel$( const reconnecting$ = localMembership.reconnecting$; const pretendToBeDisconnected$ = reconnecting$; - const audioParticipants$ = scope.behavior( + const livekitRoomItems$ = scope.behavior( matrixLivekitMembers$.pipe( + tap((val) => { + logger.debug("matrixLivekitMembers$ updated", val.value); + }), switchMap((membersWithEpoch) => { const members = membersWithEpoch.value; const a$ = combineLatest( @@ -649,6 +652,12 @@ export function createCallViewModel$( return acc; }, []), ), + tap((val) => { + logger.debug( + "livekitRoomItems$ updated", + val.map((v) => v.url), + ); + }), ), [], ); @@ -676,7 +685,7 @@ export function createCallViewModel$( */ const userMedia$ = scope.behavior( combineLatest([ - localMatrixLivekitMember$, + localmatrixLivekitMembers$, matrixLivekitMembers$, duplicateTiles.value$, ]).pipe( @@ -1489,8 +1498,7 @@ export function createCallViewModel$( ), participantCount$: participantCount$, - livekitRoomItems$: audioParticipants$, - + livekitRoomItems$, handsRaised$: handsRaised$, reactions$: reactions$, joinSoundEffect$: joinSoundEffect$, @@ -1510,7 +1518,7 @@ export function createCallViewModel$( pip$: pip$, layout$: layout$, userMedia$, - localMatrixLivekitMember$, + localmatrixLivekitMembers$, tileStoreGeneration$: tileStoreGeneration$, showSpotlightIndicators$: showSpotlightIndicators$, showSpeakingIndicators$: showSpeakingIndicators$, diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 11f35424..2508637e 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -56,15 +56,15 @@ export class Publisher { devices: MediaDevices, private readonly muteStates: MuteStates, trackerProcessorState$: Behavior, - private logger?: Logger, + private logger: Logger, ) { - this.logger?.info("[PublishConnection] Create LiveKit room"); + this.logger.info("[PublishConnection] Create LiveKit room"); const { controlledAudioDevices } = getUrlParams(); const room = connection.livekitRoom; room.setE2EEEnabled(room.options.e2ee !== undefined)?.catch((e: Error) => { - this.logger?.error("Failed to set E2EE enabled on room", e); + this.logger.error("Failed to set E2EE enabled on room", e); }); // Setup track processor syncing (blur) @@ -74,7 +74,7 @@ export class Publisher { this.workaroundRestartAudioInputTrackChrome(devices, scope); this.scope.onEnd(() => { - this.logger?.info( + this.logger.info( "[PublishConnection] Scope ended -> stop publishing all tracks", ); void this.stopPublishing(); @@ -132,13 +132,14 @@ export class Publisher { video, }) .catch((error) => { - this.logger?.error("Failed to create tracks", error); + this.logger.error("Failed to create tracks", error); })) ?? []; } return this.tracks; } public async startPublishing(): Promise { + this.logger.info("Start publishing"); const lkRoom = this.connection.livekitRoom; const { promise, resolve, reject } = Promise.withResolvers(); const sub = this.connection.state$.subscribe((s) => { @@ -150,7 +151,7 @@ export class Publisher { reject(new Error("Failed to connect to LiveKit server")); break; default: - this.logger?.info("waiting for connection: ", s.state); + this.logger.info("waiting for connection: ", s.state); } }); try { @@ -160,12 +161,14 @@ export class Publisher { } finally { sub.unsubscribe(); } + this.logger.info("publish ", this.tracks.length, "tracks"); for (const track of this.tracks) { // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally // with a timeout. await lkRoom.localParticipant.publishTrack(track).catch((error) => { - this.logger?.error("Failed to publish track", error); + this.logger.error("Failed to publish track", error); }); + this.logger.info("published track ", track.kind, track.id); // TODO: check if the connection is still active? and break the loop if not? } @@ -229,7 +232,7 @@ export class Publisher { .getTrackPublication(Track.Source.Microphone) ?.audioTrack?.restartTrack() .catch((e) => { - this.logger?.error(`Failed to restart audio device track`, e); + this.logger.error(`Failed to restart audio device track`, e); }); } }); @@ -249,7 +252,7 @@ export class Publisher { selected$.pipe(scope.bind()).subscribe((device) => { if (lkRoom.state != LivekitConnectionState.Connected) return; // if (this.connectionState$.value !== ConnectionState.Connected) return; - this.logger?.info( + this.logger.info( "[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :", lkRoom.getActiveDevice(kind), " !== ", @@ -262,7 +265,7 @@ export class Publisher { lkRoom .switchActiveDevice(kind, device.id) .catch((e: Error) => - this.logger?.error( + this.logger.error( `Failed to sync ${kind} device with LiveKit`, e, ), @@ -287,10 +290,7 @@ export class Publisher { try { await lkRoom.localParticipant.setMicrophoneEnabled(desired); } catch (e) { - this.logger?.error( - "Failed to update LiveKit audio input mute state", - e, - ); + this.logger.error("Failed to update LiveKit audio input mute state", e); } return lkRoom.localParticipant.isMicrophoneEnabled; }); @@ -298,10 +298,7 @@ export class Publisher { try { await lkRoom.localParticipant.setCameraEnabled(desired); } catch (e) { - this.logger?.error( - "Failed to update LiveKit video input mute state", - e, - ); + this.logger.error("Failed to update LiveKit video input mute state", e); } return lkRoom.localParticipant.isCameraEnabled; }); diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index 3f58bcf6..f719e86b 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -382,17 +382,15 @@ describe("Publishing participants observations", () => { const bobIsAPublisher = Promise.withResolvers(); const danIsAPublisher = Promise.withResolvers(); const observedPublishers: PublishingParticipant[][] = []; - const s = connection.remoteParticipantsWithTracks$.subscribe( - (publishers) => { - observedPublishers.push(publishers); - if (publishers.some((p) => p.identity === "@bob:example.org:DEV111")) { - bobIsAPublisher.resolve(); - } - if (publishers.some((p) => p.identity === "@dan:example.org:DEV333")) { - danIsAPublisher.resolve(); - } - }, - ); + const s = connection.remoteParticipants$.subscribe((publishers) => { + observedPublishers.push(publishers); + if (publishers.some((p) => p.identity === "@bob:example.org:DEV111")) { + bobIsAPublisher.resolve(); + } + if (publishers.some((p) => p.identity === "@dan:example.org:DEV333")) { + danIsAPublisher.resolve(); + } + }); onTestFinished(() => s.unsubscribe()); // The publishingParticipants$ observable is derived from the current members of the // livekitRoom and the rtc membership in order to publish the members that are publishing @@ -437,11 +435,9 @@ describe("Publishing participants observations", () => { const connection = setupRemoteConnection(); let observedPublishers: PublishingParticipant[][] = []; - const s = connection.remoteParticipantsWithTracks$.subscribe( - (publishers) => { - observedPublishers.push(publishers); - }, - ); + const s = connection.remoteParticipants$.subscribe((publishers) => { + observedPublishers.push(publishers); + }); onTestFinished(() => s.unsubscribe()); let participants: RemoteParticipant[] = [ diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index c17fae2b..fd75e551 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -19,7 +19,7 @@ import { RoomEvent, } from "livekit-client"; import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; -import { BehaviorSubject, map, type Observable } from "rxjs"; +import { BehaviorSubject, type Observable } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; import { @@ -146,6 +146,10 @@ export class Connection { transport: this.transport, livekitConnectionState$: connectionStateObserver(this.livekitRoom), }); + this.logger.info( + "Connected to LiveKit room", + this.transport.livekit_service_url, + ); } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); this._state$.next({ @@ -189,9 +193,7 @@ export class Connection { * This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`. * It filters the participants to only those that are associated with a membership that claims to publish on this connection. */ - public readonly remoteParticipantsWithTracks$: Behavior< - PublishingParticipant[] - >; + public readonly remoteParticipants$: Behavior; /** * The media transport to connect to. @@ -213,7 +215,7 @@ export class Connection { public constructor(opts: ConnectionOpts, logger: Logger) { this.logger = logger.getChild("[Connection]"); this.logger.info( - `[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`, + `Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`, ); const { transport, client, scope } = opts; @@ -223,20 +225,21 @@ export class Connection { // REMOTE participants with track!!! // this.remoteParticipantsWithTracks$ - this.remoteParticipantsWithTracks$ = scope.behavior( + this.remoteParticipants$ = scope.behavior( // only tracks remote participants connectedParticipantsObserver(this.livekitRoom, { additionalRoomEvents: [ RoomEvent.TrackPublished, RoomEvent.TrackUnpublished, ], - }).pipe( - map((participants) => { - return participants.filter( - (participant) => participant.getTrackPublications().length > 0, - ); - }), - ), + }), + // .pipe( + // map((participants) => { + // return participants.filter( + // (participant) => participant.getTrackPublications().length > 0, + // ); + // }), + // ) [], ); diff --git a/src/state/CallViewModel/remoteMembers/ConnectionFactory.ts b/src/state/CallViewModel/remoteMembers/ConnectionFactory.ts index f58fcb76..0fb0b5a7 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionFactory.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionFactory.ts @@ -13,7 +13,7 @@ import { type BaseKeyProvider, } from "livekit-client"; import { type Logger } from "matrix-js-sdk/lib/logger"; -import E2EEWorker from "livekit-client/e2ee-worker?worker"; +import E2EEWorker from "livekit-client/e2ee-worker?worker&inline"; import { type ObservableScope } from "../../ObservableScope.ts"; import { Connection } from "./Connection.ts"; diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts index 484a44e7..b5076285 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts @@ -289,47 +289,47 @@ describe("connectionManagerData$ stream", () => { a: expect.toSatisfy((e) => { const data: ConnectionManagerData = e.value; expect(data.getConnections().length).toBe(2); - expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(0); - expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(0); + expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(0); + expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(0); return true; }), b: expect.toSatisfy((e) => { const data: ConnectionManagerData = e.value; expect(data.getConnections().length).toBe(2); - expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(1); - expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(0); - expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe( - "user1A", - ); + expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(1); + expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(0); + expect( + data.getParticipantsForTransport(TRANSPORT_1)[0].identity, + ).toBe("user1A"); return true; }), c: expect.toSatisfy((e) => { const data: ConnectionManagerData = e.value; expect(data.getConnections().length).toBe(2); - expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(1); - expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(1); - expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe( - "user1A", - ); - expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toBe( - "user2A", - ); + expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(1); + expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(1); + expect( + data.getParticipantsForTransport(TRANSPORT_1)[0].identity, + ).toBe("user1A"); + expect( + data.getParticipantsForTransport(TRANSPORT_2)[0].identity, + ).toBe("user2A"); return true; }), d: expect.toSatisfy((e) => { const data: ConnectionManagerData = e.value; expect(data.getConnections().length).toBe(2); - expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(2); - expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(1); - expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe( - "user1A", - ); - expect(data.getParticipantForTransport(TRANSPORT_1)[1].identity).toBe( - "user1B", - ); - expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toBe( - "user2A", - ); + expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(2); + expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(1); + expect( + data.getParticipantsForTransport(TRANSPORT_1)[0].identity, + ).toBe("user1A"); + expect( + data.getParticipantsForTransport(TRANSPORT_1)[1].identity, + ).toBe("user1B"); + expect( + data.getParticipantsForTransport(TRANSPORT_2)[0].identity, + ).toBe("user2A"); return true; }), }); diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index d9a0380e..bd07cfa1 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -24,7 +24,10 @@ import { type ConnectionFactory } from "./ConnectionFactory.ts"; export class ConnectionManagerData { private readonly store: Map< string, - [Connection, (LocalParticipant | RemoteParticipant)[]] + { + connection: Connection; + participants: (LocalParticipant | RemoteParticipant)[]; + } > = new Map(); public constructor() {} @@ -36,9 +39,9 @@ export class ConnectionManagerData { const key = this.getKey(connection.transport); const existing = this.store.get(key); if (!existing) { - this.store.set(key, [connection, participants]); + this.store.set(key, { connection, participants }); } else { - existing[1].push(...participants); + existing.participants.push(...participants); } } @@ -47,25 +50,26 @@ export class ConnectionManagerData { } public getConnections(): Connection[] { - return Array.from(this.store.values()).map(([connection]) => connection); + return Array.from(this.store.values()).map(({ connection }) => connection); } public getConnectionForTransport( transport: LivekitTransport, ): Connection | null { - return this.store.get(this.getKey(transport))?.[0] ?? null; + return this.store.get(this.getKey(transport))?.connection ?? null; } - public getParticipantForTransport( + public getParticipantsForTransport( transport: LivekitTransport, ): (LocalParticipant | RemoteParticipant)[] { const key = transport.livekit_service_url + "|" + transport.livekit_alias; const existing = this.store.get(key); if (existing) { - return existing[1]; + return existing.participants; } return []; } + /** * Get all connections where the given participant is publishing. * In theory, there could be several connections where the same participant is publishing but with @@ -76,8 +80,12 @@ export class ConnectionManagerData { participantId: ParticipantId, ): Connection[] { const connections: Connection[] = []; - for (const [connection, participants] of this.store.values()) { - if (participants.some((p) => p.identity === participantId)) { + for (const { connection, participants } of this.store.values()) { + if ( + participants.some( + (participant) => participant?.identity === participantId, + ) + ) { connections.push(connection); } } @@ -183,23 +191,24 @@ export function createConnectionManager$({ const epoch = connections.epoch; // Map the connections to list of {connection, participants}[] - const listOfConnectionsWithPublishingParticipants = - connections.value.map((connection) => { - return connection.remoteParticipantsWithTracks$.pipe( + const listOfConnectionsWithParticipants = connections.value.map( + (connection) => { + return connection.remoteParticipants$.pipe( map((participants) => ({ connection, participants, })), ); - }); + }, + ); // probably not required - if (listOfConnectionsWithPublishingParticipants.length === 0) { + if (listOfConnectionsWithParticipants.length === 0) { return of(new Epoch(new ConnectionManagerData(), epoch)); } // combineLatest the several streams into a single stream with the ConnectionManagerData - return combineLatest(listOfConnectionsWithPublishingParticipants).pipe( + return combineLatest(listOfConnectionsWithParticipants).pipe( map( (lists) => new Epoch( diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts index e675f723..7547a68b 100644 --- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts +++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts @@ -91,7 +91,7 @@ test("should signal participant not yet connected to livekit", () => { }), ); - const matrixLivekitMember$ = createMatrixLivekitMembers$({ + const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({ scope: testScope, membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), connectionManager: { @@ -99,21 +99,24 @@ test("should signal participant not yet connected to livekit", () => { } as unknown as IConnectionManager, }); - expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", { - a: expect.toSatisfy((data: MatrixLivekitMember[]) => { - expect(data.length).toEqual(1); - expectObservable(data[0].membership$).toBe("a", { - a: bobMembership, - }); - expectObservable(data[0].participant$).toBe("a", { - a: null, - }); - expectObservable(data[0].connection$).toBe("a", { - a: null, - }); - return true; - }), - }); + expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe( + "a", + { + a: expect.toSatisfy((data: MatrixLivekitMember[]) => { + expect(data.length).toEqual(1); + expectObservable(data[0].membership$).toBe("a", { + a: bobMembership, + }); + expectObservable(data[0].participant$).toBe("a", { + a: null, + }); + expectObservable(data[0].connection$).toBe("a", { + a: null, + }); + return true; + }), + }, + ); }); }); @@ -171,7 +174,7 @@ test("should signal participant on a connection that is publishing", () => { }), ); - const matrixLivekitMember$ = createMatrixLivekitMembers$({ + const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({ scope: testScope, membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), connectionManager: { @@ -179,25 +182,28 @@ test("should signal participant on a connection that is publishing", () => { } as unknown as IConnectionManager, }); - expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", { - a: expect.toSatisfy((data: MatrixLivekitMember[]) => { - expect(data.length).toEqual(1); - expectObservable(data[0].membership$).toBe("a", { - a: bobMembership, - }); - expectObservable(data[0].participant$).toBe("a", { - a: expect.toSatisfy((participant) => { - expect(participant).toBeDefined(); - expect(participant!.identity).toEqual(bobParticipantId); - return true; - }), - }); - expectObservable(data[0].connection$).toBe("a", { - a: connection, - }); - return true; - }), - }); + expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe( + "a", + { + a: expect.toSatisfy((data: MatrixLivekitMember[]) => { + expect(data.length).toEqual(1); + expectObservable(data[0].membership$).toBe("a", { + a: bobMembership, + }); + expectObservable(data[0].participant$).toBe("a", { + a: expect.toSatisfy((participant) => { + expect(participant).toBeDefined(); + expect(participant!.identity).toEqual(bobParticipantId); + return true; + }), + }); + expectObservable(data[0].connection$).toBe("a", { + a: connection, + }); + return true; + }), + }, + ); }); }); @@ -222,7 +228,7 @@ test("should signal participant on a connection that is not publishing", () => { }), ); - const matrixLivekitMember$ = createMatrixLivekitMembers$({ + const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({ scope: testScope, membershipsWithTransport$: testScope.behavior(membershipsWithTransport$), connectionManager: { @@ -230,21 +236,24 @@ test("should signal participant on a connection that is not publishing", () => { } as unknown as IConnectionManager, }); - expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", { - a: expect.toSatisfy((data: MatrixLivekitMember[]) => { - expect(data.length).toEqual(1); - expectObservable(data[0].membership$).toBe("a", { - a: bobMembership, - }); - expectObservable(data[0].participant$).toBe("a", { - a: null, - }); - expectObservable(data[0].connection$).toBe("a", { - a: connection, - }); - return true; - }), - }); + expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe( + "a", + { + a: expect.toSatisfy((data: MatrixLivekitMember[]) => { + expect(data.length).toEqual(1); + expectObservable(data[0].membership$).toBe("a", { + a: bobMembership, + }); + expectObservable(data[0].participant$).toBe("a", { + a: null, + }); + expectObservable(data[0].connection$).toBe("a", { + a: connection, + }); + return true; + }), + }, + ); }); }); @@ -283,7 +292,7 @@ describe("Publication edge case", () => { }), ); - const matrixLivekitMember$ = createMatrixLivekitMembers$({ + const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({ scope: testScope, membershipsWithTransport$: testScope.behavior( membershipsWithTransport$, @@ -293,7 +302,7 @@ describe("Publication edge case", () => { } as unknown as IConnectionManager, }); - expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe( + expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe( "a", { a: expect.toSatisfy((data: MatrixLivekitMember[]) => { @@ -349,7 +358,7 @@ describe("Publication edge case", () => { }), ); - const matrixLivekitMember$ = createMatrixLivekitMembers$({ + const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({ scope: testScope, membershipsWithTransport$: testScope.behavior( membershipsWithTransport$, @@ -359,7 +368,7 @@ describe("Publication edge case", () => { } as unknown as IConnectionManager, }); - expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe( + expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe( "a", { a: expect.toSatisfy((data: MatrixLivekitMember[]) => { diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts index 2f152630..72e2883a 100644 --- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts +++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts @@ -61,12 +61,12 @@ export function createMatrixLivekitMembers$({ scope, membershipsWithTransport$, connectionManager, -}: Props): Behavior> { +}: Props): { matrixLivekitMembers$: Behavior> } { /** * Stream of all the call members and their associated livekit data (if available). */ - return scope.behavior( + const matrixLivekitMembers$ = scope.behavior( combineLatest([ membershipsWithTransport$, connectionManager.connectionManagerData$, @@ -91,7 +91,7 @@ export function createMatrixLivekitMembers$({ const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`; const participants = transport - ? managerData.getParticipantForTransport(transport) + ? managerData.getParticipantsForTransport(transport) : []; const participant = participants.find((p) => p.identity == participantId) ?? null; @@ -121,6 +121,11 @@ export function createMatrixLivekitMembers$({ ), ), ); + return { + matrixLivekitMembers$, + // TODO add only publishing participants... maybe. disucss at least + // scope.behavior(matrixLivekitMembers$.pipe(map((items) => items.value.map((i)=>{ i.})))) + }; } // TODO add back in the callviewmodel pauseWhen(this.pretendToBeDisconnected$) diff --git a/src/state/CallViewModel/remoteMembers/integration.test.ts b/src/state/CallViewModel/remoteMembers/integration.test.ts index e3aa6be8..cafffb38 100644 --- a/src/state/CallViewModel/remoteMembers/integration.test.ts +++ b/src/state/CallViewModel/remoteMembers/integration.test.ts @@ -124,14 +124,14 @@ test("bob, carl, then bob joining no tracks yet", () => { logger: logger, }); - const matrixLivekitItems$ = createMatrixLivekitMembers$({ + const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({ scope: testScope, membershipsWithTransport$: membershipsAndTransports.membershipsWithTransport$, connectionManager, }); - expectObservable(matrixLivekitItems$).toBe(vMarble, { + expectObservable(matrixLivekitMembers$).toBe(vMarble, { a: expect.toSatisfy((e: Epoch) => { const items = e.value; expect(items.length).toBe(1);