From 83c630c97b3200938d855ed29adb2811c7d4a781 Mon Sep 17 00:00:00 2001 From: Valere Date: Fri, 9 May 2025 09:57:44 +0200 Subject: [PATCH] fix: Livekit openned connection leaks --- src/livekit/useECConnectionState.test.tsx | 111 +++++++++++++++++++++- src/livekit/useECConnectionState.ts | 61 +++++++++++- src/utils/cancellable.ts | 18 ++++ 3 files changed, 185 insertions(+), 5 deletions(-) create mode 100644 src/utils/cancellable.ts diff --git a/src/livekit/useECConnectionState.test.tsx b/src/livekit/useECConnectionState.test.tsx index 2118e4ff..72324884 100644 --- a/src/livekit/useECConnectionState.test.tsx +++ b/src/livekit/useECConnectionState.test.tsx @@ -6,7 +6,7 @@ Please see LICENSE in the repository root for full details. */ import { type FC, useCallback, useState } from "react"; -import { test, vi } from "vitest"; +import { describe, expect, test, vi, vitest } from "vitest"; import { ConnectionError, ConnectionErrorReason, @@ -15,6 +15,7 @@ import { import userEvent from "@testing-library/user-event"; import { render, screen } from "@testing-library/react"; import { MemoryRouter } from "react-router-dom"; +import { defer, sleep } from "matrix-js-sdk/lib/utils"; import { useECConnectionState } from "./useECConnectionState"; import { type SFUConfig } from "./openIDSFU"; @@ -73,3 +74,111 @@ test.each<[string, ConnectionError]>([ screen.getByText("Insufficient capacity"); }, ); + +describe("Leaking connection prevention", () => { + function createTestComponent(mockRoom: Room): FC { + const TestComponent: FC = () => { + const [sfuConfig, setSfuConfig] = useState( + undefined, + ); + const connect = useCallback( + () => setSfuConfig({ url: "URL", jwt: "JWT token" }), + [], + ); + useECConnectionState("default", false, mockRoom, sfuConfig); + return ; + }; + return TestComponent; + } + + test("Should cancel pending connections when the component is unmounted", async () => { + const connectCall = vi.fn(); + const pendingConnection = defer(); + // let pendingDisconnection = defer() + const disconnectMock = vi.fn(); + + const mockRoom = { + on: () => {}, + off: () => {}, + once: () => {}, + connect: async () => { + connectCall.call(undefined); + return await pendingConnection.promise; + }, + disconnect: disconnectMock, + localParticipant: { + getTrackPublication: () => {}, + createTracks: () => [], + }, + } as unknown as Room; + + const TestComponent = createTestComponent(mockRoom); + + const { unmount } = render(); + const user = userEvent.setup(); + await user.click(screen.getByRole("button", { name: "Connect" })); + + expect(connectCall).toHaveBeenCalled(); + // unmount while the connection is pending + unmount(); + + // resolve the pending connection + pendingConnection.resolve(); + + await vitest.waitUntil( + () => { + return disconnectMock.mock.calls.length > 0; + }, + { + timeout: 1000, + interval: 100, + }, + ); + + // There should be some cleaning up to avoid leaking an open connection + expect(disconnectMock).toHaveBeenCalledTimes(1); + }); + + test("Should cancel about to open but not yet opened connection", async () => { + const createTracksCall = vi.fn(); + const pendingCreateTrack = defer(); + // let pendingDisconnection = defer() + const disconnectMock = vi.fn(); + const connectMock = vi.fn(); + + const mockRoom = { + on: () => {}, + off: () => {}, + once: () => {}, + connect: connectMock, + disconnect: disconnectMock, + localParticipant: { + getTrackPublication: () => {}, + createTracks: async () => { + createTracksCall.call(undefined); + await pendingCreateTrack.promise; + return []; + }, + }, + } as unknown as Room; + + const TestComponent = createTestComponent(mockRoom); + + const { unmount } = render(); + const user = userEvent.setup(); + await user.click(screen.getByRole("button", { name: "Connect" })); + + expect(createTracksCall).toHaveBeenCalled(); + // unmount while createTracks is pending + unmount(); + + // resolve createTracks + pendingCreateTrack.resolve(); + + // Yield to the event loop to let the connection attempt finish + await sleep(100); + + // The operation should have been aborted before even calling connect. + expect(connectMock).not.toHaveBeenCalled(); + }); +}); diff --git a/src/livekit/useECConnectionState.ts b/src/livekit/useECConnectionState.ts index d877dde7..83b21539 100644 --- a/src/livekit/useECConnectionState.ts +++ b/src/livekit/useECConnectionState.ts @@ -6,7 +6,6 @@ Please see LICENSE in the repository root for full details. */ import { - type AudioCaptureOptions, ConnectionError, ConnectionState, type LocalTrack, @@ -25,6 +24,7 @@ import { InsufficientCapacityError, UnknownCallError, } from "../utils/errors.ts"; +import { Cancellable } from "../utils/cancellable.ts"; declare global { interface Window { @@ -60,6 +60,7 @@ async function doConnect( sfuConfig: SFUConfig, audioEnabled: boolean, initialDeviceId: string | undefined, + cancellable: Cancellable, ): Promise { // Always create an audio track manually. // livekit (by default) keeps the mic track open when you mute, but if you start muted, @@ -84,17 +85,38 @@ async function doConnect( const audioTracks = await livekitRoom!.localParticipant.createTracks({ audio: { deviceId: initialDeviceId }, }); + if (audioTracks.length < 1) { logger.info("Tried to pre-create local audio track but got no tracks"); } else { preCreatedAudioTrack = audioTracks[0]; } + // There was a yield point previously (awaiting for the track to be created) so we need to check + // if the operation was cancelled and stop connecting if needed. + if (cancellable.isCancelled()) { + logger.info( + "[Lifecycle] Signal Aborted: Pre-created audio track but connection aborted", + ); + preCreatedAudioTrack?.stop(); + return; + } + logger.info("Pre-created microphone track"); } catch (e) { logger.error("Failed to pre-create microphone track", e); } - if (!audioEnabled) await preCreatedAudioTrack?.mute(); + if (!audioEnabled) { + await preCreatedAudioTrack?.mute(); + // There was a yield point. Check if the operation was cancelled and stop connecting. + if (cancellable.isCancelled()) { + logger.info( + "[Lifecycle] Signal Aborted: Pre-created audio track but connection aborted", + ); + preCreatedAudioTrack?.stop(); + return; + } + } // check again having awaited for the track to create if ( @@ -107,9 +129,18 @@ async function doConnect( return; } - logger.info("Connecting & publishing"); + logger.info("[Lifecycle] Connecting & publishing"); try { await connectAndPublish(livekitRoom, sfuConfig, preCreatedAudioTrack, []); + if (cancellable.isCancelled()) { + logger.info( + "[Lifecycle] Signal Aborted: Connected but operation was cancelled. Force disconnect", + ); + livekitRoom?.disconnect().catch((err) => { + logger.error("Failed to disconnect from SFU", err); + }); + return; + } } catch (e) { preCreatedAudioTrack?.stop(); logger.debug("Stopped precreated audio tracks."); @@ -250,6 +281,22 @@ export function useECConnectionState( const currentSFUConfig = useRef(Object.assign({}, sfuConfig)); + // Protection against potential leaks, where the component to be unmounted and there is + // still a pending doConnect promise. This would lead the user to still be in the call even + // if the component is unmounted. + const cancelBag = useRef(new Set()); + + // This is a cleanup function that will be called when the component is unmounted. + // It will cancel all cancellables in the bag + useEffect(() => { + const bag = cancelBag.current; + return (): void => { + bag.forEach((cancellable) => { + cancellable.cancel(); + }); + }; + }, []); + // Id we are transitioning from a valid config to another valid one, we need // to explicitly switch focus useEffect(() => { @@ -276,11 +323,14 @@ export function useECConnectionState( // always capturing audio: it helps keep bluetooth headsets in the right mode and // mobile browsers to know we're doing a call. setIsInDoConnect(true); + const cancellable = new Cancellable(); + cancelBag.current.add(cancellable); doConnect( livekitRoom!, sfuConfig!, initialAudioEnabled, initialDeviceId, + cancellable, ) .catch((e) => { if (e instanceof ElementCallError) { @@ -289,7 +339,10 @@ export function useECConnectionState( setError(new UnknownCallError(e)); } else logger.error("Failed to connect to SFU", e); }) - .finally(() => setIsInDoConnect(false)); + .finally(() => { + cancelBag.current.delete(cancellable); + setIsInDoConnect(false); + }); } currentSFUConfig.current = Object.assign({}, sfuConfig); diff --git a/src/utils/cancellable.ts b/src/utils/cancellable.ts new file mode 100644 index 00000000..7d387b0c --- /dev/null +++ b/src/utils/cancellable.ts @@ -0,0 +1,18 @@ +/* +Copyright 2025 New Vector Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +export class Cancellable { + public constructor(private cancelled = false) {} + + public cancel(): void { + this.cancelled = true; + } + + public isCancelled(): boolean { + return this.cancelled; + } +}