Merge pull request #3576 from element-hq/toger5/fix-skipped-tests

Refactor local transport testing and local membership initialization
This commit is contained in:
Timo
2025-11-24 11:36:27 +01:00
committed by GitHub
25 changed files with 1489 additions and 459 deletions

View File

@@ -64,6 +64,14 @@
"developer_mode": {
"always_show_iphone_earpiece": "Show iPhone earpiece option on all platforms",
"crypto_version": "Crypto version: {{version}}",
"custom_livekit_url": {
"current_url": "Currently set to: ",
"from_config": "Currently, no overwrite is set. Url from well-known or config is used.",
"label": "Custom Livekit-url",
"reset": "Reset overwrite",
"save": "Save",
"saving": "Saving..."
},
"debug_tile_layout_label": "Debug tile layout",
"device_id": "Device ID: {{id}}",
"duplicate_tiles_label": "Number of additional tile copies per participant",

View File

@@ -159,7 +159,11 @@ export const widgetTest = test.extend<MyFixtures>({
} = await registerUser(browser, userB);
// Invite the second user
await ewPage1.getByRole("button", { name: "Add", exact: true }).click();
await ewPage1
.getByRole("navigation", { name: "Room list" })
.getByRole("button", { name: "New conversation" })
.click();
await ewPage1.getByRole("menuitem", { name: "New Room" }).click();
await ewPage1.getByRole("textbox", { name: "Name" }).fill("Welcome Room");
await ewPage1.getByRole("button", { name: "Create room" }).click();

View File

@@ -122,7 +122,7 @@ export interface ConfigOptions {
delayed_leave_event_delay_ms?: number;
/**
* The time (in milliseconds) after which a we consider a delayed event restart http request to have failed.
* The time (in milliseconds) after which we consider a delayed event restart http request to have failed.
* Setting this to a lower value will result in more frequent retries but also a higher chance of failiour.
*
* In the presence of network packet loss (hurting TCP connections), the custom delayedEventRestartLocalTimeoutMs

View File

@@ -21,7 +21,16 @@ export type OpenIDClientParts = Pick<
MatrixClient,
"getOpenIdToken" | "getDeviceId"
>;
/**
* Gets a bearer token from the homeserver and then use it to authenticate
* to the matrix RTC backend in order to get acces to the SFU.
* It has built-in retry for calls to the homeserver with a backoff policy.
* @param client
* @param serviceUrl
* @param matrixRoomId
* @returns Object containing the token information
* @throws FailToGetOpenIdToken
*/
export async function getSFUConfigWithOpenID(
client: OpenIDClientParts,
serviceUrl: string,

View File

@@ -22,6 +22,7 @@ import {
WebBrowserIcon,
} from "@vector-im/compound-design-tokens/assets/web/icons";
import { Button } from "@vector-im/compound-web";
import { logger } from "matrix-js-sdk/lib/logger";
import {
ConnectionLostError,
@@ -53,7 +54,7 @@ const ErrorPage: FC<ErrorPageProps> = ({
widget,
}: ErrorPageProps): ReactElement => {
const { t } = useTranslation();
logger.error("Error boundary caught:", error);
let icon: ComponentType<SVGAttributes<SVGElement>>;
switch (error.category) {
case ErrorCategory.CONFIGURATION_ISSUE:

View File

@@ -159,6 +159,7 @@ export const GroupCallView: FC<Props> = ({
};
}, [rtcSession]);
// TODO move this into the callViewModel LocalMembership.ts
useTypedEventEmitter(
rtcSession,
MatrixRTCSessionEvent.MembershipManagerError,

View File

@@ -266,7 +266,7 @@ export const InCallView: FC<InCallViewProps> = ({
const sharingScreen = useBehavior(vm.sharingScreen$);
const ringOverlay = useBehavior(vm.ringOverlay$);
const fatalCallError = useBehavior(vm.configError$);
const fatalCallError = useBehavior(vm.fatalError$);
// Stop the rendering and throw for the error boundary
if (fatalCallError) throw fatalCallError;

View File

@@ -0,0 +1,95 @@
/*
Copyright 2025 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { describe, expect, it, vi } from "vitest";
import { render, waitFor } from "@testing-library/react";
import type { MatrixClient } from "matrix-js-sdk";
import type { Room as LivekitRoom } from "livekit-client";
import { DeveloperSettingsTab } from "./DeveloperSettingsTab";
// Mock url params hook to avoid environment-dependent snapshot churn.
vi.mock("../UrlParams", () => ({
useUrlParams: (): { mocked: boolean; answer: number } => ({
mocked: true,
answer: 42,
}),
}));
// Provide a minimal mock of a Livekit Room structure used by the component.
function createMockLivekitRoom(
wsUrl: string,
serverInfo: object,
metadata: string,
): { isLocal: boolean; url: string; room: LivekitRoom } {
const mockRoom = {
serverInfo,
metadata,
engine: { client: { ws: { url: wsUrl } } },
} as unknown as LivekitRoom;
return {
isLocal: true,
url: wsUrl,
room: mockRoom,
};
}
// Minimal MatrixClient mock with only the methods used by the component.
function createMockMatrixClient(): MatrixClient {
return {
doesServerSupportUnstableFeature: vi.fn().mockResolvedValue(true), // ensure stickyEventsSupported eventually becomes true
getCrypto: (): { getVersion: () => string } | undefined => ({
getVersion: () => "crypto-1.0.0",
}),
getUserId: () => "@alice:example.org",
getDeviceId: () => "DEVICE123",
} as unknown as MatrixClient;
}
describe("DeveloperSettingsTab", () => {
it("renders and matches snapshot", async () => {
const client = createMockMatrixClient();
const livekitRooms: {
room: LivekitRoom;
url: string;
isLocal?: boolean;
}[] = [
createMockLivekitRoom(
"wss://local-sfu.example.org",
{ region: "local", version: "1.2.3" },
"local-metadata",
),
{
isLocal: false,
url: "wss://remote-sfu.example.org",
room: {
serverInfo: { region: "remote", version: "4.5.6" },
metadata: "remote-metadata",
engine: { client: { ws: { url: "wss://remote-sfu.example.org" } } },
} as unknown as LivekitRoom,
},
];
const { container } = render(
<DeveloperSettingsTab
client={client}
livekitRooms={livekitRooms}
env={{ MY_MOCK_ENV: 10, ENV: "test" } as unknown as ImportMetaEnv}
/>,
);
// Wait for the async sticky events feature check to resolve so the final UI
// (e.g. enabled Matrix_2_0 radio button) appears deterministically.
await waitFor(() =>
expect(client.doesServerSupportUnstableFeature).toHaveBeenCalled(),
);
expect(container).toMatchSnapshot();
});
});

View File

@@ -11,8 +11,8 @@ import {
useCallback,
useEffect,
useMemo,
useState,
useId,
useState,
} from "react";
import { useTranslation } from "react-i18next";
import {
@@ -21,6 +21,7 @@ import {
} from "matrix-js-sdk";
import { logger } from "matrix-js-sdk/lib/logger";
import {
EditInPlace,
Root as Form,
Heading,
HelpMessage,
@@ -38,6 +39,7 @@ import {
muteAllAudio as muteAllAudioSetting,
alwaysShowIphoneEarpiece as alwaysShowIphoneEarpieceSetting,
matrixRTCMode as matrixRTCModeSetting,
customLivekitUrl as customLivekitUrlSetting,
MatrixRTCMode,
} from "./settings";
import type { Room as LivekitRoom } from "livekit-client";
@@ -47,9 +49,14 @@ import { useUrlParams } from "../UrlParams";
interface Props {
client: MatrixClient;
livekitRooms?: { room: LivekitRoom; url: string; isLocal?: boolean }[];
env: ImportMetaEnv;
}
export const DeveloperSettingsTab: FC<Props> = ({ client, livekitRooms }) => {
export const DeveloperSettingsTab: FC<Props> = ({
client,
livekitRooms,
env,
}) => {
const { t } = useTranslation();
const [duplicateTiles, setDuplicateTiles] = useSetting(duplicateTilesSetting);
const [debugTileLayout, setDebugTileLayout] = useSetting(
@@ -85,6 +92,15 @@ export const DeveloperSettingsTab: FC<Props> = ({ client, livekitRooms }) => {
alwaysShowIphoneEarpieceSetting,
);
const [customLivekitUrl, setCustomLivekitUrl] = useSetting(
customLivekitUrlSetting,
);
const [customLivekitUrlTextBuffer, setCustomLivekitUrlTextBuffer] =
useState(customLivekitUrl);
useEffect(() => {
setCustomLivekitUrlTextBuffer(customLivekitUrl);
}, [customLivekitUrl]);
const [muteAllAudio, setMuteAllAudio] = useSetting(muteAllAudioSetting);
const urlParams = useUrlParams();
@@ -101,7 +117,7 @@ export const DeveloperSettingsTab: FC<Props> = ({ client, livekitRooms }) => {
}, [livekitRooms]);
return (
<Form>
<>
<p>
{t("developer_mode.hostname", {
hostname: window.location.hostname || "unknown",
@@ -200,55 +216,93 @@ export const DeveloperSettingsTab: FC<Props> = ({ client, livekitRooms }) => {
)}
/>{" "}
</FieldRow>
<EditInPlace
onSubmit={(e) => e.preventDefault()}
helpLabel={
customLivekitUrl === null
? t("developer_mode.custom_livekit_url.from_config")
: t("developer_mode.custom_livekit_url.current_url") +
customLivekitUrl
}
label={t("developer_mode.custom_livekit_url.label")}
saveButtonLabel={t("developer_mode.custom_livekit_url.save")}
savingLabel={t("developer_mode.custom_livekit_url.saving")}
cancelButtonLabel={t("developer_mode.custom_livekit_url.reset")}
onSave={useCallback(
(e: React.FormEvent<HTMLFormElement>) => {
setCustomLivekitUrl(
customLivekitUrlTextBuffer === ""
? null
: customLivekitUrlTextBuffer,
);
},
[setCustomLivekitUrl, customLivekitUrlTextBuffer],
)}
value={customLivekitUrlTextBuffer ?? ""}
onChange={useCallback(
(event: ChangeEvent<HTMLInputElement>): void => {
setCustomLivekitUrlTextBuffer(event.target.value);
},
[setCustomLivekitUrlTextBuffer],
)}
onCancel={useCallback(
(e: React.FormEvent<HTMLFormElement>) => {
setCustomLivekitUrl(null);
},
[setCustomLivekitUrl],
)}
/>
<Heading as="h3" type="body" weight="semibold" size="lg">
{t("developer_mode.matrixRTCMode.title")}
</Heading>
<InlineField
name={matrixRTCModeRadioGroup}
control={
<RadioControl
checked={matrixRTCMode === MatrixRTCMode.Legacy}
value={MatrixRTCMode.Legacy}
onChange={onMatrixRTCModeChange}
/>
}
>
<Label>{t("developer_mode.matrixRTCMode.Legacy.label")}</Label>
<HelpMessage>
{t("developer_mode.matrixRTCMode.Legacy.description")}
</HelpMessage>
</InlineField>
<InlineField
name={matrixRTCModeRadioGroup}
control={
<RadioControl
checked={matrixRTCMode === MatrixRTCMode.Compatibil}
value={MatrixRTCMode.Compatibil}
onChange={onMatrixRTCModeChange}
/>
}
>
<Label>{t("developer_mode.matrixRTCMode.Comptibility.label")}</Label>
<HelpMessage>
{t("developer_mode.matrixRTCMode.Comptibility.description")}
</HelpMessage>
</InlineField>
<InlineField
name={matrixRTCModeRadioGroup}
control={
<RadioControl
checked={matrixRTCMode === MatrixRTCMode.Matrix_2_0}
value={MatrixRTCMode.Matrix_2_0}
disabled={!stickyEventsSupported}
onChange={onMatrixRTCModeChange}
/>
}
>
<Label>{t("developer_mode.matrixRTCMode.Matrix_2_0.label")}</Label>
<HelpMessage>
{t("developer_mode.matrixRTCMode.Matrix_2_0.description")}
</HelpMessage>
</InlineField>
<Form>
<InlineField
name={matrixRTCModeRadioGroup}
control={
<RadioControl
checked={matrixRTCMode === MatrixRTCMode.Legacy}
value={MatrixRTCMode.Legacy}
onChange={onMatrixRTCModeChange}
/>
}
>
<Label>{t("developer_mode.matrixRTCMode.Legacy.label")}</Label>
<HelpMessage>
{t("developer_mode.matrixRTCMode.Legacy.description")}
</HelpMessage>
</InlineField>
<InlineField
name={matrixRTCModeRadioGroup}
control={
<RadioControl
checked={matrixRTCMode === MatrixRTCMode.Compatibil}
value={MatrixRTCMode.Compatibil}
onChange={onMatrixRTCModeChange}
/>
}
>
<Label>{t("developer_mode.matrixRTCMode.Comptibility.label")}</Label>
<HelpMessage>
{t("developer_mode.matrixRTCMode.Comptibility.description")}
</HelpMessage>
</InlineField>
<InlineField
name={matrixRTCModeRadioGroup}
control={
<RadioControl
checked={matrixRTCMode === MatrixRTCMode.Matrix_2_0}
value={MatrixRTCMode.Matrix_2_0}
disabled={!stickyEventsSupported}
onChange={onMatrixRTCModeChange}
/>
}
>
<Label>{t("developer_mode.matrixRTCMode.Matrix_2_0.label")}</Label>
<HelpMessage>
{t("developer_mode.matrixRTCMode.Matrix_2_0.description")}
</HelpMessage>
</InlineField>
</Form>
{livekitRooms?.map((livekitRoom) => (
<>
<h3>
@@ -270,9 +324,9 @@ export const DeveloperSettingsTab: FC<Props> = ({ client, livekitRooms }) => {
</>
))}
<p>{t("developer_mode.environment_variables")}</p>
<pre>{JSON.stringify(import.meta.env, null, 2)}</pre>
<pre>{JSON.stringify(env, null, 2)}</pre>
<p>{t("developer_mode.url_params")}</p>
<pre>{JSON.stringify(urlParams, null, 2)}</pre>
</Form>
</>
);
};

View File

@@ -209,7 +209,11 @@ export const SettingsModal: FC<Props> = ({
key: "developer",
name: t("settings.developer_tab_title"),
content: (
<DeveloperSettingsTab client={client} livekitRooms={livekitRooms} />
<DeveloperSettingsTab
env={import.meta.env}
client={client}
livekitRooms={livekitRooms}
/>
),
};

View File

@@ -0,0 +1,411 @@
// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html
exports[`DeveloperSettingsTab > renders and matches snapshot 1`] = `
<div>
<p>
Hostname: localhost
</p>
<p>
Element Call version: dev
</p>
<p>
Crypto version: crypto-1.0.0
</p>
<p>
Matrix ID: @alice:example.org
</p>
<p>
Device ID: DEVICE123
</p>
<div
class="fieldRow"
>
<div
class="field inputField"
>
<input
aria-describedby="«r1»"
id="duplicateTiles"
min="0"
type="number"
value="0"
/>
<label
for="duplicateTiles"
>
Number of additional tile copies per participant
</label>
</div>
</div>
<div
class="fieldRow"
>
<div
class="field checkboxField"
>
<input
aria-describedby="«r2»"
id="debugTileLayout"
type="checkbox"
/>
<label
for="debugTileLayout"
>
<div
class="checkbox"
>
<svg
fill="none"
height="24"
stroke="#000"
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="2"
viewBox="0 0 24 24"
width="24"
xmlns="http://www.w3.org/2000/svg"
>
<path
d="m20 6-11 11-5-5"
/>
</svg>
</div>
Debug tile layout
</label>
</div>
</div>
<div
class="fieldRow"
>
<div
class="field checkboxField"
>
<input
aria-describedby="«r3»"
id="showConnectionStats"
type="checkbox"
/>
<label
for="showConnectionStats"
>
<div
class="checkbox"
>
<svg
fill="none"
height="24"
stroke="#000"
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="2"
viewBox="0 0 24 24"
width="24"
xmlns="http://www.w3.org/2000/svg"
>
<path
d="m20 6-11 11-5-5"
/>
</svg>
</div>
Show connection statistics
</label>
</div>
</div>
<div
class="fieldRow"
>
<div
class="field checkboxField"
>
<input
aria-describedby="«r4»"
id="muteAllAudio"
type="checkbox"
/>
<label
for="muteAllAudio"
>
<div
class="checkbox"
>
<svg
fill="none"
height="24"
stroke="#000"
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="2"
viewBox="0 0 24 24"
width="24"
xmlns="http://www.w3.org/2000/svg"
>
<path
d="m20 6-11 11-5-5"
/>
</svg>
</div>
Mute all audio (participants, reactions, join sounds)
</label>
</div>
</div>
<div
class="fieldRow"
>
<div
class="field checkboxField"
>
<input
aria-describedby="«r5»"
id="alwaysShowIphoneEarpiece"
type="checkbox"
/>
<label
for="alwaysShowIphoneEarpiece"
>
<div
class="checkbox"
>
<svg
fill="none"
height="24"
stroke="#000"
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="2"
viewBox="0 0 24 24"
width="24"
xmlns="http://www.w3.org/2000/svg"
>
<path
d="m20 6-11 11-5-5"
/>
</svg>
</div>
Show iPhone earpiece option on all platforms
</label>
</div>
</div>
<form
class="_root_19upo_16"
>
<div
class="_field_19upo_26"
>
<label
class="_label_19upo_59"
for="radix-«r6»"
>
Custom Livekit-url
</label>
<div
class="_controls_17lij_8"
>
<input
aria-describedby="radix-«r7»"
class="_control_sqdq4_10"
id="radix-«r6»"
name="input"
title=""
value=""
/>
</div>
<span
class="_message_19upo_85 _help-message_19upo_91"
id="radix-«r7»"
>
Currently, no overwrite is set. Url from well-known or config is used.
</span>
</div>
</form>
<h3
class="_typography_6v6n8_153 _font-body-lg-semibold_6v6n8_74"
>
MatrixRTC mode
</h3>
<form
class="_root_19upo_16"
>
<div
class="_inline-field_19upo_32"
>
<div
class="_inline-field-control_19upo_44"
>
<div
class="_container_1e0uz_10"
>
<input
aria-describedby="radix-«r9» radix-«rb» radix-«rd»"
checked=""
class="_input_1e0uz_18"
id="radix-«r8»"
name="«r0»"
title=""
type="radio"
value="legacy"
/>
<div
class="_ui_1e0uz_19"
/>
</div>
</div>
<div
class="_inline-field-body_19upo_38"
>
<label
class="_label_19upo_59"
for="radix-«r8»"
>
Legacy: state events & oldest membership SFU
</label>
<span
class="_message_19upo_85 _help-message_19upo_91"
id="radix-«r9»"
>
Compatible with old versions of EC that do not support multi SFU
</span>
</div>
</div>
<div
class="_inline-field_19upo_32"
>
<div
class="_inline-field-control_19upo_44"
>
<div
class="_container_1e0uz_10"
>
<input
aria-describedby="radix-«r9» radix-«rb» radix-«rd»"
class="_input_1e0uz_18"
id="radix-«ra»"
name="«r0»"
title=""
type="radio"
value="compatibil"
/>
<div
class="_ui_1e0uz_19"
/>
</div>
</div>
<div
class="_inline-field-body_19upo_38"
>
<label
class="_label_19upo_59"
for="radix-«ra»"
>
Compatibility: state events & multi SFU
</label>
<span
class="_message_19upo_85 _help-message_19upo_91"
id="radix-«rb»"
>
Compatible with homeservers that do not support sticky events (but all other EC clients are v0.17.0 or later)
</span>
</div>
</div>
<div
class="_inline-field_19upo_32"
>
<div
class="_inline-field-control_19upo_44"
>
<div
class="_container_1e0uz_10"
>
<input
aria-describedby="radix-«r9» radix-«rb» radix-«rd»"
class="_input_1e0uz_18"
id="radix-«rc»"
name="«r0»"
title=""
type="radio"
value="matrix_2_0"
/>
<div
class="_ui_1e0uz_19"
/>
</div>
</div>
<div
class="_inline-field-body_19upo_38"
>
<label
class="_label_19upo_59"
for="radix-«rc»"
>
Matrix 2.0: sticky events & multi SFU
</label>
<span
class="_message_19upo_85 _help-message_19upo_91"
id="radix-«rd»"
>
Compatible only with homservers supporting sticky events and all EC clients v0.17.0 or later
</span>
</div>
</div>
</form>
<h3>
LiveKit SFU: wss://local-sfu.example.org
</h3>
<p>
ws-url:
wss://local-sfu.example.org/
</p>
<p>
LiveKit Server Info
(
local
)
</p>
<pre
class="pre"
>
{
"region": "local",
"version": "1.2.3"
}
local-metadata
</pre>
<h3>
LiveKit SFU: wss://remote-sfu.example.org
</h3>
<p>
LiveKit Server Info
(
remote
)
</p>
<pre
class="pre"
>
{
"region": "remote",
"version": "4.5.6"
}
remote-metadata
</pre>
<p>
Environment variables
</p>
<pre>
{
"MY_MOCK_ENV": 10,
"ENV": "test"
}
</pre>
<p>
URL parameters
</p>
<pre>
{
"mocked": true,
"answer": 42
}
</pre>
</div>
`;

View File

@@ -134,3 +134,8 @@ export const matrixRTCMode = new Setting<MatrixRTCMode>(
"matrix-rtc-mode",
MatrixRTCMode.Legacy,
);
export const customLivekitUrl = new Setting<string | null>(
"custom-livekit-url",
null,
);

View File

@@ -16,7 +16,10 @@ import { BehaviorSubject } from "rxjs";
* distinction between Behaviors and Observables, see
* https://monoid.dk/post/behaviors-and-streams-why-both/.
*/
export type Behavior<T> = Omit<BehaviorSubject<T>, "next" | "observers">;
export type Behavior<T> = Omit<
BehaviorSubject<T>,
"next" | "observers" | "error"
>;
/**
* Creates a Behavior which never changes in value.

View File

@@ -6,8 +6,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { test, vi, onTestFinished, it, describe, expect } from "vitest";
import EventEmitter from "events";
import { test, vi, onTestFinished, it, describe } from "vitest";
import {
BehaviorSubject,
combineLatest,
@@ -19,12 +18,11 @@ import {
of,
switchMap,
} from "rxjs";
import { SyncState, type MatrixClient } from "matrix-js-sdk";
import { SyncState } from "matrix-js-sdk";
import {
ConnectionState,
type LocalTrackPublication,
type RemoteParticipant,
type Room as LivekitRoom,
} from "livekit-client";
import * as ComponentsCore from "@livekit/components-core";
import {
@@ -36,27 +34,18 @@ import {
type LivekitTransport,
} from "matrix-js-sdk/lib/matrixrtc";
import { deepCompare } from "matrix-js-sdk/lib/utils";
import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
import { createCallViewModel$ } from "./CallViewModel";
import { type Layout } from "../layout-types.ts";
import {
mockLocalParticipant,
mockMatrixRoom,
mockMatrixRoomMember,
mockRemoteParticipant,
withTestScheduler,
mockRtcMembership,
MockRTCSession,
mockMediaDevices,
mockMuteStates,
mockConfig,
testScope,
mockLivekitRoom,
exampleTransport,
} from "../../utils/test.ts";
import { E2eeType } from "../../e2ee/e2eeType.ts";
import type { RaisedHandInfo, ReactionInfo } from "../../reactions/index.ts";
import {
aliceId,
aliceParticipant,
@@ -71,10 +60,6 @@ import {
import { MediaDevices } from "../MediaDevices.ts";
import { getValue } from "../../utils/observable.ts";
import { type Behavior, constant } from "../Behavior.ts";
import {
type ElementCallError,
MatrixRTCTransportMissingError,
} from "../../utils/errors.ts";
import { withCallViewModel } from "./CallViewModelTestUtils.ts";
vi.mock("rxjs", async (importOriginal) => ({
@@ -245,71 +230,6 @@ function mockRingEvent(
const mockLegacyRingEvent = {} as { event_id: string } & ICallNotifyContent;
describe("CallViewModel", () => {
// TODO: Restore this test. It requires makeTransport to not be mocked, unlike
// the rest of the tests in this file… what do we do?
it.skip("test missing RTC config error", async () => {
const rtcMemberships$ = new BehaviorSubject<CallMembership[]>([]);
const emitter = new EventEmitter();
const client = vi.mocked<MatrixClient>({
on: emitter.on.bind(emitter),
off: emitter.off.bind(emitter),
getSyncState: vi.fn().mockReturnValue(SyncState.Syncing),
getUserId: vi.fn().mockReturnValue("@user:localhost"),
getUser: vi.fn().mockReturnValue(null),
getDeviceId: vi.fn().mockReturnValue("DEVICE"),
credentials: {
userId: "@user:localhost",
},
getCrypto: vi.fn().mockReturnValue(undefined),
getDomain: vi.fn().mockReturnValue("example.org"),
} as unknown as MatrixClient);
const matrixRoom = mockMatrixRoom({
roomId: "!myRoomId:example.com",
client,
getMember: vi.fn().mockReturnValue(undefined),
});
const fakeRtcSession = new MockRTCSession(matrixRoom).withMemberships(
rtcMemberships$,
);
mockConfig({});
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({});
const callVM = createCallViewModel$(
testScope(),
fakeRtcSession.asMockedSession(),
matrixRoom,
mockMediaDevices({}),
mockMuteStates(),
{
encryptionSystem: { kind: E2eeType.PER_PARTICIPANT },
autoLeaveWhenOthersLeft: false,
livekitRoomFactory: (): LivekitRoom =>
mockLivekitRoom({
localParticipant,
disconnect: async () => Promise.resolve(),
setE2EEEnabled: async () => Promise.resolve(),
}),
},
new BehaviorSubject({} as Record<string, RaisedHandInfo>),
new BehaviorSubject({} as Record<string, ReactionInfo>),
constant({ processor: undefined, supported: false }),
);
const failPromise = Promise.withResolvers<ElementCallError>();
callVM.configError$.subscribe((error) => {
if (error) {
failPromise.resolve(error);
}
});
const error = await failPromise.promise;
expect(error).toBeInstanceOf(MatrixRTCTransportMissingError);
});
test("participants are retained during a focus switch", () => {
withTestScheduler(({ behavior, expectObservable }) => {
// Participants disappear on frame 2 and come back on frame 3

View File

@@ -41,7 +41,10 @@ import {
timer,
} from "rxjs";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc";
import {
type LivekitTransport,
type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc";
import { type IWidgetApiRequest } from "matrix-widget-api";
import {
@@ -94,8 +97,11 @@ import {
} from "../layout-types.ts";
import { type ElementCallError } from "../../utils/errors.ts";
import { type ObservableScope } from "../ObservableScope.ts";
import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts";
import {
createLocalMembership$,
enterRTCSession,
LivekitState,
type LocalMemberConnectionState,
} from "./localMember/LocalMembership.ts";
import { createLocalTransport$ } from "./localMember/LocalTransport.ts";
@@ -120,6 +126,8 @@ import {
createMatrixMemberMetadata$,
createRoomMembers$,
} from "./remoteMembers/MatrixMemberMetadata.ts";
import { Publisher } from "./localMember/Publisher.ts";
import { type Connection } from "./remoteMembers/Connection.ts";
const logger = rootLogger.getChild("[CallViewModel]");
//TODO
@@ -230,7 +238,7 @@ export interface CallViewModel {
* This is a fatal error that prevents the call from being created/joined.
* Should render a blocking error screen.
*/
configError$: Behavior<ElementCallError | null>;
fatalError$: Behavior<ElementCallError | null>;
// participants and counts
/**
@@ -357,9 +365,9 @@ export function createCallViewModel$(
reactionsSubject$: Observable<Record<string, ReactionInfo>>,
trackProcessorState$: Behavior<ProcessorState>,
): CallViewModel {
const userId = matrixRoom.client.getUserId()!;
const deviceId = matrixRoom.client.getDeviceId()!;
const client = matrixRoom.client;
const userId = client.getUserId()!;
const deviceId = client.getDeviceId()!;
const livekitKeyProvider = getE2eeKeyProvider(
options.encryptionSystem,
matrixRTCSession,
@@ -393,7 +401,7 @@ export function createCallViewModel$(
const localTransport$ = createLocalTransport$({
scope: scope,
memberships$: memberships$,
client: matrixRoom.client,
client,
roomId: matrixRoom.roomId,
useOldestMember$: scope.behavior(
matrixRTCMode.value$.pipe(map((v) => v === MatrixRTCMode.Legacy)),
@@ -401,7 +409,7 @@ export function createCallViewModel$(
});
const connectionFactory = new ECConnectionFactory(
matrixRoom.client,
client,
mediaDevices,
trackProcessorState$,
livekitKeyProvider,
@@ -446,15 +454,31 @@ export function createCallViewModel$(
const localMembership = createLocalMembership$({
scope: scope,
homeserverConnected$: createHomeserverConnected$(
scope,
client,
matrixRTCSession,
),
muteStates: muteStates,
mediaDevices: mediaDevices,
joinMatrixRTC: async (transport: LivekitTransport) => {
return enterRTCSession(
matrixRTCSession,
transport,
connectOptions$.value,
);
},
createPublisherFactory: (connection: Connection) => {
return new Publisher(
scope,
connection,
mediaDevices,
muteStates,
trackProcessorState$,
);
},
connectionManager: connectionManager,
matrixRTCSession: matrixRTCSession,
matrixRoom: matrixRoom,
localTransport$: localTransport$,
trackProcessorState$: trackProcessorState$,
widget,
options: connectOptions$,
logger: logger.getChild(`[${Date.now()}]`),
});
@@ -1442,7 +1466,14 @@ export function createCallViewModel$(
hoverScreen: (): void => screenHover$.next(),
unhoverScreen: (): void => screenUnhover$.next(),
configError$: localMembership.configError$,
fatalError$: scope.behavior(
localMembership.connectionState.livekit$.pipe(
filter((v) => v.state === LivekitState.Error),
map((s) => s.error),
),
null,
),
participantCount$: participantCount$,
audioParticipants$: audioParticipants$,

View File

@@ -0,0 +1,202 @@
/*
Copyright 2025 Element Creations Ltd.
Copyright 2024 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { EventEmitter } from "events";
import { ClientEvent, SyncState } from "matrix-js-sdk";
import { MembershipManagerEvent, Status } from "matrix-js-sdk/lib/matrixrtc";
import { ObservableScope } from "../../ObservableScope";
import { createHomeserverConnected$ } from "./HomeserverConnected";
/**
* Minimal stub of a Matrix client sufficient for our tests:
```
createHomeserverConnected$(
scope: ObservableScope,
client: NodeStyleEventEmitter & Pick<MatrixClient, "getSyncState">,
matrixRTCSession: NodeStyleEventEmitter &
Pick<MatrixRTCSession, "membershipStatus" | "probablyLeft">,
)
```
*/
class MockMatrixClient extends EventEmitter {
private syncState: SyncState;
public constructor(initial: SyncState) {
super();
this.syncState = initial;
}
public setSyncState(state: SyncState): void {
this.syncState = state;
// Matrix's Sync event in createHomeserverConnected$ expects [SyncState]
this.emit(ClientEvent.Sync, [state]);
}
public getSyncState(): SyncState {
return this.syncState;
}
}
/**
* Minimal stub of MatrixRTCSession (membership manager):
```
createHomeserverConnected$(
scope: ObservableScope,
client: NodeStyleEventEmitter & Pick<MatrixClient, "getSyncState">,
matrixRTCSession: NodeStyleEventEmitter &
Pick<MatrixRTCSession, "membershipStatus" | "probablyLeft">,
)
```
*/
class MockMatrixRTCSession extends EventEmitter {
public membershipStatus: Status;
public probablyLeft: boolean;
public constructor(props: {
membershipStatus: Status;
probablyLeft: boolean;
}) {
super();
this.membershipStatus = props.membershipStatus;
this.probablyLeft = props.probablyLeft;
}
public setMembershipStatus(status: Status): void {
this.membershipStatus = status;
this.emit(MembershipManagerEvent.StatusChanged);
}
public setProbablyLeft(flag: boolean): void {
this.probablyLeft = flag;
this.emit(MembershipManagerEvent.ProbablyLeft);
}
}
describe("createHomeserverConnected$", () => {
let scope: ObservableScope;
let client: MockMatrixClient;
let session: MockMatrixRTCSession;
beforeEach(() => {
scope = new ObservableScope();
client = new MockMatrixClient(SyncState.Error); // start disconnected
session = new MockMatrixRTCSession({
membershipStatus: Status.Disconnected,
probablyLeft: false,
});
});
afterEach(() => {
scope.end();
});
// LLM generated test cases. They are a bit overkill but I improved the mocking so it is
// easy enough to read them so I think they can stay.
it("is false when sync state is not Syncing", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
expect(hsConnected$.value).toBe(false);
});
it("remains false while membership status is not Connected even if sync is Syncing", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
client.setSyncState(SyncState.Syncing);
expect(hsConnected$.value).toBe(false); // membership still disconnected
});
it("is false when membership status transitions to Connected but ProbablyLeft is true", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
// Make sync loop OK
client.setSyncState(SyncState.Syncing);
// Indicate probable leave before connection
session.setProbablyLeft(true);
session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(false);
});
it("becomes true only when all three conditions are satisfied", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
// 1. Sync loop connected
client.setSyncState(SyncState.Syncing);
expect(hsConnected$.value).toBe(false); // not yet membership connected
// 2. Membership connected
session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true); // probablyLeft is false
});
it("drops back to false when sync loop leaves Syncing", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
// Reach connected state
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true);
// Sync loop error => should flip false
client.setSyncState(SyncState.Error);
expect(hsConnected$.value).toBe(false);
});
it("drops back to false when membership status becomes disconnected", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true);
session.setMembershipStatus(Status.Disconnected);
expect(hsConnected$.value).toBe(false);
});
it("drops to false when ProbablyLeft is emitted after being true", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true);
session.setProbablyLeft(true);
expect(hsConnected$.value).toBe(false);
});
it("recovers to true if ProbablyLeft becomes false again while other conditions remain true", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true);
session.setProbablyLeft(true);
expect(hsConnected$.value).toBe(false);
// Simulate clearing the flag (in realistic scenario membership manager would update)
session.setProbablyLeft(false);
expect(hsConnected$.value).toBe(true);
});
it("composite sequence reflects each individual failure reason", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session);
// Initially false (sync error + disconnected + not probably left)
expect(hsConnected$.value).toBe(false);
// Fix sync only
client.setSyncState(SyncState.Syncing);
expect(hsConnected$.value).toBe(false);
// Fix membership
session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true);
// Introduce probablyLeft -> false
session.setProbablyLeft(true);
expect(hsConnected$.value).toBe(false);
// Restore notProbablyLeft -> true again
session.setProbablyLeft(false);
expect(hsConnected$.value).toBe(true);
// Drop sync -> false
client.setSyncState(SyncState.Error);
expect(hsConnected$.value).toBe(false);
});
});

