diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index ef4be238..dd1190b7 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -373,6 +373,7 @@ export class CallViewModel { * List of MediaItems that we want to have tiles for. */ // TODO KEEP THIS!! and adapt it to what our membershipManger returns + // TODO this also needs the local participant to be added. private readonly mediaItems$ = this.scope.behavior( generateKeyed$< [typeof this.matrixLivekitMembers$.value, number], diff --git a/src/state/CallViewModel/localMember/LocalMembership.ts b/src/state/CallViewModel/localMember/LocalMembership.ts index 8f7d8b54..1773eca1 100644 --- a/src/state/CallViewModel/localMember/LocalMembership.ts +++ b/src/state/CallViewModel/localMember/LocalMembership.ts @@ -98,7 +98,7 @@ interface Props { connectionManager: IConnectionManager; matrixRTCSession: MatrixRTCSession; matrixRoom: MatrixRoom; - localTransport$: Behavior; + localTransport$: Behavior; e2eeLivekitOptions: E2EEOptions | undefined; trackProcessorState$: Behavior; widget: WidgetHelpers | null; @@ -162,7 +162,11 @@ export const createLocalMembership$ = ({ // This should be used in a combineLatest with publisher$ to connect. // to make it possible to call startTracks before the preferredTransport$ has resolved. - const shouldStartTracks$ = new BehaviorSubject(false); + const trackStartRequested$ = new BehaviorSubject(false); + + // This should be used in a combineLatest with publisher$ to connect. + // to make it possible to call startTracks before the preferredTransport$ has resolved. + const connectRequested$ = new BehaviorSubject(false); // This should be used in a combineLatest with publisher$ to connect. const tracks$ = new BehaviorSubject([]); @@ -230,26 +234,24 @@ export const createLocalMembership$ = ({ ), ); - const publisher$ = scope.behavior( - connection$.pipe( - map((connection) => - connection - ? new Publisher( - scope, - connection, - mediaDevices, - muteStates, - e2eeLivekitOptions, - trackProcessorState$, - ) - : null, - ), - ), - ); + const publisher$ = new BehaviorSubject(null); + connection$.subscribe((connection) => { + if (connection !== null && publisher$.value === null) { + publisher$.next( + new Publisher( + scope, + connection, + mediaDevices, + muteStates, + e2eeLivekitOptions, + trackProcessorState$, + ), + ); + } + }); - combineLatest( - [publisher$, shouldStartTracks$], - (publisher, shouldStartTracks) => { + combineLatest([publisher$, trackStartRequested$]).subscribe( + ([publisher, shouldStartTracks]) => { if (publisher && shouldStartTracks) { publisher .createAndSetupTracks() @@ -286,41 +288,51 @@ export const createLocalMembership$ = ({ ); const startTracks = (): Behavior => { - shouldStartTracks$.next(true); + trackStartRequested$.next(true); return tracks$; }; - const requestConnect = (): LocalMemberConnectionState => { - if (state.livekit$.value.state === LivekitState.Uninitialized) { - startTracks(); - state.livekit$.next({ state: LivekitState.Connecting }); - combineLatest([publisher$, tracks$], (publisher, tracks) => { - publisher - ?.startPublishing() - .then(() => { - state.livekit$.next({ state: LivekitState.Connected }); - }) - .catch((error) => { - state.livekit$.next({ state: LivekitState.Error, error }); - }); + combineLatest([publisher$, tracks$]).subscribe(([publisher, tracks]) => { + if ( + tracks.length === 0 || + // change this to !== Publishing + state.livekit$.value.state !== LivekitState.Uninitialized + ) { + return; + } + state.livekit$.next({ state: LivekitState.Connecting }); + publisher + ?.startPublishing() + .then(() => { + state.livekit$.next({ state: LivekitState.Connected }); + }) + .catch((error) => { + state.livekit$.next({ state: LivekitState.Error, error }); }); - } - if (state.matrix$.value.state === MatrixState.Disconnected) { + }); + combineLatest([localTransport$, connectRequested$]).subscribe( + ([transport, connectRequested]) => { + if ( + transport === null || + !connectRequested || + state.matrix$.value.state !== MatrixState.Disconnected + ) { + logger.info("Waiting for transport to enter rtc session"); + return; + } state.matrix$.next({ state: MatrixState.Connecting }); - localTransport$.pipe( - tap((transport) => { - if (transport !== undefined) { - enterRTCSession(matrixRTCSession, transport, options.value).catch( - (error) => { - logger.error(error); - }, - ); - } else { - logger.info("Waiting for transport to enter rtc session"); - } - }), + enterRTCSession(matrixRTCSession, transport, options.value).catch( + (error) => { + logger.error(error); + }, ); - } + }, + ); + + const requestConnect = (): LocalMemberConnectionState => { + trackStartRequested$.next(true); + connectRequested$.next(true); + return state; }; @@ -453,8 +465,7 @@ export const createLocalMembership$ = ({ .pipe( // I dont see why we need this. isnt the check later on superseeding it? takeWhile( - (c) => - c !== undefined && c.state$.value.state !== "FailedToStart", + (c) => c !== null && c.state$.value.state !== "FailedToStart", ), switchMap((c) => c?.state$.value.state === "ConnectedToLkRoom" ? of(c) : NEVER, diff --git a/src/state/CallViewModel/localMember/LocalTransport.ts b/src/state/CallViewModel/localMember/LocalTransport.ts index a96962c9..b1fd71e9 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.ts @@ -13,20 +13,21 @@ import { isLivekitTransportConfig, } from "matrix-js-sdk/lib/matrixrtc"; import { type MatrixClient } from "matrix-js-sdk"; -import { combineLatest, distinctUntilChanged, first, from } from "rxjs"; +import { combineLatest, distinctUntilChanged, first, from, map } from "rxjs"; import { logger } from "matrix-js-sdk/lib/logger"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { deepCompare } from "matrix-js-sdk/lib/utils"; import { type Behavior } from "../../Behavior.ts"; import { - type Epoch, + Epoch, mapEpoch, type ObservableScope, } from "../../ObservableScope.ts"; import { Config } from "../../../config/Config.ts"; import { MatrixRTCTransportMissingError } from "../../../utils/errors.ts"; import { getSFUConfigWithOpenID } from "../../../livekit/openIDSFU.ts"; +import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers.ts"; /* * - get well known @@ -60,15 +61,16 @@ export const createLocalTransport$ = ({ client, roomId, useOldestMember$, -}: Props): Behavior => { +}: Props): Behavior => { /** * The transport over which we should be actively publishing our media. * undefined when not joined. */ const oldestMemberTransport$ = scope.behavior( memberships$.pipe( - mapEpoch( - (memberships) => memberships[0]?.getTransport(memberships[0]) ?? null, + map( + (memberships) => + memberships.value[0]?.getTransport(memberships.value[0]) ?? null, ), first((t) => t != null && isLivekitTransport(t)), ), @@ -88,13 +90,18 @@ export const createLocalTransport$ = ({ * The transport we should advertise in our MatrixRTC membership. */ const advertisedTransport$ = scope.behavior( - combineLatest( - [useOldestMember$, oldestMemberTransport$, preferredTransport$], - (useOldestMember, oldestMemberTransport, preferredTransport) => + combineLatest([ + useOldestMember$, + oldestMemberTransport$, + preferredTransport$, + ]).pipe( + map(([useOldestMember, oldestMemberTransport, preferredTransport]) => useOldestMember ? (oldestMemberTransport ?? preferredTransport) : preferredTransport, - ).pipe(distinctUntilChanged(deepCompare)), + ), + distinctUntilChanged(areLivekitTransportsEqual), + ), ); return advertisedTransport$; }; diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index ffdd2487..c10201bf 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -99,31 +99,36 @@ export class Publisher { // instead? This optimization would only be safe for a publish connection, // because we don't want to leak the user's intent to perhaps join a call to // remote servers before they actually commit to it. - const { promise, resolve, reject } = Promise.withResolvers(); - const sub = this.connection.state$.subscribe((s) => { - if (s.state !== "FailedToStart") { - reject(new Error("Disconnected from LiveKit server")); - } else { - resolve(); - } - }); - try { - await promise; - } catch (e) { - throw e; - } finally { - sub.unsubscribe(); - } + // const { promise, resolve, reject } = Promise.withResolvers(); + // const sub = this.connection.state$.subscribe((s) => { + // if (s.state === "FailedToStart") { + // reject(new Error("Disconnected from LiveKit server")); + // } else if (s.state === "ConnectedToLkRoom") { + // resolve(); + // } + // }); + // try { + // await promise; + // } catch (e) { + // throw e; + // } finally { + // sub.unsubscribe(); + // } // TODO-MULTI-SFU: Prepublish a microphone track const audio = this.muteStates.audio.enabled$.value; const video = this.muteStates.video.enabled$.value; // createTracks throws if called with audio=false and video=false if (audio || video) { // TODO this can still throw errors? It will also prompt for permissions if not already granted - this.tracks = await lkRoom.localParticipant.createTracks({ - audio, - video, - }); + this.tracks = + (await lkRoom.localParticipant + .createTracks({ + audio, + video, + }) + .catch((error) => { + this.logger?.error("Failed to create tracks", error); + })) ?? []; } return this.tracks; } @@ -153,7 +158,9 @@ export class Publisher { for (const track of this.tracks) { // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally // with a timeout. - await lkRoom.localParticipant.publishTrack(track); + await lkRoom.localParticipant.publishTrack(track).catch((error) => { + this.logger?.error("Failed to publish track", error); + }); // TODO: check if the connection is still active? and break the loop if not? } diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts index 05ffb4f9..729ed547 100644 --- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts +++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts @@ -145,8 +145,8 @@ export function createMatrixLivekitMembers$({ // TODO add this to the JS-SDK export function areLivekitTransportsEqual( - t1?: LivekitTransport, - t2?: LivekitTransport, + t1: LivekitTransport | null, + t2: LivekitTransport | null, ): boolean { if (t1 && t2) return t1.livekit_service_url === t2.livekit_service_url; // In case we have different lk rooms in the same SFU (depends on the livekit authorization service) diff --git a/src/state/MediaViewModel.ts b/src/state/MediaViewModel.ts index 0b79183e..b35f6112 100644 --- a/src/state/MediaViewModel.ts +++ b/src/state/MediaViewModel.ts @@ -220,7 +220,7 @@ abstract class BaseMediaViewModel { /** * The LiveKit video track for this media. */ - public readonly video$: Behavior; + public readonly video$: Behavior; /** * Whether there should be a warning that this media is unencrypted. */ @@ -235,12 +235,10 @@ abstract class BaseMediaViewModel { private observeTrackReference$( source: Track.Source, - ): Behavior { + ): Behavior { return this.scope.behavior( this.participant$.pipe( - switchMap((p) => - p === undefined ? of(undefined) : observeTrackReference$(p, source), - ), + switchMap((p) => (!p ? of(null) : observeTrackReference$(p, source))), ), ); } @@ -260,7 +258,7 @@ abstract class BaseMediaViewModel { // We don't necessarily have a participant if a user connects via MatrixRTC but not (yet) through // livekit. protected readonly participant$: Observable< - LocalParticipant | RemoteParticipant | undefined + LocalParticipant | RemoteParticipant | null >, encryptionSystem: EncryptionSystem, @@ -405,7 +403,7 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { scope: ObservableScope, id: string, member: RoomMember, - participant$: Observable, + participant$: Observable, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, focusUrl: string, @@ -541,7 +539,7 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { scope: ObservableScope, id: string, member: RoomMember, - participant$: Behavior, + participant$: Behavior, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, focusURL: string, @@ -651,7 +649,7 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { scope: ObservableScope, id: string, member: RoomMember, - participant$: Observable, + participant$: Observable, encryptionSystem: EncryptionSystem, livekitRoom: LivekitRoom, focusUrl: string, diff --git a/src/state/UserMedia.ts b/src/state/UserMedia.ts index 55de2061..9eec3967 100644 --- a/src/state/UserMedia.ts +++ b/src/state/UserMedia.ts @@ -82,7 +82,7 @@ export class UserMedia { this.scope, this.id, this.member, - this.participant$ as Behavior, + this.participant$ as Behavior, this.encryptionSystem, this.livekitRoom, this.focusURL, @@ -95,7 +95,7 @@ export class UserMedia { this.scope, this.id, this.member, - this.participant$ as Observable, + this.participant$ as Behavior, this.encryptionSystem, this.livekitRoom, this.focusURL, diff --git a/src/tile/GridTile.tsx b/src/tile/GridTile.tsx index 421cefda..1925eff6 100644 --- a/src/tile/GridTile.tsx +++ b/src/tile/GridTile.tsx @@ -144,7 +144,7 @@ const UserMediaTile: FC = ({ const tile = (