mirror of
https://github.com/vector-im/element-call.git
synced 2026-02-20 04:57:03 +00:00
lots of work. noone knows if it works.
Signed-off-by: Timo K <toger5@hotmail.de>
This commit is contained in:
@@ -5,15 +5,12 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
Please see LICENSE in the repository root for full details.
|
||||
*/
|
||||
|
||||
import {
|
||||
observeParticipantEvents,
|
||||
observeParticipantMedia,
|
||||
} from "@livekit/components-core";
|
||||
import { observeParticipantEvents } from "@livekit/components-core";
|
||||
import {
|
||||
ConnectionState,
|
||||
type E2EEOptions,
|
||||
ExternalE2EEKeyProvider,
|
||||
Room as LivekitRoom,
|
||||
type Room as LivekitRoom,
|
||||
type LocalParticipant,
|
||||
ParticipantEvent,
|
||||
type RemoteParticipant,
|
||||
@@ -24,10 +21,10 @@ import {
|
||||
type EventTimelineSetHandlerMap,
|
||||
EventType,
|
||||
RoomEvent,
|
||||
type RoomMember,
|
||||
RoomStateEvent,
|
||||
SyncState,
|
||||
type Room as MatrixRoom,
|
||||
type RoomMember,
|
||||
} from "matrix-js-sdk";
|
||||
import {
|
||||
BehaviorSubject,
|
||||
@@ -116,7 +113,7 @@ import { shallowEquals } from "../utils/array";
|
||||
import { calculateDisplayName, shouldDisambiguate } from "../utils/displayname";
|
||||
import { type MediaDevices } from "./MediaDevices";
|
||||
import { constant, type Behavior } from "./Behavior";
|
||||
import { defaultLiveKitOptions } from "../livekit/options";
|
||||
|
||||
import {
|
||||
enterRTCSession,
|
||||
getLivekitAlias,
|
||||
@@ -411,31 +408,6 @@ class ScreenShare {
|
||||
|
||||
type MediaItem = UserMedia | ScreenShare;
|
||||
|
||||
function getE2eeOptions(
|
||||
e2eeSystem: EncryptionSystem,
|
||||
rtcSession: MatrixRTCSession,
|
||||
): E2EEOptions | undefined {
|
||||
if (e2eeSystem.kind === E2eeType.NONE) return undefined;
|
||||
|
||||
if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) {
|
||||
const keyProvider = new MatrixKeyProvider();
|
||||
keyProvider.setRTCSession(rtcSession);
|
||||
return {
|
||||
keyProvider,
|
||||
worker: new E2EEWorker(),
|
||||
};
|
||||
} else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) {
|
||||
const keyProvider = new ExternalE2EEKeyProvider();
|
||||
keyProvider
|
||||
.setKey(e2eeSystem.secret)
|
||||
.catch((e) => logger.error("Failed to set shared key for E2EE", e));
|
||||
return {
|
||||
keyProvider,
|
||||
worker: new E2EEWorker(),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
function getRoomMemberFromRtcMember(
|
||||
rtcMember: CallMembership,
|
||||
room: MatrixRoom,
|
||||
@@ -459,29 +431,25 @@ function getRoomMemberFromRtcMember(
|
||||
}
|
||||
|
||||
export class CallViewModel extends ViewModel {
|
||||
private readonly e2eeOptions = getE2eeOptions(
|
||||
private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession);
|
||||
|
||||
private readonly livekitE2EERoomOptions = getE2eeOptions(
|
||||
this.options.encryptionSystem,
|
||||
this.matrixRTCSession,
|
||||
);
|
||||
|
||||
private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession);
|
||||
|
||||
private readonly localConnectionLivekitRoom = new LivekitRoom({
|
||||
...defaultLiveKitOptions,
|
||||
e2ee: this.e2eeOptions,
|
||||
});
|
||||
|
||||
private readonly localFocus = makeFocus(this.matrixRTCSession);
|
||||
|
||||
private readonly localConnection = this.localFocus.then(
|
||||
(focus) =>
|
||||
new PublishConnection(
|
||||
this.localConnectionLivekitRoom,
|
||||
focus,
|
||||
this.livekitAlias,
|
||||
this.matrixRTCSession.room.client,
|
||||
this.scope,
|
||||
this.membershipsAndFocusMap$,
|
||||
this.mediaDevices,
|
||||
this.livekitE2EERoomOptions,
|
||||
),
|
||||
);
|
||||
|
||||
@@ -506,12 +474,12 @@ export class CallViewModel extends ViewModel {
|
||||
),
|
||||
);
|
||||
|
||||
private readonly focusServiceUrls$ = this.membershipsAndFocusMap$.pipe(
|
||||
private readonly livekitServiceUrls$ = this.membershipsAndFocusMap$.pipe(
|
||||
map((v) => new Set(v.map(({ focus }) => focus.livekit_service_url))),
|
||||
);
|
||||
|
||||
private readonly remoteConnections$ = this.scope.behavior(
|
||||
combineLatest([this.localFocus, this.focusServiceUrls$]).pipe(
|
||||
combineLatest([this.localFocus, this.livekitServiceUrls$]).pipe(
|
||||
accumulate(
|
||||
new Map<string, Connection>(),
|
||||
(prev, [localFocus, focusUrls]) => {
|
||||
@@ -528,10 +496,6 @@ export class CallViewModel extends ViewModel {
|
||||
focusUrl,
|
||||
);
|
||||
nextConnection = new Connection(
|
||||
new LivekitRoom({
|
||||
...defaultLiveKitOptions,
|
||||
e2ee: this.e2eeOptions,
|
||||
}),
|
||||
{
|
||||
livekit_service_url: focusUrl,
|
||||
livekit_alias: this.livekitAlias,
|
||||
@@ -541,6 +505,7 @@ export class CallViewModel extends ViewModel {
|
||||
this.matrixRTCSession.room.client,
|
||||
this.scope,
|
||||
this.membershipsAndFocusMap$,
|
||||
this.livekitE2EERoomOptions,
|
||||
);
|
||||
} else {
|
||||
logger.log(
|
||||
@@ -657,29 +622,54 @@ export class CallViewModel extends ViewModel {
|
||||
// in a split-brained state.
|
||||
private readonly pretendToBeDisconnected$ = this.reconnecting$;
|
||||
|
||||
/**
|
||||
* The RemoteParticipants including those that are being "held" on the screen
|
||||
*/
|
||||
private readonly remoteParticipants$ = this.scope
|
||||
.behavior<RemoteParticipant[]>(
|
||||
combineLatest(
|
||||
[this.localConnection, this.remoteConnections$],
|
||||
(localConnection, remoteConnections) => {
|
||||
const remoteConnectionsParticipants = [
|
||||
...remoteConnections.values(),
|
||||
].map((c) => c.publishingParticipants$);
|
||||
|
||||
return combineLatest(
|
||||
[
|
||||
localConnection.publishingParticipants$,
|
||||
...remoteConnectionsParticipants,
|
||||
],
|
||||
(...ps) => ps.flat(1),
|
||||
private readonly participants$ = this.scope
|
||||
.behavior<
|
||||
{
|
||||
participant: LocalParticipant | RemoteParticipant;
|
||||
member: RoomMember;
|
||||
livekitRoom: LivekitRoom;
|
||||
}[]
|
||||
>(
|
||||
from(this.localConnection).pipe(
|
||||
switchMap((localConnection) => {
|
||||
const memberError = (): never => {
|
||||
throw new Error("No room member for call membership");
|
||||
};
|
||||
const localParticipant = {
|
||||
participant: localConnection.livekitRoom.localParticipant,
|
||||
member:
|
||||
this.matrixRoom.getMember(this.userId ?? "") ?? memberError(),
|
||||
livekitRoom: localConnection.livekitRoom,
|
||||
};
|
||||
return this.remoteConnections$.pipe(
|
||||
switchMap((connections) =>
|
||||
combineLatest(
|
||||
[...connections.values()].map((c) =>
|
||||
c.publishingParticipants$.pipe(
|
||||
map((ps) =>
|
||||
ps.map(({ participant, membership }) => ({
|
||||
participant,
|
||||
member:
|
||||
getRoomMemberFromRtcMember(
|
||||
membership,
|
||||
this.matrixRoom,
|
||||
)?.member ?? memberError(),
|
||||
livekitRoom: c.livekitRoom,
|
||||
})),
|
||||
),
|
||||
),
|
||||
),
|
||||
),
|
||||
),
|
||||
map((remoteParticipants) => [
|
||||
...remoteParticipants.flat(1),
|
||||
localParticipant,
|
||||
]),
|
||||
);
|
||||
},
|
||||
).pipe(switchAll(), startWith([])),
|
||||
}),
|
||||
),
|
||||
)
|
||||
.pipe(pauseWhen(this.pretendToBeDisconnected$));
|
||||
.pipe(startWith([]), pauseWhen(this.pretendToBeDisconnected$));
|
||||
|
||||
private readonly memberships$ = this.scope.behavior(
|
||||
fromEvent(
|
||||
@@ -760,8 +750,7 @@ export class CallViewModel extends ViewModel {
|
||||
*/
|
||||
private readonly mediaItems$ = this.scope.behavior<MediaItem[]>(
|
||||
combineLatest([
|
||||
this.remoteParticipants$,
|
||||
observeParticipantMedia(this.localConnectionLivekitRoom.localParticipant),
|
||||
this.participants$,
|
||||
duplicateTiles.value$,
|
||||
this.memberships$,
|
||||
showNonMemberTiles.value$,
|
||||
@@ -769,44 +758,17 @@ export class CallViewModel extends ViewModel {
|
||||
scan(
|
||||
(
|
||||
prevItems,
|
||||
[
|
||||
remoteParticipants,
|
||||
{ participant: localParticipant },
|
||||
duplicateTiles,
|
||||
memberships,
|
||||
showNonMemberTiles,
|
||||
],
|
||||
[participants, duplicateTiles, memberships, showNonMemberTiles],
|
||||
) => {
|
||||
const newItems = new Map(
|
||||
const newItems: Map<string, UserMedia | ScreenShare> = new Map(
|
||||
function* (this: CallViewModel): Iterable<[string, MediaItem]> {
|
||||
const room = this.matrixRoom;
|
||||
// m.rtc.members are the basis for calculating what is visible in the call
|
||||
for (const rtcMember of memberships) {
|
||||
const { member, id: livekitParticipantId } =
|
||||
getRoomMemberFromRtcMember(rtcMember, room);
|
||||
const matrixIdentifier = `${rtcMember.sender}:${rtcMember.deviceId}`;
|
||||
|
||||
let participant:
|
||||
| LocalParticipant
|
||||
| RemoteParticipant
|
||||
| undefined = undefined;
|
||||
if (livekitParticipantId === "local") {
|
||||
participant = localParticipant;
|
||||
} else {
|
||||
participant = remoteParticipants.find(
|
||||
(p) => p.identity === livekitParticipantId,
|
||||
);
|
||||
}
|
||||
|
||||
if (!member) {
|
||||
logger.error(
|
||||
"Could not find member for media id: ",
|
||||
livekitParticipantId,
|
||||
);
|
||||
}
|
||||
for (const { participant, member, livekitRoom } of participants) {
|
||||
const matrixId = participant.isLocal
|
||||
? "local"
|
||||
: participant.identity;
|
||||
for (let i = 0; i < 1 + duplicateTiles; i++) {
|
||||
const indexedMediaId = `${livekitParticipantId}:${i}`;
|
||||
let prevMedia = prevItems.get(indexedMediaId);
|
||||
const mediaId = `${matrixId}:${i}`;
|
||||
let prevMedia = prevItems.get(mediaId);
|
||||
if (prevMedia && prevMedia instanceof UserMedia) {
|
||||
prevMedia.updateParticipant(participant);
|
||||
if (prevMedia.vm.member === undefined) {
|
||||
@@ -819,33 +781,33 @@ export class CallViewModel extends ViewModel {
|
||||
}
|
||||
}
|
||||
yield [
|
||||
indexedMediaId,
|
||||
mediaId,
|
||||
// We create UserMedia with or without a participant.
|
||||
// This will be the initial value of a BehaviourSubject.
|
||||
// Once a participant appears we will update the BehaviourSubject. (see above)
|
||||
prevMedia ??
|
||||
new UserMedia(
|
||||
indexedMediaId,
|
||||
mediaId,
|
||||
member,
|
||||
participant,
|
||||
this.options.encryptionSystem,
|
||||
this.localConnectionLivekitRoom,
|
||||
livekitRoom,
|
||||
this.mediaDevices,
|
||||
this.pretendToBeDisconnected$,
|
||||
this.memberDisplaynames$.pipe(
|
||||
map((m) => m.get(matrixIdentifier) ?? "[👻]"),
|
||||
map((m) => m.get(matrixId) ?? "[👻]"),
|
||||
),
|
||||
this.handsRaised$.pipe(
|
||||
map((v) => v[matrixIdentifier]?.time ?? null),
|
||||
map((v) => v[matrixId]?.time ?? null),
|
||||
),
|
||||
this.reactions$.pipe(
|
||||
map((v) => v[matrixIdentifier] ?? undefined),
|
||||
map((v) => v[matrixId] ?? undefined),
|
||||
),
|
||||
),
|
||||
];
|
||||
|
||||
if (participant?.isScreenShareEnabled) {
|
||||
const screenShareId = `${indexedMediaId}:screen-share`;
|
||||
const screenShareId = `${mediaId}:screen-share`;
|
||||
yield [
|
||||
screenShareId,
|
||||
prevItems.get(screenShareId) ??
|
||||
@@ -854,10 +816,10 @@ export class CallViewModel extends ViewModel {
|
||||
member,
|
||||
participant,
|
||||
this.options.encryptionSystem,
|
||||
this.localConnectionLivekitRoom,
|
||||
livekitRoom,
|
||||
this.pretendToBeDisconnected$,
|
||||
this.memberDisplaynames$.pipe(
|
||||
map((m) => m.get(matrixIdentifier) ?? "[👻]"),
|
||||
map((m) => m.get(matrixId) ?? "[👻]"),
|
||||
),
|
||||
),
|
||||
];
|
||||
@@ -879,47 +841,51 @@ export class CallViewModel extends ViewModel {
|
||||
// - If one wants to test scalability using the LiveKit CLI.
|
||||
// - If an experimental project does not yet do the MatrixRTC bits.
|
||||
// - If someone wants to debug if the LiveKit connection works but MatrixRTC room state failed to arrive.
|
||||
const newNonMemberItems = showNonMemberTiles
|
||||
? new Map(
|
||||
function* (this: CallViewModel): Iterable<[string, MediaItem]> {
|
||||
for (const participant of remoteParticipants) {
|
||||
for (let i = 0; i < 1 + duplicateTiles; i++) {
|
||||
const maybeNonMemberParticipantId =
|
||||
participant.identity + ":" + i;
|
||||
if (!newItems.has(maybeNonMemberParticipantId)) {
|
||||
const nonMemberId = maybeNonMemberParticipantId;
|
||||
yield [
|
||||
nonMemberId,
|
||||
prevItems.get(nonMemberId) ??
|
||||
new UserMedia(
|
||||
nonMemberId,
|
||||
undefined,
|
||||
participant,
|
||||
this.options.encryptionSystem,
|
||||
this.localConnectionLivekitRoom,
|
||||
this.mediaDevices,
|
||||
this.pretendToBeDisconnected$,
|
||||
this.memberDisplaynames$.pipe(
|
||||
map(
|
||||
(m) => m.get(participant.identity) ?? "[👻]",
|
||||
),
|
||||
),
|
||||
of(null),
|
||||
of(null),
|
||||
),
|
||||
];
|
||||
}
|
||||
}
|
||||
}
|
||||
}.bind(this)(),
|
||||
)
|
||||
: new Map();
|
||||
if (newNonMemberItems.size > 0) {
|
||||
logger.debug("Added NonMember items: ", newNonMemberItems);
|
||||
}
|
||||
// TODO-MULTI-SFU
|
||||
// const newNonMemberItems = showNonMemberTiles
|
||||
// ? new Map(
|
||||
// function* (
|
||||
// this: CallViewModel,
|
||||
// ): Iterable<[string, MediaItem]> {
|
||||
// for (const participant of remoteParticipants) {
|
||||
// for (let i = 0; i < 1 + duplicateTiles; i++) {
|
||||
// const maybeNonMemberParticipantId =
|
||||
// participant.identity + ":" + i;
|
||||
// if (!newItems.has(maybeNonMemberParticipantId)) {
|
||||
// const nonMemberId = maybeNonMemberParticipantId;
|
||||
// yield [
|
||||
// nonMemberId,
|
||||
// prevItems.get(nonMemberId) ??
|
||||
// new UserMedia(
|
||||
// nonMemberId,
|
||||
// undefined,
|
||||
// participant,
|
||||
// this.options.encryptionSystem,
|
||||
// localConnection.livekitRoom,
|
||||
// this.mediaDevices,
|
||||
// this.pretendToBeDisconnected$,
|
||||
// this.memberDisplaynames$.pipe(
|
||||
// map(
|
||||
// (m) =>
|
||||
// m.get(participant.identity) ?? "[👻]",
|
||||
// ),
|
||||
// ),
|
||||
// of(null),
|
||||
// of(null),
|
||||
// ),
|
||||
// ];
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }.bind(this)(),
|
||||
// )
|
||||
// : new Map();
|
||||
// if (newNonMemberItems.size > 0) {
|
||||
// logger.debug("Added NonMember items: ", newNonMemberItems);
|
||||
// }
|
||||
|
||||
const combinedNew = new Map([
|
||||
...newNonMemberItems.entries(),
|
||||
// ...newNonMemberItems.entries(),
|
||||
...newItems.entries(),
|
||||
]);
|
||||
|
||||
@@ -1840,66 +1806,77 @@ export class CallViewModel extends ViewModel {
|
||||
// We use matrixConnected$ rather than reconnecting$ because we want to
|
||||
// pause tracks during the initial joining sequence too until we're sure
|
||||
// that our own media is displayed on screen.
|
||||
this.matrixConnected$.pipe(this.scope.bind()).subscribe((connected) => {
|
||||
const publications =
|
||||
this.localConnectionLivekitRoom.localParticipant.trackPublications.values();
|
||||
if (connected) {
|
||||
for (const p of publications) {
|
||||
if (p.track?.isUpstreamPaused === true) {
|
||||
const kind = p.track.kind;
|
||||
logger.log(
|
||||
`Resumming ${kind} track (MatrixRTC connection present)`,
|
||||
);
|
||||
p.track
|
||||
.resumeUpstream()
|
||||
.catch((e) =>
|
||||
logger.error(
|
||||
`Failed to resume ${kind} track after MatrixRTC reconnection`,
|
||||
e,
|
||||
),
|
||||
void this.localConnection.then((localConnection) =>
|
||||
this.matrixConnected$.pipe(this.scope.bind()).subscribe((connected) => {
|
||||
const publications =
|
||||
localConnection.livekitRoom.localParticipant.trackPublications.values();
|
||||
if (connected) {
|
||||
for (const p of publications) {
|
||||
if (p.track?.isUpstreamPaused === true) {
|
||||
const kind = p.track.kind;
|
||||
logger.log(
|
||||
`Resuming ${kind} track (MatrixRTC connection present)`,
|
||||
);
|
||||
p.track
|
||||
.resumeUpstream()
|
||||
.catch((e) =>
|
||||
logger.error(
|
||||
`Failed to resume ${kind} track after MatrixRTC reconnection`,
|
||||
e,
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (const p of publications) {
|
||||
if (p.track?.isUpstreamPaused === false) {
|
||||
const kind = p.track.kind;
|
||||
logger.log(
|
||||
`Pausing ${kind} track (uncertain MatrixRTC connection)`,
|
||||
);
|
||||
p.track
|
||||
.pauseUpstream()
|
||||
.catch((e) =>
|
||||
logger.error(
|
||||
`Failed to pause ${kind} track after entering uncertain MatrixRTC connection`,
|
||||
e,
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (const p of publications) {
|
||||
if (p.track?.isUpstreamPaused === false) {
|
||||
const kind = p.track.kind;
|
||||
logger.log(
|
||||
`Pausing ${kind} track (uncertain MatrixRTC connection)`,
|
||||
);
|
||||
p.track
|
||||
.pauseUpstream()
|
||||
.catch((e) =>
|
||||
logger.error(
|
||||
`Failed to pause ${kind} track after entering uncertain MatrixRTC connection`,
|
||||
e,
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}),
|
||||
);
|
||||
|
||||
// Join automatically
|
||||
this.join(); // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked?
|
||||
}
|
||||
}
|
||||
|
||||
export const membershipsFocusUrl = (
|
||||
memberships: CallMembership[],
|
||||
matrixRTCSession: MatrixRTCSession,
|
||||
): { livekit_service_url: string; membership: CallMembership }[] => {
|
||||
return memberships
|
||||
.map(
|
||||
(m) =>
|
||||
[matrixRTCSession.resolveActiveFocus(m), m] as [
|
||||
LivekitFocusConfig | undefined,
|
||||
CallMembership,
|
||||
],
|
||||
)
|
||||
.filter(([f, _]) => f !== undefined && isLivekitFocusConfig(f))
|
||||
.map(([f, m]) => ({
|
||||
livekit_service_url: f!.livekit_service_url,
|
||||
membership: m,
|
||||
}));
|
||||
};
|
||||
// TODO-MULTI-SFU // Setup and update the keyProvider which was create by `createRoom` was a thing before. Now we never update if the E2EEsystem changes
|
||||
// do we need this?
|
||||
|
||||
function getE2eeOptions(
|
||||
e2eeSystem: EncryptionSystem,
|
||||
rtcSession: MatrixRTCSession,
|
||||
): E2EEOptions | undefined {
|
||||
if (e2eeSystem.kind === E2eeType.NONE) return undefined;
|
||||
|
||||
if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) {
|
||||
const keyProvider = new MatrixKeyProvider();
|
||||
keyProvider.setRTCSession(rtcSession);
|
||||
return {
|
||||
keyProvider,
|
||||
worker: new E2EEWorker(),
|
||||
};
|
||||
} else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) {
|
||||
const keyProvider = new ExternalE2EEKeyProvider();
|
||||
keyProvider
|
||||
.setKey(e2eeSystem.secret)
|
||||
.catch((e) => logger.error("Failed to set shared key for E2EE", e));
|
||||
return {
|
||||
keyProvider,
|
||||
worker: new E2EEWorker(),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,26 +8,42 @@ Please see LICENSE in the repository root for full details.
|
||||
|
||||
import { connectedParticipantsObserver } from "@livekit/components-core";
|
||||
import {
|
||||
type Room as LivekitRoom,
|
||||
type RemoteParticipant,
|
||||
ConnectionState,
|
||||
Room as LivekitRoom,
|
||||
type RoomOptions,
|
||||
type E2EEOptions,
|
||||
RoomEvent,
|
||||
Track,
|
||||
} from "livekit-client";
|
||||
import { type MatrixClient } from "matrix-js-sdk";
|
||||
import {
|
||||
type LivekitFocus,
|
||||
type CallMembership,
|
||||
} from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { combineLatest, map, type Observable } from "rxjs";
|
||||
import {
|
||||
BehaviorSubject,
|
||||
combineLatest,
|
||||
filter,
|
||||
fromEvent,
|
||||
map,
|
||||
NEVER,
|
||||
type Observable,
|
||||
type Subscription,
|
||||
switchMap,
|
||||
} from "rxjs";
|
||||
import { logger } from "matrix-js-sdk/lib/logger";
|
||||
|
||||
import { type SelectedDevice, type MediaDevices } from "./MediaDevices";
|
||||
import { getSFUConfigWithOpenID } from "../livekit/openIDSFU";
|
||||
import { type Behavior } from "./Behavior";
|
||||
import { constant, type Behavior } from "./Behavior";
|
||||
import { type ObservableScope } from "./ObservableScope";
|
||||
import { defaultLiveKitOptions } from "../livekit/options";
|
||||
import { getValue } from "../utils/observable";
|
||||
import { getUrlParams } from "../UrlParams";
|
||||
import { type MuteStates } from "../room/MuteStates";
|
||||
|
||||
export class Connection {
|
||||
protected readonly sfuConfig = getSFUConfigWithOpenID(
|
||||
this.client,
|
||||
this.focus.livekit_service_url,
|
||||
this.livekitAlias,
|
||||
);
|
||||
protected stopped = false;
|
||||
|
||||
public async start(): Promise<void> {
|
||||
this.stopped = false;
|
||||
@@ -35,22 +51,44 @@ export class Connection {
|
||||
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
|
||||
}
|
||||
|
||||
protected stopped = false;
|
||||
|
||||
public stop(): void {
|
||||
void this.livekitRoom.disconnect();
|
||||
this.stopped = true;
|
||||
}
|
||||
|
||||
public readonly participantsIncludingSubscribers$ = this.scope.behavior(
|
||||
connectedParticipantsObserver(this.livekitRoom),
|
||||
[],
|
||||
protected readonly sfuConfig = getSFUConfigWithOpenID(
|
||||
this.client,
|
||||
this.focus.livekit_service_url,
|
||||
this.livekitAlias,
|
||||
);
|
||||
|
||||
public readonly publishingParticipants$: Observable<RemoteParticipant[]> =
|
||||
this.scope.behavior(
|
||||
public readonly participantsIncludingSubscribers$;
|
||||
public readonly publishingParticipants$;
|
||||
public livekitRoom: LivekitRoom;
|
||||
|
||||
public connectionState$: Behavior<ConnectionState>;
|
||||
public constructor(
|
||||
protected readonly focus: LivekitFocus,
|
||||
protected readonly livekitAlias: string,
|
||||
protected readonly client: MatrixClient,
|
||||
protected readonly scope: ObservableScope,
|
||||
protected readonly membershipsFocusMap$: Behavior<
|
||||
{ membership: CallMembership; focus: LivekitFocus }[]
|
||||
>,
|
||||
e2eeLivekitOptions: E2EEOptions | undefined,
|
||||
) {
|
||||
this.livekitRoom = new LivekitRoom({
|
||||
...defaultLiveKitOptions,
|
||||
e2ee: e2eeLivekitOptions,
|
||||
});
|
||||
this.participantsIncludingSubscribers$ = this.scope.behavior(
|
||||
connectedParticipantsObserver(this.livekitRoom),
|
||||
[],
|
||||
);
|
||||
|
||||
this.publishingParticipants$ = this.scope.behavior(
|
||||
combineLatest([
|
||||
connectedParticipantsObserver(this.livekitRoom),
|
||||
this.participantsIncludingSubscribers$,
|
||||
this.membershipsFocusMap$,
|
||||
]).pipe(
|
||||
map(([participants, membershipsFocusMap]) =>
|
||||
@@ -62,27 +100,24 @@ export class Connection {
|
||||
: [],
|
||||
)
|
||||
// Find all associated publishing livekit participant objects
|
||||
.flatMap(({ sender, deviceId }) => {
|
||||
.flatMap((membership) => {
|
||||
const participant = participants.find(
|
||||
(p) => p.identity === `${sender}:${deviceId}`,
|
||||
(p) =>
|
||||
p.identity === `${membership.sender}:${membership.deviceId}`,
|
||||
);
|
||||
return participant ? [participant] : [];
|
||||
return participant ? [{ participant, membership }] : [];
|
||||
}),
|
||||
),
|
||||
),
|
||||
[],
|
||||
);
|
||||
|
||||
public constructor(
|
||||
protected readonly livekitRoom: LivekitRoom,
|
||||
protected readonly focus: LivekitFocus,
|
||||
protected readonly livekitAlias: string,
|
||||
protected readonly client: MatrixClient,
|
||||
protected readonly scope: ObservableScope,
|
||||
protected readonly membershipsFocusMap$: Behavior<
|
||||
{ membership: CallMembership; focus: LivekitFocus }[]
|
||||
>,
|
||||
) {}
|
||||
this.connectionState$ = this.scope.behavior<ConnectionState>(
|
||||
fromEvent<ConnectionState>(
|
||||
this.livekitRoom,
|
||||
RoomEvent.ConnectionStateChanged,
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export class PublishConnection extends Connection {
|
||||
@@ -111,4 +146,271 @@ export class PublishConnection extends Connection {
|
||||
connectedParticipantsObserver(this.livekitRoom),
|
||||
[],
|
||||
);
|
||||
private readonly muteStates$: Behavior<MuteStates>;
|
||||
private updatingMuteStates$ = new BehaviorSubject(false);
|
||||
|
||||
public constructor(
|
||||
protected readonly focus: LivekitFocus,
|
||||
protected readonly livekitAlias: string,
|
||||
protected readonly client: MatrixClient,
|
||||
protected readonly scope: ObservableScope,
|
||||
protected readonly membershipsFocusMap$: Behavior<
|
||||
{ membership: CallMembership; focus: LivekitFocus }[]
|
||||
>,
|
||||
protected readonly devices: MediaDevices,
|
||||
e2eeLivekitOptions: E2EEOptions | undefined,
|
||||
) {
|
||||
super(
|
||||
focus,
|
||||
livekitAlias,
|
||||
client,
|
||||
scope,
|
||||
membershipsFocusMap$,
|
||||
e2eeLivekitOptions,
|
||||
);
|
||||
|
||||
// TODO-MULTI-SFU use actual mute states
|
||||
this.muteStates$ = constant({
|
||||
audio: { enabled: true, setEnabled: (enabled) => {} },
|
||||
video: { enabled: true, setEnabled: (enabled) => {} },
|
||||
});
|
||||
|
||||
logger.info("[LivekitRoom] Create LiveKit room");
|
||||
const { controlledAudioDevices } = getUrlParams();
|
||||
|
||||
const roomOptions: RoomOptions = {
|
||||
...defaultLiveKitOptions,
|
||||
videoCaptureDefaults: {
|
||||
...defaultLiveKitOptions.videoCaptureDefaults,
|
||||
deviceId: getValue(this.devices.videoInput.selected$)?.id,
|
||||
// TODO-MULTI-SFU add processor support back
|
||||
// processor,
|
||||
},
|
||||
audioCaptureDefaults: {
|
||||
...defaultLiveKitOptions.audioCaptureDefaults,
|
||||
deviceId: getValue(devices.audioInput.selected$)?.id,
|
||||
},
|
||||
audioOutput: {
|
||||
// When using controlled audio devices, we don't want to set the
|
||||
// deviceId here, because it will be set by the native app.
|
||||
// (also the id does not need to match a browser device id)
|
||||
deviceId: controlledAudioDevices
|
||||
? undefined
|
||||
: getValue(devices.audioOutput.selected$)?.id,
|
||||
},
|
||||
e2ee: e2eeLivekitOptions,
|
||||
};
|
||||
// We have to create the room manually here due to a bug inside
|
||||
// @livekit/components-react. JSON.stringify() is used in deps of a
|
||||
// useEffect() with an argument that references itself, if E2EE is enabled
|
||||
const room = new LivekitRoom(roomOptions);
|
||||
room.setE2EEEnabled(e2eeLivekitOptions !== undefined).catch((e) => {
|
||||
logger.error("Failed to set E2EE enabled on room", e);
|
||||
});
|
||||
this.livekitRoom = room;
|
||||
|
||||
// sync mute states TODO-MULTI_SFU This possibly can be simplified quite a bit.
|
||||
combineLatest([
|
||||
this.connectionState$,
|
||||
this.muteStates$,
|
||||
this.updatingMuteStates$,
|
||||
])
|
||||
.pipe(
|
||||
filter(([_c, _m, updating]) => !updating),
|
||||
this.scope.bind(),
|
||||
)
|
||||
.subscribe(([connectionState, muteStates, _]) => {
|
||||
// Sync the requested mute states with LiveKit's mute states. We do it this
|
||||
// way around rather than using LiveKit as the source of truth, so that the
|
||||
// states can be consistent throughout the lobby and loading screens.
|
||||
// It's important that we only do this in the connected state, because
|
||||
// LiveKit's internal mute states aren't consistent during connection setup,
|
||||
// and setting tracks to be enabled during this time causes errors.
|
||||
if (
|
||||
this.livekitRoom !== undefined &&
|
||||
connectionState === ConnectionState.Connected
|
||||
) {
|
||||
const participant = this.livekitRoom.localParticipant;
|
||||
|
||||
enum MuteDevice {
|
||||
Microphone,
|
||||
Camera,
|
||||
}
|
||||
|
||||
const syncMuteState = async (
|
||||
iterCount: number,
|
||||
type: MuteDevice,
|
||||
): Promise<void> => {
|
||||
// The approach for muting is to always bring the actual livekit state in sync with the button
|
||||
// This allows for a very predictable and reactive behavior for the user.
|
||||
// (the new state is the old state when pressing the button n times (where n is even))
|
||||
// (the new state is different to the old state when pressing the button n times (where n is uneven))
|
||||
// In case there are issues with the device there might be situations where setMicrophoneEnabled/setCameraEnabled
|
||||
// return immediately. This should be caught with the Error("track with new mute state could not be published").
|
||||
// For now we are still using an iterCount to limit the recursion loop to 10.
|
||||
// This could happen if the device just really does not want to turn on (hardware based issue)
|
||||
// but the mute button is in unmute state.
|
||||
// For now our fail mode is to just stay in this state.
|
||||
// TODO: decide for a UX on how that fail mode should be treated (disable button, hide button, sync button back to muted without user input)
|
||||
|
||||
if (iterCount > 10) {
|
||||
logger.error(
|
||||
"Stop trying to sync the input device with current mute state after 10 failed tries",
|
||||
);
|
||||
return;
|
||||
}
|
||||
let devEnabled;
|
||||
let btnEnabled;
|
||||
switch (type) {
|
||||
case MuteDevice.Microphone:
|
||||
devEnabled = participant.isMicrophoneEnabled;
|
||||
btnEnabled = muteStates.audio.enabled;
|
||||
break;
|
||||
case MuteDevice.Camera:
|
||||
devEnabled = participant.isCameraEnabled;
|
||||
btnEnabled = muteStates.video.enabled;
|
||||
break;
|
||||
}
|
||||
if (devEnabled !== btnEnabled && !this.updatingMuteStates$.value) {
|
||||
this.updatingMuteStates$.next(true);
|
||||
|
||||
try {
|
||||
let trackPublication;
|
||||
switch (type) {
|
||||
case MuteDevice.Microphone:
|
||||
trackPublication = await participant.setMicrophoneEnabled(
|
||||
btnEnabled,
|
||||
this.livekitRoom.options.audioCaptureDefaults,
|
||||
);
|
||||
break;
|
||||
case MuteDevice.Camera:
|
||||
trackPublication = await participant.setCameraEnabled(
|
||||
btnEnabled,
|
||||
this.livekitRoom.options.videoCaptureDefaults,
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
if (trackPublication) {
|
||||
// await participant.setMicrophoneEnabled can return immediately in some instances,
|
||||
// so that participant.isMicrophoneEnabled !== buttonEnabled.current.audio still holds true.
|
||||
// This happens if the device is still in a pending state
|
||||
// "sleeping" here makes sure we let react do its thing so that participant.isMicrophoneEnabled is updated,
|
||||
// so we do not end up in a recursion loop.
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
|
||||
// track got successfully changed to mute/unmute
|
||||
// Run the check again after the change is done. Because the user
|
||||
// can update the state (presses mute button) while the device is enabling
|
||||
// itself we need might need to update the mute state right away.
|
||||
// This async recursion makes sure that setCamera/MicrophoneEnabled is
|
||||
// called as little times as possible.
|
||||
await syncMuteState(iterCount + 1, type);
|
||||
} else {
|
||||
throw new Error(
|
||||
"track with new mute state could not be published",
|
||||
);
|
||||
}
|
||||
} catch (e) {
|
||||
if ((e as DOMException).name === "NotAllowedError") {
|
||||
logger.error(
|
||||
"Fatal error while syncing mute state: resetting",
|
||||
e,
|
||||
);
|
||||
if (type === MuteDevice.Microphone) {
|
||||
muteStates.audio.setEnabled?.(false);
|
||||
} else {
|
||||
muteStates.video.setEnabled?.(false);
|
||||
}
|
||||
} else {
|
||||
logger.error(
|
||||
"Failed to sync audio mute state with LiveKit (will retry to sync in 1s):",
|
||||
e,
|
||||
);
|
||||
setTimeout(() => {
|
||||
this.updatingMuteStates$.next(false);
|
||||
}, 1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
syncMuteState(0, MuteDevice.Microphone).catch((e) => {
|
||||
logger.error("Failed to sync audio mute state with LiveKit", e);
|
||||
});
|
||||
syncMuteState(0, MuteDevice.Camera).catch((e) => {
|
||||
logger.error("Failed to sync video mute state with LiveKit", e);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
const syncDevice = (
|
||||
kind: MediaDeviceKind,
|
||||
selected$: Observable<SelectedDevice | undefined>,
|
||||
): Subscription =>
|
||||
selected$.pipe(this.scope.bind()).subscribe((device) => {
|
||||
if (this.connectionState$.value !== ConnectionState.Connected) return;
|
||||
logger.info(
|
||||
"[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :",
|
||||
this.livekitRoom.getActiveDevice(kind),
|
||||
" !== ",
|
||||
device?.id,
|
||||
);
|
||||
if (
|
||||
device !== undefined &&
|
||||
this.livekitRoom.getActiveDevice(kind) !== device.id
|
||||
) {
|
||||
this.livekitRoom
|
||||
.switchActiveDevice(kind, device.id)
|
||||
.catch((e) =>
|
||||
logger.error(`Failed to sync ${kind} device with LiveKit`, e),
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
syncDevice("audioinput", devices.audioInput.selected$);
|
||||
if (!controlledAudioDevices)
|
||||
syncDevice("audiooutput", devices.audioOutput.selected$);
|
||||
syncDevice("videoinput", devices.videoInput.selected$);
|
||||
// Restart the audio input track whenever we detect that the active media
|
||||
// device has changed to refer to a different hardware device. We do this
|
||||
// for the sake of Chrome, which provides a "default" device that is meant
|
||||
// to match the system's default audio input, whatever that may be.
|
||||
// This is special-cased for only audio inputs because we need to dig around
|
||||
// in the LocalParticipant object for the track object and there's not a nice
|
||||
// way to do that generically. There is usually no OS-level default video capture
|
||||
// device anyway, and audio outputs work differently.
|
||||
devices.audioInput.selected$
|
||||
.pipe(
|
||||
switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER),
|
||||
this.scope.bind(),
|
||||
)
|
||||
.subscribe(() => {
|
||||
if (this.connectionState$.value !== ConnectionState.Connected) return;
|
||||
const activeMicTrack = Array.from(
|
||||
this.livekitRoom.localParticipant.audioTrackPublications.values(),
|
||||
).find((d) => d.source === Track.Source.Microphone)?.track;
|
||||
|
||||
if (
|
||||
activeMicTrack &&
|
||||
// only restart if the stream is still running: LiveKit will detect
|
||||
// when a track stops & restart appropriately, so this is not our job.
|
||||
// Plus, we need to avoid restarting again if the track is already in
|
||||
// the process of being restarted.
|
||||
activeMicTrack.mediaStreamTrack.readyState !== "ended"
|
||||
) {
|
||||
// Restart the track, which will cause Livekit to do another
|
||||
// getUserMedia() call with deviceId: default to get the *new* default device.
|
||||
// Note that room.switchActiveDevice() won't work: Livekit will ignore it because
|
||||
// the deviceId hasn't changed (was & still is default).
|
||||
this.livekitRoom.localParticipant
|
||||
.getTrackPublication(Track.Source.Microphone)
|
||||
?.audioTrack?.restartTrack()
|
||||
.catch((e) => {
|
||||
logger.error(`Failed to restart audio device track`, e);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
// TODO-MULTI-SFU Sync the requested track processors with LiveKit
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user