Ensure we don't publish to any transport until our own transport is ok

This commit is contained in:
Valere
2026-04-10 09:24:34 +02:00
parent 40dacd523b
commit aea5815dab
2 changed files with 73 additions and 2 deletions

View File

@@ -33,7 +33,10 @@ import {
PublishState,
TrackState,
} from "./LocalMember";
import { MatrixRTCTransportMissingError } from "../../../utils/errors";
import {
FailToGetOpenIdToken,
MatrixRTCTransportMissingError,
} from "../../../utils/errors";
import { Epoch, ObservableScope } from "../../ObservableScope";
import { constant } from "../../Behavior";
import { ConnectionManagerData } from "../remoteMembers/ConnectionManager";
@@ -254,6 +257,55 @@ describe("LocalMembership", () => {
});
});
it("Should not publish to active transport if advertised has errors", () => {
withTestScheduler(({ scope, hot, behavior, expectObservable }) => {
const advertised$ = scope.behavior<null | LivekitTransportConfig>(
hot("--#", {}, new FailToGetOpenIdToken(new Error("foo"))),
null,
);
// Populate a connection for active
const connectionManagerData = new ConnectionManagerData();
connectionManagerData.add(connectionTransportBConnected, []);
const mockConnectionManager = {
transports$: constant(new Epoch([bTransport])),
connectionManagerData$: constant(new Epoch(connectionManagerData)),
};
const aLocalTransport: LocalTransport = {
advertised$,
active$: behavior("a", { n: null, a: bTransportWithSFUConfig }),
};
defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation(
() => {
return {} as unknown as Publisher;
},
);
const publisherFactory =
defaultCreateLocalMemberValues.createPublisherFactory as ReturnType<
typeof vi.fn
>;
const localMembership = createLocalMembership$({
scope,
...defaultCreateLocalMemberValues,
connectionManager: mockConnectionManager,
localTransport$: behavior("a", { a: aLocalTransport }),
});
localMembership.requestJoinAndPublish();
expectObservable(localMembership.localMemberState$).toBe("n-e", {
n: TransportState.Waiting,
e: expect.toSatisfy((e) => e instanceof FailToGetOpenIdToken),
});
// Should not have created any publisher
expect(publisherFactory).toHaveBeenCalledTimes(0);
});
});
it("logs if callIntent cannot be updated", async () => {
const scope = new ObservableScope();
@@ -307,6 +359,16 @@ describe("LocalMembership", () => {
livekit_service_url: "b",
} as LivekitTransportConfig;
const bTransportWithSFUConfig = {
transport: bTransport,
sfuConfig: {
jwt: "foo2",
livekitAlias: "bar2",
livekitIdentity: "baz2",
url: "bro2",
},
} as LocalTransportWithSFUConfig;
const connectionTransportAConnected = {
livekitRoom: mockLivekitRoom({
localParticipant: {

View File

@@ -232,7 +232,16 @@ export const createLocalMembership$ = ({
// Unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error.
const activeTransport$ = scope.behavior(
localTransport$.pipe(
switchMap((lt) => lt.active$.pipe(map((t) => t?.transport ?? null))),
switchMap((lt) => {
return combineLatest([lt.active$, lt.advertised$]).pipe(
map(([active, advertised]) => {
// Our policy is to not publish to another transport if our prefered transport is miss-configured
if (advertised == null) return null;
return active?.transport ?? null;
}),
);
}),
catchError(handleTransportError),
distinctUntilChanged(areLivekitTransportsEqual),
),