diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index c75c84c9..1a5281c8 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -6,7 +6,6 @@ Please see LICENSE in the repository root for full details. */ import { - connectedParticipantsObserver, observeParticipantEvents, observeParticipantMedia, } from "@livekit/components-core"; @@ -21,7 +20,6 @@ import { import E2EEWorker from "livekit-client/e2ee-worker?worker"; import { ClientEvent, - type MatrixClient, RoomStateEvent, SyncState, type Room as MatrixRoom, @@ -54,6 +52,7 @@ import { import { logger } from "matrix-js-sdk/lib/logger"; import { type CallMembership, + isLivekitFocus, isLivekitFocusConfig, type LivekitFocusConfig, type MatrixRTCSession, @@ -105,7 +104,6 @@ import { shallowEquals } from "../utils/array"; import { calculateDisplayName, shouldDisambiguate } from "../utils/displayname"; import { type MediaDevices } from "./MediaDevices"; import { type Behavior } from "./Behavior"; -import { getSFUConfigWithOpenID } from "../livekit/openIDSFU"; import { defaultLiveKitOptions } from "../livekit/options"; import { enterRTCSession, @@ -114,6 +112,7 @@ import { } from "../rtcSessionHelpers"; import { E2eeType } from "../e2ee/e2eeType"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; +import { Connection, PublishConnection } from "./Connection"; export interface CallViewModelOptions { encryptionSystem: EncryptionSystem; @@ -436,88 +435,6 @@ function getRoomMemberFromRtcMember( return { id, member }; } -// TODO-MULTI-SFU Add all device syncing logic from useLivekit -class Connection { - private readonly sfuConfig = getSFUConfigWithOpenID( - this.client, - this.serviceUrl, - this.livekitAlias, - ); - - public async startSubscribing(): Promise { - this.stopped = false; - const { url, jwt } = await this.sfuConfig; - if (!this.stopped) await this.livekitRoom.connect(url, jwt); - await this.livekitRoom.localParticipant.publishTrack(tracks[0]); - } - - public async startPublishing(): Promise { - this.stopped = false; - const { url, jwt } = await this.sfuConfig; - if (!this.stopped) await this.livekitRoom.connect(url, jwt); - - if (!this.stopped) { - const tracks = await this.livekitRoom.localParticipant.createTracks({ - audio: true, - video: true, - }); - for (const track of tracks) { - await this.livekitRoom.localParticipant.publishTrack(track); - } - } - } - - private stopped = false; - - public stop(): void { - void this.livekitRoom.disconnect(); - this.stopped = true; - } - - public readonly participantsIncludingSubscribers$ = this.scope.behavior( - connectedParticipantsObserver(this.livekitRoom), - [], - ); - - public readonly publishingParticipants$ = ( - memberships$: Behavior, - ): Observable => - this.scope.behavior( - combineLatest([ - connectedParticipantsObserver(this.livekitRoom), - memberships$, - ]).pipe( - map(([participants, memberships]) => { - const publishingMembers = membershipsFocusUrl( - memberships, - this.matrixRTCSession, - ) - .filter((f) => f.livekit_service_url === this.serviceUrl) - .map((f) => f.membership); - - const publishingP = publishingMembers - .map((m) => { - return participants.find((p) => { - return p.identity === `${m.sender}:${m.deviceId}`; - }); - }) - .filter((p): p is RemoteParticipant => !!p); - return publishingP; - }), - ), - [], - ); - - public constructor( - private readonly livekitRoom: LivekitRoom, - private readonly serviceUrl: string, - private readonly livekitAlias: string, - private readonly client: MatrixClient, - private readonly scope: ObservableScope, - private readonly matrixRTCSession: MatrixRTCSession, - ) {} -} - export class CallViewModel extends ViewModel { private readonly e2eeOptions = getE2eeOptions( this.options.encryptionSystem, @@ -535,13 +452,13 @@ export class CallViewModel extends ViewModel { private readonly localConnection = this.localFocus.then( (focus) => - new Connection( + new PublishConnection( this.localConnectionLivekitRoom, - focus.livekit_service_url, + focus, this.livekitAlias, this.matrixRTCSession.room.client, this.scope, - this.matrixRTCSession, + this.membershipsAndFocusMap$, ), ); @@ -555,53 +472,67 @@ export class CallViewModel extends ViewModel { ), ); - private readonly foci$ = this.memberships$.pipe( - map( - (memberships) => - new Set( - membershipsFocusUrl(memberships, this.matrixRTCSession).map( - (f) => f.livekit_service_url, - ), - ), + private readonly membershipsAndFocusMap$ = this.scope.behavior( + this.memberships$.pipe( + map((memberships) => + memberships.flatMap((m) => { + const f = this.matrixRTCSession.resolveActiveFocus(m); + return f && isLivekitFocus(f) ? [{ membership: m, focus: f }] : []; + }), + ), ), ); + private readonly focusServiceUrls$ = this.membershipsAndFocusMap$.pipe( + map((v) => new Set(v.map(({ focus }) => focus.livekit_service_url))), + ); + private readonly remoteConnections$ = this.scope.behavior( - combineLatest([this.localFocus, this.foci$]).pipe( - accumulate(new Map(), (prev, [localFocus, foci]) => { - const stopped = new Map(prev); - const next = new Map(); - for (const focus of foci) { - if (focus !== localFocus.livekit_service_url) { - stopped.delete(focus); + combineLatest([this.localFocus, this.focusServiceUrls$]).pipe( + accumulate( + new Map(), + (prev, [localFocus, focusUrls]) => { + const stopped = new Map(prev); + const next = new Map(); + for (const focusUrl of focusUrls) { + if (focusUrl !== localFocus.livekit_service_url) { + stopped.delete(focusUrl); - let nextConnection = prev.get(focus); - if (!nextConnection) { - logger.log( - "SFU remoteConnections$ construct new connection: ", - focus, - ); - nextConnection = new Connection( - new LivekitRoom({ - ...defaultLiveKitOptions, - e2ee: this.e2eeOptions, - }), - focus, - this.livekitAlias, - this.matrixRTCSession.room.client, - this.scope, - this.matrixRTCSession, - ); - } else { - logger.log("SFU remoteConnections$ use prev connection: ", focus); + let nextConnection = prev.get(focusUrl); + if (!nextConnection) { + logger.log( + "SFU remoteConnections$ construct new connection: ", + focusUrl, + ); + nextConnection = new Connection( + new LivekitRoom({ + ...defaultLiveKitOptions, + e2ee: this.e2eeOptions, + }), + { + livekit_service_url: focusUrl, + livekit_alias: this.livekitAlias, + type: "livekit", + }, + this.livekitAlias, + this.matrixRTCSession.room.client, + this.scope, + this.membershipsAndFocusMap$, + ); + } else { + logger.log( + "SFU remoteConnections$ use prev connection: ", + focusUrl, + ); + } + next.set(focusUrl, nextConnection); } - next.set(focus, nextConnection); } - } - for (const connection of stopped.values()) connection.stop(); - return next; - }), + for (const connection of stopped.values()) connection.stop(); + return next; + }, + ), ), ); @@ -713,11 +644,11 @@ export class CallViewModel extends ViewModel { (localConnection, remoteConnections) => { const remoteConnectionsParticipants = [ ...remoteConnections.values(), - ].map((c) => c.publishingParticipants$(this.memberships$)); + ].map((c) => c.publishingParticipants$); return combineLatest( [ - localConnection.publishingParticipants$(this.memberships$), + localConnection.publishingParticipants$, ...remoteConnectionsParticipants, ], (...ps) => ps.flat(1), @@ -1765,7 +1696,7 @@ export class CallViewModel extends ViewModel { .subscribe( (c) => void c - .startPublishing() + .start() // eslint-disable-next-line no-console .then(() => console.log("successfully started publishing")) // eslint-disable-next-line no-console @@ -1774,7 +1705,7 @@ export class CallViewModel extends ViewModel { this.connectionInstructions$ .pipe(this.scope.bind()) .subscribe(({ start, stop }) => { - for (const connection of start) void connection.startSubscribing(); + for (const connection of start) void connection.start(); for (const connection of stop) connection.stop(); }); combineLatest([this.localFocus, this.joined$]) @@ -1838,7 +1769,7 @@ export class CallViewModel extends ViewModel { } } -const membershipsFocusUrl = ( +export const membershipsFocusUrl = ( memberships: CallMembership[], matrixRTCSession: MatrixRTCSession, ): { livekit_service_url: string; membership: CallMembership }[] => { diff --git a/src/state/Connection.ts b/src/state/Connection.ts new file mode 100644 index 00000000..6e114603 --- /dev/null +++ b/src/state/Connection.ts @@ -0,0 +1,114 @@ +// TODO-MULTI-SFU Add all device syncing logic from useLivekit +/* +Copyright 2025 New Vector Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { connectedParticipantsObserver } from "@livekit/components-core"; +import { + type Room as LivekitRoom, + type RemoteParticipant, +} from "livekit-client"; +import { type MatrixClient } from "matrix-js-sdk"; +import { + type LivekitFocus, + type CallMembership, +} from "matrix-js-sdk/lib/matrixrtc"; +import { combineLatest, map, type Observable } from "rxjs"; + +import { getSFUConfigWithOpenID } from "../livekit/openIDSFU"; +import { type Behavior } from "./Behavior"; +import { type ObservableScope } from "./ObservableScope"; + +export class Connection { + protected readonly sfuConfig = getSFUConfigWithOpenID( + this.client, + this.focus.livekit_service_url, + this.livekitAlias, + ); + + public async start(): Promise { + this.stopped = false; + const { url, jwt } = await this.sfuConfig; + if (!this.stopped) await this.livekitRoom.connect(url, jwt); + } + + protected stopped = false; + + public stop(): void { + void this.livekitRoom.disconnect(); + this.stopped = true; + } + + public readonly participantsIncludingSubscribers$ = this.scope.behavior( + connectedParticipantsObserver(this.livekitRoom), + [], + ); + + public readonly publishingParticipants$: Observable = + this.scope.behavior( + combineLatest([ + connectedParticipantsObserver(this.livekitRoom), + this.membershipsFocusMap$, + ]).pipe( + map(([participants, membershipsFocusMap]) => + membershipsFocusMap + // Find all members that claim to publish on this connection + .flatMap(({ membership, focus }) => + focus.livekit_service_url === this.focus.livekit_service_url + ? [membership] + : [], + ) + // Find all associated publishing livekit participant objects + .flatMap(({ sender, deviceId }) => { + const participant = participants.find( + (p) => p.identity === `${sender}:${deviceId}`, + ); + return participant ? [participant] : []; + }), + ), + ), + [], + ); + + public constructor( + protected readonly livekitRoom: LivekitRoom, + protected readonly focus: LivekitFocus, + protected readonly livekitAlias: string, + protected readonly client: MatrixClient, + protected readonly scope: ObservableScope, + protected readonly membershipsFocusMap$: Behavior< + { membership: CallMembership; focus: LivekitFocus }[] + >, + ) {} +} + +export class PublishConnection extends Connection { + public async start(): Promise { + this.stopped = false; + const { url, jwt } = await this.sfuConfig; + if (!this.stopped) await this.livekitRoom.connect(url, jwt); + + if (!this.stopped) { + const tracks = await this.livekitRoom.localParticipant.createTracks({ + audio: true, + video: true, + }); + for (const track of tracks) { + await this.livekitRoom.localParticipant.publishTrack(track); + } + } + } + + public stop(): void { + void this.livekitRoom.disconnect(); + this.stopped = true; + } + + public readonly participantsIncludingSubscribers$ = this.scope.behavior( + connectedParticipantsObserver(this.livekitRoom), + [], + ); +}