Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,18 @@ jobs:
sleep 10
[ -f "$HOME/.ocean/ocean-contracts/artifacts/ready" ] && break
done
- name: Use regular Escrow for system tests
run: |
SYSTEM_TEST_ADDRESS_FILE="${RUNNER_TEMP}/address.system-tests.json"
jq 'del(.development.EnterpriseEscrow)' "${HOME}/.ocean/ocean-contracts/artifacts/address.json" > "${SYSTEM_TEST_ADDRESS_FILE}"
echo "ADDRESS_FILE=${SYSTEM_TEST_ADDRESS_FILE}" >> "$GITHUB_ENV"
- name: Configure escrow timeout for system tests
run: echo "ESCROW_CLAIM_TIMEOUT=$(date +%s)" >> "$GITHUB_ENV"
- name: Use separate consumer key for ocean-cli paid compute system test
working-directory: ${{ github.workspace }}/ocean-cli
run: |
perl -0pi -e 's/0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58/0x1d751ded5a32226054cd2e71261039b65afb9ee1c746d055dd699b1150a5befc/g' test/paidComputeFlow.test.ts
grep -n "PRIVATE_KEY" test/paidComputeFlow.test.ts

- name: docker logs
run: docker logs ocean-contracts-1 && docker logs ocean-typesense-1
Expand Down
2 changes: 2 additions & 0 deletions docs/env.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ Environmental variables are also tracked in `ENVIRONMENT_VARIABLES` within `src/

## Compute

- `C2D_DOWNLOAD_TIMEOUT`: Timeout (in seconds) for pulling the algorithm docker image during a C2D job. If the pull exceeds this timeout, the job fails with `PullImageFailed` instead of getting stuck. Defaults to `900` (15 minutes). Example: `900`

The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable is used to configure Docker-based compute environments in Ocean Node. This guide will walk you through the options available for defining `DOCKER_COMPUTE_ENVIRONMENTS` and how to set it up correctly. For configuring compute environments and setting prices for each resource (including pricing units and examples), see [Compute pricing](compute-pricing.md).

Example Configuration
Expand Down
12 changes: 6 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ocean-node",
"version": "3.1.0",
"version": "3.1.1",
"description": "Ocean Node is used to run all core services in the Ocean stack",
"author": "Ocean Protocol Foundation",
"license": "Apache-2.0",
Expand Down Expand Up @@ -73,7 +73,7 @@
"@libp2p/upnp-nat": "^4.0.9",
"@libp2p/websockets": "^10.1.2",
"@multiformats/multiaddr": "^12.2.3",
"@oceanprotocol/contracts": "^2.6.0",
"@oceanprotocol/contracts": "^2.7.0",
"@oceanprotocol/ddo-js": "^0.2.0",
"axios": "^1.15.0",
"base58-js": "^2.0.0",
Expand Down
7 changes: 7 additions & 0 deletions src/@types/AccessList.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,10 @@
export interface AccessList {
[chainId: string]: string[]
}

export interface AccessListUser {
wallet: string
tokenId: number
block: number
txId: string
}
10 changes: 10 additions & 0 deletions src/@types/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ export interface FindPeerCommand extends Command {
export interface GetP2PPeersCommand extends Command {}
export interface GetP2PNetworkStatsCommand extends Command {}

export interface GetAccessListCommand extends Command {
chainId: number
contractAddress: string
}

export interface SearchAccessListCommand extends Command {
wallet: string
chainId?: number
}

export interface SignedCommand extends Command {
nonce: string
signature: string
Expand Down
11 changes: 10 additions & 1 deletion src/components/Indexer/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import {
ExchangeActivatedEventProcessor,
ExchangeDeactivatedEventProcessor,
ExchangeRateChangedEventProcessor,
NewAccessListEventProcessor,
AddressAddedEventProcessor,
AddressRemovedEventProcessor,
ProcessorConstructor
} from './processors/index.js'
import { findEventByKey } from './utils.js'
Expand All @@ -36,7 +39,10 @@ const EVENT_PROCESSOR_MAP: Record<string, ProcessorConstructor> = {
[EVENTS.EXCHANGE_CREATED]: ExchangeCreatedEventProcessor,
[EVENTS.EXCHANGE_ACTIVATED]: ExchangeActivatedEventProcessor,
[EVENTS.EXCHANGE_DEACTIVATED]: ExchangeDeactivatedEventProcessor,
[EVENTS.EXCHANGE_RATE_CHANGED]: ExchangeRateChangedEventProcessor
[EVENTS.EXCHANGE_RATE_CHANGED]: ExchangeRateChangedEventProcessor,
[EVENTS.NEW_ACCESS_LIST]: NewAccessListEventProcessor,
[EVENTS.ADDRESS_ADDED]: AddressAddedEventProcessor,
[EVENTS.ADDRESS_REMOVED]: AddressRemovedEventProcessor
}

const processorInstances = new Map<string, BaseEventProcessor>()
Expand Down Expand Up @@ -83,6 +89,7 @@ export const processChunkLogs = async (
(allowedValidatorsList && Object.keys(allowedValidatorsList).length > 0)
for (const log of logs) {
const event = findEventByKey(log.topics[0])

if (event && Object.values(EVENTS).includes(event.type)) {
// only log & process the ones we support
INDEXER_LOGGER.logMessage(
Expand Down Expand Up @@ -188,6 +195,7 @@ export const processChunkLogs = async (
}
}
}

return storeEvents
}

Expand All @@ -208,6 +216,7 @@ export const processBlocks = async (
blockLogs && blockLogs.length > 0
? await processChunkLogs(blockLogs, signer, provider, network, config)
: []

return {
lastBlock: lastIndexedBlock + count,
foundEvents: events
Expand Down
48 changes: 48 additions & 0 deletions src/components/Indexer/processors/AddressAddedEventProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { ethers, Signer, FallbackProvider, Interface } from 'ethers'
import { INDEXER_LOGGER } from '../../../utils/logging/common.js'
import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js'
import { BaseEventProcessor } from './BaseProcessor.js'
import AccessList from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' }

const accessListInterface = new Interface(AccessList.abi)

export class AddressAddedEventProcessor extends BaseEventProcessor {
async processEvent(
event: ethers.Log,
chainId: number,
signer: Signer,
provider: FallbackProvider
): Promise<any> {
try {
const decoded = accessListInterface.parseLog({
topics: Array.from(event.topics),
data: event.data
})
if (!decoded) return null

const wallet = decoded.args[0].toString().toLowerCase()
const tokenId = Number(decoded.args[1])
const contractAddress = event.address.toLowerCase()

const { accessList } = await this.getDatabase()
const result = await accessList.addUser(chainId, contractAddress, {
wallet,
tokenId,
block: event.blockNumber,
txId: event.transactionHash
})

INDEXER_LOGGER.logMessage(
`[AddressAdded] ${wallet} (tokenId=${tokenId}) added to ${contractAddress} on chain ${chainId}`
)
return result
} catch (err) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error processing AddressAdded event: ${err.message}`,
true
)
return null
}
}
}
46 changes: 46 additions & 0 deletions src/components/Indexer/processors/AddressRemovedEventProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { ethers, Signer, FallbackProvider, Interface } from 'ethers'
import { INDEXER_LOGGER } from '../../../utils/logging/common.js'
import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js'
import { BaseEventProcessor } from './BaseProcessor.js'
import AccessList from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' }

