diff --git a/playwright/widget/hotswap-legacy-compat.test.ts b/playwright/widget/hotswap-legacy-compat.test.ts index a2edb27d..e4695624 100644 --- a/playwright/widget/hotswap-legacy-compat.test.ts +++ b/playwright/widget/hotswap-legacy-compat.test.ts @@ -27,9 +27,6 @@ import { HOST1, HOST2, TestHelpers } from "./test-helpers"; widgetTest( `Test swapping publisher from ${HOST1} to ${HOST2}`, async ({ addUser, browserName }) => { - // ALWAYS SKIPT THE TEST SINCE IT IS EXPECTED TO FAIL. - // confirmed locally that its failing without: https://github.com/element-hq/element-call/pull/3675 - test.skip(true); test.skip( browserName === "firefox", "The is test is not working on firefox CI environment. No mic/audio device inputs so cam/mic are disabled", diff --git a/src/livekit/MatrixAudioRenderer.tsx b/src/livekit/MatrixAudioRenderer.tsx index 5a4b2257..10579c1b 100644 --- a/src/livekit/MatrixAudioRenderer.tsx +++ b/src/livekit/MatrixAudioRenderer.tsx @@ -78,6 +78,7 @@ export function LivekitRoomAudioRenderer({ .filter((ref) => { const isValid = validIdentities.includes(ref.participant.identity); if (!isValid) { + // TODO make sure to also skip the warn logging for the local identity // Log that there is an invalid identity, that means that someone is publishing audio that is not expected to be in the call. prefixedLogger.warn( `Audio track ${ref.participant.identity} from ${url} has no matching matrix call member`, diff --git a/src/settings/DeveloperSettingsTab.tsx b/src/settings/DeveloperSettingsTab.tsx index a94dca26..91a2e241 100644 --- a/src/settings/DeveloperSettingsTab.tsx +++ b/src/settings/DeveloperSettingsTab.tsx @@ -316,6 +316,7 @@ export const DeveloperSettingsTab: FC = ({ })}

LivekitAlias: {livekitRoom.livekitAlias}

+

connectionState (wont hot reload): {livekitRoom.room.state}

{livekitRoom.isLocal &&

ws-url: {localSfuUrl?.href}

}

{t("developer_mode.livekit_server_info")}( diff --git a/src/settings/__snapshots__/DeveloperSettingsTab.test.tsx.snap b/src/settings/__snapshots__/DeveloperSettingsTab.test.tsx.snap index 57afe4d9..cfa25ca5 100644 --- a/src/settings/__snapshots__/DeveloperSettingsTab.test.tsx.snap +++ b/src/settings/__snapshots__/DeveloperSettingsTab.test.tsx.snap @@ -359,6 +359,9 @@ exports[`DeveloperSettingsTab > renders and matches snapshot 1`] = ` LivekitAlias: TestAlias

+

+ connectionState (wont hot reload): +

ws-url: wss://local-sfu.example.org/ @@ -401,6 +404,9 @@ exports[`DeveloperSettingsTab > renders and matches snapshot 1`] = ` LivekitAlias: TestAlias2

+

+ connectionState (wont hot reload): +

LiveKit Server Info ( diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index bf3e9521..554060bf 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -529,7 +529,6 @@ export function createCallViewModel$( }, createPublisherFactory: (connection: Connection) => { return new Publisher( - scope, connection, mediaDevices, muteStates, diff --git a/src/state/CallViewModel/localMember/LocalMember.test.ts b/src/state/CallViewModel/localMember/LocalMember.test.ts index af12c98b..9d2ded79 100644 --- a/src/state/CallViewModel/localMember/LocalMember.test.ts +++ b/src/state/CallViewModel/localMember/LocalMember.test.ts @@ -297,6 +297,9 @@ describe("LocalMembership", () => { seed += 1; logger.info(`creating [${a}]`); const p = { + // It is enought to check if destroy is called. Destroy itself is tested in the publisher to make sure it does + // all the cleanup we need. + destroy: vi.fn(), stopPublishing: vi.fn().mockImplementation(() => { logger.info(`stopPublishing [${a}]`); }), @@ -325,13 +328,12 @@ describe("LocalMembership", () => { await flushPromises(); localTransport$.next(bTransport); await flushPromises(); + expect(publisherFactory).toHaveBeenCalledTimes(2); expect(publishers.length).toBe(2); // stop the first Publisher and let the second one life. - expect(publishers[0].stopTracks).toHaveBeenCalled(); - expect(publishers[1].stopTracks).not.toHaveBeenCalled(); - expect(publishers[0].stopPublishing).toHaveBeenCalled(); - expect(publishers[1].stopPublishing).not.toHaveBeenCalled(); + expect(publishers[0].destroy).toHaveBeenCalled(); + expect(publishers[1].destroy).not.toHaveBeenCalled(); expect(publisherFactory.mock.calls[0][0].transport).toBe( aTransport.transport, ); @@ -341,7 +343,7 @@ describe("LocalMembership", () => { scope.end(); await flushPromises(); // stop all tracks after ending scopes - expect(publishers[1].stopPublishing).toHaveBeenCalled(); + expect(publishers[1].destroy).toHaveBeenCalled(); // expect(publishers[1].stopTracks).toHaveBeenCalled(); defaultCreateLocalMemberValues.createPublisherFactory.mockReset(); @@ -359,8 +361,9 @@ describe("LocalMembership", () => { defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation( () => { const p = { - stopPublishing: vi.fn(), - stopTracks: vi.fn(), + // It is enought to check if destroy is called. Destroy itself is tested in the publisher to make sure it does + // all the cleanup we need. + destroy: vi.fn(), createAndSetupTracks: vi.fn().mockImplementation(async () => { tracks$.next([{}, {}] as LocalTrack[]); return Promise.resolve(); @@ -395,11 +398,11 @@ describe("LocalMembership", () => { localMembership.startTracks(); await flushPromises(); expect(publishers[0].createAndSetupTracks).toHaveBeenCalled(); - // expect(localMembership.tracks$.value.length).toBe(2); + scope.end(); await flushPromises(); // stop all tracks after ending scopes - expect(publishers[0].stopPublishing).toHaveBeenCalled(); + expect(publishers[0].destroy).toHaveBeenCalled(); // expect(publishers[0].stopTracks).toHaveBeenCalled(); publisherFactory.mockClear(); }); @@ -416,27 +419,22 @@ describe("LocalMembership", () => { ); const publishers: Publisher[] = []; - const tracks$ = new BehaviorSubject([]); const publishing$ = new BehaviorSubject(false); const createTrackResolver = Promise.withResolvers(); const publishResolver = Promise.withResolvers(); defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation( () => { const p = { - stopPublishing: vi.fn(), - stopTracks: vi.fn().mockImplementation(() => { - logger.info("stopTracks"); - tracks$.next([]); - }), + // It is enought to check if destroy is called. Destroy itself is tested in the publisher to make sure it does + // all the cleanup we need. + destroy: vi.fn(), createAndSetupTracks: vi.fn().mockImplementation(async () => { await createTrackResolver.promise; - tracks$.next([{}, {}] as LocalTrack[]); }), startPublishing: vi.fn().mockImplementation(async () => { await publishResolver.promise; publishing$.next(true); }), - tracks$, publishing$, }; publishers.push(p as unknown as Publisher); @@ -536,7 +534,7 @@ describe("LocalMembership", () => { (localMembership.localMemberState$.value as any).media, ).toStrictEqual(PublishState.Publishing); - expect(publishers[0].stopPublishing).not.toHaveBeenCalled(); + expect(publishers[0].destroy).not.toHaveBeenCalled(); expect(localMembership.localMemberState$.isStopped).toBe(false); scope.end(); @@ -547,7 +545,7 @@ describe("LocalMembership", () => { (localMembership.localMemberState$.value as any).media, ).toStrictEqual(PublishState.Publishing); // stop all tracks after ending scopes - expect(publishers[0].stopPublishing).toHaveBeenCalled(); + expect(publishers[0].destroy).toHaveBeenCalled(); // expect(publishers[0].stopTracks).toHaveBeenCalled(); }); // TODO add tests for matrix local matrix participation. diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index 4749e942..44b6c63b 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -310,13 +310,17 @@ export const createLocalMembership$ = ({ // - destruct all current streams // - overwrite current publisher scope.reconcile(localConnection$, async (connection) => { + logger.info( + "reconcile based on new localConnection:", + connection?.transport.livekit_service_url, + ); if (connection !== null) { const publisher = createPublisherFactory(connection); publisher$.next(publisher); + // Clean-up callback return Promise.resolve(async (): Promise => { - await publisher.stopPublishing(); - await publisher.stopTracks(); + await publisher.destroy(); }); } }); diff --git a/src/state/CallViewModel/localMember/LocalTransport.ts b/src/state/CallViewModel/localMember/LocalTransport.ts index 0625866d..7e1c4155 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.ts @@ -16,9 +16,9 @@ import { MatrixError, type MatrixClient } from "matrix-js-sdk"; import { combineLatest, distinctUntilChanged, - first, from, map, + of, switchMap, } from "rxjs"; import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; @@ -126,36 +126,41 @@ export const createLocalTransport$ = ({ * The transport over which we should be actively publishing our media. * undefined when not joined. */ - const oldestMemberTransport$ = scope.behavior( - combineLatest([memberships$]).pipe( - map(([memberships]) => { - const oldestMember = memberships.value[0]; - const transport = oldestMember?.getTransport(memberships.value[0]); - if (!transport) return null; - return transport; - }), - first((t) => t != null && isLivekitTransport(t)), - switchMap((transport) => { - // Get the open jwt token to connect to the sfu - const computeLocalTransportWithSFUConfig = - async (): Promise => { - return { - transport, - sfuConfig: await getSFUConfigWithOpenID( - client, - ownMembershipIdentity, - transport.livekit_service_url, - roomId, - { forceJwtEndpoint: JwtEndpointVersion.Legacy }, - logger, - ), - }; - }; - return from(computeLocalTransportWithSFUConfig()); - }), - ), - null, - ); + const oldestMemberTransport$ = + scope.behavior( + combineLatest([memberships$, useOldestMember$]).pipe( + map(([memberships, useOldestMember]) => { + if (!useOldestMember) return null; // No need to do any prefetching if not using oldest member + const oldestMember = memberships.value[0]; + const transport = oldestMember?.getTransport(oldestMember); + if (!transport) return null; + return transport; + }), + switchMap((transport) => { + if (transport !== null && isLivekitTransport(transport)) { + // Get the open jwt token to connect to the sfu + const computeLocalTransportWithSFUConfig = + async (): Promise => { + // await sleep(1000); + return { + transport, + sfuConfig: await getSFUConfigWithOpenID( + client, + ownMembershipIdentity, + transport.livekit_service_url, + roomId, + { forceJwtEndpoint: JwtEndpointVersion.Legacy }, + logger, + ), + }; + }; + return from(computeLocalTransportWithSFUConfig()); + } + return of(null); + }), + ), + null, + ); /** * The transport that we would personally prefer to publish on (if not for the @@ -200,14 +205,23 @@ export const createLocalTransport$ = ({ oldestMemberTransport$, preferredTransport$, ]).pipe( - map(([useOldestMember, oldestMemberTransport, preferredTransport]) => - useOldestMember + map(([useOldestMember, oldestMemberTransport, preferredTransport]) => { + return useOldestMember ? (oldestMemberTransport ?? preferredTransport) - : preferredTransport, - ), - distinctUntilChanged((t1, t2) => - areLivekitTransportsEqual(t1?.transport ?? null, t2?.transport ?? null), - ), + : preferredTransport; + }), + distinctUntilChanged((t1, t2) => { + logger.info( + "Local Transport Update from:", + t1?.transport.livekit_service_url, + " to ", + t2?.transport.livekit_service_url, + ); + return areLivekitTransportsEqual( + t1?.transport ?? null, + t2?.transport ?? null, + ); + }), ), ); }; diff --git a/src/state/CallViewModel/localMember/Publisher.test.ts b/src/state/CallViewModel/localMember/Publisher.test.ts index 38a80bed..a0eaa2fd 100644 --- a/src/state/CallViewModel/localMember/Publisher.test.ts +++ b/src/state/CallViewModel/localMember/Publisher.test.ts @@ -183,7 +183,6 @@ describe("Publisher", () => { beforeEach(() => { publisher = new Publisher( - scope, connection, mockMediaDevices({}), muteStates, @@ -192,7 +191,9 @@ describe("Publisher", () => { ); }); - afterEach(() => {}); + afterEach(async () => { + await publisher.destroy(); + }); it("Should not create tracks if started muted to avoid unneeded permission requests", async () => { const createTracksSpy = vi.spyOn( @@ -207,6 +208,38 @@ describe("Publisher", () => { expect(createTracksSpy).not.toHaveBeenCalled(); }); + it("should unsetHandler and stop tracks on destroy", async () => { + // setup all spies + const unsetVideoSpy = vi.spyOn( + ( + publisher as unknown as { + muteStates: { video: { unsetHandler: () => void } }; + } + ).muteStates.video, + "unsetHandler", + ); + const unsetAudioSpy = vi.spyOn( + ( + publisher as unknown as { + muteStates: { audio: { unsetHandler: () => void } }; + } + ).muteStates.audio, + "unsetHandler", + ); + const scopeEndSpy = vi.spyOn( + (publisher as unknown as { scope: { end: () => void } }).scope, + "end", + ); + const stopTracksSpy = vi.spyOn(publisher, "stopTracks"); + // destroy publisher + await publisher.destroy(); + + expect(stopTracksSpy).toHaveBeenCalledOnce(); + expect(unsetVideoSpy).toHaveBeenCalledOnce(); + expect(unsetAudioSpy).toHaveBeenCalledOnce(); + expect(scopeEndSpy).toHaveBeenCalled(); + }); + it("Should minimize permission request by querying create at once", async () => { const enableCameraAndMicrophoneSpy = vi.spyOn( localParticipant, @@ -267,7 +300,6 @@ describe("Publisher", () => { let publisher: Publisher; beforeEach(() => { publisher = new Publisher( - scope, connection, mockMediaDevices({}), muteStates, @@ -275,6 +307,9 @@ describe("Publisher", () => { logger, ); }); + afterEach(async () => { + await publisher.destroy(); + }); test.each([ { mutes: { audioEnabled: true, videoEnabled: false } }, @@ -320,7 +355,6 @@ describe("Bug fix", () => { it("wrongly publish tracks while muted", async () => { // setLogLevel(`debug`); const publisher = new Publisher( - scope, connection, mockMediaDevices({}), muteStates, @@ -356,5 +390,6 @@ describe("Bug fix", () => { expect(track!.mute).toHaveBeenCalled(); expect(track!.isMuted).toBe(true); } + await publisher.destroy(); }); }); diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index 27c53726..8df38743 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -32,7 +32,7 @@ import { import { getUrlParams } from "../../../UrlParams.ts"; import { observeTrackReference$ } from "../../MediaViewModel.ts"; import { type Connection } from "../remoteMembers/Connection.ts"; -import { type ObservableScope } from "../../ObservableScope.ts"; +import { ObservableScope } from "../../ObservableScope.ts"; /** * A wrapper for a Connection object. @@ -47,9 +47,10 @@ export class Publisher { */ public shouldPublish = false; + private readonly scope = new ObservableScope(); + /** * Creates a new Publisher. - * @param scope - The observable scope to use for managing the publisher. * @param connection - The connection to use for publishing. * @param devices - The media devices to use for audio and video input. * @param muteStates - The mute states for audio and video. @@ -57,7 +58,6 @@ export class Publisher { * @param logger - The logger to use for logging :D. */ public constructor( - private scope: ObservableScope, private connection: Pick, //setE2EEEnabled, devices: MediaDevices, private readonly muteStates: MuteStates, @@ -65,7 +65,6 @@ export class Publisher { private logger: Logger, ) { const { controlledAudioDevices } = getUrlParams(); - const room = connection.livekitRoom; room.setE2EEEnabled(room.options.e2ee !== undefined)?.catch((e: Error) => { @@ -73,17 +72,11 @@ export class Publisher { }); // Setup track processor syncing (blur) - this.observeTrackProcessors(scope, room, trackerProcessorState$); + this.observeTrackProcessors(this.scope, room, trackerProcessorState$); // Observe media device changes and update LiveKit active devices accordingly - this.observeMediaDevices(scope, devices, controlledAudioDevices); + this.observeMediaDevices(this.scope, devices, controlledAudioDevices); - this.workaroundRestartAudioInputTrackChrome(devices, scope); - this.scope.onEnd(() => { - this.logger.info("Scope ended -> stop publishing all tracks"); - void this.stopPublishing(); - muteStates.audio.unsetHandler(); - muteStates.video.unsetHandler(); - }); + this.workaroundRestartAudioInputTrackChrome(devices, this.scope); this.connection.livekitRoom.localParticipant.on( ParticipantEvent.LocalTrackPublished, @@ -91,6 +84,21 @@ export class Publisher { ); } + public async destroy(): Promise { + this.scope.end(); + this.logger.info("Scope ended -> unset handler"); + this.muteStates.audio.unsetHandler(); + this.muteStates.video.unsetHandler(); + + this.logger.info(`Start to stop tracks`); + try { + await this.stopTracks(); + this.logger.info(`Done to stop tracks`); + } catch (e) { + this.logger.error(`Failed to stop tracks: ${e}`); + } + } + // LiveKit will publish the tracks as soon as they are created // but we want to control when tracks are published. // We cannot just mute the tracks, even if this will effectively stop the publishing, diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 028b28f6..f649e931 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -155,6 +155,16 @@ export class Connection { const { url, jwt, livekitAlias } = this.existingSFUConfig ?? (await this.getSFUConfigForRemoteConnection()); + this.logger.debug( + "Starting Connection to: ", + this.transport.livekit_service_url, + "jwt: ", + jwt, + "wss: ", + url, + "livekitAlias: ", + livekitAlias, + ); this._livekitAlias = livekitAlias; // If we were stopped while fetching the config, don't proceed to connect if (this.stopped) return; @@ -171,8 +181,11 @@ export class Connection { }); try { + this.logger.info(`livekitRoom.connect ${url}`); await this.livekitRoom.connect(url, jwt); + this.logger.info(`livekitRoom.connect SUCCESS ${url}`); } catch (e) { + this.logger.info(`livekitRoom.connect FAILED ${url}`, e); // LiveKit uses 503 to indicate that the server has hit its track limits. // https://github.com/livekit/livekit/blob/fcb05e97c5a31812ecf0ca6f7efa57c485cea9fb/pkg/service/rtcservice.go#L171 // It also errors with a status code of 200 (yes, really) for room @@ -233,12 +246,15 @@ export class Connection { */ public async stop(): Promise { this.logger.debug( - `Stopping connection to ${this.transport.livekit_service_url}`, + `stop: disconnecing from lk room ${this.transport.livekit_service_url}`, ); if (this.stopped) return; await this.livekitRoom.disconnect(); this._state$.next(ConnectionState.Stopped); this.stopped = true; + this.logger.debug( + `stop: DONE disconnecing from lk room ${this.transport.livekit_service_url}`, + ); } private readonly client: OpenIDClientParts; @@ -255,9 +271,11 @@ export class Connection { public constructor(opts: ConnectionOpts, logger: Logger) { this.ownMembershipIdentity = opts.ownMembershipIdentity; this.existingSFUConfig = opts.existingSFUConfig; - this.logger = logger.getChild("[Connection]"); + this.logger = logger.getChild( + "[Connection " + opts.transport.livekit_service_url + "]", + ); this.logger.info( - `Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`, + `constructor: ${opts.transport.livekit_service_url} alias: ${opts.transport.livekit_alias} withSfuConfig?: ${opts.existingSFUConfig ? JSON.stringify(opts.existingSFUConfig) : "undefined"}`, ); const { transport, client, scope } = opts; diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index 4295c5f2..8bc008ea 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -189,10 +189,6 @@ export function createConnectionManager$({ } }, (scope, _data$, serviceUrl, alias, sfuConfig) => { - logger.debug( - `Creating connection to ${serviceUrl} (${alias}, withSfuConfig (local connection?): ${JSON.stringify(sfuConfig) ?? "no config->remote connection"})`, - ); - const connection = connectionFactory.createConnection( scope, {