From 1fff71ace1f11dcc869fdfc301114f9f77f67d9a Mon Sep 17 00:00:00 2001 From: Robin Date: Fri, 3 Oct 2025 21:00:45 -0400 Subject: [PATCH] Actually leave the MatrixRTC session again --- src/state/CallViewModel.ts | 123 +++++++++++++++++------------------ src/state/Connection.ts | 35 +++++----- src/state/ObservableScope.ts | 38 +++++++++++ 3 files changed, 116 insertions(+), 80 deletions(-) diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 6e333bec..1a20589c 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -37,7 +37,6 @@ import { Subject, combineLatest, concat, - concatMap, distinctUntilChanged, endWith, filter, @@ -678,6 +677,9 @@ export class CallViewModel extends ViewModel { ), ); + /** + * Emits with connections whenever they should be started or stopped. + */ private readonly connectionInstructions$ = this.connections$.pipe( pairwise(), map(([prev, next]) => { @@ -688,20 +690,6 @@ export class CallViewModel extends ViewModel { return { start, stop }; }), - this.scope.share, - ); - - /** - * Emits with a connection whenever it should be started. - */ - private readonly startConnection$ = this.connectionInstructions$.pipe( - concatMap(({ start }) => start), - ); - /** - * Emits with a connection whenever it should be stopped. - */ - private readonly stopConnection$ = this.connectionInstructions$.pipe( - concatMap(({ stop }) => stop), ); public readonly allLivekitRooms$ = this.scope.behavior( @@ -1947,61 +1935,70 @@ export class CallViewModel extends ViewModel { super(); // Start and stop local and remote connections as needed - this.startConnection$.pipe(this.scope.bind()).subscribe( - (c) => - void c.start().then( - () => logger.info(`Connected to ${c.transport.livekit_service_url}`), - (e) => - logger.error( - `Failed to start connection to ${c.transport.livekit_service_url}`, - e, - ), - ), - ); - this.stopConnection$.pipe(this.scope.bind()).subscribe((c) => { - logger.info(`Disconnecting from ${c.transport.livekit_service_url}`); - c.stop(); - }); - - // Start and stop session membership as needed - this.localTransport$.pipe(this.scope.bind()).subscribe((localTransport) => { - if (localTransport?.state === "ready") { - void enterRTCSession( - this.matrixRTCSession, - localTransport.value, - this.options.encryptionSystem.kind !== E2eeType.NONE, - true, - true, - multiSfu.value$.value, - ) - .catch((e) => logger.error("Error entering RTC session", e)) - .then(() => - // Update our member event when our mute state changes. - this.muteStates.video.enabled$ - .pipe(this.scope.bind(), takeUntil(this.leave$)) - // eslint-disable-next-line rxjs/no-nested-subscribe - .subscribe( - (videoEnabled) => - // TODO: Ensure that these calls are serialized in case of - // fast video toggling - void this.matrixRTCSession.updateCallIntent( - videoEnabled ? "video" : "audio", - ), + this.connectionInstructions$ + .pipe(this.scope.bind()) + .subscribe(({ start, stop }) => { + for (const c of stop) { + logger.info(`Disconnecting from ${c.transport.livekit_service_url}`); + c.stop(); + } + for (const c of start) { + c.start().then( + () => + logger.info(`Connected to ${c.transport.livekit_service_url}`), + (e) => + logger.error( + `Failed to start connection to ${c.transport.livekit_service_url}`, + e, ), ); + } + }); - return (): void => + // Start and stop session membership as needed + this.scope.reconcile(this.localTransport$, async (localTransport) => { + if (localTransport?.state === "ready") { + try { + await enterRTCSession( + this.matrixRTCSession, + localTransport.value, + this.options.encryptionSystem.kind !== E2eeType.NONE, + true, + true, + multiSfu.value$.value, + ); + } catch (e) { + logger.error("Error entering RTC session", e); + } + // Update our member event when our mute state changes. + const muteSubscription = this.muteStates.video.enabled$.subscribe( + (videoEnabled) => + // TODO: Ensure that these calls are serialized in case of + // fast video toggling + void this.matrixRTCSession.updateCallIntent( + videoEnabled ? "video" : "audio", + ), + ); + + return async (): Promise => { + muteSubscription.unsubscribe(); // Only sends Matrix leave event. The LiveKit session will disconnect // as soon as either the stopConnection$ handler above gets to it or // the view model is destroyed. - void this.matrixRTCSession - .leaveRoomSession() - .catch((e) => logger.error("Error leaving RTC session", e)) - .then(async () => - widget?.api.transport - .send(ElementWidgetActions.HangupCall, {}) - .catch((e) => logger.error("Failed to send hangup action", e)), + try { + await this.matrixRTCSession.leaveRoomSession(); + } catch (e) { + logger.error("Error leaving RTC session", e); + } + try { + await widget?.api.transport.send( + ElementWidgetActions.HangupCall, + {}, ); + } catch (e) { + logger.error("Failed to send hangup action", e); + } + }; } }); diff --git a/src/state/Connection.ts b/src/state/Connection.ts index 4908e42f..55afdacf 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -125,6 +125,24 @@ export class Connection { export class PublishConnection extends Connection { public async start(): Promise { this.stopped = false; + + this.muteStates.audio.setHandler(async (desired) => { + try { + await this.livekitRoom.localParticipant.setMicrophoneEnabled(desired); + } catch (e) { + logger.error("Failed to update LiveKit audio input mute state", e); + } + return this.livekitRoom.localParticipant.isMicrophoneEnabled; + }); + this.muteStates.video.setHandler(async (desired) => { + try { + await this.livekitRoom.localParticipant.setCameraEnabled(desired); + } catch (e) { + logger.error("Failed to update LiveKit video input mute state", e); + } + return this.livekitRoom.localParticipant.isCameraEnabled; + }); + const { url, jwt } = await this.sfuConfig; if (!this.stopped) await this.livekitRoom.connect(url, jwt); @@ -213,23 +231,6 @@ export class PublishConnection extends Connection { ); trackProcessorSync(track$, trackerProcessorState$); - this.muteStates.audio.setHandler(async (desired) => { - try { - await this.livekitRoom.localParticipant.setMicrophoneEnabled(desired); - } catch (e) { - logger.error("Failed to update LiveKit audio input mute state", e); - } - return this.livekitRoom.localParticipant.isMicrophoneEnabled; - }); - this.muteStates.video.setHandler(async (desired) => { - try { - await this.livekitRoom.localParticipant.setCameraEnabled(desired); - } catch (e) { - logger.error("Failed to update LiveKit video input mute state", e); - } - return this.livekitRoom.localParticipant.isCameraEnabled; - }); - const syncDevice = ( kind: MediaDeviceKind, selected$: Observable, diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index 8ac816ca..08a4b859 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -7,7 +7,10 @@ Please see LICENSE in the repository root for full details. import { BehaviorSubject, + catchError, distinctUntilChanged, + EMPTY, + endWith, filter, type Observable, share, @@ -95,6 +98,41 @@ export class ObservableScope { ) .subscribe(callback); } + + // TODO-MULTI-SFU Dear Future Robin, please document this. Love, Past Robin. + public reconcile( + value$: Behavior, + callback: (value: T) => Promise<(() => Promise) | undefined>, + ): void { + let latestValue: T | typeof nothing = nothing; + let reconciledValue: T | typeof nothing = nothing; + let cleanUp: (() => Promise) | undefined = undefined; + let callbackPromise: Promise<(() => Promise) | undefined>; + value$ + .pipe( + catchError(() => EMPTY), + this.bind(), + endWith(nothing), + ) + .subscribe((value) => { + void (async (): Promise => { + if (latestValue === nothing) { + latestValue = value; + while (latestValue !== reconciledValue) { + await cleanUp?.(); + reconciledValue = latestValue; + if (latestValue !== nothing) { + callbackPromise = callback(latestValue); + cleanUp = await callbackPromise; + } + } + latestValue = nothing; + } else { + latestValue = value; + } + })(); + }); + } } /**