Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Add a partial index for the Ledger Monitor pending-payment scan.
*
* The poller filters on status + created_at and excludes soft-deleted rows,
* so this partial index lets Postgres satisfy that query without scanning the
* broader payments indexes.
*/

export async function up(knex) {
await knex.raw(
"CREATE INDEX IF NOT EXISTS payments_pending_created_idx ON payments(status, created_at ASC) WHERE deleted_at IS NULL",
);

console.log("✓ Added payments_pending_created_idx for Ledger Monitor polling");
}

export async function down(knex) {
await knex.raw("DROP INDEX IF EXISTS payments_pending_created_idx");
console.log("✓ Removed payments_pending_created_idx");
}
35 changes: 33 additions & 2 deletions backend/src/lib/horizon-poller.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ import {
const POLL_INTERVAL_MS = 15_000; // 15 seconds between normal cycles
const BATCH_SIZE = 50; // max pending payments per cycle
const MAX_AGE_HOURS = 24; // ignore payments older than 24 h (likely abandoned)
const MERCHANT_NOTIFICATION_FIELDS = [
"webhook_secret",
"webhook_version",
"notification_email",
"email",
"business_name",
"webhook_custom_headers",
].join(", ");

/** Back-off schedule (ms) applied after consecutive Horizon fetch failures. */
const BACKOFF_DELAYS_MS = [5_000, 15_000, 30_000, 60_000];
Expand Down Expand Up @@ -152,10 +160,11 @@ async function pollPendingPayments() {

const { data: pending, error } = await supabase
.from("payments")
.select("id, amount, asset, asset_issuer, recipient, memo, memo_type, webhook_url, created_at, merchant_id, merchants(webhook_secret, webhook_version, notification_email, email, business_name, webhook_custom_headers)")
.select("id, amount, asset, asset_issuer, recipient, memo, memo_type, webhook_url, created_at, merchant_id, metadata")
.eq("status", "pending")
.is("deleted_at", null)
.gte("created_at", cutoff)
.order("created_at", { ascending: true })
.limit(BATCH_SIZE);

if (error) {
Expand Down Expand Up @@ -489,7 +498,7 @@ async function checkPayment(payment) {
}

// Webhook
const merchant = payment.merchants;
const merchant = await loadMerchantNotificationConfig(payment.merchant_id);
if (merchant) {
const webhookPayload = getPayloadForVersion(
merchant.webhook_version || "v1",
Expand Down Expand Up @@ -544,3 +553,25 @@ async function checkPayment(payment) {
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

async function loadMerchantNotificationConfig(merchantId) {
if (!merchantId) {
return null;
}

const { data, error } = await supabase
.from("merchants")
.select(MERCHANT_NOTIFICATION_FIELDS)
.eq("id", merchantId)
.maybeSingle();

if (error) {
logger.warn(
{ err: error, merchantId },
"Horizon poller: failed to load merchant notification config",
);
return null;
}

return data ?? null;
}
119 changes: 103 additions & 16 deletions backend/src/lib/horizon-poller.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,18 @@ function makePayment(overrides = {}) {
created_at: new Date(Date.now() - 5_000).toISOString(),
merchant_id: "merchant-001",
metadata: {},
merchants: {
webhook_secret: "secret",
webhook_version: "v1",
notification_email: "merchant@example.com",
email: "merchant@example.com",
business_name: "Test Merchant",
webhook_custom_headers: {},
},
...overrides,
};
}

function makeMerchant(overrides = {}) {
return {
webhook_secret: "secret",
webhook_version: "v1",
notification_email: "merchant@example.com",
email: "merchant@example.com",
business_name: "Test Merchant",
webhook_custom_headers: {},
...overrides,
};
}
Expand Down Expand Up @@ -181,9 +185,10 @@ describe("Ledger Monitor — error recovery (Issue #627)", () => {
const payment = makePayment();

// The poller makes 3 calls to supabase.from("payments"):
// 1. Fetch pending payments (select + limit)
// 1. Fetch pending payments (select + order + limit)
// 2. Duplicate-tx guard (select + neq + maybeSingle → null)
// 3. Atomic update (update + maybeSingle → { id })
// 4. Merchant notification config lookup
let fromCallCount = 0;
mockSupabaseFrom.mockImplementation(() => {
fromCallCount += 1;
Expand All @@ -194,6 +199,7 @@ describe("Ledger Monitor — error recovery (Issue #627)", () => {
eq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
gte: vi.fn().mockReturnThis(),
order: vi.fn().mockReturnThis(),
limit: vi.fn().mockResolvedValue({ data: [payment], error: null }),
};
}
Expand All @@ -207,14 +213,21 @@ describe("Ledger Monitor — error recovery (Issue #627)", () => {
maybeSingle: vi.fn().mockResolvedValue({ data: null, error: null }),
};
}
// Atomic update — success
if (fromCallCount === 3) {
// Atomic update — success
return {
update: vi.fn().mockReturnValue({
eq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
select: vi.fn().mockReturnThis(),
maybeSingle: vi.fn().mockResolvedValue({ data: { id: payment.id }, error: null }),
}),
};
}
return {
update: vi.fn().mockReturnValue({
eq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
select: vi.fn().mockReturnThis(),
maybeSingle: vi.fn().mockResolvedValue({ data: { id: payment.id }, error: null }),
}),
select: vi.fn().mockReturnThis(),
eq: vi.fn().mockReturnThis(),
maybeSingle: vi.fn().mockResolvedValue({ data: makeMerchant(), error: null }),
};
});

Expand Down Expand Up @@ -256,6 +269,7 @@ describe("Ledger Monitor — error recovery (Issue #627)", () => {
eq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
gte: vi.fn().mockReturnThis(),
order: vi.fn().mockReturnThis(),
limit: vi.fn().mockResolvedValue({ data: [payment], error: null }),
}));

Expand Down Expand Up @@ -287,6 +301,7 @@ describe("Ledger Monitor — error recovery (Issue #627)", () => {
eq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
gte: vi.fn().mockReturnThis(),
order: vi.fn().mockReturnThis(),
limit: vi.fn().mockResolvedValue({ data: [payment], error: null }),
}));

Expand Down Expand Up @@ -314,6 +329,7 @@ describe("Ledger Monitor — error recovery (Issue #627)", () => {
eq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
gte: vi.fn().mockReturnThis(),
order: vi.fn().mockReturnThis(),
limit: vi.fn().mockResolvedValue({ data: [payment], error: null }),
}));

Expand Down Expand Up @@ -356,6 +372,7 @@ describe("Ledger Monitor — error recovery (Issue #627)", () => {
eq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
gte: vi.fn().mockReturnThis(),
order: vi.fn().mockReturnThis(),
limit: vi.fn().mockResolvedValue({ data: null, error: { message: "DB down" } }),
}));

Expand All @@ -379,6 +396,7 @@ describe("Ledger Monitor — error recovery (Issue #627)", () => {
eq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
gte: vi.fn().mockReturnThis(),
order: vi.fn().mockReturnThis(),
limit: vi.fn().mockImplementation(() => {
callCount += 1;
if (callCount === 1) {
Expand Down Expand Up @@ -422,6 +440,7 @@ describe("Ledger Monitor — error recovery (Issue #627)", () => {
eq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
gte: vi.fn().mockReturnThis(),
order: vi.fn().mockReturnThis(),
limit: vi.fn().mockResolvedValue({ data: [payment], error: null }),
update: updateMock,
}));
Expand Down Expand Up @@ -466,6 +485,7 @@ describe("Ledger Monitor — error recovery (Issue #627)", () => {
eq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
gte: vi.fn().mockReturnThis(),
order: vi.fn().mockReturnThis(),
limit: vi.fn().mockResolvedValue({ data: [payment], error: null }),
update: updateMock,
}));
Expand Down Expand Up @@ -503,6 +523,7 @@ describe("Ledger Monitor — error recovery (Issue #627)", () => {
eq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
gte: vi.fn().mockReturnThis(),
order: vi.fn().mockReturnThis(),
limit: vi.fn().mockResolvedValue({ data: [payment], error: null }),
}));

Expand All @@ -519,6 +540,7 @@ describe("Ledger Monitor — error recovery (Issue #627)", () => {
eq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
gte: vi.fn().mockReturnThis(),
order: vi.fn().mockReturnThis(),
limit: vi.fn().mockResolvedValue({ data: [payment], error: null }),
}));

Expand All @@ -540,6 +562,7 @@ describe("Ledger Monitor — error recovery (Issue #627)", () => {
neq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
gte: vi.fn().mockReturnThis(),
order: vi.fn().mockReturnThis(),
limit: vi.fn().mockResolvedValue({ data: [payment], error: null }),
// Duplicate check returns an existing payment
maybeSingle: vi.fn().mockResolvedValue({ data: { id: "other-pay" }, error: null }),
Expand Down Expand Up @@ -574,9 +597,73 @@ describe("Ledger Monitor — error recovery (Issue #627)", () => {
eq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
gte: vi.fn().mockReturnThis(),
order: vi.fn().mockReturnThis(),
limit: vi.fn().mockResolvedValue({ data: [], error: null }),
}));

describe("merchant notification lookup", () => {
it("continues confirmation when merchant notification config lookup fails", async () => {
const payment = makePayment();

let fromCallCount = 0;
mockSupabaseFrom.mockImplementation(() => {
fromCallCount += 1;
if (fromCallCount === 1) {
return {
select: vi.fn().mockReturnThis(),
eq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
gte: vi.fn().mockReturnThis(),
order: vi.fn().mockReturnThis(),
limit: vi.fn().mockResolvedValue({ data: [payment], error: null }),
};
}
if (fromCallCount === 2) {
return {
select: vi.fn().mockReturnThis(),
eq: vi.fn().mockReturnThis(),
neq: vi.fn().mockReturnThis(),
maybeSingle: vi.fn().mockResolvedValue({ data: null, error: null }),
};
}
if (fromCallCount === 3) {
return {
update: vi.fn().mockReturnValue({
eq: vi.fn().mockReturnThis(),
is: vi.fn().mockReturnThis(),
select: vi.fn().mockReturnThis(),
maybeSingle: vi.fn().mockResolvedValue({ data: { id: payment.id }, error: null }),
}),
};
}

return {
select: vi.fn().mockReturnThis(),
eq: vi.fn().mockReturnThis(),
maybeSingle: vi.fn().mockResolvedValue({ data: null, error: { message: "merchant lookup failed" } }),
};
});

mockFindMatchingPayment.mockResolvedValue({
id: "op-1",
transaction_hash: "tx-abc",
received_amount: "10.0000000",
});
mockVerifyTransactionSignature.mockResolvedValue({
valid: true,
reason: "ok",
isMultiSig: false,
signatureCount: 1,
thresholdMet: true,
});

await pollOnce();

expect(mockPaymentConfirmedCounter.inc).toHaveBeenCalledWith({ asset: payment.asset });
expect(mockSendWebhook).not.toHaveBeenCalled();
});
});

await pollOnce();

expect(mockFindMatchingPayment).not.toHaveBeenCalled();
Expand Down
31 changes: 28 additions & 3 deletions backend/src/lib/rate-limit.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import rateLimit from "express-rate-limit";
import { createHash } from "node:crypto";
import rateLimit, { ipKeyGenerator } from "express-rate-limit";
import { RedisStore } from "rate-limit-redis";

export const RATE_LIMIT_REDIS_PREFIX = "rl:";
export const VERIFY_PAYMENT_RATE_LIMIT_WINDOW_MS = 60 * 1000;
export const VERIFY_PAYMENT_RATE_LIMIT_MAX = 30;

function setStandardRateLimitHeaders(res, rateLimitState) {
if (!res || !rateLimitState) {
Expand Down Expand Up @@ -34,18 +37,40 @@ export function createRedisRateLimitStore({
});
}

export function getVerifyPaymentRateLimitKey(req) {
const paymentId =
typeof req?.params?.id === "string" && req.params.id.length > 0
? req.params.id
: "unknown-payment";
const merchantId =
typeof req?.merchant?.id === "string" && req.merchant.id.length > 0
? `merchant:${req.merchant.id}`
: null;
const apiKey =
typeof req?.headers?.["x-api-key"] === "string" &&
req.headers["x-api-key"].length > 0
? `api:${createHash("sha256").update(req.headers["x-api-key"]).digest("hex")}`
: null;
const ipKey = ipKeyGenerator(req?.ip ?? req?.socket?.remoteAddress ?? "unknown-ip");
const actor = merchantId ?? apiKey ?? `ip:${ipKey}`;

return `${paymentId}:${actor}`;
}

export function createVerifyPaymentRateLimit({
store,
rateLimitFactory = rateLimit,
} = {}) {
return rateLimitFactory({
windowMs: 15 * 60 * 1000,
max: 10,
windowMs: VERIFY_PAYMENT_RATE_LIMIT_WINDOW_MS,
max: VERIFY_PAYMENT_RATE_LIMIT_MAX,
message: {
error: "Too many verification requests, please try again later.",
},
standardHeaders: true,
legacyHeaders: false,
validate: { ip: false },
keyGenerator: getVerifyPaymentRateLimitKey,
requestWasSuccessful: (req, res) => {
setStandardRateLimitHeaders(res, req.rateLimit);
return res.statusCode < 400;
Expand Down
Loading
Loading