diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index add8154a..41582039 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -88,6 +88,7 @@ import { ReactionsOverlay } from "./ReactionsOverlay"; import { CallEventAudioRenderer } from "./CallEventAudioRenderer"; import { debugTileLayout as debugTileLayoutSetting, + matrixRTCMode as matrixRTCModeSetting, useSetting, } from "../settings/settings"; import { ReactionsReader } from "../reactions/ReactionsReader"; @@ -144,6 +145,7 @@ export const ActiveCall: FC = (props) => { encryptionSystem: props.e2eeSystem, autoLeaveWhenOthersLeft, waitForCallPickup: waitForCallPickup && sendNotificationType === "ring", + matrixRTCMode$: matrixRTCModeSetting.value$, }, reactionsReader.raisedHands$, reactionsReader.reactions$, diff --git a/src/state/CallViewModel/CallViewModel.test.ts b/src/state/CallViewModel/CallViewModel.test.ts index 2e5b5700..86cde12a 100644 --- a/src/state/CallViewModel/CallViewModel.test.ts +++ b/src/state/CallViewModel/CallViewModel.test.ts @@ -60,7 +60,8 @@ import { import { MediaDevices } from "../MediaDevices.ts"; import { getValue } from "../../utils/observable.ts"; import { type Behavior, constant } from "../Behavior.ts"; -import { withCallViewModel } from "./CallViewModelTestUtils.ts"; +import { withCallViewModel as withCallViewModelInMode } from "./CallViewModelTestUtils.ts"; +import { MatrixRTCMode } from "../../settings/settings.ts"; vi.mock("rxjs", async (importOriginal) => ({ ...(await importOriginal()), @@ -229,7 +230,13 @@ function mockRingEvent( // need a value to fill in for them when emitting notifications const mockLegacyRingEvent = {} as { event_id: string } & ICallNotifyContent; -describe("CallViewModel", () => { +describe.each([ + [MatrixRTCMode.Legacy], + [MatrixRTCMode.Compatibil], + [MatrixRTCMode.Matrix_2_0], +])("CallViewModel (%s mode)", (mode) => { + const withCallViewModel = withCallViewModelInMode(mode); + test("participants are retained during a focus switch", () => { withTestScheduler(({ behavior, expectObservable }) => { // Participants disappear on frame 2 and come back on frame 3 diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index aac88a3b..8f8b20cf 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -53,11 +53,15 @@ import { ScreenShareViewModel, type UserMediaViewModel, } from "../MediaViewModel"; -import { accumulate, generateItems, pauseWhen } from "../../utils/observable"; +import { + accumulate, + filterBehavior, + generateItems, + pauseWhen, +} from "../../utils/observable"; import { duplicateTiles, MatrixRTCMode, - matrixRTCMode, playReactionsSound, showReactions, } from "../../settings/settings"; @@ -111,7 +115,8 @@ import { ECConnectionFactory } from "./remoteMembers/ConnectionFactory.ts"; import { createConnectionManager$ } from "./remoteMembers/ConnectionManager.ts"; import { createMatrixLivekitMembers$, - type MatrixLivekitMember, + type TaggedParticipant, + type LocalMatrixLivekitMember, } from "./remoteMembers/MatrixLivekitMembers.ts"; import { type AutoLeaveReason, @@ -150,6 +155,8 @@ export interface CallViewModelOptions { connectionState$?: Behavior; /** Optional behavior overriding the computed window size, mainly for testing purposes. */ windowSize$?: Behavior<{ width: number; height: number }>; + /** The version & compatibility mode of MatrixRTC that we should use. */ + matrixRTCMode$: Behavior; } // Do not play any sounds if the participant count has exceeded this @@ -406,7 +413,7 @@ export function createCallViewModel$( client, roomId: matrixRoom.roomId, useOldestMember$: scope.behavior( - matrixRTCMode.value$.pipe(map((v) => v === MatrixRTCMode.Legacy)), + options.matrixRTCMode$.pipe(map((v) => v === MatrixRTCMode.Legacy)), ), }); @@ -458,7 +465,7 @@ export function createCallViewModel$( }); const connectOptions$ = scope.behavior( - matrixRTCMode.value$.pipe( + options.matrixRTCMode$.pipe( map((mode) => ({ encryptMedia: livekitKeyProvider !== undefined, // TODO. This might need to get called again on each change of matrixRTCMode... @@ -512,22 +519,21 @@ export function createCallViewModel$( ), ); - const localMatrixLivekitMemberUninitialized = { - membership$: localRtcMembership$, - participant$: localMembership.participant$, - connection$: localMembership.connection$, - userId: userId, - }; - - const localMatrixLivekitMember$: Behavior = + const localMatrixLivekitMember$: Behavior = scope.behavior( localRtcMembership$.pipe( - switchMap((membership) => { - if (!membership) return of(null); - return of( - // casting is save here since we know that localRtcMembership$ is !== null since we reached this case. - localMatrixLivekitMemberUninitialized as MatrixLivekitMember, - ); + filterBehavior((membership) => membership !== null), + map((membership$) => { + if (membership$ === null) return null; + return { + membership$, + participant: { + type: "local" as const, + value$: localMembership.participant$, + }, + connection$: localMembership.connection$, + userId, + }; }), ), ); @@ -596,7 +602,7 @@ export function createCallViewModel$( const members = membersWithEpoch.value; const a$ = combineLatest( members.map((member) => - combineLatest([member.connection$, member.participant$]).pipe( + combineLatest([member.connection$, member.participant.value$]).pipe( map(([connection, participant]) => { // do not render audio for local participant if (!connection || !participant || participant.isLocal) @@ -674,7 +680,7 @@ export function createCallViewModel$( let localParticipantId: string | undefined = undefined; // add local member if available if (localMatrixLivekitMember) { - const { userId, participant$, connection$, membership$ } = + const { userId, participant, connection$, membership$ } = localMatrixLivekitMember; localParticipantId = `${userId}:${membership$.value.deviceId}`; // should be membership$.value.membershipID which is not optional // const participantId = membership$.value.membershipID; @@ -685,7 +691,7 @@ export function createCallViewModel$( dup, localParticipantId, userId, - participant$, + participant satisfies TaggedParticipant as TaggedParticipant, // Widen the type safely connection$, ], data: undefined, @@ -696,7 +702,7 @@ export function createCallViewModel$( // add remote members that are available for (const { userId, - participant$, + participant, connection$, membership$, } of matrixLivekitMembers) { @@ -705,7 +711,7 @@ export function createCallViewModel$( // const participantId = membership$.value?.identity; for (let dup = 0; dup < 1 + duplicateTiles; dup++) { yield { - keys: [dup, participantId, userId, participant$, connection$], + keys: [dup, participantId, userId, participant, connection$], data: undefined, }; } @@ -717,7 +723,7 @@ export function createCallViewModel$( dup, participantId, userId, - participant$, + participant, connection$, ) => { const livekitRoom$ = scope.behavior( @@ -736,7 +742,7 @@ export function createCallViewModel$( scope, `${participantId}:${dup}`, userId, - participant$, + participant, options.encryptionSystem, livekitRoom$, focusUrl$, diff --git a/src/state/CallViewModel/CallViewModelTestUtils.ts b/src/state/CallViewModel/CallViewModelTestUtils.ts index f80b4bcb..b6f53275 100644 --- a/src/state/CallViewModel/CallViewModelTestUtils.ts +++ b/src/state/CallViewModel/CallViewModelTestUtils.ts @@ -53,6 +53,7 @@ import { import { type Behavior, constant } from "../Behavior"; import { type ProcessorState } from "../../livekit/TrackProcessorContext"; import { type MediaDevices } from "../MediaDevices"; +import { type MatrixRTCMode } from "../../settings/settings"; mockConfig({ livekit: { livekit_service_url: "http://my-default-service-url.com" }, @@ -80,117 +81,125 @@ export interface CallViewModelInputs { const localParticipant = mockLocalParticipant({ identity: "" }); -export function withCallViewModel( - { - remoteParticipants$ = constant([]), - rtcMembers$ = constant([localRtcMember]), - livekitConnectionState$: connectionState$ = constant( - ConnectionState.Connected, - ), - speaking = new Map(), - mediaDevices = mockMediaDevices({}), - initialSyncState = SyncState.Syncing, - windowSize$ = constant({ width: 1000, height: 800 }), - }: Partial = {}, - continuation: ( - vm: CallViewModel, - rtcSession: MockRTCSession, - subjects: { raisedHands$: BehaviorSubject> }, - setSyncState: (value: SyncState) => void, - ) => void, - options: CallViewModelOptions = { - encryptionSystem: { kind: E2eeType.PER_PARTICIPANT }, - autoLeaveWhenOthersLeft: false, - }, -): void { - let syncState = initialSyncState; - const setSyncState = (value: SyncState): void => { - const prev = syncState; - syncState = value; - room.client.emit(ClientEvent.Sync, value, prev); - }; - const room = mockMatrixRoom({ - client: new (class extends EventEmitter { - public getUserId(): string | undefined { - return localRtcMember.userId; - } +export function withCallViewModel(mode: MatrixRTCMode) { + return ( + { + remoteParticipants$ = constant([]), + rtcMembers$ = constant([localRtcMember]), + livekitConnectionState$: connectionState$ = constant( + ConnectionState.Connected, + ), + speaking = new Map(), + mediaDevices = mockMediaDevices({}), + initialSyncState = SyncState.Syncing, + windowSize$ = constant({ width: 1000, height: 800 }), + }: Partial = {}, + continuation: ( + vm: CallViewModel, + rtcSession: MockRTCSession, + subjects: { + raisedHands$: BehaviorSubject>; + }, + setSyncState: (value: SyncState) => void, + ) => void, + options: Partial = {}, + ): void => { + let syncState = initialSyncState; + const setSyncState = (value: SyncState): void => { + const prev = syncState; + syncState = value; + room.client.emit(ClientEvent.Sync, value, prev); + }; + const room = mockMatrixRoom({ + client: new (class extends EventEmitter { + public getUserId(): string | undefined { + return localRtcMember.userId; + } - public getDeviceId(): string { - return localRtcMember.deviceId; - } + public getDeviceId(): string { + return localRtcMember.deviceId; + } - public getDomain(): string { - return "example.com"; - } + public getDomain(): string { + return "example.com"; + } - public getSyncState(): SyncState { - return syncState; - } - })() as Partial as MatrixClient, - getMembers: () => Array.from(roomMembers.values()), - getMembersWithMembership: () => Array.from(roomMembers.values()), - }); - const rtcSession = new MockRTCSession(room, []).withMemberships(rtcMembers$); - const participantsSpy = vi - .spyOn(ComponentsCore, "connectedParticipantsObserver") - .mockReturnValue(remoteParticipants$); - const mediaSpy = vi - .spyOn(ComponentsCore, "observeParticipantMedia") - .mockImplementation((p) => - of({ participant: p } as Partial< - ComponentsCore.ParticipantMedia - > as ComponentsCore.ParticipantMedia), + public getSyncState(): SyncState { + return syncState; + } + })() as Partial as MatrixClient, + getMembers: () => Array.from(roomMembers.values()), + getMembersWithMembership: () => Array.from(roomMembers.values()), + }); + const rtcSession = new MockRTCSession(room, []).withMemberships( + rtcMembers$, ); - const eventsSpy = vi - .spyOn(ComponentsCore, "observeParticipantEvents") - .mockImplementation((p, ...eventTypes) => { - if (eventTypes.includes(ParticipantEvent.IsSpeakingChanged)) { - return (speaking.get(p) ?? of(false)).pipe( - map((s): Participant => ({ ...p, isSpeaking: s }) as Participant), - ); - } else { - return of(p); - } + const participantsSpy = vi + .spyOn(ComponentsCore, "connectedParticipantsObserver") + .mockReturnValue(remoteParticipants$); + const mediaSpy = vi + .spyOn(ComponentsCore, "observeParticipantMedia") + .mockImplementation((p) => + of({ participant: p } as Partial< + ComponentsCore.ParticipantMedia + > as ComponentsCore.ParticipantMedia), + ); + const eventsSpy = vi + .spyOn(ComponentsCore, "observeParticipantEvents") + .mockImplementation((p, ...eventTypes) => { + if (eventTypes.includes(ParticipantEvent.IsSpeakingChanged)) { + return (speaking.get(p) ?? of(false)).pipe( + map((s): Participant => ({ ...p, isSpeaking: s }) as Participant), + ); + } else { + return of(p); + } + }); + + const roomEventSelectorSpy = vi + .spyOn(ComponentsCore, "roomEventSelector") + .mockImplementation((_room, _eventType) => of()); + const muteStates = mockMuteStates(); + const raisedHands$ = new BehaviorSubject>( + {}, + ); + const reactions$ = new BehaviorSubject>({}); + + const vm = createCallViewModel$( + testScope(), + rtcSession.asMockedSession(), + room, + mediaDevices, + muteStates, + { + encryptionSystem: { kind: E2eeType.PER_PARTICIPANT }, + autoLeaveWhenOthersLeft: false, + livekitRoomFactory: (): LivekitRoom => + mockLivekitRoom({ + localParticipant, + disconnect: async () => Promise.resolve(), + setE2EEEnabled: async () => Promise.resolve(), + }), + connectionState$, + windowSize$, + matrixRTCMode$: constant(mode), + ...options, + }, + raisedHands$, + reactions$, + new BehaviorSubject({ + processor: undefined, + supported: undefined, + }), + ); + + onTestFinished(() => { + participantsSpy.mockRestore(); + mediaSpy.mockRestore(); + eventsSpy.mockRestore(); + roomEventSelectorSpy.mockRestore(); }); - const roomEventSelectorSpy = vi - .spyOn(ComponentsCore, "roomEventSelector") - .mockImplementation((_room, _eventType) => of()); - const muteStates = mockMuteStates(); - const raisedHands$ = new BehaviorSubject>({}); - const reactions$ = new BehaviorSubject>({}); - - const vm = createCallViewModel$( - testScope(), - rtcSession.asMockedSession(), - room, - mediaDevices, - muteStates, - { - ...options, - livekitRoomFactory: (): LivekitRoom => - mockLivekitRoom({ - localParticipant, - disconnect: async () => Promise.resolve(), - setE2EEEnabled: async () => Promise.resolve(), - }), - connectionState$, - windowSize$, - }, - raisedHands$, - reactions$, - new BehaviorSubject({ - processor: undefined, - supported: undefined, - }), - ); - - onTestFinished(() => { - participantsSpy.mockRestore(); - mediaSpy.mockRestore(); - eventsSpy.mockRestore(); - roomEventSelectorSpy.mockRestore(); - }); - - continuation(vm, rtcSession, { raisedHands$: raisedHands$ }, setSyncState); + continuation(vm, rtcSession, { raisedHands$: raisedHands$ }, setSyncState); + }; } diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index 8f9471d0..4318708e 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -32,7 +32,6 @@ import { Connection, ConnectionState, type ConnectionOpts, - type PublishingParticipant, } from "./Connection.ts"; import { ObservableScope } from "../../ObservableScope.ts"; import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts"; @@ -395,7 +394,7 @@ describe("Publishing participants observations", () => { const bobIsAPublisher = Promise.withResolvers(); const danIsAPublisher = Promise.withResolvers(); - const observedPublishers: PublishingParticipant[][] = []; + const observedPublishers: RemoteParticipant[][] = []; const s = connection.remoteParticipantsWithTracks$.subscribe( (publishers) => { observedPublishers.push(publishers); @@ -408,7 +407,7 @@ describe("Publishing participants observations", () => { }, ); onTestFinished(() => s.unsubscribe()); - // The publishingParticipants$ observable is derived from the current members of the + // The remoteParticipants$ observable is derived from the current members of the // livekitRoom and the rtc membership in order to publish the members that are publishing // on this connection. @@ -450,7 +449,7 @@ describe("Publishing participants observations", () => { const connection = setupRemoteConnection(); - let observedPublishers: PublishingParticipant[][] = []; + let observedPublishers: RemoteParticipant[][] = []; const s = connection.remoteParticipantsWithTracks$.subscribe( (publishers) => { observedPublishers.push(publishers); diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 8b4479e8..3d685c8a 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -13,7 +13,6 @@ import { import { ConnectionError, type Room as LivekitRoom, - type LocalParticipant, type RemoteParticipant, RoomEvent, } from "livekit-client"; @@ -35,8 +34,6 @@ import { UnknownCallError, } from "../../../utils/errors.ts"; -export type PublishingParticipant = LocalParticipant | RemoteParticipant; - export interface ConnectionOpts { /** The media transport to connect to. */ transport: LivekitTransport; @@ -103,9 +100,7 @@ export class Connection { * This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`. * It filters the participants to only those that are associated with a membership that claims to publish on this connection. */ - public readonly remoteParticipantsWithTracks$: Behavior< - PublishingParticipant[] - >; + public readonly remoteParticipantsWithTracks$: Behavior; /** * Whether the connection has been stopped. diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts index 484a44e7..da1da06f 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts @@ -8,7 +8,7 @@ Please see LICENSE in the repository root for full details. import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; import { BehaviorSubject } from "rxjs"; import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; -import { type Participant as LivekitParticipant } from "livekit-client"; +import { type RemoteParticipant } from "livekit-client"; import { logger } from "matrix-js-sdk/lib/logger"; import { Epoch, mapEpoch, ObservableScope } from "../../ObservableScope.ts"; @@ -201,23 +201,20 @@ describe("connections$ stream", () => { describe("connectionManagerData$ stream", () => { // Used in test to control fake connections' remoteParticipantsWithTracks$ streams - let fakePublishingParticipantsStreams: Map< - string, - Behavior - >; + let fakeRemoteParticipantsStreams: Map>; function keyForTransport(transport: LivekitTransport): string { return `${transport.livekit_service_url}|${transport.livekit_alias}`; } beforeEach(() => { - fakePublishingParticipantsStreams = new Map(); + fakeRemoteParticipantsStreams = new Map(); - function getPublishingParticipantsFor( + function getRemoteParticipantsFor( transport: LivekitTransport, - ): Behavior { + ): Behavior { return ( - fakePublishingParticipantsStreams.get(keyForTransport(transport)) ?? + fakeRemoteParticipantsStreams.get(keyForTransport(transport)) ?? new BehaviorSubject([]) ); } @@ -227,13 +224,12 @@ describe("connectionManagerData$ stream", () => { .fn() .mockImplementation( (transport: LivekitTransport, scope: ObservableScope) => { - const fakePublishingParticipants$ = new BehaviorSubject< - LivekitParticipant[] + const fakeRemoteParticipants$ = new BehaviorSubject< + RemoteParticipant[] >([]); const mockConnection = { transport, - remoteParticipantsWithTracks$: - getPublishingParticipantsFor(transport), + remoteParticipantsWithTracks$: getRemoteParticipantsFor(transport), } as unknown as Connection; vi.mocked(mockConnection).start = vi.fn(); vi.mocked(mockConnection).stop = vi.fn(); @@ -242,36 +238,36 @@ describe("connectionManagerData$ stream", () => { void mockConnection.stop(); }); - fakePublishingParticipantsStreams.set( + fakeRemoteParticipantsStreams.set( keyForTransport(transport), - fakePublishingParticipants$, + fakeRemoteParticipants$, ); return mockConnection; }, ); }); - test("Should report connections with the publishing participants", () => { + test("Should report connections with the remote participants", () => { withTestScheduler(({ expectObservable, schedule, behavior }) => { // Setup the fake participants streams behavior // ============================== - fakePublishingParticipantsStreams.set( + fakeRemoteParticipantsStreams.set( keyForTransport(TRANSPORT_1), behavior("oa-b", { o: [], - a: [{ identity: "user1A" } as LivekitParticipant], + a: [{ identity: "user1A" } as RemoteParticipant], b: [ - { identity: "user1A" } as LivekitParticipant, - { identity: "user1B" } as LivekitParticipant, + { identity: "user1A" } as RemoteParticipant, + { identity: "user1B" } as RemoteParticipant, ], }), ); - fakePublishingParticipantsStreams.set( + fakeRemoteParticipantsStreams.set( keyForTransport(TRANSPORT_2), behavior("o-a", { o: [], - a: [{ identity: "user2A" } as LivekitParticipant], + a: [{ identity: "user2A" } as RemoteParticipant], }), ); // ============================== diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index 755ba3dd..6c2d64e0 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -12,7 +12,7 @@ import { } from "matrix-js-sdk/lib/matrixrtc"; import { combineLatest, map, of, switchMap, tap } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; -import { type LocalParticipant, type RemoteParticipant } from "livekit-client"; +import { type RemoteParticipant } from "livekit-client"; import { type Behavior } from "../../Behavior.ts"; import { type Connection } from "./Connection.ts"; @@ -22,17 +22,12 @@ import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts"; import { type ConnectionFactory } from "./ConnectionFactory.ts"; export class ConnectionManagerData { - private readonly store: Map< - string, - [Connection, (LocalParticipant | RemoteParticipant)[]] - > = new Map(); + private readonly store: Map = + new Map(); public constructor() {} - public add( - connection: Connection, - participants: (LocalParticipant | RemoteParticipant)[], - ): void { + public add(connection: Connection, participants: RemoteParticipant[]): void { const key = this.getKey(connection.transport); const existing = this.store.get(key); if (!existing) { @@ -58,7 +53,7 @@ export class ConnectionManagerData { public getParticipantForTransport( transport: LivekitTransport, - ): (LocalParticipant | RemoteParticipant)[] { + ): RemoteParticipant[] { const key = transport.livekit_service_url + "|" + transport.livekit_alias; return this.store.get(key)?.[1] ?? []; } @@ -172,23 +167,24 @@ export function createConnectionManager$({ const epoch = connections.epoch; // Map the connections to list of {connection, participants}[] - const listOfConnectionsWithPublishingParticipants = - connections.value.map((connection) => { + const listOfConnectionsWithRemoteParticipants = connections.value.map( + (connection) => { return connection.remoteParticipantsWithTracks$.pipe( map((participants) => ({ connection, participants, })), ); - }); + }, + ); // probably not required - if (listOfConnectionsWithPublishingParticipants.length === 0) { + if (listOfConnectionsWithRemoteParticipants.length === 0) { return of(new Epoch(new ConnectionManagerData(), epoch)); } // combineLatest the several streams into a single stream with the ConnectionManagerData - return combineLatest(listOfConnectionsWithPublishingParticipants).pipe( + return combineLatest(listOfConnectionsWithRemoteParticipants).pipe( map( (lists) => new Epoch( diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts index e675f723..77c00015 100644 --- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts +++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.test.ts @@ -15,7 +15,7 @@ import { combineLatest, map, type Observable } from "rxjs"; import { type IConnectionManager } from "./ConnectionManager.ts"; import { - type MatrixLivekitMember, + type RemoteMatrixLivekitMember, createMatrixLivekitMembers$, } from "./MatrixLivekitMembers.ts"; import { @@ -100,12 +100,12 @@ test("should signal participant not yet connected to livekit", () => { }); expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", { - a: expect.toSatisfy((data: MatrixLivekitMember[]) => { + a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => { expect(data.length).toEqual(1); expectObservable(data[0].membership$).toBe("a", { a: bobMembership, }); - expectObservable(data[0].participant$).toBe("a", { + expectObservable(data[0].participant.value$).toBe("a", { a: null, }); expectObservable(data[0].connection$).toBe("a", { @@ -180,12 +180,12 @@ test("should signal participant on a connection that is publishing", () => { }); expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", { - a: expect.toSatisfy((data: MatrixLivekitMember[]) => { + a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => { expect(data.length).toEqual(1); expectObservable(data[0].membership$).toBe("a", { a: bobMembership, }); - expectObservable(data[0].participant$).toBe("a", { + expectObservable(data[0].participant.value$).toBe("a", { a: expect.toSatisfy((participant) => { expect(participant).toBeDefined(); expect(participant!.identity).toEqual(bobParticipantId); @@ -231,12 +231,12 @@ test("should signal participant on a connection that is not publishing", () => { }); expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", { - a: expect.toSatisfy((data: MatrixLivekitMember[]) => { + a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => { expect(data.length).toEqual(1); expectObservable(data[0].membership$).toBe("a", { a: bobMembership, }); - expectObservable(data[0].participant$).toBe("a", { + expectObservable(data[0].participant.value$).toBe("a", { a: null, }); expectObservable(data[0].connection$).toBe("a", { @@ -296,7 +296,7 @@ describe("Publication edge case", () => { expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe( "a", { - a: expect.toSatisfy((data: MatrixLivekitMember[]) => { + a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => { expect(data.length).toEqual(2); expectObservable(data[0].membership$).toBe("a", { a: bobMembership, @@ -305,7 +305,7 @@ describe("Publication edge case", () => { // The real connection should be from transportA as per the membership a: connectionA, }); - expectObservable(data[0].participant$).toBe("a", { + expectObservable(data[0].participant.value$).toBe("a", { a: expect.toSatisfy((participant) => { expect(participant).toBeDefined(); expect(participant!.identity).toEqual(bobParticipantId); @@ -362,7 +362,7 @@ describe("Publication edge case", () => { expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe( "a", { - a: expect.toSatisfy((data: MatrixLivekitMember[]) => { + a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => { expect(data.length).toEqual(2); expectObservable(data[0].membership$).toBe("a", { a: bobMembership, @@ -371,7 +371,7 @@ describe("Publication edge case", () => { // The real connection should be from transportA as per the membership a: connectionA, }); - expectObservable(data[0].participant$).toBe("a", { + expectObservable(data[0].participant.value$).toBe("a", { // No participant as Bob is not publishing on his membership transport a: null, }); diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts index 79ad933c..4cca0d5b 100644 --- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts +++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts @@ -5,10 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { - type LocalParticipant as LocalLivekitParticipant, - type RemoteParticipant as RemoteLivekitParticipant, -} from "livekit-client"; +import { type LocalParticipant, type RemoteParticipant } from "livekit-client"; import { type LivekitTransport, type CallMembership, @@ -24,22 +21,44 @@ import { generateItemsWithEpoch } from "../../../utils/observable"; const logger = rootLogger.getChild("[MatrixLivekitMembers]"); -/** - * Represents a Matrix call member and their associated LiveKit participation. - * `livekitParticipant` can be undefined if the member is not yet connected to the livekit room - * or if it has no livekit transport at all. - */ -export interface MatrixLivekitMember { +interface LocalTaggedParticipant { + type: "local"; + value$: Behavior; +} +interface RemoteTaggedParticipant { + type: "remote"; + value$: Behavior; +} +export type TaggedParticipant = + | LocalTaggedParticipant + | RemoteTaggedParticipant; + +interface MatrixLivekitMember { membership$: Behavior; - participant$: Behavior< - LocalLivekitParticipant | RemoteLivekitParticipant | null - >; connection$: Behavior; // participantId: string; We do not want a participantId here since it will be generated by the jwt // TODO decide if we can also drop the userId. Its in the matrix membership anyways. userId: string; } +/** + * Represents the local Matrix call member and their associated LiveKit participation. + * `livekitParticipant` can be null if the member is not yet connected to the livekit room + * or if it has no livekit transport at all. + */ +export interface LocalMatrixLivekitMember extends MatrixLivekitMember { + participant: LocalTaggedParticipant; +} + +/** + * Represents a remote Matrix call member and their associated LiveKit participation. + * `livekitParticipant` can be null if the member is not yet connected to the livekit room + * or if it has no livekit transport at all. + */ +export interface RemoteMatrixLivekitMember extends MatrixLivekitMember { + participant: RemoteTaggedParticipant; +} + interface Props { scope: ObservableScope; membershipsWithTransport$: Behavior< @@ -61,7 +80,7 @@ export function createMatrixLivekitMembers$({ scope, membershipsWithTransport$, connectionManager, -}: Props): Behavior> { +}: Props): Behavior> { /** * Stream of all the call members and their associated livekit data (if available). */ @@ -110,12 +129,14 @@ export function createMatrixLivekitMembers$({ logger.debug( `Generating member for participantId: ${participantId}, userId: ${userId}`, ); + const { participant$, ...rest } = scope.splitBehavior(data$); // will only get called once per `participantId, userId` pair. // updates to data$ and as a result to displayName$ and mxcAvatarUrl$ are more frequent. return { participantId, userId, - ...scope.splitBehavior(data$), + participant: { type: "remote" as const, value$: participant$ }, + ...rest, }; }, ), diff --git a/src/state/CallViewModel/remoteMembers/integration.test.ts b/src/state/CallViewModel/remoteMembers/integration.test.ts index e3aa6be8..2c3591a5 100644 --- a/src/state/CallViewModel/remoteMembers/integration.test.ts +++ b/src/state/CallViewModel/remoteMembers/integration.test.ts @@ -29,7 +29,7 @@ import { type ProcessorState } from "../../../livekit/TrackProcessorContext.tsx" import { areLivekitTransportsEqual, createMatrixLivekitMembers$, - type MatrixLivekitMember, + type RemoteMatrixLivekitMember, } from "./MatrixLivekitMembers.ts"; import { createConnectionManager$ } from "./ConnectionManager.ts"; import { membershipsAndTransports$ } from "../../SessionBehaviors.ts"; @@ -132,7 +132,7 @@ test("bob, carl, then bob joining no tracks yet", () => { }); expectObservable(matrixLivekitItems$).toBe(vMarble, { - a: expect.toSatisfy((e: Epoch) => { + a: expect.toSatisfy((e: Epoch) => { const items = e.value; expect(items.length).toBe(1); const item = items[0]!; @@ -147,12 +147,12 @@ test("bob, carl, then bob joining no tracks yet", () => { ), ), }); - expectObservable(item.participant$).toBe("a", { + expectObservable(item.participant.value$).toBe("a", { a: null, }); return true; }), - b: expect.toSatisfy((e: Epoch) => { + b: expect.toSatisfy((e: Epoch) => { const items = e.value; expect(items.length).toBe(2); @@ -161,7 +161,7 @@ test("bob, carl, then bob joining no tracks yet", () => { expectObservable(item.membership$).toBe("a", { a: bobMembership, }); - expectObservable(item.participant$).toBe("a", { + expectObservable(item.participant.value$).toBe("a", { a: null, }); } @@ -172,7 +172,7 @@ test("bob, carl, then bob joining no tracks yet", () => { expectObservable(item.membership$).toBe("a", { a: carlMembership, }); - expectObservable(item.participant$).toBe("a", { + expectObservable(item.participant.value$).toBe("a", { a: null, }); expectObservable(item.connection$).toBe("a", { @@ -189,7 +189,7 @@ test("bob, carl, then bob joining no tracks yet", () => { } return true; }), - c: expect.toSatisfy((e: Epoch) => { + c: expect.toSatisfy((e: Epoch) => { const items = e.value; expect(items.length).toBe(3); @@ -216,7 +216,7 @@ test("bob, carl, then bob joining no tracks yet", () => { return true; }), }); - expectObservable(item.participant$).toBe("a", { + expectObservable(item.participant.value$).toBe("a", { a: null, }); } diff --git a/src/state/CallViewModelWidget.test.ts b/src/state/CallViewModelWidget.test.ts index afcf69ba..5d6442f1 100644 --- a/src/state/CallViewModelWidget.test.ts +++ b/src/state/CallViewModelWidget.test.ts @@ -15,6 +15,7 @@ import { constant } from "./Behavior.ts"; import { aliceParticipant, localRtcMember } from "../utils/test-fixtures.ts"; import { ElementWidgetActions, widget } from "../widget.ts"; import { E2eeType } from "../e2ee/e2eeType.ts"; +import { MatrixRTCMode } from "../settings/settings.ts"; vi.mock("@livekit/components-core", { spy: true }); @@ -34,36 +35,43 @@ vi.mock("../widget", () => ({ }, })); -it("expect leave when ElementWidgetActions.HangupCall is called", async () => { - const pr = Promise.withResolvers(); - withCallViewModel( - { - remoteParticipants$: constant([aliceParticipant]), - rtcMembers$: constant([localRtcMember]), - }, - (vm: CallViewModel) => { - vm.leave$.subscribe((s: string) => { - pr.resolve(s); - }); +it.each([ + [MatrixRTCMode.Legacy], + [MatrixRTCMode.Compatibil], + [MatrixRTCMode.Matrix_2_0], +])( + "expect leave when ElementWidgetActions.HangupCall is called (%s mode)", + async (mode) => { + const pr = Promise.withResolvers(); + withCallViewModel(mode)( + { + remoteParticipants$: constant([aliceParticipant]), + rtcMembers$: constant([localRtcMember]), + }, + (vm: CallViewModel) => { + vm.leave$.subscribe((s: string) => { + pr.resolve(s); + }); - widget!.lazyActions!.emit( - ElementWidgetActions.HangupCall, - new CustomEvent(ElementWidgetActions.HangupCall, { - detail: { - action: "im.vector.hangup", - api: "toWidget", - data: {}, - requestId: "widgetapi-1761237395918", - widgetId: "mrUjS9T6uKUOWHMxXvLbSv0F", - }, - }), - ); - }, - { - encryptionSystem: { kind: E2eeType.PER_PARTICIPANT }, - }, - ); + widget!.lazyActions!.emit( + ElementWidgetActions.HangupCall, + new CustomEvent(ElementWidgetActions.HangupCall, { + detail: { + action: "im.vector.hangup", + api: "toWidget", + data: {}, + requestId: "widgetapi-1761237395918", + widgetId: "mrUjS9T6uKUOWHMxXvLbSv0F", + }, + }), + ); + }, + { + encryptionSystem: { kind: E2eeType.PER_PARTICIPANT }, + }, + ); - const source = await pr.promise; - expect(source).toBe("user"); -}); + const source = await pr.promise; + expect(source).toBe("user"); + }, +); diff --git a/src/state/UserMedia.ts b/src/state/UserMedia.ts index 38f22122..690870e6 100644 --- a/src/state/UserMedia.ts +++ b/src/state/UserMedia.ts @@ -27,6 +27,7 @@ import type { ReactionOption } from "../reactions"; import { observeSpeaker$ } from "./observeSpeaker.ts"; import { generateItems } from "../utils/observable.ts"; import { ScreenShare } from "./ScreenShare.ts"; +import { type TaggedParticipant } from "./CallViewModel/remoteMembers/MatrixLivekitMembers.ts"; /** * Sorting bins defining the order in which media tiles appear in the layout. @@ -68,40 +69,46 @@ enum SortingBin { * for inclusion in the call layout and tracks associated screen shares. */ export class UserMedia { - public readonly vm: UserMediaViewModel = this.participant$.value?.isLocal - ? new LocalUserMediaViewModel( - this.scope, - this.id, - this.userId, - this.participant$ as Behavior, - this.encryptionSystem, - this.livekitRoom$, - this.focusUrl$, - this.mediaDevices, - this.displayName$, - this.mxcAvatarUrl$, - this.scope.behavior(this.handRaised$), - this.scope.behavior(this.reaction$), - ) - : new RemoteUserMediaViewModel( - this.scope, - this.id, - this.userId, - this.participant$ as Behavior, - this.encryptionSystem, - this.livekitRoom$, - this.focusUrl$, - this.pretendToBeDisconnected$, - this.displayName$, - this.mxcAvatarUrl$, - this.scope.behavior(this.handRaised$), - this.scope.behavior(this.reaction$), - ); + public readonly vm: UserMediaViewModel = + this.participant.type === "local" + ? new LocalUserMediaViewModel( + this.scope, + this.id, + this.userId, + this.participant.value$, + this.encryptionSystem, + this.livekitRoom$, + this.focusUrl$, + this.mediaDevices, + this.displayName$, + this.mxcAvatarUrl$, + this.scope.behavior(this.handRaised$), + this.scope.behavior(this.reaction$), + ) + : new RemoteUserMediaViewModel( + this.scope, + this.id, + this.userId, + this.participant.value$, + this.encryptionSystem, + this.livekitRoom$, + this.focusUrl$, + this.pretendToBeDisconnected$, + this.displayName$, + this.mxcAvatarUrl$, + this.scope.behavior(this.handRaised$), + this.scope.behavior(this.reaction$), + ); private readonly speaker$ = this.scope.behavior( observeSpeaker$(this.vm.speaking$), ); + // TypeScript needs this widening of the type to happen in a separate statement + private readonly participant$: Behavior< + LocalParticipant | RemoteParticipant | null + > = this.participant.value$; + /** * All screen share media associated with this user media. */ @@ -184,9 +191,7 @@ export class UserMedia { private readonly scope: ObservableScope, public readonly id: string, private readonly userId: string, - private readonly participant$: Behavior< - LocalParticipant | RemoteParticipant | null - >, + private readonly participant: TaggedParticipant, private readonly encryptionSystem: EncryptionSystem, private readonly livekitRoom$: Behavior, private readonly focusUrl$: Behavior, diff --git a/src/utils/observable.test.ts b/src/utils/observable.test.ts index d1034e7b..be677367 100644 --- a/src/utils/observable.test.ts +++ b/src/utils/observable.test.ts @@ -5,11 +5,12 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { test } from "vitest"; -import { Subject } from "rxjs"; +import { expect, test } from "vitest"; +import { type Observable, of, Subject, switchMap } from "rxjs"; import { withTestScheduler } from "./test"; -import { generateItems, pauseWhen } from "./observable"; +import { filterBehavior, generateItems, pauseWhen } from "./observable"; +import { type Behavior } from "../state/Behavior"; test("pauseWhen", () => { withTestScheduler(({ behavior, expectObservable }) => { @@ -72,3 +73,31 @@ test("generateItems", () => { expectObservable(scope4$).toBe(scope4Marbles); }); }); + +test("filterBehavior", () => { + withTestScheduler(({ behavior, expectObservable }) => { + // Filtering the input should segment it into 2 modes of non-null behavior. + const inputMarbles = " abcxabx"; + const filteredMarbles = "a--xa-x"; + + const input$ = behavior(inputMarbles, { + a: "a", + b: "b", + c: "c", + x: null, + }); + const filtered$: Observable | null> = input$.pipe( + filterBehavior((value) => typeof value === "string"), + ); + + expectObservable(filtered$).toBe(filteredMarbles, { + a: expect.any(Object), + x: null, + }); + expectObservable( + filtered$.pipe( + switchMap((value$) => (value$ === null ? of(null) : value$)), + ), + ).toBe(inputMarbles, { a: "a", b: "b", c: "c", x: null }); + }); +}); diff --git a/src/utils/observable.ts b/src/utils/observable.ts index 053921cd..a6dafea3 100644 --- a/src/utils/observable.ts +++ b/src/utils/observable.ts @@ -22,6 +22,7 @@ import { withLatestFrom, BehaviorSubject, type OperatorFunction, + distinctUntilChanged, } from "rxjs"; import { type Behavior } from "../state/Behavior"; @@ -185,6 +186,28 @@ export function generateItemsWithEpoch< ); } +/** + * Segments a behavior into periods during which its value matches the filter + * (outputting a behavior with a narrowed type) and periods during which it does + * not match (outputting null). + */ +export function filterBehavior( + predicate: (value: T) => value is S, +): OperatorFunction | null> { + return (input$) => + input$.pipe( + scan | null>((acc$, input) => { + if (predicate(input)) { + const output$ = acc$ ?? new BehaviorSubject(input); + output$.next(input); + return output$; + } + return null; + }, null), + distinctUntilChanged(), + ); +} + function generateItemsInternal< Input, Keys extends [unknown, ...unknown[]], diff --git a/src/utils/test-viewmodel.ts b/src/utils/test-viewmodel.ts index 98c45d86..0745be72 100644 --- a/src/utils/test-viewmodel.ts +++ b/src/utils/test-viewmodel.ts @@ -37,6 +37,7 @@ import { import { aliceRtcMember, localRtcMember } from "./test-fixtures"; import { type RaisedHandInfo, type ReactionInfo } from "../reactions"; import { constant } from "../state/Behavior"; +import { MatrixRTCMode } from "../settings/settings"; mockConfig({ livekit: { livekit_service_url: "https://example.com" } }); @@ -162,6 +163,7 @@ export function getBasicCallViewModelEnvironment( setE2EEEnabled: async () => Promise.resolve(), }), connectionState$: constant(ConnectionState.Connected), + matrixRTCMode$: constant(MatrixRTCMode.Legacy), ...callViewModelOptions, }, handRaisedSubject$,