diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index af588e62..03463141 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -6,7 +6,6 @@ Please see LICENSE in the repository root for full details. */ import { - connectedParticipantsObserver, observeParticipantEvents, observeParticipantMedia, } from "@livekit/components-core"; @@ -25,7 +24,6 @@ import { type EventTimelineSetHandlerMap, EventType, RoomEvent, - type MatrixClient, RoomStateEvent, SyncState, type Room as MatrixRoom, @@ -117,7 +115,6 @@ import { shallowEquals } from "../utils/array"; import { calculateDisplayName, shouldDisambiguate } from "../utils/displayname"; import { type MediaDevices } from "./MediaDevices"; import { constant, type Behavior } from "./Behavior"; -import { getSFUConfigWithOpenID } from "../livekit/openIDSFU"; import { defaultLiveKitOptions } from "../livekit/options"; import { enterRTCSession, @@ -126,7 +123,8 @@ import { } from "../rtcSessionHelpers"; import { E2eeType } from "../e2ee/e2eeType"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; -import { ECConnectionState } from "../livekit/useECConnectionState"; +import { type ECConnectionState } from "../livekit/useECConnectionState"; +import { Connection, PublishConnection } from "./Connection"; export interface CallViewModelOptions { encryptionSystem: EncryptionSystem; @@ -459,88 +457,6 @@ function getRoomMemberFromRtcMember( return { id, member }; } -// TODO-MULTI-SFU Add all device syncing logic from useLivekit -class Connection { - private readonly sfuConfig = getSFUConfigWithOpenID( - this.client, - this.serviceUrl, - this.livekitAlias, - ); - - public async startSubscribing(): Promise { - this.stopped = false; - const { url, jwt } = await this.sfuConfig; - if (!this.stopped) await this.livekitRoom.connect(url, jwt); - await this.livekitRoom.localParticipant.publishTrack(tracks[0]); - } - - public async startPublishing(): Promise { - this.stopped = false; - const { url, jwt } = await this.sfuConfig; - if (!this.stopped) await this.livekitRoom.connect(url, jwt); - - if (!this.stopped) { - const tracks = await this.livekitRoom.localParticipant.createTracks({ - audio: true, - video: true, - }); - for (const track of tracks) { - await this.livekitRoom.localParticipant.publishTrack(track); - } - } - } - - private stopped = false; - - public stop(): void { - void this.livekitRoom.disconnect(); - this.stopped = true; - } - - public readonly participantsIncludingSubscribers$ = this.scope.behavior( - connectedParticipantsObserver(this.livekitRoom), - [], - ); - - public readonly publishingParticipants$ = ( - memberships$: Behavior, - ): Observable => - this.scope.behavior( - combineLatest([ - connectedParticipantsObserver(this.livekitRoom), - memberships$, - ]).pipe( - map(([participants, memberships]) => { - const publishingMembers = membershipsFocusUrl( - memberships, - this.matrixRTCSession, - ) - .filter((f) => f.livekit_service_url === this.serviceUrl) - .map((f) => f.membership); - - const publishingP = publishingMembers - .map((m) => { - return participants.find((p) => { - return p.identity === `${m.sender}:${m.deviceId}`; - }); - }) - .filter((p): p is RemoteParticipant => !!p); - return publishingP; - }), - ), - [], - ); - - public constructor( - private readonly livekitRoom: LivekitRoom, - private readonly serviceUrl: string, - private readonly livekitAlias: string, - private readonly client: MatrixClient, - private readonly scope: ObservableScope, - private readonly matrixRTCSession: MatrixRTCSession, - ) {} -} - export class CallViewModel extends ViewModel { private readonly e2eeOptions = getE2eeOptions( this.options.encryptionSystem, @@ -558,7 +474,7 @@ export class CallViewModel extends ViewModel { private readonly localConnection = this.localFocus.then( (focus) => - new Connection( + new PublishConnection( this.localConnectionLivekitRoom, focus.livekit_service_url, this.livekitAlias, @@ -1881,7 +1797,7 @@ export class CallViewModel extends ViewModel { .subscribe( (c) => void c - .startPublishing() + .start() // eslint-disable-next-line no-console .then(() => console.log("successfully started publishing")) // eslint-disable-next-line no-console @@ -1890,7 +1806,7 @@ export class CallViewModel extends ViewModel { this.connectionInstructions$ .pipe(this.scope.bind()) .subscribe(({ start, stop }) => { - for (const connection of start) void connection.startSubscribing(); + for (const connection of start) void connection.start(); for (const connection of stop) connection.stop(); }); combineLatest([this.localFocus, this.joined$]) @@ -1954,7 +1870,7 @@ export class CallViewModel extends ViewModel { } } -const membershipsFocusUrl = ( +export const membershipsFocusUrl = ( memberships: CallMembership[], matrixRTCSession: MatrixRTCSession, ): { livekit_service_url: string; membership: CallMembership }[] => { diff --git a/src/state/Connection.ts b/src/state/Connection.ts new file mode 100644 index 00000000..ff5ebb64 --- /dev/null +++ b/src/state/Connection.ts @@ -0,0 +1,116 @@ +// TODO-MULTI-SFU Add all device syncing logic from useLivekit +/* +Copyright 2025 New Vector Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { connectedParticipantsObserver } from "@livekit/components-core"; +import { + type Room as LivekitRoom, + type RemoteParticipant, +} from "livekit-client"; +import { type MatrixClient } from "matrix-js-sdk"; +import { + type CallMembership, + type MatrixRTCSession, +} from "matrix-js-sdk/lib/matrixrtc"; +import { combineLatest, map, type Observable } from "rxjs"; + +import { getSFUConfigWithOpenID } from "../livekit/openIDSFU"; +import { type Behavior } from "./Behavior"; +import { membershipsFocusUrl } from "./CallViewModel"; +import { type ObservableScope } from "./ObservableScope"; + +export class Connection { + protected readonly sfuConfig = getSFUConfigWithOpenID( + this.client, + this.serviceUrl, + this.livekitAlias, + ); + + public async start(): Promise { + this.stopped = false; + const { url, jwt } = await this.sfuConfig; + if (!this.stopped) await this.livekitRoom.connect(url, jwt); + } + + protected stopped = false; + + public stop(): void { + void this.livekitRoom.disconnect(); + this.stopped = true; + } + + public readonly participantsIncludingSubscribers$ = this.scope.behavior( + connectedParticipantsObserver(this.livekitRoom), + [], + ); + + public readonly publishingParticipants$ = ( + memberships$: Behavior, + ): Observable => + this.scope.behavior( + combineLatest([ + connectedParticipantsObserver(this.livekitRoom), + memberships$, + ]).pipe( + map(([participants, memberships]) => { + const publishingMembers = membershipsFocusUrl( + memberships, + this.matrixRTCSession, + ) + .filter((f) => f.livekit_service_url === this.serviceUrl) + .map((f) => f.membership); + + const publishingP = publishingMembers + .map((m) => { + return participants.find((p) => { + return p.identity === `${m.sender}:${m.deviceId}`; + }); + }) + .filter((p): p is RemoteParticipant => !!p); + return publishingP; + }), + ), + [], + ); + + public constructor( + protected readonly livekitRoom: LivekitRoom, + protected readonly serviceUrl: string, + protected readonly livekitAlias: string, + protected readonly client: MatrixClient, + protected readonly scope: ObservableScope, + protected readonly matrixRTCSession: MatrixRTCSession, + ) {} +} + +export class PublishConnection extends Connection { + public async start(): Promise { + this.stopped = false; + const { url, jwt } = await this.sfuConfig; + if (!this.stopped) await this.livekitRoom.connect(url, jwt); + + if (!this.stopped) { + const tracks = await this.livekitRoom.localParticipant.createTracks({ + audio: true, + video: true, + }); + for (const track of tracks) { + await this.livekitRoom.localParticipant.publishTrack(track); + } + } + } + + public stop(): void { + void this.livekitRoom.disconnect(); + this.stopped = true; + } + + public readonly participantsIncludingSubscribers$ = this.scope.behavior( + connectedParticipantsObserver(this.livekitRoom), + [], + ); +}