diff --git a/locales/en/app.json b/locales/en/app.json index 9e8fbbd3..1ff066ea 100644 --- a/locales/en/app.json +++ b/locales/en/app.json @@ -108,11 +108,14 @@ "connection_lost_description": "You were disconnected from the call.", "e2ee_unsupported": "Incompatible browser", "e2ee_unsupported_description": "Your web browser does not support encrypted calls. Supported browsers include Chrome, Safari, and Firefox 117+.", + "failed_to_start_livekit": "Failed to start Livekit connection", "generic": "Something went wrong", "generic_description": "Submitting debug logs will help us track down the problem.", "insufficient_capacity": "Insufficient capacity", "insufficient_capacity_description": "The server has reached its maximum capacity and you cannot join the call at this time. Try again later, or contact your server admin if the problem persists.", "matrix_rtc_transport_missing": "The server is not configured to work with {{brand}}. Please contact your server admin (Domain: {{domain}}, Error Code: {{ errorCode }}).", + "membership_manager": "Membership Manager Error", + "membership_manager_description": "The Membership Manager had to shut down. This is caused by many consequtive failed network requests.", "open_elsewhere": "Opened in another tab", "open_elsewhere_description": "{{brand}} has been opened in another tab. If that doesn't sound right, try reloading the page.", "room_creation_restricted": "Failed to create call", diff --git a/playwright/errors.spec.ts b/playwright/errors.spec.ts index 851e448d..0d36f7ab 100644 --- a/playwright/errors.spec.ts +++ b/playwright/errors.spec.ts @@ -75,7 +75,12 @@ test("Should automatically retry non fatal JWT errors", async ({ test("Should show error screen if call creation is restricted", async ({ page, + browserName, }) => { + test.skip( + browserName === "firefox", + "The is test is not working on firefox CI environment.", + ); await page.goto("/"); // We need the socket connection to fail, but this cannot be done by using the websocket route. diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index b17d3aae..6ae004d8 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -146,6 +146,8 @@ export const ActiveCall: FC = (props) => { reactionsReader.reactions$, scope.behavior(trackProcessorState$), ); + // TODO move this somewhere else once we use the callViewModel in the lobby as well! + vm.join(); setVm(vm); vm.leave$.pipe(scope.bind()).subscribe(props.onLeft); diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index ded49d39..3c15958a 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -100,8 +100,7 @@ import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts import { createLocalMembership$, enterRTCSession, - LivekitState, - type LocalMemberConnectionState, + RTCBackendState, } from "./localMember/LocalMembership.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { @@ -201,7 +200,7 @@ export interface CallViewModel { hangup: () => void; // joining - join: () => LocalMemberConnectionState; + join: () => void; // screen sharing /** @@ -473,6 +472,9 @@ export function createCallViewModel$( mediaDevices, muteStates, trackProcessorState$, + logger.getChild( + "[Publisher" + connection.transport.livekit_service_url + "]", + ), ); }, connectionManager: connectionManager, @@ -571,15 +573,6 @@ export function createCallViewModel$( ), ); - // CODESMELL? - // This is functionally the same Observable as leave$, except here it's - // hoisted to the top of the class. This enables the cyclic dependency between - // leave$ -> autoLeave$ -> callPickupState$ -> livekitConnectionState$ -> - // localConnection$ -> transports$ -> joined$ -> leave$. - const leaveHoisted$ = new Subject< - "user" | "timeout" | "decline" | "allOthersLeft" - >(); - /** * Whether various media/event sources should pretend to be disconnected from * all network input, even if their connection still technically works. @@ -590,7 +583,6 @@ export function createCallViewModel$( // in a split-brained state. // DISCUSSION own membership manager ALSO this probably can be simplifis const reconnecting$ = localMembership.reconnecting$; - const pretendToBeDisconnected$ = reconnecting$; const audioParticipants$ = scope.behavior( matrixLivekitMembers$.pipe( @@ -639,7 +631,7 @@ export function createCallViewModel$( ); const handsRaised$ = scope.behavior( - handsRaisedSubject$.pipe(pauseWhen(pretendToBeDisconnected$)), + handsRaisedSubject$.pipe(pauseWhen(reconnecting$)), ); const reactions$ = scope.behavior( @@ -652,7 +644,7 @@ export function createCallViewModel$( ]), ), ), - pauseWhen(pretendToBeDisconnected$), + pauseWhen(reconnecting$), ), ); @@ -673,7 +665,7 @@ export function createCallViewModel$( { value: matrixLivekitMembers }, duplicateTiles, ]) { - let localParticipantId = undefined; + let localParticipantId: string | undefined = undefined; // add local member if available if (localMatrixLivekitMember) { const { userId, participant$, connection$, membership$ } = @@ -743,7 +735,7 @@ export function createCallViewModel$( livekitRoom$, focusUrl$, mediaDevices, - pretendToBeDisconnected$, + reconnecting$, displayName$, matrixMemberMetadataStore.createAvatarUrlBehavior$(userId), handsRaised$.pipe(map((v) => v[participantId]?.time ?? null)), @@ -839,10 +831,7 @@ export function createCallViewModel$( merge( autoLeave$, merge(userHangup$, widgetHangup$).pipe(map(() => "user" as const)), - ).pipe( - scope.share, - tap((reason) => leaveHoisted$.next(reason)), - ); + ).pipe(scope.share); const spotlightSpeaker$ = scope.behavior( userMedia$.pipe( @@ -1460,16 +1449,13 @@ export function createCallViewModel$( // reassigned here to make it publicly accessible const toggleScreenSharing = localMembership.toggleScreenSharing; - const join = localMembership.requestConnect; - // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked? - join(); return { autoLeave$: autoLeave$, callPickupState$: callPickupState$, ringOverlay$: ringOverlay$, leave$: leave$, hangup: (): void => userHangup$.next(), - join: join, + join: localMembership.requestConnect, toggleScreenSharing: toggleScreenSharing, sharingScreen$: sharingScreen$, @@ -1480,7 +1466,7 @@ export function createCallViewModel$( fatalError$: scope.behavior( localMembership.connectionState.livekit$.pipe( - filter((v) => v.state === LivekitState.Error), + filter((v) => v.state === RTCBackendState.Error), map((s) => s.error), ), null, diff --git a/src/state/CallViewModel/localMember/LocalMembership.test.ts b/src/state/CallViewModel/localMember/LocalMembership.test.ts index 9459d419..cff5c06d 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.test.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.test.ts @@ -12,29 +12,42 @@ import { } from "matrix-js-sdk/lib/matrixrtc"; import { describe, expect, it, vi } from "vitest"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; -import { map } from "rxjs"; +import { BehaviorSubject, map, of } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; +import { + ConnectionState as LivekitConnectionState, + type LocalParticipant, + type LocalTrack, +} from "livekit-client"; import { MatrixRTCMode } from "../../../settings/settings"; import { + flushPromises, mockConfig, + mockLivekitRoom, mockMuteStates, withTestScheduler, } from "../../../utils/test"; import { createLocalMembership$, enterRTCSession, - LivekitState, + RTCBackendState, } from "./LocalMembership"; import { MatrixRTCTransportMissingError } from "../../../utils/errors"; -import { Epoch } from "../../ObservableScope"; +import { Epoch, ObservableScope } from "../../ObservableScope"; import { constant } from "../../Behavior"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; +import { type Connection } from "../remoteMembers/Connection"; import { type Publisher } from "./Publisher"; const MATRIX_RTC_MODE = MatrixRTCMode.Legacy; const getUrlParams = vi.hoisted(() => vi.fn(() => ({}))); vi.mock("../../../UrlParams", () => ({ getUrlParams })); +vi.mock("@livekit/components-core", () => ({ + observeParticipantEvents: vi + .fn() + .mockReturnValue(of({ isScreenShareEnabled: false })), +})); describe("LocalMembership", () => { describe("enterRTCSession", () => { @@ -183,7 +196,7 @@ describe("LocalMembership", () => { processor: undefined, }), logger: logger, - createPublisherFactory: (): Publisher => ({}) as unknown as Publisher, + createPublisherFactory: vi.fn(), joinMatrixRTC: async (): Promise => {}, homeserverConnected$: constant(true), }; @@ -216,9 +229,9 @@ describe("LocalMembership", () => { }); expectObservable(localMembership.connectionState.livekit$).toBe("ne", { - n: { state: LivekitState.Uninitialized }, + n: { state: RTCBackendState.WaitingForConnection }, e: { - state: LivekitState.Error, + state: RTCBackendState.Error, error: expect.toSatisfy( (e) => e instanceof MatrixRTCTransportMissingError, ), @@ -226,4 +239,254 @@ describe("LocalMembership", () => { }); }); }); + + const aTransport = { + livekit_service_url: "a", + } as LivekitTransport; + const bTransport = { + livekit_service_url: "b", + } as LivekitTransport; + + const connectionManagerData = new ConnectionManagerData(); + + connectionManagerData.add( + { + livekitRoom: mockLivekitRoom({ + localParticipant: { + isScreenShareEnabled: false, + trackPublications: [], + } as unknown as LocalParticipant, + }), + state$: constant({ + state: "ConnectedToLkRoom", + livekitConnectionState$: constant(LivekitConnectionState.Connected), + }), + transport: aTransport, + } as unknown as Connection, + [], + ); + connectionManagerData.add( + { + state$: constant({ + state: "ConnectedToLkRoom", + }), + transport: bTransport, + } as unknown as Connection, + [], + ); + + it("recreates publisher if new connection is used and ENDS always unpublish and end tracks", async () => { + const scope = new ObservableScope(); + + const localTransport$ = new BehaviorSubject(aTransport); + + const publishers: Publisher[] = []; + + defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation( + () => { + const p = { + stopPublishing: vi.fn(), + stopTracks: vi.fn(), + publishing$: constant(false), + }; + publishers.push(p as unknown as Publisher); + return p; + }, + ); + const publisherFactory = + defaultCreateLocalMemberValues.createPublisherFactory as ReturnType< + typeof vi.fn + >; + + createLocalMembership$({ + scope, + ...defaultCreateLocalMemberValues, + connectionManager: { + connectionManagerData$: constant(new Epoch(connectionManagerData)), + }, + localTransport$, + }); + await flushPromises(); + localTransport$.next(bTransport); + await flushPromises(); + expect(publisherFactory).toHaveBeenCalledTimes(2); + expect(publishers.length).toBe(2); + // stop the first Publisher and let the second one life. + expect(publishers[0].stopTracks).toHaveBeenCalled(); + expect(publishers[1].stopTracks).not.toHaveBeenCalled(); + expect(publishers[0].stopPublishing).toHaveBeenCalled(); + expect(publishers[1].stopPublishing).not.toHaveBeenCalled(); + expect(publisherFactory.mock.calls[0][0].transport).toBe(aTransport); + expect(publisherFactory.mock.calls[1][0].transport).toBe(bTransport); + scope.end(); + await flushPromises(); + // stop all tracks after ending scopes + expect(publishers[1].stopPublishing).toHaveBeenCalled(); + expect(publishers[1].stopTracks).toHaveBeenCalled(); + + defaultCreateLocalMemberValues.createPublisherFactory.mockReset(); + }); + + it("only start tracks if requested", async () => { + const scope = new ObservableScope(); + + const localTransport$ = new BehaviorSubject(aTransport); + + const publishers: Publisher[] = []; + + const tracks$ = new BehaviorSubject([]); + const publishing$ = new BehaviorSubject(false); + defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation( + () => { + const p = { + stopPublishing: vi.fn(), + stopTracks: vi.fn(), + createAndSetupTracks: vi.fn().mockImplementation(async () => { + tracks$.next([{}, {}] as LocalTrack[]); + return Promise.resolve(); + }), + tracks$, + publishing$, + }; + publishers.push(p as unknown as Publisher); + return p; + }, + ); + const publisherFactory = + defaultCreateLocalMemberValues.createPublisherFactory as ReturnType< + typeof vi.fn + >; + + const localMembership = createLocalMembership$({ + scope, + ...defaultCreateLocalMemberValues, + connectionManager: { + connectionManagerData$: constant(new Epoch(connectionManagerData)), + }, + localTransport$, + }); + await flushPromises(); + expect(publisherFactory).toHaveBeenCalledOnce(); + expect(localMembership.tracks$.value.length).toBe(0); + localMembership.startTracks(); + await flushPromises(); + expect(localMembership.tracks$.value.length).toBe(2); + scope.end(); + await flushPromises(); + // stop all tracks after ending scopes + expect(publishers[0].stopPublishing).toHaveBeenCalled(); + expect(publishers[0].stopTracks).toHaveBeenCalled(); + publisherFactory.mockClear(); + }); + // TODO add an integration test combining publisher and localMembership + // + it("tracks livekit state correctly", async () => { + const scope = new ObservableScope(); + + const localTransport$ = new BehaviorSubject(null); + const connectionManagerData$ = new BehaviorSubject< + Epoch + >(new Epoch(new ConnectionManagerData())); + const publishers: Publisher[] = []; + + const tracks$ = new BehaviorSubject([]); + const publishing$ = new BehaviorSubject(false); + const createTrackResolver = Promise.withResolvers(); + const publishResolver = Promise.withResolvers(); + defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation( + () => { + const p = { + stopPublishing: vi.fn(), + stopTracks: vi.fn().mockImplementation(() => { + logger.info("stopTracks"); + tracks$.next([]); + }), + createAndSetupTracks: vi.fn().mockImplementation(async () => { + await createTrackResolver.promise; + tracks$.next([{}, {}] as LocalTrack[]); + }), + startPublishing: vi.fn().mockImplementation(async () => { + await publishResolver.promise; + publishing$.next(true); + }), + tracks$, + publishing$, + }; + publishers.push(p as unknown as Publisher); + return p; + }, + ); + + const publisherFactory = + defaultCreateLocalMemberValues.createPublisherFactory as ReturnType< + typeof vi.fn + >; + + const localMembership = createLocalMembership$({ + scope, + ...defaultCreateLocalMemberValues, + connectionManager: { + connectionManagerData$, + }, + localTransport$, + }); + + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: RTCBackendState.WaitingForTransport, + }); + localTransport$.next(aTransport); + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: RTCBackendState.WaitingForConnection, + }); + connectionManagerData$.next(new Epoch(connectionManagerData)); + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: RTCBackendState.Initialized, + }); + expect(publisherFactory).toHaveBeenCalledOnce(); + expect(localMembership.tracks$.value.length).toBe(0); + + // ------- + localMembership.startTracks(); + // ------- + + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: RTCBackendState.CreatingTracks, + }); + createTrackResolver.resolve(); + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: RTCBackendState.ReadyToPublish, + }); + + // ------- + localMembership.requestConnect(); + // ------- + + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: RTCBackendState.WaitingToPublish, + }); + + publishResolver.resolve(); + await flushPromises(); + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: RTCBackendState.Connected, + }); + expect(publishers[0].stopPublishing).not.toHaveBeenCalled(); + + expect(localMembership.connectionState.livekit$.isStopped).toBe(false); + scope.end(); + await flushPromises(); + // stays in connected state because it is stopped before the update to tracks update the state. + expect(localMembership.connectionState.livekit$.value).toStrictEqual({ + state: RTCBackendState.Connected, + }); + // stop all tracks after ending scopes + expect(publishers[0].stopPublishing).toHaveBeenCalled(); + expect(publishers[0].stopTracks).toHaveBeenCalled(); + }); + // TODO add tests for matrix local matrix participation. }); diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 36952c5a..60ae79b8 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -11,6 +11,7 @@ import { ParticipantEvent, type LocalParticipant, type ScreenShareCaptureOptions, + ConnectionState, } from "livekit-client"; import { observeParticipantEvents } from "@livekit/components-core"; import { @@ -22,64 +23,83 @@ import { catchError, combineLatest, distinctUntilChanged, + from, map, type Observable, of, scan, + startWith, switchMap, tap, } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; +import { deepCompare } from "matrix-js-sdk/lib/utils"; -import { type Behavior } from "../../Behavior"; +import { constant, type Behavior } from "../../Behavior"; import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; import { ObservableScope } from "../../ObservableScope"; import { type Publisher } from "./Publisher"; import { type MuteStates } from "../../MuteStates"; import { and$ } from "../../../utils/observable"; -import { ElementCallError, UnknownCallError } from "../../../utils/errors"; +import { + ElementCallError, + MembershipManagerError, + UnknownCallError, +} from "../../../utils/errors"; import { ElementWidgetActions, widget } from "../../../widget"; import { getUrlParams } from "../../../UrlParams.ts"; import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts"; import { MatrixRTCMode } from "../../../settings/settings.ts"; import { Config } from "../../../config/Config.ts"; -import { - type Connection, - type ConnectionState, -} from "../remoteMembers/Connection.ts"; +import { type Connection } from "../remoteMembers/Connection.ts"; -export enum LivekitState { - Uninitialized = "uninitialized", - Connecting = "connecting", - Connected = "connected", +export enum RTCBackendState { Error = "error", + /** Not even a transport is available to the LocalMembership */ + WaitingForTransport = "waiting_for_transport", + /** A connection appeared so we can initialise the publisher */ + WaitingForConnection = "waiting_for_connection", + /** Connection and transport arrived, publisher Initialized */ + Initialized = "Initialized", + CreatingTracks = "creating_tracks", + ReadyToPublish = "ready_to_publish", + WaitingToPublish = "waiting_to_publish", + Connected = "connected", Disconnected = "disconnected", Disconnecting = "disconnecting", } -type LocalMemberLivekitState = - | { state: LivekitState.Error; error: ElementCallError } - | { state: LivekitState.Connected } - | { state: LivekitState.Connecting } - | { state: LivekitState.Uninitialized } - | { state: LivekitState.Disconnected } - | { state: LivekitState.Disconnecting }; +type LocalMemberRtcBackendState = + | { state: RTCBackendState.Error; error: ElementCallError } + | { state: RTCBackendState.WaitingForTransport } + | { state: RTCBackendState.WaitingForConnection } + | { state: RTCBackendState.Initialized } + | { state: RTCBackendState.CreatingTracks } + | { state: RTCBackendState.ReadyToPublish } + | { state: RTCBackendState.WaitingToPublish } + | { state: RTCBackendState.Connected } + | { state: RTCBackendState.Disconnected } + | { state: RTCBackendState.Disconnecting }; export enum MatrixState { + WaitingForTransport = "waiting_for_transport", + Ready = "ready", + Connecting = "connecting", Connected = "connected", Disconnected = "disconnected", - Connecting = "connecting", Error = "Error", } type LocalMemberMatrixState = | { state: MatrixState.Connected } + | { state: MatrixState.WaitingForTransport } + | { state: MatrixState.Ready } | { state: MatrixState.Connecting } | { state: MatrixState.Disconnected } | { state: MatrixState.Error; error: Error }; export interface LocalMemberConnectionState { - livekit$: Behavior; + livekit$: Behavior; matrix$: Behavior; } @@ -102,7 +122,7 @@ interface Props { muteStates: MuteStates; connectionManager: IConnectionManager; createPublisherFactory: (connection: Connection) => Publisher; - joinMatrixRTC: (trasnport: LivekitTransport) => Promise; + joinMatrixRTC: (transport: LivekitTransport) => Promise; homeserverConnected$: Behavior; localTransport$: Behavior; matrixRTCSession: Pick< @@ -136,48 +156,34 @@ export const createLocalMembership$ = ({ muteStates, matrixRTCSession, }: Props): { - requestConnect: () => LocalMemberConnectionState; + /** + * This starts audio and video tracks. They will be reused when calling `requestConnect`. + */ startTracks: () => Behavior; - requestDisconnect: () => Observable | null; + /** + * This sets a inner state (shouldConnect) to true and instructs the js-sdk and livekit to keep the user + * connected to matrix and livekit. + */ + requestConnect: () => void; + requestDisconnect: () => void; connectionState: LocalMemberConnectionState; sharingScreen$: Behavior; /** * Callback to toggle screen sharing. If null, screen sharing is not possible. */ toggleScreenSharing: (() => void) | null; + tracks$: Behavior; participant$: Behavior; connection$: Behavior; homeserverConnected$: Behavior; - // deprecated fields - /** @deprecated use state instead*/ - connected$: Behavior; // this needs to be discussed /** @deprecated use state instead*/ reconnecting$: Behavior; } => { const logger = parentLogger.getChild("[LocalMembership]"); logger.debug(`Creating local membership..`); - const state = { - livekit$: new BehaviorSubject({ - state: LivekitState.Uninitialized, - }), - matrix$: new BehaviorSubject({ - state: MatrixState.Disconnected, - }), - }; - // This should be used in a combineLatest with publisher$ to connect. - // to make it possible to call startTracks before the preferredTransport$ has resolved. - const trackStartRequested$ = new BehaviorSubject(false); - - // This should be used in a combineLatest with publisher$ to connect. - // to make it possible to call startTracks before the preferredTransport$ has resolved. - const connectRequested$ = new BehaviorSubject(false); - - // This should be used in a combineLatest with publisher$ to connect. - const tracks$ = new BehaviorSubject([]); - - // unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error. + // Unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error. const localTransport$ = scope.behavior( localTransportCanThrow$.pipe( catchError((e: unknown) => { @@ -191,7 +197,7 @@ export const createLocalMembership$ = ({ : new Error("Unknown error from localTransport"), ); } - state.livekit$.next({ state: LivekitState.Error, error }); + setLivekitError(error); return of(null); }), ), @@ -203,12 +209,12 @@ export const createLocalMembership$ = ({ connectionManager.connectionManagerData$, localTransport$, ]).pipe( - map(([connectionData, localTransport]) => { + map(([{ value: connectionData }, localTransport]) => { if (localTransport === null) { return null; } - return connectionData.value.getConnectionForTransport(localTransport); + return connectionData.getConnectionForTransport(localTransport); }), tap((connection) => { logger.info( @@ -218,50 +224,36 @@ export const createLocalMembership$ = ({ ), ); + const localConnectionState$ = localConnection$.pipe( + switchMap((connection) => (connection ? connection.state$ : of(null))), + ); + // /** // * Whether we are "fully" connected to the call. Accounts for both the // * connection to the MatrixRTC session and the LiveKit publish connection. // */ - // // TODO use this in combination with the MemberState. const connected$ = scope.behavior( and$( - homeserverConnected$, - localConnection$.pipe( - switchMap((c) => - c - ? c.state$.pipe(map((state) => state.state === "ConnectedToLkRoom")) - : of(false), - ), + homeserverConnected$.pipe( + tap((v) => logger.debug("matrix: Connected state changed", v)), ), - ), - ); - - const publisher$ = new BehaviorSubject(null); - localConnection$.pipe(scope.bind()).subscribe((connection) => { - if (connection !== null && publisher$.value === null) { - // TODO looks strange to not change publisher if connection changes. - // @toger5 will take care of this! - publisher$.next(createPublisherFactory(connection)); - } - }); - - // const mutestate= publisher$.pipe(switchMap((publisher) => { - // return publisher.muteState$ - // }); - - combineLatest([publisher$, trackStartRequested$]).subscribe( - ([publisher, shouldStartTracks]) => { - if (publisher && shouldStartTracks) { - publisher - .createAndSetupTracks() - .then((tracks) => { - tracks$.next(tracks); - }) - .catch((error) => { - logger.error("Error creating tracks:", error); - }); - } - }, + localConnectionState$.pipe( + switchMap((state) => { + logger.debug("livekit: Connected state changed", state); + if (!state) return of(false); + if (state.state === "ConnectedToLkRoom") { + logger.debug( + "livekit: Connected state changed (inner livekitConnectionState$)", + state.livekitConnectionState$.value, + ); + return state.livekitConnectionState$.pipe( + map((lkState) => lkState === ConnectionState.Connected), + ); + } + return of(false); + }), + ), + ).pipe(tap((v) => logger.debug("combined: Connected state changed", v))), ); // MATRIX RELATED @@ -286,90 +278,217 @@ export const createLocalMembership$ = ({ ), ); + // This should be used in a combineLatest with publisher$ to connect. + // to make it possible to call startTracks before the preferredTransport$ has resolved. + const trackStartRequested = Promise.withResolvers(); + + // This should be used in a combineLatest with publisher$ to connect. + // to make it possible to call startTracks before the preferredTransport$ has resolved. + const connectRequested$ = new BehaviorSubject(false); + + /** + * The publisher is stored in here an abstracts creating and publishing tracks. + */ + const publisher$ = new BehaviorSubject(null); + /** + * Extract the tracks from the published. Also reacts to changing publishers. + */ + const tracks$ = scope.behavior( + publisher$.pipe(switchMap((p) => (p?.tracks$ ? p.tracks$ : constant([])))), + ); + const publishing$ = scope.behavior( + publisher$.pipe(switchMap((p) => p?.publishing$ ?? constant(false))), + ); + const startTracks = (): Behavior => { - trackStartRequested$.next(true); + trackStartRequested.resolve(); return tracks$; }; - combineLatest([publisher$, tracks$]).subscribe(([publisher, tracks]) => { - if ( - tracks.length === 0 || - // change this to !== Publishing - state.livekit$.value.state !== LivekitState.Uninitialized - ) { - return; + const requestConnect = (): void => { + trackStartRequested.resolve(); + connectRequested$.next(true); + }; + + const requestDisconnect = (): void => { + connectRequested$.next(false); + }; + + // Take care of the publisher$ + // create a new one as soon as a local Connection is available + // + // Recreate a new one once the local connection changes + // - stop publishing + // - destruct all current streams + // - overwrite current publisher + scope.reconcile(localConnection$, async (connection) => { + if (connection !== null) { + publisher$.next(createPublisherFactory(connection)); } - state.livekit$.next({ state: LivekitState.Connecting }); - publisher - ?.startPublishing() - .then(() => { - state.livekit$.next({ state: LivekitState.Connected }); - }) - .catch((error) => { - state.livekit$.next({ state: LivekitState.Error, error }); - }); + return Promise.resolve(async (): Promise => { + await publisher$?.value?.stopPublishing(); + publisher$?.value?.stopTracks(); + }); }); - combineLatest([localTransport$, connectRequested$]).subscribe( - // TODO reconnect when transport changes => create test. - ([transport, connectRequested]) => { - if ( - transport === null || - !connectRequested || - state.matrix$.value.state !== MatrixState.Disconnected - ) { - logger.info( - "Not yet connecting because: ", - "transport === null:", - transport === null, - "!connectRequested:", - !connectRequested, - "state.matrix$.value.state !== MatrixState.Disconnected:", - state.matrix$.value.state !== MatrixState.Disconnected, - ); - return; + // Use reconcile here to not run concurrent createAndSetupTracks calls + // `tracks$` will update once they are ready. + scope.reconcile( + scope.behavior( + combineLatest([publisher$, tracks$, from(trackStartRequested.promise)]), + null, + ), + async (valueIfReady) => { + if (!valueIfReady) return; + const [publisher, tracks] = valueIfReady; + if (publisher && tracks.length === 0) { + await publisher.createAndSetupTracks().catch((e) => logger.error(e)); } - state.matrix$.next({ state: MatrixState.Connecting }); - logger.info("Matrix State connecting"); - - joinMatrixRTC(transport).catch((error) => { - logger.error(error); - state.matrix$.next({ state: MatrixState.Error, error }); - }); }, ); - // TODO add this and update `state.matrix$` based on it. - // useTypedEventEmitter( - // rtcSession, - // MatrixRTCSessionEvent.MembershipManagerError, - // (error) => setExternalError(new ConnectionLostError()), - // ); + // Based on `connectRequested$` we start publishing tracks. (once they are there!) + scope.reconcile( + scope.behavior(combineLatest([publisher$, tracks$, connectRequested$])), + async ([publisher, tracks, shouldConnect]) => { + if (shouldConnect === publisher?.publishing$.value) return; + if (tracks.length !== 0 && shouldConnect) { + try { + await publisher?.startPublishing(); + } catch (error) { + setLivekitError(error as ElementCallError); + } + } else if (tracks.length !== 0 && !shouldConnect) { + try { + await publisher?.stopPublishing(); + } catch (error) { + setLivekitError(new UnknownCallError(error as Error)); + } + } + }, + ); - const requestConnect = (): LocalMemberConnectionState => { - trackStartRequested$.next(true); - connectRequested$.next(true); - - return state; + const fatalLivekitError$ = new BehaviorSubject(null); + const setLivekitError = (e: ElementCallError): void => { + if (fatalLivekitError$.value !== null) + logger.error("Multiple Livkit Errors:", e); + else fatalLivekitError$.next(e); }; + const livekitState$: Behavior = scope.behavior( + combineLatest([ + publisher$, + localTransport$, + tracks$.pipe( + tap((t) => { + logger.info("tracks$: ", t); + }), + ), + publishing$, + connectRequested$, + from(trackStartRequested.promise).pipe( + map(() => true), + startWith(false), + ), + fatalLivekitError$, + ]).pipe( + map( + ([ + publisher, + localTransport, + tracks, + publishing, + shouldConnect, + shouldStartTracks, + error, + ]) => { + // read this: + // if(!) return {state: ...} + // if(!) return {state: } + // + // as: + // We do have but not yet so we are in + if (error !== null) return { state: RTCBackendState.Error, error }; + const hasTracks = tracks.length > 0; + if (!localTransport) + return { state: RTCBackendState.WaitingForTransport }; + if (!publisher) + return { state: RTCBackendState.WaitingForConnection }; + if (!shouldStartTracks) return { state: RTCBackendState.Initialized }; + if (!hasTracks) return { state: RTCBackendState.CreatingTracks }; + if (!shouldConnect) return { state: RTCBackendState.ReadyToPublish }; + if (!publishing) return { state: RTCBackendState.WaitingToPublish }; + return { state: RTCBackendState.Connected }; + }, + ), + distinctUntilChanged(deepCompare), + ), + ); - const requestDisconnect = (): Behavior | null => { - if (state.livekit$.value.state !== LivekitState.Connected) return null; - state.livekit$.next({ state: LivekitState.Disconnecting }); - combineLatest([publisher$, tracks$], (publisher, tracks) => { - publisher - ?.stopPublishing() - .then(() => { - tracks.forEach((track) => track.stop()); - state.livekit$.next({ state: LivekitState.Disconnected }); - }) - .catch((error) => { - state.livekit$.next({ state: LivekitState.Error, error }); - }); - }); - - return state.livekit$; + const fatalMatrixError$ = new BehaviorSubject(null); + const setMatrixError = (e: ElementCallError): void => { + if (fatalMatrixError$.value !== null) + logger.error("Multiple Matrix Errors:", e); + else fatalMatrixError$.next(e); }; + const matrixState$: Behavior = scope.behavior( + combineLatest([ + localTransport$, + connectRequested$, + homeserverConnected$, + ]).pipe( + map(([localTransport, connectRequested, homeserverConnected]) => { + if (!localTransport) return { state: MatrixState.WaitingForTransport }; + if (!connectRequested) return { state: MatrixState.Ready }; + if (!homeserverConnected) return { state: MatrixState.Connecting }; + return { state: MatrixState.Connected }; + }), + ), + ); + + // Keep matrix rtc session in sync with localTransport$, connectRequested$ and muteStates.video.enabled$ + scope.reconcile( + scope.behavior(combineLatest([localTransport$, connectRequested$])), + async ([transport, shouldConnect]) => { + if (!shouldConnect) return; + + if (!transport) return; + try { + await joinMatrixRTC(transport); + } catch (error) { + logger.error("Error entering RTC session", error); + if (error instanceof Error) + setMatrixError(new MembershipManagerError(error)); + } + + // Update our member event when our mute state changes. + const callIntentScope = new ObservableScope(); + // because this uses its own scope, we can start another reconciliation for the duration of one connection. + callIntentScope.reconcile( + muteStates.video.enabled$, + async (videoEnabled) => + matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"), + ); + + return async (): Promise => { + callIntentScope.end(); + try { + // Update matrixRTCSession to allow udpating the transport without leaving the session! + await matrixRTCSession.leaveRoomSession(); + } catch (e) { + logger.error("Error leaving RTC session", e); + } + try { + await widget?.api.transport.send(ElementWidgetActions.HangupCall, {}); + } catch (e) { + logger.error("Failed to send hangup action", e); + } + }; + }, + ); + + const participant$ = scope.behavior( + localConnection$.pipe(map((c) => c?.livekitRoom?.localParticipant ?? null)), + ); // Pause upstream of all local media tracks when we're disconnected from // MatrixRTC, because it can be an unpleasant surprise for the app to say @@ -377,12 +496,12 @@ export const createLocalMembership$ = ({ // We use matrixConnected$ rather than reconnecting$ because we want to // pause tracks during the initial joining sequence too until we're sure // that our own media is displayed on screen. - combineLatest([localConnection$, homeserverConnected$]) + // TODO refactor this based no livekitState$ + combineLatest([participant$, homeserverConnected$]) .pipe(scope.bind()) - .subscribe(([connection, connected]) => { - if (connection?.state$.value.state !== "ConnectedToLkRoom") return; - const publications = - connection.livekitRoom.localParticipant.trackPublications.values(); + .subscribe(([participant, connected]) => { + if (!participant) return; + const publications = participant.trackPublications.values(); if (connected) { for (const p of publications) { if (p.track?.isUpstreamPaused === true) { @@ -419,89 +538,17 @@ export const createLocalMembership$ = ({ } } }); - // TODO: Refactor updateCallIntent to sth like this: - // combineLatest([muteStates.video.enabled$,localTransport$, state.matrix$]).pipe(map(()=>{ - // matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"), - // })) - // - - // TODO I do not fully understand what this does. - // Is it needed? - // Is this at the right place? - // Can this be simplified? - // Start and stop session membership as needed - // Discussed in statndup -> It seems we can remove this (there is another call to enterRTCSession in this file) - // MAKE SURE TO UNDERSTAND why reconcile is needed and what is potentially missing from the alternative enterRTCSession block. - // @toger5 will try to take care of this. - scope.reconcile(localTransport$, async (transport) => { - if (transport !== null && transport !== undefined) { - try { - state.matrix$.next({ state: MatrixState.Connecting }); - await joinMatrixRTC(transport); - } catch (e) { - logger.error("Error entering RTC session", e); - } - - // Update our member event when our mute state changes. - const intentScope = new ObservableScope(); - intentScope.reconcile(muteStates.video.enabled$, async (videoEnabled) => - matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"), - ); - - return async (): Promise => { - intentScope.end(); - // Only sends Matrix leave event. The LiveKit session will disconnect - // as soon as either the stopConnection$ handler above gets to it or - // the view model is destroyed. - try { - await matrixRTCSession.leaveRoomSession(); - } catch (e) { - logger.error("Error leaving RTC session", e); - } - try { - await widget?.api.transport.send(ElementWidgetActions.HangupCall, {}); - } catch (e) { - logger.error("Failed to send hangup action", e); - } - }; - } - }); - - localConnection$ - .pipe( - distinctUntilChanged(), - switchMap((c) => - c === null ? of({ state: "Initialized" } as ConnectionState) : c.state$, - ), - map((s) => { - logger.trace(`Local connection state update: ${s.state}`); - if (s.state == "FailedToStart") { - return s.error instanceof ElementCallError - ? s.error - : new UnknownCallError(s.error); - } - }), - scope.bind(), - ) - .subscribe((error) => { - if (error !== undefined) - state.livekit$.next({ state: LivekitState.Error, error }); - }); /** * Whether the user is currently sharing their screen. */ const sharingScreen$ = scope.behavior( - localConnection$.pipe( - switchMap((c) => - c !== null - ? observeSharingScreen$(c.livekitRoom.localParticipant) - : of(false), - ), + participant$.pipe( + switchMap((p) => (p !== null ? observeSharingScreen$(p) : of(false))), ), ); - let toggleScreenSharing = null; + let toggleScreenSharing: (() => void) | null = null; if ( "getDisplayMedia" in (navigator.mediaDevices ?? {}) && !getUrlParams().hideScreensharing @@ -527,27 +574,26 @@ export const createLocalMembership$ = ({ // We also allow screen sharing to be toggled even if the connection // is still initializing or publishing tracks, because there's no // technical reason to disallow this. LiveKit will publish if it can. - localConnection$.value?.livekitRoom.localParticipant - .setScreenShareEnabled(targetScreenshareState, screenshareSettings) + participant$.value + ?.setScreenShareEnabled(targetScreenshareState, screenshareSettings) .catch(logger.error); }; } - const participant$ = scope.behavior( - localConnection$.pipe(map((c) => c?.livekitRoom?.localParticipant ?? null)), - ); return { startTracks, requestConnect, requestDisconnect, - connectionState: state, + connectionState: { + livekit$: livekitState$, + matrix$: matrixState$, + }, + tracks$, + participant$, homeserverConnected$, - connected$, reconnecting$, - sharingScreen$, toggleScreenSharing, - participant$, connection$: localConnection$, }; }; diff --git a/src/state/CallViewModel/localMember/LocalTransport.test.ts b/src/state/CallViewModel/localMember/LocalTransport.test.ts index d543f97a..c1c36fa5 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.test.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.test.ts @@ -7,6 +7,7 @@ Please see LICENSE in the repository root for full details. import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc"; +import { BehaviorSubject } from "rxjs"; import { mockConfig, flushPromises } from "../../../utils/test"; import { createLocalTransport$ } from "./LocalTransport"; @@ -117,4 +118,39 @@ describe("LocalTransport", () => { type: "livekit", }); }); + + it("updates local transport when oldest member changes", async () => { + // Use config so transport discovery succeeds, but delay OpenID JWT fetch + mockConfig({ + livekit: { livekit_service_url: "https://lk.example.org" }, + }); + const memberships$ = new BehaviorSubject(new Epoch([])); + const openIdResolver = Promise.withResolvers(); + + vi.spyOn(openIDSFU, "getSFUConfigWithOpenID").mockReturnValue( + openIdResolver.promise, + ); + + const localTransport$ = createLocalTransport$({ + scope, + roomId: "!room:example.org", + useOldestMember$: constant(true), + memberships$, + client: { + getDomain: () => "", + getOpenIdToken: vi.fn(), + getDeviceId: vi.fn(), + }, + }); + + openIdResolver.resolve?.({ url: "https://lk.example.org", jwt: "jwt" }); + expect(localTransport$.value).toBe(null); + await flushPromises(); + // final + expect(localTransport$.value).toStrictEqual({ + livekit_alias: "!room:example.org", + livekit_service_url: "https://lk.example.org", + type: "livekit", + }); + }); }); diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts new file mode 100644 index 00000000..9b3e5b2a --- /dev/null +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -0,0 +1,140 @@ +/* +Copyright 2025 Element Creations Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { + afterEach, + beforeEach, + describe, + expect, + it, + type Mock, + vi, +} from "vitest"; +import { ConnectionState as LivekitConenctionState } from "livekit-client"; +import { type BehaviorSubject } from "rxjs"; +import { logger } from "matrix-js-sdk/lib/logger"; + +import { ObservableScope } from "../../ObservableScope"; +import { constant } from "../../Behavior"; +import { + mockLivekitRoom, + mockLocalParticipant, + mockMediaDevices, +} from "../../../utils/test"; +import { Publisher } from "./Publisher"; +import { + type Connection, + type ConnectionState, +} from "../remoteMembers/Connection"; +import { type MuteStates } from "../../MuteStates"; +import { FailToStartLivekitConnection } from "../../../utils/errors"; + +describe("Publisher", () => { + let scope: ObservableScope; + let connection: Connection; + let muteStates: MuteStates; + beforeEach(() => { + muteStates = { + audio: { + enabled$: constant(false), + unsetHandler: vi.fn(), + setHandler: vi.fn(), + }, + video: { + enabled$: constant(false), + unsetHandler: vi.fn(), + setHandler: vi.fn(), + }, + } as unknown as MuteStates; + scope = new ObservableScope(); + connection = { + state$: constant({ + state: "ConnectedToLkRoom", + livekitConnectionState$: constant(LivekitConenctionState.Connected), + }), + livekitRoom: mockLivekitRoom({ + localParticipant: mockLocalParticipant({}), + }), + } as unknown as Connection; + }); + + afterEach(() => scope.end()); + + it("throws if livekit room could not publish", async () => { + const publisher = new Publisher( + scope, + connection, + mockMediaDevices({}), + muteStates, + constant({ supported: false, processor: undefined }), + logger, + ); + + // should do nothing if no tracks have been created yet. + await publisher.startPublishing(); + expect( + connection.livekitRoom.localParticipant.publishTrack, + ).not.toHaveBeenCalled(); + + await expect(publisher.createAndSetupTracks()).rejects.toThrow( + Error("audio and video is false"), + ); + + (muteStates.audio.enabled$ as BehaviorSubject).next(true); + + ( + connection.livekitRoom.localParticipant.createTracks as Mock + ).mockResolvedValue([{}, {}]); + + await expect(publisher.createAndSetupTracks()).resolves.not.toThrow(); + expect( + connection.livekitRoom.localParticipant.createTracks, + ).toHaveBeenCalledOnce(); + + // failiour due to localParticipant.publishTrack + ( + connection.livekitRoom.localParticipant.publishTrack as Mock + ).mockRejectedValue(Error("testError")); + + await expect(publisher.startPublishing()).rejects.toThrow( + new FailToStartLivekitConnection("testError"), + ); + + // does not try other conenction after the first one failed + expect( + connection.livekitRoom.localParticipant.publishTrack, + ).toHaveBeenCalledTimes(1); + + // failiour due to connection.state$ + const beforeState = connection.state$.value; + (connection.state$ as BehaviorSubject).next({ + state: "FailedToStart", + error: Error("testStartError"), + }); + + await expect(publisher.startPublishing()).rejects.toThrow( + new FailToStartLivekitConnection("testStartError"), + ); + (connection.state$ as BehaviorSubject).next(beforeState); + + // does not try other conenction after the first one failed + expect( + connection.livekitRoom.localParticipant.publishTrack, + ).toHaveBeenCalledTimes(1); + + // success case + ( + connection.livekitRoom.localParticipant.publishTrack as Mock + ).mockResolvedValue({}); + + await expect(publisher.startPublishing()).resolves.not.toThrow(); + + expect( + connection.livekitRoom.localParticipant.publishTrack, + ).toHaveBeenCalledTimes(3); + }); +}); diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 11f35424..326dedaf 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -14,6 +14,7 @@ import { ConnectionState as LivekitConnectionState, } from "livekit-client"; import { + BehaviorSubject, map, NEVER, type Observable, @@ -33,6 +34,10 @@ import { getUrlParams } from "../../../UrlParams.ts"; import { observeTrackReference$ } from "../../MediaViewModel.ts"; import { type Connection } from "../remoteMembers/Connection.ts"; import { type ObservableScope } from "../../ObservableScope.ts"; +import { + ElementCallError, + FailToStartLivekitConnection, +} from "../../../utils/errors.ts"; /** * A wrapper for a Connection object. @@ -40,7 +45,6 @@ import { type ObservableScope } from "../../ObservableScope.ts"; * The Publisher is also responsible for creating the media tracks. */ export class Publisher { - public tracks: LocalTrack[] = []; /** * Creates a new Publisher. * @param scope - The observable scope to use for managing the publisher. @@ -52,19 +56,19 @@ export class Publisher { */ public constructor( private scope: ObservableScope, - private connection: Connection, + private connection: Pick, //setE2EEEnabled, devices: MediaDevices, private readonly muteStates: MuteStates, trackerProcessorState$: Behavior, - private logger?: Logger, + private logger: Logger, ) { - this.logger?.info("[PublishConnection] Create LiveKit room"); + this.logger.info("Create LiveKit room"); const { controlledAudioDevices } = getUrlParams(); const room = connection.livekitRoom; room.setE2EEEnabled(room.options.e2ee !== undefined)?.catch((e: Error) => { - this.logger?.error("Failed to set E2EE enabled on room", e); + this.logger.error("Failed to set E2EE enabled on room", e); }); // Setup track processor syncing (blur) @@ -74,13 +78,31 @@ export class Publisher { this.workaroundRestartAudioInputTrackChrome(devices, scope); this.scope.onEnd(() => { - this.logger?.info( - "[PublishConnection] Scope ended -> stop publishing all tracks", - ); + this.logger.info("Scope ended -> stop publishing all tracks"); void this.stopPublishing(); }); + + // TODO move mute state handling here using reconcile (instead of inside the mute state class) + // this.scope.reconcile( + // this.scope.behavior( + // combineLatest([this.muteStates.video.enabled$, this.tracks$]), + // ), + // async ([videoEnabled, tracks]) => { + // const track = tracks.find((t) => t.kind == Track.Kind.Video); + // if (!track) return; + + // if (videoEnabled) { + // await track.unmute(); + // } else { + // await track.mute(); + // } + // }, + // ); } + private _tracks$ = new BehaviorSubject[]>([]); + public tracks$ = this._tracks$ as Behavior[]>; + /** * Start the connection to LiveKit and publish local tracks. * @@ -94,51 +116,46 @@ export class Publisher { * @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection. * @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created. */ - public async createAndSetupTracks(): Promise { + public async createAndSetupTracks(): Promise { + this.logger.debug("createAndSetupTracks called"); const lkRoom = this.connection.livekitRoom; // Observe mute state changes and update LiveKit microphone/camera states accordingly this.observeMuteStates(this.scope); - // TODO: This should be an autostarted connection no need to start here. just check the connection state. - // TODO: This will fetch the JWT token. Perhaps we could keep it preloaded - // instead? This optimization would only be safe for a publish connection, - // because we don't want to leak the user's intent to perhaps join a call to - // remote servers before they actually commit to it. - // const { promise, resolve, reject } = Promise.withResolvers(); - // const sub = this.connection.state$.subscribe((s) => { - // if (s.state === "FailedToStart") { - // reject(new Error("Disconnected from LiveKit server")); - // } else if (s.state === "ConnectedToLkRoom") { - // resolve(); - // } - // }); - // try { - // await promise; - // } catch (e) { - // throw e; - // } finally { - // sub.unsubscribe(); - // } // TODO-MULTI-SFU: Prepublish a microphone track const audio = this.muteStates.audio.enabled$.value; const video = this.muteStates.video.enabled$.value; // createTracks throws if called with audio=false and video=false if (audio || video) { // TODO this can still throw errors? It will also prompt for permissions if not already granted - this.tracks = - (await lkRoom.localParticipant - .createTracks({ - audio, - video, - }) - .catch((error) => { - this.logger?.error("Failed to create tracks", error); - })) ?? []; + return lkRoom.localParticipant + .createTracks({ + audio, + video, + }) + .then((tracks) => { + this.logger.info( + "created track", + tracks.map((t) => t.kind + ", " + t.id), + ); + this._tracks$.next(tracks); + }) + .catch((error) => { + this.logger.error("Failed to create tracks", error); + }); } - return this.tracks; + throw Error("audio and video is false"); } + private _publishing$ = new BehaviorSubject(false); + public publishing$ = this.scope.behavior(this._publishing$); + /** + * + * @returns + * @throws ElementCallError + */ public async startPublishing(): Promise { + this.logger.debug("startPublishing called"); const lkRoom = this.connection.livekitRoom; const { promise, resolve, reject } = Promise.withResolvers(); const sub = this.connection.state$.subscribe((s) => { @@ -147,10 +164,14 @@ export class Publisher { resolve(); break; case "FailedToStart": - reject(new Error("Failed to connect to LiveKit server")); + reject( + s.error instanceof ElementCallError + ? s.error + : new FailToStartLivekitConnection(s.error.message), + ); break; default: - this.logger?.info("waiting for connection: ", s.state); + this.logger.info("waiting for connection: ", s.state); } }); try { @@ -160,19 +181,27 @@ export class Publisher { } finally { sub.unsubscribe(); } - for (const track of this.tracks) { + + for (const track of this.tracks$.value) { + this.logger.info("publish ", this.tracks$.value.length, "tracks"); // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally // with a timeout. await lkRoom.localParticipant.publishTrack(track).catch((error) => { - this.logger?.error("Failed to publish track", error); + this.logger.error("Failed to publish track", error); + throw new FailToStartLivekitConnection( + error instanceof Error ? error.message : error, + ); }); + this.logger.info("published track ", track.kind, track.id); // TODO: check if the connection is still active? and break the loop if not? } - return this.tracks; + this._publishing$.next(true); + return this.tracks$.value; } public async stopPublishing(): Promise { + this.logger.debug("stopPublishing called"); // TODO-MULTI-SFU: Move these calls back to ObservableScope.onEnd once scope // actually has the right lifetime this.muteStates.audio.unsetHandler(); @@ -184,7 +213,28 @@ export class Publisher { if (p.track !== undefined) tracks.push(p.track); }; localParticipant.trackPublications.forEach(addToTracksIfDefined); - await localParticipant.unpublishTracks(tracks); + this.logger.debug( + "list of tracks to unpublish:", + tracks.map((t) => t.kind + ", " + t.id), + "start unpublishing now", + ); + await localParticipant.unpublishTracks(tracks).catch((error) => { + this.logger.error("Failed to unpublish tracks", error); + throw error; + }); + this.logger.debug( + "unpublished tracks", + tracks.map((t) => t.kind + ", " + t.id), + ); + this._publishing$.next(false); + } + + /** + * Stops all tracks that are currently running + */ + public stopTracks(): void { + this.tracks$.value.forEach((t) => t.stop()); + this._tracks$.next([]); } /// Private methods @@ -221,6 +271,9 @@ export class Publisher { // the process of being restarted. activeMicTrack.mediaStreamTrack.readyState !== "ended" ) { + this.logger?.info( + "Restarting audio device track due to active media device changed (workaroundRestartAudioInputTrackChrome)", + ); // Restart the track, which will cause Livekit to do another // getUserMedia() call with deviceId: default to get the *new* default device. // Note that room.switchActiveDevice() won't work: Livekit will ignore it because @@ -229,7 +282,7 @@ export class Publisher { .getTrackPublication(Track.Source.Microphone) ?.audioTrack?.restartTrack() .catch((e) => { - this.logger?.error(`Failed to restart audio device track`, e); + this.logger.error(`Failed to restart audio device track`, e); }); } }); @@ -249,7 +302,7 @@ export class Publisher { selected$.pipe(scope.bind()).subscribe((device) => { if (lkRoom.state != LivekitConnectionState.Connected) return; // if (this.connectionState$.value !== ConnectionState.Connected) return; - this.logger?.info( + this.logger.info( "[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :", lkRoom.getActiveDevice(kind), " !== ", @@ -262,7 +315,7 @@ export class Publisher { lkRoom .switchActiveDevice(kind, device.id) .catch((e: Error) => - this.logger?.error( + this.logger.error( `Failed to sync ${kind} device with LiveKit`, e, ), @@ -287,10 +340,7 @@ export class Publisher { try { await lkRoom.localParticipant.setMicrophoneEnabled(desired); } catch (e) { - this.logger?.error( - "Failed to update LiveKit audio input mute state", - e, - ); + this.logger.error("Failed to update LiveKit audio input mute state", e); } return lkRoom.localParticipant.isMicrophoneEnabled; }); @@ -298,10 +348,7 @@ export class Publisher { try { await lkRoom.localParticipant.setCameraEnabled(desired); } catch (e) { - this.logger?.error( - "Failed to update LiveKit video input mute state", - e, - ); + this.logger.error("Failed to update LiveKit video input mute state", e); } return lkRoom.localParticipant.isCameraEnabled; }); diff --git a/src/state/CallViewModel/remoteMembers/Connection.test.ts b/src/state/CallViewModel/remoteMembers/Connection.test.ts index 3f58bcf6..2ead768b 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.test.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.test.ts @@ -193,7 +193,7 @@ describe("Start connection states", () => { capturedState = capturedStates.pop(); if (capturedState!.state === "FailedToStart") { expect(capturedState!.error.message).toEqual("Something went wrong"); - expect(capturedState!.transport.livekit_alias).toEqual( + expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); } else { @@ -249,7 +249,7 @@ describe("Start connection states", () => { expect(capturedState?.error.message).toContain( "SFU Config fetch failed with exception Error", ); - expect(capturedState?.transport.livekit_alias).toEqual( + expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); } else { @@ -313,7 +313,7 @@ describe("Start connection states", () => { expect(capturedState.error.message).toContain( "Failed to connect to livekit", ); - expect(capturedState.transport.livekit_alias).toEqual( + expect(connection.transport.livekit_alias).toEqual( livekitFocus.livekit_alias, ); } else { diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index c17fae2b..4f3bbda4 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -19,7 +19,7 @@ import { RoomEvent, } from "livekit-client"; import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; -import { BehaviorSubject, map, type Observable } from "rxjs"; +import { BehaviorSubject, map } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; import { @@ -50,16 +50,14 @@ export interface ConnectionOpts { export type ConnectionState = | { state: "Initialized" } - | { state: "FetchingConfig"; transport: LivekitTransport } - | { state: "ConnectingToLkRoom"; transport: LivekitTransport } - | { state: "PublishingTracks"; transport: LivekitTransport } - | { state: "FailedToStart"; error: Error; transport: LivekitTransport } + | { state: "FetchingConfig" } + | { state: "ConnectingToLkRoom" } | { state: "ConnectedToLkRoom"; - livekitConnectionState$: Observable; - transport: LivekitTransport; + livekitConnectionState$: Behavior; } - | { state: "Stopped"; transport: LivekitTransport }; + | { state: "FailedToStart"; error: Error } + | { state: "Stopped" }; /** * A connection to a Matrix RTC LiveKit backend. @@ -77,6 +75,24 @@ export class Connection { */ public readonly state$: Behavior = this._state$; + /** + * The media transport to connect to. + */ + public readonly transport: LivekitTransport; + + public readonly livekitRoom: LivekitRoom; + + private scope: ObservableScope; + + /** + * An observable of the participants that are publishing on this connection. (Excluding our local participant) + * This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`. + * It filters the participants to only those that are associated with a membership that claims to publish on this connection. + */ + public readonly remoteParticipantsWithTracks$: Behavior< + PublishingParticipant[] + >; + /** * Whether the connection has been stopped. * @see Connection.stop @@ -96,7 +112,6 @@ export class Connection { * @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection. * @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created. */ - // TODO dont make this throw and instead store a connection error state in this class? // TODO consider an autostart pattern... public async start(): Promise { this.logger.debug("Starting Connection"); @@ -104,7 +119,6 @@ export class Connection { try { this._state$.next({ state: "FetchingConfig", - transport: this.transport, }); const { url, jwt } = await this.getSFUConfigWithOpenID(); // If we were stopped while fetching the config, don't proceed to connect @@ -112,7 +126,6 @@ export class Connection { this._state$.next({ state: "ConnectingToLkRoom", - transport: this.transport, }); try { await this.livekitRoom.connect(url, jwt); @@ -143,15 +156,15 @@ export class Connection { this._state$.next({ state: "ConnectedToLkRoom", - transport: this.transport, - livekitConnectionState$: connectionStateObserver(this.livekitRoom), + livekitConnectionState$: this.scope.behavior( + connectionStateObserver(this.livekitRoom), + ), }); } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); this._state$.next({ state: "FailedToStart", error: error instanceof Error ? error : new Error(`${error}`), - transport: this.transport, }); throw error; } @@ -179,28 +192,11 @@ export class Connection { await this.livekitRoom.disconnect(); this._state$.next({ state: "Stopped", - transport: this.transport, }); this.stopped = true; } - /** - * An observable of the participants that are publishing on this connection. (Excluding our local participant) - * This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`. - * It filters the participants to only those that are associated with a membership that claims to publish on this connection. - */ - public readonly remoteParticipantsWithTracks$: Behavior< - PublishingParticipant[] - >; - - /** - * The media transport to connect to. - */ - public readonly transport: LivekitTransport; - private readonly client: OpenIDClientParts; - public readonly livekitRoom: LivekitRoom; - private readonly logger: Logger; /** @@ -217,6 +213,7 @@ export class Connection { ); const { transport, client, scope } = opts; + this.scope = scope; this.livekitRoom = opts.livekitRoomFactory(); this.transport = transport; this.client = client; diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index d9a0380e..0b9f939c 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -92,7 +92,6 @@ interface Props { } // TODO - write test for scopes (do we really need to bind scope) export interface IConnectionManager { - transports$: Behavior>; connectionManagerData$: Behavior>; } /** @@ -216,7 +215,7 @@ export function createConnectionManager$({ new Epoch(new ConnectionManagerData()), ); - return { transports$, connectionManagerData$ }; + return { connectionManagerData$ }; } function removeDuplicateTransports( diff --git a/src/state/ObservableScope.ts b/src/state/ObservableScope.ts index 27f501c7..e3fc644f 100644 --- a/src/state/ObservableScope.ts +++ b/src/state/ObservableScope.ts @@ -123,8 +123,22 @@ export class ObservableScope { callback: (value: T) => Promise<(() => Promise) | void>, ): void { let latestValue: T | typeof nothing = nothing; - let reconciledValue: T | typeof nothing = nothing; + let reconcilePromise: Promise | undefined = undefined; let cleanUp: (() => Promise) | void = undefined; + let prevVal: T | typeof nothing = nothing; + + // While this loop runs it will process the latest from `value$` until it caught up with the updates. + // It might skip updates from `value$` and only process the newest value after callback has resolved. + const reconcileLoop = async (): Promise => { + while (latestValue !== prevVal) { + await cleanUp?.(); // Call the previous value's clean-up handler + prevVal = latestValue; + + if (latestValue !== nothing) cleanUp = await callback(latestValue); // Sync current value... + // `latestValue` might have gotten updated during the `await callback`. That is why we loop here + } + }; + value$ .pipe( catchError(() => EMPTY), // Ignore errors @@ -132,23 +146,15 @@ export class ObservableScope { endWith(nothing), // Clean up when the scope ends ) .subscribe((value) => { - void (async (): Promise => { - if (latestValue === nothing) { - latestValue = value; - while (latestValue !== reconciledValue) { - await cleanUp?.(); // Call the previous value's clean-up handler - reconciledValue = latestValue; - if (latestValue !== nothing) - cleanUp = await callback(latestValue); // Sync current value - } - // Reset to signal that reconciliation is done for now - latestValue = nothing; - } else { - // There's already an instance of the above 'while' loop running - // concurrently. Just update the latest value and let it be handled. - latestValue = value; - } - })(); + // Always track the latest value! The `reconcileLoop` will run until it "processed" the "last" `latestValue`. + latestValue = value; + // There's already an instance of the below 'reconcileLoop' loop running + // concurrently. So lets let the loop handle it. NEVER instanciate two `reconcileLoop`s. + if (reconcilePromise) return; + + reconcilePromise = reconcileLoop().finally(() => { + reconcilePromise = undefined; + }); }); } diff --git a/src/utils/errors.ts b/src/utils/errors.ts index b77c0ff0..bb37754a 100644 --- a/src/utils/errors.ts +++ b/src/utils/errors.ts @@ -13,6 +13,8 @@ export enum ErrorCode { */ MISSING_MATRIX_RTC_TRANSPORT = "MISSING_MATRIX_RTC_TRANSPORT", CONNECTION_LOST_ERROR = "CONNECTION_LOST_ERROR", + INTERNAL_MEMBERSHIP_MANAGER = "INTERNAL_MEMBERSHIP_MANAGER", + FAILED_TO_START_LIVEKIT = "FAILED_TO_START_LIVEKIT", /** LiveKit indicates that the server has hit its track limits */ INSUFFICIENT_CAPACITY_ERROR = "INSUFFICIENT_CAPACITY_ERROR", E2EE_NOT_SUPPORTED = "E2EE_NOT_SUPPORTED", @@ -27,6 +29,7 @@ export enum ErrorCategory { NETWORK_CONNECTIVITY = "NETWORK_CONNECTIVITY", CLIENT_CONFIGURATION = "CLIENT_CONFIGURATION", UNKNOWN = "UNKNOWN", + SYSTEM_FAILURE = "SYSTEM_FAILURE", // SYSTEM_FAILURE / FEDERATION_FAILURE .. } @@ -83,6 +86,18 @@ export class ConnectionLostError extends ElementCallError { } } +export class MembershipManagerError extends ElementCallError { + public constructor(error: Error) { + super( + t("error.membership_manager"), + ErrorCode.INTERNAL_MEMBERSHIP_MANAGER, + ErrorCategory.SYSTEM_FAILURE, + t("error.membership_manager_description"), + error, + ); + } +} + export class E2EENotSupportedError extends ElementCallError { public constructor() { super( @@ -120,6 +135,17 @@ export class FailToGetOpenIdToken extends ElementCallError { } } +export class FailToStartLivekitConnection extends ElementCallError { + public constructor(e?: string) { + super( + t("error.failed_to_start_livekit"), + ErrorCode.FAILED_TO_START_LIVEKIT, + ErrorCategory.NETWORK_CONNECTIVITY, + e, + ); + } +} + export class InsufficientCapacityError extends ElementCallError { public constructor() { super( diff --git a/src/utils/test.ts b/src/utils/test.ts index 471d35d8..50a9add0 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -284,6 +284,8 @@ export function mockLivekitRoom( ): LivekitRoom { const livekitRoom = { options: {}, + setE2EEEnabled: vi.fn(), + ...mockEmitter(), ...room, } as Partial as LivekitRoom; @@ -306,7 +308,9 @@ export function mockLocalParticipant( return { isLocal: true, trackPublications: new Map(), - unpublishTracks: async () => Promise.resolve(), + publishTrack: vi.fn(), + unpublishTracks: vi.fn().mockResolvedValue([]), + createTracks: vi.fn(), getTrackPublication: () => ({}) as Partial as LocalTrackPublication, ...mockEmitter(),