prettier !

This commit is contained in:
Valere
2025-10-07 16:24:02 +02:00
parent 7437961195
commit 529cb8a7ec
12 changed files with 547 additions and 421 deletions

View File

@@ -24,10 +24,9 @@ import {
mockMatrixRoomMember,
mockMediaDevices,
mockRtcMembership,
mockTrack
mockTrack,
} from "../utils/test";
export const TestAudioContextConstructor = vi.fn(() => testAudioContext);
const MediaDevicesProvider = MediaDevicesContext.MediaDevicesContext.Provider;
@@ -65,8 +64,8 @@ it("should render for member", () => {
const p = {
id: "test:123",
participant: undefined,
member: carol
}
member: carol,
};
const livekitRoom = mockLivekitRoom(
{},
{
@@ -95,8 +94,8 @@ it("should not render without member", () => {
const p = {
id: "test:123",
participant: undefined,
member: carol
}
member: carol,
};
const livekitRoom = mockLivekitRoom(
{},
{
@@ -122,8 +121,8 @@ it("should not setup audioContext gain and pan if there is no need to.", () => {
const p = {
id: "test:123",
participant: undefined,
member: carol
}
member: carol,
};
const livekitRoom = mockLivekitRoom(
{},
{
@@ -160,8 +159,8 @@ it("should setup audioContext gain and pan", () => {
const p = {
id: "test:123",
participant: undefined,
member: carol
}
member: carol,
};
const livekitRoom = mockLivekitRoom(
{},
{
@@ -173,7 +172,8 @@ it("should setup audioContext gain and pan", () => {
<LivekitRoomAudioRenderer
participants={[p]}
url={""}
livekitRoom={livekitRoom} />
livekitRoom={livekitRoom}
/>
</MediaDevicesProvider>,
);

View File

@@ -167,7 +167,7 @@ function createGroupCallView(
muteStates={muteState}
widget={widget}
joined={true}
setJoined={function(value: boolean): void { }}
setJoined={function (value: boolean): void {}}
/>
</ProcessorProvider>
</MediaDevicesContext>

View File

@@ -116,20 +116,22 @@ export const RoomPage: FC = () => {
const groupCallView = (): ReactNode => {
switch (groupCallState.kind) {
case "loaded":
return muteStates && (
<GroupCallView
widget={widget}
client={client!}
rtcSession={groupCallState.rtcSession}
joined={joined}
setJoined={setJoined}
isPasswordlessUser={passwordlessUser}
confineToRoom={confineToRoom}
preload={preload}
skipLobby={skipLobby || wasInWaitForInviteState.current}
header={header}
muteStates={muteStates}
/>
return (
muteStates && (
<GroupCallView
widget={widget}
client={client!}
rtcSession={groupCallState.rtcSession}
joined={joined}
setJoined={setJoined}
isPasswordlessUser={passwordlessUser}
confineToRoom={confineToRoom}
preload={preload}
skipLobby={skipLobby || wasInWaitForInviteState.current}
header={header}
muteStates={muteStates}
/>
)
);
case "waitForInvite":
case "canKnock": {
@@ -148,31 +150,35 @@ export const RoomPage: FC = () => {
</>
);
return (
muteStates && <LobbyView
client={client!}
matrixInfo={{
userId: client!.getUserId() ?? "",
displayName: userDisplayName ?? "",
avatarUrl: avatarUrl ?? "",
roomAlias: null,
roomId: groupCallState.roomSummary.room_id,
roomName: groupCallState.roomSummary.name ?? "",
roomAvatar: groupCallState.roomSummary.avatar_url ?? null,
e2eeSystem: {
kind: groupCallState.roomSummary["im.nheko.summary.encryption"]
? E2eeType.PER_PARTICIPANT
: E2eeType.NONE,
},
}}
onEnter={(): void => knock?.()}
enterLabel={label}
waitingForInvite={groupCallState.kind === "waitForInvite"}
confineToRoom={confineToRoom}
hideHeader={header !== "standard"}
participantCount={null}
muteStates={muteStates}
onShareClick={null}
/>
muteStates && (
<LobbyView
client={client!}
matrixInfo={{
userId: client!.getUserId() ?? "",
displayName: userDisplayName ?? "",
avatarUrl: avatarUrl ?? "",
roomAlias: null,
roomId: groupCallState.roomSummary.room_id,
roomName: groupCallState.roomSummary.name ?? "",
roomAvatar: groupCallState.roomSummary.avatar_url ?? null,
e2eeSystem: {
kind: groupCallState.roomSummary[
"im.nheko.summary.encryption"
]
? E2eeType.PER_PARTICIPANT
: E2eeType.NONE,
},
}}
onEnter={(): void => knock?.()}
enterLabel={label}
waitingForInvite={groupCallState.kind === "waitForInvite"}
confineToRoom={confineToRoom}
hideHeader={header !== "standard"}
participantCount={null}
muteStates={muteStates}
onShareClick={null}
/>
)
);
}
case "loading":

View File

@@ -23,38 +23,37 @@ vi.mock("./widget", () => ({
...actualWidget,
widget: {
api: {
setAlwaysOnScreen: (): void => {
},
transport: { send: vi.fn(), reply: vi.fn(), stop: vi.fn() }
setAlwaysOnScreen: (): void => {},
transport: { send: vi.fn(), reply: vi.fn(), stop: vi.fn() },
},
lazyActions: new EventEmitter()
}
lazyActions: new EventEmitter(),
},
}));
test("It joins the correct Session", async () => {
const focusFromOlderMembership = {
type: "livekit",
livekit_service_url: "http://my-oldest-member-service-url.com",
livekit_alias: "my-oldest-member-service-alias"
livekit_alias: "my-oldest-member-service-alias",
};
const focusConfigFromWellKnown = {
type: "livekit",
livekit_service_url: "http://my-well-known-service-url.com"
livekit_service_url: "http://my-well-known-service-url.com",
};
const focusConfigFromWellKnown2 = {
type: "livekit",
livekit_service_url: "http://my-well-known-service-url2.com"
livekit_service_url: "http://my-well-known-service-url2.com",
};
const clientWellKnown = {
"org.matrix.msc4143.rtc_foci": [
focusConfigFromWellKnown,
focusConfigFromWellKnown2
]
focusConfigFromWellKnown2,
],
};
mockConfig({
livekit: { livekit_service_url: "http://my-default-service-url.com" }
livekit: { livekit_service_url: "http://my-default-service-url.com" },
});
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockImplementation(
@@ -63,7 +62,7 @@ test("It joins the correct Session", async () => {
return Promise.resolve(clientWellKnown);
}
return Promise.resolve({});
}
},
);
const mockedSession = vi.mocked({
@@ -75,64 +74,67 @@ test("It joins the correct Session", async () => {
access_token: "ACCCESS_TOKEN",
token_type: "Bearer",
matrix_server_name: "localhost",
expires_in: 10000
})
}
expires_in: 10000,
}),
},
},
memberships: [],
getFocusInUse: vi.fn().mockReturnValue(focusFromOlderMembership),
getOldestMembership: vi.fn().mockReturnValue({
getPreferredFoci: vi.fn().mockReturnValue([focusFromOlderMembership])
getPreferredFoci: vi.fn().mockReturnValue([focusFromOlderMembership]),
}),
joinRoomSession: vi.fn()
joinRoomSession: vi.fn(),
}) as unknown as MatrixRTCSession;
await enterRTCSession(mockedSession, {
await enterRTCSession(
mockedSession,
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit"
type: "livekit",
},
true);
true,
);
expect(mockedSession.joinRoomSession).toHaveBeenLastCalledWith(
[
{
livekit_alias: "my-oldest-member-service-alias",
livekit_service_url: "http://my-oldest-member-service-url.com",
type: "livekit"
type: "livekit",
},
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit"
type: "livekit",
},
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url2.com",
type: "livekit"
type: "livekit",
},
{
livekit_alias: "roomId",
livekit_service_url: "http://my-default-service-url.com",
type: "livekit"
}
type: "livekit",
},
],
{
focus_selection: "oldest_membership",
type: "livekit"
type: "livekit",
},
{
manageMediaKeys: false,
useLegacyMemberEvents: false,
useNewMembershipManager: true,
useExperimentalToDeviceTransport: false
}
useExperimentalToDeviceTransport: false,
},
);
});
async function testLeaveRTCSession(
cause: "user" | "error",
expectClose: boolean
expectClose: boolean,
): Promise<void> {
vi.clearAllMocks();
const session = { leaveRoomSession: vi.fn() } as unknown as MatrixRTCSession;
@@ -140,18 +142,18 @@ async function testLeaveRTCSession(
expect(session.leaveRoomSession).toHaveBeenCalled();
expect(widget!.api.transport.send).toHaveBeenCalledWith(
ElementWidgetActions.HangupCall,
expect.anything()
expect.anything(),
);
if (expectClose) {
expect(widget!.api.transport.send).toHaveBeenCalledWith(
ElementWidgetActions.Close,
expect.anything()
expect.anything(),
);
expect(widget!.api.transport.stop).toHaveBeenCalled();
} else {
expect(widget!.api.transport.send).not.toHaveBeenCalledWith(
ElementWidgetActions.Close,
expect.anything()
expect.anything(),
);
expect(widget!.api.transport.stop).not.toHaveBeenCalled();
}
@@ -179,24 +181,26 @@ test("It fails with configuration error if no live kit url config is set in fall
room: {
roomId: "roomId",
client: {
getDomain: vi.fn().mockReturnValue("example.org")
}
getDomain: vi.fn().mockReturnValue("example.org"),
},
},
memberships: [],
getFocusInUse: vi.fn(),
joinRoomSession: vi.fn()
joinRoomSession: vi.fn(),
}) as unknown as MatrixRTCSession;
await expect(enterRTCSession(
mockedSession,
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit"
},
true
)).rejects.toThrowError(
expect.objectContaining({ code: ErrorCode.MISSING_MATRIX_RTC_TRANSPORT })
await expect(
enterRTCSession(
mockedSession,
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit",
},
true,
),
).rejects.toThrowError(
expect.objectContaining({ code: ErrorCode.MISSING_MATRIX_RTC_TRANSPORT }),
);
});
@@ -206,9 +210,9 @@ test("It should not fail with configuration error if homeserver config has livek
"org.matrix.msc4143.rtc_foci": [
{
type: "livekit",
livekit_service_url: "http://my-well-known-service-url.com"
}
]
livekit_service_url: "http://my-well-known-service-url.com",
},
],
});
const mockedSession = vi.mocked({
@@ -220,19 +224,22 @@ test("It should not fail with configuration error if homeserver config has livek
access_token: "ACCCESS_TOKEN",
token_type: "Bearer",
matrix_server_name: "localhost",
expires_in: 10000
})
}
expires_in: 10000,
}),
},
},
memberships: [],
getFocusInUse: vi.fn(),
joinRoomSession: vi.fn()
joinRoomSession: vi.fn(),
}) as unknown as MatrixRTCSession;
await enterRTCSession(mockedSession, {
await enterRTCSession(
mockedSession,
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit"
type: "livekit",
},
true);
true,
);
});

