The source of the local participant is the createLocalMembership$ and

not the MatrixLivekitMembers!

Co-authored-by: Valere <bill.carson@valrsoft.com>
This commit is contained in:
Timo K
2025-11-12 12:09:31 +01:00
parent 8671d3fd67
commit 9f4d954cfa
7 changed files with 130 additions and 45 deletions

View File

@@ -74,16 +74,16 @@
"matrix_id": "Matrix ID: {{id}}",
"matrixRTCMode": {
"Comptibility": {
"label": "Compatibility: state events & multi SFU",
"description": "Compatible with homeservers that do not support sticky events (but all other EC clients are v0.17.0 or later)"
"description": "Compatible with homeservers that do not support sticky events (but all other EC clients are v0.17.0 or later)",
"label": "Compatibility: state events & multi SFU"
},
"Legacy": {
"label": "Legacy: state events & oldest membership SFU",
"description": "Compatible with old versions of EC that do not support multi SFU"
"description": "Compatible with old versions of EC that do not support multi SFU",
"label": "Legacy: state events & oldest membership SFU"
},
"Matrix_2_0": {
"label": "Matrix 2.0: sticky events & multi SFU",
"description": "Compatible only with homservers supporting sticky events and all EC clients v0.17.0 or later"
"description": "Compatible only with homservers supporting sticky events and all EC clients v0.17.0 or later",
"label": "Matrix 2.0: sticky events & multi SFU"
}
},
"mute_all_audio": "Mute all audio (participants, reactions, join sounds)",

View File

