Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
pmtree = { git = "https://github.com/arcpay/pmtree", rev = "577f222fd22d68ea33c3903d7eaf19f9ea24083e" }
#pmtree = { git = "https://github.com/arcpay/pmtree", rev = "577f222fd22d68ea33c3903d7eaf19f9ea24083e" }
pmtree = { path = "../pmtree" }
utils = { git = "https://github.com/vacp2p/zerokit.git", rev = "be88a432d7ac58d02371025f909ba087d0a013c0" }
rln = { git = "https://github.com/vacp2p/zerokit.git", rev = "be88a432d7ac58d02371025f909ba087d0a013c0" }
async-graphql = "5.0.10"
Expand Down
125 changes: 50 additions & 75 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,15 @@ use async_graphql::{EmptySubscription, Schema};
use ethers::prelude::*;

use axum::{extract::Extension, routing::get, Router, Server};
use lapin::Channel;
use serde::{Deserialize, Serialize};
use tokio_postgres::NoTls;
use tokio_postgres::{IsolationLevel, NoTls};
use tower_http::cors::{Any, CorsLayer};
use user_balance::UserBalanceConfig;

use model::{Leaf, MutationRoot, SendMessageType, WithdrawMessageType};
use std::sync::Arc;
use tokio::sync::RwLock;

use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};

use clap::Parser;
use eyre::Result; // TODO replace .unwrap() with `?` and `wrap_err` from eyre::Result.

Expand All @@ -38,9 +35,6 @@ struct Cli {
/// Create or load a merkle tree
#[arg(long)]
merkle: MerkleCommand,
// Activates the mint pipeline
// #[arg(long, action=clap::ArgAction::SetTrue)]
// mint: bool,
}

