[Feature] Support delayed event delegation for legacy JWT request (#3946)

* make `delayed_leave_event_delay_ms` and `network_error_retry_ms` mandatory

* Support delegation for legacy  jwt request

* Calculate `maximumNetworkErrorRetryCount` based on timeouts
This commit is contained in:
fkwp
2026-05-05 17:33:39 +02:00
committed by GitHub
parent 95f772df2b
commit 147d0f96e0
4 changed files with 169 additions and 6 deletions

View File

@@ -164,6 +164,14 @@ export interface ResolvedConfigOptions extends ConfigOptions {
};
sync_disconnect_grace_period_ms: number;
ssla: string;
matrix_rtc_session: {
wait_for_key_rotation_ms?: number;
delayed_leave_event_delay_ms: number;
delayed_leave_event_restart_local_timeout_ms?: number;
delayed_leave_event_restart_ms?: number;
network_error_retry_ms: number;
membership_event_expiry_ms?: number;
};
}
export const DEFAULT_CONFIG: ResolvedConfigOptions = {
@@ -178,4 +186,8 @@ export const DEFAULT_CONFIG: ResolvedConfigOptions = {
},
sync_disconnect_grace_period_ms: 10000,
ssla: "https://static.element.io/legal/element-software-and-services-license-agreement-uk-1.pdf",
matrix_rtc_session: {
delayed_leave_event_delay_ms: 10000,
network_error_retry_ms: 1000,
},
};

View File

