diff --git a/.github/workflows/build-and-publish-docker.yaml b/.github/workflows/build-and-publish-docker.yaml index 4b7fdf1c..68f7131c 100644 --- a/.github/workflows/build-and-publish-docker.yaml +++ b/.github/workflows/build-and-publish-docker.yaml @@ -33,8 +33,8 @@ jobs: name: build-output-full path: dist - - name: Login to GitHub container registry - uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3.6.0 + - name: Log in to container registry + uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3.7.0 with: registry: ${{ env.REGISTRY }} username: ${{ github.actor }} @@ -78,7 +78,7 @@ jobs: - name: Extract metadata (tags, labels) for Docker id: meta - uses: docker/metadata-action@318604b99e75e41977312d83839a89be02ca4893 # v5.9.0 + uses: docker/metadata-action@c299e40c65443455700f0fdfc63efafe5b349051 # v5.10.0 with: images: | ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} @@ -88,10 +88,10 @@ jobs: org.opencontainers.image.licenses=AGPL-3.0-only OR LicenseRef-Element-Commercial - name: Set up Docker Buildx - uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1 + uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # v3.12.0 - name: Build and push Docker image - uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # v6.18.0 + uses: docker/build-push-action@10e90e3645eae34f1e60eeb005ba3a3d33f178e8 # v6.19.2 with: context: . platforms: linux/amd64,linux/arm64 diff --git a/.github/workflows/build-element-call.yaml b/.github/workflows/build-element-call.yaml index 01553fec..4ca5ccad 100644 --- a/.github/workflows/build-element-call.yaml +++ b/.github/workflows/build-element-call.yaml @@ -7,7 +7,7 @@ on: type: string package: type: string # This would ideally be a `choice` type, but that isn't supported yet - description: The package type to be built. Must be one of 'full' or 'embedded' + description: The package type to be built. Must be one of 'full', 'embedded', or 'sdk' required: true build_mode: type: string # This would ideally be a `choice` type, but that isn't supported yet diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 6aa5fae6..9b86215e 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -69,3 +69,17 @@ jobs: SENTRY_URL: ${{ secrets.SENTRY_URL }} SENTRY_AUTH_TOKEN: ${{ secrets.SENTRY_AUTH_TOKEN }} CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + + build_sdk_element_call: + # Use the embedded package vite build + uses: ./.github/workflows/build-element-call.yaml + with: + package: sdk + vite_app_version: ${{ github.event.release.tag_name || github.sha }} + build_mode: ${{ github.event_name == 'pull_request' && contains(github.event.pull_request.labels.*.name, 'development build') && 'development' || 'production' }} + secrets: + SENTRY_ORG: ${{ secrets.SENTRY_ORG }} + SENTRY_PROJECT: ${{ secrets.SENTRY_PROJECT }} + SENTRY_URL: ${{ secrets.SENTRY_URL }} + SENTRY_AUTH_TOKEN: ${{ secrets.SENTRY_AUTH_TOKEN }} + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/.github/workflows/deploy-to-netlify.yaml b/.github/workflows/deploy-to-netlify.yaml index 388192e4..3f964ad4 100644 --- a/.github/workflows/deploy-to-netlify.yaml +++ b/.github/workflows/deploy-to-netlify.yaml @@ -14,6 +14,10 @@ on: deployment_ref: required: true type: string + package: + required: true + type: string + description: Which package to deploy - 'full', 'embedded', or 'sdk' artifact_run_id: required: false type: string @@ -50,7 +54,7 @@ jobs: with: github-token: ${{ secrets.ELEMENT_BOT_TOKEN }} run-id: ${{ inputs.artifact_run_id }} - name: build-output-full + name: build-output-${{ inputs.package }} path: webapp - name: Add redirects file @@ -58,15 +62,19 @@ jobs: run: curl -s https://raw.githubusercontent.com/element-hq/element-call/main/config/netlify_redirects > webapp/_redirects - name: Add config file - run: curl -s "https://raw.githubusercontent.com/${{ inputs.pr_head_full_name }}/${{ inputs.pr_head_ref }}/config/config_netlify_preview.json" > webapp/config.json - + run: | + if [ "${{ inputs.package }}" = "full" ]; then + curl -s "https://raw.githubusercontent.com/${{ inputs.pr_head_full_name }}/${{ inputs.pr_head_ref }}/config/config_netlify_preview.json" > webapp/config.json + else + curl -s "https://raw.githubusercontent.com/${{ inputs.pr_head_full_name }}/${{ inputs.pr_head_ref }}/config/config_netlify_preview_sdk.json" > webapp/config.json + fi - name: ☁️ Deploy to Netlify id: netlify uses: nwtgck/actions-netlify@4cbaf4c08f1a7bfa537d6113472ef4424e4eb654 # v3.0 with: publish-dir: webapp deploy-message: "Deploy from GitHub Actions" - alias: pr${{ inputs.pr_number }} + alias: ${{ inputs.package == 'sdk' && format('pr{0}-sdk', inputs.pr_number) || format('pr{0}', inputs.pr_number) }} env: NETLIFY_AUTH_TOKEN: ${{ secrets.NETLIFY_AUTH_TOKEN }} NETLIFY_SITE_ID: ${{ secrets.NETLIFY_SITE_ID }} diff --git a/.github/workflows/pr-deploy.yaml b/.github/workflows/pr-deploy.yaml index 7b128352..fe934162 100644 --- a/.github/workflows/pr-deploy.yaml +++ b/.github/workflows/pr-deploy.yaml @@ -20,7 +20,7 @@ jobs: owner: ${{ github.event.workflow_run.head_repository.owner.login }} branch: ${{ github.event.workflow_run.head_branch }} - netlify: + netlify-full: needs: prdetails permissions: deployments: write @@ -31,6 +31,24 @@ jobs: pr_head_full_name: ${{ github.event.workflow_run.head_repository.full_name }} pr_head_ref: ${{ needs.prdetails.outputs.pr_data_json && fromJSON(needs.prdetails.outputs.pr_data_json).head.ref }} deployment_ref: ${{ needs.prdetails.outputs.pr_data_json && fromJSON(needs.prdetails.outputs.pr_data_json).head.sha || github.ref || github.head_ref }} + package: full + secrets: + ELEMENT_BOT_TOKEN: ${{ secrets.ELEMENT_BOT_TOKEN }} + NETLIFY_AUTH_TOKEN: ${{ secrets.NETLIFY_AUTH_TOKEN }} + NETLIFY_SITE_ID: ${{ secrets.NETLIFY_SITE_ID }} + + netlify-sdk: + needs: prdetails + permissions: + deployments: write + uses: ./.github/workflows/deploy-to-netlify.yaml + with: + artifact_run_id: ${{ github.event.workflow_run.id || github.run_id }} + pr_number: ${{ needs.prdetails.outputs.pr_number }} + pr_head_full_name: ${{ github.event.workflow_run.head_repository.full_name }} + pr_head_ref: ${{ needs.prdetails.outputs.pr_data_json && fromJSON(needs.prdetails.outputs.pr_data_json).head.ref }} + deployment_ref: ${{ needs.prdetails.outputs.pr_data_json && fromJSON(needs.prdetails.outputs.pr_data_json).head.sha || github.ref || github.head_ref }} + package: sdk secrets: ELEMENT_BOT_TOKEN: ${{ secrets.ELEMENT_BOT_TOKEN }} NETLIFY_AUTH_TOKEN: ${{ secrets.NETLIFY_AUTH_TOKEN }} diff --git a/.github/workflows/publish-embedded-packages.yaml b/.github/workflows/publish-embedded-packages.yaml index 275397b5..fc8a640f 100644 --- a/.github/workflows/publish-embedded-packages.yaml +++ b/.github/workflows/publish-embedded-packages.yaml @@ -153,7 +153,7 @@ jobs: path: embedded/android/lib/src/main/assets/element-call - name: ☕️ Setup Java - uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4 + uses: actions/setup-java@c1e323688fd81a25caa38c78aa6df2d33d3e20d9 # v4 with: distribution: "temurin" java-version: "17" diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 3251f50e..012de7cb 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -22,7 +22,7 @@ jobs: - name: Vitest run: "yarn run test:coverage" - name: Upload to codecov - uses: codecov/codecov-action@5a1091511ad55cbe89839c7260b706298ca349f7 # v5 + uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5 env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} with: diff --git a/.github/workflows/translations-download.yaml b/.github/workflows/translations-download.yaml index 76fe418c..45f366cd 100644 --- a/.github/workflows/translations-download.yaml +++ b/.github/workflows/translations-download.yaml @@ -42,7 +42,7 @@ jobs: - name: Create Pull Request id: cpr - uses: peter-evans/create-pull-request@84ae59a2cdc2258d6fa0732dd66352dddae2a412 # v7.0.9 + uses: peter-evans/create-pull-request@22a9089034f40e5a961c8808d113e2c98fb63676 # v7.0.11 with: token: ${{ secrets.ELEMENT_BOT_TOKEN }} branch: actions/localazy-download diff --git a/config/config_netlify_preview_sdk.json b/config/config_netlify_preview_sdk.json new file mode 100644 index 00000000..784f0c7e --- /dev/null +++ b/config/config_netlify_preview_sdk.json @@ -0,0 +1,16 @@ +{ + "default_server_config": { + "m.homeserver": { + "base_url": "https://call-unstable.ems.host", + "server_name": "call-unstable.ems.host" + } + }, + "ssla": "https://static.element.io/legal/element-software-and-services-license-agreement-uk-1.pdf", + "matrix_rtc_session": { + "wait_for_key_rotation_ms": 3000, + "membership_event_expiry_ms": 180000000, + "delayed_leave_event_delay_ms": 18000, + "delayed_leave_event_restart_ms": 4000, + "network_error_retry_ms": 100 + } +} diff --git a/package.json b/package.json index 14193013..49612120 100644 --- a/package.json +++ b/package.json @@ -13,8 +13,9 @@ "build:embedded": "yarn build:full --config vite-embedded.config.js", "build:embedded:production": "yarn build:embedded", "build:embedded:development": "yarn build:embedded --mode development", - "build:sdk": "yarn build:full --config vite-sdk.config.js", "build:sdk:development": "yarn build:sdk --mode development", + "build:sdk": "yarn build:full --config vite-sdk.config.js", + "build:sdk:production": "yarn build:sdk", "serve": "vite preview", "prettier:check": "prettier -c .", "prettier:format": "prettier -w .", @@ -104,7 +105,7 @@ "livekit-client": "^2.13.0", "lodash-es": "^4.17.21", "loglevel": "^1.9.1", - "matrix-js-sdk": "matrix-org/matrix-js-sdk#develop", + "matrix-js-sdk": "matrix-org/matrix-js-sdk#6e3efef0c5f660df47cf00874927dec1c75cc3cf", "matrix-widget-api": "^1.16.1", "node-stdlib-browser": "^1.3.1", "normalize.css": "^8.0.1", diff --git a/sdk/README.md b/sdk/README.md index 91337f10..ad8ff97e 100644 --- a/sdk/README.md +++ b/sdk/README.md @@ -1,4 +1,4 @@ -# SDK mode +# SDK mode (EXPERIMENTAL) EC can be build in sdk mode. This will result in a compiled js file that can be imported in very simple webapps. diff --git a/sdk/helper.ts b/sdk/helper.ts index a3d597be..47de4a93 100644 --- a/sdk/helper.ts +++ b/sdk/helper.ts @@ -12,15 +12,12 @@ Please see LICENSE in the repository root for full details. import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; import { scan } from "rxjs"; -import { widget as _widget } from "../src/widget"; +import { type WidgetHelpers } from "../src/widget"; import { type LivekitRoomItem } from "../src/state/CallViewModel/CallViewModel"; export const logger = rootLogger.getChild("[MatrixRTCSdk]"); -if (!_widget) throw Error("No widget. This webapp can only start as a widget"); -export const widget = _widget; - -export const tryMakeSticky = (): void => { +export const tryMakeSticky = (widget: WidgetHelpers): void => { logger.info("try making sticky MatrixRTCSdk"); void widget.api .setAlwaysOnScreen(true) diff --git a/sdk/main.ts b/sdk/main.ts index a273ed8a..c65bf4a7 100644 --- a/sdk/main.ts +++ b/sdk/main.ts @@ -6,6 +6,8 @@ Please see LICENSE in the repository root for full details. */ /** + * EXPERIMENTAL + * * This file is the entrypoint for the sdk build of element call: `yarn build:sdk` * use in widgets. * It exposes the `createMatrixRTCSdk` which creates the `MatrixRTCSdk` interface (see below) that @@ -30,8 +32,8 @@ import { } from "rxjs"; import { type CallMembership, - MatrixRTCSession, MatrixRTCSessionEvent, + MatrixRTCSessionManager, } from "matrix-js-sdk/lib/matrixrtc"; import { type Room as LivekitRoom, @@ -50,14 +52,12 @@ import { getUrlParams } from "../src/UrlParams"; import { MuteStates } from "../src/state/MuteStates"; import { MediaDevices } from "../src/state/MediaDevices"; import { E2eeType } from "../src/e2ee/e2eeType"; +import { currentAndPrev, logger, TEXT_LK_TOPIC, tryMakeSticky } from "./helper"; import { - currentAndPrev, - logger, - TEXT_LK_TOPIC, - tryMakeSticky, - widget, -} from "./helper"; -import { ElementWidgetActions, initializeWidget } from "../src/widget"; + ElementWidgetActions, + widget as _widget, + initializeWidget, +} from "../src/widget"; import { type Connection } from "../src/state/CallViewModel/remoteMembers/Connection"; interface MatrixRTCSdk { @@ -68,7 +68,13 @@ interface MatrixRTCSdk { join: () => void; /** @throws on leave errors */ leave: () => void; - data$: Observable<{ sender: string; data: string }>; + /** + * Ends the rtc sdk. This will unsubscribe any event listeners. And end the associated scope. + * No updates can be received from the rtc sdk. The sdk cannot be restarted after. + * A new sdk needs to be created via createMatrixRTCSdk. + */ + stop: () => void; + data$: Observable<{ rtcBackendIdentity: string; data: string }>; /** * flattened list of members */ @@ -79,32 +85,54 @@ interface MatrixRTCSdk { participant: LocalParticipant | RemoteParticipant | null; }[] >; + /** + * flattened local members + */ + localMember$: Behavior<{ + connection: Connection | null; + membership: CallMembership; + participant: LocalParticipant | null; + } | null>; /** Use the LocalMemberConnectionState returned from `join` for a more detailed connection state */ connected$: Behavior; sendData?: (data: unknown) => Promise; + sendRoomMessage?: (message: string) => Promise; } export async function createMatrixRTCSdk( application: string = "m.call", id: string = "", + sticky: boolean = false, ): Promise { - initializeWidget(); + const scope = new ObservableScope(); + + // widget client + initializeWidget(application, true); + const widget = _widget; + if (!widget) throw Error("No widget. This webapp can only start as a widget"); const client = await widget.client; logger.info("client created"); - const scope = new ObservableScope(); + + // url params const { roomId } = getUrlParams(); if (roomId === null) throw Error("could not get roomId from url params"); - const room = client.getRoom(roomId); if (room === null) throw Error("could not get room from client"); + // rtc session + const slot = { application, id }; + const rtcSessionManager = new MatrixRTCSessionManager(logger, client, slot); + rtcSessionManager.start(); + const rtcSession = rtcSessionManager.getRoomSession(room); + + // media devices const mediaDevices = new MediaDevices(scope); const muteStates = new MuteStates(scope, mediaDevices, { - audioEnabled: true, - videoEnabled: true, + audioEnabled: false, + videoEnabled: false, }); - const slot = { application, id }; - const rtcSession = new MatrixRTCSession(client, room, slot); + + // call view model const callViewModel = createCallViewModel$( scope, rtcSession, @@ -117,8 +145,9 @@ export async function createMatrixRTCSdk( constant({ supported: false, processor: undefined }), ); logger.info("CallViewModelCreated"); + // create data listener - const data$ = new Subject<{ sender: string; data: string }>(); + const data$ = new Subject<{ rtcBackendIdentity: string; data: string }>(); const lkTextStreamHandlerFunction = async ( reader: TextStreamReader, @@ -140,7 +169,7 @@ export async function createMatrixRTCSdk( if (participants && participants.includes(participantInfo.identity)) { const text = await reader.readAll(); logger.info(`Received text: ${text}`); - data$.next({ sender: participantInfo.identity, data: text }); + data$.next({ rtcBackendIdentity: participantInfo.identity, data: text }); } else { logger.warn( "Received text from unknown participant", @@ -230,6 +259,16 @@ export async function createMatrixRTCSdk( } }; + const sendRoomMessage = async (message: string): Promise => { + const messageString = JSON.stringify(message); + logger.info("try sending to room: ", messageString); + try { + await client.sendTextMessage(room.roomId, message); + } catch (e) { + logger.error("failed sending to room: ", messageString, e); + } + }; + // after hangup gets called const leaveSubs = callViewModel.leave$.subscribe(() => { const scheduleWidgetCloseOnLeave = async (): Promise => { @@ -257,9 +296,6 @@ export async function createMatrixRTCSdk( // schedule close first and then leave (scope.end) void scheduleWidgetCloseOnLeave(); - - // actual hangup (ending scope will send the leave event.. its kinda odd. since you might end up closing the widget too fast) - scope.end(); }); logger.info("createMatrixRTCSdk done"); @@ -267,15 +303,40 @@ export async function createMatrixRTCSdk( return { join: (): void => { // first lets try making the widget sticky - tryMakeSticky(); + if (sticky) tryMakeSticky(widget); callViewModel.join(); }, leave: (): void => { - callViewModel.hangup(); + callViewModel.leave(); + }, + stop: (): void => { leaveSubs.unsubscribe(); livekitRoomItemsSub.unsubscribe(); + scope.end(); }, data$, + localMember$: scope.behavior( + callViewModel.localMatrixLivekitMember$.pipe( + tap((member) => + logger.info("localMatrixLivekitMember$ next: ", member), + ), + switchMap((member) => { + if (member === null) return of(null); + return combineLatest([ + member.connection$, + member.membership$, + member.participant.value$, + ]).pipe( + map(([connection, membership, participant]) => ({ + connection, + membership, + participant, + })), + ); + }), + tap((member) => logger.info("localMember$ next: ", member)), + ), + ), connected$: callViewModel.connected$, members$: scope.behavior( callViewModel.matrixLivekitMembers$.pipe( @@ -302,5 +363,6 @@ export async function createMatrixRTCSdk( [], ), sendData, + sendRoomMessage, }; } diff --git a/src/settings/rageshake.test.ts b/src/settings/rageshake.test.ts new file mode 100644 index 00000000..9c3f1486 --- /dev/null +++ b/src/settings/rageshake.test.ts @@ -0,0 +1,34 @@ +/* +Copyright 2026 Element Creations Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { expect, it } from "vitest"; + +import { init as initRageshake } from "./rageshake"; + +it("Logger should not crash if JSON.stringify fails", async () => { + // JSON.stringify can throw. We want to make sure that the logger can handle this gracefully. + await initRageshake(); + + const bigIntObj = { n: 1n }; + const notStringifiable = { + bigIntObj, + }; + // @ts-expect-error - we want to create an object that cannot be stringified + notStringifiable.foo = notStringifiable; // circular reference + + // ensure this cannot be stringified + expect(() => JSON.stringify(notStringifiable)).toThrow(); + + expect(() => + global.mx_rage_logger.log( + 1, + "test", + "This is a test message", + notStringifiable, + ), + ).not.toThrow(); +}); diff --git a/src/settings/rageshake.ts b/src/settings/rageshake.ts index 26d0839b..c288f73e 100644 --- a/src/settings/rageshake.ts +++ b/src/settings/rageshake.ts @@ -75,7 +75,14 @@ class ConsoleLogger extends EventEmitter { } else if (arg instanceof Error) { return arg.message + (arg.stack ? `\n${arg.stack}` : ""); } else if (typeof arg === "object") { - return JSON.stringify(arg, getCircularReplacer()); + try { + return JSON.stringify(arg, getCircularReplacer()); + } catch { + // Stringify can fail if the object has circular references or if + // there is a bigInt. + // Did happen even with our `getCircularReplacer`. In this case, just log + return "<$ failed to serialize object $>"; + } } else { return arg; } diff --git a/src/state/CallViewModel/CallViewModel.ts b/src/state/CallViewModel/CallViewModel.ts index cf6ca92b..61afb7b9 100644 --- a/src/state/CallViewModel/CallViewModel.ts +++ b/src/state/CallViewModel/CallViewModel.ts @@ -60,6 +60,7 @@ import { import { accumulate, filterBehavior, + generateItem, generateItems, pauseWhen, } from "../../utils/observable"; @@ -217,15 +218,23 @@ export interface CallViewModel { "unknown" | "ringing" | "timeout" | "decline" | "success" | null >; /** Observable that emits when the user should leave the call (hangup pressed, widget action, error). - * THIS DOES NOT LEAVE THE CALL YET. The only way to leave the call (send the hangup event) is by ending the scope. + * THIS DOES NOT LEAVE THE CALL YET. The only way to leave the call (send the hangup event) is + * - by ending the scope + * - or calling requestDisconnect + * + * TODO: it seems more reasonable to add a leave() method (that calls requestDisconnect) that will then update leave$ and remove the hangup pattern */ leave$: Observable<"user" | AutoLeaveReason>; - /** Call to initiate hangup. Use in conbination with reconnectino state track the async hangup process. */ + /** Call to initiate hangup. Use in conbination with reconnection state track the async hangup process. */ hangup: () => void; // joining join: () => void; + /** + * calls requestDisconnect. The async leave state can than be observed via connected$ + */ + leave: () => void; // screen sharing /** * Callback to toggle screen sharing. If null, screen sharing is not possible. @@ -436,35 +445,38 @@ export function createCallViewModel$( memberId: uuidv4(), }; - const localTransport$ = createLocalTransport$({ - scope: scope, - memberships$: memberships$, - ownMembershipIdentity, - client, - delayId$: scope.behavior( - ( - fromEvent( - matrixRTCSession, - MembershipManagerEvent.DelayIdChanged, - // The type of reemitted event includes the original emitted as the second arg. - ) as Observable<[string | undefined, IMembershipManager]> - ).pipe(map(([delayId]) => delayId ?? null)), - matrixRTCSession.delayId ?? null, - ), - roomId: matrixRoom.roomId, - forceJwtEndpoint$: scope.behavior( - matrixRTCMode$.pipe( - map((v) => - v === MatrixRTCMode.Matrix_2_0 - ? JwtEndpointVersion.Matrix_2_0 - : JwtEndpointVersion.Legacy, - ), + const localTransport$ = scope.behavior( + matrixRTCMode$.pipe( + generateItem( + "CallViewModel localTransport$", + // Re-create LocalTransport whenever the mode changes + (mode) => ({ keys: [mode], data: undefined }), + (scope, _data$, mode) => + createLocalTransport$({ + scope: scope, + memberships$: memberships$, + ownMembershipIdentity, + client, + delayId$: scope.behavior( + ( + fromEvent( + matrixRTCSession, + MembershipManagerEvent.DelayIdChanged, + // The type of reemitted event includes the original emitted as the second arg. + ) as Observable<[string | undefined, IMembershipManager]> + ).pipe(map(([delayId]) => delayId ?? null)), + matrixRTCSession.delayId ?? null, + ), + roomId: matrixRoom.roomId, + forceJwtEndpoint: + mode === MatrixRTCMode.Matrix_2_0 + ? JwtEndpointVersion.Matrix_2_0 + : JwtEndpointVersion.Legacy, + useOldestMember: mode === MatrixRTCMode.Legacy, + }), ), ), - useOldestMember$: scope.behavior( - matrixRTCMode$.pipe(map((v) => v === MatrixRTCMode.Legacy)), - ), - }); + ); const connectionFactory = new ECConnectionFactory( client, @@ -483,6 +495,7 @@ export function createCallViewModel$( connectionFactory: connectionFactory, localTransport$: scope.behavior( localTransport$.pipe( + switchMap((t) => t.active$), catchError((e: unknown) => { logger.info( "could not pass local transport to createConnectionManager$. localTransport$ threw an error", @@ -516,13 +529,13 @@ export function createCallViewModel$( ); const localMembership = createLocalMembership$({ - scope: scope, + scope, homeserverConnected: createHomeserverConnected$( scope, client, matrixRTCSession, ), - muteStates: muteStates, + muteStates, joinMatrixRTC: (transport: LivekitTransportConfig) => { return enterRTCSession( matrixRTCSession, @@ -542,9 +555,11 @@ export function createCallViewModel$( ), ); }, - connectionManager: connectionManager, - matrixRTCSession: matrixRTCSession, - localTransport$: localTransport$, + connectionManager, + matrixRTCSession, + localTransport$: scope.behavior( + localTransport$.pipe(switchMap((t) => t.advertised$)), + ), logger: logger.getChild(`[${Date.now()}]`), }); @@ -715,6 +730,7 @@ export function createCallViewModel$( // Generate a collection of MediaItems from the list of expected (whether // present or missing) LiveKit participants. generateItems( + "CallViewModel userMedia$", function* ([ localMatrixLivekitMember, matrixLivekitMembers, @@ -1496,6 +1512,7 @@ export function createCallViewModel$( leave$: leave$, hangup: (): void => userHangup$.next(), join: localMembership.requestJoinAndPublish, + leave: localMembership.requestDisconnect, toggleScreenSharing: toggleScreenSharing, sharingScreen$: sharingScreen$, @@ -1545,7 +1562,15 @@ export function createCallViewModel$( matrixLivekitMembers$.pipe( map((members) => members.value), tap((v) => { - logger.debug("matrixLivekitMembers$ updated (exported)", v); + const listForLogs = v + .map( + (m) => + m.membership$.value.userId + "|" + m.membership$.value.deviceId, + ) + .join(","); + logger.debug( + `matrixLivekitMembers$ updated (exported) [${listForLogs}]`, + ); }), ), ), diff --git a/src/state/CallViewModel/localMember/LocalMember.test.ts b/src/state/CallViewModel/localMember/LocalMember.test.ts index b228cd08..e5e9f327 100644 --- a/src/state/CallViewModel/localMember/LocalMember.test.ts +++ b/src/state/CallViewModel/localMember/LocalMember.test.ts @@ -39,7 +39,6 @@ import { constant } from "../../Behavior"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; import { ConnectionState, type Connection } from "../remoteMembers/Connection"; import { type Publisher } from "./Publisher"; -import { type LocalTransportWithSFUConfig } from "./LocalTransport"; import { initializeWidget } from "../../../widget"; initializeWidget(); @@ -216,11 +215,10 @@ describe("LocalMembership", () => { it("throws error on missing RTC config error", () => { withTestScheduler(({ scope, hot, expectObservable }) => { - const localTransport$ = - scope.behavior( - hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")), - null, - ); + const localTransport$ = scope.behavior( + hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")), + null, + ); // we do not need any connection data since we want to fail before reaching that. const mockConnectionManager = { @@ -279,23 +277,11 @@ describe("LocalMembership", () => { }); const aTransport = { - transport: { - livekit_service_url: "a", - } as LivekitTransportConfig, - sfuConfig: { - url: "sfu-url", - jwt: "sfu-token", - }, - } as LocalTransportWithSFUConfig; + livekit_service_url: "a", + } as LivekitTransportConfig; const bTransport = { - transport: { - livekit_service_url: "b", - } as LivekitTransportConfig, - sfuConfig: { - url: "sfu-url", - jwt: "sfu-token", - }, - } as LocalTransportWithSFUConfig; + livekit_service_url: "b", + } as LivekitTransportConfig; const connectionTransportAConnected = { livekitRoom: mockLivekitRoom({ @@ -305,7 +291,7 @@ describe("LocalMembership", () => { } as unknown as LocalParticipant, }), state$: constant(ConnectionState.LivekitConnected), - transport: aTransport.transport, + transport: aTransport, } as unknown as Connection; const connectionTransportAConnecting = { ...connectionTransportAConnected, @@ -314,7 +300,7 @@ describe("LocalMembership", () => { } as unknown as Connection; const connectionTransportBConnected = { state$: constant(ConnectionState.LivekitConnected), - transport: bTransport.transport, + transport: bTransport, livekitRoom: mockLivekitRoom({}), } as unknown as Connection; @@ -368,12 +354,8 @@ describe("LocalMembership", () => { // stop the first Publisher and let the second one life. expect(publishers[0].destroy).toHaveBeenCalled(); expect(publishers[1].destroy).not.toHaveBeenCalled(); - expect(publisherFactory.mock.calls[0][0].transport).toBe( - aTransport.transport, - ); - expect(publisherFactory.mock.calls[1][0].transport).toBe( - bTransport.transport, - ); + expect(publisherFactory.mock.calls[0][0].transport).toBe(aTransport); + expect(publisherFactory.mock.calls[1][0].transport).toBe(bTransport); scope.end(); await flushPromises(); // stop all tracks after ending scopes @@ -446,8 +428,9 @@ describe("LocalMembership", () => { const scope = new ObservableScope(); const connectionManagerData = new ConnectionManagerData(); - const localTransport$ = - new BehaviorSubject(null); + const localTransport$ = new BehaviorSubject( + null, + ); const connectionManagerData$ = new BehaviorSubject( new Epoch(connectionManagerData), ); @@ -519,7 +502,7 @@ describe("LocalMembership", () => { }); ( - connectionManagerData2.getConnectionForTransport(aTransport.transport)! + connectionManagerData2.getConnectionForTransport(aTransport)! .state$ as BehaviorSubject ).next(ConnectionState.LivekitConnected); expect(localMembership.localMemberState$.value).toStrictEqual({ diff --git a/src/state/CallViewModel/localMember/LocalMember.ts b/src/state/CallViewModel/localMember/LocalMember.ts index 2f38ad82..eb641ca7 100644 --- a/src/state/CallViewModel/localMember/LocalMember.ts +++ b/src/state/CallViewModel/localMember/LocalMember.ts @@ -62,7 +62,6 @@ import { } from "../remoteMembers/Connection.ts"; import { type HomeserverConnected } from "./HomeserverConnected.ts"; import { and$ } from "../../../utils/observable.ts"; -import { type LocalTransportWithSFUConfig } from "./LocalTransport.ts"; export enum TransportState { /** Not even a transport is available to the LocalMembership */ @@ -128,7 +127,7 @@ interface Props { createPublisherFactory: (connection: Connection) => Publisher; joinMatrixRTC: (transport: LivekitTransportConfig) => void; homeserverConnected: HomeserverConnected; - localTransport$: Behavior; + localTransport$: Behavior; matrixRTCSession: Pick< MatrixRTCSession, "updateCallIntent" | "leaveRoomSession" @@ -147,7 +146,7 @@ interface Props { * @param props.createPublisherFactory Factory to create a publisher once we have a connection. * @param props.joinMatrixRTC Callback to join the matrix RTC session once we have a transport. * @param props.homeserverConnected The homeserver connected state. - * @param props.localTransport$ The local transport to use for publishing. + * @param props.localTransport$ The transport to advertise in our membership. * @param props.logger The logger to use. * @param props.muteStates The mute states for video and audio. * @param props.matrixRTCSession The matrix RTC session to join. @@ -237,9 +236,7 @@ export const createLocalMembership$ = ({ return null; } - return connectionData.getConnectionForTransport( - localTransport.transport, - ); + return connectionData.getConnectionForTransport(localTransport); }), tap((connection) => { logger.info( @@ -549,7 +546,7 @@ export const createLocalMembership$ = ({ if (!shouldConnect) return; try { - joinMatrixRTC(transport.transport); + joinMatrixRTC(transport); } catch (error) { logger.error("Error entering RTC session", error); if (error instanceof Error) diff --git a/src/state/CallViewModel/localMember/LocalTransport.test.ts b/src/state/CallViewModel/localMember/LocalTransport.test.ts index 2476923a..8454b09a 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.test.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.test.ts @@ -13,15 +13,24 @@ import { it, type MockedObject, vi, + type MockInstance, } from "vitest"; -import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc"; +import { + type CallMembership, + type LivekitTransportConfig, +} from "matrix-js-sdk/lib/matrixrtc"; import { BehaviorSubject, lastValueFrom } from "rxjs"; import fetchMock from "fetch-mock"; -import { mockConfig, flushPromises, ownMemberMock } from "../../../utils/test"; +import { + mockConfig, + flushPromises, + ownMemberMock, + mockRtcMembership, +} from "../../../utils/test"; import { createLocalTransport$, JwtEndpointVersion } from "./LocalTransport"; import { constant } from "../../Behavior"; -import { Epoch, ObservableScope } from "../../ObservableScope"; +import { Epoch, ObservableScope, trackEpoch } from "../../ObservableScope"; import { MatrixRTCTransportMissingError, FailToGetOpenIdToken, @@ -43,10 +52,10 @@ describe("LocalTransport", () => { afterEach(() => scope.end()); it("throws if config is missing", async () => { - const localTransport$ = createLocalTransport$({ + const { advertised$, active$ } = createLocalTransport$({ scope, roomId: "!room:example.org", - useOldestMember$: constant(false), + useOldestMember: false, memberships$: constant(new Epoch([])), client: { // eslint-disable-next-line @typescript-eslint/naming-convention @@ -58,14 +67,15 @@ describe("LocalTransport", () => { getDeviceId: vi.fn(), }, ownMembershipIdentity: ownMemberMock, - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), + forceJwtEndpoint: JwtEndpointVersion.Legacy, delayId$: constant("delay_id_mock"), }); await flushPromises(); - expect(() => localTransport$.value).toThrow( + expect(() => advertised$.value).toThrow( new MatrixRTCTransportMissingError(""), ); + expect(() => active$.value).toThrow(new MatrixRTCTransportMissingError("")); }); it("throws FailToGetOpenIdToken when OpenID fetch fails", async () => { @@ -83,10 +93,10 @@ describe("LocalTransport", () => { ); const observations: unknown[] = []; const errors: Error[] = []; - const localTransport$ = createLocalTransport$({ + const { advertised$, active$ } = createLocalTransport$({ scope, roomId: "!example_room_id", - useOldestMember$: constant(false), + useOldestMember: false, memberships$: constant(new Epoch([])), client: { baseUrl: "https://lk.example.org", @@ -98,10 +108,10 @@ describe("LocalTransport", () => { getDeviceId: vi.fn(), }, ownMembershipIdentity: ownMemberMock, - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), + forceJwtEndpoint: JwtEndpointVersion.Legacy, delayId$: constant("delay_id_mock"), }); - localTransport$.subscribe( + active$.subscribe( (o) => observations.push(o), (e) => errors.push(e), ); @@ -111,7 +121,8 @@ describe("LocalTransport", () => { const expectedError = new FailToGetOpenIdToken(new Error("no openid")); expect(observations).toStrictEqual([null]); expect(errors).toStrictEqual([expectedError]); - expect(() => localTransport$.value).toThrow(expectedError); + expect(() => advertised$.value).toThrow(expectedError); + expect(() => active$.value).toThrow(expectedError); }); it("emits preferred transport after OpenID resolves", async () => { @@ -126,10 +137,10 @@ describe("LocalTransport", () => { openIdResolver.promise, ); - const localTransport$ = createLocalTransport$({ + const { advertised$, active$ } = createLocalTransport$({ scope, roomId: "!room:example.org", - useOldestMember$: constant(false), + useOldestMember: false, memberships$: constant(new Epoch([])), client: { // eslint-disable-next-line @typescript-eslint/naming-convention @@ -140,7 +151,7 @@ describe("LocalTransport", () => { baseUrl: "https://lk.example.org", }, ownMembershipIdentity: ownMemberMock, - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), + forceJwtEndpoint: JwtEndpointVersion.Legacy, delayId$: constant("delay_id_mock"), }); @@ -150,14 +161,17 @@ describe("LocalTransport", () => { livekitAlias: "Akph4alDMhen", livekitIdentity: ownMemberMock.userId + ":" + ownMemberMock.deviceId, }); - expect(localTransport$.value).toBe(null); + expect(advertised$.value).toBe(null); + expect(active$.value).toBe(null); await flushPromises(); // final - expect(localTransport$.value).toStrictEqual({ - transport: { - livekit_service_url: "https://lk.example.org", - type: "livekit", - }, + const expectedTransport = { + livekit_service_url: "https://lk.example.org", + type: "livekit", + }; + expect(advertised$.value).toStrictEqual(expectedTransport); + expect(active$.value).toStrictEqual({ + transport: expectedTransport, sfuConfig: { jwt: "jwt", livekitAlias: "Akph4alDMhen", @@ -167,51 +181,122 @@ describe("LocalTransport", () => { }); }); - it("updates local transport when oldest member changes", async () => { - // Use config so transport discovery succeeds, but delay OpenID JWT fetch - mockConfig({ - livekit: { livekit_service_url: "https://lk.example.org" }, + describe("oldest member mode", () => { + const aliceTransport: LivekitTransportConfig = { + type: "livekit", + livekit_service_url: "https://alice.example.org", + }; + const bobTransport: LivekitTransportConfig = { + type: "livekit", + livekit_service_url: "https://bob.example.org", + }; + const aliceMembership = mockRtcMembership("@alice:example.org", "AAA", { + fociPreferred: [aliceTransport], }); - const memberships$ = new BehaviorSubject(new Epoch([])); - const openIdResolver = Promise.withResolvers(); - - vi.spyOn(openIDSFU, "getSFUConfigWithOpenID").mockReturnValue( - openIdResolver.promise, - ); - - const localTransport$ = createLocalTransport$({ - scope, - roomId: "!example_room_id", - useOldestMember$: constant(true), - memberships$, - client: { - getDomain: () => "", - // eslint-disable-next-line @typescript-eslint/naming-convention - _unstable_getRTCTransports: async () => Promise.resolve([]), - getOpenIdToken: vi.fn(), - getDeviceId: vi.fn(), - baseUrl: "https://lk.example.org", - }, - ownMembershipIdentity: ownMemberMock, - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), - delayId$: constant("delay_id_mock"), + const bobMembership = mockRtcMembership("@bob:example.org", "BBB", { + fociPreferred: [bobTransport], }); - openIdResolver.resolve?.(openIdResponse); - expect(localTransport$.value).toBe(null); - await flushPromises(); - // final - expect(localTransport$.value).toStrictEqual({ - transport: { - livekit_service_url: "https://lk.example.org", - type: "livekit", - }, - sfuConfig: { - jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", - livekitAlias: "Akph4alDMhen", - livekitIdentity: "@lk_user:ABCDEF", - url: "https://lk.example.org", - }, + let openIdSpy: MockInstance<(typeof openIDSFU)["getSFUConfigWithOpenID"]>; + beforeEach(() => { + openIdSpy = vi + .spyOn(openIDSFU, "getSFUConfigWithOpenID") + .mockResolvedValue(openIdResponse); + }); + + it("updates active transport when oldest member changes", async () => { + // Initially, Alice is the only member + const memberships$ = new BehaviorSubject([aliceMembership]); + + const { advertised$, active$ } = createLocalTransport$({ + scope, + roomId: "!example_room_id", + useOldestMember: true, + memberships$: scope.behavior(memberships$.pipe(trackEpoch())), + client: { + getDomain: () => "", + // eslint-disable-next-line @typescript-eslint/naming-convention + _unstable_getRTCTransports: async () => Promise.resolve([]), + getOpenIdToken: vi.fn(), + getDeviceId: vi.fn(), + baseUrl: "https://lk.example.org", + }, + ownMembershipIdentity: ownMemberMock, + forceJwtEndpoint: JwtEndpointVersion.Legacy, + delayId$: constant("delay_id_mock"), + }); + + expect(active$.value).toBe(null); + await flushPromises(); + // SFU config should've been fetched + expect(openIdSpy).toHaveBeenCalled(); + // Alice's transport should be active and advertised + expect(active$.value?.transport).toStrictEqual(aliceTransport); + expect(advertised$.value).toStrictEqual(aliceTransport); + + // Now Bob joins the call, but Alice is still the oldest member + openIdSpy.mockClear(); + memberships$.next([aliceMembership, bobMembership]); + await flushPromises(); + // No new SFU config should've been fetched + expect(openIdSpy).not.toHaveBeenCalled(); + // Alice's transport should still be active and advertised + expect(active$.value?.transport).toStrictEqual(aliceTransport); + expect(advertised$.value).toStrictEqual(aliceTransport); + + // Now Bob takes Alice's place as the oldest member + openIdSpy.mockClear(); + memberships$.next([bobMembership, aliceMembership]); + // Active transport should reset to null until we have Bob's SFU config + expect(active$.value).toStrictEqual(null); + await flushPromises(); + // Bob's SFU config should've been fetched + expect(openIdSpy).toHaveBeenCalled(); + // Bob's transport should be active, but Alice's should remain advertised + // (since we don't want the change in oldest member to cause a wave of new + // state events) + expect(active$.value?.transport).toStrictEqual(bobTransport); + expect(advertised$.value).toStrictEqual(aliceTransport); + }); + + it("advertises preferred transport when no other member exists", async () => { + // Initially, there are no members + const memberships$ = new BehaviorSubject([]); + + const { advertised$, active$ } = createLocalTransport$({ + scope, + roomId: "!example_room_id", + useOldestMember: true, + memberships$: scope.behavior(memberships$.pipe(trackEpoch())), + client: { + getDomain: () => "", + // eslint-disable-next-line @typescript-eslint/naming-convention + _unstable_getRTCTransports: async () => + Promise.resolve([aliceTransport]), + getOpenIdToken: vi.fn(), + getDeviceId: vi.fn(), + baseUrl: "https://lk.example.org", + }, + ownMembershipIdentity: ownMemberMock, + forceJwtEndpoint: JwtEndpointVersion.Legacy, + delayId$: constant("delay_id_mock"), + }); + + expect(active$.value).toBe(null); + await flushPromises(); + // Our own preferred transport should be advertised + expect(advertised$.value).toStrictEqual(aliceTransport); + // No transport should be active however (there is still no oldest member) + expect(active$.value).toBe(null); + + // Now Bob joins the call and becomes the oldest member + memberships$.next([bobMembership]); + await flushPromises(); + // We should still advertise our own preferred transport (to avoid + // unnecessary state changes) + expect(advertised$.value).toStrictEqual(aliceTransport); + // Bob's transport should become active + expect(active$.value?.transport).toBe(bobTransport); }); }); @@ -229,8 +314,8 @@ describe("LocalTransport", () => { ownMembershipIdentity: ownMemberMock, scope, roomId: "!example_room_id", - useOldestMember$: constant(false), - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), + useOldestMember: false, + forceJwtEndpoint: JwtEndpointVersion.Legacy, delayId$: constant(null), memberships$: constant(new Epoch([])), client: { @@ -256,15 +341,19 @@ describe("LocalTransport", () => { mockConfig({ livekit: { livekit_service_url: "https://lk.example.org" }, }); - const localTransport$ = createLocalTransport$(localTransportOpts); + const { advertised$, active$ } = + createLocalTransport$(localTransportOpts); openIdResolver.resolve?.(openIdResponse); - expect(localTransport$.value).toBe(null); + expect(advertised$.value).toBe(null); + expect(active$.value).toBe(null); await flushPromises(); - expect(localTransport$.value).toStrictEqual({ - transport: { - livekit_service_url: "https://lk.example.org", - type: "livekit", - }, + const expectedTransport = { + livekit_service_url: "https://lk.example.org", + type: "livekit", + }; + expect(advertised$.value).toStrictEqual(expectedTransport); + expect(active$.value).toStrictEqual({ + transport: expectedTransport, sfuConfig: { jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", livekitAlias: "Akph4alDMhen", @@ -273,13 +362,15 @@ describe("LocalTransport", () => { }, }); }); + it("supports getting transport via user settings", async () => { customLivekitUrl.setValue("https://lk.example.org"); - const localTransport$ = createLocalTransport$(localTransportOpts); + const { advertised$, active$ } = + createLocalTransport$(localTransportOpts); openIdResolver.resolve?.(openIdResponse); - expect(localTransport$.value).toBe(null); + expect(advertised$.value).toBe(null); await flushPromises(); - expect(localTransport$.value).toStrictEqual({ + expect(active$.value).toStrictEqual({ transport: { livekit_service_url: "https://lk.example.org", type: "livekit", @@ -292,19 +383,24 @@ describe("LocalTransport", () => { }, }); }); + it("supports getting transport via backend", async () => { localTransportOpts.client._unstable_getRTCTransports.mockResolvedValue([ { type: "livekit", livekit_service_url: "https://lk.example.org" }, ]); - const localTransport$ = createLocalTransport$(localTransportOpts); + const { advertised$, active$ } = + createLocalTransport$(localTransportOpts); openIdResolver.resolve?.(openIdResponse); - expect(localTransport$.value).toBe(null); + expect(advertised$.value).toBe(null); + expect(active$.value).toBe(null); await flushPromises(); - expect(localTransport$.value).toStrictEqual({ - transport: { - livekit_service_url: "https://lk.example.org", - type: "livekit", - }, + const expectedTransport = { + livekit_service_url: "https://lk.example.org", + type: "livekit", + }; + expect(advertised$.value).toStrictEqual(expectedTransport); + expect(active$.value).toStrictEqual({ + transport: expectedTransport, sfuConfig: { jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", livekitAlias: "Akph4alDMhen", @@ -313,6 +409,7 @@ describe("LocalTransport", () => { }, }); }); + it("fails fast if the openID request fails for backend config", async () => { localTransportOpts.client._unstable_getRTCTransports.mockResolvedValue([ { type: "livekit", livekit_service_url: "https://lk.example.org" }, @@ -320,13 +417,11 @@ describe("LocalTransport", () => { openIdResolver.reject( new FailToGetOpenIdToken(new Error("Test driven error")), ); - try { - await lastValueFrom(createLocalTransport$(localTransportOpts)); - throw Error("Expected test to throw"); - } catch (ex) { - expect(ex).toBeInstanceOf(FailToGetOpenIdToken); - } + await expect(async () => + lastValueFrom(createLocalTransport$(localTransportOpts).active$), + ).rejects.toThrow(expect.any(FailToGetOpenIdToken)); }); + it("supports getting transport via well-known", async () => { localTransportOpts.client.getDomain.mockReturnValue("example.org"); fetchMock.getOnce("https://example.org/.well-known/matrix/client", { @@ -334,15 +429,19 @@ describe("LocalTransport", () => { { type: "livekit", livekit_service_url: "https://lk.example.org" }, ], }); - const localTransport$ = createLocalTransport$(localTransportOpts); + const { advertised$, active$ } = + createLocalTransport$(localTransportOpts); openIdResolver.resolve?.(openIdResponse); - expect(localTransport$.value).toBe(null); + expect(advertised$.value).toBe(null); + expect(active$.value).toBe(null); await flushPromises(); - expect(localTransport$.value).toStrictEqual({ - transport: { - livekit_service_url: "https://lk.example.org", - type: "livekit", - }, + const expectedTransport = { + livekit_service_url: "https://lk.example.org", + type: "livekit", + }; + expect(advertised$.value).toStrictEqual(expectedTransport); + expect(active$.value).toStrictEqual({ + transport: expectedTransport, sfuConfig: { jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", livekitAlias: "Akph4alDMhen", @@ -352,6 +451,7 @@ describe("LocalTransport", () => { }); expect(fetchMock.done()).toEqual(true); }); + it("fails fast if the openId request fails for the well-known config", async () => { localTransportOpts.client.getDomain.mockReturnValue("example.org"); fetchMock.getOnce("https://example.org/.well-known/matrix/client", { @@ -362,20 +462,18 @@ describe("LocalTransport", () => { openIdResolver.reject( new FailToGetOpenIdToken(new Error("Test driven error")), ); - try { - await lastValueFrom(createLocalTransport$(localTransportOpts)); - throw Error("Expected test to throw"); - } catch (ex) { - expect(ex).toBeInstanceOf(FailToGetOpenIdToken); - } + await expect(async () => + lastValueFrom(createLocalTransport$(localTransportOpts).active$), + ).rejects.toThrow(expect.any(FailToGetOpenIdToken)); }); + it("throws if no options are available", async () => { - const localTransport$ = createLocalTransport$({ + const { advertised$, active$ } = createLocalTransport$({ scope, ownMembershipIdentity: ownMemberMock, roomId: "!example_room_id", - useOldestMember$: constant(false), - forceJwtEndpoint$: constant(JwtEndpointVersion.Legacy), + useOldestMember: false, + forceJwtEndpoint: JwtEndpointVersion.Legacy, delayId$: constant(null), memberships$: constant(new Epoch([])), client: { @@ -390,7 +488,10 @@ describe("LocalTransport", () => { }); await flushPromises(); - expect(() => localTransport$.value).toThrow( + expect(() => advertised$.value).toThrow( + new MatrixRTCTransportMissingError(""), + ); + expect(() => active$.value).toThrow( new MatrixRTCTransportMissingError(""), ); }); diff --git a/src/state/CallViewModel/localMember/LocalTransport.ts b/src/state/CallViewModel/localMember/LocalTransport.ts index 73364094..0b566ba0 100644 --- a/src/state/CallViewModel/localMember/LocalTransport.ts +++ b/src/state/CallViewModel/localMember/LocalTransport.ts @@ -13,12 +13,15 @@ import { } from "matrix-js-sdk/lib/matrixrtc"; import { MatrixError, type MatrixClient } from "matrix-js-sdk"; import { - combineLatest, distinctUntilChanged, + first, from, map, + merge, of, + startWith, switchMap, + tap, } from "rxjs"; import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; @@ -58,8 +61,8 @@ interface Props { OpenIDClientParts; // Used by the jwt service to create the livekit room and compute the livekit alias. roomId: string; - useOldestMember$: Behavior; - forceJwtEndpoint$: Behavior; + useOldestMember: boolean; + forceJwtEndpoint: JwtEndpointVersion; delayId$: Behavior; } @@ -93,23 +96,35 @@ export interface LocalTransportWithSFUConfig { transport: LivekitTransportConfig; sfuConfig: SFUConfig; } + export function isLocalTransportWithSFUConfig( obj: LivekitTransportConfig | LocalTransportWithSFUConfig, ): obj is LocalTransportWithSFUConfig { return "transport" in obj && "sfuConfig" in obj; } +interface LocalTransport { + /** + * The transport to be advertised in our MatrixRTC membership. `null` when not + * yet fetched/validated. + */ + advertised$: Behavior; + /** + * The transport to connect to and publish media on. `null` when not yet known + * or available. + */ + active$: Behavior; +} + /** - * This class is responsible for managing the local transport. - * "Which transport is the local member going to use" + * Connects to the JWT service and determines the transports that the local member should use. * * @prop useOldestMember Whether to use the same transport as the oldest member. * This will only update once the first oldest member appears. Will not recompute if the oldest member leaves. - * - * @prop useOldJwtEndpoint$ Whether to set forceOldJwtEndpoint on the returned transport and to use the old JWT endpoint. + * @prop useOldJwtEndpoint Whether to set forceOldJwtEndpoint on the returned transport and to use the old JWT endpoint. * This is used when the connection manager needs to know if it has to use the legacy endpoint which implies a string concatenated rtcBackendIdentity. * (which is expected for non sticky event based rtc member events) - * @returns The local transport. It will be created using the correct sfu endpoint based on the useOldJwtEndpoint$ value. + * @returns The transport to advertise in the local MatrixRTC membership, along with the transport to actively publish media to. * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken */ export const createLocalTransport$ = ({ @@ -118,114 +133,156 @@ export const createLocalTransport$ = ({ ownMembershipIdentity, client, roomId, - useOldestMember$, - forceJwtEndpoint$, + useOldestMember, + forceJwtEndpoint, delayId$, -}: Props): Behavior => { +}: Props): LocalTransport => { /** - * The transport over which we should be actively publishing our media. - * undefined when not joined. + * The LiveKit transport in use by the oldest RTC membership. `null` when the + * oldest member has no such transport. */ - const oldestMemberTransport$ = - scope.behavior( - combineLatest([memberships$, useOldestMember$]).pipe( - map(([memberships, useOldestMember]) => { - if (!useOldestMember) return null; // No need to do any prefetching if not using oldest member - const oldestMember = memberships.value[0]; - const transport = oldestMember?.getTransport(oldestMember); - if (!transport) return null; - return transport; - }), - switchMap((transport) => { - if (transport !== null && isLivekitTransportConfig(transport)) { - // Get the open jwt token to connect to the sfu - const computeLocalTransportWithSFUConfig = - async (): Promise => { - return { - transport, - sfuConfig: await getSFUConfigWithOpenID( - client, - ownMembershipIdentity, - transport.livekit_service_url, - roomId, - { forceJwtEndpoint: JwtEndpointVersion.Legacy }, - logger, - ), - }; - }; - return from(computeLocalTransportWithSFUConfig()); - } - return of(null); - }), - ), - null, - ); + const oldestMemberTransport$ = scope.behavior( + memberships$.pipe( + map((memberships) => { + const oldestMember = memberships.value[0]; + if (oldestMember === undefined) { + logger.info("Oldest member: not found"); + return null; + } + const transport = oldestMember.getTransport(oldestMember); + if (transport === undefined) { + logger.warn( + `Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has no transport`, + ); + return null; + } + if (!isLivekitTransportConfig(transport)) { + logger.warn( + `Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has invalid transport`, + ); + return null; + } + logger.info( + "Oldest member: ${oldestMember.userId}|${oldestMember.deviceId}|${oldestMember.memberId} has valid transport", + ); + return transport; + }), + distinctUntilChanged(areLivekitTransportsEqual), + ), + ); /** * The transport that we would personally prefer to publish on (if not for the - * transport preferences of others, perhaps). + * transport preferences of others, perhaps). `null` until fetched and + * validated. * * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken */ - const preferredTransport$ = scope.behavior( - // preferredTransport$ (used for multi sfu) needs to know if we are using the old or new - // jwt endpoint (`get_token` vs `sfu/get`) based on that the jwt endpoint will compute the rtcBackendIdentity - // differently. (sha(`${userId}|${deviceId}|${memberId}`) vs `${userId}|${deviceId}|${memberId}`) - // When using sticky events (we need to use the new endpoint). - combineLatest([customLivekitUrl.value$, delayId$, forceJwtEndpoint$]).pipe( - switchMap(([customUrl, delayId, forceEndpoint]) => { - logger.info( - "Creating preferred transport based on: ", - "customUrl: ", - customUrl, - "delayId: ", - delayId, - "forceEndpoint: ", - forceEndpoint, - ); - return from( - makeTransport( - client, - ownMembershipIdentity, - roomId, - customUrl, - forceEndpoint, - delayId ?? undefined, + const preferredTransport$ = + scope.behavior( + // preferredTransport$ (used for multi sfu) needs to know if we are using the old or new + // jwt endpoint (`get_token` vs `sfu/get`) based on that the jwt endpoint will compute the rtcBackendIdentity + // differently. (sha(`${userId}|${deviceId}|${memberId}`) vs `${userId}|${deviceId}|${memberId}`) + // When using sticky events (we need to use the new endpoint). + customLivekitUrl.value$.pipe( + switchMap((customUrl) => + startWith(null)( + // Fetch the SFU config, and repeat this asynchronously for every + // change in delay ID. + delayId$.pipe( + switchMap(async (delayId) => { + logger.info( + "Creating preferred transport based on: ", + "customUrl: ", + customUrl, + "delayId: ", + delayId, + "forceJwtEndpoint: ", + forceJwtEndpoint, + ); + return makeTransport( + client, + ownMembershipIdentity, + roomId, + customUrl, + forceJwtEndpoint, + delayId ?? undefined, + ); + }), + // We deliberately hide any changes to the SFU config because we + // do not actually want the app to reconnect whenever the JWT + // token changes due to us delegating a new delayed event. The + // initial SFU config for the transport is all the app needs. + distinctUntilChanged((prev, next) => + areLivekitTransportsEqual(prev.transport, next.transport), + ), + ), ), - ); - }), - ), - null, - ); + ), + ), + ); - /** - * The chosen transport we should advertise in our MatrixRTC membership. - */ - return scope.behavior( - combineLatest([ - useOldestMember$, - oldestMemberTransport$, - preferredTransport$, - ]).pipe( - map(([useOldestMember, oldestMemberTransport, preferredTransport]) => { - return useOldestMember - ? (oldestMemberTransport ?? preferredTransport) - : preferredTransport; - }), - distinctUntilChanged((t1, t2) => { - logger.info( - "Local Transport Update from:", - t1?.transport.livekit_service_url, - " to ", - t2?.transport.livekit_service_url, - ); - return areLivekitTransportsEqual( - t1?.transport ?? null, - t2?.transport ?? null, - ); - }), + if (useOldestMember) { + // --- Oldest member mode --- + return { + // Never update the transport that we advertise in our membership. Just + // take the first valid oldest member or preferred transport that we learn + // about, and stick with that. This avoids unnecessary SFU hops and room + // state changes. + advertised$: scope.behavior( + merge( + oldestMemberTransport$, + preferredTransport$.pipe(map((t) => t?.transport ?? null)), + ).pipe( + first((t) => t !== null), + tap((t) => + logger.info(`Advertise transport: ${t.livekit_service_url}`), + ), + ), + null, + ), + // Publish on the transport used by the oldest member. + active$: scope.behavior( + oldestMemberTransport$.pipe( + switchMap((transport) => { + // Oldest member not available (or invalid SFU config). + if (transport === null) return of(null); + // Oldest member available: fetch the SFU config. + const fetchOldestMemberTransport = + async (): Promise => ({ + transport, + sfuConfig: await getSFUConfigWithOpenID( + client, + ownMembershipIdentity, + transport.livekit_service_url, + roomId, + { forceJwtEndpoint: JwtEndpointVersion.Legacy }, + logger, + ), + }); + return from(fetchOldestMemberTransport()).pipe(startWith(null)); + }), + tap((t) => + logger.info( + `Publish on transport: ${t?.transport.livekit_service_url}`, + ), + ), + ), + ), + }; + } + + // --- Multi-SFU mode --- + // Always publish on and advertise the preferred transport. + return { + advertised$: scope.behavior( + preferredTransport$.pipe( + map((t) => t?.transport ?? null), + distinctUntilChanged(areLivekitTransportsEqual), + ), ), - ); + active$: preferredTransport$, + }; }; const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci"; diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index 60c824b9..727f68bc 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -90,7 +90,7 @@ export interface IConnectionManager { * @param props - Configuration object * @param props.scope - The observable scope used by this object * @param props.connectionFactory - Used to create new connections - * @param props.localTransport$ - The local transport to use. (deduplicated with remoteTransports$) + * @param props.localTransport$ - The transport to publish local media on. (deduplicated with remoteTransports$) * @param props.remoteTransports$ - All other transports. The connection manager will create connections for each transport. (deduplicated with localTransport$) * @param props.ownMembershipIdentity - The own membership identity to use. * @param props.logger - The logger to use. @@ -162,22 +162,23 @@ export function createConnectionManager$({ const connections$ = scope.behavior( localAndRemoteTransports$.pipe( generateItemsWithEpoch( + "ConnectionManager connections$", function* (transports) { - for (const transportWithOrWithoutSfuConfig of transports) { - if ( - isLocalTransportWithSFUConfig(transportWithOrWithoutSfuConfig) - ) { - // This is the local transport only the `LocalTransportWithSFUConfig` has a `sfuConfig` field - const { transport, sfuConfig } = transportWithOrWithoutSfuConfig; + for (const transport of transports) { + if (isLocalTransportWithSFUConfig(transport)) { + // This is the local transport; only the `LocalTransportWithSFUConfig` has a `sfuConfig` field. yield { - keys: [transport.livekit_service_url, sfuConfig], + keys: [ + transport.transport.livekit_service_url, + transport.sfuConfig, + ], data: undefined, }; } else { yield { keys: [ - transportWithOrWithoutSfuConfig.livekit_service_url, - undefined as undefined | SFUConfig, + transport.livekit_service_url, + undefined as SFUConfig | undefined, ], data: undefined, }; @@ -193,6 +194,8 @@ export function createConnectionManager$({ }, ownMembershipIdentity, logger, + // TODO: This whole optional SFUConfig parameter is not particularly elegant. + // I would like it if connections always fetched the SFUConfig by themselves. sfuConfig, ); // Start the connection immediately diff --git a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts index 04c211d9..acd5b55f 100644 --- a/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts +++ b/src/state/CallViewModel/remoteMembers/MatrixLivekitMembers.ts @@ -11,7 +11,6 @@ import { type LivekitTransportConfig, } from "matrix-js-sdk/lib/matrixrtc"; import { combineLatest, filter, map } from "rxjs"; -import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; import { type Behavior } from "../../Behavior"; import { type IConnectionManager } from "./ConnectionManager"; @@ -19,8 +18,6 @@ import { Epoch, type ObservableScope } from "../../ObservableScope"; import { type Connection } from "./Connection"; import { generateItemsWithEpoch } from "../../../utils/observable"; -const logger = rootLogger.getChild("[MatrixLivekitMembers]"); - interface LocalTaggedParticipant { type: "local"; value$: Behavior; @@ -94,9 +91,10 @@ export function createMatrixLivekitMembers$({ ), map(([ms, data]) => new Epoch([ms.value, data.value] as const, ms.epoch)), generateItemsWithEpoch( + "MatrixLivekitMembers", // Generator function. // creates an array of `{key, data}[]` - // Each change in the keys (new key, missing key) will result in a call to the factory function. + // Each change in the keys (new key) will result in a call to the factory function. function* ([membershipsWithTransport, managerData]) { for (const { membership, transport } of membershipsWithTransport) { const participants = transport @@ -111,26 +109,23 @@ export function createMatrixLivekitMembers$({ : null; yield { - // This could also just be the memberId without the other fields. - // In theory we should never have the same memberId for different userIds (they are UUIDs) - // This still makes us resilient agains someone who intentionally tries to use the same memberId. - // If they want to do this they would now need to also use the same sender which is impossible. + // This could just be the backend identity without the other keys. + // The user ID, device ID, and member ID are included however so + // they show up in debug logs. keys: [ membership.userId, membership.deviceId, membership.memberId, + membership.rtcBackendIdentity, ], data: { membership, participant, connection }, }; } }, - // Each update where the key of the generator array do not change will result in updates to the `data$` observable in the factory. - (scope, data$, userId, deviceId, memberId) => { - logger.debug( - `Generating member for livekitIdentity: ${data$.value.membership.rtcBackendIdentity},keys userId:deviceId:memberId ${userId}:${deviceId}:${memberId}`, - ); + // Each update where the key of the generator array do not change will result in updates to the `data$` behavior. + (scope, data$, userId, _deviceId, _memberId, _rtcBackendIdentity) => { const { participant$, ...rest } = scope.splitBehavior(data$); - // will only get called once per `participantId, userId` pair. + // will only get called once per backend identity. // updates to data$ and as a result to displayName$ and mxcAvatarUrl$ are more frequent. return { userId, diff --git a/src/state/MediaViewModel.ts b/src/state/MediaViewModel.ts index 3da69c46..7f806697 100644 --- a/src/state/MediaViewModel.ts +++ b/src/state/MediaViewModel.ts @@ -257,10 +257,9 @@ abstract class BaseMediaViewModel { * The Matrix user to which this media belongs. */ public readonly userId: string, - public readonly rtcBackendIdentity: string, // We don't necessarily have a participant if a user connects via MatrixRTC but not (yet) through // livekit. - protected readonly participant$: Observable< + protected readonly participant$: Behavior< LocalParticipant | RemoteParticipant | null >, @@ -407,8 +406,11 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { scope: ObservableScope, id: string, userId: string, - rtcBackendIdentity: string, - participant$: Observable, + /** + * The expected identity of the LiveKit participant. Exposed for debugging. + */ + public readonly rtcBackendIdentity: string, + participant$: Behavior, encryptionSystem: EncryptionSystem, livekitRoom$: Behavior, focusUrl$: Behavior, @@ -421,7 +423,6 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { scope, id, userId, - rtcBackendIdentity, participant$, encryptionSystem, Track.Source.Microphone, @@ -677,7 +678,7 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { id: string, userId: string, rtcBackendIdentity: string, - participant$: Observable, + participant$: Behavior, encryptionSystem: EncryptionSystem, livekitRoom$: Behavior, focusUrl$: Behavior, @@ -779,8 +780,7 @@ export class ScreenShareViewModel extends BaseMediaViewModel { scope: ObservableScope, id: string, userId: string, - rtcBackendIdentity: string, - participant$: Observable, + participant$: Behavior, encryptionSystem: EncryptionSystem, livekitRoom$: Behavior, focusUrl$: Behavior, @@ -793,7 +793,6 @@ export class ScreenShareViewModel extends BaseMediaViewModel { scope, id, userId, - rtcBackendIdentity, participant$, encryptionSystem, Track.Source.ScreenShareAudio, diff --git a/src/state/ScreenShare.ts b/src/state/ScreenShare.ts index e4f5de1f..6c908b1f 100644 --- a/src/state/ScreenShare.ts +++ b/src/state/ScreenShare.ts @@ -4,7 +4,7 @@ Copyright 2025 New Vector Ltd. SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { of } from "rxjs"; + import { type LocalParticipant, type RemoteParticipant, @@ -14,7 +14,7 @@ import { import { type ObservableScope } from "./ObservableScope.ts"; import { ScreenShareViewModel } from "./MediaViewModel.ts"; import type { EncryptionSystem } from "../e2ee/sharedKeyManagement.ts"; -import type { Behavior } from "./Behavior.ts"; +import { constant, type Behavior } from "./Behavior.ts"; /** * A screen share media item to be presented in a tile. This is a thin wrapper @@ -28,7 +28,6 @@ export class ScreenShare { private readonly scope: ObservableScope, id: string, userId: string, - rtcBackendIdentity: string, participant: LocalParticipant | RemoteParticipant, encryptionSystem: EncryptionSystem, livekitRoom$: Behavior, @@ -41,8 +40,7 @@ export class ScreenShare { this.scope, id, userId, - rtcBackendIdentity, - of(participant), + constant(participant), encryptionSystem, livekitRoom$, focusUrl$, diff --git a/src/state/UserMedia.ts b/src/state/UserMedia.ts index 2a125257..2adc9134 100644 --- a/src/state/UserMedia.ts +++ b/src/state/UserMedia.ts @@ -130,6 +130,7 @@ export class UserMedia { // MediaViewModels don't support it though since they look for a unique // track for the given source. So generateItems here is a bit overkill. generateItems( + `${this.id} screenShares$`, function* (p) { if (p.isScreenShareEnabled) yield { @@ -142,7 +143,6 @@ export class UserMedia { scope, `${this.id}:${key}`, this.userId, - this.rtcBackendIdentity, p, this.encryptionSystem, this.livekitRoom$, diff --git a/src/state/layout-types.ts b/src/state/layout-types.ts index 3796715c..f32869df 100644 --- a/src/state/layout-types.ts +++ b/src/state/layout-types.ts @@ -10,6 +10,8 @@ import { type SpotlightTileViewModel, } from "./TileViewModel.ts"; import { + type LocalUserMediaViewModel, + type RemoteUserMediaViewModel, type MediaViewModel, type UserMediaViewModel, } from "./MediaViewModel.ts"; @@ -40,8 +42,8 @@ export interface SpotlightExpandedLayoutMedia { export interface OneOnOneLayoutMedia { type: "one-on-one"; - local: UserMediaViewModel; - remote: UserMediaViewModel; + local: LocalUserMediaViewModel; + remote: RemoteUserMediaViewModel; } export interface PipLayoutMedia { diff --git a/src/utils/observable.test.ts b/src/utils/observable.test.ts index be677367..80cbb3c8 100644 --- a/src/utils/observable.test.ts +++ b/src/utils/observable.test.ts @@ -47,6 +47,7 @@ test("generateItems", () => { expectObservable( hot(inputMarbles).pipe( generateItems( + "test items", function* (input) { for (let i = 1; i <= +input; i++) { yield { keys: [i], data: undefined }; diff --git a/src/utils/observable.ts b/src/utils/observable.ts index 9739353f..2e19748b 100644 --- a/src/utils/observable.ts +++ b/src/utils/observable.ts @@ -24,6 +24,7 @@ import { type OperatorFunction, distinctUntilChanged, } from "rxjs"; +import { logger } from "matrix-js-sdk/lib/logger"; import { type Behavior } from "../state/Behavior"; import { Epoch, ObservableScope } from "../state/ObservableScope"; @@ -122,8 +123,9 @@ export function pauseWhen(pause$: Behavior) { ); } -interface ItemHandle { +interface ItemHandle { scope: ObservableScope; + keys: readonly [...Keys]; data$: BehaviorSubject; item: Item; } @@ -135,6 +137,7 @@ interface ItemHandle { * requested at a later time, and destroyed (have their scope ended) when the * key is no longer requested. * + * @param name A name for this collection to use in debug logs. * @param generator A generator function yielding a tuple of keys and the * currently associated data for each item that it wants to exist. * @param factory A function constructing an individual item, given the item's key, @@ -146,16 +149,17 @@ export function generateItems< Data, Item, >( + name: string, generator: ( input: Input, - ) => Generator<{ keys: readonly [...Keys]; data: Data }, void, void>, + ) => Iterable<{ keys: readonly [...Keys]; data: Data }, void, void>, factory: ( scope: ObservableScope, data$: Behavior, ...keys: Keys ) => Item, ): OperatorFunction { - return generateItemsInternal(generator, factory, (items) => items); + return generateItemsInternal(name, generator, factory, (items) => items); } /** @@ -167,9 +171,10 @@ export function generateItemsWithEpoch< Data, Item, >( + name: string, generator: ( input: Input, - ) => Generator<{ keys: readonly [...Keys]; data: Data }, void, void>, + ) => Iterable<{ keys: readonly [...Keys]; data: Data }, void, void>, factory: ( scope: ObservableScope, data$: Behavior, @@ -177,6 +182,7 @@ export function generateItemsWithEpoch< ) => Item, ): OperatorFunction, Epoch> { return generateItemsInternal( + name, function* (input) { yield* generator(input.value); }, @@ -207,6 +213,38 @@ export function filterBehavior( ); } +/** + * Maps a changing input value to an item whose lifetime is tied to a certain + * computed key. The item may capture some dynamic data from the input. + */ +export function generateItem< + Input, + Keys extends [unknown, ...unknown[]], + Data, + Item, +>( + name: string, + generator: (input: Input) => { keys: readonly [...Keys]; data: Data }, + factory: ( + scope: ObservableScope, + data$: Behavior, + ...keys: Keys + ) => Item, +): OperatorFunction { + return (input$) => + input$.pipe( + generateItemsInternal( + name, + function* (input) { + yield generator(input); + }, + factory, + (items) => items, + ), + map(([item]) => item), + ); +} + function generateItemsInternal< Input, Keys extends [unknown, ...unknown[]], @@ -214,9 +252,10 @@ function generateItemsInternal< Item, Output, >( + name: string, generator: ( input: Input, - ) => Generator<{ keys: readonly [...Keys]; data: Data }, void, void>, + ) => Iterable<{ keys: readonly [...Keys]; data: Data }, void, void>, factory: ( scope: ObservableScope, data$: Behavior, @@ -232,26 +271,34 @@ function generateItemsInternal< Input, { map: Map; - items: Set>; + items: Set>; input: Input; }, - { map: Map; items: Set> } + { map: Map; items: Set> } >( ({ map: prevMap, items: prevItems }, input) => { const nextMap = new Map(); - const nextItems = new Set>(); + const nextItems = new Set>(); for (const { keys, data } of generator(input)) { // Disable type checks for a second to grab the item out of a nested map let i: any = prevMap; for (const key of keys) i = i?.get(key); - let item = i as ItemHandle | undefined; + let item = i as ItemHandle | undefined; if (item === undefined) { // First time requesting the key; create the item const scope = new ObservableScope(); const data$ = new BehaviorSubject(data); - item = { scope, data$, item: factory(scope, data$, ...keys) }; + logger.debug( + `[${name}] Creating item with keys ${keys.join(", ")}`, + ); + item = { + scope, + keys, + data$, + item: factory(scope, data$, ...keys), + }; } else { item.data$.next(data); } @@ -269,7 +316,7 @@ function generateItemsInternal< const finalKey = keys[keys.length - 1]; if (m.has(finalKey)) throw new Error( - `Keys must be unique (tried to generate multiple items for key ${keys})`, + `Keys must be unique (tried to generate multiple items for key ${keys.join(", ")})`, ); m.set(keys[keys.length - 1], item); nextItems.add(item); @@ -277,7 +324,12 @@ function generateItemsInternal< // Destroy all items that are no longer being requested for (const item of prevItems) - if (!nextItems.has(item)) item.scope.end(); + if (!nextItems.has(item)) { + logger.debug( + `[${name}] Destroying item with keys ${item.keys.join(", ")}`, + ); + item.scope.end(); + } return { map: nextMap, items: nextItems, input }; }, @@ -285,7 +337,15 @@ function generateItemsInternal< ), finalizeValue(({ items }) => { // Destroy all remaining items when no longer subscribed - for (const { scope } of items) scope.end(); + logger.debug( + `[${name}] End of scope, destroying all ${items.size} items…`, + ); + for (const item of items) { + logger.debug( + `[${name}] Destroying item with keys ${item.keys.join(", ")}`, + ); + item.scope.end(); + } }), map(({ items, input }) => project( diff --git a/src/widget.test.ts b/src/widget.test.ts index f85c56bc..2e5bf743 100644 --- a/src/widget.test.ts +++ b/src/widget.test.ts @@ -5,7 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { beforeAll, describe, expect, vi, it } from "vitest"; +import { describe, expect, vi, it, beforeEach } from "vitest"; import { createRoomWidgetClient, EventType } from "matrix-js-sdk"; import { getUrlParams } from "./UrlParams"; @@ -35,11 +35,14 @@ vi.mock("./UrlParams", () => ({ })), })); -initializeWidget(); -describe("widget", () => { - beforeAll(() => {}); +beforeEach(() => { + createRoomWidgetClientSpy.mockClear(); +}); +describe("widget", () => { it("should create an embedded client with the correct params", () => { + initializeWidget("ANYRTCAPP"); + expect(getUrlParams()).toStrictEqual({ widgetId: "id", parentUrl: "http://parentUrl", @@ -66,13 +69,16 @@ describe("widget", () => { ]; const sendState = [ - "myYser", // Legacy call membership events - `_myYser_AAAAA_m.call`, // Session membership events - `myYser_AAAAA_m.call`, // The above with no leading underscore, for room versions whose auth rules allow it - ].map((stateKey) => ({ - eventType: EventType.GroupCallMemberPrefix, - stateKey, - })); + { eventType: "org.matrix.msc3401.call.member", stateKey: "myYser" }, // Legacy call membership events + { + eventType: "org.matrix.msc3401.call.member", + stateKey: `_myYser_AAAAA_ANYRTCAPP`, + }, // Session membership events + { + eventType: "org.matrix.msc3401.call.member", + stateKey: `myYser_AAAAA_ANYRTCAPP`, + }, // The above with no leading underscore, for room versions whose auth rules allow it + ]; const receiveState = [ { eventType: EventType.RoomCreate }, { eventType: EventType.RoomName }, @@ -124,4 +130,32 @@ describe("widget", () => { }); expect(createRoomWidgetClientSpy.mock.calls[0][4]).toStrictEqual(false); }); + + it("should request send message permission if requested", () => { + initializeWidget("ANYRTCAPP", true); + expect(createRoomWidgetClientSpy).toHaveBeenLastCalledWith( + expect.anything(), + // capabilities + expect.objectContaining({ + sendEvent: expect.arrayContaining(["m.room.message"]), + }), + expect.anything(), + expect.anything(), + expect.anything(), + ); + }); + + it("should not request send message permission when not requested", () => { + initializeWidget("", false); + expect(createRoomWidgetClientSpy).toHaveBeenLastCalledWith( + expect.anything(), + // capabilities + expect.objectContaining({ + sendEvent: expect.not.arrayContaining(["m.room.message"]), + }), + expect.anything(), + expect.anything(), + expect.anything(), + ); + }); }); diff --git a/src/widget.ts b/src/widget.ts index 16dbf514..321727f6 100644 --- a/src/widget.ts +++ b/src/widget.ts @@ -68,7 +68,10 @@ export let widget: WidgetHelpers | null; */ // this needs to be a seperate call and cannot be done on import to allow us to spy on methods in here before // execution. -export const initializeWidget = (): void => { +export const initializeWidget = ( + rtcApplication: string = "m.call", + sendRoomEvents = false, +): void => { try { const { widgetId, @@ -116,6 +119,9 @@ export const initializeWidget = (): void => { EventType.CallNotify, // Sent as a deprecated fallback EventType.RTCNotification, ]; + if (sendRoomEvents) { + sendEvent.push(EventType.RoomMessage); + } const sendRecvEvent = [ "org.matrix.rageshake_request", EventType.CallEncryptionKeysPrefix, @@ -128,8 +134,8 @@ export const initializeWidget = (): void => { const sendState = [ userId, // Legacy call membership events - `_${userId}_${deviceId}_m.call`, // Session membership events - `${userId}_${deviceId}_m.call`, // The above with no leading underscore, for room versions whose auth rules allow it + `_${userId}_${deviceId}_${rtcApplication}`, // Session membership events + `${userId}_${deviceId}_${rtcApplication}`, // The above with no leading underscore, for room versions whose auth rules allow it ].map((stateKey) => ({ eventType: EventType.GroupCallMemberPrefix, stateKey, diff --git a/yarn.lock b/yarn.lock index e486bf6b..b1d27dec 100644 --- a/yarn.lock +++ b/yarn.lock @@ -8364,7 +8364,7 @@ __metadata: livekit-client: "npm:^2.13.0" lodash-es: "npm:^4.17.21" loglevel: "npm:^1.9.1" - matrix-js-sdk: "matrix-org/matrix-js-sdk#develop" + matrix-js-sdk: "matrix-org/matrix-js-sdk#6e3efef0c5f660df47cf00874927dec1c75cc3cf" matrix-widget-api: "npm:^1.16.1" node-stdlib-browser: "npm:^1.3.1" normalize.css: "npm:^8.0.1" @@ -11452,9 +11452,9 @@ __metadata: languageName: node linkType: hard -"matrix-js-sdk@matrix-org/matrix-js-sdk#develop": +"matrix-js-sdk@matrix-org/matrix-js-sdk#6e3efef0c5f660df47cf00874927dec1c75cc3cf": version: 40.1.0 - resolution: "matrix-js-sdk@https://github.com/matrix-org/matrix-js-sdk.git#commit=f2157f28bbadf2898fe21991f69ccb2af40df326" + resolution: "matrix-js-sdk@https://github.com/matrix-org/matrix-js-sdk.git#commit=6e3efef0c5f660df47cf00874927dec1c75cc3cf" dependencies: "@babel/runtime": "npm:^7.12.5" "@matrix-org/matrix-sdk-crypto-wasm": "npm:^17.0.0" @@ -11470,17 +11470,17 @@ __metadata: sdp-transform: "npm:^3.0.0" unhomoglyph: "npm:^1.0.6" uuid: "npm:13" - checksum: 10c0/d646b9214abbf0b9126760105edd9c57be7ffe8b53ae4acd5fefe841a51ad7d78fa57130922b3eac65ff2266b43f31ea60b4bdda9481e6bf8f1808d96726ed8a + checksum: 10c0/2c4db56fd0164d801c2f125ab2a442e3659314d4cc2fd640ea152b829d0db8b05ff808020e387a761afde4ff7a07b271c25431337de9f7c765c523c8cd837e36 languageName: node linkType: hard "matrix-widget-api@npm:^1.16.1": - version: 1.16.1 - resolution: "matrix-widget-api@npm:1.16.1" + version: 1.17.0 + resolution: "matrix-widget-api@npm:1.17.0" dependencies: "@types/events": "npm:^3.0.0" events: "npm:^3.2.0" - checksum: 10c0/d88180f514104b84d3018055fc955138d65195465480a51e9afe5dbf2f3175b54e3483b4c4f1feab2dd27440f403051d9c8b293bd0532c09b136c6b23606e1ee + checksum: 10c0/3651c860900149ecc2fe74640b47687bab8a347eb718a522085189e2b84efe462c9d81c1e8caff08d122f0b3e9cef8303a5802837673e5c9b465f7624c56a8f3 languageName: node linkType: hard