/*
Copyright 2025 New Vector Ltd.

SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/

import {
  connectedParticipantsObserver,
  connectionStateObserver,
} from "@livekit/components-core";
import {
  type ConnectionState,
  type E2EEOptions,
  Room as LivekitRoom,
  type RoomOptions,
} from "livekit-client";
import {
  type CallMembership,
  type LivekitFocus,
} from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject, combineLatest } from "rxjs";

import {
  getSFUConfigWithOpenID,
  type OpenIDClientParts,
  type SFUConfig,
} from "../livekit/openIDSFU";
import { type Behavior } from "./Behavior";
import { type ObservableScope } from "./ObservableScope";
import { defaultLiveKitOptions } from "../livekit/options";

/**
 * Options required to build a {@link Connection}.
 */
export interface ConnectionOpts {
  /** The focus server to connect to. */
  focus: LivekitFocus;
  /** The Matrix client to use for OpenID and SFU config requests. */
  client: OpenIDClientParts;
  /** The observable scope to use for this connection. */
  scope: ObservableScope;
  /** An observable of the current RTC call memberships and their associated focus. */
  membershipsFocusMap$: Behavior<
    { membership: CallMembership; focus: LivekitFocus }[]
  >;
  /** Optional factory to create the Livekit room, mainly for testing purposes. */
  livekitRoomFactory?: (options?: RoomOptions) => LivekitRoom;
}

/**
 * Lifecycle state of a {@link Connection}, discriminated by `state`.
 *
 * Every variant except `Initialized` carries the focus it relates to.
 * `FailedToStart` additionally carries the error that aborted `start()`, and
 * `ConnectedToLkRoom` carries the latest LiveKit connection state.
 */
export type FocusConnectionState =
  | { state: "Initialized" }
  | { state: "FetchingConfig"; focus: LivekitFocus }
  | { state: "ConnectingToLkRoom"; focus: LivekitFocus }
  | { state: "PublishingTracks"; focus: LivekitFocus }
  | { state: "FailedToStart"; error: Error; focus: LivekitFocus }
  | {
      state: "ConnectedToLkRoom";
      connectionState: ConnectionState;
      focus: LivekitFocus;
    }
  | { state: "Stopped"; focus: LivekitFocus };

/**
 * A connection to a Matrix RTC LiveKit backend.
 *
 * Exposes observables for participants and connection state.
 */
export class Connection {
  // Private writable source backing `focusedConnectionState$`.
  // The explicit type argument is required: without it the subject would be
  // inferred as `{ state: string }` and none of the other variants could be
  // emitted.
  private readonly _focusedConnectionState$ =
    new BehaviorSubject<FocusConnectionState>({ state: "Initialized" });

  /**
   * The current state of the connection to the focus server.
   */
  public readonly focusedConnectionState$: Behavior<FocusConnectionState>;

  /**
   * Whether the connection has been stopped.
   * @see Connection.stop
   */
  protected stopped = false;

  /**
   * An observable of the participants that are publishing on this connection.
   * This is derived from `participantsIncludingSubscribers$` and `membershipsFocusMap$`.
   * It filters the participants to only those that are associated with a membership
   * that claims to publish on this connection.
   */
  public readonly publishingParticipants$;

  /**
   * The focus server to connect to.
   */
  protected readonly targetFocus: LivekitFocus;

  private readonly client: OpenIDClientParts;

  /**
   * Creates a new connection to a matrix RTC LiveKit backend.
   *
   * @param livekitRoom - LiveKit room instance to use.
   * @param opts - Connection options {@link ConnectionOpts}.
   */
  protected constructor(
    // Parameter property: this both declares and assigns `this.livekitRoom`,
    // so no explicit assignment is needed in the body.
    public readonly livekitRoom: LivekitRoom,
    opts: ConnectionOpts,
  ) {
    const { focus, client, scope, membershipsFocusMap$ } = opts;

    this.targetFocus = focus;
    this.client = client;

    this.focusedConnectionState$ = scope.behavior(
      this._focusedConnectionState$,
      { state: "Initialized" },
    );

    const participantsIncludingSubscribers$ = scope.behavior(
      connectedParticipantsObserver(this.livekitRoom),
      [],
    );

    this.publishingParticipants$ = scope.behavior(
      combineLatest(
        [participantsIncludingSubscribers$, membershipsFocusMap$],
        (participants, membershipsFocusMap) =>
          membershipsFocusMap
            // Find all members that claim to publish on this connection
            // (matched on the focus' LiveKit service URL).
            .flatMap(({ membership, focus: memberFocus }) =>
              memberFocus.livekit_service_url ===
              this.targetFocus.livekit_service_url
                ? [membership]
                : [],
            )
            // Find all associated publishing livekit participant objects.
            // Memberships without a matching participant are dropped.
            .flatMap((membership) => {
              const participant = participants.find(
                (p) =>
                  p.identity === `${membership.sender}:${membership.deviceId}`,
              );
              return participant ? [{ participant, membership }] : [];
            }),
      ),
      [],
    );

    scope
      .behavior(connectionStateObserver(this.livekitRoom))
      .subscribe((connectionState) => {
        const current = this._focusedConnectionState$.value;
        // Only update the state if we are already connected to the LiveKit room.
        // Earlier lifecycle states are driven by `start()` / `stop()` instead.
        if (current.state === "ConnectedToLkRoom") {
          this._focusedConnectionState$.next({
            state: "ConnectedToLkRoom",
            connectionState,
            focus: current.focus,
          });
        }
      });

    // Fire-and-forget: disconnect when the owning scope ends.
    scope.onEnd(() => void this.stop());
  }