View File

@@ -0,0 +1,85 @@
/*
Copyright 2025 Element Creations Ltd.
Copyright 2024 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
MembershipManagerEvent,
Status,
type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc";
import { ClientEvent, type MatrixClient, SyncState } from "matrix-js-sdk";
import { fromEvent, startWith, map, tap, type Observable } from "rxjs";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { type ObservableScope } from "../../ObservableScope";
import { type Behavior } from "../../Behavior";
import { and$ } from "../../../utils/observable";
import { type NodeStyleEventEmitter } from "../../../utils/test";
/**
* Logger instance (scoped child) for homeserver connection updates.
*/
const logger = rootLogger.getChild("[HomeserverConnected]");
/**
* Behavior representing whether we consider ourselves connected to the Matrix homeserver
* for the purposes of a MatrixRTC session.
*
* Becomes FALSE if ANY sub-condition is fulfilled:
* 1. Sync loop is not in SyncState.Syncing
* 2. membershipStatus !== Status.Connected
* 3. probablyLeft === true
*/
export function createHomeserverConnected$(
scope: ObservableScope,
client: NodeStyleEventEmitter & Pick<MatrixClient, "getSyncState">,
matrixRTCSession: NodeStyleEventEmitter &
Pick<MatrixRTCSession, "membershipStatus" | "probablyLeft">,
): Behavior<boolean> {
const syncing$ = (
fromEvent(client, ClientEvent.Sync) as Observable<[SyncState]>
).pipe(
startWith([client.getSyncState()]),
map(([state]) => state === SyncState.Syncing),
);
const membershipConnected$ = fromEvent(
matrixRTCSession,
MembershipManagerEvent.StatusChanged,
).pipe(
startWith(null),
map(() => matrixRTCSession.membershipStatus === Status.Connected),
);
// This is basically notProbablyLeft$
//
// probablyLeft is computed by a local timer that mimics the server delayed event.
// If we locally predict our server event timed out. We consider ourselves as probablyLeft
// even though we might not yet have received the delayed event leave.
//
// If that is not the case we certainly still have a valid membership on the matrix network
// independet if the sync currently works.
const certainlyConnected$ = fromEvent(
matrixRTCSession,
MembershipManagerEvent.ProbablyLeft,
).pipe(
startWith(null),
map(() => matrixRTCSession.probablyLeft !== true),
);
const connectedCombined$ = and$(
syncing$,
membershipConnected$,
certainlyConnected$,
).pipe(
tap((connected) => {
logger.info(`Homeserver connected update: ${connected}`);
}),
);
return scope.behavior(connectedCombined$);
}

