mirror of
https://github.com/vector-im/element-call.git
synced 2026-06-30 18:02:56 +00:00
Merge pull request #4051 from element-hq/remote-members
Clarify which Matrix-LiveKit members are remote
This commit is contained in:
10
sdk/main.ts
10
sdk/main.ts
@@ -76,9 +76,9 @@ interface MatrixRTCSdk {
|
||||
stop: () => void;
|
||||
data$: Observable<{ rtcBackendIdentity: string; data: string }>;
|
||||
/**
|
||||
* flattened list of members
|
||||
* flattened list of remote members
|
||||
*/
|
||||
members$: Behavior<
|
||||
remoteMembers$: Behavior<
|
||||
{
|
||||
connection: Connection | null;
|
||||
membership: CallMembership;
|
||||
@@ -86,7 +86,7 @@ interface MatrixRTCSdk {
|
||||
}[]
|
||||
>;
|
||||
/**
|
||||
* flattened local members
|
||||
* flattened local member
|
||||
*/
|
||||
localMember$: Behavior<{
|
||||
connection: Connection | null;
|
||||
@@ -338,8 +338,8 @@ export async function createMatrixRTCSdk(
|
||||
),
|
||||
),
|
||||
connected$: callViewModel.connected$,
|
||||
members$: scope.behavior(
|
||||
callViewModel.matrixLivekitMembers$.pipe(
|
||||
remoteMembers$: scope.behavior(
|
||||
callViewModel.remoteMatrixLivekitMembers$.pipe(
|
||||
switchMap((members) => {
|
||||
const listOfMemberObservables = members.map((member) =>
|
||||
combineLatest([
|
||||
|
||||
@@ -127,10 +127,9 @@ import {
|
||||
createConnectionManager$,
|
||||
} from "./remoteMembers/ConnectionManager.ts";
|
||||
import {
|
||||
createMatrixLivekitMembers$,
|
||||
createRemoteMatrixLivekitMembers$,
|
||||
type LocalMatrixLivekitMember,
|
||||
type RemoteMatrixLivekitMember,
|
||||
type MatrixLivekitMember,
|
||||
} from "./remoteMembers/MatrixLivekitMembers.ts";
|
||||
import {
|
||||
type AutoLeaveReason,
|
||||
@@ -302,7 +301,7 @@ export interface CallViewModel {
|
||||
/** Participants sorted by livekit room so they can be used in the audio rendering */
|
||||
livekitRoomItems$: Behavior<LivekitRoomItem[]>;
|
||||
/** use the layout instead, this is just for the sdk export. */
|
||||
matrixLivekitMembers$: Behavior<RemoteMatrixLivekitMember[]>;
|
||||
remoteMatrixLivekitMembers$: Behavior<RemoteMatrixLivekitMember[]>;
|
||||
localMatrixLivekitMember$: Behavior<LocalMatrixLivekitMember | null>;
|
||||
/** List of participants raising their hand */
|
||||
handsRaised$: Behavior<Record<string, RaisedHandInfo>>;
|
||||
@@ -529,13 +528,15 @@ export function createCallViewModel$(
|
||||
ownMembershipIdentity,
|
||||
});
|
||||
|
||||
const matrixLivekitMembers$: Behavior<Epoch<RemoteMatrixLivekitMember[]>> =
|
||||
createMatrixLivekitMembers$({
|
||||
scope: scope,
|
||||
membershipsWithTransport$:
|
||||
membershipsAndTransports.membershipsWithTransport$,
|
||||
connectionManager: connectionManager,
|
||||
});
|
||||
const remoteMatrixLivekitMembers$: Behavior<
|
||||
Epoch<RemoteMatrixLivekitMember[]>
|
||||
> = createRemoteMatrixLivekitMembers$({
|
||||
scope: scope,
|
||||
membershipsWithTransport$:
|
||||
membershipsAndTransports.membershipsWithTransport$,
|
||||
connectionManager: connectionManager,
|
||||
localUser: { userId, deviceId },
|
||||
});
|
||||
|
||||
const connectOptions$ = scope.behavior(
|
||||
matrixRTCMode$.pipe(
|
||||
@@ -612,6 +613,13 @@ export function createCallViewModel$(
|
||||
),
|
||||
);
|
||||
|
||||
const matrixLivekitMembers$ = scope.behavior(
|
||||
combineLatest(
|
||||
[localMatrixLivekitMember$, remoteMatrixLivekitMembers$],
|
||||
(local, remote) => [...(local === null ? [] : [local]), ...remote.value],
|
||||
),
|
||||
);
|
||||
|
||||
// ------------------------------------------------------------------------
|
||||
// matrixMemberMetadataStore
|
||||
|
||||
@@ -641,7 +649,7 @@ export function createCallViewModel$(
|
||||
connectionManager.connectionManagerData$.pipe(map((d) => d.value)),
|
||||
);
|
||||
const livekitRoomItems$ = scope.behavior(
|
||||
matrixLivekitMembers$.pipe(
|
||||
remoteMatrixLivekitMembers$.pipe(
|
||||
switchMap((members) => {
|
||||
const a$ = combineLatest(
|
||||
members.value.map((member) =>
|
||||
@@ -707,43 +715,20 @@ export function createCallViewModel$(
|
||||
* List of user media (camera feeds) that we want tiles for.
|
||||
*/
|
||||
const userMedia$ = scope.behavior<WrappedUserMediaViewModel[]>(
|
||||
combineLatest([
|
||||
localMatrixLivekitMember$,
|
||||
matrixLivekitMembers$,
|
||||
duplicateTiles.value$,
|
||||
]).pipe(
|
||||
combineLatest([matrixLivekitMembers$, duplicateTiles.value$]).pipe(
|
||||
// Generate a collection of user media from the list of expected (whether
|
||||
// present or missing) LiveKit participants.
|
||||
generateItems(
|
||||
"CallViewModel userMedia$",
|
||||
function* ([
|
||||
localMatrixLivekitMember,
|
||||
matrixLivekitMembers,
|
||||
duplicateTiles,
|
||||
]) {
|
||||
const computeMediaId = (m: MatrixLivekitMember): string =>
|
||||
`${m.userId}:${m.membership$.value.deviceId}`;
|
||||
|
||||
const localUserMediaId = localMatrixLivekitMember
|
||||
? computeMediaId(localMatrixLivekitMember)
|
||||
: undefined;
|
||||
|
||||
const localAsArray = localMatrixLivekitMember
|
||||
? [localMatrixLivekitMember]
|
||||
: [];
|
||||
const remoteWithoutLocal = matrixLivekitMembers.value.filter(
|
||||
(m) => computeMediaId(m) !== localUserMediaId,
|
||||
);
|
||||
const allMatrixLivekitMembers = [
|
||||
...localAsArray,
|
||||
...remoteWithoutLocal,
|
||||
];
|
||||
|
||||
for (const matrixLivekitMember of allMatrixLivekitMembers) {
|
||||
const { userId, participant, connection$, membership$ } =
|
||||
matrixLivekitMember;
|
||||
const rtcId = membership$.value.rtcBackendIdentity; // rtcBackendIdentity
|
||||
const mediaId = computeMediaId(matrixLivekitMember);
|
||||
function* ([members, duplicateTiles]) {
|
||||
for (const {
|
||||
userId,
|
||||
participant,
|
||||
connection$,
|
||||
membership$,
|
||||
} of members) {
|
||||
const rtcId = membership$.value.rtcBackendIdentity;
|
||||
const mediaId = `${userId}:${membership$.value.deviceId}`;
|
||||
for (let dup = 0; dup < 1 + duplicateTiles; dup++) {
|
||||
yield {
|
||||
keys: [dup, mediaId, userId, participant, connection$, rtcId],
|
||||
@@ -861,7 +846,7 @@ export function createCallViewModel$(
|
||||
* multiple devices.
|
||||
*/
|
||||
const participantCount$ = scope.behavior(
|
||||
matrixLivekitMembers$.pipe(map((ms) => ms.value.length)),
|
||||
matrixLivekitMembers$.pipe(map((ms) => ms.length)),
|
||||
);
|
||||
|
||||
const leaveSoundEffect$ = userMedia$.pipe(
|
||||
@@ -1770,8 +1755,8 @@ export function createCallViewModel$(
|
||||
setGridMode: setGridMode,
|
||||
layout$: layout$,
|
||||
localMatrixLivekitMember$,
|
||||
matrixLivekitMembers$: scope.behavior(
|
||||
matrixLivekitMembers$.pipe(
|
||||
remoteMatrixLivekitMembers$: scope.behavior(
|
||||
remoteMatrixLivekitMembers$.pipe(
|
||||
map((members) => members.value),
|
||||
tap((v) => {
|
||||
const listForLogs = v
|
||||
|
||||
@@ -15,7 +15,7 @@ import { BehaviorSubject, combineLatest, map, type Observable } from "rxjs";
|
||||
import { type IConnectionManager } from "./ConnectionManager.ts";
|
||||
import {
|
||||
type RemoteMatrixLivekitMember,
|
||||
createMatrixLivekitMembers$,
|
||||
createRemoteMatrixLivekitMembers$,
|
||||
} from "./MatrixLivekitMembers.ts";
|
||||
import {
|
||||
Epoch,
|
||||
@@ -31,6 +31,7 @@ import {
|
||||
} from "../../../utils/test.ts";
|
||||
import { type Connection } from "./Connection.ts";
|
||||
import { constant } from "../../Behavior.ts";
|
||||
import { localRtcMember } from "../../../utils/test-fixtures.ts";
|
||||
|
||||
let testScope: ObservableScope;
|
||||
|
||||
@@ -88,16 +89,17 @@ test("should signal participant not yet connected to livekit", async () => {
|
||||
mockConnectionManagerData$,
|
||||
);
|
||||
|
||||
const matrixLivekitMember$ = createMatrixLivekitMembers$({
|
||||
const remoteMatrixLivekitMembers$ = createRemoteMatrixLivekitMembers$({
|
||||
scope: testScope,
|
||||
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
|
||||
connectionManager: {
|
||||
connectionManagerData$: connectionManagerData$,
|
||||
} as unknown as IConnectionManager,
|
||||
localUser: localRtcMember,
|
||||
});
|
||||
|
||||
await flushPromises();
|
||||
expect(matrixLivekitMember$.value.value).toSatisfy(
|
||||
expect(remoteMatrixLivekitMembers$.value.value).toSatisfy(
|
||||
(data: RemoteMatrixLivekitMember[]) => {
|
||||
expect(data.length).toEqual(1);
|
||||
expect(data[0].membership$.value).toBe(bobMembership);
|
||||
@@ -157,16 +159,17 @@ test("should signal participant on a connection that is publishing", async () =>
|
||||
constant(dataWithPublisher),
|
||||
);
|
||||
|
||||
const matrixLivekitMember$ = createMatrixLivekitMembers$({
|
||||
const remoteMatrixLivekitMembers$ = createRemoteMatrixLivekitMembers$({
|
||||
scope: testScope,
|
||||
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
|
||||
connectionManager: {
|
||||
connectionManagerData$: connectionManagerData$,
|
||||
} as unknown as IConnectionManager,
|
||||
localUser: localRtcMember,
|
||||
});
|
||||
|
||||
await flushPromises();
|
||||
expect(matrixLivekitMember$.value.value).toSatisfy(
|
||||
expect(remoteMatrixLivekitMembers$.value.value).toSatisfy(
|
||||
(data: RemoteMatrixLivekitMember[]) => {
|
||||
expect(data.length).toEqual(1);
|
||||
expect(data[0].membership$.value).toBe(bobMembership);
|
||||
@@ -197,15 +200,16 @@ test("should signal participant on a connection that is not publishing", async (
|
||||
constant(dataWithPublisher),
|
||||
);
|
||||
|
||||
const matrixLivekitMember$ = createMatrixLivekitMembers$({
|
||||
const remoteMatrixLivekitMembers$ = createRemoteMatrixLivekitMembers$({
|
||||
scope: testScope,
|
||||
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
|
||||
connectionManager: {
|
||||
connectionManagerData$: connectionManagerData$,
|
||||
} as unknown as IConnectionManager,
|
||||
localUser: localRtcMember,
|
||||
});
|
||||
await flushPromises();
|
||||
expect(matrixLivekitMember$.value.value).toSatisfy(
|
||||
expect(remoteMatrixLivekitMembers$.value.value).toSatisfy(
|
||||
(data: RemoteMatrixLivekitMember[]) => {
|
||||
expect(data.length).toEqual(1);
|
||||
expect(data[0].membership$.value).toBe(bobMembership);
|
||||
@@ -245,15 +249,16 @@ describe("Publication edge case", () => {
|
||||
constant(connectionWithPublisher),
|
||||
);
|
||||
|
||||
const matrixLivekitMembers$ = createMatrixLivekitMembers$({
|
||||
const remoteMatrixLivekitMembers$ = createRemoteMatrixLivekitMembers$({
|
||||
scope: testScope,
|
||||
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
|
||||
connectionManager: {
|
||||
connectionManagerData$: connectionManagerData$,
|
||||
} as unknown as IConnectionManager,
|
||||
localUser: localRtcMember,
|
||||
});
|
||||
await flushPromises();
|
||||
expect(matrixLivekitMembers$.value.value).toSatisfy(
|
||||
expect(remoteMatrixLivekitMembers$.value.value).toSatisfy(
|
||||
(data: RemoteMatrixLivekitMember[]) => {
|
||||
expect(data.length).toEqual(2);
|
||||
expect(data[0].membership$.value).toBe(bobMembership);
|
||||
@@ -303,16 +308,17 @@ test("bob is publishing in the wrong connection", async () => {
|
||||
connectionsWithPublisher$,
|
||||
);
|
||||
|
||||
const matrixLivekitMember$ = createMatrixLivekitMembers$({
|
||||
const remoteMatrixLivekitMembers$ = createRemoteMatrixLivekitMembers$({
|
||||
scope: testScope,
|
||||
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
|
||||
connectionManager: {
|
||||
connectionManagerData$: connectionManagerData$,
|
||||
} as unknown as IConnectionManager,
|
||||
localUser: localRtcMember,
|
||||
});
|
||||
|
||||
await flushPromises();
|
||||
expect(matrixLivekitMember$.value.value).toSatisfy(
|
||||
expect(remoteMatrixLivekitMembers$.value.value).toSatisfy(
|
||||
(data: RemoteMatrixLivekitMember[]) => {
|
||||
expect(data.length).toEqual(2);
|
||||
expect(data[0].membership$.value).toBe(bobMembership);
|
||||
|
||||
@@ -62,7 +62,9 @@ interface Props {
|
||||
Epoch<{ membership: CallMembership; transport?: LivekitTransportConfig }[]>
|
||||
>;
|
||||
connectionManager: IConnectionManager;
|
||||
localUser: { deviceId: string; userId: string };
|
||||
}
|
||||
|
||||
/**
|
||||
* Combines MatrixRTC and Livekit worlds.
|
||||
*
|
||||
@@ -73,13 +75,14 @@ interface Props {
|
||||
* - out (via public Observable):
|
||||
* - `remoteMatrixLivekitMember` an observable of MatrixLivekitMember[] to track the remote members and associated livekit data.
|
||||
*/
|
||||
export function createMatrixLivekitMembers$({
|
||||
export function createRemoteMatrixLivekitMembers$({
|
||||
scope,
|
||||
membershipsWithTransport$,
|
||||
connectionManager,
|
||||
localUser,
|
||||
}: Props): Behavior<Epoch<RemoteMatrixLivekitMember[]>> {
|
||||
/**
|
||||
* Stream of all the call members and their associated livekit data (if available).
|
||||
* Behavior of all the remote call members and their associated livekit data (if available).
|
||||
*/
|
||||
return scope.behavior(
|
||||
combineLatest([
|
||||
@@ -91,12 +94,19 @@ export function createMatrixLivekitMembers$({
|
||||
),
|
||||
map(([ms, data]) => new Epoch([ms.value, data.value] as const, ms.epoch)),
|
||||
generateItemsWithEpoch(
|
||||
"MatrixLivekitMembers",
|
||||
"RemoteMatrixLivekitMembers",
|
||||
// Generator function.
|
||||
// creates an array of `{key, data}[]`
|
||||
// Each change in the keys (new key) will result in a call to the factory function.
|
||||
function* ([membershipsWithTransport, managerData]) {
|
||||
for (const { membership, transport } of membershipsWithTransport) {
|
||||
// Exclude the local membership
|
||||
if (
|
||||
membership.userId === localUser.userId &&
|
||||
membership.deviceId === localUser.deviceId
|
||||
)
|
||||
continue;
|
||||
|
||||
const participants = transport
|
||||
? managerData.getParticipantsForTransport(transport)
|
||||
: [];
|
||||
|
||||
@@ -29,13 +29,13 @@ import {
|
||||
import { type ProcessorState } from "../../../livekit/TrackProcessorContext.tsx";
|
||||
import {
|
||||
areLivekitTransportsEqual,
|
||||
createMatrixLivekitMembers$,
|
||||
createRemoteMatrixLivekitMembers$,
|
||||
type RemoteMatrixLivekitMember,
|
||||
} from "./MatrixLivekitMembers.ts";
|
||||
import { createConnectionManager$ } from "./ConnectionManager.ts";
|
||||
import { membershipsAndTransports$ } from "../../SessionBehaviors.ts";
|
||||
import { constant } from "../../Behavior.ts";
|
||||
import { testJWTToken } from "../../../utils/test-fixtures.ts";
|
||||
import { localRtcMember, testJWTToken } from "../../../utils/test-fixtures.ts";
|
||||
|
||||
// Test the integration of ConnectionManager and MatrixLivekitMerger
|
||||
|
||||
@@ -130,14 +130,15 @@ test("bob, carl, then bob joining no tracks yet", () => {
|
||||
ownMembershipIdentity: ownMemberMock,
|
||||
});
|
||||
|
||||
const matrixLivekitMembers$ = createMatrixLivekitMembers$({
|
||||
const remoteMatrixLivekitMembers$ = createRemoteMatrixLivekitMembers$({
|
||||
scope: testScope,
|
||||
membershipsWithTransport$:
|
||||
membershipsAndTransports.membershipsWithTransport$,
|
||||
connectionManager,
|
||||
localUser: localRtcMember,
|
||||
});
|
||||
|
||||
expectObservable(matrixLivekitMembers$).toBe(vMarble, {
|
||||
expectObservable(remoteMatrixLivekitMembers$).toBe(vMarble, {
|
||||
a: expect.toSatisfy((e: Epoch<RemoteMatrixLivekitMember[]>) => {
|
||||
const items = e.value;
|
||||
expect(items.length).toBe(1);
|
||||
|
||||
Reference in New Issue
Block a user