  /**
   * Starts the connection.
   *
   * This will:
   * 1. Request an OpenId token `request_token` (allows matrix users to verify their identity with a third-party service.)
   * 2. Use this token to request the SFU config to the MatrixRtc authentication service.
   * 3. Connect to the configured LiveKit room.
   *
   * If `stop()` is called while one of the asynchronous steps is pending, the
   * remaining steps are skipped and no further state is emitted by this call.
   *
   * @throws Rethrows whatever the config request or the room connection threw,
   * after emitting a `FailedToStart` state.
   */
  public async start(): Promise<void> {
    this.stopped = false;
    try {
      this._focusedConnectionState$.next({
        state: "FetchingConfig",
        focus: this.targetFocus,
      });
      // TODO could this be loaded earlier to save time?
      const { url, jwt } = await this.getSFUConfigWithOpenID();
      // If we were stopped while fetching the config, don't proceed to connect
      if (this.stopped) return;

      this._focusedConnectionState$.next({
        state: "ConnectingToLkRoom",
        focus: this.targetFocus,
      });
      await this.livekitRoom.connect(url, jwt);
      // If we were stopped while connecting, don't proceed to update state.
      if (this.stopped) return;

      this._focusedConnectionState$.next({
        state: "ConnectedToLkRoom",
        focus: this.targetFocus,
        connectionState: this.livekitRoom.state,
      });
    } catch (error) {
      this._focusedConnectionState$.next({
        state: "FailedToStart",
        // The state requires an Error instance; wrap anything else that was thrown.
        error: error instanceof Error ? error : new Error(String(error)),
        focus: this.targetFocus,
      });
      throw error;
    }
  }

  /**
   * Requests the SFU config for the target focus, using the focus'
   * `livekit_service_url` and `livekit_alias`.
   *
   * Protected so that subclasses can override how the config is obtained.
   *
   * @returns The SFU config; `start()` reads its `url` and `jwt`.
   */
  protected async getSFUConfigWithOpenID(): Promise<SFUConfig> {
    return await getSFUConfigWithOpenID(
      this.client,
      this.targetFocus.livekit_service_url,
      this.targetFocus.livekit_alias,
    );
  }

  /**
   * Stops the connection.
   *
   * This will disconnect from the LiveKit room.
   * If the connection is already stopped, this is a no-op.
   */
  public async stop(): Promise<void> {
    if (this.stopped) return;
    // Set the flag before awaiting so that overlapping stop() calls (e.g. an
    // explicit stop plus the scope-end hook) are no-ops, and so that a pending
    // start() sees the stop as soon as it resumes.
    this.stopped = true;
    await this.livekitRoom.disconnect();
    this._focusedConnectionState$.next({
      state: "Stopped",
      focus: this.targetFocus,
    });
  }
}

/**
 * A remote connection to the Matrix RTC LiveKit backend.
 *
 * This connection is used for subscribing to remote participants.
 * It does not publish any local tracks.
 */
export class RemoteConnection extends Connection {
  /**
   * Creates a new remote connection to a matrix RTC LiveKit backend.
   *
   * The room is created with `opts.livekitRoomFactory` when provided, otherwise
   * with the `LivekitRoom` constructor, using `defaultLiveKitOptions` plus the
   * given E2EE options.
   *
   * @param opts - Connection options {@link ConnectionOpts}.
   * @param sharedE2eeOption - The shared E2EE options to use for the connection.
   */
  public constructor(
    opts: ConnectionOpts,
    sharedE2eeOption: E2EEOptions | undefined,
  ) {
    const factory =
      opts.livekitRoomFactory ??
      ((options: RoomOptions): LivekitRoom => new LivekitRoom(options));
    const livekitRoom = factory({
      ...defaultLiveKitOptions,
      e2ee: sharedE2eeOption,
    });
    super(livekitRoom, opts);
  }
}