refactor connnection class

Signed-off-by: Timo K <toger5@hotmail.de>
This commit is contained in:
Timo K
2025-08-28 13:52:12 +02:00
parent a617a92e88
commit e4a54e3a19
2 changed files with 122 additions and 90 deletions

View File

@@ -6,7 +6,6 @@ Please see LICENSE in the repository root for full details.
*/
import {
connectedParticipantsObserver,
observeParticipantEvents,
observeParticipantMedia,
} from "@livekit/components-core";
@@ -25,7 +24,6 @@ import {
type EventTimelineSetHandlerMap,
EventType,
RoomEvent,
type MatrixClient,
RoomStateEvent,
SyncState,
type Room as MatrixRoom,
@@ -117,7 +115,6 @@ import { shallowEquals } from "../utils/array";
import { calculateDisplayName, shouldDisambiguate } from "../utils/displayname";
import { type MediaDevices } from "./MediaDevices";
import { constant, type Behavior } from "./Behavior";
import { getSFUConfigWithOpenID } from "../livekit/openIDSFU";
import { defaultLiveKitOptions } from "../livekit/options";
import {
enterRTCSession,
@@ -126,7 +123,8 @@ import {
} from "../rtcSessionHelpers";
import { E2eeType } from "../e2ee/e2eeType";
import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider";
import { ECConnectionState } from "../livekit/useECConnectionState";
import { type ECConnectionState } from "../livekit/useECConnectionState";
import { Connection, PublishConnection } from "./Connection";
export interface CallViewModelOptions {
encryptionSystem: EncryptionSystem;
@@ -459,88 +457,6 @@ function getRoomMemberFromRtcMember(
return { id, member };
}
// TODO-MULTI-SFU Add all device syncing logic from useLivekit
class Connection {
private readonly sfuConfig = getSFUConfigWithOpenID(
this.client,
this.serviceUrl,
this.livekitAlias,
);
public async startSubscribing(): Promise<void> {
this.stopped = false;
const { url, jwt } = await this.sfuConfig;
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
await this.livekitRoom.localParticipant.publishTrack(tracks[0]);
}
public async startPublishing(): Promise<void> {
this.stopped = false;
const { url, jwt } = await this.sfuConfig;
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
if (!this.stopped) {
const tracks = await this.livekitRoom.localParticipant.createTracks({
audio: true,
video: true,
});
for (const track of tracks) {
await this.livekitRoom.localParticipant.publishTrack(track);
}
}
}
private stopped = false;
public stop(): void {
void this.livekitRoom.disconnect();
this.stopped = true;
}
public readonly participantsIncludingSubscribers$ = this.scope.behavior(
connectedParticipantsObserver(this.livekitRoom),
[],
);
public readonly publishingParticipants$ = (
memberships$: Behavior<CallMembership[]>,
): Observable<RemoteParticipant[]> =>
this.scope.behavior(
combineLatest([
connectedParticipantsObserver(this.livekitRoom),
memberships$,
]).pipe(
map(([participants, memberships]) => {
const publishingMembers = membershipsFocusUrl(
memberships,
this.matrixRTCSession,
)
.filter((f) => f.livekit_service_url === this.serviceUrl)
.map((f) => f.membership);
const publishingP = publishingMembers
.map((m) => {
return participants.find((p) => {
return p.identity === `${m.sender}:${m.deviceId}`;
});
})
.filter((p): p is RemoteParticipant => !!p);
return publishingP;
}),
),
[],
);
public constructor(
private readonly livekitRoom: LivekitRoom,
private readonly serviceUrl: string,
private readonly livekitAlias: string,
private readonly client: MatrixClient,
private readonly scope: ObservableScope,
private readonly matrixRTCSession: MatrixRTCSession,
) {}
}
export class CallViewModel extends ViewModel {
private readonly e2eeOptions = getE2eeOptions(
this.options.encryptionSystem,
@@ -558,7 +474,7 @@ export class CallViewModel extends ViewModel {
private readonly localConnection = this.localFocus.then(
(focus) =>
new Connection(
new PublishConnection(
this.localConnectionLivekitRoom,
focus.livekit_service_url,
this.livekitAlias,
@@ -1881,7 +1797,7 @@ export class CallViewModel extends ViewModel {
.subscribe(
(c) =>
void c
.startPublishing()
.start()
// eslint-disable-next-line no-console
.then(() => console.log("successfully started publishing"))
// eslint-disable-next-line no-console
@@ -1890,7 +1806,7 @@ export class CallViewModel extends ViewModel {
this.connectionInstructions$
.pipe(this.scope.bind())
.subscribe(({ start, stop }) => {
for (const connection of start) void connection.startSubscribing();
for (const connection of start) void connection.start();
for (const connection of stop) connection.stop();
});
combineLatest([this.localFocus, this.joined$])
@@ -1954,7 +1870,7 @@ export class CallViewModel extends ViewModel {
}
}
const membershipsFocusUrl = (
export const membershipsFocusUrl = (
memberships: CallMembership[],
matrixRTCSession: MatrixRTCSession,
): { livekit_service_url: string; membership: CallMembership }[] => {

116
src/state/Connection.ts Normal file
View File

@@ -0,0 +1,116 @@
// TODO-MULTI-SFU Add all device syncing logic from useLivekit
/*
Copyright 2025 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { connectedParticipantsObserver } from "@livekit/components-core";
import {
type Room as LivekitRoom,
type RemoteParticipant,
} from "livekit-client";
import { type MatrixClient } from "matrix-js-sdk";
import {
type CallMembership,
type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, map, type Observable } from "rxjs";
import { getSFUConfigWithOpenID } from "../livekit/openIDSFU";
import { type Behavior } from "./Behavior";
import { membershipsFocusUrl } from "./CallViewModel";
import { type ObservableScope } from "./ObservableScope";
export class Connection {
protected readonly sfuConfig = getSFUConfigWithOpenID(
this.client,
this.serviceUrl,
this.livekitAlias,
);
public async start(): Promise<void> {
this.stopped = false;
const { url, jwt } = await this.sfuConfig;
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
}
protected stopped = false;
public stop(): void {
void this.livekitRoom.disconnect();
this.stopped = true;
}
public readonly participantsIncludingSubscribers$ = this.scope.behavior(
connectedParticipantsObserver(this.livekitRoom),
[],
);
public readonly publishingParticipants$ = (
memberships$: Behavior<CallMembership[]>,
): Observable<RemoteParticipant[]> =>
this.scope.behavior(
combineLatest([
connectedParticipantsObserver(this.livekitRoom),
memberships$,
]).pipe(
map(([participants, memberships]) => {
const publishingMembers = membershipsFocusUrl(
memberships,
this.matrixRTCSession,
)
.filter((f) => f.livekit_service_url === this.serviceUrl)
.map((f) => f.membership);
const publishingP = publishingMembers
.map((m) => {
return participants.find((p) => {
return p.identity === `${m.sender}:${m.deviceId}`;
});
})
.filter((p): p is RemoteParticipant => !!p);
return publishingP;
}),
),
[],
);
public constructor(
protected readonly livekitRoom: LivekitRoom,
protected readonly serviceUrl: string,
protected readonly livekitAlias: string,
protected readonly client: MatrixClient,
protected readonly scope: ObservableScope,
protected readonly matrixRTCSession: MatrixRTCSession,
) {}
}
export class PublishConnection extends Connection {
public async start(): Promise<void> {
this.stopped = false;
const { url, jwt } = await this.sfuConfig;
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
if (!this.stopped) {
const tracks = await this.livekitRoom.localParticipant.createTracks({
audio: true,
video: true,
});
for (const track of tracks) {
await this.livekitRoom.localParticipant.publishTrack(track);
}
}
}
public stop(): void {
void this.livekitRoom.disconnect();
this.stopped = true;
}
public readonly participantsIncludingSubscribers$ = this.scope.behavior(
connectedParticipantsObserver(this.livekitRoom),
[],
);
}