diff --git a/.changeset/plain-pumas-relax.md b/.changeset/plain-pumas-relax.md new file mode 100644 index 000000000..81dcb51c5 --- /dev/null +++ b/.changeset/plain-pumas-relax.md @@ -0,0 +1,5 @@ +--- +"@exactly/server": patch +--- + +✨ migrate webhook subscription to queue diff --git a/cspell.json b/cspell.json index 54e7eee97..ee52c49d8 100644 --- a/cspell.json +++ b/cspell.json @@ -139,6 +139,7 @@ "reactnavigation", "redeployer", "reentrancy", + "retriable", "rpid", "rustup", "scannability", diff --git a/server/api/auth/authentication.ts b/server/api/auth/authentication.ts index 04a777ff0..a45dc72e1 100644 --- a/server/api/auth/authentication.ts +++ b/server/api/auth/authentication.ts @@ -44,7 +44,7 @@ import database, { credentials } from "../../database"; import androidOrigins from "../../utils/android/origins"; import appOrigin from "../../utils/appOrigin"; import authSecret from "../../utils/authSecret"; -import createCredential from "../../utils/createCredential"; +import createCredential, { WebhookNotReadyError } from "../../utils/createCredential"; import decodePublicKey from "../../utils/decodePublicKey"; import getIntercomToken from "../../utils/intercom"; import publicClient from "../../utils/publicClient"; @@ -341,6 +341,10 @@ Submit the signed SIWE message to prove ownership of an Ethereum address. The se 200, ); } catch (error) { + if (error instanceof WebhookNotReadyError) { + captureException(error, { level: "warning", tags: { retriable: true } }); + return c.json({ code: "service unavailable" }, 503); + } captureException(error, { level: "error", tags: { unhandled: true } }); return c.json({ code: "ouch", legacy: "ouch" }, 500); } diff --git a/server/api/auth/registration.ts b/server/api/auth/registration.ts index eb07a2dc1..e65176060 100644 --- a/server/api/auth/registration.ts +++ b/server/api/auth/registration.ts @@ -40,7 +40,7 @@ import { Address, Base64URL, Hex } from "@exactly/common/validation"; import { Authentication } from "./authentication"; import androidOrigins from "../../utils/android/origins"; import appOrigin from "../../utils/appOrigin"; -import createCredential from "../../utils/createCredential"; +import createCredential, { WebhookNotReadyError } from "../../utils/createCredential"; import getIntercomToken from "../../utils/intercom"; import publicClient from "../../utils/publicClient"; import redis from "../../utils/redis"; @@ -370,6 +370,10 @@ export default new Hono() 200, ); } catch (error) { + if (error instanceof WebhookNotReadyError) { + captureException(error, { level: "warning", tags: { retriable: true } }); + return c.json({ code: "service unavailable" }, 503); + } captureException(error, { level: "error", tags: { unhandled: true } }); return c.json({ code: "ouch", legacy: "ouch" }, 500); } diff --git a/server/hooks/activity.ts b/server/hooks/activity.ts index 5fdaafb9a..fc16f2524 100644 --- a/server/hooks/activity.ts +++ b/server/hooks/activity.ts @@ -323,10 +323,15 @@ findWebhook(({ webhook_type, webhook_url }) => webhook_type === "ADDRESS_ACTIVIT .then(async (currentHook) => { if (currentHook) { webhookId = currentHook.id; + debug("alchemy webhook initialized with existing hook: %s", webhookId); return signingKeys.add(currentHook.signing_key); } const newHook = await createWebhook({ webhook_type: "ADDRESS_ACTIVITY", webhook_url: url, addresses: [] }); webhookId = newHook.id; + debug("alchemy webhook initialized with new hook: %s", webhookId); signingKeys.add(newHook.signing_key); }) - .catch((error: unknown) => captureException(error)); + .catch((error: unknown) => { + debug("failed to initialize alchemy webhook: %o", error); + captureException(error, { level: "error" }); + }); diff --git a/server/index.ts b/server/index.ts index 667fbae63..2cfcab1b3 100644 --- a/server/index.ts +++ b/server/index.ts @@ -20,6 +20,7 @@ import panda from "./hooks/panda"; import persona from "./hooks/persona"; import androidFingerprints from "./utils/android/fingerprints"; import appOrigin from "./utils/appOrigin"; +import { closeQueue as closeAccountQueue } from "./utils/createCredential"; import { close as closeRedis } from "./utils/redis"; import { closeAndFlush as closeSegment } from "./utils/segment"; @@ -319,10 +320,10 @@ export default app; const server = serve(app); -export async function close() { +export function close() { return new Promise((resolve, reject) => { server.close((error) => { - Promise.allSettled([closeSentry(), closeRedis(), closeSegment(), database.$client.end()]) + Promise.allSettled([closeSentry(), closeRedis(), closeSegment(), database.$client.end(), closeAccountQueue()]) .then((results) => { if (error) reject(error); else if (results.some((result) => result.status === "rejected")) reject(new Error("closing services failed")); diff --git a/server/test/api/auth.test.ts b/server/test/api/auth.test.ts index 210549302..1354e0f5e 100644 --- a/server/test/api/auth.test.ts +++ b/server/test/api/auth.test.ts @@ -3,6 +3,7 @@ import "../expect"; import customer from "../mocks/sardine"; import "../mocks/sentry"; +import { captureException } from "@sentry/node"; import { verifyAuthenticationResponse, verifyRegistrationResponse } from "@simplewebauthn/server"; import { eq } from "drizzle-orm"; import { testClient } from "hono/testing"; @@ -19,6 +20,7 @@ import { Address } from "@exactly/common/validation"; import app, { type Authentication } from "../../api/auth/authentication"; import registrationApp from "../../api/auth/registration"; import database, { credentials } from "../../database"; +import { WebhookNotReadyError } from "../../utils/createCredential"; import * as publicClient from "../../utils/publicClient"; import redis from "../../utils/redis"; @@ -29,6 +31,14 @@ import type * as ViemSiwe from "viem/siwe"; const appClient = testClient(app); const registrationAppClient = testClient(registrationApp); +const mocks = vi.hoisted(() => ({ activityWebhookId: "activity" as string | undefined })); + +vi.mock("../../hooks/activity", () => ({ + get webhookId() { + return mocks.activityWebhookId; + }, +})); + describe("authentication", () => { beforeAll(async () => { await database.insert(credentials).values([ @@ -49,23 +59,12 @@ describe("authentication", () => { afterEach(async () => { vi.clearAllMocks(); + mocks.activityWebhookId = "activity"; await redis.del("test-session"); }); it("returns intercom token on successful login", async () => { - const response = await appClient.index.$post( - { - json: { - method: "webauthn", - id: "dGVzdC1jcmVkLWlk", - rawId: "dGVzdC1jcmVkLWlk", - response: { clientDataJSON: "dGVzdA", authenticatorData: "dGVzdA", signature: "dGVzdA" }, - clientExtensionResults: {}, - type: "public-key", - }, - }, - { headers: { cookie: "session_id=test-session" } }, - ); + const response = await postAuthenticationWebauthn(); expect(response.status).toBe(200); @@ -87,38 +86,14 @@ describe("authentication", () => { it("returns 400 if authentication challenge is missing", async () => { await redis.del("test-session"); - const response = await appClient.index.$post( - { - json: { - method: "webauthn", - id: "dGVzdC1jcmVkLWlk", - rawId: "dGVzdC1jcmVkLWlk", - response: { clientDataJSON: "dGVzdA", authenticatorData: "dGVzdA", signature: "dGVzdA" }, - clientExtensionResults: {}, - type: "public-key", - }, - }, - { headers: { cookie: "session_id=test-session" } }, - ); + const response = await postAuthenticationWebauthn(); expect(response.status).toBe(400); expect(await response.json()).toEqual(expect.objectContaining({ code: "no authentication" })); }); it("returns 400 for missing credential with non-siwe assertion", async () => { - const response = await appClient.index.$post( - { - json: { - method: "webauthn", - id: "bWlzc2luZy1jcmVk", // cspell:ignore Wlzc - rawId: "bWlzc2luZy1jcmVk", // cspell:ignore Wlzc - response: { clientDataJSON: "dGVzdA", authenticatorData: "dGVzdA", signature: "dGVzdA" }, - clientExtensionResults: {}, - type: "public-key", - }, - }, - { headers: { cookie: "session_id=test-session" } }, - ); + const response = await postAuthenticationWebauthn("bWlzc2luZy1jcmVk"); // cspell:ignore Wlzc expect(response.status).toBe(400); expect(await response.json()).toEqual(expect.objectContaining({ code: "no credential" })); @@ -126,32 +101,8 @@ describe("authentication", () => { }); it("consumes challenge after failed authentication to prevent replay", async () => { - const firstResponse = await appClient.index.$post( - { - json: { - method: "webauthn", - id: "bWlzc2luZy1jcmVk", // cspell:ignore Wlzc - rawId: "bWlzc2luZy1jcmVk", // cspell:ignore Wlzc - response: { clientDataJSON: "dGVzdA", authenticatorData: "dGVzdA", signature: "dGVzdA" }, - clientExtensionResults: {}, - type: "public-key", - }, - }, - { headers: { cookie: "session_id=test-session" } }, - ); - const secondResponse = await appClient.index.$post( - { - json: { - method: "webauthn", - id: "bWlzc2luZy1jcmVk", // cspell:ignore Wlzc - rawId: "bWlzc2luZy1jcmVk", // cspell:ignore Wlzc - response: { clientDataJSON: "dGVzdA", authenticatorData: "dGVzdA", signature: "dGVzdA" }, - clientExtensionResults: {}, - type: "public-key", - }, - }, - { headers: { cookie: "session_id=test-session" } }, - ); + const firstResponse = await postAuthenticationWebauthn("bWlzc2luZy1jcmVk"); // cspell:ignore Wlzc + const secondResponse = await postAuthenticationWebauthn("bWlzc2luZy1jcmVk"); expect(firstResponse.status).toBe(400); expect(await firstResponse.json()).toEqual(expect.objectContaining({ code: "no credential" })); @@ -162,32 +113,8 @@ describe("authentication", () => { it("consumes challenge before verifier exceptions", async () => { vi.mocked(verifyAuthenticationResponse).mockRejectedValueOnce(new Error("boom")); - const firstResponse = await appClient.index.$post( - { - json: { - method: "webauthn", - id: "dGVzdC1jcmVkLWlk", - rawId: "dGVzdC1jcmVkLWlk", - response: { clientDataJSON: "dGVzdA", authenticatorData: "dGVzdA", signature: "dGVzdA" }, - clientExtensionResults: {}, - type: "public-key", - }, - }, - { headers: { cookie: "session_id=test-session" } }, - ); - const secondResponse = await appClient.index.$post( - { - json: { - method: "webauthn", - id: "dGVzdC1jcmVkLWlk", - rawId: "dGVzdC1jcmVkLWlk", - response: { clientDataJSON: "dGVzdA", authenticatorData: "dGVzdA", signature: "dGVzdA" }, - clientExtensionResults: {}, - type: "public-key", - }, - }, - { headers: { cookie: "session_id=test-session" } }, - ); + const firstResponse = await postAuthenticationWebauthn(); + const secondResponse = await postAuthenticationWebauthn(); expect(firstResponse.status).toBe(500); expect(await firstResponse.json()).toEqual(expect.objectContaining({ code: "ouch" })); @@ -201,32 +128,8 @@ describe("authentication", () => { authenticationInfo: { credentialID: "dGVzdC1jcmVkLWlk", newCounter: 1 }, } as Awaited>); - const firstResponse = await appClient.index.$post( - { - json: { - method: "webauthn", - id: "dGVzdC1jcmVkLWlk", - rawId: "dGVzdC1jcmVkLWlk", - response: { clientDataJSON: "dGVzdA", authenticatorData: "dGVzdA", signature: "dGVzdA" }, - clientExtensionResults: {}, - type: "public-key", - }, - }, - { headers: { cookie: "session_id=test-session" } }, - ); - const secondResponse = await appClient.index.$post( - { - json: { - method: "webauthn", - id: "dGVzdC1jcmVkLWlk", - rawId: "dGVzdC1jcmVkLWlk", - response: { clientDataJSON: "dGVzdA", authenticatorData: "dGVzdA", signature: "dGVzdA" }, - clientExtensionResults: {}, - type: "public-key", - }, - }, - { headers: { cookie: "session_id=test-session" } }, - ); + const firstResponse = await postAuthenticationWebauthn(); + const secondResponse = await postAuthenticationWebauthn(); expect(firstResponse.status).toBe(400); expect(await firstResponse.json()).toEqual(expect.objectContaining({ code: "bad authentication" })); @@ -240,32 +143,8 @@ describe("authentication", () => { authenticationInfo: { credentialID: "another-credential", newCounter: 1 }, } as Awaited>); - const firstResponse = await appClient.index.$post( - { - json: { - method: "webauthn", - id: "dGVzdC1jcmVkLWlk", - rawId: "dGVzdC1jcmVkLWlk", - response: { clientDataJSON: "dGVzdA", authenticatorData: "dGVzdA", signature: "dGVzdA" }, - clientExtensionResults: {}, - type: "public-key", - }, - }, - { headers: { cookie: "session_id=test-session" } }, - ); - const secondResponse = await appClient.index.$post( - { - json: { - method: "webauthn", - id: "dGVzdC1jcmVkLWlk", - rawId: "dGVzdC1jcmVkLWlk", - response: { clientDataJSON: "dGVzdA", authenticatorData: "dGVzdA", signature: "dGVzdA" }, - clientExtensionResults: {}, - type: "public-key", - }, - }, - { headers: { cookie: "session_id=test-session" } }, - ); + const firstResponse = await postAuthenticationWebauthn(); + const secondResponse = await postAuthenticationWebauthn(); expect(firstResponse.status).toBe(400); expect(await firstResponse.json()).toEqual(expect.objectContaining({ code: "bad authentication" })); @@ -376,6 +255,24 @@ describe("authentication", () => { expect(secondResponse.status).toBe(400); expect(await secondResponse.json()).toEqual(expect.objectContaining({ code: "no authentication" })); }); + + it("returns 503 when webhook not ready for new siwe credential", async () => { + mocks.activityWebhookId = undefined; + vi.spyOn(publicClient.default, "verifySiweMessage").mockResolvedValue(true); + const id = "0x1234567890123456789012345678901234567899"; + + const response = await appClient.index.$post( + { json: { method: "siwe", id, signature: "0xdeadbeef" } }, + { headers: { cookie: "session_id=test-session" } }, + ); + + expect(response.status).toBe(503); + expect(await response.json()).toStrictEqual({ code: "service unavailable" }); + expect(vi.mocked(captureException)).toHaveBeenCalledWith(expect.any(WebhookNotReadyError), { + level: "warning", + tags: { retriable: true }, + }); + }); }); describe("registration", () => { @@ -580,36 +477,19 @@ vi.mock("@simplewebauthn/server", async (importOriginal) => { const actual = await importOriginal(); return { ...actual, - verifyAuthenticationResponse: vi - .fn<() => Promise<{ authenticationInfo: { credentialID: string; newCounter: number }; verified: boolean }>>() - .mockResolvedValue({ + verifyAuthenticationResponse: vi.fn().mockResolvedValue({ + verified: true, + authenticationInfo: { credentialID: "dGVzdC1jcmVkLWlk", newCounter: 1 }, + }), + verifyRegistrationResponse: vi.fn().mockImplementation((options: { response: { id: string } }) => + Promise.resolve({ verified: true, - authenticationInfo: { credentialID: "dGVzdC1jcmVkLWlk", newCounter: 1 }, + registrationInfo: { + credential: { id: options.response.id, publicKey: new Uint8Array(65), counter: 0, transports: ["internal"] }, + credentialDeviceType: "multiDevice", + }, }), - verifyRegistrationResponse: vi - .fn< - (options: { response: { id: string } }) => Promise<{ - registrationInfo: { - credential: { counter: number; id: string; publicKey: Uint8Array; transports: string[] }; - credentialDeviceType: string; - }; - verified: boolean; - }> - >() - .mockImplementation((options: { response: { id: string } }) => - Promise.resolve({ - verified: true, - registrationInfo: { - credential: { - id: options.response.id, - publicKey: new Uint8Array(65), - counter: 0, - transports: ["internal"], - }, - credentialDeviceType: "multiDevice", - }, - }), - ), + ), }; }); @@ -646,6 +526,22 @@ vi.mock("viem/siwe", async (importOriginal) => { }; }); +function postAuthenticationWebauthn(id = "dGVzdC1jcmVkLWlk") { + return appClient.index.$post( + { + json: { + method: "webauthn", + id, + rawId: id, + response: { clientDataJSON: "dGVzdA", authenticatorData: "dGVzdA", signature: "dGVzdA" }, + clientExtensionResults: {}, + type: "public-key", + }, + }, + { headers: { cookie: "session_id=test-session" } }, + ); +} + type RegistrationWebauthnAssertion = { clientExtensionResults: Record; id: string; @@ -658,9 +554,7 @@ type RegistrationWebauthnAssertionOverride = Partial; }; -function registrationWebauthnAssertion( - override: RegistrationWebauthnAssertionOverride = {}, -): RegistrationWebauthnAssertion { +function registrationWebauthnAssertion(override: RegistrationWebauthnAssertionOverride = {}) { const base: RegistrationWebauthnAssertion = { id: "dGVzdC1jcmVkLWlk2", rawId: "dGVzdC1jcmVkLWlk2", diff --git a/server/test/hooks/activity.test.ts b/server/test/hooks/activity.test.ts index 7fd209720..1916d432b 100644 --- a/server/test/hooks/activity.test.ts +++ b/server/test/hooks/activity.test.ts @@ -1,4 +1,4 @@ -import "../mocks/alchemy"; +import { findWebhook as findWebhookMock } from "../mocks/alchemy"; import "../mocks/deployments"; import "../mocks/keeper"; import "../mocks/onesignal"; @@ -1177,3 +1177,23 @@ const mockERC20Abi = [ stateMutability: "nonpayable", }, ] as const; + +describe("webhook initialization", () => { + beforeEach(() => vi.resetModules()); + + it("sets webhookId when existing hook is found", async () => { + vi.mocked(findWebhookMock).mockResolvedValueOnce({ id: "existing-hook-id", signing_key: "existing-signing-key" }); + const activity = await import("../../hooks/activity"); + await vi.waitUntil(() => activity.webhookId === "existing-hook-id", 5000); + expect(activity.webhookId).toBe("existing-hook-id"); + }); + + it("captures exception when webhook initialization fails", async () => { + const error = new Error("alchemy error"); + vi.mocked(findWebhookMock).mockRejectedValueOnce(error); + const { captureException: ce } = await import("@sentry/node"); + await import("../../hooks/activity"); + await vi.waitUntil(() => vi.mocked(ce).mock.calls.some(([error_]) => error_ === error), 5000); + expect(ce).toHaveBeenCalledWith(error, { level: "error" }); + }); +}); diff --git a/server/test/mocks/alchemy.ts b/server/test/mocks/alchemy.ts index fdbf58272..12a11d506 100644 --- a/server/test/mocks/alchemy.ts +++ b/server/test/mocks/alchemy.ts @@ -1,10 +1,16 @@ import { validator } from "hono/validator"; import { vi } from "vitest"; +const { findWebhook, createWebhook } = vi.hoisted(() => ({ + findWebhook: vi.fn().mockResolvedValue({ id: "activity", signing_key: "mock-signing-key" }), + createWebhook: vi.fn().mockResolvedValue({ id: "mock-webhook-id", signing_key: "mock-signing-key" }), +})); + vi.mock("../../utils/alchemy", async (importOriginal) => ({ ...(await importOriginal()), headerValidator: () => validator("header", () => undefined), - findWebhook: () => Promise.resolve(), - createWebhook: () => Promise.resolve({ id: "mock-webhook-id", signing_key: "mock-signing-key" }), - updateWebhookAddresses: () => Promise.resolve(), + findWebhook, + createWebhook, })); + +export { createWebhook, findWebhook }; diff --git a/server/test/mocks/redis.ts b/server/test/mocks/redis.ts new file mode 100644 index 000000000..31330e3db --- /dev/null +++ b/server/test/mocks/redis.ts @@ -0,0 +1,13 @@ +import Redis from "ioredis-mock"; + +import { vi } from "vitest"; + +vi.mock("ioredis", () => ({ default: Redis, Redis })); +vi.mock("bullmq", () => ({ + Queue: vi.fn(function () { + return { add: vi.fn().mockResolvedValue({}), close: vi.fn().mockResolvedValue(undefined) }; // eslint-disable-line unicorn/no-useless-undefined + }), + Worker: vi.fn(function () { + return { on: vi.fn().mockReturnThis(), close: vi.fn().mockResolvedValue(undefined) }; // eslint-disable-line unicorn/no-useless-undefined + }), +})); diff --git a/server/test/mocks/sentry.ts b/server/test/mocks/sentry.ts index 547ca9aca..1cb59fae9 100644 --- a/server/test/mocks/sentry.ts +++ b/server/test/mocks/sentry.ts @@ -1,6 +1,8 @@ import "../../instrument.cjs"; import { close } from "@sentry/node"; -import { afterAll } from "vitest"; +import { afterAll, vi } from "vitest"; + +vi.mock("@sentry/node", { spy: true }); afterAll(() => close()); diff --git a/server/test/utils/createCredential.test.ts b/server/test/utils/createCredential.test.ts new file mode 100644 index 000000000..47c5890d1 --- /dev/null +++ b/server/test/utils/createCredential.test.ts @@ -0,0 +1,156 @@ +import "../mocks/sardine"; +import "../mocks/sentry"; + +import { captureException, startSpan } from "@sentry/node"; +import { eq } from "drizzle-orm"; +import { Hono } from "hono"; +import { afterAll, beforeEach, describe, expect, it, vi } from "vitest"; + +import database, { credentials } from "../../database"; +import createCredential, { closeQueue, queue, WebhookNotReadyError, worker } from "../../utils/createCredential"; +import { close as closeRedis } from "../../utils/redis"; + +const mocks = vi.hoisted<{ webhookId: { value: string | undefined } }>(() => ({ + webhookId: { value: "webhook-id" }, +})); + +vi.mock("hono/cookie", () => ({ setSignedCookie: vi.fn() })); +vi.mock("../../utils/segment", () => ({ identify: vi.fn() })); +vi.mock("../../utils/authSecret", () => ({ default: "secret" })); +vi.mock("../../utils/alchemy", () => ({ headers: { "X-Alchemy-Token": "mock-token" } })); +vi.mock("../../hooks/activity", () => ({ + get webhookId() { + return mocks.webhookId.value; + }, +})); + +vi.spyOn(globalThis, "fetch").mockResolvedValue(new Response("{}")); + +const credentialId = "0x1234567890123456789012345678901234567888"; + +function credential() { + return new Hono() + .onError((error) => { + throw error; + }) + .post("/", async (c) => { + await createCredential(c, credentialId); + return c.body(null); + }) + .request("/", { method: "POST" }); +} + +function jobDone(name: string, data: { account: string; webhookId: string }) { + return new Promise((resolve, reject) => { + const completed = (job: { data: { account: string; webhookId: string }; name: string }) => { + if (job.name !== name || job.data.account !== data.account || job.data.webhookId !== data.webhookId) return; + cleanup(); + resolve(); + }; + const failed = (job: undefined | { data: { account: string; webhookId: string }; name: string }, error: Error) => { + if (job?.name !== name || job.data.account !== data.account || job.data.webhookId !== data.webhookId) return; + cleanup(); + reject(error); + }; + const cleanup = () => { + worker.off("completed", completed); + worker.off("failed", failed); + }; + worker.on("completed", completed); + worker.on("failed", failed); + queue.add(name, data, { attempts: 1 }).catch((error: unknown) => { + cleanup(); + reject(error instanceof Error ? error : new Error("queue add failed", { cause: error })); + }); + }); +} + +describe("createCredential", () => { + afterAll(async () => { + await database.delete(credentials).where(eq(credentials.id, credentialId)); + await closeQueue(); + await closeRedis(); + }); + + beforeEach(async () => { + vi.clearAllMocks(); + mocks.webhookId.value = "webhook-id"; + await database.delete(credentials).where(eq(credentials.id, credentialId)); + }); + + it("adds job to queue when credential is created", async () => { + await credential(); + + await vi.waitFor(() => { + const [, init] = vi.mocked(fetch).mock.calls[0] ?? []; + expect(init).toMatchObject({ method: "PATCH", headers: { "X-Alchemy-Token": "mock-token" } }); + expect(init?.body).toContain("webhook-id"); + }); + expect(vi.mocked(captureException)).not.toHaveBeenCalled(); + }); + + it("throws WebhookNotReadyError when webhookId is undefined", async () => { + mocks.webhookId.value = undefined; + + await expect(credential()).rejects.toThrow(WebhookNotReadyError); + expect(fetch).not.toHaveBeenCalled(); + expect(vi.mocked(captureException)).not.toHaveBeenCalled(); + }); + + it("captures exception when queue.add fails", async () => { + const error = new Error("queue error"); + const addSpy = vi.spyOn(queue, "add").mockRejectedValueOnce(error); + + await credential(); + await vi.waitFor(() => { + expect(vi.mocked(captureException).mock.calls[0]?.[0]).toBe(error); + expect(vi.mocked(captureException).mock.calls[0]?.[1]).toMatchObject({ + level: "error", + extra: { job: "create", webhookId: "webhook-id" }, + }); + }); + expect(fetch).not.toHaveBeenCalled(); + addSpy.mockRestore(); + }); + + it("calls Alchemy API to update webhook addresses", async () => { + await jobDone("create", { account: "0x123", webhookId: "hook-123" }); + + const [, init] = vi.mocked(fetch).mock.calls[0] ?? []; + expect(init).toMatchObject({ + method: "PATCH", + headers: { "X-Alchemy-Token": "mock-token" }, + body: JSON.stringify({ webhook_id: "hook-123", addresses_to_add: ["0x123"], addresses_to_remove: [] }), + }); + expect(vi.mocked(startSpan)).toHaveBeenCalledWith( + expect.objectContaining({ name: "credential.processor", op: "queue.process" }), + expect.any(Function), + ); + expect(vi.mocked(captureException)).not.toHaveBeenCalled(); + }); + + it("throws for unknown job names", async () => { + await expect(jobDone("unknown", { account: "0x123", webhookId: "hook-123" })).rejects.toThrow( + "Unknown job name: unknown", + ); + expect(fetch).not.toHaveBeenCalled(); + expect(vi.mocked(captureException).mock.calls[0]?.[0]).toBeInstanceOf(Error); + expect(vi.mocked(captureException).mock.calls[0]?.[1]).toMatchObject({ + level: "error", + extra: { job: { account: "0x123", webhookId: "hook-123" } }, + }); + }); + + it("throws when Alchemy API call fails", async () => { + vi.spyOn(globalThis, "fetch").mockResolvedValueOnce(new Response("Internal Server Error", { status: 500 })); + + await expect(jobDone("create", { account: "0x123", webhookId: "hook-123" })).rejects.toThrow( + "500 Internal Server Error", + ); + expect(vi.mocked(captureException).mock.calls[0]?.[0]).toBeInstanceOf(Error); + expect(vi.mocked(captureException).mock.calls[0]?.[1]).toMatchObject({ + level: "error", + extra: { job: { account: "0x123", webhookId: "hook-123" } }, + }); + }); +}); diff --git a/server/utils/alchemy.ts b/server/utils/alchemy.ts index fc9a7b10c..fda045864 100644 --- a/server/utils/alchemy.ts +++ b/server/utils/alchemy.ts @@ -9,8 +9,6 @@ import chain from "@exactly/common/generated/chain"; import ServiceError from "./ServiceError"; import verifySignature from "./verifySignature"; -import type { Address } from "@exactly/common/validation"; - if (!process.env.ALCHEMY_WEBHOOKS_KEY) throw new Error("missing alchemy webhooks key"); export const headers = { "Content-Type": "application/json", "X-Alchemy-Token": process.env.ALCHEMY_WEBHOOKS_KEY }; @@ -55,16 +53,6 @@ export async function createWebhook( return parse(WebhookResponse, await create.json()).data; } -export async function updateWebhookAddresses(id: string | undefined, add: Address[], remove: Address[] = []) { - if (!id) return; - const update = await fetch("https://dashboard.alchemy.com/api/update-webhook-addresses", { - headers, - method: "PATCH", - body: JSON.stringify({ webhook_id: id, addresses_to_add: add, addresses_to_remove: remove }), - }); - if (!update.ok) throw new ServiceError("Alchemy", update.status, await update.text()); -} - const Webhook = object({ id: string(), network: pipe( diff --git a/server/utils/createCredential.ts b/server/utils/createCredential.ts index 9d6652ebe..5eb699a50 100644 --- a/server/utils/createCredential.ts +++ b/server/utils/createCredential.ts @@ -1,4 +1,6 @@ -import { captureException, setUser } from "@sentry/core"; +import { SPAN_STATUS_ERROR } from "@sentry/core"; +import { addBreadcrumb, captureException, setUser, startSpan } from "@sentry/node"; +import { Queue, Worker, type Job } from "bullmq"; import { setSignedCookie } from "hono/cookie"; import { parse } from "valibot"; import { hexToBytes, isAddress } from "viem"; @@ -9,9 +11,10 @@ import domain from "@exactly/common/domain"; import { exaAccountFactoryAddress } from "@exactly/common/generated/chain"; import { Address } from "@exactly/common/validation"; -import { updateWebhookAddresses } from "./alchemy"; +import { headers } from "./alchemy"; import authSecret from "./authSecret"; import decodePublicKey from "./decodePublicKey"; +import { queue as redisConnection } from "./redis"; import { customer } from "./sardine"; import { identify } from "./segment"; import database from "../database"; @@ -26,6 +29,8 @@ export default async function createCredential( credentialId: C, options?: { source?: string; webauthn?: WebAuthnCredential }, ) { + if (!webhookId) throw new WebhookNotReadyError(); + const publicKey = options?.webauthn?.publicKey ?? (isAddress(credentialId) ? new Uint8Array(hexToBytes(credentialId)) : undefined); if (!publicKey) throw new Error("bad credential"); @@ -45,6 +50,7 @@ export default async function createCredential( source: options?.source, }, ]); + await Promise.all([ setSignedCookie(c, "credential_id", credentialId, authSecret, { expires, @@ -53,7 +59,6 @@ export default async function createCredential( ? { sameSite: "lax", secure: false } : { domain, sameSite: "none", secure: true, partitioned: true }), }), - updateWebhookAddresses(webhookId, [account]).catch((error: unknown) => captureException(error)), customer({ flow: { name: "signup", type: "signup" }, customer: { @@ -62,6 +67,78 @@ export default async function createCredential( }, }).catch((error: unknown) => captureException(error, { level: "error" })), ]); + + queue.add("create", { account, webhookId }).catch((error: unknown) => + captureException(error, { + level: "error", + extra: { job: "create", account, webhookId, credentialId }, + }), + ); + identify({ userId: account }); return { credentialId, factory: parse(Address, exaAccountFactoryAddress), x, y, auth: expires.getTime() }; } + +const queueName = "account"; + +export const queue = new Queue(queueName, { connection: redisConnection }); + +export const worker = new Worker( + queueName, + (job: Job<{ account: Address; webhookId: string }>) => + startSpan( + { name: "credential.processor", op: "queue.process", attributes: { job: job.name, ...job.data } }, + async (span) => { + switch (job.name) { + case "create": { + const response = await fetch("https://dashboard.alchemy.com/api/update-webhook-addresses", { + method: "PATCH", + headers, + body: JSON.stringify({ + webhook_id: job.data.webhookId, + addresses_to_add: [job.data.account], + addresses_to_remove: [], + }), + }); + if (!response.ok) { + const text = await response.text(); + span.setStatus({ code: SPAN_STATUS_ERROR, message: text }); + throw new Error(`${response.status} ${text}`); + } + break; + } + default: { + const message = `Unknown job name: ${job.name}`; + span.setStatus({ code: SPAN_STATUS_ERROR, message }); + throw new Error(message); + } + } + }, + ), + { connection: redisConnection, limiter: { max: 10, duration: 1000 } }, +); + +worker + .on("failed", (job, error) => { + captureException(error, { level: "error", extra: { job: job?.data } }); + }) + .on("completed", (job) => { + addBreadcrumb({ category: "queue", message: `Job ${job.id} completed`, level: "info", data: { job: job.data } }); + }) + .on("active", (job) => { + addBreadcrumb({ category: "queue", message: `Job ${job.id} active`, level: "info", data: { job: job.data } }); + }) + .on("error", (error) => { + captureException(error, { level: "error", tags: { queue: queueName } }); + }); + +export async function closeQueue() { + await Promise.all([worker.close(), queue.close()]); +} + +export class WebhookNotReadyError extends Error { + constructor() { + super("alchemy webhook not initialized yet, retry credential creation"); + this.name = "WebhookNotReadyError"; + } +}