Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/@types/Escrow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,22 @@ export interface EscrowLock {
expiry: BigInt
token: string
}

export interface EscrowEvent {
id: string
eventType: string
chainId: number
contract: string
block: number
txHash: string
payer?: string
payee?: string
token?: string
jobId?: string
amount?: string
expiry?: string
proof?: string
maxLockedAmount?: string
maxLockSeconds?: string
maxLockCounts?: string
}
12 changes: 12 additions & 0 deletions src/@types/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ export interface QueryCommand extends Command {
maxResultsPerPage?: number
pageNumber?: number
}

export interface GetEscrowEventsCommand extends Command {
chainId?: number
eventType?: string
payer?: string
payee?: string
token?: string
jobId?: string
txId?: string
maxResultsPerPage?: number
pageNumber?: number
}
export interface ReindexCommand extends Command {
txId: string
chainId: number
Expand Down
9 changes: 8 additions & 1 deletion src/components/Indexer/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
NewAccessListEventProcessor,
AddressAddedEventProcessor,
AddressRemovedEventProcessor,
EscrowEventProcessor,
ProcessorConstructor
} from './processors/index.js'
import { findEventByKey } from './utils.js'
Expand All @@ -42,7 +43,13 @@ const EVENT_PROCESSOR_MAP: Record<string, ProcessorConstructor> = {
[EVENTS.EXCHANGE_RATE_CHANGED]: ExchangeRateChangedEventProcessor,
[EVENTS.NEW_ACCESS_LIST]: NewAccessListEventProcessor,
[EVENTS.ADDRESS_ADDED]: AddressAddedEventProcessor,
[EVENTS.ADDRESS_REMOVED]: AddressRemovedEventProcessor
[EVENTS.ADDRESS_REMOVED]: AddressRemovedEventProcessor,
[EVENTS.ESCROW_AUTH]: EscrowEventProcessor,
[EVENTS.ESCROW_LOCK]: EscrowEventProcessor,
[EVENTS.ESCROW_CLAIMED]: EscrowEventProcessor,
[EVENTS.ESCROW_CANCELED]: EscrowEventProcessor,
[EVENTS.ESCROW_DEPOSIT]: EscrowEventProcessor,
[EVENTS.ESCROW_WITHDRAW]: EscrowEventProcessor
}

const processorInstances = new Map<string, BaseEventProcessor>()
Expand Down
112 changes: 112 additions & 0 deletions src/components/Indexer/processors/EscrowEventProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import { ethers, Signer, FallbackProvider, Interface } from 'ethers'
import { INDEXER_LOGGER } from '../../../utils/logging/common.js'
import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js'
import { BaseEventProcessor } from './BaseProcessor.js'
import { getContractAddress } from '../utils.js'
import { EVENTS } from '../../../utils/constants.js'
import { EscrowEvent } from '../../../@types/Escrow.js'
import { OceanNodeConfig } from '../../../@types/OceanNode.js'
import EscrowJson from '@oceanprotocol/contracts/artifacts/contracts/escrow/Escrow.sol/Escrow.json' with { type: 'json' }

const escrowInterface = new Interface(EscrowJson.abi)

const addr = (v: any): string => v?.toString().toLowerCase()
const num = (v: any): string => v?.toString()