@@ -103,7 +103,10 @@ import {
} from "../SessionBehaviors.ts";
import { ECConnectionFactory } from "./remoteMembers/ConnectionFactory.ts";
import { createConnectionManager$ } from "./remoteMembers/ConnectionManager.ts";
import { createMatrixLivekitMembers$ } from "./remoteMembers/MatrixLivekitMembers.ts";
import {
createMatrixLivekitMembers$,
type MatrixLivekitMember,
} from "./remoteMembers/MatrixLivekitMembers.ts";
import {
createCallNotificationLifecycle$,
createReceivedDecline$,
@@ -269,6 +272,38 @@ export class CallViewModel {
options: this.connectOptions$,
});
private localRtcMembership$ = this.scope.behavior(
this.memberships$.pipe(
map(
(memberships) =>
memberships.value.find(
(membership) =>
membership.userId === this.userId &&
membership.deviceId === this.deviceId,
) ?? null,
),
),
);
private localMatrixLivekitMemberUninitialized = {
membership$: this.localRtcMembership$,
participant$: this.localMembership.participant$,
connection$: this.localMembership.connection$,
userId: this.userId,
};
private localMatrixLivekitMember$: Behavior<MatrixLivekitMember | null> =
this.scope.behavior(
this.localRtcMembership$.pipe(
switchMap((membership) => {
if (!membership) return of(null);
return of(
// casting is save here since we know that localRtcMembership$ is !== null since we reached this case.
this.localMatrixLivekitMemberUninitialized as MatrixLivekitMember,
);
}),
),
);
// ------------------------------------------------------------------------
private callLifecycle = createCallNotificationLifecycle$({
@@ -283,6 +318,7 @@ export class CallViewModel {
localUser: { userId: this.userId, deviceId: this.deviceId },
});
public autoLeave$ = this.callLifecycle.autoLeave$;
// ------------------------------------------------------------------------
/**
@@ -377,12 +413,10 @@ export class CallViewModel {
),
);
private roomMembers$ = createRoomMembers$(this.scope, this.matrixRoom);
private matrixMemberMetadataStore = createMatrixMemberMetadata$(
this.scope,
this.scope.behavior(this.memberships$.pipe(map((mems) => mems.value))),
this.roomMembers$,
createRoomMembers$(this.scope, this.matrixRoom),
);
/**
@@ -390,22 +424,55 @@ export class CallViewModel {
*/
// TODO this also needs the local participant to be added.
private readonly userMedia$ = this.scope.behavior<UserMedia[]>(
combineLatest([this.matrixLivekitMembers$, duplicateTiles.value$]).pipe(
combineLatest([
this.localMatrixLivekitMember$,
this.matrixLivekitMembers$,
duplicateTiles.value$,
]).pipe(
// Generate a collection of MediaItems from the list of expected (whether
// present or missing) LiveKit participants.
generateItems(
function* ([{ value: matrixLivekitMembers }, duplicateTiles]) {
function* ([
localMatrixLivekitMember,
{ value: matrixLivekitMembers },
duplicateTiles,
]) {
// add local member if available
if (localMatrixLivekitMember) {
const {
userId,
participant$,
connection$,
// membership$,
} = localMatrixLivekitMember;
const participantId = participant$.value?.identity; // should be membership$.value.membershipID which is not optional
// const participantId = membership$.value.membershipID;
if (participantId) {
for (let dup = 0; dup < 1 + duplicateTiles; dup++) {
yield {
keys: [dup, participantId, userId, participant$, connection$],
data: undefined,
};
}
}
}
// add remote members that are available
for (const {
participantId,
userId,
participant$,
connection$,
} of matrixLivekitMembers)
for (let dup = 0; dup < 1 + duplicateTiles; dup++)
// membership$
} of matrixLivekitMembers) {
const participantId = participant$.value?.identity;
// const participantId = membership$.value?.identity;
if (!participantId) continue;
for (let dup = 0; dup < 1 + duplicateTiles; dup++) {
yield {
keys: [dup, participantId, userId, participant$, connection$],
data: undefined,
};
}
}
},
(
scope,

View File

@@ -10,6 +10,7 @@ import {
type E2EEOptions,
type Participant,
ParticipantEvent,
type LocalParticipant,
} from "livekit-client";
import { observeParticipantEvents } from "@livekit/components-core";
import {
@@ -54,6 +55,7 @@ import { getUrlParams } from "../../../UrlParams.ts";
import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts";
import { MatrixRTCMode } from "../../../settings/settings.ts";
import { Config } from "../../../config/Config.ts";
import { type Connection } from "../remoteMembers/Connection.ts";
export enum LivekitState {
Uninitialized = "uninitialized",
@@ -82,8 +84,8 @@ type LocalMemberMatrixState =
| { state: MatrixState.Disconnected };
export interface LocalMemberConnectionState {
livekit$: BehaviorSubject<LocalMemberLivekitState>;
matrix$: BehaviorSubject<LocalMemberMatrixState>;
livekit$: Behavior<LocalMemberLivekitState>;
matrix$: Behavior<LocalMemberMatrixState>;
}
/*
@@ -145,7 +147,8 @@ export const createLocalMembership$ = ({
// Use null here since behavior cannot be initialised with undefined.
sharingScreen$: Behavior<boolean | null>;
toggleScreenSharing: (() => void) | null;
participant$: Behavior<LocalParticipant | null>;
connection$: Behavior<Connection | null>;
// deprecated fields
/** @deprecated use state instead*/
homeserverConnected$: Behavior<boolean>;
@@ -317,6 +320,7 @@ export const createLocalMembership$ = ({
state.livekit$.next({ state: LivekitState.Error, error });
});
});
combineLatest([localTransport$, connectRequested$]).subscribe(
([transport, connectRequested]) => {
if (
@@ -515,6 +519,9 @@ export const createLocalMembership$ = ({
alternativeScreenshareToggle,
);
const participant$ = scope.behavior(
connection$.pipe(map((c) => c?.livekitRoom.localParticipant ?? null)),
);
return {
startTracks,
requestConnect,
@@ -526,6 +533,8 @@ export const createLocalMembership$ = ({
configError$,
sharingScreen$,
toggleScreenSharing,
participant$,
connection$,
};
};

View File

@@ -389,15 +389,17 @@ describe("Publishing participants observations", () => {
const bobIsAPublisher = Promise.withResolvers<void>();
const danIsAPublisher = Promise.withResolvers<void>();
const observedPublishers: PublishingParticipant[][] = [];
const s = connection.participants$.subscribe((publishers) => {
observedPublishers.push(publishers);
if (publishers.some((p) => p.identity === "@bob:example.org:DEV111")) {
bobIsAPublisher.resolve();
}
if (publishers.some((p) => p.identity === "@dan:example.org:DEV333")) {
danIsAPublisher.resolve();
}
});
const s = connection.remoteParticipantsWithTracks$.subscribe(
(publishers) => {
observedPublishers.push(publishers);
if (publishers.some((p) => p.identity === "@bob:example.org:DEV111")) {
bobIsAPublisher.resolve();
}
if (publishers.some((p) => p.identity === "@dan:example.org:DEV333")) {
danIsAPublisher.resolve();
}
},
);
onTestFinished(() => s.unsubscribe());
// The publishingParticipants$ observable is derived from the current members of the
// livekitRoom and the rtc membership in order to publish the members that are publishing
@@ -513,9 +515,11 @@ describe("Publishing participants observations", () => {
const connection = setupRemoteConnection();
let observedPublishers: PublishingParticipant[][] = [];
const s = connection.participants$.subscribe((publishers) => {
observedPublishers.push(publishers);
});
const s = connection.remoteParticipantsWithTracks$.subscribe(
(publishers) => {
observedPublishers.push(publishers);
},
);
onTestFinished(() => s.unsubscribe());
let participants: RemoteParticipant[] = [

View File

@@ -179,12 +179,13 @@ export class Connection {
}
/**
* An observable of the participants that are publishing on this connection.
* An observable of the participants that are publishing on this connection. (Excluding our local participant)
* This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`.
* It filters the participants to only those that are associated with a membership that claims to publish on this connection.
*/
public readonly participants$: Behavior<PublishingParticipant[]>;
public readonly remoteParticipantsWithTracks$: Behavior<
PublishingParticipant[]
>;
/**
* The media transport to connect to.
@@ -211,7 +212,9 @@ export class Connection {
this.transport = transport;
this.client = client;
this.participants$ = scope.behavior(
// REMOTE participants with track!!!
// this.remoteParticipantsWithTracks$
this.remoteParticipantsWithTracks$ = scope.behavior(
// only tracks remote participants
connectedParticipantsObserver(this.livekitRoom, {
additionalRoomEvents: [
@@ -219,10 +222,11 @@ export class Connection {
RoomEvent.TrackUnpublished,
],
}).pipe(
map((participants) => [
this.livekitRoom.localParticipant,
...participants,
]),
map((participants) => {
return participants.filter(
(participant) => participant.getTrackPublications().length > 0,
);
}),
),
[],
);

View File

@@ -55,8 +55,8 @@ export class ConnectionManagerData {
public getConnectionForTransport(
transport: LivekitTransport,
): Connection | undefined {
return this.store.get(this.getKey(transport))?.[0];
): Connection | null {
return this.store.get(this.getKey(transport))?.[0] ?? null;
}
public getParticipantForTransport(
@@ -181,7 +181,7 @@ export function createConnectionManager$({
// Map the connections to list of {connection, participants}[]
const listOfConnectionsWithPublishingParticipants =
connections.value.map((connection) => {
return connection.participants$.pipe(
return connection.remoteParticipantsWithTracks$.pipe(
map((participants) => ({
connection,
participants,

View File

@@ -30,13 +30,14 @@ const logger = rootLogger.getChild("MatrixLivekitMembers");
* or if it has no livekit transport at all.
*/
export interface MatrixLivekitMember {
participantId: string;
userId: string;
membership$: Behavior<CallMembership>;
participant$: Behavior<
LocalLivekitParticipant | RemoteLivekitParticipant | null
>;
connection$: Behavior<Connection | undefined>;
connection$: Behavior<Connection | null>;
// participantId: string; We do not want a participantId here since it will be generated by the jwt
// TODO decide if we can also drop the userId. Its in the matrix membership anyways.
userId: string;
}
interface Props {
@@ -96,7 +97,7 @@ export function createMatrixLivekitMembers$({
participants.find((p) => p.identity == participantId) ?? null;
const connection = transport
? managerData.getConnectionForTransport(transport)
: undefined;
: null;
yield {
keys: [participantId, membership.userId],