diff --git a/Cargo.toml b/Cargo.toml
index 9f86a5dec..c26926621 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,8 +16,9 @@ version = "0.7.0"
 build = "build/version.rs"
 
 [features]
-default = ["bundled_sqlite3"]
+default = ["bundled_sqlite3", "syncable"]
 bundled_sqlite3 = ["rusqlite/bundled"]
+syncable = ["mentat_tolstoy"]
 
 [workspace]
 members = ["tools/cli", "ffi"]
@@ -72,6 +73,7 @@ path = "query-translator"
 
 [dependencies.mentat_tolstoy]
 path = "tolstoy"
+optional = true
 
 [profile.release]
 opt-level = 3
diff --git a/db/src/db.rs b/db/src/db.rs
index de2ea52d2..89cb0e145 100644
--- a/db/src/db.rs
+++ b/db/src/db.rs
@@ -1051,6 +1051,7 @@ pub trait PartitionMapping {
     fn allocate_entid<S: ?Sized + Ord + Display>(&mut self, partition: &S) -> i64 where String: Borrow<S>;
     fn allocate_entids<S: ?Sized + Ord + Display>(&mut self, partition: &S, n: usize) -> Range<i64> where String: Borrow<S>;
     fn contains_entid(&self, entid: Entid) -> bool;
+    fn expand_up_to<S: ?Sized + Ord + Display>(&mut self, partition: &S, entid: i64) where String: Borrow<S>;
 }
 
 impl PartitionMapping for PartitionMap {
@@ -1072,6 +1073,23 @@ impl PartitionMapping for PartitionMap {
         }
     }
 
+    fn expand_up_to<S: ?Sized + Ord + Display>(&mut self, partition: &S, entid: i64) where String: Borrow<S> {
+        match self.get_mut(partition) {
+            Some(partition) => {
+                // Don't honour requests to shrink the partition.
+                if partition.index > entid {
+                    return;
+                }
+                let new_index = entid + 1;
+                if partition.index != new_index {
+                    partition.index = new_index;
+                }
+            },
+            // This is a programming error.
+            None => panic!("Cannot expand unknown partition: {}", partition),
+        }
+    }
+
     fn contains_entid(&self, entid: Entid) -> bool {
         self.values().any(|partition| partition.contains_entid(entid))
     }
diff --git a/db/src/lib.rs b/db/src/lib.rs
index 39ec88200..92b8d77e0 100644
--- a/db/src/lib.rs
+++ b/db/src/lib.rs
@@ -50,8 +50,7 @@ mod tx_checking;
 pub mod types;
 mod upsert_resolution;
 
-// Export these for reference from tests. cfg(test) should work, but doesn't.
-// #[cfg(test)]
+// Export these for reference from sync code and tests.
 pub use bootstrap::{
     TX0,
     USER0,
diff --git a/src/conn.rs b/src/conn.rs
index 43d5a57e9..2acf49599 100644
--- a/src/conn.rs
+++ b/src/conn.rs
@@ -77,6 +77,7 @@ use mentat_db::{
 };
 
 use mentat_db::internal_types::TermWithTempIds;
+use mentat_db::db::PartitionMapping;
 
 use mentat_query_pull::{
     pull_attributes_for_entities,
@@ -88,10 +89,6 @@ use edn::entities::{
     OpType,
 };
 
-use mentat_tolstoy::Syncer;
-
-use uuid::Uuid;
-
 use entity_builder::{
     InProgressBuilder,
     TermBuilder,
@@ -167,8 +164,8 @@ pub struct Conn {
 /// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
 /// for applications that don't require complex connection management.
 pub struct Store {
+    pub sqlite: rusqlite::Connection,
     conn: Conn,
-    sqlite: rusqlite::Connection,
 }
 
 impl Store {
@@ -182,28 +179,18 @@ impl Store {
         })
     }
 
-    /// Returns a totally blank store with no bootstrap schema. Use `open` instead.
-    pub fn open_empty(path: &str) -> Result<Store> {
-        if !path.is_empty() {
-            if Path::new(path).exists() {
-                bail!(ErrorKind::PathAlreadyExists(path.to_string()));
-            }
-        }
-
-        let mut connection = ::new_connection(path)?;
-        let conn = Conn::empty(&mut connection)?;
-        Ok(Store {
-            conn: conn,
-            sqlite: connection,
-        })
-    }
-
     pub fn transact(&mut self, transaction: &str) -> Result<TxReport> {
         let mut ip = self.begin_transaction()?;
         let report = ip.transact(transaction)?;
        ip.commit()?;
         Ok(report)
     }
+
+    /// Advance the user partition's next-entid index to just past `new_head`,
+    /// and persist the updated partition map.
+    pub fn fast_forward_user_partition(&mut self, new_head: Entid) -> Result<()> {
+        let mut metadata = self.conn.metadata.lock().unwrap();
+        metadata.partition_map.expand_up_to(":db.part/user", new_head);
+        db::update_partition_map(&mut self.sqlite, &metadata.partition_map).map_err(|e| e.into())
+    }
 }
 
 pub trait Queryable {
@@ -227,10 +214,6 @@ pub trait Pullable {
         where A: IntoIterator<Item=Entid>;
 }
 
-pub trait Syncable {
-    fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
-}
-
 /// Represents an in-progress, not yet committed, set of changes to the store.
 /// Call `commit` to commit your changes, or `rollback` to discard them.
 /// A transaction is held open until you do so.
@@ -700,13 +683,6 @@ pub enum CacheAction {
     Deregister,
 }
 
-impl Syncable for Store {
-    fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
-        let uuid = Uuid::parse_str(&user_uuid)?;
-        Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?)
-    }
-}
-
 impl Conn {
     // Intentionally not public.
     fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
@@ -988,6 +964,8 @@ mod tests {
         Instant,
     };
 
+    use uuid::Uuid;
+
     use mentat_core::{
         CachedAttributes,
         Binding,
diff --git a/src/errors.rs b/src/errors.rs
index 126faa00c..607a1913a 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -107,5 +107,10 @@ error_chain! {
             description("provided value doesn't match value type")
             display("provided value of type {} doesn't match attribute value type {}", provided, expected)
         }
+
+        NotYetImplemented(t: String) {
+            description("not yet implemented")
+            display("not yet implemented: {}", t)
+        }
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 7d783ea82..edcdc680c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -29,6 +29,8 @@ extern crate mentat_query_projector;
 extern crate mentat_query_pull;
 extern crate mentat_query_translator;
 extern crate mentat_sql;
+
+#[cfg(feature = "syncable")]
 extern crate mentat_tolstoy;
 
 pub use mentat_core::{
@@ -103,6 +105,14 @@ pub mod query;
 pub mod entity_builder;
 pub mod query_builder;
 
+#[cfg(feature = "syncable")]
+pub mod sync;
+
+#[cfg(feature = "syncable")]
+pub use sync::{
+    Syncable,
+};
+
 pub use query::{
     IntoResult,
     PlainSymbol,
@@ -129,7 +139,6 @@ pub use conn::{
     Metadata,
     Pullable,
     Queryable,
-    Syncable,
     Store,
 };
diff --git a/src/sync.rs b/src/sync.rs
new file mode 100644
index 000000000..58e58c098
--- /dev/null
+++ b/src/sync.rs
@@ -0,0 +1,133 @@
+// Copyright 2016 Mozilla
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
+// this file except in compliance with the License. You may obtain a copy of the
+// License at http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software distributed
+// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+// CONDITIONS OF ANY KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations under the License.
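+
+// Overview (editor's note, not in the original patch): `Syncable::sync` runs
+// Tolstoy's `Syncer::flow` inside a SQLite transaction and then acts on the
+// returned `SyncResult`. A local fast-forward replays each downloaded
+// transaction through the entity builder using known entids, then advances
+// the user partition past the largest incoming entid so that future local
+// allocations cannot collide with just-synced datoms.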
+
+use uuid::Uuid;
+
+use conn::Store;
+use errors::{
+    Result,
+    ErrorKind,
+};
+
+use mentat_core::{
+    Entid,
+    KnownEntid,
+};
+use mentat_db as db;
+
+use entity_builder::BuildTerms;
+
+use mentat_tolstoy::{
+    Syncer,
+    SyncMetadataClient,
+    TxMapper,
+};
+use mentat_tolstoy::syncer::{
+    Tx,
+    SyncResult,
+};
+use mentat_tolstoy::metadata::HeadTrackable;
+
+pub trait Syncable {
+    fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
+    fn fast_forward_local(&mut self, txs: Vec<Tx>) -> Result<()>;
+}
+
+fn within_user_partition(entid: Entid) -> bool {
+    entid >= db::USER0 && entid < db::TX0
+}
+
+impl Syncable for Store {
+    fn fast_forward_local(&mut self, txs: Vec<Tx>) -> Result<()> {
+        let mut last_tx_entid = None;
+        let mut last_tx_uuid = None;
+
+        // During fast-forwarding, we will insert datoms with known entids which,
+        // by definition, fall outside of the currently allocated region of our
+        // user partition.
+        // Once we're done with insertion, we need to ensure that the user
+        // partition's next allocation will not overlap with the just-inserted datoms.
+        // To allow for "holes" in the user partition (due to data excision),
+        // we track the highest incoming entid we saw, and expand our
+        // local partition to match.
+        // In the absence of excision and implementation bugs, this should work
+        // just as if we counted the number of incoming entids and expanded by
+        // that number instead.
+        let mut largest_entid_encountered = db::USER0;
+
+        for tx in txs {
+            let in_progress = self.begin_transaction()?;
+            let mut builder = in_progress.builder();
+            for part in tx.parts {
+                if part.added {
+                    builder.add(KnownEntid(part.e), KnownEntid(part.a), part.v.clone())?;
+                } else {
+                    builder.retract(KnownEntid(part.e), KnownEntid(part.a), part.v.clone())?;
+                }
+                // Datoms outside of the user partition don't move the head forward:
+                if within_user_partition(part.e) && part.e > largest_entid_encountered {
+                    largest_entid_encountered = part.e;
+                }
+            }
+            let report = builder.commit()?;
+            last_tx_entid = Some(report.tx_id);
+            last_tx_uuid = Some(tx.tx.clone());
+        }
+
+        // We've just transacted a new tx, and generated a new tx entid.
+        // Map it to the corresponding incoming tx uuid, and advance our
+        // "locally known remote head".
+        if let Some(uuid) = last_tx_uuid {
+            if let Some(entid) = last_tx_entid {
+                {
+                    let mut db_tx = self.sqlite.transaction()?;
+                    SyncMetadataClient::set_remote_head(&mut db_tx, &uuid)?;
+                    TxMapper::set_tx_uuid(&mut db_tx, entid, &uuid)?;
+                    db_tx.commit()?;
+                }
+
+                // We only need to advance the user partition, since we're using KnownEntid
+                // and the partition won't get auto-updated; this shouldn't be a problem for
+                // the tx partition, since we're relying on the builder to create a tx and
+                // advance that partition for us.
+                self.fast_forward_user_partition(largest_entid_encountered)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
+        let uuid = Uuid::parse_str(&user_uuid)?;
+
+        let sync_result;
+        {
+            let mut db_tx = self.sqlite.transaction()?;
+            sync_result = Syncer::flow(&mut db_tx, server_uri, &uuid)?;
+
+            // TODO this should be done _after_ all of the operations below conclude!
+            // Commits any changes Syncer made (schema, updated heads, tx->uuid mappings created during an upload, etc.)
+            db_tx.commit()?;
+        }
+
+        // TODO these operations need to borrow self as mutable; but we already borrow it for db_tx above,
+        // and so for now we split up sync into multiple db transactions /o\
+        // Fixing this likely involves either implementing flow on InProgress, or changing flow to
+        // take an InProgress instead of a raw sql transaction.
+
+        match sync_result {
+            SyncResult::EmptyServer => Ok(()),
+            SyncResult::NoChanges => Ok(()),
+            SyncResult::ServerFastForward => Ok(()),
+            SyncResult::Merge => bail!(ErrorKind::NotYetImplemented(
+                format!("Can't sync against a diverged local store.")
+            )),
+            SyncResult::LocalFastForward(txs) => self.fast_forward_local(txs)
+        }
+    }
+}
diff --git a/tests/tolstoy.rs b/tests/tolstoy.rs
index 788dc442b..cdbeb9ef0 100644
--- a/tests/tolstoy.rs
+++ b/tests/tolstoy.rs
@@ -97,12 +97,12 @@ fn test_reader() {
     let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
     {
         let db_tx = c.transaction().expect("db tx");
-        // Don't inspect the bootstrap transaction, but we'd like to see it's there.
+        // Ensure that the first (bootstrap) transaction is skipped over.
         let mut receiver = TxCountingReceiver::new();
         assert_eq!(false, receiver.is_done);
         Processor::process(&db_tx, None, &mut receiver).expect("processor");
         assert_eq!(true, receiver.is_done);
-        assert_eq!(1, receiver.tx_count);
+        assert_eq!(0, receiver.tx_count);
     }
 
     let ids = conn.transact(&mut c, r#"[
@@ -112,7 +112,7 @@ fn test_reader() {
     ]"#).expect("successful transaction").tempids;
     let numba_entity_id = ids.get("s").unwrap();
 
-    let bootstrap_tx;
+    let first_tx;
     {
         let db_tx = c.transaction().expect("db tx");
         // Expect to see one more transaction of four parts (one for tx datom itself).
         let mut receiver = TestingReceiver::new();
         Processor::process(&db_tx, None, &mut receiver).expect("processor");
 
         println!("{:#?}", receiver);
 
-        assert_eq!(2, receiver.txes.keys().count());
-        assert_tx_datoms_count(&receiver, 1, 4);
+        assert_eq!(1, receiver.txes.keys().count());
+        assert_tx_datoms_count(&receiver, 0, 4);
 
-        bootstrap_tx = Some(*receiver.txes.keys().nth(0).expect("bootstrap tx"));
+        first_tx = Some(*receiver.txes.keys().nth(0).expect("first tx"));
     }
 
     let ids = conn.transact(&mut c, r#"[
@@ -138,14 +138,14 @@ fn test_reader() {
 
         // Expect to see a single two part transaction
         let mut receiver = TestingReceiver::new();
-        // Note that we're asking for the bootstrap tx to be skipped by the processor.
-        Processor::process(&db_tx, bootstrap_tx, &mut receiver).expect("processor");
+        // Note that we're asking for the first transacted tx to be skipped by the processor.
+        Processor::process(&db_tx, first_tx, &mut receiver).expect("processor");
 
-        assert_eq!(2, receiver.txes.keys().count());
-        assert_tx_datoms_count(&receiver, 1, 2);
+        assert_eq!(1, receiver.txes.keys().count());
+        assert_tx_datoms_count(&receiver, 0, 2);
 
         // Inspect the transaction part.
-        let tx_id = receiver.txes.keys().nth(1).expect("tx");
+        let tx_id = receiver.txes.keys().nth(0).expect("tx");
         let datoms = receiver.txes.get(tx_id).expect("datoms");
         let part = datoms.iter().find(|&part| &part.e == asserted_e).expect("to find asserted datom");
diff --git a/tolstoy/src/errors.rs b/tolstoy/src/errors.rs
index de5dfdba0..33f2b8232 100644
--- a/tolstoy/src/errors.rs
+++ b/tolstoy/src/errors.rs
@@ -49,11 +49,6 @@ error_chain! {
{ display("encountered unexpected state: {}", t) } - NotYetImplemented(t: String) { - description("not yet implemented") - display("not yet implemented: {}", t) - } - DuplicateMetadata(k: String) { description("encountered more than one metadata value for key") display("encountered more than one metadata value for key: {}", k) diff --git a/tolstoy/src/lib.rs b/tolstoy/src/lib.rs index 51306f2bc..e9f699ca4 100644 --- a/tolstoy/src/lib.rs +++ b/tolstoy/src/lib.rs @@ -33,13 +33,17 @@ extern crate mentat_core; extern crate rusqlite; extern crate uuid; +mod remote_client; + pub mod schema; pub mod metadata; pub mod tx_processor; pub mod errors; pub mod syncer; pub mod tx_mapper; +pub use tx_mapper::TxMapper; pub use syncer::Syncer; +pub use metadata::SyncMetadataClient; pub use errors::{ Error, ErrorKind, diff --git a/tolstoy/src/metadata.rs b/tolstoy/src/metadata.rs index b81969649..a52262c9d 100644 --- a/tolstoy/src/metadata.rs +++ b/tolstoy/src/metadata.rs @@ -54,16 +54,16 @@ mod tests { #[test] fn test_get_remote_head_default() { - let mut conn = schema::tests::setup_conn(); - let tx = conn.transaction().expect("db tx"); + let mut conn = schema::tests::setup_conn_bare(); + let tx = schema::tests::setup_tx(&mut conn); assert_eq!(Uuid::nil(), SyncMetadataClient::remote_head(&tx).expect("fetch succeeded")); } #[test] fn test_set_and_get_remote_head() { - let mut conn = schema::tests::setup_conn(); + let mut conn = schema::tests::setup_conn_bare(); + let tx = schema::tests::setup_tx(&mut conn); let uuid = Uuid::new_v4(); - let tx = conn.transaction().expect("db tx"); SyncMetadataClient::set_remote_head(&tx, &uuid).expect("update succeeded"); assert_eq!(uuid, SyncMetadataClient::remote_head(&tx).expect("fetch succeeded")); } diff --git a/tolstoy/src/remote_client.rs b/tolstoy/src/remote_client.rs new file mode 100644 index 000000000..bea79b343 --- /dev/null +++ b/tolstoy/src/remote_client.rs @@ -0,0 +1,314 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +#![allow(dead_code)] + +use std; + +use futures::{future, Future, Stream}; +use hyper; +// TODO: enable TLS support; hurdle is cross-compiling openssl for Android. +// See https://github.com/mozilla/mentat/issues/569 +// use hyper_tls; +use hyper::{ + Method, + Request, + StatusCode, + Error as HyperError +}; +use hyper::header::{ + ContentType, +}; +use serde::{ + Serialize, +}; +// TODO: https://github.com/mozilla/mentat/issues/570 +// use serde_cbor; +use serde_json; +use tokio_core::reactor::Core; +use uuid::Uuid; + +use errors::{ + Result, +}; + +use syncer::{ + // TODO: use `log` crate. 
+    d,
+};
+
+use tx_processor::{
+    TxPart,
+};
+
+#[derive(Serialize,Deserialize)]
+struct SerializedHead {
+    head: Uuid
+}
+
+#[derive(Serialize)]
+struct SerializedTransaction<'a> {
+    parent: &'a Uuid,
+    chunks: &'a Vec<Uuid>
+}
+
+#[derive(Deserialize)]
+struct DeserializableTransaction {
+    parent: Uuid,
+    chunks: Vec<Uuid>,
+    id: Uuid,
+    seq: i64,
+}
+
+#[derive(Deserialize)]
+struct SerializedTransactions {
+    limit: i64,
+    from: Uuid,
+    transactions: Vec<Uuid>,
+}
+
+pub(crate) struct RemoteClient {
+    base_uri: String,
+    user_uuid: Uuid,
+}
+
+impl RemoteClient {
+    pub(crate) fn new(base_uri: String, user_uuid: Uuid) -> Self {
+        RemoteClient {
+            base_uri: base_uri,
+            user_uuid: user_uuid,
+        }
+    }
+
+    fn bound_base_uri(&self) -> String {
+        // TODO escaping
+        format!("{}/{}", self.base_uri, self.user_uuid)
+    }
+
+    // TODO what we want is a method that returns a deserialized json structure.
+    // It'll need a type T so that consumers can specify what downloaded json will
+    // map to. I ran into borrow issues doing that - probably need to restructure
+    // this and use PhantomData markers or somesuch.
+    // But for now, we get code duplication.
+    pub(crate) fn get_uuid(&self, uri: String) -> Result<Uuid> {
+        let mut core = Core::new()?;
+        // TODO https://github.com/mozilla/mentat/issues/569
+        // let client = hyper::Client::configure()
+        //     .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
+        //     .build(&core.handle());
+        let client = hyper::Client::new(&core.handle());
+
+        d(&format!("client"));
+
+        let uri = uri.parse()?;
+
+        d(&format!("parsed uri {:?}", uri));
+
+        let work = client.get(uri).and_then(|res| {
+            println!("Response: {}", res.status());
+
+            res.body().concat2().and_then(move |body| {
+                let json: SerializedHead = serde_json::from_slice(&body).map_err(|e| {
+                    std::io::Error::new(std::io::ErrorKind::Other, e)
+                })?;
+                Ok(json)
+            })
+        });
+
+        d(&format!("running..."));
+
+        let head_json = core.run(work)?;
+        d(&format!("got head: {:?}", &head_json.head));
+        Ok(head_json.head)
+    }
+
+    pub(crate) fn put<T>(&self, uri: String, payload: T, expected: StatusCode) -> Result<()>
+        where hyper::Body: std::convert::From<T>, {
+        let mut core = Core::new()?;
+        // TODO https://github.com/mozilla/mentat/issues/569
+        // let client = hyper::Client::configure()
+        //     .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
+        //     .build(&core.handle());
+        let client = hyper::Client::new(&core.handle());
+
+        let uri = uri.parse()?;
+
+        d(&format!("PUT {:?}", uri));
+
+        let mut req = Request::new(Method::Put, uri);
+        req.headers_mut().set(ContentType::json());
+        req.set_body(payload);
+
+        let put = client.request(req).and_then(|res| {
+            let status_code = res.status();
+
+            if status_code != expected {
+                d(&format!("bad put response: {:?}", status_code));
+                future::err(HyperError::Status)
+            } else {
+                future::ok(())
+            }
+        });
+
+        core.run(put)?;
+        Ok(())
+    }
+
+    pub(crate) fn get_transactions(&self, parent_uuid: &Uuid) -> Result<Vec<Uuid>> {
+        let mut core = Core::new()?;
+        // TODO https://github.com/mozilla/mentat/issues/569
+        // let client = hyper::Client::configure()
+        //     .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
+        //     .build(&core.handle());
+        let client = hyper::Client::new(&core.handle());
+
+        d(&format!("client"));
+
+        let uri = format!("{}/transactions?from={}", self.bound_base_uri(), parent_uuid);
+        let uri = uri.parse()?;
+
+        d(&format!("parsed uri {:?}", uri));
+
+        let work = client.get(uri).and_then(|res| {
+            println!("Response: {}", res.status());
+
+            res.body().concat2().and_then(move |body| {
+                let json: SerializedTransactions = serde_json::from_slice(&body).map_err(|e| {
+                    std::io::Error::new(std::io::ErrorKind::Other, e)
+                })?;
+                Ok(json)
+            })
+        });
+
+        d(&format!("running..."));
+
+        let transactions_json = core.run(work)?;
+        d(&format!("got transactions: {:?}", &transactions_json.transactions));
+        Ok(transactions_json.transactions)
+    }
+
+    pub(crate) fn get_chunks(&self, transaction_uuid: &Uuid) -> Result<Vec<Uuid>> {
+        let mut core = Core::new()?;
+        // TODO https://github.com/mozilla/mentat/issues/569
+        // let client = hyper::Client::configure()
+        //     .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
+        //     .build(&core.handle());
+        let client = hyper::Client::new(&core.handle());
+
+        d(&format!("client"));
+
+        let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid);
+        let uri = uri.parse()?;
+
+        d(&format!("parsed uri {:?}", uri));
+
+        let work = client.get(uri).and_then(|res| {
+            println!("Response: {}", res.status());
+
+            res.body().concat2().and_then(move |body| {
+                let json: DeserializableTransaction = serde_json::from_slice(&body).map_err(|e| {
+                    std::io::Error::new(std::io::ErrorKind::Other, e)
+                })?;
+                Ok(json)
+            })
+        });
+
+        d(&format!("running..."));
+
+        let transaction_json = core.run(work)?;
+        d(&format!("got transaction chunks: {:?}", &transaction_json.chunks));
+        Ok(transaction_json.chunks)
+    }
+
+    pub(crate) fn get_chunk(&self, chunk_uuid: &Uuid) -> Result<TxPart> {
+        let mut core = Core::new()?;
+        // TODO https://github.com/mozilla/mentat/issues/569
+        // let client = hyper::Client::configure()
+        //     .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
+        //     .build(&core.handle());
+        let client = hyper::Client::new(&core.handle());
+
+        d(&format!("client"));
+
+        let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid);
+        let uri = uri.parse()?;
+
+        d(&format!("parsed uri {:?}", uri));
+
+        let work = client.get(uri).and_then(|res| {
+            println!("Response: {}", res.status());
+
+            res.body().concat2().and_then(move |body| {
+                let json: TxPart = serde_json::from_slice(&body).map_err(|e| {
+                    std::io::Error::new(std::io::ErrorKind::Other, e)
+                })?;
+                Ok(json)
+            })
+        });
+
+        d(&format!("running..."));
+
+        let chunk = core.run(work)?;
+        d(&format!("got transaction chunk: {:?}", &chunk));
+        Ok(chunk)
+    }
+
+    pub(crate) fn put_transaction(&self, transaction_uuid: &Uuid, parent_uuid: &Uuid, chunks: &Vec<Uuid>) -> Result<()> {
+        // {"parent": uuid, "chunks": [chunk1, chunk2...]}
+        let transaction = SerializedTransaction {
+            parent: parent_uuid,
+            chunks: chunks
+        };
+
+        let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid);
+        let json = serde_json::to_string(&transaction)?;
+        d(&format!("serialized transaction: {:?}", json));
+        self.put(uri, json, StatusCode::Created)
+    }
+
+    pub(crate) fn get_head(&self) -> Result<Uuid> {
+        let uri = format!("{}/head", self.bound_base_uri());
+        self.get_uuid(uri)
+    }
+
+    pub(crate) fn put_head(&self, uuid: &Uuid) -> Result<()> {
+        // {"head": uuid}
+        let head = SerializedHead {
+            head: uuid.clone()
+        };
+
+        let uri = format!("{}/head", self.bound_base_uri());
+        let json = serde_json::to_string(&head)?;
+        d(&format!("serialized head: {:?}", json));
+        self.put(uri, json, StatusCode::NoContent)
+    }
+
+    pub(crate) fn put_chunk<T>(&self, chunk_uuid: &Uuid, payload: &T) -> Result<()> where T: Serialize {
+        let payload: String = serde_json::to_string(payload)?;
+        let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid);
d(&format!("serialized chunk: {:?}", payload)); + // TODO don't want to clone every datom! + self.put(uri, payload, StatusCode::Created) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::str::FromStr; + + #[test] + fn test_remote_client_bound_uri() { + let user_uuid = Uuid::from_str(&"316ea470-ce35-4adf-9c61-e0de6e289c59").expect("uuid"); + let server_uri = String::from("https://example.com/api/0.1"); + let remote_client = RemoteClient::new(server_uri, user_uuid); + assert_eq!("https://example.com/api/0.1/316ea470-ce35-4adf-9c61-e0de6e289c59", remote_client.bound_base_uri()); + } +} diff --git a/tolstoy/src/schema.rs b/tolstoy/src/schema.rs index 9bd175ff6..6f0488a04 100644 --- a/tolstoy/src/schema.rs +++ b/tolstoy/src/schema.rs @@ -24,15 +24,13 @@ lazy_static! { }; } -pub fn ensure_current_version(conn: &mut rusqlite::Connection) -> Result<()> { - let tx = conn.transaction()?; - +pub fn ensure_current_version(tx: &mut rusqlite::Transaction) -> Result<()> { for statement in (&SCHEMA_STATEMENTS).iter() { tx.execute(statement, &[])?; } tx.execute("INSERT OR IGNORE INTO tolstoy_metadata (key, value) VALUES (?, zeroblob(16))", &[&REMOTE_HEAD_KEY])?; - tx.commit().map_err(|e| e.into()) + Ok(()) } #[cfg(test)] @@ -40,7 +38,7 @@ pub mod tests { use super::*; use uuid::Uuid; - fn setup_conn_bare() -> rusqlite::Connection { + pub fn setup_conn_bare() -> rusqlite::Connection { let conn = rusqlite::Connection::open_in_memory().unwrap(); conn.execute_batch(" @@ -54,19 +52,24 @@ pub mod tests { conn } - pub fn setup_conn() -> rusqlite::Connection { - let mut conn = setup_conn_bare(); - ensure_current_version(&mut conn).expect("connection setup"); - conn + pub fn setup_tx_bare<'a>(conn: &'a mut rusqlite::Connection) -> rusqlite::Transaction<'a> { + conn.transaction().expect("tx") + } + + pub fn setup_tx<'a>(conn: &'a mut rusqlite::Connection) -> rusqlite::Transaction<'a> { + let mut tx = conn.transaction().expect("tx"); + ensure_current_version(&mut tx).expect("connection setup"); + tx } #[test] fn test_empty() { let mut conn = setup_conn_bare(); + let mut tx = setup_tx_bare(&mut conn); - assert!(ensure_current_version(&mut conn).is_ok()); + assert!(ensure_current_version(&mut tx).is_ok()); - let mut stmt = conn.prepare("SELECT key FROM tolstoy_metadata WHERE value = zeroblob(16)").unwrap(); + let mut stmt = tx.prepare("SELECT key FROM tolstoy_metadata WHERE value = zeroblob(16)").unwrap(); let mut keys_iter = stmt.query_map(&[], |r| r.get(0)).expect("query works"); let first: Result = keys_iter.next().unwrap().map_err(|e| e.into()); @@ -82,27 +85,23 @@ pub mod tests { #[test] fn test_non_empty() { let mut conn = setup_conn_bare(); + let mut tx = setup_tx_bare(&mut conn); - assert!(ensure_current_version(&mut conn).is_ok()); + assert!(ensure_current_version(&mut tx).is_ok()); let test_uuid = Uuid::new_v4(); { - let tx = conn.transaction().unwrap(); let uuid_bytes = test_uuid.as_bytes().to_vec(); match tx.execute("UPDATE tolstoy_metadata SET value = ? WHERE key = ?", &[&uuid_bytes, &REMOTE_HEAD_KEY]) { Err(e) => panic!("Error running an update: {}", e), _ => () } - match tx.commit() { - Err(e) => panic!("Error committing an update: {}", e), - _ => () - } } - assert!(ensure_current_version(&mut conn).is_ok()); + assert!(ensure_current_version(&mut tx).is_ok()); // Check that running ensure_current_version on an initialized conn doesn't change anything. 
-        let mut stmt = conn.prepare("SELECT value FROM tolstoy_metadata").unwrap();
+        let mut stmt = tx.prepare("SELECT value FROM tolstoy_metadata").unwrap();
         let mut values_iter = stmt.query_map(&[], |r| {
             let raw_uuid: Vec<u8> = r.get(0);
             Uuid::from_bytes(raw_uuid.as_slice()).unwrap()
diff --git a/tolstoy/src/syncer.rs b/tolstoy/src/syncer.rs
index c4a77515c..f9b7e169f 100644
--- a/tolstoy/src/syncer.rs
+++ b/tolstoy/src/syncer.rs
@@ -8,21 +8,9 @@
 // CONDITIONS OF ANY KIND, either express or implied. See the License for the
 // specific language governing permissions and limitations under the License.
 
-use std;
 use std::collections::HashMap;
 
-use futures::{future, Future, Stream};
-use hyper;
-// TODO: enable TLS support; hurdle is cross-compiling openssl for Android.
-// See https://github.com/mozilla/mentat/issues/569
-// use hyper_tls;
-use hyper::{Method, Request, StatusCode, Error as HyperError};
-use hyper::header::{ContentType};
 use rusqlite;
-// TODO: https://github.com/mozilla/mentat/issues/570
-// use serde_cbor;
-use serde_json;
-use tokio_core::reactor::Core;
 use uuid::Uuid;
 
 use mentat_core::Entid;
@@ -35,6 +23,10 @@ use errors::{
     Result,
 };
 
+use remote_client::{
+    RemoteClient,
+};
+
 use tx_processor::{
     Processor,
     TxReceiver,
@@ -142,7 +134,7 @@ impl<'c> TxReceiver for UploadingTxReceiver<'c> {
             // See https://github.com/mozilla/mentat/issues/570
             // let cbor_val = serde_cbor::to_value(&datom)?;
             // self.remote_client.put_chunk(&datom_uuid, &serde_cbor::ser::to_vec_sd(&cbor_val)?)?;
-            self.remote_client.put_chunk(&datom_uuid, &serde_json::to_string(&datom)?)?;
+            self.remote_client.put_chunk(&datom_uuid, &datom)?;
         }
 
         // Upload tx.
@@ -174,8 +166,23 @@ impl<'c> TxReceiver for UploadingTxReceiver<'c> {
     }
 }
 
+// For returning out of the downloader as an ordered list.
+#[derive(Debug)]
+pub struct Tx {
+    pub tx: Uuid,
+    pub parts: Vec<TxPart>,
+}
+
+pub enum SyncResult {
+    EmptyServer,
+    NoChanges,
+    ServerFastForward,
+    LocalFastForward(Vec<Tx>),
+    Merge,
+}
+
 impl Syncer {
-    fn upload_ours(db_tx: &mut rusqlite::Transaction, from_tx: Option<Entid>, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<()> {
+    fn fast_forward_server(db_tx: &mut rusqlite::Transaction, from_tx: Option<Entid>, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<()> {
         let mut uploader = UploadingTxReceiver::new(remote_client, remote_head);
         Processor::process(db_tx, from_tx, &mut uploader)?;
         if !uploader.is_done {
@@ -197,19 +204,45 @@ impl Syncer {
         Ok(())
     }
 
-    pub fn flow(sqlite: &mut rusqlite::Connection, server_uri: &String, user_uuid: &Uuid) -> Result<()> {
+    fn download_theirs(_db_tx: &mut rusqlite::Transaction, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<Vec<Tx>> {
+        let new_txs = remote_client.get_transactions(remote_head)?;
+        let mut tx_list = Vec::new();
+
+        for tx in new_txs {
+            let mut tx_parts = Vec::new();
+            let chunks = remote_client.get_chunks(&tx)?;
+
+            // We pass along all of the downloaded parts, including the transaction's
+            // metadata datom. The transactor is expected to do the right thing, and
+            // use the txInstant from one of our datoms.
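+            // (Editor's note: the metadata datom is the TxPart whose `e` equals
+            // the tx entid itself; that's the datom carrying :db/txInstant.)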
+            for chunk in chunks {
+                let part = remote_client.get_chunk(&chunk)?;
+                tx_parts.push(part);
+            }
+
+            tx_list.push(Tx {
+                tx: tx,
+                parts: tx_parts
+            });
+        }
+
+        d(&format!("got tx list: {:?}", &tx_list));
+
+        Ok(tx_list)
+    }
+
+    pub fn flow(db_tx: &mut rusqlite::Transaction, server_uri: &String, user_uuid: &Uuid) -> Result<SyncResult> {
         d(&format!("sync flowing"));
 
-        ensure_current_version(sqlite)?;
+        ensure_current_version(db_tx)?;
 
         // TODO configure this sync with some auth data
         let remote_client = RemoteClient::new(server_uri.clone(), user_uuid.clone());
-        let mut db_tx = sqlite.transaction()?;
 
         let remote_head = remote_client.get_head()?;
         d(&format!("remote head {:?}", remote_head));
 
-        let locally_known_remote_head = SyncMetadataClient::remote_head(&db_tx)?;
+        let locally_known_remote_head = SyncMetadataClient::remote_head(db_tx)?;
         d(&format!("local head {:?}", locally_known_remote_head));
 
         // Local head: latest transaction that we have in the store,
@@ -220,24 +253,25 @@
         let mut inquiring_tx_receiver = InquiringTxReceiver::new();
         // TODO don't just start from the beginning... but then again, we should do this
         // without walking the table at all, and use the tx index.
-        Processor::process(&db_tx, None, &mut inquiring_tx_receiver)?;
+        Processor::process(db_tx, None, &mut inquiring_tx_receiver)?;
         if !inquiring_tx_receiver.is_done {
             bail!(ErrorKind::TxProcessorUnfinished);
         }
-        let have_local_changes = match inquiring_tx_receiver.last_tx {
+        let (have_local_changes, local_store_empty) = match inquiring_tx_receiver.last_tx {
             Some(tx) => {
-                match TxMapper::get(&db_tx, tx)? {
-                    Some(_) => false,
-                    None => true
+                match TxMapper::get(db_tx, tx)? {
+                    Some(_) => (false, false),
+                    None => (true, false)
                 }
             },
-            None => false
+            None => (false, true)
         };
 
         // Check if the server is empty - populate it.
         if remote_head == Uuid::nil() {
             d(&format!("empty server!"));
-            Syncer::upload_ours(&mut db_tx, None, &remote_client, &remote_head)?;
+            Syncer::fast_forward_server(db_tx, None, &remote_client, &remote_head)?;
+            return Ok(SyncResult::EmptyServer);
 
         // Check if the server is the same as us, and if our HEAD moved.
         } else if locally_known_remote_head == remote_head {
@@ -245,181 +279,38 @@
 
             if !have_local_changes {
                 d(&format!("local HEAD did not move. Nothing to do!"));
-                return Ok(());
+                return Ok(SyncResult::NoChanges);
             }
 
             d(&format!("local HEAD moved."));
             // TODO it's possible that we've successfully advanced remote head previously,
             // but failed to advance our own local head. If that's the case, and we can recognize it,
             // our sync becomes just bumping our local head. AFAICT below would currently fail.
-            if let Some(upload_from_tx) = TxMapper::get_tx_for_uuid(&db_tx, &locally_known_remote_head)? {
+            if let Some(upload_from_tx) = TxMapper::get_tx_for_uuid(db_tx, &locally_known_remote_head)? {
                 d(&format!("Fast-forwarding the server."));
-                Syncer::upload_ours(&mut db_tx, Some(upload_from_tx), &remote_client, &remote_head)?;
+                Syncer::fast_forward_server(db_tx, Some(upload_from_tx), &remote_client, &remote_head)?;
+                return Ok(SyncResult::ServerFastForward);
             } else {
                 d(&format!("Unable to fast-forward the server; missing local tx mapping"));
                 bail!(ErrorKind::TxIncorrectlyMapped(0));
             }
 
-        // We diverged from the server.
-        // We'll need to rebase/merge ourselves on top of it.
+        // We diverged from the server. If we're lucky, we can just fast-forward the local store.
+        // Otherwise, a merge (or a rebase) is required.
         } else {
             d(&format!("server changed since last sync."));
 
-            bail!(ErrorKind::NotYetImplemented(
-                format!("Can't yet sync against changed server. Local head {:?}, remote head {:?}", locally_known_remote_head, remote_head)
-            ));
-        }
-
-        // Commit everything, if there's anything to commit!
-        // Any new tx->uuid mappings and the new HEAD. We're synced!
-        db_tx.commit()?;
-
-        Ok(())
-    }
-}
-
-#[derive(Serialize,Deserialize)]
-struct SerializedHead {
-    head: Uuid
-}
-
-#[derive(Serialize)]
-struct SerializedTransaction<'a> {
-    parent: &'a Uuid,
-    chunks: &'a Vec<Uuid>
-}
-
-struct RemoteClient {
-    base_uri: String,
-    user_uuid: Uuid
-}
-
-
-impl RemoteClient {
-    fn new(base_uri: String, user_uuid: Uuid) -> Self {
-        RemoteClient {
-            base_uri: base_uri,
-            user_uuid: user_uuid
-        }
-    }
-
-    fn bound_base_uri(&self) -> String {
-        // TODO escaping
-        format!("{}/{}", self.base_uri, self.user_uuid)
-    }
-
-    fn get_uuid(&self, uri: String) -> Result<Uuid> {
-        let mut core = Core::new()?;
-        // TODO enable TLS, see https://github.com/mozilla/mentat/issues/569
-        // let client = hyper::Client::configure()
-        //     .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
-        //     .build(&core.handle());
-        let client = hyper::Client::new(&core.handle());
-
-        d(&format!("client"));
-
-        let uri = uri.parse()?;
-
-        d(&format!("parsed uri {:?}", uri));
-
-        let work = client.get(uri).and_then(|res| {
-            println!("Response: {}", res.status());
-
-            res.body().concat2().and_then(move |body| {
-                let head_json: SerializedHead = serde_json::from_slice(&body).map_err(|e| {
-                    std::io::Error::new(std::io::ErrorKind::Other, e)
-                })?;
-                Ok(head_json)
-            })
-        });
-
-        d(&format!("running..."));
-
-        let head_json = core.run(work)?;
-        d(&format!("got head: {:?}", &head_json.head));
-        Ok(head_json.head)
-    }
-
-    fn put<T>(&self, uri: String, payload: T, expected: StatusCode) -> Result<()>
-        where hyper::Body: std::convert::From<T>, {
-        let mut core = Core::new()?;
-        // TODO enable TLS, see https://github.com/mozilla/mentat/issues/569
-        // let client = hyper::Client::configure()
-        //     .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
-        //     .build(&core.handle());
-        let client = hyper::Client::new(&core.handle());
-
-        let uri = uri.parse()?;
-
-        d(&format!("PUT {:?}", uri));
-
-        let mut req = Request::new(Method::Put, uri);
-        req.headers_mut().set(ContentType::json());
-        req.set_body(payload);
-
-        let put = client.request(req).and_then(|res| {
-            let status_code = res.status();
-
-            if status_code != expected {
-                d(&format!("bad put response: {:?}", status_code));
-                future::err(HyperError::Status)
-            } else {
-                future::ok(())
+            // TODO local store moved forward since we last synced. Need to merge or rebase.
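+            // (Editor's note: of the diverged cases, only an empty-or-unchanged
+            // local store is handled below, by replaying the server's new
+            // transactions; genuine local novelty atop a changed server
+            // surfaces as SyncResult::Merge.)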
+            if !local_store_empty && have_local_changes {
+                return Ok(SyncResult::Merge);
             }
-        });
-
-        core.run(put)?;
-        Ok(())
-    }
-
-    fn put_transaction(&self, transaction_uuid: &Uuid, parent_uuid: &Uuid, chunks: &Vec<Uuid>) -> Result<()> {
-        // {"parent": uuid, "chunks": [chunk1, chunk2...]}
-        let transaction = SerializedTransaction {
-            parent: parent_uuid,
-            chunks: chunks
-        };
-
-        let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid);
-        let json = serde_json::to_string(&transaction)?;
-        d(&format!("serialized transaction: {:?}", json));
-        self.put(uri, json, StatusCode::Created)
-    }
-
-    fn get_head(&self) -> Result<Uuid> {
-        let uri = format!("{}/head", self.bound_base_uri());
-        self.get_uuid(uri)
-    }
-
-    fn put_head(&self, uuid: &Uuid) -> Result<()> {
-        // {"head": uuid}
-        let head = SerializedHead {
-            head: uuid.clone()
-        };
-
-        let uri = format!("{}/head", self.bound_base_uri());
-        let json = serde_json::to_string(&head)?;
-        d(&format!("serialized head: {:?}", json));
-        self.put(uri, json, StatusCode::NoContent)
-    }
-
-    fn put_chunk(&self, chunk_uuid: &Uuid, payload: &String) -> Result<()> {
-        let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid);
-        d(&format!("serialized chunk: {:?}", payload));
-        // TODO don't want to clone every datom!
-        self.put(uri, payload.clone(), StatusCode::Created)
-    }
-}
+            d(&format!("fast-forwarding local store."));
+            return Ok(SyncResult::LocalFastForward(
+                Syncer::download_theirs(db_tx, &remote_client, &locally_known_remote_head)?
+            ));
+        }
 
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use std::str::FromStr;
-
-    #[test]
-    fn test_remote_client_bound_uri() {
-        let user_uuid = Uuid::from_str(&"316ea470-ce35-4adf-9c61-e0de6e289c59").expect("uuid");
-        let server_uri = String::from("https://example.com/api/0.1");
-        let remote_client = RemoteClient::new(server_uri, user_uuid);
-        assert_eq!("https://example.com/api/0.1/316ea470-ce35-4adf-9c61-e0de6e289c59", remote_client.bound_base_uri());
-    }
+        // Our caller will commit the tx with our changes when it's done.
     }
 }
diff --git a/tolstoy/src/tx_mapper.rs b/tolstoy/src/tx_mapper.rs
index a5a84405e..dd000b914 100644
--- a/tolstoy/src/tx_mapper.rs
+++ b/tolstoy/src/tx_mapper.rs
@@ -33,6 +33,13 @@ impl TxMapper {
         Ok(())
     }
 
+    // TODO upsert...? error checking...?
+    pub fn set_tx_uuid(db_tx: &mut rusqlite::Transaction, tx: Entid, uuid: &Uuid) -> Result<()> {
+        let uuid_bytes = uuid.as_bytes().to_vec();
+        db_tx.execute("INSERT INTO tolstoy_tu (tx, uuid) VALUES (?, ?)", &[&tx, &uuid_bytes])?;
+        Ok(())
+    }
+
     // TODO for when we're downloading, right?
     pub fn get_or_set_uuid_for_tx(db_tx: &mut rusqlite::Transaction, tx: Entid) -> Result<Uuid> {
         match TxMapper::get(db_tx, tx)? {
@@ -92,8 +99,8 @@ pub mod tests {
 
     #[test]
     fn test_getters() {
-        let mut conn = schema::tests::setup_conn();
-        let mut tx = conn.transaction().expect("db tx");
+        let mut conn = schema::tests::setup_conn_bare();
+        let mut tx = schema::tests::setup_tx(&mut conn);
         assert_eq!(None, TxMapper::get(&mut tx, 1).expect("success"));
         let set_uuid = TxMapper::get_or_set_uuid_for_tx(&mut tx, 1).expect("success");
         assert_eq!(Some(set_uuid), TxMapper::get(&mut tx, 1).expect("success"));
@@ -101,8 +108,8 @@ pub mod tests {
 
     #[test]
     fn test_bulk_setter() {
-        let mut conn = schema::tests::setup_conn();
-        let mut tx = conn.transaction().expect("db tx");
+        let mut conn = schema::tests::setup_conn_bare();
+        let mut tx = schema::tests::setup_tx(&mut conn);
         let mut map = HashMap::new();
 
         TxMapper::set_bulk(&mut tx, &map).expect("empty map success");
diff --git a/tolstoy/src/tx_processor.rs b/tolstoy/src/tx_processor.rs
index ebc12caed..75318b788 100644
--- a/tolstoy/src/tx_processor.rs
+++ b/tolstoy/src/tx_processor.rs
@@ -130,14 +130,20 @@ impl Processor {
     pub fn process<R>(sqlite: &rusqlite::Transaction, from_tx: Option<Entid>, receiver: &mut R) -> Result<()>
     where R: TxReceiver {
         let tx_filter = match from_tx {
-            Some(tx) => format!(" WHERE tx > {} ", tx),
+            Some(tx) => format!(" WHERE tx > {}", tx),
             None => format!("")
         };
-        let select_query = format!("SELECT e, a, v, value_type_tag, tx, added FROM transactions {} ORDER BY tx", tx_filter);
+
+        let select_query = format!("SELECT e, a, v, value_type_tag, tx, added FROM transactions{} ORDER BY tx", tx_filter);
         let mut stmt = sqlite.prepare(&select_query)?;
 
         let mut rows = stmt.query_and_then(&[], to_tx_part)?.peekable();
+
+        // If no starting tx is provided, get everything but skip over the first (bootstrap) transaction.
+        let skip_first_tx = from_tx.is_none();
+        let mut at_first_tx = true;
 
         let mut current_tx = None;
+
         while let Some(row) = rows.next() {
             let datom = row?;
 
@@ -153,6 +159,10 @@ impl Processor {
                 },
                 None => {
                     current_tx = Some(datom.tx);
+                    if at_first_tx && skip_first_tx {
+                        at_first_tx = false;
+                        continue;
+                    }
                     receiver.tx(
                         datom.tx,
                         &mut DatomsIterator::new(&datom, &mut rows)
diff --git a/tools/cli/src/mentat_cli/command_parser.rs b/tools/cli/src/mentat_cli/command_parser.rs
index 9eaa40a0a..eb2cefc38 100644
--- a/tools/cli/src/mentat_cli/command_parser.rs
+++ b/tools/cli/src/mentat_cli/command_parser.rs
@@ -46,7 +46,6 @@ pub static COMMAND_HELP: &'static str = &"help";
 pub static COMMAND_IMPORT_LONG: &'static str = &"import";
 pub static COMMAND_IMPORT_SHORT: &'static str = &"i";
 pub static COMMAND_OPEN: &'static str = &"open";
-pub static COMMAND_OPEN_EMPTY: &'static str = &"empty";
 pub static COMMAND_QUERY_LONG: &'static str = &"query";
 pub static COMMAND_QUERY_SHORT: &'static str = &"q";
 pub static COMMAND_QUERY_EXPLAIN_LONG: &'static str = &"explain_query";
@@ -66,7 +65,6 @@ pub enum Command {
     Help(Vec<String>),
     Import(String),
     Open(String),
-    OpenEmpty(String),
     Query(String),
     QueryExplain(String),
     QueryPrepared(String),
@@ -96,7 +94,6 @@ impl Command {
             &Command::Help(_) |
             &Command::Import(_) |
             &Command::Open(_) |
-            &Command::OpenEmpty(_) |
             &Command::Timer(_) |
             &Command::Schema |
             &Command::Sync(_)
@@ -117,7 +114,6 @@ impl Command {
             &Command::Exit |
             &Command::Help(_) |
             &Command::Open(_) |
-            &Command::OpenEmpty(_) |
             &Command::QueryExplain(_) |
             &Command::Timer(_) |
             &Command::Schema |
@@ -146,9 +142,6 @@ impl Command {
             &Command::Open(ref args) => {
                 format!(".{} {}", COMMAND_OPEN, args)
             },
-            &Command::OpenEmpty(ref args) => {
-                format!(".{} {}", COMMAND_OPEN_EMPTY, args)
-            },
             &Command::Query(ref args) => {
                 format!(".{} {}", COMMAND_QUERY_LONG, args)
             },
@@ -259,19 +252,6 @@ pub fn command(s: &str) -> Result<Command> {
             Ok(Command::Open(args[0].clone()))
         });
 
-    let open_empty_parser = string(COMMAND_OPEN_EMPTY)
-        .with(spaces())
-        .with(arguments())
-        .map(|args| {
-            if args.len() < 1 {
-                bail!(cli::ErrorKind::CommandParse("Missing required argument".to_string()));
-            }
-            if args.len() > 1 {
-                bail!(cli::ErrorKind::CommandParse(format!("Unrecognized argument {:?}", args[1])));
-            }
-            Ok(Command::OpenEmpty(args[0].clone()))
-        });
-
     let query_parser = try(string(COMMAND_QUERY_LONG)).or(try(string(COMMAND_QUERY_SHORT)))
         .with(edn_arg_parser())
         .map(|x| {
@@ -321,13 +301,12 @@ pub fn command(s: &str) -> Result<Command> {
 
     spaces()
     .skip(token('.'))
-    .with(choice::<[&mut Parser<Input = &'a str, Output = Result<Command>>; 14], _>
+    .with(choice::<[&mut Parser<Input = &'a str, Output = Result<Command>>; 13], _>
         ([&mut try(help_parser),
           &mut try(import_parser),
          &mut try(timer_parser),
          &mut try(cache_parser),
          &mut try(open_parser),
-         &mut try(open_empty_parser),
          &mut try(close_parser),
          &mut try(explain_query_parser),
          &mut try(exit_parser),
diff --git a/tools/cli/src/mentat_cli/repl.rs b/tools/cli/src/mentat_cli/repl.rs
index f6a47c09d..be460fb7d 100644
--- a/tools/cli/src/mentat_cli/repl.rs
+++ b/tools/cli/src/mentat_cli/repl.rs
@@ -261,12 +261,6 @@ impl Repl {
                     Err(e) => eprintln!("{}", e.to_string()),
                 };
             },
-            Command::OpenEmpty(db) => {
-                match self.open_empty(db) {
-                    Ok(_) => println!("Empty database {:?} opened", self.db_name()),
-                    Err(e) => eprintln!("{}", e.to_string()),
-                };
-            },
             Command::Query(query) => {
                 self.store
                     .q_once(query.as_str(), None)
@@ -357,18 +351,6 @@ impl Repl {
         Ok(())
     }
 
-    fn open_empty<T>(&mut self, path: T) -> ::mentat::errors::Result<()>
-        where T: Into<String> {
-        let path = path.into();
-        if self.path.is_empty() || path != self.path {
-            let next = Store::open_empty(path.as_str())?;
-            self.path = path;
-            self.store = next;
-        }
-
-        Ok(())
-    }
-
     // Close the current store by opening a new in-memory store in its place.
     fn close(&mut self) {
        let old_db_name = self.db_name();
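
A quick consumer-side sketch of the API this patch introduces (not part of the diff itself). It assumes a build with the new default features, which now include `syncable`, plus a reachable Tolstoy server; the URI, UUID, and database path below are placeholders.

```rust
extern crate mentat;

use mentat::{Store, Syncable};

fn main() {
    // Open (or create) a store on disk.
    let mut store = Store::open("/tmp/sync-demo.db").expect("store");

    // Placeholders; point these at a real Tolstoy server and user.
    let server_uri = String::from("https://mentat.example.com/api/0.1");
    let user_uuid = String::from("316ea470-ce35-4adf-9c61-e0de6e289c59");

    // `sync` fast-forwards an empty or unchanged server from local
    // transactions, replays new server transactions into the local store
    // (advancing the user partition past incoming entids), and fails with
    // NotYetImplemented when the two sides have truly diverged.
    match store.sync(&server_uri, &user_uuid) {
        Ok(()) => println!("synced"),
        Err(e) => eprintln!("sync failed: {}", e),
    }
}
```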