const accessListInterface = new Interface(AccessList.abi)

export class AddressRemovedEventProcessor extends BaseEventProcessor {
async processEvent(
event: ethers.Log,
chainId: number,
signer: Signer,
provider: FallbackProvider
): Promise<any> {
try {
const decoded = accessListInterface.parseLog({
topics: Array.from(event.topics),
data: event.data
})
if (!decoded) return null

const tokenId = Number(decoded.args[0])
const contractAddress = event.address.toLowerCase()

const { accessList } = await this.getDatabase()
const result = await accessList.removeUserByTokenId(
chainId,
contractAddress,
tokenId
)

INDEXER_LOGGER.logMessage(
`[AddressRemoved] tokenId=${tokenId} removed from ${contractAddress} on chain ${chainId}`
)
return result
} catch (err) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error processing AddressRemoved event: ${err.message}`,
true
)
return null
}
}
}
74 changes: 74 additions & 0 deletions src/components/Indexer/processors/NewAccessListEventProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { ethers, Signer, FallbackProvider, Interface } from 'ethers'
import { INDEXER_LOGGER } from '../../../utils/logging/common.js'
import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js'
import { BaseEventProcessor } from './BaseProcessor.js'
import AccessListFactory from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessListFactory.sol/AccessListFactory.json' with { type: 'json' }
import AccessList from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' }

const factoryInterface = new Interface(AccessListFactory.abi)

export class NewAccessListEventProcessor extends BaseEventProcessor {
async processEvent(
event: ethers.Log,
chainId: number,
signer: Signer,
provider: FallbackProvider
): Promise<any> {
try {
const decoded = factoryInterface.parseLog({
topics: Array.from(event.topics),
data: event.data
})
if (!decoded) return null

const contractAddress = decoded.args[0].toString().toLowerCase()

let transferable = false
let name: string | undefined
let symbol: string | undefined
try {
const accessListContract = new ethers.Contract(
contractAddress,
AccessList.abi,
provider
)
const [transferableRaw, nameRaw, symbolRaw] = await Promise.all([
accessListContract.transferable(),
accessListContract.name(),
accessListContract.symbol()
])
transferable = Boolean(transferableRaw)
name = nameRaw
symbol = symbolRaw
} catch (err) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_WARN,
`Failed to read on-chain metadata for ${contractAddress}: ${err.message}`
)
}

const { accessList } = await this.getDatabase()
const result = await accessList.create(
chainId,
contractAddress,
transferable,
event.blockNumber,
event.transactionHash,
name,
symbol
)

INDEXER_LOGGER.logMessage(
`[NewAccessList] Indexed access list ${contractAddress} on chain ${chainId} (name=${name}, symbol=${symbol}, transferable=${transferable})`
)
return result
} catch (err) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error processing NewAccessList event: ${err.message}`,
true
)
return null
}
}
}
3 changes: 3 additions & 0 deletions src/components/Indexer/processors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ export * from './MetadataEventProcessor.js'
export * from './MetadataStateEventProcessor.js'
export * from './OrderReusedEventProcessor.js'
export * from './OrderStartedEventProcessor.js'
export * from './NewAccessListEventProcessor.js'
export * from './AddressAddedEventProcessor.js'
export * from './AddressRemovedEventProcessor.js'
export * from './BaseProcessor.js'

