fix multi subscription to display name ambiguity map.

This results in the map being created proportionally to the number of members in the call. Each change in memberships will call multiple resubscription to the map observable.

Signed-off-by: Timo K <toger5@hotmail.de>
This commit is contained in:
Timo K
2025-03-06 22:29:02 +01:00
parent b5f5ab329a
commit c45f6ab23f
2 changed files with 67 additions and 33 deletions

View File

@@ -69,7 +69,11 @@ import {
ScreenShareViewModel,
type UserMediaViewModel,
} from "./MediaViewModel";
import { accumulate, finalizeValue } from "../utils/observable";
import {
accumulate,
finalizeValue,
singleSubscriberSubject$,
} from "../utils/observable";
import { ObservableScope } from "./ObservableScope";
import {
duplicateTiles,
@@ -316,7 +320,7 @@ class UserMedia {
newParticipant: LocalParticipant | RemoteParticipant | undefined,
): void {
if (this.participant$.value !== newParticipant) {
// Update the BehaviourSubject in the UserMedia.
// Update the BehaviorSubject in the UserMedia.
this.participant$.next(newParticipant);
}
}
@@ -469,33 +473,40 @@ export class CallViewModel extends ViewModel {
* any displaynames that clashes with another member. Only members
* joined to the call are considered here.
*/
public readonly memberDisplaynames$ = merge(
// Handle call membership changes.
fromEvent(this.matrixRTCSession, MatrixRTCSessionEvent.MembershipsChanged),
// Handle room membership changes (and displayname updates)
fromEvent(this.matrixRTCSession.room, RoomStateEvent.Members),
).pipe(
startWith(null),
map(() => {
const displaynameMap = new Map<string, string>();
const { room, memberships } = this.matrixRTCSession;
// eslint-disable-next-line rxjs/no-exposed-subjects
public readonly memberDisplaynames$ = singleSubscriberSubject$(
merge(
fromEvent(this.matrixRTCSession.room, RoomStateEvent.Members),
fromEvent(
this.matrixRTCSession,
MatrixRTCSessionEvent.MembershipsChanged,
),
).pipe(
startWith(null),
map(() => {
const displaynameMap = new Map<string, string>();
const { room, memberships } = this.matrixRTCSession;
// We only consider RTC members for disambiguation as they are the only visible members.
for (const rtcMember of memberships) {
const matrixIdentifier = `${rtcMember.sender}:${rtcMember.deviceId}`;
const { member } = getRoomMemberFromRtcMember(rtcMember, room);
if (!member) {
logger.error("Could not find member for media id:", matrixIdentifier);
continue;
// We only consider RTC members for disambiguation as they are the only visible members.
for (const rtcMember of memberships) {
const matrixIdentifier = `${rtcMember.sender}:${rtcMember.deviceId}`;
const { member } = getRoomMemberFromRtcMember(rtcMember, room);
if (!member) {
logger.error(
"Could not find member for media id:",
matrixIdentifier,
);
continue;
}
const disambiguate = shouldDisambiguate(member, memberships, room);
displaynameMap.set(
matrixIdentifier,
calculateDisplayName(member, disambiguate),
);
}
const disambiguate = shouldDisambiguate(member, memberships, room);
displaynameMap.set(
matrixIdentifier,
calculateDisplayName(member, disambiguate),
);
}
return displaynameMap;
}),
return displaynameMap;
}),
),
);
/**
@@ -569,8 +580,8 @@ export class CallViewModel extends ViewModel {
yield [
indexedMediaId,
// We create UserMedia with or without a participant.
// This will be the initial value of a BehaviourSubject.
// Once a participant appears we will update the BehaviourSubject. (see above)
// This will be the initial value of a BehaviorSubject.
// Once a participant appears we will update the BehaviorSubject. (see above)
prevMedia ??
new UserMedia(
indexedMediaId,
@@ -579,7 +590,7 @@ export class CallViewModel extends ViewModel {
this.encryptionSystem,
this.livekitRoom,
this.memberDisplaynames$.pipe(
map((m) => m.get(matrixIdentifier) ?? "[👻]"),
map((m) => m?.get(matrixIdentifier) ?? "[👻]"),
),
this.handsRaised$.pipe(
map((v) => v[matrixIdentifier]?.time ?? null),
@@ -602,7 +613,7 @@ export class CallViewModel extends ViewModel {
this.encryptionSystem,
this.livekitRoom,
this.memberDisplaynames$.pipe(
map((m) => m.get(matrixIdentifier) ?? "[👻]"),
map((m) => m?.get(matrixIdentifier) ?? "[👻]"),
),
),
];
@@ -643,7 +654,9 @@ export class CallViewModel extends ViewModel {
this.encryptionSystem,
this.livekitRoom,
this.memberDisplaynames$.pipe(
map((m) => m.get(participant.identity) ?? "[👻]"),
map(
(m) => m?.get(participant.identity) ?? "[👻]",
),
),
of(null),
of(null),

View File

@@ -5,7 +5,15 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { type Observable, defer, finalize, scan, startWith, tap } from "rxjs";
import {
BehaviorSubject,
type Observable,
defer,
finalize,
scan,
startWith,
tap,
} from "rxjs";
const nothing = Symbol("nothing");
@@ -38,3 +46,16 @@ export function accumulate<State, Event>(
return (events$: Observable<Event>): Observable<State> =>
events$.pipe(scan(update, initial), startWith(initial));
}
/**
* A constructor that takes a source Observable and returns a BehaviorSubject.
* This makes sure we only subscribe to the source once. This is useful when using fromEvent.
*/
export function singleSubscriberSubject$<T>(
observable$: Observable<T>,
initial?: T,
): BehaviorSubject<T | undefined> {
const subject$ = new BehaviorSubject(initial);
observable$.subscribe(subject$);
return subject$;
}