diff --git a/playwright/widget/federation-oldest-membership-bug.spec.ts b/playwright/widget/federation-oldest-membership-bug.spec.ts new file mode 100644 index 00000000..70442e05 --- /dev/null +++ b/playwright/widget/federation-oldest-membership-bug.spec.ts @@ -0,0 +1,92 @@ +/* +Copyright 2026 Element Creations Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { expect, test } from "@playwright/test"; + +import { widgetTest } from "../fixtures/widget-user"; +import { HOST1, HOST2, TestHelpers } from "./test-helpers"; + +widgetTest( + "Bug new joiner was not publishing on correct SFU", + async ({ addUser, browserName }) => { + test.skip( + browserName === "firefox", + "This is a bug in the old widget, not a browser problem.", + ); + + test.slow(); + + // 2 users in federation + const florian = await addUser("floriant", HOST1); + const timo = await addUser("timo", HOST2); + + // Florian creates a room and invites Timo to it + const roomName = "Call Room"; + await TestHelpers.createRoom(roomName, florian.page, [timo.mxId]); + + // Timo joins the room + await TestHelpers.acceptRoomInvite(roomName, timo.page); + + // Ensure we are in legacy mode (should be the default) + await TestHelpers.openWidgetSetEmbeddedElementCallRtcModeCloseWidget( + florian.page, + "legacy", + ); + await TestHelpers.openWidgetSetEmbeddedElementCallRtcModeCloseWidget( + timo.page, + "legacy", + ); + + // Let timo create a call + await TestHelpers.startCallInCurrentRoom(timo.page, false); + await TestHelpers.joinCallFromLobby(timo.page); + + // We want to simulate that the oldest membership authentication is way slower than + // the preffered auth. + // In this setup, timo advertised$ transport will be it's own, and the active will be the one from florian + await florian.page.route( + "**/matrix-rtc.othersite.m.localhost/livekit/jwt/**", + async (route) => { + await new Promise((resolve) => setTimeout(resolve, 2000)); // 5 second delay + await route.continue(); + }, + ); + + // Florian joins the call + await expect(florian.page.getByTestId("join-call-button")).toBeVisible(); + await florian.page.getByTestId("join-call-button").click(); + await TestHelpers.joinCallFromLobby(florian.page); + + await florian.page.waitForTimeout(3000); + await timo.page.waitForTimeout(3000); + + // We should see 2 video tiles everywhere now + for (const user of [timo, florian]) { + const frame = user.page + .locator('iframe[title="Element Call"]') + .contentFrame(); + await expect(frame.getByTestId("videoTile")).toHaveCount(2); + + // No one should be waiting for media + await expect(frame.getByText("Waiting for media...")).not.toBeVisible(); + + // There should be 2 video elements, visible and autoplaying + const videoElements = await frame.locator("video").all(); + expect(videoElements.length).toBe(2); + + const blockDisplayCount = await frame + .locator("video") + .evaluateAll( + (videos: Element[]) => + videos.filter( + (v: Element) => window.getComputedStyle(v).display === "block", + ).length, + ); + expect(blockDisplayCount).toBe(2); + } + }, +); diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index 6dca08dc..711e5e7e 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -546,9 +546,7 @@ export function createCallViewModel$( }, connectionManager, matrixRTCSession, - localTransport$: scope.behavior( - localTransport$.pipe(switchMap((t) => t.advertised$)), - ), + localTransport$, logger: logger.getChild(`[${Date.now()}]`), }); diff --git a/src/state/CallViewModel/localMember/LocalMember.test.ts b/src/state/CallViewModel/localMember/LocalMember.test.ts index e5e9f327..6eaaa0b0 100644 --- a/src/state/CallViewModel/localMember/LocalMember.test.ts +++ b/src/state/CallViewModel/localMember/LocalMember.test.ts @@ -33,13 +33,20 @@ import { PublishState, TrackState, } from "./LocalMember"; -import { MatrixRTCTransportMissingError } from "../../../utils/errors"; +import { + FailToGetOpenIdToken, + MatrixRTCTransportMissingError, +} from "../../../utils/errors"; import { Epoch, ObservableScope } from "../../ObservableScope"; import { constant } from "../../Behavior"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; import { ConnectionState, type Connection } from "../remoteMembers/Connection"; import { type Publisher } from "./Publisher"; import { initializeWidget } from "../../../widget"; +import { + type LocalTransport, + type LocalTransportWithSFUConfig, +} from "./LocalTransport"; initializeWidget(); @@ -214,7 +221,7 @@ describe("LocalMembership", () => { }; it("throws error on missing RTC config error", () => { - withTestScheduler(({ scope, hot, expectObservable }) => { + withTestScheduler(({ scope, hot, behavior, expectObservable }) => { const localTransport$ = scope.behavior( hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")), null, @@ -230,11 +237,16 @@ describe("LocalMembership", () => { ), }; + const aLocalTransport: LocalTransport = { + advertised$: localTransport$, + active$: behavior("a", { a: null }), + }; + const localMembership = createLocalMembership$({ scope, ...defaultCreateLocalMemberValues, connectionManager: mockConnectionManager, - localTransport$, + localTransport$: behavior("a", { a: aLocalTransport }), }); localMembership.requestJoinAndPublish(); @@ -245,10 +257,62 @@ describe("LocalMembership", () => { }); }); + it("Should not publish to active transport if advertised has errors", () => { + withTestScheduler(({ scope, hot, behavior, expectObservable }) => { + const advertised$ = scope.behavior( + hot("--#", {}, new FailToGetOpenIdToken(new Error("foo"))), + null, + ); + + // Populate a connection for active + const connectionManagerData = new ConnectionManagerData(); + connectionManagerData.add(connectionTransportBConnected, []); + const mockConnectionManager = { + transports$: constant(new Epoch([bTransport])), + connectionManagerData$: constant(new Epoch(connectionManagerData)), + }; + + const aLocalTransport: LocalTransport = { + advertised$, + active$: behavior("a", { n: null, a: bTransportWithSFUConfig }), + }; + + defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation( + () => { + return {} as unknown as Publisher; + }, + ); + const publisherFactory = + defaultCreateLocalMemberValues.createPublisherFactory as ReturnType< + typeof vi.fn + >; + + const localMembership = createLocalMembership$({ + scope, + ...defaultCreateLocalMemberValues, + connectionManager: mockConnectionManager, + localTransport$: behavior("a", { a: aLocalTransport }), + }); + localMembership.requestJoinAndPublish(); + + expectObservable(localMembership.localMemberState$).toBe("n-e", { + n: TransportState.Waiting, + e: expect.toSatisfy((e) => e instanceof FailToGetOpenIdToken), + }); + + // Should not have created any publisher + expect(publisherFactory).toHaveBeenCalledTimes(0); + }); + }); + it("logs if callIntent cannot be updated", async () => { const scope = new ObservableScope(); - const localTransport$ = new BehaviorSubject(aTransport); + const aLocalTransport: LocalTransport = { + advertised$: new BehaviorSubject(aTransport), + active$: new BehaviorSubject(aTransportWithSFUConfig), + }; + const mockConnectionManager = { transports$: constant(new Epoch([])), connectionManagerData$: constant(new Epoch(new ConnectionManagerData())), @@ -264,7 +328,7 @@ describe("LocalMembership", () => { leaveRoomSession: vi.fn(), }, connectionManager: mockConnectionManager, - localTransport$, + localTransport$: new BehaviorSubject(aLocalTransport), }); const expextedLog = "'not connected yet' while updating the call intent (this is expected on startup)"; @@ -279,10 +343,31 @@ describe("LocalMembership", () => { const aTransport = { livekit_service_url: "a", } as LivekitTransportConfig; + + const aTransportWithSFUConfig = { + transport: aTransport, + sfuConfig: { + jwt: "foo", + livekitAlias: "bar", + livekitIdentity: "baz", + url: "bro", + }, + } as LocalTransportWithSFUConfig; + const bTransport = { livekit_service_url: "b", } as LivekitTransportConfig; + const bTransportWithSFUConfig = { + transport: bTransport, + sfuConfig: { + jwt: "foo2", + livekitAlias: "bar2", + livekitIdentity: "baz2", + url: "bro2", + }, + } as LocalTransportWithSFUConfig; + const connectionTransportAConnected = { livekitRoom: mockLivekitRoom({ localParticipant: { @@ -307,7 +392,11 @@ describe("LocalMembership", () => { it("recreates publisher if new connection is used, always unpublish and end tracks", async () => { const scope = new ObservableScope(); - const localTransport$ = new BehaviorSubject(aTransport); + const activeTransport$ = new BehaviorSubject(aTransportWithSFUConfig); + const aLocalTransport: LocalTransport = { + advertised$: new BehaviorSubject(aTransport), + active$: activeTransport$, + }; const publishers: Publisher[] = []; let seed = 0; @@ -343,10 +432,13 @@ describe("LocalMembership", () => { connectionManager: { connectionManagerData$: constant(new Epoch(connectionManagerData)), }, - localTransport$, + localTransport$: new BehaviorSubject(aLocalTransport), }); await flushPromises(); - localTransport$.next(bTransport); + activeTransport$.next({ + ...aTransportWithSFUConfig, + transport: bTransport, + }); await flushPromises(); expect(publisherFactory).toHaveBeenCalledTimes(2); @@ -368,8 +460,6 @@ describe("LocalMembership", () => { it("only start tracks if requested", async () => { const scope = new ObservableScope(); - const localTransport$ = new BehaviorSubject(aTransport); - const publishers: Publisher[] = []; const tracks$ = new BehaviorSubject([]); @@ -396,6 +486,11 @@ describe("LocalMembership", () => { typeof vi.fn >; + const aLocalTransport: LocalTransport = { + advertised$: new BehaviorSubject(aTransport), + active$: new BehaviorSubject(aTransportWithSFUConfig), + }; + const connectionManagerData = new ConnectionManagerData(); connectionManagerData.add(connectionTransportAConnected, []); // connectionManagerData.add(connectionTransportB, []); @@ -405,7 +500,7 @@ describe("LocalMembership", () => { connectionManager: { connectionManagerData$: constant(new Epoch(connectionManagerData)), }, - localTransport$, + localTransport$: new BehaviorSubject(aLocalTransport), }); await flushPromises(); expect(publisherFactory).toHaveBeenCalledOnce(); @@ -428,9 +523,15 @@ describe("LocalMembership", () => { const scope = new ObservableScope(); const connectionManagerData = new ConnectionManagerData(); - const localTransport$ = new BehaviorSubject( - null, - ); + + const activeTransport$ = + new BehaviorSubject(null); + + const aLocalTransport: LocalTransport = { + advertised$: new BehaviorSubject(aTransport), + active$: activeTransport$, + }; + const connectionManagerData$ = new BehaviorSubject( new Epoch(connectionManagerData), ); @@ -470,14 +571,14 @@ describe("LocalMembership", () => { connectionManager: { connectionManagerData$, }, - localTransport$, + localTransport$: new BehaviorSubject(aLocalTransport), }); await flushPromises(); expect(localMembership.localMemberState$.value).toStrictEqual( TransportState.Waiting, ); - localTransport$.next(aTransport); + activeTransport$.next(aTransportWithSFUConfig); await flushPromises(); expect(localMembership.localMemberState$.value).toStrictEqual({ matrix: RTCMemberStatus.Connected, diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index eb641ca7..188dc543 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -62,6 +62,8 @@ import { } from "../remoteMembers/Connection.ts"; import { type HomeserverConnected } from "./HomeserverConnected.ts"; import { and$ } from "../../../utils/observable.ts"; +import { type LocalTransport } from "./LocalTransport.ts"; +import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers.ts"; export enum TransportState { /** Not even a transport is available to the LocalMembership */ @@ -127,7 +129,7 @@ interface Props { createPublisherFactory: (connection: Connection) => Publisher; joinMatrixRTC: (transport: LivekitTransportConfig) => void; homeserverConnected: HomeserverConnected; - localTransport$: Behavior; + localTransport$: Behavior; matrixRTCSession: Pick< MatrixRTCSession, "updateCallIntent" | "leaveRoomSession" @@ -160,7 +162,7 @@ interface Props { export const createLocalMembership$ = ({ scope, connectionManager, - localTransport$: localTransportCanThrow$, + localTransport$, homeserverConnected, createPublisherFactory, joinMatrixRTC, @@ -205,23 +207,43 @@ export const createLocalMembership$ = ({ const logger = parentLogger.getChild("[LocalMembership]"); logger.debug(`Creating local membership..`); + // We consider error on the transport as fatal. + // Whether it is the active transport or the preferred transport. + const handleTransportError = (e: unknown): Observable => { + let error: ElementCallError; + if (e instanceof ElementCallError) { + error = e; + } else { + error = new UnknownCallError( + e instanceof Error ? e : new Error("Unknown error from localTransport"), + ); + } + setTransportError(error); + return of(null); + }; + + // This is the transport that we will advertise in our membership. + const advertisedTransport$ = localTransport$.pipe( + switchMap((lt) => lt.advertised$), + catchError(handleTransportError), + distinctUntilChanged(areLivekitTransportsEqual), + ); + // Unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error. - const localTransport$ = scope.behavior( - localTransportCanThrow$.pipe( - catchError((e: unknown) => { - let error: ElementCallError; - if (e instanceof ElementCallError) { - error = e; - } else { - error = new UnknownCallError( - e instanceof Error - ? e - : new Error("Unknown error from localTransport"), - ); - } - setTransportError(error); - return of(null); + const activeTransport$ = scope.behavior( + localTransport$.pipe( + switchMap((lt) => { + return combineLatest([lt.active$, lt.advertised$]).pipe( + map(([active, advertised]) => { + // Our policy is to not publish to another transport if our prefered transport is miss-configured + if (advertised == null) return null; + + return active?.transport ?? null; + }), + ); }), + catchError(handleTransportError), + distinctUntilChanged(areLivekitTransportsEqual), ), ); @@ -229,7 +251,7 @@ export const createLocalMembership$ = ({ const localConnection$ = scope.behavior( combineLatest([ connectionManager.connectionManagerData$, - localTransport$, + activeTransport$, ]).pipe( map(([{ value: connectionData }, localTransport]) => { if (localTransport === null) { @@ -398,7 +420,7 @@ export const createLocalMembership$ = ({ const mediaState$: Behavior = scope.behavior( combineLatest([ localConnectionState$, - localTransport$, + activeTransport$, joinAndPublishRequested$, from(trackStartRequested.promise).pipe( map(() => true), @@ -537,9 +559,11 @@ export const createLocalMembership$ = ({ }); }); - // Keep matrix rtc session in sync with localTransport$, connectRequested$ + // Keep matrix rtc session in sync with advertisedTransport$, connectRequested$ scope.reconcile( - scope.behavior(combineLatest([localTransport$, joinAndPublishRequested$])), + scope.behavior( + combineLatest([advertisedTransport$, joinAndPublishRequested$]), + ), async ([transport, shouldConnect]) => { if (!transport) return; // if shouldConnect=false we will do the disconnect as the cleanup from the previous reconcile iteration. diff --git a/src/state/CallViewModel/localMember/LocalTransport.ts b/src/state/CallViewModel/localMember/LocalTransport.ts index 037b6a0b..10ea79c4 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.ts @@ -106,7 +106,7 @@ export function isLocalTransportWithSFUConfig( return "transport" in obj && "sfuConfig" in obj; } -interface LocalTransport { +export interface LocalTransport { /** * The transport to be advertised in our MatrixRTC membership. `null` when not * yet fetched/validated.