This commit is contained in:
Robin
2025-11-06 11:23:36 -05:00
parent d8e29467f6
commit ee2a6718a0
4 changed files with 160 additions and 145 deletions

View File

@@ -67,7 +67,7 @@ import {
ScreenShareViewModel,
type UserMediaViewModel,
} from "./MediaViewModel";
import { accumulate, generateKeyed$, pauseWhen } from "../utils/observable";
import { accumulate, generateMap$, pauseWhen } from "../utils/observable";
import {
duplicateTiles,
MatrixRTCMode,
@@ -110,7 +110,7 @@ import {
} from "./layout-types.ts";
import { type ElementCallError } from "../utils/errors.ts";
import { type ObservableScope } from "./ObservableScope.ts";
import { createMatrixLivekitMembers$ } from "./remoteMembers/matrixLivekitMerger.ts";
import { createMatrixLivekitMembers$ } from "./remoteMembers/MatrixLivekitMembers.ts";
import { createLocalMembership$ } from "./localMember/LocalMembership.ts";
import { createLocalTransport$ } from "./localMember/LocalTransport.ts";
import { createSessionMembershipsAndTransports$ } from "./SessionBehaviors.ts";
@@ -335,24 +335,17 @@ export class CallViewModel {
*/
// TODO KEEP THIS!! and adapt it to what our membershipManger returns
private readonly mediaItems$ = this.scope.behavior<MediaItem[]>(
generateKeyed$<
[typeof this.matrixLivekitMembers$.value, number],
MediaItem,
MediaItem[]
>(
generateMap$(
// Generate a collection of MediaItems from the list of expected (whether
// present or missing) LiveKit participants.
combineLatest([this.matrixLivekitMembers$, duplicateTiles.value$]),
([matrixLivekitMembers, duplicateTiles], createOrGet) => {
function* ([matrixLivekitMembers, duplicateTiles]) {
const items: MediaItem[] = [];
for (const {
connection,
participant,
member,
displayName$,
for (const [
participantId,
} of matrixLivekitMembers) {
{ connection$, participant$, member$, displayName$ },
] of matrixLivekitMembers) {
if (connection === undefined) {
logger.warn("connection is not yet initialised.");
continue;
@@ -361,7 +354,6 @@ export class CallViewModel {
const mediaId = `${participantId}:${i}`;
const lkRoom = connection?.livekitRoom;
const url = connection?.transport.livekit_service_url;
const dpName$ = displayName$.pipe(map((n) => n ?? "[👻]"));
const item = createOrGet(
mediaId,
(scope) =>
@@ -378,7 +370,7 @@ export class CallViewModel {
url,
this.mediaDevices,
this.pretendToBeDisconnected$,
dpName$,
displayName$,
this.handsRaised$.pipe(
map((v) => v[participantId]?.time ?? null),
),
@@ -405,7 +397,7 @@ export class CallViewModel {
lkRoom,
url,
this.pretendToBeDisconnected$,
dpName$,
displayName$,
),
),
);

View File

@@ -20,8 +20,8 @@ import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
import { type Behavior } from "../Behavior";
import { type Connection } from "./Connection";
import { type ObservableScope } from "../ObservableScope";
import { generateKeyed$ } from "../../utils/observable";
import { areLivekitTransportsEqual } from "./matrixLivekitMerger";
import { generateItems$ } from "../../utils/observable";
import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts";
import { type ConnectionFactory } from "./ConnectionFactory.ts";
export class ConnectionManagerData {
@@ -142,31 +142,28 @@ export function createConnectionManager$({
* Connections for each transport in use by one or more session members.
*/
const connections$ = scope.behavior(
generateKeyed$<LivekitTransport[], Connection, Connection[]>(
generateItems$(
transports$,
(transports, createOrGet) => {
const createConnection =
(
transport: LivekitTransport,
): ((scope: ObservableScope) => Connection) =>
(scope) => {
const connection = connectionFactory.createConnection(
transport,
scope,
logger,
);
// Start the connection immediately
// Use connection state to track connection progress
void connection.start();
// TODO subscribe to connection state to retry or log issues?
return connection;
function* (transports) {
for (const transport of transports)
yield {
// We need to serialize the transport to a string to properly use it
// as a Map key, but we also need the real transport object in order
// to construct the connection; pass it through as the item's data.
key: `${transport.livekit_service_url}|${transport.livekit_alias}`,
data: transport,
};
return transports.map((transport) => {
const key =
transport.livekit_service_url + "|" + transport.livekit_alias;
return createOrGet(key, createConnection(transport));
});
},
(scope, key, transport$) => {
const connection = connectionFactory.createConnection(
transport$.value,
scope,
logger,
);
// Start the connection immediately
// Use connection state to track connection progress
void connection.start();
return connection;
},
),
);

View File

@@ -13,10 +13,23 @@ import {
type LivekitTransport,
type CallMembership,
} from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, map, startWith, type Observable } from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger";
import {
combineLatest,
filter,
fromEvent,
map,
startWith,
switchMap,
type Observable,
} from "rxjs";
// eslint-disable-next-line rxjs/no-internal
import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent";
import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk";
import {
RoomStateEvent,
type Room as MatrixRoom,
type RoomMember,
} from "matrix-js-sdk";
// import type { Logger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "../Behavior";
@@ -24,6 +37,7 @@ import { type ObservableScope } from "../ObservableScope";
import { type createConnectionManager$ } from "./ConnectionManager";
import { getRoomMemberFromRtcMember, memberDisplaynames$ } from "./displayname";
import { type Connection } from "./Connection";
import { generateItems$ } from "../../utils/observable";
/**
* Represent a matrix call member and his associated livekit participation.
@@ -31,18 +45,20 @@ import { type Connection } from "./Connection";
* or if it has no livekit transport at all.
*/
export interface MatrixLivekitMember {
membership: CallMembership;
participantId: string;
membership$: Behavior<CallMembership>;
displayName$: Behavior<string>;
participant?: LocalLivekitParticipant | RemoteLivekitParticipant;
connection?: Connection;
participant$:
| Behavior<LocalLivekitParticipant | undefined>
| Behavior<RemoteLivekitParticipant | undefined>;
connection$: Behavior<Connection | undefined>;
mxcAvatarUrl$: Behavior<string | undefined>;
/**
* TODO Try to remove this! Its waaay to much information.
* Just get the member's avatar
* @deprecated
*/
member: RoomMember;
mxcAvatarUrl?: string;
participantId: string;
member$: Behavior<RoomMember>;
}
interface Props {
@@ -85,7 +101,7 @@ export function createMatrixLivekitMembers$({
*/
function createMatrixLivekitMember$(): Observable<MatrixLivekitMember[]> {
const displaynameMap$ = memberDisplaynames$(
const displayNameMap$ = memberDisplaynames$(
scope,
matrixRoom,
membershipsWithTransport$.pipe(map((v) => v.map((v) => v.membership))),
@@ -93,51 +109,75 @@ export function createMatrixLivekitMembers$({
deviceId,
);
return combineLatest([
membershipsWithTransport$,
connectionManager.connectionManagerData$,
]).pipe(
map(([memberships, managerData]) => {
const items: MatrixLivekitMember[] = memberships.map(
({ membership, transport }) => {
// TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to
const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`;
return generateItems$(
combineLatest([
membershipsWithTransport$,
connectionManager.connectionManagerData$,
]),
function* ([memberships, managerData]) {
for (const { membership, transport } of memberships) {
// TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to
const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`;
const participants = transport
? managerData.getParticipantForTransport(transport)
: [];
const participant = participants.find(
(p) => p.identity == participantId,
);
const member = getRoomMemberFromRtcMember(
membership,
matrixRoom,
)?.member;
const connection = transport
? managerData.getConnectionForTransport(transport)
: undefined;
const displayName$ = scope.behavior(
displaynameMap$.pipe(
map(
(displayNameMap) =>
displayNameMap.get(membership.membershipID) ?? "---",
),
),
);
return {
const participants = transport
? managerData.getParticipantForTransport(transport)
: [];
const participant = participants.find(
(p) => p.identity == participantId,
);
const member = getRoomMemberFromRtcMember(
membership,
matrixRoom,
)?.member;
if (member === undefined) {
logger.warn(`No room member for participant ${participantId}`);
continue;
}
const connection = transport
? managerData.getConnectionForTransport(transport)
: undefined;
yield {
key: participantId,
data: {
participant,
membership,
connection,
// This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar)
// TODO Ugh this is hidign that it might be undefined!! best we remove the member entirely.
member: member as RoomMember,
displayName$,
mxcAvatarUrl: member?.getMxcAvatarUrl(),
participantId,
};
},
);
return items;
member,
},
};
}
},
(scope, participantId, data$): MatrixLivekitMember => ({
participantId,
membership$: scope.behavior(data$.pipe(map((data) => data.membership))),
displayName$: scope.behavior(
displayNameMap$.pipe(
map((displayNames) => displayNames.get(participantId)),
filter((name) => name !== undefined),
),
"",
),
participant$: scope.behavior(
data$.pipe(map((data) => data.participant)),
// Assert that a local participant will never become a remote
// participant or vice versa
) as
| Behavior<LocalLivekitParticipant | undefined>
| Behavior<RemoteLivekitParticipant | undefined>,
connection$: scope.behavior(data$.pipe(map((data) => data.connection))),
mxcAvatarUrl$: scope.behavior(
// React to avatar changes
fromEvent(matrixRoom, RoomStateEvent.Members).pipe(
startWith(null),
switchMap(() =>
data$.pipe(map((data) => data.member.getMxcAvatarUrl())),
),
),
),
member$: scope.behavior(data$.pipe(map((data) => data.member))),
}),
);
}

View File

@@ -20,6 +20,7 @@ import {
takeWhile,
tap,
withLatestFrom,
BehaviorSubject,
} from "rxjs";
import { type Behavior } from "../state/Behavior";
@@ -120,69 +121,54 @@ export function pauseWhen<T>(pause$: Behavior<boolean>) {
}
/**
* Maps a changing input value to an output value consisting of items that have
* automatically generated ObservableScopes tied to a key. Items will be
* automatically created when their key is requested for the first time, reused
* when the same key is requested at a later time, and destroyed (have their
* scope ended) when the key is no longer requested.
* Maps a changing input value to a collection of items that each capture some
* dynamic data and are tied to a key. Items will be automatically created when
* their key is requested for the first time, reused when the same key is
* requested acy later time, and destroyed (have their scope ended) when the key
* is no longer requested.
*
* @param input$ The input value to be mapped.
* @param project A function mapping input values to output values. This
* function receives an additional callback `createOrGet` which can be used
* within the function body to request that an item be generated for a certain
* key. The caller provides a factory which will be used to create the item if
* it is being requested for the first time. Otherwise, the item previously
* existing under that key will be returned.
* @param generator A generator function yielding a key and the currently
* associated data for each item that it wants to exist.
* @param factory A function constructing an actual item, given the item's key,
* dynamic data, and an automatically managed ObservableScope for the item.
*/
export function generateKeyed$<In, Item, Out>(
input$: Observable<In>,
project: (
input: In,
createOrGet: (
key: string,
factory: (scope: ObservableScope) => Item,
) => Item,
) => Out,
): Observable<Out> {
export function generateItems$<Input, Key, Data, Item>(
input$: Observable<Input>,
generator: (input: Input) => Generator<{ key: Key; data: Data }, void, void>,
factory: (scope: ObservableScope, key: Key, data$: Behavior<Data>) => Item,
): Observable<Item[]> {
return input$.pipe(
// Keep track of the existing items over time, so we can reuse them
scan<
In,
{
items: Map<string, { item: Item; scope: ObservableScope }>;
output: Out;
},
{ items: Map<string, { item: Item; scope: ObservableScope }> }
>(
(state, data) => {
const nextItems = new Map<
string,
{ item: Item; scope: ObservableScope }
>();
scan((prevItems, input) => {
const nextItems = new Map<
Key,
{ scope: ObservableScope; data$: BehaviorSubject<Data>; item: Item }
>();
const output = project(data, (key, factory) => {
let item = state.items.get(key);
if (item === undefined) {
// First time requesting the key; create the item
const scope = new ObservableScope();
item = { item: factory(scope), scope };
}
nextItems.set(key, item);
return item.item;
});
for (const { key, data } of generator(input)) {
let item = prevItems.get(key);
if (item === undefined) {
// First time requesting the key; create the item
const scope = new ObservableScope();
const data$ = new BehaviorSubject(data);
item = { scope, data$, item: factory(scope, key, data$) };
} else {
item.data$.next(data);
}
nextItems.set(key, item);
}
// Destroy all items that are no longer being requested
for (const [key, { scope }] of state.items)
if (!nextItems.has(key)) scope.end();
// Destroy all items that are no longer being requested
for (const [key, { scope }] of prevItems)
if (!nextItems.has(key)) scope.end();
return { items: nextItems, output };
},
{ items: new Map() },
),
finalizeValue((state) => {
return nextItems;
}, new Map<Key, { scope: ObservableScope; data$: BehaviorSubject<Data>; item: Item }>()),
finalizeValue((items) => {
// Destroy all remaining items when no longer subscribed
for (const { scope } of state.items.values()) scope.end();
for (const { scope } of items.values()) scope.end();
}),
map(({ output }) => output),
map((items) => [...items.values()].map(({ item }) => item)),
);
}