diff --git a/src/plugins/contracts/api.js b/src/plugins/contracts/api.js index 2f869b6..66fab92 100644 --- a/src/plugins/contracts/api.js +++ b/src/plugins/contracts/api.js @@ -3,59 +3,29 @@ const logger = require('../../logger') const { encrypt } = require('ecies-geth') const { Implementation } = require('contracts-js') const { remove0xPrefix, add65BytesPrefix } = require('./helpers') -const { ContractEventsListener } = require('./events-listener') const ethereumWallet = require('ethereumjs-wallet').default /** * @param {import('web3').default} web3 - * @param {string} implementationAddress - * @param {string} [walletAddress] + * @param {string} contractId + * @param {string} walletAddress + * @returns {Promise} */ -async function _loadContractInstance( +async function getContract( web3, - implementationAddress, + contractId, walletAddress ) { try { - const implementationContract = Implementation(web3, implementationAddress) - const contract = await implementationContract.methods + const implementation = Implementation(web3, contractId) + const contract = await implementation.methods .getPublicVariables() .call() - const stats = await implementationContract.methods.getStats().call() - - const history = await implementationContract.methods - .getHistory('0', '100') - .call() - const buyerHistory = history - .filter((h) => { - return h[6] === walletAddress - }) - .map((h) => ({ - ...h, - id: implementationAddress, - })) - - const { _successCount: successCount, _failCount: failCount } = stats - - const { - _state: state, - _price: price, // cost to purchase the contract - _limit: limit, // max th provided - _speed: speed, // th/s of contract - _length: length, // duration of the contract in seconds - _startingBlockTimestamp: timestamp, // timestamp of the block at moment of purchase - _buyer: buyer, // wallet address of the purchasing party - _seller: seller, // wallet address of the selling party - _encryptedPoolData: encryptedPoolData, // encrypted data for pool target info, - _isDeleted: isDead, // check if contract is dead - _balance: balance, - _hasFutureTerms: hasFutureTerms, - _version: version, - } = contract + const stats = await implementation.methods.getStats().call() let futureTerms = null - if (walletAddress && hasFutureTerms && seller === walletAddress) { - const data = await implementationContract.methods.futureTerms().call() + if (walletAddress && contract._hasFutureTerms && contract._seller === walletAddress) { + const data = await implementation.methods.futureTerms().call() futureTerms = { price: data._price, speed: data._speed, @@ -66,28 +36,25 @@ async function _loadContractInstance( } return { - data: { - id: implementationAddress, - price, - speed, - length, - buyer, - seller, - timestamp, - state, - encryptedPoolData, - limit, - isDead, - balance, - stats: { - successCount, - failCount, - }, - hasFutureTerms, - futureTerms, - history: buyerHistory, - version, + id: contractId, + price: contract._price, + speed: contract._speed, + length: contract._length, + buyer: contract._buyer, + seller: contract._seller, + timestamp: contract._startingBlockTimestamp, + state: contract._state, + encryptedPoolData: contract._encryptedPoolData, + limit: contract._limit, + isDead: contract._isDeleted, + balance: contract._balance, + hasFutureTerms: contract._hasFutureTerms, + version: contract._version, + stats: { + successCount: stats._successCount, + failCount: stats._failCount, }, + futureTerms, } } catch (err) { logger.error( @@ -100,74 +67,26 @@ async function _loadContractInstance( /** * @param {import('web3').default} web3 - * @param {import('web3').default} web3Subscriptionable - * @param {import('contracts-js').LumerinContext} lumerin - * @param {import('contracts-js').CloneFactoryContext} cloneFactory - * @param {string[]} addresses + * @param {string} contractId * @param {string} walletAddress */ -async function getContracts( - web3, - web3Subscriptionable, - lumerin, - cloneFactory, - addresses, - walletAddress, - eventBus -) { - const chunkSize = 5 - const result = [] - for (let i = 0; i < addresses.length; i += chunkSize) { - const contracts = await Promise.all( - addresses - .slice(i, i + chunkSize) - .map((address) => - getContract( - web3, - web3Subscriptionable, - lumerin, - cloneFactory, - address, - walletAddress - ) - ) - ) - eventBus.emit('contract-updated', { - actives: contracts, +async function getContractHistory(web3, contractId, walletAddress) { + const implementation = Implementation(web3, contractId) + + const history = await implementation.methods + .getHistory('0', '100') + .call() + + const buyerHistory = history + .filter((h) => { + return h[6] === walletAddress }) - result.push(...contracts) - } - return result -} + .map((h) => ({ + ...h, + id: contractId, + })) -/** - * @param {import('web3').default} web3 - * @param {import('web3').default} web3Subscriptionable - * @param {import('contracts-js').LumerinContext} lumerin - * @param {string} contractId - * @param {string} walletAddress - */ -async function getContract( - web3, - web3Subscriptionable, - lumerin, - cloneFactory, - contractId, - walletAddress -) { - const contractEventsListener = ContractEventsListener.getInstance() - const contractInfo = await _loadContractInstance( - web3, - contractId, - walletAddress - ) - - contractEventsListener.addContract( - contractInfo.data.id, - Implementation(web3Subscriptionable, contractId), - walletAddress - ) - return contractInfo.data + return buyerHistory } /** @@ -188,7 +107,6 @@ function createContract(web3, cloneFactory) { } return async function (params) { - // const { gasPrice } = await plugins.wallet.getGasPrice() let { price, limit = 0, @@ -244,6 +162,7 @@ function createContract(web3, cloneFactory) { /** * @param {import('web3').default} web3 + * @param {import('contracts-js').CloneFactoryContext} cloneFactory */ function cancelContract(web3, cloneFactory) { if (!web3) { @@ -264,15 +183,15 @@ function cancelContract(web3, cloneFactory) { const marketplaceFee = await cloneFactory.methods.marketplaceFee().call() - const gas = await Implementation(web3, contractId) - .methods.setContractCloseOut(closeOutType) + const gas = await cloneFactory.methods + .setContractCloseout(contractId, closeOutType) .estimateGas({ from: walletAddress, value: marketplaceFee, }) - return await Implementation(web3, contractId) - .methods.setContractCloseOut(closeOutType) + return await cloneFactory.methods + .setContractCloseout(contractId, closeOutType) .send({ from: walletAddress, gas, @@ -284,6 +203,7 @@ function cancelContract(web3, cloneFactory) { /** * @param {import('web3').default} web3 * @param {import('contracts-js').CloneFactoryContext} cloneFactory + * @param {(contractId: string)=>Promise} onUpdate */ function setContractDeleteStatus(web3, cloneFactory, onUpdate) { if (!web3) { @@ -302,9 +222,7 @@ function setContractDeleteStatus(web3, cloneFactory, onUpdate) { const account = web3.eth.accounts.privateKeyToAccount(privateKey) web3.eth.accounts.wallet.create(0).add(account) - const { - data: { isDead }, - } = await _loadContractInstance(web3, contractId) + const isDead = await Implementation(web3, contractId).methods.isDeleted().call() if (Boolean(isDead) === Boolean(deleteContract)) { return true } @@ -321,7 +239,7 @@ function setContractDeleteStatus(web3, cloneFactory, onUpdate) { from: walletAddress, gas, }) - onUpdate(contractId, walletAddress).catch((err) => + onUpdate(contractId).catch((err) => logger.error(`Failed to refresh after setContractDeadStatus: ${err}`) ) return result @@ -354,9 +272,7 @@ function purchaseContract(web3, cloneFactory, lumerin) { const account = web3.eth.accounts.privateKeyToAccount(privateKey) web3.eth.accounts.wallet.create(0).add(account) - const { - data: { isDead, price: p }, - } = await _loadContractInstance(web3, contractId) + const isDead = await Implementation(web3, contractId).methods.isDeleted().call() if (isDead) { throw new Error('Contract is deleted already') } @@ -407,10 +323,9 @@ function purchaseContract(web3, cloneFactory, lumerin) { * * @param {import('web3').default} web3 * @param {import('contracts-js').CloneFactoryContext} cloneFactory - * @param {import('contracts-js').LumerinContext} lumerin - * @returns + * @returns {(params: Object)=>Promise} */ -function editContract(web3, cloneFactory, lumerin) { +function editContract(web3, cloneFactory) { return async (params) => { const { walletId, @@ -421,7 +336,6 @@ function editContract(web3, cloneFactory, lumerin) { speed, duration, } = params - const sendOptions = { from: walletId } const account = web3.eth.accounts.privateKeyToAccount(privateKey) web3.eth.accounts.wallet.create(0).add(account) @@ -431,14 +345,14 @@ function editContract(web3, cloneFactory, lumerin) { const editGas = await cloneFactory.methods .setUpdateContractInformation(contractId, price, limit, speed, duration) .estimateGas({ - from: sendOptions.from, + from: walletId, value: marketplaceFee, }) const editResult = await cloneFactory.methods .setUpdateContractInformation(contractId, price, limit, speed, duration) .send({ - ...sendOptions, + from: walletId, gas: editGas, value: marketplaceFee, }) @@ -448,8 +362,8 @@ function editContract(web3, cloneFactory, lumerin) { } module.exports = { - getContracts, getContract, + getContractHistory, createContract, cancelContract, purchaseContract, diff --git a/src/plugins/contracts/api.types.js b/src/plugins/contracts/api.types.js new file mode 100644 index 0000000..7b9a42a --- /dev/null +++ b/src/plugins/contracts/api.types.js @@ -0,0 +1,19 @@ +/** + * @typedef {Object} Contract + * @property {string} id + * @property {string} price + * @property {string} speed + * @property {string} length + * @property {string} buyer + * @property {string} seller + * @property {string} timestamp + * @property {string} state - 0 available, 1 running + * @property {string} encryptedPoolData + * @property {string} limit + * @property {boolean} isDead + * @property {string} balance + * @property {{successCount: string, failCount: string}} stats + * @property {boolean} hasFutureTerms + * @property {{price: string, speed: string, length: string, limit: string, version: string}} futureTerms + * @property {string} version + */ \ No newline at end of file diff --git a/src/plugins/contracts/events-controller.js b/src/plugins/contracts/events-controller.js new file mode 100644 index 0000000..999f8b8 --- /dev/null +++ b/src/plugins/contracts/events-controller.js @@ -0,0 +1,97 @@ +const logger = require('../../logger'); +const { getContract } = require('./api'); + +/** + * Interface for a contract watcher + * @typedef Watcher + * @prop {(onChange: (contractID: string) => void, onError: (e: Error) => void, block: number) => void} startWatching + * @prop {() => Promise} stopWatching + */ + +class EventsController { + /** @type {Watcher} */ + watcher = null + /** @type {string} */ + walletAddress = null + /** @type {import('web3').default} */ + web3 = null + /** @type {import("node:events").EventEmitter} */ + eventBus = null + /** @type {import('contracts-js').CloneFactoryContext} */ + cloneFactory = null + + /** + * @param {import('web3').default} web3 + * @param {import("node:events").EventEmitter} eventBus + * @param {Watcher} watcher + * @param {string} walletAddress + * @param {import('contracts-js').CloneFactoryContext} cloneFactory + */ + constructor(web3, eventBus, watcher, walletAddress, cloneFactory) { + this.web3 = web3; + this.watcher = watcher; + this.walletAddress = walletAddress; + this.eventBus = eventBus; + this.cloneFactory = cloneFactory; + } + + /** + * Pulls all contracts and starts watching in the background + * @returns {Promise} + */ + async start() { + await this.refreshContracts() + const lastBlock = await this.web3.eth.getBlockNumber() + this.watcher.startWatching(this.updateContract.bind(this), (e) => { + logger.error(`WatcherPolling error: ${e}, ${e.stack}`) + this.eventBus.emit('wallet-error', { + inner: e, + message: 'Could not update contract state', + meta: { plugin: 'contracts' }, + }) + }, lastBlock) + } + + /** + * Stops watching contracts is the background. Resolves when watching is finished. + */ + async stop() { + await this.watcher.stopWatching() + } + + /** + * Arbitratily refreshes all contracts + */ + async refreshContracts() { + this.eventBus.emit('contracts-scan-started') + const contractIDs = await this.cloneFactory.methods.getContractList().call(); + + // cannot use .revese() because it mutates the original array, which is not allowed in readonly {contractIDs} + const reversedContracts = []; + for (let i = contractIDs.length - 1; i >= 0; i--) { + reversedContracts.push(contractIDs[i]); + } + + const chunkSize = 5; + for (let i = 0; i < reversedContracts.length; i += chunkSize) { + const chunk = reversedContracts.slice(i, i + chunkSize); + await Promise.all(chunk.map((id) => this.updateContract(id))) + } + this.eventBus.emit("contracts-scan-finished") + } + + /** + * Arbitrary refreshes single contract + * @param {string} contractID + */ + async updateContract(contractID) { + const data = await getContract(this.web3, contractID, this.walletAddress) + this.eventBus.emit('contracts-updated', { + actives: [data], + }) + } +} + +module.exports = { + EventsController, +} \ No newline at end of file diff --git a/src/plugins/contracts/events-listener.js b/src/plugins/contracts/events-listener.js deleted file mode 100644 index 3929472..0000000 --- a/src/plugins/contracts/events-listener.js +++ /dev/null @@ -1,96 +0,0 @@ -//@ts-check -// const debug = require('debug')('lmr-wallet:core:contracts:event-listener') -const logger = require('../../logger'); - -class ContractEventsListener { - /** - * @param {import('contracts-js').CloneFactoryContext} cloneFactory - */ - constructor(cloneFactory) { - this.cloneFactory = cloneFactory - this.cloneFactoryListener = null - this.contracts = {} - this.walletAddress = null; - } - - /** - * @param {(contractId?: string, walletAddress?: string) => void} onUpdate - */ - setOnUpdate(onUpdate) { - this.onUpdate = onUpdate - } - - /** - * - * @param {string} id - * @param {import('contracts-js').ImplementationContext} instance - * @param {string} walletAddress - */ - addContract(id, instance, walletAddress) { - if (!this.contracts[id]) { - this.contracts[id] = instance.events.allEvents() - this.contracts[id] - .on('connected', () => { - logger.debug(`Start listen contract (${id}) events`) - }) - .on('data', () => { - logger.debug(`Contract (${id}) updated`) - if (this.onUpdate){ - this.onUpdate(id, this.walletAddress || walletAddress) - } - }) - } - } - - listenCloneFactory() { - if (!this.cloneFactoryListener) { - this.cloneFactoryListener = this.cloneFactory.events.contractCreated() - this.cloneFactoryListener - .on('connected', () => { - logger.debug('Start listen clone factory events') - }) - .on('data', (event) => { - const contractId = event.returnValues._address - logger.debug('New contract created', contractId) - this.onUpdate(contractId, this.walletAddress) - }) - } - } - - /** - * @static - * @param {import('contracts-js').CloneFactoryContext} cloneFactory - * @param {boolean} [debugEnabled=false] - * @returns {ContractEventsListener} - */ - static create(cloneFactory, debugEnabled = false) { - if (ContractEventsListener.instance) { - return ContractEventsListener.instance - } - - const instance = new ContractEventsListener(cloneFactory) - ContractEventsListener.instance = instance - instance.listenCloneFactory() - return instance - } - - /** - * @returns {ContractEventsListener} - */ - static getInstance() { - if (!ContractEventsListener.instance) { - throw new Error("ContractEventsListener instance not created") - } - return ContractEventsListener.instance - } - - /** - * @static - * @param {(contractId?: string) => void} onUpdate - */ - static setOnUpdate(onUpdate) { - ContractEventsListener.getInstance().onUpdate = onUpdate - } -} - -module.exports = { ContractEventsListener } diff --git a/src/plugins/contracts/events.js b/src/plugins/contracts/events.js deleted file mode 100644 index dad70e4..0000000 --- a/src/plugins/contracts/events.js +++ /dev/null @@ -1,100 +0,0 @@ -'use strict' - -const { utils: { hexToUtf8 } } = require('web3') -// const LumerinContracts = require('metronome-contracts') -// const LumerinContracts = require('@lumerin/contracts') - -const exportMetaParser = ({ returnValues }) => ({ - lumerin: { - export: { - blockTimestamp: returnValues.blockTimestamp, - burnSequence: returnValues.burnSequence, - currentBurnHash: returnValues.currentBurnHash, - currentTick: returnValues.currentTick, - dailyAuctionStartTime: returnValues.dailyAuctionStartTime, - dailyMintable: returnValues.dailyMintable, - destinationChain: hexToUtf8(returnValues.destinationChain), - extraData: returnValues.extraData, - fee: returnValues.fee, - genesisTime: returnValues.genesisTime, - previousBurnHash: returnValues.prevBurnHash, - supply: returnValues.supplyOnChain, - to: returnValues.destinationRecipientAddr, - value: returnValues.amountToBurn - } - } -}) - -const importRequestMetaParser = ({ returnValues }) => ({ - lumerin: { - importRequest: { - currentBurnHash: returnValues.currentBurnHash, - fee: returnValues.fee, - originChain: hexToUtf8(returnValues.originChain), - to: returnValues.destinationRecipientAddr, - value: returnValues.amountToImport - } - } -}) - -const importMetaParser = ({ returnValues }) => ({ - lumerin: { - import: { - currentBurnHash: returnValues.currentHash, - fee: returnValues.fee, - originChain: hexToUtf8(returnValues.originChain), - to: returnValues.destinationRecipientAddr, - value: returnValues.amountImported - } - } -}) - -// function getEventDataCreator (chain) { -// const { -// abi, -// address: contractAddress, -// birthblock: minBlock -// } = LumerinContracts[chain].TokenPorter - -// return [ -// address => ({ -// contractAddress, -// abi, -// eventName: 'LogExportReceipt', -// filter: { exporter: address }, -// metaParser: exportMetaParser, -// minBlock -// }), -// address => ({ -// contractAddress, -// abi, -// eventName: 'LogExportReceipt', -// filter: { destinationRecipientAddr: address }, -// metaParser: exportMetaParser, -// minBlock -// }), -// address => ({ -// contractAddress, -// abi, -// eventName: 'LogImportRequest', -// filter: { destinationRecipientAddr: address }, -// metaParser: importRequestMetaParser, -// minBlock -// }), -// address => ({ -// contractAddress, -// abi, -// eventName: 'LogImport', -// filter: { destinationRecipientAddr: address }, -// metaParser: importMetaParser, -// minBlock -// }) -// ] -// } - -module.exports = { - // getEventDataCreator, - exportMetaParser, - importMetaParser, - importRequestMetaParser -} diff --git a/src/plugins/contracts/index.js b/src/plugins/contracts/index.js index 8d633cc..668159a 100644 --- a/src/plugins/contracts/index.js +++ b/src/plugins/contracts/index.js @@ -1,113 +1,65 @@ -//@ts-check -'use strict' - -// const debug = require('debug')('lmr-wallet:core:contracts') -const logger = require('../../logger'); const { Lumerin, CloneFactory } = require('contracts-js') - -/** - * @type {typeof import('web3').default} - */ -//@ts-ignore -const Web3 = require('web3') - const { - getContracts, createContract, cancelContract, purchaseContract, setContractDeleteStatus, editContract, - getMarketplaceFee + getMarketplaceFee, + getContractHistory } = require('./api') -const { ContractEventsListener } = require('./events-listener') +const { EventsController } = require('./events-controller'); +const { WatcherPolling } = require('./watcher-polling'); /** * Create a plugin instance. - * * @returns {({ start: Function, stop: () => void})} The plugin instance. */ function createPlugin() { + /** @type {EventsController | null} */ + let eventsController = null + /** * Start the plugin instance. - * * @param {object} options Start options. * @returns {{ api: {[key: string]:any}, events: string[], name: string }} The instance details. */ function start({ config, eventBus, plugins }) { const { lmrTokenAddress, cloneFactoryAddress } = config - const { eth } = plugins - const web3 = eth.web3 - const web3Subscriptionable = new Web3(plugins.eth.web3SubscriptionProvider) + /** @type {import('web3').default} */ + const web3 = plugins.eth.web3 const lumerin = Lumerin(web3, lmrTokenAddress) const cloneFactory = CloneFactory(web3, cloneFactoryAddress) - const cloneFactorySubscriptionable = CloneFactory( - web3Subscriptionable, - cloneFactoryAddress - ) - const refreshContracts = - (web3, lumerin, cloneFactory) => async (contractId, walletAddress) => { - eventBus.emit('contracts-scan-started', {}) - ContractEventsListener.getInstance().walletAddress = walletAddress; - const addresses = contractId - ? [contractId] - : await cloneFactory.methods - .getContractList() - .call() - .catch((error) => { - logger.error('cannot get list of contract addresses:', error) - throw error - }) - - return getContracts( - web3, - web3Subscriptionable, - lumerin, - cloneFactory, - addresses, - walletAddress, - eventBus, - ) - .then((contracts) => { - eventBus.emit('contracts-scan-finished', { - actives: contracts, - }) - }) - .catch(function (error) { - logger.error('Could not sync contracts/events', error) - throw error - }) - } - - const contractEventsListener = ContractEventsListener.create( - cloneFactorySubscriptionable, - config.debug + const watcher = new WatcherPolling(web3, config.walletAddress, cloneFactory ,config.pollingIntervalMs) + eventsController = new EventsController( + web3, eventBus, watcher, config.walletAddress, cloneFactory ) - const onUpdate = refreshContracts(web3, lumerin, cloneFactory) - contractEventsListener.setOnUpdate(onUpdate) - return { api: { - refreshContracts: refreshContracts(web3, lumerin, cloneFactory), + startWatching: eventsController.start.bind(eventsController), + stopWatching: eventsController.stop.bind(eventsController), + refreshContracts: eventsController.refreshContracts.bind(eventsController), + getContractHistory: ({ contractAddr, walletAddress }) => getContractHistory(web3, contractAddr, walletAddress), createContract: createContract(web3, cloneFactory), cancelContract: cancelContract(web3, cloneFactory), purchaseContract: purchaseContract(web3, cloneFactory, lumerin), - editContract: editContract(web3, cloneFactory, lumerin), + editContract: editContract(web3, cloneFactory), getMarketplaceFee: getMarketplaceFee(cloneFactory), setContractDeleteStatus: setContractDeleteStatus( web3, cloneFactory, - onUpdate, + eventsController.updateContract.bind(eventsController), ), }, events: [ 'contracts-scan-started', - 'contracts-scan-finished', - 'contract-updated', + 'contracts-scan-finished', // also on update of single contract + 'contracts-updated', + 'wallet-error', ], name: 'contracts', } @@ -117,7 +69,9 @@ function createPlugin() { * Stop the plugin instance. */ function stop() { - logger.debug('Plugin stopping') + if (eventsController) { + eventsController.stop() + } } return { diff --git a/src/plugins/contracts/status.js b/src/plugins/contracts/status.js deleted file mode 100644 index 5c7c104..0000000 --- a/src/plugins/contracts/status.js +++ /dev/null @@ -1,24 +0,0 @@ -'use strict'; - -// const LumerinContracts = require('metronome-contracts'); -// const LumerinContracts = require('@lumerin/contracts'); - -/** - * Get the chain hop start time. - * - * @param {object} web3 A Web3.js instance. - * @param {string} chain The chain name. - * @returns {Promise<{chainHopStartTime:object}>} The start time in ms. - */ -// function getChainHopStartTime (web3, chain) { -// const { TokenPorter } = new LumerinContracts(web3, chain) - -// return TokenPorter.methods -// .chainHopStartTime() -// .call() -// .then(chainHopStartTime => ({ -// chainHopStartTime: Number.parseInt(chainHopStartTime, 10) * 1000 -// })) -// } - -// module.exports = getChainHopStartTime; diff --git a/src/plugins/contracts/stream.js b/src/plugins/contracts/stream.js deleted file mode 100644 index 0b14911..0000000 --- a/src/plugins/contracts/stream.js +++ /dev/null @@ -1,47 +0,0 @@ -'use strict'; - -// const debug = require('debug')('lmr-wallet:core:contracts-stream'); -const logger = require('../../logger'); - -/** - * Create a "classic" stream that connects to Lumerin Contracts. - * - * @param {Web3} web3 The function to poll. - * @returns {object} The stream instance. - */ -function createStream (web3) { - const subscription = web3.eth.subscribe('newBlockHeaders'); - - web3.eth.getBlock('latest') - .then(function (block) { - subscription.emit('data', block); - }) - .catch(function (err) { - subscription.emit('error', err); - }) - // const emitTickerValue = () => - // Promise.resolve() - // .then(fn) - // .then(function (data) { - // stream.emit('data', data); - // }) - // .catch(function (err) { - // stream.emit('error', err); - // }) - // .then(function () { - // if (!stop) { - // setTimeout(emitTickerValue, minInterval); - // } - // }); - - // emitTickerValue(); - - subscription.unsubscribe(function(error, success) { - success || logger.error('Could not successfully unsubscribe from web3 block-stream'); - }); - - - return subscription; -} - -module.exports = createStream; diff --git a/src/plugins/contracts/watcher-polling.js b/src/plugins/contracts/watcher-polling.js new file mode 100644 index 0000000..67a41bc --- /dev/null +++ b/src/plugins/contracts/watcher-polling.js @@ -0,0 +1,156 @@ +//@ts-check +/** @type {typeof import('web3-eth-abi').default} */ +//@ts-ignore +const abi = require('web3-eth-abi') +const { sleep } = require('../explorer/watcher-helpers') + +const CONTRACT_CREATED = 'contractCreated(address,string)' +const CONTRACT_PURCHASED = 'clonefactoryContractPurchased(address,address)' +const CONTRACT_DELETE_UPDATED = 'contractDeleteUpdated(address,bool)' +const CONTRACT_CLOSED = 'contractClosed(address,uint256)' +const CONTRACT_UPDATED = 'purchaseInfoUpdated(address)' + +const CONTRACT_CREATED_SIG = abi.encodeEventSignature(CONTRACT_CREATED) +const CONTRACT_PURCHASED_SIG = abi.encodeEventSignature(CONTRACT_PURCHASED) +const CONTRACT_DELETE_UPDATED_SIG = abi.encodeEventSignature( + CONTRACT_DELETE_UPDATED +) +const CONTRACT_CLOSED_SIG = abi.encodeEventSignature(CONTRACT_CLOSED) +const CONTRACT_UPDATED_SIG = abi.encodeEventSignature(CONTRACT_UPDATED) + +class WatcherPolling { + /** @type {import('contracts-js').CloneFactoryContext} */ + cloneFactory = null + /** @type {boolean} */ + stop = false + /** @type {Promise} */ + job = null + /** @type {Map} */ + contracts = new Map() + /** @type {(contractID: string) => void} */ + onChange = null + /** @type {(error: any) => void} */ + onError = null + + /** + * @param {import('web3').default} web3 + * @param {string} walletAddress + * @param {import('contracts-js').CloneFactoryContext} cloneFactory + * @param {number} pollingIntervalMs + */ + constructor(web3, walletAddress, cloneFactory, pollingIntervalMs = 3000) { + this.web3 = web3 + this.walletAddress = walletAddress + this.cloneFactory = cloneFactory + this.pollingIntervalMs = pollingIntervalMs + this.lastSyncedBlock = 0 + } + + /** @param { (contractID: string) => void} onChange */ + startWatching(onChange, onError, fromBlock) { + if (this.job !== null) { + throw new Error('Already started') + } + this.lastSyncedBlock = +fromBlock + this.onChange = onChange + this.onError = onError + this.stop = false + this.job = this.poller() + } + + async stopWatching() { + this.stop = true + await this.job + this.job = null + } + + /** + * @private + * @returns {Promise} + */ + async poller() { + for (;;) { + if (this.stop) { + return + } + + try { + const changes = await this.getChanges() + + for (const log of changes) { + if (this.stop) { + break + } + + const { topics, blockNumber, data } = log + const [eventTopic, contractAddressTopic] = topics + const contractAddress = this.decodeAddress(data, contractAddressTopic) + switch (eventTopic) { + case CONTRACT_CLOSED_SIG: + this.onChange(contractAddress) + break + case CONTRACT_PURCHASED_SIG: + this.onChange(contractAddress) + break + case CONTRACT_DELETE_UPDATED_SIG: + this.onChange(contractAddress) + break + case CONTRACT_CREATED_SIG: + this.onChange(contractAddress) + break + case CONTRACT_UPDATED_SIG: + this.onChange(contractAddress) + break + } + + if (+this.lastSyncedBlock < blockNumber) { + this.lastSyncedBlock = blockNumber + } + } + } catch (err) { + this.onError(err) + } + + await sleep(this.pollingIntervalMs) + } + } + + decodeAddress(data, topic) { + return this.web3.eth.abi.decodeLog( + [ + { + type: 'address', + name: '_address', + indexed: true, + }, + ], + data, + topic + )._address + } + + async getChanges() { + const options = { + fromBlock: this.lastSyncedBlock, + toBlock: 'latest', + address: this.cloneFactory._address, + } + const changes = await this.web3.eth.getPastLogs({ + ...options, + topics: [ + [ + CONTRACT_CREATED_SIG, + CONTRACT_PURCHASED_SIG, + CONTRACT_DELETE_UPDATED_SIG, + CONTRACT_CLOSED_SIG, + CONTRACT_UPDATED_SIG, + ], + ], + }) + return changes + } +} + +module.exports = { + WatcherPolling, +} diff --git a/src/plugins/eth/index.js b/src/plugins/eth/index.js index e5412aa..bc388f0 100644 --- a/src/plugins/eth/index.js +++ b/src/plugins/eth/index.js @@ -2,24 +2,23 @@ const logger = require('../../logger'); -const { createWeb3, destroyWeb3, createWeb3Subscribable } = require('./web3'); +const { createWeb3, destroyWeb3 } = require('./web3'); const checkChain = require('./check-chain'); -function createPlugin () { +function createPlugin() { let web3 = null; - let web3SubscribAble = null; - function start ({ config, eventBus }) { + function start({ config, eventBus }) { // debug.enabled = config.debug; - - web3 = createWeb3(config, eventBus); - web3SubscribAble = createWeb3Subscribable(config, eventBus); + + web3 = createWeb3(config); checkChain(web3, config.chainId) .then(function () { logger.debug('Chain ID is correct'); }) .catch(function (err) { + logger.error(err?.message, err?.stack) eventBus.emit('wallet-error', { inner: err, message: 'Could not check chain ID', @@ -31,7 +30,6 @@ function createPlugin () { api: { web3, web3Provider: web3.currentProvider, - web3SubscriptionProvider: web3SubscribAble.currentProvider, }, events: [ 'wallet-error', @@ -41,7 +39,7 @@ function createPlugin () { }; } - function stop () { + function stop() { destroyWeb3(web3); web3 = null; } diff --git a/src/plugins/eth/web3.js b/src/plugins/eth/web3.js index 8ce768c..ebda7fc 100644 --- a/src/plugins/eth/web3.js +++ b/src/plugins/eth/web3.js @@ -1,58 +1,22 @@ 'use strict' const logger = require('../../logger'); - -const Web3 = require('web3') const { Web3Http } = require('./web3Http'); function createWeb3(config) { // debug.enabled = config.debug - const web3 = new Web3Http(config.httpApiUrls) return web3 } -function createWeb3Subscribable(config, eventBus) { - // debug.enabled = config.debug - - const options = { - timeout: 1000 * 15, // ms - // Enable auto reconnection - reconnect: { - auto: true, - delay: 5000, // ms - maxAttempts: false, - onTimeout: false, - }, - } - - const web3 = new Web3( - new Web3.providers.WebsocketProvider(config.wsApiUrl, options) - ) - - web3.currentProvider.on('connect', function () { - logger.debug('Web3 provider connected') - eventBus.emit('web3-connection-status-changed', { connected: true }) - }) - web3.currentProvider.on('error', function (event) { - logger.debug('Web3 provider connection error: ', event.type || event.message) - eventBus.emit('web3-connection-status-changed', { connected: false }) - }) - web3.currentProvider.on('end', function (event) { - logger.debug('Web3 provider connection ended: ', event.reason) - eventBus.emit('web3-connection-status-changed', { connected: false }) - }) - - return web3 -} - function destroyWeb3(web3) { - web3.currentProvider?.disconnect() + web3.currentProvider?.disconnect().catch(() => { + logger.error('Web3 disconnect error') + }) } module.exports = { createWeb3, destroyWeb3, - createWeb3Subscribable, } diff --git a/src/plugins/eth/web3Http.js b/src/plugins/eth/web3Http.js index 88c8841..3437c85 100644 --- a/src/plugins/eth/web3Http.js +++ b/src/plugins/eth/web3Http.js @@ -49,7 +49,7 @@ class Web3Http extends Web3 { this.retryCount = 0 // Initialize Web3 with the first provider from the list - this.setCustomProvider(this.providers[this.currentIndex]) + this.setCustomProvider(this.providers[this.currentIndex]); // Set options if provided if (options) { @@ -90,6 +90,7 @@ class Web3Http extends Web3 { } }) } + return true } diff --git a/src/plugins/explorer/api/etherscan-api.js b/src/plugins/explorer/api/etherscan-api.js index 31fd760..723e139 100644 --- a/src/plugins/explorer/api/etherscan-api.js +++ b/src/plugins/explorer/api/etherscan-api.js @@ -3,7 +3,7 @@ const axios = require('axios').default; class EtherscanApi { constructor({ baseURL }) { - this.api = axios.create({baseURL}) + this.api = axios.create({ baseURL }) } /** @@ -28,7 +28,9 @@ class EtherscanApi { page, offset: pageSize } + const { data } = await this.api.get('', { params }) + const { status, message, result } = data if (status !== '1' && message !== 'No transactions found') { throw new Error(result) @@ -46,7 +48,7 @@ class EtherscanApi { * @param {number} [pageSize] page size * @returns {Promise} array of transaction hashes */ - async getEthTransactions(from, to, address, page = 1, pageSize = 1000) { + async getEthTransactions(from, to, address, page = 1, pageSize = 10) { const params = { module: 'account', action: 'txlist', diff --git a/src/plugins/explorer/blocks-stream.js b/src/plugins/explorer/blocks-stream.js index dbbc8db..7676cbd 100644 --- a/src/plugins/explorer/blocks-stream.js +++ b/src/plugins/explorer/blocks-stream.js @@ -1,29 +1,30 @@ 'use strict'; -const logger = require('../../logger'); const EventEmitter = require('events'); -function createStream (web3, updateInterval = 10000) { - const ee = new EventEmitter(); +function createStream(web3, updateInterval = 10000) { + const stream = new EventEmitter(); - web3.eth.getBlock('latest') - .then(function (block) { - ee.emit('data', block); - }) - .catch(function (err) { - ee.emit('error', err); - }) - - const interval = setInterval(async () => { + const getBlockFunc = async () => { try { const block = await web3.eth.getBlock('latest'); - ee.emit('data', block); + stream.emit('data', block); } catch (err) { - ee.emit('error', err); + stream.emit('error', err); + } + } + + getBlockFunc() + const interval = setInterval(getBlockFunc, updateInterval); + + const stop = () => { + stream.removeAllListeners() + if (interval) { + clearInterval(interval) } - }, updateInterval); + } - return { interval, stream: ee }; + return { stream, stop }; } module.exports = createStream; diff --git a/src/plugins/explorer/events.js b/src/plugins/explorer/events.js deleted file mode 100644 index c978b08..0000000 --- a/src/plugins/explorer/events.js +++ /dev/null @@ -1,12 +0,0 @@ -'use strict'; - -function createEventsRegistry () { - const registeredEvents = []; - - return { - getAll: () => registeredEvents, - register: registration => registeredEvents.push(registration) - }; -} - -module.exports = createEventsRegistry; diff --git a/src/plugins/explorer/explorer-mapper.js b/src/plugins/explorer/explorer-mapper.js new file mode 100644 index 0000000..0387ff3 --- /dev/null +++ b/src/plugins/explorer/explorer-mapper.js @@ -0,0 +1,183 @@ +const { decodeAbiSignature, decodeEvent } = require("./watcher-helpers") + +/** + * @typedef {import("web3-core").Transaction} Transaction + * @typedef {import("web3-core").TransactionReceipt} TransactionReceipt + */ + +/** + * Maps external indexer tokentx call response to TransactionEvent + * @param {TokenTxResItem} tx + * @returns {TransactionEvent} + */ +function mapLMRResToTxEvent(tx) { + return { + type: "transfer", + timestamp: tx.timeStamp, + txhash: tx.hash, + blockNumber: Number(tx.blockNumber), + transactionFee: Number(tx.gasPrice) * Number(tx.gasUsed), + transfers: [ + { + from: tx.from, + to: tx.to, + amount: tx.value, + token: "LMR" + } + ] + } +} + +/** + * Maps external indexer txlist call response to TransactionEvent + * @param {AbiItemSignature[]} contractAbi + * @param {TxListResItem} tx + * @returns {TransactionEvent} + */ +function mapETHResToTxEvent(contractAbi, tx) { + const funcName = decodeAbiSignature(contractAbi, tx.methodId) + const type = mapContractCallToTxType(funcName?.name) + return { + timestamp: tx.timeStamp, + txhash: tx.hash, + type: type, + blockNumber: Number(tx.blockNumber), + transactionFee: Number(tx.gasPrice) * Number(tx.gasUsed), + transfers: [ + { + from: tx.from, + to: tx.to, + amount: tx.value, + token: "ETH" + } + ] + } +} + +/** + * Merges calls to tokenTx and txList into one list of transactions + * @param {TransactionEvent[]} txs + * @returns {TransactionEvent[]} + */ +function mergeTxs(txs) { + /** @type {Map} */ + const txsMap = new Map() + for (const tx of txs) { + const existingTx = txsMap.get(tx.txhash) + if (existingTx) { + existingTx.transfers.push(...tx.transfers) + if (tx.type !== 'transfer') { + existingTx.type = tx.type + } + } else { + txsMap.set(tx.txhash, tx) + } + } + return Array.from(txsMap.values()) + .sort((a, b) => Number(b.timestamp) - Number(a.timestamp)) +} + +/** + * Maps sent transaction data and receipt to TransactionEvent + * @param {AbiItemSignature[]} contractAbi + * @param {Object} obj + * @param {Partial} obj.transaction + * @param {TransactionReceipt} obj.receipt + * @returns {TransactionEvent} + */ +function mapSentTxToTxEvent(contractAbi, { transaction, receipt }) { + const logs = receipt.logs + + /** @type {TransactionEvent['transfers']} */ + const transfers = [{ + from: transaction.from, + to: transaction.to, + amount: transaction.value, + token: "ETH" + }] + + /** @type {TransactionType} */ + let eventType = "transfer" + for (const log of logs) { + try { + //@ts-ignore + const event = decodeEvent(contractAbi, log) + if (event.eventName === 'Transfer') { + transfers.push({ + from: event.from, + to: event.to, + amount: event.value, + token: "LMR" + }) + } + const type = mapEventToTxType(event.eventName) + if (type !== 'transfer') { + eventType = type + } + } catch (err) { + console.log(err); + } + } + + return { + blockNumber: Number(receipt.blockNumber), + timestamp: `${Date.now()}`, + txhash: receipt.transactionHash, + type: eventType, + transactionFee: Number(receipt.effectiveGasPrice) * Number(receipt.gasUsed), + transfers: transfers, + } +} + +/** + * Maps contract call returned by indexer to transaction type + * @param {string | undefined} contractCall + * @returns {TransactionType} + */ +function mapContractCallToTxType(contractCall) { + switch (contractCall) { + case 'setPurchaseRentalContract': + return 'purchase' + case 'setCreateNewRentalContract': + return 'create' + case 'setContractDeleted': + return 'delete' + case 'setUpdateContractInformation': + return 'update' + case 'setContractCloseout': + return 'closeout' + default: + return 'transfer' + } +} + +/** + * Maps event returned by eth node getTransaction call to transaction type + * @param {string | undefined} eventName + * @returns {TransactionType} + */ +function mapEventToTxType(eventName) { + switch (eventName) { + case 'contractClosed': + return 'closeout' + case 'purchaseInfoUpdated': + return 'update' + case 'contractCreated': + return 'create' + case 'clonefactoryContractPurchased': + return 'purchase' + case 'contractDeleteUpdated': + return 'delete' + default: + return 'transfer' + } +} + +module.exports = { + mapLMRResToTxEvent, + mapETHResToTxEvent, + mapSentTxToTxEvent, + mergeTxs, + mapContractCallToTxType, + mapEventToTxType, +} \ No newline at end of file diff --git a/src/plugins/explorer/explorer.js b/src/plugins/explorer/explorer.js index 539e5a3..d6577b4 100644 --- a/src/plugins/explorer/explorer.js +++ b/src/plugins/explorer/explorer.js @@ -1,126 +1,181 @@ 'use strict' -const EventEmitter = require('events') const pRetry = require('p-retry'); const { createExplorerApis } = require('./api/factory'); +const { mapLMRResToTxEvent, mapETHResToTxEvent, mapSentTxToTxEvent, mergeTxs } = require('./explorer-mapper') +const { sleep } = require('./watcher-helpers'); + +/** + * @typedef {import('contracts-js').LumerinContext} LumerinContext + * @typedef {import('contracts-js').CloneFactoryContext} CloneFactoryContext + * @typedef {import('web3').default} Web3 + * @typedef {import("web3-core").Transaction} Transaction + * @typedef {import("web3-core").TransactionReceipt} TransactionReceipt + */ + /** * * @param {string[]} explorerApiURLs * @param {*} web3 * @param {*} lumerin - * @param {*} eventBus + * @param {*} cf + * @param {number} [pollingIntervalMs] * @returns */ -const createExplorer = (explorerApiURLs, web3, lumerin, eventBus) => { +const createExplorer = (explorerApiURLs, web3, lumerin, cf, pollingIntervalMs) => { const apis = createExplorerApis(explorerApiURLs); - return new Explorer({ apis, lumerin, web3, eventBus }) + return new Explorer({ apis, lumerin, web3, cloneFactory: cf, pollingIntervalMs }) } class Explorer { - /** @type {import('contracts-js').LumerinContext} */ + /** @type {LumerinContext} */ lumerin = null; - - constructor({ apis, lumerin, web3, eventBus }) { + /** @type {CloneFactoryContext} */ + cloneFactory = null + /** @type {Web3} */ + web3 = null; + /** @type {Indexer[]} */ + apis = []; + latestSyncedBlock = 0; + stop = false; + /** @type {Promise | null} */ + job = null; + pollingIntervalMs = 0; + pageSize = 3; // number of txs retrieved in one call + /** @type {(txEvent: TransactionEvent) => void | null} onChange */ + onChange = null; + + constructor({ apis, lumerin, cloneFactory, web3, pollingIntervalMs = 5000 }) { this.apis = apis this.lumerin = lumerin + this.cloneFactory = cloneFactory + this.walletAddress = null; this.web3 = web3 - this.eventBus = eventBus + this.pollingIntervalMs = pollingIntervalMs } /** * Returns list of transactions for ETH and LMR token * @param {string} from start block * @param {string} to end block - * @param {string} address wallet address - * @returns {Promise} + * @param {number} page + * @param {number} pageSize + * @param {string} [walletAddress] + * @returns {Promise} */ - async getTransactions(from, to, address, page, pageSize) { - const lmrTransactions = await this.invoke('getTokenTransactions', from, to, address, this.lumerin._address, page, pageSize) - const ethTransactions = await this.invoke('getEthTransactions', from, to, address, page, pageSize) - - if (page && pageSize) { - const hasNextPage = lmrTransactions.length || ethTransactions.length; - this.eventBus.emit('transactions-next-page', { - hasNextPage: Boolean(hasNextPage), - page: page + 1, - }) + async getTransactions(from, to, page, pageSize, walletAddress = this.walletAddress) { + if (!walletAddress) { + throw new Error('walletAddress is required'); } - return [...lmrTransactions, ...ethTransactions] + //@ts-ignore + const lmrTransactions = await this.invoke('getTokenTransactions', from, to || 'latest', walletAddress, this.lumerin._address, page, pageSize) + const ethTransactions = await this.invoke('getEthTransactions', from, to || 'latest', walletAddress, page, pageSize) + + const abis = this.abis() + + return mergeTxs([ + ...lmrTransactions.map(mapLMRResToTxEvent), + ...ethTransactions.map((item) => mapETHResToTxEvent(abis, item)) + ]) } /** - * Returns list of transactions for LMR token - * @param {string} from start block - * @param {string} to end block - * @param {string} address wallet address - * @returns {Promise} + * Wraps the transaction call and emits event with a parsed transaction data + * @param {PromiEvent} promiEvent + * @returns {Promise<{ receipt: import("web3-core").TransactionReceipt }>} */ - async getETHTransactions(from, to, address) { - return await this.invoke('getEthTransactions', from, to, address) + logTransaction(promiEvent) { + if (!promiEvent.once) { + return + } + + const onChange = this.onChange + const abis = this.abis() + + return new Promise(function (resolve, reject) { + const txData = { + /** @type {Partial} */ + transaction: null, + /** @type {TransactionReceipt} */ + receipt: null, + } + + promiEvent + .once('sending', function (payload) { + txData.transaction = payload.params[0] + }) + .once('receipt', function (receipt) { + txData.receipt = receipt + const tx = mapSentTxToTxEvent(abis, txData) + if (onChange) { + onChange(tx) + } + resolve({ receipt }); + }) + .once('error', function (err) { + promiEvent.removeAllListeners(); + reject(err); + }); + }); } - /** - * Create a stream that will emit an event each time a transaction for the - * specified address is indexed. - * - * The stream will emit `data` for each transaction. If the connection is lost - * or an error occurs, an `error` event will be emitted. - * - * @param {string} address The address. - * @returns {object} The event emitter. + /** + * Starts watching for new transactions in background + * @param {(txId: TransactionEvent) => void} onChange + * @param {(e: Error) => void} onError */ - getTransactionStream = (address) => { - const stream = new EventEmitter() - - this.lumerin.events - .Transfer({ - filter: { - to: address, - }, - }) - .on('data', (data) => { - stream.emit('data', data) - }) - .on('error', (err) => { - stream.emit('error', err) - }) - - this.lumerin.events - .Transfer({ - filter: { - from: address, - }, - }) - .on('data', (data) => { - stream.emit('data', data) - }) - .on('error', (err) => { - stream.emit('error', err) - }) - - setInterval(() => { - stream.emit('resync') - }, 60000) - - return stream + startWatching(walletAddress, onChange, onError) { + if (this.job) { + throw new Error('Already watching') + } + this.walletAddress = walletAddress; + this.onChange = onChange + this.onError = onError + this.job = this.poll() + } + + /** + * Stops watching for new transactions in background + */ + async stopWatching() { + this.stop = true + await this.job + this.job = null + this.walletAddress = null; } - getLatestBlock() { - return this.web3.eth.getBlock('latest').then((block) => { - return { - number: block.number, - hash: block.hash, - totalDifficulty: block.totalDifficulty, + async poll() { + for (; ;) { + for (let page = 1; ; page++) { + if (this.stop) { + return + } + try { + const txs = await this.getTransactions(String(this.latestSyncedBlock + 1), "latest", page, this.pageSize) + for (const tx of txs) { + this.onChange(tx) + if (tx.blockNumber > this.latestSyncedBlock) { + this.latestSyncedBlock = tx.blockNumber + } + } + if (!txs.length) { + break; + } + } catch (err) { + this.onError(err) + } } - }) + await sleep(this.pollingIntervalMs) + } } /** * Helper method that attempts to make a function call for multiple providers - * @param {string} methodName + * @param {keyof Indexer} methodName * @param {...any} args * @returns {Promise} + * @private */ async invoke(methodName, ...args) { return await pRetry(async () => { @@ -128,6 +183,7 @@ class Explorer { for (const api of this.apis) { try { + //@ts-ignore return await api[methodName](...args) } catch (err) { lastErr = err @@ -135,8 +191,21 @@ class Explorer { } throw new Error(`Explorer error, tried all of the providers without success, ${lastErr}`) + //@ts-ignore }, { minTimeout: 5000, retries: 5 }) } + + /** + * @returns {AbiItemSignature[]} + */ + abis() { + //@ts-ignore + return [...this.lumerin._jsonInterface, ...this.cloneFactory._jsonInterface] + } + } -module.exports = createExplorer \ No newline at end of file +module.exports = { + createExplorer, + Explorer, +} \ No newline at end of file diff --git a/src/plugins/explorer/explorer.test.js b/src/plugins/explorer/explorer.test.js new file mode 100644 index 0000000..d5ef395 --- /dev/null +++ b/src/plugins/explorer/explorer.test.js @@ -0,0 +1,138 @@ +/** + * @type {typeof import('web3').default} + */ +//@ts-ignore +const Web3 = require('web3') +const { Lumerin, CloneFactory } = require('contracts-js'); +const { mapSentTxToTxEvent } = require('./explorer-mapper'); +const { expect } = require('chai'); + +describe('mapper tests', () => { + const web3 = new Web3('http://localhost:8545') + + const lumerinAddr = "0x0000000000000000000000000000000000000000" + const cfAddr = "0x0000000000000000000000000000000000000000" + + const lumerin = Lumerin(web3, lumerinAddr) + const cf = CloneFactory(web3, cfAddr) + //@ts-ignore + const abis = [...lumerin._jsonInterface, ...cf._jsonInterface] + + it('should map closeout correctly', () => { + const tx = { + "transaction": { + "hash": "0x11adf820d448e3eda4ea387d93c3a9f6bc45ec49a8ca998e236cea16693e5526", + "type": 2, + "accessList": [], + "blockHash": + "0xa88720eea0d2919dbce118f1df3eea63562323a777df9f2fc3ae7b1e78c726da", + "blockNumber": 44, + "transactionIndex": 0, + "confirmations": 1, + "from": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "gasPrice": "1000", + "maxPriorityFeePerGas": "1000", + "maxFeePerGas": "1000", + "gasLimit": "1000", + "to": "0xa513E6E4b8f2a923D98304ec87F64353C4D5C853", + "value": "2000", + "nonce": 30, + "data": "0xd7ef57a20000000000000000000000009bd03768a7dcc129555de410ff8e85528a4f88b50000000000000000000000000000000000000000000000000000000000000000", + "r": "0xeddd9b62b1a9147458abf09d55b1b67a2e9327ca1b2996040f32cb3b9de7603f", + "s": "0x6e84c45bbf9460af71f33ca2861fda772e0d79facf7ca11efb37eacd2cd6bf2f", + "v": "0", + "creates": null, + "chainId": "31337" + }, + "receipt": { + "to": "0xa513E6E4b8f2a923D98304ec87F64353C4D5C853", + "from": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "contractAddress": null, + "transactionIndex": 0, + "gasUsed": 1000, + "logsBloom": "0x00020000000000000002000000400000010000000000000000000000000000010000000000000000000400000000000000000000000000000000000000000000000000000000000000000008000000000000000010000000010000000000000000000840000000000000000100000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000000200000008000000000000000000000000000800000000000000010000000000000200000000000042000000200000000000000000000000002000000000200000000001000000000000000000000000000000000001000000008000000004000000000000", + "blockHash": "0xa88720eea0d2919dbce118f1df3eea63562323a777df9f2fc3ae7b1e78c726da", + "transactionHash": "0x11adf820d448e3eda4ea387d93c3a9f6bc45ec49a8ca998e236cea16693e5526", + "logs": [{ + "transactionIndex": 0, + "blockNumber": 44, + "transactionHash": "0x11adf820d448e3eda4ea387d93c3a9f6bc45ec49a8ca998e236cea16693e5526", + "address": "0x9bd03768a7DCc129555dE410FF8E85528A4F88b5", + "topics": ["0xaadd128c35976a01ffffa9dfb8d363b3358597ce6b30248bcf25e80bd3af4512", + "0x000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb92266"], + "data": "0x", + "logIndex": 0, + "blockHash": "0xa88720eea0d2919dbce118f1df3eea63562323a777df9f2fc3ae7b1e78c726da" + }, { + "transactionIndex": 0, + "blockNumber": 44, + "transactionHash": "0x11adf820d448e3eda4ea387d93c3a9f6bc45ec49a8ca998e236cea16693e5526", + "address": "0x5FbDB2315678afecb367f032d93F642f64180aa3", + "topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + "0x0000000000000000000000009bd03768a7dcc129555de410ff8e85528a4f88b5", + "0x00000000000000000000000070997970c51812dc3a010c7d01b50e0d17dc79c8"], + "data": "0x0000000000000000000000000000000000000000000000000000000005f49b7b", + "logIndex": 1, + "blockHash": "0xa88720eea0d2919dbce118f1df3eea63562323a777df9f2fc3ae7b1e78c726da" + }, { + "transactionIndex": 0, + "blockNumber": 44, + "transactionHash": "0x11adf820d448e3eda4ea387d93c3a9f6bc45ec49a8ca998e236cea16693e5526", + "address": "0xa513E6E4b8f2a923D98304ec87F64353C4D5C853", + "topics": ["0x61cf0b3820dc69659f8669254f3b445c24c7080cb569a6316dacca8400b05e3b", + "0x0000000000000000000000009bd03768a7dcc129555de410ff8e85528a4f88b5"], + "data": "0x0000000000000000000000000000000000000000000000000000000000000000", + "logIndex": 2, + "blockHash": "0xa88720eea0d2919dbce118f1df3eea63562323a777df9f2fc3ae7b1e78c726da" + }], + "blockNumber": 44, + "confirmations": 1, + "cumulativeGasUsed": 1000, + "effectiveGasPrice": 1000, + "status": true, + "type": 2, + "byzantium": true, + "events": [{ + "transactionIndex": 0, + "blockNumber": 44, + "transactionHash": "0x11adf820d448e3eda4ea387d93c3a9f6bc45ec49a8ca998e236cea16693e5526", + "address": "0x9bd03768a7DCc129555dE410FF8E85528A4F88b5", + "topics": ["0xaadd128c35976a01ffffa9dfb8d363b3358597ce6b30248bcf25e80bd3af4512", + "0x000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb92266"], + "data": "0x", + "logIndex": 0, + "blockHash": "0xa88720eea0d2919dbce118f1df3eea63562323a777df9f2fc3ae7b1e78c726da" + }, { + "transactionIndex": 0, + "blockNumber": 44, + "transactionHash": "0x11adf820d448e3eda4ea387d93c3a9f6bc45ec49a8ca998e236cea16693e5526", + "address": "0x5FbDB2315678afecb367f032d93F642f64180aa3", + "topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + "0x0000000000000000000000009bd03768a7dcc129555de410ff8e85528a4f88b5", + "0x00000000000000000000000070997970c51812dc3a010c7d01b50e0d17dc79c8"], + "data": "0x0000000000000000000000000000000000000000000000000000000005f49b7b", + "logIndex": 1, + "blockHash": "0xa88720eea0d2919dbce118f1df3eea63562323a777df9f2fc3ae7b1e78c726da" + }, { + "transactionIndex": 0, + "blockNumber": 44, + "transactionHash": "0x11adf820d448e3eda4ea387d93c3a9f6bc45ec49a8ca998e236cea16693e5526", + "address": "0xa513E6E4b8f2a923D98304ec87F64353C4D5C853", + "topics": ["0x61cf0b3820dc69659f8669254f3b445c24c7080cb569a6316dacca8400b05e3b", + "0x0000000000000000000000009bd03768a7dcc129555de410ff8e85528a4f88b5"], + "data": "0x0000000000000000000000000000000000000000000000000000000000000000", + "logIndex": 2, + "blockHash": "0xa88720eea0d2919dbce118f1df3eea63562323a777df9f2fc3ae7b1e78c726da", + "args": ["0x9bd03768a7DCc129555dE410FF8E85528A4F88b5", "0"], + "event": "contractClosed", + "eventSignature": "contractClosed(address,uint256)" + }] + } + } + + //@ts-ignore + const data = mapSentTxToTxEvent(abis, tx) + console.log(data); + expect(data.type).to.be.equal('closeout') + }) +}) \ No newline at end of file diff --git a/src/plugins/explorer/explorer.types.js b/src/plugins/explorer/explorer.types.js new file mode 100644 index 0000000..b85cc34 --- /dev/null +++ b/src/plugins/explorer/explorer.types.js @@ -0,0 +1,79 @@ +/** + * @typedef Indexer + * @property {(from: string, to: string, address: string, tokenAddress: string, page: number, pageSize: number) => Promise} getTokenTransactions + * @property {(from: string, to: string, address: string, tokenAddress: string, page: number, pageSize: number) => Promise} getEthTransactions +*/ + +/** + * @typedef TokenTxResItem + * @prop {string} blockNumber: '48651460', + * @prop {string} timeStamp: '1697549374', + * @prop {string} hash: '0x4c97a8c505e0e1ee15cd5788a44fd137ea475821b037b1bd9e36f0efb771f08a', + * @prop {string} nonce: '27', + * @prop {string} blockHash: '0x998bcbb22a4c769b088a26e0ddc9faba69b37b5f75f2b66b8a2b1160781d9f94', + * @prop {string} from: '0x1441bc52156cf18c12cde6a92ae6bde8b7f775d4', + * @prop {string} to: '0x6370ec8171bf9c0f858859b6430ab49149ac517c', + * @prop {string} contractAddress: '0x769313b5dfa559a592587bda63e083487db4dd74', + * @prop {string} value: '100000000', + * @prop {string} tokenName: 'Lumerin', + * @prop {string} tokenSymbol: 'LMR', + * @prop {string} tokenDecimal: '8', + * @prop {string} transactionIndex: '1', + * @prop {string} gas: '465374', + * @prop {string} gasPrice: '2600000000', + * @prop {string} gasUsed: '444521', + * @prop {string} cumulativeGasUsed: '444521', + * @prop {string} input: 'deprecated', + * @prop {string} confirmations: '302292' + */ + +/** + * @typedef TxListResItem + * @prop {string} blockNumber: '48651460', + * @prop {string} timeStamp: '1697549374', + * @prop {string} hash: '0x4c97a8c505e0e1ee15cd5788a44fd137ea475821b037b1bd9e36f0efb771f08a', + * @prop {string} nonce: '27', + * @prop {string} blockHash: '0x998bcbb22a4c769b088a26e0ddc9faba69b37b5f75f2b66b8a2b1160781d9f94', + * @prop {string} transactionIndex: '1', + * @prop {string} from: '0x1441bc52156cf18c12cde6a92ae6bde8b7f775d4', + * @prop {string} to: '0xa86b306790ece69bddcfb4c4fac3847524ce7e08', + * @prop {string} value: '200000000000000', + * @prop {string} gas: '465374', + * @prop {string} gasPrice: '100000000', + * @prop {string} gasPriceBid: '2600000000', + * @prop {string} isError: '0', + * @prop {string} txreceipt_status: '1', + * @prop {string} input: '0xee878a4a0000000000000000000000006370ec8171bf9c0f858859b6430ab49149ac517c00000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000017e303466376532383238646538613564313931633666666165313638306534303165316262363961393833633136366138356136383961393735373730326130393361336235393134303963666465633530343739303734326261623030383663613934353862336664363232643832303439663766343637623832613563376364613162613038393461613431356236663838616462663235633236323337303933623438366631386662373332653832353531623338613466623132323139613661656536656436636461326661326337313930353831636139346535393538376433653939316132323061333533333032373966363336383933663966653165643231366363346133383431616538613739666331636331616238663133366530343562353433323231363364333936333133383634323139373465633936336536653234633238623430386536393365393034393136386265616233303939316136373030653539366131646538333661666662643733616639640000', + * @prop {string} contractAddress: '', + * @prop {string} cumulativeGasUsed: '444521', + * @prop {string} gasUsed: '444521', + * @prop {string} confirmations: '302300', + * @prop {string} methodId: '0xee878a4a', + * @prop {string} functionName: '' + */ + +/** + * @typedef {Object} TransactionEvent + * if event is associated with a contract interaction, this will be named after the method, + * if not, it will be just transafer + * @property {TransactionType} type + * @property {string} txhash + * @property {number} blockNumber + * @property {string} timestamp + * @property {number} transactionFee // blockchain transaction fee + * array of transfers involved in the transaction + * @property {{token: "LMR"|"ETH", from: string, to: string, amount: string}[]} transfers + */ + +/** + * @typedef {"transfer" | "purchase" | "closeout" | "create" | "update" | "delete"} TransactionType + */ + +/** + * @typedef { Promise < T > & { once?: (e: string, cb: (payload: T) => void) => PromiEvent < T >, removeAllListeners: () => void } } PromiEvent < T > + * @template { any } T + */ + +/** + * @typedef {import("web3-utils").AbiItem & {signature?: string}} AbiItemSignature + */ \ No newline at end of file diff --git a/src/plugins/explorer/index.js b/src/plugins/explorer/index.js index 1ee5fc3..d5905e4 100644 --- a/src/plugins/explorer/index.js +++ b/src/plugins/explorer/index.js @@ -3,88 +3,89 @@ const logger = require('../../logger'); const Web3 = require('web3'); -const { Lumerin } = require('contracts-js'); +const { Lumerin, CloneFactory } = require('contracts-js'); -const createEventsRegistry = require('./events'); -const { logTransaction } = require('./log-transaction'); -const createQueue = require('./queue'); -const createStream = require('./blocks-stream'); -const createTransactionSyncer = require('./sync-transactions'); -const tryParseEventLog = require('./parse-log'); -const createExplorer = require('./explorer'); +const createBlockStream = require('./blocks-stream'); +const { createExplorer } = require('./explorer'); function createPlugin() { let blocksStream; - let syncer; - let interval; + /** @type {import('./explorer').Explorer} */ + let explorer = null; function start({ config, eventBus, plugins }) { // debug.enabled = config.debug; - const { lmrTokenAddress } = config; - const web3 = new Web3(plugins.eth.web3Provider); + /** @type { import('web3').default } */ + const web3 = plugins.eth.web3 - const web3Subscribable = new Web3(plugins.eth.web3SubscriptionProvider); + const lumerin = Lumerin(web3, config.lmrTokenAddress); + const cf = CloneFactory(web3, config.cloneFactoryAddress); - const eventsRegistry = createEventsRegistry(); - const queue = createQueue(config, eventBus, web3); - const lumerin = Lumerin(web3Subscribable, lmrTokenAddress); - - const explorer = createExplorer(config.explorerApiURLs, web3, lumerin, eventBus); - - syncer = createTransactionSyncer( - config, - eventBus, - web3, - queue, - eventsRegistry, - explorer - ); + explorer = createExplorer(config.explorerApiURLs, web3, lumerin, cf, config.pollingIntervalMs); logger.debug('Initiating blocks stream'); - const streamData = createStream(web3, config.blocksUpdateMs); - blocksStream = streamData.stream; - interval = streamData.interval; + blocksStream = createBlockStream(web3, config.blocksUpdateMs); - blocksStream.on('data', function ({ hash, number, timestamp }) { + blocksStream.stream.on('data', function ({ hash, number, timestamp }) { logger.debug('New block', hash, number); eventBus.emit('coin-block', { hash, number, timestamp }); - }); - blocksStream.on('error', function (err) { + eventBus.emit('web3-connection-status-changed', { connected: true }); + }).on('error', function (err) { logger.debug('Could not get latest block'); eventBus.emit('wallet-error', { inner: err, message: 'Could not get latest block', meta: { plugin: 'explorer' } }); + eventBus.emit('web3-connection-status-changed', { connected: false }); }); return { api: { - logTransaction: logTransaction(queue), - refreshAllTransactions: syncer.refreshAllTransactions, - registerEvent: eventsRegistry.register, - syncTransactions: syncer.syncTransactions, - tryParseEventLog: tryParseEventLog(web3, eventsRegistry), - getPastCoinTransactions: syncer.getPastCoinTransactions, + logTransaction: explorer.logTransaction, + syncTransactions: async (from, to, page, pageSize, walletAddress) => { + const txs = await explorer.getTransactions(from, to, page, pageSize, walletAddress); + if(page && pageSize) { + const hasNextPage = txs.length; + eventBus.emit('transactions-next-page', { + hasNextPage: Boolean(hasNextPage), + page: page + 1, + }) + } + + eventBus.emit('token-transactions-changed', txs); + return txs; + }, + startWatching: ({ walletAddress }) => { + explorer.startWatching( + walletAddress, + (tx) => eventBus.emit('token-transactions-changed', [tx]), + (err) => eventBus.emit('wallet-error', { + inner: err, + message: 'Could not get latest transactions', + meta: { plugin: 'explorer' }, + }) + ) + }, + stop: () => explorer.stopWatching(), }, events: [ 'token-transactions-changed', - 'wallet-state-changed', 'coin-block', + 'wallet-error', + 'web3-connection-status-changed', + // 'wallet-state-changed', 'transactions-next-page', - 'indexer-connection-status-changed', - 'wallet-error' + // 'indexer-connection-status-changed', ], name: 'explorer' }; } function stop() { - // blocksStream.destroy(); - blocksStream.removeAllListeners(); - clearInterval(interval); - syncer.stop(); + blocksStream.stop() + explorer.stopWatching() } return { diff --git a/src/plugins/explorer/log-transaction.js b/src/plugins/explorer/log-transaction.js deleted file mode 100644 index 0e43bfb..0000000 --- a/src/plugins/explorer/log-transaction.js +++ /dev/null @@ -1,39 +0,0 @@ -const logTransaction = queue => - function (promiEvent, from, metaParser) { - if (promiEvent.once) { - return new Promise(function (resolve, reject) { - /** - * @type {Partial} - */ - let transaction = {}; - promiEvent - .once('sending', function(payload){ - const params = payload.params[0] - transaction = { - from: params.from, - to: params.to, - value: params.value, - input: params.data, - gas: params.gas, - gasPrice: params.gasPrice, - maxFeePerGas: params.maxFeePerGas, - maxPriorityFeePerGas: params.maxPriorityFeePerGas, - } - }) - .once('receipt', function (receipt) { - // todo: get from to value from return data - transaction.hash = receipt.transactionHash - queue.addTx(from, metaParser)({ transaction, receipt }) - resolve({ receipt }); - }) - .once('error', function (err) { - promiEvent.removeAllListeners(); - reject(err); - }); - }); - } - } - -module.exports = { - logTransaction, -} \ No newline at end of file diff --git a/src/plugins/explorer/parse-log.js b/src/plugins/explorer/parse-log.js deleted file mode 100644 index ca216c5..0000000 --- a/src/plugins/explorer/parse-log.js +++ /dev/null @@ -1,48 +0,0 @@ -'use strict'; - -const tryParseEventLog = (web3, eventsRegistry) => - (log, address) => - eventsRegistry.getAll().map(function (registration) { - const { - abi, - contractAddress, - eventName, - filter, - metaParser - } = registration(address); - - const eventAbi = abi.find(e => e.type === 'event' && e.name === eventName); - - if (!eventAbi) { - return null; - } - - const signature = web3.eth.abi.encodeEventSignature(eventAbi); - - const data = log.data || (log.raw && log.raw.data); - const topics = log.topics || (log.raw && log.raw.topics); - - if (log.address !== contractAddress || topics[0] !== signature) { - return null; - } - - const returnValues = web3.eth.abi.decodeLog( - eventAbi.inputs, - data, - eventAbi.anonymous ? topics : topics.slice(1) - ); - - return { - contractAddress, - filter, - metaParser, - parsed: Object.assign({}, log, { - event: eventName, - returnValues, - signature - }) - } - }) - .filter(data => !!data); - -module.exports = tryParseEventLog; diff --git a/src/plugins/explorer/polling.js b/src/plugins/explorer/polling.js new file mode 100644 index 0000000..f48cc91 --- /dev/null +++ b/src/plugins/explorer/polling.js @@ -0,0 +1,174 @@ +//@ts-check +const { Lumerin, CloneFactory } = require('contracts-js') +const EventEmitter = require('events') +/** + * @type {typeof import('web3-eth-abi').default} + */ +//@ts-ignore +const abi = require('web3-eth-abi'); +const { promisify } = require('util') + + +class SubscriptionPolling { + /** @type {import('contracts-js').LumerinContext} */ + lumerin = null + /** @type {import('contracts-js').CloneFactoryContext} */ + clonefactory = null + + /** + * @param {import('web3').default} web3 + * @param {string} lmrAddress + * @param {string} cfAddr + * @param {number} pollingIntervalMs + */ + constructor(web3, lmrAddress, cfAddr, pollingIntervalMs = 3000) { + this.web3 = web3 + this.lumerin = Lumerin(web3, lmrAddress) + this.clonefactory = CloneFactory(web3, cfAddr) + this.pollingIntervalMs = pollingIntervalMs + } + + /** + * + * @param {string} address + * @param {number} fromBlock + * @param {number|"latest"} toBlock + * @return {Promise} + */ + async getPastTransactions(address, fromBlock, toBlock = "latest") { + return await this.lumerin.getPastEvents('Transfer', { + fromBlock: fromBlock, + toBlock: toBlock, + filter: [{ + from: address, + }, { to: address }] + }) + } + + /** + * @param {string} address + * @param {number} fromBlock + * @returns {import("ethereum-abi-types-generator").EventResponse} + */ + watchLMRTransactions(address, fromBlock) { + const eventEmitter = new EventEmitter() + let from = fromBlock || 0; + + const poll = async () => { + const events = await this.lumerin.getPastEvents('Transfer', { + fromBlock: from, + toBlock: "latest", + filter: { + from: "0x1111111111111111111111111111111111111111", + to: "0x2222222222222222222222222222222222222222", + } + }) + if (events.length > 0) { + from = events[events.length - 1].blockNumber + 1; + } + events.forEach(event => eventEmitter.emit('data', event)) + } + + setInterval(poll, this.pollingIntervalMs) + return eventEmitter + } + + watchETHTransactions(address) { + + } + + watchContractEvents(address) { + + } + + /** + * + * @param {string} walletAddress + * @param {string[]} contractAddresses + */ + async watchAllEvents(walletAddress, contractAddresses) { + // const batch = new this.web3.BatchRequest() + // batch.add({ + // name: 'eth_getLogs', + // }) + + const filter = await this.sendRaw({ + id: 1, + jsonrpc: '2.0', + method: 'eth_newFilter', + params: [{ + address: [ + this.lumerin._address, + this.clonefactory._address, + ], + fromBlock: '0x0', + toBlock: 'latest', + topics: [ + [ + abi.encodeEventSignature('Transfer(address,address,uint256)'), + abi.encodeEventSignature('clonefactoryContractPurchased(address,address)'), + abi.encodeEventSignature('contractDeleteUpdated(address,bool)'), + abi.encodeEventSignature('purchaseInfoUpdated(address)'), + abi.encodeEventSignature('contractClosed(address,uint256)'), + ], + [ + abi.encodeParameter('address', walletAddress), + ...contractAddresses.map(addr => abi.encodeParameter('address', addr)) + ], + ] + }], + }) + + const eventEmitter = new EventEmitter() + const poll = async () => { + const events = await this.sendRaw({ + id: 1, + jsonrpc: '2.0', + method: 'eth_getFilterChanges', + params: [filter.result], + }) + events.result.forEach(event => eventEmitter.emit('data', event)) + } + + setInterval(poll, this.pollingIntervalMs) + + return eventEmitter + } + + /** + * @param {import("web3-core-helpers").JsonRpcPayload} params + * @returns {Promise} + */ + sendRaw(params) { + /** @type {import('web3-core').HttpProvider} */ + //@ts-ignore + const provider = this.web3.currentProvider + return promisify(provider.send.bind(provider))(params) + } +} + +/** + * @typedef {Object} TransferEvent + * @property {"transfer"} type + * @property {string} txhash + * @property {string} internalId internal identifier, incremental, used in cases when one blockchain tx represents two transaction in multiple tokens + * @property {"LMR"|"ETH"} token + * @property {string} from + * @property {string} to + * @property {string} amount + * @property {string} timestamp + */ + +/** + * @typedef {Object} ContractEvent + * @property {"contract"} type + * @property {string} txhash + * @property {string} internalId + * @property {string} contractID + * @property {string} eventName + * @property {string} timestamp + */ + +module.exports = { + SubscriptionPolling, +} \ No newline at end of file diff --git a/src/plugins/explorer/polling.test.js b/src/plugins/explorer/polling.test.js new file mode 100644 index 0000000..ebad0c9 --- /dev/null +++ b/src/plugins/explorer/polling.test.js @@ -0,0 +1,30 @@ +/** + * @type {typeof import('web3').default} + */ +//@ts-ignore +const Web3 = require('web3') +const { SubscriptionPolling } = require('./polling') + +const web3 = new Web3('http://localhost:8545') + +const lumerinAddr = process.env.LUMERIN_ADDRESS || "" +const walletAddr = process.env.WALLET_ADDRESS || "" +const cfAddr = process.env.CLONEFACTORY_ADDRESS || "" + +if (!lumerinAddr || !walletAddr || !cfAddr) { + throw new Error('LUMERIN_ADDRESS or WALLET_ADDRESS or CLONEFACTORY_ADDRESS env variables must be set') +} + +new SubscriptionPolling(web3, lumerinAddr, cfAddr) + // .watchLMRTransactions(walletAddr, 0) + .watchAllEvents(walletAddr, []) + .then(emitter => { + emitter.on('data', (data) => { + console.log('data', data) + }) + emitter.on('error', (error) => { + console.log('error', error) + }) + }) + + diff --git a/src/plugins/explorer/queue.js b/src/plugins/explorer/queue.js deleted file mode 100644 index 919b76d..0000000 --- a/src/plugins/explorer/queue.js +++ /dev/null @@ -1,289 +0,0 @@ -'use strict' - -const { debounce, groupBy, merge, noop, reduce } = require('lodash') -const logger = require('../../logger'); -const getTransactionStatus = require('./transaction-status') -const promiseAllProps = require('promise-all-props') -const abiDecoder = require('abi-decoder') -const { Lumerin, CloneFactory } = require('contracts-js') - -function createQueue(config, eventBus, web3) { - // debug.enabled = config.debug - - const lumerin = Lumerin(web3, config.lmrTokenAddress) - const cloneFactory = CloneFactory(web3, config.cloneFactoryAddress) - abiDecoder.addABI(lumerin.options.jsonInterface) - abiDecoder.addABI(cloneFactory.options.jsonInterface) - - const metasCache = {} - - let pendingEvents = [] - - function mergeEvents(hash, events) { - const metas = events.map(({ event, metaParser }) => { - return metaParser(event)} - ) - - metas.unshift(metasCache[hash] || {}) - - metasCache[hash] = reduce(metas, merge) - - return metasCache[hash] - } - - const mergeDones = (events) => events.map((event) => event.done || noop) - - function fillInStatus({ transaction, receipt, meta }) { - if (receipt && meta) { - meta.contractCallFailed = !getTransactionStatus(transaction, receipt) - } - return { transaction, receipt, meta } - } - - const decodeInput = (address) => - ({ transaction, receipt, meta }) => { - try { - - // for lmr token transactions retrieved by rpc call - if (receipt.events){ - if (receipt.events.Transfer){ - const {from, to, value} = receipt.events.Transfer.returnValues - transaction.input = { - to, - from, - amount: value, - } - return { transaction, receipt, meta } - } - } - - // for lmr token transactions retrieved by etherscan api - if (receipt.tokenSymbol === 'LMR'){ - transaction.input = { - to: receipt.to, - from: receipt.from, - amount: receipt.value, - } - return { transaction, receipt, meta } - } - - if ( - typeof transaction.input === 'string' && - transaction.input !== '0x' - ) { - if (!receipt.logs){ - return { transaction, receipt, meta } - } - - - const logs = abiDecoder.decodeLogs(receipt.logs) - if (!logs) { - return null - } - - const transfer = logs.find( - (l) => - l.name === 'Transfer' && - l.events.find( - (e) => - (e.name === 'to' && e.value === address) || - (e.name === 'from' && e.value === address) - ) - ) - if (!transfer) { - return null - } - const { events } = transfer - - const valueParam = events.find((p) => p.name === 'value') - if (valueParam === undefined) { - return null - } - - const toParam = events.find((p) => p.name === 'to') - if (toParam === undefined) { - return null - } - - const fromParam = events.find((p) => p.name === 'from') - if (fromParam === undefined) { - return null - } - - transaction.input = { - to: toParam.value, - amount: valueParam.value, - from: fromParam.value, - } - return { transaction, receipt, meta } - } - return { transaction, receipt, meta } - } catch (err) { - return null - } - } - - function emitTransactions(address, transactions) { - eventBus.emit('token-transactions-changed', { - transactions: transactions - .filter((data) => !!data.transaction) - .map(fillInStatus) - .map(decodeInput(address.toLowerCase())) - .filter((i) => Number(i.transaction.value) !== 0) // filters out eth transactions that correspond to token transfers - .filter((i) => !!i) - }) - - eventBus.emit('eth-tx'); - eventBus.emit('lmr-tx'); - } - - function tryEmitTransactions(address, transactions) { - try { - emitTransactions(address, transactions) - return null - } catch (err) { - return err - } - } - - //TODO: accept transaction and reciept to avoid api calls - //TODO: if transaction/reciept are not available, use block number to get all transactions and receipts in one api call - function emitPendingEvents(address) { - logger.debug('About to emit pending events') - - const eventsToEmit = pendingEvents.filter((e) => e.address === address) - const eventsToKeep = pendingEvents.filter((e) => e.address !== address) - pendingEvents = eventsToKeep - - const grouped = groupBy(eventsToEmit, 'event.transactionHash') - - Promise.all( - Object.keys(grouped).map((hash) => - promiseAllProps({ - transaction: web3.eth.getTransaction(hash), - receipt: web3.eth.getTransactionReceipt(hash), - meta: mergeEvents(hash, grouped[hash]), - done: mergeDones(grouped[hash]), - }) - ) - ) - .then(function (transactions) { - const err = tryEmitTransactions(address, transactions) - return Promise.all( - transactions.map((transaction) => - Promise.all(transaction.done.map((done) => done(err))) - ) - ) - }) - .catch(function (err) { - eventBus.emit('wallet-error', { - inner: err, - message: 'Could not emit event transaction', - meta: { plugin: 'explorer' }, - }) - eventsToEmit.forEach(function (event) { - event.done(err) - }) - }) - } - - /** - * - * @param {string} address - * @param {any[]} eventsData - * @returns {void} - */ - function emitPendingEventsV2(address, eventsData) { - const transactionItems = eventsData.map((eventData) => ({ - transaction: eventData.event.transaction, - receipt: eventData.event.receipt, - meta: mergeEvents(eventData.event.receipt.transactionHash, [eventData]), - done: eventData.done - })) - - const err = tryEmitTransactions(address, transactionItems) - - transactionItems.forEach(async function (transaction) { - transaction.done(err) - }) - - if (err){ - eventBus.emit('wallet-error', { - inner: err, - message: 'Could not emit event transaction', - meta: { plugin: 'explorer' }, - }) - } - - return - } - - const debouncedEmitPendingEvents = debounce( - emitPendingEvents, - config.explorerDebounce - ) - - const addTransaction = (address, meta) => - function (hash) { - return new Promise(function (resolve, reject) { - const event = { - address, - event: { transactionHash: hash }, - metaParser: () => meta || {}, - done: (err) => (err ? reject(err) : resolve()), - } - pendingEvents.push(event) - - debouncedEmitPendingEvents(address) - }) - } - - const addTx = (address, metaParser) => (txAndReceipt) => { - return new Promise(function (resolve, reject) { - const event = { - address, - event: txAndReceipt, - metaParser: () => metaParser || {}, - done: (err) => (err ? reject(err) : resolve()), - } - - emitPendingEventsV2(address, [event]) - }) - } - - const addTxs = (address, metaParser) => (txAndReceipts) => { - return new Promise(function (resolve, reject) { - const events = txAndReceipts.map((txAndReceipt) => ({ - address, - event: txAndReceipt, - metaParser: () => metaParser || {}, - done: (err) => (err ? reject(err) : resolve()), - })) - - emitPendingEventsV2(address, events) - }) - } - - const addEvent = (address, metaParser) => - function (event) { - logger.debug('Queueing event', event.event) - return new Promise(function (resolve, reject) { - pendingEvents.push({ - address, - event, - metaParser, - done: (err) => (err ? reject(err) : resolve()), - }) - debouncedEmitPendingEvents(address) - }) - } - - return { - addEvent, - addTransaction, - addTx, - addTxs, - } -} - -module.exports = createQueue diff --git a/src/plugins/explorer/sync-transactions.js b/src/plugins/explorer/sync-transactions.js deleted file mode 100644 index f00432a..0000000 --- a/src/plugins/explorer/sync-transactions.js +++ /dev/null @@ -1,255 +0,0 @@ -'use strict'; - -const logger = require('../../logger'); - -// eslint-disable-next-line max-params -function createSyncer (config, eventBus, web3, queue, eventsRegistry, indexer) { - // debug.enabled = config.debug; - - let bestBlock; - const gotBestBlockPromise = new Promise(function (resolve) { - eventBus.once('coin-block', function (header) { - bestBlock = header.number; - logger.debug('Got best block', bestBlock); - resolve(bestBlock); - }); - }) - - function subscribeCoinTransactions (fromBlock, address) { - let shallResync = false; - let resyncing = false; - let bestSyncBlock = fromBlock; - - const { symbol, displayName } = config; - - // LMR transactions - indexer.getTransactionStream(address) - .on('data', (data)=>{ - queue.addTx(address, null)(mapApiResponseToTrxReceipt(data)) - }) - .on('resync', function () { - logger.debug(`Shall resync ${symbol} transactions on next block`) - shallResync = true; - }) - .on('error', function (err) { - logger.debug(`Shall resync ${symbol} transactions on next block`) - shallResync = true; - eventBus.emit('wallet-error', { - inner: err, - message: `Failed to sync ${displayName} transactions`, - meta: { plugin: 'explorer' } - }); - }); - - // ETH transactions - // Check if shall resync when a new block is seen, as that is the - // indication of proper reconnection to the Ethereum node. - eventBus.on('coin-block', function ({ number }) { - if (shallResync && !resyncing) { - resyncing = true; - shallResync = false; - // eslint-disable-next-line promise/catch-or-return - indexer.getETHTransactions(bestSyncBlock, number, address) - .then(function (transactions) { - const { length } = transactions; - logger.debug(`${length} past ETH transactions retrieved`) - const txs = transactions.map(mapApiResponseToTrxReceipt) - queue.addTxs(address,null)(txs) - bestSyncBlock = number; - }) - .catch(function (err) { - shallResync = true; - eventBus.emit('wallet-error', { - inner: err, - message: 'Failed to resync transactions', - meta: { plugin: 'explorer' } - }); - }) - .then(function () { - resyncing = false; - }) - } else if (!resyncing) { - bestBlock = number; - } - }) - } - - function mapApiResponseToTrxReceipt(trx){ - const transaction = { - from: trx.from, - to: trx.to, - value: trx.value, - input: trx.input, - gas: trx.gas, - gasPrice: trx.gasPrice, - hash: trx.hash, - nonce: trx.nonce, - logIndex: trx.logIndex, // emitted only in events, used to differentiate between LMR transfers within one transaction - // maxFeePerGas: params.maxFeePerGas, - // maxPriorityFeePerGas: params.maxPriorityFeePerGas, - } - - if (trx.returnValues){ - transaction.from = trx.returnValues.from; - transaction.to = trx.returnValues.to; - transaction.value = trx.returnValues.value; - transaction.hash = trx.transactionHash; - } - - const receipt = { - transactionHash: trx.hash, - transactionIndex: trx.transactionIndex, - blockHash: trx.blockHash, - blockNumber: trx.blockNumber, - from: trx.from, - to: trx.to, - value: trx.value, - contractAddress: trx.contractAddress, - cumulativeGasUsed: trx.cumulativeGasUsed, - gasUsed: trx.gasUsed, - tokenSymbol: trx.tokenSymbol, - } - - if (trx.returnValues){ - receipt.from = trx.returnValues.from; - receipt.to = trx.returnValues.to; - receipt.value = trx.returnValues.value; - receipt.transactionHash = trx.transactionHash; - receipt.tokenSymbol = trx.address === config.chain.lmrTokenAddress ? 'LMR' : undefined; - } - - return {transaction, receipt} - } - - /** - * @param {string} fromBlock - * @param {string} toBlock - * @param {string} address - * @returns {Promise} lastSyncedBlock - */ - async function getPastCoinTransactions (fromBlock, toBlock, address, page, pageSize) { - const { symbol } = config; - - const transactions = await indexer.getTransactions(fromBlock, toBlock || bestBlock, address, page, pageSize) - logger.debug(`${transactions.length} past ${symbol} transactions retrieved`); - - queue.addTxs(address, null)(transactions.map(mapApiResponseToTrxReceipt)) - - return toBlock; - } - - const subscriptions = []; - - function subscribeEvents (fromBlock, address) { - eventsRegistry.getAll().forEach(function (registration) { - let shallResync = false; - let resyncing = false; - let bestSyncBlock = fromBlock; - - const { - contractAddress, - abi, - eventName, - filter, - metaParser - } = registration(address); - - const contract = new web3.eth.Contract(abi, contractAddress); - - // Ignore missing events - if (!contract.events[eventName]) { - logger.error('Could not subscribe: event not found', eventName); - return; - } - - // Get past events and subscribe to incoming events - const emitter = contract.events[eventName]({ fromBlock, filter }) - .on('data', queue.addEvent(address, metaParser)) - .on('changed', queue.addEvent(address, metaParser)) - .on('error', function (err) { - logger.error('Shall resync events on next block'); - shallResync = true; - eventBus.emit('wallet-error', { - inner: err, - message: `Subscription to event ${eventName} failed`, - meta: { plugin: 'explorer' } - }) - }); - subscriptions.push(emitter); - - // Resync on new block or save it as best sync block - eventBus.on('coin-block', function ({ number }) { - if (shallResync && !resyncing) { - resyncing = true; - shallResync = false; - // eslint-disable-next-line promise/catch-or-return - getPastEventsWithChunks({ - address, - contract, - eventName, - fromBlock: bestSyncBlock, - toBlock: number, - filter, - metaParser - }) - .catch(function (err) { - shallResync = true - eventBus.emit('wallet-error', { - inner: err, - message: `Failed to resync event ${eventName}`, - meta: { plugin: 'explorer' } - }) - }) - .then(function () { - resyncing = false - }); - } else if (!resyncing) { - bestSyncBlock = number; - bestBlock = number; - } - }); - }); - } - - const syncTransactions = (fromBlock, address, onProgress, page, pageSize) => - gotBestBlockPromise - .then(function () { - logger.debug('Syncing', fromBlock, bestBlock); - subscribeCoinTransactions(bestBlock, address); - subscribeEvents(bestBlock, address); - return getPastCoinTransactions(fromBlock, bestBlock, address, page, pageSize) - }) - .then(function (syncedBlock) { - bestBlock = syncedBlock; - return syncedBlock; - }); - - const refreshAllTransactions = address => - gotBestBlockPromise - .then(() => { - return getPastCoinTransactions(0, bestBlock, address) - .then(function ([syncedBlock]) { - bestBlock = syncedBlock; - return syncedBlock; - }) - }); - - function stop () { - subscriptions.forEach(function (subscription) { - subscription.unsubscribe(function (err) { - if (err) { - logger.error('Could not unsubscribe from event', err.message); - } - }); - }); - } - - return { - getPastCoinTransactions, - refreshAllTransactions, - stop, - syncTransactions - }; -} - -module.exports = createSyncer; diff --git a/src/plugins/explorer/transaction-status.js b/src/plugins/explorer/transaction-status.js deleted file mode 100644 index e096d93..0000000 --- a/src/plugins/explorer/transaction-status.js +++ /dev/null @@ -1,22 +0,0 @@ -'use strict'; - -const { isNumber } = require('lodash'); - -function getTransactionStatus (transaction, receipt) { - if (!receipt) { - throw new Error('No transaction receipt'); - } - - const isMined = isNumber(receipt.blockNumber); - - const failed = receipt.status === false || ( - receipt.status === null && // no Byzantinum fork - transaction.input !== '0x' && // is contract call - transaction.gas === receipt.gasUsed && // used all gas - !receipt.logs.length // and no logs - ); - - return !isMined || (isMined && !failed); -} - -module.exports = getTransactionStatus; diff --git a/src/plugins/explorer/watcher-helpers.js b/src/plugins/explorer/watcher-helpers.js new file mode 100644 index 0000000..175fd30 --- /dev/null +++ b/src/plugins/explorer/watcher-helpers.js @@ -0,0 +1,54 @@ +/** + * @type {typeof import('web3-eth-abi').default} + */ +//@ts-ignore +const abi = require('web3-eth-abi'); + +/** + * @typedef {Object} EventData + * @property {string} data + * @property {string[]} topics + */ + +/** + * + * @param {AbiItemSignature[]} contractAbi + * @param {string} signature + * @returns {AbiItemSignature | null} + */ +function decodeAbiSignature(contractAbi, signature) { + return contractAbi.find((e) => e.signature === signature) +} +/** + * + * @param {AbiItemSignature[]} contractAbi + * @param {EventData} eventData + * @returns {{eventName: string, [key: string]: string}} + */ +function decodeEvent(contractAbi, eventData) { + const [eventSignature, ...restTopics] = eventData.topics + const eventAbi = decodeAbiSignature(contractAbi, eventSignature) + if (!eventAbi) { + throw new Error(`Event ${eventSignature} not found`) + } + try { + const data = abi.decodeLog(eventAbi.inputs, eventData.data, restTopics) + return { + eventName: eventAbi.name, + ...data + } + } catch (err) { + throw new Error(`Event ${eventAbi.name} decode error: ${err}`) + } +} + +/** @param {number} ms */ +function sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +module.exports = { + decodeEvent, + decodeAbiSignature, + sleep, +} \ No newline at end of file