diff --git a/locales/en/app.json b/locales/en/app.json index 11267439..104af750 100644 --- a/locales/en/app.json +++ b/locales/en/app.json @@ -72,12 +72,21 @@ "livekit_server_info": "LiveKit Server Info", "livekit_sfu": "LiveKit SFU: {{url}}", "matrix_id": "Matrix ID: {{id}}", - "multi_sfu": "Multi-SFU media transport", - "mute_all_audio": "Mute all audio (participants, reactions, join sounds)", - "prefer_sticky_events": { - "description": "Improves reliability of calls (requires homeserver support)", - "label": "Prefer sticky events" + "matrixRTCMode": { + "Comptibility": { + "label": "Compatibility: state events & multi SFU" + "description": "Compatible with homeservers that do not support sticky events (but all other EC clients are v0.17.0 or later)", + }, + "Legacy": { + "label": "Legacy: state events & oldest membership SFU" + "description": "Compatible with old versions of EC that do not support multi SFU", + }, + "Matrix_2_0": { + "label": "Matrix 2.0: sticky events & multi SFU" + "description": "Compatible only with homservers supporting sticky events and all EC clients v0.17.0 or later", + } }, + "mute_all_audio": "Mute all audio (participants, reactions, join sounds)", "show_connection_stats": "Show connection statistics", "url_params": "URL parameters" }, diff --git a/src/rtcSessionHelpers.ts b/src/rtcSessionHelpers.ts index 74023c22..a53418f7 100644 --- a/src/rtcSessionHelpers.ts +++ b/src/rtcSessionHelpers.ts @@ -21,6 +21,7 @@ import { ElementWidgetActions, widget } from "./widget"; import { MatrixRTCTransportMissingError } from "./utils/errors"; import { getUrlParams } from "./UrlParams"; import { getSFUConfigWithOpenID } from "./livekit/openIDSFU.ts"; +import { MatrixRTCMode } from "./settings/settings.ts"; const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci"; @@ -98,9 +99,7 @@ export async function makeTransport( export interface EnterRTCSessionOptions { encryptMedia: boolean; - /** EXPERIMENTAL: If true, will use the multi-sfu codepath where each member connects to its SFU instead of everyone connecting to an elected on. */ - useMultiSfu: boolean; - preferStickyEvents: boolean; + matrixRTCMode: MatrixRTCMode; } /** @@ -112,7 +111,7 @@ export interface EnterRTCSessionOptions { export async function enterRTCSession( rtcSession: MatrixRTCSession, transport: LivekitTransport, - { encryptMedia, useMultiSfu, preferStickyEvents }: EnterRTCSessionOptions, + { encryptMedia, matrixRTCMode }: EnterRTCSessionOptions, ): Promise { PosthogAnalytics.instance.eventCallEnded.cacheStartCall(new Date()); PosthogAnalytics.instance.eventCallStarted.track(rtcSession.room.roomId); @@ -125,10 +124,11 @@ export async function enterRTCSession( const useDeviceSessionMemberEvents = features?.feature_use_device_session_member_events; const { sendNotificationType: notificationType, callIntent } = getUrlParams(); + const multiSFU = matrixRTCMode !== MatrixRTCMode.Legacy; // Multi-sfu does not need a preferred foci list. just the focus that is actually used. rtcSession.joinRoomSession( - useMultiSfu ? [] : [transport], - useMultiSfu ? transport : undefined, + multiSFU ? [] : [transport], + multiSFU ? transport : undefined, { notificationType, callIntent, @@ -147,7 +147,7 @@ export async function enterRTCSession( membershipEventExpiryMs: matrixRtcSessionConfig?.membership_event_expiry_ms, useExperimentalToDeviceTransport: true, - unstableSendStickyEvents: preferStickyEvents, + unstableSendStickyEvents: matrixRTCMode === MatrixRTCMode.Matrix_2_0, }, ); if (widget) { diff --git a/src/settings/DeveloperSettingsTab.tsx b/src/settings/DeveloperSettingsTab.tsx index 08c22557..e29e9c15 100644 --- a/src/settings/DeveloperSettingsTab.tsx +++ b/src/settings/DeveloperSettingsTab.tsx @@ -29,7 +29,8 @@ import { multiSfu as multiSfuSetting, muteAllAudio as muteAllAudioSetting, alwaysShowIphoneEarpiece as alwaysShowIphoneEarpieceSetting, - preferStickyEvents as preferStickyEventsSetting, + matrixRTCMode as matrixRTCModeSetting, + MatrixRTCMode, } from "./settings"; import type { Room as LivekitRoom } from "livekit-client"; import styles from "./DeveloperSettingsTab.module.css"; @@ -59,9 +60,7 @@ export const DeveloperSettingsTab: FC = ({ client, livekitRooms }) => { }); }, [client]); - const [preferStickyEvents, setPreferStickyEvents] = useSetting( - preferStickyEventsSetting, - ); + const [matrixRTCMode, setMatrixRTCMode] = useSetting(matrixRTCModeSetting); const [showConnectionStats, setShowConnectionStats] = useSetting( showConnectionStatsSetting, @@ -71,8 +70,6 @@ export const DeveloperSettingsTab: FC = ({ client, livekitRooms }) => { alwaysShowIphoneEarpieceSetting, ); - const [multiSfu, setMultiSfu] = useSetting(multiSfuSetting); - const [muteAllAudio, setMuteAllAudio] = useSetting(muteAllAudioSetting); const urlParams = useUrlParams(); @@ -148,17 +145,47 @@ export const DeveloperSettingsTab: FC = ({ client, livekitRooms }) => { ): void => { - setPreferStickyEvents(event.target.checked); + (event: ChangeEvent) => { + if (event.target.checked) setMatrixRTCMode(MatrixRTCMode.Legacy); }, - [setPreferStickyEvents], + [setMatrixRTCMode], + )} + /> + ) => { + if (event.target.checked) + setMatrixRTCMode(MatrixRTCMode.Compatibil); + }, + [setMatrixRTCMode], + )} + /> + ) => { + if (event.target.checked) + setMatrixRTCMode(MatrixRTCMode.Matrix_2_0); + }, + [setMatrixRTCMode], )} /> @@ -176,22 +203,6 @@ export const DeveloperSettingsTab: FC = ({ client, livekitRooms }) => { )} /> - - ): void => { - setMultiSfu(event.target.checked); - }, - [setMultiSfu], - )} - /> - ( false, ); -export const preferStickyEvents = new Setting( - "prefer-sticky-events", - false, -); - export const audioInput = new Setting( "audio-input", undefined, @@ -120,8 +115,6 @@ export const soundEffectVolume = new Setting( 0.5, ); -export const multiSfu = new Setting("multi-sfu", false); - export const muteAllAudio = new Setting("mute-all-audio", false); export const alwaysShowSelf = new Setting("always-show-self", true); @@ -130,3 +123,14 @@ export const alwaysShowIphoneEarpiece = new Setting( "always-show-iphone-earpiece", false, ); + +export enum MatrixRTCMode { + Legacy = "legacy", + Compatibil = "compatibil", + Matrix_2_0 = "matrix_2_0", +} + +export const matrixRTCMode = new Setting( + "matrix-rtc-mode", + MatrixRTCMode.Legacy, +); diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 436255eb..7396a515 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -10,22 +10,16 @@ import { ConnectionState, type E2EEOptions, ExternalE2EEKeyProvider, - type LocalParticipant, - RemoteParticipant, type Room as LivekitRoom, type RoomOptions, } from "livekit-client"; import E2EEWorker from "livekit-client/e2ee-worker?worker"; import { - ClientEvent, type EventTimelineSetHandlerMap, EventType, type Room as MatrixRoom, RoomEvent, - type RoomMember, - SyncState, } from "matrix-js-sdk"; -import { deepCompare } from "matrix-js-sdk/lib/utils"; import { BehaviorSubject, combineLatest, @@ -62,14 +56,9 @@ import { } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; import { - type CallMembership, - isLivekitTransport, - type LivekitTransport, type MatrixRTCSession, MatrixRTCSessionEvent, type MatrixRTCSessionEventHandlerMap, - MembershipManagerEvent, - Status, } from "matrix-js-sdk/lib/matrixrtc"; import { type IWidgetApiRequest } from "matrix-widget-api"; @@ -80,17 +69,12 @@ import { ScreenShareViewModel, type UserMediaViewModel, } from "./MediaViewModel"; -import { - accumulate, - and$, - generateKeyed$, - pauseWhen, -} from "../utils/observable"; +import { accumulate, generateKeyed$, pauseWhen } from "../utils/observable"; import { duplicateTiles, - multiSfu, + MatrixRTCMode, + matrixRTCMode, playReactionsSound, - preferStickyEvents, showReactions, } from "../settings/settings"; import { isFirefox } from "../Platform"; @@ -109,20 +93,13 @@ import { import { shallowEquals } from "../utils/array"; import { type MediaDevices } from "./MediaDevices"; import { type Behavior, constant } from "./Behavior"; -import { - enterRTCSession, - getLivekitAlias, - makeTransport, -} from "../rtcSessionHelpers"; +import { enterRTCSession } from "../rtcSessionHelpers"; import { E2eeType } from "../e2ee/e2eeType"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; -import { type Connection } from "./remoteMembers/Connection.ts"; import { type MuteStates } from "./MuteStates"; import { getUrlParams } from "../UrlParams"; import { type ProcessorState } from "../livekit/TrackProcessorContext"; import { ElementWidgetActions, widget } from "../widget"; -import { PublishConnection } from "./localMember/Publisher.ts"; -import { type Async, async$, mapAsync, ready } from "./Async"; import { sharingScreen$, UserMedia } from "./UserMedia.ts"; import { ScreenShare } from "./ScreenShare.ts"; import { @@ -134,12 +111,14 @@ import { type SpotlightLandscapeLayoutMedia, type SpotlightPortraitLayoutMedia, } from "./layout-types.ts"; -import { type ElementCallError, UnknownCallError } from "../utils/errors.ts"; +import { type ElementCallError } from "../utils/errors.ts"; import { ObservableScope } from "./ObservableScope.ts"; -import { memberDisplaynames$ } from "./remoteMembers/displayname.ts"; import { ConnectionManager } from "./remoteMembers/ConnectionManager.ts"; import { MatrixLivekitMerger } from "./remoteMembers/matrixLivekitMerger.ts"; -import { ownMembership$ } from "./localMember/LocalMembership.ts"; +import { + localMembership$, + LocalMemberState, +} from "./localMember/LocalMembership.ts"; import { localTransport$ as computeLocalTransport$ } from "./localMember/LocalTransport.ts"; import { sessionBehaviors$ } from "./SessionBehaviors.ts"; import { ECConnectionFactory } from "./remoteMembers/ConnectionFactory.ts"; @@ -202,23 +181,20 @@ export class CallViewModel { private readonly userId = this.matrixRoom.client.getUserId()!; private readonly deviceId = this.matrixRoom.client.getDeviceId()!; - private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession); private readonly livekitE2EEKeyProvider = getE2eeKeyProvider( this.options.encryptionSystem, this.matrixRTCSession, ); - private readonly e2eeLivekitOptions = (): E2EEOptions | undefined => - this.livekitE2EEKeyProvider - ? { - keyProvider: this.livekitE2EEKeyProvider, - worker: new E2EEWorker(), - } - : undefined; - private readonly _configError$ = new BehaviorSubject( - null, - ); + private readonly e2eeLivekitOptions: E2EEOptions | undefined = this + .livekitE2EEKeyProvider + ? { + keyProvider: this.livekitE2EEKeyProvider, + worker: new E2EEWorker(), + } + : undefined; + private sessionBehaviors = sessionBehaviors$( this.scope, this.matrixRTCSession, @@ -230,14 +206,16 @@ export class CallViewModel { memberships$: this.memberships$, client: this.matrixRoom.client, roomId: this.matrixRoom.roomId, - useOldestMember$: multiSfu.value$, + useOldestMember$: this.scope.behavior( + matrixRTCMode.value$.pipe(map((v) => v === MatrixRTCMode.Legacy)), + ), }); private connectionFactory = new ECConnectionFactory( this.matrixRoom.client, this.mediaDevices, this.trackProcessorState$, - this.e2eeLivekitOptions(), + this.e2eeLivekitOptions, getUrlParams().controlledAudioDevices, ); @@ -252,7 +230,6 @@ export class CallViewModel { this.scope, this.connectionFactory, this.allTransports$, - logger, ); private matrixLivekitMerger = new MatrixLivekitMerger( @@ -263,31 +240,36 @@ export class CallViewModel { this.userId, this.deviceId, ); + private matrixLivekitItems$ = this.matrixLivekitMerger.matrixLivekitItems$; - private localMembership = this.localMembership$({ + private localMembership = localMembership$({ scope: this.scope, muteStates: this.muteStates, - multiSfu: this.multiSfu, mediaDevices: this.mediaDevices, - trackProcessorState$: this.trackProcessorState$, + connectionManager: this.connectionManager, + matrixRTCSession: this.matrixRTCSession, + matrixRoom: this.matrixRoom, + localTransport$: this.localTransport$, e2eeLivekitOptions: this.e2eeLivekitOptions, + trackProcessorState$: this.trackProcessorState$, + widget, }); - private matrixLivekitItems$ = this.matrixLivekitMerger.matrixLivekitItems$; /** * If there is a configuration error with the call (e.g. misconfigured E2EE). * This is a fatal error that prevents the call from being created/joined. * Should render a blocking error screen. */ public get configError$(): Behavior { - return this._configError$; + return this.localMembership.configError$; } - private readonly join$ = new Subject(); - - // DISCUSS BAD ? - public join(): void { - this.join$.next(); + public join(): LocalMemberState { + return this.localMembership.requestConnect({ + encryptMedia: this.e2eeLivekitOptions !== undefined, + // TODO. This might need to get called again on each cahnge of matrixRTCMode... + matrixRTCMode: matrixRTCMode.getValue(), + }); } // CODESMELL? @@ -304,62 +286,7 @@ export class CallViewModel { * than whether all connections are truly up and running. */ // DISCUSS ? lets think why we need joined and how to do it better - private readonly joined$ = this.scope.behavior( - this.join$.pipe( - map(() => true), - // Using takeUntil with the repeat operator is perfectly valid. - // eslint-disable-next-line rxjs/no-unsafe-takeuntil - takeUntil(this.leaveHoisted$), - endWith(false), - repeat(), - startWith(false), - ), - ); - - // /** - // * The transport that we would personally prefer to publish on (if not for the - // * transport preferences of others, perhaps). - // */ - // // DISCUSS move to ownMembership - // private readonly preferredTransport$ = this.scope.behavior( - // async$(makeTransport(this.matrixRTCSession)), - // ); - - // /** - // * The transport over which we should be actively publishing our media. - // * null when not joined. - // */ - // // DISCUSSION ownMembershipManager - // private readonly localTransport$: Behavior | null> = - // this.scope.behavior( - // this.transports$.pipe( - // map((transports) => transports?.local ?? null), - // distinctUntilChanged | null>(deepCompare), - // ), - // ); - - // // DISCUSSION move to ConnectionManager - // public readonly livekitConnectionState$ = - // // TODO: This options.connectionState$ behavior is a small hack inserted - // // here to facilitate testing. This would likely be better served by - // // breaking CallViewModel down into more naturally testable components. - // this.options.connectionState$ ?? - // this.scope.behavior( - // this.localConnection$.pipe( - // switchMap((c) => - // c?.state === "ready" - // ? // TODO mapping to ConnectionState for compatibility, but we should use the full state? - // c.value.state$.pipe( - // switchMap((s) => { - // if (s.state === "ConnectedToLkRoom") - // return s.connectionState$; - // return of(ConnectionState.Disconnected); - // }), - // ) - // : of(ConnectionState.Disconnected), - // ), - // ), - // ); + private readonly joined$ = this.localMembership.connected$; /** * Whether various media/event sources should pretend to be disconnected from @@ -370,9 +297,14 @@ export class CallViewModel { // down, for example, and we want to avoid making people worry that the app is // in a split-brained state. // DISCUSSION own membership manager ALSO this probably can be simplifis - private readonly pretendToBeDisconnected$ = this.reconnecting$; + private readonly pretendToBeDisconnected$ = + this.localMembership.reconnecting$; - public readonly audioParticipants$; // now will be created based on the connectionmanager + public readonly audioParticipants$ = this.scope.behavior( + this.matrixLivekitItems$.pipe( + map((items) => items.map((item) => item.participant)), + ), + ); public readonly handsRaised$ = this.scope.behavior( this.handsRaisedSubject$.pipe(pauseWhen(this.pretendToBeDisconnected$)), @@ -392,15 +324,6 @@ export class CallViewModel { ), ); - // Now will be added to the matricLivekitMerger - // memberDisplaynames$ = memberDisplaynames$( - // this.matrixRoom, - // this.memberships$, - // this.scope, - // this.userId, - // this.deviceId, - // ); - /** * List of MediaItems that we want to have tiles for. */ @@ -1352,6 +1275,7 @@ export class CallViewModel { /** * Whether we are sharing our screen. */ + // TODO move to LocalMembership public readonly sharingScreen$ = this.scope.behavior( from(this.localConnection$).pipe( switchMap((c) => @@ -1366,6 +1290,7 @@ export class CallViewModel { * Callback for toggling screen sharing. If null, screen sharing is not * available. */ + // TODO move to LocalMembership public readonly toggleScreenSharing = "getDisplayMedia" in (navigator.mediaDevices ?? {}) && !this.urlParams.hideScreensharing @@ -1408,101 +1333,6 @@ export class CallViewModel { >, private readonly trackProcessorState$: Behavior, ) { - // Start and stop session membership as needed - this.scope.reconcile(this.advertisedTransport$, async (advertised) => { - if (advertised !== null) { - try { - this._configError$.next(null); - await enterRTCSession(this.matrixRTCSession, advertised.transport, { - encryptMedia: this.options.encryptionSystem.kind !== E2eeType.NONE, - useMultiSfu: advertised.multiSfu, - preferStickyEvents: advertised.preferStickyEvents, - }); - } catch (e) { - logger.error("Error entering RTC session", e); - } - - // Update our member event when our mute state changes. - const intentScope = new ObservableScope(); - intentScope.reconcile( - this.muteStates.video.enabled$, - async (videoEnabled) => - this.matrixRTCSession.updateCallIntent( - videoEnabled ? "video" : "audio", - ), - ); - - return async (): Promise => { - intentScope.end(); - // Only sends Matrix leave event. The LiveKit session will disconnect - // as soon as either the stopConnection$ handler above gets to it or - // the view model is destroyed. - try { - await this.matrixRTCSession.leaveRoomSession(); - } catch (e) { - logger.error("Error leaving RTC session", e); - } - try { - await widget?.api.transport.send( - ElementWidgetActions.HangupCall, - {}, - ); - } catch (e) { - logger.error("Failed to send hangup action", e); - } - }; - } - }); - - // Pause upstream of all local media tracks when we're disconnected from - // MatrixRTC, because it can be an unpleasant surprise for the app to say - // 'reconnecting' and yet still be transmitting your media to others. - // We use matrixConnected$ rather than reconnecting$ because we want to - // pause tracks during the initial joining sequence too until we're sure - // that our own media is displayed on screen. - combineLatest([this.localConnection$, this.matrixConnected$]) - .pipe(this.scope.bind()) - .subscribe(([connection, connected]) => { - if (connection?.state !== "ready") return; - const publications = - connection.value.livekitRoom.localParticipant.trackPublications.values(); - if (connected) { - for (const p of publications) { - if (p.track?.isUpstreamPaused === true) { - const kind = p.track.kind; - logger.log( - `Resuming ${kind} track (MatrixRTC connection present)`, - ); - p.track - .resumeUpstream() - .catch((e) => - logger.error( - `Failed to resume ${kind} track after MatrixRTC reconnection`, - e, - ), - ); - } - } - } else { - for (const p of publications) { - if (p.track?.isUpstreamPaused === false) { - const kind = p.track.kind; - logger.log( - `Pausing ${kind} track (uncertain MatrixRTC connection)`, - ); - p.track - .pauseUpstream() - .catch((e) => - logger.error( - `Failed to pause ${kind} track after entering uncertain MatrixRTC connection`, - e, - ), - ); - } - } - } - }); - // Join automatically this.join(); // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked? } diff --git a/src/state/localMember/LocalMembership.ts b/src/state/localMember/LocalMembership.ts index 7448c2ee..0bd5fbb1 100644 --- a/src/state/localMember/LocalMembership.ts +++ b/src/state/localMember/LocalMembership.ts @@ -12,12 +12,7 @@ import { MembershipManagerEvent, Status, } from "matrix-js-sdk/lib/matrixrtc"; -import { - ClientEvent, - type MatrixClient, - SyncState, - type Room as MatrixRoom, -} from "matrix-js-sdk"; +import { ClientEvent, SyncState, type Room as MatrixRoom } from "matrix-js-sdk"; import { BehaviorSubject, combineLatest, @@ -25,6 +20,7 @@ import { map, type Observable, of, + scan, startWith, switchMap, tap, @@ -33,7 +29,7 @@ import { logger } from "matrix-js-sdk/lib/logger"; import { type Behavior } from "../Behavior"; import { type ConnectionManager } from "../remoteMembers/ConnectionManager"; -import { type ObservableScope } from "../ObservableScope"; +import { ObservableScope } from "../ObservableScope"; import { Publisher } from "./Publisher"; import { type MuteStates } from "../MuteStates"; import { type ProcessorState } from "../../livekit/TrackProcessorContext"; @@ -44,31 +40,10 @@ import { enterRTCSession, type EnterRTCSessionOptions, } from "../../rtcSessionHelpers"; +import { ElementCallError } from "../../utils/errors"; +import { Widget } from "matrix-widget-api"; +import { ElementWidgetActions, WidgetHelpers } from "../../widget"; -/* - * - get well known - * - get oldest membership - * - get transport to use - * - get openId + jwt token - * - wait for createTrack() call - * - create tracks - * - wait for join() call - * - Publisher.publishTracks() - * - send join state/sticky event - */ -interface Props { - scope: ObservableScope; - mediaDevices: MediaDevices; - muteStates: MuteStates; - connectionManager: ConnectionManager; - matrixRTCSession: MatrixRTCSession; - matrixRoom: MatrixRoom; - localTransport$: Behavior; - client: MatrixClient; - roomId: string; - e2eeLivekitOptions: E2EEOptions | undefined; - trackerProcessorState$: Behavior; -} enum LivekitState { UNINITIALIZED = "uninitialized", CONNECTING = "connecting", @@ -95,10 +70,35 @@ type LocalMemberMatrixState = | { state: MatrixState.CONNECTING } | { state: MatrixState.DISCONNECTED }; -interface LocalMemberState { +export interface LocalMemberState { livekit$: BehaviorSubject; matrix$: BehaviorSubject; } + +/* + * - get well known + * - get oldest membership + * - get transport to use + * - get openId + jwt token + * - wait for createTrack() call + * - create tracks + * - wait for join() call + * - Publisher.publishTracks() + * - send join state/sticky event + */ +interface Props { + scope: ObservableScope; + mediaDevices: MediaDevices; + muteStates: MuteStates; + connectionManager: ConnectionManager; + matrixRTCSession: MatrixRTCSession; + matrixRoom: MatrixRoom; + localTransport$: Behavior; + e2eeLivekitOptions: E2EEOptions | undefined; + trackProcessorState$: Behavior; + widget: WidgetHelpers | null; +} + /** * This class is responsible for managing the own membership in a room. * We want @@ -120,7 +120,8 @@ export const localMembership$ = ({ localTransport$, matrixRoom, e2eeLivekitOptions, - trackerProcessorState$, + trackProcessorState$, + widget, }: Props): { // publisher: Publisher requestConnect: (options: EnterRTCSessionOptions) => LocalMemberState; @@ -129,6 +130,8 @@ export const localMembership$ = ({ state: LocalMemberState; // TODO this is probably superseeded by joinState$ homeserverConnected$: Behavior; connected$: Behavior; + reconnecting$: Behavior; + configError$: Behavior; } => { const state = { livekit$: new BehaviorSubject({ @@ -148,11 +151,12 @@ export const localMembership$ = ({ const connection$ = scope.behavior( combineLatest([connectionManager.connections$, localTransport$]).pipe( - map(([connections, transport]) => - connections.find((connection) => + map(([connections, transport]) => { + if (transport === undefined) return undefined; + return connections.find((connection) => areLivekitTransportsEqual(connection.transport, transport), - ), - ), + ); + }), ), ); /** @@ -214,7 +218,7 @@ export const localMembership$ = ({ mediaDevices, muteStates, e2eeLivekitOptions, - trackerProcessorState$, + trackProcessorState$, ) : null, ), @@ -242,31 +246,28 @@ export const localMembership$ = ({ // /** // * Whether we should tell the user that we're reconnecting to the call. // */ - // // DISCUSSION own membership manager - // const reconnecting$ = scope.behavior( - // connected$.pipe( - // // We are reconnecting if we previously had some successful initial - // // connection but are now disconnected - // scan( - // ({ connectedPreviously }, connectedNow) => ({ - // connectedPreviously: connectedPreviously || connectedNow, - // reconnecting: connectedPreviously && !connectedNow, - // }), - // { connectedPreviously: false, reconnecting: false }, - // ), - // map(({ reconnecting }) => reconnecting), - // ), - // ); + // DISCUSSION is there a better way to do this? + // sth that is more deriectly implied from the membership manager of the js sdk. (fromEvent(matrixRTCSession, Reconnecting)) ??? or similar + const reconnecting$ = scope.behavior( + connected$.pipe( + // We are reconnecting if we previously had some successful initial + // connection but are now disconnected + scan( + ({ connectedPreviously }, connectedNow) => ({ + connectedPreviously: connectedPreviously || connectedNow, + reconnecting: connectedPreviously && !connectedNow, + }), + { connectedPreviously: false, reconnecting: false }, + ), + map(({ reconnecting }) => reconnecting), + ), + ); const startTracks = (): Behavior => { shouldStartTracks$.next(true); return tracks$; }; - // const joinState$ = new BehaviorSubject({ - // state: LivekitState.UNINITIALIZED, - // }); - const requestConnect = ( options: EnterRTCSessionOptions, ): LocalMemberState => { @@ -288,11 +289,15 @@ export const localMembership$ = ({ state.matrix$.next({ state: MatrixState.CONNECTING }); localTransport$.pipe( tap((transport) => { - enterRTCSession(matrixRTCSession, transport, options).catch( - (error) => { - logger.error(error); - }, - ); + if (transport !== undefined) { + enterRTCSession(matrixRTCSession, transport, options).catch( + (error) => { + logger.error(error); + }, + ); + } else { + logger.info("Waiting for transport to enter rtc session"); + } }), ); } @@ -317,6 +322,93 @@ export const localMembership$ = ({ return state.livekit$; }; + // Pause upstream of all local media tracks when we're disconnected from + // MatrixRTC, because it can be an unpleasant surprise for the app to say + // 'reconnecting' and yet still be transmitting your media to others. + // We use matrixConnected$ rather than reconnecting$ because we want to + // pause tracks during the initial joining sequence too until we're sure + // that our own media is displayed on screen. + combineLatest([connection$, homeserverConnected$]) + .pipe(scope.bind()) + .subscribe(([connection, connected]) => { + if (connection?.state$.value.state !== "ConnectedToLkRoom") return; + const publications = + connection.livekitRoom.localParticipant.trackPublications.values(); + if (connected) { + for (const p of publications) { + if (p.track?.isUpstreamPaused === true) { + const kind = p.track.kind; + logger.log(`Resuming ${kind} track (MatrixRTC connection present)`); + p.track + .resumeUpstream() + .catch((e) => + logger.error( + `Failed to resume ${kind} track after MatrixRTC reconnection`, + e, + ), + ); + } + } + } else { + for (const p of publications) { + if (p.track?.isUpstreamPaused === false) { + const kind = p.track.kind; + logger.log( + `Pausing ${kind} track (uncertain MatrixRTC connection)`, + ); + p.track + .pauseUpstream() + .catch((e) => + logger.error( + `Failed to pause ${kind} track after entering uncertain MatrixRTC connection`, + e, + ), + ); + } + } + } + }); + + const configError$ = new BehaviorSubject(null); + // TODO I do not fully understand what this does. + // Is it needed? + // Is this at the right place? + // Can this be simplified? + // Start and stop session membership as needed + scope.reconcile(localTransport$, async (advertised) => { + if (advertised !== null && advertised !== undefined) { + try { + configError$.next(null); + await enterRTCSession(matrixRTCSession, advertised, options); + } catch (e) { + logger.error("Error entering RTC session", e); + } + + // Update our member event when our mute state changes. + const intentScope = new ObservableScope(); + intentScope.reconcile(muteStates.video.enabled$, async (videoEnabled) => + matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"), + ); + + return async (): Promise => { + intentScope.end(); + // Only sends Matrix leave event. The LiveKit session will disconnect + // as soon as either the stopConnection$ handler above gets to it or + // the view model is destroyed. + try { + await matrixRTCSession.leaveRoomSession(); + } catch (e) { + logger.error("Error leaving RTC session", e); + } + try { + await widget?.api.transport.send(ElementWidgetActions.HangupCall, {}); + } catch (e) { + logger.error("Failed to send hangup action", e); + } + }; + } + }); + return { startTracks, requestConnect, @@ -324,5 +416,7 @@ export const localMembership$ = ({ state, homeserverConnected$, connected$, + reconnecting$, + configError$, }; }; diff --git a/src/state/remoteMembers/ConnectionManager.ts b/src/state/remoteMembers/ConnectionManager.ts index e333173e..37c616f8 100644 --- a/src/state/remoteMembers/ConnectionManager.ts +++ b/src/state/remoteMembers/ConnectionManager.ts @@ -14,7 +14,7 @@ import { type ParticipantId, } from "matrix-js-sdk/lib/matrixrtc"; import { BehaviorSubject, combineLatest, map, switchMap } from "rxjs"; -import { type Logger } from "matrix-js-sdk/lib/logger"; +import { logger, type Logger } from "matrix-js-sdk/lib/logger"; import { type Participant as LivekitParticipant } from "livekit-client"; import { type Behavior } from "../Behavior"; @@ -106,8 +106,8 @@ export class ConnectionManager { private readonly scope: ObservableScope, private readonly connectionFactory: ConnectionFactory, private readonly inputTransports$: Behavior, - logger: Logger, ) { + // TODO logger: only construct one logger from the client and make it compatible via a EC specific singleton. this.logger = logger.getChild("ConnectionManager"); scope.onEnd(() => this.running$.next(false)); }