Refactor combined$ to return [connected, reason] tuple

This commit is contained in:
fkwp
2026-05-12 11:05:03 +02:00
parent 9c3b89695d
commit 6c987eb0d4
4 changed files with 78 additions and 68 deletions

View File

@@ -100,13 +100,13 @@ describe("createHomeserverConnected$", () => {
// Note: gracePeriodMs is set to 0 to avoid debouncing delays in tests
it("reports syncing reason when sync state is not Syncing", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
expect(hsConnected.combined$.value).toBe("sync");
expect(hsConnected.combined$.value).toEqual([false, "sync"]);
});
it("reports membership reason when sync is Syncing but membership is not Connected", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
client.setSyncState(SyncState.Syncing);
expect(hsConnected.combined$.value).toBe("membership");
expect(hsConnected.combined$.value).toEqual([false, "membership"]);
});
it("reports probablyLeft reason when membership transitions to Connected but ProbablyLeft is true", () => {
@@ -116,17 +116,17 @@ describe("createHomeserverConnected$", () => {
// Indicate probable leave before connection
session.setProbablyLeft(true);
session.setMembershipStatus(Status.Connected);
expect(hsConnected.combined$.value).toBe("probablyLeft");
expect(hsConnected.combined$.value).toEqual([false, "probablyLeft"]);
});
it("becomes null (connected) only when all three conditions are satisfied", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
// 1. Sync loop connected
client.setSyncState(SyncState.Syncing);
expect(hsConnected.combined$.value).toBe("membership"); // not yet membership connected
expect(hsConnected.combined$.value).toEqual([false, "membership"]); // not yet membership connected
// 2. Membership connected
session.setMembershipStatus(Status.Connected);
expect(hsConnected.combined$.value).toBeNull(); // probablyLeft is false
expect(hsConnected.combined$.value).toEqual([true, null]); // probablyLeft is false
});
it("returns syncing reason when sync loop leaves Syncing", () => {
@@ -134,72 +134,72 @@ describe("createHomeserverConnected$", () => {
// Reach connected state
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected.combined$.value).toBeNull();
expect(hsConnected.combined$.value).toEqual([true, null]);
// Sync loop error => should report syncing reason
client.setSyncState(SyncState.Error);
expect(hsConnected.combined$.value).toBe("sync");
expect(hsConnected.combined$.value).toEqual([false, "sync"]);
});
it("returns membershipConnected reason when membership status becomes disconnected", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected.combined$.value).toBeNull();
expect(hsConnected.combined$.value).toEqual([true, null]);
session.setMembershipStatus(Status.Disconnected);
expect(hsConnected.combined$.value).toBe("membership");
expect(hsConnected.combined$.value).toEqual([false, "membership"]);
});
it("returns certainlyConnected reason when ProbablyLeft is emitted", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected.combined$.value).toBeNull();
expect(hsConnected.combined$.value).toEqual([true, null]);
session.setProbablyLeft(true);
expect(hsConnected.combined$.value).toBe("probablyLeft");
expect(hsConnected.combined$.value).toEqual([false, "probablyLeft"]);
});
it("recovers to null (connected) if ProbablyLeft becomes false again while other conditions remain true", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected.combined$.value).toBeNull();
expect(hsConnected.combined$.value).toEqual([true, null]);
session.setProbablyLeft(true);
expect(hsConnected.combined$.value).toBe("probablyLeft");
expect(hsConnected.combined$.value).toEqual([false, "probablyLeft"]);
// Simulate clearing the flag (in realistic scenario membership manager would update)
session.setProbablyLeft(false);
expect(hsConnected.combined$.value).toBeNull();
expect(hsConnected.combined$.value).toEqual([true, null]);
});
it("composite sequence reflects each individual failure reason", () => {
const hsConnected = createHomeserverConnected$(scope, client, session, 0);
// Initially: sync error + membership disconnected → syncing wins (highest priority)
expect(hsConnected.combined$.value).toBe("sync");
expect(hsConnected.combined$.value).toEqual([false, "sync"]);
// Fix sync only → membershipConnected is now the blocker
client.setSyncState(SyncState.Syncing);
expect(hsConnected.combined$.value).toBe("membership");
expect(hsConnected.combined$.value).toEqual([false, "membership"]);
// Fix membership → all conditions satisfied
session.setMembershipStatus(Status.Connected);
expect(hsConnected.combined$.value).toBeNull();
expect(hsConnected.combined$.value).toEqual([true, null]);
// Introduce probablyLeft → certainlyConnected
session.setProbablyLeft(true);
expect(hsConnected.combined$.value).toBe("probablyLeft");
expect(hsConnected.combined$.value).toEqual([false, "probablyLeft"]);
// Restore notProbablyLeft → connected again
session.setProbablyLeft(false);
expect(hsConnected.combined$.value).toBeNull();
expect(hsConnected.combined$.value).toEqual([true, null]);
// Drop sync → syncing reason
client.setSyncState(SyncState.Error);
expect(hsConnected.combined$.value).toBe("sync");
expect(hsConnected.combined$.value).toEqual([false, "sync"]);
});
});
@@ -222,24 +222,24 @@ describe("createHomeserverConnected$ - combined$ reason values", () => {
scope.end();
});
it("is null when all three conditions are satisfied", () => {
it("is [true, null] when all three conditions are satisfied", () => {
const { combined$ } = createHomeserverConnected$(scope, client, session, 0);
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(combined$.value).toBeNull();
expect(combined$.value).toEqual([true, null]);
});
it("reports syncing when sync loop is not Syncing", () => {
const { combined$ } = createHomeserverConnected$(scope, client, session, 0);
// client starts with SyncState.Error, membership also disconnected
expect(combined$.value).toBe("sync");
expect(combined$.value).toEqual([false, "sync"]);
});
it("reports membershipConnected when sync is fine but membership is not Connected", () => {
const { combined$ } = createHomeserverConnected$(scope, client, session, 0);
client.setSyncState(SyncState.Syncing);
// session still Status.Disconnected
expect(combined$.value).toBe("membership");
expect(combined$.value).toEqual([false, "membership"]);
});
it("reports certainlyConnected when probablyLeft is true", () => {
@@ -247,32 +247,32 @@ describe("createHomeserverConnected$ - combined$ reason values", () => {
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
session.setProbablyLeft(true);
expect(combined$.value).toBe("probablyLeft");
expect(combined$.value).toEqual([false, "probablyLeft"]);
});
it("prioritises syncing over membershipConnected when both fail", () => {
const { combined$ } = createHomeserverConnected$(scope, client, session, 0);
// Both sync (Error) and membership (Disconnected) are failing
expect(combined$.value).toBe("sync");
expect(combined$.value).toEqual([false, "sync"]);
});
it("updates reason as conditions change", () => {
const { combined$ } = createHomeserverConnected$(scope, client, session, 0);
// Initially: syncing fails
expect(combined$.value).toBe("sync");
expect(combined$.value).toEqual([false, "sync"]);
// Fix sync → membershipConnected is now the blocker
client.setSyncState(SyncState.Syncing);
expect(combined$.value).toBe("membership");
expect(combined$.value).toEqual([false, "membership"]);
// Fix membership → probablyLeft makes certainlyConnected fail
session.setProbablyLeft(true);
session.setMembershipStatus(Status.Connected);
expect(combined$.value).toBe("probablyLeft");
expect(combined$.value).toEqual([false, "probablyLeft"]);
// Clear probablyLeft → all conditions satisfied
session.setProbablyLeft(false);
expect(combined$.value).toBeNull();
expect(combined$.value).toEqual([true, null]);
});
});
@@ -304,8 +304,8 @@ describe("createHomeserverConnected$ - Grace Period", () => {
GRACE_PERIOD,
);
expectObservable(hsConnected.combined$).toBe(expectedConnectedMarbles, {
y: null,
n: "sync",
y: [true, null],
n: [false, "sync"],
});
});
}