@@ -84,6 +84,99 @@ describe("getSFUConfigWithOpenID", () => {
expect.fail("Expected test to throw;");
});
it("should retry without delay params if the JWT service legacy endpoint returns M_BAD_JSON 400", async () => {
let callCount = 0;
fetchMock.post(
"https://sfu.example.org/sfu/get",
(url, opts) => {
callCount++;
const body = JSON.parse(opts.body as string);
// First call: check if it has delay parts and return 400
if (callCount === 1) {
expect(body).toHaveProperty("delay_id", "mock_delay_id");
return {
status: 400,
body: { errcode: "M_BAD_JSON", error: "Unsupported parameters" },
};
}
// Second call: check if delay parts were stripped and return success
expect(body).not.toHaveProperty("delay_id");
expect(body).not.toHaveProperty("delay_timeout");
expect(body).not.toHaveProperty("delay_cs_api_url");
return {
status: 200,
body: { url: sfuUrl, jwt: testJWTToken },
};
},
{ overwriteRoutes: true },
);
// Note: Assuming getSFUConfigWithOpenID eventually calls getLiveKitJWT
const config = await getSFUConfigWithOpenID(
matrixClient,
ownMemberMock,
"https://sfu.example.org",
"!example_room_id",
{
delayEndpointBaseUrl: "https://matrix.homeserverserver.org",
delayId: "mock_delay_id",
},
);
expect(config.jwt).toBe(testJWTToken);
expect(callCount).toBe(2);
void (await fetchMock.flush());
});
it("should successfully send delay parameters to the JWT service legacy endpoint", async () => {
fetchMock.post(
"https://sfu.example.org/sfu/get",
(url, opts) => {
const body = JSON.parse(opts.body as string);
// Verify, that the request contains the expected delay parameters
if (
body.delay_id === "mock_delay_id" &&
body.delay_timeout === 10000 &&
body.delay_cs_api_url === "https://homeserverserver.org/cs_api"
) {
return {
status: 200,
body: { url: sfuUrl, jwt: testJWTToken },
};
}
return {
status: 400,
body: { error: "Missing expected delay params" },
};
},
{ overwriteRoutes: true },
);
const config = await getSFUConfigWithOpenID(
matrixClient,
ownMemberMock,
"https://sfu.example.org",
"!example_room_id",
{
delayEndpointBaseUrl: "https://homeserverserver.org/cs_api",
delayId: "mock_delay_id",
},
);
// Prüfe das Ergebnis
expect(config).toMatchObject({
jwt: testJWTToken,
url: sfuUrl,
});
void (await fetchMock.flush());
});
it("should try legacy and then new endpoint with delay delegation", async () => {
fetchMock.post("https://sfu.example.org/get_token", () => {
return {
@@ -121,7 +214,7 @@ describe("getSFUConfigWithOpenID", () => {
expect(calls[0][0]).toStrictEqual("https://sfu.example.org/get_token");
expect(calls[0][1]).toStrictEqual({
// check if it uses correct delayID!
body: '{"room_id":"!example_room_id","slot_id":"m.call#ROOM","member":{"id":"@alice:example.org:DEVICE","claimed_user_id":"@alice:example.org","claimed_device_id":"DEVICE"},"delay_id":"mock_delay_id","delay_timeout":1000,"delay_cs_api_url":"https://matrix.homeserverserver.org"}',
body: '{"room_id":"!example_room_id","slot_id":"m.call#ROOM","member":{"id":"@alice:example.org:DEVICE","claimed_user_id":"@alice:example.org","claimed_device_id":"DEVICE"},"delay_id":"mock_delay_id","delay_timeout":10000,"delay_cs_api_url":"https://matrix.homeserverserver.org"}',
method: "POST",
headers: {
"Content-Type": "application/json",
@@ -131,7 +224,7 @@ describe("getSFUConfigWithOpenID", () => {
expect(calls[1][0]).toStrictEqual("https://sfu.example.org/sfu/get");
expect(calls[1][1]).toStrictEqual({
body: '{"room":"!example_room_id","device_id":"DEVICE"}',
body: '{"room":"!example_room_id","device_id":"DEVICE","delay_id":"mock_delay_id","delay_timeout":10000,"delay_cs_api_url":"https://matrix.homeserverserver.org"}',
headers: {
"Content-Type": "application/json",
},
@@ -176,7 +269,7 @@ describe("getSFUConfigWithOpenID", () => {
expect(calls[0][0]).toStrictEqual("https://sfu.example.org/get_token");
expect(calls[0][1]).toStrictEqual({
// check if it uses correct delayID!
body: '{"room_id":"!example_room_id","slot_id":"m.call#ROOM","member":{"id":"@alice:example.org:DEVICE","claimed_user_id":"@alice:example.org","claimed_device_id":"DEVICE"},"delay_id":"mock_delay_id","delay_timeout":1000,"delay_cs_api_url":"https://matrix.homeserverserver.org"}',
body: '{"room_id":"!example_room_id","slot_id":"m.call#ROOM","member":{"id":"@alice:example.org:DEVICE","claimed_user_id":"@alice:example.org","claimed_device_id":"DEVICE"},"delay_id":"mock_delay_id","delay_timeout":10000,"delay_cs_api_url":"https://matrix.homeserverserver.org"}',
method: "POST",
headers: {
"Content-Type": "application/json",

View File

@@ -155,6 +155,8 @@ export async function getSFUConfigWithOpenID(
serviceUrl,
roomId,
openIdToken,
opts?.delayEndpointBaseUrl,
opts?.delayId,
);
logger?.info(`Got JWT from call's active focus URL.`);
return extractFullConfigFromToken(sfuConfig);
@@ -187,20 +189,62 @@ async function getLiveKitJWT(
livekitServiceURL: string,
matrixRoomId: string,
openIDToken: IOpenIDToken,
delayEndpointBaseUrl?: string,
delayId?: string,
): Promise<{ url: string; jwt: string }> {
const res = await doNetworkOperationWithRetry(async () => {
interface IDelayParams {
delay_id?: string;
delay_timeout?: number;
delay_cs_api_url?: string;
}
let bodyDalayParts: IDelayParams = {};
// Also check for empty string
if (delayId && delayEndpointBaseUrl) {
const delayTimeoutMs =
Config.get().matrix_rtc_session?.delayed_leave_event_delay_ms;
bodyDalayParts = {
delay_id: delayId,
delay_timeout: delayTimeoutMs,
delay_cs_api_url: delayEndpointBaseUrl,
};
}
const makeRequest = async (delayParts: IDelayParams): Promise<Response> => {
return await fetch(livekitServiceURL + "/sfu/get", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
// This is the actual livekit room alias. For the legacy jwt endpoint simply the room id was used.
// The legacy JWT endpoint uses only the matrix room id to calculate the livekit room alias.
// However, the livekit room alias is provided as part of the JWT payload.
room: matrixRoomId,
openid_token: openIDToken,
device_id: deviceId,
...delayParts,
}),
});
};
const res = await doNetworkOperationWithRetry(async () => {
let response = await makeRequest(bodyDalayParts);
// Old service compatibility check
const oldServiceDoesNotSupportDelayParts =
response.status === 400 && Object.keys(bodyDalayParts).length > 0;
// If http status 400 with M_BAD_JSON and we sent delay parts, retry without them
if (oldServiceDoesNotSupportDelayParts) {
try {
const errorBody = await response.json();
if (errorBody.errcode === "M_BAD_JSON") {
response = await makeRequest({});
}
} catch {
// If we can't parse the error, treat as real error
}
}
return response;
});
if (!res.ok) {
@@ -241,7 +285,7 @@ export async function getLiveKitJWTWithDelayDelegation(
// Also check for empty string
if (delayId && delayEndpointBaseUrl) {
const delayTimeoutMs =
Config.get().matrix_rtc_session?.delayed_leave_event_delay_ms ?? 1000;
Config.get().matrix_rtc_session?.delayed_leave_event_delay_ms;
bodyDalayParts = {
delay_id: delayId,
delay_timeout: delayTimeoutMs,

View File

@@ -778,6 +778,19 @@ export function enterRTCSession(
};
}
// Calculates `maximumNetworkErrorRetryCount`. The connection is failed if EITHER:
// - The /sync loop is unresponsive for > `gracePeriod` ms, or
// - A delayed leave event is emitted (after `leaveDelay` ms period).
// Note: Use leaveDelay >> gracePeriod for delegated leave events.
const gracePeriod = Config.get().sync_disconnect_grace_period_ms;
const leaveDelay = matrixRtcSessionConfig?.delayed_leave_event_delay_ms;
const retryInterval = matrixRtcSessionConfig?.network_error_retry_ms;
// Math.min is used to account for the respective worst case: /sync not available or leave event emitted.
const maxWaitTime = Math.min(gracePeriod, leaveDelay);
const maximumNetworkErrorRetryCount =
Math.ceil(maxWaitTime / retryInterval) + 1;
// Multi-sfu does not need a preferred foci list. just the focus that is actually used.
// TODO where/how do we track errors originating from the ongoing rtcSession?
@@ -803,6 +816,7 @@ export function enterRTCSession(
membershipEventExpiryMs:
matrixRtcSessionConfig?.membership_event_expiry_ms,
unstableSendStickyEvents: matrixRTCMode === MatrixRTCMode.Matrix_2_0,
maximumNetworkErrorRetryCount: maximumNetworkErrorRetryCount,
},
);
}