diff --git a/src/state/ownMember/OwnMembership.ts b/src/state/ownMember/OwnMembership.ts index 52a09033..6ffdb5b5 100644 --- a/src/state/ownMember/OwnMembership.ts +++ b/src/state/ownMember/OwnMembership.ts @@ -19,10 +19,21 @@ import { SyncState, type Room as MatrixRoom, } from "matrix-js-sdk"; -import { fromEvent, map, type Observable, scan, startWith } from "rxjs"; +import { + BehaviorSubject, + combineLatest, + from, + fromEvent, + map, + type Observable, + of, + scan, + startWith, + switchMap, +} from "rxjs"; import { multiSfu } from "../../settings/settings"; import { type Behavior } from "../Behavior"; -import { type ConnectionManager } from "../remoteMembers/ConnectionManager"; +import { ConnectionManager } from "../remoteMembers/ConnectionManager"; import { makeTransport } from "../../rtcSessionHelpers"; import { type ObservableScope } from "../ObservableScope"; import { async$, unwrapAsync } from "../Async"; @@ -31,7 +42,19 @@ import { type MuteStates } from "../MuteStates"; import { type ProcessorState } from "../../livekit/TrackProcessorContext"; import { type MediaDevices } from "../../state/MediaDevices"; import { and$ } from "../../utils/observable"; +import { areLivekitTransportsEqual } from "../remoteMembers/matrixLivekitMerger"; +/* + * - get well known + * - get oldest membership + * - get transport to use + * - get openId + jwt token + * - wait for createTrack() call + * - create tracks + * - wait for join() call + * - Publisher.publishTracks() + * - send join state/sticky event + */ interface Props { scope: ObservableScope; mediaDevices: MediaDevices; @@ -71,10 +94,18 @@ export const ownMembership$ = ({ roomId, trackerProcessorState$, }: Props): { - connected$: Behavior; - transport$: Behavior; - publisher: Publisher; + // publisher: Publisher + requestJoin(): Observable; + startTracks(): Track[]; } => { + // This should be used in a combineLatest with publisher$ to connect. + const shouldStartTracks$ = BehaviorSubject(false); + + // to make it possible to call startTracks before the preferredTransport$ has resolved. + const startTracks = () => { + shouldStartTracks$.next(true); + }; + const userId = client.getUserId()!; const deviceId = client.getDeviceId()!; const multiSfu$ = multiSfu.value$; @@ -82,22 +113,23 @@ export const ownMembership$ = ({ * The transport that we would personally prefer to publish on (if not for the * transport preferences of others, perhaps). */ - const preferredTransport$ = scope.behavior( - async$(makeTransport(client, roomId)).pipe( - map(unwrapAsync(null)), - ), + const preferredTransport$: Behavior = scope.behavior( + from(makeTransport(client, roomId)), ); - const connection = connectionManager.registerTransports( + connectionManager.registerTransports( scope.behavior(preferredTransport$.pipe(map((t) => (t ? [t] : [])))), - )[0]; - if (!connection) { - logger.warn( - "No connection found when passing transport to connectionManager. transport:", - preferredTransport$.value, - ); - } + ); + const connection$ = scope.behavior( + combineLatest([connectionManager.connections$, preferredTransport$]).pipe( + map(([connections, transport]) => + connections.find((connection) => + areLivekitTransportsEqual(connection.transport, transport), + ), + ), + ), + ); /** * Whether we are connected to the MatrixRTC session. */ @@ -129,6 +161,7 @@ export const ownMembership$ = ({ ), ), ); + /** * Whether we are "fully" connected to the call. Accounts for both the * connection to the MatrixRTC session and the LiveKit publish connection. @@ -136,19 +169,31 @@ export const ownMembership$ = ({ const connected$ = scope.behavior( and$( matrixConnected$, - connection.state$.pipe( - map((state) => state.state === "ConnectedToLkRoom"), + connection$.pipe( + switchMap((c) => + c + ? c.state$.pipe(map((state) => state.state === "ConnectedToLkRoom")) + : of(false), + ), ), ), ); - const publisher = new Publisher( - scope, - connection, - mediaDevices, - muteStates, - e2eeLivekitOptions, - trackerProcessorState$, + const publisher = scope.behavior( + connection$.pipe( + map((c) => + c + ? new Publisher( + scope, + c, + mediaDevices, + muteStates, + e2eeLivekitOptions, + trackerProcessorState$, + ) + : null, + ), + ), ); // HOW IT WAS PREVIEOUSLY CREATED @@ -171,11 +216,11 @@ export const ownMembership$ = ({ * null when not joined. */ // DISCUSSION ownMembershipManager - const localTransport$: Behavior | null> = + const localTransport$: Behavior = this.scope.behavior( this.transports$.pipe( map((transports) => transports?.local ?? null), - distinctUntilChanged | null>(deepCompare), + distinctUntilChanged(deepCompare), ), ); diff --git a/src/state/remoteMembers/ConnectionManager.ts b/src/state/remoteMembers/ConnectionManager.ts index f15bee10..845c2af0 100644 --- a/src/state/remoteMembers/ConnectionManager.ts +++ b/src/state/remoteMembers/ConnectionManager.ts @@ -99,7 +99,7 @@ export class ConnectionManager { /** * Connections for each transport in use by one or more session members. */ - private readonly connections$ = this.scope.behavior( + public readonly connections$ = this.scope.behavior( generateKeyed$( this.transports$, (transports, createOrGet) => { @@ -144,22 +144,20 @@ export class ConnectionManager { * the same `transports$` behavior reference. * @param transports$ The Behavior containing a list of transports to subscribe to. */ - public registerTransports( - transports$: Behavior, - ): Connection[] { + public registerTransports(transports$: Behavior): void { if (!this.transportsSubscriptions$.value.some((t$) => t$ === transports$)) { this.transportsSubscriptions$.next( this.transportsSubscriptions$.value.concat(transports$), ); } - // After updating the subscriptions our connection list is also updated. - return transports$.value - .map((transport) => { - const isConnectionForTransport = (connection: Connection): boolean => - areLivekitTransportsEqual(connection.transport, transport); - return this.connections$.value.find(isConnectionForTransport); - }) - .filter((c) => c !== undefined); + // // After updating the subscriptions our connection list is also updated. + // return transports$.value + // .map((transport) => { + // const isConnectionForTransport = (connection: Connection): boolean => + // areLivekitTransportsEqual(connection.transport, transport); + // return this.connections$.value.find(isConnectionForTransport); + // }) + // .filter((c) => c !== undefined); } /** @@ -218,7 +216,7 @@ export class ConnectionManager { * Each participant that is found on all connections managed by the manager will be listed. * * They are stored an a map keyed by `participant.identity` - * (which is equivalent to the `member.id` field in the `m.rtc.member` event) + * TODO (which is equivalent to the `member.id` field in the `m.rtc.member` event) right now its userId:deviceId */ public allParticipantsByMemberId$ = this.scope.behavior( this.allParticipantsWithConnection$.pipe( diff --git a/src/state/remoteMembers/matrixLivekitMerger.ts b/src/state/remoteMembers/matrixLivekitMerger.ts index 0411a0ca..bd6ed353 100644 --- a/src/state/remoteMembers/matrixLivekitMerger.ts +++ b/src/state/remoteMembers/matrixLivekitMerger.ts @@ -109,7 +109,8 @@ export class MatrixLivekitMerger { const items: MatrixLivekitItem[] = memberships.map( ({ membership, transport }) => { const participantsWithConnection = participantsByMemberId.get( - membership.membershipID, + // membership.membershipID, Currently its hardcoded by the jwt service to + `${membership.userId}:${membership.deviceId}`, ); const participant = transport &&