From c3f49fa73b12d109e67abf3fd91f4b9b3d118205 Mon Sep 17 00:00:00 2001 From: Timo K Date: Fri, 19 Sep 2025 16:04:26 +0200 Subject: [PATCH] Refactor ring$ observable Signed-off-by: Timo K --- src/state/CallViewModel.ts | 117 +++++++++++++++++-------------------- 1 file changed, 55 insertions(+), 62 deletions(-) diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index d0edf7a2..072e84dc 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -898,58 +898,59 @@ export class CallViewModel extends ViewModel { // A behavior will emit the latest observable with the running timer to new subscribers. // see also: callPickupState$ and in particular the line: `return this.ring$.pipe(mergeAll());` here we otherwise might get an EMPTY observable if // `ring$` would not be a behavior. - private readonly ring$: Behavior< - Observable<"ringing" | "timeout" | "decline"> | Observable - > = this.scope.behavior( - this.didSendCallNotification$.pipe( - filter( - ([notificationEvent]) => notificationEvent.notification_type === "ring", - ), - map(([notificationEvent]) => { - const lifetimeMs = notificationEvent?.lifetime ?? 0; - return concat( - lifetimeMs === 0 - ? // If no lifetime, skip the ring state - EMPTY - : // Ring until lifetime ms have passed - timer(lifetimeMs).pipe( - ignoreElements(), - startWith("ringing" as const), - ), - // The notification lifetime has timed out, meaning ringing has likely - // stopped on all receiving clients. - of("timeout" as const), - NEVER, - ).pipe( - takeUntil( - ( - fromEvent(this.matrixRoom, RoomEvent.Timeline) as Observable< - Parameters - > - ).pipe( - filter( - ([event]) => - event.getType() === EventType.RTCDecline && - event.getRelation()?.rel_type === "m.reference" && - event.getRelation()?.event_id === - notificationEvent.event_id && - event.getSender() !== this.userId, + private readonly ring$: Behavior<"ringing" | "timeout" | "decline" | null> = + this.scope.behavior( + this.didSendCallNotification$.pipe( + filter( + ([notificationEvent]) => + notificationEvent.notification_type === "ring", + ), + switchMap(([notificationEvent]) => { + const lifetimeMs = notificationEvent?.lifetime ?? 0; + return concat( + lifetimeMs === 0 + ? // If no lifetime, skip the ring state + of(null) + : // Ring until lifetime ms have passed + timer(lifetimeMs).pipe( + ignoreElements(), + startWith("ringing" as const), + ), + // The notification lifetime has timed out, meaning ringing has likely + // stopped on all receiving clients. + of("timeout" as const), + // This makes sure we will not drop into the `endWith("decline" as const)` state + NEVER, + ).pipe( + takeUntil( + ( + fromEvent(this.matrixRoom, RoomEvent.Timeline) as Observable< + Parameters + > + ).pipe( + filter( + ([event]) => + event.getType() === EventType.RTCDecline && + event.getRelation()?.rel_type === "m.reference" && + event.getRelation()?.event_id === + notificationEvent.event_id && + event.getSender() !== this.userId, + ), ), ), - ), - endWith("decline" as const), - ); - }), - ), - EMPTY, - ); + endWith("decline" as const), + ); + }), + ), + null, + ); /** * Whether some Matrix user other than ourself is joined to the call. */ private readonly someoneElseJoined$ = this.memberships$.pipe( map((ms) => ms.some((m) => m.sender !== this.userId)), - ); + ) as Behavior; /** * The current call pickup state of the call. @@ -968,27 +969,19 @@ export class CallViewModel extends ViewModel { ? this.scope.behavior< "unknown" | "ringing" | "timeout" | "decline" | "success" >( - combineLatest([ - this.livekitConnectionState$, - this.someoneElseJoined$, - ]).pipe( - switchMap(([livekitConnectionState, someoneElseJoined]) => { + combineLatest( + [this.livekitConnectionState$, this.someoneElseJoined$, this.ring$], + (livekitConnectionState, someoneElseJoined, ring) => { if (livekitConnectionState === ConnectionState.Disconnected) { // Do not ring until we're connected. - return of("unknown" as const); - } else if (someoneElseJoined) { - return of("success" as const); + return "unknown" as const; + } else if (someoneElseJoined && ring !== null) { + return "success" as const; } // Show the ringing state of the most recent ringing attempt. - // ring$ is a behavior so it will emit the latest observable which very well might already have a running timer. - // this is important in case livekitConnectionState$ after didSendCallNotification$ has already emitted. - return this.ring$.pipe(switchAll()); - }), - // The state starts as 'unknown' because we don't know if the RTC - // session will actually send a notify event yet. It will only be - // known once we send our own membership and see that we were the - // first one to join. - startWith("unknown" as const), + // as long as we have not yet sent an RTC notification event, ring will be null -> callPickupState$ = unknown. + return ring ?? ("unknown" as const); + }, ), ) : constant(null); @@ -1700,7 +1693,7 @@ export class CallViewModel extends ViewModel { private readonly livekitRoom: LivekitRoom, private readonly mediaDevices: MediaDevices, private readonly options: CallViewModelOptions, - public readonly livekitConnectionState$: Observable, + public readonly livekitConnectionState$: Behavior, private readonly handsRaisedSubject$: Observable< Record >,