Add debug logs to generateItems

It's always worth having logs for when state holders are created or destroyed (these are often the most interesting things happening in the application), so I thought it would be nice to have generateItems always log for you when it's doing that.
This commit is contained in:
Robin
2026-02-06 13:02:20 +01:00
parent 2c1476f151
commit a0209eb433
6 changed files with 54 additions and 27 deletions

View File

@@ -715,6 +715,7 @@ export function createCallViewModel$(
// Generate a collection of MediaItems from the list of expected (whether
// present or missing) LiveKit participants.
generateItems(
"CallViewModel userMedia$",
function* ([
localMatrixLivekitMember,
matrixLivekitMembers,

View File

@@ -162,6 +162,7 @@ export function createConnectionManager$({
const connections$ = scope.behavior(
localAndRemoteTransports$.pipe(
generateItemsWithEpoch(
"ConnectionManager connections$",
function* (transports) {
for (const transportWithOrWithoutSfuConfig of transports) {
if (

View File

@@ -11,7 +11,6 @@ import {
type LivekitTransportConfig,
} from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, filter, map } from "rxjs";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "../../Behavior";
import { type IConnectionManager } from "./ConnectionManager";
@@ -19,8 +18,6 @@ import { Epoch, type ObservableScope } from "../../ObservableScope";
import { type Connection } from "./Connection";
import { generateItemsWithEpoch } from "../../../utils/observable";
const logger = rootLogger.getChild("[MatrixLivekitMembers]");
interface LocalTaggedParticipant {
type: "local";
value$: Behavior<LocalParticipant | null>;
@@ -94,9 +91,10 @@ export function createMatrixLivekitMembers$({
),
map(([ms, data]) => new Epoch([ms.value, data.value] as const, ms.epoch)),
generateItemsWithEpoch(
"MatrixLivekitMembers",
// Generator function.
// creates an array of `{key, data}[]`
// Each change in the keys (new key, missing key) will result in a call to the factory function.
// Each change in the keys (new key) will result in a call to the factory function.
function* ([membershipsWithTransport, managerData]) {
for (const { membership, transport } of membershipsWithTransport) {
const participants = transport
@@ -111,26 +109,23 @@ export function createMatrixLivekitMembers$({
: null;
yield {
// This could also just be the memberId without the other fields.
// In theory we should never have the same memberId for different userIds (they are UUIDs)
// This still makes us resilient agains someone who intentionally tries to use the same memberId.
// If they want to do this they would now need to also use the same sender which is impossible.
// This could just be the backend identity without the other keys.
// The user ID, device ID, and member ID are included however so
// they show up in debug logs.
keys: [
membership.userId,
membership.deviceId,
membership.memberId,
membership.rtcBackendIdentity,
],
data: { membership, participant, connection },
};
}
},
// Each update where the key of the generator array do not change will result in updates to the `data$` observable in the factory.
(scope, data$, userId, deviceId, memberId) => {
logger.debug(
`Generating member for livekitIdentity: ${data$.value.membership.rtcBackendIdentity},keys userId:deviceId:memberId ${userId}:${deviceId}:${memberId}`,
);
// Each update where the key of the generator array do not change will result in updates to the `data$` behavior.
(scope, data$, userId, _deviceId, _memberId, _rtcBackendIdentity) => {
const { participant$, ...rest } = scope.splitBehavior(data$);
// will only get called once per `participantId, userId` pair.
// will only get called once per backend identity.
// updates to data$ and as a result to displayName$ and mxcAvatarUrl$ are more frequent.
return {
userId,

View File

@@ -130,6 +130,7 @@ export class UserMedia {
// MediaViewModels don't support it though since they look for a unique
// track for the given source. So generateItems here is a bit overkill.
generateItems(
`${this.id} screenShares$`,
function* (p) {
if (p.isScreenShareEnabled)
yield {

View File

@@ -47,6 +47,7 @@ test("generateItems", () => {
expectObservable(
hot<string>(inputMarbles).pipe(
generateItems(
"test items",
function* (input) {
for (let i = 1; i <= +input; i++) {
yield { keys: [i], data: undefined };

View File

@@ -24,6 +24,7 @@ import {
type OperatorFunction,
distinctUntilChanged,
} from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "../state/Behavior";
import { Epoch, ObservableScope } from "../state/ObservableScope";
@@ -122,8 +123,9 @@ export function pauseWhen<T>(pause$: Behavior<boolean>) {
);
}
interface ItemHandle<Data, Item> {
interface ItemHandle<Keys extends unknown[], Data, Item> {
scope: ObservableScope;
keys: readonly [...Keys];
data$: BehaviorSubject<Data>;
item: Item;
}
@@ -135,6 +137,7 @@ interface ItemHandle<Data, Item> {
* requested at a later time, and destroyed (have their scope ended) when the
* key is no longer requested.
*
* @param name A name for this collection to use in debug logs.
* @param generator A generator function yielding a tuple of keys and the
* currently associated data for each item that it wants to exist.
* @param factory A function constructing an individual item, given the item's key,
@@ -146,16 +149,17 @@ export function generateItems<
Data,
Item,
>(
name: string,
generator: (
input: Input,
) => Generator<{ keys: readonly [...Keys]; data: Data }, void, void>,
) => Iterable<{ keys: readonly [...Keys]; data: Data }, void, void>,
factory: (
scope: ObservableScope,
data$: Behavior<Data>,
...keys: Keys
) => Item,
): OperatorFunction<Input, Item[]> {
return generateItemsInternal(generator, factory, (items) => items);
return generateItemsInternal(name, generator, factory, (items) => items);
}
/**
@@ -167,9 +171,10 @@ export function generateItemsWithEpoch<
Data,
Item,
>(
name: string,
generator: (
input: Input,
) => Generator<{ keys: readonly [...Keys]; data: Data }, void, void>,
) => Iterable<{ keys: readonly [...Keys]; data: Data }, void, void>,
factory: (
scope: ObservableScope,
data$: Behavior<Data>,
@@ -177,6 +182,7 @@ export function generateItemsWithEpoch<
) => Item,
): OperatorFunction<Epoch<Input>, Epoch<Item[]>> {
return generateItemsInternal(
name,
function* (input) {
yield* generator(input.value);
},
@@ -214,9 +220,10 @@ function generateItemsInternal<
Item,
Output,
>(
name: string,
generator: (
input: Input,
) => Generator<{ keys: readonly [...Keys]; data: Data }, void, void>,
) => Iterable<{ keys: readonly [...Keys]; data: Data }, void, void>,
factory: (
scope: ObservableScope,
data$: Behavior<Data>,
@@ -232,26 +239,34 @@ function generateItemsInternal<
Input,
{
map: Map<any, any>;
items: Set<ItemHandle<Data, Item>>;
items: Set<ItemHandle<Keys, Data, Item>>;
input: Input;
},
{ map: Map<any, any>; items: Set<ItemHandle<Data, Item>> }
{ map: Map<any, any>; items: Set<ItemHandle<Keys, Data, Item>> }
>(
({ map: prevMap, items: prevItems }, input) => {
const nextMap = new Map();
const nextItems = new Set<ItemHandle<Data, Item>>();
const nextItems = new Set<ItemHandle<Keys, Data, Item>>();
for (const { keys, data } of generator(input)) {
// Disable type checks for a second to grab the item out of a nested map
let i: any = prevMap;
for (const key of keys) i = i?.get(key);
let item = i as ItemHandle<Data, Item> | undefined;
let item = i as ItemHandle<Keys, Data, Item> | undefined;
if (item === undefined) {
// First time requesting the key; create the item
const scope = new ObservableScope();
const data$ = new BehaviorSubject(data);
item = { scope, data$, item: factory(scope, data$, ...keys) };
logger.debug(
`[${name}] Creating item with keys ${keys.join(", ")}`,
);
item = {
scope,
keys,
data$,
item: factory(scope, data$, ...keys),
};
} else {
item.data$.next(data);
}
@@ -269,7 +284,7 @@ function generateItemsInternal<
const finalKey = keys[keys.length - 1];
if (m.has(finalKey))
throw new Error(
`Keys must be unique (tried to generate multiple items for key ${keys})`,
`Keys must be unique (tried to generate multiple items for key ${keys.join(", ")})`,
);
m.set(keys[keys.length - 1], item);
nextItems.add(item);
@@ -277,7 +292,12 @@ function generateItemsInternal<
// Destroy all items that are no longer being requested
for (const item of prevItems)
if (!nextItems.has(item)) item.scope.end();
if (!nextItems.has(item)) {
logger.debug(
`[${name}] Destroying item with keys ${item.keys.join(", ")}`,
);
item.scope.end();
}
return { map: nextMap, items: nextItems, input };
},
@@ -285,7 +305,15 @@ function generateItemsInternal<
),
finalizeValue(({ items }) => {
// Destroy all remaining items when no longer subscribed
for (const { scope } of items) scope.end();
logger.debug(
`[${name}] End of scope, destroying all ${items.size} items…`,
);
for (const item of items) {
logger.debug(
`[${name}] Destroying item with keys ${item.keys.join(", ")}`,
);
item.scope.end();
}
}),
map(({ items, input }) =>
project(