Prove that the remote members modules only output remote members

They had loose types that were allowing them also output local members. They don't do this, it's just misleading.
This commit is contained in:
Robin
2025-12-08 23:01:44 -05:00
parent cc8e250d96
commit 47cd343d44
4 changed files with 33 additions and 47 deletions

View File

@@ -32,7 +32,6 @@ import {
Connection,
type ConnectionOpts,
type ConnectionState,
type PublishingParticipant,
} from "./Connection.ts";
import { ObservableScope } from "../../ObservableScope.ts";
import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts";
@@ -381,7 +380,7 @@ describe("Publishing participants observations", () => {
const bobIsAPublisher = Promise.withResolvers<void>();
const danIsAPublisher = Promise.withResolvers<void>();
const observedPublishers: PublishingParticipant[][] = [];
const observedPublishers: RemoteParticipant[][] = [];
const s = connection.remoteParticipantsWithTracks$.subscribe(
(publishers) => {
observedPublishers.push(publishers);
@@ -394,7 +393,7 @@ describe("Publishing participants observations", () => {
},
);
onTestFinished(() => s.unsubscribe());
// The publishingParticipants$ observable is derived from the current members of the
// The remoteParticipants$ observable is derived from the current members of the
// livekitRoom and the rtc membership in order to publish the members that are publishing
// on this connection.
@@ -436,7 +435,7 @@ describe("Publishing participants observations", () => {
const connection = setupRemoteConnection();
let observedPublishers: PublishingParticipant[][] = [];
let observedPublishers: RemoteParticipant[][] = [];
const s = connection.remoteParticipantsWithTracks$.subscribe(
(publishers) => {
observedPublishers.push(publishers);

View File

@@ -14,7 +14,6 @@ import {
ConnectionError,
type ConnectionState as LivekitConenctionState,
type Room as LivekitRoom,
type LocalParticipant,
type RemoteParticipant,
RoomEvent,
} from "livekit-client";
@@ -34,8 +33,6 @@ import {
SFURoomCreationRestrictedError,
} from "../../../utils/errors.ts";
export type PublishingParticipant = LocalParticipant | RemoteParticipant;
export interface ConnectionOpts {
/** The media transport to connect to. */
transport: LivekitTransport;
@@ -89,9 +86,7 @@ export class Connection {
* This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`.
* It filters the participants to only those that are associated with a membership that claims to publish on this connection.
*/
public readonly remoteParticipantsWithTracks$: Behavior<
PublishingParticipant[]
>;
public readonly remoteParticipantsWithTracks$: Behavior<RemoteParticipant[]>;
/**
* Whether the connection has been stopped.

View File

@@ -8,7 +8,7 @@ Please see LICENSE in the repository root for full details.
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { BehaviorSubject } from "rxjs";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { type Participant as LivekitParticipant } from "livekit-client";
import { type RemoteParticipant } from "livekit-client";
import { logger } from "matrix-js-sdk/lib/logger";
import { Epoch, mapEpoch, ObservableScope } from "../../ObservableScope.ts";
@@ -201,23 +201,20 @@ describe("connections$ stream", () => {
describe("connectionManagerData$ stream", () => {
// Used in test to control fake connections' remoteParticipantsWithTracks$ streams
let fakePublishingParticipantsStreams: Map<
string,
Behavior<LivekitParticipant[]>
>;
let fakeRemoteParticipantsStreams: Map<string, Behavior<RemoteParticipant[]>>;
function keyForTransport(transport: LivekitTransport): string {
return `${transport.livekit_service_url}|${transport.livekit_alias}`;
}
beforeEach(() => {
fakePublishingParticipantsStreams = new Map();
fakeRemoteParticipantsStreams = new Map();
function getPublishingParticipantsFor(
function getRemoteParticipantsFor(
transport: LivekitTransport,
): Behavior<LivekitParticipant[]> {
): Behavior<RemoteParticipant[]> {
return (
fakePublishingParticipantsStreams.get(keyForTransport(transport)) ??
fakeRemoteParticipantsStreams.get(keyForTransport(transport)) ??
new BehaviorSubject([])
);
}
@@ -227,13 +224,12 @@ describe("connectionManagerData$ stream", () => {
.fn()
.mockImplementation(
(transport: LivekitTransport, scope: ObservableScope) => {
const fakePublishingParticipants$ = new BehaviorSubject<
LivekitParticipant[]
const fakeRemoteParticipants$ = new BehaviorSubject<
RemoteParticipant[]
>([]);
const mockConnection = {
transport,
remoteParticipantsWithTracks$:
getPublishingParticipantsFor(transport),
remoteParticipantsWithTracks$: getRemoteParticipantsFor(transport),
} as unknown as Connection;
vi.mocked(mockConnection).start = vi.fn();
vi.mocked(mockConnection).stop = vi.fn();
@@ -242,36 +238,36 @@ describe("connectionManagerData$ stream", () => {
void mockConnection.stop();
});
fakePublishingParticipantsStreams.set(
fakeRemoteParticipantsStreams.set(
keyForTransport(transport),
fakePublishingParticipants$,
fakeRemoteParticipants$,
);
return mockConnection;
},
);
});
test("Should report connections with the publishing participants", () => {
test("Should report connections with the remote participants", () => {
withTestScheduler(({ expectObservable, schedule, behavior }) => {
// Setup the fake participants streams behavior
// ==============================
fakePublishingParticipantsStreams.set(
fakeRemoteParticipantsStreams.set(
keyForTransport(TRANSPORT_1),
behavior("oa-b", {
o: [],
a: [{ identity: "user1A" } as LivekitParticipant],
a: [{ identity: "user1A" } as RemoteParticipant],
b: [
{ identity: "user1A" } as LivekitParticipant,
{ identity: "user1B" } as LivekitParticipant,
{ identity: "user1A" } as RemoteParticipant,
{ identity: "user1B" } as RemoteParticipant,
],
}),
);
fakePublishingParticipantsStreams.set(
fakeRemoteParticipantsStreams.set(
keyForTransport(TRANSPORT_2),
behavior("o-a", {
o: [],
a: [{ identity: "user2A" } as LivekitParticipant],
a: [{ identity: "user2A" } as RemoteParticipant],
}),
);
// ==============================

View File

@@ -12,7 +12,7 @@ import {
} from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject, combineLatest, map, of, switchMap, tap } from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger";
import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
import { type RemoteParticipant } from "livekit-client";
import { type Behavior } from "../../Behavior.ts";
import { type Connection } from "./Connection.ts";
@@ -22,17 +22,12 @@ import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts";
import { type ConnectionFactory } from "./ConnectionFactory.ts";
export class ConnectionManagerData {
private readonly store: Map<
string,
[Connection, (LocalParticipant | RemoteParticipant)[]]
> = new Map();
private readonly store: Map<string, [Connection, RemoteParticipant[]]> =
new Map();
public constructor() {}
public add(
connection: Connection,
participants: (LocalParticipant | RemoteParticipant)[],
): void {
public add(connection: Connection, participants: RemoteParticipant[]): void {
const key = this.getKey(connection.transport);
const existing = this.store.get(key);
if (!existing) {
@@ -58,7 +53,7 @@ export class ConnectionManagerData {
public getParticipantForTransport(
transport: LivekitTransport,
): (LocalParticipant | RemoteParticipant)[] {
): RemoteParticipant[] {
const key = transport.livekit_service_url + "|" + transport.livekit_alias;
const existing = this.store.get(key);
if (existing) {
@@ -182,23 +177,24 @@ export function createConnectionManager$({
const epoch = connections.epoch;
// Map the connections to list of {connection, participants}[]
const listOfConnectionsWithPublishingParticipants =
connections.value.map((connection) => {
const listOfConnectionsWithRemoteParticipants = connections.value.map(
(connection) => {
return connection.remoteParticipantsWithTracks$.pipe(
map((participants) => ({
connection,
participants,
})),
);
});
},
);
// probably not required
if (listOfConnectionsWithPublishingParticipants.length === 0) {
if (listOfConnectionsWithRemoteParticipants.length === 0) {
return of(new Epoch(new ConnectionManagerData(), epoch));
}
// combineLatest the several streams into a single stream with the ConnectionManagerData
return combineLatest(listOfConnectionsWithPublishingParticipants).pipe(
return combineLatest(listOfConnectionsWithRemoteParticipants).pipe(
map(
(lists) =>
new Epoch(