View File

@@ -6,154 +6,224 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc";
import { expect, test, vi } from "vitest";
import {
type LivekitTransport,
type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc";
import { describe, expect, it, vi } from "vitest";
import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
import EventEmitter from "events";
import { map } from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger";
import { MatrixRTCMode } from "../../../settings/settings";
import { mockConfig } from "../../../utils/test";
import { enterRTCSession } from "./LocalMembership";
import {
mockConfig,
mockMuteStates,
withTestScheduler,
} from "../../../utils/test";
import {
createLocalMembership$,
enterRTCSession,
LivekitState,
} from "./LocalMembership";
import { MatrixRTCTransportMissingError } from "../../../utils/errors";
import { Epoch } from "../../ObservableScope";
import { constant } from "../../Behavior";
import { ConnectionManagerData } from "../remoteMembers/ConnectionManager";
import { type Publisher } from "./Publisher";
const MATRIX_RTC_MODE = MatrixRTCMode.Legacy;
const getUrlParams = vi.hoisted(() => vi.fn(() => ({})));
vi.mock("../../../UrlParams", () => ({ getUrlParams }));
vi.mock("../../../widget", async (importOriginal) => ({
...(await importOriginal()),
widget: {
api: {
setAlwaysOnScreen: (): void => {},
transport: { send: vi.fn(), reply: vi.fn(), stop: vi.fn() },
},
lazyActions: new EventEmitter(),
},
}));
test("It joins the correct Session", async () => {
const focusFromOlderMembership = {
type: "livekit",
livekit_service_url: "http://my-oldest-member-service-url.com",
livekit_alias: "my-oldest-member-service-alias",
};
const focusConfigFromWellKnown = {
type: "livekit",
livekit_service_url: "http://my-well-known-service-url.com",
};
const focusConfigFromWellKnown2 = {
type: "livekit",
livekit_service_url: "http://my-well-known-service-url2.com",
};
const clientWellKnown = {
"org.matrix.msc4143.rtc_foci": [
focusConfigFromWellKnown,
focusConfigFromWellKnown2,
],
};
mockConfig({
livekit: { livekit_service_url: "http://my-default-service-url.com" },
});
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockImplementation(
async (domain) => {
if (domain === "example.org") {
return Promise.resolve(clientWellKnown);
}
return Promise.resolve({});
},
);
const mockedSession = vi.mocked({
room: {
roomId: "roomId",
client: {
getDomain: vi.fn().mockReturnValue("example.org"),
getOpenIdToken: vi.fn().mockResolvedValue({
access_token: "ACCCESS_TOKEN",
token_type: "Bearer",
matrix_server_name: "localhost",
expires_in: 10000,
}),
},
},
memberships: [],
getFocusInUse: vi.fn().mockReturnValue(focusFromOlderMembership),
getOldestMembership: vi.fn().mockReturnValue({
getPreferredFoci: vi.fn().mockReturnValue([focusFromOlderMembership]),
}),
joinRoomSession: vi.fn(),
}) as unknown as MatrixRTCSession;
await enterRTCSession(
mockedSession,
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit",
},
{
encryptMedia: true,
matrixRTCMode: MATRIX_RTC_MODE,
},
);
expect(mockedSession.joinRoomSession).toHaveBeenLastCalledWith(
[
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
describe("LocalMembership", () => {
describe("enterRTCSession", () => {
it("It joins the correct Session", async () => {
const focusFromOlderMembership = {
type: "livekit",
},
],
undefined,
expect.objectContaining({
manageMediaKeys: true,
useLegacyMemberEvents: false,
}),
);
});
livekit_service_url: "http://my-oldest-member-service-url.com",
livekit_alias: "my-oldest-member-service-alias",
};
test("It should not fail with configuration error if homeserver config has livekit url but not fallback", async () => {
mockConfig({});
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({
"org.matrix.msc4143.rtc_foci": [
{
const focusConfigFromWellKnown = {
type: "livekit",
livekit_service_url: "http://my-well-known-service-url.com",
},
],
};
const focusConfigFromWellKnown2 = {
type: "livekit",
livekit_service_url: "http://my-well-known-service-url2.com",
};
const clientWellKnown = {
"org.matrix.msc4143.rtc_foci": [
focusConfigFromWellKnown,
focusConfigFromWellKnown2,
],
};
mockConfig({
livekit: { livekit_service_url: "http://my-default-service-url.com" },
});
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockImplementation(
async (domain) => {
if (domain === "example.org") {
return Promise.resolve(clientWellKnown);
}
return Promise.resolve({});
},
);
const mockedSession = vi.mocked({
room: {
roomId: "roomId",
client: {
getDomain: vi.fn().mockReturnValue("example.org"),
getOpenIdToken: vi.fn().mockResolvedValue({
access_token: "ACCCESS_TOKEN",
token_type: "Bearer",
matrix_server_name: "localhost",
expires_in: 10000,
}),
},
},
memberships: [],
getFocusInUse: vi.fn().mockReturnValue(focusFromOlderMembership),
getOldestMembership: vi.fn().mockReturnValue({
getPreferredFoci: vi.fn().mockReturnValue([focusFromOlderMembership]),
}),
joinRoomSession: vi.fn(),
}) as unknown as MatrixRTCSession;
await enterRTCSession(
mockedSession,
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit",
},
{
encryptMedia: true,
matrixRTCMode: MATRIX_RTC_MODE,
},
);
expect(mockedSession.joinRoomSession).toHaveBeenLastCalledWith(
[
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit",
},
],
undefined,
expect.objectContaining({
manageMediaKeys: true,
useLegacyMemberEvents: false,
}),
);
});
it("It should not fail with configuration error if homeserver config has livekit url but not fallback", async () => {
mockConfig({});
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({
"org.matrix.msc4143.rtc_foci": [
{
type: "livekit",
livekit_service_url: "http://my-well-known-service-url.com",
},
],
});
const mockedSession = vi.mocked({
room: {
roomId: "roomId",
client: {
getDomain: vi.fn().mockReturnValue("example.org"),
getOpenIdToken: vi.fn().mockResolvedValue({
access_token: "ACCCESS_TOKEN",
token_type: "Bearer",
matrix_server_name: "localhost",
expires_in: 10000,
}),
},
},
memberships: [],
getFocusInUse: vi.fn(),
joinRoomSession: vi.fn(),
}) as unknown as MatrixRTCSession;
await enterRTCSession(
mockedSession,
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit",
},
{
encryptMedia: true,
matrixRTCMode: MATRIX_RTC_MODE,
},
);
});
});
const mockedSession = vi.mocked({
room: {
roomId: "roomId",
client: {
getDomain: vi.fn().mockReturnValue("example.org"),
getOpenIdToken: vi.fn().mockResolvedValue({
access_token: "ACCCESS_TOKEN",
token_type: "Bearer",
matrix_server_name: "localhost",
expires_in: 10000,
}),
},
},
memberships: [],
getFocusInUse: vi.fn(),
joinRoomSession: vi.fn(),
}) as unknown as MatrixRTCSession;
const defaultCreateLocalMemberValues = {
options: constant({
encryptMedia: false,
matrixRTCMode: MatrixRTCMode.Matrix_2_0,
}),
matrixRTCSession: {
updateCallIntent: () => {},
leaveRoomSession: () => {},
} as unknown as MatrixRTCSession,
muteStates: mockMuteStates(),
isHomeserverConnected: constant(true),
trackProcessorState$: constant({
supported: false,
processor: undefined,
}),
logger: logger,
createPublisherFactory: (): Publisher => ({}) as unknown as Publisher,
joinMatrixRTC: async (): Promise<void> => {},
homeserverConnected$: constant(true),
};
await enterRTCSession(
mockedSession,
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit",
},
{
encryptMedia: true,
matrixRTCMode: MATRIX_RTC_MODE,
},
);
it("throws error on missing RTC config error", () => {
withTestScheduler(({ scope, hot, expectObservable }) => {
const goodTransport = {
livekit_service_url: "other",
} as LivekitTransport;
const localTransport$ = scope.behavior<LivekitTransport>(
hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")),
goodTransport,
);
const mockConnectionManager = {
transports$: scope.behavior(
localTransport$.pipe(map((t) => new Epoch([t]))),
),
connectionManagerData$: constant(
new Epoch(new ConnectionManagerData()),
),
};
const localMembership = createLocalMembership$({
scope,
...defaultCreateLocalMemberValues,
connectionManager: mockConnectionManager,
localTransport$,
});
expectObservable(localMembership.connectionState.livekit$).toBe("ne", {
n: { state: LivekitState.Uninitialized },
e: {
state: LivekitState.Error,
error: expect.toSatisfy(
(e) => e instanceof MatrixRTCTransportMissingError,
),
},
});
});
});
});

