diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 2cb59810..711e5e7e 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -120,7 +120,6 @@ import { type LocalMatrixLivekitMember, type RemoteMatrixLivekitMember, type MatrixLivekitMember, - areLivekitTransportsEqual, } from "./remoteMembers/MatrixLivekitMembers.ts"; import { type AutoLeaveReason, @@ -518,26 +517,6 @@ export function createCallViewModel$( ), ); - // Observe the transport we should publish - const publishingTransport$ = localTransport$.pipe( - // observe the active$ transport - switchMap((t) => { - return combineLatest([t.active$, t.advertised$]).pipe( - map(([active, advertised]) => { - if (active?.transport) { - // use the active one (oldest member transport) - return active.transport; - } else { - // There is no active transport, we might just be the first member in the call - // so use the advertised to start - return advertised; - } - }), - ); - }), - distinctUntilChanged(areLivekitTransportsEqual), - ); - const localMembership = createLocalMembership$({ scope, homeserverConnected: createHomeserverConnected$( @@ -567,7 +546,7 @@ export function createCallViewModel$( }, connectionManager, matrixRTCSession, - localTransport$: scope.behavior(publishingTransport$), + localTransport$, logger: logger.getChild(`[${Date.now()}]`), }); diff --git a/src/state/CallViewModel/localMember/LocalMember.test.ts b/src/state/CallViewModel/localMember/LocalMember.test.ts index e5e9f327..bfcd7167 100644 --- a/src/state/CallViewModel/localMember/LocalMember.test.ts +++ b/src/state/CallViewModel/localMember/LocalMember.test.ts @@ -40,6 +40,10 @@ import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; import { ConnectionState, type Connection } from "../remoteMembers/Connection"; import { type Publisher } from "./Publisher"; import { initializeWidget } from "../../../widget"; +import { + type LocalTransport, + type LocalTransportWithSFUConfig, +} from "./LocalTransport"; initializeWidget(); @@ -214,7 +218,7 @@ describe("LocalMembership", () => { }; it("throws error on missing RTC config error", () => { - withTestScheduler(({ scope, hot, expectObservable }) => { + withTestScheduler(({ scope, hot, behavior, expectObservable }) => { const localTransport$ = scope.behavior( hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")), null, @@ -230,11 +234,16 @@ describe("LocalMembership", () => { ), }; + const aLocalTransport: LocalTransport = { + advertised$: localTransport$, + active$: behavior("a", { a: null }), + }; + const localMembership = createLocalMembership$({ scope, ...defaultCreateLocalMemberValues, connectionManager: mockConnectionManager, - localTransport$, + localTransport$: behavior("a", { a: aLocalTransport }), }); localMembership.requestJoinAndPublish(); @@ -248,7 +257,11 @@ describe("LocalMembership", () => { it("logs if callIntent cannot be updated", async () => { const scope = new ObservableScope(); - const localTransport$ = new BehaviorSubject(aTransport); + const aLocalTransport: LocalTransport = { + advertised$: new BehaviorSubject(aTransport), + active$: new BehaviorSubject(aTransportWithSFUConfig), + }; + const mockConnectionManager = { transports$: constant(new Epoch([])), connectionManagerData$: constant(new Epoch(new ConnectionManagerData())), @@ -264,7 +277,7 @@ describe("LocalMembership", () => { leaveRoomSession: vi.fn(), }, connectionManager: mockConnectionManager, - localTransport$, + localTransport$: new BehaviorSubject(aLocalTransport), }); const expextedLog = "'not connected yet' while updating the call intent (this is expected on startup)"; @@ -279,6 +292,17 @@ describe("LocalMembership", () => { const aTransport = { livekit_service_url: "a", } as LivekitTransportConfig; + + const aTransportWithSFUConfig = { + transport: aTransport, + sfuConfig: { + jwt: "foo", + livekitAlias: "bar", + livekitIdentity: "baz", + url: "bro", + }, + } as LocalTransportWithSFUConfig; + const bTransport = { livekit_service_url: "b", } as LivekitTransportConfig; @@ -307,7 +331,11 @@ describe("LocalMembership", () => { it("recreates publisher if new connection is used, always unpublish and end tracks", async () => { const scope = new ObservableScope(); - const localTransport$ = new BehaviorSubject(aTransport); + const activeTransport$ = new BehaviorSubject(aTransportWithSFUConfig); + const aLocalTransport: LocalTransport = { + advertised$: new BehaviorSubject(aTransport), + active$: activeTransport$, + }; const publishers: Publisher[] = []; let seed = 0; @@ -343,10 +371,13 @@ describe("LocalMembership", () => { connectionManager: { connectionManagerData$: constant(new Epoch(connectionManagerData)), }, - localTransport$, + localTransport$: new BehaviorSubject(aLocalTransport), }); await flushPromises(); - localTransport$.next(bTransport); + activeTransport$.next({ + ...aTransportWithSFUConfig, + transport: bTransport, + }); await flushPromises(); expect(publisherFactory).toHaveBeenCalledTimes(2); @@ -368,8 +399,6 @@ describe("LocalMembership", () => { it("only start tracks if requested", async () => { const scope = new ObservableScope(); - const localTransport$ = new BehaviorSubject(aTransport); - const publishers: Publisher[] = []; const tracks$ = new BehaviorSubject([]); @@ -396,6 +425,11 @@ describe("LocalMembership", () => { typeof vi.fn >; + const aLocalTransport: LocalTransport = { + advertised$: new BehaviorSubject(aTransport), + active$: new BehaviorSubject(aTransportWithSFUConfig), + }; + const connectionManagerData = new ConnectionManagerData(); connectionManagerData.add(connectionTransportAConnected, []); // connectionManagerData.add(connectionTransportB, []); @@ -405,7 +439,7 @@ describe("LocalMembership", () => { connectionManager: { connectionManagerData$: constant(new Epoch(connectionManagerData)), }, - localTransport$, + localTransport$: new BehaviorSubject(aLocalTransport), }); await flushPromises(); expect(publisherFactory).toHaveBeenCalledOnce(); @@ -428,9 +462,15 @@ describe("LocalMembership", () => { const scope = new ObservableScope(); const connectionManagerData = new ConnectionManagerData(); - const localTransport$ = new BehaviorSubject( - null, - ); + + const activeTransport$ = + new BehaviorSubject(null); + + const aLocalTransport: LocalTransport = { + advertised$: new BehaviorSubject(aTransport), + active$: activeTransport$, + }; + const connectionManagerData$ = new BehaviorSubject( new Epoch(connectionManagerData), ); @@ -470,14 +510,14 @@ describe("LocalMembership", () => { connectionManager: { connectionManagerData$, }, - localTransport$, + localTransport$: new BehaviorSubject(aLocalTransport), }); await flushPromises(); expect(localMembership.localMemberState$.value).toStrictEqual( TransportState.Waiting, ); - localTransport$.next(aTransport); + activeTransport$.next(aTransportWithSFUConfig); await flushPromises(); expect(localMembership.localMemberState$.value).toStrictEqual({ matrix: RTCMemberStatus.Connected, diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index eb641ca7..ec4a2082 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -62,6 +62,8 @@ import { } from "../remoteMembers/Connection.ts"; import { type HomeserverConnected } from "./HomeserverConnected.ts"; import { and$ } from "../../../utils/observable.ts"; +import { type LocalTransport } from "./LocalTransport.ts"; +import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers.ts"; export enum TransportState { /** Not even a transport is available to the LocalMembership */ @@ -127,7 +129,7 @@ interface Props { createPublisherFactory: (connection: Connection) => Publisher; joinMatrixRTC: (transport: LivekitTransportConfig) => void; homeserverConnected: HomeserverConnected; - localTransport$: Behavior; + localTransport$: Behavior; matrixRTCSession: Pick< MatrixRTCSession, "updateCallIntent" | "leaveRoomSession" @@ -160,7 +162,7 @@ interface Props { export const createLocalMembership$ = ({ scope, connectionManager, - localTransport$: localTransportCanThrow$, + localTransport$, homeserverConnected, createPublisherFactory, joinMatrixRTC, @@ -205,23 +207,34 @@ export const createLocalMembership$ = ({ const logger = parentLogger.getChild("[LocalMembership]"); logger.debug(`Creating local membership..`); + // We consider error on the transport as fatal. + // Whether it is the active transport or the preferred transport. + const handleTransportError = (e: unknown): Observable => { + let error: ElementCallError; + if (e instanceof ElementCallError) { + error = e; + } else { + error = new UnknownCallError( + e instanceof Error ? e : new Error("Unknown error from localTransport"), + ); + } + setTransportError(error); + return of(null); + }; + + // This is the transport that we will advertise in our membership. + const advertisedTransport$ = localTransport$.pipe( + switchMap((lt) => lt.advertised$), + catchError(handleTransportError), + distinctUntilChanged(areLivekitTransportsEqual), + ); + // Unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error. - const localTransport$ = scope.behavior( - localTransportCanThrow$.pipe( - catchError((e: unknown) => { - let error: ElementCallError; - if (e instanceof ElementCallError) { - error = e; - } else { - error = new UnknownCallError( - e instanceof Error - ? e - : new Error("Unknown error from localTransport"), - ); - } - setTransportError(error); - return of(null); - }), + const activeTransport$ = scope.behavior( + localTransport$.pipe( + switchMap((lt) => lt.active$.pipe(map((t) => t?.transport ?? null))), + catchError(handleTransportError), + distinctUntilChanged(areLivekitTransportsEqual), ), ); @@ -229,7 +242,7 @@ export const createLocalMembership$ = ({ const localConnection$ = scope.behavior( combineLatest([ connectionManager.connectionManagerData$, - localTransport$, + activeTransport$, ]).pipe( map(([{ value: connectionData }, localTransport]) => { if (localTransport === null) { @@ -398,7 +411,7 @@ export const createLocalMembership$ = ({ const mediaState$: Behavior = scope.behavior( combineLatest([ localConnectionState$, - localTransport$, + activeTransport$, joinAndPublishRequested$, from(trackStartRequested.promise).pipe( map(() => true), @@ -537,9 +550,11 @@ export const createLocalMembership$ = ({ }); }); - // Keep matrix rtc session in sync with localTransport$, connectRequested$ + // Keep matrix rtc session in sync with advertisedTransport$, connectRequested$ scope.reconcile( - scope.behavior(combineLatest([localTransport$, joinAndPublishRequested$])), + scope.behavior( + combineLatest([advertisedTransport$, joinAndPublishRequested$]), + ), async ([transport, shouldConnect]) => { if (!transport) return; // if shouldConnect=false we will do the disconnect as the cleanup from the previous reconcile iteration. diff --git a/src/state/CallViewModel/localMember/LocalTransport.ts b/src/state/CallViewModel/localMember/LocalTransport.ts index 037b6a0b..10ea79c4 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.ts @@ -106,7 +106,7 @@ export function isLocalTransportWithSFUConfig( return "transport" in obj && "sfuConfig" in obj; } -interface LocalTransport { +export interface LocalTransport { /** * The transport to be advertised in our MatrixRTC membership. `null` when not * yet fetched/validated.