diff --git a/docs/API.md b/docs/API.md index 4abcc55d4..f2ace4dce 100644 --- a/docs/API.md +++ b/docs/API.md @@ -1329,6 +1329,61 @@ updates node configuration and reloads it gracefully (admin only) --- +## Get Escrow Events + +### `HTTP` GET /api/services/escrow/events? + +### `HTTP` POST /directCommand + +### `P2P` command: getEscrowEvents + +#### Description + +Returns indexed Escrow contract events. The indexer matches Escrow logs by topic hash, verifies they came from the chain's `Escrow` contract (`Deposit`/`Withdraw`/`Lock` are generic signatures), and stores one row per event in the append-only `escrow` collection keyed by `${txHash}-${logIndex}`. All filters are optional. + +#### Parameters + +| name | type | required | description | +| --------- | ------ | --------- | --------------------------------------------------------- | +| command | string | POST only | command name (`getEscrowEvents`) | +| chainId | number | | chain id | +| eventType | string | | one of `Auth, Lock, Claimed, Canceled, Deposit, Withdraw` | +| payer | string | | payer address (case-insensitive) | +| payee | string | | payee address (case-insensitive) | +| token | string | | token address (case-insensitive) | +| jobId | string | | compute job id | +| txId | string | | transaction hash | +| offset | number | | rows to skip (default 0) | +| size | number | | page size (default 100, max 250) | + +#### Request (POST /directCommand) + +```json +{ "command": "getEscrowEvents", "chainId": 8996, "eventType": "Deposit", "offset": 0, "size": 50 } +``` + +#### Response + +Every row has `id, eventType, chainId, contract, block, txHash` plus event-specific fields (`payer, payee, token, jobId, amount, expiry, proof, maxLockedAmount, maxLockSeconds, maxLockCounts`). + +```json +[ + { + "id": "0x39f3...6575-3", + "eventType": "Deposit", + "chainId": 8996, + "contract": "0x282d...a1a1", + "block": 55, + "txHash": "0x39f3...6575", + "payer": "0xbe54...ab5e", + "token": "0x282d...a1a1", + "amount": "100000000000000000000" + } +] +``` + +--- + # Compute For starters, you can find a list of algorithms in the [Ocean Algorithms repository](https://github.com/oceanprotocol/algo_dockers) and the docker images in the [Algo Dockerhub](https://hub.docker.com/r/oceanprotocol/algo_dockers/tags). diff --git a/docs/Arhitecture.md b/docs/Arhitecture.md index 8b4e47c55..80a9e9f3b 100644 --- a/docs/Arhitecture.md +++ b/docs/Arhitecture.md @@ -80,6 +80,7 @@ An off-chain, multi-chain metadata & chain events cache. It continually monitors - validates DDO, according to multiple SHACL schemas - provides proof for valid DDOs - monitors datatokens contracts & stores orders + - monitors the Escrow contract events (Auth, Lock, Claimed, Canceled, Deposit, Withdraw) and stores them for querying - allows querys for all the above - supports graceful shutdown and chain-specific reindexing diff --git a/src/@types/Escrow.ts b/src/@types/Escrow.ts index 7f9f25575..20690f042 100644 --- a/src/@types/Escrow.ts +++ b/src/@types/Escrow.ts @@ -14,3 +14,22 @@ export interface EscrowLock { expiry: BigInt token: string } + +export interface EscrowEvent { + id: string + eventType: string + chainId: number + contract: string + block: number + txHash: string + payer?: string + payee?: string + token?: string + jobId?: string + amount?: string + expiry?: string + proof?: string + maxLockedAmount?: string + maxLockSeconds?: string + maxLockCounts?: string +} diff --git a/src/@types/commands.ts b/src/@types/commands.ts index cbeb6977f..badc755f3 100644 --- a/src/@types/commands.ts +++ b/src/@types/commands.ts @@ -106,6 +106,18 @@ export interface QueryCommand extends Command { maxResultsPerPage?: number pageNumber?: number } + +export interface GetEscrowEventsCommand extends Command { + chainId?: number + eventType?: string + payer?: string + payee?: string + token?: string + jobId?: string + txId?: string + offset?: number + size?: number +} export interface ReindexCommand extends Command { txId: string chainId: number diff --git a/src/components/Indexer/processor.ts b/src/components/Indexer/processor.ts index 98f9d2723..171d6750a 100644 --- a/src/components/Indexer/processor.ts +++ b/src/components/Indexer/processor.ts @@ -20,6 +20,7 @@ import { NewAccessListEventProcessor, AddressAddedEventProcessor, AddressRemovedEventProcessor, + EscrowEventProcessor, ProcessorConstructor } from './processors/index.js' import { findEventByKey } from './utils.js' @@ -42,7 +43,13 @@ const EVENT_PROCESSOR_MAP: Record = { [EVENTS.EXCHANGE_RATE_CHANGED]: ExchangeRateChangedEventProcessor, [EVENTS.NEW_ACCESS_LIST]: NewAccessListEventProcessor, [EVENTS.ADDRESS_ADDED]: AddressAddedEventProcessor, - [EVENTS.ADDRESS_REMOVED]: AddressRemovedEventProcessor + [EVENTS.ADDRESS_REMOVED]: AddressRemovedEventProcessor, + [EVENTS.ESCROW_AUTH]: EscrowEventProcessor, + [EVENTS.ESCROW_LOCK]: EscrowEventProcessor, + [EVENTS.ESCROW_CLAIMED]: EscrowEventProcessor, + [EVENTS.ESCROW_CANCELED]: EscrowEventProcessor, + [EVENTS.ESCROW_DEPOSIT]: EscrowEventProcessor, + [EVENTS.ESCROW_WITHDRAW]: EscrowEventProcessor } const processorInstances = new Map() diff --git a/src/components/Indexer/processors/EscrowEventProcessor.ts b/src/components/Indexer/processors/EscrowEventProcessor.ts new file mode 100644 index 000000000..8b9c98792 --- /dev/null +++ b/src/components/Indexer/processors/EscrowEventProcessor.ts @@ -0,0 +1,112 @@ +import { ethers, Signer, FallbackProvider, Interface } from 'ethers' +import { INDEXER_LOGGER } from '../../../utils/logging/common.js' +import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js' +import { BaseEventProcessor } from './BaseProcessor.js' +import { getContractAddress } from '../utils.js' +import { EVENTS } from '../../../utils/constants.js' +import { EscrowEvent } from '../../../@types/Escrow.js' +import { OceanNodeConfig } from '../../../@types/OceanNode.js' +import EscrowJson from '@oceanprotocol/contracts/artifacts/contracts/escrow/Escrow.sol/Escrow.json' with { type: 'json' } + +const escrowInterface = new Interface(EscrowJson.abi) + +const addr = (v: any): string => v?.toString().toLowerCase() +const num = (v: any): string => v?.toString() + +export class EscrowEventProcessor extends BaseEventProcessor { + private readonly escrowAddress: string + + constructor(chainId: number, config: OceanNodeConfig) { + super(chainId, config) + this.escrowAddress = getContractAddress(chainId, 'Escrow') + } + + async processEvent( + event: ethers.Log, + chainId: number, + signer: Signer, + provider: FallbackProvider, + eventName?: string + ): Promise { + try { + if ( + !this.escrowAddress || + event.address.toLowerCase() !== this.escrowAddress.toLowerCase() + ) { + return null + } + + const decoded = escrowInterface.parseLog({ + topics: Array.from(event.topics), + data: event.data + }) + if (!decoded) return null + + const { args } = decoded + const record: EscrowEvent = { + id: `${event.transactionHash}-${event.index}`, + eventType: eventName, + chainId, + contract: event.address.toLowerCase(), + block: event.blockNumber, + txHash: event.transactionHash + } + + switch (eventName) { + case EVENTS.ESCROW_AUTH: + record.payer = addr(args.payer) + record.payee = addr(args.payee) + record.maxLockedAmount = num(args.maxLockedAmount) + record.maxLockSeconds = num(args.maxLockSeconds) + record.maxLockCounts = num(args.maxLockCounts) + break + case EVENTS.ESCROW_LOCK: + record.payer = addr(args.payer) + record.payee = addr(args.payee) + record.jobId = num(args.jobId) + record.amount = num(args.amount) + record.expiry = num(args.expiry) + record.token = addr(args.token) + break + case EVENTS.ESCROW_CLAIMED: + record.payee = addr(args.payee) + record.jobId = num(args.jobId) + record.token = addr(args.token) + record.payer = addr(args.payer) + record.amount = num(args.amount) + record.proof = args.proof?.toString() + break + case EVENTS.ESCROW_CANCELED: + record.payee = addr(args.payee) + record.jobId = num(args.jobId) + record.token = addr(args.token) + record.payer = addr(args.payer) + record.amount = num(args.amount) + break + case EVENTS.ESCROW_DEPOSIT: + case EVENTS.ESCROW_WITHDRAW: + record.payer = addr(args.payer) + record.token = addr(args.token) + record.amount = num(args.amount) + break + default: + return null + } + + const { escrow } = await this.getDatabase() + if (!escrow) return null + const result = await escrow.create(record) + INDEXER_LOGGER.logMessage( + `[Escrow] ${eventName} indexed for tx ${event.transactionHash} on chain ${chainId}` + ) + return result + } catch (err) { + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Error processing Escrow ${eventName} event: ${err.message}`, + true + ) + return null + } + } +} diff --git a/src/components/Indexer/processors/index.ts b/src/components/Indexer/processors/index.ts index 418adeedb..c538c26b0 100644 --- a/src/components/Indexer/processors/index.ts +++ b/src/components/Indexer/processors/index.ts @@ -15,6 +15,7 @@ export * from './OrderStartedEventProcessor.js' export * from './NewAccessListEventProcessor.js' export * from './AddressAddedEventProcessor.js' export * from './AddressRemovedEventProcessor.js' +export * from './EscrowEventProcessor.js' export * from './BaseProcessor.js' export type ProcessorConstructor = new ( diff --git a/src/components/core/handler/coreHandlersRegistry.ts b/src/components/core/handler/coreHandlersRegistry.ts index e492a01a6..3b0aee4d4 100644 --- a/src/components/core/handler/coreHandlersRegistry.ts +++ b/src/components/core/handler/coreHandlersRegistry.ts @@ -56,6 +56,7 @@ import { PersistentStorageUploadFileHandler } from './persistentStorage.js' import { GetAccessListHandler, SearchAccessListHandler } from './accessListHandler.js' +import { EscrowEventsHandler } from './escrowHandler.js' export type HandlerRegistry = { handlerName: string // name of the handler @@ -208,6 +209,10 @@ export class CoreHandlersRegistry { PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST, new SearchAccessListHandler(node) ) + this.registerCoreHandler( + PROTOCOL_COMMANDS.GET_ESCROW_EVENTS, + new EscrowEventsHandler(node) + ) } public static getInstance( diff --git a/src/components/core/handler/escrowHandler.ts b/src/components/core/handler/escrowHandler.ts new file mode 100644 index 000000000..1c4155197 --- /dev/null +++ b/src/components/core/handler/escrowHandler.ts @@ -0,0 +1,57 @@ +import { CommandHandler } from './handler.js' +import { GetEscrowEventsCommand } from '../../../@types/commands.js' +import { P2PCommandResponse } from '../../../@types/OceanNode.js' +import { Readable } from 'stream' +import { + ValidateParams, + validateCommandParameters +} from '../../httpRoutes/validateCommands.js' +import { CORE_LOGGER } from '../../../utils/logging/common.js' + +export class EscrowEventsHandler extends CommandHandler { + validate(command: GetEscrowEventsCommand): ValidateParams { + return validateCommandParameters(command, []) + } + + async handle(task: GetEscrowEventsCommand): Promise { + const validationResponse = await this.verifyParamsAndRateLimits(task) + if (this.shouldDenyTaskHandling(validationResponse)) { + return validationResponse + } + try { + const database = await this.getOceanNode().getDatabase() + if (!database || !database.escrow) { + CORE_LOGGER.error('Escrow database is not available') + return { + stream: null, + status: { httpStatus: 503, error: 'Escrow database is not available' } + } + } + + const filters: Record = { + chainId: task.chainId, + eventType: task.eventType, + payer: typeof task.payer === 'string' ? task.payer.toLowerCase() : undefined, + payee: typeof task.payee === 'string' ? task.payee.toLowerCase() : undefined, + token: typeof task.token === 'string' ? task.token.toLowerCase() : undefined, + jobId: task.jobId, + txHash: task.txId + } + + let result = await database.escrow.search(filters, task.offset, task.size) + if (!result) { + result = [] + } + return { + stream: Readable.from(JSON.stringify(result)), + status: { httpStatus: 200 } + } + } catch (error) { + CORE_LOGGER.error(`Error in EscrowEventsHandler: ${error.message}`) + return { + stream: null, + status: { httpStatus: 500, error: 'Unknown error: ' + error.message } + } + } + } +} diff --git a/src/components/database/BaseDatabase.ts b/src/components/database/BaseDatabase.ts index 1214e9488..f64043401 100644 --- a/src/components/database/BaseDatabase.ts +++ b/src/components/database/BaseDatabase.ts @@ -1,6 +1,7 @@ import { Schema } from '.' import { OceanNodeDBConfig } from '../../@types' import { AccessListUser } from '../../@types/AccessList.js' +import { EscrowEvent } from '../../@types/Escrow.js' import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js' import { DATABASE_LOGGER } from '../../utils/logging/common.js' import { ElasticsearchSchema } from './ElasticSchemas.js' @@ -152,6 +153,28 @@ export abstract class AbstractOrderDatabase { abstract delete(orderId: string): Promise } +export abstract class AbstractEscrowDatabase { + protected config: OceanNodeDBConfig + protected schema: Schema + + constructor(config: OceanNodeDBConfig, schema: Schema) { + this.config = config + this.schema = schema + } + + abstract create(event: EscrowEvent): Promise + + abstract retrieve(id: string): Promise | null> + + abstract search( + filters: Record, + offset?: number, + size?: number + ): Promise[] | null> + + abstract delete(id: string): Promise +} + export abstract class AbstractDdoDatabase { protected config: OceanNodeDBConfig protected schemas: Schema[] diff --git a/src/components/database/DatabaseFactory.ts b/src/components/database/DatabaseFactory.ts index cb6dbcb2c..baa5a2379 100644 --- a/src/components/database/DatabaseFactory.ts +++ b/src/components/database/DatabaseFactory.ts @@ -3,6 +3,7 @@ import { AbstractAccessListDatabase, AbstractDdoDatabase, AbstractDdoStateDatabase, + AbstractEscrowDatabase, AbstractIndexerDatabase, AbstractLogDatabase, AbstractOrderDatabase @@ -11,6 +12,7 @@ import { ElasticsearchAccessListDatabase, ElasticsearchDdoDatabase, ElasticsearchDdoStateDatabase, + ElasticsearchEscrowDatabase, ElasticsearchIndexerDatabase, ElasticsearchLogDatabase, ElasticsearchOrderDatabase @@ -20,6 +22,7 @@ import { TypesenseAccessListDatabase, TypesenseDdoDatabase, TypesenseDdoStateDatabase, + TypesenseEscrowDatabase, TypesenseIndexerDatabase, TypesenseLogDatabase, TypesenseOrderDatabase @@ -50,7 +53,9 @@ export class DatabaseFactory { ddoStateQuery: () => new ElasticSearchDdoStateQuery(), metadataQuery: () => new ElasticSearchMetadataQuery(), accessList: (config: OceanNodeDBConfig) => - new ElasticsearchAccessListDatabase(config) + new ElasticsearchAccessListDatabase(config), + escrow: (config: OceanNodeDBConfig) => + new ElasticsearchEscrowDatabase(config, elasticSchemas.escrowSchema) }, typesense: { ddo: (config: OceanNodeDBConfig) => @@ -66,7 +71,9 @@ export class DatabaseFactory { ddoStateQuery: () => new TypesenseDdoStateQuery(), metadataQuery: () => new TypesenseMetadataQuery(), accessList: (config: OceanNodeDBConfig) => - new TypesenseAccessListDatabase(config, typesenseSchemas.accessListSchema) + new TypesenseAccessListDatabase(config, typesenseSchemas.accessListSchema), + escrow: (config: OceanNodeDBConfig) => + new TypesenseEscrowDatabase(config, typesenseSchemas.escrowSchema) } } @@ -142,4 +149,10 @@ export class DatabaseFactory { ): Promise { return this.createDatabase('accessList', config) } + + static createEscrowDatabase( + config: OceanNodeDBConfig + ): Promise { + return this.createDatabase('escrow', config) + } } diff --git a/src/components/database/ElasticSchemas.ts b/src/components/database/ElasticSchemas.ts index 9e2064fb4..b2b09965a 100644 --- a/src/components/database/ElasticSchemas.ts +++ b/src/components/database/ElasticSchemas.ts @@ -22,6 +22,7 @@ export type ElasticsearchSchemas = { orderSchema: ElasticsearchSchema ddoStateSchema: ElasticsearchSchema accessListSchema: ElasticsearchSchema + escrowSchema: ElasticsearchSchema } // "op_ddo_short" is a node-side index for deprecated DDOs (state !== 0). @@ -143,5 +144,30 @@ export const elasticSchemas: ElasticsearchSchemas = { } } } + }, + escrowSchema: { + index: 'escrow', + body: { + mappings: { + properties: { + id: { type: 'keyword' }, + eventType: { type: 'keyword' }, + chainId: { type: 'long' }, + contract: { type: 'keyword' }, + block: { type: 'long' }, + txHash: { type: 'keyword' }, + payer: { type: 'keyword' }, + payee: { type: 'keyword' }, + token: { type: 'keyword' }, + jobId: { type: 'keyword' }, + amount: { type: 'text' }, + expiry: { type: 'text' }, + proof: { type: 'text' }, + maxLockedAmount: { type: 'text' }, + maxLockSeconds: { type: 'text' }, + maxLockCounts: { type: 'text' } + } + } + } } } diff --git a/src/components/database/ElasticSearchDatabase.ts b/src/components/database/ElasticSearchDatabase.ts index 0a4876276..eb7be8b81 100644 --- a/src/components/database/ElasticSearchDatabase.ts +++ b/src/components/database/ElasticSearchDatabase.ts @@ -3,11 +3,13 @@ import { AbstractAccessListDatabase, AbstractDdoDatabase, AbstractDdoStateDatabase, + AbstractEscrowDatabase, AbstractIndexerDatabase, AbstractLogDatabase, AbstractOrderDatabase } from './BaseDatabase.js' import { AccessListUser } from '../../@types/AccessList.js' +import { EscrowEvent } from '../../@types/Escrow.js' import { createElasticsearchClientWithRetry } from './ElasticsearchConfigHelper.js' import { OceanNodeDBConfig } from '../../@types' import { ElasticsearchSchema } from './ElasticSchemas.js' @@ -476,6 +478,108 @@ export class ElasticsearchOrderDatabase extends AbstractOrderDatabase { } } +export class ElasticsearchEscrowDatabase extends AbstractEscrowDatabase { + private provider: Client + + constructor(config: OceanNodeDBConfig, schema: ElasticsearchSchema) { + super(config, schema) + + return (async (): Promise => { + this.provider = await createElasticsearchClientWithRetry(config) + await this.initializeIndex() + return this + })() as unknown as ElasticsearchEscrowDatabase + } + + getSchema(): ElasticsearchSchema { + return this.schema as ElasticsearchSchema + } + + private async initializeIndex() { + try { + const { index } = this.getSchema() + const exists = await this.provider.indices.exists({ index }) + if (!exists) { + await this.provider.indices.create({ + index, + body: this.getSchema().body as any + }) + } + } catch (e) { + DATABASE_LOGGER.error(`Failed to create escrow index: ${e.message}`) + } + } + + async create(event: EscrowEvent) { + try { + await this.provider.index({ + index: this.getSchema().index, + id: event.id, + body: event + }) + return event + } catch (error) { + const errorMsg = `Error when creating escrow event ${event.id}: ` + error.message + DATABASE_LOGGER.logMessageWithEmoji(errorMsg, true, LOG_LEVELS_STR.LEVEL_ERROR) + return null + } + } + + async retrieve(id: string) { + try { + const result = await this.provider.get({ + index: this.getSchema().index, + id + }) + return normalizeDocumentId(result._source, result._id) + } catch (error) { + const errorMsg = `Error when retrieving escrow event ${id}: ` + error.message + DATABASE_LOGGER.logMessageWithEmoji(errorMsg, true, LOG_LEVELS_STR.LEVEL_ERROR) + return null + } + } + + async search(filters: Record, offset?: number, size?: number) { + try { + // clamp the page size so a single request can't return an unbounded set + const limit = Math.min(size && size > 0 ? size : 100, 250) + const from = offset && offset > 0 ? offset : 0 + + const terms = Object.entries(filters || {}) + .filter(([, value]) => value !== undefined && value !== null && value !== '') + .map(([field, value]) => ({ term: { [field]: value } })) + const query = terms.length ? { bool: { must: terms } } : { match_all: {} } + + const searchParams = { + index: this.getSchema().index, + body: { query, from, size: limit } + } + const result = await this.provider.search(searchParams) + return result.hits.hits.map((hit: any) => normalizeDocumentId(hit._source, hit._id)) + } catch (error) { + const errorMsg = + `Error when searching escrow events by ${JSON.stringify(filters)}: ` + + error.message + DATABASE_LOGGER.logMessageWithEmoji(errorMsg, true, LOG_LEVELS_STR.LEVEL_ERROR) + return null + } + } + + async delete(id: string) { + try { + await this.provider.delete({ + index: this.getSchema().index, + id + }) + return { id } + } catch (error) { + const errorMsg = `Error when deleting escrow event ${id}: ` + error.message + DATABASE_LOGGER.logMessageWithEmoji(errorMsg, true, LOG_LEVELS_STR.LEVEL_ERROR) + return null + } + } +} + export class ElasticsearchDdoDatabase extends AbstractDdoDatabase { private client: Client diff --git a/src/components/database/TypesenseDatabase.ts b/src/components/database/TypesenseDatabase.ts index b11054252..c022af99e 100644 --- a/src/components/database/TypesenseDatabase.ts +++ b/src/components/database/TypesenseDatabase.ts @@ -10,11 +10,13 @@ import { AbstractAccessListDatabase, AbstractDdoDatabase, AbstractDdoStateDatabase, + AbstractEscrowDatabase, AbstractIndexerDatabase, AbstractLogDatabase, AbstractOrderDatabase } from './BaseDatabase.js' import { AccessListUser } from '../../@types/AccessList.js' +import { EscrowEvent } from '../../@types/Escrow.js' import { validateDDO } from '../../utils/asset.js' import { DDOManager } from '@oceanprotocol/ddo-js' @@ -210,6 +212,133 @@ export class TypesenseOrderDatabase extends AbstractOrderDatabase { } } +export class TypesenseEscrowDatabase extends AbstractEscrowDatabase { + private provider: Typesense + + constructor(config: OceanNodeDBConfig, schema: TypesenseSchema) { + super(config, schema) + return (async (): Promise => { + this.provider = new Typesense({ + ...convertTypesenseConfig(this.config.url), + logger: DATABASE_LOGGER + }) + try { + await this.provider.collections(this.getSchema().name).retrieve() + } catch (error) { + if (error instanceof TypesenseError && error.httpStatus === 404) { + await this.provider.collections().create(this.getSchema()) + } + } + return this + })() as unknown as TypesenseEscrowDatabase + } + + getSchema(): TypesenseSchema { + return this.schema as TypesenseSchema + } + + async create(event: EscrowEvent) { + try { + return await this.provider + .collections(this.getSchema().name) + .documents() + .create({ ...event }) + } catch (error) { + if (error instanceof TypesenseError && error.httpStatus === 409) { + return { ...event } + } + const errorMsg = `Error when creating escrow event ${event.id}: ` + error.message + DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + } + + async retrieve(id: string) { + try { + return await this.provider + .collections(this.getSchema().name) + .documents() + .retrieve(id) + } catch (error) { + const errorMsg = `Error when retrieving escrow event ${id}: ` + error.message + DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + } + + async search(filters: Record, offset?: number, size?: number) { + try { + const filterBy = Object.entries(filters || {}) + .filter(([, value]) => value !== undefined && value !== null && value !== '') + // Backtick string values so spaces/special chars can't break the syntax; + // strip backticks from the value so it can't escape the quoted literal. + .map(([field, value]) => + typeof value === 'string' + ? `${field}:=\`${value.replace(/`/g, '')}\`` + : `${field}:=${value}` + ) + .join(' && ') + + // clamp the page size so a single request can't return an unbounded set + const limit = Math.min(size && size > 0 ? size : 100, TYPESENSE_HITS_CAP) + const from = offset && offset > 0 ? offset : 0 + + const searchParams: TypesenseSearchParams = { + q: '*', + query_by: 'eventType', + offset: from, + limit + } + if (filterBy) { + searchParams.filter_by = filterBy + } + + const result = await this.provider + .collections(this.getSchema().name) + .documents() + .search(searchParams) + + return result.hits.map((hit) => hit.document) + } catch (error) { + const errorMsg = + `Error when searching escrow events by ${JSON.stringify(filters)}: ` + + error.message + DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + } + + async delete(id: string) { + try { + return await this.provider.collections(this.getSchema().name).documents().delete(id) + } catch (error) { + const errorMsg = `Error when deleting escrow event ${id}: ` + error.message + DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + } +} + export class TypesenseDdoStateDatabase extends AbstractDdoStateDatabase { private provider: Typesense diff --git a/src/components/database/TypesenseSchemas.ts b/src/components/database/TypesenseSchemas.ts index f928922c1..292aef8f8 100644 --- a/src/components/database/TypesenseSchemas.ts +++ b/src/components/database/TypesenseSchemas.ts @@ -54,6 +54,7 @@ export type TypesenseSchemas = { orderSchema: TypesenseSchema ddoStateSchema: TypesenseSchema accessListSchema: TypesenseSchema + escrowSchema: TypesenseSchema } const ddoSchemas = readJsonSchemas() export const typesenseSchemas: TypesenseSchemas = { @@ -143,5 +144,29 @@ export const typesenseSchemas: TypesenseSchemas = { { name: 'deploymentBlock', type: 'int64', optional: true }, { name: 'deploymentTxId', type: 'string', optional: true } ] + }, + escrowSchema: { + name: 'escrow', + enable_nested_fields: true, + fields: [ + { name: 'eventType', type: 'string', facet: true }, + { name: 'chainId', type: 'int64', facet: true }, + { name: 'contract', type: 'string' }, + { name: 'block', type: 'int64' }, + { name: 'txHash', type: 'string' }, + { name: 'payer', type: 'string', optional: true }, + { name: 'payee', type: 'string', optional: true }, + { name: 'token', type: 'string', optional: true }, + { name: 'jobId', type: 'string', optional: true }, + // uint256 values kept as raw strings to avoid precision loss + { name: 'amount', type: 'string', optional: true }, + { name: 'expiry', type: 'string', optional: true }, + // proof (Claimed event bytes) is stored but never filtered; skip indexing + // it so a large hex value can't hit Typesense's indexed-field length limit. + { name: 'proof', type: 'string', optional: true, index: false }, + { name: 'maxLockedAmount', type: 'string', optional: true }, + { name: 'maxLockSeconds', type: 'string', optional: true }, + { name: 'maxLockCounts', type: 'string', optional: true } + ] } } diff --git a/src/components/database/index.ts b/src/components/database/index.ts index 3c42f468d..b01a85c4d 100644 --- a/src/components/database/index.ts +++ b/src/components/database/index.ts @@ -9,6 +9,7 @@ import { AbstractAccessListDatabase, AbstractDdoDatabase, AbstractDdoStateDatabase, + AbstractEscrowDatabase, AbstractIndexerDatabase, AbstractLogDatabase, AbstractOrderDatabase @@ -31,6 +32,7 @@ export class Database { order: AbstractOrderDatabase ddoState: AbstractDdoStateDatabase accessList: AbstractAccessListDatabase + escrow: AbstractEscrowDatabase sqliteConfig: SQLLiteConfigDatabase c2d: C2DDatabase authToken: AuthTokenDatabase @@ -110,6 +112,13 @@ export class Database { DATABASE_LOGGER.error(`AccessList database initialization failed: ${error}`) return null } + + try { + db.escrow = await DatabaseFactory.createEscrowDatabase(config) + } catch (error) { + DATABASE_LOGGER.error(`Escrow database initialization failed: ${error}`) + return null + } } else { DATABASE_LOGGER.info( 'Invalid DB URL. Only Nonce, C2D, Auth Token and Config Databases are initialized.' diff --git a/src/components/httpRoutes/escrow.ts b/src/components/httpRoutes/escrow.ts new file mode 100644 index 000000000..f22288b77 --- /dev/null +++ b/src/components/httpRoutes/escrow.ts @@ -0,0 +1,45 @@ +import express, { Request, Response } from 'express' +import { Readable } from 'stream' +import { EscrowEventsHandler } from '../core/handler/escrowHandler.js' +import { PROTOCOL_COMMANDS } from '../../utils/constants.js' +import { streamToString } from '../../utils/util.js' +import { GetEscrowEventsCommand } from '../../@types/commands.js' + +export const escrowRoutes = express.Router() + +escrowRoutes.get( + '/api/services/escrow/events', + async (req: Request, res: Response): Promise => { + const { chainId } = req.query + let parsedChainId: number | undefined + if (chainId !== undefined) { + parsedChainId = Number(chainId) + if (Number.isNaN(parsedChainId)) { + res.status(400).send('chainId must be a number') + return + } + } + + const command: GetEscrowEventsCommand = { + command: PROTOCOL_COMMANDS.GET_ESCROW_EVENTS, + chainId: parsedChainId, + eventType: req.query.eventType ? String(req.query.eventType) : undefined, + payer: req.query.payer ? String(req.query.payer) : undefined, + payee: req.query.payee ? String(req.query.payee) : undefined, + token: req.query.token ? String(req.query.token) : undefined, + jobId: req.query.jobId ? String(req.query.jobId) : undefined, + txId: req.query.txId ? String(req.query.txId) : undefined, + offset: req.query.offset ? Number(req.query.offset) : undefined, + size: req.query.size ? Number(req.query.size) : undefined, + caller: req.caller + } + + const result = await new EscrowEventsHandler(req.oceanNode).handle(command) + if (result.stream) { + const data = JSON.parse(await streamToString(result.stream as Readable)) + res.json(data) + } else { + res.status(result.status.httpStatus).send(result.status.error) + } + } +) diff --git a/src/components/httpRoutes/index.ts b/src/components/httpRoutes/index.ts index 0706f3cba..75e06cc02 100644 --- a/src/components/httpRoutes/index.ts +++ b/src/components/httpRoutes/index.ts @@ -16,6 +16,7 @@ import { authRoutes } from './auth.js' import { adminConfigRoutes } from './adminConfig.js' import { persistentStorageRoutes } from './persistentStorage.js' import { accessListRoutes } from './accessList.js' +import { escrowRoutes } from './escrow.js' export * from './getOceanPeers.js' export * from './auth.js' @@ -67,6 +68,9 @@ httpRoutes.use(adminConfigRoutes) httpRoutes.use(persistentStorageRoutes) // access list routes httpRoutes.use(accessListRoutes) +// escrow events routes +// /api/services/escrow/events +httpRoutes.use(escrowRoutes) export function getAllServiceEndpoints() { httpRoutes.stack.forEach(addMapping.bind(null, [])) diff --git a/src/test/integration/escrow.test.ts b/src/test/integration/escrow.test.ts new file mode 100644 index 000000000..b0a2c1e4d --- /dev/null +++ b/src/test/integration/escrow.test.ts @@ -0,0 +1,257 @@ +import { assert, expect } from 'chai' +import { JsonRpcProvider, Signer, ethers, parseUnits } from 'ethers' +import { Readable } from 'stream' +import OceanToken from '@oceanprotocol/contracts/artifacts/contracts/utils/OceanToken.sol/OceanToken.json' with { type: 'json' } +import EscrowJson from '@oceanprotocol/contracts/artifacts/contracts/escrow/Escrow.sol/Escrow.json' with { type: 'json' } +import { Database } from '../../components/database/index.js' +import { OceanIndexer } from '../../components/Indexer/index.js' +import { OceanNode } from '../../OceanNode.js' +import { RPCS } from '../../@types/blockchain.js' +import { EscrowEventsHandler } from '../../components/core/handler/escrowHandler.js' +import { streamToString } from '../../utils/util.js' +import { + DEVELOPMENT_CHAIN_ID, + getOceanArtifactsAdresses, + getOceanArtifactsAdressesByChainId +} from '../../utils/address.js' +import { + ENVIRONMENT_VARIABLES, + EVENTS, + PROTOCOL_COMMANDS +} from '../../utils/constants.js' +import { + DEFAULT_TEST_TIMEOUT, + OverrideEnvConfig, + buildEnvOverrideConfig, + getMockSupportedNetworks, + setupEnvironment, + tearDownEnvironment +} from '../utils/utils.js' +import { waitForCondition } from './testUtils.js' +import { getConfiguration } from '../../utils/config.js' +import { homedir } from 'os' + +describe('Indexer stores Escrow contract events', () => { + let database: Database + let oceanNode: OceanNode + let indexer: OceanIndexer + let provider: JsonRpcProvider + let publisherAccount: Signer // payee that creates/claims locks + let consumerAccount: Signer // payer that deposits/authorizes + let payerAddress: string + let payeeAddress: string + let paymentToken: string + let escrowAddress: string + let tokenContract: any + let escrowContract: any + + const chainId = DEVELOPMENT_CHAIN_ID + const depositAmount = parseUnits('100', 18) + const lockAmount = parseUnits('10', 18) + const jobId = BigInt(Date.now()) + const expiry = 7200 + + let depositTxHash: string + let authTxHash: string + let lockTxHash: string + + const mockSupportedNetworks: RPCS = getMockSupportedNetworks() + let previousConfiguration: OverrideEnvConfig[] + + // search() returns [] (truthy) when empty, which would make waitForCondition + // resolve on the first poll; return null until a matching row is indexed. + const waitForEscrowEvents = (filters: Record) => + waitForCondition( + async () => { + const found = await database.escrow.search(filters) + return found && found.length ? found : null + }, + DEFAULT_TEST_TIMEOUT * 3 - 5000 + ) + + before(async () => { + previousConfiguration = await setupEnvironment( + null, + buildEnvOverrideConfig( + [ + ENVIRONMENT_VARIABLES.RPCS, + ENVIRONMENT_VARIABLES.INDEXER_NETWORKS, + ENVIRONMENT_VARIABLES.PRIVATE_KEY, + ENVIRONMENT_VARIABLES.ADDRESS_FILE + ], + [ + JSON.stringify(mockSupportedNetworks), + JSON.stringify([DEVELOPMENT_CHAIN_ID]), + '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', + `${homedir}/.ocean/ocean-contracts/artifacts/address.json` + ] + ) + ) + + const config = await getConfiguration(true) + database = await Database.init(config.dbConfig) + + const oldIndexer = OceanNode.getInstance(config, database).getIndexer() + if (oldIndexer) { + await oldIndexer.stopAllChainIndexers() + } + oceanNode = OceanNode.getInstance( + config, + database, + null, + null, + null, + null, + null, + true + ) + + let artifactsAddresses = getOceanArtifactsAdressesByChainId(DEVELOPMENT_CHAIN_ID) + if (!artifactsAddresses) { + artifactsAddresses = getOceanArtifactsAdresses().development + } + escrowAddress = artifactsAddresses?.Escrow + paymentToken = artifactsAddresses?.Ocean + + provider = new JsonRpcProvider('http://127.0.0.1:8545') + publisherAccount = (await provider.getSigner(0)) as Signer + consumerAccount = (await provider.getSigner(1)) as Signer + payerAddress = await consumerAccount.getAddress() + payeeAddress = await publisherAccount.getAddress() + + const headBlock = await provider.getBlockNumber() + await database.indexer.update(chainId, headBlock) + + indexer = new OceanIndexer(database, config, oceanNode.blockchainRegistry) + oceanNode.addIndexer(indexer) + + if (escrowAddress && paymentToken) { + tokenContract = new ethers.Contract(paymentToken, OceanToken.abi, publisherAccount) + escrowContract = new ethers.Contract(escrowAddress, EscrowJson.abi, consumerAccount) + } + }) + + after(async () => { + await oceanNode.tearDownAll() + await tearDownEnvironment(previousConfiguration) + }) + + it('escrow database is available', function () { + if (!escrowAddress || !paymentToken) { + // Escrow not deployed on this chain — nothing to index. + this.skip() + } + assert(database.escrow, 'escrow database should be initialized') + }) + + it('indexes a Deposit event', async function () { + if (!escrowAddress || !paymentToken) this.skip() + this.timeout(DEFAULT_TEST_TIMEOUT * 3) + + let balance = await tokenContract.balanceOf(payerAddress) + if (BigInt(balance.toString()) < depositAmount) { + const mintTx = await tokenContract.mint(payerAddress, depositAmount) + await mintTx.wait() + balance = await tokenContract.balanceOf(payerAddress) + } + await ( + await tokenContract.connect(consumerAccount).approve(escrowAddress, depositAmount) + ).wait() + const tx = await escrowContract.deposit(paymentToken, depositAmount) + const receipt = await tx.wait() + depositTxHash = receipt.hash + + const events = await waitForEscrowEvents({ + txHash: depositTxHash, + eventType: EVENTS.ESCROW_DEPOSIT + }) + assert(events && events.length > 0, 'Deposit event should be indexed') + const event = events[0] + expect(event.eventType).to.equal(EVENTS.ESCROW_DEPOSIT) + expect(event.payer).to.equal(payerAddress.toLowerCase()) + expect(event.token).to.equal(paymentToken.toLowerCase()) + expect(event.amount).to.equal(depositAmount.toString()) + expect(event.chainId).to.equal(chainId) + }) + + it('indexes an Auth event', async function () { + if (!escrowAddress || !paymentToken) this.skip() + this.timeout(DEFAULT_TEST_TIMEOUT * 3) + + const tx = await escrowContract.authorize( + paymentToken, + payeeAddress, + depositAmount, + expiry, + 10 + ) + const receipt = await tx.wait() + authTxHash = receipt.hash + + const events = await waitForEscrowEvents({ + txHash: authTxHash, + eventType: EVENTS.ESCROW_AUTH + }) + assert(events && events.length > 0, 'Auth event should be indexed') + const event = events[0] + expect(event.payer).to.equal(payerAddress.toLowerCase()) + expect(event.payee).to.equal(payeeAddress.toLowerCase()) + expect(event.maxLockedAmount).to.equal(depositAmount.toString()) + expect(event.maxLockCounts).to.equal('10') + }) + + it('indexes a Lock event', async function () { + if (!escrowAddress || !paymentToken) this.skip() + this.timeout(DEFAULT_TEST_TIMEOUT * 3) + + const tx = await escrowContract + .connect(publisherAccount) + .createLock(jobId, paymentToken, payerAddress, lockAmount, expiry) + const receipt = await tx.wait() + lockTxHash = receipt.hash + + const events = await waitForEscrowEvents({ + txHash: lockTxHash, + eventType: EVENTS.ESCROW_LOCK + }) + assert(events && events.length > 0, 'Lock event should be indexed') + const event = events[0] + expect(event.payer).to.equal(payerAddress.toLowerCase()) + expect(event.payee).to.equal(payeeAddress.toLowerCase()) + expect(event.jobId).to.equal(jobId.toString()) + expect(event.amount).to.equal(lockAmount.toString()) + expect(event.token).to.equal(paymentToken.toLowerCase()) + }) + + it('returns indexed events through the EscrowEventsHandler (query command)', async function () { + if (!escrowAddress || !paymentToken) this.skip() + this.timeout(DEFAULT_TEST_TIMEOUT) + + const response = await new EscrowEventsHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.GET_ESCROW_EVENTS, + chainId, + eventType: EVENTS.ESCROW_DEPOSIT, + payer: payerAddress, + caller: '127.0.0.1' + }) + expect(response.status.httpStatus).to.equal(200) + assert(response.stream, 'handler should return a stream') + const result = JSON.parse(await streamToString(response.stream as Readable)) + assert(Array.isArray(result), 'result should be an array') + assert( + result.some((e: any) => e.txHash === depositTxHash), + 'query should return the indexed Deposit event' + ) + }) + + it('respects offset and size pagination', async function () { + if (!escrowAddress || !paymentToken) this.skip() + // Deposit, Auth and Lock are all indexed for this chain by now (>= 2 rows). + const page = await database.escrow.search({ chainId }, 0, 2) + assert(page && page.length === 2, 'size should cap the page to 2 rows') + + const next = await database.escrow.search({ chainId }, 1, 1) + assert(next && next.length === 1, 'offset + size should return a single row') + expect(next[0].id).to.not.equal(page[0].id) // offset advanced past the first row + }) +}) diff --git a/src/utils/constants.ts b/src/utils/constants.ts index b2d211e04..779916c70 100644 --- a/src/utils/constants.ts +++ b/src/utils/constants.ts @@ -46,7 +46,8 @@ export const PROTOCOL_COMMANDS = { PERSISTENT_STORAGE_GET_FILE_OBJECT: 'persistentStorageGetFileObject', PERSISTENT_STORAGE_DELETE_FILE: 'persistentStorageDeleteFile', GET_ACCESS_LIST: 'getAccessList', - SEARCH_ACCESS_LIST: 'searchAccessList' + SEARCH_ACCESS_LIST: 'searchAccessList', + GET_ESCROW_EVENTS: 'getEscrowEvents' } // more visible, keep then close to make sure we always update both export const SUPPORTED_PROTOCOL_COMMANDS: string[] = [ @@ -94,7 +95,8 @@ export const SUPPORTED_PROTOCOL_COMMANDS: string[] = [ PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE, PROTOCOL_COMMANDS.GET_ACCESS_LIST, - PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST + PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST, + PROTOCOL_COMMANDS.GET_ESCROW_EVENTS ] export const MetadataStates = { @@ -122,7 +124,14 @@ export const EVENTS = { EXCHANGE_DEACTIVATED: 'ExchangeDeactivated', ADDRESS_ADDED: 'AddressAdded', ADDRESS_REMOVED: 'AddressRemoved', - NEW_ACCESS_LIST: 'NewAccessList' + NEW_ACCESS_LIST: 'NewAccessList', + // Escrow contract events. Values must equal the on-chain event name. + ESCROW_AUTH: 'Auth', + ESCROW_LOCK: 'Lock', + ESCROW_CLAIMED: 'Claimed', + ESCROW_CANCELED: 'Canceled', + ESCROW_DEPOSIT: 'Deposit', + ESCROW_WITHDRAW: 'Withdraw' } export const INDEXER_CRAWLING_EVENTS = { @@ -204,6 +213,30 @@ export const EVENT_HASHES: Hashes = { '0xd65bc8e3024bbad886df74eea79b6e118b7fbcffe1f3f98054e5a6b98dc83891': { type: EVENTS.NEW_ACCESS_LIST, text: 'NewAccessList(address,address)' + }, + '0x118cb6c6a02e26bfdb39cab8d70573499942c4ee3f0d7616d3c4100fe9163d9d': { + type: EVENTS.ESCROW_AUTH, + text: 'Auth(address,address,uint256,uint256,uint256)' + }, + '0xb746b0421b0b98debe76bb312ec9fb701603af22ddb107f7e639b0187e4ff880': { + type: EVENTS.ESCROW_LOCK, + text: 'Lock(address,address,uint256,uint256,uint256,address)' + }, + '0x77aeb72af8b0efaf7fd8c746d2fb78653ae489dd88dea7a851cb354e4cdc4eed': { + type: EVENTS.ESCROW_CLAIMED, + text: 'Claimed(address,uint256,address,address,uint256,bytes)' + }, + '0x5bcb66e310a3be233290f5d61fba34fe58a0e8045b4678714af2c7986f3a5e50': { + type: EVENTS.ESCROW_CANCELED, + text: 'Canceled(address,uint256,address,address,uint256)' + }, + '0x5548c837ab068cf56a2c2479df0882a4922fd203edb7517321831d95078c5f62': { + type: EVENTS.ESCROW_DEPOSIT, + text: 'Deposit(address,address,uint256)' + }, + '0x9b1bfa7fa9ee420a16e124f794c35ac9f90472acc99140eb2f6447c714cad8eb': { + type: EVENTS.ESCROW_WITHDRAW, + text: 'Withdraw(address,address,uint256)' } }