View File

@@ -16,20 +16,16 @@ import { observeParticipantEvents } from "@livekit/components-core";
import {
type LivekitTransport,
type MatrixRTCSession,
MembershipManagerEvent,
Status,
} from "matrix-js-sdk/lib/matrixrtc";
import { ClientEvent, SyncState, type Room as MatrixRoom } from "matrix-js-sdk";
import {
BehaviorSubject,
catchError,
combineLatest,
distinctUntilChanged,
fromEvent,
map,
type Observable,
of,
scan,
startWith,
switchMap,
tap,
} from "rxjs";
@@ -38,18 +34,11 @@ import { type Logger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "../../Behavior";
import { type IConnectionManager } from "../remoteMembers/ConnectionManager";
import { ObservableScope } from "../../ObservableScope";
import { Publisher } from "./Publisher";
import { type Publisher } from "./Publisher";
import { type MuteStates } from "../../MuteStates";
import { type ProcessorState } from "../../../livekit/TrackProcessorContext";
import { type MediaDevices } from "../../MediaDevices";
import { and$ } from "../../../utils/observable";
import { ElementCallError, UnknownCallError } from "../../../utils/errors";
import {
ElementWidgetActions,
widget,
type WidgetHelpers,
} from "../../../widget";
import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers";
import { ElementWidgetActions, widget } from "../../../widget";
import { getUrlParams } from "../../../UrlParams.ts";
import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts";
import { MatrixRTCMode } from "../../../settings/settings.ts";
@@ -69,7 +58,7 @@ export enum LivekitState {
}
type LocalMemberLivekitState =
| { state: LivekitState.Error; error: string }
| { state: LivekitState.Error; error: ElementCallError }
| { state: LivekitState.Connected }
| { state: LivekitState.Connecting }
| { state: LivekitState.Uninitialized }
@@ -80,12 +69,14 @@ export enum MatrixState {
Connected = "connected",
Disconnected = "disconnected",
Connecting = "connecting",
Error = "Error",
}
type LocalMemberMatrixState =
| { state: MatrixState.Connected }
| { state: MatrixState.Connecting }
| { state: MatrixState.Disconnected };
| { state: MatrixState.Disconnected }
| { state: MatrixState.Error; error: Error };
export interface LocalMemberConnectionState {
livekit$: Behavior<LocalMemberLivekitState>;
@@ -103,17 +94,21 @@ export interface LocalMemberConnectionState {
* - Publisher.publishTracks()
* - send join state/sticky event
*/
interface Props {
options: Behavior<EnterRTCSessionOptions>;
// TODO add a comment into some code style readme or file header callviewmodel
// that the inputs for those createSomething$() functions should NOT contain any js-sdk objectes
scope: ObservableScope;
mediaDevices: MediaDevices;
muteStates: MuteStates;
connectionManager: IConnectionManager;
matrixRTCSession: MatrixRTCSession;
matrixRoom: MatrixRoom;
createPublisherFactory: (connection: Connection) => Publisher;
joinMatrixRTC: (trasnport: LivekitTransport) => Promise<void>;
homeserverConnected$: Behavior<boolean>;
localTransport$: Behavior<LivekitTransport | null>;
trackProcessorState$: Behavior<ProcessorState>;
widget: WidgetHelpers | null;
matrixRTCSession: Pick<
MatrixRTCSession,
"updateCallIntent" | "leaveRoomSession"
>;
logger: Logger;
}
@@ -132,18 +127,15 @@ interface Props {
*/
export const createLocalMembership$ = ({
scope,
options,
muteStates,
mediaDevices,
connectionManager,
matrixRTCSession,
localTransport$,
matrixRoom,
trackProcessorState$,
widget,
localTransport$: localTransportCanThrow$,
homeserverConnected$,
createPublisherFactory,
joinMatrixRTC,
logger: parentLogger,
muteStates,
matrixRTCSession,
}: Props): {
// publisher: Publisher
requestConnect: () => LocalMemberConnectionState;
startTracks: () => Behavior<LocalTrack[]>;
requestDisconnect: () => Observable<LocalMemberLivekitState> | null;
@@ -155,17 +147,13 @@ export const createLocalMembership$ = ({
toggleScreenSharing: (() => void) | null;
participant$: Behavior<LocalParticipant | null>;
connection$: Behavior<Connection | null>;
// deprecated fields
/** @deprecated use state instead*/
homeserverConnected$: Behavior<boolean>;
// deprecated fields
/** @deprecated use state instead*/
connected$: Behavior<boolean>;
// this needs to be discussed
/** @deprecated use state instead*/
reconnecting$: Behavior<boolean>;
// also needs to be disccues
/** @deprecated use state instead*/
configError$: Behavior<ElementCallError | null>;
} => {
const logger = parentLogger.getChild("[LocalMembership]");
logger.debug(`Creating local membership..`);
@@ -189,18 +177,38 @@ export const createLocalMembership$ = ({
// This should be used in a combineLatest with publisher$ to connect.
const tracks$ = new BehaviorSubject<LocalTrack[]>([]);
// unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error.
const localTransport$ = scope.behavior(
localTransportCanThrow$.pipe(
catchError((e: unknown) => {
let error: ElementCallError;
if (e instanceof ElementCallError) {
error = e;
} else {
error = new UnknownCallError(
e instanceof Error
? e
: new Error("Unknown error from localTransport"),
);
}
state.livekit$.next({ state: LivekitState.Error, error });
return of(null);
}),
),
);
// Drop Epoch data here since we will not combine this anymore
const localConnection$ = scope.behavior(
combineLatest([connectionManager.connections$, localTransport$]).pipe(
map(([connections, localTransport]) => {
combineLatest([
connectionManager.connectionManagerData$,
localTransport$,
]).pipe(
map(([connectionData, localTransport]) => {
if (localTransport === null) {
return null;
}
return (
connections.value.find((connection) =>
areLivekitTransportsEqual(connection.transport, localTransport),
) ?? null
);
return connectionData.value.getConnectionForTransport(localTransport);
}),
tap((connection) => {
logger.info(
@@ -209,40 +217,6 @@ export const createLocalMembership$ = ({
}),
),
);
/**
* Whether we are connected to the MatrixRTC session.
*/
const homeserverConnected$ = scope.behavior(
// To consider ourselves connected to MatrixRTC, we check the following:
and$(
// The client is connected to the sync loop
(
fromEvent(matrixRoom.client, ClientEvent.Sync) as Observable<
[SyncState]
>
).pipe(
startWith([matrixRoom.client.getSyncState()]),
map(([state]) => state === SyncState.Syncing),
),
// Room state observed by session says we're connected
fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe(
startWith(null),
map(() => matrixRTCSession.membershipStatus === Status.Connected),
),
// Also watch out for warnings that we've likely hit a timeout and our
// delayed leave event is being sent (this condition is here because it
// provides an earlier warning than the sync loop timeout, and we wouldn't
// see the actual leave event until we reconnect to the sync loop)
fromEvent(matrixRTCSession, MembershipManagerEvent.ProbablyLeft).pipe(
startWith(null),
map(() => matrixRTCSession.probablyLeft !== true),
),
).pipe(
tap((connected) => {
logger.info(`Homeserver connected update: ${connected}`);
}),
),
);
// /**
// * Whether we are "fully" connected to the call. Accounts for both the
@@ -266,18 +240,15 @@ export const createLocalMembership$ = ({
localConnection$.pipe(scope.bind()).subscribe((connection) => {
if (connection !== null && publisher$.value === null) {
// TODO looks strange to not change publisher if connection changes.
publisher$.next(
new Publisher(
scope,
connection,
mediaDevices,
muteStates,
trackProcessorState$,
),
);
// @toger5 will take care of this!
publisher$.next(createPublisherFactory(connection));
}
});
// const mutestate= publisher$.pipe(switchMap((publisher) => {
// return publisher.muteState$
// });
combineLatest([publisher$, trackStartRequested$]).subscribe(
([publisher, shouldStartTracks]) => {
if (publisher && shouldStartTracks) {
@@ -360,14 +331,21 @@ export const createLocalMembership$ = ({
}
state.matrix$.next({ state: MatrixState.Connecting });
logger.info("Matrix State connecting");
enterRTCSession(matrixRTCSession, transport, options.value).catch(
(error) => {
logger.error(error);
},
);
joinMatrixRTC(transport).catch((error) => {
logger.error(error);
state.matrix$.next({ state: MatrixState.Error, error });
});
},
);
// TODO add this and update `state.matrix$` based on it.
// useTypedEventEmitter(
// rtcSession,
// MatrixRTCSessionEvent.MembershipManagerError,
// (error) => setExternalError(new ConnectionLostError()),
// );
const requestConnect = (): LocalMemberConnectionState => {
trackStartRequested$.next(true);
connectRequested$.next(true);
@@ -441,18 +419,25 @@ export const createLocalMembership$ = ({
}
}
});
// TODO: Refactor updateCallIntent to sth like this:
// combineLatest([muteStates.video.enabled$,localTransport$, state.matrix$]).pipe(map(()=>{
// matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"),
// }))
//
const configError$ = new BehaviorSubject<ElementCallError | null>(null);
// TODO I do not fully understand what this does.
// Is it needed?
// Is this at the right place?
// Can this be simplified?
// Start and stop session membership as needed
scope.reconcile(localTransport$, async (advertised) => {
if (advertised !== null && advertised !== undefined) {
// Discussed in statndup -> It seems we can remove this (there is another call to enterRTCSession in this file)
// MAKE SURE TO UNDERSTAND why reconcile is needed and what is potentially missing from the alternative enterRTCSession block.
// @toger5 will try to take care of this.
scope.reconcile(localTransport$, async (transport) => {
if (transport !== null && transport !== undefined) {
try {
await enterRTCSession(matrixRTCSession, advertised, options.value);
configError$.next(null);
state.matrix$.next({ state: MatrixState.Connecting });
await joinMatrixRTC(transport);
} catch (e) {
logger.error("Error entering RTC session", e);
}
@@ -494,14 +479,13 @@ export const createLocalMembership$ = ({
return s.error instanceof ElementCallError
? s.error
: new UnknownCallError(s.error);
} else {
return null;
}
}),
scope.bind(),
)
.subscribe((fatalError) => {
configError$.next(fatalError);
.subscribe((error) => {
if (error !== undefined)
state.livekit$.next({ state: LivekitState.Error, error });
});
/**
@@ -510,9 +494,9 @@ export const createLocalMembership$ = ({
const sharingScreen$ = scope.behavior(
localConnection$.pipe(
switchMap((c) =>
c === null
? of(false)
: observeSharingScreen$(c.livekitRoom.localParticipant),
c !== null
? observeSharingScreen$(c.livekitRoom.localParticipant)
: of(false),
),
),
);
@@ -550,7 +534,7 @@ export const createLocalMembership$ = ({
}
const participant$ = scope.behavior(
localConnection$.pipe(map((c) => c?.livekitRoom.localParticipant ?? null)),
localConnection$.pipe(map((c) => c?.livekitRoom?.localParticipant ?? null)),
);
return {
startTracks,
@@ -560,7 +544,7 @@ export const createLocalMembership$ = ({
homeserverConnected$,
connected$,
reconnecting$,
configError$,
sharingScreen$,
toggleScreenSharing,
participant$,
@@ -614,6 +598,7 @@ export async function enterRTCSession(
const { sendNotificationType: notificationType, callIntent } = getUrlParams();
const multiSFU = matrixRTCMode !== MatrixRTCMode.Legacy;
// Multi-sfu does not need a preferred foci list. just the focus that is actually used.
// TODO where/how do we track errors originating from the ongoing rtcSession?
rtcSession.joinRoomSession(
multiSFU ? [] : [transport],
multiSFU ? transport : undefined,

View File

@@ -0,0 +1,120 @@
/*
Copyright 2025 Element Creations Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc";
import { mockConfig, flushPromises } from "../../../utils/test";
import { createLocalTransport$ } from "./LocalTransport";
import { constant } from "../../Behavior";
import { Epoch, ObservableScope } from "../../ObservableScope";
import {
MatrixRTCTransportMissingError,
FailToGetOpenIdToken,
} from "../../../utils/errors";
import * as openIDSFU from "../../../livekit/openIDSFU";
describe("LocalTransport", () => {
let scope: ObservableScope;
beforeEach(() => (scope = new ObservableScope()));
afterEach(() => scope.end());
it("throws if config is missing", async () => {
const localTransport$ = createLocalTransport$({
scope,
roomId: "!room:example.org",
useOldestMember$: constant(false),
memberships$: constant(new Epoch<CallMembership[]>([])),
client: {
getDomain: () => "",
// These won't be called in this error path but satisfy the type
getOpenIdToken: vi.fn(),
getDeviceId: vi.fn(),
},
});
await flushPromises();
expect(() => localTransport$.value).toThrow(
new MatrixRTCTransportMissingError(""),
);
});
it("throws FailToGetOpenIdToken when OpenID fetch fails", async () => {
// Provide a valid config so makeTransportInternal resolves a transport
const scope = new ObservableScope();
mockConfig({
livekit: { livekit_service_url: "https://lk.example.org" },
});
const resolver = Promise.withResolvers<void>();
vi.spyOn(openIDSFU, "getSFUConfigWithOpenID").mockImplementation(
async () => {
await resolver.promise;
throw new FailToGetOpenIdToken(new Error("no openid"));
},
);
const observations: unknown[] = [];
const errors: Error[] = [];
const localTransport$ = createLocalTransport$({
scope,
roomId: "!room:example.org",
useOldestMember$: constant(false),
memberships$: constant(new Epoch<CallMembership[]>([])),
client: {
// Use empty domain to skip .well-known and use config directly
getDomain: () => "",
getOpenIdToken: vi.fn(),
getDeviceId: vi.fn(),
},
});
localTransport$.subscribe(
(o) => observations.push(o),
(e) => errors.push(e),
);
resolver.resolve();
await flushPromises();
const expectedError = new FailToGetOpenIdToken(new Error("no openid"));
expect(observations).toStrictEqual([null]);
expect(errors).toStrictEqual([expectedError]);
expect(() => localTransport$.value).toThrow(expectedError);
});
it("emits preferred transport after OpenID resolves", async () => {
// Use config so transport discovery succeeds, but delay OpenID JWT fetch
mockConfig({
livekit: { livekit_service_url: "https://lk.example.org" },
});
const openIdResolver = Promise.withResolvers<openIDSFU.SFUConfig>();
vi.spyOn(openIDSFU, "getSFUConfigWithOpenID").mockReturnValue(
openIdResolver.promise,
);
const localTransport$ = createLocalTransport$({
scope,
roomId: "!room:example.org",
useOldestMember$: constant(false),
memberships$: constant(new Epoch<CallMembership[]>([])),
client: {
getDomain: () => "",
getOpenIdToken: vi.fn(),
getDeviceId: vi.fn(),
},
});
openIdResolver.resolve?.({ url: "https://lk.example.org", jwt: "jwt" });
expect(localTransport$.value).toBe(null);
await flushPromises();
// final
expect(localTransport$.value).toStrictEqual({
livekit_alias: "!room:example.org",
livekit_service_url: "https://lk.example.org",
type: "livekit",
});
});
});

View File

@@ -13,32 +13,39 @@ import {
isLivekitTransportConfig,
} from "matrix-js-sdk/lib/matrixrtc";
import { type MatrixClient } from "matrix-js-sdk";
import { combineLatest, distinctUntilChanged, first, from, map } from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger";
import {
combineLatest,
distinctUntilChanged,
first,
from,
map,
switchMap,
} from "rxjs";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
import { type Behavior } from "../../Behavior.ts";
import { type Epoch, type ObservableScope } from "../../ObservableScope.ts";
import { Config } from "../../../config/Config.ts";
import { MatrixRTCTransportMissingError } from "../../../utils/errors.ts";
import { getSFUConfigWithOpenID } from "../../../livekit/openIDSFU.ts";
import {
getSFUConfigWithOpenID,
type OpenIDClientParts,
} from "../../../livekit/openIDSFU.ts";
import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers.ts";
import { customLivekitUrl } from "../../../settings/settings.ts";
const logger = rootLogger.getChild("[LocalTransport]");
/*
* - get well known
* - get oldest membership
* - get transport to use
* - get openId + jwt token
* - wait for createTrack() call
* - create tracks
* - wait for join() call
* - Publisher.publishTracks()
* - send join state/sticky event
* It figures out “which LiveKit focus URL/alias the local user should use,”
* optionally aligning with the oldest member, and ensures the SFU path is primed
* before advertising that choice.
*/
interface Props {
scope: ObservableScope;
memberships$: Behavior<Epoch<CallMembership[]>>;
client: MatrixClient;
client: Pick<MatrixClient, "getDomain"> & OpenIDClientParts;
roomId: string;
useOldestMember$: Behavior<boolean>;
}
@@ -49,6 +56,8 @@ interface Props {
*
* @prop useOldestMember Whether to use the same transport as the oldest member.
* This will only update once the first oldest member appears. Will not recompute if the oldest member leaves.
*
* @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken
*/
export const createLocalTransport$ = ({
scope,
@@ -75,16 +84,20 @@ export const createLocalTransport$ = ({
/**
* The transport that we would personally prefer to publish on (if not for the
* transport preferences of others, perhaps).
*
* @throws
*/
const preferredTransport$: Behavior<LivekitTransport | null> = scope.behavior(
from(makeTransport(client, roomId)),
customLivekitUrl.value$.pipe(
switchMap((customUrl) => from(makeTransport(client, roomId, customUrl))),
),
null,
);
/**
* The transport we should advertise in our MatrixRTC membership.
* The chosen transport we should advertise in our MatrixRTC membership.
*/
const advertisedTransport$ = scope.behavior(
return scope.behavior(
combineLatest([
useOldestMember$,
oldestMemberTransport$,
@@ -98,82 +111,78 @@ export const createLocalTransport$ = ({
distinctUntilChanged(areLivekitTransportsEqual),
),
);
return advertisedTransport$;
};
const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci";
async function makeTransportInternal(
client: MatrixClient,
/**
*
* @param client
* @param roomId
* @returns
* @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken
*/
async function makeTransport(
client: Pick<MatrixClient, "getDomain"> & OpenIDClientParts,
roomId: string,
urlFromDevSettings: string | null,
): Promise<LivekitTransport> {
logger.log("Searching for a preferred transport");
let transport: LivekitTransport | undefined;
logger.trace("Searching for a preferred transport");
//TODO refactor this to use the jwt service returned alias.
const livekitAlias = roomId;
// TODO-MULTI-SFU: Either remove this dev tool or make it more official
const urlFromStorage =
localStorage.getItem("robin-matrixrtc-auth") ??
localStorage.getItem("timo-focus-url");
if (urlFromStorage !== null) {
// DEVTOOL: Highest priority: Load from devtool setting
if (urlFromDevSettings !== null) {
const transportFromStorage: LivekitTransport = {
type: "livekit",
livekit_service_url: urlFromStorage,
livekit_service_url: urlFromDevSettings,
livekit_alias: livekitAlias,
};
logger.log(
"Using LiveKit transport from local storage: ",
logger.info(
"Using LiveKit transport from dev tools: ",
transportFromStorage,
);
return transportFromStorage;
transport = transportFromStorage;
}
// Prioritize the .well-known/matrix/client, if available, over the configured SFU
// WELL_KNOWN: Prioritize the .well-known/matrix/client, if available, over the configured SFU
const domain = client.getDomain();
if (domain) {
if (domain && transport === undefined) {
// we use AutoDiscovery instead of relying on the MatrixClient having already
// been fully configured and started
const wellKnownFoci = (await AutoDiscovery.getRawClientConfig(domain))?.[
FOCI_WK_KEY
];
if (Array.isArray(wellKnownFoci)) {
const transport: LivekitTransportConfig | undefined = wellKnownFoci.find(
(f) => f && isLivekitTransportConfig(f),
);
if (transport !== undefined) {
logger.log("Using LiveKit transport from .well-known: ", transport);
return { ...transport, livekit_alias: livekitAlias };
const wellKnownTransport: LivekitTransportConfig | undefined =
wellKnownFoci.find((f) => f && isLivekitTransportConfig(f));
if (wellKnownTransport !== undefined) {
logger.info("Using LiveKit transport from .well-known: ", transport);
transport = { ...wellKnownTransport, livekit_alias: livekitAlias };
}
}
}
// CONFIG: Least prioritized; Load from config file
const urlFromConf = Config.get().livekit?.livekit_service_url;
if (urlFromConf) {
if (urlFromConf && transport === undefined) {
const transportFromConf: LivekitTransport = {
type: "livekit",
livekit_service_url: urlFromConf,
livekit_alias: livekitAlias,
};
logger.log("Using LiveKit transport from config: ", transportFromConf);
return transportFromConf;
logger.info("Using LiveKit transport from config: ", transportFromConf);
transport = transportFromConf;
}
throw new MatrixRTCTransportMissingError(domain ?? "");
}
if (!transport) throw new MatrixRTCTransportMissingError(domain ?? ""); // this will call the jwt/sfu/get endpoint to pre create the livekit room.
await getSFUConfigWithOpenID(
client,
transport.livekit_service_url,
transport.livekit_alias,
);
async function makeTransport(
client: MatrixClient,
roomId: string,
): Promise<LivekitTransport> {
const transport = await makeTransportInternal(client, roomId);
// this will call the jwt/sfu/get endpoint to pre create the livekit room.
try {
await getSFUConfigWithOpenID(
client,
transport.livekit_service_url,
transport.livekit_alias,
);
} catch (e) {
logger.warn(`Failed to get SFU config for transport: ${e}`);
}
return transport;
}

View File

@@ -23,6 +23,7 @@ import type { Behavior } from "../../Behavior.ts";
import type { ProcessorState } from "../../../livekit/TrackProcessorContext.tsx";
import { defaultLiveKitOptions } from "../../../livekit/options.ts";
// TODO evaluate if this should be done like the Publisher Factory
export interface ConnectionFactory {
createConnection(
transport: LivekitTransport,

View File

@@ -11,7 +11,7 @@ import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { type Participant as LivekitParticipant } from "livekit-client";
import { logger } from "matrix-js-sdk/lib/logger";
import { Epoch, ObservableScope } from "../../ObservableScope.ts";
import { Epoch, mapEpoch, ObservableScope } from "../../ObservableScope.ts";
import {
createConnectionManager$,
type ConnectionManagerData,
@@ -73,7 +73,7 @@ afterEach(() => {
describe("connections$ stream", () => {
test("Should create and start new connections for each transports", () => {
withTestScheduler(({ behavior, expectObservable }) => {
const { connections$ } = createConnectionManager$({
const { connectionManagerData$ } = createConnectionManager$({
scope: testScope,
connectionFactory: fakeConnectionFactory,
inputTransports$: behavior("a", {
@@ -82,7 +82,9 @@ describe("connections$ stream", () => {
logger: logger,
});
expectObservable(connections$).toBe("a", {
expectObservable(
connectionManagerData$.pipe(mapEpoch((d) => d.getConnections())),
).toBe("a", {
a: expect.toSatisfy((e: Epoch<Connection[]>) => {
const connections = e.value;
expect(connections.length).toBe(2);
@@ -110,7 +112,7 @@ describe("connections$ stream", () => {
test("Should start connection only once", () => {
withTestScheduler(({ behavior, expectObservable }) => {
const { connections$ } = createConnectionManager$({
const { connectionManagerData$ } = createConnectionManager$({
scope: testScope,
connectionFactory: fakeConnectionFactory,
inputTransports$: behavior("abcdef", {
@@ -124,7 +126,9 @@ describe("connections$ stream", () => {
logger: logger,
});
expectObservable(connections$).toBe("xxxxxa", {
expectObservable(
connectionManagerData$.pipe(mapEpoch((d) => d.getConnections())),
).toBe("xxxxxa", {
x: expect.anything(),
a: expect.toSatisfy((e: Epoch<Connection[]>) => {
const connections = e.value;
@@ -153,7 +157,7 @@ describe("connections$ stream", () => {
test("Should cleanup connections when not needed anymore", () => {
withTestScheduler(({ behavior, expectObservable }) => {
const { connections$ } = createConnectionManager$({
const { connectionManagerData$ } = createConnectionManager$({
scope: testScope,
connectionFactory: fakeConnectionFactory,
inputTransports$: behavior("abc", {
@@ -164,7 +168,9 @@ describe("connections$ stream", () => {
logger: logger,
});
expectObservable(connections$).toBe("xab", {
expectObservable(
connectionManagerData$.pipe(mapEpoch((d) => d.getConnections())),
).toBe("xab", {
x: expect.anything(),
a: expect.toSatisfy((e: Epoch<Connection[]>) => {
const connections = e.value;

View File

@@ -94,7 +94,6 @@ interface Props {
export interface IConnectionManager {
transports$: Behavior<Epoch<LivekitTransport[]>>;
connectionManagerData$: Behavior<Epoch<ConnectionManagerData>>;
connections$: Behavior<Epoch<Connection[]>>;
}
/**
* Crete a `ConnectionManager`
@@ -217,7 +216,7 @@ export function createConnectionManager$({
new Epoch(new ConnectionManagerData()),
);
return { transports$, connectionManagerData$, connections$ };
return { transports$, connectionManagerData$ };
}
function removeDuplicateTransports(

View File

@@ -79,6 +79,13 @@ export async function flushPromises(): Promise<void> {
await new Promise<void>((resolve) => window.setTimeout(resolve));
}
export type NodeEventHandler = (...args: unknown[]) => void;
export interface NodeStyleEventEmitter {
addListener(eventName: string | symbol, handler: NodeEventHandler): this;
removeListener(eventName: string | symbol, handler: NodeEventHandler): this;
}
export interface OurRunHelpers extends RunHelpers {
/**
* Schedules a sequence of actions to happen, as described by a marble