Use a more suitable filter operator to compute local member

This commit is contained in:
Robin
2025-12-15 12:52:23 -05:00
parent 6c622fc6e7
commit 00d4b8e985
3 changed files with 69 additions and 16 deletions

View File

@@ -52,7 +52,12 @@ import {
ScreenShareViewModel,
type UserMediaViewModel,
} from "../MediaViewModel";
import { accumulate, generateItems, pauseWhen } from "../../utils/observable";
import {
accumulate,
filterBehavior,
generateItems,
pauseWhen,
} from "../../utils/observable";
import {
duplicateTiles,
MatrixRTCMode,
@@ -505,16 +510,13 @@ export function createCallViewModel$(
),
);
const localMatrixLivekitMember$ =
scope.behavior<MatrixLivekitMember<"local"> | null>(
const localMatrixLivekitMember$: Behavior<MatrixLivekitMember<"local"> | null> =
scope.behavior(
localRtcMembership$.pipe(
generateItems(
// Generate a local member when membership is non-null
function* (membership) {
if (membership !== null)
yield { keys: ["local"], data: membership };
},
(_scope, membership$) => ({
filterBehavior((membership) => membership !== null),
map((membership$) => {
if (membership$ === null) return null;
return {
membership$,
participant: {
type: "local" as const,
@@ -522,9 +524,8 @@ export function createCallViewModel$(
},
connection$: localMembership.connection$,
userId,
}),
),
map(([localMember]) => localMember ?? null),
};
}),
),
);

View File

@@ -5,11 +5,12 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { test } from "vitest";
import { Subject } from "rxjs";
import { expect, test } from "vitest";
import { type Observable, of, Subject, switchMap } from "rxjs";
import { withTestScheduler } from "./test";
import { generateItems, pauseWhen } from "./observable";
import { filterBehavior, generateItems, pauseWhen } from "./observable";
import { type Behavior } from "../state/Behavior";
test("pauseWhen", () => {
withTestScheduler(({ behavior, expectObservable }) => {
@@ -72,3 +73,31 @@ test("generateItems", () => {
expectObservable(scope4$).toBe(scope4Marbles);
});
});
test("filterBehavior", () => {
withTestScheduler(({ behavior, expectObservable }) => {
// Filtering the input should segment it into 2 modes of non-null behavior.
const inputMarbles = " abcxabx";
const filteredMarbles = "a--xa-x";
const input$ = behavior(inputMarbles, {
a: "a",
b: "b",
c: "c",
x: null,
});
const filtered$: Observable<Behavior<string> | null> = input$.pipe(
filterBehavior((value) => typeof value === "string"),
);
expectObservable(filtered$).toBe(filteredMarbles, {
a: expect.any(Object),
x: null,
});
expectObservable(
filtered$.pipe(
switchMap((value$) => (value$ === null ? of(null) : value$)),
),
).toBe(inputMarbles, { a: "a", b: "b", c: "c", x: null });
});
});

View File

@@ -22,6 +22,7 @@ import {
withLatestFrom,
BehaviorSubject,
type OperatorFunction,
distinctUntilChanged,
} from "rxjs";
import { type Behavior } from "../state/Behavior";
@@ -185,6 +186,28 @@ export function generateItemsWithEpoch<
);
}
/**
* Segments a behavior into periods during which its value matches the filter
* (outputting a behavior with a narrowed type) and periods during which it does
* not match (outputting null).
*/
export function filterBehavior<T, S extends T>(
predicate: (value: T) => value is S,
): OperatorFunction<T, Behavior<S> | null> {
return (input$) =>
input$.pipe(
scan<T, BehaviorSubject<S> | null>((acc$, input) => {
if (predicate(input)) {
const output$ = acc$ ?? new BehaviorSubject(input);
output$.next(input);
return output$;
}
return null;
}, null),
distinctUntilChanged(),
);
}
function generateItemsInternal<
Input,
Keys extends [unknown, ...unknown[]],