View File

@@ -40,10 +40,10 @@ export type HomeserverDisconnectReason = "sync" | "membership" | "probablyLeft";
export interface HomeserverConnected {
/**
* Emits `null` when the homeserver connection is healthy, or the reason for
* disconnection when one of the three sub-conditions fails.
* Emits `[true, null]` when the homeserver connection is healthy, or
* `[false, reason]` when one of the three sub-conditions fails.
*/
combined$: Behavior<HomeserverDisconnectReason | null>;
combined$: Behavior<[boolean, HomeserverDisconnectReason | null]>;
rtsSession$: Behavior<Status>;
}
@@ -117,14 +117,21 @@ export function createHomeserverConnected$(
const combined$ = scope.behavior(
combineLatest([syncing$, membershipConnected$, certainlyConnected$]).pipe(
map(([syncing, membership, certainly]) => {
if (!syncing) return "sync" as const;
if (!membership) return "membership" as const;
if (!certainly) return "probablyLeft" as const;
return null;
}),
tap((reason) => {
logger.info(`Homeserver connected update: ${reason ?? "connected"}`);
map(
([syncing, membership, certainly]): [
boolean,
HomeserverDisconnectReason | null,
] => {
if (!syncing) return [false, "sync"];
if (!membership) return [false, "membership"];
if (!certainly) return [false, "probablyLeft"];
return [true, null];
},
),
tap(([connected, reason]) => {
logger.info(
`Homeserver connected update: ${connected ? "connected" : reason}`,
);
}),
),
);

View File

@@ -225,7 +225,10 @@ describe("LocalMembership", () => {
createPublisherFactory: vi.fn(),
joinMatrixRTC: async (): Promise<void> => {},
homeserverConnected: {
combined$: constant(null),
combined$: constant<[boolean, HomeserverDisconnectReason | null]>([
true,
null,
]),
rtsSession$: constant(RTCMemberStatus.Connected),
},
callId: "!test-room-id:example.org",
@@ -700,9 +703,9 @@ describe("LocalMembership", () => {
);
// Simulate startup where membership isn't established yet
const hsReason$ = new BehaviorSubject<HomeserverDisconnectReason | null>(
"membership",
);
const hsReason$ = new BehaviorSubject<
[boolean, HomeserverDisconnectReason | null]
>([false, "membership"]);
const connectionManagerData = new ConnectionManagerData();
connectionManagerData.add(connectionTransportAConnected, []);
@@ -726,7 +729,7 @@ describe("LocalMembership", () => {
await flushPromises();
// Membership is established — call is now connected
hsReason$.next(null);
hsReason$.next([true, null]);
expect(trackSpy).not.toHaveBeenCalled();
@@ -740,9 +743,9 @@ describe("LocalMembership", () => {
"track",
);
const hsReason$ = new BehaviorSubject<HomeserverDisconnectReason | null>(
null,
);
const hsReason$ = new BehaviorSubject<
[boolean, HomeserverDisconnectReason | null]
>([true, null]);
const connectionManagerData = new ConnectionManagerData();
connectionManagerData.add(connectionTransportAConnected, []);
@@ -765,8 +768,8 @@ describe("LocalMembership", () => {
await flushPromises();
hsReason$.next("sync");
hsReason$.next(null);
hsReason$.next([false, "sync"]);
hsReason$.next([true, null]);
expect(trackSpy).toHaveBeenCalledWith(
defaultCreateLocalMemberValues.callId,
@@ -799,9 +802,9 @@ describe("LocalMembership", () => {
scope,
...defaultCreateLocalMemberValues,
homeserverConnected: {
combined$: new BehaviorSubject<HomeserverDisconnectReason | null>(
null,
),
combined$: new BehaviorSubject<
[boolean, HomeserverDisconnectReason | null]
>([true, null]),
rtsSession$: constant(RTCMemberStatus.Connected),
},
connectionManager: {
@@ -834,9 +837,9 @@ describe("LocalMembership", () => {
"track",
);
const hsReason$ = new BehaviorSubject<HomeserverDisconnectReason | null>(
null,
);
const hsReason$ = new BehaviorSubject<
[boolean, HomeserverDisconnectReason | null]
>([true, null]);
const connectionManagerData = new ConnectionManagerData();
connectionManagerData.add(connectionTransportAConnected, []);
@@ -859,13 +862,13 @@ describe("LocalMembership", () => {
await flushPromises();
hsReason$.next("membership");
hsReason$.next(null);
hsReason$.next([false, "membership"]);
hsReason$.next([true, null]);
hsReason$.next("probablyLeft");
hsReason$.next("sync");
hsReason$.next("membership");
hsReason$.next(null);
hsReason$.next([false, "probablyLeft"]);
hsReason$.next([false, "sync"]);
hsReason$.next([false, "membership"]);
hsReason$.next([true, null]);
expect(trackSpy).toHaveBeenCalledTimes(2);
expect(trackSpy).toHaveBeenNthCalledWith(

View File

@@ -508,8 +508,8 @@ export const createLocalMembership$ = ({
map((state) => state === ConnectionState.LivekitConnected),
),
]).pipe(
map(([homeserverReason, livekitConnected]) => {
if (homeserverReason !== null) return homeserverReason;
map(([[hsConnected, hsReason], livekitConnected]) => {
if (!hsConnected) return hsReason!;
if (!livekitConnected) return "livekit" as const;
return null;
}),
@@ -650,7 +650,7 @@ export const createLocalMembership$ = ({
// TODO refactor this based no livekitState$
combineLatest([participant$, homeserverConnected.combined$])
.pipe(scope.bind())
.subscribe(([participant, connected]) => {
.subscribe(([participant, [connected]]) => {
if (!participant) return;
const publications = participant.trackPublications.values();
if (connected) {