diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index f4f81776..f54fc9f5 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -52,7 +52,12 @@ import { ScreenShareViewModel, type UserMediaViewModel, } from "../MediaViewModel"; -import { accumulate, generateItems, pauseWhen } from "../../utils/observable"; +import { + accumulate, + filterBehavior, + generateItems, + pauseWhen, +} from "../../utils/observable"; import { duplicateTiles, MatrixRTCMode, @@ -505,16 +510,13 @@ export function createCallViewModel$( ), ); - const localMatrixLivekitMember$ = - scope.behavior | null>( + const localMatrixLivekitMember$: Behavior | null> = + scope.behavior( localRtcMembership$.pipe( - generateItems( - // Generate a local member when membership is non-null - function* (membership) { - if (membership !== null) - yield { keys: ["local"], data: membership }; - }, - (_scope, membership$) => ({ + filterBehavior((membership) => membership !== null), + map((membership$) => { + if (membership$ === null) return null; + return { membership$, participant: { type: "local" as const, @@ -522,9 +524,8 @@ export function createCallViewModel$( }, connection$: localMembership.connection$, userId, - }), - ), - map(([localMember]) => localMember ?? null), + }; + }), ), ); diff --git a/src/utils/observable.test.ts b/src/utils/observable.test.ts index d1034e7b..be677367 100644 --- a/src/utils/observable.test.ts +++ b/src/utils/observable.test.ts @@ -5,11 +5,12 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { test } from "vitest"; -import { Subject } from "rxjs"; +import { expect, test } from "vitest"; +import { type Observable, of, Subject, switchMap } from "rxjs"; import { withTestScheduler } from "./test"; -import { generateItems, pauseWhen } from "./observable"; +import { filterBehavior, generateItems, pauseWhen } from "./observable"; +import { type Behavior } from "../state/Behavior"; test("pauseWhen", () => { withTestScheduler(({ behavior, expectObservable }) => { @@ -72,3 +73,31 @@ test("generateItems", () => { expectObservable(scope4$).toBe(scope4Marbles); }); }); + +test("filterBehavior", () => { + withTestScheduler(({ behavior, expectObservable }) => { + // Filtering the input should segment it into 2 modes of non-null behavior. + const inputMarbles = " abcxabx"; + const filteredMarbles = "a--xa-x"; + + const input$ = behavior(inputMarbles, { + a: "a", + b: "b", + c: "c", + x: null, + }); + const filtered$: Observable | null> = input$.pipe( + filterBehavior((value) => typeof value === "string"), + ); + + expectObservable(filtered$).toBe(filteredMarbles, { + a: expect.any(Object), + x: null, + }); + expectObservable( + filtered$.pipe( + switchMap((value$) => (value$ === null ? of(null) : value$)), + ), + ).toBe(inputMarbles, { a: "a", b: "b", c: "c", x: null }); + }); +}); diff --git a/src/utils/observable.ts b/src/utils/observable.ts index 053921cd..a6dafea3 100644 --- a/src/utils/observable.ts +++ b/src/utils/observable.ts @@ -22,6 +22,7 @@ import { withLatestFrom, BehaviorSubject, type OperatorFunction, + distinctUntilChanged, } from "rxjs"; import { type Behavior } from "../state/Behavior"; @@ -185,6 +186,28 @@ export function generateItemsWithEpoch< ); } +/** + * Segments a behavior into periods during which its value matches the filter + * (outputting a behavior with a narrowed type) and periods during which it does + * not match (outputting null). + */ +export function filterBehavior( + predicate: (value: T) => value is S, +): OperatorFunction | null> { + return (input$) => + input$.pipe( + scan | null>((acc$, input) => { + if (predicate(input)) { + const output$ = acc$ ?? new BehaviorSubject(input); + output$.next(input); + return output$; + } + return null; + }, null), + distinctUntilChanged(), + ); +} + function generateItemsInternal< Input, Keys extends [unknown, ...unknown[]],