lots of work. noone knows if it works.

Signed-off-by: Timo K <toger5@hotmail.de>
This commit is contained in:
Timo K
2025-08-28 17:45:14 +02:00
parent a190c4e491
commit 33ba746f2b
3 changed files with 516 additions and 229 deletions

View File

@@ -20,10 +20,10 @@ import {
import E2EEWorker from "livekit-client/e2ee-worker?worker";
import {
ClientEvent,
RoomMember,
RoomStateEvent,
SyncState,
type Room as MatrixRoom,
type RoomMember,
} from "matrix-js-sdk";
import {
BehaviorSubject,
@@ -104,7 +104,7 @@ import { shallowEquals } from "../utils/array";
import { calculateDisplayName, shouldDisambiguate } from "../utils/displayname";
import { type MediaDevices } from "./MediaDevices";
import { type Behavior } from "./Behavior";
import { defaultLiveKitOptions } from "../livekit/options";
import {
enterRTCSession,
getLivekitAlias,
@@ -388,31 +388,6 @@ class ScreenShare {
type MediaItem = UserMedia | ScreenShare;
function getE2eeOptions(
e2eeSystem: EncryptionSystem,
rtcSession: MatrixRTCSession,
): E2EEOptions | undefined {
if (e2eeSystem.kind === E2eeType.NONE) return undefined;
if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) {
const keyProvider = new MatrixKeyProvider();
keyProvider.setRTCSession(rtcSession);
return {
keyProvider,
worker: new E2EEWorker(),
};
} else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) {
const keyProvider = new ExternalE2EEKeyProvider();
keyProvider
.setKey(e2eeSystem.secret)
.catch((e) => logger.error("Failed to set shared key for E2EE", e));
return {
keyProvider,
worker: new E2EEWorker(),
};
}
}
function getRoomMemberFromRtcMember(
rtcMember: CallMembership,
room: MatrixRoom,
@@ -436,29 +411,25 @@ function getRoomMemberFromRtcMember(
}
export class CallViewModel extends ViewModel {
private readonly e2eeOptions = getE2eeOptions(
private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession);
private readonly livekitE2EERoomOptions = getE2eeOptions(
this.options.encryptionSystem,
this.matrixRTCSession,
);
private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession);
private readonly localConnectionLivekitRoom = new LivekitRoom({
...defaultLiveKitOptions,
e2ee: this.e2eeOptions,
});
private readonly localFocus = makeFocus(this.matrixRTCSession);
private readonly localConnection = this.localFocus.then(
(focus) =>
new PublishConnection(
this.localConnectionLivekitRoom,
focus,
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.membershipsAndFocusMap$,
this.mediaDevices,
this.livekitE2EERoomOptions,
),
);
@@ -483,12 +454,12 @@ export class CallViewModel extends ViewModel {
),
);
private readonly focusServiceUrls$ = this.membershipsAndFocusMap$.pipe(
private readonly livekitServiceUrls$ = this.membershipsAndFocusMap$.pipe(
map((v) => new Set(v.map(({ focus }) => focus.livekit_service_url))),
);
private readonly remoteConnections$ = this.scope.behavior(
combineLatest([this.localFocus, this.focusServiceUrls$]).pipe(
combineLatest([this.localFocus, this.livekitServiceUrls$]).pipe(
accumulate(
new Map<string, Connection>(),
(prev, [localFocus, focusUrls]) => {
@@ -505,10 +476,6 @@ export class CallViewModel extends ViewModel {
focusUrl,
);
nextConnection = new Connection(
new LivekitRoom({
...defaultLiveKitOptions,
e2ee: this.e2eeOptions,
}),
{
livekit_service_url: focusUrl,
livekit_alias: this.livekitAlias,
@@ -518,6 +485,7 @@ export class CallViewModel extends ViewModel {
this.matrixRTCSession.room.client,
this.scope,
this.membershipsAndFocusMap$,
this.livekitE2EERoomOptions,
);
} else {
logger.log(
@@ -634,29 +602,54 @@ export class CallViewModel extends ViewModel {
// in a split-brained state.
private readonly pretendToBeDisconnected$ = this.reconnecting$;
/**
* The RemoteParticipants including those that are being "held" on the screen
*/
private readonly remoteParticipants$ = this.scope
.behavior<RemoteParticipant[]>(
combineLatest(
[this.localConnection, this.remoteConnections$],
(localConnection, remoteConnections) => {
const remoteConnectionsParticipants = [
...remoteConnections.values(),
].map((c) => c.publishingParticipants$);
return combineLatest(
[
localConnection.publishingParticipants$,
...remoteConnectionsParticipants,
],
(...ps) => ps.flat(1),
private readonly participants$ = this.scope
.behavior<
{
participant: LocalParticipant | RemoteParticipant;
member: RoomMember;
livekitRoom: LivekitRoom;
}[]
>(
from(this.localConnection).pipe(
switchMap((localConnection) => {
const memberError = (): never => {
throw new Error("No room member for call membership");
};
const localParticipant = {
participant: localConnection.livekitRoom.localParticipant,
member:
this.matrixRoom.getMember(this.userId ?? "") ?? memberError(),
livekitRoom: localConnection.livekitRoom,
};
return this.remoteConnections$.pipe(
switchMap((connections) =>
combineLatest(
[...connections.values()].map((c) =>
c.publishingParticipants$.pipe(
map((ps) =>
ps.map(({ participant, membership }) => ({
participant,
member:
getRoomMemberFromRtcMember(
membership,
this.matrixRoom,
)?.member ?? memberError(),
livekitRoom: c.livekitRoom,
})),
),
),
),
),
),
map((remoteParticipants) => [
...remoteParticipants.flat(1),
localParticipant,
]),
);
},
).pipe(switchAll(), startWith([])),
}),
),
)
.pipe(pauseWhen(this.pretendToBeDisconnected$));
.pipe(startWith([]), pauseWhen(this.pretendToBeDisconnected$));
/**
* Displaynames for each member of the call. This will disambiguate
@@ -728,8 +721,7 @@ export class CallViewModel extends ViewModel {
*/
private readonly mediaItems$ = this.scope.behavior<MediaItem[]>(
combineLatest([
this.remoteParticipants$,
observeParticipantMedia(this.localConnectionLivekitRoom.localParticipant),
this.participants$,
duplicateTiles.value$,
// Also react to changes in the MatrixRTC session list.
// The session list will also be update if a room membership changes.
@@ -744,43 +736,21 @@ export class CallViewModel extends ViewModel {
(
prevItems,
[
remoteParticipants,
{ participant: localParticipant },
participants,
duplicateTiles,
_membershipsChanged,
showNonMemberTiles,
],
) => {
const newItems = new Map(
const newItems: Map<string, UserMedia | ScreenShare> = new Map(
function* (this: CallViewModel): Iterable<[string, MediaItem]> {
const room = this.matrixRoom;
// m.rtc.members are the basis for calculating what is visible in the call
for (const rtcMember of this.matrixRTCSession.memberships) {
const { member, id: livekitParticipantId } =
getRoomMemberFromRtcMember(rtcMember, room);
const matrixIdentifier = `${rtcMember.sender}:${rtcMember.deviceId}`;
let participant:
| LocalParticipant
| RemoteParticipant
| undefined = undefined;
if (livekitParticipantId === "local") {
participant = localParticipant;
} else {
participant = remoteParticipants.find(
(p) => p.identity === livekitParticipantId,
);
}
if (!member) {
logger.error(
"Could not find member for media id: ",
livekitParticipantId,
);
}
for (const { participant, member, livekitRoom } of participants) {
const matrixId = participant.isLocal
? "local"
: participant.identity;
for (let i = 0; i < 1 + duplicateTiles; i++) {
const indexedMediaId = `${livekitParticipantId}:${i}`;
let prevMedia = prevItems.get(indexedMediaId);
const mediaId = `${matrixId}:${i}`;
let prevMedia = prevItems.get(mediaId);
if (prevMedia && prevMedia instanceof UserMedia) {
prevMedia.updateParticipant(participant);
if (prevMedia.vm.member === undefined) {
@@ -793,33 +763,33 @@ export class CallViewModel extends ViewModel {
}
}
yield [
indexedMediaId,
mediaId,
// We create UserMedia with or without a participant.
// This will be the initial value of a BehaviourSubject.
// Once a participant appears we will update the BehaviourSubject. (see above)
prevMedia ??
new UserMedia(
indexedMediaId,
mediaId,
member,
participant,
this.options.encryptionSystem,
this.localConnectionLivekitRoom,
livekitRoom,
this.mediaDevices,
this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe(
map((m) => m.get(matrixIdentifier) ?? "[👻]"),
map((m) => m.get(matrixId) ?? "[👻]"),
),
this.handsRaised$.pipe(
map((v) => v[matrixIdentifier]?.time ?? null),
map((v) => v[matrixId]?.time ?? null),
),
this.reactions$.pipe(
map((v) => v[matrixIdentifier] ?? undefined),
map((v) => v[matrixId] ?? undefined),
),
),
];
if (participant?.isScreenShareEnabled) {
const screenShareId = `${indexedMediaId}:screen-share`;
const screenShareId = `${mediaId}:screen-share`;
yield [
screenShareId,
prevItems.get(screenShareId) ??
@@ -828,10 +798,10 @@ export class CallViewModel extends ViewModel {
member,
participant,
this.options.encryptionSystem,
this.localConnectionLivekitRoom,
livekitRoom,
this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe(
map((m) => m.get(matrixIdentifier) ?? "[👻]"),
map((m) => m.get(matrixId) ?? "[👻]"),
),
),
];
@@ -853,47 +823,51 @@ export class CallViewModel extends ViewModel {
// - If one wants to test scalability using the LiveKit CLI.
// - If an experimental project does not yet do the MatrixRTC bits.
// - If someone wants to debug if the LiveKit connection works but MatrixRTC room state failed to arrive.
const newNonMemberItems = showNonMemberTiles
? new Map(
function* (this: CallViewModel): Iterable<[string, MediaItem]> {
for (const participant of remoteParticipants) {
for (let i = 0; i < 1 + duplicateTiles; i++) {
const maybeNonMemberParticipantId =
participant.identity + ":" + i;
if (!newItems.has(maybeNonMemberParticipantId)) {
const nonMemberId = maybeNonMemberParticipantId;
yield [
nonMemberId,
prevItems.get(nonMemberId) ??
new UserMedia(
nonMemberId,
undefined,
participant,
this.options.encryptionSystem,
this.localConnectionLivekitRoom,
this.mediaDevices,
this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe(
map(
(m) => m.get(participant.identity) ?? "[👻]",
),
),
of(null),
of(null),
),
];
}
}
}
}.bind(this)(),
)
: new Map();
if (newNonMemberItems.size > 0) {
logger.debug("Added NonMember items: ", newNonMemberItems);
}
// TODO-MULTI-SFU
// const newNonMemberItems = showNonMemberTiles
// ? new Map(
// function* (
// this: CallViewModel,
// ): Iterable<[string, MediaItem]> {
// for (const participant of remoteParticipants) {
// for (let i = 0; i < 1 + duplicateTiles; i++) {
// const maybeNonMemberParticipantId =
// participant.identity + ":" + i;
// if (!newItems.has(maybeNonMemberParticipantId)) {
// const nonMemberId = maybeNonMemberParticipantId;
// yield [
// nonMemberId,
// prevItems.get(nonMemberId) ??
// new UserMedia(
// nonMemberId,
// undefined,
// participant,
// this.options.encryptionSystem,
// localConnection.livekitRoom,
// this.mediaDevices,
// this.pretendToBeDisconnected$,
// this.memberDisplaynames$.pipe(
// map(
// (m) =>
// m.get(participant.identity) ?? "[👻]",
// ),
// ),
// of(null),
// of(null),
// ),
// ];
// }
// }
// }
// }.bind(this)(),
// )
// : new Map();
// if (newNonMemberItems.size > 0) {
// logger.debug("Added NonMember items: ", newNonMemberItems);
// }
const combinedNew = new Map([
...newNonMemberItems.entries(),
// ...newNonMemberItems.entries(),
...newItems.entries(),
]);
@@ -1724,66 +1698,77 @@ export class CallViewModel extends ViewModel {
// We use matrixConnected$ rather than reconnecting$ because we want to
// pause tracks during the initial joining sequence too until we're sure
// that our own media is displayed on screen.
this.matrixConnected$.pipe(this.scope.bind()).subscribe((connected) => {
const publications =
this.localConnectionLivekitRoom.localParticipant.trackPublications.values();
if (connected) {
for (const p of publications) {
if (p.track?.isUpstreamPaused === true) {
const kind = p.track.kind;
logger.log(
`Resumming ${kind} track (MatrixRTC connection present)`,
);
p.track
.resumeUpstream()
.catch((e) =>
logger.error(
`Failed to resume ${kind} track after MatrixRTC reconnection`,
e,
),
void this.localConnection.then((localConnection) =>
this.matrixConnected$.pipe(this.scope.bind()).subscribe((connected) => {
const publications =
localConnection.livekitRoom.localParticipant.trackPublications.values();
if (connected) {
for (const p of publications) {
if (p.track?.isUpstreamPaused === true) {
const kind = p.track.kind;
logger.log(
`Resuming ${kind} track (MatrixRTC connection present)`,
);
p.track
.resumeUpstream()
.catch((e) =>
logger.error(
`Failed to resume ${kind} track after MatrixRTC reconnection`,
e,
),
);
}
}
} else {
for (const p of publications) {
if (p.track?.isUpstreamPaused === false) {
const kind = p.track.kind;
logger.log(
`Pausing ${kind} track (uncertain MatrixRTC connection)`,
);
p.track
.pauseUpstream()
.catch((e) =>
logger.error(
`Failed to pause ${kind} track after entering uncertain MatrixRTC connection`,
e,
),
);
}
}
}
} else {
for (const p of publications) {
if (p.track?.isUpstreamPaused === false) {
const kind = p.track.kind;
logger.log(
`Pausing ${kind} track (uncertain MatrixRTC connection)`,
);
p.track
.pauseUpstream()
.catch((e) =>
logger.error(
`Failed to pause ${kind} track after entering uncertain MatrixRTC connection`,
e,
),
);
}
}
}
});
}),
);
// Join automatically
this.join(); // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked?
}
}
export const membershipsFocusUrl = (
memberships: CallMembership[],
matrixRTCSession: MatrixRTCSession,
): { livekit_service_url: string; membership: CallMembership }[] => {
return memberships
.map(
(m) =>
[matrixRTCSession.resolveActiveFocus(m), m] as [
LivekitFocusConfig | undefined,
CallMembership,
],
)
.filter(([f, _]) => f !== undefined && isLivekitFocusConfig(f))
.map(([f, m]) => ({
livekit_service_url: f!.livekit_service_url,
membership: m,
}));
};
// TODO-MULTI-SFU // Setup and update the keyProvider which was create by `createRoom` was a thing before. Now we never update if the E2EEsystem changes
// do we need this?
function getE2eeOptions(
e2eeSystem: EncryptionSystem,
rtcSession: MatrixRTCSession,
): E2EEOptions | undefined {
if (e2eeSystem.kind === E2eeType.NONE) return undefined;
if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) {
const keyProvider = new MatrixKeyProvider();
keyProvider.setRTCSession(rtcSession);
return {
keyProvider,
worker: new E2EEWorker(),
};
} else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) {
const keyProvider = new ExternalE2EEKeyProvider();
keyProvider
.setKey(e2eeSystem.secret)
.catch((e) => logger.error("Failed to set shared key for E2EE", e));
return {
keyProvider,
worker: new E2EEWorker(),
};
}
}