Merge branch 'livekit' into valere/playwright/spa_sticky_test

This commit is contained in:
Valere
2026-01-16 17:41:39 +01:00
12 changed files with 164 additions and 87 deletions

View File

@@ -27,9 +27,6 @@ import { HOST1, HOST2, TestHelpers } from "./test-helpers";
widgetTest(
`Test swapping publisher from ${HOST1} to ${HOST2}`,
async ({ addUser, browserName }) => {
// ALWAYS SKIPT THE TEST SINCE IT IS EXPECTED TO FAIL.
// confirmed locally that its failing without: https://github.com/element-hq/element-call/pull/3675
test.skip(true);
test.skip(
browserName === "firefox",
"The is test is not working on firefox CI environment. No mic/audio device inputs so cam/mic are disabled",

View File

@@ -78,6 +78,7 @@ export function LivekitRoomAudioRenderer({
.filter((ref) => {
const isValid = validIdentities.includes(ref.participant.identity);
if (!isValid) {
// TODO make sure to also skip the warn logging for the local identity
// Log that there is an invalid identity, that means that someone is publishing audio that is not expected to be in the call.
prefixedLogger.warn(
`Audio track ${ref.participant.identity} from ${url} has no matching matrix call member`,

View File

@@ -316,6 +316,7 @@ export const DeveloperSettingsTab: FC<Props> = ({
})}
</h4>
<p>LivekitAlias: {livekitRoom.livekitAlias}</p>
<p>connectionState (wont hot reload): {livekitRoom.room.state}</p>
{livekitRoom.isLocal && <p>ws-url: {localSfuUrl?.href}</p>}
<p>
{t("developer_mode.livekit_server_info")}(

View File

@@ -359,6 +359,9 @@ exports[`DeveloperSettingsTab > renders and matches snapshot 1`] = `
LivekitAlias:
TestAlias
</p>
<p>
connectionState (wont hot reload):
</p>
<p>
ws-url:
wss://local-sfu.example.org/
@@ -401,6 +404,9 @@ exports[`DeveloperSettingsTab > renders and matches snapshot 1`] = `
LivekitAlias:
TestAlias2
</p>
<p>
connectionState (wont hot reload):
</p>
<p>
LiveKit Server Info
(

View File

@@ -529,7 +529,6 @@ export function createCallViewModel$(
},
createPublisherFactory: (connection: Connection) => {
return new Publisher(
scope,
connection,
mediaDevices,
muteStates,

View File

@@ -297,6 +297,9 @@ describe("LocalMembership", () => {
seed += 1;
logger.info(`creating [${a}]`);
const p = {
// It is enought to check if destroy is called. Destroy itself is tested in the publisher to make sure it does
// all the cleanup we need.
destroy: vi.fn(),
stopPublishing: vi.fn().mockImplementation(() => {
logger.info(`stopPublishing [${a}]`);
}),
@@ -325,13 +328,12 @@ describe("LocalMembership", () => {
await flushPromises();
localTransport$.next(bTransport);
await flushPromises();
expect(publisherFactory).toHaveBeenCalledTimes(2);
expect(publishers.length).toBe(2);
// stop the first Publisher and let the second one life.
expect(publishers[0].stopTracks).toHaveBeenCalled();
expect(publishers[1].stopTracks).not.toHaveBeenCalled();
expect(publishers[0].stopPublishing).toHaveBeenCalled();
expect(publishers[1].stopPublishing).not.toHaveBeenCalled();
expect(publishers[0].destroy).toHaveBeenCalled();
expect(publishers[1].destroy).not.toHaveBeenCalled();
expect(publisherFactory.mock.calls[0][0].transport).toBe(
aTransport.transport,
);
@@ -341,7 +343,7 @@ describe("LocalMembership", () => {
scope.end();
await flushPromises();
// stop all tracks after ending scopes
expect(publishers[1].stopPublishing).toHaveBeenCalled();
expect(publishers[1].destroy).toHaveBeenCalled();
// expect(publishers[1].stopTracks).toHaveBeenCalled();
defaultCreateLocalMemberValues.createPublisherFactory.mockReset();
@@ -359,8 +361,9 @@ describe("LocalMembership", () => {
defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation(
() => {
const p = {
stopPublishing: vi.fn(),
stopTracks: vi.fn(),
// It is enought to check if destroy is called. Destroy itself is tested in the publisher to make sure it does
// all the cleanup we need.
destroy: vi.fn(),
createAndSetupTracks: vi.fn().mockImplementation(async () => {
tracks$.next([{}, {}] as LocalTrack[]);
return Promise.resolve();
@@ -395,11 +398,11 @@ describe("LocalMembership", () => {
localMembership.startTracks();
await flushPromises();
expect(publishers[0].createAndSetupTracks).toHaveBeenCalled();
// expect(localMembership.tracks$.value.length).toBe(2);
scope.end();
await flushPromises();
// stop all tracks after ending scopes
expect(publishers[0].stopPublishing).toHaveBeenCalled();
expect(publishers[0].destroy).toHaveBeenCalled();
// expect(publishers[0].stopTracks).toHaveBeenCalled();
publisherFactory.mockClear();
});
@@ -416,27 +419,22 @@ describe("LocalMembership", () => {
);
const publishers: Publisher[] = [];
const tracks$ = new BehaviorSubject<LocalTrack[]>([]);
const publishing$ = new BehaviorSubject<boolean>(false);
const createTrackResolver = Promise.withResolvers<void>();
const publishResolver = Promise.withResolvers<void>();
defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation(
() => {
const p = {
stopPublishing: vi.fn(),
stopTracks: vi.fn().mockImplementation(() => {
logger.info("stopTracks");
tracks$.next([]);
}),
// It is enought to check if destroy is called. Destroy itself is tested in the publisher to make sure it does
// all the cleanup we need.
destroy: vi.fn(),
createAndSetupTracks: vi.fn().mockImplementation(async () => {
await createTrackResolver.promise;
tracks$.next([{}, {}] as LocalTrack[]);
}),
startPublishing: vi.fn().mockImplementation(async () => {
await publishResolver.promise;
publishing$.next(true);
}),
tracks$,
publishing$,
};
publishers.push(p as unknown as Publisher);
@@ -536,7 +534,7 @@ describe("LocalMembership", () => {
(localMembership.localMemberState$.value as any).media,
).toStrictEqual(PublishState.Publishing);
expect(publishers[0].stopPublishing).not.toHaveBeenCalled();
expect(publishers[0].destroy).not.toHaveBeenCalled();
expect(localMembership.localMemberState$.isStopped).toBe(false);
scope.end();
@@ -547,7 +545,7 @@ describe("LocalMembership", () => {
(localMembership.localMemberState$.value as any).media,
).toStrictEqual(PublishState.Publishing);
// stop all tracks after ending scopes
expect(publishers[0].stopPublishing).toHaveBeenCalled();
expect(publishers[0].destroy).toHaveBeenCalled();
// expect(publishers[0].stopTracks).toHaveBeenCalled();
});
// TODO add tests for matrix local matrix participation.

View File

@@ -310,13 +310,17 @@ export const createLocalMembership$ = ({
// - destruct all current streams
// - overwrite current publisher
scope.reconcile(localConnection$, async (connection) => {
logger.info(
"reconcile based on new localConnection:",
connection?.transport.livekit_service_url,
);
if (connection !== null) {
const publisher = createPublisherFactory(connection);
publisher$.next(publisher);
// Clean-up callback
return Promise.resolve(async (): Promise<void> => {
await publisher.stopPublishing();
await publisher.stopTracks();
await publisher.destroy();
});
}
});

View File

@@ -16,9 +16,9 @@ import { MatrixError, type MatrixClient } from "matrix-js-sdk";
import {
combineLatest,
distinctUntilChanged,
first,
from,
map,
of,
switchMap,
} from "rxjs";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
@@ -126,36 +126,41 @@ export const createLocalTransport$ = ({
* The transport over which we should be actively publishing our media.
* undefined when not joined.
*/
const oldestMemberTransport$ = scope.behavior(
combineLatest([memberships$]).pipe(
map(([memberships]) => {
const oldestMember = memberships.value[0];
const transport = oldestMember?.getTransport(memberships.value[0]);
if (!transport) return null;
return transport;
}),
first((t) => t != null && isLivekitTransport(t)),
switchMap((transport) => {
// Get the open jwt token to connect to the sfu
const computeLocalTransportWithSFUConfig =
async (): Promise<LocalTransportWithSFUConfig> => {
return {
transport,
sfuConfig: await getSFUConfigWithOpenID(
client,
ownMembershipIdentity,
transport.livekit_service_url,
roomId,
{ forceJwtEndpoint: JwtEndpointVersion.Legacy },
logger,
),
};
};
return from(computeLocalTransportWithSFUConfig());
}),
),
null,
);
const oldestMemberTransport$ =
scope.behavior<LocalTransportWithSFUConfig | null>(
combineLatest([memberships$, useOldestMember$]).pipe(
map(([memberships, useOldestMember]) => {
if (!useOldestMember) return null; // No need to do any prefetching if not using oldest member
const oldestMember = memberships.value[0];
const transport = oldestMember?.getTransport(oldestMember);
if (!transport) return null;
return transport;
}),
switchMap((transport) => {
if (transport !== null && isLivekitTransport(transport)) {
// Get the open jwt token to connect to the sfu
const computeLocalTransportWithSFUConfig =
async (): Promise<LocalTransportWithSFUConfig> => {
// await sleep(1000);
return {
transport,
sfuConfig: await getSFUConfigWithOpenID(
client,
ownMembershipIdentity,
transport.livekit_service_url,
roomId,
{ forceJwtEndpoint: JwtEndpointVersion.Legacy },
logger,
),
};
};
return from(computeLocalTransportWithSFUConfig());
}
return of(null);
}),
),
null,
);
/**
* The transport that we would personally prefer to publish on (if not for the
@@ -200,14 +205,23 @@ export const createLocalTransport$ = ({
oldestMemberTransport$,
preferredTransport$,
]).pipe(
map(([useOldestMember, oldestMemberTransport, preferredTransport]) =>
useOldestMember
map(([useOldestMember, oldestMemberTransport, preferredTransport]) => {
return useOldestMember
? (oldestMemberTransport ?? preferredTransport)
: preferredTransport,
),
distinctUntilChanged((t1, t2) =>
areLivekitTransportsEqual(t1?.transport ?? null, t2?.transport ?? null),
),
: preferredTransport;
}),
distinctUntilChanged((t1, t2) => {
logger.info(
"Local Transport Update from:",
t1?.transport.livekit_service_url,
" to ",
t2?.transport.livekit_service_url,
);
return areLivekitTransportsEqual(
t1?.transport ?? null,
t2?.transport ?? null,
);
}),
),
);
};

View File

@@ -183,7 +183,6 @@ describe("Publisher", () => {
beforeEach(() => {
publisher = new Publisher(
scope,
connection,
mockMediaDevices({}),
muteStates,
@@ -192,7 +191,9 @@ describe("Publisher", () => {
);
});
afterEach(() => {});
afterEach(async () => {
await publisher.destroy();
});
it("Should not create tracks if started muted to avoid unneeded permission requests", async () => {
const createTracksSpy = vi.spyOn(
@@ -207,6 +208,38 @@ describe("Publisher", () => {
expect(createTracksSpy).not.toHaveBeenCalled();
});
it("should unsetHandler and stop tracks on destroy", async () => {
// setup all spies
const unsetVideoSpy = vi.spyOn(
(
publisher as unknown as {
muteStates: { video: { unsetHandler: () => void } };
}
).muteStates.video,
"unsetHandler",
);
const unsetAudioSpy = vi.spyOn(
(
publisher as unknown as {
muteStates: { audio: { unsetHandler: () => void } };
}
).muteStates.audio,
"unsetHandler",
);
const scopeEndSpy = vi.spyOn(
(publisher as unknown as { scope: { end: () => void } }).scope,
"end",
);
const stopTracksSpy = vi.spyOn(publisher, "stopTracks");
// destroy publisher
await publisher.destroy();
expect(stopTracksSpy).toHaveBeenCalledOnce();
expect(unsetVideoSpy).toHaveBeenCalledOnce();
expect(unsetAudioSpy).toHaveBeenCalledOnce();
expect(scopeEndSpy).toHaveBeenCalled();
});
it("Should minimize permission request by querying create at once", async () => {
const enableCameraAndMicrophoneSpy = vi.spyOn(
localParticipant,
@@ -267,7 +300,6 @@ describe("Publisher", () => {
let publisher: Publisher;
beforeEach(() => {
publisher = new Publisher(
scope,
connection,
mockMediaDevices({}),
muteStates,
@@ -275,6 +307,9 @@ describe("Publisher", () => {
logger,
);
});
afterEach(async () => {
await publisher.destroy();
});
test.each([
{ mutes: { audioEnabled: true, videoEnabled: false } },
@@ -320,7 +355,6 @@ describe("Bug fix", () => {
it("wrongly publish tracks while muted", async () => {
// setLogLevel(`debug`);
const publisher = new Publisher(
scope,
connection,
mockMediaDevices({}),
muteStates,
@@ -356,5 +390,6 @@ describe("Bug fix", () => {
expect(track!.mute).toHaveBeenCalled();
expect(track!.isMuted).toBe(true);
}
await publisher.destroy();
});
});

View File

@@ -32,7 +32,7 @@ import {
import { getUrlParams } from "../../../UrlParams.ts";
import { observeTrackReference$ } from "../../MediaViewModel.ts";
import { type Connection } from "../remoteMembers/Connection.ts";
import { type ObservableScope } from "../../ObservableScope.ts";
import { ObservableScope } from "../../ObservableScope.ts";
/**
* A wrapper for a Connection object.
@@ -47,9 +47,10 @@ export class Publisher {
*/
public shouldPublish = false;
private readonly scope = new ObservableScope();
/**
* Creates a new Publisher.
* @param scope - The observable scope to use for managing the publisher.
* @param connection - The connection to use for publishing.
* @param devices - The media devices to use for audio and video input.
* @param muteStates - The mute states for audio and video.
@@ -57,7 +58,6 @@ export class Publisher {
* @param logger - The logger to use for logging :D.
*/
public constructor(
private scope: ObservableScope,
private connection: Pick<Connection, "livekitRoom" | "state$">, //setE2EEEnabled,
devices: MediaDevices,
private readonly muteStates: MuteStates,
@@ -65,7 +65,6 @@ export class Publisher {
private logger: Logger,
) {
const { controlledAudioDevices } = getUrlParams();
const room = connection.livekitRoom;
room.setE2EEEnabled(room.options.e2ee !== undefined)?.catch((e: Error) => {
@@ -73,17 +72,11 @@ export class Publisher {
});
// Setup track processor syncing (blur)
this.observeTrackProcessors(scope, room, trackerProcessorState$);
this.observeTrackProcessors(this.scope, room, trackerProcessorState$);
// Observe media device changes and update LiveKit active devices accordingly
this.observeMediaDevices(scope, devices, controlledAudioDevices);
this.observeMediaDevices(this.scope, devices, controlledAudioDevices);
this.workaroundRestartAudioInputTrackChrome(devices, scope);
this.scope.onEnd(() => {
this.logger.info("Scope ended -> stop publishing all tracks");
void this.stopPublishing();
muteStates.audio.unsetHandler();
muteStates.video.unsetHandler();
});
this.workaroundRestartAudioInputTrackChrome(devices, this.scope);
this.connection.livekitRoom.localParticipant.on(
ParticipantEvent.LocalTrackPublished,
@@ -91,6 +84,21 @@ export class Publisher {
);
}
public async destroy(): Promise<void> {
this.scope.end();
this.logger.info("Scope ended -> unset handler");
this.muteStates.audio.unsetHandler();
this.muteStates.video.unsetHandler();
this.logger.info(`Start to stop tracks`);
try {
await this.stopTracks();
this.logger.info(`Done to stop tracks`);
} catch (e) {
this.logger.error(`Failed to stop tracks: ${e}`);
}
}
// LiveKit will publish the tracks as soon as they are created
// but we want to control when tracks are published.
// We cannot just mute the tracks, even if this will effectively stop the publishing,

View File

@@ -155,6 +155,16 @@ export class Connection {
const { url, jwt, livekitAlias } =
this.existingSFUConfig ??
(await this.getSFUConfigForRemoteConnection());
this.logger.debug(
"Starting Connection to: ",
this.transport.livekit_service_url,
"jwt: ",
jwt,
"wss: ",
url,
"livekitAlias: ",
livekitAlias,
);
this._livekitAlias = livekitAlias;
// If we were stopped while fetching the config, don't proceed to connect
if (this.stopped) return;
@@ -171,8 +181,11 @@ export class Connection {
});
try {
this.logger.info(`livekitRoom.connect ${url}`);
await this.livekitRoom.connect(url, jwt);
this.logger.info(`livekitRoom.connect SUCCESS ${url}`);
} catch (e) {
this.logger.info(`livekitRoom.connect FAILED ${url}`, e);
// LiveKit uses 503 to indicate that the server has hit its track limits.
// https://github.com/livekit/livekit/blob/fcb05e97c5a31812ecf0ca6f7efa57c485cea9fb/pkg/service/rtcservice.go#L171
// It also errors with a status code of 200 (yes, really) for room
@@ -233,12 +246,15 @@ export class Connection {
*/
public async stop(): Promise<void> {
this.logger.debug(
`Stopping connection to ${this.transport.livekit_service_url}`,
`stop: disconnecing from lk room ${this.transport.livekit_service_url}`,
);
if (this.stopped) return;
await this.livekitRoom.disconnect();
this._state$.next(ConnectionState.Stopped);
this.stopped = true;
this.logger.debug(
`stop: DONE disconnecing from lk room ${this.transport.livekit_service_url}`,
);
}
private readonly client: OpenIDClientParts;
@@ -255,9 +271,11 @@ export class Connection {
public constructor(opts: ConnectionOpts, logger: Logger) {
this.ownMembershipIdentity = opts.ownMembershipIdentity;
this.existingSFUConfig = opts.existingSFUConfig;
this.logger = logger.getChild("[Connection]");
this.logger = logger.getChild(
"[Connection " + opts.transport.livekit_service_url + "]",
);
this.logger.info(
`Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,
`constructor: ${opts.transport.livekit_service_url} alias: ${opts.transport.livekit_alias} withSfuConfig?: ${opts.existingSFUConfig ? JSON.stringify(opts.existingSFUConfig) : "undefined"}`,
);
const { transport, client, scope } = opts;

View File

@@ -189,10 +189,6 @@ export function createConnectionManager$({
}
},
(scope, _data$, serviceUrl, alias, sfuConfig) => {
logger.debug(
`Creating connection to ${serviceUrl} (${alias}, withSfuConfig (local connection?): ${JSON.stringify(sfuConfig) ?? "no config->remote connection"})`,
);
const connection = connectionFactory.createConnection(
scope,
{