Co-authored-by: Valere <bill.carson@valrsoft.com>
This commit is contained in:
Timo K
2025-10-29 12:37:14 +01:00
parent cfe05f1ed9
commit 3de0bbcfc9
2 changed files with 118 additions and 38 deletions

View File

@@ -16,8 +16,10 @@ import {
type RemoteParticipant,
Room as LivekitRoom,
type RoomOptions,
Participant,
} from "livekit-client";
import {
ParticipantId,
type CallMembership,
type LivekitTransport,
} from "matrix-js-sdk/lib/matrixrtc";
@@ -37,6 +39,8 @@ import {
SFURoomCreationRestrictedError,
} from "../../utils/errors.ts";
export type PublishingParticipant = Participant;
export interface ConnectionOpts {
/** The media transport to connect to. */
transport: LivekitTransport;
@@ -72,21 +76,6 @@ export type ConnectionState =
}
| { state: "Stopped"; transport: LivekitTransport };
/**
* Represents participant publishing or expected to publish on the connection.
* It is paired with its associated rtc membership.
*/
export type PublishingParticipant = {
/**
* The LiveKit participant publishing on this connection, or undefined if the participant is not currently (yet) connected to the livekit room.
*/
participant: RemoteParticipant | undefined;
/**
* The rtc call membership associated with this participant.
*/
membership: CallMembership;
};
/**
* A connection to a Matrix RTC LiveKit backend.
*
@@ -205,7 +194,11 @@ export class Connection {
* This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`.
* It filters the participants to only those that are associated with a membership that claims to publish on this connection.
*/
public readonly publishingParticipants$: Behavior<PublishingParticipant[]>;
public readonly participantsWithPublishTrack$: Behavior<
PublishingParticipant[]
>;
/**
* The media transport to connect to.
@@ -233,12 +226,15 @@ export class Connection {
this.transport = transport;
this.client = client;
const participantsIncludingSubscribers$: Behavior<RemoteParticipant[]> =
scope.behavior(connectedParticipantsObserver(this.livekitRoom), []);
this.participantsWithPublishTrack$ = scope.behavior(
connectedParticipantsObserver(this.livekitRoom),
[],
);
// Legacy using callMemberships
this.publishingParticipants$ = scope.behavior(
combineLatest(
[participantsIncludingSubscribers$, membershipsWithTransport$],
[this.participantsIncludingSubscribers$, membershipsWithTransport$],
(participants, remoteTransports) =>
remoteTransports
// Find all members that claim to publish on this connection

View File

@@ -19,6 +19,7 @@ import {
type Transport,
LivekitTransport,
isLivekitTransport,
ParticipantId,
} from "matrix-js-sdk/lib/matrixrtc";
import {
combineLatest,
@@ -40,20 +41,44 @@ import { pauseWhen } from "../../utils/observable";
// - make ConnectionManager its own actual class
// - write test for scopes (do we really need to bind scope)
class ConnectionManager {
constructor(transports$: Observable<Transport[]>) {}
public startWithMemberships(memberships$: Behavior<CallMembership[]>) {}
public setTansports(transports$: Behavior<Transport[]>): void {}
public readonly connections$: Observable<Connection[]> = constant([]);
// connection is used to find the transport (to find matching callmembership) & for the livekitRoom
public readonly participantsByMemberId$: Behavior<
Map<
ParticipantId,
// It can be an array because a bad behaving client could be publishingParticipants$
// multiple times to several livekit rooms.
{ participant: LivekitParticipant; connection: Connection }[]
>
> = constant(new Map());
}
/**
* Represents participant publishing or expected to publish on the connection.
* It is paired with its associated rtc membership.
*/
export type PublishingParticipant = {
/**
* The LiveKit participant publishing on this connection, or undefined if the participant is not currently (yet) connected to the livekit room.
*/
participant: RemoteParticipant | undefined;
/**
* The rtc call membership associated with this participant.
*/
membership: CallMembership;
};
/**
* Represent a matrix call member and his associated livekit participation.
* `livekitParticipant` can be undefined if the member is not yet connected to the livekit room
* or if it has no livekit transport at all.
*/
export interface MatrixLivekitItem {
membership: CallMembership;
livekitParticipant?: LivekitParticipant;
//TODO Try to remove this! Its waaay to much information
// Just use to get the member's avatar
member?: RoomMember;
}
@@ -107,7 +132,7 @@ export class MatrixLivekitMerger {
private scope: ObservableScope,
private matrixRoom: MatrixRoom,
) {
connectionManager.startWithMemberships(this.memberships$);
connectionManager.setTansports(this.transports$);
}
/**
@@ -118,17 +143,12 @@ export class MatrixLivekitMerger {
* together when it might change together is what you have to do in RxJS to
* avoid reading inconsistent state or observing too many changes.)
*/
// TODO pass this over to our conncetions
private readonly membershipsWithTransport$: Behavior<{
membership: CallMembership;
transport?: LivekitTransport;
} | null> = this.scope.behavior(
private readonly membershipsWithTransport$ = this.scope.behavior(
this.memberships$.pipe(
map((memberships) => {
const oldestMembership = this.matrixRTCSession.getOldestMembership();
memberships.map((membership) => {
let transport = membership.getTransport(
return memberships.map((membership) => {
const transport = membership.getTransport(
oldestMembership ?? membership,
);
return {
@@ -140,14 +160,65 @@ export class MatrixLivekitMerger {
),
);
private allPublishingParticipants$ = this.connectionManager.connections$.pipe(
switchMap((connections) => {
const listOfPublishingParticipants = connections.map(
(connection) => connection.publishingParticipants$,
);
return combineLatest(listOfPublishingParticipants).pipe(
map((list) => list.flatMap((innerList) => innerList)),
);
private readonly transports$ = this.scope.behavior(
this.membershipsWithTransport$.pipe(
map((membershipsWithTransport) =>
membershipsWithTransport.reduce((acc, { transport }) => {
if (
transport &&
!acc.some((t) => areLivekitTransportsEqual(t, transport))
) {
acc.push(transport);
}
return acc;
}, [] as LivekitTransport[]),
),
),
);
// TODO move this over this the connection manager
// We have a lost of connections, for each of these these
// connection we create a stream of (participant, connection) tuples.
// Then we combine the several streams (1 per Connection) into a single stream of tuples.
private participantsWithConnection$ =
this.connectionManager.connections$.pipe(
switchMap((connections) => {
const listsOfParticipantWithConnection = connections.map(
(connection) => {
return connection.participantsWithPublishTrack$.pipe(
map((participants) =>
participants.map((p) => ({
participant: p,
connection,
})),
),
);
},
);
return combineLatest(listsOfParticipantWithConnection).pipe(
map((lists) => lists.flatMap((list) => list)),
);
}),
);
// TODO move this over this the connection manager
// Filters the livekit partic
private participantsByMemberId$ = this.participantsWithConnection$.pipe(
map((participantsWithConnections) => {
const participantsByMemberId = new Map<string, Participant[]>();
participantsWithConnections.forEach(({ participant, connection }) => {
if (participant.getTrackPublications().length > 0) {
const currentVal = participantsByMemberId.get(participant.identity);
participantsByMemberId.set(participant.identity, {
connection,
participants:
currentVal === undefined
? [participant]
: ([...currentVal, participant] as Participant[]),
});
}
});
return participantsByMemberId;
}),
);
@@ -172,3 +243,16 @@ export class MatrixLivekitMerger {
)
.pipe(startWith([]), pauseWhen(this.pretendToBeDisconnected$));
}
// TODO add this to the JS-SDK
function areLivekitTransportsEqual(
t1: LivekitTransport,
t2: LivekitTransport,
): boolean {
return (
t1.livekit_service_url === t2.livekit_service_url &&
// In case we have different lk rooms in the same SFU (depends on the livekit authorization service)
// It is only needed in case the livekit authorization service is not behaving as expected (or custom implementation)
t1.livekit_alias === t2.livekit_alias
);
}