diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index f634233e..a81eca6a 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -69,7 +69,11 @@ import { ScreenShareViewModel, type UserMediaViewModel, } from "./MediaViewModel"; -import { accumulate, finalizeValue } from "../utils/observable"; +import { + accumulate, + finalizeValue, + singleSubscriberSubject$, +} from "../utils/observable"; import { ObservableScope } from "./ObservableScope"; import { duplicateTiles, @@ -316,7 +320,7 @@ class UserMedia { newParticipant: LocalParticipant | RemoteParticipant | undefined, ): void { if (this.participant$.value !== newParticipant) { - // Update the BehaviourSubject in the UserMedia. + // Update the BehaviorSubject in the UserMedia. this.participant$.next(newParticipant); } } @@ -469,33 +473,40 @@ export class CallViewModel extends ViewModel { * any displaynames that clashes with another member. Only members * joined to the call are considered here. */ - public readonly memberDisplaynames$ = merge( - // Handle call membership changes. - fromEvent(this.matrixRTCSession, MatrixRTCSessionEvent.MembershipsChanged), - // Handle room membership changes (and displayname updates) - fromEvent(this.matrixRTCSession.room, RoomStateEvent.Members), - ).pipe( - startWith(null), - map(() => { - const displaynameMap = new Map(); - const { room, memberships } = this.matrixRTCSession; + // eslint-disable-next-line rxjs/no-exposed-subjects + public readonly memberDisplaynames$ = singleSubscriberSubject$( + merge( + fromEvent(this.matrixRTCSession.room, RoomStateEvent.Members), + fromEvent( + this.matrixRTCSession, + MatrixRTCSessionEvent.MembershipsChanged, + ), + ).pipe( + startWith(null), + map(() => { + const displaynameMap = new Map(); + const { room, memberships } = this.matrixRTCSession; - // We only consider RTC members for disambiguation as they are the only visible members. - for (const rtcMember of memberships) { - const matrixIdentifier = `${rtcMember.sender}:${rtcMember.deviceId}`; - const { member } = getRoomMemberFromRtcMember(rtcMember, room); - if (!member) { - logger.error("Could not find member for media id:", matrixIdentifier); - continue; + // We only consider RTC members for disambiguation as they are the only visible members. + for (const rtcMember of memberships) { + const matrixIdentifier = `${rtcMember.sender}:${rtcMember.deviceId}`; + const { member } = getRoomMemberFromRtcMember(rtcMember, room); + if (!member) { + logger.error( + "Could not find member for media id:", + matrixIdentifier, + ); + continue; + } + const disambiguate = shouldDisambiguate(member, memberships, room); + displaynameMap.set( + matrixIdentifier, + calculateDisplayName(member, disambiguate), + ); } - const disambiguate = shouldDisambiguate(member, memberships, room); - displaynameMap.set( - matrixIdentifier, - calculateDisplayName(member, disambiguate), - ); - } - return displaynameMap; - }), + return displaynameMap; + }), + ), ); /** @@ -569,8 +580,8 @@ export class CallViewModel extends ViewModel { yield [ indexedMediaId, // We create UserMedia with or without a participant. - // This will be the initial value of a BehaviourSubject. - // Once a participant appears we will update the BehaviourSubject. (see above) + // This will be the initial value of a BehaviorSubject. + // Once a participant appears we will update the BehaviorSubject. (see above) prevMedia ?? new UserMedia( indexedMediaId, @@ -579,7 +590,7 @@ export class CallViewModel extends ViewModel { this.encryptionSystem, this.livekitRoom, this.memberDisplaynames$.pipe( - map((m) => m.get(matrixIdentifier) ?? "[👻]"), + map((m) => m?.get(matrixIdentifier) ?? "[👻]"), ), this.handsRaised$.pipe( map((v) => v[matrixIdentifier]?.time ?? null), @@ -602,7 +613,7 @@ export class CallViewModel extends ViewModel { this.encryptionSystem, this.livekitRoom, this.memberDisplaynames$.pipe( - map((m) => m.get(matrixIdentifier) ?? "[👻]"), + map((m) => m?.get(matrixIdentifier) ?? "[👻]"), ), ), ]; @@ -643,7 +654,9 @@ export class CallViewModel extends ViewModel { this.encryptionSystem, this.livekitRoom, this.memberDisplaynames$.pipe( - map((m) => m.get(participant.identity) ?? "[👻]"), + map( + (m) => m?.get(participant.identity) ?? "[👻]", + ), ), of(null), of(null), diff --git a/src/utils/observable.ts b/src/utils/observable.ts index d0d8ec71..bcd58f24 100644 --- a/src/utils/observable.ts +++ b/src/utils/observable.ts @@ -5,7 +5,15 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { type Observable, defer, finalize, scan, startWith, tap } from "rxjs"; +import { + BehaviorSubject, + type Observable, + defer, + finalize, + scan, + startWith, + tap, +} from "rxjs"; const nothing = Symbol("nothing"); @@ -38,3 +46,16 @@ export function accumulate( return (events$: Observable): Observable => events$.pipe(scan(update, initial), startWith(initial)); } + +/** + * A constructor that takes a source Observable and returns a BehaviorSubject. + * This makes sure we only subscribe to the source once. This is useful when using fromEvent. + */ +export function singleSubscriberSubject$( + observable$: Observable, + initial?: T, +): BehaviorSubject { + const subject$ = new BehaviorSubject(initial); + observable$.subscribe(subject$); + return subject$; +}