start onwMemberhsip.ts

This commit is contained in:
Timo K
2025-10-30 15:15:49 +01:00
parent c8ef8d6a24
commit 4f892e358a
5 changed files with 175 additions and 96 deletions

View File

@@ -25,7 +25,7 @@ export type OpenIDClientParts = Pick<
export async function getSFUConfigWithOpenID(
client: OpenIDClientParts,
serviceUrl: string,
livekitAlias: string,
matrixRoomId: string,
): Promise<SFUConfig> {
let openIdToken: IOpenIDToken;
try {
@@ -43,7 +43,7 @@ export async function getSFUConfigWithOpenID(
const sfuConfig = await getLiveKitJWT(
client,
serviceUrl,
livekitAlias,
matrixRoomId,
openIdToken,
);
logger.info(`Got JWT from call's active focus URL.`);

View File

@@ -13,6 +13,7 @@ import {
} from "matrix-js-sdk/lib/matrixrtc";
import { logger } from "matrix-js-sdk/lib/logger";
import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
import { type MatrixClient } from "matrix-js-sdk";
import { PosthogAnalytics } from "./analytics/PosthogAnalytics";
import { Config } from "./config/Config";
@@ -23,16 +24,13 @@ import { getSFUConfigWithOpenID } from "./livekit/openIDSFU.ts";
const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci";
export function getLivekitAlias(rtcSession: MatrixRTCSession): string {
// For now we assume everything is a room-scoped call
return rtcSession.room.roomId;
}
async function makeTransportInternal(
rtcSession: MatrixRTCSession,
client: MatrixClient,
roomId: string,
): Promise<LivekitTransport> {
logger.log("Searching for a preferred transport");
const livekitAlias = getLivekitAlias(rtcSession);
//TODO refactor this to use the jwt service returned alias.
const livekitAlias = roomId;
// TODO-MULTI-SFU: Either remove this dev tool or make it more official
const urlFromStorage =
@@ -52,7 +50,7 @@ async function makeTransportInternal(
}
// Prioritize the .well-known/matrix/client, if available, over the configured SFU
const domain = rtcSession.room.client.getDomain();
const domain = client.getDomain();
if (domain) {
// we use AutoDiscovery instead of relying on the MatrixClient having already
// been fully configured and started
@@ -85,12 +83,13 @@ async function makeTransportInternal(
}
export async function makeTransport(
rtcSession: MatrixRTCSession,
client: MatrixClient,
roomId: string,
): Promise<LivekitTransport> {
const transport = await makeTransportInternal(rtcSession);
const transport = await makeTransportInternal(client, roomId);
// this will call the jwt/sfu/get endpoint to pre create the livekit room.
await getSFUConfigWithOpenID(
rtcSession.room.client,
client,
transport.livekit_service_url,
transport.livekit_alias,
);

View File

@@ -6,6 +6,7 @@ Please see LICENSE in the repository root for full details.
*/
import { catchError, from, map, type Observable, of, startWith } from "rxjs";
import { Behavior } from "./Behavior";
/**
* Data that may need to be loaded asynchronously.
@@ -51,3 +52,9 @@ export function mapAsync<A, B>(
): Async<B> {
return async.state === "ready" ? ready(project(async.value)) : async;
}
export function unwrapAsync<A>(fallback: A): (async: Async<A>) => A {
return (async: Async<A>) => {
return async.state === "ready" ? async.value : fallback;
};
}

View File

@@ -139,6 +139,7 @@ import { ObservableScope } from "./ObservableScope.ts";
import { memberDisplaynames$ } from "./remoteMembers/displayname.ts";
import { ConnectionManager } from "./remoteMembers/ConnectionManager.ts";
import { MatrixLivekitMerger } from "./remoteMembers/matrixLivekitMerger.ts";
import { ownMembership$ } from "./ownMember/OwnMembership.ts";
//TODO
// Larger rename
@@ -237,6 +238,15 @@ export class CallViewModel {
this.matrixRoom,
);
private ownMembership = ownMembership$({
scope: this.scope,
muteStates: this.muteStates,
multiSfu: this.multiSfu,
mediaDevices: this.mediaDevices,
trackProcessorState$: this.trackProcessorState$,
e2eeLivekitOptions: this.e2eeLivekitOptions,
});
/**
* If there is a configuration error with the call (e.g. misconfigured E2EE).
* This is a fatal error that prevents the call from being created/joined.
@@ -358,45 +368,6 @@ export class CallViewModel {
// );
private readonly userId = this.matrixRoom.client.getUserId()!;
private readonly deviceId = this.matrixRoom.client.getDeviceId()!;
/**
* Whether we are connected to the MatrixRTC session.
*/
// DISCUSSION own membership manager
private readonly matrixConnected$ = this.scope.behavior(
// To consider ourselves connected to MatrixRTC, we check the following:
and$(
// The client is connected to the sync loop
(
fromEvent(this.matrixRoom.client, ClientEvent.Sync) as Observable<
[SyncState]
>
).pipe(
startWith([this.matrixRoom.client.getSyncState()]),
map(([state]) => state === SyncState.Syncing),
),
// Room state observed by session says we're connected
fromEvent(
this.matrixRTCSession,
MembershipManagerEvent.StatusChanged,
).pipe(
startWith(null),
map(() => this.matrixRTCSession.membershipStatus === Status.Connected),
),
// Also watch out for warnings that we've likely hit a timeout and our
// delayed leave event is being sent (this condition is here because it
// provides an earlier warning than the sync loop timeout, and we wouldn't
// see the actual leave event until we reconnect to the sync loop)
fromEvent(
this.matrixRTCSession,
MembershipManagerEvent.ProbablyLeft,
).pipe(
startWith(null),
map(() => this.matrixRTCSession.probablyLeft !== true),
),
),
);
/**
* Whether various media/event sources should pretend to be disconnected from

View File

@@ -1,31 +1,155 @@
/*
Copyright 2025 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
SPDX-License-IdFentifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { LiveKitReactNativeInfo } from "livekit-client";
import { Behavior, constant } from "../Behavior";
import { LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { ConnectionManager } from "../remoteMembers/ConnectionManager";
import { type E2EEOptions } from "livekit-client";
import { logger } from "matrix-js-sdk/lib/logger";
import {
type LivekitTransport,
type MatrixRTCSession,
MembershipManagerEvent,
Status,
} from "matrix-js-sdk/lib/matrixrtc";
import {
ClientEvent,
type MatrixClient,
SyncState,
type Room as MatrixRoom,
} from "matrix-js-sdk";
import { fromEvent, map, type Observable, scan, startWith } from "rxjs";
import { multiSfu } from "../../settings/settings";
import { type Behavior } from "../Behavior";
import { type ConnectionManager } from "../remoteMembers/ConnectionManager";
import { makeTransport } from "../../rtcSessionHelpers";
import { type ObservableScope } from "../ObservableScope";
import { async$, unwrapAsync } from "../Async";
import { Publisher } from "./Publisher";
import { type MuteStates } from "../MuteStates";
import { type ProcessorState } from "../../livekit/TrackProcessorContext";
import { type MediaDevices } from "../../state/MediaDevices";
import { and$ } from "../../utils/observable";
const ownMembership$ = (
multiSfu: boolean,
preferStickyEvents: boolean,
connectionManager: ConnectionManager,
transport: LivekitTransport,
): {
connected: Behavior<boolean>;
transport: Behavior<LivekitTransport | null>;
interface Props {
scope: ObservableScope;
mediaDevices: MediaDevices;
muteStates: MuteStates;
connectionManager: ConnectionManager;
matrixRTCSession: MatrixRTCSession;
matrixRoom: MatrixRoom;
client: MatrixClient;
preferStickyEvents: boolean;
roomId: string;
e2eeLivekitOptions: E2EEOptions | undefined;
trackerProcessorState$: Behavior<ProcessorState>;
}
/**
* This class is responsible for managing the own membership in a room.
* We want
* - a publisher
* -
* @param param0
* @returns
* - publisher: The handle to create tracks and publish them to the room.
* - connected$: the current connection state. Including matrix server and livekit server connection. (only the livekit server relevant for our own participation)
* - transport$: the transport object the ownMembership$ ended up using.
*
*/
export const ownMembership$ = ({
scope,
muteStates,
mediaDevices,
preferStickyEvents,
connectionManager,
matrixRTCSession,
matrixRoom,
e2eeLivekitOptions,
client,
roomId,
trackerProcessorState$,
}: Props): {
connected$: Behavior<boolean>;
transport$: Behavior<LivekitTransport | null>;
publisher: Publisher;
} => {
const userId = this.matrixRoom.client.getUserId()!;
const deviceId = this.matrixRoom.client.getDeviceId()!;
const userId = client.getUserId()!;
const deviceId = client.getDeviceId()!;
const multiSfu$ = multiSfu.value$;
/**
* The transport that we would personally prefer to publish on (if not for the
* transport preferences of others, perhaps).
*/
const preferredTransport$ = scope.behavior(
async$(makeTransport(client, roomId)).pipe(
map(unwrapAsync<LivekitTransport | null>(null)),
),
);
const connection = connectionManager.registerTransports(
constant([transport]),
scope.behavior(preferredTransport$.pipe(map((t) => (t ? [t] : [])))),
)[0];
if (!connection) {
logger.warn(
"No connection found when passing transport to connectionManager. transport:",
preferredTransport$.value,
);
}
/**
* Whether we are connected to the MatrixRTC session.
*/
// DISCUSSION own membership manager
const matrixConnected$ = scope.behavior(
// To consider ourselves connected to MatrixRTC, we check the following:
and$(
// The client is connected to the sync loop
(
fromEvent(matrixRoom.client, ClientEvent.Sync) as Observable<
[SyncState]
>
).pipe(
startWith([matrixRoom.client.getSyncState()]),
map(([state]) => state === SyncState.Syncing),
),
// Room state observed by session says we're connected
fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe(
startWith(null),
map(() => matrixRTCSession.membershipStatus === Status.Connected),
),
// Also watch out for warnings that we've likely hit a timeout and our
// delayed leave event is being sent (this condition is here because it
// provides an earlier warning than the sync loop timeout, and we wouldn't
// see the actual leave event until we reconnect to the sync loop)
fromEvent(matrixRTCSession, MembershipManagerEvent.ProbablyLeft).pipe(
startWith(null),
map(() => matrixRTCSession.probablyLeft !== true),
),
),
);
/**
* Whether we are "fully" connected to the call. Accounts for both the
* connection to the MatrixRTC session and the LiveKit publish connection.
*/
const connected$ = scope.behavior(
and$(
matrixConnected$,
connection.state$.pipe(
map((state) => state.state === "ConnectedToLkRoom"),
),
),
);
const publisher = new Publisher(
scope,
connection,
mediaDevices,
muteStates,
e2eeLivekitOptions,
trackerProcessorState$,
);
const publisher = new Publisher(connection);
// HOW IT WAS PREVIEOUSLY CREATED
// new PublishConnection(
@@ -41,21 +165,13 @@ const ownMembership$ = (
// this.e2eeLivekitOptions(),
// this.scope.behavior(this.trackProcessorState$),
// ),
/**
* The transport that we would personally prefer to publish on (if not for the
* transport preferences of others, perhaps).
*/
// DISCUSS move to ownMembership
private readonly preferredTransport$ = this.scope.behavior(
async$(makeTransport(this.matrixRTCSession)),
);
/**
* The transport over which we should be actively publishing our media.
* null when not joined.
*/
// DISCUSSION ownMembershipManager
private readonly localTransport$: Behavior<Async<LivekitTransport> | null> =
const localTransport$: Behavior<Async<LivekitTransport> | null> =
this.scope.behavior(
this.transports$.pipe(
map((transports) => transports?.local ?? null),
@@ -68,7 +184,7 @@ const ownMembership$ = (
* it is a multi-SFU transport and whether we should use sticky events).
*/
// DISCUSSION ownMembershipManager
private readonly advertisedTransport$: Behavior<{
const advertisedTransport$: Behavior<{
multiSfu: boolean;
preferStickyEvents: boolean;
transport: LivekitTransport;
@@ -97,27 +213,13 @@ const ownMembership$ = (
);
// MATRIX RELATED
//
/**
* Whether we are "fully" connected to the call. Accounts for both the
* connection to the MatrixRTC session and the LiveKit publish connection.
*/
// DISCUSSION own membership manager
private readonly connected$ = this.scope.behavior(
and$(
this.matrixConnected$,
this.livekitConnectionState$.pipe(
map((state) => state === ConnectionState.Connected),
),
),
);
/**
* Whether we should tell the user that we're reconnecting to the call.
*/
// DISCUSSION own membership manager
public readonly reconnecting$ = this.scope.behavior(
this.connected$.pipe(
const reconnecting$ = scope.behavior(
connected$.pipe(
// We are reconnecting if we previously had some successful initial
// connection but are now disconnected
scan(
@@ -130,5 +232,5 @@ const ownMembership$ = (
map(({ reconnecting }) => reconnecting),
),
);
return { connected: true, transport$ };
return { connected$, transport$: preferredTransport$, publisher };
};