diff --git a/src/state/Behavior.ts b/src/state/Behavior.ts index 3c88dc00..71b18a55 100644 --- a/src/state/Behavior.ts +++ b/src/state/Behavior.ts @@ -18,6 +18,10 @@ import { BehaviorSubject } from "rxjs"; */ export type Behavior = Omit, "next" | "observers">; +export type BehaviorWithEpoch = Behavior & { + pipeEpoch(): Behavior<{ value: T; epoch: number }>; +}; + /** * Creates a Behavior which never changes in value. */ diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 420ecab4..714ca62c 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -226,7 +226,10 @@ export class CallViewModel { [this.localTransport$, this.membershipsAndTransports.transports$], (localTransport, transports) => { const localTransportAsArray = localTransport ? [localTransport] : []; - return [...localTransportAsArray, ...transports]; + return transports.mapInner((transports) => [ + ...localTransportAsArray, + ...transports, + ]); }, ), ); diff --git a/src/state/ObservableScope.test.ts b/src/state/ObservableScope.test.ts new file mode 100644 index 00000000..d53084da --- /dev/null +++ b/src/state/ObservableScope.test.ts @@ -0,0 +1,56 @@ +/* +Copyright 2025 New Vector Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { describe, expect, it } from "vitest"; + +import { Epoch, mapEpoch, trackEpoch } from "./ObservableScope"; +import { withTestScheduler } from "../utils/test"; + +describe("Epoch", () => { + it("should map the value correctly", () => { + const epoch = new Epoch(1); + const mappedEpoch = epoch.mapInner((v) => v + 1); + expect(mappedEpoch.value).toBe(2); + expect(mappedEpoch.epoch).toBe(0); + }); + + it("should be tracked from an observable", () => { + withTestScheduler(({ expectObservable, behavior }) => { + const observable$ = behavior("abc", { + a: 1, + b: 2, + c: 3, + }); + const epochObservable$ = observable$.pipe(trackEpoch()); + expectObservable(epochObservable$).toBe("abc", { + a: expect.toSatisfy((e) => e.epoch === 0 && e.value === 1), + b: expect.toSatisfy((e) => e.epoch === 1 && e.value === 2), + c: expect.toSatisfy((e) => e.epoch === 2 && e.value === 3), + }); + }); + }); + + it("can be mapped without loosing epoch information", () => { + withTestScheduler(({ expectObservable, behavior }) => { + const observable$ = behavior("abc", { + a: "A", + b: "B", + c: "C", + }); + const epochObservable$ = observable$.pipe(trackEpoch()); + const derivedEpoch$ = epochObservable$.pipe( + mapEpoch((e) => e + "-mapped"), + ); + + expectObservable(derivedEpoch$).toBe("abc", { + a: new Epoch("A-mapped", 0), + b: new Epoch("B-mapped", 1), + c: new Epoch("C-mapped", 2), + }); + }); + }); +}); diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index 879445e6..fbf92ada 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -12,7 +12,9 @@ import { EMPTY, endWith, filter, + map, type Observable, + type OperatorFunction, share, take, takeUntil, @@ -151,3 +153,107 @@ export class ObservableScope { * The global scope, a scope which never ends. */ export const globalScope = new ObservableScope(); + +/** + * `Epoch`'s can be used to create `Behavior`s and `Observable`s which derivitives can be merged + * with `combinedLatest` without duplicated emissions. + * + * This is useful in the following example: + * ``` + * const rootObs$ = of("red","green","blue"); + * const derivedObs$ = rootObs$.pipe( + * map((v)=> {red:"fire", green:"grass", blue:"water"}[v]) + * ); + * const otherDerivedObs$ = rootObs$.pipe( + * map((v)=> {red:"tomatoes", green:"leaves", blue:"sky"}[v]) + * ); + * const mergedObs$ = combineLatest([rootObs$, derivedObs$, otherDerivedObs$]).pipe( + * map(([color, a,b]) => color + " like " + a + " and " + b) + * ); + * + * ``` + * will result in 6 emissions with mismatching items like "red like fire and leaves" + * + * # Use Epoch + * ``` + * const rootObs$ = of(1,2,3).pipe(trackEpoch()); + * const derivedObs$ = rootObs$.pipe( + * mapEpoch((v)=> "this number: " + v) + * ); + * const otherDerivedObs$ = rootObs$.pipe( + * mapEpoch((v)=> "multiplied by: " + v) + * ); + * const mergedObs$ = combineLatest([derivedObs$, otherDerivedObs$]).pipe( + * filter((values) => values.every((v) => v.epoch === values[0].v)), + * map(([color, a, b]) => color + " like " + a + " and " + b) + * ); + * + * ``` + * will result in 3 emissions all matching (e.g. "blue like water and sky") + */ +export class Epoch { + public readonly epoch: number; + public readonly value: T; + + public constructor(value: T, epoch?: number) { + this.value = value; + this.epoch = epoch ?? 0; + } + /** + * Maps the value inside the epoch to a new value while keeping the epoch number. + * # usage + * ``` + * const myEpoch$ = myObservable$.pipe( + * map(trackEpoch()), + * // this is the preferred way using mapEpoch + * mapEpoch((v)=> v+1) + * // This is how inner map can be used: + * map((epoch) => epoch.innerMap((v)=> v+1)) + * // It is equivalent to: + * map((epoch) => new Epoch(epoch.value + 1, epoch.epoch)) + * ) + * ``` + * See also `Epoch` + */ + public mapInner(map: (value: T) => U): Epoch { + return new Epoch(map(this.value), this.epoch); + } +} + +/** + * A `pipe` compatible map oparator that keeps the epoch in tact but allows mapping the value. + * # usage + * ``` + * const myEpoch$ = myObservable$.pipe( + * map(trackEpoch()), + * // this is the preferred way using mapEpoch + * mapEpoch((v)=> v+1) + * // This is how inner map can be used: + * map((epoch) => epoch.innerMap((v)=> v+1)) + * // It is equivalent to: + * map((epoch) => new Epoch(epoch.value + 1, epoch.epoch)) + * ) + * ``` + * See also `Epoch` + */ +export function mapEpoch( + mapFn: (value: T) => U, +): OperatorFunction, Epoch> { + return map((e) => e.mapInner(mapFn)); +} +/** + * # usage + * ``` + * const myEpoch$ = myObservable$.pipe( + * map(trackEpoch()), + * map((epoch) => epoch.innerMap((v)=> v+1)) + * ) + * const derived = myEpoch$.pipe( + * mapEpoch((v)=>v^2) + * ) + * ``` + * See also `Epoch` + */ +export function trackEpoch(): OperatorFunction> { + return map>((value, number) => new Epoch(value, number)); +} diff --git a/src/state/SessionBehaviors.ts b/src/state/SessionBehaviors.ts index 80e9f09c..d44ad33a 100644 --- a/src/state/SessionBehaviors.ts +++ b/src/state/SessionBehaviors.ts @@ -12,19 +12,24 @@ import { type MatrixRTCSession, MatrixRTCSessionEvent, } from "matrix-js-sdk/lib/matrixrtc"; -import { fromEvent, map } from "rxjs"; +import { fromEvent } from "rxjs"; -import { type ObservableScope } from "./ObservableScope"; +import { + type Epoch, + mapEpoch, + trackEpoch, + type ObservableScope, +} from "./ObservableScope"; import { type Behavior } from "./Behavior"; export const membershipsAndTransports$ = ( scope: ObservableScope, - memberships$: Behavior, + memberships$: Behavior>, ): { membershipsWithTransport$: Behavior< - { membership: CallMembership; transport?: LivekitTransport }[] + Epoch<{ membership: CallMembership; transport?: LivekitTransport }[]> >; - transports$: Behavior; + transports$: Behavior>; } => { /** * Lists the transports used by ourselves, plus all other MatrixRTC session @@ -36,7 +41,7 @@ export const membershipsAndTransports$ = ( */ const membershipsWithTransport$ = scope.behavior( memberships$.pipe( - map((memberships) => { + mapEpoch((memberships) => { return memberships.map((membership) => { const oldestMembership = memberships[0] ?? membership; const transport = membership.getTransport(oldestMembership); @@ -51,7 +56,7 @@ export const membershipsAndTransports$ = ( const transports$ = scope.behavior( membershipsWithTransport$.pipe( - map((mts) => mts.flatMap(({ transport: t }) => (t ? [t] : []))), + mapEpoch((mts) => mts.flatMap(({ transport: t }) => (t ? [t] : []))), ), ); @@ -64,12 +69,12 @@ export const membershipsAndTransports$ = ( export const createMemberships$ = ( scope: ObservableScope, matrixRTCSession: MatrixRTCSession, -): Behavior => { +): Behavior> => { return scope.behavior( fromEvent( matrixRTCSession, MatrixRTCSessionEvent.MembershipsChanged, (_, memberships: CallMembership[]) => memberships, - ), + ).pipe(trackEpoch()), ); }; diff --git a/src/state/localMember/LocalMembership.ts b/src/state/localMember/LocalMembership.ts index 83337064..6a400c37 100644 --- a/src/state/localMember/LocalMembership.ts +++ b/src/state/localMember/LocalMembership.ts @@ -28,23 +28,20 @@ import { import { logger } from "matrix-js-sdk/lib/logger"; import { type Behavior } from "../Behavior"; -import { - type ConnectionManagerReturn, - type createConnectionManager$, -} from "../remoteMembers/ConnectionManager"; +import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; import { ObservableScope } from "../ObservableScope"; import { Publisher } from "./Publisher"; import { type MuteStates } from "../MuteStates"; import { type ProcessorState } from "../../livekit/TrackProcessorContext"; import { type MediaDevices } from "../MediaDevices"; import { and$ } from "../../utils/observable"; -import { areLivekitTransportsEqual } from "../remoteMembers/matrixLivekitMerger"; import { enterRTCSession, type EnterRTCSessionOptions, } from "../../rtcSessionHelpers"; import { type ElementCallError } from "../../utils/errors"; import { ElementWidgetActions, type WidgetHelpers } from "../../widget"; +import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers"; enum LivekitState { UNINITIALIZED = "uninitialized", @@ -93,7 +90,7 @@ interface Props { scope: ObservableScope; mediaDevices: MediaDevices; muteStates: MuteStates; - connectionManager: ConnectionManagerReturn; + connectionManager: IConnectionManager; matrixRTCSession: MatrixRTCSession; matrixRoom: MatrixRoom; localTransport$: Behavior; @@ -153,12 +150,13 @@ export const createLocalMembership$ = ({ // This should be used in a combineLatest with publisher$ to connect. const tracks$ = new BehaviorSubject([]); + // Drop Epoch data here since we will not combine this anymore const connection$ = scope.behavior( combineLatest( [connectionManager.connections$, localTransport$], (connections, transport) => { if (transport === undefined) return undefined; - return connections.find((connection) => + return connections.value.find((connection) => areLivekitTransportsEqual(connection.transport, transport), ); }, diff --git a/src/state/localMember/LocalTransport.ts b/src/state/localMember/LocalTransport.ts index a1b0d329..bdcfcffc 100644 --- a/src/state/localMember/LocalTransport.ts +++ b/src/state/localMember/LocalTransport.ts @@ -13,13 +13,17 @@ import { isLivekitTransportConfig, } from "matrix-js-sdk/lib/matrixrtc"; import { type MatrixClient } from "matrix-js-sdk"; -import { combineLatest, distinctUntilChanged, first, from, map } from "rxjs"; +import { combineLatest, distinctUntilChanged, first, from } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { deepCompare } from "matrix-js-sdk/lib/utils"; import { type Behavior } from "../Behavior.ts"; -import { type ObservableScope } from "../ObservableScope.ts"; +import { + type Epoch, + mapEpoch, + type ObservableScope, +} from "../ObservableScope.ts"; import { Config } from "../../config/Config.ts"; import { MatrixRTCTransportMissingError } from "../../utils/errors.ts"; import { getSFUConfigWithOpenID } from "../../livekit/openIDSFU.ts"; @@ -37,7 +41,7 @@ import { getSFUConfigWithOpenID } from "../../livekit/openIDSFU.ts"; */ interface Props { scope: ObservableScope; - memberships$: Behavior; + memberships$: Behavior>; client: MatrixClient; roomId: string; useOldestMember$: Behavior; @@ -63,7 +67,7 @@ export const createLocalTransport$ = ({ */ const oldestMemberTransport$ = scope.behavior( memberships$.pipe( - map((memberships) => memberships[0].getTransport(memberships[0])), + mapEpoch((memberships) => memberships[0].getTransport(memberships[0])), first((t) => t != undefined && isLivekitTransport(t)), ), undefined, diff --git a/src/state/remoteMembers/ConnectionManager.ts b/src/state/remoteMembers/ConnectionManager.ts index 245db7c1..49ab6b71 100644 --- a/src/state/remoteMembers/ConnectionManager.ts +++ b/src/state/remoteMembers/ConnectionManager.ts @@ -19,7 +19,7 @@ import { type LocalParticipant, type RemoteParticipant } from "livekit-client"; import { type Behavior } from "../Behavior"; import { type Connection } from "./Connection"; -import { type ObservableScope } from "../ObservableScope"; +import { Epoch, type ObservableScope } from "../ObservableScope"; import { generateKeyed$ } from "../../utils/observable"; import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts"; import { type ConnectionFactory } from "./ConnectionFactory.ts"; @@ -90,13 +90,13 @@ export class ConnectionManagerData { interface Props { scope: ObservableScope; connectionFactory: ConnectionFactory; - inputTransports$: Behavior; + inputTransports$: Behavior>; } // TODO - write test for scopes (do we really need to bind scope) export interface IConnectionManager { - transports$: Behavior; - connectionManagerData$: Behavior; - connections$: Behavior; + transports$: Behavior>; + connectionManagerData$: Behavior>; + connections$: Behavior>; } /** * Crete a `ConnectionManager` @@ -133,8 +133,10 @@ export function createConnectionManager$({ */ const transports$ = scope.behavior( combineLatest([running$, inputTransports$]).pipe( - map(([running, transports]) => (running ? transports : [])), - map((transports) => removeDuplicateTransports(transports)), + map(([running, transports]) => + transports.mapInner((transport) => (running ? transport : [])), + ), + map((transports) => transports.mapInner(removeDuplicateTransports)), ), ); @@ -142,7 +144,7 @@ export function createConnectionManager$({ * Connections for each transport in use by one or more session members. */ const connections$ = scope.behavior( - generateKeyed$( + generateKeyed$, Connection, Epoch>( transports$, (transports, createOrGet) => { const createConnection = @@ -162,46 +164,50 @@ export function createConnectionManager$({ return connection; }; - return transports.map((transport) => { - const key = - transport.livekit_service_url + "|" + transport.livekit_alias; - return createOrGet(key, createConnection(transport)); + return transports.mapInner((transports) => { + return transports.map((transport) => { + const key = + transport.livekit_service_url + "|" + transport.livekit_alias; + return createOrGet(key, createConnection(transport)); + }); }); }, ), ); - const connectionManagerData$: Behavior = - scope.behavior( - connections$.pipe( - switchMap((connections) => { - // Map the connections to list of {connection, participants}[] - const listOfConnectionsWithPublishingParticipants = connections.map( - (connection) => { - return connection.participantsWithTrack$.pipe( - map((participants) => ({ - connection, - participants, - })), - ); - }, - ); - // combineLatest the several streams into a single stream with the ConnectionManagerData - return combineLatest( - listOfConnectionsWithPublishingParticipants, - ).pipe( - map((lists) => - lists.reduce((data, { connection, participants }) => { - data.add(connection, participants); - return data; - }, new ConnectionManagerData()), - ), - ); - }), - ), - // start empty - new ConnectionManagerData(), - ); + const connectionManagerData$ = scope.behavior( + connections$.pipe( + switchMap((connections) => { + const epoch = connections.epoch; + + // Map the connections to list of {connection, participants}[] + const listOfConnectionsWithPublishingParticipants = + connections.value.map((connection) => { + return connection.participantsWithTrack$.pipe( + map((participants) => ({ + connection, + participants, + })), + ); + }); + + // combineLatest the several streams into a single stream with the ConnectionManagerData + return combineLatest(listOfConnectionsWithPublishingParticipants).pipe( + map( + (lists) => + new Epoch( + lists.reduce((data, { connection, participants }) => { + data.add(connection, participants); + return data; + }, new ConnectionManagerData()), + epoch, + ), + ), + ); + }), + ), + ); + return { transports$, connectionManagerData$, connections$ }; } diff --git a/src/state/remoteMembers/MatrixLivekitMembers.ts b/src/state/remoteMembers/MatrixLivekitMembers.ts index 28a9cca9..d7937bbe 100644 --- a/src/state/remoteMembers/MatrixLivekitMembers.ts +++ b/src/state/remoteMembers/MatrixLivekitMembers.ts @@ -13,14 +13,15 @@ import { type LivekitTransport, type CallMembership, } from "matrix-js-sdk/lib/matrixrtc"; -import { combineLatest, map } from "rxjs"; +import { combineLatest, filter, map } from "rxjs"; // eslint-disable-next-line rxjs/no-internal import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent"; import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk"; +import { logger } from "matrix-js-sdk/lib/logger"; import { type Behavior } from "../Behavior"; import { type IConnectionManager } from "./ConnectionManager"; -import { type ObservableScope } from "../ObservableScope"; +import { Epoch, mapEpoch, type ObservableScope } from "../ObservableScope"; import { getRoomMemberFromRtcMember, memberDisplaynames$ } from "./displayname"; import { type Connection } from "./Connection"; @@ -47,7 +48,7 @@ export interface MatrixLivekitMember { interface Props { scope: ObservableScope; membershipsWithTransport$: Behavior< - { membership: CallMembership; transport?: LivekitTransport }[] + Epoch<{ membership: CallMembership; transport?: LivekitTransport }[]> >; connectionManager: IConnectionManager; // TODO this is too much information for that class, @@ -74,7 +75,7 @@ export function createMatrixLivekitMembers$({ membershipsWithTransport$, connectionManager, matrixRoom, -}: Props): Behavior { +}: Props): Behavior> { /** * Stream of all the call members and their associated livekit data (if available). */ @@ -82,7 +83,7 @@ export function createMatrixLivekitMembers$({ const displaynameMap$ = memberDisplaynames$( scope, matrixRoom, - membershipsWithTransport$.pipe(map((v) => v.map((v) => v.membership))), + membershipsWithTransport$.pipe(mapEpoch((v) => v.map((v) => v.membership))), ); return scope.behavior( @@ -91,48 +92,52 @@ export function createMatrixLivekitMembers$({ connectionManager.connectionManagerData$, displaynameMap$, ]).pipe( - // filter( - // ([membershipsWithTransports, managerData, displaynames]) => - // // for each change in - // displaynames.size === membershipsWithTransports.length && - // displaynames.size === managerData.getConnections().length, - // ), - map(([memberships, managerData, displaynames]) => { - const items: MatrixLivekitMember[] = memberships.map( - ({ membership, transport }) => { - // TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to - const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`; + filter((values) => + values.every((value) => value.epoch === values[0].epoch), + ), + map( + ([ + { value: membershipsWithTransports, epoch }, + { value: managerData }, + { value: displaynames }, + ]) => { + const items: MatrixLivekitMember[] = membershipsWithTransports.map( + ({ membership, transport }) => { + // TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to + const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`; - const participants = transport - ? managerData.getParticipantForTransport(transport) - : []; - const participant = participants.find( - (p) => p.identity == participantId, - ); - const member = getRoomMemberFromRtcMember( - membership, - matrixRoom, - )?.member; - const connection = transport - ? managerData.getConnectionForTransport(transport) - : undefined; - const displayName = displaynames.get(participantId); - return { - participant, - membership, - connection, - // This makes sense to add to the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) - // TODO Ugh this is hidign that it might be undefined!! best we remove the member entirely. - member: member as RoomMember, - displayName, - mxcAvatarUrl: member?.getMxcAvatarUrl(), - participantId, - }; - }, - ); - return items; - }), + const participants = transport + ? managerData.getParticipantForTransport(transport) + : []; + const participant = participants.find( + (p) => p.identity == participantId, + ); + const member = getRoomMemberFromRtcMember( + membership, + matrixRoom, + )?.member; + const connection = transport + ? managerData.getConnectionForTransport(transport) + : undefined; + const displayName = displaynames.get(participantId); + return { + participant, + membership, + connection, + // This makes sense to add to the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) + // TODO Ugh this is hidign that it might be undefined!! best we remove the member entirely. + member: member as RoomMember, + displayName, + mxcAvatarUrl: member?.getMxcAvatarUrl(), + participantId, + }; + }, + ); + return new Epoch(items, epoch); + }, + ), ), + // new Epoch([]), ); } diff --git a/src/state/remoteMembers/displayname.ts b/src/state/remoteMembers/displayname.ts index e735147d..8f2b3f64 100644 --- a/src/state/remoteMembers/displayname.ts +++ b/src/state/remoteMembers/displayname.ts @@ -16,14 +16,15 @@ import { import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc"; import { logger } from "matrix-js-sdk/lib/logger"; import { type Room as MatrixRoom } from "matrix-js-sdk/lib/matrix"; +// eslint-disable-next-line rxjs/no-internal +import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent"; -import { type ObservableScope } from "../ObservableScope"; +import { Epoch, type ObservableScope } from "../ObservableScope"; import { calculateDisplayName, shouldDisambiguate, } from "../../utils/displayname"; import { type Behavior } from "../Behavior"; -import type { NodeStyleEventEmitter } from "rxjs/src/internal/observable/fromEvent.ts"; /** * Displayname for each member of the call. This will disambiguate @@ -36,8 +37,8 @@ import type { NodeStyleEventEmitter } from "rxjs/src/internal/observable/fromEve export const memberDisplaynames$ = ( scope: ObservableScope, matrixRoom: Pick & NodeStyleEventEmitter, - memberships$: Observable, -): Behavior> => + memberships$: Observable>, +): Behavior>> => scope.behavior( combineLatest([ // Handle call membership changes @@ -46,7 +47,8 @@ export const memberDisplaynames$ = ( fromEvent(matrixRoom, RoomStateEvent.Members).pipe(startWith(null)), // TODO: do we need: pauseWhen(this.pretendToBeDisconnected$), ]).pipe( - map(([memberships, _displayNames]) => { + map(([epochMemberships, _displayNames]) => { + const { epoch, value: memberships } = epochMemberships; const displaynameMap = new Map(); const room = matrixRoom; @@ -68,10 +70,10 @@ export const memberDisplaynames$ = ( calculateDisplayName(member, disambiguate), ); } - return displaynameMap; + return new Epoch(displaynameMap, epoch); }), ), - new Map(), + new Epoch(new Map()), ); export function getRoomMemberFromRtcMember( diff --git a/src/state/remoteMembers/integration.test.ts b/src/state/remoteMembers/integration.test.ts index 14085568..9ce4cf33 100644 --- a/src/state/remoteMembers/integration.test.ts +++ b/src/state/remoteMembers/integration.test.ts @@ -14,7 +14,7 @@ import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk"; import { logger } from "matrix-js-sdk/lib/logger"; -import { ObservableScope } from "../ObservableScope.ts"; +import { type Epoch, ObservableScope, trackEpoch } from "../ObservableScope.ts"; import { ECConnectionFactory } from "./ConnectionFactory.ts"; import { type OpenIDClientParts } from "../../livekit/openIDSFU.ts"; import { @@ -107,25 +107,20 @@ afterEach(() => { }); test("bob, carl, then bob joining no tracks yet", () => { - withTestScheduler(({ expectObservable, behavior }) => { + withTestScheduler(({ expectObservable, behavior, scope }) => { const bobMembership = mockCallMembership("@bob:example.com", "BDEV000"); const carlMembership = mockCallMembership("@carl:example.com", "CDEV000"); const daveMembership = mockCallMembership("@dave:foo.bar", "DDEV000"); - // We add the `---` because there is a limitation in rxjs marbles https://github.com/ReactiveX/rxjs/issues/5677 - // Because we several values emitted at the same frame, so we use the grouping format - // e.g. a(bc) to indicate that b and c are emitted at the same time. But rxjs marbles advance the - // time by the number of characters in the marble diagram, so we need to add some padding to avoid so that - // the next emission is testable - // ab---c--- - // a(bc)(de) - const eMarble = "ab----c----"; - const vMarble = "a(xxb)(xxc)"; - const memberships$ = behavior(eMarble, { - a: [bobMembership], - b: [bobMembership, carlMembership], - c: [bobMembership, carlMembership, daveMembership], - }); + const eMarble = "abc"; + const vMarble = "abc"; + const memberships$ = scope.behavior( + behavior(eMarble, { + a: [bobMembership], + b: [bobMembership, carlMembership], + c: [bobMembership, carlMembership, daveMembership], + }).pipe(trackEpoch()), + ); const membershipsAndTransports = membershipsAndTransports$( testScope, @@ -147,7 +142,8 @@ test("bob, carl, then bob joining no tracks yet", () => { }); expectObservable(matrixLivekitItems$).toBe(vMarble, { - a: expect.toSatisfy((items: MatrixLivekitMember[]) => { + a: expect.toSatisfy((e: Epoch) => { + const items = e.value; expect(items.length).toBe(1); const item = items[0]!; expect(item.membership).toStrictEqual(bobMembership); @@ -160,7 +156,8 @@ test("bob, carl, then bob joining no tracks yet", () => { expect(item.participant).toBeUndefined(); return true; }), - b: expect.toSatisfy((items: MatrixLivekitMember[]) => { + b: expect.toSatisfy((e: Epoch) => { + const items = e.value; expect(items.length).toBe(2); { @@ -185,7 +182,8 @@ test("bob, carl, then bob joining no tracks yet", () => { } return true; }), - c: expect.toSatisfy((items: MatrixLivekitMember[]) => { + c: expect.toSatisfy((e: Epoch) => { + const items = e.value; logger.info(`E Items length: ${items.length}`); expect(items.length).toBe(3); {