View File

@@ -5,14 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
catchError,
from,
map,
type Observable,
of,
startWith
} from "rxjs";
import { catchError, from, map, type Observable, of, startWith } from "rxjs";
// TODO where are all the comments? ::cry::
// There used to be an unitialized state!, a state might not start in loading
@@ -34,13 +27,15 @@ export function async$<A>(promise: Promise<A>): Observable<Async<A>> {
return from(promise).pipe(
map(ready),
startWith(loading),
catchError((e: unknown) => of(error(e as Error ?? new Error("Unknown error")))),
catchError((e: unknown) =>
of(error((e as Error) ?? new Error("Unknown error"))),
),
);
}
export function mapAsync<A, B>(
async: Async<A>,
project: (value: A) => B
project: (value: A) => B,
): Async<B> {
return async.state === "ready" ? ready(project(async.value)) : async;
}

View File

@@ -352,7 +352,10 @@ function withCallViewModel(
options,
raisedHands$,
reactions$,
new BehaviorSubject<ProcessorState>({ processor: undefined, supported: undefined }),
new BehaviorSubject<ProcessorState>({
processor: undefined,
supported: undefined,
}),
);
onTestFinished(() => {

View File

@@ -126,7 +126,11 @@ import {
} from "../rtcSessionHelpers";
import { E2eeType } from "../e2ee/e2eeType";
import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider";
import { type Connection, type ConnectionOpts, RemoteConnection } from "./Connection";
import {
type Connection,
type ConnectionOpts,
RemoteConnection,
} from "./Connection";
import { type MuteStates } from "./MuteStates";
import { getUrlParams } from "../UrlParams";
import { type ProcessorState } from "../livekit/TrackProcessorContext";
@@ -485,7 +489,6 @@ export class CallViewModel extends ViewModel {
),
);
/**
* The MatrixRTC session participants.
*/
@@ -574,7 +577,6 @@ export class CallViewModel extends ViewModel {
(transport) =>
transport &&
mapAsync(transport, (transport) => {
const opts: ConnectionOpts = {
transport,
client: this.matrixRTCSession.room.client,
@@ -582,15 +584,16 @@ export class CallViewModel extends ViewModel {
remoteTransports$: this.remoteTransports$,
};
return {
connection: new PublishConnection(
opts,
this.mediaDevices,
this.muteStates,
this.e2eeLivekitOptions(),
this.scope.behavior(this.trackProcessorState$),
),
transport,
}}),
connection: new PublishConnection(
opts,
this.mediaDevices,
this.muteStates,
this.e2eeLivekitOptions(),
this.scope.behavior(this.trackProcessorState$),
),
transport,
};
}),
),
),
);
@@ -605,14 +608,14 @@ export class CallViewModel extends ViewModel {
this.localConnection$.pipe(
switchMap((c) =>
c?.state === "ready"
// TODO mapping to ConnectionState for compatibility, but we should use the full state?
? c.value.focusedConnectionState$.pipe(
map((s) => {
if (s.state === "ConnectedToLkRoom") return s.connectionState;
return ConnectionState.Disconnected
}),
distinctUntilChanged(),
)
? // TODO mapping to ConnectionState for compatibility, but we should use the full state?
c.value.focusedConnectionState$.pipe(
map((s) => {
if (s.state === "ConnectedToLkRoom") return s.connectionState;
return ConnectionState.Disconnected;
}),
distinctUntilChanged(),
)
: of(ConnectionState.Disconnected),
),
),
@@ -659,8 +662,11 @@ export class CallViewModel extends ViewModel {
client: this.matrixRTCSession.room.client,
scope: this.scope,
remoteTransports$: this.remoteTransports$,
}
nextConnection = new RemoteConnection(args, this.e2eeLivekitOptions());
};
nextConnection = new RemoteConnection(
args,
this.e2eeLivekitOptions(),
);
} else {
logger.log(
"SFU remoteConnections$ use prev connection: ",
@@ -1952,16 +1958,20 @@ export class CallViewModel extends ViewModel {
.pipe(this.scope.bind())
.subscribe(({ start, stop }) => {
for (const c of stop) {
logger.info(`Disconnecting from ${c.localTransport.livekit_service_url}`);
logger.info(
`Disconnecting from ${c.localTransport.livekit_service_url}`,
);
c.stop().catch((err) => {
// TODO: better error handling
logger.error("MuteState: handler error", err);
});;
});
}
for (const c of start) {
c.start().then(
() =>
logger.info(`Connected to ${c.localTransport.livekit_service_url}`),
logger.info(
`Connected to ${c.localTransport.livekit_service_url}`,
),
(e) =>
logger.error(
`Failed to start connection to ${c.localTransport.livekit_service_url}`,

View File

@@ -5,21 +5,37 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { afterEach, describe, expect, it, type Mock, type MockedObject, vi } from "vitest";
import {
afterEach,
describe,
expect,
it,
type Mock,
type MockedObject,
vi,
} from "vitest";
import { BehaviorSubject, of } from "rxjs";
import {
ConnectionState,
type LocalParticipant,
type RemoteParticipant,
type Room as LivekitRoom,
RoomEvent, type RoomOptions
RoomEvent,
type RoomOptions,
} from "livekit-client";
import fetchMock from "fetch-mock";
import EventEmitter from "events";
import { type IOpenIDToken } from "matrix-js-sdk";
import type { CallMembership, LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { type ConnectionOpts, type FocusConnectionState, RemoteConnection } from "./Connection.ts";
import type {
CallMembership,
LivekitTransport,
} from "matrix-js-sdk/lib/matrixrtc";
import {
type ConnectionOpts,
type FocusConnectionState,
RemoteConnection,
} from "./Connection.ts";
import { ObservableScope } from "./ObservableScope.ts";
import { type OpenIDClientParts } from "../livekit/openIDSFU.ts";
import { FailToGetOpenIdToken } from "../utils/errors.ts";
@@ -38,28 +54,30 @@ let localParticipantEventEmiter: EventEmitter;
let fakeLocalParticipant: MockedObject<LocalParticipant>;
let fakeRoomEventEmiter: EventEmitter;
let fakeMembershipsFocusMap$: BehaviorSubject<{ membership: CallMembership; transport: LivekitTransport }[]>;
let fakeMembershipsFocusMap$: BehaviorSubject<
{ membership: CallMembership; transport: LivekitTransport }[]
>;
const livekitFocus: LivekitTransport = {
livekit_alias: "!roomID:example.org",
livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt",
type: "livekit"
type: "livekit",
};
function setupTest(): void {
testScope = new ObservableScope();
client = vi.mocked<OpenIDClientParts>({
getOpenIdToken: vi.fn().mockResolvedValue(
{
"access_token": "rYsmGUEwNjKgJYyeNUkZseJN",
"token_type": "Bearer",
"matrix_server_name": "example.org",
"expires_in": 3600
}
),
getDeviceId: vi.fn().mockReturnValue("ABCDEF")
getOpenIdToken: vi.fn().mockResolvedValue({
access_token: "rYsmGUEwNjKgJYyeNUkZseJN",
token_type: "Bearer",
matrix_server_name: "example.org",
expires_in: 3600,
}),
getDeviceId: vi.fn().mockReturnValue("ABCDEF"),
} as unknown as OpenIDClientParts);
fakeMembershipsFocusMap$ = new BehaviorSubject<{ membership: CallMembership; transport: LivekitTransport }[]>([]);
fakeMembershipsFocusMap$ = new BehaviorSubject<
{ membership: CallMembership; transport: LivekitTransport }[]
>([]);
localParticipantEventEmiter = new EventEmitter();
@@ -69,9 +87,15 @@ function setupTest(): void {
getTrackPublication: vi.fn().mockReturnValue(undefined),
on: localParticipantEventEmiter.on.bind(localParticipantEventEmiter),
off: localParticipantEventEmiter.off.bind(localParticipantEventEmiter),
addListener: localParticipantEventEmiter.addListener.bind(localParticipantEventEmiter),
removeListener: localParticipantEventEmiter.removeListener.bind(localParticipantEventEmiter),
removeAllListeners: localParticipantEventEmiter.removeAllListeners.bind(localParticipantEventEmiter)
addListener: localParticipantEventEmiter.addListener.bind(
localParticipantEventEmiter,
),
removeListener: localParticipantEventEmiter.removeListener.bind(
localParticipantEventEmiter,
),
removeAllListeners: localParticipantEventEmiter.removeAllListeners.bind(
localParticipantEventEmiter,
),
} as unknown as LocalParticipant);
fakeRoomEventEmiter = new EventEmitter();
@@ -84,56 +108,45 @@ function setupTest(): void {
on: fakeRoomEventEmiter.on.bind(fakeRoomEventEmiter),
off: fakeRoomEventEmiter.off.bind(fakeRoomEventEmiter),
addListener: fakeRoomEventEmiter.addListener.bind(fakeRoomEventEmiter),
removeListener: fakeRoomEventEmiter.removeListener.bind(fakeRoomEventEmiter),
removeAllListeners: fakeRoomEventEmiter.removeAllListeners.bind(fakeRoomEventEmiter),
setE2EEEnabled: vi.fn().mockResolvedValue(undefined)
removeListener:
fakeRoomEventEmiter.removeListener.bind(fakeRoomEventEmiter),
removeAllListeners:
fakeRoomEventEmiter.removeAllListeners.bind(fakeRoomEventEmiter),
setE2EEEnabled: vi.fn().mockResolvedValue(undefined),
} as unknown as LivekitRoom);
}
function setupRemoteConnection(): RemoteConnection {
const opts: ConnectionOpts = {
client: client,
transport: livekitFocus,
remoteTransports$: fakeMembershipsFocusMap$,
scope: testScope,
livekitRoomFactory: () => fakeLivekitRoom
livekitRoomFactory: () => fakeLivekitRoom,
};
fetchMock.post(`${livekitFocus.livekit_service_url}/sfu/get`,
() => {
return {
status: 200,
body:
{
"url": "wss://matrix-rtc.m.localhost/livekit/sfu",
"jwt": "ATOKEN"
}
};
}
);
fetchMock.post(`${livekitFocus.livekit_service_url}/sfu/get`, () => {
return {
status: 200,
body: {
url: "wss://matrix-rtc.m.localhost/livekit/sfu",
jwt: "ATOKEN",
},
};
});
fakeLivekitRoom
.connect
.mockResolvedValue(undefined);
fakeLivekitRoom.connect.mockResolvedValue(undefined);
return new RemoteConnection(
opts,
undefined
);
return new RemoteConnection(opts, undefined);
}
afterEach(() => {
vi.useRealTimers();
vi.clearAllMocks();
fetchMock.reset();
});
describe("Start connection states", () => {
it("start in initialized state", () => {
setupTest();
@@ -142,15 +155,13 @@ describe("Start connection states", () => {
transport: livekitFocus,
remoteTransports$: fakeMembershipsFocusMap$,
scope: testScope,
livekitRoomFactory: () => fakeLivekitRoom
livekitRoomFactory: () => fakeLivekitRoom,
};
const connection = new RemoteConnection(
opts,
undefined
);
const connection = new RemoteConnection(opts, undefined);
expect(connection.focusedConnectionState$.getValue().state)
.toEqual("Initialized");
expect(connection.focusedConnectionState$.getValue().state).toEqual(
"Initialized",
);
});
it("fail to getOpenId token then error state", async () => {
@@ -162,31 +173,27 @@ describe("Start connection states", () => {
transport: livekitFocus,
remoteTransports$: fakeMembershipsFocusMap$,
scope: testScope,
livekitRoomFactory: () => fakeLivekitRoom
livekitRoomFactory: () => fakeLivekitRoom,
};
const connection = new RemoteConnection(
opts,
undefined
);
const connection = new RemoteConnection(opts, undefined);
const capturedStates: FocusConnectionState[] = [];
connection.focusedConnectionState$.subscribe((value) => {
capturedStates.push(value);
});
const deferred = Promise.withResolvers<IOpenIDToken>();
client.getOpenIdToken.mockImplementation(async (): Promise<IOpenIDToken> => {
return await deferred.promise;
});
client.getOpenIdToken.mockImplementation(
async (): Promise<IOpenIDToken> => {
return await deferred.promise;
},
);
connection.start()
.catch(() => {
// expected to throw
});
connection.start().catch(() => {
// expected to throw
});
let capturedState = capturedStates.pop();
expect(capturedState).toBeDefined();
@@ -199,11 +206,14 @@ describe("Start connection states", () => {
capturedState = capturedStates.pop();
if (capturedState!.state === "FailedToStart") {
expect(capturedState!.error.message).toEqual("Something went wrong");
expect(capturedState!.focus.livekit_alias).toEqual(livekitFocus.livekit_alias);
expect(capturedState!.focus.livekit_alias).toEqual(
livekitFocus.livekit_alias,
);
} else {
expect.fail("Expected FailedToStart state but got " + capturedState?.state);
expect.fail(
"Expected FailedToStart state but got " + capturedState?.state,
);
}
});
it("fail to get JWT token and error state", async () => {
@@ -215,13 +225,10 @@ describe("Start connection states", () => {
transport: livekitFocus,
remoteTransports$: fakeMembershipsFocusMap$,
scope: testScope,
livekitRoomFactory: () => fakeLivekitRoom
livekitRoomFactory: () => fakeLivekitRoom,
};
const connection = new RemoteConnection(
opts,
undefined
);
const connection = new RemoteConnection(opts, undefined);
const capturedStates: FocusConnectionState[] = [];
connection.focusedConnectionState$.subscribe((value) => {
@@ -230,21 +237,17 @@ describe("Start connection states", () => {
const deferredSFU = Promise.withResolvers<void>();
// mock the /sfu/get call
fetchMock.post(`${livekitFocus.livekit_service_url}/sfu/get`,
async () => {
await deferredSFU.promise;
return {
status: 500,
body: "Internal Server Error"
};
}
);
fetchMock.post(`${livekitFocus.livekit_service_url}/sfu/get`, async () => {
await deferredSFU.promise;
return {
status: 500,
body: "Internal Server Error",
};
});
connection.start()
.catch(() => {
// expected to throw
});
connection.start().catch(() => {
// expected to throw
});
let capturedState = capturedStates.pop();
expect(capturedState).toBeDefined();
@@ -256,15 +259,19 @@ describe("Start connection states", () => {
capturedState = capturedStates.pop();
if (capturedState?.state === "FailedToStart") {
expect(capturedState?.error.message).toContain("SFU Config fetch failed with exception Error");
expect(capturedState?.focus.livekit_alias).toEqual(livekitFocus.livekit_alias);
expect(capturedState?.error.message).toContain(
"SFU Config fetch failed with exception Error",
);
expect(capturedState?.focus.livekit_alias).toEqual(
livekitFocus.livekit_alias,
);
} else {
expect.fail("Expected FailedToStart state but got " + capturedState?.state);
expect.fail(
"Expected FailedToStart state but got " + capturedState?.state,
);
}
});
it("fail to connect to livekit error state", async () => {
setupTest();
vi.useFakeTimers();
@@ -274,46 +281,36 @@ describe("Start connection states", () => {
transport: livekitFocus,
remoteTransports$: fakeMembershipsFocusMap$,
scope: testScope,
livekitRoomFactory: () => fakeLivekitRoom
livekitRoomFactory: () => fakeLivekitRoom,
};
const connection = new RemoteConnection(
opts,
undefined
);
const connection = new RemoteConnection(opts, undefined);
const capturedStates: FocusConnectionState[] = [];
connection.focusedConnectionState$.subscribe((value) => {
capturedStates.push(value);
});
const deferredSFU = Promise.withResolvers<void>();
// mock the /sfu/get call
fetchMock.post(`${livekitFocus.livekit_service_url}/sfu/get`,
() => {
return {
status: 200,
body:
{
"url": "wss://matrix-rtc.m.localhost/livekit/sfu",
"jwt": "ATOKEN"
}
};
}
);
fetchMock.post(`${livekitFocus.livekit_service_url}/sfu/get`, () => {
return {
status: 200,
body: {
url: "wss://matrix-rtc.m.localhost/livekit/sfu",
jwt: "ATOKEN",
},
};
});
fakeLivekitRoom
.connect
.mockImplementation(async () => {
await deferredSFU.promise;
throw new Error("Failed to connect to livekit");
});
fakeLivekitRoom.connect.mockImplementation(async () => {
await deferredSFU.promise;
throw new Error("Failed to connect to livekit");
});
connection.start()
.catch(() => {
// expected to throw
});
connection.start().catch(() => {
// expected to throw
});
let capturedState = capturedStates.pop();
expect(capturedState).toBeDefined();
@@ -326,12 +323,17 @@ describe("Start connection states", () => {
capturedState = capturedStates.pop();
if (capturedState && capturedState?.state === "FailedToStart") {
expect(capturedState.error.message).toContain("Failed to connect to livekit");
expect(capturedState.focus.livekit_alias).toEqual(livekitFocus.livekit_alias);
expect(capturedState.error.message).toContain(
"Failed to connect to livekit",
);
expect(capturedState.focus.livekit_alias).toEqual(
livekitFocus.livekit_alias,
);
} else {
expect.fail("Expected FailedToStart state but got " + JSON.stringify(capturedState));
expect.fail(
"Expected FailedToStart state but got " + JSON.stringify(capturedState),
);
}
});
it("connection states happy path", async () => {
@@ -356,7 +358,6 @@ describe("Start connection states", () => {
expect(connectingState?.state).toEqual("ConnectingToLkRoom");
const connectedState = capturedState.shift();
expect(connectedState?.state).toEqual("ConnectedToLkRoom");
});
it("should relay livekit events once connected", async () => {
@@ -378,7 +379,7 @@ describe("Start connection states", () => {
ConnectionState.SignalReconnecting,
ConnectionState.Connecting,
ConnectionState.Connected,
ConnectionState.Reconnecting
ConnectionState.Reconnecting,
];
for (const state of states) {
fakeRoomEventEmiter.emit(RoomEvent.ConnectionStateChanged, state);
@@ -387,12 +388,18 @@ describe("Start connection states", () => {
for (const state of states) {
const s = capturedState.shift();
expect(s?.state).toEqual("ConnectedToLkRoom");
const connectedState = s as FocusConnectionState & { state: "ConnectedToLkRoom" };
const connectedState = s as FocusConnectionState & {
state: "ConnectedToLkRoom";
};
expect(connectedState.connectionState).toEqual(state);
// should always have the focus info
expect(connectedState.focus.livekit_alias).toEqual(livekitFocus.livekit_alias);
expect(connectedState.focus.livekit_service_url).toEqual(livekitFocus.livekit_service_url);
expect(connectedState.focus.livekit_alias).toEqual(
livekitFocus.livekit_alias,
);
expect(connectedState.focus.livekit_service_url).toEqual(
livekitFocus.livekit_service_url,
);
}
// If the state is not ConnectedToLkRoom, no events should be relayed anymore
@@ -403,10 +410,8 @@ describe("Start connection states", () => {
}
expect(capturedState.length).toEqual(0);
});
it("shutting down the scope should stop the connection", async () => {
setupTest();
vi.useFakeTimers();
@@ -423,7 +428,6 @@ describe("Start connection states", () => {
const stopSpy = vi.spyOn(connection, "stop");
testScope.end();
expect(stopSpy).toHaveBeenCalled();
expect(fakeLivekitRoom.disconnect).toHaveBeenCalled();
@@ -437,26 +441,22 @@ describe("Start connection states", () => {
expect(capturedState.length).toEqual(0);
});
});
function fakeRemoteLivekitParticipant(id: string): RemoteParticipant {
return vi.mocked<RemoteParticipant>({
identity: id
identity: id,
} as unknown as RemoteParticipant);
}
function fakeRtcMemberShip(userId: string, deviceId: string): CallMembership {
return vi.mocked<CallMembership>({
sender: userId,
deviceId: deviceId
deviceId: deviceId,
} as unknown as CallMembership);
}
describe("Publishing participants observations", () => {
it("should emit the list of publishing participants", async () => {
setupTest();
@@ -464,13 +464,24 @@ describe("Publishing participants observations", () => {
const bobIsAPublisher = Promise.withResolvers<void>();
const danIsAPublisher = Promise.withResolvers<void>();
const observedPublishers: { participant: RemoteParticipant; membership: CallMembership }[][] = [];
const observedPublishers: {
participant: RemoteParticipant;
membership: CallMembership;
}[][] = [];
connection.publishingParticipants$.subscribe((publishers) => {
observedPublishers.push(publishers);
if (publishers.some((p) => p.participant.identity === "@bob:example.org:DEV111")) {
if (
publishers.some(
(p) => p.participant.identity === "@bob:example.org:DEV111",
)
) {
bobIsAPublisher.resolve();
}
if (publishers.some((p) => p.participant.identity === "@dan:example.org:DEV333")) {
if (
publishers.some(
(p) => p.participant.identity === "@dan:example.org:DEV333",
)
) {
danIsAPublisher.resolve();
}
});
@@ -482,14 +493,13 @@ describe("Publishing participants observations", () => {
fakeRemoteLivekitParticipant("@alice:example.org:DEV000"),
fakeRemoteLivekitParticipant("@bob:example.org:DEV111"),
fakeRemoteLivekitParticipant("@carol:example.org:DEV222"),
fakeRemoteLivekitParticipant("@dan:example.org:DEV333")
fakeRemoteLivekitParticipant("@dan:example.org:DEV333"),
];
// Let's simulate 3 members on the livekitRoom
vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get")
.mockReturnValue(
new Map(participants.map((p) => [p.identity, p]))
);
vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get").mockReturnValue(
new Map(participants.map((p) => [p.identity, p])),
);
for (const participant of participants) {
fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, participant);
@@ -498,20 +508,27 @@ describe("Publishing participants observations", () => {
// At this point there should be no publishers
expect(observedPublishers.pop()!.length).toEqual(0);
const otherFocus: LivekitTransport = {
livekit_alias: "!roomID:example.org",
livekit_service_url: "https://other-matrix-rtc.example.org/livekit/jwt",
type: "livekit"
type: "livekit",
};
const rtcMemberships = [
// Say bob is on the same focus
{ membership: fakeRtcMemberShip("@bob:example.org", "DEV111"), transport: livekitFocus },
{
membership: fakeRtcMemberShip("@bob:example.org", "DEV111"),
transport: livekitFocus,
},
// Alice and carol is on a different focus
{ membership: fakeRtcMemberShip("@alice:example.org", "DEV000"), transport: otherFocus },
{ membership: fakeRtcMemberShip("@carol:example.org", "DEV222"), transport: otherFocus }
{
membership: fakeRtcMemberShip("@alice:example.org", "DEV000"),
transport: otherFocus,
},
{
membership: fakeRtcMemberShip("@carol:example.org", "DEV222"),
transport: otherFocus,
},
// NO DAVE YET
];
// signal this change in rtc memberships
@@ -521,53 +538,74 @@ describe("Publishing participants observations", () => {
await bobIsAPublisher.promise;
const publishers = observedPublishers.pop();
expect(publishers?.length).toEqual(1);
expect(publishers?.[0].participant.identity).toEqual("@bob:example.org:DEV111");
expect(publishers?.[0].participant.identity).toEqual(
"@bob:example.org:DEV111",
);
// Now let's make dan join the rtc memberships
rtcMemberships
.push({ membership: fakeRtcMemberShip("@dan:example.org", "DEV333"), transport: livekitFocus });
rtcMemberships.push({
membership: fakeRtcMemberShip("@dan:example.org", "DEV333"),
transport: livekitFocus,
});
fakeMembershipsFocusMap$.next(rtcMemberships);
// We should have bob and dan has publishers now
await danIsAPublisher.promise;
const twoPublishers = observedPublishers.pop();
expect(twoPublishers?.length).toEqual(2);
expect(twoPublishers?.some((p) => p.participant.identity === "@bob:example.org:DEV111")).toBeTruthy();
expect(twoPublishers?.some((p) => p.participant.identity === "@dan:example.org:DEV333")).toBeTruthy();
expect(
twoPublishers?.some(
(p) => p.participant.identity === "@bob:example.org:DEV111",
),
).toBeTruthy();
expect(
twoPublishers?.some(
(p) => p.participant.identity === "@dan:example.org:DEV333",
),
).toBeTruthy();
// Now let's make bob leave the livekit room
participants = participants.filter((p) => p.identity !== "@bob:example.org:DEV111");
vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get")
.mockReturnValue(
new Map(participants.map((p) => [p.identity, p]))
);
fakeRoomEventEmiter.emit(RoomEvent.ParticipantDisconnected, fakeRemoteLivekitParticipant("@bob:example.org:DEV111"));
participants = participants.filter(
(p) => p.identity !== "@bob:example.org:DEV111",
);
vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get").mockReturnValue(
new Map(participants.map((p) => [p.identity, p])),
);
fakeRoomEventEmiter.emit(
RoomEvent.ParticipantDisconnected,
fakeRemoteLivekitParticipant("@bob:example.org:DEV111"),
);
const updatedPublishers = observedPublishers.pop();
expect(updatedPublishers?.length).toEqual(1);
expect(updatedPublishers?.some((p) => p.participant.identity === "@dan:example.org:DEV333")).toBeTruthy();
expect(
updatedPublishers?.some(
(p) => p.participant.identity === "@dan:example.org:DEV333",
),
).toBeTruthy();
});
it("should be scoped to parent scope", (): void => {
setupTest();
const connection = setupRemoteConnection();
let observedPublishers: { participant: RemoteParticipant; membership: CallMembership }[][] = [];
let observedPublishers: {
participant: RemoteParticipant;
membership: CallMembership;
}[][] = [];
connection.publishingParticipants$.subscribe((publishers) => {
observedPublishers.push(publishers);
});
let participants: RemoteParticipant[] = [
fakeRemoteLivekitParticipant("@bob:example.org:DEV111")
fakeRemoteLivekitParticipant("@bob:example.org:DEV111"),
];
// Let's simulate 3 members on the livekitRoom
vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get")
.mockReturnValue(
new Map(participants.map((p) => [p.identity, p]))
);
vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get").mockReturnValue(
new Map(participants.map((p) => [p.identity, p])),
);
for (const participant of participants) {
fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, participant);
@@ -578,7 +616,10 @@ describe("Publishing participants observations", () => {
const rtcMemberships = [
// Say bob is on the same focus
{ membership: fakeRtcMemberShip("@bob:example.org", "DEV111"), transport: livekitFocus }
{
membership: fakeRtcMemberShip("@bob:example.org", "DEV111"),
transport: livekitFocus,
},
];
// signal this change in rtc memberships
fakeMembershipsFocusMap$.next(rtcMemberships);
@@ -586,27 +627,31 @@ describe("Publishing participants observations", () => {
// We should have bob has a publisher now
const publishers = observedPublishers.pop();
expect(publishers?.length).toEqual(1);
expect(publishers?.[0].participant.identity).toEqual("@bob:example.org:DEV111");
expect(publishers?.[0].participant.identity).toEqual(
"@bob:example.org:DEV111",
);
// end the parent scope
testScope.end();
observedPublishers = [];
// SHOULD NOT emit any more publishers as the scope is ended
participants = participants.filter((p) => p.identity !== "@bob:example.org:DEV111");
vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get")
.mockReturnValue(
new Map(participants.map((p) => [p.identity, p]))
);
fakeRoomEventEmiter.emit(RoomEvent.ParticipantDisconnected, fakeRemoteLivekitParticipant("@bob:example.org:DEV111"));
participants = participants.filter(
(p) => p.identity !== "@bob:example.org:DEV111",
);
vi.spyOn(fakeLivekitRoom, "remoteParticipants", "get").mockReturnValue(
new Map(participants.map((p) => [p.identity, p])),
);
fakeRoomEventEmiter.emit(
RoomEvent.ParticipantDisconnected,
fakeRemoteLivekitParticipant("@bob:example.org:DEV111"),
);
expect(observedPublishers.length).toEqual(0);
});
});
describe("PublishConnection", () => {
// let fakeBlurProcessor: ProcessorWrapper<BackgroundOptions>;
let roomFactoryMock: Mock<() => LivekitRoom>;
let muteStates: MockedObject<MuteStates>;
@@ -616,7 +661,6 @@ describe("PublishConnection", () => {
roomFactoryMock = vi.fn().mockReturnValue(fakeLivekitRoom);
muteStates = mockMuteStates();
// fakeBlurProcessor = vi.mocked<ProcessorWrapper<BackgroundOptions>>({
@@ -626,20 +670,15 @@ describe("PublishConnection", () => {
// getOptions: vi.fn().mockReturnValue({ strength: 0.5 }),
// isRunning: vi.fn().mockReturnValue(false)
// });
}
describe("Livekit room creation", () => {
function createSetup(): void {
setUpPublishConnection();
const fakeTrackProcessorSubject$ = new BehaviorSubject<ProcessorState>({
supported: true,
processor: undefined
processor: undefined,
});
const opts: ConnectionOpts = {
@@ -647,28 +686,25 @@ describe("PublishConnection", () => {
transport: livekitFocus,
remoteTransports$: fakeMembershipsFocusMap$,
scope: testScope,
livekitRoomFactory: roomFactoryMock
livekitRoomFactory: roomFactoryMock,
};
const audioInput = {
available$: of(new Map([["mic1", { id: "mic1" }]])),
selected$: new BehaviorSubject({ id: "mic1" }),
select(): void {
}
select(): void {},
};
const videoInput = {
available$: of(new Map([["cam1", { id: "cam1" }]])),
selected$: new BehaviorSubject({ id: "cam1" }),
select(): void {
}
select(): void {},
};
const audioOutput = {
available$: of(new Map([["speaker", { id: "speaker" }]])),
selected$: new BehaviorSubject({ id: "speaker" }),
select(): void {
}
select(): void {},
};
// TODO understand what is wrong with our mocking that requires ts-expect-error
@@ -678,7 +714,7 @@ describe("PublishConnection", () => {
// @ts-expect-error Mocking only
videoInput,
// @ts-expect-error Mocking only
audioOutput
audioOutput,
});
new PublishConnection(
@@ -686,18 +722,17 @@ describe("PublishConnection", () => {
fakeDevices,
muteStates,
undefined,
fakeTrackProcessorSubject$
fakeTrackProcessorSubject$,
);
}
it("should create room with proper initial audio and video settings", () => {
createSetup();
expect(roomFactoryMock).toHaveBeenCalled();
const lastCallArgs = roomFactoryMock.mock.calls[roomFactoryMock.mock.calls.length - 1];
const lastCallArgs =
roomFactoryMock.mock.calls[roomFactoryMock.mock.calls.length - 1];
const roomOptions = lastCallArgs.pop() as unknown as RoomOptions;
expect(roomOptions).toBeDefined();
@@ -705,7 +740,6 @@ describe("PublishConnection", () => {
expect(roomOptions!.videoCaptureDefaults?.deviceId).toEqual("cam1");
expect(roomOptions!.audioCaptureDefaults?.deviceId).toEqual("mic1");
expect(roomOptions!.audioOutput?.deviceId).toEqual("speaker");
});
it("respect controlledAudioDevices", () => {
@@ -719,7 +753,6 @@ describe("PublishConnection", () => {
// })
// };
// });
});
});
});

View File

@@ -5,12 +5,27 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { connectedParticipantsObserver, connectionStateObserver } from "@livekit/components-core";
import { type ConnectionState, type E2EEOptions, Room as LivekitRoom, type RoomOptions } from "livekit-client";
import { type CallMembership, type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import {
connectedParticipantsObserver,
connectionStateObserver,
} from "@livekit/components-core";
import {
type ConnectionState,
type E2EEOptions,
Room as LivekitRoom,
type RoomOptions,
} from "livekit-client";
import {
type CallMembership,
type LivekitTransport,
} from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject, combineLatest } from "rxjs";
import { getSFUConfigWithOpenID, type OpenIDClientParts, type SFUConfig } from "../livekit/openIDSFU";
import {
getSFUConfigWithOpenID,
type OpenIDClientParts,
type SFUConfig,
} from "../livekit/openIDSFU";
import { type Behavior } from "./Behavior";
import { type ObservableScope } from "./ObservableScope";
import { defaultLiveKitOptions } from "../livekit/options";
@@ -23,20 +38,26 @@ export interface ConnectionOpts {
/** The observable scope to use for this connection. */
scope: ObservableScope;
/** An observable of the current RTC call memberships and their associated focus. */
remoteTransports$: Behavior<{ membership: CallMembership; transport: LivekitTransport }[]>;
remoteTransports$: Behavior<
{ membership: CallMembership; transport: LivekitTransport }[]
>;
/** Optional factory to create the Livekit room, mainly for testing purposes. */
livekitRoomFactory?: (options?: RoomOptions) => LivekitRoom;
}
export type FocusConnectionState =
| { state: 'Initialized' }
| { state: 'FetchingConfig', focus: LivekitTransport }
| { state: 'ConnectingToLkRoom', focus: LivekitTransport }
| { state: 'PublishingTracks', focus: LivekitTransport }
| { state: 'FailedToStart', error: Error, focus: LivekitTransport }
| { state: 'ConnectedToLkRoom', connectionState: ConnectionState, focus: LivekitTransport }
| { state: 'Stopped', focus: LivekitTransport };
| { state: "Initialized" }
| { state: "FetchingConfig"; focus: LivekitTransport }
| { state: "ConnectingToLkRoom"; focus: LivekitTransport }
| { state: "PublishingTracks"; focus: LivekitTransport }
| { state: "FailedToStart"; error: Error; focus: LivekitTransport }
| {
state: "ConnectedToLkRoom";
connectionState: ConnectionState;
focus: LivekitTransport;
}
| { state: "Stopped"; focus: LivekitTransport };
/**
* A connection to a Matrix RTC LiveKit backend.
@@ -44,10 +65,9 @@ export type FocusConnectionState =
* Expose observables for participants and connection state.
*/
export class Connection {
// Private Behavior
private readonly _focusedConnectionState$
= new BehaviorSubject<FocusConnectionState>({ state: 'Initialized' });
private readonly _focusedConnectionState$ =
new BehaviorSubject<FocusConnectionState>({ state: "Initialized" });
/**
* The current state of the connection to the focus server.
@@ -71,31 +91,44 @@ export class Connection {
public async start(): Promise<void> {
this.stopped = false;
try {
this._focusedConnectionState$.next({ state: 'FetchingConfig', focus: this.localTransport });
this._focusedConnectionState$.next({
state: "FetchingConfig",
focus: this.localTransport,
});
// TODO could this be loaded earlier to save time?
const { url, jwt } = await this.getSFUConfigWithOpenID();
// If we were stopped while fetching the config, don't proceed to connect
if (this.stopped) return;
this._focusedConnectionState$.next({ state: 'ConnectingToLkRoom', focus: this.localTransport });
this._focusedConnectionState$.next({
state: "ConnectingToLkRoom",
focus: this.localTransport,
});
await this.livekitRoom.connect(url, jwt);
// If we were stopped while connecting, don't proceed to update state.
if (this.stopped) return;
this._focusedConnectionState$.next({ state: 'ConnectedToLkRoom', focus: this.localTransport, connectionState: this.livekitRoom.state });
this._focusedConnectionState$.next({
state: "ConnectedToLkRoom",
focus: this.localTransport,
connectionState: this.livekitRoom.state,
});
} catch (error) {
this._focusedConnectionState$.next({ state: 'FailedToStart', error: error instanceof Error ? error : new Error(`${error}`), focus: this.localTransport });
this._focusedConnectionState$.next({
state: "FailedToStart",
error: error instanceof Error ? error : new Error(`${error}`),
focus: this.localTransport,
});
throw error;
}
}
protected async getSFUConfigWithOpenID(): Promise<SFUConfig> {
return await getSFUConfigWithOpenID(
this.client,
this.localTransport.livekit_service_url,
this.localTransport.livekit_alias
)
this.localTransport.livekit_alias,
);
}
/**
* Stops the connection.
@@ -106,11 +139,13 @@ export class Connection {
public async stop(): Promise<void> {
if (this.stopped) return;
await this.livekitRoom.disconnect();
this._focusedConnectionState$.next({ state: 'Stopped', focus: this.localTransport });
this._focusedConnectionState$.next({
state: "Stopped",
focus: this.localTransport,
});
this.stopped = true;
}
/**
* An observable of the participants that are publishing on this connection.
* This is derived from `participantsIncludingSubscribers$` and `membershipsFocusMap$`.
@@ -135,20 +170,20 @@ export class Connection {
public readonly livekitRoom: LivekitRoom,
opts: ConnectionOpts,
) {
const { transport, client, scope, remoteTransports$ } =
opts;
const { transport, client, scope, remoteTransports$ } = opts;
this.livekitRoom = livekitRoom
this.livekitRoom = livekitRoom;
this.localTransport = transport;
this.client = client;
this.focusedConnectionState$ = scope.behavior(
this._focusedConnectionState$, { state: 'Initialized' }
this._focusedConnectionState$,
{ state: "Initialized" },
);
const participantsIncludingSubscribers$ = scope.behavior(
connectedParticipantsObserver(this.livekitRoom),
[]
[],
);
this.publishingParticipants$ = scope.behavior(
@@ -161,7 +196,7 @@ export class Connection {
transport.livekit_service_url ===
this.localTransport.livekit_service_url
? [membership]
: []
: [],
)
// Pair with their associated LiveKit participant (if any)
// Uses flatMap to filter out memberships with no associated rtc participant ([])
@@ -171,18 +206,22 @@ export class Connection {
return participant ? [{ participant, membership }] : [];
}),
),
[]
[],
);
scope.behavior<ConnectionState>(
connectionStateObserver(this.livekitRoom)
).subscribe((connectionState) => {
const current = this._focusedConnectionState$.value;
// Only update the state if we are already connected to the LiveKit room.
if (current.state === 'ConnectedToLkRoom') {
this._focusedConnectionState$.next({ state: 'ConnectedToLkRoom', connectionState, focus: current.focus });
}
});
scope
.behavior<ConnectionState>(connectionStateObserver(this.livekitRoom))
.subscribe((connectionState) => {
const current = this._focusedConnectionState$.value;
// Only update the state if we are already connected to the LiveKit room.
if (current.state === "ConnectedToLkRoom") {
this._focusedConnectionState$.next({
state: "ConnectedToLkRoom",
connectionState,
focus: current.focus,
});
}
});
scope.onEnd(() => void this.stop());
}
@@ -195,17 +234,21 @@ export class Connection {
* It does not publish any local tracks.
*/
export class RemoteConnection extends Connection {
/**
* Creates a new remote connection to a matrix RTC LiveKit backend.
* @param opts
* @param sharedE2eeOption - The shared E2EE options to use for the connection.
*/
public constructor(opts: ConnectionOpts, sharedE2eeOption: E2EEOptions | undefined) {
const factory = opts.livekitRoomFactory ?? ((options: RoomOptions): LivekitRoom => new LivekitRoom(options));
public constructor(
opts: ConnectionOpts,
sharedE2eeOption: E2EEOptions | undefined,
) {
const factory =
opts.livekitRoomFactory ??
((options: RoomOptions): LivekitRoom => new LivekitRoom(options));
const livekitRoom = factory({
...defaultLiveKitOptions,
e2ee: sharedE2eeOption
e2ee: sharedE2eeOption,
});
super(livekitRoom, opts);
}

View File

@@ -10,15 +10,24 @@ import {
LocalVideoTrack,
Room as LivekitRoom,
type RoomOptions,
Track
Track,
} from "livekit-client";
import { map, NEVER, type Observable, type Subscription, switchMap } from "rxjs";
import {
map,
NEVER,
type Observable,
type Subscription,
switchMap,
} from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger";
import type { Behavior } from "./Behavior.ts";
import type { MediaDevices, SelectedDevice } from "./MediaDevices.ts";
import type { MuteStates } from "./MuteStates.ts";
import { type ProcessorState, trackProcessorSync } from "../livekit/TrackProcessorContext.tsx";
import {
type ProcessorState,
trackProcessorSync,
} from "../livekit/TrackProcessorContext.tsx";
import { getUrlParams } from "../UrlParams.ts";
import { defaultLiveKitOptions } from "../livekit/options.ts";
import { getValue } from "../utils/observable.ts";
@@ -31,7 +40,6 @@ import { type ObservableScope } from "./ObservableScope.ts";
* This connection will publish the local user's audio and video tracks.
*/
export class PublishConnection extends Connection {
/**
* Creates a new PublishConnection.
* @param args - The connection options. {@link ConnectionOpts}
@@ -45,15 +53,22 @@ export class PublishConnection extends Connection {
devices: MediaDevices,
private readonly muteStates: MuteStates,
e2eeLivekitOptions: E2EEOptions | undefined,
trackerProcessorState$: Behavior<ProcessorState>
trackerProcessorState$: Behavior<ProcessorState>,
) {
const { scope } = args;
logger.info("[LivekitRoom] Create LiveKit room");
const { controlledAudioDevices } = getUrlParams();
const factory = args.livekitRoomFactory ?? ((options: RoomOptions): LivekitRoom => new LivekitRoom(options));
const factory =
args.livekitRoomFactory ??
((options: RoomOptions): LivekitRoom => new LivekitRoom(options));
const room = factory(
generateRoomOption(devices, trackerProcessorState$.value, controlledAudioDevices, e2eeLivekitOptions)
generateRoomOption(
devices,
trackerProcessorState$.value,
controlledAudioDevices,
e2eeLivekitOptions,
),
);
room.setE2EEEnabled(e2eeLivekitOptions !== undefined).catch((e) => {
logger.error("Failed to set E2EE enabled on room", e);
@@ -83,14 +98,14 @@ export class PublishConnection extends Connection {
public async start(): Promise<void> {
this.stopped = false;
await super.start()
await super.start();
if (this.stopped) return;
// TODO this can throw errors? It will also prompt for permissions if not already granted
const tracks = await this.livekitRoom.localParticipant.createTracks({
audio: this.muteStates.audio.enabled$.value,
video: this.muteStates.video.enabled$.value
video: this.muteStates.video.enabled$.value,
});
if (this.stopped) return;
for (const track of tracks) {
@@ -100,7 +115,7 @@ export class PublishConnection extends Connection {
if (this.stopped) return;
// TODO: check if the connection is still active? and break the loop if not?
}
};
}
/// Private methods
@@ -112,17 +127,19 @@ export class PublishConnection extends Connection {
// in the LocalParticipant object for the track object and there's not a nice
// way to do that generically. There is usually no OS-level default video capture
// device anyway, and audio outputs work differently.
private workaroundRestartAudioInputTrackChrome(devices: MediaDevices, scope: ObservableScope): void {
private workaroundRestartAudioInputTrackChrome(
devices: MediaDevices,
scope: ObservableScope,
): void {
devices.audioInput.selected$
.pipe(
switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER),
scope.bind()
scope.bind(),
)
.subscribe(() => {
if (this.livekitRoom.state != ConnectionState.Connected) return;
const activeMicTrack = Array.from(
this.livekitRoom.localParticipant.audioTrackPublications.values()
this.livekitRoom.localParticipant.audioTrackPublications.values(),
).find((d) => d.source === Track.Source.Microphone)?.track;
if (
@@ -147,11 +164,15 @@ export class PublishConnection extends Connection {
});
}
// Observe changes in the selected media devices and update the LiveKit room accordingly.
private observeMediaDevices(scope: ObservableScope, devices: MediaDevices, controlledAudioDevices: boolean):void {
// Observe changes in the selected media devices and update the LiveKit room accordingly.
private observeMediaDevices(
scope: ObservableScope,
devices: MediaDevices,
controlledAudioDevices: boolean,
): void {
const syncDevice = (
kind: MediaDeviceKind,
selected$: Observable<SelectedDevice | undefined>
selected$: Observable<SelectedDevice | undefined>,
): Subscription =>
selected$.pipe(scope.bind()).subscribe((device) => {
if (this.livekitRoom.state != ConnectionState.Connected) return;
@@ -160,7 +181,7 @@ export class PublishConnection extends Connection {
"[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :",
this.livekitRoom.getActiveDevice(kind),
" !== ",
device?.id
device?.id,
);
if (
device !== undefined &&
@@ -169,7 +190,7 @@ export class PublishConnection extends Connection {
this.livekitRoom
.switchActiveDevice(kind, device.id)
.catch((e) =>
logger.error(`Failed to sync ${kind} device with LiveKit`, e)
logger.error(`Failed to sync ${kind} device with LiveKit`, e),
);
}
});
@@ -208,21 +229,23 @@ export class PublishConnection extends Connection {
});
}
private observeTrackProcessors(scope: ObservableScope, room: LivekitRoom, trackerProcessorState$: Behavior<ProcessorState>): void {
private observeTrackProcessors(
scope: ObservableScope,
room: LivekitRoom,
trackerProcessorState$: Behavior<ProcessorState>,
): void {
const track$ = scope.behavior(
observeTrackReference$(room.localParticipant, Track.Source.Camera).pipe(
map((trackRef) => {
const track = trackRef?.publication?.track;
return track instanceof LocalVideoTrack ? track : null;
})
)
}),
),
);
trackProcessorSync(track$, trackerProcessorState$);
}
}
// Generate the initial LiveKit RoomOptions based on the current media devices and processor state.
function generateRoomOption(
devices: MediaDevices,
@@ -235,11 +258,11 @@ function generateRoomOption(
videoCaptureDefaults: {
...defaultLiveKitOptions.videoCaptureDefaults,
deviceId: devices.videoInput.selected$.value?.id,
processor: processorState.processor
processor: processorState.processor,
},
audioCaptureDefaults: {
...defaultLiveKitOptions.audioCaptureDefaults,
deviceId: devices.audioInput.selected$.value?.id
deviceId: devices.audioInput.selected$.value?.id,
},
audioOutput: {
// When using controlled audio devices, we don't want to set the
@@ -247,8 +270,8 @@ function generateRoomOption(
// (also the id does not need to match a browser device id)
deviceId: controlledAudioDevices
? undefined
: getValue(devices.audioOutput.selected$)?.id
: getValue(devices.audioOutput.selected$)?.id,
},
e2ee: e2eeLivekitOptions
e2ee: e2eeLivekitOptions,
};
}

View File

@@ -19,7 +19,11 @@ import { type ComponentProps } from "react";
import { MediaView } from "./MediaView";
import { EncryptionStatus } from "../state/MediaViewModel";
import { mockLocalParticipant, mockMatrixRoomMember, mockRtcMembership } from "../utils/test";
import {
mockLocalParticipant,
mockMatrixRoomMember,
mockRtcMembership,
} from "../utils/test";
describe("MediaView", () => {
const participant = mockLocalParticipant({});

View File

@@ -419,7 +419,9 @@ export const deviceStub = {
select(): void {},
};
export function mockMediaDevices(data: Partial<MediaDevices>): MockedObject<MediaDevices> {
export function mockMediaDevices(
data: Partial<MediaDevices>,
): MockedObject<MediaDevices> {
return vi.mocked<MediaDevices>({
audioInput: deviceStub,
audioOutput: deviceStub,