From 78e9521f22f6c428b009ff98d256be1b56ef4694 Mon Sep 17 00:00:00 2001
From: Timo K
Date: Tue, 23 Sep 2025 11:38:34 +0200
Subject: [PATCH] Make track processor work

Signed-off-by: Timo K
---
 src/livekit/TrackProcessorContext.tsx | 42 ++++++++++++++++++++++++++-
 src/livekit/useLivekit.ts             |  1 +
 src/room/InCallView.tsx               |  4 +++
 src/state/CallViewModel.ts            |  3 ++
 src/state/Connection.ts               | 21 ++++++++++++--
 5 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/src/livekit/TrackProcessorContext.tsx b/src/livekit/TrackProcessorContext.tsx
index b37a6e3e..4a5ace46 100644
--- a/src/livekit/TrackProcessorContext.tsx
+++ b/src/livekit/TrackProcessorContext.tsx
@@ -19,14 +19,21 @@ import {
   useMemo,
 } from "react";
 import { type LocalVideoTrack } from "livekit-client";
+import { combineLatest, map, type Observable } from "rxjs";
+import { useObservable } from "observable-hooks";
 
 import {
   backgroundBlur as backgroundBlurSettings,
   useSetting,
 } from "../settings/settings";
 import { BlurBackgroundTransformer } from "./BlurBackgroundTransformer";
+import { type Behavior } from "../state/Behavior";
 
-type ProcessorState = {
+// TODO-MULTI-SFU: This is not fully done yet.
+// It currently mixes exposing an observable with React hooks.
+// Preferably this should no longer be a context but just a view model?
+
+export type ProcessorState = {
   supported: boolean | undefined;
   processor: undefined | ProcessorWrapper;
 };
@@ -42,6 +49,39 @@ export function useTrackProcessor(): ProcessorState {
   return state;
 }
 
+export function useTrackProcessorObservable$(): Observable<ProcessorState> {
+  const state = use(ProcessorContext);
+  if (state === undefined)
+    throw new Error(
+      "useTrackProcessorObservable$ must be used within a ProcessorProvider",
+    );
+  const state$ = useObservable(
+    (init$) => init$.pipe(map(([init]) => init)),
+    [state],
+  );
+
+  return state$;
+}
+
+export const trackProcessorSync = (
+  videoTrack$: Behavior<LocalVideoTrack | null>,
+  processor$: Behavior<ProcessorState>,
+): void => {
+  combineLatest([videoTrack$, processor$]).subscribe(
+    ([videoTrack, processorState]) => {
+      if (!processorState) return;
+      if (!videoTrack) return;
+      const { processor } = processorState;
+      if (processor && !videoTrack.getProcessor()) {
+        void videoTrack.setProcessor(processor);
+      }
+      if (!processor && videoTrack.getProcessor()) {
+        void videoTrack.stopProcessor();
+      }
+    },
+  );
+};
+
 export const useTrackProcessorSync = (
   videoTrack: LocalVideoTrack | null,
 ): void => {
diff --git a/src/livekit/useLivekit.ts b/src/livekit/useLivekit.ts
index 0672a8eb..420bac95 100644
--- a/src/livekit/useLivekit.ts
+++ b/src/livekit/useLivekit.ts
@@ -55,6 +55,7 @@ interface UseLivekitResult {
 }
 
 // TODO-MULTI-SFU This is not used anymore but the device syncing logic needs to be moved into the connection object.
+// This seems to be mostly done; see Connection.ts.
 export function useLivekitPublicationRoom(
   rtcSession: MatrixRTCSession,
   muteStates: MuteStates,
diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx
index b8460ad8..157ee46a 100644
--- a/src/room/InCallView.tsx
+++ b/src/room/InCallView.tsx
@@ -114,6 +114,7 @@
 import { useAudioContext } from "../useAudioContext";
 import ringtoneMp3 from "../sound/ringtone.mp3?url";
 import ringtoneOgg from "../sound/ringtone.ogg?url";
 import { ConnectionLostError } from "../utils/errors.ts";
+import { useTrackProcessorObservable$ } from "../livekit/TrackProcessorContext.tsx";
 
 const canScreenshare = "getDisplayMedia" in (navigator.mediaDevices ?? {});
@@ -133,6 +134,7 @@ export const ActiveCall: FC = (props) => {
   const { autoLeaveWhenOthersLeft, waitForCallPickup, sendNotificationType } =
     useUrlParams();
 
+  const trackProcessorState$ = useTrackProcessorObservable$();
   useEffect(() => {
     const reactionsReader = new ReactionsReader(props.rtcSession);
     const vm = new CallViewModel(
@@ -147,6 +149,7 @@
       },
       reactionsReader.raisedHands$,
       reactionsReader.reactions$,
+      trackProcessorState$,
     );
     setVm(vm);
 
@@ -166,6 +169,7 @@
     sendNotificationType,
     waitForCallPickup,
     props.onLeft,
+    trackProcessorState$,
   ]);
 
   if (vm === null) return null;
diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts
index 8479f76b..40828357 100644
--- a/src/state/CallViewModel.ts
+++ b/src/state/CallViewModel.ts
@@ -125,6 +125,7 @@ import { Connection, PublishConnection } from "./Connection";
 import { type MuteStates } from "./MuteStates";
 import { PosthogAnalytics } from "../analytics/PosthogAnalytics";
 import { getUrlParams } from "../UrlParams";
+import { type ProcessorState } from "../livekit/TrackProcessorContext";
 
 export interface CallViewModelOptions {
   encryptionSystem: EncryptionSystem;
@@ -460,6 +461,7 @@
         this.mediaDevices,
         this.muteStates,
         this.e2eeLivekitOptions(),
+        this.scope.behavior(this.trackProcessorState$),
       ),
     );
 
@@ -1861,6 +1863,7 @@
     private readonly reactionsSubject$: Observable<
       Record<string, ReactionInfo>
     >,
+    private readonly trackProcessorState$: Observable<ProcessorState>,
   ) {
     super();
 
diff --git a/src/state/Connection.ts b/src/state/Connection.ts
index 4f9721ea..6804b2b7 100644
--- a/src/state/Connection.ts
+++ b/src/state/Connection.ts
@@ -15,6 +15,7 @@ import {
   Room as LivekitRoom,
   type E2EEOptions,
   Track,
+  LocalVideoTrack,
 } from "livekit-client";
 import { type MatrixClient } from "matrix-js-sdk";
 import {
@@ -39,6 +40,11 @@ import { defaultLiveKitOptions } from "../livekit/options";
 import { getValue } from "../utils/observable";
 import { getUrlParams } from "../UrlParams";
 import { type MuteStates } from "./MuteStates";
+import {
+  type ProcessorState,
+  trackProcessorSync,
+} from "../livekit/TrackProcessorContext";
+import { observeTrackReference$ } from "./MediaViewModel";
 
 export class Connection {
   protected stopped = false;
@@ -151,6 +157,7 @@
     devices: MediaDevices,
     private readonly muteStates: MuteStates,
     e2eeLivekitOptions: E2EEOptions | undefined,
+    trackProcessorState$: Behavior<ProcessorState>,
   ) {
     logger.info("[LivekitRoom] Create LiveKit room");
     const { controlledAudioDevices } = getUrlParams();
@@ -160,8 +167,7 @@ export class PublishConnection extends Connection {
       videoCaptureDefaults: {
         ...defaultLiveKitOptions.videoCaptureDefaults,
         deviceId: devices.videoInput.selected$.value?.id,
-        // TODO-MULTI-SFU add processor support back
-        // processor,
+        processor: trackProcessorState$.value.processor,
       },
       audioCaptureDefaults: {
         ...defaultLiveKitOptions.audioCaptureDefaults,
@@ -191,6 +197,17 @@
       room,
     );
 
+    // Set up track processor syncing (blur).
+    const track$ = this.scope.behavior(
+      observeTrackReference$(room.localParticipant, Track.Source.Camera).pipe(
+        map((trackRef) => {
+          const track = trackRef?.publication?.track;
+          return track instanceof LocalVideoTrack ? track : null;
+        }),
+      ),
+    );
+    trackProcessorSync(track$, trackProcessorState$);
+
    this.muteStates.audio.setHandler(async (desired) => {
      try {
        await this.livekitRoom.localParticipant.setMicrophoneEnabled(desired);
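
Note: for illustration only, below is a minimal sketch of the attach/detach logic
that trackProcessorSync implements, using plain rxjs BehaviorSubjects in place of
the Behavior wrappers from src/state/Behavior, and a hypothetical FakeTrack /
FakeProcessor pair standing in for livekit-client's LocalVideoTrack and the
processor wrapper. None of this code is part of the patch itself.

    import { BehaviorSubject, combineLatest } from "rxjs";

    // Hypothetical stand-ins for LocalVideoTrack and ProcessorWrapper.
    interface FakeProcessor {
      name: string;
    }
    class FakeTrack {
      private processor: FakeProcessor | undefined;
      public getProcessor(): FakeProcessor | undefined {
        return this.processor;
      }
      public async setProcessor(p: FakeProcessor): Promise<void> {
        this.processor = p;
      }
      public async stopProcessor(): Promise<void> {
        this.processor = undefined;
      }
    }

    type FakeProcessorState = { processor?: FakeProcessor };

    const track$ = new BehaviorSubject<FakeTrack | null>(null);
    const processor$ = new BehaviorSubject<FakeProcessorState>({});

    // Mirrors trackProcessorSync: attach when a processor is selected and the
    // track has none; detach when the selection is cleared but one is attached.
    combineLatest([track$, processor$]).subscribe(([track, state]) => {
      if (!track) return;
      const { processor } = state;
      if (processor && !track.getProcessor()) void track.setProcessor(processor);
      if (!processor && track.getProcessor()) void track.stopProcessor();
    });

    track$.next(new FakeTrack()); // camera track appears
    processor$.next({ processor: { name: "background-blur" } }); // blur attaches
    processor$.next({}); // blur detaches

Because combineLatest re-evaluates on every change of either input, the same
decision runs whether the track or the processor selection arrives first, which
is why the patch can wire it up once in the PublishConnection constructor.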