export type ProcessorConstructor = new (
Expand Down
11 changes: 11 additions & 0 deletions src/components/c2d/compute_engine_docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import {
} from 'fs'
import { pipeline } from 'node:stream/promises'
import { CORE_LOGGER } from '../../utils/logging/common.js'
import { ENVIRONMENT_VARIABLES } from '../../utils/constants.js'
import { AssetUtils } from '../../utils/asset.js'
import { FindDdoHandler } from '../core/handler/ddoHandler.js'
import { OceanNode } from '../../OceanNode.js'
Expand Down Expand Up @@ -1695,6 +1696,15 @@ export class C2DEngineDocker extends C2DEngine {
}
}

private getImagePullTimeoutMs(): number {
const raw = ENVIRONMENT_VARIABLES.C2D_DOWNLOAD_TIMEOUT.value
const parsed = raw ? parseInt(raw, 10) : NaN
if (Number.isFinite(parsed) && parsed > 0) {
return parsed * 1000
}
return 15 * 60 * 1000
}

// eslint-disable-next-line require-await
private async processJob(job: DBComputeJob) {
CORE_LOGGER.info(
Expand Down Expand Up @@ -2547,6 +2557,7 @@ export class C2DEngineDocker extends C2DEngine {
`Using docker registry auth for ${registry} to pull image ${job.containerImage}`
)
}
pullOptions.abortSignal = AbortSignal.timeout(this.getImagePullTimeoutMs())

const pullStream = await this.docker.pull(job.containerImage, pullOptions)
await new Promise((resolve, reject) => {
Expand Down
Loading
Loading