Refactor ring$ observable (#3504)

* Refactor ring$ observable

Signed-off-by: Timo K <toger5@hotmail.de>

* fix ci

Signed-off-by: Timo K <toger5@hotmail.de>

* fix regression test

Signed-off-by: Timo K <toger5@hotmail.de>

---------

Signed-off-by: Timo K <toger5@hotmail.de>
This commit is contained in:
Timo
2025-09-19 17:43:31 +02:00
committed by GitHub
parent 317b2dc796
commit 9b9c08ed61
4 changed files with 71 additions and 69 deletions

View File

@@ -125,6 +125,7 @@ import { prefetchSounds } from "../soundUtils";
import { useAudioContext } from "../useAudioContext";
import ringtoneMp3 from "../sound/ringtone.mp3?url";
import ringtoneOgg from "../sound/ringtone.ogg?url";
import { ObservableScope } from "../state/ObservableScope.ts";
const canScreenshare = "getDisplayMedia" in (navigator.mediaDevices ?? {});
@@ -144,8 +145,13 @@ export const ActiveCall: FC<ActiveCallProps> = (props) => {
sfuConfig,
props.e2eeSystem,
);
const connStateObservable$ = useObservable(
(inputs$) => inputs$.pipe(map(([connState]) => connState)),
const observableScope = useInitial(() => new ObservableScope());
const connStateBehavior$ = useObservable(
(inputs$) =>
observableScope.behavior(
inputs$.pipe(map(([connState]) => connState)),
connState,
),
[connState],
);
const [vm, setVm] = useState<CallViewModel | null>(null);
@@ -188,7 +194,7 @@ export const ActiveCall: FC<ActiveCallProps> = (props) => {
waitForCallPickup:
waitForCallPickup && sendNotificationType === "ring",
},
connStateObservable$,
connStateBehavior$,
reactionsReader.raisedHands$,
reactionsReader.reactions$,
);
@@ -204,7 +210,7 @@ export const ActiveCall: FC<ActiveCallProps> = (props) => {
livekitRoom,
mediaDevices,
props.e2eeSystem,
connStateObservable$,
connStateBehavior$,
autoLeaveWhenOthersLeft,
sendNotificationType,
waitForCallPickup,

View File

@@ -266,7 +266,7 @@ const mockLegacyRingEvent = {} as { event_id: string } & ICallNotifyContent;
interface CallViewModelInputs {
remoteParticipants$: Behavior<RemoteParticipant[]>;
rtcMembers$: Behavior<Partial<CallMembership>[]>;
livekitConnectionState$: Observable<ECConnectionState>;
livekitConnectionState$: Behavior<ECConnectionState>;
speaking: Map<Participant, Observable<boolean>>;
mediaDevices: MediaDevices;
initialSyncState: SyncState;
@@ -276,7 +276,9 @@ function withCallViewModel(
{
remoteParticipants$ = constant([]),
rtcMembers$ = constant([localRtcMember]),
livekitConnectionState$: connectionState$ = of(ConnectionState.Connected),
livekitConnectionState$: connectionState$ = constant(
ConnectionState.Connected,
),
speaking = new Map(),
mediaDevices = mockMediaDevices({}),
initialSyncState = SyncState.Syncing,
@@ -1272,7 +1274,7 @@ describe("waitForCallPickup$", () => {
},
});
expectObservable(vm.callPickupState$).toBe("a 9ms b 29ms c", {
expectObservable(vm.callPickupState$).toBe("a 9ms b 19ms c", {
a: "unknown",
b: "ringing",
c: "timeout",

View File

@@ -898,58 +898,59 @@ export class CallViewModel extends ViewModel {
// A behavior will emit the latest observable with the running timer to new subscribers.
// see also: callPickupState$ and in particular the line: `return this.ring$.pipe(mergeAll());` here we otherwise might get an EMPTY observable if
// `ring$` would not be a behavior.
private readonly ring$: Behavior<
Observable<"ringing" | "timeout" | "decline"> | Observable<never>
> = this.scope.behavior(
this.didSendCallNotification$.pipe(
filter(
([notificationEvent]) => notificationEvent.notification_type === "ring",
),
map(([notificationEvent]) => {
const lifetimeMs = notificationEvent?.lifetime ?? 0;
return concat(
lifetimeMs === 0
? // If no lifetime, skip the ring state
EMPTY
: // Ring until lifetime ms have passed
timer(lifetimeMs).pipe(
ignoreElements(),
startWith("ringing" as const),
),
// The notification lifetime has timed out, meaning ringing has likely
// stopped on all receiving clients.
of("timeout" as const),
NEVER,
).pipe(
takeUntil(
(
fromEvent(this.matrixRoom, RoomEvent.Timeline) as Observable<
Parameters<EventTimelineSetHandlerMap[RoomEvent.Timeline]>
>
).pipe(
filter(
([event]) =>
event.getType() === EventType.RTCDecline &&
event.getRelation()?.rel_type === "m.reference" &&
event.getRelation()?.event_id ===
notificationEvent.event_id &&
event.getSender() !== this.userId,
private readonly ring$: Behavior<"ringing" | "timeout" | "decline" | null> =
this.scope.behavior(
this.didSendCallNotification$.pipe(
filter(
([notificationEvent]) =>
notificationEvent.notification_type === "ring",
),
switchMap(([notificationEvent]) => {
const lifetimeMs = notificationEvent?.lifetime ?? 0;
return concat(
lifetimeMs === 0
? // If no lifetime, skip the ring state
of(null)
: // Ring until lifetime ms have passed
timer(lifetimeMs).pipe(
ignoreElements(),
startWith("ringing" as const),
),
// The notification lifetime has timed out, meaning ringing has likely
// stopped on all receiving clients.
of("timeout" as const),
// This makes sure we will not drop into the `endWith("decline" as const)` state
NEVER,
).pipe(
takeUntil(
(
fromEvent(this.matrixRoom, RoomEvent.Timeline) as Observable<
Parameters<EventTimelineSetHandlerMap[RoomEvent.Timeline]>
>
).pipe(
filter(
([event]) =>
event.getType() === EventType.RTCDecline &&
event.getRelation()?.rel_type === "m.reference" &&
event.getRelation()?.event_id ===
notificationEvent.event_id &&
event.getSender() !== this.userId,
),
),
),
),
endWith("decline" as const),
);
}),
),
EMPTY,
);
endWith("decline" as const),
);
}),
),
null,
);
/**
* Whether some Matrix user other than ourself is joined to the call.
*/
private readonly someoneElseJoined$ = this.memberships$.pipe(
map((ms) => ms.some((m) => m.sender !== this.userId)),
);
) as Behavior<boolean>;
/**
* The current call pickup state of the call.
@@ -968,27 +969,19 @@ export class CallViewModel extends ViewModel {
? this.scope.behavior<
"unknown" | "ringing" | "timeout" | "decline" | "success"
>(
combineLatest([
this.livekitConnectionState$,
this.someoneElseJoined$,
]).pipe(
switchMap(([livekitConnectionState, someoneElseJoined]) => {
combineLatest(
[this.livekitConnectionState$, this.someoneElseJoined$, this.ring$],
(livekitConnectionState, someoneElseJoined, ring) => {
if (livekitConnectionState === ConnectionState.Disconnected) {
// Do not ring until we're connected.
return of("unknown" as const);
return "unknown" as const;
} else if (someoneElseJoined) {
return of("success" as const);
return "success" as const;
}
// Show the ringing state of the most recent ringing attempt.
// ring$ is a behavior so it will emit the latest observable which very well might already have a running timer.
// this is important in case livekitConnectionState$ after didSendCallNotification$ has already emitted.
return this.ring$.pipe(switchAll());
}),
// The state starts as 'unknown' because we don't know if the RTC
// session will actually send a notify event yet. It will only be
// known once we send our own membership and see that we were the
// first one to join.
startWith("unknown" as const),
// as long as we have not yet sent an RTC notification event, ring will be null -> callPickupState$ = unknown.
return ring ?? ("unknown" as const);
},
),
)
: constant(null);
@@ -1700,7 +1693,7 @@ export class CallViewModel extends ViewModel {
private readonly livekitRoom: LivekitRoom,
private readonly mediaDevices: MediaDevices,
private readonly options: CallViewModelOptions,
public readonly livekitConnectionState$: Observable<ECConnectionState>,
public readonly livekitConnectionState$: Behavior<ECConnectionState>,
private readonly handsRaisedSubject$: Observable<
Record<string, RaisedHandInfo>
>,

View File

@@ -39,6 +39,7 @@ import {
localRtcMember,
} from "./test-fixtures";
import { type RaisedHandInfo, type ReactionInfo } from "../reactions";
import { constant } from "../state/Behavior";
export function getBasicRTCSession(
members: RoomMember[],
@@ -154,7 +155,7 @@ export function getBasicCallViewModelEnvironment(
encryptionSystem: { kind: E2eeType.PER_PARTICIPANT },
...callViewModelOptions,
},
of(ConnectionState.Connected),
constant(ConnectionState.Connected),
handRaisedSubject$,
reactionsSubject$,
);