mirror of
https://github.com/vector-im/element-call.git
synced 2026-05-01 09:54:37 +00:00
Merge pull request #3854 from element-hq/valere/fix_race_connection_fails
Fix a race connection causing the joiner to never publish media to the call
This commit is contained in:
@@ -546,9 +546,7 @@ export function createCallViewModel$(
|
||||
},
|
||||
connectionManager,
|
||||
matrixRTCSession,
|
||||
localTransport$: scope.behavior(
|
||||
localTransport$.pipe(switchMap((t) => t.advertised$)),
|
||||
),
|
||||
localTransport$,
|
||||
logger: logger.getChild(`[${Date.now()}]`),
|
||||
});
|
||||
|
||||
|
||||
@@ -33,13 +33,20 @@ import {
|
||||
PublishState,
|
||||
TrackState,
|
||||
} from "./LocalMember";
|
||||
import { MatrixRTCTransportMissingError } from "../../../utils/errors";
|
||||
import {
|
||||
FailToGetOpenIdToken,
|
||||
MatrixRTCTransportMissingError,
|
||||
} from "../../../utils/errors";
|
||||
import { Epoch, ObservableScope } from "../../ObservableScope";
|
||||
import { constant } from "../../Behavior";
|
||||
import { ConnectionManagerData } from "../remoteMembers/ConnectionManager";
|
||||
import { ConnectionState, type Connection } from "../remoteMembers/Connection";
|
||||
import { type Publisher } from "./Publisher";
|
||||
import { initializeWidget } from "../../../widget";
|
||||
import {
|
||||
type LocalTransport,
|
||||
type LocalTransportWithSFUConfig,
|
||||
} from "./LocalTransport";
|
||||
|
||||
initializeWidget();
|
||||
|
||||
@@ -214,7 +221,7 @@ describe("LocalMembership", () => {
|
||||
};
|
||||
|
||||
it("throws error on missing RTC config error", () => {
|
||||
withTestScheduler(({ scope, hot, expectObservable }) => {
|
||||
withTestScheduler(({ scope, hot, behavior, expectObservable }) => {
|
||||
const localTransport$ = scope.behavior<null | LivekitTransportConfig>(
|
||||
hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")),
|
||||
null,
|
||||
@@ -230,11 +237,16 @@ describe("LocalMembership", () => {
|
||||
),
|
||||
};
|
||||
|
||||
const aLocalTransport: LocalTransport = {
|
||||
advertised$: localTransport$,
|
||||
active$: behavior("a", { a: null }),
|
||||
};
|
||||
|
||||
const localMembership = createLocalMembership$({
|
||||
scope,
|
||||
...defaultCreateLocalMemberValues,
|
||||
connectionManager: mockConnectionManager,
|
||||
localTransport$,
|
||||
localTransport$: behavior("a", { a: aLocalTransport }),
|
||||
});
|
||||
localMembership.requestJoinAndPublish();
|
||||
|
||||
@@ -245,10 +257,62 @@ describe("LocalMembership", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("Should not publish to active transport if advertised has errors", () => {
|
||||
withTestScheduler(({ scope, hot, behavior, expectObservable }) => {
|
||||
const advertised$ = scope.behavior<null | LivekitTransportConfig>(
|
||||
hot("--#", {}, new FailToGetOpenIdToken(new Error("foo"))),
|
||||
null,
|
||||
);
|
||||
|
||||
// Populate a connection for active
|
||||
const connectionManagerData = new ConnectionManagerData();
|
||||
connectionManagerData.add(connectionTransportBConnected, []);
|
||||
const mockConnectionManager = {
|
||||
transports$: constant(new Epoch([bTransport])),
|
||||
connectionManagerData$: constant(new Epoch(connectionManagerData)),
|
||||
};
|
||||
|
||||
const aLocalTransport: LocalTransport = {
|
||||
advertised$,
|
||||
active$: behavior("a", { n: null, a: bTransportWithSFUConfig }),
|
||||
};
|
||||
|
||||
defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation(
|
||||
() => {
|
||||
return {} as unknown as Publisher;
|
||||
},
|
||||
);
|
||||
const publisherFactory =
|
||||
defaultCreateLocalMemberValues.createPublisherFactory as ReturnType<
|
||||
typeof vi.fn
|
||||
>;
|
||||
|
||||
const localMembership = createLocalMembership$({
|
||||
scope,
|
||||
...defaultCreateLocalMemberValues,
|
||||
connectionManager: mockConnectionManager,
|
||||
localTransport$: behavior("a", { a: aLocalTransport }),
|
||||
});
|
||||
localMembership.requestJoinAndPublish();
|
||||
|
||||
expectObservable(localMembership.localMemberState$).toBe("n-e", {
|
||||
n: TransportState.Waiting,
|
||||
e: expect.toSatisfy((e) => e instanceof FailToGetOpenIdToken),
|
||||
});
|
||||
|
||||
// Should not have created any publisher
|
||||
expect(publisherFactory).toHaveBeenCalledTimes(0);
|
||||
});
|
||||
});
|
||||
|
||||
it("logs if callIntent cannot be updated", async () => {
|
||||
const scope = new ObservableScope();
|
||||
|
||||
const localTransport$ = new BehaviorSubject(aTransport);
|
||||
const aLocalTransport: LocalTransport = {
|
||||
advertised$: new BehaviorSubject(aTransport),
|
||||
active$: new BehaviorSubject(aTransportWithSFUConfig),
|
||||
};
|
||||
|
||||
const mockConnectionManager = {
|
||||
transports$: constant(new Epoch([])),
|
||||
connectionManagerData$: constant(new Epoch(new ConnectionManagerData())),
|
||||
@@ -264,7 +328,7 @@ describe("LocalMembership", () => {
|
||||
leaveRoomSession: vi.fn(),
|
||||
},
|
||||
connectionManager: mockConnectionManager,
|
||||
localTransport$,
|
||||
localTransport$: new BehaviorSubject(aLocalTransport),
|
||||
});
|
||||
const expextedLog =
|
||||
"'not connected yet' while updating the call intent (this is expected on startup)";
|
||||
@@ -279,10 +343,31 @@ describe("LocalMembership", () => {
|
||||
const aTransport = {
|
||||
livekit_service_url: "a",
|
||||
} as LivekitTransportConfig;
|
||||
|
||||
const aTransportWithSFUConfig = {
|
||||
transport: aTransport,
|
||||
sfuConfig: {
|
||||
jwt: "foo",
|
||||
livekitAlias: "bar",
|
||||
livekitIdentity: "baz",
|
||||
url: "bro",
|
||||
},
|
||||
} as LocalTransportWithSFUConfig;
|
||||
|
||||
const bTransport = {
|
||||
livekit_service_url: "b",
|
||||
} as LivekitTransportConfig;
|
||||
|
||||
const bTransportWithSFUConfig = {
|
||||
transport: bTransport,
|
||||
sfuConfig: {
|
||||
jwt: "foo2",
|
||||
livekitAlias: "bar2",
|
||||
livekitIdentity: "baz2",
|
||||
url: "bro2",
|
||||
},
|
||||
} as LocalTransportWithSFUConfig;
|
||||
|
||||
const connectionTransportAConnected = {
|
||||
livekitRoom: mockLivekitRoom({
|
||||
localParticipant: {
|
||||
@@ -307,7 +392,11 @@ describe("LocalMembership", () => {
|
||||
it("recreates publisher if new connection is used, always unpublish and end tracks", async () => {
|
||||
const scope = new ObservableScope();
|
||||
|
||||
const localTransport$ = new BehaviorSubject(aTransport);
|
||||
const activeTransport$ = new BehaviorSubject(aTransportWithSFUConfig);
|
||||
const aLocalTransport: LocalTransport = {
|
||||
advertised$: new BehaviorSubject(aTransport),
|
||||
active$: activeTransport$,
|
||||
};
|
||||
|
||||
const publishers: Publisher[] = [];
|
||||
let seed = 0;
|
||||
@@ -343,10 +432,13 @@ describe("LocalMembership", () => {
|
||||
connectionManager: {
|
||||
connectionManagerData$: constant(new Epoch(connectionManagerData)),
|
||||
},
|
||||
localTransport$,
|
||||
localTransport$: new BehaviorSubject(aLocalTransport),
|
||||
});
|
||||
await flushPromises();
|
||||
localTransport$.next(bTransport);
|
||||
activeTransport$.next({
|
||||
...aTransportWithSFUConfig,
|
||||
transport: bTransport,
|
||||
});
|
||||
await flushPromises();
|
||||
|
||||
expect(publisherFactory).toHaveBeenCalledTimes(2);
|
||||
@@ -368,8 +460,6 @@ describe("LocalMembership", () => {
|
||||
it("only start tracks if requested", async () => {
|
||||
const scope = new ObservableScope();
|
||||
|
||||
const localTransport$ = new BehaviorSubject(aTransport);
|
||||
|
||||
const publishers: Publisher[] = [];
|
||||
|
||||
const tracks$ = new BehaviorSubject<LocalTrack[]>([]);
|
||||
@@ -396,6 +486,11 @@ describe("LocalMembership", () => {
|
||||
typeof vi.fn
|
||||
>;
|
||||
|
||||
const aLocalTransport: LocalTransport = {
|
||||
advertised$: new BehaviorSubject(aTransport),
|
||||
active$: new BehaviorSubject(aTransportWithSFUConfig),
|
||||
};
|
||||
|
||||
const connectionManagerData = new ConnectionManagerData();
|
||||
connectionManagerData.add(connectionTransportAConnected, []);
|
||||
// connectionManagerData.add(connectionTransportB, []);
|
||||
@@ -405,7 +500,7 @@ describe("LocalMembership", () => {
|
||||
connectionManager: {
|
||||
connectionManagerData$: constant(new Epoch(connectionManagerData)),
|
||||
},
|
||||
localTransport$,
|
||||
localTransport$: new BehaviorSubject(aLocalTransport),
|
||||
});
|
||||
await flushPromises();
|
||||
expect(publisherFactory).toHaveBeenCalledOnce();
|
||||
@@ -428,9 +523,15 @@ describe("LocalMembership", () => {
|
||||
const scope = new ObservableScope();
|
||||
|
||||
const connectionManagerData = new ConnectionManagerData();
|
||||
const localTransport$ = new BehaviorSubject<null | LivekitTransportConfig>(
|
||||
null,
|
||||
);
|
||||
|
||||
const activeTransport$ =
|
||||
new BehaviorSubject<null | LocalTransportWithSFUConfig>(null);
|
||||
|
||||
const aLocalTransport: LocalTransport = {
|
||||
advertised$: new BehaviorSubject(aTransport),
|
||||
active$: activeTransport$,
|
||||
};
|
||||
|
||||
const connectionManagerData$ = new BehaviorSubject(
|
||||
new Epoch(connectionManagerData),
|
||||
);
|
||||
@@ -470,14 +571,14 @@ describe("LocalMembership", () => {
|
||||
connectionManager: {
|
||||
connectionManagerData$,
|
||||
},
|
||||
localTransport$,
|
||||
localTransport$: new BehaviorSubject(aLocalTransport),
|
||||
});
|
||||
|
||||
await flushPromises();
|
||||
expect(localMembership.localMemberState$.value).toStrictEqual(
|
||||
TransportState.Waiting,
|
||||
);
|
||||
localTransport$.next(aTransport);
|
||||
activeTransport$.next(aTransportWithSFUConfig);
|
||||
await flushPromises();
|
||||
expect(localMembership.localMemberState$.value).toStrictEqual({
|
||||
matrix: RTCMemberStatus.Connected,
|
||||
|
||||
@@ -62,6 +62,8 @@ import {
|
||||
} from "../remoteMembers/Connection.ts";
|
||||
import { type HomeserverConnected } from "./HomeserverConnected.ts";
|
||||
import { and$ } from "../../../utils/observable.ts";
|
||||
import { type LocalTransport } from "./LocalTransport.ts";
|
||||
import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers.ts";
|
||||
|
||||
export enum TransportState {
|
||||
/** Not even a transport is available to the LocalMembership */
|
||||
@@ -127,7 +129,7 @@ interface Props {
|
||||
createPublisherFactory: (connection: Connection) => Publisher;
|
||||
joinMatrixRTC: (transport: LivekitTransportConfig) => void;
|
||||
homeserverConnected: HomeserverConnected;
|
||||
localTransport$: Behavior<LivekitTransportConfig | null>;
|
||||
localTransport$: Behavior<LocalTransport>;
|
||||
matrixRTCSession: Pick<
|
||||
MatrixRTCSession,
|
||||
"updateCallIntent" | "leaveRoomSession"
|
||||
@@ -160,7 +162,7 @@ interface Props {
|
||||
export const createLocalMembership$ = ({
|
||||
scope,
|
||||
connectionManager,
|
||||
localTransport$: localTransportCanThrow$,
|
||||
localTransport$,
|
||||
homeserverConnected,
|
||||
createPublisherFactory,
|
||||
joinMatrixRTC,
|
||||
@@ -205,23 +207,43 @@ export const createLocalMembership$ = ({
|
||||
const logger = parentLogger.getChild("[LocalMembership]");
|
||||
logger.debug(`Creating local membership..`);
|
||||
|
||||
// We consider error on the transport as fatal.
|
||||
// Whether it is the active transport or the preferred transport.
|
||||
const handleTransportError = (e: unknown): Observable<null> => {
|
||||
let error: ElementCallError;
|
||||
if (e instanceof ElementCallError) {
|
||||
error = e;
|
||||
} else {
|
||||
error = new UnknownCallError(
|
||||
e instanceof Error ? e : new Error("Unknown error from localTransport"),
|
||||
);
|
||||
}
|
||||
setTransportError(error);
|
||||
return of(null);
|
||||
};
|
||||
|
||||
// This is the transport that we will advertise in our membership.
|
||||
const advertisedTransport$ = localTransport$.pipe(
|
||||
switchMap((lt) => lt.advertised$),
|
||||
catchError(handleTransportError),
|
||||
distinctUntilChanged(areLivekitTransportsEqual),
|
||||
);
|
||||
|
||||
// Unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error.
|
||||
const localTransport$ = scope.behavior(
|
||||
localTransportCanThrow$.pipe(
|
||||
catchError((e: unknown) => {
|
||||
let error: ElementCallError;
|
||||
if (e instanceof ElementCallError) {
|
||||
error = e;
|
||||
} else {
|
||||
error = new UnknownCallError(
|
||||
e instanceof Error
|
||||
? e
|
||||
: new Error("Unknown error from localTransport"),
|
||||
);
|
||||
}
|
||||
setTransportError(error);
|
||||
return of(null);
|
||||
const activeTransport$ = scope.behavior(
|
||||
localTransport$.pipe(
|
||||
switchMap((lt) => {
|
||||
return combineLatest([lt.active$, lt.advertised$]).pipe(
|
||||
map(([active, advertised]) => {
|
||||
// Our policy is to not publish to another transport if our prefered transport is miss-configured
|
||||
if (advertised == null) return null;
|
||||
|
||||
return active?.transport ?? null;
|
||||
}),
|
||||
);
|
||||
}),
|
||||
catchError(handleTransportError),
|
||||
distinctUntilChanged(areLivekitTransportsEqual),
|
||||
),
|
||||
);
|
||||
|
||||
@@ -229,7 +251,7 @@ export const createLocalMembership$ = ({
|
||||
const localConnection$ = scope.behavior(
|
||||
combineLatest([
|
||||
connectionManager.connectionManagerData$,
|
||||
localTransport$,
|
||||
activeTransport$,
|
||||
]).pipe(
|
||||
map(([{ value: connectionData }, localTransport]) => {
|
||||
if (localTransport === null) {
|
||||
@@ -398,7 +420,7 @@ export const createLocalMembership$ = ({
|
||||
const mediaState$: Behavior<LocalMemberMediaState> = scope.behavior(
|
||||
combineLatest([
|
||||
localConnectionState$,
|
||||
localTransport$,
|
||||
activeTransport$,
|
||||
joinAndPublishRequested$,
|
||||
from(trackStartRequested.promise).pipe(
|
||||
map(() => true),
|
||||
@@ -537,9 +559,11 @@ export const createLocalMembership$ = ({
|
||||
});
|
||||
});
|
||||
|
||||
// Keep matrix rtc session in sync with localTransport$, connectRequested$
|
||||
// Keep matrix rtc session in sync with advertisedTransport$, connectRequested$
|
||||
scope.reconcile(
|
||||
scope.behavior(combineLatest([localTransport$, joinAndPublishRequested$])),
|
||||
scope.behavior(
|
||||
combineLatest([advertisedTransport$, joinAndPublishRequested$]),
|
||||
),
|
||||
async ([transport, shouldConnect]) => {
|
||||
if (!transport) return;
|
||||
// if shouldConnect=false we will do the disconnect as the cleanup from the previous reconcile iteration.
|
||||
|
||||
@@ -106,7 +106,7 @@ export function isLocalTransportWithSFUConfig(
|
||||
return "transport" in obj && "sfuConfig" in obj;
|
||||
}
|
||||
|
||||
interface LocalTransport {
|
||||
export interface LocalTransport {
|
||||
/**
|
||||
* The transport to be advertised in our MatrixRTC membership. `null` when not
|
||||
* yet fetched/validated.
|
||||
|
||||
Reference in New Issue
Block a user