diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index e04f4698..35ab658b 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -99,6 +99,7 @@ import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts import { createLocalMembership$, enterRTCSession, + TransportState, } from "./localMember/LocalMember.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { @@ -577,17 +578,6 @@ export function createCallViewModel$( ), ); - /** - * Whether various media/event sources should pretend to be disconnected from - * all network input, even if their connection still technically works. - */ - // We do this when the app is in the 'reconnecting' state, because it might be - // that the LiveKit connection is still functional while the homeserver is - // down, for example, and we want to avoid making people worry that the app is - // in a split-brained state. - // DISCUSSION own membership manager ALSO this probably can be simplifis - const reconnecting$ = localMembership.reconnecting$; - const audioParticipants$ = scope.behavior( matrixLivekitMembers$.pipe( switchMap((membersWithEpoch) => { @@ -635,7 +625,7 @@ export function createCallViewModel$( ); const handsRaised$ = scope.behavior( - handsRaisedSubject$.pipe(pauseWhen(reconnecting$)), + handsRaisedSubject$.pipe(pauseWhen(localMembership.reconnecting$)), ); const reactions$ = scope.behavior( @@ -648,7 +638,7 @@ export function createCallViewModel$( ]), ), ), - pauseWhen(reconnecting$), + pauseWhen(localMembership.reconnecting$), ), ); @@ -739,7 +729,7 @@ export function createCallViewModel$( livekitRoom$, focusUrl$, mediaDevices, - reconnecting$, + localMembership.reconnecting$, displayName$, matrixMemberMetadataStore.createAvatarUrlBehavior$(userId), handsRaised$.pipe(map((v) => v[participantId]?.time ?? null)), @@ -1422,6 +1412,37 @@ export function createCallViewModel$( // reassigned here to make it publicly accessible const toggleScreenSharing = localMembership.toggleScreenSharing; + const errors$ = scope.behavior<{ + transportError?: ElementCallError; + matrixError?: ElementCallError; + connectionError?: ElementCallError; + publishError?: ElementCallError; + } | null>( + localMembership.localMemberState$.pipe( + map((value) => { + const returnObject: { + transportError?: ElementCallError; + matrixError?: ElementCallError; + connectionError?: ElementCallError; + publishError?: ElementCallError; + } = {}; + if (value instanceof ElementCallError) return { transportError: value }; + if (value === TransportState.Waiting) return null; + if (value.matrix instanceof ElementCallError) + returnObject.matrixError = value.matrix; + if (value.media instanceof ElementCallError) + returnObject.publishError = value.media; + else if ( + typeof value.media === "object" && + value.media.connection instanceof ElementCallError + ) + returnObject.connectionError = value.media.connection; + return returnObject; + }), + ), + null, + ); + return { autoLeave$: autoLeave$, callPickupState$: callPickupState$, @@ -1438,8 +1459,16 @@ export function createCallViewModel$( unhoverScreen: (): void => screenUnhover$.next(), fatalError$: scope.behavior( - localMembership.localMemberState$.pipe( - filter((v) => v instanceof ElementCallError), + errors$.pipe( + map((errors) => { + return ( + errors?.transportError ?? + errors?.matrixError ?? + errors?.connectionError ?? + null + ); + }), + filter((error) => error !== null), ), null, ), @@ -1472,7 +1501,7 @@ export function createCallViewModel$( showFooter$: showFooter$, earpieceMode$: earpieceMode$, audioOutputSwitcher$: audioOutputSwitcher$, - reconnecting$: reconnecting$, + reconnecting$: localMembership.reconnecting$, }; } diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index df42cba9..532d5d55 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -42,6 +42,7 @@ import { type Publisher } from "./Publisher.ts"; import { type MuteStates } from "../../MuteStates.ts"; import { ElementCallError, + FailToStartLivekitConnection, MembershipManagerError, UnknownCallError, } from "../../../utils/errors.ts"; @@ -56,6 +57,7 @@ import { type FailedToStartError, } from "../remoteMembers/Connection.ts"; import { type HomeserverConnected } from "./HomeserverConnected.ts"; +import { and$ } from "../../../utils/observable.ts"; export enum TransportState { /** Not even a transport is available to the LocalMembership */ @@ -86,13 +88,12 @@ export type LocalMemberMediaState = } | PublishState | ElementCallError; -export type LocalMemberMatrixState = Error | RTCSessionStatus; export type LocalMemberState = | ElementCallError | TransportState.Waiting | { media: LocalMemberMediaState; - matrix: LocalMemberMatrixState; + matrix: ElementCallError | RTCSessionStatus; }; /* @@ -220,10 +221,6 @@ export const createLocalMembership$ = ({ ), ); - const localConnectionState$ = localConnection$.pipe( - switchMap((connection) => (connection ? connection.state$ : of(null))), - ); - // MATRIX RELATED // This should be used in a combineLatest with publisher$ to connect. @@ -308,23 +305,27 @@ export const createLocalMembership$ = ({ try { await publisher?.startPublishing(); } catch (error) { - setMediaError(error as ElementCallError); + const message = + error instanceof Error ? error.message : String(error); + setPublishError(new FailToStartLivekitConnection(message)); } } else if (tracks.length !== 0 && !shouldJoinAndPublish) { try { await publisher?.stopPublishing(); } catch (error) { - setMediaError(new UnknownCallError(error as Error)); + setPublishError(new UnknownCallError(error as Error)); } } }, ); - const fatalMediaError$ = new BehaviorSubject(null); - const setMediaError = (e: ElementCallError): void => { - if (fatalMediaError$.value !== null) - logger.error("Multiple Media Errors:", e); - else fatalMediaError$.next(e); + // STATE COMPUTATION + + // These are non fatal since we can join a room and concume media even though publishing failed. + const publishError$ = new BehaviorSubject(null); + const setPublishError = (e: ElementCallError): void => { + if (publishError$.value !== null) logger.error("Multiple Media Errors:", e); + else publishError$.next(e); }; const fatalTransportError$ = new BehaviorSubject( @@ -336,6 +337,10 @@ export const createLocalMembership$ = ({ else fatalTransportError$.next(e); }; + const localConnectionState$ = localConnection$.pipe( + switchMap((connection) => (connection ? connection.state$ : of(null))), + ); + const mediaState$: Behavior = scope.behavior( combineLatest([ localConnectionState$, @@ -392,22 +397,22 @@ export const createLocalMembership$ = ({ homeserverConnected.rtsSession$, fatalMatrixError$, fatalTransportError$, - fatalMediaError$, + publishError$, ]).pipe( map( ([ mediaState, rtcSessionStatus, - matrixError, - transportError, - mediaError, + fatalMatrixError, + fatalTransportError, + publishError, ]) => { - if (transportError !== null) return transportError; - // `mediaState` will be 'null' until the transport appears. + if (fatalTransportError !== null) return fatalTransportError; + // `mediaState` will be 'null' until the transport/connection appears. if (mediaState && rtcSessionStatus) return { - matrix: matrixError ?? rtcSessionStatus, - media: mediaError ?? mediaState, + matrix: fatalMatrixError ?? rtcSessionStatus, + media: publishError ?? mediaState, }; return TransportState.Waiting; }, @@ -415,6 +420,31 @@ export const createLocalMembership$ = ({ ), ); + /** + * Whether we are "fully" connected to the call. Accounts for both the + * connection to the MatrixRTC session and the LiveKit publish connection. + */ + const matrixAndLivekitConnected$ = scope.behavior( + and$( + homeserverConnected.combined$, + localConnectionState$.pipe( + map((state) => state === ConnectionState.LivekitConnected), + ), + ).pipe( + tap((v) => logger.debug("livekit+matrix: Connected state changed", v)), + ), + ); + + /** + * Whether we should tell the user that we're reconnecting to the call. + */ + const reconnecting$ = scope.behavior( + matrixAndLivekitConnected$.pipe( + pairwise(), + map(([prev, current]) => prev === true && current === false), + ), + ); + // inform the widget about the connect and disconnect intent from the user. scope .behavior(joinAndPublishRequested$.pipe(pairwise(), scope.bind()), [ @@ -576,15 +606,7 @@ export const createLocalMembership$ = ({ localMemberState$, tracks$, participant$, - reconnecting$: scope.behavior( - localMemberState$.pipe( - map((state) => { - if (typeof state === "object" && "matrix" in state) - return state.matrix === RTCSessionStatus.Reconnecting; - return false; - }), - ), - ), + reconnecting$, disconnected$: scope.behavior( homeserverConnected.rtsSession$.pipe( map((state) => state === RTCSessionStatus.Disconnected), diff --git a/src/state/CallViewModel/localMember/Publisher.ts b/src/state/CallViewModel/localMember/Publisher.ts index b32e7e99..df67f179 100644 --- a/src/state/CallViewModel/localMember/Publisher.ts +++ b/src/state/CallViewModel/localMember/Publisher.ts @@ -32,15 +32,8 @@ import { } from "../../../livekit/TrackProcessorContext.tsx"; import { getUrlParams } from "../../../UrlParams.ts"; import { observeTrackReference$ } from "../../MediaViewModel.ts"; -import { - ConnectionState, - type Connection, -} from "../remoteMembers/Connection.ts"; +import { type Connection } from "../remoteMembers/Connection.ts"; import { type ObservableScope } from "../../ObservableScope.ts"; -import { - ElementCallError, - FailToStartLivekitConnection, -} from "../../../utils/errors.ts"; /** * A wrapper for a Connection object. @@ -160,27 +153,29 @@ export class Publisher { public async startPublishing(): Promise { this.logger.debug("startPublishing called"); const lkRoom = this.connection.livekitRoom; - const { promise, resolve, reject } = Promise.withResolvers(); - const sub = this.connection.state$.subscribe((state) => { - if (state instanceof Error) { - const error = - state instanceof ElementCallError - ? state - : new FailToStartLivekitConnection(state.message); - reject(error); - } else if (state === ConnectionState.LivekitConnected) { - resolve(); - } else { - this.logger.info("waiting for connection: ", state); - } - }); - try { - await promise; - } catch (e) { - throw e; - } finally { - sub.unsubscribe(); - } + + // we do not need to do this since lk will wait in `localParticipant.publishTrack` + // const { promise, resolve, reject } = Promise.withResolvers(); + // const sub = this.connection.state$.subscribe((state) => { + // if (state instanceof Error) { + // const error = + // state instanceof ElementCallError + // ? state + // : new FailToStartLivekitConnection(state.message); + // reject(error); + // } else if (state === ConnectionState.LivekitConnected) { + // resolve(); + // } else { + // this.logger.info("waiting for connection: ", state); + // } + // }); + // try { + // await promise; + // } catch (e) { + // throw e; + // } finally { + // sub.unsubscribe(); + // } for (const track of this.tracks$.value) { this.logger.info("publish ", this.tracks$.value.length, "tracks"); @@ -188,9 +183,10 @@ export class Publisher { // with a timeout. await lkRoom.localParticipant.publishTrack(track).catch((error) => { this.logger.error("Failed to publish track", error); - throw new FailToStartLivekitConnection( - error instanceof Error ? error.message : error, - ); + // throw new FailToStartLivekitConnection( + // error instanceof Error ? error.message : error, + // ); + throw error; }); this.logger.info("published track ", track.kind, track.id); diff --git a/src/state/CallViewModel/remoteMembers/Connection.ts b/src/state/CallViewModel/remoteMembers/Connection.ts index 29ad7a8c..2fd9eaa8 100644 --- a/src/state/CallViewModel/remoteMembers/Connection.ts +++ b/src/state/CallViewModel/remoteMembers/Connection.ts @@ -150,7 +150,8 @@ export class Connection { throw new InsufficientCapacityError(); } if (e.status === 404) { - // error msg is "Could not establish signal connection: requested room does not exist" + // error msg is "Failed to create call" + // error description is "Call creation might be restricted to authorized users only. Try again later, or contact your server admin if the problem persists." // The room does not exist. There are two different modes of operation for the SFU: // - the room is created on the fly when connecting (livekit `auto_create` option) // - Only authorized users can create rooms, so the room must exist before connecting (done by the auth jwt service) @@ -172,6 +173,7 @@ export class Connection { } catch (error) { this.logger.debug(`Failed to connect to LiveKit room: ${error}`); this._state$.next(error instanceof Error ? error : new Error(`${error}`)); + // Its okay to ignore the throw. The error is part of the state. throw error; } }