From 93a1a6a3be2414dbb38fe44512bee5c10634d640 Mon Sep 17 00:00:00 2001 From: Timo <16718859+toger5@users.noreply.github.com> Date: Fri, 19 Sep 2025 16:29:50 +0200 Subject: [PATCH] Backport: Fix: never stop ring feedback on the sender side (#3503) * make ring$ a behavior and add code comments to justify/explain the change. Signed-off-by: Timo K * Add test: reproduce "ring does not stop" race. Signed-off-by: Timo K --------- Signed-off-by: Timo K --- src/state/CallViewModel.test.ts | 43 +++++++++++-- src/state/CallViewModel.ts | 104 +++++++++++++++++--------------- 2 files changed, 96 insertions(+), 51 deletions(-) diff --git a/src/state/CallViewModel.test.ts b/src/state/CallViewModel.test.ts index b6935a9b..7bfb4a82 100644 --- a/src/state/CallViewModel.test.ts +++ b/src/state/CallViewModel.test.ts @@ -266,7 +266,7 @@ const mockLegacyRingEvent = {} as { event_id: string } & ICallNotifyContent; interface CallViewModelInputs { remoteParticipants$: Behavior; rtcMembers$: Behavior[]>; - connectionState$: Observable; + livekitConnectionState$: Observable; speaking: Map>; mediaDevices: MediaDevices; initialSyncState: SyncState; @@ -276,7 +276,7 @@ function withCallViewModel( { remoteParticipants$ = constant([]), rtcMembers$ = constant([localRtcMember]), - connectionState$ = of(ConnectionState.Connected), + livekitConnectionState$: connectionState$ = of(ConnectionState.Connected), speaking = new Map(), mediaDevices = mockMediaDevices({}), initialSyncState = SyncState.Syncing, @@ -384,7 +384,7 @@ test("participants are retained during a focus switch", () => { b: [], }), rtcMembers$: constant([localRtcMember, aliceRtcMember, bobRtcMember]), - connectionState$: behavior(connectionInputMarbles, { + livekitConnectionState$: behavior(connectionInputMarbles, { c: ConnectionState.Connected, s: ECAddonConnectionState.ECSwitchingFocus, }), @@ -1251,6 +1251,41 @@ describe("waitForCallPickup$", () => { }); }); + test("regression test: does stop ringing in case livekitConnectionState$ emits after didSendCallNotification$ has already emitted", () => { + withTestScheduler(({ schedule, expectObservable, behavior }) => { + withCallViewModel( + { + livekitConnectionState$: behavior("d 9ms c", { + d: ConnectionState.Disconnected, + c: ConnectionState.Connected, + }), + }, + (vm, rtcSession) => { + // Fire a call notification IMMEDIATELY (its important for this test, that this happens before the livekitConnectionState$ emits) + schedule("n", { + n: () => { + rtcSession.emit( + MatrixRTCSessionEvent.DidSendCallNotification, + mockRingEvent("$notif1", 30), + mockLegacyRingEvent, + ); + }, + }); + + expectObservable(vm.callPickupState$).toBe("a 9ms b 29ms c", { + a: "unknown", + b: "ringing", + c: "timeout", + }); + }, + { + waitForCallPickup: true, + encryptionSystem: { kind: E2eeType.PER_PARTICIPANT }, + }, + ); + }); + }); + test("ringing -> success if someone joins before timeout", () => { withTestScheduler(({ behavior, schedule, expectObservable }) => { // Someone joins at 20ms (both LiveKit participant and MatrixRTC member) @@ -1305,7 +1340,7 @@ describe("waitForCallPickup$", () => { a: [localRtcMember], b: [localRtcMember, aliceRtcMember], }), - connectionState$, + livekitConnectionState$: connectionState$, }, (vm, rtcSession) => { // Notify at 5ms so we enter ringing, then get disconnected 5ms later diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 8289369f..d0edf7a2 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -880,60 +880,68 @@ export class CallViewModel extends ViewModel { ? this.allOthersLeft$ : NEVER; + private readonly didSendCallNotification$ = fromEvent( + this.matrixRTCSession, + MatrixRTCSessionEvent.DidSendCallNotification, + ) as Observable< + Parameters< + MatrixRTCSessionEventHandlerMap[MatrixRTCSessionEvent.DidSendCallNotification] + > + >; /** * Whenever the RTC session tells us that it intends to ring the remote * participant's devices, this emits an Observable tracking the current state of * that ringing process. */ - private readonly ring$: Observable< - Observable<"ringing" | "timeout" | "decline"> - > = ( - fromEvent( - this.matrixRTCSession, - MatrixRTCSessionEvent.DidSendCallNotification, - ) as Observable< - Parameters< - MatrixRTCSessionEventHandlerMap[MatrixRTCSessionEvent.DidSendCallNotification] - > - > - ).pipe( - filter( - ([notificationEvent]) => notificationEvent.notification_type === "ring", - ), - map(([notificationEvent]) => { - const lifetimeMs = notificationEvent?.lifetime ?? 0; - return concat( - lifetimeMs === 0 - ? // If no lifetime, skip the ring state - EMPTY - : // Ring until lifetime ms have passed - timer(lifetimeMs).pipe( - ignoreElements(), - startWith("ringing" as const), - ), - // The notification lifetime has timed out, meaning ringing has likely - // stopped on all receiving clients. - of("timeout" as const), - NEVER, - ).pipe( - takeUntil( - ( - fromEvent(this.matrixRoom, RoomEvent.Timeline) as Observable< - Parameters - > - ).pipe( - filter( - ([event]) => - event.getType() === EventType.RTCDecline && - event.getRelation()?.rel_type === "m.reference" && - event.getRelation()?.event_id === notificationEvent.event_id && - event.getSender() !== this.userId, + // This is a behavior since we need to store the latest state for when we subscribe to this after `didSendCallNotification$` + // has already emitted but we still need the latest observable with a timeout timer that only gets created on after receiving `notificationEvent`. + // A behavior will emit the latest observable with the running timer to new subscribers. + // see also: callPickupState$ and in particular the line: `return this.ring$.pipe(mergeAll());` here we otherwise might get an EMPTY observable if + // `ring$` would not be a behavior. + private readonly ring$: Behavior< + Observable<"ringing" | "timeout" | "decline"> | Observable + > = this.scope.behavior( + this.didSendCallNotification$.pipe( + filter( + ([notificationEvent]) => notificationEvent.notification_type === "ring", + ), + map(([notificationEvent]) => { + const lifetimeMs = notificationEvent?.lifetime ?? 0; + return concat( + lifetimeMs === 0 + ? // If no lifetime, skip the ring state + EMPTY + : // Ring until lifetime ms have passed + timer(lifetimeMs).pipe( + ignoreElements(), + startWith("ringing" as const), + ), + // The notification lifetime has timed out, meaning ringing has likely + // stopped on all receiving clients. + of("timeout" as const), + NEVER, + ).pipe( + takeUntil( + ( + fromEvent(this.matrixRoom, RoomEvent.Timeline) as Observable< + Parameters + > + ).pipe( + filter( + ([event]) => + event.getType() === EventType.RTCDecline && + event.getRelation()?.rel_type === "m.reference" && + event.getRelation()?.event_id === + notificationEvent.event_id && + event.getSender() !== this.userId, + ), ), ), - ), - endWith("decline" as const), - ); - }), + endWith("decline" as const), + ); + }), + ), + EMPTY, ); /** @@ -972,6 +980,8 @@ export class CallViewModel extends ViewModel { return of("success" as const); } // Show the ringing state of the most recent ringing attempt. + // ring$ is a behavior so it will emit the latest observable which very well might already have a running timer. + // this is important in case livekitConnectionState$ after didSendCallNotification$ has already emitted. return this.ring$.pipe(switchAll()); }), // The state starts as 'unknown' because we don't know if the RTC