Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion nilcc-api/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ async function main() {
const { app, metrics } = await buildApp(bindings);

const paymentPoller = new PaymentPoller(bindings, bindings.services.payment);
paymentPoller.start();

bindings.log.info("Starting servers ...");
const appServer = serve(
Expand All @@ -55,7 +54,13 @@ async function main() {
},
);

let shuttingDown = false;
const shutdown = async (): Promise<void> => {
if (shuttingDown) {
return;
}
shuttingDown = true;

bindings.log.info(
"Received shutdown signal. Starting graceful shutdown...",
);
Expand All @@ -80,6 +85,11 @@ async function main() {

process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);

void paymentPoller.start().catch(async (error) => {
bindings.log.error(error, "Failed to start payment poller");
await shutdown();
});
}

main().catch((error) => {
Expand Down
179 changes: 95 additions & 84 deletions nilcc-api/src/payment/payment-poller.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Logger } from "pino";
import { createPublicClient, http } from "viem";
import type { AppBindings } from "#/env";
import { BlockCursorEntity } from "./block-cursor.entity";
import { burnWithDigestEventAbi } from "./burn-contract";
import type { PaymentService } from "./payment.service";

Expand All @@ -19,7 +18,7 @@ export class PaymentPoller {
this.log = bindings.log.child({ component: "payment-poller" });
}

start(): void {
async start(): Promise<void> {
const { rpcUrl, burnContractAddress } = this.bindings.config;
if (!rpcUrl || !burnContractAddress) {
this.log.info(
Expand All @@ -28,10 +27,20 @@ export class PaymentPoller {
return;
}

// Seed the cursor row so SELECT ... FOR UPDATE always finds a row to lock.
const seedBlock = (
BigInt(this.bindings.config.paymentStartBlock) - 1n
).toString();
await this.bindings.dataSource.query(
`INSERT INTO block_cursors (id, last_processed_block, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (id) DO NOTHING`,
[CURSOR_ID, seedBlock],
);

const intervalMs = this.bindings.config.paymentPollerIntervalMs;
this.log.info(`Starting payment poller with interval ${intervalMs}ms`);

// Run immediately, then on interval
this.poll();
this.intervalId = setInterval(() => this.poll(), intervalMs);
}
Expand Down Expand Up @@ -74,101 +83,103 @@ export class PaymentPoller {
transport: http(rpcUrl),
});

// Get block cursor
const cursorRepo =
this.bindings.dataSource.getRepository(BlockCursorEntity);
const cursor = await cursorRepo.findOneBy({ id: CURSOR_ID });
const fromBlock = cursor
? BigInt(cursor.lastProcessedBlock) + 1n
: BigInt(this.bindings.config.paymentStartBlock);

// Get current block
const currentBlock = await client.getBlockNumber();
if (fromBlock > currentBlock) {
this.log.debug(
`No new blocks to process (cursor: ${fromBlock}, current: ${currentBlock})`,
const queryRunner = this.bindings.dataSource.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
try {
// Single-writer gate across replicas: blocks until any concurrent poll commits,
// then reads the already-advanced cursor and processes only the new range.
const rows: Array<{ last_processed_block: string }> =
await queryRunner.query(
`SELECT last_processed_block FROM block_cursors
WHERE id = $1 FOR UPDATE`,
[CURSOR_ID],
);
const fromBlock = BigInt(rows[0].last_processed_block) + 1n;

const currentBlock = await client.getBlockNumber();
if (fromBlock > currentBlock) {
this.log.debug(
`No new blocks to process (cursor: ${fromBlock}, current: ${currentBlock})`,
);
await queryRunner.commitTransaction();
return;
}

const toBlock =
currentBlock - fromBlock > BigInt(paymentPollerMaxBlockRange)
? fromBlock + BigInt(paymentPollerMaxBlockRange) - 1n
: currentBlock;

this.log.info(
`Polling blocks ${fromBlock} to ${toBlock} for LogBurnWithDigest events`,
);
return;
}

// Clamp range
const toBlock =
currentBlock - fromBlock > BigInt(paymentPollerMaxBlockRange)
? fromBlock + BigInt(paymentPollerMaxBlockRange) - 1n
: currentBlock;
const logs = await client.getLogs({
address: burnContractAddress as `0x${string}`,
event: burnWithDigestEventAbi[0],
fromBlock,
toBlock,
});

this.log.info(
`Polling blocks ${fromBlock} to ${toBlock} for LogBurnWithDigest events`,
);
this.log.info(`Found ${logs.length} LogBurnWithDigest events`);

// Fetch logs
const logs = await client.getLogs({
address: burnContractAddress as `0x${string}`,
event: burnWithDigestEventAbi[0],
fromBlock,
toBlock,
});
let firstFailedBlock: bigint | null = null;
for (const log of logs) {
if (!log.transactionHash || !log.args.account || !log.args.amount) {
this.log.warn("Skipping malformed log entry");
continue;
}

this.log.info(`Found ${logs.length} LogBurnWithDigest events`);
try {
await this.paymentService.processEvent(this.bindings, {
txHash: log.transactionHash,
logIndex: log.logIndex ?? 0,
blockNumber: Number(log.blockNumber),
fromAddress: log.args.account,
amount: log.args.amount,
digest: log.args.digest ?? "0x",
});
} catch (e) {
if (firstFailedBlock === null) {
firstFailedBlock = log.blockNumber ?? fromBlock;
}
this.log.warn(
`Failed to process event from tx ${log.transactionHash}: ${e}`,
);
}
}

// Process each log
let firstFailedBlock: bigint | null = null;
for (const log of logs) {
if (!log.transactionHash || !log.args.account || !log.args.amount) {
this.log.warn("Skipping malformed log entry");
continue;
// If any event failed, only advance up to the block before the first failure
// so failed events are retried next tick.
let nextCursorBlock: bigint | null = toBlock;
if (firstFailedBlock !== null) {
nextCursorBlock =
firstFailedBlock <= fromBlock ? null : firstFailedBlock - 1n;
}

try {
await this.paymentService.processEvent(this.bindings, {
txHash: log.transactionHash,
logIndex: log.logIndex ?? 0,
blockNumber: Number(log.blockNumber),
fromAddress: log.args.account,
amount: log.args.amount,
digest: log.args.digest ?? "0x",
});
} catch (e) {
if (firstFailedBlock === null) {
firstFailedBlock = log.blockNumber ?? fromBlock;
}
if (nextCursorBlock === null) {
this.log.warn(
`Failed to process event from tx ${log.transactionHash}: ${e}`,
`Not advancing block cursor due to processing failure at block ${firstFailedBlock?.toString()}`,
);
await queryRunner.commitTransaction();
return;
}
}

// Update cursor. If any event failed, only advance up to the block before
// the first failure so failed events are retried in the next poll.
let nextCursorBlock: bigint | null = toBlock;
if (firstFailedBlock !== null) {
if (firstFailedBlock <= fromBlock) {
// Would only happen if firstFailedBlock == fromBlock, other states should be impossible
nextCursorBlock = null;
} else {
nextCursorBlock = firstFailedBlock - 1n;
}
}

if (nextCursorBlock === null) {
this.log.warn(
`Not advancing block cursor due to processing failure at block ${firstFailedBlock?.toString()}`,
await queryRunner.query(
`UPDATE block_cursors
SET last_processed_block = $1, updated_at = NOW()
WHERE id = $2`,
[nextCursorBlock.toString(), CURSOR_ID],
);
return;
}
await queryRunner.commitTransaction();

if (cursor) {
cursor.lastProcessedBlock = nextCursorBlock.toString();
cursor.updatedAt = new Date();
await cursorRepo.save(cursor);
} else {
await cursorRepo.save({
id: CURSOR_ID,
lastProcessedBlock: nextCursorBlock.toString(),
updatedAt: new Date(),
});
this.log.debug(`Updated block cursor to ${nextCursorBlock}`);
} catch (e) {
await queryRunner.rollbackTransaction();
throw e;
} finally {
await queryRunner.release();
}

this.log.debug(`Updated block cursor to ${nextCursorBlock}`);
}
}
5 changes: 5 additions & 0 deletions nilcc-api/src/payment/payment.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { QueryRunner, Repository } from "typeorm";
import { v4 as uuidv4 } from "uuid";
import type { AccountEntity } from "#/account/account.entity";
import { isUniqueConstraint } from "#/common/errors";
import {
microdollarsToUsd,
nilToMicrodollars,
Expand Down Expand Up @@ -116,6 +117,10 @@ export class PaymentService {
return payment;
} catch (e) {
await queryRunner.rollbackTransaction();
if (isUniqueConstraint(e)) {
bindings.log.info(`Payment already processed: ${event.txHash}`);
return null;
}
throw e;
} finally {
await queryRunner.release();
Expand Down
Loading