This commit is contained in:
Timo K
2025-10-28 21:18:47 +01:00
parent 5b21691c21
commit 9cdbb1135f
7 changed files with 720 additions and 410 deletions

View File

@@ -118,12 +118,12 @@ import {
} from "../rtcSessionHelpers";
import { E2eeType } from "../e2ee/e2eeType";
import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider";
import { type Connection, RemoteConnection } from "./Connection";
import { type Connection, RemoteConnection } from "./remoteMembers/Connection.ts";
import { type MuteStates } from "./MuteStates";
import { getUrlParams } from "../UrlParams";
import { type ProcessorState } from "../livekit/TrackProcessorContext";
import { ElementWidgetActions, widget } from "../widget";
import { PublishConnection } from "./PublishConnection.ts";
import { PublishConnection } from "./ownMember/PublishConnection.ts";
import { type Async, async$, mapAsync, ready } from "./Async";
import { sharingScreen$, UserMedia } from "./UserMedia.ts";
import { ScreenShare } from "./ScreenShare.ts";
@@ -138,6 +138,7 @@ import {
} from "./layout-types.ts";
import { ElementCallError, UnknownCallError } from "../utils/errors.ts";
import { ObservableScope } from "./ObservableScope.ts";
import { memberDisplaynames$ } from "./remoteMembers/displayname.ts";
export interface CallViewModelOptions {
encryptionSystem: EncryptionSystem;
@@ -217,10 +218,12 @@ export class CallViewModel {
private readonly join$ = new Subject<void>();
// DISCUSS BAD ?
public join(): void {
this.join$.next();
}
// CODESMALL
// This is functionally the same Observable as leave$, except here it's
// hoisted to the top of the class. This enables the cyclic dependency between
// leave$ -> autoLeave$ -> callPickupState$ -> livekitConnectionState$ ->
@@ -233,6 +236,7 @@ export class CallViewModel {
* Whether we are joined to the call. This reflects our local state rather
* than whether all connections are truly up and running.
*/
// DISCUSS ? lets think why we need joined and how to do it better
private readonly joined$ = this.scope.behavior(
this.join$.pipe(
map(() => true),
@@ -246,26 +250,290 @@ export class CallViewModel {
);
/**
* The MatrixRTC session participants.
* The transport that we would personally prefer to publish on (if not for the
* transport preferences of others, perhaps).
*/
// Note that MatrixRTCSession already filters the call memberships by users
// that are joined to the room; we don't need to perform extra filtering here.
private readonly memberships$ = this.scope.behavior(
fromEvent(
this.matrixRTCSession,
MatrixRTCSessionEvent.MembershipsChanged,
).pipe(
startWith(null),
map(() => this.matrixRTCSession.memberships),
// DISCUSS move to ownMembership
private readonly preferredTransport$ = this.scope.behavior(
async$(makeTransport(this.matrixRTCSession)),
);
/**
* The transport over which we should be actively publishing our media.
* null when not joined.
*/
// DISCUSSION ownMembershipManager
private readonly localTransport$: Behavior<Async<LivekitTransport> | null> =
this.scope.behavior(
this.transports$.pipe(
map((transports) => transports?.local ?? null),
distinctUntilChanged<Async<LivekitTransport> | null>(deepCompare),
),
);
/**
* The transport we should advertise in our MatrixRTC membership (plus whether
* it is a multi-SFU transport and whether we should use sticky events).
*/
// DISCUSSION ownMembershipManager
private readonly advertisedTransport$: Behavior<{
multiSfu: boolean;
preferStickyEvents: boolean;
transport: LivekitTransport;
} | null> = this.scope.behavior(
this.transports$.pipe(
map((transports) =>
transports?.local.state === "ready" &&
transports.preferred.state === "ready"
? {
multiSfu: transports.multiSfu,
preferStickyEvents: transports.preferStickyEvents,
// In non-multi-SFU mode we should always advertise the preferred
// SFU to minimize the number of membership updates
transport: transports.multiSfu
? transports.local.value
: transports.preferred.value,
}
: null,
),
distinctUntilChanged<{
multiSfu: boolean;
preferStickyEvents: boolean;
transport: LivekitTransport;
} | null>(deepCompare),
),
);
// DISCUSSION move to ConnectionManager
/**
* The local connection over which we will publish our media. It could
* possibly also have some remote users' media available on it.
* null when not joined.
*/
private readonly localConnection$: Behavior<Async<PublishConnection> | null> =
this.scope.behavior(
generateKeyed$<
Async<LivekitTransport> | null,
PublishConnection,
Async<PublishConnection> | null
>(
this.localTransport$,
(transport, createOrGet) =>
transport &&
mapAsync(transport, (transport) =>
createOrGet(
// Stable key that uniquely idenifies the transport
JSON.stringify({
url: transport.livekit_service_url,
alias: transport.livekit_alias,
}),
(scope) =>
new PublishConnection(
{
transport,
client: this.matrixRoom.client,
scope,
remoteTransports$: this.remoteTransports$,
livekitRoomFactory: this.options.livekitRoomFactory,
},
this.mediaDevices,
this.muteStates,
this.e2eeLivekitOptions(),
this.scope.behavior(this.trackProcessorState$),
),
),
),
),
);
// DISCUSSION move to ConnectionManager
public readonly livekitConnectionState$ =
// TODO: This options.connectionState$ behavior is a small hack inserted
// here to facilitate testing. This would likely be better served by
// breaking CallViewModel down into more naturally testable components.
this.options.connectionState$ ??
this.scope.behavior<ConnectionState>(
this.localConnection$.pipe(
switchMap((c) =>
c?.state === "ready"
? // TODO mapping to ConnectionState for compatibility, but we should use the full state?
c.value.state$.pipe(
switchMap((s) => {
if (s.state === "ConnectedToLkRoom")
return s.connectionState$;
return of(ConnectionState.Disconnected);
}),
)
: of(ConnectionState.Disconnected),
),
),
);
/**
* Connections for each transport in use by one or more session members that
* is *distinct* from the local transport.
*/
// DISCUSSION move to ConnectionManager
private readonly remoteConnections$ = this.scope.behavior(
generateKeyed$<typeof this.transports$.value, Connection, Connection[]>(
this.transports$,
(transports, createOrGet) => {
const connections: Connection[] = [];
// Until the local transport becomes ready we have no idea which
// transports will actually need a dedicated remote connection
if (transports?.local.state === "ready") {
// TODO: Handle custom transport.livekit_alias values here
const localServiceUrl = transports.local.value.livekit_service_url;
const remoteServiceUrls = new Set(
transports.remote.map(
({ transport }) => transport.livekit_service_url,
),
);
remoteServiceUrls.delete(localServiceUrl);
for (const remoteServiceUrl of remoteServiceUrls)
connections.push(
createOrGet(
remoteServiceUrl,
(scope) =>
new RemoteConnection(
{
transport: {
type: "livekit",
livekit_service_url: remoteServiceUrl,
livekit_alias: this.livekitAlias,
},
client: this.matrixRoom.client,
scope,
remoteTransports$: this.remoteTransports$,
livekitRoomFactory: this.options.livekitRoomFactory,
},
this.e2eeLivekitOptions(),
),
),
);
}
return connections;
},
),
);
/**
* The transport that we would personally prefer to publish on (if not for the
* transport preferences of others, perhaps).
* A list of the connections that should be active at any given time.
*/
private readonly preferredTransport$ = this.scope.behavior(
async$(makeTransport(this.matrixRTCSession)),
// DISCUSSION move to ConnectionManager
private readonly connections$ = this.scope.behavior<Connection[]>(
combineLatest(
[this.localConnection$, this.remoteConnections$],
(local, remote) => [
...(local?.state === "ready" ? [local.value] : []),
...remote.values(),
],
),
);
/**
* Emits with connections whenever they should be started or stopped.
*/
// DISCUSSION move to ConnectionManager
private readonly connectionInstructions$ = this.connections$.pipe(
pairwise(),
map(([prev, next]) => {
const start = new Set(next.values());
for (const connection of prev) start.delete(connection);
const stop = new Set(prev.values());
for (const connection of next) stop.delete(connection);
return { start, stop };
}),
);
public readonly allLivekitRooms$ = this.scope.behavior(
this.connections$.pipe(
map((connections) =>
[...connections.values()].map((c) => ({
room: c.livekitRoom,
url: c.transport.livekit_service_url,
isLocal: c instanceof PublishConnection,
})),
),
),
);
private readonly userId = this.matrixRoom.client.getUserId()!;
private readonly deviceId = this.matrixRoom.client.getDeviceId()!;
/**
* Whether we are connected to the MatrixRTC session.
*/
// DISCUSSION own membership manager
private readonly matrixConnected$ = this.scope.behavior(
// To consider ourselves connected to MatrixRTC, we check the following:
and$(
// The client is connected to the sync loop
(
fromEvent(this.matrixRoom.client, ClientEvent.Sync) as Observable<
[SyncState]
>
).pipe(
startWith([this.matrixRoom.client.getSyncState()]),
map(([state]) => state === SyncState.Syncing),
),
// Room state observed by session says we're connected
fromEvent(
this.matrixRTCSession,
MembershipManagerEvent.StatusChanged,
).pipe(
startWith(null),
map(() => this.matrixRTCSession.membershipStatus === Status.Connected),
),
// Also watch out for warnings that we've likely hit a timeout and our
// delayed leave event is being sent (this condition is here because it
// provides an earlier warning than the sync loop timeout, and we wouldn't
// see the actual leave event until we reconnect to the sync loop)
fromEvent(
this.matrixRTCSession,
MembershipManagerEvent.ProbablyLeft,
).pipe(
startWith(null),
map(() => this.matrixRTCSession.probablyLeft !== true),
),
),
);
/**
* Whether we are "fully" connected to the call. Accounts for both the
* connection to the MatrixRTC session and the LiveKit publish connection.
*/
// DISCUSSION own membership manager
private readonly connected$ = this.scope.behavior(
and$(
this.matrixConnected$,
this.livekitConnectionState$.pipe(
map((state) => state === ConnectionState.Connected),
),
),
);
/**
* Whether we should tell the user that we're reconnecting to the call.
*/
// DISCUSSION own membership manager
public readonly reconnecting$ = this.scope.behavior(
this.connected$.pipe(
// We are reconnecting if we previously had some successful initial
// connection but are now disconnected
scan(
({ connectedPreviously }, connectedNow) => ({
connectedPreviously: connectedPreviously || connectedNow,
reconnecting: connectedPreviously && !connectedNow,
}),
{ connectedPreviously: false, reconnecting: false },
),
map(({ reconnecting }) => reconnecting),
),
);
/**
@@ -276,7 +544,8 @@ export class CallViewModel {
* together when it might change together is what you have to do in RxJS to
* avoid reading inconsistent state or observing too many changes.)
*/
// TODO-MULTI-SFU find a better name for this. with the addition of sticky events it's no longer just about transports.
// TODO-MULTI-SFU find a better name for this. With the addition of sticky events it's no longer just about transports.
// DISCUSS move the local part to the own membership file
private readonly transports$: Behavior<{
local: Async<LivekitTransport>;
remote: { membership: CallMembership; transport: LivekitTransport }[];
@@ -342,282 +611,6 @@ export class CallViewModel {
),
);
/**
* Lists the transports used by each MatrixRTC session member other than
* ourselves.
*/
private readonly remoteTransports$ = this.scope.behavior(
this.transports$.pipe(map((transports) => transports?.remote ?? [])),
);
/**
* The transport over which we should be actively publishing our media.
* null when not joined.
*/
private readonly localTransport$: Behavior<Async<LivekitTransport> | null> =
this.scope.behavior(
this.transports$.pipe(
map((transports) => transports?.local ?? null),
distinctUntilChanged<Async<LivekitTransport> | null>(deepCompare),
),
);
/**
* The transport we should advertise in our MatrixRTC membership (plus whether
* it is a multi-SFU transport and whether we should use sticky events).
*/
private readonly advertisedTransport$: Behavior<{
multiSfu: boolean;
preferStickyEvents: boolean;
transport: LivekitTransport;
} | null> = this.scope.behavior(
this.transports$.pipe(
map((transports) =>
transports?.local.state === "ready" &&
transports.preferred.state === "ready"
? {
multiSfu: transports.multiSfu,
preferStickyEvents: transports.preferStickyEvents,
// In non-multi-SFU mode we should always advertise the preferred
// SFU to minimize the number of membership updates
transport: transports.multiSfu
? transports.local.value
: transports.preferred.value,
}
: null,
),
distinctUntilChanged<{
multiSfu: boolean;
preferStickyEvents: boolean;
transport: LivekitTransport;
} | null>(deepCompare),
),
);
/**
* The local connection over which we will publish our media. It could
* possibly also have some remote users' media available on it.
* null when not joined.
*/
private readonly localConnection$: Behavior<Async<PublishConnection> | null> =
this.scope.behavior(
generateKeyed$<
Async<LivekitTransport> | null,
PublishConnection,
Async<PublishConnection> | null
>(
this.localTransport$,
(transport, createOrGet) =>
transport &&
mapAsync(transport, (transport) =>
createOrGet(
// Stable key that uniquely idenifies the transport
JSON.stringify({
url: transport.livekit_service_url,
alias: transport.livekit_alias,
}),
(scope) =>
new PublishConnection(
{
transport,
client: this.matrixRoom.client,
scope,
remoteTransports$: this.remoteTransports$,
livekitRoomFactory: this.options.livekitRoomFactory,
},
this.mediaDevices,
this.muteStates,
this.e2eeLivekitOptions(),
this.scope.behavior(this.trackProcessorState$),
),
),
),
),
);
public readonly livekitConnectionState$ =
// TODO: This options.connectionState$ behavior is a small hack inserted
// here to facilitate testing. This would likely be better served by
// breaking CallViewModel down into more naturally testable components.
this.options.connectionState$ ??
this.scope.behavior<ConnectionState>(
this.localConnection$.pipe(
switchMap((c) =>
c?.state === "ready"
? // TODO mapping to ConnectionState for compatibility, but we should use the full state?
c.value.transportState$.pipe(
switchMap((s) => {
if (s.state === "ConnectedToLkRoom")
return s.connectionState$;
return of(ConnectionState.Disconnected);
}),
)
: of(ConnectionState.Disconnected),
),
),
);
/**
* Connections for each transport in use by one or more session members that
* is *distinct* from the local transport.
*/
private readonly remoteConnections$ = this.scope.behavior(
generateKeyed$<typeof this.transports$.value, Connection, Connection[]>(
this.transports$,
(transports, createOrGet) => {
const connections: Connection[] = [];
// Until the local transport becomes ready we have no idea which
// transports will actually need a dedicated remote connection
if (transports?.local.state === "ready") {
// TODO: Handle custom transport.livekit_alias values here
const localServiceUrl = transports.local.value.livekit_service_url;
const remoteServiceUrls = new Set(
transports.remote.map(
({ transport }) => transport.livekit_service_url,
),
);
remoteServiceUrls.delete(localServiceUrl);
for (const remoteServiceUrl of remoteServiceUrls)
connections.push(
createOrGet(
remoteServiceUrl,
(scope) =>
new RemoteConnection(
{
transport: {
type: "livekit",
livekit_service_url: remoteServiceUrl,
livekit_alias: this.livekitAlias,
},
client: this.matrixRoom.client,
scope,
remoteTransports$: this.remoteTransports$,
livekitRoomFactory: this.options.livekitRoomFactory,
},
this.e2eeLivekitOptions(),
),
),
);
}
return connections;
},
),
);
/**
* A list of the connections that should be active at any given time.
*/
private readonly connections$ = this.scope.behavior<Connection[]>(
combineLatest(
[this.localConnection$, this.remoteConnections$],
(local, remote) => [
...(local?.state === "ready" ? [local.value] : []),
...remote.values(),
],
),
);
/**
* Emits with connections whenever they should be started or stopped.
*/
private readonly connectionInstructions$ = this.connections$.pipe(
pairwise(),
map(([prev, next]) => {
const start = new Set(next.values());
for (const connection of prev) start.delete(connection);
const stop = new Set(prev.values());
for (const connection of next) stop.delete(connection);
return { start, stop };
}),
);
public readonly allLivekitRooms$ = this.scope.behavior(
this.connections$.pipe(
map((connections) =>
[...connections.values()].map((c) => ({
room: c.livekitRoom,
url: c.transport.livekit_service_url,
isLocal: c instanceof PublishConnection,
})),
),
),
);
private readonly userId = this.matrixRoom.client.getUserId()!;
private readonly deviceId = this.matrixRoom.client.getDeviceId()!;
/**
* Whether we are connected to the MatrixRTC session.
*/
private readonly matrixConnected$ = this.scope.behavior(
// To consider ourselves connected to MatrixRTC, we check the following:
and$(
// The client is connected to the sync loop
(
fromEvent(this.matrixRoom.client, ClientEvent.Sync) as Observable<
[SyncState]
>
).pipe(
startWith([this.matrixRoom.client.getSyncState()]),
map(([state]) => state === SyncState.Syncing),
),
// Room state observed by session says we're connected
fromEvent(
this.matrixRTCSession,
MembershipManagerEvent.StatusChanged,
).pipe(
startWith(null),
map(() => this.matrixRTCSession.membershipStatus === Status.Connected),
),
// Also watch out for warnings that we've likely hit a timeout and our
// delayed leave event is being sent (this condition is here because it
// provides an earlier warning than the sync loop timeout, and we wouldn't
// see the actual leave event until we reconnect to the sync loop)
fromEvent(
this.matrixRTCSession,
MembershipManagerEvent.ProbablyLeft,
).pipe(
startWith(null),
map(() => this.matrixRTCSession.probablyLeft !== true),
),
),
);
/**
* Whether we are "fully" connected to the call. Accounts for both the
* connection to the MatrixRTC session and the LiveKit publish connection.
*/
private readonly connected$ = this.scope.behavior(
and$(
this.matrixConnected$,
this.livekitConnectionState$.pipe(
map((state) => state === ConnectionState.Connected),
),
),
);
/**
* Whether we should tell the user that we're reconnecting to the call.
*/
public readonly reconnecting$ = this.scope.behavior(
this.connected$.pipe(
// We are reconnecting if we previously had some successful initial
// connection but are now disconnected
scan(
({ connectedPreviously }, connectedNow) => ({
connectedPreviously: connectedPreviously || connectedNow,
reconnecting: connectedPreviously && !connectedNow,
}),
{ connectedPreviously: false, reconnecting: false },
),
map(({ reconnecting }) => reconnecting),
),
);
/**
* Whether various media/event sources should pretend to be disconnected from
* all network input, even if their connection still technically works.
@@ -626,6 +619,7 @@ export class CallViewModel {
// that the LiveKit connection is still functional while the homeserver is
// down, for example, and we want to avoid making people worry that the app is
// in a split-brained state.
// DISCUSSION own membership manager ALSO this probably can be simplifis
private readonly pretendToBeDisconnected$ = this.reconnecting$;
/**
@@ -718,57 +712,6 @@ export class CallViewModel {
),
);
/**
* Displaynames for each member of the call. This will disambiguate
* any displaynames that clashes with another member. Only members
* joined to the call are considered here.
*/
// It turns out that doing the disambiguation above is rather expensive on Safari (10x slower
// than on Chrome/Firefox). This means it is important that we multicast the result so that we
// don't do this work more times than we need to. This is achieved by converting to a behavior:
public readonly memberDisplaynames$ = this.scope.behavior(
combineLatest(
[
// Handle call membership changes
this.memberships$,
// Additionally handle display name changes (implicitly reacting to them)
fromEvent(this.matrixRoom, RoomStateEvent.Members).pipe(
startWith(null),
),
// TODO: do we need: pauseWhen(this.pretendToBeDisconnected$),
],
(memberships, _displaynames) => {
const displaynameMap = new Map<string, string>([
[
`${this.userId}:${this.deviceId}`,
this.matrixRoom.getMember(this.userId)?.rawDisplayName ??
this.userId,
],
]);
const room = this.matrixRoom;
// We only consider RTC members for disambiguation as they are the only visible members.
for (const rtcMember of memberships) {
const matrixIdentifier = `${rtcMember.userId}:${rtcMember.deviceId}`;
const { member } = getRoomMemberFromRtcMember(rtcMember, room);
if (!member) {
logger.error(
"Could not find member for media id:",
matrixIdentifier,
);
continue;
}
const disambiguate = shouldDisambiguate(member, memberships, room);
displaynameMap.set(
matrixIdentifier,
calculateDisplayName(member, disambiguate),
);
}
return displaynameMap;
},
),
);
public readonly handsRaised$ = this.scope.behavior(
this.handsRaisedSubject$.pipe(pauseWhen(this.pretendToBeDisconnected$)),
);
@@ -787,6 +730,14 @@ export class CallViewModel {
),
);
memberDisplaynames$ = memberDisplaynames$(
this.matrixRoom,
this.memberships$,
this.scope,
this.userId,
this.deviceId,
);
/**
* List of MediaItems that we want to have tiles for.
*/
@@ -1655,6 +1606,8 @@ export class CallViewModel {
/**
* Emits an array of reactions that should be visible on the screen.
*/
// DISCUSSION move this into a reaction file
// const {visibleReactions$, audibleReactions$} = reactionsObservables$(showReactionSetting$, )
public readonly visibleReactions$ = this.scope.behavior(
showReactions.value$.pipe(
switchMap((show) => (show ? this.reactions$ : of({}))),
@@ -1790,6 +1743,7 @@ export class CallViewModel {
private readonly trackProcessorState$: Observable<ProcessorState>,
) {
// Start and stop local and remote connections as needed
// DISCUSSION connection manager
this.connectionInstructions$
.pipe(this.scope.bind())
.subscribe(({ start, stop }) => {
@@ -1947,13 +1901,3 @@ function getE2eeKeyProvider(
return keyProvider;
}
}
function getRoomMemberFromRtcMember(
rtcMember: CallMembership,
room: MatrixRoom,
): { id: string; member: RoomMember | undefined } {
return {
id: rtcMember.userId + ":" + rtcMember.deviceId,
member: room.getMember(rtcMember.userId) ?? undefined,
};
}

View File

@@ -0,0 +1,85 @@
import { Behavior } from "../Behavior";
const ownMembership$ = (
multiSfu: boolean,
preferStickyEvents: boolean,
): {
connected: Behavior<boolean>;
transport: Behavior<LivekitTransport | null>;
} => {
/**
* Lists the transports used by ourselves, plus all other MatrixRTC session
* members. For completeness this also lists the preferred transport and
* whether we are in multi-SFU mode or sticky events mode (because
* advertisedTransport$ wants to read them at the same time, and bundling data
* together when it might change together is what you have to do in RxJS to
* avoid reading inconsistent state or observing too many changes.)
*/
// TODO-MULTI-SFU find a better name for this. With the addition of sticky events it's no longer just about transports.
// DISCUSS move to MatrixLivekitMerger
const transport$: Behavior<{
local: Async<LivekitTransport>;
preferred: Async<LivekitTransport>;
multiSfu: boolean;
preferStickyEvents: boolean;
} | null> = this.scope.behavior(
this.joined$.pipe(
switchMap((joined) =>
joined
? combineLatest(
[
this.preferredTransport$,
this.memberships$,
multiSfu.value$,
preferStickyEvents.value$,
],
(preferred, memberships, preferMultiSfu, preferStickyEvents) => {
// Multi-SFU must be implicitly enabled when using sticky events
const multiSfu = preferStickyEvents || preferMultiSfu;
const oldestMembership =
this.matrixRTCSession.getOldestMembership();
const remote = memberships.flatMap((m) => {
if (m.userId === this.userId && m.deviceId === this.deviceId)
return [];
const t = m.getTransport(oldestMembership ?? m);
return t && isLivekitTransport(t)
? [{ membership: m, transport: t }]
: [];
});
let local = preferred;
if (!multiSfu) {
const oldest = this.matrixRTCSession.getOldestMembership();
if (oldest !== undefined) {
const selection = oldest.getTransport(oldest);
// TODO selection can be null if no transport is configured should we report an error?
if (selection && isLivekitTransport(selection))
local = ready(selection);
}
}
if (local.state === "error") {
this._configError$.next(
local.value instanceof ElementCallError
? local.value
: new UnknownCallError(local.value),
);
}
return {
local,
remote,
preferred,
multiSfu,
preferStickyEvents,
};
},
)
: of(null),
),
),
);
return { connected: true, transport$ };
};

View File

@@ -21,19 +21,19 @@ import {
} from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger";
import type { Behavior } from "./Behavior.ts";
import type { MediaDevices, SelectedDevice } from "./MediaDevices.ts";
import type { MuteStates } from "./MuteStates.ts";
import type { Behavior } from "../Behavior.ts";
import type { MediaDevices, SelectedDevice } from "../MediaDevices.ts";
import type { MuteStates } from "../MuteStates.ts";
import {
type ProcessorState,
trackProcessorSync,
} from "../livekit/TrackProcessorContext.tsx";
import { getUrlParams } from "../UrlParams.ts";
import { defaultLiveKitOptions } from "../livekit/options.ts";
import { getValue } from "../utils/observable.ts";
import { observeTrackReference$ } from "./MediaViewModel.ts";
import { Connection, type ConnectionOpts } from "./Connection.ts";
import { type ObservableScope } from "./ObservableScope.ts";
} from "../../livekit/TrackProcessorContext.tsx";
import { getUrlParams } from "../../UrlParams.ts";
import { defaultLiveKitOptions } from "../../livekit/options.ts";
import { getValue } from "../../utils/observable.ts";
import { observeTrackReference$ } from "../MediaViewModel.ts";
import { Connection, type ConnectionOpts } from "../remoteMembers/Connection.ts";
import { type ObservableScope } from "../ObservableScope.ts";
/**
* A connection to the local LiveKit room, the one the user is publishing to.

View File

@@ -34,17 +34,17 @@ import type {
} from "matrix-js-sdk/lib/matrixrtc";
import {
type ConnectionOpts,
type TransportState,
type ConnectionState,
type PublishingParticipant,
RemoteConnection,
} from "./Connection.ts";
import { ObservableScope } from "./ObservableScope.ts";
import { type OpenIDClientParts } from "../livekit/openIDSFU.ts";
import { FailToGetOpenIdToken } from "../utils/errors.ts";
import { PublishConnection } from "./PublishConnection.ts";
import { mockMediaDevices, mockMuteStates } from "../utils/test.ts";
import type { ProcessorState } from "../livekit/TrackProcessorContext.tsx";
import { type MuteStates } from "./MuteStates.ts";
import { ObservableScope } from "../ObservableScope.ts";
import { type OpenIDClientParts } from "../../livekit/openIDSFU.ts";
import { FailToGetOpenIdToken } from "../../utils/errors.ts";
import { PublishConnection } from "../ownMember/PublishConnection.ts";
import { mockMediaDevices, mockMuteStates } from "../../utils/test.ts";
import type { ProcessorState } from "../../livekit/TrackProcessorContext.tsx";
import { type MuteStates } from "../MuteStates.ts";
let testScope: ObservableScope;
@@ -161,7 +161,7 @@ describe("Start connection states", () => {
};
const connection = new RemoteConnection(opts, undefined);
expect(connection.transportState$.getValue().state).toEqual("Initialized");
expect(connection.state$.getValue().state).toEqual("Initialized");
});
it("fail to getOpenId token then error state", async () => {
@@ -178,8 +178,8 @@ describe("Start connection states", () => {
const connection = new RemoteConnection(opts, undefined);
const capturedStates: TransportState[] = [];
const s = connection.transportState$.subscribe((value) => {
const capturedStates: ConnectionState[] = [];
const s = connection.state$.subscribe((value) => {
capturedStates.push(value);
});
onTestFinished(() => s.unsubscribe());
@@ -231,8 +231,8 @@ describe("Start connection states", () => {
const connection = new RemoteConnection(opts, undefined);
const capturedStates: TransportState[] = [];
const s = connection.transportState$.subscribe((value) => {
const capturedStates: ConnectionState[] = [];
const s = connection.state$.subscribe((value) => {
capturedStates.push(value);
});
onTestFinished(() => s.unsubscribe());
@@ -288,8 +288,8 @@ describe("Start connection states", () => {
const connection = new RemoteConnection(opts, undefined);
const capturedStates: TransportState[] = [];
const s = connection.transportState$.subscribe((value) => {
const capturedStates: ConnectionState[] = [];
const s = connection.state$.subscribe((value) => {
capturedStates.push(value);
});
onTestFinished(() => s.unsubscribe());
@@ -345,8 +345,8 @@ describe("Start connection states", () => {
const connection = setupRemoteConnection();
const capturedStates: TransportState[] = [];
const s = connection.transportState$.subscribe((value) => {
const capturedStates: ConnectionState[] = [];
const s = connection.state$.subscribe((value) => {
capturedStates.push(value);
});
onTestFinished(() => s.unsubscribe());
@@ -401,7 +401,7 @@ describe("Publishing participants observations", () => {
const bobIsAPublisher = Promise.withResolvers<void>();
const danIsAPublisher = Promise.withResolvers<void>();
const observedPublishers: PublishingParticipant[][] = [];
const s = connection.publishingParticipants$.subscribe((publishers) => {
const s = connection.allLivekitParticipants$.subscribe((publishers) => {
observedPublishers.push(publishers);
if (
publishers.some(
@@ -538,7 +538,7 @@ describe("Publishing participants observations", () => {
const connection = setupRemoteConnection();
let observedPublishers: PublishingParticipant[][] = [];
const s = connection.publishingParticipants$.subscribe((publishers) => {
const s = connection.allLivekitParticipants$.subscribe((publishers) => {
observedPublishers.push(publishers);
});
onTestFinished(() => s.unsubscribe());

View File

@@ -11,7 +11,7 @@ import {
} from "@livekit/components-core";
import {
ConnectionError,
type ConnectionState,
type ConnectionState as LivekitConenctionState,
type E2EEOptions,
type RemoteParticipant,
Room as LivekitRoom,
@@ -21,21 +21,21 @@ import {
type CallMembership,
type LivekitTransport,
} from "matrix-js-sdk/lib/matrixrtc";
import { logger } from "matrix-js-sdk/lib/logger";
import { BehaviorSubject, combineLatest, type Observable } from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger";
import {
getSFUConfigWithOpenID,
type OpenIDClientParts,
type SFUConfig,
} from "../livekit/openIDSFU";
import { type Behavior } from "./Behavior";
import { type ObservableScope } from "./ObservableScope";
import { defaultLiveKitOptions } from "../livekit/options";
} from "../../livekit/openIDSFU.ts";
import { type Behavior } from "../Behavior.ts";
import { type ObservableScope } from "../ObservableScope.ts";
import { defaultLiveKitOptions } from "../../livekit/options.ts";
import {
InsufficientCapacityError,
SFURoomCreationRestrictedError,
} from "../utils/errors.ts";
} from "../../utils/errors.ts";
export interface ConnectionOpts {
/** The media transport to connect to. */
@@ -44,8 +44,14 @@ export interface ConnectionOpts {
client: OpenIDClientParts;
/** The observable scope to use for this connection. */
scope: ObservableScope;
/** An observable of the current RTC call memberships and their associated transports. */
remoteTransports$: Behavior<
/**
* An observable of the current RTC call memberships and their associated transports.
* Used to differentiate between publishing and subscribging participants on each connection.
* Used to find out which rtc member should upload to this connection (publishingParticipants$).
* The livekit room gives access to all the users subscribing to this connection, we need
* to filter out the ones that are uploading to this connection.
*/
membershipsWithTransport$: Behavior<
{ membership: CallMembership; transport: LivekitTransport }[]
>;
@@ -53,7 +59,7 @@ export interface ConnectionOpts {
livekitRoomFactory?: (options?: RoomOptions) => LivekitRoom;
}
export type TransportState =
export type ConnectionState =
| { state: "Initialized" }
| { state: "FetchingConfig"; transport: LivekitTransport }
| { state: "ConnectingToLkRoom"; transport: LivekitTransport }
@@ -61,7 +67,7 @@ export type TransportState =
| { state: "FailedToStart"; error: Error; transport: LivekitTransport }
| {
state: "ConnectedToLkRoom";
connectionState$: Observable<ConnectionState>;
livekitConnectionState$: Observable<LivekitConenctionState>;
transport: LivekitTransport;
}
| { state: "Stopped"; transport: LivekitTransport };
@@ -88,15 +94,14 @@ export type PublishingParticipant = {
*/
export class Connection {
// Private Behavior
private readonly _transportState$ = new BehaviorSubject<TransportState>({
private readonly _state$ = new BehaviorSubject<ConnectionState>({
state: "Initialized",
});
/**
* The current state of the connection to the media transport.
*/
public readonly transportState$: Behavior<TransportState> =
this._transportState$;
public readonly state$: Behavior<ConnectionState> = this._state$;
/**
* Whether the connection has been stopped.
@@ -118,7 +123,7 @@ export class Connection {
public async start(): Promise<void> {
this.stopped = false;
try {
this._transportState$.next({
this._state$.next({
state: "FetchingConfig",
transport: this.transport,
});
@@ -126,7 +131,7 @@ export class Connection {
// If we were stopped while fetching the config, don't proceed to connect
if (this.stopped) return;
this._transportState$.next({
this._state$.next({
state: "ConnectingToLkRoom",
transport: this.transport,
});
@@ -157,13 +162,13 @@ export class Connection {
// If we were stopped while connecting, don't proceed to update state.
if (this.stopped) return;
this._transportState$.next({
this._state$.next({
state: "ConnectedToLkRoom",
transport: this.transport,
connectionState$: connectionStateObserver(this.livekitRoom),
livekitConnectionState$: connectionStateObserver(this.livekitRoom),
});
} catch (error) {
this._transportState$.next({
this._state$.next({
state: "FailedToStart",
error: error instanceof Error ? error : new Error(`${error}`),
transport: this.transport,
@@ -188,7 +193,7 @@ export class Connection {
public async stop(): Promise<void> {
if (this.stopped) return;
await this.livekitRoom.disconnect();
this._transportState$.next({
this._state$.next({
state: "Stopped",
transport: this.transport,
});
@@ -218,23 +223,22 @@ export class Connection {
protected constructor(
public readonly livekitRoom: LivekitRoom,
opts: ConnectionOpts,
logger?: Logger,
) {
logger.log(
logger?.info(
`[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,
);
const { transport, client, scope, remoteTransports$ } = opts;
const { transport, client, scope, membershipsWithTransport$ } = opts;
this.transport = transport;
this.client = client;
const participantsIncludingSubscribers$ = scope.behavior(
connectedParticipantsObserver(this.livekitRoom),
[],
);
const participantsIncludingSubscribers$: Behavior<RemoteParticipant[]> =
scope.behavior(connectedParticipantsObserver(this.livekitRoom), []);
this.publishingParticipants$ = scope.behavior(
combineLatest(
[participantsIncludingSubscribers$, remoteTransports$],
[participantsIncludingSubscribers$, membershipsWithTransport$],
(participants, remoteTransports) =>
remoteTransports
// Find all members that claim to publish on this connection

View File

@@ -0,0 +1,78 @@
/*
Copyright 2025 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { type Room, type RoomMember, RoomStateEvent } from "matrix-js-sdk";
import { combineLatest, fromEvent, type Observable, startWith } from "rxjs";
import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc";
import { logger } from "matrix-js-sdk/lib/logger";
import { type Room as MatrixRoom } from "matrix-js-sdk/lib/matrix";
import { type ObservableScope } from "../ObservableScope";
import { calculateDisplayName, shouldDisambiguate } from "../../utils/displayname";
/**
* Displayname for each member of the call. This will disambiguate
* any displayname that clashes with another member. Only members
* joined to the call are considered here.
*/
// don't do this work more times than we need to. This is achieved by converting to a behavior:
export const memberDisplaynames$ = (
matrixRoom: Room,
memberships$: Observable<CallMembership[]>,
scope: ObservableScope,
userId: string,
deviceId: string,
) =>
scope.behavior(
combineLatest(
[
// Handle call membership changes
memberships$,
// Additionally handle display name changes (implicitly reacting to them)
fromEvent(matrixRoom, RoomStateEvent.Members).pipe(startWith(null)),
// TODO: do we need: pauseWhen(this.pretendToBeDisconnected$),
],
(memberships, _displaynames) => {
const displaynameMap = new Map<string, string>([
[
`${userId}:${deviceId}`,
matrixRoom.getMember(userId)?.rawDisplayName ?? userId,
],
]);
const room = matrixRoom;
// We only consider RTC members for disambiguation as they are the only visible members.
for (const rtcMember of memberships) {
const matrixIdentifier = `${rtcMember.userId}:${rtcMember.deviceId}`;
const { member } = getRoomMemberFromRtcMember(rtcMember, room);
if (!member) {
logger.error(
"Could not find member for media id:",
matrixIdentifier,
);
continue;
}
const disambiguate = shouldDisambiguate(member, memberships, room);
displaynameMap.set(
matrixIdentifier,
calculateDisplayName(member, disambiguate),
);
}
return displaynameMap;
},
),
);
export function getRoomMemberFromRtcMember(
rtcMember: CallMembership,
room: MatrixRoom,
): { id: string; member: RoomMember | undefined } {
return {
id: rtcMember.userId + ":" + rtcMember.deviceId,
member: room.getMember(rtcMember.userId) ?? undefined,
};
}

View File

@@ -0,0 +1,199 @@
/*
Copyright 2025 Element c.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
LocalParticipant,
Participant,
RemoteParticipant,
type Participant as LivekitParticipant,
type Room as LivekitRoom,
} from "livekit-client";
import {
type MatrixRTCSession,
MatrixRTCSessionEvent,
type CallMembership,
type Transport,
LivekitTransport,
isLivekitTransport,
} from "matrix-js-sdk/lib/matrixrtc";
import {
combineLatest,
fromEvent,
map,
startWith,
switchMap,
type Observable,
} from "rxjs";
import { type ObservableScope } from "../ObservableScope";
import { type Connection } from "./Connection";
import { Behavior } from "../Behavior";
import { RoomMember } from "matrix-js-sdk";
import { getRoomMemberFromRtcMember } from "./displayname";
// TODOs:
// - make ConnectionManager its own actual class
// - write test for scopes (do we really need to bind scope)
class ConnectionManager {
constructor(transports$: Observable<Transport[]>) {}
public readonly connections$: Observable<Connection[]>;
}
/**
* Represent a matrix call member and his associated livekit participation.
* `livekitParticipant` can be undefined if the member is not yet connected to the livekit room
* or if it has no livekit transport at all.
*/
export interface MatrixLivekitItem {
callMembership: CallMembership;
livekitParticipant?: LivekitParticipant;
}
// Alternative structure idea:
// const livekitMatrixItems$ = (callMemberships$,connectionManager,scope): Observable<MatrixLivekitItem[]> => {
// Map of Connection -> to (callMembership, LivekitParticipant?))
type participants = {participant: LocalParticipant | RemoteParticipant}[]
interface LivekitRoomWithParticipants {
livekitRoom: LivekitRoom;
url: string; // Included for use as a React key
participants: {
// What id is that??
// Looks like it userId:Deviceid?
id: string;
participant: LocalParticipant | RemoteParticipant | undefined;
// Why do we fetch a full room member here?
// looks like it is only for avatars?
// TODO: Remove that. have some Avatar Provider that can fetch avatar for user ids.
member: RoomMember;
}[];
}
/**
* Combines MatrixRtc and Livekit worlds.
*
* It has a small public interface:
* - in (via constructor):
* - an observable of CallMembership[] to track the call members (The matrix side)
* - a `ConnectionManager` for the lk rooms (The livekit side)
* - out (via public Observable):
* - `remoteMatrixLivekitItems` an observable of MatrixLivekitItem[] to track the remote members and associated livekit data.
*/
export class MatrixLivekitMerger {
public remoteMatrixLivekitItems$: Observable<MatrixLivekitItem[]>;
/**
* The MatrixRTC session participants.
*/
// Note that MatrixRTCSession already filters the call memberships by users
// that are joined to the room; we don't need to perform extra filtering here.
private readonly memberships$ = this.scope.behavior(
fromEvent(
this.matrixRTCSession,
MatrixRTCSessionEvent.MembershipsChanged,
).pipe(
startWith(null),
map(() => this.matrixRTCSession.memberships),
),
);
public constructor(
private matrixRTCSession: MatrixRTCSession,
private connectionManager: ConnectionManager,
private scope: ObservableScope,
) {
const publishingParticipants$ = combineLatest([
this.memberships$,
connectionManager.connections$,
]).pipe(map(), this.scope.bind());
this.remoteMatrixLivekitItems$ = combineLatest([
callMemberships$,
connectionManager.connections$,
]).pipe(this.scope.bind());
// Implementation goes here
}
/**
* Lists the transports used by ourselves, plus all other MatrixRTC session
* members. For completeness this also lists the preferred transport and
* whether we are in multi-SFU mode or sticky events mode (because
* advertisedTransport$ wants to read them at the same time, and bundling data
* together when it might change together is what you have to do in RxJS to
* avoid reading inconsistent state or observing too many changes.)
*/
private readonly membershipsWithTransport$: Behavior<{
membership: CallMembership;
transport?: LivekitTransport;
} | null> = this.scope.behavior(
this.memberships$.pipe(
map((memberships) => {
const oldestMembership = this.matrixRTCSession.getOldestMembership();
memberships.map((membership) => {
let transport = membership.getTransport(oldestMembership ?? membership)
return { membership, transport: isLivekitTransport(transport) ? transport : undefined };
})
}),
),
);
/**
* Lists the transports used by each MatrixRTC session member other than
* ourselves.
*/
// private readonly remoteTransports$ = this.scope.behavior(
// this.membershipsWithTransport$.pipe(
// map((transports) => transports?.remote ?? []),
// ),
// );
/**
* Lists, for each LiveKit room, the LiveKit participants whose media should
* be presented.
*/
private readonly participantsByRoom$ = this.scope.behavior<LivekitRoomWithParticipants[]>(
// TODO: Move this logic into Connection/PublishConnection if possible
this.connectionManager.connections$.pipe(
switchMap((connections) => {
connections.map((c)=>c.publishingParticipants$.pipe(
map((publishingParticipants) => {
const participants: {
id: string;
participant: LivekitParticipant | undefined;
member: RoomMember;
}[] = publishingParticipants.map(({ participant, membership }) => ({
// TODO update to UUID
id: `${membership.userId}:${membership.deviceId}`,
participant,
// This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar)
member:
getRoomMemberFromRtcMember(
membership,
this.matrixRoom,
)?.member ?? memberError(),
}));
return {
livekitRoom: c.livekitRoom,
url: c.transport.livekit_service_url,
participants,
};
}),
),
),
),
),
);
}),
)
.pipe(startWith([]), pauseWhen(this.pretendToBeDisconnected$)),
);
}