diff --git a/godot/index.html b/godot/index.html
index ff654748..7d5f96c0 100644
--- a/godot/index.html
+++ b/godot/index.html
@@ -20,7 +20,7 @@
await window.matrixRTCSdk.join();
console.info("matrixRTCSdk joined ");
- // sdk.data$.subscribe((data) => {
+ // window.matrixRTCSdk.data$.subscribe((data) => {
// console.log(data);
// const div = document.getElementById("data");
// div.appendChild(document.createTextNode(data));
@@ -36,9 +36,9 @@
-
+
diff --git a/godot/main.ts b/godot/main.ts
index 98bb4972..ede612cb 100644
--- a/godot/main.ts
+++ b/godot/main.ts
@@ -6,7 +6,7 @@ Please see LICENSE in the repository root for full details.
*/
// import { type InitResult } from "../src/ClientContext";
-import { map, type Observable, of, Subject, switchMap } from "rxjs";
+import { map, type Observable, of, Subject, switchMap, tap } from "rxjs";
import { MatrixRTCSessionEvent } from "matrix-js-sdk/lib/matrixrtc";
import { type TextStreamInfo } from "livekit-client/dist/src/room/types";
import {
@@ -36,7 +36,7 @@ interface MatrixRTCSdk {
/** @throws on leave errors */
leave: () => void;
data$: Observable<{ sender: string; data: string }>;
- sendData?: (data: Record) => Promise;
+ sendData?: (data: unknown) => Promise;
}
export async function createMatrixRTCSdk(): Promise {
logger.info("Hello");
@@ -67,97 +67,115 @@ export async function createMatrixRTCSdk(): Promise {
// create data listener
const data$ = new Subject<{ sender: string; data: string }>();
- // const lkTextStreamHandlerFunction = async (
- // reader: TextStreamReader,
- // participantInfo: { identity: string },
- // livekitRoom: LivekitRoom,
- // ): Promise => {
- // const info = reader.info;
- // console.log(
- // `Received text stream from ${participantInfo.identity}\n` +
- // ` Topic: ${info.topic}\n` +
- // ` Timestamp: ${info.timestamp}\n` +
- // ` ID: ${info.id}\n` +
- // ` Size: ${info.size}`, // Optional, only available if the stream was sent with `sendText`
- // );
+ const lkTextStreamHandlerFunction = async (
+ reader: TextStreamReader,
+ participantInfo: { identity: string },
+ livekitRoom: LivekitRoom,
+ ): Promise => {
+ const info = reader.info;
+ logger.info(
+ `Received text stream from ${participantInfo.identity}\n` +
+ ` Topic: ${info.topic}\n` +
+ ` Timestamp: ${info.timestamp}\n` +
+ ` ID: ${info.id}\n` +
+ ` Size: ${info.size}`, // Optional, only available if the stream was sent with `sendText`
+ );
- // const participants = callViewModel.livekitRoomItems$.value.find(
- // (i) => i.livekitRoom === livekitRoom,
- // )?.participants;
- // if (participants && participants.includes(participantInfo.identity)) {
- // const text = await reader.readAll();
- // console.log(`Received text: ${text}`);
- // data$.next({ sender: participantInfo.identity, data: text });
- // } else {
- // logger.warn(
- // "Received text from unknown participant",
- // participantInfo.identity,
- // );
- // }
- // };
+ const participants = callViewModel.livekitRoomItems$.value.find(
+ (i) => i.livekitRoom === livekitRoom,
+ )?.participants;
+ if (participants && participants.includes(participantInfo.identity)) {
+ const text = await reader.readAll();
+ logger.info(`Received text: ${text}`);
+ data$.next({ sender: participantInfo.identity, data: text });
+ } else {
+ logger.warn(
+ "Received text from unknown participant",
+ participantInfo.identity,
+ );
+ }
+ };
- // const livekitRoomItemsSub = callViewModel.livekitRoomItems$
- // .pipe(currentAndPrev)
- // .subscribe({
- // next: ({ prev, current }) => {
- // const prevRooms = prev.map((i) => i.livekitRoom);
- // const currentRooms = current.map((i) => i.livekitRoom);
- // const addedRooms = currentRooms.filter((r) => !prevRooms.includes(r));
- // const removedRooms = prevRooms.filter((r) => !currentRooms.includes(r));
- // addedRooms.forEach((r) =>
- // r.registerTextStreamHandler(
- // TEXT_LK_TOPIC,
- // (reader, participantInfo) =>
- // void lkTextStreamHandlerFunction(reader, participantInfo, r),
- // ),
- // );
- // removedRooms.forEach((r) =>
- // r.unregisterTextStreamHandler(TEXT_LK_TOPIC),
- // );
- // },
- // complete: () => {
- // logger.info("Livekit room items subscription completed");
- // for (const item of callViewModel.livekitRoomItems$.value) {
- // logger.info("unregistering room item from room", item.url);
- // item.livekitRoom.unregisterTextStreamHandler(TEXT_LK_TOPIC);
- // }
- // },
- // });
+ const livekitRoomItemsSub = callViewModel.livekitRoomItems$
+ .pipe(
+ tap((beforecurrentAndPrev) => {
+ logger.info(
+ `LiveKit room items updated: ${beforecurrentAndPrev.length}`,
+ beforecurrentAndPrev,
+ );
+ }),
+ currentAndPrev,
+ tap((aftercurrentAndPrev) => {
+ logger.info(
+ `LiveKit room items updated: ${aftercurrentAndPrev.current.length}, ${aftercurrentAndPrev.prev.length}`,
+ aftercurrentAndPrev,
+ );
+ }),
+ )
+ .subscribe({
+ next: ({ prev, current }) => {
+ const prevRooms = prev.map((i) => i.livekitRoom);
+ const currentRooms = current.map((i) => i.livekitRoom);
+ const addedRooms = currentRooms.filter((r) => !prevRooms.includes(r));
+ const removedRooms = prevRooms.filter((r) => !currentRooms.includes(r));
+ addedRooms.forEach((r) => {
+ logger.info(`Registering text stream handler for room `);
+ r.registerTextStreamHandler(
+ TEXT_LK_TOPIC,
+ (reader, participantInfo) =>
+ void lkTextStreamHandlerFunction(reader, participantInfo, r),
+ );
+ });
+ removedRooms.forEach((r) => {
+ logger.info(`Unregistering text stream handler for room `);
+ r.unregisterTextStreamHandler(TEXT_LK_TOPIC);
+ });
+ },
+ complete: () => {
+ logger.info("Livekit room items subscription completed");
+ for (const item of callViewModel.livekitRoomItems$.value) {
+ logger.info("unregistering room item from room", item.url);
+ item.livekitRoom.unregisterTextStreamHandler(TEXT_LK_TOPIC);
+ }
+ },
+ });
// create sendData function
- // const sendFn: Behavior<(data: string) => Promise> =
- // scope.behavior(
- // callViewModel.localMatrixLivekitMember$.pipe(
- // switchMap((m) => {
- // if (!m)
- // return of((data: string): never => {
- // throw Error("local membership not yet ready.");
- // });
- // return m.participant$.pipe(
- // map((p) => {
- // if (p === null) {
- // return (data: string): never => {
- // throw Error("local participant not yet ready to send data.");
- // };
- // } else {
- // return async (data: string): Promise =>
- // p.sendText(data, { topic: TEXT_LK_TOPIC });
- // }
- // }),
- // );
- // }),
- // ),
- // );
+ const sendFn: Behavior<(data: string) => Promise> =
+ scope.behavior(
+ callViewModel.localmatrixLivekitMembers$.pipe(
+ switchMap((m) => {
+ if (!m)
+ return of((data: string): never => {
+ throw Error("local membership not yet ready.");
+ });
+ return m.participant$.pipe(
+ map((p) => {
+ if (p === null) {
+ return (data: string): never => {
+ throw Error("local participant not yet ready to send data.");
+ };
+ } else {
+ return async (data: string): Promise =>
+ p.sendText(data, { topic: TEXT_LK_TOPIC });
+ }
+ }),
+ );
+ }),
+ ),
+ );
- // const sendData = async (data: Record): Promise => {
- // const dataString = JSON.stringify(data);
- // try {
- // const info = await sendFn.value(dataString);
- // logger.info(`Sent text with stream ID: ${info.id}`);
- // } catch (e) {
- // console.error("failed sending: ", dataString, e);
- // }
- // };
+ const sendData = async (data: unknown): Promise => {
+ const dataString = JSON.stringify(data);
+ logger.info("try sending: ", dataString);
+ try {
+ await Promise.resolve();
+ const info = await sendFn.value(dataString);
+ logger.info(`Sent text with stream ID: ${info.id}`);
+ } catch (e) {
+ logger.error("failed sending: ", dataString, e);
+ }
+ };
// after hangup gets called
const leaveSubs = callViewModel.leave$.subscribe(() => {
@@ -202,9 +220,9 @@ export async function createMatrixRTCSdk(): Promise {
leave: (): void => {
callViewModel.hangup();
leaveSubs.unsubscribe();
- // livekitRoomItemsSub.unsubscribe();
+ livekitRoomItemsSub.unsubscribe();
},
data$,
- // sendData,
+ sendData,
};
}
diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts
index 253eb05e..a4738f77 100644
--- a/src/state/CallViewModel/CallViewModel.ts
+++ b/src/state/CallViewModel/CallViewModel.ts
@@ -264,7 +264,7 @@ export interface CallViewModel {
livekitRoomItems$: Behavior;
/** use the layout instead, this is just for the godot export. */
userMedia$: Behavior;
- localMatrixLivekitMember$: Behavior;
+ localmatrixLivekitMembers$: Behavior;
/** List of participants raising their hand */
handsRaised$: Behavior>;
/** List of reactions. Keys are: membership.membershipId (currently predefined as: `${membershipEvent.userId}:${membershipEvent.deviceId}`)*/
@@ -449,7 +449,7 @@ export function createCallViewModel$(
logger: logger,
});
- const matrixLivekitMembers$ = createMatrixLivekitMembers$({
+ const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({
scope: scope,
membershipsWithTransport$:
membershipsAndTransports.membershipsWithTransport$,
@@ -515,7 +515,7 @@ export function createCallViewModel$(
userId: userId,
};
- const localMatrixLivekitMember$: Behavior =
+ const localmatrixLivekitMembers$: Behavior =
scope.behavior(
localRtcMembership$.pipe(
switchMap((membership) => {
@@ -607,8 +607,11 @@ export function createCallViewModel$(
const reconnecting$ = localMembership.reconnecting$;
const pretendToBeDisconnected$ = reconnecting$;
- const audioParticipants$ = scope.behavior(
+ const livekitRoomItems$ = scope.behavior(
matrixLivekitMembers$.pipe(
+ tap((val) => {
+ logger.debug("matrixLivekitMembers$ updated", val.value);
+ }),
switchMap((membersWithEpoch) => {
const members = membersWithEpoch.value;
const a$ = combineLatest(
@@ -649,6 +652,12 @@ export function createCallViewModel$(
return acc;
}, []),
),
+ tap((val) => {
+ logger.debug(
+ "livekitRoomItems$ updated",
+ val.map((v) => v.url),
+ );
+ }),
),
[],
);
@@ -676,7 +685,7 @@ export function createCallViewModel$(
*/
const userMedia$ = scope.behavior(
combineLatest([
- localMatrixLivekitMember$,
+ localmatrixLivekitMembers$,
matrixLivekitMembers$,
duplicateTiles.value$,
]).pipe(
@@ -1489,8 +1498,7 @@ export function createCallViewModel$(
),
participantCount$: participantCount$,
- livekitRoomItems$: audioParticipants$,
-
+ livekitRoomItems$,
handsRaised$: handsRaised$,
reactions$: reactions$,
joinSoundEffect$: joinSoundEffect$,
@@ -1510,7 +1518,7 @@ export function createCallViewModel$(
pip$: pip$,
layout$: layout$,
userMedia$,
- localMatrixLivekitMember$,
+ localmatrixLivekitMembers$,
tileStoreGeneration$: tileStoreGeneration$,
showSpotlightIndicators$: showSpotlightIndicators$,
showSpeakingIndicators$: showSpeakingIndicators$,
diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts
index 11f35424..2508637e 100644
--- a/src/state/CallViewModel/localMember/Publisher.ts
+++ b/src/state/CallViewModel/localMember/Publisher.ts
@@ -56,15 +56,15 @@ export class Publisher {
devices: MediaDevices,
private readonly muteStates: MuteStates,
trackerProcessorState$: Behavior,
- private logger?: Logger,
+ private logger: Logger,
) {
- this.logger?.info("[PublishConnection] Create LiveKit room");
+ this.logger.info("[PublishConnection] Create LiveKit room");
const { controlledAudioDevices } = getUrlParams();
const room = connection.livekitRoom;
room.setE2EEEnabled(room.options.e2ee !== undefined)?.catch((e: Error) => {
- this.logger?.error("Failed to set E2EE enabled on room", e);
+ this.logger.error("Failed to set E2EE enabled on room", e);
});
// Setup track processor syncing (blur)
@@ -74,7 +74,7 @@ export class Publisher {
this.workaroundRestartAudioInputTrackChrome(devices, scope);
this.scope.onEnd(() => {
- this.logger?.info(
+ this.logger.info(
"[PublishConnection] Scope ended -> stop publishing all tracks",
);
void this.stopPublishing();
@@ -132,13 +132,14 @@ export class Publisher {
video,
})
.catch((error) => {
- this.logger?.error("Failed to create tracks", error);
+ this.logger.error("Failed to create tracks", error);
})) ?? [];
}
return this.tracks;
}
public async startPublishing(): Promise {
+ this.logger.info("Start publishing");
const lkRoom = this.connection.livekitRoom;
const { promise, resolve, reject } = Promise.withResolvers();
const sub = this.connection.state$.subscribe((s) => {
@@ -150,7 +151,7 @@ export class Publisher {
reject(new Error("Failed to connect to LiveKit server"));
break;
default:
- this.logger?.info("waiting for connection: ", s.state);
+ this.logger.info("waiting for connection: ", s.state);
}
});
try {
@@ -160,12 +161,14 @@ export class Publisher {
} finally {
sub.unsubscribe();
}
+ this.logger.info("publish ", this.tracks.length, "tracks");
for (const track of this.tracks) {
// TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally
// with a timeout.
await lkRoom.localParticipant.publishTrack(track).catch((error) => {
- this.logger?.error("Failed to publish track", error);
+ this.logger.error("Failed to publish track", error);
});
+ this.logger.info("published track ", track.kind, track.id);
// TODO: check if the connection is still active? and break the loop if not?
}
@@ -229,7 +232,7 @@ export class Publisher {
.getTrackPublication(Track.Source.Microphone)
?.audioTrack?.restartTrack()
.catch((e) => {
- this.logger?.error(`Failed to restart audio device track`, e);
+ this.logger.error(`Failed to restart audio device track`, e);
});
}
});
@@ -249,7 +252,7 @@ export class Publisher {
selected$.pipe(scope.bind()).subscribe((device) => {
if (lkRoom.state != LivekitConnectionState.Connected) return;
// if (this.connectionState$.value !== ConnectionState.Connected) return;
- this.logger?.info(
+ this.logger.info(
"[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :",
lkRoom.getActiveDevice(kind),
" !== ",
@@ -262,7 +265,7 @@ export class Publisher {
lkRoom
.switchActiveDevice(kind, device.id)
.catch((e: Error) =>
- this.logger?.error(
+ this.logger.error(
`Failed to sync ${kind} device with LiveKit`,
e,
),
@@ -287,10 +290,7 @@ export class Publisher {
try {
await lkRoom.localParticipant.setMicrophoneEnabled(desired);
} catch (e) {
- this.logger?.error(
- "Failed to update LiveKit audio input mute state",
- e,
- );
+ this.logger.error("Failed to update LiveKit audio input mute state", e);
}
return lkRoom.localParticipant.isMicrophoneEnabled;
});
@@ -298,10 +298,7 @@ export class Publisher {
try {
await lkRoom.localParticipant.setCameraEnabled(desired);
} catch (e) {
- this.logger?.error(
- "Failed to update LiveKit video input mute state",
- e,
- );
+ this.logger.error("Failed to update LiveKit video input mute state", e);
}
return lkRoom.localParticipant.isCameraEnabled;
});
diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts
index 3f58bcf6..f719e86b 100644
--- a/src/state/CallViewModel/remoteMembers/Connection.test.ts
+++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts
@@ -382,17 +382,15 @@ describe("Publishing participants observations", () => {
const bobIsAPublisher = Promise.withResolvers();
const danIsAPublisher = Promise.withResolvers();
const observedPublishers: PublishingParticipant[][] = [];
- const s = connection.remoteParticipantsWithTracks$.subscribe(
- (publishers) => {
- observedPublishers.push(publishers);
- if (publishers.some((p) => p.identity === "@bob:example.org:DEV111")) {
- bobIsAPublisher.resolve();
- }
- if (publishers.some((p) => p.identity === "@dan:example.org:DEV333")) {
- danIsAPublisher.resolve();
- }
- },
- );
+ const s = connection.remoteParticipants$.subscribe((publishers) => {
+ observedPublishers.push(publishers);
+ if (publishers.some((p) => p.identity === "@bob:example.org:DEV111")) {
+ bobIsAPublisher.resolve();
+ }
+ if (publishers.some((p) => p.identity === "@dan:example.org:DEV333")) {
+ danIsAPublisher.resolve();
+ }
+ });
onTestFinished(() => s.unsubscribe());
// The publishingParticipants$ observable is derived from the current members of the
// livekitRoom and the rtc membership in order to publish the members that are publishing
@@ -437,11 +435,9 @@ describe("Publishing participants observations", () => {
const connection = setupRemoteConnection();
let observedPublishers: PublishingParticipant[][] = [];
- const s = connection.remoteParticipantsWithTracks$.subscribe(
- (publishers) => {
- observedPublishers.push(publishers);
- },
- );
+ const s = connection.remoteParticipants$.subscribe((publishers) => {
+ observedPublishers.push(publishers);
+ });
onTestFinished(() => s.unsubscribe());
let participants: RemoteParticipant[] = [
diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts
index c17fae2b..fd75e551 100644
--- a/src/state/CallViewModel/remoteMembers/Connection.ts
+++ b/src/state/CallViewModel/remoteMembers/Connection.ts
@@ -19,7 +19,7 @@ import {
RoomEvent,
} from "livekit-client";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
-import { BehaviorSubject, map, type Observable } from "rxjs";
+import { BehaviorSubject, type Observable } from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger";
import {
@@ -146,6 +146,10 @@ export class Connection {
transport: this.transport,
livekitConnectionState$: connectionStateObserver(this.livekitRoom),
});
+ this.logger.info(
+ "Connected to LiveKit room",
+ this.transport.livekit_service_url,
+ );
} catch (error) {
this.logger.debug(`Failed to connect to LiveKit room: ${error}`);
this._state$.next({
@@ -189,9 +193,7 @@ export class Connection {
* This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`.
* It filters the participants to only those that are associated with a membership that claims to publish on this connection.
*/
- public readonly remoteParticipantsWithTracks$: Behavior<
- PublishingParticipant[]
- >;
+ public readonly remoteParticipants$: Behavior;
/**
* The media transport to connect to.
@@ -213,7 +215,7 @@ export class Connection {
public constructor(opts: ConnectionOpts, logger: Logger) {
this.logger = logger.getChild("[Connection]");
this.logger.info(
- `[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,
+ `Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,
);
const { transport, client, scope } = opts;
@@ -223,20 +225,21 @@ export class Connection {
// REMOTE participants with track!!!
// this.remoteParticipantsWithTracks$
- this.remoteParticipantsWithTracks$ = scope.behavior(
+ this.remoteParticipants$ = scope.behavior(
// only tracks remote participants
connectedParticipantsObserver(this.livekitRoom, {
additionalRoomEvents: [
RoomEvent.TrackPublished,
RoomEvent.TrackUnpublished,
],
- }).pipe(
- map((participants) => {
- return participants.filter(
- (participant) => participant.getTrackPublications().length > 0,
- );
- }),
- ),
+ }),
+ // .pipe(
+ // map((participants) => {
+ // return participants.filter(
+ // (participant) => participant.getTrackPublications().length > 0,
+ // );
+ // }),
+ // )
[],
);
diff --git a/src/state/CallViewModel/remoteMembers/ConnectionFactory.ts b/src/state/CallViewModel/remoteMembers/ConnectionFactory.ts
index f58fcb76..0fb0b5a7 100644
--- a/src/state/CallViewModel/remoteMembers/ConnectionFactory.ts
+++ b/src/state/CallViewModel/remoteMembers/ConnectionFactory.ts
@@ -13,7 +13,7 @@ import {
type BaseKeyProvider,
} from "livekit-client";
import { type Logger } from "matrix-js-sdk/lib/logger";
-import E2EEWorker from "livekit-client/e2ee-worker?worker";
+import E2EEWorker from "livekit-client/e2ee-worker?worker&inline";
import { type ObservableScope } from "../../ObservableScope.ts";
import { Connection } from "./Connection.ts";
diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts
index 484a44e7..b5076285 100644
--- a/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts
+++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts
@@ -289,47 +289,47 @@ describe("connectionManagerData$ stream", () => {
a: expect.toSatisfy((e) => {
const data: ConnectionManagerData = e.value;
expect(data.getConnections().length).toBe(2);
- expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(0);
- expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(0);
+ expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(0);
+ expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(0);
return true;
}),
b: expect.toSatisfy((e) => {
const data: ConnectionManagerData = e.value;
expect(data.getConnections().length).toBe(2);
- expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(1);
- expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(0);
- expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe(
- "user1A",
- );
+ expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(1);
+ expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(0);
+ expect(
+ data.getParticipantsForTransport(TRANSPORT_1)[0].identity,
+ ).toBe("user1A");
return true;
}),
c: expect.toSatisfy((e) => {
const data: ConnectionManagerData = e.value;
expect(data.getConnections().length).toBe(2);
- expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(1);
- expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(1);
- expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe(
- "user1A",
- );
- expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toBe(
- "user2A",
- );
+ expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(1);
+ expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(1);
+ expect(
+ data.getParticipantsForTransport(TRANSPORT_1)[0].identity,
+ ).toBe("user1A");
+ expect(
+ data.getParticipantsForTransport(TRANSPORT_2)[0].identity,
+ ).toBe("user2A");
return true;
}),
d: expect.toSatisfy((e) => {
const data: ConnectionManagerData = e.value;
expect(data.getConnections().length).toBe(2);
- expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(2);
- expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(1);
- expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe(
- "user1A",
- );
- expect(data.getParticipantForTransport(TRANSPORT_1)[1].identity).toBe(
- "user1B",
- );
- expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toBe(
- "user2A",
- );
+ expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(2);
+ expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(1);
+ expect(
+ data.getParticipantsForTransport(TRANSPORT_1)[0].identity,
+ ).toBe("user1A");
+ expect(
+ data.getParticipantsForTransport(TRANSPORT_1)[1].identity,
+ ).toBe("user1B");
+ expect(
+ data.getParticipantsForTransport(TRANSPORT_2)[0].identity,
+ ).toBe("user2A");
return true;
}),
});
diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts
index d9a0380e..bd07cfa1 100644
--- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts
+++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts
@@ -24,7 +24,10 @@ import { type ConnectionFactory } from "./ConnectionFactory.ts";
export class ConnectionManagerData {
private readonly store: Map<
string,
- [Connection, (LocalParticipant | RemoteParticipant)[]]
+ {
+ connection: Connection;
+ participants: (LocalParticipant | RemoteParticipant)[];
+ }
> = new Map();
public constructor() {}
@@ -36,9 +39,9 @@ export class ConnectionManagerData {
const key = this.getKey(connection.transport);
const existing = this.store.get(key);
if (!existing) {
- this.store.set(key, [connection, participants]);
+ this.store.set(key, { connection, participants });
} else {
- existing[1].push(...participants);
+ existing.participants.push(...participants);
}
}
@@ -47,25 +50,26 @@ export class ConnectionManagerData {
}
public getConnections(): Connection[] {
- return Array.from(this.store.values()).map(([connection]) => connection);
+ return Array.from(this.store.values()).map(({ connection }) => connection);
}
public getConnectionForTransport(
transport: LivekitTransport,
): Connection | null {
- return this.store.get(this.getKey(transport))?.[0] ?? null;
+ return this.store.get(this.getKey(transport))?.connection ?? null;
}
- public getParticipantForTransport(
+ public getParticipantsForTransport(
transport: LivekitTransport,
): (LocalParticipant | RemoteParticipant)[] {
const key = transport.livekit_service_url + "|" + transport.livekit_alias;
const existing = this.store.get(key);
if (existing) {
- return existing[1];
+ return existing.participants;
}
return [];
}
+
/**
* Get all connections where the given participant is publishing.
* In theory, there could be several connections where the same participant is publishing but with
@@ -76,8 +80,12 @@ export class ConnectionManagerData {
participantId: ParticipantId,
): Connection[] {
const connections: Connection[] = [];
- for (const [connection, participants] of this.store.values()) {
- if (participants.some((p) => p.identity === participantId)) {
+ for (const { connection, participants } of this.store.values()) {
+ if (
+ participants.some(
+ (participant) => participant?.identity === participantId,
+ )
+ ) {
connections.push(connection);
}
}
@@ -183,23 +191,24 @@ export function createConnectionManager$({
const epoch = connections.epoch;
// Map the connections to list of {connection, participants}[]
- const listOfConnectionsWithPublishingParticipants =
- connections.value.map((connection) => {
- return connection.remoteParticipantsWithTracks$.pipe(
+ const listOfConnectionsWithParticipants = connections.value.map(
+ (connection) => {
+ return connection.remoteParticipants$.pipe(
map((participants) => ({
connection,
participants,
})),
);
- });
+ },
+ );
// probably not required
- if (listOfConnectionsWithPublishingParticipants.length === 0) {
+ if (listOfConnectionsWithParticipants.length === 0) {
return of(new Epoch(new ConnectionManagerData(), epoch));
}
// combineLatest the several streams into a single stream with the ConnectionManagerData
- return combineLatest(listOfConnectionsWithPublishingParticipants).pipe(
+ return combineLatest(listOfConnectionsWithParticipants).pipe(
map(
(lists) =>
new Epoch(
diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts
index e675f723..7547a68b 100644
--- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts
+++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts
@@ -91,7 +91,7 @@ test("should signal participant not yet connected to livekit", () => {
}),
);
- const matrixLivekitMember$ = createMatrixLivekitMembers$({
+ const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
@@ -99,21 +99,24 @@ test("should signal participant not yet connected to livekit", () => {
} as unknown as IConnectionManager,
});
- expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
- a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
- expect(data.length).toEqual(1);
- expectObservable(data[0].membership$).toBe("a", {
- a: bobMembership,
- });
- expectObservable(data[0].participant$).toBe("a", {
- a: null,
- });
- expectObservable(data[0].connection$).toBe("a", {
- a: null,
- });
- return true;
- }),
- });
+ expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
+ "a",
+ {
+ a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
+ expect(data.length).toEqual(1);
+ expectObservable(data[0].membership$).toBe("a", {
+ a: bobMembership,
+ });
+ expectObservable(data[0].participant$).toBe("a", {
+ a: null,
+ });
+ expectObservable(data[0].connection$).toBe("a", {
+ a: null,
+ });
+ return true;
+ }),
+ },
+ );
});
});
@@ -171,7 +174,7 @@ test("should signal participant on a connection that is publishing", () => {
}),
);
- const matrixLivekitMember$ = createMatrixLivekitMembers$({
+ const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
@@ -179,25 +182,28 @@ test("should signal participant on a connection that is publishing", () => {
} as unknown as IConnectionManager,
});
- expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
- a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
- expect(data.length).toEqual(1);
- expectObservable(data[0].membership$).toBe("a", {
- a: bobMembership,
- });
- expectObservable(data[0].participant$).toBe("a", {
- a: expect.toSatisfy((participant) => {
- expect(participant).toBeDefined();
- expect(participant!.identity).toEqual(bobParticipantId);
- return true;
- }),
- });
- expectObservable(data[0].connection$).toBe("a", {
- a: connection,
- });
- return true;
- }),
- });
+ expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
+ "a",
+ {
+ a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
+ expect(data.length).toEqual(1);
+ expectObservable(data[0].membership$).toBe("a", {
+ a: bobMembership,
+ });
+ expectObservable(data[0].participant$).toBe("a", {
+ a: expect.toSatisfy((participant) => {
+ expect(participant).toBeDefined();
+ expect(participant!.identity).toEqual(bobParticipantId);
+ return true;
+ }),
+ });
+ expectObservable(data[0].connection$).toBe("a", {
+ a: connection,
+ });
+ return true;
+ }),
+ },
+ );
});
});
@@ -222,7 +228,7 @@ test("should signal participant on a connection that is not publishing", () => {
}),
);
- const matrixLivekitMember$ = createMatrixLivekitMembers$({
+ const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
@@ -230,21 +236,24 @@ test("should signal participant on a connection that is not publishing", () => {
} as unknown as IConnectionManager,
});
- expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
- a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
- expect(data.length).toEqual(1);
- expectObservable(data[0].membership$).toBe("a", {
- a: bobMembership,
- });
- expectObservable(data[0].participant$).toBe("a", {
- a: null,
- });
- expectObservable(data[0].connection$).toBe("a", {
- a: connection,
- });
- return true;
- }),
- });
+ expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
+ "a",
+ {
+ a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
+ expect(data.length).toEqual(1);
+ expectObservable(data[0].membership$).toBe("a", {
+ a: bobMembership,
+ });
+ expectObservable(data[0].participant$).toBe("a", {
+ a: null,
+ });
+ expectObservable(data[0].connection$).toBe("a", {
+ a: connection,
+ });
+ return true;
+ }),
+ },
+ );
});
});
@@ -283,7 +292,7 @@ describe("Publication edge case", () => {
}),
);
- const matrixLivekitMember$ = createMatrixLivekitMembers$({
+ const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(
membershipsWithTransport$,
@@ -293,7 +302,7 @@ describe("Publication edge case", () => {
} as unknown as IConnectionManager,
});
- expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe(
+ expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
@@ -349,7 +358,7 @@ describe("Publication edge case", () => {
}),
);
- const matrixLivekitMember$ = createMatrixLivekitMembers$({
+ const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(
membershipsWithTransport$,
@@ -359,7 +368,7 @@ describe("Publication edge case", () => {
} as unknown as IConnectionManager,
});
- expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe(
+ expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts
index 2f152630..72e2883a 100644
--- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts
+++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts
@@ -61,12 +61,12 @@ export function createMatrixLivekitMembers$({
scope,
membershipsWithTransport$,
connectionManager,
-}: Props): Behavior> {
+}: Props): { matrixLivekitMembers$: Behavior> } {
/**
* Stream of all the call members and their associated livekit data (if available).
*/
- return scope.behavior(
+ const matrixLivekitMembers$ = scope.behavior(
combineLatest([
membershipsWithTransport$,
connectionManager.connectionManagerData$,
@@ -91,7 +91,7 @@ export function createMatrixLivekitMembers$({
const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`;
const participants = transport
- ? managerData.getParticipantForTransport(transport)
+ ? managerData.getParticipantsForTransport(transport)
: [];
const participant =
participants.find((p) => p.identity == participantId) ?? null;
@@ -121,6 +121,11 @@ export function createMatrixLivekitMembers$({
),
),
);
+ return {
+ matrixLivekitMembers$,
+ // TODO add only publishing participants... maybe. disucss at least
+ // scope.behavior(matrixLivekitMembers$.pipe(map((items) => items.value.map((i)=>{ i.}))))
+ };
}
// TODO add back in the callviewmodel pauseWhen(this.pretendToBeDisconnected$)
diff --git a/src/state/CallViewModel/remoteMembers/integration.test.ts b/src/state/CallViewModel/remoteMembers/integration.test.ts
index e3aa6be8..cafffb38 100644
--- a/src/state/CallViewModel/remoteMembers/integration.test.ts
+++ b/src/state/CallViewModel/remoteMembers/integration.test.ts
@@ -124,14 +124,14 @@ test("bob, carl, then bob joining no tracks yet", () => {
logger: logger,
});
- const matrixLivekitItems$ = createMatrixLivekitMembers$({
+ const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$:
membershipsAndTransports.membershipsWithTransport$,
connectionManager,
});
- expectObservable(matrixLivekitItems$).toBe(vMarble, {
+ expectObservable(matrixLivekitMembers$).toBe(vMarble, {
a: expect.toSatisfy((e: Epoch) => {
const items = e.value;
expect(items.length).toBe(1);