From 5a48f9bce0708be629cad2fae515f507af6e6895 Mon Sep 17 00:00:00 2001 From: 0xbok <1689531+0xbok@users.noreply.github.com> Date: Thu, 14 Sep 2023 17:44:55 +0530 Subject: [PATCH 1/3] create message queue table on new --- src/main.rs | 73 ++++++++++++++++++++++++++++---------------- src/send_consumer.rs | 72 +------------------------------------------ 2 files changed, 47 insertions(+), 98 deletions(-) diff --git a/src/main.rs b/src/main.rs index 3d78e9b..d21ec90 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ use ethers::prelude::*; use axum::{extract::Extension, routing::get, Router, Server}; use lapin::Channel; use serde::{Deserialize, Serialize}; -use tokio_postgres::NoTls; +use tokio_postgres::{IsolationLevel, NoTls}; use tower_http::cors::{Any, CorsLayer}; use user_balance::UserBalanceConfig; @@ -38,9 +38,6 @@ struct Cli { /// Create or load a merkle tree #[arg(long)] merkle: MerkleCommand, - // Activates the mint pipeline - // #[arg(long, action=clap::ArgAction::SetTrue)] - // mint: bool, } #[derive(clap::ValueEnum, Debug, Clone)] @@ -120,7 +117,6 @@ async fn main() -> Result<()> { let mut handles = vec![]; let channel = Arc::new(channel); - // if let Some(t) = cli.merkle { //////////////// experimental GraphQL integration ///////////////// let (client, connection) = tokio_postgres::connect("host=localhost user=dev dbname=arcpay", NoTls).await?; @@ -141,30 +137,55 @@ async fn main() -> Result<()> { pre_image_table: "pre_image".to_string(), }; - let (proven_client, proven_connection) = - tokio_postgres::connect("host=localhost user=dev dbname=proven_arcpay", NoTls).await?; - - // The connection object performs the actual communication with the database, - // so spawn it off to run on its own. - tokio::spawn(async move { - if let Err(e) = proven_connection.await { - eprintln!("connection error: {}", e); - } - }); - - let proven_client = Arc::new(RwLock::new(proven_client)); - let proven_db_config = PostgresDBConfig { - client: proven_client, - merkle_table: "merkle".to_string(), - pre_image_table: "pre_image".to_string(), + client: client.clone(), + merkle_table: "proven_merkle".to_string(), + pre_image_table: "proven_pre_image".to_string(), }; let mt = match cli.merkle { - MerkleCommand::New => ( - MerkleTree::::new(MERKLE_DEPTH, db_config).await?, - MerkleTree::::new(MERKLE_DEPTH, proven_db_config).await?, - ), + MerkleCommand::New => { + let mut client = client.write().await; + let tx = client + .build_transaction() + .isolation_level(IsolationLevel::Serializable) + .start() + .await + .expect("Database::new message queue build_transaction.start() error"); + + // TODO remove drop + let query: String = format!( + "DROP TABLE IF EXISTS {queue_table};", + queue_table = QUEUE_NAME + ); + + let statement = tx.prepare(&query).await.unwrap(); + tx.execute(&statement, &[]).await.unwrap(); + + let query = format!( + "CREATE TABLE {queue_table} ( + id SERIAL PRIMARY KEY, + payload BYTEA NOT NULL, + status TEXT DEFAULT 'pending', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + processed_at TIMESTAMP + );", + queue_table = QUEUE_NAME + ); + + let statement = tx.prepare(&query).await.unwrap(); + tx.execute(&statement, &[]).await.unwrap(); + + tx.commit() + .await + .expect("Database::new message queue create table error"); + + ( + MerkleTree::::new(MERKLE_DEPTH, db_config).await?, + MerkleTree::::new(MERKLE_DEPTH, proven_db_config) + .await?, + ) + } MerkleCommand::Load => ( MerkleTree::::load(db_config).await?, MerkleTree::::load(proven_db_config).await?, @@ -204,9 +225,7 @@ async fn main() -> Result<()> { .await .unwrap() })); - // } - // if cli.mint { let provider = Arc::new(Provider::::try_from("https://rpc2.sepolia.org")?); let contract_address: H160 = ARCPAY_ADDRESS.parse::
().unwrap(); diff --git a/src/send_consumer.rs b/src/send_consumer.rs index de429fd..1a27c81 100644 --- a/src/send_consumer.rs +++ b/src/send_consumer.rs @@ -1,7 +1,5 @@ -use ethers::abi::{encode, Token}; -use ethers::types::transaction::eip712::{EIP712Domain, Eip712}; +use ethers::types::transaction::eip712::Eip712; -use ethers::utils::keccak256; use ethers::{ prelude::{Eip712, EthAbiType, U256}, types::Address, @@ -38,74 +36,6 @@ struct Send712 { receiver: Address, } -////// remove this debug trait ////// -// trait TraitName { -// // Compute the domain separator; -// // See: https://github.com/gakonst/ethers-rs/blob/master/examples/permit_hash.rs#L41 -// fn separato(&self) -> [u8; 32]; -// } - -// impl TraitName for EIP712Domain { -// // Compute the domain separator; -// // See: https://github.com/gakonst/ethers-rs/blob/master/examples/permit_hash.rs#L41 -// fn separato(&self) -> [u8; 32] { -// // full name is `EIP712Domain(string name,string version,uint256 chainId,address -// // verifyingContract,bytes32 salt)` -// let mut ty = "EIP712Domain(".to_string(); - -// let mut tokens = Vec::new(); -// let mut needs_comma = false; -// if let Some(ref name) = self.name { -// ty += "string name"; -// tokens.push(Token::Uint(U256::from(keccak256(name)))); -// needs_comma = true; -// } - -// if let Some(ref version) = self.version { -// if needs_comma { -// ty.push(','); -// } -// ty += "string version"; -// tokens.push(Token::Uint(U256::from(keccak256(version)))); -// needs_comma = true; -// } - -// if let Some(chain_id) = self.chain_id { -// if needs_comma { -// ty.push(','); -// } -// ty += "uint256 chainId"; -// tokens.push(Token::Uint(chain_id)); -// needs_comma = true; -// } - -// if let Some(verifying_contract) = self.verifying_contract { -// if needs_comma { -// ty.push(','); -// } -// ty += "address verifyingContract"; -// tokens.push(Token::Address(verifying_contract)); -// needs_comma = true; -// } - -// if let Some(salt) = self.salt { -// if needs_comma { -// ty.push(','); -// } -// ty += "bytes32 salt"; -// tokens.push(Token::Uint(U256::from(salt))); -// } - -// ty.push(')'); - -// tokens.insert(0, Token::Uint(U256::from(keccak256(ty)))); -// dbg!(&tokens); -// dbg!(&encode(&tokens)); -// keccak256(encode(&tokens)) -// } -// } -/////////////////////////// - /// Verify signature and public key in `sig` is correct. pub(crate) fn verify_ecdsa( leaf: &Leaf, From 061d3bf7f62fd7659cd8134823807107143025e0 Mon Sep 17 00:00:00 2001 From: 0xbok <1689531+0xbok@users.noreply.github.com> Date: Fri, 15 Sep 2023 02:17:26 +0530 Subject: [PATCH 2/3] use tx in pmtree api --- Cargo.lock | 24 +++++++++++++++----- Cargo.toml | 3 ++- src/main.rs | 52 ++++---------------------------------------- src/merkle.rs | 15 +++++++------ src/mint.rs | 2 +- src/model.rs | 41 ++++++++++++++++++++++++++++++---- src/send_consumer.rs | 6 ++--- 7 files changed, 73 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3dedd0c..6ebb88e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3069,7 +3069,7 @@ dependencies = [ "lazy_static", "num-bigint", "pg_bigdecimal", - "pmtree 1.0.0 (git+https://github.com/arcpay/pmtree?rev=577f222fd22d68ea33c3903d7eaf19f9ea24083e)", + "pmtree 1.0.0", "rln", "secp256k1", "serde", @@ -3707,12 +3707,12 @@ dependencies = [ [[package]] name = "pmtree" version = "1.0.0" -source = "git+https://github.com/arcpay/pmtree?rev=577f222fd22d68ea33c3903d7eaf19f9ea24083e#577f222fd22d68ea33c3903d7eaf19f9ea24083e" dependencies = [ "async-recursion", "async-trait", "rayon", "tokio", + "tokio-postgres", ] [[package]] @@ -3755,9 +3755,9 @@ dependencies = [ [[package]] name = "postgres-protocol" -version = "0.6.5" +version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78b7fa9f396f51dffd61546fd8573ee20592287996568e6175ceb0f8699ad75d" +checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520" dependencies = [ "base64 0.21.2", "byteorder", @@ -5074,9 +5074,9 @@ dependencies = [ [[package]] name = "tokio-postgres" -version = "0.7.8" +version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e89f6234aa8fd43779746012fcf53603cdb91fdd8399aa0de868c2d56b6dde1" +checksum = "d340244b32d920260ae7448cb72b6e238bddc3d4f7603394e7dd46ed8e48f5b8" dependencies = [ "async-trait", "byteorder", @@ -5091,9 +5091,11 @@ dependencies = [ "pin-project-lite", "postgres-protocol", "postgres-types", + "rand", "socket2 0.5.3", "tokio", "tokio-util", + "whoami", ] [[package]] @@ -5888,6 +5890,16 @@ dependencies = [ "once_cell", ] +[[package]] +name = "whoami" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22fc3756b8a9133049b26c7f61ab35416c130e8c09b660f5b3958b446f52cc50" +dependencies = [ + "wasm-bindgen", + "web-sys", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 5c19f20..f5fbb6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -pmtree = { git = "https://github.com/arcpay/pmtree", rev = "577f222fd22d68ea33c3903d7eaf19f9ea24083e" } +#pmtree = { git = "https://github.com/arcpay/pmtree", rev = "577f222fd22d68ea33c3903d7eaf19f9ea24083e" } +pmtree = { path = "../pmtree" } utils = { git = "https://github.com/vacp2p/zerokit.git", rev = "be88a432d7ac58d02371025f909ba087d0a013c0" } rln = { git = "https://github.com/vacp2p/zerokit.git", rev = "be88a432d7ac58d02371025f909ba087d0a013c0" } async-graphql = "5.0.10" diff --git a/src/main.rs b/src/main.rs index d21ec90..5695504 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,6 @@ use async_graphql::{EmptySubscription, Schema}; use ethers::prelude::*; use axum::{extract::Extension, routing::get, Router, Server}; -use lapin::Channel; use serde::{Deserialize, Serialize}; use tokio_postgres::{IsolationLevel, NoTls}; use tower_http::cors::{Any, CorsLayer}; @@ -12,8 +11,6 @@ use model::{Leaf, MutationRoot, SendMessageType, WithdrawMessageType}; use std::sync::Arc; use tokio::sync::RwLock; -use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties}; - use clap::Parser; use eyre::Result; // TODO replace .unwrap() with `?` and `wrap_err` from eyre::Result. @@ -49,7 +46,7 @@ enum MerkleCommand { struct ApiContext { user_balance_db: UserBalanceConfig, mt: Arc>>, - channel: Arc, + channel: Arc>, } const MERKLE_DEPTH: usize = 32; // TODO: read in from parameters file @@ -73,49 +70,8 @@ enum QueueMessage { #[tokio::main] async fn main() -> Result<()> { - //////////////// experimental RabbitMQ integration //////////////// - // Connect to the RabbitMQ server - let addr = "amqp://guest:guest@localhost:5672/%2f"; - let conn = Connection::connect(addr, ConnectionProperties::default()).await?; - - // Create a channel - let channel = conn.create_channel().await?; - - // TODO: declare queue as persistent through `QueueDeclareOptions`. See message durability section: - // https://www.rabbitmq.com/tutorials/tutorial-two-python.html. - // Also make messages persistent by updating publisher. The value `PERSISTENT_DELIVERY_MODE` is 2, - // see: https://pika.readthedocs.io/en/stable/_modules/pika/spec.html. - // Even this doesn't fully guarantee message persistence, see "Note on message persistence" - // in the tutorial which links to: https://www.rabbitmq.com/confirms.html#publisher-confirms. - // We have enabled publisher confirms in the next step. - channel - .queue_declare( - QUEUE_NAME, - QueueDeclareOptions::default(), - FieldTable::default(), - ) - .await?; - // Enable publisher confirms - channel - .confirm_select(ConfirmSelectOptions::default()) - .await?; - - // Consume messages from the queue - let consumer = channel - .basic_consume( - QUEUE_NAME, - "my_consumer", - BasicConsumeOptions::default(), - FieldTable::default(), - ) - .await - .expect("basic_consume"); - - println!("Waiting for messages..."); - let cli = Cli::parse(); let mut handles = vec![]; - let channel = Arc::new(channel); //////////////// experimental GraphQL integration ///////////////// let (client, connection) = @@ -193,7 +149,7 @@ async fn main() -> Result<()> { }; handles.push(tokio::spawn(async move { - send_consumer::send_consumer(consumer, RwLock::new(mt.1)).await + send_consumer::send_consumer(client.clone(), RwLock::new(mt.1)).await })); let user_balance_db = UserBalanceConfig { @@ -206,7 +162,7 @@ async fn main() -> Result<()> { .data(ApiContext { user_balance_db, mt: cur_merkle_tree.clone(), - channel: channel.clone(), + channel: client.clone(), }) .finish(); let cors = CorsLayer::new() @@ -243,7 +199,7 @@ async fn main() -> Result<()> { let filter = event.filter.from_block(0).to_block(BlockNumber::Finalized); */ handles.push(tokio::spawn(async move { - mint(contract, channel.clone(), cur_merkle_tree.clone()).await + mint(contract, client.clone(), cur_merkle_tree.clone()).await })); // } diff --git a/src/merkle.rs b/src/merkle.rs index 81cc4b8..ff0a05c 100644 --- a/src/merkle.rs +++ b/src/merkle.rs @@ -11,7 +11,7 @@ use rln::{ }; use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; -use tokio_postgres::{Client, IsolationLevel}; +use tokio_postgres::{Client, IsolationLevel, Transaction}; use crate::model::{CoinRange, Leaf}; @@ -196,6 +196,7 @@ impl Database for PostgresDBConfig { key: DBKey, value: Value, pre_image: Option, + tx: &Transaction<'_>, ) -> PmtreeResult<()> { match pre_image { Some(t) => { @@ -223,12 +224,12 @@ impl Database for PostgresDBConfig { let mut client = self.client.write().await; - let tx = client - .build_transaction() - .isolation_level(IsolationLevel::Serializable) - .start() - .await - .expect("Database::put_with_pre_image build_transaction error"); + // let tx = client + // .build_transaction() + // .isolation_level(IsolationLevel::Serializable) + // .start() + // .await + // .expect("Database::put_with_pre_image build_transaction error"); // If `key` already exists in the table, update `leaf` column. let query = format!( diff --git a/src/mint.rs b/src/mint.rs index 998f8ba..bf7fcd1 100644 --- a/src/mint.rs +++ b/src/mint.rs @@ -13,7 +13,7 @@ use crate::{ pub(crate) async fn mint( contract: arc_pay_contract::ArcPayContract>, - channel: Arc, + channel: Arc>, mt: Arc>>, ) { dbg!("inmint"); diff --git a/src/model.rs b/src/model.rs index b7ba4a0..c658152 100644 --- a/src/model.rs +++ b/src/model.rs @@ -1,4 +1,4 @@ -use ethers::types::Address; +use ethers::types::{Address, Transaction}; use tokio::sync::RwLockWriteGuard; use async_graphql::{Context, EmptySubscription, InputObject, Object, Schema, SimpleObject}; @@ -7,6 +7,7 @@ use pmtree::{Hasher, MerkleTree}; use rln::circuit::Fr; use serde::{Deserialize, Deserializer, Serialize}; use serde_with::serde_as; +use tokio_postgres::IsolationLevel; use crate::{ merkle::{MyPoseidon, PostgresDBConfig}, @@ -134,6 +135,7 @@ fn to_my_fr(from: Vec<(Fr, u8)>) -> Vec<(MyFr, u8)> { pub(crate) async fn mint_in_merkle( mt: &mut RwLockWriteGuard<'_, MerkleTree>, leaf: Leaf, + tx: &tokio_postgres::Transaction<'_>, ) -> MerkleInfo { assert!(leaf.low_coin <= leaf.high_coin); @@ -142,7 +144,7 @@ pub(crate) async fn mint_in_merkle( Fr::from(leaf.low_coin), Fr::from(leaf.high_coin), ]); - mt.update_next(hash, Some(leaf)).await.unwrap(); + mt.update_next(hash, Some(leaf), tx).await.unwrap(); MerkleInfo { root: MyPoseidon::serialize(mt.root()), leaf: MyPoseidon::serialize(hash), @@ -156,6 +158,7 @@ pub(crate) async fn send_in_merkle( highest_coin_to_send: u64, recipient: &[u8; 20], is_return_proof: bool, + tx: &tokio_postgres::Transaction<'_>, ) -> Option<[Vec<(MyFr, u8)>; 3]> { assert!( leaf.low_coin <= highest_coin_to_send && highest_coin_to_send <= leaf.high_coin, @@ -187,7 +190,7 @@ pub(crate) async fn send_in_merkle( proofs[0] = from_proof; } match highest_coin_to_send == leaf.high_coin { - true => mt.set(index as usize, MyPoseidon::default_leaf(), None), + true => mt.set(index as usize, MyPoseidon::default_leaf(), None, tx), false => mt.set( index as usize, MyPoseidon::hash(&[ @@ -200,6 +203,7 @@ pub(crate) async fn send_in_merkle( low_coin: highest_coin_to_send + 1, high_coin: leaf.high_coin, }), + tx, ), } .await @@ -219,6 +223,7 @@ pub(crate) async fn send_in_merkle( low_coin: leaf.low_coin, high_coin: highest_coin_to_send, }), + tx, ) .await .unwrap(); @@ -245,6 +250,13 @@ impl MutationRoot { ) -> Vec { let api_context = ctx.data_unchecked::(); let mut mt = api_context.mt.write().await; + let client = mt.db.client.write().await; + let tx = client + .build_transaction() + .isolation_level(IsolationLevel::Serializable) + .start() + .await + .expect("Database::put_with_pre_image build_transaction error"); multi_coin_tx.authorized().unwrap(); @@ -268,6 +280,7 @@ impl MutationRoot { highest_coin_to_send, &recipient, true, + &tx, ) .await .ok_or("proofs should be returned") @@ -287,6 +300,9 @@ impl MutationRoot { ))) .expect("unsafe_send: queue message should be serialized"); + let client = api_context.channel.write().await; + // .execute("INSERT INTO message_queue (payload) VALUES ($1)", &[&data]).await.unwrap(); + let confirm = channel .basic_publish( "", @@ -328,6 +344,13 @@ impl MutationRoot { let mut mt = api_context.mt.write().await; verify_ecdsa(&leaf, highest_coin_to_send, &recipient, sig); + let client = mt.db.client.write().await; + let tx = client + .build_transaction() + .isolation_level(IsolationLevel::Serializable) + .start() + .await + .expect("Database::put_with_pre_image build_transaction error"); let proofs = send_in_merkle( &mut mt, index, @@ -335,6 +358,7 @@ impl MutationRoot { highest_coin_to_send, &recipient, true, + &tx, ) .await .ok_or("proofs should be returned") @@ -401,7 +425,16 @@ impl MutationRoot { ]) ); - mt.set(index as usize, MyPoseidon::default_leaf(), None) + let client = mt.db.client.write().await; + + let tx = client + .build_transaction() + .isolation_level(IsolationLevel::Serializable) + .start() + .await + .expect("Database::put_with_pre_image build_transaction error"); + + mt.set(index as usize, MyPoseidon::default_leaf(), None, &tx) .await .unwrap(); diff --git a/src/send_consumer.rs b/src/send_consumer.rs index 1a27c81..fe0b4b9 100644 --- a/src/send_consumer.rs +++ b/src/send_consumer.rs @@ -4,8 +4,7 @@ use ethers::{ prelude::{Eip712, EthAbiType, U256}, types::Address, }; -use futures_lite::stream::StreamExt; -use lapin::{options::BasicAckOptions, Consumer}; +use lapin::options::BasicAckOptions; use pmtree::Hasher; use tokio::sync::RwLock; @@ -19,6 +18,7 @@ use crate::{ }; use crate::{QueueMessage, MAX_SINCE_LAST_PROVE}; +use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; #[derive(Eip712, EthAbiType, Clone, Debug)] @@ -67,7 +67,7 @@ pub(crate) fn verify_ecdsa( // Run this in a separate thread. pub(crate) async fn send_consumer( - mut consumer: Consumer, + mut consumer: Arc>, mt: RwLock>, ) { let mut mint_time: U256 = U256::default(); From 4304d7c9b929721bd20eba850cdd1ab036517c32 Mon Sep 17 00:00:00 2001 From: 0xbok <1689531+0xbok@users.noreply.github.com> Date: Fri, 15 Sep 2023 13:23:28 +0530 Subject: [PATCH 3/3] unsafe_send sends mesg to psql --- src/model.rs | 29 +++++++++-------------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/src/model.rs b/src/model.rs index c658152..05f9a7d 100644 --- a/src/model.rs +++ b/src/model.rs @@ -379,27 +379,16 @@ impl MutationRoot { ))) .expect("unsafe_send: queue message should be serialized"); - let confirm = channel - .basic_publish( - "", - QUEUE_NAME, - BasicPublishOptions { - mandatory: true, - ..BasicPublishOptions::default() - }, - queue_message.as_slice(), - BasicProperties::default(), - ) - .await - .expect("basic_publish") - .await - .expect("publisher-confirms"); + let query = format!( + "INSERT INTO {message_queue} (payload) VALUES $1", + message_queue = QUEUE_NAME + ); - assert!(confirm.is_ack()); - // when `mandatory` is on, if the message is not sent to a queue for any reason - // (example, queues are full), the message is returned back. - // If the message isn't received back, then a queue has received the message. - assert_eq!(confirm.take_message(), None); + let statement = tx.prepare(&query).await.unwrap(); + let rows_modified = tx.execute(&statement, &[&queue_message]).await.unwrap(); + assert_eq!(rows_modified, 1, "should be only 1 new row"); + + tx.commit().await.unwrap(); root }