diff --git a/src/livekit/MatrixAudioRenderer.test.tsx b/src/livekit/MatrixAudioRenderer.test.tsx index e2464eb8..07592732 100644 --- a/src/livekit/MatrixAudioRenderer.test.tsx +++ b/src/livekit/MatrixAudioRenderer.test.tsx @@ -24,10 +24,9 @@ import { mockMatrixRoomMember, mockMediaDevices, mockRtcMembership, - mockTrack + mockTrack, } from "../utils/test"; - export const TestAudioContextConstructor = vi.fn(() => testAudioContext); const MediaDevicesProvider = MediaDevicesContext.MediaDevicesContext.Provider; @@ -65,8 +64,8 @@ it("should render for member", () => { const p = { id: "test:123", participant: undefined, - member: carol - } + member: carol, + }; const livekitRoom = mockLivekitRoom( {}, { @@ -95,8 +94,8 @@ it("should not render without member", () => { const p = { id: "test:123", participant: undefined, - member: carol - } + member: carol, + }; const livekitRoom = mockLivekitRoom( {}, { @@ -122,8 +121,8 @@ it("should not setup audioContext gain and pan if there is no need to.", () => { const p = { id: "test:123", participant: undefined, - member: carol - } + member: carol, + }; const livekitRoom = mockLivekitRoom( {}, { @@ -160,8 +159,8 @@ it("should setup audioContext gain and pan", () => { const p = { id: "test:123", participant: undefined, - member: carol - } + member: carol, + }; const livekitRoom = mockLivekitRoom( {}, { @@ -173,7 +172,8 @@ it("should setup audioContext gain and pan", () => { + livekitRoom={livekitRoom} + /> , ); diff --git a/src/room/GroupCallView.test.tsx b/src/room/GroupCallView.test.tsx index 22d99b31..8c4a276a 100644 --- a/src/room/GroupCallView.test.tsx +++ b/src/room/GroupCallView.test.tsx @@ -167,7 +167,7 @@ function createGroupCallView( muteStates={muteState} widget={widget} joined={true} - setJoined={function(value: boolean): void { }} + setJoined={function (value: boolean): void {}} /> diff --git a/src/room/RoomPage.tsx b/src/room/RoomPage.tsx index 3924437b..e9527e03 100644 --- a/src/room/RoomPage.tsx +++ b/src/room/RoomPage.tsx @@ -116,20 +116,22 @@ export const RoomPage: FC = () => { const groupCallView = (): ReactNode => { switch (groupCallState.kind) { case "loaded": - return muteStates && ( - + return ( + muteStates && ( + + ) ); case "waitForInvite": case "canKnock": { @@ -148,31 +150,35 @@ export const RoomPage: FC = () => { ); return ( - muteStates && knock?.()} - enterLabel={label} - waitingForInvite={groupCallState.kind === "waitForInvite"} - confineToRoom={confineToRoom} - hideHeader={header !== "standard"} - participantCount={null} - muteStates={muteStates} - onShareClick={null} - /> + muteStates && ( + knock?.()} + enterLabel={label} + waitingForInvite={groupCallState.kind === "waitForInvite"} + confineToRoom={confineToRoom} + hideHeader={header !== "standard"} + participantCount={null} + muteStates={muteStates} + onShareClick={null} + /> + ) ); } case "loading": diff --git a/src/rtcSessionHelpers.test.ts b/src/rtcSessionHelpers.test.ts index 1058628f..258d2f9a 100644 --- a/src/rtcSessionHelpers.test.ts +++ b/src/rtcSessionHelpers.test.ts @@ -23,38 +23,37 @@ vi.mock("./widget", () => ({ ...actualWidget, widget: { api: { - setAlwaysOnScreen: (): void => { - }, - transport: { send: vi.fn(), reply: vi.fn(), stop: vi.fn() } + setAlwaysOnScreen: (): void => {}, + transport: { send: vi.fn(), reply: vi.fn(), stop: vi.fn() }, }, - lazyActions: new EventEmitter() - } + lazyActions: new EventEmitter(), + }, })); test("It joins the correct Session", async () => { const focusFromOlderMembership = { type: "livekit", livekit_service_url: "http://my-oldest-member-service-url.com", - livekit_alias: "my-oldest-member-service-alias" + livekit_alias: "my-oldest-member-service-alias", }; const focusConfigFromWellKnown = { type: "livekit", - livekit_service_url: "http://my-well-known-service-url.com" + livekit_service_url: "http://my-well-known-service-url.com", }; const focusConfigFromWellKnown2 = { type: "livekit", - livekit_service_url: "http://my-well-known-service-url2.com" + livekit_service_url: "http://my-well-known-service-url2.com", }; const clientWellKnown = { "org.matrix.msc4143.rtc_foci": [ focusConfigFromWellKnown, - focusConfigFromWellKnown2 - ] + focusConfigFromWellKnown2, + ], }; mockConfig({ - livekit: { livekit_service_url: "http://my-default-service-url.com" } + livekit: { livekit_service_url: "http://my-default-service-url.com" }, }); vi.spyOn(AutoDiscovery, "getRawClientConfig").mockImplementation( @@ -63,7 +62,7 @@ test("It joins the correct Session", async () => { return Promise.resolve(clientWellKnown); } return Promise.resolve({}); - } + }, ); const mockedSession = vi.mocked({ @@ -75,64 +74,67 @@ test("It joins the correct Session", async () => { access_token: "ACCCESS_TOKEN", token_type: "Bearer", matrix_server_name: "localhost", - expires_in: 10000 - }) - } + expires_in: 10000, + }), + }, }, memberships: [], getFocusInUse: vi.fn().mockReturnValue(focusFromOlderMembership), getOldestMembership: vi.fn().mockReturnValue({ - getPreferredFoci: vi.fn().mockReturnValue([focusFromOlderMembership]) + getPreferredFoci: vi.fn().mockReturnValue([focusFromOlderMembership]), }), - joinRoomSession: vi.fn() + joinRoomSession: vi.fn(), }) as unknown as MatrixRTCSession; - await enterRTCSession(mockedSession, { + await enterRTCSession( + mockedSession, + { livekit_alias: "roomId", livekit_service_url: "http://my-well-known-service-url.com", - type: "livekit" + type: "livekit", }, - true); + true, + ); expect(mockedSession.joinRoomSession).toHaveBeenLastCalledWith( [ { livekit_alias: "my-oldest-member-service-alias", livekit_service_url: "http://my-oldest-member-service-url.com", - type: "livekit" + type: "livekit", }, { livekit_alias: "roomId", livekit_service_url: "http://my-well-known-service-url.com", - type: "livekit" + type: "livekit", }, { livekit_alias: "roomId", livekit_service_url: "http://my-well-known-service-url2.com", - type: "livekit" + type: "livekit", }, { livekit_alias: "roomId", livekit_service_url: "http://my-default-service-url.com", - type: "livekit" - } + type: "livekit", + }, ], { focus_selection: "oldest_membership", - type: "livekit" + type: "livekit", }, { manageMediaKeys: false, useLegacyMemberEvents: false, useNewMembershipManager: true, - useExperimentalToDeviceTransport: false - } + useExperimentalToDeviceTransport: false, + }, ); }); async function testLeaveRTCSession( cause: "user" | "error", - expectClose: boolean + expectClose: boolean, ): Promise { vi.clearAllMocks(); const session = { leaveRoomSession: vi.fn() } as unknown as MatrixRTCSession; @@ -140,18 +142,18 @@ async function testLeaveRTCSession( expect(session.leaveRoomSession).toHaveBeenCalled(); expect(widget!.api.transport.send).toHaveBeenCalledWith( ElementWidgetActions.HangupCall, - expect.anything() + expect.anything(), ); if (expectClose) { expect(widget!.api.transport.send).toHaveBeenCalledWith( ElementWidgetActions.Close, - expect.anything() + expect.anything(), ); expect(widget!.api.transport.stop).toHaveBeenCalled(); } else { expect(widget!.api.transport.send).not.toHaveBeenCalledWith( ElementWidgetActions.Close, - expect.anything() + expect.anything(), ); expect(widget!.api.transport.stop).not.toHaveBeenCalled(); } @@ -179,24 +181,26 @@ test("It fails with configuration error if no live kit url config is set in fall room: { roomId: "roomId", client: { - getDomain: vi.fn().mockReturnValue("example.org") - } + getDomain: vi.fn().mockReturnValue("example.org"), + }, }, memberships: [], getFocusInUse: vi.fn(), - joinRoomSession: vi.fn() + joinRoomSession: vi.fn(), }) as unknown as MatrixRTCSession; - await expect(enterRTCSession( - mockedSession, - { - livekit_alias: "roomId", - livekit_service_url: "http://my-well-known-service-url.com", - type: "livekit" - }, - true - )).rejects.toThrowError( - expect.objectContaining({ code: ErrorCode.MISSING_MATRIX_RTC_TRANSPORT }) + await expect( + enterRTCSession( + mockedSession, + { + livekit_alias: "roomId", + livekit_service_url: "http://my-well-known-service-url.com", + type: "livekit", + }, + true, + ), + ).rejects.toThrowError( + expect.objectContaining({ code: ErrorCode.MISSING_MATRIX_RTC_TRANSPORT }), ); }); @@ -206,9 +210,9 @@ test("It should not fail with configuration error if homeserver config has livek "org.matrix.msc4143.rtc_foci": [ { type: "livekit", - livekit_service_url: "http://my-well-known-service-url.com" - } - ] + livekit_service_url: "http://my-well-known-service-url.com", + }, + ], }); const mockedSession = vi.mocked({ @@ -220,19 +224,22 @@ test("It should not fail with configuration error if homeserver config has livek access_token: "ACCCESS_TOKEN", token_type: "Bearer", matrix_server_name: "localhost", - expires_in: 10000 - }) - } + expires_in: 10000, + }), + }, }, memberships: [], getFocusInUse: vi.fn(), - joinRoomSession: vi.fn() + joinRoomSession: vi.fn(), }) as unknown as MatrixRTCSession; - await enterRTCSession(mockedSession, { + await enterRTCSession( + mockedSession, + { livekit_alias: "roomId", livekit_service_url: "http://my-well-known-service-url.com", - type: "livekit" + type: "livekit", }, - true); + true, + ); }); diff --git a/src/state/Async.ts b/src/state/Async.ts index 45676759..79de4140 100644 --- a/src/state/Async.ts +++ b/src/state/Async.ts @@ -5,14 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { - catchError, - from, - map, - type Observable, - of, - startWith -} from "rxjs"; +import { catchError, from, map, type Observable, of, startWith } from "rxjs"; // TODO where are all the comments? ::cry:: // There used to be an unitialized state!, a state might not start in loading @@ -34,13 +27,15 @@ export function async$(promise: Promise): Observable> { return from(promise).pipe( map(ready), startWith(loading), - catchError((e: unknown) => of(error(e as Error ?? new Error("Unknown error")))), + catchError((e: unknown) => + of(error((e as Error) ?? new Error("Unknown error"))), + ), ); } export function mapAsync( async: Async, - project: (value: A) => B + project: (value: A) => B, ): Async { return async.state === "ready" ? ready(project(async.value)) : async; } diff --git a/src/state/CallViewModel.test.ts b/src/state/CallViewModel.test.ts index d9cad2b7..acc6a991 100644 --- a/src/state/CallViewModel.test.ts +++ b/src/state/CallViewModel.test.ts @@ -352,7 +352,10 @@ function withCallViewModel( options, raisedHands$, reactions$, - new BehaviorSubject({ processor: undefined, supported: undefined }), + new BehaviorSubject({ + processor: undefined, + supported: undefined, + }), ); onTestFinished(() => { diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 4b8ff879..f517908f 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -126,7 +126,11 @@ import { } from "../rtcSessionHelpers"; import { E2eeType } from "../e2ee/e2eeType"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; -import { type Connection, type ConnectionOpts, RemoteConnection } from "./Connection"; +import { + type Connection, + type ConnectionOpts, + RemoteConnection, +} from "./Connection"; import { type MuteStates } from "./MuteStates"; import { getUrlParams } from "../UrlParams"; import { type ProcessorState } from "../livekit/TrackProcessorContext"; @@ -485,7 +489,6 @@ export class CallViewModel extends ViewModel { ), ); - /** * The MatrixRTC session participants. */ @@ -574,7 +577,6 @@ export class CallViewModel extends ViewModel { (transport) => transport && mapAsync(transport, (transport) => { - const opts: ConnectionOpts = { transport, client: this.matrixRTCSession.room.client, @@ -582,15 +584,16 @@ export class CallViewModel extends ViewModel { remoteTransports$: this.remoteTransports$, }; return { - connection: new PublishConnection( - opts, - this.mediaDevices, - this.muteStates, - this.e2eeLivekitOptions(), - this.scope.behavior(this.trackProcessorState$), - ), - transport, - }}), + connection: new PublishConnection( + opts, + this.mediaDevices, + this.muteStates, + this.e2eeLivekitOptions(), + this.scope.behavior(this.trackProcessorState$), + ), + transport, + }; + }), ), ), ); @@ -605,14 +608,14 @@ export class CallViewModel extends ViewModel { this.localConnection$.pipe( switchMap((c) => c?.state === "ready" - // TODO mapping to ConnectionState for compatibility, but we should use the full state? - ? c.value.focusedConnectionState$.pipe( - map((s) => { - if (s.state === "ConnectedToLkRoom") return s.connectionState; - return ConnectionState.Disconnected - }), - distinctUntilChanged(), - ) + ? // TODO mapping to ConnectionState for compatibility, but we should use the full state? + c.value.focusedConnectionState$.pipe( + map((s) => { + if (s.state === "ConnectedToLkRoom") return s.connectionState; + return ConnectionState.Disconnected; + }), + distinctUntilChanged(), + ) : of(ConnectionState.Disconnected), ), ), @@ -659,8 +662,11 @@ export class CallViewModel extends ViewModel { client: this.matrixRTCSession.room.client, scope: this.scope, remoteTransports$: this.remoteTransports$, - } - nextConnection = new RemoteConnection(args, this.e2eeLivekitOptions()); + }; + nextConnection = new RemoteConnection( + args, + this.e2eeLivekitOptions(), + ); } else { logger.log( "SFU remoteConnections$ use prev connection: ", @@ -1952,16 +1958,20 @@ export class CallViewModel extends ViewModel { .pipe(this.scope.bind()) .subscribe(({ start, stop }) => { for (const c of stop) { - logger.info(`Disconnecting from ${c.localTransport.livekit_service_url}`); + logger.info( + `Disconnecting from ${c.localTransport.livekit_service_url}`, + ); c.stop().catch((err) => { // TODO: better error handling logger.error("MuteState: handler error", err); - });; + }); } for (const c of start) { c.start().then( () => - logger.info(`Connected to ${c.localTransport.livekit_service_url}`), + logger.info( + `Connected to ${c.localTransport.livekit_service_url}`, + ), (e) => logger.error( `Failed to start connection to ${c.localTransport.livekit_service_url}`, diff --git a/src/state/Connection.test.ts b/src/state/Connection.test.ts index 74a61515..69942270 100644 --- a/src/state/Connection.test.ts +++ b/src/state/Connection.test.ts @@ -5,21 +5,37 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { afterEach, describe, expect, it, type Mock, type MockedObject, vi } from "vitest"; +import { + afterEach, + describe, + expect, + it, + type Mock, + type MockedObject, + vi, +} from "vitest"; import { BehaviorSubject, of } from "rxjs"; import { ConnectionState, type LocalParticipant, type RemoteParticipant, type Room as LivekitRoom, - RoomEvent, type RoomOptions + RoomEvent, + type RoomOptions, } from "livekit-client"; import fetchMock from "fetch-mock"; import EventEmitter from "events"; import { type IOpenIDToken } from "matrix-js-sdk"; -import type { CallMembership, LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; -import { type ConnectionOpts, type FocusConnectionState, RemoteConnection } from "./Connection.ts"; +import type { + CallMembership, + LivekitTransport, +} from "matrix-js-sdk/lib/matrixrtc"; +import { + type ConnectionOpts, + type FocusConnectionState, + RemoteConnection, +} from "./Connection.ts"; import { ObservableScope } from "./ObservableScope.ts"; import { type OpenIDClientParts } from "../livekit/openIDSFU.ts"; import { FailToGetOpenIdToken } from "../utils/errors.ts"; @@ -38,28 +54,30 @@ let localParticipantEventEmiter: EventEmitter; let fakeLocalParticipant: MockedObject; let fakeRoomEventEmiter: EventEmitter; -let fakeMembershipsFocusMap$: BehaviorSubject<{ membership: CallMembership; transport: LivekitTransport }[]>; +let fakeMembershipsFocusMap$: BehaviorSubject< + { membership: CallMembership; transport: LivekitTransport }[] +>; const livekitFocus: LivekitTransport = { livekit_alias: "!roomID:example.org", livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt", - type: "livekit" + type: "livekit", }; function setupTest(): void { testScope = new ObservableScope(); client = vi.mocked({ - getOpenIdToken: vi.fn().mockResolvedValue( - { - "access_token": "rYsmGUEwNjKgJYyeNUkZseJN", - "token_type": "Bearer", - "matrix_server_name": "example.org", - "expires_in": 3600 - } - ), - getDeviceId: vi.fn().mockReturnValue("ABCDEF") + getOpenIdToken: vi.fn().mockResolvedValue({ + access_token: "rYsmGUEwNjKgJYyeNUkZseJN", + token_type: "Bearer", + matrix_server_name: "example.org", + expires_in: 3600, + }), + getDeviceId: vi.fn().mockReturnValue("ABCDEF"), } as unknown as OpenIDClientParts); - fakeMembershipsFocusMap$ = new BehaviorSubject<{ membership: CallMembership; transport: LivekitTransport }[]>([]); + fakeMembershipsFocusMap$ = new BehaviorSubject< + { membership: CallMembership; transport: LivekitTransport }[] + >([]); localParticipantEventEmiter = new EventEmitter(); @@ -69,9 +87,15 @@ function setupTest(): void { getTrackPublication: vi.fn().mockReturnValue(undefined), on: localParticipantEventEmiter.on.bind(localParticipantEventEmiter), off: localParticipantEventEmiter.off.bind(localParticipantEventEmiter), - addListener: localParticipantEventEmiter.addListener.bind(localParticipantEventEmiter), - removeListener: localParticipantEventEmiter.removeListener.bind(localParticipantEventEmiter), - removeAllListeners: localParticipantEventEmiter.removeAllListeners.bind(localParticipantEventEmiter) + addListener: localParticipantEventEmiter.addListener.bind( + localParticipantEventEmiter, + ), + removeListener: localParticipantEventEmiter.removeListener.bind( + localParticipantEventEmiter, + ), + removeAllListeners: localParticipantEventEmiter.removeAllListeners.bind( + localParticipantEventEmiter, + ), } as unknown as LocalParticipant); fakeRoomEventEmiter = new EventEmitter(); @@ -84,56 +108,45 @@ function setupTest(): void { on: fakeRoomEventEmiter.on.bind(fakeRoomEventEmiter), off: fakeRoomEventEmiter.off.bind(fakeRoomEventEmiter), addListener: fakeRoomEventEmiter.addListener.bind(fakeRoomEventEmiter), - removeListener: fakeRoomEventEmiter.removeListener.bind(fakeRoomEventEmiter), - removeAllListeners: fakeRoomEventEmiter.removeAllListeners.bind(fakeRoomEventEmiter), - setE2EEEnabled: vi.fn().mockResolvedValue(undefined) + removeListener: + fakeRoomEventEmiter.removeListener.bind(fakeRoomEventEmiter), + removeAllListeners: + fakeRoomEventEmiter.removeAllListeners.bind(fakeRoomEventEmiter), + setE2EEEnabled: vi.fn().mockResolvedValue(undefined), } as unknown as LivekitRoom); - } function setupRemoteConnection(): RemoteConnection { - const opts: ConnectionOpts = { client: client, transport: livekitFocus, remoteTransports$: fakeMembershipsFocusMap$, scope: testScope, - livekitRoomFactory: () => fakeLivekitRoom + livekitRoomFactory: () => fakeLivekitRoom, }; - fetchMock.post(`${livekitFocus.livekit_service_url}/sfu/get`, - () => { - return { - status: 200, - body: - { - "url": "wss://matrix-rtc.m.localhost/livekit/sfu", - "jwt": "ATOKEN" - } - }; - } - ); + fetchMock.post(`${livekitFocus.livekit_service_url}/sfu/get`, () => { + return { + status: 200, + body: { + url: "wss://matrix-rtc.m.localhost/livekit/sfu", + jwt: "ATOKEN", + }, + }; + }); - fakeLivekitRoom - .connect - .mockResolvedValue(undefined); + fakeLivekitRoom.connect.mockResolvedValue(undefined); - return new RemoteConnection( - opts, - undefined - ); + return new RemoteConnection(opts, undefined); } - afterEach(() => { vi.useRealTimers(); vi.clearAllMocks(); fetchMock.reset(); }); - describe("Start connection states", () => { - it("start in initialized state", () => { setupTest(); @@ -142,15 +155,13 @@ describe("Start connection states", () => { transport: livekitFocus, remoteTransports$: fakeMembershipsFocusMap$, scope: testScope, - livekitRoomFactory: () => fakeLivekitRoom + livekitRoomFactory: () => fakeLivekitRoom, }; - const connection = new RemoteConnection( - opts, - undefined - ); + const connection = new RemoteConnection(opts, undefined); - expect(connection.focusedConnectionState$.getValue().state) - .toEqual("Initialized"); + expect(connection.focusedConnectionState$.getValue().state).toEqual( + "Initialized", + ); }); it("fail to getOpenId token then error state", async () => { @@ -162,31 +173,27 @@ describe("Start connection states", () => { transport: livekitFocus, remoteTransports$: fakeMembershipsFocusMap$, scope: testScope, - livekitRoomFactory: () => fakeLivekitRoom + livekitRoomFactory: () => fakeLivekitRoom, }; - - const connection = new RemoteConnection( - opts, - undefined - ); + const connection = new RemoteConnection(opts, undefined); const capturedStates: FocusConnectionState[] = []; connection.focusedConnectionState$.subscribe((value) => { capturedStates.push(value); }); - const deferred = Promise.withResolvers(); - client.getOpenIdToken.mockImplementation(async (): Promise => { - return await deferred.promise; - }); + client.getOpenIdToken.mockImplementation( + async (): Promise => { + return await deferred.promise; + }, + ); - connection.start() - .catch(() => { - // expected to throw - }); + connection.start().catch(() => { + // expected to throw + }); let capturedState = capturedStates.pop(); expect(capturedState).toBeDefined(); @@ -199,11 +206,14 @@ describe("Start connection states", () => { capturedState = capturedStates.pop(); if (capturedState!.state === "FailedToStart") { expect(capturedState!.error.message).toEqual("Something went wrong"); - expect(capturedState!.focus.livekit_alias).toEqual(livekitFocus.livekit_alias); + expect(capturedState!.focus.livekit_alias).toEqual( + livekitFocus.livekit_alias, + ); } else { - expect.fail("Expected FailedToStart state but got " + capturedState?.state); + expect.fail( + "Expected FailedToStart state but got " + capturedState?.state, + ); } - }); it("fail to get JWT token and error state", async () => { @@ -215,13 +225,10 @@ describe("Start connection states", () => { transport: livekitFocus, remoteTransports$: fakeMembershipsFocusMap$, scope: testScope, - livekitRoomFactory: () => fakeLivekitRoom + livekitRoomFactory: () => fakeLivekitRoom, }; - const connection = new RemoteConnection( - opts, - undefined - ); + const connection = new RemoteConnection(opts, undefined); const capturedStates: FocusConnectionState[] = []; connection.focusedConnectionState$.subscribe((value) => { @@ -230,21 +237,17 @@ describe("Start connection states", () => { const deferredSFU = Promise.withResolvers(); // mock the /sfu/get call - fetchMock.post(`${livekitFocus.livekit_service_url}/sfu/get`, - async () => { - await deferredSFU.promise; - return { - status: 500, - body: "Internal Server Error" - }; - } - ); + fetchMock.post(`${livekitFocus.livekit_service_url}/sfu/get`, async () => { + await deferredSFU.promise; + return { + status: 500, + body: "Internal Server Error", + }; + }); - - connection.start() - .catch(() => { - // expected to throw - }); + connection.start().catch(() => { + // expected to throw + }); let capturedState = capturedStates.pop(); expect(capturedState).toBeDefined(); @@ -256,15 +259,19 @@ describe("Start connection states", () => { capturedState = capturedStates.pop(); if (capturedState?.state === "FailedToStart") { - expect(capturedState?.error.message).toContain("SFU Config fetch failed with exception Error"); - expect(capturedState?.focus.livekit_alias).toEqual(livekitFocus.livekit_alias); + expect(capturedState?.error.message).toContain( + "SFU Config fetch failed with exception Error", + ); + expect(capturedState?.focus.livekit_alias).toEqual( + livekitFocus.livekit_alias, + ); } else { - expect.fail("Expected FailedToStart state but got " + capturedState?.state); + expect.fail( + "Expected FailedToStart state but got " + capturedState?.state, + ); } - }); - it("fail to connect to livekit error state", async () => { setupTest(); vi.useFakeTimers(); @@ -274,46 +281,36 @@ describe("Start connection states", () => { transport: livekitFocus, remoteTransports$: fakeMembershipsFocusMap$, scope: testScope, - livekitRoomFactory: () => fakeLivekitRoom + livekitRoomFactory: () => fakeLivekitRoom, }; - const connection = new RemoteConnection( - opts, - undefined - ); + const connection = new RemoteConnection(opts, undefined); const capturedStates: FocusConnectionState[] = []; connection.focusedConnectionState$.subscribe((value) => { capturedStates.push(value); }); - const deferredSFU = Promise.withResolvers(); // mock the /sfu/get call - fetchMock.post(`${livekitFocus.livekit_service_url}/sfu/get`, - () => { - return { - status: 200, - body: - { - "url": "wss://matrix-rtc.m.localhost/livekit/sfu", - "jwt": "ATOKEN" - } - }; - } - ); + fetchMock.post(`${livekitFocus.livekit_service_url}/sfu/get`, () => { + return { + status: 200, + body: { + url: "wss://matrix-rtc.m.localhost/livekit/sfu", + jwt: "ATOKEN", + }, + }; + }); - fakeLivekitRoom - .connect - .mockImplementation(async () => { - await deferredSFU.promise; - throw new Error("Failed to connect to livekit"); - }); + fakeLivekitRoom.connect.mockImplementation(async () => { + await deferredSFU.promise; + throw new Error("Failed to connect to livekit"); + }); - connection.start() - .catch(() => { - // expected to throw - }); + connection.start().catch(() => { + // expected to throw + }); let capturedState = capturedStates.pop(); expect(capturedState).toBeDefined(); @@ -326,12 +323,17 @@ describe("Start connection states", () => { capturedState = capturedStates.pop(); if (capturedState && capturedState?.state === "FailedToStart") { - expect(capturedState.error.message).toContain("Failed to connect to livekit"); - expect(capturedState.focus.livekit_alias).toEqual(livekitFocus.livekit_alias); + expect(capturedState.error.message).toContain( + "Failed to connect to livekit", + ); + expect(capturedState.focus.livekit_alias).toEqual( + livekitFocus.livekit_alias, + ); } else { - expect.fail("Expected FailedToStart state but got " + JSON.stringify(capturedState)); + expect.fail( + "Expected FailedToStart state but got " + JSON.stringify(capturedState), + ); } - }); it("connection states happy path", async () => { @@ -356,7 +358,6 @@ describe("Start connection states", () => { expect(connectingState?.state).toEqual("ConnectingToLkRoom"); const connectedState = capturedState.shift(); expect(connectedState?.state).toEqual("ConnectedToLkRoom"); - }); it("should relay livekit events once connected", async () => { @@ -378,7 +379,7 @@ describe("Start connection states", () => { ConnectionState.SignalReconnecting, ConnectionState.Connecting, ConnectionState.Connected, - ConnectionState.Reconnecting + ConnectionState.Reconnecting, ]; for (const state of states) { fakeRoomEventEmiter.emit(RoomEvent.ConnectionStateChanged, state); @@ -387,12 +388,18 @@ describe("Start connection states", () => { for (const state of states) { const s = capturedState.shift(); expect(s?.state).toEqual("ConnectedToLkRoom"); - const connectedState = s as FocusConnectionState & { state: "ConnectedToLkRoom" }; + const connectedState = s as FocusConnectionState & { + state: "ConnectedToLkRoom"; + }; expect(connectedState.connectionState).toEqual(state); // should always have the focus info - expect(connectedState.focus.livekit_alias).toEqual(livekitFocus.livekit_alias); - expect(connectedState.focus.livekit_service_url).toEqual(livekitFocus.livekit_service_url); + expect(connectedState.focus.livekit_alias).toEqual( + livekitFocus.livekit_alias, + ); + expect(connectedState.focus.livekit_service_url).toEqual( + livekitFocus.livekit_service_url, + ); } // If the state is not ConnectedToLkRoom, no events should be relayed anymore @@ -403,10 +410,8 @@ describe("Start connection states", () => { } expect(capturedState.length).toEqual(0); - }); - it("shutting down the scope should stop the connection", async () => { setupTest(); vi.useFakeTimers(); @@ -423,7 +428,6 @@ describe("Start connection states", () => { const stopSpy = vi.spyOn(connection, "stop"); testScope.end(); - expect(stopSpy).toHaveBeenCalled(); expect(fakeLivekitRoom.disconnect).toHaveBeenCalled(); @@ -437,26 +441,22 @@ describe("Start connection states", () => { expect(capturedState.length).toEqual(0); }); - }); - function fakeRemoteLivekitParticipant(id: string): RemoteParticipant { return vi.mocked({ - identity: id + identity: id, } as unknown as RemoteParticipant); } function fakeRtcMemberShip(userId: string, deviceId: string): CallMembership { return vi.mocked({ sender: userId, - deviceId: deviceId + deviceId: deviceId, } as unknown as CallMembership); } describe("Publishing participants observations", () => { - - it("should emit the list of publishing participants", async () => { setupTest(); @@ -464,13 +464,24 @@ describe("Publishing participants observations", () => { const bobIsAPublisher = Promise.withResolvers(); const danIsAPublisher = Promise.withResolvers(); - const observedPublishers: { participant: RemoteParticipant; membership: CallMembership }[][] = []; + const observedPublishers: { + participant: RemoteParticipant; + membership: CallMembership; + }[][] = []; connection.publishingParticipants$.subscribe((publishers) => { observedPublishers.push(publishers); - if (publishers.some((p) => p.participant.identity === "@bob:example.org:DEV111")) { + if ( + publishers.some( + (p) => p.participant.identity === "@bob:example.org:DEV111", + ) + ) { bobIsAPublisher.resolve(); } - if (publishers.some((p) => p.participant.identity === "@dan:example.org:DEV333")) { + if ( + publishers.some( + (p) => p.participant.identity === "@dan:example.org:DEV333", + ) + ) { danIsAPublisher.resolve(); } }); @@ -482,14 +493,13 @@ describe("Publishing participants observations", () => { fakeRemoteLivekitParticipant("@alice:example.org:DEV000"), fakeRemoteLivekitParticipant("@bob:example.org:DEV111"), fakeRemoteLivekitParticipant("@carol:example.org:DEV222"), - fakeRemoteLivekitParticipant("@dan:example.org:DEV333") + fakeRemoteLivekitParticipant("@dan:example.org:DEV333"), ]; // Let's simulate 3 members on the livekitRoom - vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get") - .mockReturnValue( - new Map(participants.map((p) => [p.identity, p])) - ); + vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get").mockReturnValue( + new Map(participants.map((p) => [p.identity, p])), + ); for (const participant of participants) { fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, participant); @@ -498,20 +508,27 @@ describe("Publishing participants observations", () => { // At this point there should be no publishers expect(observedPublishers.pop()!.length).toEqual(0); - const otherFocus: LivekitTransport = { livekit_alias: "!roomID:example.org", livekit_service_url: "https://other-matrix-rtc.example.org/livekit/jwt", - type: "livekit" + type: "livekit", }; - const rtcMemberships = [ // Say bob is on the same focus - { membership: fakeRtcMemberShip("@bob:example.org", "DEV111"), transport: livekitFocus }, + { + membership: fakeRtcMemberShip("@bob:example.org", "DEV111"), + transport: livekitFocus, + }, // Alice and carol is on a different focus - { membership: fakeRtcMemberShip("@alice:example.org", "DEV000"), transport: otherFocus }, - { membership: fakeRtcMemberShip("@carol:example.org", "DEV222"), transport: otherFocus } + { + membership: fakeRtcMemberShip("@alice:example.org", "DEV000"), + transport: otherFocus, + }, + { + membership: fakeRtcMemberShip("@carol:example.org", "DEV222"), + transport: otherFocus, + }, // NO DAVE YET ]; // signal this change in rtc memberships @@ -521,53 +538,74 @@ describe("Publishing participants observations", () => { await bobIsAPublisher.promise; const publishers = observedPublishers.pop(); expect(publishers?.length).toEqual(1); - expect(publishers?.[0].participant.identity).toEqual("@bob:example.org:DEV111"); + expect(publishers?.[0].participant.identity).toEqual( + "@bob:example.org:DEV111", + ); // Now let's make dan join the rtc memberships - rtcMemberships - .push({ membership: fakeRtcMemberShip("@dan:example.org", "DEV333"), transport: livekitFocus }); + rtcMemberships.push({ + membership: fakeRtcMemberShip("@dan:example.org", "DEV333"), + transport: livekitFocus, + }); fakeMembershipsFocusMap$.next(rtcMemberships); // We should have bob and dan has publishers now await danIsAPublisher.promise; const twoPublishers = observedPublishers.pop(); expect(twoPublishers?.length).toEqual(2); - expect(twoPublishers?.some((p) => p.participant.identity === "@bob:example.org:DEV111")).toBeTruthy(); - expect(twoPublishers?.some((p) => p.participant.identity === "@dan:example.org:DEV333")).toBeTruthy(); + expect( + twoPublishers?.some( + (p) => p.participant.identity === "@bob:example.org:DEV111", + ), + ).toBeTruthy(); + expect( + twoPublishers?.some( + (p) => p.participant.identity === "@dan:example.org:DEV333", + ), + ).toBeTruthy(); // Now let's make bob leave the livekit room - participants = participants.filter((p) => p.identity !== "@bob:example.org:DEV111"); - vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get") - .mockReturnValue( - new Map(participants.map((p) => [p.identity, p])) - ); - fakeRoomEventEmiter.emit(RoomEvent.ParticipantDisconnected, fakeRemoteLivekitParticipant("@bob:example.org:DEV111")); + participants = participants.filter( + (p) => p.identity !== "@bob:example.org:DEV111", + ); + vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get").mockReturnValue( + new Map(participants.map((p) => [p.identity, p])), + ); + fakeRoomEventEmiter.emit( + RoomEvent.ParticipantDisconnected, + fakeRemoteLivekitParticipant("@bob:example.org:DEV111"), + ); const updatedPublishers = observedPublishers.pop(); expect(updatedPublishers?.length).toEqual(1); - expect(updatedPublishers?.some((p) => p.participant.identity === "@dan:example.org:DEV333")).toBeTruthy(); + expect( + updatedPublishers?.some( + (p) => p.participant.identity === "@dan:example.org:DEV333", + ), + ).toBeTruthy(); }); - it("should be scoped to parent scope", (): void => { setupTest(); const connection = setupRemoteConnection(); - let observedPublishers: { participant: RemoteParticipant; membership: CallMembership }[][] = []; + let observedPublishers: { + participant: RemoteParticipant; + membership: CallMembership; + }[][] = []; connection.publishingParticipants$.subscribe((publishers) => { observedPublishers.push(publishers); }); let participants: RemoteParticipant[] = [ - fakeRemoteLivekitParticipant("@bob:example.org:DEV111") + fakeRemoteLivekitParticipant("@bob:example.org:DEV111"), ]; // Let's simulate 3 members on the livekitRoom - vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get") - .mockReturnValue( - new Map(participants.map((p) => [p.identity, p])) - ); + vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get").mockReturnValue( + new Map(participants.map((p) => [p.identity, p])), + ); for (const participant of participants) { fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, participant); @@ -578,7 +616,10 @@ describe("Publishing participants observations", () => { const rtcMemberships = [ // Say bob is on the same focus - { membership: fakeRtcMemberShip("@bob:example.org", "DEV111"), transport: livekitFocus } + { + membership: fakeRtcMemberShip("@bob:example.org", "DEV111"), + transport: livekitFocus, + }, ]; // signal this change in rtc memberships fakeMembershipsFocusMap$.next(rtcMemberships); @@ -586,27 +627,31 @@ describe("Publishing participants observations", () => { // We should have bob has a publisher now const publishers = observedPublishers.pop(); expect(publishers?.length).toEqual(1); - expect(publishers?.[0].participant.identity).toEqual("@bob:example.org:DEV111"); + expect(publishers?.[0].participant.identity).toEqual( + "@bob:example.org:DEV111", + ); // end the parent scope testScope.end(); observedPublishers = []; // SHOULD NOT emit any more publishers as the scope is ended - participants = participants.filter((p) => p.identity !== "@bob:example.org:DEV111"); - vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get") - .mockReturnValue( - new Map(participants.map((p) => [p.identity, p])) - ); - fakeRoomEventEmiter.emit(RoomEvent.ParticipantDisconnected, fakeRemoteLivekitParticipant("@bob:example.org:DEV111")); + participants = participants.filter( + (p) => p.identity !== "@bob:example.org:DEV111", + ); + vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get").mockReturnValue( + new Map(participants.map((p) => [p.identity, p])), + ); + fakeRoomEventEmiter.emit( + RoomEvent.ParticipantDisconnected, + fakeRemoteLivekitParticipant("@bob:example.org:DEV111"), + ); expect(observedPublishers.length).toEqual(0); }); }); - describe("PublishConnection", () => { - // let fakeBlurProcessor: ProcessorWrapper; let roomFactoryMock: Mock<() => LivekitRoom>; let muteStates: MockedObject; @@ -616,7 +661,6 @@ describe("PublishConnection", () => { roomFactoryMock = vi.fn().mockReturnValue(fakeLivekitRoom); - muteStates = mockMuteStates(); // fakeBlurProcessor = vi.mocked>({ @@ -626,20 +670,15 @@ describe("PublishConnection", () => { // getOptions: vi.fn().mockReturnValue({ strength: 0.5 }), // isRunning: vi.fn().mockReturnValue(false) // }); - - } - describe("Livekit room creation", () => { - - function createSetup(): void { setUpPublishConnection(); const fakeTrackProcessorSubject$ = new BehaviorSubject({ supported: true, - processor: undefined + processor: undefined, }); const opts: ConnectionOpts = { @@ -647,28 +686,25 @@ describe("PublishConnection", () => { transport: livekitFocus, remoteTransports$: fakeMembershipsFocusMap$, scope: testScope, - livekitRoomFactory: roomFactoryMock + livekitRoomFactory: roomFactoryMock, }; const audioInput = { available$: of(new Map([["mic1", { id: "mic1" }]])), selected$: new BehaviorSubject({ id: "mic1" }), - select(): void { - } + select(): void {}, }; const videoInput = { available$: of(new Map([["cam1", { id: "cam1" }]])), selected$: new BehaviorSubject({ id: "cam1" }), - select(): void { - } + select(): void {}, }; const audioOutput = { available$: of(new Map([["speaker", { id: "speaker" }]])), selected$: new BehaviorSubject({ id: "speaker" }), - select(): void { - } + select(): void {}, }; // TODO understand what is wrong with our mocking that requires ts-expect-error @@ -678,7 +714,7 @@ describe("PublishConnection", () => { // @ts-expect-error Mocking only videoInput, // @ts-expect-error Mocking only - audioOutput + audioOutput, }); new PublishConnection( @@ -686,18 +722,17 @@ describe("PublishConnection", () => { fakeDevices, muteStates, undefined, - fakeTrackProcessorSubject$ + fakeTrackProcessorSubject$, ); - } it("should create room with proper initial audio and video settings", () => { - createSetup(); expect(roomFactoryMock).toHaveBeenCalled(); - const lastCallArgs = roomFactoryMock.mock.calls[roomFactoryMock.mock.calls.length - 1]; + const lastCallArgs = + roomFactoryMock.mock.calls[roomFactoryMock.mock.calls.length - 1]; const roomOptions = lastCallArgs.pop() as unknown as RoomOptions; expect(roomOptions).toBeDefined(); @@ -705,7 +740,6 @@ describe("PublishConnection", () => { expect(roomOptions!.videoCaptureDefaults?.deviceId).toEqual("cam1"); expect(roomOptions!.audioCaptureDefaults?.deviceId).toEqual("mic1"); expect(roomOptions!.audioOutput?.deviceId).toEqual("speaker"); - }); it("respect controlledAudioDevices", () => { @@ -719,7 +753,6 @@ describe("PublishConnection", () => { // }) // }; // }); - }); }); }); diff --git a/src/state/Connection.ts b/src/state/Connection.ts index 42423938..e5e108b7 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -5,12 +5,27 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { connectedParticipantsObserver, connectionStateObserver } from "@livekit/components-core"; -import { type ConnectionState, type E2EEOptions, Room as LivekitRoom, type RoomOptions } from "livekit-client"; -import { type CallMembership, type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; +import { + connectedParticipantsObserver, + connectionStateObserver, +} from "@livekit/components-core"; +import { + type ConnectionState, + type E2EEOptions, + Room as LivekitRoom, + type RoomOptions, +} from "livekit-client"; +import { + type CallMembership, + type LivekitTransport, +} from "matrix-js-sdk/lib/matrixrtc"; import { BehaviorSubject, combineLatest } from "rxjs"; -import { getSFUConfigWithOpenID, type OpenIDClientParts, type SFUConfig } from "../livekit/openIDSFU"; +import { + getSFUConfigWithOpenID, + type OpenIDClientParts, + type SFUConfig, +} from "../livekit/openIDSFU"; import { type Behavior } from "./Behavior"; import { type ObservableScope } from "./ObservableScope"; import { defaultLiveKitOptions } from "../livekit/options"; @@ -23,20 +38,26 @@ export interface ConnectionOpts { /** The observable scope to use for this connection. */ scope: ObservableScope; /** An observable of the current RTC call memberships and their associated focus. */ - remoteTransports$: Behavior<{ membership: CallMembership; transport: LivekitTransport }[]>; + remoteTransports$: Behavior< + { membership: CallMembership; transport: LivekitTransport }[] + >; /** Optional factory to create the Livekit room, mainly for testing purposes. */ livekitRoomFactory?: (options?: RoomOptions) => LivekitRoom; } export type FocusConnectionState = - | { state: 'Initialized' } - | { state: 'FetchingConfig', focus: LivekitTransport } - | { state: 'ConnectingToLkRoom', focus: LivekitTransport } - | { state: 'PublishingTracks', focus: LivekitTransport } - | { state: 'FailedToStart', error: Error, focus: LivekitTransport } - | { state: 'ConnectedToLkRoom', connectionState: ConnectionState, focus: LivekitTransport } - | { state: 'Stopped', focus: LivekitTransport }; + | { state: "Initialized" } + | { state: "FetchingConfig"; focus: LivekitTransport } + | { state: "ConnectingToLkRoom"; focus: LivekitTransport } + | { state: "PublishingTracks"; focus: LivekitTransport } + | { state: "FailedToStart"; error: Error; focus: LivekitTransport } + | { + state: "ConnectedToLkRoom"; + connectionState: ConnectionState; + focus: LivekitTransport; + } + | { state: "Stopped"; focus: LivekitTransport }; /** * A connection to a Matrix RTC LiveKit backend. @@ -44,10 +65,9 @@ export type FocusConnectionState = * Expose observables for participants and connection state. */ export class Connection { - // Private Behavior - private readonly _focusedConnectionState$ - = new BehaviorSubject({ state: 'Initialized' }); + private readonly _focusedConnectionState$ = + new BehaviorSubject({ state: "Initialized" }); /** * The current state of the connection to the focus server. @@ -71,31 +91,44 @@ export class Connection { public async start(): Promise { this.stopped = false; try { - this._focusedConnectionState$.next({ state: 'FetchingConfig', focus: this.localTransport }); + this._focusedConnectionState$.next({ + state: "FetchingConfig", + focus: this.localTransport, + }); // TODO could this be loaded earlier to save time? const { url, jwt } = await this.getSFUConfigWithOpenID(); // If we were stopped while fetching the config, don't proceed to connect if (this.stopped) return; - this._focusedConnectionState$.next({ state: 'ConnectingToLkRoom', focus: this.localTransport }); + this._focusedConnectionState$.next({ + state: "ConnectingToLkRoom", + focus: this.localTransport, + }); await this.livekitRoom.connect(url, jwt); // If we were stopped while connecting, don't proceed to update state. if (this.stopped) return; - this._focusedConnectionState$.next({ state: 'ConnectedToLkRoom', focus: this.localTransport, connectionState: this.livekitRoom.state }); + this._focusedConnectionState$.next({ + state: "ConnectedToLkRoom", + focus: this.localTransport, + connectionState: this.livekitRoom.state, + }); } catch (error) { - this._focusedConnectionState$.next({ state: 'FailedToStart', error: error instanceof Error ? error : new Error(`${error}`), focus: this.localTransport }); + this._focusedConnectionState$.next({ + state: "FailedToStart", + error: error instanceof Error ? error : new Error(`${error}`), + focus: this.localTransport, + }); throw error; } } - protected async getSFUConfigWithOpenID(): Promise { return await getSFUConfigWithOpenID( this.client, this.localTransport.livekit_service_url, - this.localTransport.livekit_alias - ) + this.localTransport.livekit_alias, + ); } /** * Stops the connection. @@ -106,11 +139,13 @@ export class Connection { public async stop(): Promise { if (this.stopped) return; await this.livekitRoom.disconnect(); - this._focusedConnectionState$.next({ state: 'Stopped', focus: this.localTransport }); + this._focusedConnectionState$.next({ + state: "Stopped", + focus: this.localTransport, + }); this.stopped = true; } - /** * An observable of the participants that are publishing on this connection. * This is derived from `participantsIncludingSubscribers$` and `membershipsFocusMap$`. @@ -135,20 +170,20 @@ export class Connection { public readonly livekitRoom: LivekitRoom, opts: ConnectionOpts, ) { - const { transport, client, scope, remoteTransports$ } = - opts; + const { transport, client, scope, remoteTransports$ } = opts; - this.livekitRoom = livekitRoom + this.livekitRoom = livekitRoom; this.localTransport = transport; this.client = client; this.focusedConnectionState$ = scope.behavior( - this._focusedConnectionState$, { state: 'Initialized' } + this._focusedConnectionState$, + { state: "Initialized" }, ); const participantsIncludingSubscribers$ = scope.behavior( connectedParticipantsObserver(this.livekitRoom), - [] + [], ); this.publishingParticipants$ = scope.behavior( @@ -161,7 +196,7 @@ export class Connection { transport.livekit_service_url === this.localTransport.livekit_service_url ? [membership] - : [] + : [], ) // Pair with their associated LiveKit participant (if any) // Uses flatMap to filter out memberships with no associated rtc participant ([]) @@ -171,18 +206,22 @@ export class Connection { return participant ? [{ participant, membership }] : []; }), ), - [] + [], ); - scope.behavior( - connectionStateObserver(this.livekitRoom) - ).subscribe((connectionState) => { - const current = this._focusedConnectionState$.value; - // Only update the state if we are already connected to the LiveKit room. - if (current.state === 'ConnectedToLkRoom') { - this._focusedConnectionState$.next({ state: 'ConnectedToLkRoom', connectionState, focus: current.focus }); - } - }); + scope + .behavior(connectionStateObserver(this.livekitRoom)) + .subscribe((connectionState) => { + const current = this._focusedConnectionState$.value; + // Only update the state if we are already connected to the LiveKit room. + if (current.state === "ConnectedToLkRoom") { + this._focusedConnectionState$.next({ + state: "ConnectedToLkRoom", + connectionState, + focus: current.focus, + }); + } + }); scope.onEnd(() => void this.stop()); } @@ -195,17 +234,21 @@ export class Connection { * It does not publish any local tracks. */ export class RemoteConnection extends Connection { - /** * Creates a new remote connection to a matrix RTC LiveKit backend. * @param opts * @param sharedE2eeOption - The shared E2EE options to use for the connection. */ - public constructor(opts: ConnectionOpts, sharedE2eeOption: E2EEOptions | undefined) { - const factory = opts.livekitRoomFactory ?? ((options: RoomOptions): LivekitRoom => new LivekitRoom(options)); + public constructor( + opts: ConnectionOpts, + sharedE2eeOption: E2EEOptions | undefined, + ) { + const factory = + opts.livekitRoomFactory ?? + ((options: RoomOptions): LivekitRoom => new LivekitRoom(options)); const livekitRoom = factory({ ...defaultLiveKitOptions, - e2ee: sharedE2eeOption + e2ee: sharedE2eeOption, }); super(livekitRoom, opts); } diff --git a/src/state/PublishConnection.ts b/src/state/PublishConnection.ts index 8381c092..c35c71e4 100644 --- a/src/state/PublishConnection.ts +++ b/src/state/PublishConnection.ts @@ -10,15 +10,24 @@ import { LocalVideoTrack, Room as LivekitRoom, type RoomOptions, - Track + Track, } from "livekit-client"; -import { map, NEVER, type Observable, type Subscription, switchMap } from "rxjs"; +import { + map, + NEVER, + type Observable, + type Subscription, + switchMap, +} from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; import type { Behavior } from "./Behavior.ts"; import type { MediaDevices, SelectedDevice } from "./MediaDevices.ts"; import type { MuteStates } from "./MuteStates.ts"; -import { type ProcessorState, trackProcessorSync } from "../livekit/TrackProcessorContext.tsx"; +import { + type ProcessorState, + trackProcessorSync, +} from "../livekit/TrackProcessorContext.tsx"; import { getUrlParams } from "../UrlParams.ts"; import { defaultLiveKitOptions } from "../livekit/options.ts"; import { getValue } from "../utils/observable.ts"; @@ -31,7 +40,6 @@ import { type ObservableScope } from "./ObservableScope.ts"; * This connection will publish the local user's audio and video tracks. */ export class PublishConnection extends Connection { - /** * Creates a new PublishConnection. * @param args - The connection options. {@link ConnectionOpts} @@ -45,15 +53,22 @@ export class PublishConnection extends Connection { devices: MediaDevices, private readonly muteStates: MuteStates, e2eeLivekitOptions: E2EEOptions | undefined, - trackerProcessorState$: Behavior + trackerProcessorState$: Behavior, ) { const { scope } = args; logger.info("[LivekitRoom] Create LiveKit room"); const { controlledAudioDevices } = getUrlParams(); - const factory = args.livekitRoomFactory ?? ((options: RoomOptions): LivekitRoom => new LivekitRoom(options)); + const factory = + args.livekitRoomFactory ?? + ((options: RoomOptions): LivekitRoom => new LivekitRoom(options)); const room = factory( - generateRoomOption(devices, trackerProcessorState$.value, controlledAudioDevices, e2eeLivekitOptions) + generateRoomOption( + devices, + trackerProcessorState$.value, + controlledAudioDevices, + e2eeLivekitOptions, + ), ); room.setE2EEEnabled(e2eeLivekitOptions !== undefined).catch((e) => { logger.error("Failed to set E2EE enabled on room", e); @@ -83,14 +98,14 @@ export class PublishConnection extends Connection { public async start(): Promise { this.stopped = false; - await super.start() + await super.start(); if (this.stopped) return; // TODO this can throw errors? It will also prompt for permissions if not already granted const tracks = await this.livekitRoom.localParticipant.createTracks({ audio: this.muteStates.audio.enabled$.value, - video: this.muteStates.video.enabled$.value + video: this.muteStates.video.enabled$.value, }); if (this.stopped) return; for (const track of tracks) { @@ -100,7 +115,7 @@ export class PublishConnection extends Connection { if (this.stopped) return; // TODO: check if the connection is still active? and break the loop if not? } - }; + } /// Private methods @@ -112,17 +127,19 @@ export class PublishConnection extends Connection { // in the LocalParticipant object for the track object and there's not a nice // way to do that generically. There is usually no OS-level default video capture // device anyway, and audio outputs work differently. - private workaroundRestartAudioInputTrackChrome(devices: MediaDevices, scope: ObservableScope): void { - + private workaroundRestartAudioInputTrackChrome( + devices: MediaDevices, + scope: ObservableScope, + ): void { devices.audioInput.selected$ .pipe( switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER), - scope.bind() + scope.bind(), ) .subscribe(() => { if (this.livekitRoom.state != ConnectionState.Connected) return; const activeMicTrack = Array.from( - this.livekitRoom.localParticipant.audioTrackPublications.values() + this.livekitRoom.localParticipant.audioTrackPublications.values(), ).find((d) => d.source === Track.Source.Microphone)?.track; if ( @@ -147,11 +164,15 @@ export class PublishConnection extends Connection { }); } -// Observe changes in the selected media devices and update the LiveKit room accordingly. - private observeMediaDevices(scope: ObservableScope, devices: MediaDevices, controlledAudioDevices: boolean):void { + // Observe changes in the selected media devices and update the LiveKit room accordingly. + private observeMediaDevices( + scope: ObservableScope, + devices: MediaDevices, + controlledAudioDevices: boolean, + ): void { const syncDevice = ( kind: MediaDeviceKind, - selected$: Observable + selected$: Observable, ): Subscription => selected$.pipe(scope.bind()).subscribe((device) => { if (this.livekitRoom.state != ConnectionState.Connected) return; @@ -160,7 +181,7 @@ export class PublishConnection extends Connection { "[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :", this.livekitRoom.getActiveDevice(kind), " !== ", - device?.id + device?.id, ); if ( device !== undefined && @@ -169,7 +190,7 @@ export class PublishConnection extends Connection { this.livekitRoom .switchActiveDevice(kind, device.id) .catch((e) => - logger.error(`Failed to sync ${kind} device with LiveKit`, e) + logger.error(`Failed to sync ${kind} device with LiveKit`, e), ); } }); @@ -208,21 +229,23 @@ export class PublishConnection extends Connection { }); } - private observeTrackProcessors(scope: ObservableScope, room: LivekitRoom, trackerProcessorState$: Behavior): void { + private observeTrackProcessors( + scope: ObservableScope, + room: LivekitRoom, + trackerProcessorState$: Behavior, + ): void { const track$ = scope.behavior( observeTrackReference$(room.localParticipant, Track.Source.Camera).pipe( map((trackRef) => { const track = trackRef?.publication?.track; return track instanceof LocalVideoTrack ? track : null; - }) - ) + }), + ), ); trackProcessorSync(track$, trackerProcessorState$); } - } - // Generate the initial LiveKit RoomOptions based on the current media devices and processor state. function generateRoomOption( devices: MediaDevices, @@ -235,11 +258,11 @@ function generateRoomOption( videoCaptureDefaults: { ...defaultLiveKitOptions.videoCaptureDefaults, deviceId: devices.videoInput.selected$.value?.id, - processor: processorState.processor + processor: processorState.processor, }, audioCaptureDefaults: { ...defaultLiveKitOptions.audioCaptureDefaults, - deviceId: devices.audioInput.selected$.value?.id + deviceId: devices.audioInput.selected$.value?.id, }, audioOutput: { // When using controlled audio devices, we don't want to set the @@ -247,8 +270,8 @@ function generateRoomOption( // (also the id does not need to match a browser device id) deviceId: controlledAudioDevices ? undefined - : getValue(devices.audioOutput.selected$)?.id + : getValue(devices.audioOutput.selected$)?.id, }, - e2ee: e2eeLivekitOptions + e2ee: e2eeLivekitOptions, }; } diff --git a/src/tile/MediaView.test.tsx b/src/tile/MediaView.test.tsx index 57be00ef..3637e8de 100644 --- a/src/tile/MediaView.test.tsx +++ b/src/tile/MediaView.test.tsx @@ -19,7 +19,11 @@ import { type ComponentProps } from "react"; import { MediaView } from "./MediaView"; import { EncryptionStatus } from "../state/MediaViewModel"; -import { mockLocalParticipant, mockMatrixRoomMember, mockRtcMembership } from "../utils/test"; +import { + mockLocalParticipant, + mockMatrixRoomMember, + mockRtcMembership, +} from "../utils/test"; describe("MediaView", () => { const participant = mockLocalParticipant({}); diff --git a/src/utils/test.ts b/src/utils/test.ts index b77e63c0..98a2addf 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -419,7 +419,9 @@ export const deviceStub = { select(): void {}, }; -export function mockMediaDevices(data: Partial): MockedObject { +export function mockMediaDevices( + data: Partial, +): MockedObject { return vi.mocked({ audioInput: deviceStub, audioOutput: deviceStub,