Merge remote-tracking branch 'origin/livekit' into hs/support-homeserver-rtc-transport

This commit is contained in:
Half-Shot
2025-12-29 16:05:25 +00:00
26 changed files with 2134 additions and 466 deletions

View File

@@ -9,7 +9,7 @@ import { type KnipConfig } from "knip";
export default {
vite: {
config: ["vite.config.ts", "vite-embedded.config.ts"],
config: ["vite.config.ts", "vite-embedded.config.ts", "vite-sdk.config.ts"],
},
entry: ["src/main.tsx", "i18next-parser.config.ts"],
ignoreBinaries: [

View File

@@ -13,6 +13,8 @@
"build:embedded": "yarn build:full --config vite-embedded.config.js",
"build:embedded:production": "yarn build:embedded",
"build:embedded:development": "yarn build:embedded --mode development",
"build:sdk": "yarn build:full --config vite-sdk.config.js",
"build:sdk:development": "yarn build:sdk --mode development",
"serve": "vite preview",
"prettier:check": "prettier -c .",
"prettier:format": "prettier -w .",
@@ -111,6 +113,7 @@
"loglevel": "^1.9.1",
"matrix-js-sdk": "matrix-org/matrix-js-sdk#2218ec4e3102e841ba3e794e1c492c0a5aa6c1c3",
"matrix-widget-api": "^1.14.0",
"node-stdlib-browser": "^1.3.1",
"normalize.css": "^8.0.1",
"observable-hooks": "^4.2.3",
"pako": "^2.0.4",
@@ -133,6 +136,7 @@
"vite": "^7.0.0",
"vite-plugin-generate-file": "^0.3.0",
"vite-plugin-html": "^3.2.2",
"vite-plugin-node-stdlib-browser": "^0.2.1",
"vite-plugin-svgr": "^4.0.0",
"vitest": "^3.0.0",
"vitest-axe": "^1.0.0-pre.3"

35
sdk/README.md Normal file
View File

@@ -0,0 +1,35 @@
# SDK mode
EC can be build in sdk mode. This will result in a compiled js file that can be imported in very simple webapps.
It allows to use matrixRTC in combination with livekit without relying on element call.
This is done by instantiating the call view model and exposing some useful behaviors (observables) and methods.
This folder contains an example index.html file that showcases the sdk in use (hosted on localhost:8123 with a webserver ellowing cors (for example `npx serve -l 81234 --cors`)) as a godot engine HTML export template.
## Widgets
The sdk mode is particularly interesting to be used in widgets where you do not need to pay attention to matrix login/cs api ...
To create a widget see the example index.html file in this folder. And add it to EW via:
`/addwidget <widgetUrl>` (see **url parameters** for more details on `<widgetUrl>`)
### url parameters
```
widgetId = $matrix_widget_id
perParticipantE2EE = true
userId = $matrix_user_id
deviceId = $org.matrix.msc3819.matrix_device_id
baseUrl = $org.matrix.msc4039.matrix_base_url
```
`parentUrl = // will be inserted automatically`
Full template use as `<widgetUrl>`:
```
http://localhost:3000?widgetId=$matrix_widget_id&perParticipantE2EE=true&userId=$matrix_user_id&deviceId=$org.matrix.msc3819.matrix_device_id&baseUrl=$org.matrix.msc4039.matrix_base_url&roomId=$matrix_room_id
```
the `$` prefixed variables will be replaced by EW on widget instantiation. (e.g. `$matrix_user_id` -> `@user:example.com` (url encoding will also be applied automatically by EW) -> `%40user%3Aexample.com`)

55
sdk/helper.ts Normal file
View File

@@ -0,0 +1,55 @@
/*
Copyright 2025 Element Creations Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
/**
* This file contains helper functions and types for the MatrixRTC SDK.
*/
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { scan } from "rxjs";
import { widget as _widget } from "../src/widget";
import { type LivekitRoomItem } from "../src/state/CallViewModel/CallViewModel";
export const logger = rootLogger.getChild("[MatrixRTCSdk]");
if (!_widget) throw Error("No widget. This webapp can only start as a widget");
export const widget = _widget;
export const tryMakeSticky = (): void => {
logger.info("try making sticky MatrixRTCSdk");
void widget.api
.setAlwaysOnScreen(true)
.then(() => {
logger.info("sticky MatrixRTCSdk");
})
.catch((error) => {
logger.error("failed to make sticky MatrixRTCSdk", error);
});
};
export const TEXT_LK_TOPIC = "matrixRTC";
/**
* simple helper operator to combine the last emitted and the current emitted value of a rxjs observable
*
* I think there should be a builtin for this but i did not find it...
*/
export const currentAndPrev = scan<
LivekitRoomItem[],
{
prev: LivekitRoomItem[];
current: LivekitRoomItem[];
}
>(
({ current: lastCurrentVal }, items) => ({
prev: lastCurrentVal,
current: items,
}),
{
prev: [],
current: [],
},
);

87
sdk/index.html Normal file
View File

@@ -0,0 +1,87 @@
<!doctype html>
<html>
<head>
<title>Godot MatrixRTC Widget</title>
<meta charset="utf-8" />
<script type="module">
// TODO use the url where the matrixrtc-sdk.js file from dist is hosted
import { createMatrixRTCSdk } from "http://localhost:8123/matrixrtc-sdk.js";
try {
window.matrixRTCSdk = await createMatrixRTCSdk(
"com.github.toger5.godot-game",
);
console.info("createMatrixRTCSdk was created!");
} catch (e) {
console.error("createMatrixRTCSdk", e);
}
const sdk = window.matrixRTCSdk;
// This is the main bridging interface to godot
window.matrixRTCSdkGodot = {
dataObs: sdk.data$,
memberObs: sdk.members$,
// join: sdk.join, // lets stick with autojoin for now
sendData: sdk.sendData,
leave: sdk.leave,
connectedObs: sdk.connected$,
};
console.info("matrixRTCSdk join ", sdk);
const connectionState = sdk.join();
console.info("matrixRTCSdk joined");
const div = document.getElementById("data");
div.innerHTML = "<h3>Data:</h3>";
sdk.data$.subscribe((data) => {
const child = document.createElement("p");
child.innerHTML = JSON.stringify(data);
div.appendChild(child);
// TODO forward to godot
});
sdk.members$.subscribe((memberObjects) => {
// reset div
const div = document.getElementById("members");
div.innerHTML = "<h3>Members:</h3>";
// create member list
const members = memberObjects.map((member) => member.membership.sender);
console.info("members changed", members);
for (const m of members) {
console.info("member", m);
const child = document.createElement("p");
child.innerHTML = m;
div.appendChild(child);
}
});
sdk.connected$.subscribe((connected) => {
console.info("connected changed", connected);
const div = document.getElementById("connect_status");
div.innerHTML = connected ? "Connected" : "Disconnected";
});
let engine = new Engine($GODOT_CONFIG);
engine.startGame();
</script>
<!--// TODO use it as godot HTML template-->
<script src="$GODOT_URL"></script>
</head>
<body>
<canvas id="canvas"></canvas>
<div
id="overlay"
style="position: absolute; top: 0; right: 0; background-color: #ffffff10"
>
<div id="connect_status"></div>
<button onclick="window.matrixRTCSdk.leave();">Leave</button>
<button onclick="window.matrixRTCSdk.sendData({prop: 'Hello, world!'});">
Send Text
</button>
<div id="members"></div>
<div id="data"></div>
</div>
</body>
</html>

308
sdk/main.ts Normal file
View File

@@ -0,0 +1,308 @@
/*
Copyright 2025 Element Creations Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
/**
* This file is the entrypoint for the sdk build of element call: `yarn build:sdk`
* use in widgets.
* It exposes the `createMatrixRTCSdk` which creates the `MatrixRTCSdk` interface (see below) that
* can be used to join a rtc session and exchange realtime data.
* It takes care of all the tricky bits:
* - sending delayed events
* - finding the right sfu
* - handling the media stream
* - sending join/leave state or sticky events
* - setting up encryption and scharing keys
*/
import {
combineLatest,
map,
type Observable,
of,
shareReplay,
Subject,
switchMap,
tap,
} from "rxjs";
import {
type CallMembership,
MatrixRTCSession,
MatrixRTCSessionEvent,
} from "matrix-js-sdk/lib/matrixrtc";
import {
type Room as LivekitRoom,
type TextStreamReader,
type LocalParticipant,
type RemoteParticipant,
} from "livekit-client";
// TODO how can this get fixed? to just be part of `livekit-client`
// Can this be done in the tsconfig.json
import { type TextStreamInfo } from "../node_modules/livekit-client/dist/src/room/types";
import { type Behavior, constant } from "../src/state/Behavior";
import { createCallViewModel$ } from "../src/state/CallViewModel/CallViewModel";
import { ObservableScope } from "../src/state/ObservableScope";
import { getUrlParams } from "../src/UrlParams";
import { MuteStates } from "../src/state/MuteStates";
import { MediaDevices } from "../src/state/MediaDevices";
import { E2eeType } from "../src/e2ee/e2eeType";
import {
currentAndPrev,
logger,
TEXT_LK_TOPIC,
tryMakeSticky,
widget,
} from "./helper";
import { ElementWidgetActions } from "../src/widget";
import { type Connection } from "../src/state/CallViewModel/remoteMembers/Connection";
interface MatrixRTCSdk {
/**
* observe connected$ to track the state.
* @returns
*/
join: () => void;
/** @throws on leave errors */
leave: () => void;
data$: Observable<{ sender: string; data: string }>;
/**
* flattened list of members
*/
members$: Behavior<
{
connection: Connection | null;
membership: CallMembership;
participant: LocalParticipant | RemoteParticipant | null;
}[]
>;
/** Use the LocalMemberConnectionState returned from `join` for a more detailed connection state */
connected$: Behavior<boolean>;
sendData?: (data: unknown) => Promise<void>;
}
export async function createMatrixRTCSdk(
application: string = "m.call",
id: string = "",
): Promise<MatrixRTCSdk> {
logger.info("Hello");
const client = await widget.client;
logger.info("client created");
const scope = new ObservableScope();
const { roomId } = getUrlParams();
if (roomId === null) throw Error("could not get roomId from url params");
const room = client.getRoom(roomId);
if (room === null) throw Error("could not get room from client");
const mediaDevices = new MediaDevices(scope);
const muteStates = new MuteStates(scope, mediaDevices, constant(true));
const slot = { application, id };
const rtcSession = new MatrixRTCSession(
client,
room,
MatrixRTCSession.sessionMembershipsForSlot(room, slot),
slot,
);
const callViewModel = createCallViewModel$(
scope,
rtcSession,
room,
mediaDevices,
muteStates,
{ encryptionSystem: { kind: E2eeType.PER_PARTICIPANT } },
of({}),
of({}),
constant({ supported: false, processor: undefined }),
);
logger.info("CallViewModelCreated");
// create data listener
const data$ = new Subject<{ sender: string; data: string }>();
const lkTextStreamHandlerFunction = async (
reader: TextStreamReader,
participantInfo: { identity: string },
livekitRoom: LivekitRoom,
): Promise<void> => {
const info = reader.info;
logger.info(
`Received text stream from ${participantInfo.identity}\n` +
` Topic: ${info.topic}\n` +
` Timestamp: ${info.timestamp}\n` +
` ID: ${info.id}\n` +
` Size: ${info.size}`, // Optional, only available if the stream was sent with `sendText`
);
const participants = callViewModel.livekitRoomItems$.value.find(
(i) => i.livekitRoom === livekitRoom,
)?.participants;
if (participants && participants.includes(participantInfo.identity)) {
const text = await reader.readAll();
logger.info(`Received text: ${text}`);
data$.next({ sender: participantInfo.identity, data: text });
} else {
logger.warn(
"Received text from unknown participant",
participantInfo.identity,
);
}
};
const livekitRoomItemsSub = callViewModel.livekitRoomItems$
.pipe(
tap((beforecurrentAndPrev) => {
logger.info(
`LiveKit room items updated: ${beforecurrentAndPrev.length}`,
beforecurrentAndPrev,
);
}),
currentAndPrev,
tap((aftercurrentAndPrev) => {
logger.info(
`LiveKit room items updated: ${aftercurrentAndPrev.current.length}, ${aftercurrentAndPrev.prev.length}`,
aftercurrentAndPrev,
);
}),
)
.subscribe({
next: ({ prev, current }) => {
const prevRooms = prev.map((i) => i.livekitRoom);
const currentRooms = current.map((i) => i.livekitRoom);
const addedRooms = currentRooms.filter((r) => !prevRooms.includes(r));
const removedRooms = prevRooms.filter((r) => !currentRooms.includes(r));
addedRooms.forEach((r) => {
logger.info(`Registering text stream handler for room `);
r.registerTextStreamHandler(
TEXT_LK_TOPIC,
(reader, participantInfo) =>
void lkTextStreamHandlerFunction(reader, participantInfo, r),
);
});
removedRooms.forEach((r) => {
logger.info(`Unregistering text stream handler for room `);
r.unregisterTextStreamHandler(TEXT_LK_TOPIC);
});
},
complete: () => {
logger.info("Livekit room items subscription completed");
for (const item of callViewModel.livekitRoomItems$.value) {
logger.info("unregistering room item from room", item.url);
item.livekitRoom.unregisterTextStreamHandler(TEXT_LK_TOPIC);
}
},
});
// create sendData function
const sendFn: Behavior<(data: string) => Promise<TextStreamInfo>> =
scope.behavior(
callViewModel.localMatrixLivekitMember$.pipe(
switchMap((m) => {
if (!m)
return of((data: string): never => {
throw Error("local membership not yet ready.");
});
return m.participant.value$.pipe(
map((p) => {
if (p === null) {
return (data: string): never => {
throw Error("local participant not yet ready to send data.");
};
} else {
return async (data: string): Promise<TextStreamInfo> =>
p.sendText(data, { topic: TEXT_LK_TOPIC });
}
}),
);
}),
),
);
const sendData = async (data: unknown): Promise<void> => {
const dataString = JSON.stringify(data);
logger.info("try sending: ", dataString);
try {
await Promise.resolve();
const info = await sendFn.value(dataString);
logger.info(`Sent text with stream ID: ${info.id}`);
} catch (e) {
logger.error("failed sending: ", dataString, e);
}
};
// after hangup gets called
const leaveSubs = callViewModel.leave$.subscribe(() => {
const scheduleWidgetCloseOnLeave = async (): Promise<void> => {
const leaveResolver = Promise.withResolvers<void>();
logger.info("waiting for RTC leave");
rtcSession.on(MatrixRTCSessionEvent.JoinStateChanged, (isJoined) => {
logger.info("received RTC join update: ", isJoined);
if (!isJoined) leaveResolver.resolve();
});
await leaveResolver.promise;
logger.info("send Unstick");
await widget.api
.setAlwaysOnScreen(false)
.catch((e) =>
logger.error(
"Failed to set call widget `alwaysOnScreen` to false",
e,
),
);
logger.info("send Close");
await widget.api.transport
.send(ElementWidgetActions.Close, {})
.catch((e) => logger.error("Failed to send close action", e));
};
// schedule close first and then leave (scope.end)
void scheduleWidgetCloseOnLeave();
// actual hangup (ending scope will send the leave event.. its kinda odd. since you might end up closing the widget too fast)
scope.end();
});
logger.info("createMatrixRTCSdk done");
return {
join: (): void => {
// first lets try making the widget sticky
tryMakeSticky();
callViewModel.join();
},
leave: (): void => {
callViewModel.hangup();
leaveSubs.unsubscribe();
livekitRoomItemsSub.unsubscribe();
},
data$,
connected$: callViewModel.connected$,
members$: scope.behavior(
callViewModel.matrixLivekitMembers$.pipe(
switchMap((members) => {
const listOfMemberObservables = members.map((member) =>
combineLatest([
member.connection$,
member.membership$,
member.participant.value$,
]).pipe(
map(([connection, membership, participant]) => ({
connection,
membership,
participant,
})),
// using shareReplay instead of a Behavior here because the behavior would need
// a tricky scope.end() setup.
shareReplay({ bufferSize: 1, refCount: true }),
),
);
return combineLatest(listOfMemberObservables);
}),
),
[],
),
sendData,
};
}

View File

@@ -260,7 +260,7 @@ export const InCallView: FC<InCallViewProps> = ({
() => void toggleRaisedHand(),
);
const audioParticipants = useBehavior(vm.audioParticipants$);
const audioParticipants = useBehavior(vm.livekitRoomItems$);
const participantCount = useBehavior(vm.participantCount$);
const reconnecting = useBehavior(vm.reconnecting$);
const windowMode = useBehavior(vm.windowMode$);

View File

@@ -80,7 +80,7 @@ import {
} from "../../reactions";
import { shallowEquals } from "../../utils/array";
import { type MediaDevices } from "../MediaDevices";
import { type Behavior } from "../Behavior";
import { constant, type Behavior } from "../Behavior";
import { E2eeType } from "../../e2ee/e2eeType";
import { MatrixKeyProvider } from "../../e2ee/matrixKeyProvider";
import { type MuteStates } from "../MuteStates";
@@ -117,6 +117,7 @@ import {
createMatrixLivekitMembers$,
type TaggedParticipant,
type LocalMatrixLivekitMember,
type RemoteMatrixLivekitMember,
} from "./remoteMembers/MatrixLivekitMembers.ts";
import {
type AutoLeaveReason,
@@ -156,7 +157,7 @@ export interface CallViewModelOptions {
/** Optional behavior overriding the computed window size, mainly for testing purposes. */
windowSize$?: Behavior<{ width: number; height: number }>;
/** The version & compatibility mode of MatrixRTC that we should use. */
matrixRTCMode$: Behavior<MatrixRTCMode>;
matrixRTCMode$?: Behavior<MatrixRTCMode>;
}
// Do not play any sounds if the participant count has exceeded this
@@ -182,7 +183,7 @@ interface LayoutScanState {
}
type MediaItem = UserMedia | ScreenShare;
type AudioLivekitItem = {
export type LivekitRoomItem = {
livekitRoom: LivekitRoom;
participants: string[];
url: string;
@@ -205,8 +206,11 @@ export interface CallViewModel {
callPickupState$: Behavior<
"unknown" | "ringing" | "timeout" | "decline" | "success" | null
>;
/** Observable that emits when the user should leave the call (hangup pressed, widget action, error).
* THIS DOES NOT LEAVE THE CALL YET. The only way to leave the call (send the hangup event) is by ending the scope.
*/
leave$: Observable<"user" | AutoLeaveReason>;
/** Call to initiate hangup. Use in conbination with connectino state track the async hangup process. */
/** Call to initiate hangup. Use in conbination with reconnectino state track the async hangup process. */
hangup: () => void;
// joining
@@ -258,7 +262,11 @@ export interface CallViewModel {
*/
participantCount$: Behavior<number>;
/** Participants sorted by livekit room so they can be used in the audio rendering */
audioParticipants$: Behavior<AudioLivekitItem[]>;
livekitRoomItems$: Behavior<LivekitRoomItem[]>;
userMedia$: Behavior<UserMedia[]>;
/** use the layout instead, this is just for the sdk export. */
matrixLivekitMembers$: Behavior<RemoteMatrixLivekitMember[]>;
localMatrixLivekitMember$: Behavior<LocalMatrixLivekitMember | null>;
/** List of participants raising their hand */
handsRaised$: Behavior<Record<string, RaisedHandInfo>>;
/** List of reactions. Keys are: membership.membershipId (currently predefined as: `${membershipEvent.userId}:${membershipEvent.deviceId}`)*/
@@ -341,17 +349,15 @@ export interface CallViewModel {
switch: () => void;
} | null>;
// connection state
/**
* Whether various media/event sources should pretend to be disconnected from
* all network input, even if their connection still technically works.
* Whether the app is currently reconnecting to the LiveKit server and/or setting the matrix rtc room state.
*/
// We do this when the app is in the 'reconnecting' state, because it might be
// that the LiveKit connection is still functional while the homeserver is
// down, for example, and we want to avoid making people worry that the app is
// in a split-brained state.
// DISCUSSION own membership manager ALSO this probably can be simplifis
reconnecting$: Behavior<boolean>;
/**
* Shortcut for not requireing to parse and combine connectionState.matrix and connectionState.livekit
*/
connected$: Behavior<boolean>;
}
/**
@@ -381,6 +387,8 @@ export function createCallViewModel$(
options.encryptionSystem,
matrixRTCSession,
);
const matrixRTCMode$ =
options.matrixRTCMode$ ?? constant(MatrixRTCMode.Legacy);
// Each hbar seperates a block of input variables required for the CallViewModel to function.
// The outputs of this block is written under the hbar.
@@ -413,7 +421,7 @@ export function createCallViewModel$(
client,
roomId: matrixRoom.roomId,
useOldestMember$: scope.behavior(
options.matrixRTCMode$.pipe(map((v) => v === MatrixRTCMode.Legacy)),
matrixRTCMode$.pipe(map((v) => v === MatrixRTCMode.Legacy)),
),
});
@@ -454,7 +462,7 @@ export function createCallViewModel$(
},
),
),
logger: logger,
logger,
});
const matrixLivekitMembers$ = createMatrixLivekitMembers$({
@@ -465,7 +473,7 @@ export function createCallViewModel$(
});
const connectOptions$ = scope.behavior(
options.matrixRTCMode$.pipe(
matrixRTCMode$.pipe(
map((mode) => ({
encryptMedia: livekitKeyProvider !== undefined,
// TODO. This might need to get called again on each change of matrixRTCMode...
@@ -497,7 +505,7 @@ export function createCallViewModel$(
muteStates,
trackProcessorState$,
logger.getChild(
"[Publisher" + connection.transport.livekit_service_url + "]",
"[Publisher " + connection.transport.livekit_service_url + "]",
),
);
},
@@ -596,8 +604,11 @@ export function createCallViewModel$(
),
);
const audioParticipants$ = scope.behavior(
const livekitRoomItems$ = scope.behavior(
matrixLivekitMembers$.pipe(
tap((val) => {
logger.debug("matrixLivekitMembers$ updated", val.value);
}),
switchMap((membersWithEpoch) => {
const members = membersWithEpoch.value;
const a$ = combineLatest(
@@ -622,7 +633,7 @@ export function createCallViewModel$(
return a$;
}),
map((members) =>
members.reduce<AudioLivekitItem[]>((acc, curr) => {
members.reduce<LivekitRoomItem[]>((acc, curr) => {
if (!curr) return acc;
const existing = acc.find((item) => item.url === curr.url);
@@ -1492,10 +1503,7 @@ export function createCallViewModel$(
),
null,
),
participantCount$: participantCount$,
audioParticipants$: audioParticipants$,
handsRaised$: handsRaised$,
reactions$: reactions$,
joinSoundEffect$: joinSoundEffect$,
@@ -1514,6 +1522,16 @@ export function createCallViewModel$(
spotlight$: spotlight$,
pip$: pip$,
layout$: layout$,
userMedia$,
localMatrixLivekitMember$,
matrixLivekitMembers$: scope.behavior(
matrixLivekitMembers$.pipe(
map((members) => members.value),
tap((v) => {
logger.debug("matrixLivekitMembers$ updated (exported)", v);
}),
),
),
tileStoreGeneration$: tileStoreGeneration$,
showSpotlightIndicators$: showSpotlightIndicators$,
showSpeakingIndicators$: showSpeakingIndicators$,
@@ -1522,6 +1540,8 @@ export function createCallViewModel$(
earpieceMode$: earpieceMode$,
audioOutputSwitcher$: audioOutputSwitcher$,
reconnecting$: localMembership.reconnecting$,
livekitRoomItems$,
connected$: localMembership.connected$,
};
}

View File

@@ -254,10 +254,12 @@ describe("LocalMembership", () => {
const connectionTransportAConnecting = {
...connectionTransportAConnected,
state$: constant(ConnectionState.LivekitConnecting),
livekitRoom: mockLivekitRoom({}),
} as unknown as Connection;
const connectionTransportBConnected = {
state$: constant(ConnectionState.LivekitConnected),
transport: bTransport,
livekitRoom: mockLivekitRoom({}),
} as unknown as Connection;
it("recreates publisher if new connection is used and ENDS always unpublish and end tracks", async () => {
@@ -266,13 +268,17 @@ describe("LocalMembership", () => {
const localTransport$ = new BehaviorSubject(aTransport);
const publishers: Publisher[] = [];
let seed = 0;
defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation(
() => {
const a = seed;
seed += 1;
logger.info(`creating [${a}]`);
const p = {
stopPublishing: vi.fn(),
stopPublishing: vi.fn().mockImplementation(() => {
logger.info(`stopPublishing [${a}]`);
}),
stopTracks: vi.fn(),
publishing$: constant(false),
};
publishers.push(p as unknown as Publisher);
return p;
@@ -310,7 +316,7 @@ describe("LocalMembership", () => {
await flushPromises();
// stop all tracks after ending scopes
expect(publishers[1].stopPublishing).toHaveBeenCalled();
expect(publishers[1].stopTracks).toHaveBeenCalled();
// expect(publishers[1].stopTracks).toHaveBeenCalled();
defaultCreateLocalMemberValues.createPublisherFactory.mockReset();
});
@@ -358,15 +364,17 @@ describe("LocalMembership", () => {
});
await flushPromises();
expect(publisherFactory).toHaveBeenCalledOnce();
expect(localMembership.tracks$.value.length).toBe(0);
// expect(localMembership.tracks$.value.length).toBe(0);
expect(publishers[0].createAndSetupTracks).not.toHaveBeenCalled();
localMembership.startTracks();
await flushPromises();
expect(localMembership.tracks$.value.length).toBe(2);
expect(publishers[0].createAndSetupTracks).toHaveBeenCalled();
// expect(localMembership.tracks$.value.length).toBe(2);
scope.end();
await flushPromises();
// stop all tracks after ending scopes
expect(publishers[0].stopPublishing).toHaveBeenCalled();
expect(publishers[0].stopTracks).toHaveBeenCalled();
// expect(publishers[0].stopTracks).toHaveBeenCalled();
publisherFactory.mockClear();
});
// TODO add an integration test combining publisher and localMembership
@@ -464,20 +472,20 @@ describe("LocalMembership", () => {
});
expect(publisherFactory).toHaveBeenCalledOnce();
expect(localMembership.tracks$.value.length).toBe(0);
// expect(localMembership.tracks$.value.length).toBe(0);
// -------
localMembership.startTracks();
// -------
await flushPromises();
expect(localMembership.localMemberState$.value).toStrictEqual({
matrix: RTCMemberStatus.Connected,
media: {
tracks: TrackState.Creating,
connection: ConnectionState.LivekitConnected,
},
});
// expect(localMembership.localMemberState$.value).toStrictEqual({
// matrix: RTCMemberStatus.Connected,
// media: {
// tracks: TrackState.Creating,
// connection: ConnectionState.LivekitConnected,
// },
// });
createTrackResolver.resolve();
await flushPromises();
expect(
@@ -492,7 +500,7 @@ describe("LocalMembership", () => {
expect(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(localMembership.localMemberState$.value as any).media,
).toStrictEqual(PublishState.Starting);
).toStrictEqual(PublishState.Publishing);
publishResolver.resolve();
await flushPromises();
@@ -513,7 +521,7 @@ describe("LocalMembership", () => {
).toStrictEqual(PublishState.Publishing);
// stop all tracks after ending scopes
expect(publishers[0].stopPublishing).toHaveBeenCalled();
expect(publishers[0].stopTracks).toHaveBeenCalled();
// expect(publishers[0].stopTracks).toHaveBeenCalled();
});
// TODO add tests for matrix local matrix participation.
});

View File

@@ -6,11 +6,12 @@ Please see LICENSE in the repository root for full details.
*/
import {
type LocalTrack,
type Participant,
ParticipantEvent,
type LocalParticipant,
type ScreenShareCaptureOptions,
RoomEvent,
MediaDeviceFailure,
} from "livekit-client";
import { observeParticipantEvents } from "@livekit/components-core";
import {
@@ -24,6 +25,7 @@ import {
combineLatest,
distinctUntilChanged,
from,
fromEvent,
map,
type Observable,
of,
@@ -35,7 +37,7 @@ import {
import { type Logger } from "matrix-js-sdk/lib/logger";
import { deepCompare } from "matrix-js-sdk/lib/utils";
import { constant, type Behavior } from "../../Behavior.ts";
import { type Behavior } from "../../Behavior.ts";
import { type IConnectionManager } from "../remoteMembers/ConnectionManager.ts";
import { type ObservableScope } from "../../ObservableScope.ts";
import { type Publisher } from "./Publisher.ts";
@@ -66,17 +68,23 @@ export enum TransportState {
export enum PublishState {
WaitingForUser = "publish_waiting_for_user",
/** Implies lk connection is connected */
Starting = "publish_start_publishing",
// XXX: This state is removed for now since we do not have full control over
// track publication anymore with the publisher abstraction, might come back in the future?
// /** Implies lk connection is connected */
// Starting = "publish_start_publishing",
/** Implies lk connection is connected */
Publishing = "publish_publishing",
}
// TODO not sure how to map that correctly with the
// new publisher that does not manage tracks itself anymore
export enum TrackState {
/** The track is waiting for user input to create tracks (waiting to call `startTracks()`) */
WaitingForUser = "tracks_waiting_for_user",
/** Implies lk connection is connected */
Creating = "tracks_creating",
// XXX: This state is removed for now since we do not have full control over
// track creation anymore with the publisher abstraction, might come back in the future?
// /** Implies lk connection is connected */
// Creating = "tracks_creating",
/** Implies lk connection is connected */
Ready = "tracks_ready",
}
@@ -150,9 +158,10 @@ export const createLocalMembership$ = ({
matrixRTCSession,
}: Props): {
/**
* This starts audio and video tracks. They will be reused when calling `requestPublish`.
* This request to start audio and video tracks.
* Can be called early to pre-emptively get media permissions and start devices.
*/
startTracks: () => Behavior<LocalTrack[]>;
startTracks: () => void;
/**
* This sets a inner state (shouldPublish) to true and instructs the js-sdk and livekit to keep the user
* connected to matrix and livekit.
@@ -165,17 +174,21 @@ export const createLocalMembership$ = ({
* Callback to toggle screen sharing. If null, screen sharing is not possible.
*/
toggleScreenSharing: (() => void) | null;
tracks$: Behavior<LocalTrack[]>;
// tracks$: Behavior<LocalTrack[]>;
participant$: Behavior<LocalParticipant | null>;
connection$: Behavior<Connection | null>;
/** Shorthand for homeserverConnected.rtcSession === Status.Reconnecting
* Direct translation to the js-sdk membership manager connection `Status`.
/**
* Tracks the homserver and livekit connected state and based on that computes reconnecting.
*/
reconnecting$: Behavior<boolean>;
/** Shorthand for homeserverConnected.rtcSession === Status.Disconnected
* Direct translation to the js-sdk membership manager connection `Status`.
*/
disconnected$: Behavior<boolean>;
/**
* Fully connected
*/
connected$: Behavior<boolean>;
} => {
const logger = parentLogger.getChild("[LocalMembership]");
logger.debug(`Creating local membership..`);
@@ -221,6 +234,32 @@ export const createLocalMembership$ = ({
),
);
// Tracks error that happen when creating the local tracks.
const mediaErrors$ = localConnection$.pipe(
switchMap((connection) => {
if (!connection) {
return of(null);
} else {
return fromEvent(
connection.livekitRoom,
RoomEvent.MediaDevicesError,
(error: Error) => {
return MediaDeviceFailure.getFailure(error) ?? null;
},
);
}
}),
);
mediaErrors$.pipe(scope.bind()).subscribe((error) => {
if (error) {
logger.error(`Failed to create local tracks:`, error);
setMatrixError(
// TODO is it fatal? Do we need to create a new Specialized Error?
new UnknownCallError(new Error(`Media device error: ${error}`)),
);
}
});
// MATRIX RELATED
// This should be used in a combineLatest with publisher$ to connect.
@@ -235,19 +274,10 @@ export const createLocalMembership$ = ({
* The publisher is stored in here an abstracts creating and publishing tracks.
*/
const publisher$ = new BehaviorSubject<Publisher | null>(null);
/**
* Extract the tracks from the published. Also reacts to changing publishers.
*/
const tracks$ = scope.behavior(
publisher$.pipe(switchMap((p) => (p?.tracks$ ? p.tracks$ : constant([])))),
);
const publishing$ = scope.behavior(
publisher$.pipe(switchMap((p) => p?.publishing$ ?? constant(false))),
);
const startTracks = (): Behavior<LocalTrack[]> => {
const startTracks = (): void => {
trackStartRequested.resolve();
return tracks$;
// This used to return the tracks, but now they are only accessible via the publisher.
};
const requestJoinAndPublish = (): void => {
@@ -273,7 +303,7 @@ export const createLocalMembership$ = ({
// Clean-up callback
return Promise.resolve(async (): Promise<void> => {
await publisher.stopPublishing();
publisher.stopTracks();
await publisher.stopTracks();
});
}
});
@@ -282,13 +312,16 @@ export const createLocalMembership$ = ({
// `tracks$` will update once they are ready.
scope.reconcile(
scope.behavior(
combineLatest([publisher$, tracks$, from(trackStartRequested.promise)]),
combineLatest([
publisher$ /*, tracks$*/,
from(trackStartRequested.promise),
]),
null,
),
async (valueIfReady) => {
if (!valueIfReady) return;
const [publisher, tracks] = valueIfReady;
if (publisher && tracks.length === 0) {
const [publisher] = valueIfReady;
if (publisher) {
await publisher.createAndSetupTracks().catch((e) => logger.error(e));
}
},
@@ -296,12 +329,11 @@ export const createLocalMembership$ = ({
// Based on `connectRequested$` we start publishing tracks. (once they are there!)
scope.reconcile(
scope.behavior(
combineLatest([publisher$, tracks$, joinAndPublishRequested$]),
),
async ([publisher, tracks, shouldJoinAndPublish]) => {
if (shouldJoinAndPublish === publisher?.publishing$.value) return;
if (tracks.length !== 0 && shouldJoinAndPublish) {
scope.behavior(combineLatest([publisher$, joinAndPublishRequested$])),
async ([publisher, shouldJoinAndPublish]) => {
// Get the current publishing state to avoid redundant calls.
const isPublishing = publisher?.shouldPublish === true;
if (shouldJoinAndPublish && !isPublishing) {
try {
await publisher?.startPublishing();
} catch (error) {
@@ -309,7 +341,7 @@ export const createLocalMembership$ = ({
error instanceof Error ? error.message : String(error);
setPublishError(new FailToStartLivekitConnection(message));
}
} else if (tracks.length !== 0 && !shouldJoinAndPublish) {
} else if (isPublishing) {
try {
await publisher?.stopPublishing();
} catch (error) {
@@ -351,8 +383,6 @@ export const createLocalMembership$ = ({
combineLatest([
localConnectionState$,
localTransport$,
tracks$,
publishing$,
joinAndPublishRequested$,
from(trackStartRequested.promise).pipe(
map(() => true),
@@ -363,16 +393,13 @@ export const createLocalMembership$ = ({
([
localConnectionState,
localTransport,
tracks,
publishing,
shouldPublish,
shouldStartTracks,
]) => {
if (!localTransport) return null;
const hasTracks = tracks.length > 0;
let trackState: TrackState = TrackState.WaitingForUser;
if (hasTracks && shouldStartTracks) trackState = TrackState.Ready;
if (!hasTracks && shouldStartTracks) trackState = TrackState.Creating;
const trackState: TrackState = shouldStartTracks
? TrackState.Ready
: TrackState.WaitingForUser;
if (
localConnectionState !== ConnectionState.LivekitConnected ||
@@ -383,7 +410,7 @@ export const createLocalMembership$ = ({
tracks: trackState,
};
if (!shouldPublish) return PublishState.WaitingForUser;
if (!publishing) return PublishState.Starting;
// if (!publishing) return PublishState.Starting;
return PublishState.Publishing;
},
),
@@ -613,9 +640,9 @@ export const createLocalMembership$ = ({
requestJoinAndPublish,
requestDisconnect,
localMemberState$,
tracks$,
participant$,
reconnecting$,
connected$: matrixAndLivekitConnected$,
disconnected$: scope.behavior(
homeserverConnected.rtsSession$.pipe(
map((state) => state === RTCSessionStatus.Disconnected),

View File

@@ -5,59 +5,320 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { afterEach, beforeEach, describe, expect, it, test, vi } from "vitest";
import {
afterEach,
beforeEach,
describe,
expect,
it,
type Mock,
vi,
} from "vitest";
import { ConnectionState as LivekitConenctionState } from "livekit-client";
import { type BehaviorSubject } from "rxjs";
ConnectionState as LivekitConnectionState,
LocalParticipant,
type LocalTrack,
type LocalTrackPublication,
ParticipantEvent,
Track,
} from "livekit-client";
import { BehaviorSubject } from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger";
import { ObservableScope } from "../../ObservableScope";
import { constant } from "../../Behavior";
import {
flushPromises,
mockLivekitRoom,
mockLocalParticipant,
mockMediaDevices,
} from "../../../utils/test";
import { Publisher } from "./Publisher";
import { type Connection } from "../remoteMembers/Connection";
import { type MuteStates } from "../../MuteStates";
describe("Publisher", () => {
let scope: ObservableScope;
let connection: Connection;
let muteStates: MuteStates;
beforeEach(() => {
muteStates = {
audio: {
enabled$: constant(false),
unsetHandler: vi.fn(),
setHandler: vi.fn(),
},
video: {
enabled$: constant(false),
unsetHandler: vi.fn(),
setHandler: vi.fn(),
},
} as unknown as MuteStates;
scope = new ObservableScope();
connection = {
state$: constant(LivekitConenctionState.Connected),
livekitRoom: mockLivekitRoom({
localParticipant: mockLocalParticipant({}),
}),
} as unknown as Connection;
let scope: ObservableScope;
beforeEach(() => {
scope = new ObservableScope();
});
afterEach(() => scope.end());
function createMockLocalTrack(source: Track.Source): LocalTrack {
const track = {
source,
isMuted: false,
isUpstreamPaused: false,
} as Partial<LocalTrack> as LocalTrack;
vi.mocked(track).mute = vi.fn().mockImplementation(() => {
track.isMuted = true;
});
vi.mocked(track).unmute = vi.fn().mockImplementation(() => {
track.isMuted = false;
});
vi.mocked(track).pauseUpstream = vi.fn().mockImplementation(() => {
// @ts-expect-error - for that test we want to set isUpstreamPaused directly
track.isUpstreamPaused = true;
});
vi.mocked(track).resumeUpstream = vi.fn().mockImplementation(() => {
// @ts-expect-error - for that test we want to set isUpstreamPaused directly
track.isUpstreamPaused = false;
});
afterEach(() => scope.end());
return track;
}
it("throws if livekit room could not publish", async () => {
function createMockMuteState(enabled$: BehaviorSubject<boolean>): {
enabled$: BehaviorSubject<boolean>;
setHandler: (h: (enabled: boolean) => void) => void;
unsetHandler: () => void;
} {
let currentHandler = (enabled: boolean): void => {};
const ms = {
enabled$,
setHandler: vi.fn().mockImplementation((h: (enabled: boolean) => void) => {
currentHandler = h;
}),
unsetHandler: vi.fn().mockImplementation(() => {
currentHandler = (enabled: boolean): void => {};
}),
};
// forward enabled$ emissions to the current handler
enabled$.subscribe((enabled) => {
logger.info(`MockMuteState: enabled changed to ${enabled}`);
currentHandler(enabled);
});
return ms;
}
let connection: Connection;
let muteStates: MuteStates;
let localParticipant: LocalParticipant;
let audioEnabled$: BehaviorSubject<boolean>;
let videoEnabled$: BehaviorSubject<boolean>;
let trackPublications: LocalTrackPublication[];
// use it to control when track creation resolves, default to resolved
let createTrackLock: Promise<void>;
beforeEach(() => {
trackPublications = [];
audioEnabled$ = new BehaviorSubject(false);
videoEnabled$ = new BehaviorSubject(false);
createTrackLock = Promise.resolve();
muteStates = {
audio: createMockMuteState(audioEnabled$),
video: createMockMuteState(videoEnabled$),
} as unknown as MuteStates;
const mockSendDataPacket = vi.fn();
const mockEngine = {
client: {
sendUpdateLocalMetadata: vi.fn(),
},
on: vi.fn().mockReturnThis(),
sendDataPacket: mockSendDataPacket,
};
localParticipant = new LocalParticipant(
"local-sid",
"local-identity",
// @ts-expect-error - for that test we want a real LocalParticipant to have the pending publications logic
mockEngine,
{
adaptiveStream: true,
dynacase: false,
audioCaptureDefaults: {},
videoCaptureDefaults: {},
stopLocalTrackOnUnpublish: true,
reconnectPolicy: "always",
disconnectOnPageLeave: true,
},
new Map(),
{},
);
vi.mocked(localParticipant).createTracks = vi
.fn()
.mockImplementation(async (opts) => {
const tracks: LocalTrack[] = [];
if (opts.audio) {
tracks.push(createMockLocalTrack(Track.Source.Microphone));
}
if (opts.video) {
tracks.push(createMockLocalTrack(Track.Source.Camera));
}
await createTrackLock;
return tracks;
});
vi.mocked(localParticipant).publishTrack = vi
.fn()
.mockImplementation(async (track: LocalTrack) => {
const pub = {
track,
source: track.source,
mute: track.mute,
unmute: track.unmute,
} as Partial<LocalTrackPublication> as LocalTrackPublication;
trackPublications.push(pub);
localParticipant.emit(ParticipantEvent.LocalTrackPublished, pub);
return Promise.resolve(pub);
});
vi.mocked(localParticipant).getTrackPublication = vi
.fn()
.mockImplementation((source: Track.Source) => {
return trackPublications.find((pub) => pub.track?.source === source);
});
connection = {
state$: constant({
state: "ConnectedToLkRoom",
livekitConnectionState$: constant(LivekitConnectionState.Connected),
}),
livekitRoom: mockLivekitRoom({
localParticipant: localParticipant,
}),
} as unknown as Connection;
});
describe("Publisher", () => {
let publisher: Publisher;
beforeEach(() => {
publisher = new Publisher(
scope,
connection,
mockMediaDevices({}),
muteStates,
constant({ supported: false, processor: undefined }),
logger,
);
});
afterEach(() => {});
it("Should not create tracks if started muted to avoid unneeded permission requests", async () => {
const createTracksSpy = vi.spyOn(
connection.livekitRoom.localParticipant,
"createTracks",
);
audioEnabled$.next(false);
videoEnabled$.next(false);
await publisher.createAndSetupTracks();
expect(createTracksSpy).not.toHaveBeenCalled();
});
it("Should minimize permission request by querying create at once", async () => {
const enableCameraAndMicrophoneSpy = vi.spyOn(
localParticipant,
"enableCameraAndMicrophone",
);
const createTracksSpy = vi.spyOn(localParticipant, "createTracks");
audioEnabled$.next(true);
videoEnabled$.next(true);
await publisher.createAndSetupTracks();
await flushPromises();
expect(enableCameraAndMicrophoneSpy).toHaveBeenCalled();
// It should create both at once
expect(createTracksSpy).toHaveBeenCalledWith({
audio: true,
video: true,
});
});
it("Ensure no data is streamed until publish has been called", async () => {
audioEnabled$.next(true);
await publisher.createAndSetupTracks();
// The track should be created and paused
expect(localParticipant.createTracks).toHaveBeenCalledWith({
audio: true,
video: undefined,
});
await flushPromises();
expect(localParticipant.publishTrack).toHaveBeenCalled();
await flushPromises();
const track = localParticipant.getTrackPublication(
Track.Source.Microphone,
)?.track;
expect(track).toBeDefined();
expect(track!.pauseUpstream).toHaveBeenCalled();
expect(track!.isUpstreamPaused).toBe(true);
});
it("Ensure resume upstream when published is called", async () => {
videoEnabled$.next(true);
await publisher.createAndSetupTracks();
// await flushPromises();
await publisher.startPublishing();
const track = localParticipant.getTrackPublication(
Track.Source.Camera,
)?.track;
expect(track).toBeDefined();
// expect(track.pauseUpstream).toHaveBeenCalled();
expect(track!.isUpstreamPaused).toBe(false);
});
describe("Mute states", () => {
let publisher: Publisher;
beforeEach(() => {
publisher = new Publisher(
scope,
connection,
mockMediaDevices({}),
muteStates,
constant({ supported: false, processor: undefined }),
logger,
);
});
test.each([
{ mutes: { audioEnabled: true, videoEnabled: false } },
{ mutes: { audioEnabled: true, videoEnabled: false } },
])("only create the tracks that are unmuted $mutes", async ({ mutes }) => {
// Ensure all muted
audioEnabled$.next(mutes.audioEnabled);
videoEnabled$.next(mutes.videoEnabled);
vi.mocked(connection.livekitRoom.localParticipant).createTracks = vi
.fn()
.mockResolvedValue([]);
await publisher.createAndSetupTracks();
expect(
connection.livekitRoom.localParticipant.createTracks,
).toHaveBeenCalledOnce();
expect(
connection.livekitRoom.localParticipant.createTracks,
).toHaveBeenCalledWith({
audio: mutes.audioEnabled ? true : undefined,
video: mutes.videoEnabled ? true : undefined,
});
});
});
it("does mute unmute audio", async () => {});
});
describe("Bug fix", () => {
// There is a race condition when creating and publishing tracks while the mute state changes.
// This race condition could cause tracks to be published even though they are muted at the
// beginning of a call coming from lobby.
// This is caused by our stack using manually the low level API to create and publish tracks,
// but also using the higher level setMicrophoneEnabled and setCameraEnabled functions that also create
// and publish tracks, and managing pending publications.
// Race is as follow, on creation of the Publisher we create the tracks then publish them.
// If in the middle of that process the mute state changes:
// - the `setMicrophoneEnabled` will be no-op because it is not aware of our created track and can't see any pending publication
// - If start publication is requested it will publish the track even though there was a mute request.
it("wrongly publish tracks while muted", async () => {
// setLogLevel(`debug`);
const publisher = new Publisher(
scope,
connection,
@@ -66,56 +327,34 @@ describe("Publisher", () => {
constant({ supported: false, processor: undefined }),
logger,
);
audioEnabled$.next(true);
// should do nothing if no tracks have been created yet.
await publisher.startPublishing();
expect(
connection.livekitRoom.localParticipant.publishTrack,
).not.toHaveBeenCalled();
const resolvers = Promise.withResolvers<void>();
createTrackLock = resolvers.promise;
await expect(publisher.createAndSetupTracks()).rejects.toThrow(
Error("audio and video is false"),
);
// Initially the audio is unmuted, so creating tracks should publish the audio track
const createTracks = publisher.createAndSetupTracks();
void publisher.startPublishing();
void createTracks.then(() => {
void publisher.startPublishing();
});
// now mute the audio before allowing track creation to complete
audioEnabled$.next(false);
resolvers.resolve(undefined);
await createTracks;
(muteStates.audio.enabled$ as BehaviorSubject<boolean>).next(true);
await flushPromises();
(
connection.livekitRoom.localParticipant.createTracks as Mock
).mockResolvedValue([{}, {}]);
const track = localParticipant.getTrackPublication(
Track.Source.Microphone,
)?.track;
expect(track).toBeDefined();
await expect(publisher.createAndSetupTracks()).resolves.not.toThrow();
expect(
connection.livekitRoom.localParticipant.createTracks,
).toHaveBeenCalledOnce();
// failiour due to localParticipant.publishTrack
(
connection.livekitRoom.localParticipant.publishTrack as Mock
).mockRejectedValue(Error("testError"));
await expect(publisher.startPublishing()).rejects.toThrow(
new Error("testError"),
);
// does not try other conenction after the first one failed
expect(
connection.livekitRoom.localParticipant.publishTrack,
).toHaveBeenCalledTimes(1);
// does not try other conenction after the first one failed
expect(
connection.livekitRoom.localParticipant.publishTrack,
).toHaveBeenCalledTimes(1);
// success case
(
connection.livekitRoom.localParticipant.publishTrack as Mock
).mockResolvedValue({});
await expect(publisher.startPublishing()).resolves.not.toThrow();
expect(
connection.livekitRoom.localParticipant.publishTrack,
).toHaveBeenCalledTimes(3);
try {
expect(localParticipant.publishTrack).not.toHaveBeenCalled();
} catch {
expect(track!.mute).toHaveBeenCalled();
expect(track!.isMuted).toBe(true);
}
});
});

View File

@@ -6,15 +6,14 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
ConnectionState as LivekitConnectionState,
type LocalTrackPublication,
LocalVideoTrack,
ParticipantEvent,
type Room as LivekitRoom,
Track,
type LocalTrack,
type LocalTrackPublication,
ConnectionState as LivekitConnectionState,
} from "livekit-client";
import {
BehaviorSubject,
map,
NEVER,
type Observable,
@@ -41,14 +40,21 @@ import { type ObservableScope } from "../../ObservableScope.ts";
* The Publisher is also responsible for creating the media tracks.
*/
export class Publisher {
/**
* By default, livekit will start publishing tracks as soon as they are created.
* In the matrix RTC world, we want to control when tracks are published based
* on whether the user is part of the RTC session or not.
*/
public shouldPublish = false;
/**
* Creates a new Publisher.
* @param scope - The observable scope to use for managing the publisher.
* @param connection - The connection to use for publishing.
* @param devices - The media devices to use for audio and video input.
* @param muteStates - The mute states for audio and video.
* @param e2eeLivekitOptions - The E2EE options to use for the LiveKit room. Use to share the same key provider across connections!.
* @param trackerProcessorState$ - The processor state for the video track processor (e.g. background blur).
* @param logger - The logger to use for logging :D.
*/
public constructor(
private scope: ObservableScope,
@@ -58,7 +64,6 @@ export class Publisher {
trackerProcessorState$: Behavior<ProcessorState>,
private logger: Logger,
) {
this.logger.info("Create LiveKit room");
const { controlledAudioDevices } = getUrlParams();
const room = connection.livekitRoom;
@@ -76,41 +81,63 @@ export class Publisher {
this.scope.onEnd(() => {
this.logger.info("Scope ended -> stop publishing all tracks");
void this.stopPublishing();
muteStates.audio.unsetHandler();
muteStates.video.unsetHandler();
});
// TODO move mute state handling here using reconcile (instead of inside the mute state class)
// this.scope.reconcile(
// this.scope.behavior(
// combineLatest([this.muteStates.video.enabled$, this.tracks$]),
// ),
// async ([videoEnabled, tracks]) => {
// const track = tracks.find((t) => t.kind == Track.Kind.Video);
// if (!track) return;
// if (videoEnabled) {
// await track.unmute();
// } else {
// await track.mute();
// }
// },
// );
this.connection.livekitRoom.localParticipant.on(
ParticipantEvent.LocalTrackPublished,
this.onLocalTrackPublished.bind(this),
);
}
private _tracks$ = new BehaviorSubject<LocalTrack<Track.Kind>[]>([]);
public tracks$ = this._tracks$ as Behavior<LocalTrack<Track.Kind>[]>;
// LiveKit will publish the tracks as soon as they are created
// but we want to control when tracks are published.
// We cannot just mute the tracks, even if this will effectively stop the publishing,
// it would also prevent the user from seeing their own video/audio preview.
// So for that we use pauseUpStream(): Stops sending media to the server by replacing
// the sender track with null, but keeps the local MediaStreamTrack active.
// The user can still see/hear themselves locally, but remote participants see nothing.
private onLocalTrackPublished(
localTrackPublication: LocalTrackPublication,
): void {
this.logger.info("Local track published", localTrackPublication);
const lkRoom = this.connection.livekitRoom;
if (!this.shouldPublish) {
this.pauseUpstreams(lkRoom, [localTrackPublication.source]).catch((e) => {
this.logger.error(`Failed to pause upstreams`, e);
});
}
// also check the mute state and apply it
if (localTrackPublication.source === Track.Source.Microphone) {
const enabled = this.muteStates.audio.enabled$.value;
lkRoom.localParticipant.setMicrophoneEnabled(enabled).catch((e) => {
this.logger.error(
`Failed to enable microphone track, enabled:${enabled}`,
e,
);
});
} else if (localTrackPublication.source === Track.Source.Camera) {
const enabled = this.muteStates.video.enabled$.value;
lkRoom.localParticipant.setCameraEnabled(enabled).catch((e) => {
this.logger.error(
`Failed to enable camera track, enabled:${enabled}`,
e,
);
});
}
}
/**
* Start the connection to LiveKit and publish local tracks.
* Create and setup local audio and video tracks based on the current mute states.
* It creates the tracks only if audio and/or video is enabled, to avoid unnecessary
* permission prompts.
*
* This will:
* wait for the connection to be ready.
// * 1. Request an OpenId token `request_token` (allows matrix users to verify their identity with a third-party service.)
// * 2. Use this token to request the SFU config to the MatrixRtc authentication service.
// * 3. Connect to the configured LiveKit room.
// * 4. Create local audio and video tracks based on the current mute states and publish them to the room.
* It also observes mute state changes to update LiveKit microphone/camera states accordingly.
* If a track is not created initially because disabled, it will be created when unmuting.
*
* This call is not blocking anymore, instead callers can listen to the
* `RoomEvent.MediaDevicesError` event in the LiveKit room to be notified of any errors.
*
* @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection.
* @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created.
*/
public async createAndSetupTracks(): Promise<void> {
this.logger.debug("createAndSetupTracks called");
@@ -118,119 +145,121 @@ export class Publisher {
// Observe mute state changes and update LiveKit microphone/camera states accordingly
this.observeMuteStates(this.scope);
// TODO-MULTI-SFU: Prepublish a microphone track
// Check if audio and/or video is enabled. We only create tracks if enabled,
// because it could prompt for permission, and we don't want to do that unnecessarily.
const audio = this.muteStates.audio.enabled$.value;
const video = this.muteStates.video.enabled$.value;
// createTracks throws if called with audio=false and video=false
if (audio || video) {
// TODO this can still throw errors? It will also prompt for permissions if not already granted
return lkRoom.localParticipant
.createTracks({
audio,
video,
})
.then((tracks) => {
this.logger.info(
"created track",
tracks.map((t) => t.kind + ", " + t.id),
);
this._tracks$.next(tracks);
})
.catch((error) => {
this.logger.error("Failed to create tracks", error);
});
// We don't await the creation, because livekit could block until the tracks
// are fully published, and not only that they are created.
// We don't have control on that, localParticipant creates and publishes the tracks
// asap.
// We are using the `ParticipantEvent.LocalTrackPublished` to be notified
// when tracks are actually published, and at that point
// we can pause upstream if needed (depending on if startPublishing has been called).
if (audio && video) {
// Enable both at once in order to have a single permission prompt!
void lkRoom.localParticipant.enableCameraAndMicrophone();
} else if (audio) {
void lkRoom.localParticipant.setMicrophoneEnabled(true);
} else if (video) {
void lkRoom.localParticipant.setCameraEnabled(true);
}
return Promise.resolve();
}
private async pauseUpstreams(
lkRoom: LivekitRoom,
sources: Track.Source[],
): Promise<void> {
for (const source of sources) {
const track = lkRoom.localParticipant.getTrackPublication(source)?.track;
if (track) {
await track.pauseUpstream();
} else {
this.logger.warn(
`No track found for source ${source} to pause upstream`,
);
}
}
}
private async resumeUpstreams(
lkRoom: LivekitRoom,
sources: Track.Source[],
): Promise<void> {
for (const source of sources) {
const track = lkRoom.localParticipant.getTrackPublication(source)?.track;
if (track) {
await track.resumeUpstream();
} else {
this.logger.warn(
`No track found for source ${source} to resume upstream`,
);
}
}
throw Error("audio and video is false");
}
private _publishing$ = new BehaviorSubject<boolean>(false);
public publishing$ = this.scope.behavior(this._publishing$);
/**
*
* Request to publish local tracks to the LiveKit room.
* This will wait for the connection to be ready before publishing.
* Livekit also have some local retry logic for publishing tracks.
* Can be called multiple times, localparticipant manages the state of published tracks (or pending publications).
*
* @returns
* @throws ElementCallError
*/
public async startPublishing(): Promise<LocalTrack[]> {
public async startPublishing(): Promise<void> {
if (this.shouldPublish) {
this.logger.debug(`Already publishing, ignoring startPublishing call`);
return;
}
this.shouldPublish = true;
this.logger.debug("startPublishing called");
const lkRoom = this.connection.livekitRoom;
// we do not need to do this since lk will wait in `localParticipant.publishTrack`
// const { promise, resolve, reject } = Promise.withResolvers<void>();
// const sub = this.connection.state$.subscribe((state) => {
// if (state instanceof Error) {
// const error =
// state instanceof ElementCallError
// ? state
// : new FailToStartLivekitConnection(state.message);
// reject(error);
// } else if (state === ConnectionState.LivekitConnected) {
// resolve();
// } else {
// this.logger.info("waiting for connection: ", state);
// }
// });
// try {
// await promise;
// } catch (e) {
// throw e;
// } finally {
// sub.unsubscribe();
// }
for (const track of this.tracks$.value) {
this.logger.info("publish ", this.tracks$.value.length, "tracks");
// TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally
// with a timeout.
await lkRoom.localParticipant.publishTrack(track).catch((error) => {
this.logger.error("Failed to publish track", error);
// throw new FailToStartLivekitConnection(
// error instanceof Error ? error.message : error,
// );
throw error;
});
this.logger.info("published track ", track.kind, track.id);
// TODO: check if the connection is still active? and break the loop if not?
// Resume upstream for both audio and video tracks
// We need to call it explicitly because call setTrackEnabled does not always
// resume upstream. It will only if you switch the track from disabled to enabled,
// but if the track is already enabled but upstream is paused, it won't resume it.
// TODO what about screen share?
try {
await this.resumeUpstreams(lkRoom, [
Track.Source.Microphone,
Track.Source.Camera,
]);
} catch (e) {
this.logger.error(`Failed to resume upstreams`, e);
}
this._publishing$.next(true);
return this.tracks$.value;
}
public async stopPublishing(): Promise<void> {
this.logger.debug("stopPublishing called");
// TODO-MULTI-SFU: Move these calls back to ObservableScope.onEnd once scope
// actually has the right lifetime
this.muteStates.audio.unsetHandler();
this.muteStates.video.unsetHandler();
const localParticipant = this.connection.livekitRoom.localParticipant;
const tracks: LocalTrack[] = [];
const addToTracksIfDefined = (p: LocalTrackPublication): void => {
if (p.track !== undefined) tracks.push(p.track);
};
localParticipant.trackPublications.forEach(addToTracksIfDefined);
this.logger.debug(
"list of tracks to unpublish:",
tracks.map((t) => t.kind + ", " + t.id),
"start unpublishing now",
);
await localParticipant.unpublishTracks(tracks).catch((error) => {
this.logger.error("Failed to unpublish tracks", error);
throw error;
});
this.logger.debug(
"unpublished tracks",
tracks.map((t) => t.kind + ", " + t.id),
);
this._publishing$.next(false);
this.shouldPublish = false;
// Pause upstream will stop sending media to the server, while keeping
// the local MediaStreamTrack active, so the user can still see themselves.
await this.pauseUpstreams(this.connection.livekitRoom, [
Track.Source.Microphone,
Track.Source.Camera,
Track.Source.ScreenShare,
]);
}
/**
* Stops all tracks that are currently running
*/
public stopTracks(): void {
this.tracks$.value.forEach((t) => t.stop());
this._tracks$.next([]);
public async stopTracks(): Promise<void> {
const lkRoom = this.connection.livekitRoom;
for (const source of [
Track.Source.Microphone,
Track.Source.Camera,
Track.Source.ScreenShare,
]) {
const localPub = lkRoom.localParticipant.getTrackPublication(source);
if (localPub?.track) {
// stops and unpublishes the track
await lkRoom.localParticipant.unpublishTrack(localPub!.track, true);
}
}
}
/// Private methods
@@ -332,17 +361,31 @@ export class Publisher {
*/
private observeMuteStates(scope: ObservableScope): void {
const lkRoom = this.connection.livekitRoom;
this.muteStates.audio.setHandler(async (desired) => {
this.muteStates.audio.setHandler(async (enable) => {
try {
await lkRoom.localParticipant.setMicrophoneEnabled(desired);
this.logger.debug(
`handler: Setting LiveKit microphone enabled: ${enable}`,
);
await lkRoom.localParticipant.setMicrophoneEnabled(enable);
// Unmute will restart the track if it was paused upstream,
// but until explicitly requested, we want to keep it paused.
if (!this.shouldPublish && enable) {
await this.pauseUpstreams(lkRoom, [Track.Source.Microphone]);
}
} catch (e) {
this.logger.error("Failed to update LiveKit audio input mute state", e);
}
return lkRoom.localParticipant.isMicrophoneEnabled;
});
this.muteStates.video.setHandler(async (desired) => {
this.muteStates.video.setHandler(async (enable) => {
try {
await lkRoom.localParticipant.setCameraEnabled(desired);
this.logger.debug(`handler: Setting LiveKit camera enabled: ${enable}`);
await lkRoom.localParticipant.setCameraEnabled(enable);
// Unmute will restart the track if it was paused upstream,
// but until explicitly requested, we want to keep it paused.
if (!this.shouldPublish && enable) {
await this.pauseUpstreams(lkRoom, [Track.Source.Camera]);
}
} catch (e) {
this.logger.error("Failed to update LiveKit video input mute state", e);
}

View File

@@ -393,7 +393,7 @@ describe("remote participants", () => {
// livekitRoom and the rtc membership in order to publish the members that are publishing
// on this connection.
const participants: RemoteParticipant[] = [
let participants: RemoteParticipant[] = [
mockRemoteParticipant({ identity: "@alice:example.org:DEV000" }),
mockRemoteParticipant({ identity: "@bob:example.org:DEV111" }),
mockRemoteParticipant({ identity: "@carol:example.org:DEV222" }),
@@ -415,7 +415,22 @@ describe("remote participants", () => {
fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, p),
);
// All remote participants should be present
// At this point there should be ~~no~~ publishers
// We do have publisher now, since we do not filter for publishers anymore (to also have participants with only data tracks)
// The filtering we do is just based on the matrixRTC member events.
expect(observedParticipants.pop()!.length).toEqual(4);
participants = [
mockRemoteParticipant({ identity: "@alice:example.org:DEV000" }),
mockRemoteParticipant({ identity: "@bob:example.org:DEV111" }),
mockRemoteParticipant({ identity: "@carol:example.org:DEV222" }),
mockRemoteParticipant({ identity: "@dan:example.org:DEV333" }),
];
participants.forEach((p) =>
fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, p),
);
// At this point there should be no publishers
expect(observedParticipants.pop()!.length).toEqual(4);
});

View File

@@ -223,7 +223,7 @@ export class Connection {
public constructor(opts: ConnectionOpts, logger: Logger) {
this.logger = logger.getChild("[Connection]");
this.logger.info(
`[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,
`Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,
);
const { transport, client, scope } = opts;

View File

@@ -14,7 +14,8 @@ import {
type BaseE2EEManager,
} from "livekit-client";
import { type Logger } from "matrix-js-sdk/lib/logger";
import E2EEWorker from "livekit-client/e2ee-worker?worker";
// imported as inline to support worker when loaded from a cdn (cross domain)
import E2EEWorker from "livekit-client/e2ee-worker?worker&inline";
import { type ObservableScope } from "../../ObservableScope.ts";
import { Connection } from "./Connection.ts";

View File

@@ -285,47 +285,47 @@ describe("connectionManagerData$ stream", () => {
a: expect.toSatisfy((e) => {
const data: ConnectionManagerData = e.value;
expect(data.getConnections().length).toBe(2);
expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(0);
expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(0);
expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(0);
expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(0);
return true;
}),
b: expect.toSatisfy((e) => {
const data: ConnectionManagerData = e.value;
expect(data.getConnections().length).toBe(2);
expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(1);
expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(0);
expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe(
"user1A",
);
expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(1);
expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(0);
expect(
data.getParticipantsForTransport(TRANSPORT_1)[0].identity,
).toBe("user1A");
return true;
}),
c: expect.toSatisfy((e) => {
const data: ConnectionManagerData = e.value;
expect(data.getConnections().length).toBe(2);
expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(1);
expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(1);
expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe(
"user1A",
);
expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toBe(
"user2A",
);
expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(1);
expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(1);
expect(
data.getParticipantsForTransport(TRANSPORT_1)[0].identity,
).toBe("user1A");
expect(
data.getParticipantsForTransport(TRANSPORT_2)[0].identity,
).toBe("user2A");
return true;
}),
d: expect.toSatisfy((e) => {
const data: ConnectionManagerData = e.value;
expect(data.getConnections().length).toBe(2);
expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(2);
expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(1);
expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe(
"user1A",
);
expect(data.getParticipantForTransport(TRANSPORT_1)[1].identity).toBe(
"user1B",
);
expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toBe(
"user2A",
);
expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(2);
expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(1);
expect(
data.getParticipantsForTransport(TRANSPORT_1)[0].identity,
).toBe("user1A");
expect(
data.getParticipantsForTransport(TRANSPORT_1)[1].identity,
).toBe("user1B");
expect(
data.getParticipantsForTransport(TRANSPORT_2)[0].identity,
).toBe("user2A");
return true;
}),
});

View File

@@ -19,8 +19,10 @@ import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts";
import { type ConnectionFactory } from "./ConnectionFactory.ts";
export class ConnectionManagerData {
private readonly store: Map<string, [Connection, RemoteParticipant[]]> =
new Map();
private readonly store: Map<
string,
{ connection: Connection; participants: RemoteParticipant[] }
> = new Map();
public constructor() {}
@@ -28,9 +30,9 @@ export class ConnectionManagerData {
const key = this.getKey(connection.transport);
const existing = this.store.get(key);
if (!existing) {
this.store.set(key, [connection, participants]);
this.store.set(key, { connection, participants });
} else {
existing[1].push(...participants);
existing.participants.push(...participants);
}
}
@@ -39,20 +41,24 @@ export class ConnectionManagerData {
}
public getConnections(): Connection[] {
return Array.from(this.store.values()).map(([connection]) => connection);
return Array.from(this.store.values()).map(({ connection }) => connection);
}
public getConnectionForTransport(
transport: LivekitTransport,
): Connection | null {
return this.store.get(this.getKey(transport))?.[0] ?? null;
return this.store.get(this.getKey(transport))?.connection ?? null;
}
public getParticipantForTransport(
public getParticipantsForTransport(
transport: LivekitTransport,
): RemoteParticipant[] {
const key = transport.livekit_service_url + "|" + transport.livekit_alias;
return this.store.get(key)?.[1] ?? [];
const existing = this.store.get(key);
if (existing) {
return existing.participants;
}
return [];
}
}
@@ -162,6 +168,7 @@ export function createConnectionManager$({
);
// probably not required
if (listOfConnectionsWithRemoteParticipants.length === 0) {
return of(new Epoch(new ConnectionManagerData(), epoch));
}

View File

@@ -91,7 +91,7 @@ test("should signal participant not yet connected to livekit", () => {
}),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
const matrixLivekitMembers$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
@@ -99,21 +99,24 @@ test("should signal participant not yet connected to livekit", () => {
} as unknown as IConnectionManager,
});
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant.value$).toBe("a", {
a: null,
});
expectObservable(data[0].connection$).toBe("a", {
a: null,
});
return true;
}),
});
expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant.value$).toBe("a", {
a: null,
});
expectObservable(data[0].connection$).toBe("a", {
a: null,
});
return true;
}),
},
);
});
});
@@ -171,7 +174,7 @@ test("should signal participant on a connection that is publishing", () => {
}),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
const matrixLivekitMembers$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
@@ -179,25 +182,28 @@ test("should signal participant on a connection that is publishing", () => {
} as unknown as IConnectionManager,
});
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant.value$).toBe("a", {
a: expect.toSatisfy((participant) => {
expect(participant).toBeDefined();
expect(participant!.identity).toEqual(bobParticipantId);
return true;
}),
});
expectObservable(data[0].connection$).toBe("a", {
a: connection,
});
return true;
}),
});
expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant.value$).toBe("a", {
a: expect.toSatisfy((participant) => {
expect(participant).toBeDefined();
expect(participant!.identity).toEqual(bobParticipantId);
return true;
}),
});
expectObservable(data[0].connection$).toBe("a", {
a: connection,
});
return true;
}),
},
);
});
});
@@ -222,7 +228,7 @@ test("should signal participant on a connection that is not publishing", () => {
}),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
const matrixLivekitMembers$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
@@ -230,21 +236,24 @@ test("should signal participant on a connection that is not publishing", () => {
} as unknown as IConnectionManager,
});
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant.value$).toBe("a", {
a: null,
});
expectObservable(data[0].connection$).toBe("a", {
a: connection,
});
return true;
}),
});
expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant.value$).toBe("a", {
a: null,
});
expectObservable(data[0].connection$).toBe("a", {
a: connection,
});
return true;
}),
},
);
});
});
@@ -283,7 +292,7 @@ describe("Publication edge case", () => {
}),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
const matrixLivekitMembers$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(
membershipsWithTransport$,
@@ -293,7 +302,7 @@ describe("Publication edge case", () => {
} as unknown as IConnectionManager,
});
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe(
expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
@@ -349,7 +358,7 @@ describe("Publication edge case", () => {
}),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
const matrixLivekitMembers$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(
membershipsWithTransport$,
@@ -359,7 +368,7 @@ describe("Publication edge case", () => {
} as unknown as IConnectionManager,
});
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe(
expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {

View File

@@ -110,7 +110,7 @@ export function createMatrixLivekitMembers$({
const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`;
const participants = transport
? managerData.getParticipantForTransport(transport)
? managerData.getParticipantsForTransport(transport)
: [];
const participant =
participants.find((p) => p.identity == participantId) ?? null;

View File

@@ -124,14 +124,14 @@ test("bob, carl, then bob joining no tracks yet", () => {
logger: logger,
});
const matrixLivekitItems$ = createMatrixLivekitMembers$({
const matrixLivekitMembers$ = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$:
membershipsAndTransports.membershipsWithTransport$,
connectionManager,
});
expectObservable(matrixLivekitItems$).toBe(vMarble, {
expectObservable(matrixLivekitMembers$).toBe(vMarble, {
a: expect.toSatisfy((e: Epoch<RemoteMatrixLivekitMember[]>) => {
const items = e.value;
expect(items.length).toBe(1);

View File

@@ -311,6 +311,8 @@ export function mockLocalParticipant(
publishTrack: vi.fn(),
unpublishTracks: vi.fn().mockResolvedValue([]),
createTracks: vi.fn(),
setMicrophoneEnabled: vi.fn(),
setCameraEnabled: vi.fn(),
getTrackPublication: () =>
({}) as Partial<LocalTrackPublication> as LocalTrackPublication,
...mockEmitter(),

View File

@@ -64,6 +64,12 @@ export const widget = ((): WidgetHelpers | null => {
try {
const { widgetId, parentUrl } = getUrlParams();
const { roomId, userId, deviceId, baseUrl, e2eEnabled, allowIceFallback } =
getUrlParams();
if (!roomId) throw new Error("Room ID must be supplied");
if (!userId) throw new Error("User ID must be supplied");
if (!deviceId) throw new Error("Device ID must be supplied");
if (!baseUrl) throw new Error("Base URL must be supplied");
if (widgetId && parentUrl) {
const parentOrigin = new URL(parentUrl).origin;
logger.info("Widget API is available");
@@ -92,19 +98,6 @@ export const widget = ((): WidgetHelpers | null => {
// We need to do this now rather than later because it has capabilities to
// request, and is responsible for starting the transport (should it be?)
const {
roomId,
userId,
deviceId,
baseUrl,
e2eEnabled,
allowIceFallback,
} = getUrlParams();
if (!roomId) throw new Error("Room ID must be supplied");
if (!userId) throw new Error("User ID must be supplied");
if (!deviceId) throw new Error("Device ID must be supplied");
if (!baseUrl) throw new Error("Base URL must be supplied");
// These are all the event types the app uses
const sendEvent = [
EventType.CallNotify, // Sent as a deprecated fallback

View File

@@ -50,6 +50,11 @@
"plugins": [{ "name": "typescript-eslint-language-service" }]
},
"include": ["./src/**/*.ts", "./src/**/*.tsx", "./playwright/**/*.ts"],
"include": [
"./src/**/*.ts",
"./src/**/*.tsx",
"./playwright/**/*.ts",
"./sdk/**/*.ts"
],
"exclude": ["**.test.ts"]
}

28
vite-sdk.config.ts Normal file
View File

@@ -0,0 +1,28 @@
/*
Copyright 2025 Element Creations Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { defineConfig, mergeConfig } from "vite";
import nodePolyfills from "vite-plugin-node-stdlib-browser";
const base = "./";
// Config for embedded deployments (possibly hosted under a non-root path)
export default defineConfig(() => ({
worker: { format: "es" as const },
base, // Use relative URLs to allow the app to be hosted under any path
build: {
sourcemap: true,
manifest: true,
lib: {
formats: ["es" as const],
entry: "./sdk/main.ts",
name: "MatrixrtcSdk",
fileName: "matrixrtc-sdk",
},
},
plugins: [nodePolyfills()],
}));

View File

@@ -7,14 +7,17 @@ Please see LICENSE in the repository root for full details.
import {
loadEnv,
PluginOption,
searchForWorkspaceRoot,
type ConfigEnv,
type UserConfig,
} from "vite";
import svgrPlugin from "vite-plugin-svgr";
import { createHtmlPlugin } from "vite-plugin-html";
import { codecovVitePlugin } from "@codecov/vite-plugin";
import { sentryVitePlugin } from "@sentry/vite-plugin";
import react from "@vitejs/plugin-react";
import { realpathSync } from "fs";
import * as fs from "node:fs";
@@ -31,7 +34,7 @@ export default ({
// In future we might be able to do what is needed via code splitting at
// build time.
process.env.VITE_PACKAGE = packageType ?? "full";
const plugins = [
const plugins: PluginOption[] = [
react(),
svgrPlugin({
svgrOptions: {
@@ -41,16 +44,6 @@ export default ({
},
}),
createHtmlPlugin({
entry: "src/main.tsx",
inject: {
data: {
brand: env.VITE_PRODUCT_NAME || "Element Call",
packageType: process.env.VITE_PACKAGE,
},
},
}),
codecovVitePlugin({
enableBundleAnalysis: process.env.CODECOV_TOKEN !== undefined,
bundleName: "element-call",
@@ -73,6 +66,18 @@ export default ({
);
}
plugins.push(
createHtmlPlugin({
entry: "src/main.tsx",
inject: {
data: {
brand: env.VITE_PRODUCT_NAME || "Element Call",
packageType: process.env.VITE_PACKAGE,
},
},
}),
);
// The crypto WASM module is imported dynamically. Since it's common
// for developers to use a linked copy of matrix-js-sdk or Rust
// crypto (which could reside anywhere on their file system), Vite

867
yarn.lock

File diff suppressed because it is too large Load Diff