diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index 9492b2f0..a5847f0e 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -118,7 +118,7 @@ export const ActiveCall: FC = (props) => { useEffect(() => { if (livekitRoom !== undefined) { const vm = new CallViewModel( - props.rtcSession.room, + props.rtcSession, livekitRoom, props.e2eeSystem, connStateObservable, @@ -127,7 +127,7 @@ export const ActiveCall: FC = (props) => { return (): void => vm.destroy(); } }, [ - props.rtcSession.room, + props.rtcSession, livekitRoom, props.e2eeSystem, connStateObservable, diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index db2833b8..734d2410 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -18,12 +18,9 @@ import { RemoteParticipant, Track, } from "livekit-client"; +import { Room as MatrixRoom, RoomMember } from "matrix-js-sdk/src/matrix"; import { - Room as MatrixRoom, - RoomMember, - RoomStateEvent, -} from "matrix-js-sdk/src/matrix"; -import { + BehaviorSubject, EMPTY, Observable, Subject, @@ -51,6 +48,10 @@ import { withLatestFrom, } from "rxjs"; import { logger } from "matrix-js-sdk/src/logger"; +import { + MatrixRTCSession, + MatrixRTCSessionEvent, +} from "matrix-js-sdk/src/matrixrtc"; import { ViewModel } from "./ViewModel"; import { @@ -164,28 +165,37 @@ enum SortingBin { class UserMedia { private readonly scope = new ObservableScope(); public readonly vm: UserMediaViewModel; + public participant: BehaviorSubject< + LocalParticipant | RemoteParticipant | undefined + >; + public readonly speaker: Observable; public readonly presenter: Observable; - public constructor( public readonly id: string, member: RoomMember | undefined, - participant: LocalParticipant | RemoteParticipant, + participant: LocalParticipant | RemoteParticipant | undefined, encryptionSystem: EncryptionSystem, ) { - this.vm = participant.isLocal - ? new LocalUserMediaViewModel( - id, - member, - participant as LocalParticipant, - encryptionSystem, - ) - : new RemoteUserMediaViewModel( - id, - member, - participant as RemoteParticipant, - encryptionSystem, - ); + this.participant = new BehaviorSubject(participant); + + if (participant && participant.isLocal) { + this.vm = new LocalUserMediaViewModel( + this.id, + member, + this.participant.asObservable() as Observable, + encryptionSystem, + ); + } else { + this.vm = new RemoteUserMediaViewModel( + id, + member, + this.participant.asObservable() as Observable< + RemoteParticipant | undefined + >, + encryptionSystem, + ); + } this.speaker = this.vm.speaking.pipe( // Require 1 s of continuous speaking to become a speaker, and 60 s of @@ -195,7 +205,7 @@ class UserMedia { timer(s ? 1000 : 60000), // If the speaking flag resets to its original value during this time, // end the silencing window to stick with that original value - this.vm.speaking.pipe(filter((s1) => s1 !== s)), + this.vm!.speaking.pipe(filter((s1) => s1 !== s)), ), ), startWith(false), @@ -205,13 +215,21 @@ class UserMedia { this.scope.state(), ); - this.presenter = observeParticipantEvents( - participant, - ParticipantEvent.TrackPublished, - ParticipantEvent.TrackUnpublished, - ParticipantEvent.LocalTrackPublished, - ParticipantEvent.LocalTrackUnpublished, - ).pipe(map((p) => p.isScreenShareEnabled)); + this.presenter = this.participant.pipe( + switchMap( + (p) => + (p && + observeParticipantEvents( + p, + ParticipantEvent.TrackPublished, + ParticipantEvent.TrackUnpublished, + ParticipantEvent.LocalTrackPublished, + ParticipantEvent.LocalTrackUnpublished, + ).pipe(map((p) => p.isScreenShareEnabled))) ?? + of(false), + ), + this.scope.state(), + ); } public destroy(): void { @@ -222,6 +240,7 @@ class UserMedia { class ScreenShare { public readonly vm: ScreenShareViewModel; + private participant: BehaviorSubject; public constructor( id: string, @@ -229,10 +248,12 @@ class ScreenShare { participant: LocalParticipant | RemoteParticipant, encryptionSystem: EncryptionSystem, ) { + this.participant = new BehaviorSubject(participant); + this.vm = new ScreenShareViewModel( id, member, - participant, + this.participant.asObservable(), encryptionSystem, ); } @@ -244,7 +265,7 @@ class ScreenShare { type MediaItem = UserMedia | ScreenShare; -function findMatrixMember( +function findMatrixRoomMember( room: MatrixRoom, id: string, ): RoomMember | undefined { @@ -344,8 +365,15 @@ export class CallViewModel extends ViewModel { this.remoteParticipants, observeParticipantMedia(this.livekitRoom.localParticipant), duplicateTiles.value, - // Also react to changes in the list of members - fromEvent(this.matrixRoom, RoomStateEvent.Update).pipe(startWith(null)), + // Also react to changes in the MatrixRTC session list: + fromEvent( + this.matrixRTCSession, + MatrixRTCSessionEvent.MembershipsChanged, + ).pipe(startWith(null)), + // fromEvent( + // this.matrixRTCSession, + // MatrixRTCSessionEvent.EncryptionKeyChanged, + // ).pipe(startWith(null)), ]).pipe( scan( ( @@ -354,42 +382,64 @@ export class CallViewModel extends ViewModel { ) => { const newItems = new Map( function* (this: CallViewModel): Iterable<[string, MediaItem]> { - for (const p of [localParticipant, ...remoteParticipants]) { - const id = p === localParticipant ? "local" : p.identity; - const member = findMatrixMember(this.matrixRoom, id); - if (member === undefined) - logger.warn( - `Ruh, roh! No matrix member found for SFU participant '${p.identity}': creating g-g-g-ghost!`, + for (const rtcMember of this.matrixRTCSession.memberships) { + const room = this.matrixRTCSession.room; + // WARN! This is not exactly the sender but the user defined in the state key. + // This will be available once we change to the new "member as object" format in the MatrixRTC object. + let mediaId = rtcMember.sender + ":" + rtcMember.deviceId; + let participant = undefined; + if ( + rtcMember.sender === room.client.getUserId()! && + rtcMember.deviceId === room.client.getDeviceId() + ) { + mediaId = "local"; + participant = localParticipant; + } else { + participant = remoteParticipants.find( + (p) => p.identity === mediaId, ); + } + + const member = findMatrixRoomMember(room, mediaId); - // Create as many tiles for this participant as called for by - // the duplicateTiles option for (let i = 0; i < 1 + duplicateTiles; i++) { - const userMediaId = `${id}:${i}`; + const indexedMediaId = `${mediaId}:${i}`; + const prevMedia = prevItems.get(indexedMediaId); + if (prevMedia && prevMedia instanceof UserMedia) { + if ( + prevMedia.participant.value === undefined && + participant !== undefined + ) { + // Update the BahviourSubject in the UserMedia. + prevMedia.participant.next(participant); + } + } yield [ - userMediaId, - prevItems.get(userMediaId) ?? + indexedMediaId, + // We create UserMedia with or without a participant. + // This will be the initial value of a BehaviourSubject. + // Once a participant appears we will update the BehaviourSubject. (see above) + prevMedia ?? new UserMedia( - userMediaId, + mediaId, member, - p, + participant, + this.encryptionSystem, + ), + ]; + } + if (participant && participant.isScreenShareEnabled) { + const screenShareId = `${mediaId}:screen-share`; + yield [ + screenShareId, + prevItems.get(screenShareId) ?? + new ScreenShare( + screenShareId, + member, + participant, this.encryptionSystem, ), ]; - - if (p.isScreenShareEnabled) { - const screenShareId = `${userMediaId}:screen-share`; - yield [ - screenShareId, - prevItems.get(screenShareId) ?? - new ScreenShare( - screenShareId, - member, - p, - this.encryptionSystem, - ), - ]; - } } } }.bind(this)(), @@ -432,42 +482,43 @@ export class CallViewModel extends ViewModel { distinctUntilChanged(), ); - private readonly spotlightSpeaker: Observable = - this.userMedia.pipe( - switchMap((mediaItems) => - mediaItems.length === 0 - ? of([]) - : combineLatest( - mediaItems.map((m) => - m.vm.speaking.pipe(map((s) => [m, s] as const)), - ), + private readonly spotlightSpeaker: Observable< + UserMediaViewModel | undefined + > = this.userMedia.pipe( + switchMap((mediaItems) => + mediaItems.length === 0 + ? of([]) + : combineLatest( + mediaItems.map((m) => + m.vm.speaking.pipe(map((s) => [m, s] as const)), ), - ), - scan<(readonly [UserMedia, boolean])[], UserMedia, null>( - (prev, mediaItems) => { - // Only remote users that are still in the call should be sticky - const [stickyMedia, stickySpeaking] = - (!prev?.vm.local && mediaItems.find(([m]) => m === prev)) || []; - // Decide who to spotlight: - // If the previous speaker is still speaking, stick with them rather - // than switching eagerly to someone else - return stickySpeaking - ? stickyMedia! - : // Otherwise, select any remote user who is speaking - (mediaItems.find(([m, s]) => !m.vm.local && s)?.[0] ?? - // Otherwise, stick with the person who was last speaking - stickyMedia ?? - // Otherwise, spotlight an arbitrary remote user - mediaItems.find(([m]) => !m.vm.local)?.[0] ?? - // Otherwise, spotlight the local user - mediaItems.find(([m]) => m.vm.local)![0]); - }, - null, - ), - map((speaker) => speaker.vm), - this.scope.state(), - throttleTime(1600, undefined, { leading: true, trailing: true }), - ); + ), + ), + scan<(readonly [UserMedia, boolean])[], UserMedia | undefined, null>( + (prev, mediaItems) => { + // Only remote users that are still in the call should be sticky + const [stickyMedia, stickySpeaking] = + (!prev?.vm.local && mediaItems.find(([m]) => m === prev)) || []; + // Decide who to spotlight: + // If the previous speaker is still speaking, stick with them rather + // than switching eagerly to someone else + return stickySpeaking + ? stickyMedia! + : // Otherwise, select any remote user who is speaking + (mediaItems.find(([m, s]) => !m.vm.local && s)?.[0] ?? + // Otherwise, stick with the person who was last speaking + stickyMedia ?? + // Otherwise, spotlight an arbitrary remote user + mediaItems.find(([m]) => !m.vm.local)?.[0] ?? + // Otherwise, spotlight the local user + mediaItems.find(([m]) => m.vm.local)?.[0]); + }, + null, + ), + map((speaker) => speaker?.vm), + this.scope.state(), + throttleTime(1600, undefined, { leading: true, trailing: true }), + ); private readonly grid: Observable = this.userMedia.pipe( switchMap((mediaItems) => { @@ -510,20 +561,29 @@ export class CallViewModel extends ViewModel { > = this.screenShares.pipe( map((screenShares) => screenShares.length > 0 - ? ([of(screenShares.map((m) => m.vm)), this.spotlightSpeaker] as const) + ? ([ + of(screenShares.map((m) => m.vm)), + this.spotlightSpeaker.pipe( + map((speaker) => (speaker && speaker) ?? null), + ), + ] as const) : ([ - this.spotlightSpeaker.pipe(map((speaker) => [speaker!])), + this.spotlightSpeaker.pipe( + map((speaker) => (speaker && [speaker]) ?? []), + ), this.spotlightSpeaker.pipe( switchMap((speaker) => - speaker.local - ? of(null) - : this.localUserMedia.pipe( - switchMap((vm) => - vm.alwaysShow.pipe( - map((alwaysShow) => (alwaysShow ? vm : null)), + speaker + ? speaker.local + ? of(null) + : this.localUserMedia.pipe( + switchMap((vm) => + vm.alwaysShow.pipe( + map((alwaysShow) => (alwaysShow ? vm : null)), + ), ), - ), - ), + ) + : of(null), ), ), ] as const), @@ -843,7 +903,7 @@ export class CallViewModel extends ViewModel { public constructor( // A call is permanently tied to a single Matrix room and LiveKit room - private readonly matrixRoom: MatrixRoom, + private readonly matrixRTCSession: MatrixRTCSession, private readonly livekitRoom: LivekitRoom, private readonly encryptionSystem: EncryptionSystem, private readonly connectionState: Observable, diff --git a/src/state/MediaViewModel.ts b/src/state/MediaViewModel.ts index 51a821af..32c09257 100644 --- a/src/state/MediaViewModel.ts +++ b/src/state/MediaViewModel.ts @@ -68,33 +68,50 @@ export function useDisplayName(vm: MediaViewModel): string { } export function observeTrackReference( - participant: Participant, + participant: Observable, source: Track.Source, -): Observable { - return observeParticipantMedia(participant).pipe( - map(() => ({ - participant, - publication: participant.getTrackPublication(source), - source, - })), - distinctUntilKeyChanged("publication"), +): Observable { + const obs = participant.pipe( + switchMap((p) => { + if (p) { + return observeParticipantMedia(p).pipe( + map(() => ({ + participant: p, + publication: p.getTrackPublication(source), + source, + })), + distinctUntilKeyChanged("publication"), + ); + } else { + return of(undefined); + } + }), ); + return obs; } abstract class BaseMediaViewModel extends ViewModel { /** * Whether the media belongs to the local user. */ - public readonly local = this.participant.isLocal; + public readonly local = this.participant.pipe( + // We can assume, that the user is not local if the participant is undefined + // We assume the local LK participant will always be available. + map((p) => p?.isLocal ?? false), + ); /** * The LiveKit video track for this media. */ - public readonly video: Observable; + public readonly video: Observable; /** * Whether there should be a warning that this media is unencrypted. */ public readonly unencryptedWarning: Observable; + public readonly isRTCParticipantAvailable = this.participant.pipe( + map((p) => !!p), + ); + public constructor( /** * An opaque identifier for this media. @@ -106,7 +123,12 @@ abstract class BaseMediaViewModel extends ViewModel { // TODO: Fully separate the data layer from the UI layer by keeping the // member object internal public readonly member: RoomMember | undefined, - protected readonly participant: LocalParticipant | RemoteParticipant, + // We dont necassarly have a participant if a user connects via MatrixRTC but not (not yet) through + // livekit. + protected readonly participant: Observable< + LocalParticipant | RemoteParticipant | undefined + >, + encryptionSystem: EncryptionSystem, audioSource: AudioSource, videoSource: VideoSource, @@ -122,8 +144,8 @@ abstract class BaseMediaViewModel extends ViewModel { [audio, this.video], (a, v) => encryptionSystem.kind !== E2eeType.NONE && - (a.publication?.isEncrypted === false || - v.publication?.isEncrypted === false), + (a?.publication?.isEncrypted === false || + v?.publication?.isEncrypted === false), ).pipe(this.scope.state()); } } @@ -143,12 +165,20 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { /** * Whether the participant is speaking. */ - public readonly speaking = observeParticipantEvents( - this.participant, - ParticipantEvent.IsSpeakingChanged, - ).pipe( - map((p) => p.isSpeaking), - this.scope.state(), + public readonly speaking = this.participant.pipe( + switchMap((p) => { + if (p) { + return observeParticipantEvents( + p, + ParticipantEvent.IsSpeakingChanged, + ).pipe( + map((p) => p.isSpeaking), + this.scope.state(), + ); + } else { + return of(false); + } + }), ); /** @@ -169,7 +199,7 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { public constructor( id: string, member: RoomMember | undefined, - participant: LocalParticipant | RemoteParticipant, + participant: Observable, encryptionSystem: EncryptionSystem, ) { super( @@ -181,12 +211,17 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { Track.Source.Camera, ); - const media = observeParticipantMedia(participant).pipe(this.scope.state()); + // const media = observeParticipantMedia(participant).pipe(this.scope.state()); + + const media = participant.pipe( + switchMap((p) => (p && observeParticipantMedia(p)) ?? of(undefined)), + this.scope.state(), + ); this.audioEnabled = media.pipe( - map((m) => m.microphoneTrack?.isMuted === false), + map((m) => m?.microphoneTrack?.isMuted === false), ); this.videoEnabled = media.pipe( - map((m) => m.cameraTrack?.isMuted === false), + map((m) => m?.cameraTrack?.isMuted === false), ); } @@ -204,7 +239,7 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { */ public readonly mirror = this.video.pipe( switchMap((v) => { - const track = v.publication?.track; + const track = v?.publication?.track; if (!(track instanceof LocalTrack)) return of(false); // Watch for track restarts, because they indicate a camera switch return fromEvent(track, TrackEvent.Restarted).pipe( @@ -226,7 +261,7 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { public constructor( id: string, member: RoomMember | undefined, - participant: LocalParticipant, + participant: Observable, encryptionSystem: EncryptionSystem, ) { super(id, member, participant, encryptionSystem); @@ -286,17 +321,16 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { public constructor( id: string, member: RoomMember | undefined, - participant: RemoteParticipant, + participant: Observable, encryptionSystem: EncryptionSystem, ) { super(id, member, participant, encryptionSystem); // Sync the local volume with LiveKit - this.localVolume - .pipe(this.scope.bind()) - .subscribe((volume) => - (this.participant as RemoteParticipant).setVolume(volume), - ); + combineLatest([ + participant, + this.localVolume.pipe(this.scope.bind()), + ]).subscribe(([p, volume]) => p && p.setVolume(volume)); } public toggleLocallyMuted(): void { @@ -319,7 +353,7 @@ export class ScreenShareViewModel extends BaseMediaViewModel { public constructor( id: string, member: RoomMember | undefined, - participant: LocalParticipant | RemoteParticipant, + participant: Observable, encryptionSystem: EncryptionSystem, ) { super(