diff --git a/src/livekit/openIDSFU.ts b/src/livekit/openIDSFU.ts index 073f6c75..c9c26c83 100644 --- a/src/livekit/openIDSFU.ts +++ b/src/livekit/openIDSFU.ts @@ -21,7 +21,14 @@ export type OpenIDClientParts = Pick< MatrixClient, "getOpenIdToken" | "getDeviceId" >; - +/** + * + * @param client + * @param serviceUrl + * @param matrixRoomId + * @returns + * @throws FailToGetOpenIdToken + */ export async function getSFUConfigWithOpenID( client: OpenIDClientParts, serviceUrl: string, diff --git a/src/room/GroupCallView.tsx b/src/room/GroupCallView.tsx index 076667a9..75438f7f 100644 --- a/src/room/GroupCallView.tsx +++ b/src/room/GroupCallView.tsx @@ -159,6 +159,7 @@ export const GroupCallView: FC = ({ }; }, [rtcSession]); + // TODO move this into the callViewModel LocalMembership.ts useTypedEventEmitter( rtcSession, MatrixRTCSessionEvent.MembershipManagerError, diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index 28293d29..b17d3aae 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -266,7 +266,7 @@ export const InCallView: FC = ({ const sharingScreen = useBehavior(vm.sharingScreen$); const ringOverlay = useBehavior(vm.ringOverlay$); - const fatalCallError = useBehavior(vm.configError$); + const fatalCallError = useBehavior(vm.fatalError$); // Stop the rendering and throw for the error boundary if (fatalCallError) throw fatalCallError; diff --git a/src/state/Behavior.ts b/src/state/Behavior.ts index 3c88dc00..53a826e1 100644 --- a/src/state/Behavior.ts +++ b/src/state/Behavior.ts @@ -16,7 +16,10 @@ import { BehaviorSubject } from "rxjs"; * distinction between Behaviors and Observables, see * https://monoid.dk/post/behaviors-and-streams-why-both/. */ -export type Behavior = Omit, "next" | "observers">; +export type Behavior = Omit< + BehaviorSubject, + "next" | "observers" | "error" +>; /** * Creates a Behavior which never changes in value. diff --git a/src/state/CallViewModel/CallViewModel.test.ts b/src/state/CallViewModel/CallViewModel.test.ts index 3a621f33..76be5f65 100644 --- a/src/state/CallViewModel/CallViewModel.test.ts +++ b/src/state/CallViewModel/CallViewModel.test.ts @@ -6,8 +6,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { test, vi, onTestFinished, it, describe, expect } from "vitest"; -import EventEmitter from "events"; +import { test, vi, onTestFinished, it, describe } from "vitest"; import { BehaviorSubject, combineLatest, @@ -19,12 +18,11 @@ import { of, switchMap, } from "rxjs"; -import { SyncState, type MatrixClient } from "matrix-js-sdk"; +import { SyncState } from "matrix-js-sdk"; import { ConnectionState, type LocalTrackPublication, type RemoteParticipant, - type Room as LivekitRoom, } from "livekit-client"; import * as ComponentsCore from "@livekit/components-core"; import { @@ -36,27 +34,18 @@ import { type LivekitTransport, } from "matrix-js-sdk/lib/matrixrtc"; import { deepCompare } from "matrix-js-sdk/lib/utils"; -import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; -import { createCallViewModel$ } from "./CallViewModel"; import { type Layout } from "../layout-types.ts"; import { mockLocalParticipant, - mockMatrixRoom, mockMatrixRoomMember, mockRemoteParticipant, withTestScheduler, mockRtcMembership, - MockRTCSession, - mockMediaDevices, - mockMuteStates, - mockConfig, testScope, - mockLivekitRoom, exampleTransport, } from "../../utils/test.ts"; import { E2eeType } from "../../e2ee/e2eeType.ts"; -import type { RaisedHandInfo, ReactionInfo } from "../../reactions/index.ts"; import { aliceId, aliceParticipant, @@ -71,10 +60,6 @@ import { import { MediaDevices } from "../MediaDevices.ts"; import { getValue } from "../../utils/observable.ts"; import { type Behavior, constant } from "../Behavior.ts"; -import { - type ElementCallError, - MatrixRTCTransportMissingError, -} from "../../utils/errors.ts"; import { withCallViewModel } from "./CallViewModelTestUtils.ts"; vi.mock("rxjs", async (importOriginal) => ({ @@ -245,71 +230,6 @@ function mockRingEvent( const mockLegacyRingEvent = {} as { event_id: string } & ICallNotifyContent; describe("CallViewModel", () => { - // TODO: Restore this test. It requires makeTransport to not be mocked, unlike - // the rest of the tests in this file… what do we do? - it.skip("test missing RTC config error", async () => { - const rtcMemberships$ = new BehaviorSubject([]); - const emitter = new EventEmitter(); - const client = vi.mocked({ - on: emitter.on.bind(emitter), - off: emitter.off.bind(emitter), - getSyncState: vi.fn().mockReturnValue(SyncState.Syncing), - getUserId: vi.fn().mockReturnValue("@user:localhost"), - getUser: vi.fn().mockReturnValue(null), - getDeviceId: vi.fn().mockReturnValue("DEVICE"), - credentials: { - userId: "@user:localhost", - }, - getCrypto: vi.fn().mockReturnValue(undefined), - getDomain: vi.fn().mockReturnValue("example.org"), - } as unknown as MatrixClient); - - const matrixRoom = mockMatrixRoom({ - roomId: "!myRoomId:example.com", - client, - getMember: vi.fn().mockReturnValue(undefined), - }); - - const fakeRtcSession = new MockRTCSession(matrixRoom).withMemberships( - rtcMemberships$, - ); - - mockConfig({}); - - vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({}); - - const callVM = createCallViewModel$( - testScope(), - fakeRtcSession.asMockedSession(), - matrixRoom, - mockMediaDevices({}), - mockMuteStates(), - { - encryptionSystem: { kind: E2eeType.PER_PARTICIPANT }, - autoLeaveWhenOthersLeft: false, - livekitRoomFactory: (): LivekitRoom => - mockLivekitRoom({ - localParticipant, - disconnect: async () => Promise.resolve(), - setE2EEEnabled: async () => Promise.resolve(), - }), - }, - new BehaviorSubject({} as Record), - new BehaviorSubject({} as Record), - constant({ processor: undefined, supported: false }), - ); - - const failPromise = Promise.withResolvers(); - callVM.configError$.subscribe((error) => { - if (error) { - failPromise.resolve(error); - } - }); - - const error = await failPromise.promise; - expect(error).toBeInstanceOf(MatrixRTCTransportMissingError); - }); - test("participants are retained during a focus switch", () => { withTestScheduler(({ behavior, expectObservable }) => { // Participants disappear on frame 2 and come back on frame 3 diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index eb270641..f8deddc3 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -41,7 +41,10 @@ import { timer, } from "rxjs"; import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; -import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc"; +import { + type LivekitTransport, + type MatrixRTCSession, +} from "matrix-js-sdk/lib/matrixrtc"; import { type IWidgetApiRequest } from "matrix-widget-api"; import { @@ -95,7 +98,10 @@ import { import { type ElementCallError } from "../../utils/errors.ts"; import { type ObservableScope } from "../ObservableScope.ts"; import { + createHomeserverConnected$, createLocalMembership$, + enterRTCSession, + LivekitState, type LocalMemberConnectionState, } from "./localMember/LocalMembership.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; @@ -120,6 +126,8 @@ import { createMatrixMemberMetadata$, createRoomMembers$, } from "./remoteMembers/MatrixMemberMetadata.ts"; +import { Publisher } from "./localMember/Publisher.ts"; +import { type Connection } from "./remoteMembers/Connection.ts"; const logger = rootLogger.getChild("[CallViewModel]"); //TODO @@ -230,7 +238,7 @@ export interface CallViewModel { * This is a fatal error that prevents the call from being created/joined. * Should render a blocking error screen. */ - configError$: Behavior; + fatalError$: Behavior; // participants and counts /** @@ -446,15 +454,31 @@ export function createCallViewModel$( const localMembership = createLocalMembership$({ scope: scope, + homeserverConnected$: createHomeserverConnected$( + scope, + matrixRoom, + matrixRTCSession, + ), muteStates: muteStates, - mediaDevices: mediaDevices, + joinMatrixRTC: async (transport: LivekitTransport) => { + return enterRTCSession( + matrixRTCSession, + transport, + connectOptions$.value, + ); + }, + createPublisherFactory: (connection: Connection) => { + return new Publisher( + scope, + connection, + mediaDevices, + muteStates, + trackProcessorState$, + ); + }, connectionManager: connectionManager, matrixRTCSession: matrixRTCSession, - matrixRoom: matrixRoom, localTransport$: localTransport$, - trackProcessorState$: trackProcessorState$, - widget, - options: connectOptions$, logger: logger.getChild(`[${Date.now()}]`), }); @@ -1442,7 +1466,14 @@ export function createCallViewModel$( hoverScreen: (): void => screenHover$.next(), unhoverScreen: (): void => screenUnhover$.next(), - configError$: localMembership.configError$, + fatalError$: scope.behavior( + localMembership.connectionState.livekit$.pipe( + filter((v) => v.state === LivekitState.Error), + map((s) => s.error), + ), + null, + ), + participantCount$: participantCount$, audioParticipants$: audioParticipants$, diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMembership.test.ts index 716740d3..390fa1a8 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.test.ts @@ -6,154 +6,234 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc"; -import { expect, test, vi } from "vitest"; +import { + type LivekitTransport, + type MatrixRTCSession, +} from "matrix-js-sdk/lib/matrixrtc"; +import { describe, expect, it, vi } from "vitest"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; -import EventEmitter from "events"; +import { map } from "rxjs"; +import { logger } from "matrix-js-sdk/lib/logger"; import { MatrixRTCMode } from "../../../settings/settings"; -import { mockConfig } from "../../../utils/test"; -import { enterRTCSession } from "./LocalMembership"; +import { + mockConfig, + mockMuteStates, + withTestScheduler, +} from "../../../utils/test"; +import { + createLocalMembership$, + enterRTCSession, + LivekitState, +} from "./LocalMembership"; +import { MatrixRTCTransportMissingError } from "../../../utils/errors"; +import { Epoch } from "../../ObservableScope"; +import { constant } from "../../Behavior"; +import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; +import { type Publisher } from "./Publisher"; const MATRIX_RTC_MODE = MatrixRTCMode.Legacy; const getUrlParams = vi.hoisted(() => vi.fn(() => ({}))); vi.mock("../../../UrlParams", () => ({ getUrlParams })); -vi.mock("../../../widget", async (importOriginal) => ({ - ...(await importOriginal()), - widget: { - api: { - setAlwaysOnScreen: (): void => {}, - transport: { send: vi.fn(), reply: vi.fn(), stop: vi.fn() }, - }, - lazyActions: new EventEmitter(), - }, -})); - -test("It joins the correct Session", async () => { - const focusFromOlderMembership = { - type: "livekit", - livekit_service_url: "http://my-oldest-member-service-url.com", - livekit_alias: "my-oldest-member-service-alias", - }; - - const focusConfigFromWellKnown = { - type: "livekit", - livekit_service_url: "http://my-well-known-service-url.com", - }; - const focusConfigFromWellKnown2 = { - type: "livekit", - livekit_service_url: "http://my-well-known-service-url2.com", - }; - const clientWellKnown = { - "org.matrix.msc4143.rtc_foci": [ - focusConfigFromWellKnown, - focusConfigFromWellKnown2, - ], - }; - - mockConfig({ - livekit: { livekit_service_url: "http://my-default-service-url.com" }, - }); - - vi.spyOn(AutoDiscovery, "getRawClientConfig").mockImplementation( - async (domain) => { - if (domain === "example.org") { - return Promise.resolve(clientWellKnown); - } - return Promise.resolve({}); - }, - ); - - const mockedSession = vi.mocked({ - room: { - roomId: "roomId", - client: { - getDomain: vi.fn().mockReturnValue("example.org"), - getOpenIdToken: vi.fn().mockResolvedValue({ - access_token: "ACCCESS_TOKEN", - token_type: "Bearer", - matrix_server_name: "localhost", - expires_in: 10000, - }), - }, - }, - memberships: [], - getFocusInUse: vi.fn().mockReturnValue(focusFromOlderMembership), - getOldestMembership: vi.fn().mockReturnValue({ - getPreferredFoci: vi.fn().mockReturnValue([focusFromOlderMembership]), - }), - joinRoomSession: vi.fn(), - }) as unknown as MatrixRTCSession; - - await enterRTCSession( - mockedSession, - { - livekit_alias: "roomId", - livekit_service_url: "http://my-well-known-service-url.com", - type: "livekit", - }, - { - encryptMedia: true, - matrixRTCMode: MATRIX_RTC_MODE, - }, - ); - - expect(mockedSession.joinRoomSession).toHaveBeenLastCalledWith( - [ - { - livekit_alias: "roomId", - livekit_service_url: "http://my-well-known-service-url.com", +// vi.mock("../../../widget", async (importOriginal) => ({ +// ...(await importOriginal()), +// widget: { +// api: { +// setAlwaysOnScreen: (): void => {}, +// transport: { send: vi.fn(), reply: vi.fn(), stop: vi.fn() }, +// }, +// lazyActions: new EventEmitter(), +// }, +// })); +describe("LocalMembership", () => { + describe("enterRTCSession", () => { + it("It joins the correct Session", async () => { + const focusFromOlderMembership = { type: "livekit", - }, - ], - undefined, - expect.objectContaining({ - manageMediaKeys: true, - useLegacyMemberEvents: false, - }), - ); -}); + livekit_service_url: "http://my-oldest-member-service-url.com", + livekit_alias: "my-oldest-member-service-alias", + }; -test("It should not fail with configuration error if homeserver config has livekit url but not fallback", async () => { - mockConfig({}); - vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({ - "org.matrix.msc4143.rtc_foci": [ - { + const focusConfigFromWellKnown = { type: "livekit", livekit_service_url: "http://my-well-known-service-url.com", - }, - ], + }; + const focusConfigFromWellKnown2 = { + type: "livekit", + livekit_service_url: "http://my-well-known-service-url2.com", + }; + const clientWellKnown = { + "org.matrix.msc4143.rtc_foci": [ + focusConfigFromWellKnown, + focusConfigFromWellKnown2, + ], + }; + + mockConfig({ + livekit: { livekit_service_url: "http://my-default-service-url.com" }, + }); + + vi.spyOn(AutoDiscovery, "getRawClientConfig").mockImplementation( + async (domain) => { + if (domain === "example.org") { + return Promise.resolve(clientWellKnown); + } + return Promise.resolve({}); + }, + ); + + const mockedSession = vi.mocked({ + room: { + roomId: "roomId", + client: { + getDomain: vi.fn().mockReturnValue("example.org"), + getOpenIdToken: vi.fn().mockResolvedValue({ + access_token: "ACCCESS_TOKEN", + token_type: "Bearer", + matrix_server_name: "localhost", + expires_in: 10000, + }), + }, + }, + memberships: [], + getFocusInUse: vi.fn().mockReturnValue(focusFromOlderMembership), + getOldestMembership: vi.fn().mockReturnValue({ + getPreferredFoci: vi.fn().mockReturnValue([focusFromOlderMembership]), + }), + joinRoomSession: vi.fn(), + }) as unknown as MatrixRTCSession; + + await enterRTCSession( + mockedSession, + { + livekit_alias: "roomId", + livekit_service_url: "http://my-well-known-service-url.com", + type: "livekit", + }, + { + encryptMedia: true, + matrixRTCMode: MATRIX_RTC_MODE, + }, + ); + + expect(mockedSession.joinRoomSession).toHaveBeenLastCalledWith( + [ + { + livekit_alias: "roomId", + livekit_service_url: "http://my-well-known-service-url.com", + type: "livekit", + }, + ], + undefined, + expect.objectContaining({ + manageMediaKeys: true, + useLegacyMemberEvents: false, + }), + ); + }); + + it("It should not fail with configuration error if homeserver config has livekit url but not fallback", async () => { + mockConfig({}); + vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({ + "org.matrix.msc4143.rtc_foci": [ + { + type: "livekit", + livekit_service_url: "http://my-well-known-service-url.com", + }, + ], + }); + + const mockedSession = vi.mocked({ + room: { + roomId: "roomId", + client: { + getDomain: vi.fn().mockReturnValue("example.org"), + getOpenIdToken: vi.fn().mockResolvedValue({ + access_token: "ACCCESS_TOKEN", + token_type: "Bearer", + matrix_server_name: "localhost", + expires_in: 10000, + }), + }, + }, + memberships: [], + getFocusInUse: vi.fn(), + joinRoomSession: vi.fn(), + }) as unknown as MatrixRTCSession; + + await enterRTCSession( + mockedSession, + { + livekit_alias: "roomId", + livekit_service_url: "http://my-well-known-service-url.com", + type: "livekit", + }, + { + encryptMedia: true, + matrixRTCMode: MATRIX_RTC_MODE, + }, + ); + }); }); - const mockedSession = vi.mocked({ - room: { - roomId: "roomId", - client: { - getDomain: vi.fn().mockReturnValue("example.org"), - getOpenIdToken: vi.fn().mockResolvedValue({ - access_token: "ACCCESS_TOKEN", - token_type: "Bearer", - matrix_server_name: "localhost", - expires_in: 10000, - }), - }, - }, - memberships: [], - getFocusInUse: vi.fn(), - joinRoomSession: vi.fn(), - }) as unknown as MatrixRTCSession; + const defaultCreateLocalMemberValues = { + options: constant({ + encryptMedia: false, + matrixRTCMode: MatrixRTCMode.Matrix_2_0, + }), + matrixRTCSession: { + updateCallIntent: () => {}, + leaveRoomSession: () => {}, + } as unknown as MatrixRTCSession, + muteStates: mockMuteStates(), + isHomeserverConnected: constant(true), + trackProcessorState$: constant({ + supported: false, + processor: undefined, + }), + logger: logger, + createPublisherFactory: (): Publisher => ({}) as unknown as Publisher, + joinMatrixRTC: async (): Promise => {}, + homeserverConnected$: constant(true), + }; - await enterRTCSession( - mockedSession, - { - livekit_alias: "roomId", - livekit_service_url: "http://my-well-known-service-url.com", - type: "livekit", - }, - { - encryptMedia: true, - matrixRTCMode: MATRIX_RTC_MODE, - }, - ); + it("throws error on missing RTC config error", () => { + withTestScheduler(({ scope, hot, expectObservable }) => { + const goodTransport = { + livekit_service_url: "other", + } as LivekitTransport; + + const localTransport$ = scope.behavior( + hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")), + goodTransport, + ); + + const mockConnectionManager = { + transports$: scope.behavior( + localTransport$.pipe(map((t) => new Epoch([t]))), + ), + connectionManagerData$: constant( + new Epoch(new ConnectionManagerData()), + ), + }; + + const localMembership = createLocalMembership$({ + scope, + ...defaultCreateLocalMemberValues, + connectionManager: mockConnectionManager, + localTransport$, + }); + + expectObservable(localMembership.connectionState.livekit$).toBe("ne", { + n: { state: LivekitState.Uninitialized }, + e: { + state: LivekitState.Error, + error: expect.toSatisfy( + (e) => e instanceof MatrixRTCTransportMissingError, + ), + }, + }); + }); + }); }); diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 68b34d94..01267764 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -13,14 +13,14 @@ import { } from "livekit-client"; import { observeParticipantEvents } from "@livekit/components-core"; import { - type LivekitTransport, - type MatrixRTCSession, MembershipManagerEvent, Status, + type LivekitTransport, + type MatrixRTCSession, } from "matrix-js-sdk/lib/matrixrtc"; -import { ClientEvent, SyncState, type Room as MatrixRoom } from "matrix-js-sdk"; import { BehaviorSubject, + catchError, combineLatest, distinctUntilChanged, fromEvent, @@ -32,23 +32,17 @@ import { switchMap, tap, } from "rxjs"; -import { type Logger } from "matrix-js-sdk/lib/logger"; +import { logger, type Logger } from "matrix-js-sdk/lib/logger"; +import { ClientEvent, type Room, SyncState } from "matrix-js-sdk"; import { type Behavior } from "../../Behavior"; import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; import { ObservableScope } from "../../ObservableScope"; -import { Publisher } from "./Publisher"; +import { type Publisher } from "./Publisher"; import { type MuteStates } from "../../MuteStates"; -import { type ProcessorState } from "../../../livekit/TrackProcessorContext"; -import { type MediaDevices } from "../../MediaDevices"; import { and$ } from "../../../utils/observable"; import { ElementCallError, UnknownCallError } from "../../../utils/errors"; -import { - ElementWidgetActions, - widget, - type WidgetHelpers, -} from "../../../widget"; -import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers"; +import { ElementWidgetActions, widget } from "../../../widget"; import { getUrlParams } from "../../../UrlParams.ts"; import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts"; import { MatrixRTCMode } from "../../../settings/settings.ts"; @@ -68,7 +62,7 @@ export enum LivekitState { } type LocalMemberLivekitState = - | { state: LivekitState.Error; error: string } + | { state: LivekitState.Error; error: ElementCallError } | { state: LivekitState.Connected } | { state: LivekitState.Connecting } | { state: LivekitState.Uninitialized } @@ -79,12 +73,14 @@ export enum MatrixState { Connected = "connected", Disconnected = "disconnected", Connecting = "connecting", + Error = "Error", } type LocalMemberMatrixState = | { state: MatrixState.Connected } | { state: MatrixState.Connecting } - | { state: MatrixState.Disconnected }; + | { state: MatrixState.Disconnected } + | { state: MatrixState.Error; error: Error }; export interface LocalMemberConnectionState { livekit$: Behavior; @@ -102,17 +98,21 @@ export interface LocalMemberConnectionState { * - Publisher.publishTracks() * - send join state/sticky event */ + interface Props { - options: Behavior; + // TODO add a comment into some code style readme or file header callviewmodel + // that the inputs for those createSomething$() functions should NOT contain any js-sdk objectes scope: ObservableScope; - mediaDevices: MediaDevices; muteStates: MuteStates; connectionManager: IConnectionManager; - matrixRTCSession: MatrixRTCSession; - matrixRoom: MatrixRoom; + createPublisherFactory: (connection: Connection) => Publisher; + joinMatrixRTC: (trasnport: LivekitTransport) => Promise; + homeserverConnected$: Behavior; localTransport$: Behavior; - trackProcessorState$: Behavior; - widget: WidgetHelpers | null; + matrixRTCSession: Pick< + MatrixRTCSession, + "updateCallIntent" | "leaveRoomSession" + >; logger: Logger; } @@ -131,18 +131,15 @@ interface Props { */ export const createLocalMembership$ = ({ scope, - options, - muteStates, - mediaDevices, connectionManager, - matrixRTCSession, - localTransport$, - matrixRoom, - trackProcessorState$, - widget, + localTransport$: localTransportCanThrow$, + homeserverConnected$, + createPublisherFactory, + joinMatrixRTC, logger: parentLogger, + muteStates, + matrixRTCSession, }: Props): { - // publisher: Publisher requestConnect: () => LocalMemberConnectionState; startTracks: () => Behavior; requestDisconnect: () => Observable | null; @@ -154,17 +151,13 @@ export const createLocalMembership$ = ({ toggleScreenSharing: (() => void) | null; participant$: Behavior; connection$: Behavior; - // deprecated fields - /** @deprecated use state instead*/ homeserverConnected$: Behavior; + // deprecated fields /** @deprecated use state instead*/ connected$: Behavior; // this needs to be discussed /** @deprecated use state instead*/ reconnecting$: Behavior; - // also needs to be disccues - /** @deprecated use state instead*/ - configError$: Behavior; } => { const logger = parentLogger.getChild("[LocalMembership]"); logger.debug(`Creating local membership..`); @@ -188,18 +181,32 @@ export const createLocalMembership$ = ({ // This should be used in a combineLatest with publisher$ to connect. const tracks$ = new BehaviorSubject([]); + // unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error. + const localTransport$ = scope.behavior( + localTransportCanThrow$.pipe( + catchError((e: unknown) => { + if (e instanceof ElementCallError) { + state.livekit$.next({ state: LivekitState.Error, error: e }); + } else { + logger.error("Unknown error from localTransport$", e); + } + return of(null); + }), + ), + ); + // Drop Epoch data here since we will not combine this anymore const localConnection$ = scope.behavior( - combineLatest([connectionManager.connections$, localTransport$]).pipe( - map(([connections, localTransport]) => { + combineLatest([ + connectionManager.connectionManagerData$, + localTransport$, + ]).pipe( + map(([connectionData, localTransport]) => { if (localTransport === null) { return null; } - return ( - connections.value.find((connection) => - areLivekitTransportsEqual(connection.transport, localTransport), - ) ?? null - ); + + return connectionData.value.getConnectionForTransport(localTransport); }), tap((connection) => { logger.info( @@ -208,40 +215,6 @@ export const createLocalMembership$ = ({ }), ), ); - /** - * Whether we are connected to the MatrixRTC session. - */ - const homeserverConnected$ = scope.behavior( - // To consider ourselves connected to MatrixRTC, we check the following: - and$( - // The client is connected to the sync loop - ( - fromEvent(matrixRoom.client, ClientEvent.Sync) as Observable< - [SyncState] - > - ).pipe( - startWith([matrixRoom.client.getSyncState()]), - map(([state]) => state === SyncState.Syncing), - ), - // Room state observed by session says we're connected - fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe( - startWith(null), - map(() => matrixRTCSession.membershipStatus === Status.Connected), - ), - // Also watch out for warnings that we've likely hit a timeout and our - // delayed leave event is being sent (this condition is here because it - // provides an earlier warning than the sync loop timeout, and we wouldn't - // see the actual leave event until we reconnect to the sync loop) - fromEvent(matrixRTCSession, MembershipManagerEvent.ProbablyLeft).pipe( - startWith(null), - map(() => matrixRTCSession.probablyLeft !== true), - ), - ).pipe( - tap((connected) => { - logger.info(`Homeserver connected update: ${connected}`); - }), - ), - ); // /** // * Whether we are "fully" connected to the call. Accounts for both the @@ -265,18 +238,15 @@ export const createLocalMembership$ = ({ localConnection$.pipe(scope.bind()).subscribe((connection) => { if (connection !== null && publisher$.value === null) { // TODO looks strange to not change publisher if connection changes. - publisher$.next( - new Publisher( - scope, - connection, - mediaDevices, - muteStates, - trackProcessorState$, - ), - ); + // @valere will take care of this! + publisher$.next(createPublisherFactory(connection)); } }); + // const mutestate= publisher$.pipe(switchMap((publisher) => { + // return publisher.muteState$ + // }); + combineLatest([publisher$, trackStartRequested$]).subscribe( ([publisher, shouldStartTracks]) => { if (publisher && shouldStartTracks) { @@ -359,14 +329,21 @@ export const createLocalMembership$ = ({ } state.matrix$.next({ state: MatrixState.Connecting }); logger.info("Matrix State connecting"); - enterRTCSession(matrixRTCSession, transport, options.value).catch( - (error) => { - logger.error(error); - }, - ); + + joinMatrixRTC(transport).catch((error) => { + logger.error(error); + state.matrix$.next({ state: MatrixState.Error, error }); + }); }, ); + // TODO add this and update `state.matrix$` based on it. + // useTypedEventEmitter( + // rtcSession, + // MatrixRTCSessionEvent.MembershipManagerError, + // (error) => setExternalError(new ConnectionLostError()), + // ); + const requestConnect = (): LocalMemberConnectionState => { trackStartRequested$.next(true); connectRequested$.next(true); @@ -440,18 +417,25 @@ export const createLocalMembership$ = ({ } } }); + // TODO: Refactor updateCallIntent to sth like this: + // combineLatest([muteStates.video.enabled$,localTransport$, state.matrix$]).pipe(map(()=>{ + // matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"), + // })) + // - const configError$ = new BehaviorSubject(null); // TODO I do not fully understand what this does. // Is it needed? // Is this at the right place? // Can this be simplified? // Start and stop session membership as needed - scope.reconcile(localTransport$, async (advertised) => { - if (advertised !== null && advertised !== undefined) { + // Discussed in statndup -> It seems we can remove this (there is another call to enterRTCSession in this file) + // MAKE SURE TO UNDERSTAND why reconcile is needed and what is potentially missing from the alternative enterRTCSession block. + // @toger5 will try to take care of this. + scope.reconcile(localTransport$, async (transport) => { + if (transport !== null && transport !== undefined) { try { - await enterRTCSession(matrixRTCSession, advertised, options.value); - configError$.next(null); + state.matrix$.next({ state: MatrixState.Connecting }); + await joinMatrixRTC(transport); } catch (e) { logger.error("Error entering RTC session", e); } @@ -493,14 +477,13 @@ export const createLocalMembership$ = ({ return s.error instanceof ElementCallError ? s.error : new UnknownCallError(s.error); - } else { - return null; } }), scope.bind(), ) - .subscribe((fatalError) => { - configError$.next(fatalError); + .subscribe((error) => { + if (error !== undefined) + state.livekit$.next({ state: LivekitState.Error, error }); }); /** @@ -509,9 +492,9 @@ export const createLocalMembership$ = ({ const sharingScreen$ = scope.behavior( localConnection$.pipe( switchMap((c) => - c === null - ? of(false) - : observeSharingScreen$(c.livekitRoom.localParticipant), + c !== null && c.livekitRoom + ? observeSharingScreen$(c.livekitRoom.localParticipant) + : of(false), ), ), ); @@ -539,7 +522,7 @@ export const createLocalMembership$ = ({ : null; const participant$ = scope.behavior( - localConnection$.pipe(map((c) => c?.livekitRoom.localParticipant ?? null)), + localConnection$.pipe(map((c) => c?.livekitRoom?.localParticipant ?? null)), ); return { startTracks, @@ -549,7 +532,7 @@ export const createLocalMembership$ = ({ homeserverConnected$, connected$, reconnecting$, - configError$, + sharingScreen$, toggleScreenSharing, participant$, @@ -603,6 +586,7 @@ export async function enterRTCSession( const { sendNotificationType: notificationType, callIntent } = getUrlParams(); const multiSFU = matrixRTCMode !== MatrixRTCMode.Legacy; // Multi-sfu does not need a preferred foci list. just the focus that is actually used. + // TODO where/how do we track errors originating from the ongoing rtcSession? rtcSession.joinRoomSession( multiSFU ? [] : [transport], multiSFU ? transport : undefined, @@ -631,3 +615,44 @@ export async function enterRTCSession( await widget.api.transport.send(ElementWidgetActions.JoinCall, {}); } } + +/** + * Whether we are connected to the MatrixRTC session. + */ +export function createHomeserverConnected$( + scope: ObservableScope, + matrixRoom: Room, + matrixRTCSession: MatrixRTCSession, +): Behavior { + return scope.behavior( + // To consider ourselves connected to MatrixRTC, we check the following: + and$( + // The client is connected to the sync loop + ( + fromEvent(matrixRoom.client, ClientEvent.Sync) as Observable< + [SyncState] + > + ).pipe( + startWith([matrixRoom.client.getSyncState()]), + map(([state]) => state === SyncState.Syncing), + ), + // Room state observed by session says we're connected + fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe( + startWith(null), + map(() => matrixRTCSession.membershipStatus === Status.Connected), + ), + // Also watch out for warnings that we've likely hit a timeout and our + // delayed leave event is being sent (this condition is here because it + // provides an earlier warning than the sync loop timeout, and we wouldn't + // see the actual leave event until we reconnect to the sync loop) + fromEvent(matrixRTCSession, MembershipManagerEvent.ProbablyLeft).pipe( + startWith(null), + map(() => matrixRTCSession.probablyLeft !== true), + ), + ).pipe( + tap((connected) => { + logger.info(`Homeserver connected update: ${connected}`); + }), + ), + ); +} diff --git a/src/state/CallViewModel/localMember/LocalTransport.test.ts b/src/state/CallViewModel/localMember/LocalTransport.test.ts new file mode 100644 index 00000000..d543f97a --- /dev/null +++ b/src/state/CallViewModel/localMember/LocalTransport.test.ts @@ -0,0 +1,120 @@ +/* +Copyright 2025 Element Creations Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc"; + +import { mockConfig, flushPromises } from "../../../utils/test"; +import { createLocalTransport$ } from "./LocalTransport"; +import { constant } from "../../Behavior"; +import { Epoch, ObservableScope } from "../../ObservableScope"; +import { + MatrixRTCTransportMissingError, + FailToGetOpenIdToken, +} from "../../../utils/errors"; +import * as openIDSFU from "../../../livekit/openIDSFU"; + +describe("LocalTransport", () => { + let scope: ObservableScope; + beforeEach(() => (scope = new ObservableScope())); + afterEach(() => scope.end()); + + it("throws if config is missing", async () => { + const localTransport$ = createLocalTransport$({ + scope, + roomId: "!room:example.org", + useOldestMember$: constant(false), + memberships$: constant(new Epoch([])), + client: { + getDomain: () => "", + // These won't be called in this error path but satisfy the type + getOpenIdToken: vi.fn(), + getDeviceId: vi.fn(), + }, + }); + await flushPromises(); + + expect(() => localTransport$.value).toThrow( + new MatrixRTCTransportMissingError(""), + ); + }); + + it("throws FailToGetOpenIdToken when OpenID fetch fails", async () => { + // Provide a valid config so makeTransportInternal resolves a transport + const scope = new ObservableScope(); + mockConfig({ + livekit: { livekit_service_url: "https://lk.example.org" }, + }); + const resolver = Promise.withResolvers(); + vi.spyOn(openIDSFU, "getSFUConfigWithOpenID").mockImplementation( + async () => { + await resolver.promise; + throw new FailToGetOpenIdToken(new Error("no openid")); + }, + ); + const observations: unknown[] = []; + const errors: Error[] = []; + const localTransport$ = createLocalTransport$({ + scope, + roomId: "!room:example.org", + useOldestMember$: constant(false), + memberships$: constant(new Epoch([])), + client: { + // Use empty domain to skip .well-known and use config directly + getDomain: () => "", + getOpenIdToken: vi.fn(), + getDeviceId: vi.fn(), + }, + }); + localTransport$.subscribe( + (o) => observations.push(o), + (e) => errors.push(e), + ); + resolver.resolve(); + await flushPromises(); + + const expectedError = new FailToGetOpenIdToken(new Error("no openid")); + expect(observations).toStrictEqual([null]); + expect(errors).toStrictEqual([expectedError]); + expect(() => localTransport$.value).toThrow(expectedError); + }); + + it("emits preferred transport after OpenID resolves", async () => { + // Use config so transport discovery succeeds, but delay OpenID JWT fetch + mockConfig({ + livekit: { livekit_service_url: "https://lk.example.org" }, + }); + + const openIdResolver = Promise.withResolvers(); + + vi.spyOn(openIDSFU, "getSFUConfigWithOpenID").mockReturnValue( + openIdResolver.promise, + ); + + const localTransport$ = createLocalTransport$({ + scope, + roomId: "!room:example.org", + useOldestMember$: constant(false), + memberships$: constant(new Epoch([])), + client: { + getDomain: () => "", + getOpenIdToken: vi.fn(), + getDeviceId: vi.fn(), + }, + }); + + openIdResolver.resolve?.({ url: "https://lk.example.org", jwt: "jwt" }); + expect(localTransport$.value).toBe(null); + await flushPromises(); + // final + expect(localTransport$.value).toStrictEqual({ + livekit_alias: "!room:example.org", + livekit_service_url: "https://lk.example.org", + type: "livekit", + }); + }); +}); diff --git a/src/state/CallViewModel/localMember/LocalTransport.ts b/src/state/CallViewModel/localMember/LocalTransport.ts index 6bb31e57..bd5ae92f 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.ts @@ -21,24 +21,21 @@ import { type Behavior } from "../../Behavior.ts"; import { type Epoch, type ObservableScope } from "../../ObservableScope.ts"; import { Config } from "../../../config/Config.ts"; import { MatrixRTCTransportMissingError } from "../../../utils/errors.ts"; -import { getSFUConfigWithOpenID } from "../../../livekit/openIDSFU.ts"; +import { + getSFUConfigWithOpenID, + type OpenIDClientParts, +} from "../../../livekit/openIDSFU.ts"; import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers.ts"; /* - * - get well known - * - get oldest membership - * - get transport to use - * - get openId + jwt token - * - wait for createTrack() call - * - create tracks - * - wait for join() call - * - Publisher.publishTracks() - * - send join state/sticky event + * It figures out “which LiveKit focus URL/alias the local user should use,” + * optionally aligning with the oldest member, and ensures the SFU path is primed + * before advertising that choice. */ interface Props { scope: ObservableScope; memberships$: Behavior>; - client: MatrixClient; + client: Pick & OpenIDClientParts; roomId: string; useOldestMember$: Behavior; } @@ -49,6 +46,8 @@ interface Props { * * @prop useOldestMember Whether to use the same transport as the oldest member. * This will only update once the first oldest member appears. Will not recompute if the oldest member leaves. + * + * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken */ export const createLocalTransport$ = ({ scope, @@ -75,6 +74,8 @@ export const createLocalTransport$ = ({ /** * The transport that we would personally prefer to publish on (if not for the * transport preferences of others, perhaps). + * + * @throws */ const preferredTransport$: Behavior = scope.behavior( from(makeTransport(client, roomId)), @@ -103,10 +104,18 @@ export const createLocalTransport$ = ({ const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci"; -async function makeTransportInternal( - client: MatrixClient, +/** + * + * @param client + * @param roomId + * @returns + * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken + */ +async function makeTransport( + client: Pick & OpenIDClientParts, roomId: string, ): Promise { + let transport: LivekitTransport | undefined; logger.log("Searching for a preferred transport"); //TODO refactor this to use the jwt service returned alias. const livekitAlias = roomId; @@ -124,7 +133,7 @@ async function makeTransportInternal( "Using LiveKit transport from local storage: ", transportFromStorage, ); - return transportFromStorage; + transport = transportFromStorage; } // Prioritize the .well-known/matrix/client, if available, over the configured SFU @@ -136,12 +145,11 @@ async function makeTransportInternal( FOCI_WK_KEY ]; if (Array.isArray(wellKnownFoci)) { - const transport: LivekitTransportConfig | undefined = wellKnownFoci.find( - (f) => f && isLivekitTransportConfig(f), - ); - if (transport !== undefined) { + const wellKnownTransport: LivekitTransportConfig | undefined = + wellKnownFoci.find((f) => f && isLivekitTransportConfig(f)); + if (wellKnownTransport !== undefined) { logger.log("Using LiveKit transport from .well-known: ", transport); - return { ...transport, livekit_alias: livekitAlias }; + transport = { ...wellKnownTransport, livekit_alias: livekitAlias }; } } } @@ -154,26 +162,15 @@ async function makeTransportInternal( livekit_alias: livekitAlias, }; logger.log("Using LiveKit transport from config: ", transportFromConf); - return transportFromConf; + transport = transportFromConf; } + if (!transport) throw new MatrixRTCTransportMissingError(domain ?? ""); // this will call the jwt/sfu/get endpoint to pre create the livekit room. - throw new MatrixRTCTransportMissingError(domain ?? ""); -} + await getSFUConfigWithOpenID( + client, + transport.livekit_service_url, + transport.livekit_alias, + ); -async function makeTransport( - client: MatrixClient, - roomId: string, -): Promise { - const transport = await makeTransportInternal(client, roomId); - // this will call the jwt/sfu/get endpoint to pre create the livekit room. - try { - await getSFUConfigWithOpenID( - client, - transport.livekit_service_url, - transport.livekit_alias, - ); - } catch (e) { - logger.warn(`Failed to get SFU config for transport: ${e}`); - } return transport; } diff --git a/src/state/CallViewModel/remoteMembers/ConnectionFactory.ts b/src/state/CallViewModel/remoteMembers/ConnectionFactory.ts index 9f448cd9..f58fcb76 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionFactory.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionFactory.ts @@ -23,6 +23,7 @@ import type { Behavior } from "../../Behavior.ts"; import type { ProcessorState } from "../../../livekit/TrackProcessorContext.tsx"; import { defaultLiveKitOptions } from "../../../livekit/options.ts"; +// TODO evaluate if this should be done like the Publisher Factory export interface ConnectionFactory { createConnection( transport: LivekitTransport, diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts index 5887442c..484a44e7 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts @@ -11,7 +11,7 @@ import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { type Participant as LivekitParticipant } from "livekit-client"; import { logger } from "matrix-js-sdk/lib/logger"; -import { Epoch, ObservableScope } from "../../ObservableScope.ts"; +import { Epoch, mapEpoch, ObservableScope } from "../../ObservableScope.ts"; import { createConnectionManager$, type ConnectionManagerData, @@ -73,7 +73,7 @@ afterEach(() => { describe("connections$ stream", () => { test("Should create and start new connections for each transports", () => { withTestScheduler(({ behavior, expectObservable }) => { - const { connections$ } = createConnectionManager$({ + const { connectionManagerData$ } = createConnectionManager$({ scope: testScope, connectionFactory: fakeConnectionFactory, inputTransports$: behavior("a", { @@ -82,7 +82,9 @@ describe("connections$ stream", () => { logger: logger, }); - expectObservable(connections$).toBe("a", { + expectObservable( + connectionManagerData$.pipe(mapEpoch((d) => d.getConnections())), + ).toBe("a", { a: expect.toSatisfy((e: Epoch) => { const connections = e.value; expect(connections.length).toBe(2); @@ -110,7 +112,7 @@ describe("connections$ stream", () => { test("Should start connection only once", () => { withTestScheduler(({ behavior, expectObservable }) => { - const { connections$ } = createConnectionManager$({ + const { connectionManagerData$ } = createConnectionManager$({ scope: testScope, connectionFactory: fakeConnectionFactory, inputTransports$: behavior("abcdef", { @@ -124,7 +126,9 @@ describe("connections$ stream", () => { logger: logger, }); - expectObservable(connections$).toBe("xxxxxa", { + expectObservable( + connectionManagerData$.pipe(mapEpoch((d) => d.getConnections())), + ).toBe("xxxxxa", { x: expect.anything(), a: expect.toSatisfy((e: Epoch) => { const connections = e.value; @@ -153,7 +157,7 @@ describe("connections$ stream", () => { test("Should cleanup connections when not needed anymore", () => { withTestScheduler(({ behavior, expectObservable }) => { - const { connections$ } = createConnectionManager$({ + const { connectionManagerData$ } = createConnectionManager$({ scope: testScope, connectionFactory: fakeConnectionFactory, inputTransports$: behavior("abc", { @@ -164,7 +168,9 @@ describe("connections$ stream", () => { logger: logger, }); - expectObservable(connections$).toBe("xab", { + expectObservable( + connectionManagerData$.pipe(mapEpoch((d) => d.getConnections())), + ).toBe("xab", { x: expect.anything(), a: expect.toSatisfy((e: Epoch) => { const connections = e.value; diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index f284c9e3..d9a0380e 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -94,7 +94,6 @@ interface Props { export interface IConnectionManager { transports$: Behavior>; connectionManagerData$: Behavior>; - connections$: Behavior>; } /** * Crete a `ConnectionManager` @@ -217,7 +216,7 @@ export function createConnectionManager$({ new Epoch(new ConnectionManagerData()), ); - return { transports$, connectionManagerData$, connections$ }; + return { transports$, connectionManagerData$ }; } function removeDuplicateTransports( diff --git a/src/state/MuteStates.ts b/src/state/MuteStates.ts index 50be5e05..0d8e2e43 100644 --- a/src/state/MuteStates.ts +++ b/src/state/MuteStates.ts @@ -157,6 +157,7 @@ export class MuteStates { private readonly mediaDevices: MediaDevices, private readonly joined$: Observable, ) { + logger.log("widget", widget); if (widget !== null) { // Sync our mute states with the hosting client const widgetApiState$ = combineLatest( diff --git a/src/utils/test.ts b/src/utils/test.ts index 4fec433c..5fda90e8 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -80,6 +80,7 @@ export async function flushPromises(): Promise { } export interface OurRunHelpers extends RunHelpers { + scheduler: TestScheduler; /** * Schedules a sequence of actions to happen, as described by a marble * diagram. @@ -123,6 +124,7 @@ export function withTestScheduler( continuation({ ...helpers, scope, + scheduler, schedule(marbles, actions) { const actionsObservable$ = helpers .cold(marbles)