From 4e3b1f9bd9c90bbc0479f88276cd44eeac9789bf Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 6 Jun 2018 14:53:22 -0700 Subject: [PATCH 1/7] Add a top-level "syncable" feature. --- Cargo.toml | 4 +++- src/lib.rs | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 9f86a5dec..c26926621 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,8 +16,9 @@ version = "0.7.0" build = "build/version.rs" [features] -default = ["bundled_sqlite3"] +default = ["bundled_sqlite3", "syncable"] bundled_sqlite3 = ["rusqlite/bundled"] +syncable = ["mentat_tolstoy"] [workspace] members = ["tools/cli", "ffi"] @@ -72,6 +73,7 @@ path = "query-translator" [dependencies.mentat_tolstoy] path = "tolstoy" +optional = true [profile.release] opt-level = 3 diff --git a/src/lib.rs b/src/lib.rs index 7d783ea82..4ad7ec374 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,6 +29,8 @@ extern crate mentat_query_projector; extern crate mentat_query_pull; extern crate mentat_query_translator; extern crate mentat_sql; + +#[cfg(feature = "syncable")] extern crate mentat_tolstoy; pub use mentat_core::{ From b94613b7b23382d5788b65ff3a6a84549c27881b Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 6 Jun 2018 14:59:42 -0700 Subject: [PATCH 2/7] Add get_{chunks,transactions} to Tolstoy's remote client. 
--- tolstoy/src/syncer.rs | 129 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 124 insertions(+), 5 deletions(-) diff --git a/tolstoy/src/syncer.rs b/tolstoy/src/syncer.rs index c4a77515c..074041986 100644 --- a/tolstoy/src/syncer.rs +++ b/tolstoy/src/syncer.rs @@ -289,9 +289,24 @@ struct SerializedTransaction<'a> { chunks: &'a Vec } +#[derive(Deserialize)] +struct DeserializableTransaction { + parent: Uuid, + chunks: Vec, + id: Uuid, + seq: i64, +} + +#[derive(Deserialize)] +struct SerializedTransactions { + limit: i64, + from: Uuid, + transactions: Vec, +} + struct RemoteClient { base_uri: String, - user_uuid: Uuid + user_uuid: Uuid, } @@ -308,9 +323,14 @@ impl RemoteClient { format!("{}/{}", self.base_uri, self.user_uuid) } + // TODO what we want is a method that returns a deserialized json structure. + // It'll need a type T so that consumers can specify what downloaded json will + // map to. I ran into borrow issues doing that - probably need to restructure + // this and use PhantomData markers or somesuch. + // But for now, we get code duplication. 
fn get_uuid(&self, uri: String) -> Result { let mut core = Core::new()?; - // TODO enable TLS, see https://github.com/mozilla/mentat/issues/569 + // TODO https://github.com/mozilla/mentat/issues/569 // let client = hyper::Client::configure() // .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) // .build(&core.handle()); @@ -326,10 +346,10 @@ impl RemoteClient { println!("Response: {}", res.status()); res.body().concat2().and_then(move |body| { - let head_json: SerializedHead = serde_json::from_slice(&body).map_err(|e| { + let json: SerializedHead = serde_json::from_slice(&body).map_err(|e| { std::io::Error::new(std::io::ErrorKind::Other, e) })?; - Ok(head_json) + Ok(json) }) }); @@ -343,7 +363,7 @@ impl RemoteClient { fn put(&self, uri: String, payload: T, expected: StatusCode) -> Result<()> where hyper::Body: std::convert::From, { let mut core = Core::new()?; - // TODO enable TLS, see https://github.com/mozilla/mentat/issues/569 + // TODO https://github.com/mozilla/mentat/issues/569 // let client = hyper::Client::configure() // .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) // .build(&core.handle()); @@ -372,6 +392,105 @@ impl RemoteClient { Ok(()) } + fn get_transactions(&self, parent_uuid: &Uuid) -> Result> { + let mut core = Core::new()?; + // TODO https://github.com/mozilla/mentat/issues/569 + // let client = hyper::Client::configure() + // .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) + // .build(&core.handle()); + let client = hyper::Client::new(&core.handle()); + + d(&format!("client")); + + let uri = format!("{}/transactions?from={}", self.bound_base_uri(), parent_uuid); + let uri = uri.parse()?; + + d(&format!("parsed uri {:?}", uri)); + + let work = client.get(uri).and_then(|res| { + println!("Response: {}", res.status()); + + res.body().concat2().and_then(move |body| { + let json: SerializedTransactions = serde_json::from_slice(&body).map_err(|e| { + 
std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + Ok(json) + }) + }); + + d(&format!("running...")); + + let transactions_json = core.run(work)?; + d(&format!("got transactions: {:?}", &transactions_json.transactions)); + Ok(transactions_json.transactions) + } + + fn get_chunks(&self, transaction_uuid: &Uuid) -> Result> { + let mut core = Core::new()?; + // TODO https://github.com/mozilla/mentat/issues/569 + // let client = hyper::Client::configure() + // .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) + // .build(&core.handle()); + let client = hyper::Client::new(&core.handle()); + + d(&format!("client")); + + let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid); + let uri = uri.parse()?; + + d(&format!("parsed uri {:?}", uri)); + + let work = client.get(uri).and_then(|res| { + println!("Response: {}", res.status()); + + res.body().concat2().and_then(move |body| { + let json: DeserializableTransaction = serde_json::from_slice(&body).map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + Ok(json) + }) + }); + + d(&format!("running...")); + + let transaction_json = core.run(work)?; + d(&format!("got transaction chunks: {:?}", &transaction_json.chunks)); + Ok(transaction_json.chunks) + } + + fn get_chunk(&self, chunk_uuid: &Uuid) -> Result { + let mut core = Core::new()?; + // TODO https://github.com/mozilla/mentat/issues/569 + // let client = hyper::Client::configure() + // .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) + // .build(&core.handle()); + let client = hyper::Client::new(&core.handle()); + + d(&format!("client")); + + let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid); + let uri = uri.parse()?; + + d(&format!("parsed uri {:?}", uri)); + + let work = client.get(uri).and_then(|res| { + println!("Response: {}", res.status()); + + res.body().concat2().and_then(move |body| { + let json: TxPart = serde_json::from_slice(&body).map_err(|e| { + 
std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + Ok(json) + }) + }); + + d(&format!("running...")); + + let chunk = core.run(work)?; + d(&format!("got transaction chunk: {:?}", &chunk)); + Ok(chunk) + } + fn put_transaction(&self, transaction_uuid: &Uuid, parent_uuid: &Uuid, chunks: &Vec) -> Result<()> { // {"parent": uuid, "chunks": [chunk1, chunk2...]} let transaction = SerializedTransaction { From 2c0203f74daae0b30ea46788c018f567f652c54b Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 6 Jun 2018 15:20:05 -0700 Subject: [PATCH 3/7] Move Tolstoy remote client into separate module. --- tolstoy/src/lib.rs | 2 + tolstoy/src/remote_client.rs | 314 +++++++++++++++++++++++++++++++++++ tolstoy/src/syncer.rs | 283 +------------------------------ 3 files changed, 321 insertions(+), 278 deletions(-) create mode 100644 tolstoy/src/remote_client.rs diff --git a/tolstoy/src/lib.rs b/tolstoy/src/lib.rs index 51306f2bc..14d4d9f76 100644 --- a/tolstoy/src/lib.rs +++ b/tolstoy/src/lib.rs @@ -33,6 +33,8 @@ extern crate mentat_core; extern crate rusqlite; extern crate uuid; +mod remote_client; + pub mod schema; pub mod metadata; pub mod tx_processor; diff --git a/tolstoy/src/remote_client.rs b/tolstoy/src/remote_client.rs new file mode 100644 index 000000000..bea79b343 --- /dev/null +++ b/tolstoy/src/remote_client.rs @@ -0,0 +1,314 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. 
+ +#![allow(dead_code)] + +use std; + +use futures::{future, Future, Stream}; +use hyper; +// TODO: enable TLS support; hurdle is cross-compiling openssl for Android. +// See https://github.com/mozilla/mentat/issues/569 +// use hyper_tls; +use hyper::{ + Method, + Request, + StatusCode, + Error as HyperError +}; +use hyper::header::{ + ContentType, +}; +use serde::{ + Serialize, +}; +// TODO: https://github.com/mozilla/mentat/issues/570 +// use serde_cbor; +use serde_json; +use tokio_core::reactor::Core; +use uuid::Uuid; + +use errors::{ + Result, +}; + +use syncer::{ + // TODO: use `log` crate. + d, +}; + +use tx_processor::{ + TxPart, +}; + +#[derive(Serialize,Deserialize)] +struct SerializedHead { + head: Uuid +} + +#[derive(Serialize)] +struct SerializedTransaction<'a> { + parent: &'a Uuid, + chunks: &'a Vec +} + +#[derive(Deserialize)] +struct DeserializableTransaction { + parent: Uuid, + chunks: Vec, + id: Uuid, + seq: i64, +} + +#[derive(Deserialize)] +struct SerializedTransactions { + limit: i64, + from: Uuid, + transactions: Vec, +} + +pub(crate) struct RemoteClient { + base_uri: String, + user_uuid: Uuid, +} + +impl RemoteClient { + pub(crate) fn new(base_uri: String, user_uuid: Uuid) -> Self { + RemoteClient { + base_uri: base_uri, + user_uuid: user_uuid, + } + } + + fn bound_base_uri(&self) -> String { + // TODO escaping + format!("{}/{}", self.base_uri, self.user_uuid) + } + + // TODO what we want is a method that returns a deserialized json structure. + // It'll need a type T so that consumers can specify what downloaded json will + // map to. I ran into borrow issues doing that - probably need to restructure + // this and use PhantomData markers or somesuch. + // But for now, we get code duplication. 
+ pub(crate) fn get_uuid(&self, uri: String) -> Result { + let mut core = Core::new()?; + // TODO https://github.com/mozilla/mentat/issues/569 + // let client = hyper::Client::configure() + // .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) + // .build(&core.handle()); + let client = hyper::Client::new(&core.handle()); + + d(&format!("client")); + + let uri = uri.parse()?; + + d(&format!("parsed uri {:?}", uri)); + + let work = client.get(uri).and_then(|res| { + println!("Response: {}", res.status()); + + res.body().concat2().and_then(move |body| { + let json: SerializedHead = serde_json::from_slice(&body).map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + Ok(json) + }) + }); + + d(&format!("running...")); + + let head_json = core.run(work)?; + d(&format!("got head: {:?}", &head_json.head)); + Ok(head_json.head) + } + + pub(crate) fn put(&self, uri: String, payload: T, expected: StatusCode) -> Result<()> + where hyper::Body: std::convert::From, { + let mut core = Core::new()?; + // TODO https://github.com/mozilla/mentat/issues/569 + // let client = hyper::Client::configure() + // .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) + // .build(&core.handle()); + let client = hyper::Client::new(&core.handle()); + + let uri = uri.parse()?; + + d(&format!("PUT {:?}", uri)); + + let mut req = Request::new(Method::Put, uri); + req.headers_mut().set(ContentType::json()); + req.set_body(payload); + + let put = client.request(req).and_then(|res| { + let status_code = res.status(); + + if status_code != expected { + d(&format!("bad put response: {:?}", status_code)); + future::err(HyperError::Status) + } else { + future::ok(()) + } + }); + + core.run(put)?; + Ok(()) + } + + pub(crate) fn get_transactions(&self, parent_uuid: &Uuid) -> Result> { + let mut core = Core::new()?; + // TODO https://github.com/mozilla/mentat/issues/569 + // let client = hyper::Client::configure() + // 
.connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) + // .build(&core.handle()); + let client = hyper::Client::new(&core.handle()); + + d(&format!("client")); + + let uri = format!("{}/transactions?from={}", self.bound_base_uri(), parent_uuid); + let uri = uri.parse()?; + + d(&format!("parsed uri {:?}", uri)); + + let work = client.get(uri).and_then(|res| { + println!("Response: {}", res.status()); + + res.body().concat2().and_then(move |body| { + let json: SerializedTransactions = serde_json::from_slice(&body).map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + Ok(json) + }) + }); + + d(&format!("running...")); + + let transactions_json = core.run(work)?; + d(&format!("got transactions: {:?}", &transactions_json.transactions)); + Ok(transactions_json.transactions) + } + + pub(crate) fn get_chunks(&self, transaction_uuid: &Uuid) -> Result> { + let mut core = Core::new()?; + // TODO https://github.com/mozilla/mentat/issues/569 + // let client = hyper::Client::configure() + // .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) + // .build(&core.handle()); + let client = hyper::Client::new(&core.handle()); + + d(&format!("client")); + + let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid); + let uri = uri.parse()?; + + d(&format!("parsed uri {:?}", uri)); + + let work = client.get(uri).and_then(|res| { + println!("Response: {}", res.status()); + + res.body().concat2().and_then(move |body| { + let json: DeserializableTransaction = serde_json::from_slice(&body).map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + Ok(json) + }) + }); + + d(&format!("running...")); + + let transaction_json = core.run(work)?; + d(&format!("got transaction chunks: {:?}", &transaction_json.chunks)); + Ok(transaction_json.chunks) + } + + pub(crate) fn get_chunk(&self, chunk_uuid: &Uuid) -> Result { + let mut core = Core::new()?; + // TODO https://github.com/mozilla/mentat/issues/569 
+ // let client = hyper::Client::configure() + // .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) + // .build(&core.handle()); + let client = hyper::Client::new(&core.handle()); + + d(&format!("client")); + + let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid); + let uri = uri.parse()?; + + d(&format!("parsed uri {:?}", uri)); + + let work = client.get(uri).and_then(|res| { + println!("Response: {}", res.status()); + + res.body().concat2().and_then(move |body| { + let json: TxPart = serde_json::from_slice(&body).map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + Ok(json) + }) + }); + + d(&format!("running...")); + + let chunk = core.run(work)?; + d(&format!("got transaction chunk: {:?}", &chunk)); + Ok(chunk) + } + + pub(crate) fn put_transaction(&self, transaction_uuid: &Uuid, parent_uuid: &Uuid, chunks: &Vec) -> Result<()> { + // {"parent": uuid, "chunks": [chunk1, chunk2...]} + let transaction = SerializedTransaction { + parent: parent_uuid, + chunks: chunks + }; + + let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid); + let json = serde_json::to_string(&transaction)?; + d(&format!("serialized transaction: {:?}", json)); + self.put(uri, json, StatusCode::Created) + } + + pub(crate) fn get_head(&self) -> Result { + let uri = format!("{}/head", self.bound_base_uri()); + self.get_uuid(uri) + } + + pub(crate) fn put_head(&self, uuid: &Uuid) -> Result<()> { + // {"head": uuid} + let head = SerializedHead { + head: uuid.clone() + }; + + let uri = format!("{}/head", self.bound_base_uri()); + let json = serde_json::to_string(&head)?; + d(&format!("serialized head: {:?}", json)); + self.put(uri, json, StatusCode::NoContent) + } + + pub(crate) fn put_chunk(&self, chunk_uuid: &Uuid, payload: &T) -> Result<()> where T: Serialize { + let payload: String = serde_json::to_string(payload)?; + let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid); + d(&format!("serialized 
chunk: {:?}", payload)); + // TODO don't want to clone every datom! + self.put(uri, payload, StatusCode::Created) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::str::FromStr; + + #[test] + fn test_remote_client_bound_uri() { + let user_uuid = Uuid::from_str(&"316ea470-ce35-4adf-9c61-e0de6e289c59").expect("uuid"); + let server_uri = String::from("https://example.com/api/0.1"); + let remote_client = RemoteClient::new(server_uri, user_uuid); + assert_eq!("https://example.com/api/0.1/316ea470-ce35-4adf-9c61-e0de6e289c59", remote_client.bound_base_uri()); + } +} diff --git a/tolstoy/src/syncer.rs b/tolstoy/src/syncer.rs index 074041986..d5a8d5dab 100644 --- a/tolstoy/src/syncer.rs +++ b/tolstoy/src/syncer.rs @@ -8,21 +8,9 @@ // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. -use std; use std::collections::HashMap; -use futures::{future, Future, Stream}; -use hyper; -// TODO: enable TLS support; hurdle is cross-compiling openssl for Android. -// See https://github.com/mozilla/mentat/issues/569 -// use hyper_tls; -use hyper::{Method, Request, StatusCode, Error as HyperError}; -use hyper::header::{ContentType}; use rusqlite; -// TODO: https://github.com/mozilla/mentat/issues/570 -// use serde_cbor; -use serde_json; -use tokio_core::reactor::Core; use uuid::Uuid; use mentat_core::Entid; @@ -35,6 +23,10 @@ use errors::{ Result, }; +use remote_client::{ + RemoteClient, +}; + use tx_processor::{ Processor, TxReceiver, @@ -142,7 +134,7 @@ impl<'c> TxReceiver for UploadingTxReceiver<'c> { // See https://github.com/mozilla/mentat/issues/570 // let cbor_val = serde_cbor::to_value(&datom)?; // self.remote_client.put_chunk(&datom_uuid, &serde_cbor::ser::to_vec_sd(&cbor_val)?)?; - self.remote_client.put_chunk(&datom_uuid, &serde_json::to_string(&datom)?)?; + self.remote_client.put_chunk(&datom_uuid, &datom)?; } // Upload tx. 
@@ -277,268 +269,3 @@ impl Syncer { Ok(()) } } - -#[derive(Serialize,Deserialize)] -struct SerializedHead { - head: Uuid -} - -#[derive(Serialize)] -struct SerializedTransaction<'a> { - parent: &'a Uuid, - chunks: &'a Vec -} - -#[derive(Deserialize)] -struct DeserializableTransaction { - parent: Uuid, - chunks: Vec, - id: Uuid, - seq: i64, -} - -#[derive(Deserialize)] -struct SerializedTransactions { - limit: i64, - from: Uuid, - transactions: Vec, -} - -struct RemoteClient { - base_uri: String, - user_uuid: Uuid, -} - - -impl RemoteClient { - fn new(base_uri: String, user_uuid: Uuid) -> Self { - RemoteClient { - base_uri: base_uri, - user_uuid: user_uuid - } - } - - fn bound_base_uri(&self) -> String { - // TODO escaping - format!("{}/{}", self.base_uri, self.user_uuid) - } - - // TODO what we want is a method that returns a deserialized json structure. - // It'll need a type T so that consumers can specify what downloaded json will - // map to. I ran into borrow issues doing that - probably need to restructure - // this and use PhantomData markers or somesuch. - // But for now, we get code duplication. 
- fn get_uuid(&self, uri: String) -> Result { - let mut core = Core::new()?; - // TODO https://github.com/mozilla/mentat/issues/569 - // let client = hyper::Client::configure() - // .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) - // .build(&core.handle()); - let client = hyper::Client::new(&core.handle()); - - d(&format!("client")); - - let uri = uri.parse()?; - - d(&format!("parsed uri {:?}", uri)); - - let work = client.get(uri).and_then(|res| { - println!("Response: {}", res.status()); - - res.body().concat2().and_then(move |body| { - let json: SerializedHead = serde_json::from_slice(&body).map_err(|e| { - std::io::Error::new(std::io::ErrorKind::Other, e) - })?; - Ok(json) - }) - }); - - d(&format!("running...")); - - let head_json = core.run(work)?; - d(&format!("got head: {:?}", &head_json.head)); - Ok(head_json.head) - } - - fn put(&self, uri: String, payload: T, expected: StatusCode) -> Result<()> - where hyper::Body: std::convert::From, { - let mut core = Core::new()?; - // TODO https://github.com/mozilla/mentat/issues/569 - // let client = hyper::Client::configure() - // .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) - // .build(&core.handle()); - let client = hyper::Client::new(&core.handle()); - - let uri = uri.parse()?; - - d(&format!("PUT {:?}", uri)); - - let mut req = Request::new(Method::Put, uri); - req.headers_mut().set(ContentType::json()); - req.set_body(payload); - - let put = client.request(req).and_then(|res| { - let status_code = res.status(); - - if status_code != expected { - d(&format!("bad put response: {:?}", status_code)); - future::err(HyperError::Status) - } else { - future::ok(()) - } - }); - - core.run(put)?; - Ok(()) - } - - fn get_transactions(&self, parent_uuid: &Uuid) -> Result> { - let mut core = Core::new()?; - // TODO https://github.com/mozilla/mentat/issues/569 - // let client = hyper::Client::configure() - // .connector(hyper_tls::HttpsConnector::new(4, 
&core.handle()).unwrap()) - // .build(&core.handle()); - let client = hyper::Client::new(&core.handle()); - - d(&format!("client")); - - let uri = format!("{}/transactions?from={}", self.bound_base_uri(), parent_uuid); - let uri = uri.parse()?; - - d(&format!("parsed uri {:?}", uri)); - - let work = client.get(uri).and_then(|res| { - println!("Response: {}", res.status()); - - res.body().concat2().and_then(move |body| { - let json: SerializedTransactions = serde_json::from_slice(&body).map_err(|e| { - std::io::Error::new(std::io::ErrorKind::Other, e) - })?; - Ok(json) - }) - }); - - d(&format!("running...")); - - let transactions_json = core.run(work)?; - d(&format!("got transactions: {:?}", &transactions_json.transactions)); - Ok(transactions_json.transactions) - } - - fn get_chunks(&self, transaction_uuid: &Uuid) -> Result> { - let mut core = Core::new()?; - // TODO https://github.com/mozilla/mentat/issues/569 - // let client = hyper::Client::configure() - // .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) - // .build(&core.handle()); - let client = hyper::Client::new(&core.handle()); - - d(&format!("client")); - - let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid); - let uri = uri.parse()?; - - d(&format!("parsed uri {:?}", uri)); - - let work = client.get(uri).and_then(|res| { - println!("Response: {}", res.status()); - - res.body().concat2().and_then(move |body| { - let json: DeserializableTransaction = serde_json::from_slice(&body).map_err(|e| { - std::io::Error::new(std::io::ErrorKind::Other, e) - })?; - Ok(json) - }) - }); - - d(&format!("running...")); - - let transaction_json = core.run(work)?; - d(&format!("got transaction chunks: {:?}", &transaction_json.chunks)); - Ok(transaction_json.chunks) - } - - fn get_chunk(&self, chunk_uuid: &Uuid) -> Result { - let mut core = Core::new()?; - // TODO https://github.com/mozilla/mentat/issues/569 - // let client = hyper::Client::configure() - // 
.connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) - // .build(&core.handle()); - let client = hyper::Client::new(&core.handle()); - - d(&format!("client")); - - let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid); - let uri = uri.parse()?; - - d(&format!("parsed uri {:?}", uri)); - - let work = client.get(uri).and_then(|res| { - println!("Response: {}", res.status()); - - res.body().concat2().and_then(move |body| { - let json: TxPart = serde_json::from_slice(&body).map_err(|e| { - std::io::Error::new(std::io::ErrorKind::Other, e) - })?; - Ok(json) - }) - }); - - d(&format!("running...")); - - let chunk = core.run(work)?; - d(&format!("got transaction chunk: {:?}", &chunk)); - Ok(chunk) - } - - fn put_transaction(&self, transaction_uuid: &Uuid, parent_uuid: &Uuid, chunks: &Vec) -> Result<()> { - // {"parent": uuid, "chunks": [chunk1, chunk2...]} - let transaction = SerializedTransaction { - parent: parent_uuid, - chunks: chunks - }; - - let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid); - let json = serde_json::to_string(&transaction)?; - d(&format!("serialized transaction: {:?}", json)); - self.put(uri, json, StatusCode::Created) - } - - fn get_head(&self) -> Result { - let uri = format!("{}/head", self.bound_base_uri()); - self.get_uuid(uri) - } - - fn put_head(&self, uuid: &Uuid) -> Result<()> { - // {"head": uuid} - let head = SerializedHead { - head: uuid.clone() - }; - - let uri = format!("{}/head", self.bound_base_uri()); - let json = serde_json::to_string(&head)?; - d(&format!("serialized head: {:?}", json)); - self.put(uri, json, StatusCode::NoContent) - } - - fn put_chunk(&self, chunk_uuid: &Uuid, payload: &String) -> Result<()> { - let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid); - d(&format!("serialized chunk: {:?}", payload)); - // TODO don't want to clone every datom! 
- self.put(uri, payload.clone(), StatusCode::Created) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::str::FromStr; - - #[test] - fn test_remote_client_bound_uri() { - let user_uuid = Uuid::from_str(&"316ea470-ce35-4adf-9c61-e0de6e289c59").expect("uuid"); - let server_uri = String::from("https://example.com/api/0.1"); - let remote_client = RemoteClient::new(server_uri, user_uuid); - assert_eq!("https://example.com/api/0.1/316ea470-ce35-4adf-9c61-e0de6e289c59", remote_client.bound_base_uri()); - } -} From 523fd09df9493eef6dbf60898b4b0770f0e6e9a1 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 6 Jun 2018 15:38:16 -0700 Subject: [PATCH 4/7] Skip first (bootstrap) transaction. It's technically very challenging to consider syncing Mentat stores with different bootstrap transactions. Therefore, Tolstoy blindly skips the first (bootstrap) transaction. In the future, we can at least recognize that the underlying Mentat stores are incompatible and bail out early. --- tests/tolstoy.rs | 22 +++++++++++----------- tolstoy/src/tx_processor.rs | 14 ++++++++++++-- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/tests/tolstoy.rs b/tests/tolstoy.rs index 788dc442b..cdbeb9ef0 100644 --- a/tests/tolstoy.rs +++ b/tests/tolstoy.rs @@ -97,12 +97,12 @@ fn test_reader() { let mut conn = Conn::connect(&mut c).expect("Couldn't open DB."); { let db_tx = c.transaction().expect("db tx"); - // Don't inspect the bootstrap transaction, but we'd like to see it's there. + // Ensure that the first (bootstrap) transaction is skipped over. 
let mut receiver = TxCountingReceiver::new(); assert_eq!(false, receiver.is_done); Processor::process(&db_tx, None, &mut receiver).expect("processor"); assert_eq!(true, receiver.is_done); - assert_eq!(1, receiver.tx_count); + assert_eq!(0, receiver.tx_count); } let ids = conn.transact(&mut c, r#"[ @@ -112,7 +112,7 @@ fn test_reader() { ]"#).expect("successful transaction").tempids; let numba_entity_id = ids.get("s").unwrap(); - let bootstrap_tx; + let first_tx; { let db_tx = c.transaction().expect("db tx"); // Expect to see one more transaction of four parts (one for tx datom itself). @@ -121,10 +121,10 @@ fn test_reader() { println!("{:#?}", receiver); - assert_eq!(2, receiver.txes.keys().count()); - assert_tx_datoms_count(&receiver, 1, 4); + assert_eq!(1, receiver.txes.keys().count()); + assert_tx_datoms_count(&receiver, 0, 4); - bootstrap_tx = Some(*receiver.txes.keys().nth(0).expect("bootstrap tx")); + first_tx = Some(*receiver.txes.keys().nth(0).expect("first tx")); } let ids = conn.transact(&mut c, r#"[ @@ -138,14 +138,14 @@ fn test_reader() { // Expect to see a single two part transaction let mut receiver = TestingReceiver::new(); - // Note that we're asking for the bootstrap tx to be skipped by the processor. - Processor::process(&db_tx, bootstrap_tx, &mut receiver).expect("processor"); + // Note that we're asking for the first transacted tx to be skipped by the processor. + Processor::process(&db_tx, first_tx, &mut receiver).expect("processor"); - assert_eq!(2, receiver.txes.keys().count()); - assert_tx_datoms_count(&receiver, 1, 2); + assert_eq!(1, receiver.txes.keys().count()); + assert_tx_datoms_count(&receiver, 0, 2); // Inspect the transaction part. 
- let tx_id = receiver.txes.keys().nth(1).expect("tx"); + let tx_id = receiver.txes.keys().nth(0).expect("tx"); let datoms = receiver.txes.get(tx_id).expect("datoms"); let part = datoms.iter().find(|&part| &part.e == asserted_e).expect("to find asserted datom"); diff --git a/tolstoy/src/tx_processor.rs b/tolstoy/src/tx_processor.rs index ebc12caed..75318b788 100644 --- a/tolstoy/src/tx_processor.rs +++ b/tolstoy/src/tx_processor.rs @@ -130,14 +130,20 @@ impl Processor { pub fn process(sqlite: &rusqlite::Transaction, from_tx: Option, receiver: &mut R) -> Result<()> where R: TxReceiver { let tx_filter = match from_tx { - Some(tx) => format!(" WHERE tx > {} ", tx), + Some(tx) => format!(" WHERE tx > {}", tx), None => format!("") }; - let select_query = format!("SELECT e, a, v, value_type_tag, tx, added FROM transactions {} ORDER BY tx", tx_filter); + + let select_query = format!("SELECT e, a, v, value_type_tag, tx, added FROM transactions{} ORDER BY tx", tx_filter); let mut stmt = sqlite.prepare(&select_query)?; let mut rows = stmt.query_and_then(&[], to_tx_part)?.peekable(); + + // If no starting tx is provided, get everything but skip over the first (bootstrap) transaction. + let skip_first_tx = from_tx.is_none(); + let mut at_first_tx = true; let mut current_tx = None; + while let Some(row) = rows.next() { let datom = row?; @@ -153,6 +159,10 @@ impl Processor { }, None => { current_tx = Some(datom.tx); + if at_first_tx && skip_first_tx { + at_first_tx = false; + continue; + } receiver.tx( datom.tx, &mut DatomsIterator::new(&datom, &mut rows) From e91faae3fb5ed832c4470155a40696cc5bf5ff0e Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 6 Jun 2018 15:47:32 -0700 Subject: [PATCH 5/7] Remove `open_empty`. This was a work-around for Tolstoy, which couldn't gracefully handle syncing a store with a bootstrap transaction. Tolstoy now handles that single transaction, so this is no longer necessary. 
--- src/conn.rs | 16 --------------- tools/cli/src/mentat_cli/command_parser.rs | 23 +--------------------- tools/cli/src/mentat_cli/repl.rs | 18 ----------------- 3 files changed, 1 insertion(+), 56 deletions(-) diff --git a/src/conn.rs b/src/conn.rs index 43d5a57e9..585e37db4 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -182,22 +182,6 @@ impl Store { }) } - /// Returns a totally blank store with no bootstrap schema. Use `open` instead. - pub fn open_empty(path: &str) -> Result { - if !path.is_empty() { - if Path::new(path).exists() { - bail!(ErrorKind::PathAlreadyExists(path.to_string())); - } - } - - let mut connection = ::new_connection(path)?; - let conn = Conn::empty(&mut connection)?; - Ok(Store { - conn: conn, - sqlite: connection, - }) - } - pub fn transact(&mut self, transaction: &str) -> Result { let mut ip = self.begin_transaction()?; let report = ip.transact(transaction)?; diff --git a/tools/cli/src/mentat_cli/command_parser.rs b/tools/cli/src/mentat_cli/command_parser.rs index 9eaa40a0a..eb2cefc38 100644 --- a/tools/cli/src/mentat_cli/command_parser.rs +++ b/tools/cli/src/mentat_cli/command_parser.rs @@ -46,7 +46,6 @@ pub static COMMAND_HELP: &'static str = &"help"; pub static COMMAND_IMPORT_LONG: &'static str = &"import"; pub static COMMAND_IMPORT_SHORT: &'static str = &"i"; pub static COMMAND_OPEN: &'static str = &"open"; -pub static COMMAND_OPEN_EMPTY: &'static str = &"empty"; pub static COMMAND_QUERY_LONG: &'static str = &"query"; pub static COMMAND_QUERY_SHORT: &'static str = &"q"; pub static COMMAND_QUERY_EXPLAIN_LONG: &'static str = &"explain_query"; @@ -66,7 +65,6 @@ pub enum Command { Help(Vec), Import(String), Open(String), - OpenEmpty(String), Query(String), QueryExplain(String), QueryPrepared(String), @@ -96,7 +94,6 @@ impl Command { &Command::Help(_) | &Command::Import(_) | &Command::Open(_) | - &Command::OpenEmpty(_) | &Command::Timer(_) | &Command::Schema | &Command::Sync(_) @@ -117,7 +114,6 @@ impl Command { &Command::Exit | 
&Command::Help(_) | &Command::Open(_) | - &Command::OpenEmpty(_) | &Command::QueryExplain(_) | &Command::Timer(_) | &Command::Schema | @@ -146,9 +142,6 @@ impl Command { &Command::Open(ref args) => { format!(".{} {}", COMMAND_OPEN, args) }, - &Command::OpenEmpty(ref args) => { - format!(".{} {}", COMMAND_OPEN_EMPTY, args) - }, &Command::Query(ref args) => { format!(".{} {}", COMMAND_QUERY_LONG, args) }, @@ -259,19 +252,6 @@ pub fn command(s: &str) -> Result { Ok(Command::Open(args[0].clone())) }); - let open_empty_parser = string(COMMAND_OPEN_EMPTY) - .with(spaces()) - .with(arguments()) - .map(|args| { - if args.len() < 1 { - bail!(cli::ErrorKind::CommandParse("Missing required argument".to_string())); - } - if args.len() > 1 { - bail!(cli::ErrorKind::CommandParse(format!("Unrecognized argument {:?}", args[1]))); - } - Ok(Command::OpenEmpty(args[0].clone())) - }); - let query_parser = try(string(COMMAND_QUERY_LONG)).or(try(string(COMMAND_QUERY_SHORT))) .with(edn_arg_parser()) .map(|x| { @@ -321,13 +301,12 @@ pub fn command(s: &str) -> Result { spaces() .skip(token('.')) - .with(choice::<[&mut Parser>; 14], _> + .with(choice::<[&mut Parser>; 13], _> ([&mut try(help_parser), &mut try(import_parser), &mut try(timer_parser), &mut try(cache_parser), &mut try(open_parser), - &mut try(open_empty_parser), &mut try(close_parser), &mut try(explain_query_parser), &mut try(exit_parser), diff --git a/tools/cli/src/mentat_cli/repl.rs b/tools/cli/src/mentat_cli/repl.rs index f6a47c09d..be460fb7d 100644 --- a/tools/cli/src/mentat_cli/repl.rs +++ b/tools/cli/src/mentat_cli/repl.rs @@ -261,12 +261,6 @@ impl Repl { Err(e) => eprintln!("{}", e.to_string()), }; }, - Command::OpenEmpty(db) => { - match self.open_empty(db) { - Ok(_) => println!("Empty database {:?} opened", self.db_name()), - Err(e) => eprintln!("{}", e.to_string()), - }; - }, Command::Query(query) => { self.store .q_once(query.as_str(), None) @@ -357,18 +351,6 @@ impl Repl { Ok(()) } - fn open_empty(&mut self, path: T) 
-> ::mentat::errors::Result<()> - where T: Into { - let path = path.into(); - if self.path.is_empty() || path != self.path { - let next = Store::open_empty(path.as_str())?; - self.path = path; - self.store = next; - } - - Ok(()) - } - // Close the current store by opening a new in-memory store in its place. fn close(&mut self) { let old_db_name = self.db_name(); From 4f4a169c71968fd1b9241ea8e48d08d4e452c587 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 6 Jun 2018 16:07:03 -0700 Subject: [PATCH 6/7] Accept a `rusqlite::Transaction` in Tolstoy's sync. This is part of a larger shift towards atomically updating `Conn`. The division of labour is (and must be) similar to `db` and `Conn` in a `transact`: `transact` coordinates `db` to do low-level work and processes progress reports, and then commits the SQL transaction and atomically advances `Conn`'s metadata. In a parallel fashion, `sync` will coordinate `tolstoy` to do low-level work before committing the SQL transaction and atomically advancing `Conn`'s metadata. --- src/conn.rs | 6 +++++- tolstoy/src/metadata.rs | 8 ++++---- tolstoy/src/schema.rs | 37 ++++++++++++++++++------------------- tolstoy/src/syncer.rs | 15 ++++++--------- tolstoy/src/tx_mapper.rs | 8 ++++---- 5 files changed, 37 insertions(+), 37 deletions(-) diff --git a/src/conn.rs b/src/conn.rs index 585e37db4..b8a0af3a8 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -687,7 +687,11 @@ pub enum CacheAction { impl Syncable for Store { fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> { let uuid = Uuid::parse_str(&user_uuid)?; - Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?) 
+ let mut db_tx = self.sqlite.transaction()?; + Syncer::flow(&mut db_tx, server_uri, &uuid)?; + db_tx.commit()?; + + Ok(()) } } diff --git a/tolstoy/src/metadata.rs b/tolstoy/src/metadata.rs index b81969649..a52262c9d 100644 --- a/tolstoy/src/metadata.rs +++ b/tolstoy/src/metadata.rs @@ -54,16 +54,16 @@ mod tests { #[test] fn test_get_remote_head_default() { - let mut conn = schema::tests::setup_conn(); - let tx = conn.transaction().expect("db tx"); + let mut conn = schema::tests::setup_conn_bare(); + let tx = schema::tests::setup_tx(&mut conn); assert_eq!(Uuid::nil(), SyncMetadataClient::remote_head(&tx).expect("fetch succeeded")); } #[test] fn test_set_and_get_remote_head() { - let mut conn = schema::tests::setup_conn(); + let mut conn = schema::tests::setup_conn_bare(); + let tx = schema::tests::setup_tx(&mut conn); let uuid = Uuid::new_v4(); - let tx = conn.transaction().expect("db tx"); SyncMetadataClient::set_remote_head(&tx, &uuid).expect("update succeeded"); assert_eq!(uuid, SyncMetadataClient::remote_head(&tx).expect("fetch succeeded")); } diff --git a/tolstoy/src/schema.rs b/tolstoy/src/schema.rs index 9bd175ff6..6f0488a04 100644 --- a/tolstoy/src/schema.rs +++ b/tolstoy/src/schema.rs @@ -24,15 +24,13 @@ lazy_static! 
{ }; } -pub fn ensure_current_version(conn: &mut rusqlite::Connection) -> Result<()> { - let tx = conn.transaction()?; - +pub fn ensure_current_version(tx: &mut rusqlite::Transaction) -> Result<()> { for statement in (&SCHEMA_STATEMENTS).iter() { tx.execute(statement, &[])?; } tx.execute("INSERT OR IGNORE INTO tolstoy_metadata (key, value) VALUES (?, zeroblob(16))", &[&REMOTE_HEAD_KEY])?; - tx.commit().map_err(|e| e.into()) + Ok(()) } #[cfg(test)] @@ -40,7 +38,7 @@ pub mod tests { use super::*; use uuid::Uuid; - fn setup_conn_bare() -> rusqlite::Connection { + pub fn setup_conn_bare() -> rusqlite::Connection { let conn = rusqlite::Connection::open_in_memory().unwrap(); conn.execute_batch(" @@ -54,19 +52,24 @@ pub mod tests { conn } - pub fn setup_conn() -> rusqlite::Connection { - let mut conn = setup_conn_bare(); - ensure_current_version(&mut conn).expect("connection setup"); - conn + pub fn setup_tx_bare<'a>(conn: &'a mut rusqlite::Connection) -> rusqlite::Transaction<'a> { + conn.transaction().expect("tx") + } + + pub fn setup_tx<'a>(conn: &'a mut rusqlite::Connection) -> rusqlite::Transaction<'a> { + let mut tx = conn.transaction().expect("tx"); + ensure_current_version(&mut tx).expect("connection setup"); + tx } #[test] fn test_empty() { let mut conn = setup_conn_bare(); + let mut tx = setup_tx_bare(&mut conn); - assert!(ensure_current_version(&mut conn).is_ok()); + assert!(ensure_current_version(&mut tx).is_ok()); - let mut stmt = conn.prepare("SELECT key FROM tolstoy_metadata WHERE value = zeroblob(16)").unwrap(); + let mut stmt = tx.prepare("SELECT key FROM tolstoy_metadata WHERE value = zeroblob(16)").unwrap(); let mut keys_iter = stmt.query_map(&[], |r| r.get(0)).expect("query works"); let first: Result = keys_iter.next().unwrap().map_err(|e| e.into()); @@ -82,27 +85,23 @@ pub mod tests { #[test] fn test_non_empty() { let mut conn = setup_conn_bare(); + let mut tx = setup_tx_bare(&mut conn); - assert!(ensure_current_version(&mut conn).is_ok()); + 
assert!(ensure_current_version(&mut tx).is_ok()); let test_uuid = Uuid::new_v4(); { - let tx = conn.transaction().unwrap(); let uuid_bytes = test_uuid.as_bytes().to_vec(); match tx.execute("UPDATE tolstoy_metadata SET value = ? WHERE key = ?", &[&uuid_bytes, &REMOTE_HEAD_KEY]) { Err(e) => panic!("Error running an update: {}", e), _ => () } - match tx.commit() { - Err(e) => panic!("Error committing an update: {}", e), - _ => () - } } - assert!(ensure_current_version(&mut conn).is_ok()); + assert!(ensure_current_version(&mut tx).is_ok()); // Check that running ensure_current_version on an initialized conn doesn't change anything. - let mut stmt = conn.prepare("SELECT value FROM tolstoy_metadata").unwrap(); + let mut stmt = tx.prepare("SELECT value FROM tolstoy_metadata").unwrap(); let mut values_iter = stmt.query_map(&[], |r| { let raw_uuid: Vec = r.get(0); Uuid::from_bytes(raw_uuid.as_slice()).unwrap() diff --git a/tolstoy/src/syncer.rs b/tolstoy/src/syncer.rs index d5a8d5dab..f3ee3a7a6 100644 --- a/tolstoy/src/syncer.rs +++ b/tolstoy/src/syncer.rs @@ -189,14 +189,13 @@ impl Syncer { Ok(()) } - pub fn flow(sqlite: &mut rusqlite::Connection, server_uri: &String, user_uuid: &Uuid) -> Result<()> { + pub fn flow(db_tx: &mut rusqlite::Transaction, server_uri: &String, user_uuid: &Uuid) -> Result<()> { d(&format!("sync flowing")); - ensure_current_version(sqlite)?; + ensure_current_version(db_tx)?; // TODO configure this sync with some auth data let remote_client = RemoteClient::new(server_uri.clone(), user_uuid.clone()); - let mut db_tx = sqlite.transaction()?; let remote_head = remote_client.get_head()?; d(&format!("remote head {:?}", remote_head)); @@ -212,7 +211,7 @@ impl Syncer { let mut inquiring_tx_receiver = InquiringTxReceiver::new(); // TODO don't just start from the beginning... but then again, we should do this // without walking the table at all, and use the tx index. 
- Processor::process(&db_tx, None, &mut inquiring_tx_receiver)?; + Processor::process(db_tx, None, &mut inquiring_tx_receiver)?; if !inquiring_tx_receiver.is_done { bail!(ErrorKind::TxProcessorUnfinished); } @@ -229,7 +228,7 @@ impl Syncer { // Check if the server is empty - populate it. if remote_head == Uuid::nil() { d(&format!("empty server!")); - Syncer::upload_ours(&mut db_tx, None, &remote_client, &remote_head)?; + Syncer::upload_ours(db_tx, None, &remote_client, &remote_head)?; // Check if the server is the same as us, and if our HEAD moved. } else if locally_known_remote_head == remote_head { @@ -246,7 +245,7 @@ impl Syncer { // our sync becomes just bumping our local head. AFAICT below would currently fail. if let Some(upload_from_tx) = TxMapper::get_tx_for_uuid(&db_tx, &locally_known_remote_head)? { d(&format!("Fast-forwarding the server.")); - Syncer::upload_ours(&mut db_tx, Some(upload_from_tx), &remote_client, &remote_head)?; + Syncer::upload_ours(db_tx, Some(upload_from_tx), &remote_client, &remote_head)?; } else { d(&format!("Unable to fast-forward the server; missing local tx mapping")); bail!(ErrorKind::TxIncorrectlyMapped(0)); @@ -262,9 +261,7 @@ impl Syncer { )); } - // Commit everything, if there's anything to commit! - // Any new tx->uuid mappings and the new HEAD. We're synced! - db_tx.commit()?; + // Our caller will commit the tx with our changes when it's done. 
Ok(()) } diff --git a/tolstoy/src/tx_mapper.rs b/tolstoy/src/tx_mapper.rs index a5a84405e..eeee16f99 100644 --- a/tolstoy/src/tx_mapper.rs +++ b/tolstoy/src/tx_mapper.rs @@ -92,8 +92,8 @@ pub mod tests { #[test] fn test_getters() { - let mut conn = schema::tests::setup_conn(); - let mut tx = conn.transaction().expect("db tx"); + let mut conn = schema::tests::setup_conn_bare(); + let mut tx = schema::tests::setup_tx(&mut conn); assert_eq!(None, TxMapper::get(&mut tx, 1).expect("success")); let set_uuid = TxMapper::get_or_set_uuid_for_tx(&mut tx, 1).expect("success"); assert_eq!(Some(set_uuid), TxMapper::get(&mut tx, 1).expect("success")); @@ -101,8 +101,8 @@ pub mod tests { #[test] fn test_bulk_setter() { - let mut conn = schema::tests::setup_conn(); - let mut tx = conn.transaction().expect("db tx"); + let mut conn = schema::tests::setup_conn_bare(); + let mut tx = schema::tests::setup_tx(&mut conn); let mut map = HashMap::new(); TxMapper::set_bulk(&mut tx, &map).expect("empty map success"); From 4b57ec738a413803aef29d61b55f06338546b9f9 Mon Sep 17 00:00:00 2001 From: Grisha Kruglov Date: Fri, 16 Feb 2018 04:13:18 -0500 Subject: [PATCH 7/7] Implement fast-forward syncing. This handles the situation where only one of the local client and the remote server has advanced. 
--- db/src/db.rs | 18 ++++++ db/src/lib.rs | 3 +- src/conn.rs | 30 +++------ src/errors.rs | 5 ++ src/lib.rs | 9 ++- src/sync.rs | 133 +++++++++++++++++++++++++++++++++++++++ tolstoy/src/errors.rs | 5 -- tolstoy/src/lib.rs | 2 + tolstoy/src/syncer.rs | 84 +++++++++++++++++++------ tolstoy/src/tx_mapper.rs | 7 +++ 10 files changed, 250 insertions(+), 46 deletions(-) create mode 100644 src/sync.rs diff --git a/db/src/db.rs b/db/src/db.rs index de2ea52d2..89cb0e145 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -1051,6 +1051,7 @@ pub trait PartitionMapping { fn allocate_entid(&mut self, partition: &S) -> i64 where String: Borrow; fn allocate_entids(&mut self, partition: &S, n: usize) -> Range where String: Borrow; fn contains_entid(&self, entid: Entid) -> bool; + fn expand_up_to(&mut self, partition: &S, entid: i64) where String: Borrow; } impl PartitionMapping for PartitionMap { @@ -1072,6 +1073,23 @@ impl PartitionMapping for PartitionMap { } } + fn expand_up_to(&mut self, partition: &S, entid: i64) where String: Borrow { + match self.get_mut(partition) { + Some(partition) => { + // Don't honour requests to shrink the partition. + if partition.index > entid { + return () + } + let new_index = entid + 1; + if partition.index != new_index { + partition.index = new_index; + } + }, + // This is a programming error. + None => panic!("Cannot expand unknown partition: {}", partition), + } + } + fn contains_entid(&self, entid: Entid) -> bool { self.values().any(|partition| partition.contains_entid(entid)) } diff --git a/db/src/lib.rs b/db/src/lib.rs index 39ec88200..92b8d77e0 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -50,8 +50,7 @@ mod tx_checking; pub mod types; mod upsert_resolution; -// Export these for reference from tests. cfg(test) should work, but doesn't. -// #[cfg(test)] +// Export these for reference from sync code and tests. 
pub use bootstrap::{ TX0, USER0, diff --git a/src/conn.rs b/src/conn.rs index b8a0af3a8..2acf49599 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -77,6 +77,7 @@ use mentat_db::{ }; use mentat_db::internal_types::TermWithTempIds; +use mentat_db::db::PartitionMapping; use mentat_query_pull::{ pull_attributes_for_entities, @@ -88,10 +89,6 @@ use edn::entities::{ OpType, }; -use mentat_tolstoy::Syncer; - -use uuid::Uuid; - use entity_builder::{ InProgressBuilder, TermBuilder, @@ -167,8 +164,8 @@ pub struct Conn { /// A convenience wrapper around a single SQLite connection and a Conn. This is suitable /// for applications that don't require complex connection management. pub struct Store { + pub sqlite: rusqlite::Connection, conn: Conn, - sqlite: rusqlite::Connection, } impl Store { @@ -188,6 +185,12 @@ impl Store { ip.commit()?; Ok(report) } + + pub fn fast_forward_user_partition(&mut self, new_head: Entid) -> Result<()> { + let mut metadata = self.conn.metadata.lock().unwrap(); + metadata.partition_map.expand_up_to(":db.part/user", new_head); + db::update_partition_map(&mut self.sqlite, &metadata.partition_map).map_err(|e| e.into()) + } } pub trait Queryable { @@ -211,10 +214,6 @@ pub trait Pullable { where A: IntoIterator; } -pub trait Syncable { - fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>; -} - /// Represents an in-progress, not yet committed, set of changes to the store. /// Call `commit` to commit your changes, or `rollback` to discard them. /// A transaction is held open until you do so. @@ -684,17 +683,6 @@ pub enum CacheAction { Deregister, } -impl Syncable for Store { - fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> { - let uuid = Uuid::parse_str(&user_uuid)?; - let mut db_tx = self.sqlite.transaction()?; - Syncer::flow(&mut db_tx, server_uri, &uuid)?; - db_tx.commit()?; - - Ok(()) - } -} - impl Conn { // Intentionally not public. 
fn new(partition_map: PartitionMap, schema: Schema) -> Conn { @@ -976,6 +964,8 @@ mod tests { Instant, }; + use uuid::Uuid; + use mentat_core::{ CachedAttributes, Binding, diff --git a/src/errors.rs b/src/errors.rs index 126faa00c..607a1913a 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -107,5 +107,10 @@ error_chain! { description("provided value doesn't match value type") display("provided value of type {} doesn't match attribute value type {}", provided, expected) } + + NotYetImplemented(t: String) { + description("not yet implemented") + display("not yet implemented: {}", t) + } } } diff --git a/src/lib.rs b/src/lib.rs index 4ad7ec374..edcdc680c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -105,6 +105,14 @@ pub mod query; pub mod entity_builder; pub mod query_builder; +#[cfg(feature = "syncable")] +pub mod sync; + +#[cfg(feature = "syncable")] +pub use sync::{ + Syncable, +}; + pub use query::{ IntoResult, PlainSymbol, @@ -131,7 +139,6 @@ pub use conn::{ Metadata, Pullable, Queryable, - Syncable, Store, }; diff --git a/src/sync.rs b/src/sync.rs new file mode 100644 index 000000000..58e58c098 --- /dev/null +++ b/src/sync.rs @@ -0,0 +1,133 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. 
+ +use uuid::Uuid; + +use conn::Store; +use errors::{ + Result, + ErrorKind, +}; + +use mentat_core::{ + Entid, + KnownEntid, +}; +use mentat_db as db; + +use entity_builder::BuildTerms; + +use mentat_tolstoy::{ + Syncer, + SyncMetadataClient, + TxMapper, +}; +use mentat_tolstoy::syncer::{ + Tx, + SyncResult, +}; +use mentat_tolstoy::metadata::HeadTrackable; + +pub trait Syncable { + fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>; + fn fast_forward_local(&mut self, txs: Vec) -> Result<()>; +} + +fn within_user_partition(entid: Entid) -> bool { + entid >= db::USER0 && entid < db::TX0 +} + +impl Syncable for Store { + fn fast_forward_local(&mut self, txs: Vec) -> Result<()> { + let mut last_tx_entid = None; + let mut last_tx_uuid = None; + + // During fast-forwarding, we will insert datoms with known entids + // which, by definition, fall outside of our user partition. + // Once we're done with insertion, we need to ensure that the user + // partition's next allocation will not overlap with just-inserted datoms. + // To allow for "holes" in the user partition (due to data excision), + // we track the highest incoming entid we saw, and expand our + // local partition to match. + // In the absence of excision and implementation bugs, this should work + // just as if we counted the number of incoming entids and expanded by + // that number instead. 
+ let mut largest_endid_encountered = db::USER0; + + for tx in txs { + let in_progress = self.begin_transaction()?; + let mut builder = in_progress.builder(); + for part in tx.parts { + if part.added { + builder.add(KnownEntid(part.e), KnownEntid(part.a), part.v.clone())?; + } else { + builder.retract(KnownEntid(part.e), KnownEntid(part.a), part.v.clone())?; + } + // Ignore datoms that fall outside of the user partition: + if within_user_partition(part.e) && part.e > largest_endid_encountered { + largest_endid_encountered = part.e; + } + } + let report = builder.commit()?; + last_tx_entid = Some(report.tx_id); + last_tx_uuid = Some(tx.tx.clone()); + } + + // We've just transacted a new tx, and generated a new tx entid. + // Map it to the corresponding incoming tx uuid, advance our + // "locally known remote head". + if let Some(uuid) = last_tx_uuid { + if let Some(entid) = last_tx_entid { + { + let mut db_tx = self.sqlite.transaction()?; + SyncMetadataClient::set_remote_head(&mut db_tx, &uuid)?; + TxMapper::set_tx_uuid(&mut db_tx, entid, &uuid)?; + db_tx.commit()?; + } + + // only need to advance the user partition, since we're using KnownEntid and partition won't + // get auto-updated; shouldn't be a problem for tx partition, since we're relying on the builder + // to create a tx and advance the partition for us. + self.fast_forward_user_partition(largest_endid_encountered)?; + } + } + + Ok(()) + } + + fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> { + let uuid = Uuid::parse_str(&user_uuid)?; + + let sync_result; + { + let mut db_tx = self.sqlite.transaction()?; + sync_result = Syncer::flow(&mut db_tx, server_uri, &uuid)?; + + // TODO this should be done _after_ all of the operations below conclude! 
+ // Commits any changes Syncer made (schema, updated heads, tu mappings during an upload, etc) + db_tx.commit()?; + } + + // TODO These operations need to borrow self as mutable; but we already borrow it for db_tx above, + // and so for now we split up sync into multiple db transactions /o\ + // Fixing this likely involves either implementing flow on InProgress, or changing flow to + // take an InProgress instead of a raw sql transaction. + + match sync_result { + SyncResult::EmptyServer => Ok(()), + SyncResult::NoChanges => Ok(()), + SyncResult::ServerFastForward => Ok(()), + SyncResult::Merge => bail!(ErrorKind::NotYetImplemented( + format!("Can't sync against diverged local.") + )), + SyncResult::LocalFastForward(txs) => self.fast_forward_local(txs) + } + } +} diff --git a/tolstoy/src/errors.rs b/tolstoy/src/errors.rs index de5dfdba0..33f2b8232 100644 --- a/tolstoy/src/errors.rs +++ b/tolstoy/src/errors.rs @@ -49,11 +49,6 @@ error_chain! { display("encountered unexpected state: {}", t) } - NotYetImplemented(t: String) { - description("not yet implemented") - display("not yet implemented: {}", t) - } - DuplicateMetadata(k: String) { description("encountered more than one metadata value for key") display("encountered more than one metadata value for key: {}", k) diff --git a/tolstoy/src/lib.rs b/tolstoy/src/lib.rs index 14d4d9f76..e9f699ca4 100644 --- a/tolstoy/src/lib.rs +++ b/tolstoy/src/lib.rs @@ -41,7 +41,9 @@ pub mod tx_processor; pub mod errors; pub mod syncer; pub mod tx_mapper; +pub use tx_mapper::TxMapper; pub use syncer::Syncer; +pub use metadata::SyncMetadataClient; pub use errors::{ Error, ErrorKind, diff --git a/tolstoy/src/syncer.rs b/tolstoy/src/syncer.rs index f3ee3a7a6..f9b7e169f 100644 --- a/tolstoy/src/syncer.rs +++ b/tolstoy/src/syncer.rs @@ -166,8 +166,23 @@ impl<'c> TxReceiver for UploadingTxReceiver<'c> { } } +// For returning out of the downloader as an ordered list. 
+#[derive(Debug)] +pub struct Tx { + pub tx: Uuid, + pub parts: Vec, +} + +pub enum SyncResult { + EmptyServer, + NoChanges, + ServerFastForward, + LocalFastForward(Vec), + Merge, +} + impl Syncer { - fn upload_ours(db_tx: &mut rusqlite::Transaction, from_tx: Option, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<()> { + fn fast_forward_server(db_tx: &mut rusqlite::Transaction, from_tx: Option, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<()> { let mut uploader = UploadingTxReceiver::new(remote_client, remote_head); Processor::process(db_tx, from_tx, &mut uploader)?; if !uploader.is_done { @@ -189,7 +204,34 @@ impl Syncer { Ok(()) } - pub fn flow(db_tx: &mut rusqlite::Transaction, server_uri: &String, user_uuid: &Uuid) -> Result<()> { + fn download_theirs(_db_tx: &mut rusqlite::Transaction, remote_client: &RemoteClient, remote_head: &Uuid) -> Result> { + let new_txs = remote_client.get_transactions(remote_head)?; + let mut tx_list = Vec::new(); + + for tx in new_txs { + let mut tx_parts = Vec::new(); + let chunks = remote_client.get_chunks(&tx)?; + + // We pass along all of the downloaded parts, including transaction's + // metadata datom. Transactor is expected to do the right thing, and + // use txInstant from one of our datoms. 
+ for chunk in chunks { + let part = remote_client.get_chunk(&chunk)?; + tx_parts.push(part); + } + + tx_list.push(Tx { + tx: tx, + parts: tx_parts + }); + } + + d(&format!("got tx list: {:?}", &tx_list)); + + Ok(tx_list) + } + + pub fn flow(db_tx: &mut rusqlite::Transaction, server_uri: &String, user_uuid: &Uuid) -> Result { d(&format!("sync flowing")); ensure_current_version(db_tx)?; @@ -200,7 +242,7 @@ impl Syncer { let remote_head = remote_client.get_head()?; d(&format!("remote head {:?}", remote_head)); - let locally_known_remote_head = SyncMetadataClient::remote_head(&db_tx)?; + let locally_known_remote_head = SyncMetadataClient::remote_head(db_tx)?; d(&format!("local head {:?}", locally_known_remote_head)); // Local head: latest transaction that we have in the store, @@ -215,20 +257,21 @@ impl Syncer { if !inquiring_tx_receiver.is_done { bail!(ErrorKind::TxProcessorUnfinished); } - let have_local_changes = match inquiring_tx_receiver.last_tx { + let (have_local_changes, local_store_empty) = match inquiring_tx_receiver.last_tx { Some(tx) => { - match TxMapper::get(&db_tx, tx)? { - Some(_) => false, - None => true + match TxMapper::get(db_tx, tx)? { + Some(_) => (false, false), + None => (true, false) } }, - None => false + None => (false, true) }; // Check if the server is empty - populate it. if remote_head == Uuid::nil() { d(&format!("empty server!")); - Syncer::upload_ours(db_tx, None, &remote_client, &remote_head)?; + Syncer::fast_forward_server(db_tx, None, &remote_client, &remote_head)?; + return Ok(SyncResult::EmptyServer); // Check if the server is the same as us, and if our HEAD moved. } else if locally_known_remote_head == remote_head { @@ -236,33 +279,38 @@ impl Syncer { if !have_local_changes { d(&format!("local HEAD did not move. 
Nothing to do!")); - return Ok(()); + return Ok(SyncResult::NoChanges); } d(&format!("local HEAD moved.")); // TODO it's possible that we've successfully advanced remote head previously, // but failed to advance our own local head. If that's the case, and we can recognize it, // our sync becomes just bumping our local head. AFAICT below would currently fail. - if let Some(upload_from_tx) = TxMapper::get_tx_for_uuid(&db_tx, &locally_known_remote_head)? { + if let Some(upload_from_tx) = TxMapper::get_tx_for_uuid(db_tx, &locally_known_remote_head)? { d(&format!("Fast-forwarding the server.")); - Syncer::upload_ours(db_tx, Some(upload_from_tx), &remote_client, &remote_head)?; + Syncer::fast_forward_server(db_tx, Some(upload_from_tx), &remote_client, &remote_head)?; + return Ok(SyncResult::ServerFastForward); } else { d(&format!("Unable to fast-forward the server; missing local tx mapping")); bail!(ErrorKind::TxIncorrectlyMapped(0)); } - // We diverged from the server. - // We'll need to rebase/merge ourselves on top of it. + // We diverged from the server. If we're lucky, we can just fast-forward local. + // Otherwise, a merge (or a rebase) is required. } else { d(&format!("server changed since last sync.")); - bail!(ErrorKind::NotYetImplemented( - format!("Can't yet sync against changed server. Local head {:?}, remote head {:?}", locally_known_remote_head, remote_head) + // TODO local store moved forward since we last synced. Need to merge or rebase. + if !local_store_empty && have_local_changes { + return Ok(SyncResult::Merge); + } + + d(&format!("fast-forwarding local store.")); + return Ok(SyncResult::LocalFastForward( + Syncer::download_theirs(db_tx, &remote_client, &locally_known_remote_head)? )); } // Our caller will commit the tx with our changes when it's done. 
- - Ok(()) } } diff --git a/tolstoy/src/tx_mapper.rs b/tolstoy/src/tx_mapper.rs index eeee16f99..dd000b914 100644 --- a/tolstoy/src/tx_mapper.rs +++ b/tolstoy/src/tx_mapper.rs @@ -33,6 +33,13 @@ impl TxMapper { Ok(()) } + // TODO upsert...? error checking..? + pub fn set_tx_uuid(db_tx: &mut rusqlite::Transaction, tx: Entid, uuid: &Uuid) -> Result<()> { + let uuid_bytes = uuid.as_bytes().to_vec(); + db_tx.execute("INSERT INTO tolstoy_tu (tx, uuid) VALUES (?, ?)", &[&tx, &uuid_bytes])?; + Ok(()) + } + // TODO for when we're downloading, right? pub fn get_or_set_uuid_for_tx(db_tx: &mut rusqlite::Transaction, tx: Entid) -> Result { match TxMapper::get(db_tx, tx)? {