#[derive(clap::ValueEnum, Debug, Clone)]
Expand All @@ -52,7 +46,7 @@ enum MerkleCommand {
struct ApiContext {
user_balance_db: UserBalanceConfig,
mt: Arc<RwLock<MerkleTree<PostgresDBConfig, MyPoseidon>>>,
channel: Arc<Channel>,
channel: Arc<RwLock<tokio_postgres::Client>>,
}

const MERKLE_DEPTH: usize = 32; // TODO: read in from parameters file
Expand All @@ -76,51 +70,9 @@ enum QueueMessage {

#[tokio::main]
async fn main() -> Result<()> {
//////////////// experimental RabbitMQ integration ////////////////
// Connect to the RabbitMQ server
let addr = "amqp://guest:guest@localhost:5672/%2f";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?;

// Create a channel
let channel = conn.create_channel().await?;

// TODO: declare queue as persistent through `QueueDeclareOptions`. See message durability section:
// https://www.rabbitmq.com/tutorials/tutorial-two-python.html.
// Also make messages persistent by updating publisher. The value `PERSISTENT_DELIVERY_MODE` is 2,
// see: https://pika.readthedocs.io/en/stable/_modules/pika/spec.html.
// Even this doesn't fully guarantee message persistence, see "Note on message persistence"
// in the tutorial which links to: https://www.rabbitmq.com/confirms.html#publisher-confirms.
// We have enabled publisher confirms in the next step.
channel
.queue_declare(
QUEUE_NAME,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;
// Enable publisher confirms
channel
.confirm_select(ConfirmSelectOptions::default())
.await?;

// Consume messages from the queue
let consumer = channel
.basic_consume(
QUEUE_NAME,
"my_consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("basic_consume");

println!("Waiting for messages...");

let cli = Cli::parse();
let mut handles = vec![];
let channel = Arc::new(channel);

// if let Some(t) = cli.merkle {
//////////////// experimental GraphQL integration /////////////////
let (client, connection) =
tokio_postgres::connect("host=localhost user=dev dbname=arcpay", NoTls).await?;
Expand All @@ -141,38 +93,63 @@ async fn main() -> Result<()> {
pre_image_table: "pre_image".to_string(),
};

let (proven_client, proven_connection) =
tokio_postgres::connect("host=localhost user=dev dbname=proven_arcpay", NoTls).await?;

// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = proven_connection.await {
eprintln!("connection error: {}", e);
}
});

let proven_client = Arc::new(RwLock::new(proven_client));

let proven_db_config = PostgresDBConfig {
client: proven_client,
merkle_table: "merkle".to_string(),
pre_image_table: "pre_image".to_string(),
client: client.clone(),
merkle_table: "proven_merkle".to_string(),
pre_image_table: "proven_pre_image".to_string(),
};

let mt = match cli.merkle {
MerkleCommand::New => (
MerkleTree::<PostgresDBConfig, MyPoseidon>::new(MERKLE_DEPTH, db_config).await?,
MerkleTree::<PostgresDBConfig, MyPoseidon>::new(MERKLE_DEPTH, proven_db_config).await?,
),
MerkleCommand::New => {
let mut client = client.write().await;
let tx = client
.build_transaction()
.isolation_level(IsolationLevel::Serializable)
.start()
.await
.expect("Database::new message queue build_transaction.start() error");

// TODO remove drop
let query: String = format!(
"DROP TABLE IF EXISTS {queue_table};",
queue_table = QUEUE_NAME
);

let statement = tx.prepare(&query).await.unwrap();
tx.execute(&statement, &[]).await.unwrap();

let query = format!(
"CREATE TABLE {queue_table} (
id SERIAL PRIMARY KEY,
payload BYTEA NOT NULL,
status TEXT DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed_at TIMESTAMP
);",
queue_table = QUEUE_NAME
);

let statement = tx.prepare(&query).await.unwrap();
tx.execute(&statement, &[]).await.unwrap();

tx.commit()
.await
.expect("Database::new message queue create table error");

(
MerkleTree::<PostgresDBConfig, MyPoseidon>::new(MERKLE_DEPTH, db_config).await?,
MerkleTree::<PostgresDBConfig, MyPoseidon>::new(MERKLE_DEPTH, proven_db_config)
.await?,
)
}
MerkleCommand::Load => (
MerkleTree::<PostgresDBConfig, MyPoseidon>::load(db_config).await?,
MerkleTree::<PostgresDBConfig, MyPoseidon>::load(proven_db_config).await?,
),
};

handles.push(tokio::spawn(async move {
send_consumer::send_consumer(consumer, RwLock::new(mt.1)).await
send_consumer::send_consumer(client.clone(), RwLock::new(mt.1)).await
}));

let user_balance_db = UserBalanceConfig {
Expand All @@ -185,7 +162,7 @@ async fn main() -> Result<()> {
.data(ApiContext {
user_balance_db,
mt: cur_merkle_tree.clone(),
channel: channel.clone(),
channel: client.clone(),
})
.finish();
let cors = CorsLayer::new()
Expand All @@ -204,9 +181,7 @@ async fn main() -> Result<()> {
.await
.unwrap()
}));
// }

// if cli.mint {
let provider = Arc::new(Provider::<Http>::try_from("https://rpc2.sepolia.org")?);

let contract_address: H160 = ARCPAY_ADDRESS.parse::<Address>().unwrap();
Expand All @@ -224,7 +199,7 @@ async fn main() -> Result<()> {
let filter = event.filter.from_block(0).to_block(BlockNumber::Finalized);
*/
handles.push(tokio::spawn(async move {
mint(contract, channel.clone(), cur_merkle_tree.clone()).await
mint(contract, client.clone(), cur_merkle_tree.clone()).await
}));
// }

Expand Down
15 changes: 8 additions & 7 deletions src/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use rln::{
};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use tokio_postgres::{Client, IsolationLevel};
use tokio_postgres::{Client, IsolationLevel, Transaction};

use crate::model::{CoinRange, Leaf};

Expand Down Expand Up @@ -196,6 +196,7 @@ impl Database for PostgresDBConfig {
key: DBKey,
value: Value,
pre_image: Option<Self::PreImage>,
tx: &Transaction<'_>,
) -> PmtreeResult<()> {
match pre_image {
Some(t) => {
Expand Down Expand Up @@ -223,12 +224,12 @@ impl Database for PostgresDBConfig {

let mut client = self.client.write().await;

let tx = client
.build_transaction()
.isolation_level(IsolationLevel::Serializable)
.start()
.await
.expect("Database::put_with_pre_image build_transaction error");
// let tx = client
// .build_transaction()
// .isolation_level(IsolationLevel::Serializable)
// .start()
// .await
// .expect("Database::put_with_pre_image build_transaction error");

// If `key` already exists in the table, update `leaf` column.
let query = format!(
Expand Down
2 changes: 1 addition & 1 deletion src/mint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{

pub(crate) async fn mint(
contract: arc_pay_contract::ArcPayContract<Provider<Http>>,
channel: Arc<Channel>,
channel: Arc<RwLock<tokio_postgres::Client>>,
mt: Arc<RwLock<pmtree::MerkleTree<PostgresDBConfig, MyPoseidon>>>,
) {
dbg!("inmint");
Expand Down
Loading