Make track processor work

Signed-off-by: Timo K <toger5@hotmail.de>
Author: Timo K
Date: 2025-09-23 11:38:34 +02:00
parent 8bf24895ce
commit 78e9521f22
5 changed files with 68 additions and 3 deletions

View File

@@ -19,14 +19,21 @@ import {
useMemo,
} from "react";
import { type LocalVideoTrack } from "livekit-client";
import { combineLatest, map, type Observable } from "rxjs";
import { useObservable } from "observable-hooks";
import {
backgroundBlur as backgroundBlurSettings,
useSetting,
} from "../settings/settings";
import { BlurBackgroundTransformer } from "./BlurBackgroundTransformer";
import { type Behavior } from "../state/Behavior";
// TODO-MULTI-SFU: This is not fully there yet.
// It is a combination of exposing observables and React hooks.
// Preferably this should no longer be a context, but just a view model?
export type ProcessorState = {
supported: boolean | undefined;
processor: undefined | ProcessorWrapper<BackgroundOptions>;
};
@@ -42,6 +49,39 @@ export function useTrackProcessor(): ProcessorState {
return state;
}
export function useTrackProcessorObservable$(): Observable<ProcessorState> {
const state = use(ProcessorContext);
if (state === undefined)
throw new Error(
"useTrackProcessor must be used within a ProcessorProvider",
);
const state$ = useObservable(
(init$) => init$.pipe(map(([init]) => init)),
[state],
);
return state$;
}
export const trackProcessorSync = (
videoTrack$: Behavior<LocalVideoTrack | null>,
processor$: Behavior<ProcessorState>,
): void => {
combineLatest([videoTrack$, processor$]).subscribe(
([videoTrack, processorState]) => {
if (!processorState) return;
if (!videoTrack) return;
const { processor } = processorState;
// Attach the requested processor if none is running on the track yet.
if (processor && !videoTrack.getProcessor()) {
void videoTrack.setProcessor(processor);
}
// Detach when the processor was cleared but one is still running.
if (!processor && videoTrack.getProcessor()) {
void videoTrack.stopProcessor();
}
},
);
};
export const useTrackProcessorSync = (
videoTrack: LocalVideoTrack | null,
): void => {

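For reference, the attach/detach contract of trackProcessorSync can be exercised in isolation. A minimal sketch, assuming Behavior is interchangeable with an rxjs BehaviorSubject here, and using hypothetical stand-ins for the LiveKit track and processor types:

import { BehaviorSubject, combineLatest } from "rxjs";

// Hypothetical stand-ins for LocalVideoTrack and ProcessorWrapper.
interface FakeProcessor {
  name: string;
}
class FakeTrack {
  private processor: FakeProcessor | null = null;
  getProcessor(): FakeProcessor | null {
    return this.processor;
  }
  setProcessor(p: FakeProcessor): void {
    this.processor = p;
  }
  stopProcessor(): void {
    this.processor = null;
  }
}

const track$ = new BehaviorSubject<FakeTrack | null>(null);
const processor$ = new BehaviorSubject<{ processor?: FakeProcessor }>({});

combineLatest([track$, processor$]).subscribe(([track, { processor }]) => {
  if (!track) return;
  if (processor && !track.getProcessor()) track.setProcessor(processor); // attach
  if (!processor && track.getProcessor()) track.stopProcessor(); // detach
});

track$.next(new FakeTrack());
processor$.next({ processor: { name: "background-blur" } }); // attaches blur
processor$.next({}); // detaches it again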
View File

@@ -55,6 +55,7 @@ interface UseLivekitResult {
}
// TODO-MULTI-SFU: This is no longer used, but the device syncing logic needs to be moved into the connection object.
// That seems to be mostly done; see Connection.ts.
export function useLivekitPublicationRoom(
rtcSession: MatrixRTCSession,
muteStates: MuteStates,

View File

@@ -114,6 +114,7 @@ import { useAudioContext } from "../useAudioContext";
import ringtoneMp3 from "../sound/ringtone.mp3?url";
import ringtoneOgg from "../sound/ringtone.ogg?url";
import { ConnectionLostError } from "../utils/errors.ts";
import { useTrackProcessorObservable$ } from "../livekit/TrackProcessorContext.tsx";
const canScreenshare = "getDisplayMedia" in (navigator.mediaDevices ?? {});
@@ -133,6 +134,7 @@ export const ActiveCall: FC<ActiveCallProps> = (props) => {
const { autoLeaveWhenOthersLeft, waitForCallPickup, sendNotificationType } =
useUrlParams();
const trackProcessorState$ = useTrackProcessorObservable$();
useEffect(() => {
const reactionsReader = new ReactionsReader(props.rtcSession);
const vm = new CallViewModel(
@@ -147,6 +149,7 @@ export const ActiveCall: FC<ActiveCallProps> = (props) => {
},
reactionsReader.raisedHands$,
reactionsReader.reactions$,
trackProcessorState$,
);
setVm(vm);
@@ -166,6 +169,7 @@ export const ActiveCall: FC<ActiveCallProps> = (props) => {
sendNotificationType,
waitForCallPickup,
props.onLeft,
trackProcessorState$,
]);
if (vm === null) return null;

View File

@@ -125,6 +125,7 @@ import { Connection, PublishConnection } from "./Connection";
import { type MuteStates } from "./MuteStates";
import { PosthogAnalytics } from "../analytics/PosthogAnalytics";
import { getUrlParams } from "../UrlParams";
import { type ProcessorState } from "../livekit/TrackProcessorContext";
export interface CallViewModelOptions {
encryptionSystem: EncryptionSystem;
@@ -460,6 +461,7 @@ export class CallViewModel extends ViewModel {
this.mediaDevices,
this.muteStates,
this.e2eeLivekitOptions(),
this.scope.behavior(this.trackProcessorState$),
),
);
@@ -1861,6 +1863,7 @@ export class CallViewModel extends ViewModel {
private readonly reactionsSubject$: Observable<
Record<string, ReactionInfo>
>,
private readonly trackProcessorState$: Observable<ProcessorState>,
) {
super();

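The view model hands the processor state to the connection via this.scope.behavior(...), so downstream code can read a current value synchronously; PublishConnection reads .value when assembling the room options. A rough sketch of such a conversion, assuming Behavior is akin to an rxjs BehaviorSubject (toBehavior is a hypothetical helper, not the repo's ObservableScope API):

import { BehaviorSubject, type Observable, type Subscription } from "rxjs";

// Mirror an Observable into a BehaviorSubject so `.value` is always current.
function toBehavior<T>(
  source$: Observable<T>,
  initial: T,
): { behavior$: BehaviorSubject<T>; subscription: Subscription } {
  const behavior$ = new BehaviorSubject<T>(initial);
  const subscription = source$.subscribe(behavior$);
  return { behavior$, subscription };
}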
View File

@@ -15,6 +15,7 @@ import {
Room as LivekitRoom,
type E2EEOptions,
Track,
LocalVideoTrack,
} from "livekit-client";
import { type MatrixClient } from "matrix-js-sdk";
import {
@@ -39,6 +40,11 @@ import { defaultLiveKitOptions } from "../livekit/options";
import { getValue } from "../utils/observable";
import { getUrlParams } from "../UrlParams";
import { type MuteStates } from "./MuteStates";
import {
type ProcessorState,
trackProcessorSync,
} from "../livekit/TrackProcessorContext";
import { observeTrackReference$ } from "./MediaViewModel";
export class Connection {
protected stopped = false;
@@ -151,6 +157,7 @@ export class PublishConnection extends Connection {
devices: MediaDevices,
private readonly muteStates: MuteStates,
e2eeLivekitOptions: E2EEOptions | undefined,
trackProcessorState$: Behavior<ProcessorState>,
) {
logger.info("[LivekitRoom] Create LiveKit room");
const { controlledAudioDevices } = getUrlParams();
@@ -160,8 +167,7 @@ export class PublishConnection extends Connection {
videoCaptureDefaults: {
...defaultLiveKitOptions.videoCaptureDefaults,
deviceId: devices.videoInput.selected$.value?.id,
processor: trackProcessorState$.value.processor,
},
audioCaptureDefaults: {
...defaultLiveKitOptions.audioCaptureDefaults,
@@ -191,6 +197,17 @@ export class PublishConnection extends Connection {
room,
);
// Set up track processor syncing (blur).
const track$ = this.scope.behavior(
observeTrackReference$(room.localParticipant, Track.Source.Camera).pipe(
map((trackRef) => {
const track = trackRef?.publication?.track;
return track instanceof LocalVideoTrack ? track : null;
}),
),
);
trackProcessorSync(track$, trackProcessorState$);
this.muteStates.audio.setHandler(async (desired) => {
try {
await this.livekitRoom.localParticipant.setMicrophoneEnabled(desired);
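The track$ observable above narrows LiveKit's generic Track to LocalVideoTrack before handing it to trackProcessorSync. A standalone sketch of that narrowing step, assuming an input observable of track references (TrackReferenceLike is an illustrative shape, not the repo's type):

import { LocalVideoTrack, type TrackPublication } from "livekit-client";
import { map, type Observable } from "rxjs";

// Illustrative shape of a LiveKit track reference.
interface TrackReferenceLike {
  publication?: TrackPublication;
}

function cameraTrack$(
  trackRef$: Observable<TrackReferenceLike | undefined>,
): Observable<LocalVideoTrack | null> {
  return trackRef$.pipe(
    map((ref) => {
      const track = ref?.publication?.track;
      // instanceof narrows to LocalVideoTrack; anything else maps to null.
      return track instanceof LocalVideoTrack ? track : null;
    }),
  );
}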