Actually leave the MatrixRTC session again

This commit is contained in:
Robin
2025-10-03 21:00:45 -04:00
parent 1820cac3f6
commit 1fff71ace1
3 changed files with 116 additions and 80 deletions

View File

@@ -37,7 +37,6 @@ import {
Subject,
combineLatest,
concat,
concatMap,
distinctUntilChanged,
endWith,
filter,
@@ -678,6 +677,9 @@ export class CallViewModel extends ViewModel {
),
);
/**
* Emits with connections whenever they should be started or stopped.
*/
private readonly connectionInstructions$ = this.connections$.pipe(
pairwise(),
map(([prev, next]) => {
@@ -688,20 +690,6 @@ export class CallViewModel extends ViewModel {
return { start, stop };
}),
this.scope.share,
);
/**
* Emits with a connection whenever it should be started.
*/
private readonly startConnection$ = this.connectionInstructions$.pipe(
concatMap(({ start }) => start),
);
/**
* Emits with a connection whenever it should be stopped.
*/
private readonly stopConnection$ = this.connectionInstructions$.pipe(
concatMap(({ stop }) => stop),
);
public readonly allLivekitRooms$ = this.scope.behavior(
@@ -1947,61 +1935,70 @@ export class CallViewModel extends ViewModel {
super();
// Start and stop local and remote connections as needed
this.startConnection$.pipe(this.scope.bind()).subscribe(
(c) =>
void c.start().then(
() => logger.info(`Connected to ${c.transport.livekit_service_url}`),
(e) =>
logger.error(
`Failed to start connection to ${c.transport.livekit_service_url}`,
e,
),
),
);
this.stopConnection$.pipe(this.scope.bind()).subscribe((c) => {
logger.info(`Disconnecting from ${c.transport.livekit_service_url}`);
c.stop();
});
// Start and stop session membership as needed
this.localTransport$.pipe(this.scope.bind()).subscribe((localTransport) => {
if (localTransport?.state === "ready") {
void enterRTCSession(
this.matrixRTCSession,
localTransport.value,
this.options.encryptionSystem.kind !== E2eeType.NONE,
true,
true,
multiSfu.value$.value,
)
.catch((e) => logger.error("Error entering RTC session", e))
.then(() =>
// Update our member event when our mute state changes.
this.muteStates.video.enabled$
.pipe(this.scope.bind(), takeUntil(this.leave$))
// eslint-disable-next-line rxjs/no-nested-subscribe
.subscribe(
(videoEnabled) =>
// TODO: Ensure that these calls are serialized in case of
// fast video toggling
void this.matrixRTCSession.updateCallIntent(
videoEnabled ? "video" : "audio",
),
this.connectionInstructions$
.pipe(this.scope.bind())
.subscribe(({ start, stop }) => {
for (const c of stop) {
logger.info(`Disconnecting from ${c.transport.livekit_service_url}`);
c.stop();
}
for (const c of start) {
c.start().then(
() =>
logger.info(`Connected to ${c.transport.livekit_service_url}`),
(e) =>
logger.error(
`Failed to start connection to ${c.transport.livekit_service_url}`,
e,
),
);
}
});
return (): void =>
// Start and stop session membership as needed
this.scope.reconcile(this.localTransport$, async (localTransport) => {
if (localTransport?.state === "ready") {
try {
await enterRTCSession(
this.matrixRTCSession,
localTransport.value,
this.options.encryptionSystem.kind !== E2eeType.NONE,
true,
true,
multiSfu.value$.value,
);
} catch (e) {
logger.error("Error entering RTC session", e);
}
// Update our member event when our mute state changes.
const muteSubscription = this.muteStates.video.enabled$.subscribe(
(videoEnabled) =>
// TODO: Ensure that these calls are serialized in case of
// fast video toggling
void this.matrixRTCSession.updateCallIntent(
videoEnabled ? "video" : "audio",
),
);
return async (): Promise<void> => {
muteSubscription.unsubscribe();
// Only sends Matrix leave event. The LiveKit session will disconnect
// as soon as either the stopConnection$ handler above gets to it or
// the view model is destroyed.
void this.matrixRTCSession
.leaveRoomSession()
.catch((e) => logger.error("Error leaving RTC session", e))
.then(async () =>
widget?.api.transport
.send(ElementWidgetActions.HangupCall, {})
.catch((e) => logger.error("Failed to send hangup action", e)),
try {
await this.matrixRTCSession.leaveRoomSession();
} catch (e) {
logger.error("Error leaving RTC session", e);
}
try {
await widget?.api.transport.send(
ElementWidgetActions.HangupCall,
{},
);
} catch (e) {
logger.error("Failed to send hangup action", e);
}
};
}
});

View File

@@ -125,6 +125,24 @@ export class Connection {
export class PublishConnection extends Connection {
public async start(): Promise<void> {
this.stopped = false;
this.muteStates.audio.setHandler(async (desired) => {
try {
await this.livekitRoom.localParticipant.setMicrophoneEnabled(desired);
} catch (e) {
logger.error("Failed to update LiveKit audio input mute state", e);
}
return this.livekitRoom.localParticipant.isMicrophoneEnabled;
});
this.muteStates.video.setHandler(async (desired) => {
try {
await this.livekitRoom.localParticipant.setCameraEnabled(desired);
} catch (e) {
logger.error("Failed to update LiveKit video input mute state", e);
}
return this.livekitRoom.localParticipant.isCameraEnabled;
});
const { url, jwt } = await this.sfuConfig;
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
@@ -213,23 +231,6 @@ export class PublishConnection extends Connection {
);
trackProcessorSync(track$, trackerProcessorState$);
this.muteStates.audio.setHandler(async (desired) => {
try {
await this.livekitRoom.localParticipant.setMicrophoneEnabled(desired);
} catch (e) {
logger.error("Failed to update LiveKit audio input mute state", e);
}
return this.livekitRoom.localParticipant.isMicrophoneEnabled;
});
this.muteStates.video.setHandler(async (desired) => {
try {
await this.livekitRoom.localParticipant.setCameraEnabled(desired);
} catch (e) {
logger.error("Failed to update LiveKit video input mute state", e);
}
return this.livekitRoom.localParticipant.isCameraEnabled;
});
const syncDevice = (
kind: MediaDeviceKind,
selected$: Observable<SelectedDevice | undefined>,

View File

@@ -7,7 +7,10 @@ Please see LICENSE in the repository root for full details.
import {
BehaviorSubject,
catchError,
distinctUntilChanged,
EMPTY,
endWith,
filter,
type Observable,
share,
@@ -95,6 +98,41 @@ export class ObservableScope {
)
.subscribe(callback);
}
// TODO-MULTI-SFU Dear Future Robin, please document this. Love, Past Robin.
public reconcile<T>(
value$: Behavior<T>,
callback: (value: T) => Promise<(() => Promise<void>) | undefined>,
): void {
let latestValue: T | typeof nothing = nothing;
let reconciledValue: T | typeof nothing = nothing;
let cleanUp: (() => Promise<void>) | undefined = undefined;
let callbackPromise: Promise<(() => Promise<void>) | undefined>;
value$
.pipe(
catchError(() => EMPTY),
this.bind(),
endWith(nothing),
)
.subscribe((value) => {
void (async (): Promise<void> => {
if (latestValue === nothing) {
latestValue = value;
while (latestValue !== reconciledValue) {
await cleanUp?.();
reconciledValue = latestValue;
if (latestValue !== nothing) {
callbackPromise = callback(latestValue);
cleanUp = await callbackPromise;
}
}
latestValue = nothing;
} else {
latestValue = value;
}
})();
});
}
}
/**