introduce publishingParticipants$

Signed-off-by: Timo K <toger5@hotmail.de>
This commit is contained in:
Timo K
2025-08-27 18:41:03 +02:00
parent a58b37fd8b
commit efa9a9cc45

View File

@@ -55,6 +55,7 @@ import { logger } from "matrix-js-sdk/lib/logger";
import {
type CallMembership,
isLivekitFocusConfig,
LivekitFocusConfig,
type MatrixRTCSession,
MatrixRTCSessionEvent,
MembershipManagerEvent,
@@ -467,7 +468,6 @@ class Connection {
for (const track of tracks) {
await this.livekitRoom.localParticipant.publishTrack(track);
}
// await this.livekitRoom.localParticipant.enableCameraAndMicrophone();
}
}
@@ -478,17 +478,45 @@ class Connection {
this.stopped = true;
}
public readonly participants$ = this.scope.behavior(
public readonly participantsIncludingJustSubscribers$ = this.scope.behavior(
connectedParticipantsObserver(this.livekitRoom),
[],
);
public readonly publishingParticipants$ = (
memberships$: Behavior<CallMembership[]>,
): Observable<RemoteParticipant[]> =>
this.scope.behavior(
combineLatest([
connectedParticipantsObserver(this.livekitRoom),
memberships$,
]).pipe(
map(([participants, memberships]) => {
const publishingMembers = membershipsFocusUrl(
memberships,
this.matrixRTCSession,
)
.filter((f) => f.livekit_service_url === this.serviceUrl)
.map((f) => f.membership);
return publishingMembers
.map((m) =>
participants.find(
(p) => p.identity === `${m.sender}:${m.deviceId}`,
),
)
.filter((p): p is RemoteParticipant => !!p);
}),
),
[],
);
public constructor(
private readonly livekitRoom: LivekitRoom,
private readonly serviceUrl: string,
private readonly livekitAlias: string,
private readonly client: MatrixClient,
private readonly scope: ObservableScope,
private readonly matrixRTCSession: MatrixRTCSession,
) {}
}
@@ -500,7 +528,7 @@ export class CallViewModel extends ViewModel {
private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession);
private readonly livekitRoom = new LivekitRoom({
private readonly localConnectionLivekitRoom = new LivekitRoom({
...defaultLiveKitOptions,
e2ee: this.e2eeOptions,
});
@@ -510,11 +538,12 @@ export class CallViewModel extends ViewModel {
private readonly localConnection = this.localFocus.then(
(focus) =>
new Connection(
this.livekitRoom,
this.localConnectionLivekitRoom,
focus.livekit_service_url,
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.matrixRTCSession,
),
);
@@ -530,10 +559,9 @@ export class CallViewModel extends ViewModel {
map(
(memberships) =>
new Set(
memberships
.map((m) => this.matrixRTCSession.resolveActiveFocus(m))
.filter((f) => f !== undefined && isLivekitFocusConfig(f))
.map((f) => f.livekit_service_url),
membershipsFocusUrl(memberships, this.matrixRTCSession).map(
(f) => f.livekit_service_url,
),
),
),
);
@@ -561,6 +589,7 @@ export class CallViewModel extends ViewModel {
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.matrixRTCSession,
),
);
}
@@ -675,7 +704,7 @@ export class CallViewModel extends ViewModel {
private readonly remoteParticipants$ = this.scope
.behavior<
RemoteParticipant[]
>(combineLatest([this.localConnection, this.remoteConnections$], (localConnection, remoteConnections) => combineLatest([localConnection.participants$, ...[...remoteConnections.values()].map((c) => c.participants$)], (...ps) => ps.flat(1))).pipe(switchAll(), startWith([])))
>(combineLatest([this.localConnection, this.remoteConnections$], (localConnection, remoteConnections) => combineLatest([localConnection.participantsIncludingJustSubscribers$, ...[...remoteConnections.values()].map((c) => c.participantsIncludingJustSubscribers$)], (...ps) => ps.flat(1))).pipe(switchAll(), startWith([])))
.pipe(pauseWhen(this.pretendToBeDisconnected$));
/**
@@ -749,7 +778,7 @@ export class CallViewModel extends ViewModel {
private readonly mediaItems$ = this.scope.behavior<MediaItem[]>(
combineLatest([
this.remoteParticipants$,
observeParticipantMedia(this.livekitRoom.localParticipant),
observeParticipantMedia(this.localConnectionLivekitRoom.localParticipant),
duplicateTiles.value$,
// Also react to changes in the MatrixRTC session list.
// The session list will also be update if a room membership changes.
@@ -823,7 +852,7 @@ export class CallViewModel extends ViewModel {
member,
participant,
this.options.encryptionSystem,
this.livekitRoom,
this.localConnectionLivekitRoom,
this.mediaDevices,
this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe(
@@ -848,7 +877,7 @@ export class CallViewModel extends ViewModel {
member,
participant,
this.options.encryptionSystem,
this.livekitRoom,
this.localConnectionLivekitRoom,
this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe(
map((m) => m.get(matrixIdentifier) ?? "[👻]"),
@@ -890,7 +919,7 @@ export class CallViewModel extends ViewModel {
undefined,
participant,
this.options.encryptionSystem,
this.livekitRoom,
this.localConnectionLivekitRoom,
this.mediaDevices,
this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe(
@@ -1746,7 +1775,7 @@ export class CallViewModel extends ViewModel {
// that our own media is displayed on screen.
this.matrixConnected$.pipe(this.scope.bind()).subscribe((connected) => {
const publications =
this.livekitRoom.localParticipant.trackPublications.values();
this.localConnectionLivekitRoom.localParticipant.trackPublications.values();
if (connected) {
for (const p of publications) {
if (p.track?.isUpstreamPaused === true) {
@@ -1788,3 +1817,22 @@ export class CallViewModel extends ViewModel {
this.join(); // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked?
}
}
const membershipsFocusUrl = (
memberships: CallMembership[],
matrixRTCSession: MatrixRTCSession,
): { livekit_service_url: string; membership: CallMembership }[] => {
return memberships
.map(
(m) =>
[matrixRTCSession.resolveActiveFocus(m), m] as [
LivekitFocusConfig | undefined,
CallMembership,
],
)
.filter(([f, _]) => f !== undefined && isLivekitFocusConfig(f))
.map(([f, m]) => ({
livekit_service_url: f!.livekit_service_url,
membership: m,
}));
};