refact: step 2 - break down old membership local transport logic

This commit is contained in:
Valere
2026-04-02 16:09:18 +02:00
parent 4cecdda955
commit 90bfaecd13

View File

@@ -12,13 +12,14 @@ import {
} from "matrix-js-sdk/lib/matrixrtc";
import { type MatrixClient } from "matrix-js-sdk";
import {
catchError,
combineLatest,
distinctUntilChanged,
first,
from,
map,
merge,
Observable,
type Observable,
of,
startWith,
switchMap,
@@ -184,7 +185,7 @@ export const createLocalTransport$ = ({
switchMap(async ([transport, delayId]) => {
try {
return await doOpenIdAndJWTFromUrl(
transport.livekit_service_url,
transport,
forceJwtEndpoint,
ownMembershipIdentity,
roomId,
@@ -192,18 +193,11 @@ export const createLocalTransport$ = ({
delayId ?? undefined,
);
} catch (e) {
if (
e instanceof FailToGetOpenIdToken ||
e instanceof NoMatrix2AuthorizationService
) {
// rethrow as is
throw e;
}
// Catch others and rethrow as FailToGetOpenIdToken that has user friendly message.
logger.error("Failed to get JWT from preferred transport", e);
throw new FailToGetOpenIdToken(
e instanceof Error ? e : new Error(String(e)),
logger.error(
`Failed to authenticate to transport ${transport.livekit_service_url}`,
e,
);
throw mapAuthErrorToUserFriendlyError(e);
}
}),
);
@@ -283,7 +277,7 @@ function observerOldestMembership$(
// If the OpenID request were to fail, then it's acceptable for us to fail
// this function early, as we assume the homeserver has got some problems.
async function doOpenIdAndJWTFromUrl(
url: string,
transport: LivekitTransportConfig,
forceJwtEndpoint: JwtEndpointVersion,
membership: CallMembershipIdentityParts,
roomId: string,
@@ -297,7 +291,7 @@ async function doOpenIdAndJWTFromUrl(
const sfuConfig = await getSFUConfigWithOpenID(
client,
membership,
url,
transport.livekit_service_url,
roomId,
{
forceJwtEndpoint: forceJwtEndpoint,
@@ -307,10 +301,7 @@ async function doOpenIdAndJWTFromUrl(
logger,
);
return {
transport: {
type: "livekit",
livekit_service_url: url,
},
transport,
sfuConfig,
};
}
@@ -327,6 +318,37 @@ function observeLocalTransportForOldestMembership(
ownMembershipIdentity: CallMembershipIdentityParts,
roomId: string,
): LocalTransport {
// Ensure we can authenticate with the SFU.
const authenticatedOldestMemberTransport$ = oldestMemberTransport$.pipe(
switchMap((transport) => {
// Oldest member not available -we are first- (or invalid SFU config).
if (transport === null) return of(null);
// Whenever there is transport change we want to revert
// to no transport while we do the authentication.
// So do a from(promise) here to be able to startWith(null)
return from(
doOpenIdAndJWTFromUrl(
transport,
JwtEndpointVersion.Legacy,
ownMembershipIdentity,
roomId,
client,
undefined,
),
).pipe(
catchError((e: unknown) => {
logger.error(
`Failed to authenticate to transport ${transport.livekit_service_url}`,
e,
);
throw mapAuthErrorToUserFriendlyError(e);
}),
startWith(null),
);
}),
);
// --- Oldest member mode ---
return {
// Never update the transport that we advertise in our membership. Just
@@ -335,7 +357,9 @@ function observeLocalTransportForOldestMembership(
// state changes.
advertised$: scope.behavior(
merge(
oldestMemberTransport$,
authenticatedOldestMemberTransport$.pipe(
map((t) => t?.transport ?? null),
),
preferredTransport$.pipe(map((t) => t.transport)),
).pipe(
first((t) => t !== null),
@@ -347,25 +371,7 @@ function observeLocalTransportForOldestMembership(
),
// Publish on the transport used by the oldest member.
active$: scope.behavior(
oldestMemberTransport$.pipe(
switchMap((transport) => {
// Oldest member not available (or invalid SFU config).
if (transport === null) return of(null);
// Oldest member available: fetch the SFU config.
const fetchOldestMemberTransport =
async (): Promise<LocalTransportWithSFUConfig> => ({
transport,
sfuConfig: await getSFUConfigWithOpenID(
client,
ownMembershipIdentity,
transport.livekit_service_url,
roomId,
{ forceJwtEndpoint: JwtEndpointVersion.Legacy },
logger,
),
});
return from(fetchOldestMemberTransport()).pipe(startWith(null));
}),
authenticatedOldestMemberTransport$.pipe(
tap((t) =>
logger.info(
`Publish on transport: ${t?.transport.livekit_service_url}`,
@@ -376,3 +382,17 @@ function observeLocalTransportForOldestMembership(
),
};
}
function mapAuthErrorToUserFriendlyError(e: unknown): Error {
if (
e instanceof FailToGetOpenIdToken ||
e instanceof NoMatrix2AuthorizationService
) {
// rethrow as is
return e;
}
// Catch others and rethrow as FailToGetOpenIdToken that has user friendly message.
return new FailToGetOpenIdToken(
e instanceof Error ? e : new Error(String(e)),
);
}