export class EscrowEventProcessor extends BaseEventProcessor {
private readonly escrowAddress: string

constructor(chainId: number, config: OceanNodeConfig) {
super(chainId, config)
this.escrowAddress = getContractAddress(chainId, 'Escrow')
}

async processEvent(
event: ethers.Log,
chainId: number,
signer: Signer,
provider: FallbackProvider,
eventName?: string
): Promise<any> {
try {
if (
!this.escrowAddress ||
event.address.toLowerCase() !== this.escrowAddress.toLowerCase()
) {
return null
}

const decoded = escrowInterface.parseLog({
topics: Array.from(event.topics),
data: event.data
})
if (!decoded) return null

const { args } = decoded
const record: EscrowEvent = {
id: `${event.transactionHash}-${event.index}`,
eventType: eventName,
chainId,
contract: event.address.toLowerCase(),
block: event.blockNumber,
txHash: event.transactionHash
}

switch (eventName) {
case EVENTS.ESCROW_AUTH:
record.payer = addr(args.payer)
record.payee = addr(args.payee)
record.maxLockedAmount = num(args.maxLockedAmount)
record.maxLockSeconds = num(args.maxLockSeconds)
record.maxLockCounts = num(args.maxLockCounts)
break
case EVENTS.ESCROW_LOCK:
record.payer = addr(args.payer)
record.payee = addr(args.payee)
record.jobId = num(args.jobId)
record.amount = num(args.amount)
record.expiry = num(args.expiry)
record.token = addr(args.token)
break
case EVENTS.ESCROW_CLAIMED:
record.payee = addr(args.payee)
record.jobId = num(args.jobId)
record.token = addr(args.token)
record.payer = addr(args.payer)
record.amount = num(args.amount)
record.proof = args.proof?.toString()
break
case EVENTS.ESCROW_CANCELED:
record.payee = addr(args.payee)
record.jobId = num(args.jobId)
record.token = addr(args.token)
record.payer = addr(args.payer)
record.amount = num(args.amount)
break
case EVENTS.ESCROW_DEPOSIT:
case EVENTS.ESCROW_WITHDRAW:
record.payer = addr(args.payer)
record.token = addr(args.token)
record.amount = num(args.amount)
break
default:
return null
}

const { escrow } = await this.getDatabase()
if (!escrow) return null
const result = await escrow.create(record)
INDEXER_LOGGER.logMessage(
`[Escrow] ${eventName} indexed for tx ${event.transactionHash} on chain ${chainId}`
)
return result
} catch (err) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error processing Escrow ${eventName} event: ${err.message}`,
true
)
return null
}
}
}
1 change: 1 addition & 0 deletions src/components/Indexer/processors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export * from './OrderStartedEventProcessor.js'
export * from './NewAccessListEventProcessor.js'
export * from './AddressAddedEventProcessor.js'
export * from './AddressRemovedEventProcessor.js'
export * from './EscrowEventProcessor.js'
export * from './BaseProcessor.js'

export type ProcessorConstructor = new (
Expand Down
5 changes: 5 additions & 0 deletions src/components/core/handler/coreHandlersRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import {
PersistentStorageUploadFileHandler
} from './persistentStorage.js'
import { GetAccessListHandler, SearchAccessListHandler } from './accessListHandler.js'
import { EscrowEventsHandler } from './escrowHandler.js'

export type HandlerRegistry = {
handlerName: string // name of the handler
Expand Down Expand Up @@ -208,6 +209,10 @@ export class CoreHandlersRegistry {
PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST,
new SearchAccessListHandler(node)
)
this.registerCoreHandler(
PROTOCOL_COMMANDS.GET_ESCROW_EVENTS,
new EscrowEventsHandler(node)
)
}

public static getInstance(
Expand Down
61 changes: 61 additions & 0 deletions src/components/core/handler/escrowHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { CommandHandler } from './handler.js'
import { GetEscrowEventsCommand } from '../../../@types/commands.js'
import { P2PCommandResponse } from '../../../@types/OceanNode.js'
import { Readable } from 'stream'
import {
ValidateParams,
validateCommandParameters
} from '../../httpRoutes/validateCommands.js'
import { CORE_LOGGER } from '../../../utils/logging/common.js'

export class EscrowEventsHandler extends CommandHandler {
validate(command: GetEscrowEventsCommand): ValidateParams {
return validateCommandParameters(command, [])
}

async handle(task: GetEscrowEventsCommand): Promise<P2PCommandResponse> {
const validationResponse = await this.verifyParamsAndRateLimits(task)
if (this.shouldDenyTaskHandling(validationResponse)) {
return validationResponse
}
try {
const database = await this.getOceanNode().getDatabase()
if (!database || !database.escrow) {
CORE_LOGGER.error('Escrow database is not available')
return {
stream: null,
status: { httpStatus: 503, error: 'Escrow database is not available' }
}
}

const filters: Record<string, any> = {
chainId: task.chainId,
eventType: task.eventType,
payer: task.payer ? task.payer.toLowerCase() : undefined,
payee: task.payee ? task.payee.toLowerCase() : undefined,
token: task.token ? task.token.toLowerCase() : undefined,
jobId: task.jobId,
txHash: task.txId
}

let result = await database.escrow.search(
filters,
task.maxResultsPerPage,
task.pageNumber
)
if (!result) {
result = []
}
return {
stream: Readable.from(JSON.stringify(result)),
status: { httpStatus: 200 }
}
} catch (error) {
CORE_LOGGER.error(`Error in EscrowEventsHandler: ${error.message}`)
return {
stream: null,
status: { httpStatus: 500, error: 'Unknown error: ' + error.message }
}
}
}
}
23 changes: 23 additions & 0 deletions src/components/database/BaseDatabase.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Schema } from '.'
import { OceanNodeDBConfig } from '../../@types'
import { AccessListUser } from '../../@types/AccessList.js'
import { EscrowEvent } from '../../@types/Escrow.js'
import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js'
import { DATABASE_LOGGER } from '../../utils/logging/common.js'
import { ElasticsearchSchema } from './ElasticSchemas.js'
Expand Down Expand Up @@ -152,6 +153,28 @@ export abstract class AbstractOrderDatabase {
abstract delete(orderId: string): Promise<any>
}

export abstract class AbstractEscrowDatabase {
protected config: OceanNodeDBConfig
protected schema: Schema

constructor(config: OceanNodeDBConfig, schema: Schema) {
this.config = config
this.schema = schema
}

abstract create(event: EscrowEvent): Promise<any>

abstract retrieve(id: string): Promise<Record<string, any> | null>

abstract search(
filters: Record<string, any>,
maxResultsPerPage?: number,
pageNumber?: number
): Promise<Record<string, any>[] | null>

abstract delete(id: string): Promise<any>
}

export abstract class AbstractDdoDatabase {
protected config: OceanNodeDBConfig
protected schemas: Schema[]
Expand Down
17 changes: 15 additions & 2 deletions src/components/database/DatabaseFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
AbstractAccessListDatabase,
AbstractDdoDatabase,
AbstractDdoStateDatabase,
AbstractEscrowDatabase,
AbstractIndexerDatabase,
AbstractLogDatabase,
AbstractOrderDatabase
Expand All @@ -11,6 +12,7 @@ import {
ElasticsearchAccessListDatabase,
ElasticsearchDdoDatabase,
ElasticsearchDdoStateDatabase,
ElasticsearchEscrowDatabase,
ElasticsearchIndexerDatabase,
ElasticsearchLogDatabase,
ElasticsearchOrderDatabase
Expand All @@ -20,6 +22,7 @@ import {
TypesenseAccessListDatabase,
TypesenseDdoDatabase,
TypesenseDdoStateDatabase,
TypesenseEscrowDatabase,
TypesenseIndexerDatabase,
TypesenseLogDatabase,
TypesenseOrderDatabase
Expand Down Expand Up @@ -50,7 +53,9 @@ export class DatabaseFactory {
ddoStateQuery: () => new ElasticSearchDdoStateQuery(),
metadataQuery: () => new ElasticSearchMetadataQuery(),
accessList: (config: OceanNodeDBConfig) =>
new ElasticsearchAccessListDatabase(config)
new ElasticsearchAccessListDatabase(config),
escrow: (config: OceanNodeDBConfig) =>
new ElasticsearchEscrowDatabase(config, elasticSchemas.escrowSchema)
},
typesense: {
ddo: (config: OceanNodeDBConfig) =>
Expand All @@ -66,7 +71,9 @@ export class DatabaseFactory {
ddoStateQuery: () => new TypesenseDdoStateQuery(),
metadataQuery: () => new TypesenseMetadataQuery(),
accessList: (config: OceanNodeDBConfig) =>
new TypesenseAccessListDatabase(config, typesenseSchemas.accessListSchema)
new TypesenseAccessListDatabase(config, typesenseSchemas.accessListSchema),
escrow: (config: OceanNodeDBConfig) =>
new TypesenseEscrowDatabase(config, typesenseSchemas.escrowSchema)
}
}

Expand Down Expand Up @@ -142,4 +149,10 @@ export class DatabaseFactory {
): Promise<AbstractAccessListDatabase> {
return this.createDatabase('accessList', config)
}

static createEscrowDatabase(
config: OceanNodeDBConfig
): Promise<AbstractEscrowDatabase> {
return this.createDatabase('escrow', config)
}
}
26 changes: 26 additions & 0 deletions src/components/database/ElasticSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export type ElasticsearchSchemas = {
orderSchema: ElasticsearchSchema
ddoStateSchema: ElasticsearchSchema
accessListSchema: ElasticsearchSchema
escrowSchema: ElasticsearchSchema
}

// "op_ddo_short" is a node-side index for deprecated DDOs (state !== 0).
Expand Down Expand Up @@ -143,5 +144,30 @@ export const elasticSchemas: ElasticsearchSchemas = {
}
}
}
},
escrowSchema: {
index: 'escrow',
body: {
mappings: {
properties: {
id: { type: 'keyword' },
eventType: { type: 'keyword' },
chainId: { type: 'long' },
contract: { type: 'keyword' },
block: { type: 'long' },
txHash: { type: 'keyword' },
payer: { type: 'keyword' },
payee: { type: 'keyword' },
token: { type: 'keyword' },
jobId: { type: 'keyword' },
amount: { type: 'text' },
expiry: { type: 'text' },
proof: { type: 'text' },
maxLockedAmount: { type: 'text' },
maxLockSeconds: { type: 'text' },
maxLockCounts: { type: 'text' }
}
}
}
}
}
Loading
Loading