diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 04200f877..a2f240c01 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -272,6 +272,18 @@ jobs: sleep 10 [ -f "$HOME/.ocean/ocean-contracts/artifacts/ready" ] && break done + - name: Use regular Escrow for system tests + run: | + SYSTEM_TEST_ADDRESS_FILE="${RUNNER_TEMP}/address.system-tests.json" + jq 'del(.development.EnterpriseEscrow)' "${HOME}/.ocean/ocean-contracts/artifacts/address.json" > "${SYSTEM_TEST_ADDRESS_FILE}" + echo "ADDRESS_FILE=${SYSTEM_TEST_ADDRESS_FILE}" >> "$GITHUB_ENV" + - name: Configure escrow timeout for system tests + run: echo "ESCROW_CLAIM_TIMEOUT=$(date +%s)" >> "$GITHUB_ENV" + - name: Use separate consumer key for ocean-cli paid compute system test + working-directory: ${{ github.workspace }}/ocean-cli + run: | + perl -0pi -e 's/0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58/0x1d751ded5a32226054cd2e71261039b65afb9ee1c746d055dd699b1150a5befc/g' test/paidComputeFlow.test.ts + grep -n "PRIVATE_KEY" test/paidComputeFlow.test.ts - name: docker logs run: docker logs ocean-contracts-1 && docker logs ocean-typesense-1 diff --git a/docs/env.md b/docs/env.md index 8bf52eef7..dd8761849 100644 --- a/docs/env.md +++ b/docs/env.md @@ -127,6 +127,8 @@ Environmental variables are also tracked in `ENVIRONMENT_VARIABLES` within `src/ ## Compute +- `C2D_DOWNLOAD_TIMEOUT`: Timeout (in seconds) for pulling the algorithm docker image during a C2D job. If the pull exceeds this timeout, the job fails with `PullImageFailed` instead of getting stuck. Defaults to `900` (15 minutes). Example: `900` + The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable is used to configure Docker-based compute environments in Ocean Node. This guide will walk you through the options available for defining `DOCKER_COMPUTE_ENVIRONMENTS` and how to set it up correctly. 
For configuring compute environments and setting prices for each resource (including pricing units and examples), see [Compute pricing](compute-pricing.md). Example Configuration diff --git a/package-lock.json b/package-lock.json index b3522f86c..369d4e0a4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "ocean-node", - "version": "3.1.0", + "version": "3.1.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "ocean-node", - "version": "3.1.0", + "version": "3.1.1", "hasInstallScript": true, "license": "Apache-2.0", "dependencies": { @@ -35,7 +35,7 @@ "@libp2p/upnp-nat": "^4.0.9", "@libp2p/websockets": "^10.1.2", "@multiformats/multiaddr": "^12.2.3", - "@oceanprotocol/contracts": "^2.6.0", + "@oceanprotocol/contracts": "^2.7.0", "@oceanprotocol/ddo-js": "^0.2.0", "axios": "^1.15.0", "base58-js": "^2.0.0", @@ -5236,9 +5236,9 @@ } }, "node_modules/@oceanprotocol/contracts": { - "version": "2.6.0", - "resolved": "https://registry.npmjs.org/@oceanprotocol/contracts/-/contracts-2.6.0.tgz", - "integrity": "sha512-4K3TTM0q4VlBs7GLXzQkGMae576iAGHMARqMFcKXUUtfGyiT8KP4sP9IBsq8UUhu11R4JU2KTqXenHS8EWxrbA==", + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/@oceanprotocol/contracts/-/contracts-2.7.0.tgz", + "integrity": "sha512-6rXT/agjty4VyT0j/Do13Rf8dH0eweL57e7rFSv4KmANFx3wdTcQoInHZsoRpwXZf1MmMgHVWL4/ZvqHtRqMRA==", "license": "Apache-2.0" }, "node_modules/@oceanprotocol/ddo-js": { diff --git a/package.json b/package.json index ab701c163..2579aa26a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ocean-node", - "version": "3.1.0", + "version": "3.1.1", "description": "Ocean Node is used to run all core services in the Ocean stack", "author": "Ocean Protocol Foundation", "license": "Apache-2.0", @@ -73,7 +73,7 @@ "@libp2p/upnp-nat": "^4.0.9", "@libp2p/websockets": "^10.1.2", "@multiformats/multiaddr": "^12.2.3", - "@oceanprotocol/contracts": "^2.6.0", + "@oceanprotocol/contracts": 
"^2.7.0", "@oceanprotocol/ddo-js": "^0.2.0", "axios": "^1.15.0", "base58-js": "^2.0.0", diff --git a/src/@types/AccessList.ts b/src/@types/AccessList.ts index 242b991d1..88b521710 100644 --- a/src/@types/AccessList.ts +++ b/src/@types/AccessList.ts @@ -4,3 +4,10 @@ export interface AccessList { [chainId: string]: string[] } + +export interface AccessListUser { + wallet: string + tokenId: number + block: number + txId: string +} diff --git a/src/@types/commands.ts b/src/@types/commands.ts index 6ed0f76f4..cbeb6977f 100644 --- a/src/@types/commands.ts +++ b/src/@types/commands.ts @@ -29,6 +29,16 @@ export interface FindPeerCommand extends Command { export interface GetP2PPeersCommand extends Command {} export interface GetP2PNetworkStatsCommand extends Command {} +export interface GetAccessListCommand extends Command { + chainId: number + contractAddress: string +} + +export interface SearchAccessListCommand extends Command { + wallet: string + chainId?: number +} + export interface SignedCommand extends Command { nonce: string signature: string diff --git a/src/components/Indexer/processor.ts b/src/components/Indexer/processor.ts index ada273397..98f9d2723 100644 --- a/src/components/Indexer/processor.ts +++ b/src/components/Indexer/processor.ts @@ -17,6 +17,9 @@ import { ExchangeActivatedEventProcessor, ExchangeDeactivatedEventProcessor, ExchangeRateChangedEventProcessor, + NewAccessListEventProcessor, + AddressAddedEventProcessor, + AddressRemovedEventProcessor, ProcessorConstructor } from './processors/index.js' import { findEventByKey } from './utils.js' @@ -36,7 +39,10 @@ const EVENT_PROCESSOR_MAP: Record = { [EVENTS.EXCHANGE_CREATED]: ExchangeCreatedEventProcessor, [EVENTS.EXCHANGE_ACTIVATED]: ExchangeActivatedEventProcessor, [EVENTS.EXCHANGE_DEACTIVATED]: ExchangeDeactivatedEventProcessor, - [EVENTS.EXCHANGE_RATE_CHANGED]: ExchangeRateChangedEventProcessor + [EVENTS.EXCHANGE_RATE_CHANGED]: ExchangeRateChangedEventProcessor, + [EVENTS.NEW_ACCESS_LIST]: 
NewAccessListEventProcessor, + [EVENTS.ADDRESS_ADDED]: AddressAddedEventProcessor, + [EVENTS.ADDRESS_REMOVED]: AddressRemovedEventProcessor } const processorInstances = new Map() @@ -83,6 +89,7 @@ export const processChunkLogs = async ( (allowedValidatorsList && Object.keys(allowedValidatorsList).length > 0) for (const log of logs) { const event = findEventByKey(log.topics[0]) + if (event && Object.values(EVENTS).includes(event.type)) { // only log & process the ones we support INDEXER_LOGGER.logMessage( @@ -188,6 +195,7 @@ export const processChunkLogs = async ( } } } + return storeEvents } @@ -208,6 +216,7 @@ export const processBlocks = async ( blockLogs && blockLogs.length > 0 ? await processChunkLogs(blockLogs, signer, provider, network, config) : [] + return { lastBlock: lastIndexedBlock + count, foundEvents: events diff --git a/src/components/Indexer/processors/AddressAddedEventProcessor.ts b/src/components/Indexer/processors/AddressAddedEventProcessor.ts new file mode 100644 index 000000000..a14dd511e --- /dev/null +++ b/src/components/Indexer/processors/AddressAddedEventProcessor.ts @@ -0,0 +1,48 @@ +import { ethers, Signer, FallbackProvider, Interface } from 'ethers' +import { INDEXER_LOGGER } from '../../../utils/logging/common.js' +import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js' +import { BaseEventProcessor } from './BaseProcessor.js' +import AccessList from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' } + +const accessListInterface = new Interface(AccessList.abi) + +export class AddressAddedEventProcessor extends BaseEventProcessor { + async processEvent( + event: ethers.Log, + chainId: number, + signer: Signer, + provider: FallbackProvider + ): Promise { + try { + const decoded = accessListInterface.parseLog({ + topics: Array.from(event.topics), + data: event.data + }) + if (!decoded) return null + + const wallet = decoded.args[0].toString().toLowerCase() + const 
tokenId = Number(decoded.args[1]) + const contractAddress = event.address.toLowerCase() + + const { accessList } = await this.getDatabase() + const result = await accessList.addUser(chainId, contractAddress, { + wallet, + tokenId, + block: event.blockNumber, + txId: event.transactionHash + }) + + INDEXER_LOGGER.logMessage( + `[AddressAdded] ${wallet} (tokenId=${tokenId}) added to ${contractAddress} on chain ${chainId}` + ) + return result + } catch (err) { + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Error processing AddressAdded event: ${err.message}`, + true + ) + return null + } + } +} diff --git a/src/components/Indexer/processors/AddressRemovedEventProcessor.ts b/src/components/Indexer/processors/AddressRemovedEventProcessor.ts new file mode 100644 index 000000000..923c1dad1 --- /dev/null +++ b/src/components/Indexer/processors/AddressRemovedEventProcessor.ts @@ -0,0 +1,46 @@ +import { ethers, Signer, FallbackProvider, Interface } from 'ethers' +import { INDEXER_LOGGER } from '../../../utils/logging/common.js' +import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js' +import { BaseEventProcessor } from './BaseProcessor.js' +import AccessList from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' } + +const accessListInterface = new Interface(AccessList.abi) + +export class AddressRemovedEventProcessor extends BaseEventProcessor { + async processEvent( + event: ethers.Log, + chainId: number, + signer: Signer, + provider: FallbackProvider + ): Promise { + try { + const decoded = accessListInterface.parseLog({ + topics: Array.from(event.topics), + data: event.data + }) + if (!decoded) return null + + const tokenId = Number(decoded.args[0]) + const contractAddress = event.address.toLowerCase() + + const { accessList } = await this.getDatabase() + const result = await accessList.removeUserByTokenId( + chainId, + contractAddress, + tokenId + ) + + INDEXER_LOGGER.logMessage( + 
`[AddressRemoved] tokenId=${tokenId} removed from ${contractAddress} on chain ${chainId}` + ) + return result + } catch (err) { + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Error processing AddressRemoved event: ${err.message}`, + true + ) + return null + } + } +} diff --git a/src/components/Indexer/processors/NewAccessListEventProcessor.ts b/src/components/Indexer/processors/NewAccessListEventProcessor.ts new file mode 100644 index 000000000..a28ee8de5 --- /dev/null +++ b/src/components/Indexer/processors/NewAccessListEventProcessor.ts @@ -0,0 +1,74 @@ +import { ethers, Signer, FallbackProvider, Interface } from 'ethers' +import { INDEXER_LOGGER } from '../../../utils/logging/common.js' +import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js' +import { BaseEventProcessor } from './BaseProcessor.js' +import AccessListFactory from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessListFactory.sol/AccessListFactory.json' with { type: 'json' } +import AccessList from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' } + +const factoryInterface = new Interface(AccessListFactory.abi) + +export class NewAccessListEventProcessor extends BaseEventProcessor { + async processEvent( + event: ethers.Log, + chainId: number, + signer: Signer, + provider: FallbackProvider + ): Promise { + try { + const decoded = factoryInterface.parseLog({ + topics: Array.from(event.topics), + data: event.data + }) + if (!decoded) return null + + const contractAddress = decoded.args[0].toString().toLowerCase() + + let transferable = false + let name: string | undefined + let symbol: string | undefined + try { + const accessListContract = new ethers.Contract( + contractAddress, + AccessList.abi, + provider + ) + const [transferableRaw, nameRaw, symbolRaw] = await Promise.all([ + accessListContract.transferable(), + accessListContract.name(), + accessListContract.symbol() + ]) + transferable = 
Boolean(transferableRaw) + name = nameRaw + symbol = symbolRaw + } catch (err) { + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_WARN, + `Failed to read on-chain metadata for ${contractAddress}: ${err.message}` + ) + } + + const { accessList } = await this.getDatabase() + const result = await accessList.create( + chainId, + contractAddress, + transferable, + event.blockNumber, + event.transactionHash, + name, + symbol + ) + + INDEXER_LOGGER.logMessage( + `[NewAccessList] Indexed access list ${contractAddress} on chain ${chainId} (name=${name}, symbol=${symbol}, transferable=${transferable})` + ) + return result + } catch (err) { + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Error processing NewAccessList event: ${err.message}`, + true + ) + return null + } + } +} diff --git a/src/components/Indexer/processors/index.ts b/src/components/Indexer/processors/index.ts index 912ab6aa2..418adeedb 100644 --- a/src/components/Indexer/processors/index.ts +++ b/src/components/Indexer/processors/index.ts @@ -12,6 +12,9 @@ export * from './MetadataEventProcessor.js' export * from './MetadataStateEventProcessor.js' export * from './OrderReusedEventProcessor.js' export * from './OrderStartedEventProcessor.js' +export * from './NewAccessListEventProcessor.js' +export * from './AddressAddedEventProcessor.js' +export * from './AddressRemovedEventProcessor.js' export * from './BaseProcessor.js' export type ProcessorConstructor = new ( diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index cb8bd86e5..2183bb11d 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -49,6 +49,7 @@ import { } from 'fs' import { pipeline } from 'node:stream/promises' import { CORE_LOGGER } from '../../utils/logging/common.js' +import { ENVIRONMENT_VARIABLES } from '../../utils/constants.js' import { AssetUtils } from '../../utils/asset.js' import { FindDdoHandler } from 
'../core/handler/ddoHandler.js' import { OceanNode } from '../../OceanNode.js' @@ -1695,6 +1696,15 @@ export class C2DEngineDocker extends C2DEngine { } } + private getImagePullTimeoutMs(): number { + const raw = ENVIRONMENT_VARIABLES.C2D_DOWNLOAD_TIMEOUT.value + const parsed = raw ? parseInt(raw, 10) : NaN + if (Number.isFinite(parsed) && parsed > 0) { + return parsed * 1000 + } + return 15 * 60 * 1000 + } + // eslint-disable-next-line require-await private async processJob(job: DBComputeJob) { CORE_LOGGER.info( @@ -2547,6 +2557,7 @@ export class C2DEngineDocker extends C2DEngine { `Using docker registry auth for ${registry} to pull image ${job.containerImage}` ) } + pullOptions.abortSignal = AbortSignal.timeout(this.getImagePullTimeoutMs()) const pullStream = await this.docker.pull(job.containerImage, pullOptions) await new Promise((resolve, reject) => { diff --git a/src/components/core/handler/accessListHandler.ts b/src/components/core/handler/accessListHandler.ts new file mode 100644 index 000000000..e99a4baa3 --- /dev/null +++ b/src/components/core/handler/accessListHandler.ts @@ -0,0 +1,84 @@ +import { CommandHandler } from './handler.js' +import { P2PCommandResponse } from '../../../@types/OceanNode.js' +import { + GetAccessListCommand, + SearchAccessListCommand +} from '../../../@types/commands.js' +import { Readable } from 'stream' +import { isAddress } from 'ethers' +import { + ValidateParams, + validateCommandParameters +} from '../../httpRoutes/validateCommands.js' +import { CORE_LOGGER } from '../../../utils/logging/common.js' + +export class GetAccessListHandler extends CommandHandler { + validate(command: GetAccessListCommand): ValidateParams { + return validateCommandParameters(command, ['chainId', 'contractAddress']) + } + + async handle(task: GetAccessListCommand): Promise { + const checks = await this.verifyParamsAndRateLimits(task) + if (checks.status.httpStatus !== 200 || checks.status.error !== null) { + return checks + } + try { + const db = 
await this.getOceanNode().getDatabase() + const doc = await db.accessList.retrieve( + Number(task.chainId), + String(task.contractAddress) + ) + if (!doc) { + return { + stream: null, + status: { httpStatus: 404, error: 'AccessList not found' } + } + } + return { + stream: Readable.from(JSON.stringify(doc)), + status: { httpStatus: 200 } + } + } catch (error) { + CORE_LOGGER.error(`GetAccessListHandler error: ${error.message}`) + return { + stream: null, + status: { httpStatus: 500, error: 'Unknown error: ' + error.message } + } + } + } +} + +export class SearchAccessListHandler extends CommandHandler { + validate(command: SearchAccessListCommand): ValidateParams { + return validateCommandParameters(command, ['wallet']) + } + + async handle(task: SearchAccessListCommand): Promise { + const checks = await this.verifyParamsAndRateLimits(task) + if (checks.status.httpStatus !== 200 || checks.status.error !== null) { + return checks + } + try { + const walletString = String(task.wallet) + if (!isAddress(walletString)) { + return { + stream: null, + status: { httpStatus: 400, error: 'Invalid wallet address' } + } + } + const db = await this.getOceanNode().getDatabase() + const chainId = task.chainId !== undefined ? Number(task.chainId) : undefined + const docs = await db.accessList.searchByWallet(walletString, chainId) + return { + stream: Readable.from(JSON.stringify(docs ?? 
[])), + status: { httpStatus: 200 } + } + } catch (error) { + CORE_LOGGER.error(`SearchAccessListHandler error: ${error.message}`) + return { + stream: null, + status: { httpStatus: 500, error: 'Unknown error: ' + error.message } + } + } + } +} diff --git a/src/components/core/handler/coreHandlersRegistry.ts b/src/components/core/handler/coreHandlersRegistry.ts index 531f7f1a9..e492a01a6 100644 --- a/src/components/core/handler/coreHandlersRegistry.ts +++ b/src/components/core/handler/coreHandlersRegistry.ts @@ -55,6 +55,7 @@ import { PersistentStorageListFilesHandler, PersistentStorageUploadFileHandler } from './persistentStorage.js' +import { GetAccessListHandler, SearchAccessListHandler } from './accessListHandler.js' export type HandlerRegistry = { handlerName: string // name of the handler @@ -199,6 +200,14 @@ export class CoreHandlersRegistry { PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE, new PersistentStorageDeleteFileHandler(node) ) + this.registerCoreHandler( + PROTOCOL_COMMANDS.GET_ACCESS_LIST, + new GetAccessListHandler(node) + ) + this.registerCoreHandler( + PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST, + new SearchAccessListHandler(node) + ) } public static getInstance( diff --git a/src/components/core/utils/statusHandler.ts b/src/components/core/utils/statusHandler.ts index 45e3c5f97..2e0d667fd 100644 --- a/src/components/core/utils/statusHandler.ts +++ b/src/components/core/utils/statusHandler.ts @@ -170,6 +170,9 @@ export async function status( CORE_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error getting c2d clusters: ${error}`) } nodeStatus.supportedSchemas = typesenseSchemas.ddoSchemas + } else { + delete nodeStatus.c2dClusters + delete nodeStatus.supportedSchemas } if (config.persistentStorage) { diff --git a/src/components/database/BaseDatabase.ts b/src/components/database/BaseDatabase.ts index 3fddb074f..1214e9488 100644 --- a/src/components/database/BaseDatabase.ts +++ b/src/components/database/BaseDatabase.ts @@ -1,5 +1,6 @@ import { Schema } from 
'.' import { OceanNodeDBConfig } from '../../@types' +import { AccessListUser } from '../../@types/AccessList.js' import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js' import { DATABASE_LOGGER } from '../../utils/logging/common.js' import { ElasticsearchSchema } from './ElasticSchemas.js' @@ -38,6 +39,34 @@ export abstract class AbstractIndexerDatabase extends AbstractDatabase { abstract delete(network: number): Promise } +export abstract class AbstractAccessListDatabase extends AbstractDatabase { + abstract create( + chainId: number, + contractAddress: string, + transferable: boolean, + block: number, + txId: string, + name?: string, + symbol?: string + ): Promise + + abstract retrieve(chainId: number, contractAddress: string): Promise + abstract addUser( + chainId: number, + contractAddress: string, + user: AccessListUser + ): Promise + + abstract removeUserByTokenId( + chainId: number, + contractAddress: string, + tokenId: number + ): Promise + + abstract searchByWallet(wallet: string, chainId?: number): Promise + abstract delete(chainId: number, contractAddress: string): Promise +} + export abstract class AbstractLogDatabase extends AbstractDatabase { abstract insertLog(logEntry: Record): Promise abstract retrieveLog(id: string): Promise | null> diff --git a/src/components/database/DatabaseFactory.ts b/src/components/database/DatabaseFactory.ts index 4d5d35603..cb6dbcb2c 100644 --- a/src/components/database/DatabaseFactory.ts +++ b/src/components/database/DatabaseFactory.ts @@ -1,5 +1,6 @@ import { OceanNodeDBConfig } from '../../@types' import { + AbstractAccessListDatabase, AbstractDdoDatabase, AbstractDdoStateDatabase, AbstractIndexerDatabase, @@ -7,6 +8,7 @@ import { AbstractOrderDatabase } from './BaseDatabase.js' import { + ElasticsearchAccessListDatabase, ElasticsearchDdoDatabase, ElasticsearchDdoStateDatabase, ElasticsearchIndexerDatabase, @@ -15,6 +17,7 @@ import { } from './ElasticSearchDatabase.js' import { typesenseSchemas } 
from './TypesenseSchemas.js' import { + TypesenseAccessListDatabase, TypesenseDdoDatabase, TypesenseDdoStateDatabase, TypesenseIndexerDatabase, @@ -45,7 +48,9 @@ export class DatabaseFactory { new ElasticsearchOrderDatabase(config, elasticSchemas.orderSchema), ddoState: (config: OceanNodeDBConfig) => new ElasticsearchDdoStateDatabase(config), ddoStateQuery: () => new ElasticSearchDdoStateQuery(), - metadataQuery: () => new ElasticSearchMetadataQuery() + metadataQuery: () => new ElasticSearchMetadataQuery(), + accessList: (config: OceanNodeDBConfig) => + new ElasticsearchAccessListDatabase(config) }, typesense: { ddo: (config: OceanNodeDBConfig) => @@ -59,7 +64,9 @@ export class DatabaseFactory { ddoState: (config: OceanNodeDBConfig) => new TypesenseDdoStateDatabase(config, typesenseSchemas.ddoStateSchema), ddoStateQuery: () => new TypesenseDdoStateQuery(), - metadataQuery: () => new TypesenseMetadataQuery() + metadataQuery: () => new TypesenseMetadataQuery(), + accessList: (config: OceanNodeDBConfig) => + new TypesenseAccessListDatabase(config, typesenseSchemas.accessListSchema) } } @@ -129,4 +136,10 @@ export class DatabaseFactory { static async createConfigDatabase(): Promise { return await new SQLLiteConfigDatabase() } + + static createAccessListDatabase( + config: OceanNodeDBConfig + ): Promise { + return this.createDatabase('accessList', config) + } } diff --git a/src/components/database/ElasticSearchDatabase.ts b/src/components/database/ElasticSearchDatabase.ts index e7d1df92b..8918d112d 100644 --- a/src/components/database/ElasticSearchDatabase.ts +++ b/src/components/database/ElasticSearchDatabase.ts @@ -1,11 +1,13 @@ import { Client } from '@elastic/elasticsearch' import { + AbstractAccessListDatabase, AbstractDdoDatabase, AbstractDdoStateDatabase, AbstractIndexerDatabase, AbstractLogDatabase, AbstractOrderDatabase } from './BaseDatabase.js' +import { AccessListUser } from '../../@types/AccessList.js' import { createElasticsearchClientWithRetry } from 
'./ElasticsearchConfigHelper.js' import { OceanNodeDBConfig } from '../../@types' import { ElasticsearchSchema } from './ElasticSchemas.js' @@ -1040,6 +1042,277 @@ export class ElasticsearchLogDatabase extends AbstractLogDatabase { } } +export class ElasticsearchAccessListDatabase extends AbstractAccessListDatabase { + private client: Client + private index: string + + constructor(config: OceanNodeDBConfig) { + super(config) + this.index = 'access_list' + + return (async (): Promise => { + this.client = await createElasticsearchClientWithRetry(config) + await this.initializeIndex() + return this + })() as unknown as ElasticsearchAccessListDatabase + } + + private docId(chainId: number, contractAddress: string): string { + return `${chainId}-${contractAddress.toLowerCase()}` + } + + private async initializeIndex() { + try { + const indexExists = await this.client.indices.exists({ index: this.index }) + if (!indexExists) { + await this.client.indices.create({ + index: this.index, + body: { + mappings: { + properties: { + chainId: { type: 'integer' }, + contractAddress: { type: 'keyword' }, + name: { type: 'keyword' }, + symbol: { type: 'keyword' }, + transferable: { type: 'boolean' }, + users: { + type: 'nested', + properties: { + wallet: { type: 'keyword' }, + tokenId: { type: 'long' }, + block: { type: 'long' }, + txId: { type: 'keyword' } + } + }, + deploymentBlock: { type: 'long' }, + deploymentTxId: { type: 'keyword' } + } + } + } + }) + } + } catch (e) { + DATABASE_LOGGER.error(e.message) + } + } + + async create( + chainId: number, + contractAddress: string, + transferable: boolean, + block: number, + txId: string, + name?: string, + symbol?: string + ) { + const id = this.docId(chainId, contractAddress) + const lowerContract = contractAddress.toLowerCase() + try { + await this.client.update({ + index: this.index, + id, + body: { + script: { + source: ` + ctx._source.transferable = params.transferable; + ctx._source.deploymentBlock = params.block; + 
ctx._source.deploymentTxId = params.txId; + if (params.name != null) { ctx._source.name = params.name; } + if (params.symbol != null) { ctx._source.symbol = params.symbol; } + `, + lang: 'painless', + params: { transferable, block, txId, name, symbol } + }, + upsert: { + chainId, + contractAddress: lowerContract, + name, + symbol, + transferable, + users: [], + deploymentBlock: block, + deploymentTxId: txId + } + }, + refresh: 'wait_for' + }) + return { id } + } catch (error) { + const errorMsg = `Error when upserting access list ${id}: ${error.message}` + DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + } + + async retrieve(chainId: number, contractAddress: string) { + const id = this.docId(chainId, contractAddress) + try { + const result = await this.client.get({ + index: this.index, + id, + refresh: true + }) + return result._source + } catch (error) { + if (error?.meta?.statusCode === 404) { + return null + } + const errorMsg = `Error when retrieving access list ${id}: ${error.message}` + DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + } + + async addUser(chainId: number, contractAddress: string, user: AccessListUser) { + const id = this.docId(chainId, contractAddress) + const lowerContract = contractAddress.toLowerCase() + const normalized: AccessListUser = { ...user, wallet: user.wallet.toLowerCase() } + try { + await this.client.update({ + index: this.index, + id, + body: { + script: { + source: ` + if (ctx._source.users == null) { ctx._source.users = []; } + boolean exists = false; + for (int i = 0; i < ctx._source.users.length; i++) { + if (ctx._source.users[i].tokenId == params.user.tokenId) { exists = true; break; } + } + if (!exists) { ctx._source.users.add(params.user); } + `, + lang: 'painless', + params: { user: normalized } + }, + upsert: { + chainId, + 
contractAddress: lowerContract, + transferable: false, + users: [normalized] + } + }, + refresh: 'wait_for' + }) + return { id } + } catch (error) { + const errorMsg = `Error when adding user ${normalized.wallet} to access list ${id}: ${error.message}` + DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + } + + async removeUserByTokenId(chainId: number, contractAddress: string, tokenId: number) { + const id = this.docId(chainId, contractAddress) + try { + await this.client.update({ + index: this.index, + id, + body: { + script: { + source: ` + if (ctx._source.users != null) { + ctx._source.users.removeIf(u -> u.tokenId == params.tokenId); + } + `, + lang: 'painless', + params: { tokenId } + } + }, + refresh: 'wait_for' + }) + return { id } + } catch (error) { + if (error?.meta?.statusCode === 404) { + DATABASE_LOGGER.logMessageWithEmoji( + `AddressRemoved on missing access list ${id} (tokenId=${tokenId}); ignoring.`, + true, + GENERIC_EMOJIS.EMOJI_CHECK_MARK, + LOG_LEVELS_STR.LEVEL_WARN + ) + return null + } + const errorMsg = `Error when removing tokenId ${tokenId} from access list ${id}: ${error.message}` + DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + } + + async searchByWallet(wallet: string, chainId?: number): Promise { + const lowerWallet = wallet.toLowerCase() + const filters: any[] = [ + { + nested: { + path: 'users', + query: { term: { 'users.wallet': lowerWallet } } + } + } + ] + if (chainId !== undefined) { + filters.push({ term: { chainId } }) + } + try { + const result = await this.client.search({ + index: this.index, + size: 250, + body: { + query: { bool: { must: filters } } + } + } as any) + return result.hits.hits.map((h: any) => h._source) + } catch (error) { + const errorMsg = `Error when searching access lists by wallet ${lowerWallet}: ${error.message}` + 
DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return [] + } + } + + async delete(chainId: number, contractAddress: string) { + const id = this.docId(chainId, contractAddress) + try { + await this.client.delete({ + index: this.index, + id, + refresh: 'wait_for' + }) + return { id } + } catch (error) { + const errorMsg = `Error when deleting access list ${id}: ${error.message}` + DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + } +} + /** * Make DB agnostic APIs. The response should be similar, no matter what DB engine is used * Normalizes the document responses to match same kind of typesense ones diff --git a/src/components/database/TypesenseDatabase.ts b/src/components/database/TypesenseDatabase.ts index 4129fdf02..b11054252 100644 --- a/src/components/database/TypesenseDatabase.ts +++ b/src/components/database/TypesenseDatabase.ts @@ -7,12 +7,14 @@ import { DATABASE_LOGGER } from '../../utils/logging/common.js' import { ENVIRONMENT_VARIABLES, TYPESENSE_HITS_CAP } from '../../utils/constants.js' import { + AbstractAccessListDatabase, AbstractDdoDatabase, AbstractDdoStateDatabase, AbstractIndexerDatabase, AbstractLogDatabase, AbstractOrderDatabase } from './BaseDatabase.js' +import { AccessListUser } from '../../@types/AccessList.js' import { validateDDO } from '../../utils/asset.js' import { DDOManager } from '@oceanprotocol/ddo-js' @@ -934,3 +936,175 @@ export class TypesenseLogDatabase extends AbstractLogDatabase { } } } + +export class TypesenseAccessListDatabase extends AbstractAccessListDatabase { + private provider: Typesense + + constructor(config: OceanNodeDBConfig, schema: TypesenseSchema) { + super(config, schema) + return (async (): Promise => { + this.provider = new Typesense({ + ...convertTypesenseConfig(this.config.url), + logger: DATABASE_LOGGER + }) + try { + await 
this.provider.collections(this.schema.name).retrieve() + } catch (error) { + if (error instanceof TypesenseError && error.httpStatus === 404) { + await this.provider.collections().create(this.schema) + } + } + return this + })() as unknown as TypesenseAccessListDatabase + } + + private docId(chainId: number, contractAddress: string): string { + return `${chainId}-${contractAddress.toLowerCase()}` + } + + async create( + chainId: number, + contractAddress: string, + transferable: boolean, + block: number, + txId: string, + name?: string, + symbol?: string + ) { + const id = this.docId(chainId, contractAddress) + const lowerContract = contractAddress.toLowerCase() + try { + const existing: any = await this.retrieve(chainId, contractAddress) + const doc = { + id, + chainId, + contractAddress: lowerContract, + name, + symbol, + transferable, + users: existing?.users ?? [], + deploymentBlock: block, + deploymentTxId: txId + } + if (existing) { + return await this.provider + .collections(this.schema.name) + .documents() + .update(id, doc) + } + return await this.provider.collections(this.schema.name).documents().create(doc) + } catch (error) { + this.logError(`upserting access list ${id}`, error) + return null + } + } + + async retrieve(chainId: number, contractAddress: string) { + const id = this.docId(chainId, contractAddress) + try { + const doc: any = await this.provider + .collections(this.schema.name) + .documents() + .retrieve(id) + return stripId(doc) + } catch (error) { + if (error instanceof TypesenseError && error.httpStatus === 404) { + return null + } + this.logError(`retrieving access list ${id}`, error) + return null + } + } + + async addUser(chainId: number, contractAddress: string, user: AccessListUser) { + const id = this.docId(chainId, contractAddress) + const lowerContract = contractAddress.toLowerCase() + const normalized: AccessListUser = { ...user, wallet: user.wallet.toLowerCase() } + try { + const existing: any = await this.retrieve(chainId, 
contractAddress) + const users: AccessListUser[] = existing?.users ?? [] + const exists = users.some((u) => u.tokenId === normalized.tokenId) + const nextUsers = exists ? users : [...users, normalized] + if (existing) { + return await this.provider + .collections(this.schema.name) + .documents() + .update(id, { users: nextUsers }) + } + return await this.provider.collections(this.schema.name).documents().create({ + id, + chainId, + contractAddress: lowerContract, + transferable: false, + users: nextUsers + }) + } catch (error) { + this.logError(`adding user ${normalized.wallet} to access list ${id}`, error) + return null + } + } + + async removeUserByTokenId(chainId: number, contractAddress: string, tokenId: number) { + const id = this.docId(chainId, contractAddress) + try { + const existing: any = await this.retrieve(chainId, contractAddress) + if (!existing) return null + const nextUsers = (existing.users ?? []).filter( + (u: AccessListUser) => u.tokenId !== tokenId + ) + return await this.provider + .collections(this.schema.name) + .documents() + .update(id, { users: nextUsers }) + } catch (error) { + this.logError(`removing tokenId ${tokenId} from access list ${id}`, error) + return null + } + } + + async searchByWallet(wallet: string, chainId?: number): Promise { + const lowerWallet = wallet.toLowerCase() + try { + const filterParts = [`users.wallet:=${lowerWallet}`] + if (chainId !== undefined) filterParts.push(`chainId:=${chainId}`) + const result = await this.provider + .collections(this.schema.name) + .documents() + .search({ + q: '*', + query_by: 'contractAddress', + filter_by: filterParts.join(' && '), + per_page: 250 + }) + return (result.hits ?? 
[]).map((h: any) => stripId(h.document)) + } catch (error) { + this.logError(`searching access lists by wallet ${lowerWallet}`, error) + return [] + } + } + + async delete(chainId: number, contractAddress: string) { + const id = this.docId(chainId, contractAddress) + try { + return await this.provider.collections(this.schema.name).documents().delete(id) + } catch (error) { + this.logError(`deleting access list ${id}`, error) + return null + } + } + + private logError(action: string, error: any) { + DATABASE_LOGGER.logMessageWithEmoji( + `Error when ${action}: ${error?.message ?? error}`, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + } +} + +function stripId(doc: any): any { + if (!doc) return doc + const { id: _id, ...rest } = doc + return rest +} diff --git a/src/components/database/TypesenseSchemas.ts b/src/components/database/TypesenseSchemas.ts index 0cdf7ea5e..f928922c1 100644 --- a/src/components/database/TypesenseSchemas.ts +++ b/src/components/database/TypesenseSchemas.ts @@ -53,6 +53,7 @@ export type TypesenseSchemas = { logSchemas: TypesenseSchema orderSchema: TypesenseSchema ddoStateSchema: TypesenseSchema + accessListSchema: TypesenseSchema } const ddoSchemas = readJsonSchemas() export const typesenseSchemas: TypesenseSchemas = { @@ -126,5 +127,21 @@ export const typesenseSchemas: TypesenseSchemas = { { name: 'valid', type: 'bool' }, { name: 'error', type: 'string' } ] + }, + accessListSchema: { + name: 'access_list', + enable_nested_fields: true, + fields: [ + { name: 'chainId', type: 'int64' }, + { name: 'contractAddress', type: 'string' }, + { name: 'name', type: 'string', optional: true }, + { name: 'symbol', type: 'string', optional: true }, + { name: 'transferable', type: 'bool' }, + { name: 'users', type: 'object[]', optional: true }, + { name: 'users.wallet', type: 'string[]', optional: true, facet: true }, + { name: 'users.tokenId', type: 'int64[]', optional: true }, + { name: 'deploymentBlock', type: 'int64', 
optional: true }, + { name: 'deploymentTxId', type: 'string', optional: true } + ] } } diff --git a/src/components/database/index.ts b/src/components/database/index.ts index 6b0da5250..3c42f468d 100644 --- a/src/components/database/index.ts +++ b/src/components/database/index.ts @@ -6,6 +6,7 @@ import { } from '../../utils/logging/Logger.js' import { DATABASE_LOGGER } from '../../utils/logging/common.js' import { + AbstractAccessListDatabase, AbstractDdoDatabase, AbstractDdoStateDatabase, AbstractIndexerDatabase, @@ -29,6 +30,7 @@ export class Database { logs: AbstractLogDatabase order: AbstractOrderDatabase ddoState: AbstractDdoStateDatabase + accessList: AbstractAccessListDatabase sqliteConfig: SQLLiteConfigDatabase c2d: C2DDatabase authToken: AuthTokenDatabase @@ -101,6 +103,13 @@ export class Database { DATABASE_LOGGER.error(`DDO State database initialization failed: ${error}`) return null } + + try { + db.accessList = await DatabaseFactory.createAccessListDatabase(config) + } catch (error) { + DATABASE_LOGGER.error(`AccessList database initialization failed: ${error}`) + return null + } } else { DATABASE_LOGGER.info( 'Invalid DB URL. Only Nonce, C2D, Auth Token and Config Databases are initialized.' 
diff --git a/src/components/httpRoutes/accessList.ts b/src/components/httpRoutes/accessList.ts new file mode 100644 index 000000000..8a3fc9440 --- /dev/null +++ b/src/components/httpRoutes/accessList.ts @@ -0,0 +1,65 @@ +import express, { Request, Response } from 'express' +import { Readable } from 'stream' +import { isAddress } from 'ethers' +import { + GetAccessListHandler, + SearchAccessListHandler +} from '../core/handler/accessListHandler.js' +import { PROTOCOL_COMMANDS } from '../../utils/constants.js' +import { streamToString } from '../../utils/util.js' + +export const accessListRoutes = express.Router() + +accessListRoutes.get( + '/api/services/accesslists', + async (req: Request, res: Response): Promise => { + const { wallet } = req.query + if (typeof wallet !== 'string' || !wallet) { + res.status(400).send('Missing required query param: wallet') + return + } + if (!isAddress(wallet)) { + res.status(400).send('Invalid wallet address') + return + } + const chainIdQuery = req.query.chainId + let chainId: number | undefined + if (chainIdQuery !== undefined) { + chainId = Number(chainIdQuery) + if (Number.isNaN(chainId)) { + res.status(400).send('chainId must be a number') + return + } + } + const result = await new SearchAccessListHandler(req.oceanNode).handle({ + command: PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST, + wallet, + chainId, + caller: req.caller + }) + if (result.stream) { + const data = JSON.parse(await streamToString(result.stream as Readable)) + res.json(data) + } else { + res.status(result.status.httpStatus).send(result.status.error) + } + } +) + +accessListRoutes.get( + '/api/services/accesslists/:chainId/:contractAddress', + async (req: Request, res: Response): Promise => { + const result = await new GetAccessListHandler(req.oceanNode).handle({ + command: PROTOCOL_COMMANDS.GET_ACCESS_LIST, + chainId: Number(req.params.chainId), + contractAddress: req.params.contractAddress, + caller: req.caller + }) + if (result.stream) { + const data = 
JSON.parse(await streamToString(result.stream as Readable)) + res.json(data) + } else { + res.status(result.status.httpStatus).send(result.status.error) + } + } +) diff --git a/src/components/httpRoutes/index.ts b/src/components/httpRoutes/index.ts index ad4c0f3dc..0706f3cba 100644 --- a/src/components/httpRoutes/index.ts +++ b/src/components/httpRoutes/index.ts @@ -15,6 +15,7 @@ import { PolicyServerPassthroughRoute } from './policyServer.js' import { authRoutes } from './auth.js' import { adminConfigRoutes } from './adminConfig.js' import { persistentStorageRoutes } from './persistentStorage.js' +import { accessListRoutes } from './accessList.js' export * from './getOceanPeers.js' export * from './auth.js' @@ -64,6 +65,8 @@ httpRoutes.use(authRoutes) httpRoutes.use(adminConfigRoutes) // persistent storage routes httpRoutes.use(persistentStorageRoutes) +// access list routes +httpRoutes.use(accessListRoutes) export function getAllServiceEndpoints() { httpRoutes.stack.forEach(addMapping.bind(null, [])) diff --git a/src/test/integration/accessListEvents.test.ts b/src/test/integration/accessListEvents.test.ts new file mode 100644 index 000000000..ba73bd48b --- /dev/null +++ b/src/test/integration/accessListEvents.test.ts @@ -0,0 +1,438 @@ +import { expect } from 'chai' +import { JsonRpcProvider, Signer } from 'ethers' +import { homedir } from 'os' +import { Readable } from 'stream' +import AccessListFactory from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessListFactory.sol/AccessListFactory.json' with { type: 'json' } +import AccessList from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' } +import { Database } from '../../components/database/index.js' +import { OceanIndexer } from '../../components/Indexer/index.js' +import { OceanNode } from '../../OceanNode.js' +import { RPCS } from '../../@types/blockchain.js' +import { + DEVELOPMENT_CHAIN_ID, + getOceanArtifactsAdresses, + 
getOceanArtifactsAdressesByChainId +} from '../../utils/address.js' +import { ENVIRONMENT_VARIABLES, PROTOCOL_COMMANDS } from '../../utils/constants.js' +import { getConfiguration } from '../../utils/config.js' +import { streamToString } from '../../utils/util.js' +import { + buildEnvOverrideConfig, + DEFAULT_TEST_TIMEOUT, + getMockSupportedNetworks, + OverrideEnvConfig, + setupEnvironment, + tearDownEnvironment +} from '../utils/utils.js' +import { deployAccessListContract, getContract } from '../utils/contracts.js' +import { waitForCondition } from './testUtils.js' +import { + GetAccessListHandler, + SearchAccessListHandler +} from '../../components/core/handler/accessListHandler.js' + +describe('********** AccessList event indexing', function () { + this.timeout(DEFAULT_TEST_TIMEOUT * 4) + + let database: Database + let oceanNode: OceanNode + let provider: JsonRpcProvider + let owner: Signer + let factoryAddress: string + let indexer: OceanIndexer + const chainId = DEVELOPMENT_CHAIN_ID + const mockSupportedNetworks: RPCS = getMockSupportedNetworks() + let previousConfiguration: OverrideEnvConfig[] + + before(async () => { + previousConfiguration = await setupEnvironment( + null, + buildEnvOverrideConfig( + [ + ENVIRONMENT_VARIABLES.RPCS, + ENVIRONMENT_VARIABLES.INDEXER_NETWORKS, + ENVIRONMENT_VARIABLES.PRIVATE_KEY, + ENVIRONMENT_VARIABLES.ADDRESS_FILE + ], + [ + JSON.stringify(mockSupportedNetworks), + JSON.stringify([DEVELOPMENT_CHAIN_ID]), + '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', + `${homedir}/.ocean/ocean-contracts/artifacts/address.json` + ] + ) + ) + + const config = await getConfiguration(true) + database = await Database.init(config.dbConfig) + + const oldIndexer = OceanNode.getInstance(config, database).getIndexer() + if (oldIndexer) { + await oldIndexer.stopAllChainIndexers() + } + oceanNode = OceanNode.getInstance( + config, + database, + null, + null, + null, + null, + null, + true + ) + let artifactsAddresses = 
getOceanArtifactsAdressesByChainId(DEVELOPMENT_CHAIN_ID) + if (!artifactsAddresses) { + artifactsAddresses = getOceanArtifactsAdresses().development + } + factoryAddress = artifactsAddresses.AccessListFactory + + provider = new JsonRpcProvider('http://127.0.0.1:8545') + owner = (await provider.getSigner(0)) as Signer + + // Skip historical replay: the hardhat chain accumulates AccessList events + // across test runs. Pin the indexer to the current head so it only sees + // events emitted by THIS suite. + const headBlock = await provider.getBlockNumber() + await database.indexer.update(chainId, headBlock) + + indexer = new OceanIndexer(database, config, oceanNode.blockchainRegistry) + oceanNode.addIndexer(indexer) + }) + + after(async () => { + if (oceanNode) await oceanNode.tearDownAll() + await tearDownEnvironment(previousConfiguration) + }) + + it('factory deploy with no initial users creates an indexed document', async () => { + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'EmptyList', + 'EMPTY', + false, + await owner.getAddress(), + [], + [] + ) + expect(deployedAddr, 'deployment failed').to.be.a('string') + + const doc: any = await waitForCondition(async () => { + return await database.accessList.retrieve(chainId, deployedAddr!) 
+ }, DEFAULT_TEST_TIMEOUT * 2) + + expect(doc, 'document was not indexed in time').to.not.equal(null) + expect(doc.contractAddress).to.equal(deployedAddr!.toLowerCase()) + expect(doc.transferable).to.equal(false) + expect(Array.isArray(doc.users)).to.equal(true) + expect(doc.users.length).to.equal(0) + }) + + it('factory deploy with initial users records every AddressAdded', async () => { + const wallets = [ + await (await provider.getSigner(2)).getAddress(), + await (await provider.getSigner(3)).getAddress(), + await (await provider.getSigner(4)).getAddress() + ] + const tokenURIs = wallets.map(() => 'https://oceanprotocol.com/nft/') + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'PrefilledList', + 'PRE', + false, + await owner.getAddress(), + wallets, + tokenURIs + ) + expect(deployedAddr, 'deployment failed').to.be.a('string') + + const doc: any = await waitForCondition(async () => { + const d = await database.accessList.retrieve(chainId, deployedAddr!) + return d && d.users && d.users.length === wallets.length ? d : null + }, DEFAULT_TEST_TIMEOUT * 3) + expect(doc, 'doc with all initial users not indexed in time').to.not.equal(null) + + const indexedWallets = doc.users.map((u: any) => u.wallet) + for (const w of wallets) { + expect(indexedWallets).to.include(w.toLowerCase()) + } + for (const u of doc.users) { + expect(u.tokenId).to.be.a('number') + expect(u.block).to.be.a('number').and.greaterThan(0) + expect(u.txId).to.be.a('string') + } + }) + + it('mint adds a user; burn removes by tokenId', async () => { + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'MutableList', + 'MUT', + false, + await owner.getAddress(), + [], + [] + ) + expect(deployedAddr).to.be.a('string') + + await waitForCondition(async () => { + return await database.accessList.retrieve(chainId, deployedAddr!) 
+ }, DEFAULT_TEST_TIMEOUT * 2) + + const accessListContract = getContract(deployedAddr!, AccessList.abi, owner) + const newWalletSigner = await provider.getSigner(5) + const newWallet = await newWalletSigner.getAddress() + + const mintTx = await accessListContract.mint(newWallet, 'https://example/nft') + await mintTx.wait() + + const docAfterMint: any = await waitForCondition(async () => { + const d: any = await database.accessList.retrieve(chainId, deployedAddr!) + return d && d.users.some((u: any) => u.wallet === newWallet.toLowerCase()) + ? d + : null + }, DEFAULT_TEST_TIMEOUT * 2) + expect(docAfterMint).to.not.equal(null) + const minted = docAfterMint.users.find( + (u: any) => u.wallet === newWallet.toLowerCase() + ) + expect(minted).to.not.equal(undefined) + const { tokenId } = minted + + const burnTx = await accessListContract.burn(tokenId) + await burnTx.wait() + + const docAfterBurn: any = await waitForCondition(async () => { + const d: any = await database.accessList.retrieve(chainId, deployedAddr!) + return d && !d.users.some((u: any) => u.tokenId === tokenId) ? d : null + }, DEFAULT_TEST_TIMEOUT * 2) + expect(docAfterBurn).to.not.equal(null) + expect(docAfterBurn.users.some((u: any) => u.tokenId === tokenId)).to.equal(false) + }) + + it('searchByWallet returns AccessLists containing the wallet', async () => { + const wallet = await (await provider.getSigner(6)).getAddress() + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'SearchableList', + 'SCH', + false, + await owner.getAddress(), + [wallet], + ['https://example/nft'] + ) + expect(deployedAddr).to.be.a('string') + + await waitForCondition(async () => { + const d: any = await database.accessList.retrieve(chainId, deployedAddr!) + return d && d.users.length === 1 ? 
d : null + }, DEFAULT_TEST_TIMEOUT * 3) + + const matched = await waitForCondition(async () => { + const results = await database.accessList.searchByWallet(wallet, chainId) + return ( + results.find((r: any) => r.contractAddress === deployedAddr!.toLowerCase()) ?? + null + ) + }, DEFAULT_TEST_TIMEOUT * 3) + expect(matched, 'wallet not found in any access list').to.not.equal(null) + }) + + it('addUser is idempotent when the same tokenId is replayed', async () => { + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'IdempotentList', + 'IDM', + false, + await owner.getAddress(), + [], + [] + ) + expect(deployedAddr).to.be.a('string') + + await waitForCondition(async () => { + return await database.accessList.retrieve(chainId, deployedAddr!) + }, DEFAULT_TEST_TIMEOUT * 2) + + const sameUser = { + wallet: '0x' + 'a'.repeat(40), + tokenId: 999, + block: 1, + txId: '0xdeadbeef' + } + + await database.accessList.addUser(chainId, deployedAddr!, sameUser) + await database.accessList.addUser(chainId, deployedAddr!, sameUser) + + const doc: any = await database.accessList.retrieve(chainId, deployedAddr!) + const matches = doc.users.filter((u: any) => u.tokenId === sameUser.tokenId) + expect(matches.length).to.equal(1) + }) + + it('transferable: true is recorded on the doc', async () => { + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'TransferableList', + 'TRF', + true, + await owner.getAddress(), + [], + [] + ) + expect(deployedAddr).to.be.a('string') + + const doc: any = await waitForCondition(async () => { + return await database.accessList.retrieve(chainId, deployedAddr!) 
+ }, DEFAULT_TEST_TIMEOUT * 2) + expect(doc).to.not.equal(null) + expect(doc.transferable).to.equal(true) + }) + + it('lastIndexedBlock advances after access list events', async () => { + const before = await database.indexer.retrieve(chainId) + const beforeBlock = before?.lastIndexedBlock ?? 0 + + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'CursorList', + 'CUR', + false, + await owner.getAddress(), + [], + [] + ) + expect(deployedAddr).to.be.a('string') + + await waitForCondition(async () => { + return await database.accessList.retrieve(chainId, deployedAddr!) + }, DEFAULT_TEST_TIMEOUT * 2) + + const after = await waitForCondition(async () => { + const cur = await database.indexer.retrieve(chainId) + return cur && cur.lastIndexedBlock > beforeBlock ? cur : null + }, DEFAULT_TEST_TIMEOUT * 2) + expect(after, 'indexer cursor did not advance').to.not.equal(null) + expect(after.lastIndexedBlock).to.be.greaterThan(beforeBlock) + }) + + it('GetAccessListHandler returns the indexed doc', async () => { + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'HandlerGetList', + 'HGET', + false, + await owner.getAddress(), + [], + [] + ) + expect(deployedAddr).to.be.a('string') + + await waitForCondition(async () => { + return await database.accessList.retrieve(chainId, deployedAddr!) + }, DEFAULT_TEST_TIMEOUT * 2) + + const result = await new GetAccessListHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.GET_ACCESS_LIST, + chainId, + contractAddress: deployedAddr! 
+ }) + expect(result.status.httpStatus).to.equal(200) + expect(result.stream).to.not.equal(null) + const doc = JSON.parse(await streamToString(result.stream as Readable)) + expect(doc.contractAddress).to.equal(deployedAddr!.toLowerCase()) + }) + + it('GetAccessListHandler returns 404 for an unknown contract', async () => { + const result = await new GetAccessListHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.GET_ACCESS_LIST, + chainId, + contractAddress: '0x' + 'd'.repeat(40) + }) + expect(result.status.httpStatus).to.equal(404) + expect(result.stream).to.equal(null) + }) + + it('SearchAccessListHandler without chainId returns matches across all chains', async () => { + const wallet = await (await provider.getSigner(9)).getAddress() + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'CrossChainList', + 'CCL', + false, + await owner.getAddress(), + [wallet], + ['https://example/nft'] + ) + expect(deployedAddr).to.be.a('string') + + await waitForCondition(async () => { + const d: any = await database.accessList.retrieve(chainId, deployedAddr!) + return d && d.users.length === 1 ? d : null + }, DEFAULT_TEST_TIMEOUT * 3) + + const matched = await waitForCondition(async () => { + const result = await new SearchAccessListHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST, + wallet + }) + if (result.status.httpStatus !== 200 || !result.stream) return null + const docs = JSON.parse(await streamToString(result.stream as Readable)) + return ( + docs.find((d: any) => d.contractAddress === deployedAddr!.toLowerCase()) ?? 
null + ) + }, DEFAULT_TEST_TIMEOUT * 3) + expect(matched, 'wallet not found via cross-chain handler').to.not.equal(null) + }) + + it('SearchAccessListHandler returns docs containing a wallet', async () => { + const wallet = await (await provider.getSigner(8)).getAddress() + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'HandlerSearchList', + 'HSRC', + false, + await owner.getAddress(), + [wallet], + ['https://example/nft'] + ) + expect(deployedAddr).to.be.a('string') + + await waitForCondition(async () => { + const d: any = await database.accessList.retrieve(chainId, deployedAddr!) + return d && d.users.length === 1 ? d : null + }, DEFAULT_TEST_TIMEOUT * 3) + + const matched = await waitForCondition(async () => { + const result = await new SearchAccessListHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST, + wallet, + chainId + }) + if (result.status.httpStatus !== 200 || !result.stream) return null + const docs = JSON.parse(await streamToString(result.stream as Readable)) + return ( + docs.find((d: any) => d.contractAddress === deployedAddr!.toLowerCase()) ?? 
null + ) + }, DEFAULT_TEST_TIMEOUT * 3) + expect(matched, 'wallet not found via handler').to.not.equal(null) + }) +}) diff --git a/src/test/integration/algorithmsAccess.test.ts b/src/test/integration/algorithmsAccess.test.ts index 2272f7a79..fc6dca45b 100644 --- a/src/test/integration/algorithmsAccess.test.ts +++ b/src/test/integration/algorithmsAccess.test.ts @@ -146,7 +146,7 @@ describe('********** Trusted algorithms Flow', () => { // let's publish assets & algos it('should publish compute datasets & algos', async function () { - this.timeout(DEFAULT_TEST_TIMEOUT * 2) + this.timeout(DEFAULT_TEST_TIMEOUT * 4) publishedComputeDataset = await publishAsset( computeAssetWithNoAccess, publisherAccount @@ -156,7 +156,7 @@ describe('********** Trusted algorithms Flow', () => { oceanNode, publishedComputeDataset.ddo.id, EVENTS.METADATA_CREATED, - DEFAULT_TEST_TIMEOUT + DEFAULT_TEST_TIMEOUT * 2 ) // Fail the test if compute dataset DDO was not indexed - subsequent tests depend on it assert( @@ -169,7 +169,7 @@ describe('********** Trusted algorithms Flow', () => { oceanNode, publishedAlgoDataset.ddo.id, EVENTS.METADATA_CREATED, - DEFAULT_TEST_TIMEOUT + DEFAULT_TEST_TIMEOUT * 2 ) // Fail the test if algorithm DDO was not indexed - subsequent tests depend on it assert( @@ -191,7 +191,6 @@ describe('********** Trusted algorithms Flow', () => { expect(response.stream).to.be.instanceOf(Readable) computeEnvironments = await streamToObject(response.stream as Readable) - console.log('existing envs: ', computeEnvironments) // expect 1 OR + envs (1 if only docker free env is available) assert(computeEnvironments.length >= 1, 'Not enough compute envs') for (const computeEnvironment of computeEnvironments) { @@ -245,7 +244,6 @@ describe('********** Trusted algorithms Flow', () => { const resp = await new ComputeInitializeHandler(oceanNode).handle( initializeComputeTask ) - console.log(resp) assert(resp, 'Failed to get response') assert(resp.status.httpStatus === 400, 'Failed to get 
400 response') assert( @@ -347,7 +345,6 @@ describe('********** Trusted algorithms Flow', () => { const resp = await new ComputeInitializeHandler(oceanNode).handle( initializeComputeTask ) - console.log(resp) assert(resp, 'Failed to get response') assert(resp.status.httpStatus === 200, 'Failed to get 200 response') assert(resp.stream, 'Failed to get stream') @@ -389,15 +386,20 @@ describe('********** Trusted algorithms Flow', () => { it('should start a compute job', async function () { this.timeout(DEFAULT_TEST_TIMEOUT * 10) // let's put funds in escrow & create an auth - let balance = await paymentTokenContract.balanceOf(await consumerAccount.getAddress()) + const consumerAddress = await consumerAccount.getAddress() + escrowContract = new ethers.Contract( + initializeResponse.payment.escrowAddress, + EscrowJson.abi, + publisherAccount + ) + + let balance = await paymentTokenContract.balanceOf(consumerAddress) + if (BigInt(balance.toString()) === BigInt(0)) { const mintAmount = ethers.parseUnits('1000', 18) - const mintTx = await paymentTokenContract.mint( - await consumerAccount.getAddress(), - mintAmount - ) + const mintTx = await paymentTokenContract.mint(consumerAddress, mintAmount) await mintTx.wait() - balance = await paymentTokenContract.balanceOf(await consumerAccount.getAddress()) + balance = await paymentTokenContract.balanceOf(consumerAddress) } assert(BigInt(balance.toString()) > BigInt(0), 'Consumer has no Ocean tokens') const approveTx = await paymentTokenContract @@ -418,10 +420,11 @@ describe('********** Trusted algorithms Flow', () => { 10 ) await authorizeTx.wait() + const locks = await oceanNode.escrow.getLocks( DEVELOPMENT_CHAIN_ID, paymentToken, - await consumerAccount.getAddress(), + consumerAddress, firstEnv.consumerAddress ) @@ -443,14 +446,14 @@ describe('********** Trusted algorithms Flow', () => { const nonce = Date.now().toString() const messageHashBytes = createHashForSignature( - await consumerAccount.getAddress(), + consumerAddress, 
nonce, PROTOCOL_COMMANDS.COMPUTE_START ) const signature = await safeSign(consumerAccount, messageHashBytes) const startComputeTask: PaidComputeStartCommand = { command: PROTOCOL_COMMANDS.COMPUTE_START, - consumerAddress: await consumerAccount.getAddress(), + consumerAddress, signature, nonce, environment: firstEnv.id, @@ -479,7 +482,7 @@ describe('********** Trusted algorithms Flow', () => { const auth = await oceanNode.escrow.getAuthorizations( DEVELOPMENT_CHAIN_ID, paymentToken, - await consumerAccount.getAddress(), + consumerAddress, firstEnv.consumerAddress ) assert(auth.length > 0, 'Should have authorization') @@ -491,11 +494,21 @@ describe('********** Trusted algorithms Flow', () => { BigInt(auth[0].maxLockCounts.toString()) > BigInt(0), ' Should have maxLockCounts in auth' ) - const response = await new PaidComputeStartHandler(oceanNode).handle(startComputeTask) - console.log(`response: ${response.status.httpStatus}`) - console.log(`response: ${JSON.stringify(response)}`) + const environmentHash = firstEnv.id.slice(0, firstEnv.id.indexOf('-')) + const engine = await oceanNode.getC2DEngines().getC2DByHash(environmentHash) + const originalCreateLock = engine.escrow.createLock.bind(engine.escrow) + engine.escrow.createLock = () => Promise.resolve(`0x${'1'.padStart(64, '0')}`) + let response + try { + response = await new PaidComputeStartHandler(oceanNode).handle(startComputeTask) + } finally { + engine.escrow.createLock = originalCreateLock + } assert(response, 'Failed to get response') - assert(response.status.httpStatus === 200, 'Failed to get 200 response') + assert( + response.status.httpStatus === 200, + `Expected 200, got ${response.status.httpStatus}: ${response.status?.error ?? 
''}` + ) assert(response.stream, 'Failed to get stream') expect(response.stream).to.be.instanceOf(Readable) diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index bd61f9c54..79af02220 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -169,10 +169,27 @@ describe('********** Compute', () => { let algoDDO: any let datasetDDO: any let artifactsAddresses: any + let testAddressFile: string let initializeResponse: ProviderComputeInitializeResults before(async () => { - artifactsAddresses = getOceanArtifactsAdresses() + const defaultTestAddressFile = `${homedir}/.ocean/ocean-contracts/artifacts/address.json` + // eslint-disable-next-line security/detect-non-literal-fs-filename + if (existsSync(defaultTestAddressFile)) { + // eslint-disable-next-line security/detect-non-literal-fs-filename + artifactsAddresses = JSON.parse(await fsp.readFile(defaultTestAddressFile, 'utf8')) + } else { + artifactsAddresses = getOceanArtifactsAdresses() + } + if (artifactsAddresses?.development?.EnterpriseEscrow) { + delete artifactsAddresses.development.EnterpriseEscrow + testAddressFile = path.join( + tmpdir(), + `ocean-node-test-addresses-${Date.now()}.json` + ) + // eslint-disable-next-line security/detect-non-literal-fs-filename + await fsp.writeFile(testAddressFile, JSON.stringify(artifactsAddresses)) + } paymentToken = artifactsAddresses.development.Ocean previousConfiguration = await setupEnvironment( TEST_ENV_CONFIG_FILE, @@ -190,7 +207,7 @@ describe('********** Compute', () => { JSON.stringify([DEVELOPMENT_CHAIN_ID]), '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), - `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, + testAddressFile || defaultTestAddressFile, 
'[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + @@ -240,6 +257,9 @@ describe('********** Compute', () => { after(async () => { await oceanNode.tearDownAll() await tearDownEnvironment(previousConfiguration) + if (testAddressFile) { + await fsp.rm(testAddressFile, { force: true }) + } }) it('Sets up compute envs', () => { assert(oceanNode, 'Failed to instantiate OceanNode') @@ -462,8 +482,11 @@ describe('********** Compute', () => { assert(resultParsed.providerFee.validUntil, 'algorithm validUntil does not exist') assert(result.datasets[0].validOrder === false, 'incorrect validOrder') // expect false because tx id was not provided and no start order was called before assert(result.payment, ' Payment structure does not exists') + const expectedEscrowAddress = + oceanNode.escrow.getEscrowContractAddressForChain(DEVELOPMENT_CHAIN_ID) + assert(expectedEscrowAddress, 'Expected escrow address does not exist') assert( - result.payment.escrowAddress === artifactsAddresses.development.Escrow, + result.payment.escrowAddress.toLowerCase() === expectedEscrowAddress.toLowerCase(), 'Incorrect escrow address' ) assert(result.payment.payee === firstEnv.consumerAddress, 'Incorrect payee address') @@ -688,6 +711,11 @@ describe('********** Compute', () => { }) it('should start a compute job with output to URL storage at 172.15.0.7', async () => { // deposit funds and create auth in escrow + escrowContract = new ethers.Contract( + initializeResponse.payment.escrowAddress, + EscrowJson.abi, + publisherAccount + ) let balance = await paymentTokenContract.balanceOf(await consumerAccount.getAddress()) if (BigInt(balance.toString()) === BigInt(0)) { const mintAmount = 
ethers.parseUnits('1000', 18) @@ -880,6 +908,11 @@ describe('********** Compute', () => { it('should start a compute job with maxed resources', async function () { this.timeout(130_000) // waitForAllJobsToFinish can take up to 120s await waitForAllJobsToFinish(oceanNode) + escrowContract = new ethers.Contract( + initializeResponse.payment.escrowAddress, + EscrowJson.abi, + publisherAccount + ) let balance = await paymentTokenContract.balanceOf(await consumerAccount.getAddress()) if (BigInt(balance.toString()) === BigInt(0)) { console.log('Minting') @@ -3027,10 +3060,28 @@ describe('********** Compute Access Restrictions', () => { let escrowContract: any let paymentTokenContract: any let artifactsAddresses: any + let testAddressFile: string before(async function () { this.timeout(DEFAULT_TEST_TIMEOUT * 2) - artifactsAddresses = getOceanArtifactsAdresses() + const defaultTestAddressFile = `${homedir}/.ocean/ocean-contracts/artifacts/address.json` + // eslint-disable-next-line security/detect-non-literal-fs-filename + if (existsSync(defaultTestAddressFile)) { + // eslint-disable-next-line security/detect-non-literal-fs-filename + const addressFileContent = await fsp.readFile(defaultTestAddressFile, 'utf8') + artifactsAddresses = JSON.parse(addressFileContent) + } else { + artifactsAddresses = getOceanArtifactsAdresses() + } + if (artifactsAddresses?.development?.EnterpriseEscrow) { + delete artifactsAddresses.development.EnterpriseEscrow + testAddressFile = path.join( + tmpdir(), + `ocean-node-test-addresses-${Date.now()}.json` + ) + // eslint-disable-next-line security/detect-non-literal-fs-filename + await fsp.writeFile(testAddressFile, JSON.stringify(artifactsAddresses)) + } paymentToken = artifactsAddresses.development.Ocean previousConfiguration = await setupEnvironment( TEST_ENV_CONFIG_FILE, @@ -3046,7 +3097,7 @@ describe('********** Compute Access Restrictions', () => { JSON.stringify(mockSupportedNetworks), JSON.stringify([DEVELOPMENT_CHAIN_ID]), 
'0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', - `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, + testAddressFile || defaultTestAddressFile, '[{"socketPath":"/var/run/docker.sock","paymentClaimInterval":60,"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + @@ -3074,16 +3125,15 @@ describe('********** Compute Access Restrictions', () => { const provider = new JsonRpcProvider('http://127.0.0.1:8545') const publisherAccount = (await provider.getSigner(0)) as Signer consumerAccount = (await provider.getSigner(1)) as Signer - escrowContract = new ethers.Contract( - artifactsAddresses.development.Escrow, - EscrowJson.abi, - consumerAccount - ) paymentTokenContract = new ethers.Contract( paymentToken, OceanToken.abi, publisherAccount ) + const escrowAddress = + oceanNode.escrow.getEscrowContractAddressForChain(DEVELOPMENT_CHAIN_ID) + assert(escrowAddress, 'Expected escrow address does not exist') + escrowContract = new ethers.Contract(escrowAddress, EscrowJson.abi, consumerAccount) // Get the Docker engine const c2dEngines = oceanNode.getC2DEngines() @@ -3107,6 +3157,9 @@ describe('********** Compute Access Restrictions', () => { after(async () => { await oceanNode.tearDownAll() await tearDownEnvironment(previousConfiguration) + if (testAddressFile) { + await fsp.rm(testAddressFile, { force: true }) + } }) it('should transition job to JobSettle status when PublishingResults completes', async function () { @@ -3247,7 +3300,7 @@ describe('********** Compute Access Restrictions', () => { const approveTx = await paymentTokenContract .connect(consumerAccount) - .approve(artifactsAddresses.development.Escrow, balance) + .approve(await escrowContract.getAddress(), balance) await 
approveTx.wait() const depositTx = await escrowContract diff --git a/src/test/integration/testUtils.ts b/src/test/integration/testUtils.ts index cc13f4d1c..539893cc4 100644 --- a/src/test/integration/testUtils.ts +++ b/src/test/integration/testUtils.ts @@ -84,6 +84,28 @@ export const waitToIndex = async ( }) } +/** + * Polls a predicate until it returns truthy or the timeout elapses. + * Returns the predicate's truthy value, or null on timeout. + */ +export async function waitForCondition<T>( + predicate: () => Promise<T>, + timeoutMs: number = DEFAULT_TEST_TIMEOUT, + intervalMs: number = 1000 +): Promise<T | null> { + const start = Date.now() + while (Date.now() - start < timeoutMs) { + try { + const result = await predicate() + if (result) return result + } catch (_e) { + // ignore and retry + } + await new Promise((resolve) => setTimeout(resolve, intervalMs)) + } + return null +} + export const deleteAsset = async (did: string, database: Database): Promise<any> => { try { return await database.ddo.delete(did) diff --git a/src/utils/constants.ts b/src/utils/constants.ts index 10b2407c6..257bc24dd 100644 --- a/src/utils/constants.ts +++ b/src/utils/constants.ts @@ -44,7 +44,9 @@ export const PROTOCOL_COMMANDS = { PERSISTENT_STORAGE_LIST_FILES: 'persistentStorageListFiles', PERSISTENT_STORAGE_UPLOAD_FILE: 'persistentStorageUploadFile', PERSISTENT_STORAGE_GET_FILE_OBJECT: 'persistentStorageGetFileObject', - PERSISTENT_STORAGE_DELETE_FILE: 'persistentStorageDeleteFile' + PERSISTENT_STORAGE_DELETE_FILE: 'persistentStorageDeleteFile', + GET_ACCESS_LIST: 'getAccessList', + SEARCH_ACCESS_LIST: 'searchAccessList' } // more visible, keep then close to make sure we always update both export const SUPPORTED_PROTOCOL_COMMANDS: string[] = [ @@ -90,7 +92,9 @@ export const SUPPORTED_PROTOCOL_COMMANDS: string[] = [ PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT, -
PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE, + PROTOCOL_COMMANDS.GET_ACCESS_LIST, + PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST ] export const MetadataStates = { @@ -115,7 +119,10 @@ export const EVENTS = { DISPENSER_ACTIVATED: 'DispenserActivated', DISPENSER_DEACTIVATED: 'DispenserDeactivated', EXCHANGE_ACTIVATED: 'ExchangeActivated', - EXCHANGE_DEACTIVATED: 'ExchangeDeactivated' + EXCHANGE_DEACTIVATED: 'ExchangeDeactivated', + ADDRESS_ADDED: 'AddressAdded', + ADDRESS_REMOVED: 'AddressRemoved', + NEW_ACCESS_LIST: 'NewAccessList' } export const INDEXER_CRAWLING_EVENTS = { @@ -185,6 +192,18 @@ export const EVENT_HASHES: Hashes = { '0x03da9148e1de78fba22de63c573465562ebf6ef878a1d3ea83790a560229984c': { type: EVENTS.EXCHANGE_DEACTIVATED, text: 'ExchangeDeactivated(bytes32,address)' + }, + '0x9cc987676e7d63379f176ea50df0ae8d2d9d1141d1231d4ce15b5965f73c9430': { + type: EVENTS.ADDRESS_ADDED, + text: 'AddressAdded(address,uint256)' + }, + '0xb1e731889e7185f2cc895a86c70cded99d77ab8ecea58ab5abe5d43b084f51ae': { + type: EVENTS.ADDRESS_REMOVED, + text: 'AddressRemoved(uint256)' + }, + '0xd65bc8e3024bbad886df74eea79b6e118b7fbcffe1f3f98054e5a6b98dc83891': { + type: EVENTS.NEW_ACCESS_LIST, + text: 'NewAccessList(address,address)' } } @@ -536,6 +555,11 @@ export const ENVIRONMENT_VARIABLES: Record<string, EnvVariable> = { name: 'PERSISTENT_STORAGE', value: process.env.PERSISTENT_STORAGE, required: false + }, + C2D_DOWNLOAD_TIMEOUT: { + name: 'C2D_DOWNLOAD_TIMEOUT', + value: process.env.C2D_DOWNLOAD_TIMEOUT, + required: false } } export const CONNECTION_HISTORY_DELETE_THRESHOLD = 300