Fix: Publisher re-enable tracks after room connected in case of timout

This commit is contained in:
Valere
2026-01-14 16:01:55 +01:00
parent 27e351270c
commit fadb3d8a83
3 changed files with 89 additions and 8 deletions

View File

@@ -12,6 +12,8 @@ import {
type LocalTrack,
type LocalTrackPublication,
ParticipantEvent,
PublishTrackError,
RoomEvent,
Track,
} from "livekit-client";
import { BehaviorSubject } from "rxjs";
@@ -357,4 +359,50 @@ describe("Bug fix", () => {
expect(track!.isMuted).toBe(true);
}
});
// When the connection is created, we call createAndSetupTracks immediately.
// But the livekit room connection is not yet established: `Connection#start` must
// first get the sfu config, and then connect to the livekit room.
// Livekit has support for this case by queuing publications until the room is connected,
// but this can time out if the room connection takes too long (15s)
it("Recovers failed publication due to room connection timeout", async () => {
// setLogLevel(`debug`);
const publisher = new Publisher(
scope,
connection,
mockMediaDevices({}),
muteStates,
constant({ supported: false, processor: undefined }),
logger,
);
audioEnabled$.next(true);
videoEnabled$.next(true);
const enableCameraAndMicrophoneSpy = vi.spyOn(
localParticipant,
"enableCameraAndMicrophone",
);
enableCameraAndMicrophoneSpy
.mockImplementationOnce(() => {
throw new PublishTrackError(
"publishing rejected as engine not connected within timeout",
408,
);
})
.mockImplementationOnce(async () => {
return Promise.resolve();
});
// call createAndSetupTracks which will attempt to publish and fail with the simulated timeout
await publisher.createAndSetupTracks();
// Now simulate the connection finally beeing connected
connection.livekitRoom.emit(RoomEvent.Connected);
await flushPromises();
// Should have called enableCameraAndMicrophone again to retry publication
expect(enableCameraAndMicrophoneSpy).toHaveBeenCalledTimes(2);
});
});

View File

@@ -11,6 +11,7 @@ import {
LocalVideoTrack,
ParticipantEvent,
type Room as LivekitRoom,
RoomEvent as LivekitRoomEvent,
Track,
} from "livekit-client";
import {
@@ -89,6 +90,23 @@ export class Publisher {
ParticipantEvent.LocalTrackPublished,
this.onLocalTrackPublished.bind(this),
);
this.connection.livekitRoom.once(LivekitRoomEvent.Connected, () => {
// When `createAndSetupTracks` is called before the connection is established,
// there is an internal timeout in livekit that could be triggered if it takes
// too long to connect.
// It is no-op if tracks are already created, so it is safe to call it again.
const audio = this.muteStates.audio.enabled$.value;
const video = this.muteStates.video.enabled$.value;
this.enableTracks(audio, video, this.connection.livekitRoom).catch(
(e) => {
this.logger.error(
"Failed to enable tracks after connection established",
e,
);
},
);
});
}
// LiveKit will publish the tracks as soon as they are created
@@ -157,18 +175,31 @@ export class Publisher {
// We are using the `ParticipantEvent.LocalTrackPublished` to be notified
// when tracks are actually published, and at that point
// we can pause upstream if needed (depending on if startPublishing has been called).
if (audio && video) {
// Enable both at once in order to have a single permission prompt!
void lkRoom.localParticipant.enableCameraAndMicrophone();
} else if (audio) {
void lkRoom.localParticipant.setMicrophoneEnabled(true);
} else if (video) {
void lkRoom.localParticipant.setCameraEnabled(true);
}
this.enableTracks(audio, video, lkRoom).catch((e) => {
// If it is PublisherTrackError (408), i.e the publishing timed out,
// because it took too long to connet to the room, we are safe because
// we registered to the RoomEvent.Connected to create the tracks once connected.
this.logger.error("Failed to enable tracks", e);
});
return Promise.resolve();
}
private async enableTracks(
audio: boolean,
video: boolean,
lkRoom: LivekitRoom,
): Promise<void> {
if (audio && video) {
// Enable both at once in order to have a single permission prompt!
await lkRoom.localParticipant.enableCameraAndMicrophone();
} else if (audio) {
await lkRoom.localParticipant.setMicrophoneEnabled(true);
} else if (video) {
await lkRoom.localParticipant.setCameraEnabled(true);
}
}
private async pauseUpstreams(
lkRoom: LivekitRoom,
sources: Track.Source[],

View File

@@ -176,6 +176,7 @@ export function withTestScheduler(
interface EmitterMock<T> {
on: (...args: unknown[]) => T;
off: (...args: unknown[]) => T;
once: (...args: unknown[]) => T;
addListener: (...args: unknown[]) => T;
removeListener: (...args: unknown[]) => T;
emit: (event: string | symbol, ...args: unknown[]) => boolean;
@@ -186,6 +187,7 @@ export function mockEmitter<T>(): EmitterMock<T> {
return {
on: ee.on.bind(ee) as unknown as (...args: unknown[]) => T,
off: ee.off.bind(ee) as unknown as (...args: unknown[]) => T,
once: ee.once.bind(ee) as unknown as (...args: unknown[]) => T,
addListener: ee.addListener.bind(ee) as unknown as (
...args: unknown[]
) => T,