fix: Livekit openned connection leaks

This commit is contained in:
Valere
2025-05-09 09:57:44 +02:00
parent 02fa7594e9
commit 83c630c97b
3 changed files with 185 additions and 5 deletions

View File

@@ -6,7 +6,7 @@ Please see LICENSE in the repository root for full details.
*/
import { type FC, useCallback, useState } from "react";
import { test, vi } from "vitest";
import { describe, expect, test, vi, vitest } from "vitest";
import {
ConnectionError,
ConnectionErrorReason,
@@ -15,6 +15,7 @@ import {
import userEvent from "@testing-library/user-event";
import { render, screen } from "@testing-library/react";
import { MemoryRouter } from "react-router-dom";
import { defer, sleep } from "matrix-js-sdk/lib/utils";
import { useECConnectionState } from "./useECConnectionState";
import { type SFUConfig } from "./openIDSFU";
@@ -73,3 +74,111 @@ test.each<[string, ConnectionError]>([
screen.getByText("Insufficient capacity");
},
);
describe("Leaking connection prevention", () => {
function createTestComponent(mockRoom: Room): FC {
const TestComponent: FC = () => {
const [sfuConfig, setSfuConfig] = useState<SFUConfig | undefined>(
undefined,
);
const connect = useCallback(
() => setSfuConfig({ url: "URL", jwt: "JWT token" }),
[],
);
useECConnectionState("default", false, mockRoom, sfuConfig);
return <button onClick={connect}>Connect</button>;
};
return TestComponent;
}
test("Should cancel pending connections when the component is unmounted", async () => {
const connectCall = vi.fn();
const pendingConnection = defer<void>();
// let pendingDisconnection = defer<void>()
const disconnectMock = vi.fn();
const mockRoom = {
on: () => {},
off: () => {},
once: () => {},
connect: async () => {
connectCall.call(undefined);
return await pendingConnection.promise;
},
disconnect: disconnectMock,
localParticipant: {
getTrackPublication: () => {},
createTracks: () => [],
},
} as unknown as Room;
const TestComponent = createTestComponent(mockRoom);
const { unmount } = render(<TestComponent />);
const user = userEvent.setup();
await user.click(screen.getByRole("button", { name: "Connect" }));
expect(connectCall).toHaveBeenCalled();
// unmount while the connection is pending
unmount();
// resolve the pending connection
pendingConnection.resolve();
await vitest.waitUntil(
() => {
return disconnectMock.mock.calls.length > 0;
},
{
timeout: 1000,
interval: 100,
},
);
// There should be some cleaning up to avoid leaking an open connection
expect(disconnectMock).toHaveBeenCalledTimes(1);
});
test("Should cancel about to open but not yet opened connection", async () => {
const createTracksCall = vi.fn();
const pendingCreateTrack = defer<void>();
// let pendingDisconnection = defer<void>()
const disconnectMock = vi.fn();
const connectMock = vi.fn();
const mockRoom = {
on: () => {},
off: () => {},
once: () => {},
connect: connectMock,
disconnect: disconnectMock,
localParticipant: {
getTrackPublication: () => {},
createTracks: async () => {
createTracksCall.call(undefined);
await pendingCreateTrack.promise;
return [];
},
},
} as unknown as Room;
const TestComponent = createTestComponent(mockRoom);
const { unmount } = render(<TestComponent />);
const user = userEvent.setup();
await user.click(screen.getByRole("button", { name: "Connect" }));
expect(createTracksCall).toHaveBeenCalled();
// unmount while createTracks is pending
unmount();
// resolve createTracks
pendingCreateTrack.resolve();
// Yield to the event loop to let the connection attempt finish
await sleep(100);
// The operation should have been aborted before even calling connect.
expect(connectMock).not.toHaveBeenCalled();
});
});

View File

@@ -6,7 +6,6 @@ Please see LICENSE in the repository root for full details.
*/
import {
type AudioCaptureOptions,
ConnectionError,
ConnectionState,
type LocalTrack,
@@ -25,6 +24,7 @@ import {
InsufficientCapacityError,
UnknownCallError,
} from "../utils/errors.ts";
import { Cancellable } from "../utils/cancellable.ts";
declare global {
interface Window {
@@ -60,6 +60,7 @@ async function doConnect(
sfuConfig: SFUConfig,
audioEnabled: boolean,
initialDeviceId: string | undefined,
cancellable: Cancellable,
): Promise<void> {
// Always create an audio track manually.
// livekit (by default) keeps the mic track open when you mute, but if you start muted,
@@ -84,17 +85,38 @@ async function doConnect(
const audioTracks = await livekitRoom!.localParticipant.createTracks({
audio: { deviceId: initialDeviceId },
});
if (audioTracks.length < 1) {
logger.info("Tried to pre-create local audio track but got no tracks");
} else {
preCreatedAudioTrack = audioTracks[0];
}
// There was a yield point previously (awaiting for the track to be created) so we need to check
// if the operation was cancelled and stop connecting if needed.
if (cancellable.isCancelled()) {
logger.info(
"[Lifecycle] Signal Aborted: Pre-created audio track but connection aborted",
);
preCreatedAudioTrack?.stop();
return;
}
logger.info("Pre-created microphone track");
} catch (e) {
logger.error("Failed to pre-create microphone track", e);
}
if (!audioEnabled) await preCreatedAudioTrack?.mute();
if (!audioEnabled) {
await preCreatedAudioTrack?.mute();
// There was a yield point. Check if the operation was cancelled and stop connecting.
if (cancellable.isCancelled()) {
logger.info(
"[Lifecycle] Signal Aborted: Pre-created audio track but connection aborted",
);
preCreatedAudioTrack?.stop();
return;
}
}
// check again having awaited for the track to create
if (
@@ -107,9 +129,18 @@ async function doConnect(
return;
}
logger.info("Connecting & publishing");
logger.info("[Lifecycle] Connecting & publishing");
try {
await connectAndPublish(livekitRoom, sfuConfig, preCreatedAudioTrack, []);
if (cancellable.isCancelled()) {
logger.info(
"[Lifecycle] Signal Aborted: Connected but operation was cancelled. Force disconnect",
);
livekitRoom?.disconnect().catch((err) => {
logger.error("Failed to disconnect from SFU", err);
});
return;
}
} catch (e) {
preCreatedAudioTrack?.stop();
logger.debug("Stopped precreated audio tracks.");
@@ -250,6 +281,22 @@ export function useECConnectionState(
const currentSFUConfig = useRef(Object.assign({}, sfuConfig));
// Protection against potential leaks, where the component to be unmounted and there is
// still a pending doConnect promise. This would lead the user to still be in the call even
// if the component is unmounted.
const cancelBag = useRef(new Set<Cancellable>());
// This is a cleanup function that will be called when the component is unmounted.
// It will cancel all cancellables in the bag
useEffect(() => {
const bag = cancelBag.current;
return (): void => {
bag.forEach((cancellable) => {
cancellable.cancel();
});
};
}, []);
// Id we are transitioning from a valid config to another valid one, we need
// to explicitly switch focus
useEffect(() => {
@@ -276,11 +323,14 @@ export function useECConnectionState(
// always capturing audio: it helps keep bluetooth headsets in the right mode and
// mobile browsers to know we're doing a call.
setIsInDoConnect(true);
const cancellable = new Cancellable();
cancelBag.current.add(cancellable);
doConnect(
livekitRoom!,
sfuConfig!,
initialAudioEnabled,
initialDeviceId,
cancellable,
)
.catch((e) => {
if (e instanceof ElementCallError) {
@@ -289,7 +339,10 @@ export function useECConnectionState(
setError(new UnknownCallError(e));
} else logger.error("Failed to connect to SFU", e);
})
.finally(() => setIsInDoConnect(false));
.finally(() => {
cancelBag.current.delete(cancellable);
setIsInDoConnect(false);
});
}
currentSFUConfig.current = Object.assign({}, sfuConfig);

18
src/utils/cancellable.ts Normal file
View File

@@ -0,0 +1,18 @@
/*
Copyright 2025 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
export class Cancellable {
public constructor(private cancelled = false) {}
public cancel(): void {
this.cancelled = true;
}
public isCancelled(): boolean {
return this.cancelled;
}
}