Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ jobs:
with:
repository: 'oceanprotocol/ocean.js'
path: 'ocean.js'
ref: feature/refactor_signatures
ref: main
- name: Build ocean-js
working-directory: ${{ github.workspace }}/ocean.js
run: |
Expand Down
8 changes: 7 additions & 1 deletion src/components/c2d/compute_engine_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,14 @@ export abstract class C2DEngine {
for (const request of activeResources) {
let envResource = this.getResource(env.resources, request.id)
if (!envResource) throw new Error(`No such resource ${request.id}`)
if (envResource.total - envResource.inUse < request.amount)
if (envResource.total - envResource.inUse < request.amount) {
console.log({
totalResources: envResource.total,
inUseResources: envResource.inUse,
requested: request.amount
})
throw new Error(`Not enough available ${request.id}`)
}
if (isFree) {
if (!env.free) throw new Error(`No free resources`)
envResource = this.getResource(env.free?.resources, request.id)
Expand Down
2 changes: 1 addition & 1 deletion src/components/database/DatabaseFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
TypesenseIndexerDatabase,
TypesenseLogDatabase,
TypesenseOrderDatabase
} from './TypenseDatabase.js'
} from './TypesenseDatabase.js'
import { elasticSchemas } from './ElasticSchemas.js'
import { IDdoStateQuery } from '../../@types/DDO/IDdoStateQuery.js'
import { TypesenseDdoStateQuery } from './TypesenseDdoStateQuery.js'
Expand Down
5 changes: 3 additions & 2 deletions src/components/database/ElasticSearchDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import { ElasticsearchSchema } from './ElasticSchemas.js'
import { DATABASE_LOGGER } from '../../utils/logging/common.js'
import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js'

import { DDOManager } from '@oceanprotocol/ddo-js'
import { validateDDO } from '../../utils/asset.js'
import { DDOManager } from '@oceanprotocol/ddo-js'

export class ElasticsearchIndexerDatabase extends AbstractIndexerDatabase {
private client: Client
Expand Down Expand Up @@ -76,7 +76,8 @@ export class ElasticsearchIndexerDatabase extends AbstractIndexerDatabase {
try {
const result = await this.client.get({
index: this.index,
id: network.toString()
id: network.toString(),
refresh: true
})
return result._source
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import {
AbstractLogDatabase,
AbstractOrderDatabase
} from './BaseDatabase.js'
import { DDOManager } from '@oceanprotocol/ddo-js'
import { validateDDO } from '../../utils/asset.js'
import { DDOManager } from '@oceanprotocol/ddo-js'

export class TypesenseOrderDatabase extends AbstractOrderDatabase {
private provider: Typesense
Expand Down Expand Up @@ -372,6 +372,7 @@ export class TypesenseDdoDatabase extends AbstractDdoDatabase {
getDDOSchema(ddo: Record<string, any>): TypesenseSchema {
// Find the schema based on the DDO version OR use the short DDO schema when state !== 0
let schemaName: string

const ddoInstance = DDOManager.getDDOClass(ddo)
const ddoData = ddoInstance.getDDOData()
if ('indexedMetadata' in ddoData && ddoData?.indexedMetadata?.nft.state !== 0) {
Expand Down
7 changes: 7 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,10 @@ if (config.hasHttp) {
// Call the function to schedule the cron job to delete old logs
scheduleCronJobs(oceanNode)
}

// Global safety nets (added at module top level, after server startup):
// log unhandled promise rejections rather than letting Node print an
// unstructured warning / terminate (behavior varies by Node version).
process.on('unhandledRejection', (reason) => {
  console.log({ reason })
})
// NOTE(review): only logging an uncaughtException leaves the process in an
// undefined state — the Node.js docs recommend performing cleanup and then
// exiting. Confirm swallowing the error here is intentional. Also, the
// argument passed to this handler is the thrown Error itself, so the
// parameter name `reason` is slightly misleading (kept byte-identical here).
process.on('uncaughtException', (reason) => {
  console.log({ reason })
})
184 changes: 106 additions & 78 deletions src/test/integration/compute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,13 @@ describe('Compute', () => {
)
config = await getConfiguration(true)
dbconn = await Database.init(config.dbConfig)
oceanNode = await OceanNode.getInstance(
config,
dbconn,
null,
null,
null,
null,
null,
true
)

const staleJobs = await dbconn.c2d.getRunningJobs()
for (const job of staleJobs) {
await dbconn.c2d.deleteJob(job.jobId)
}

oceanNode = OceanNode.getInstance(config, dbconn, null, null, null, null, null, true)
indexer = new OceanIndexer(
dbconn,
config.indexingNetworks,
Expand Down Expand Up @@ -641,62 +638,43 @@ describe('Compute', () => {
assert(!response.stream, 'We should not have a stream')
})

it('should start a compute job with maxed resources', async () => {
// first check escrow auth

let balance = await paymentTokenContract.balanceOf(await consumerAccount.getAddress())
let funds = await oceanNode.escrow.getUserAvailableFunds(
it('should fail to start a compute job without escrow funds', async () => {
// ensure clean escrow state: no funds, no auths, no locks
const funds = await oceanNode.escrow.getUserAvailableFunds(
DEVELOPMENT_CHAIN_ID,
await consumerAccount.getAddress(),
paymentToken
)
// make sure we have 0 funds
if (BigInt(funds.toString()) > BigInt(0)) {
await escrowContract
.connect(consumerAccount)
.withdraw([initializeResponse.payment.token], [funds])
}
let auth = await oceanNode.escrow.getAuthorizations(
const auth = await oceanNode.escrow.getAuthorizations(
DEVELOPMENT_CHAIN_ID,
paymentToken,
await consumerAccount.getAddress(),
firstEnv.consumerAddress
)
if (auth.length > 0) {
// remove any auths
await escrowContract
.connect(consumerAccount)
.authorize(initializeResponse.payment.token, firstEnv.consumerAddress, 0, 0, 0)
}
let locks = await oceanNode.escrow.getLocks(
const locks = await oceanNode.escrow.getLocks(
DEVELOPMENT_CHAIN_ID,
paymentToken,
await consumerAccount.getAddress(),
firstEnv.consumerAddress
)

if (locks.length > 0) {
// cancel all locks
for (const lock of locks) {
try {
await escrowContract
.connect(consumerAccount)
.cancelExpiredLock(
lock.jobId,
lock.token,
lock.payer,
firstEnv.consumerAddress
)
} catch (e) {}
}
locks = await oceanNode.escrow.getLocks(
DEVELOPMENT_CHAIN_ID,
paymentToken,
await consumerAccount.getAddress(),
firstEnv.consumerAddress
)
for (const lock of locks) {
try {
await escrowContract
.connect(consumerAccount)
.cancelExpiredLock(lock.jobId, lock.token, lock.payer, firstEnv.consumerAddress)
} catch (e) {}
}
const locksBefore = locks.length

const nonce = Date.now().toString()
const messageHashBytes = createHashForSignature(
await consumerAccount.getAddress(),
Expand Down Expand Up @@ -738,15 +716,17 @@ describe('Compute', () => {
additionalViewers: [await additionalViewerAccount.getAddress()],
maxJobDuration: computeJobDuration,
resources: re
// additionalDatasets?: ComputeAsset[]
// output?: ComputeOutput
}
// it should fail, because we don't have funds & auths in escrow
let response = await new PaidComputeStartHandler(oceanNode).handle(startComputeTask)
const response = await new PaidComputeStartHandler(oceanNode).handle(startComputeTask)
assert(response.status.httpStatus === 400, 'Failed to get 400 response')
assert(!response.stream, 'We should not have a stream')
// let's put funds in escrow & create an auth
balance = await paymentTokenContract.balanceOf(await consumerAccount.getAddress())
})

it('should start a compute job with maxed resources', async () => {
// deposit funds and create auth in escrow
const balance = await paymentTokenContract.balanceOf(
await consumerAccount.getAddress()
)
await paymentTokenContract
.connect(consumerAccount)
.approve(initializeResponse.payment.escrowAddress, balance)
Expand All @@ -762,20 +742,13 @@ describe('Compute', () => {
initializeResponse.payment.minLockSeconds,
10
)
auth = await oceanNode.escrow.getAuthorizations(

const auth = await oceanNode.escrow.getAuthorizations(
DEVELOPMENT_CHAIN_ID,
paymentToken,
await consumerAccount.getAddress(),
firstEnv.consumerAddress
)
const authBefore = auth[0]
funds = await oceanNode.escrow.getUserAvailableFunds(
DEVELOPMENT_CHAIN_ID,
await consumerAccount.getAddress(),
paymentToken
)
const fundsBefore = funds
assert(BigInt(funds.toString()) > BigInt(0), 'Should have funds in escrow')
assert(auth.length > 0, 'Should have authorization')
assert(
BigInt(auth[0].maxLockedAmount.toString()) > BigInt(0),
Expand All @@ -785,19 +758,68 @@ describe('Compute', () => {
BigInt(auth[0].maxLockCounts.toString()) > BigInt(0),
' Should have maxLockCounts in auth'
)
const nonce2 = Date.now().toString()
const messageHashBytes2 = createHashForSignature(
const authBefore = auth[0]

const fundsBefore = await oceanNode.escrow.getUserAvailableFunds(
DEVELOPMENT_CHAIN_ID,
await consumerAccount.getAddress(),
paymentToken
)
assert(BigInt(fundsBefore.toString()) > BigInt(0), 'Should have funds in escrow')

const locksBefore = (
await oceanNode.escrow.getLocks(
DEVELOPMENT_CHAIN_ID,
paymentToken,
await consumerAccount.getAddress(),
firstEnv.consumerAddress
)
).length

const nonce = Date.now().toString()
const messageHashBytes = createHashForSignature(
await consumerAccount.getAddress(),
nonce2,
nonce,
PROTOCOL_COMMANDS.COMPUTE_START
)
const signature2 = await safeSign(consumerAccount, messageHashBytes2)
response = await new PaidComputeStartHandler(oceanNode).handle({
...startComputeTask,
nonce: nonce2,
signature: signature2
})
console.log(response)
const signature = await safeSign(consumerAccount, messageHashBytes)
const re = []
for (const res of firstEnv.resources) {
re.push({ id: res.id, amount: res.total })
}
const startComputeTask: PaidComputeStartCommand = {
command: PROTOCOL_COMMANDS.COMPUTE_START,
consumerAddress: await consumerAccount.getAddress(),
signature,
nonce,
environment: firstEnv.id,
datasets: [
{
documentId: publishedComputeDataset.ddo.id,
serviceId: publishedComputeDataset.ddo.services[0].id,
transferTxId: datasetOrderTxId
}
],
algorithm: {
documentId: publishedAlgoDataset.ddo.id,
serviceId: publishedAlgoDataset.ddo.services[0].id,
transferTxId: algoOrderTxId,
meta: publishedAlgoDataset.ddo.metadata.algorithm
},
output: {},
payment: {
chainId: DEVELOPMENT_CHAIN_ID,
token: paymentToken
},
metadata: {
key: 'value'
},
additionalViewers: [await additionalViewerAccount.getAddress()],
maxJobDuration: computeJobDuration,
resources: re
}
const response = await new PaidComputeStartHandler(oceanNode).handle(startComputeTask)
console.log({ response })
assert(response, 'Failed to get response')
assert(response.status.httpStatus === 200, 'Failed to get 200 response')
assert(response.stream, 'Failed to get stream')
Expand All @@ -807,29 +829,35 @@ describe('Compute', () => {
// eslint-disable-next-line prefer-destructuring
jobId = jobs[0].jobId
console.log('**** Started compute job with id: ', jobId)
// check escrow
funds = await oceanNode.escrow.getUserAvailableFunds(

// check escrow state changed after job start
const fundsAfter = await oceanNode.escrow.getUserAvailableFunds(
DEVELOPMENT_CHAIN_ID,
await consumerAccount.getAddress(),
paymentToken
)
assert(fundsBefore > funds, 'We should have less funds')
locks = await oceanNode.escrow.getLocks(
assert(fundsBefore > fundsAfter, 'We should have less funds')

const locksAfter = await oceanNode.escrow.getLocks(
DEVELOPMENT_CHAIN_ID,
paymentToken,
await consumerAccount.getAddress(),
firstEnv.consumerAddress
)
assert(locks.length > locksBefore, 'We should have locks')
auth = await oceanNode.escrow.getAuthorizations(
assert(locksAfter.length > locksBefore, 'We should have locks')

const authAfter = await oceanNode.escrow.getAuthorizations(
DEVELOPMENT_CHAIN_ID,
paymentToken,
await consumerAccount.getAddress(),
firstEnv.consumerAddress
)
assert(auth[0].currentLocks > authBefore.currentLocks, 'We should have running jobs')
assert(
auth[0].currentLockedAmount > authBefore.currentLockedAmount,
authAfter[0].currentLocks > authBefore.currentLocks,
'We should have running jobs'
)
assert(
authAfter[0].currentLockedAmount > authBefore.currentLockedAmount,
'We should have higher currentLockedAmount'
)
})
Expand Down Expand Up @@ -2124,7 +2152,7 @@ describe('Compute Access Restrictions', () => {
)
config = await getConfiguration(true)
dbconn = await Database.init(config.dbConfig)
oceanNode = await OceanNode.getInstance(
oceanNode = OceanNode.getInstance(
config,
dbconn,
null,
Expand Down Expand Up @@ -2315,7 +2343,7 @@ describe('Compute Access Restrictions', () => {
)
config = await getConfiguration(true)
dbconn = await Database.init(config.dbConfig)
oceanNode = await OceanNode.getInstance(
oceanNode = OceanNode.getInstance(
config,
dbconn,
null,
Expand Down Expand Up @@ -2445,7 +2473,7 @@ describe('Compute Access Restrictions', () => {
)
config = await getConfiguration(true)
dbconn = await Database.init(config.dbConfig)
oceanNode = await OceanNode.getInstance(
oceanNode = OceanNode.getInstance(
config,
dbconn,
null,
Expand Down Expand Up @@ -2575,7 +2603,7 @@ describe('Compute Access Restrictions', () => {
const now = Math.floor(Date.now() / 1000)
const expiry = 3500

const providerAddress = await (await oceanNode.getKeyManager()).getEthAddress()
const providerAddress = oceanNode.getKeyManager().getEthAddress()

// Clean up existing locks and authorizations first
const locks = await oceanNode.escrow.getLocks(
Expand Down
Loading
Loading