Skip to content
This repository was archived by the owner on Sep 12, 2018. It is now read-only.

Commit c42d100

Browse files
author
Grisha Kruglov
committed
Replication syncing
1 parent 5e50d2a commit c42d100

12 files changed

Lines changed: 352 additions & 84 deletions

File tree

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ version = "0.6.1"
1616
build = "build/version.rs"
1717

1818
[features]
19-
default = ["bundled_sqlite3"]
19+
default = ["bundled_sqlite3", "syncable"]
2020
bundled_sqlite3 = ["rusqlite/bundled"]
21+
syncable = ["mentat_tolstoy"]
2122

2223
[workspace]
2324
members = ["tools/cli"]
@@ -78,6 +79,7 @@ path = "tx-parser"
7879

7980
[dependencies.mentat_tolstoy]
8081
path = "tolstoy"
82+
optional = true
8183

8284
[profile.release]
8385
debug = true

src/conn.rs

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,6 @@ use mentat_tx::entities::TempId;
5959

6060
use mentat_tx_parser;
6161

62-
use mentat_tolstoy::Syncer;
63-
64-
use uuid::Uuid;
65-
6662
use entity_builder::{
6763
InProgressBuilder,
6864
};
@@ -129,8 +125,8 @@ pub struct Conn {
129125
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
130126
/// for applications that don't require complex connection management.
131127
pub struct Store {
128+
pub sqlite: rusqlite::Connection,
132129
conn: Conn,
133-
sqlite: rusqlite::Connection,
134130
}
135131

136132
impl Store {
@@ -172,10 +168,6 @@ pub trait Queryable {
172168
where E: Into<Entid>;
173169
}
174170

175-
pub trait Syncable {
176-
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
177-
}
178-
179171
/// Represents an in-progress, not yet committed, set of changes to the store.
180172
/// Call `commit` to commit your changes, or `rollback` to discard them.
181173
/// A transaction is held open until you do so.
@@ -493,13 +485,6 @@ pub enum CacheAction {
493485
Deregister,
494486
}
495487

496-
impl Syncable for Store {
497-
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
498-
let uuid = Uuid::parse_str(&user_uuid)?;
499-
Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?)
500-
}
501-
}
502-
503488
impl Conn {
504489
// Intentionally not public.
505490
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {

src/lib.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@ extern crate mentat_query_parser;
2929
extern crate mentat_query_projector;
3030
extern crate mentat_query_translator;
3131
extern crate mentat_sql;
32-
extern crate mentat_tolstoy;
3332
extern crate mentat_tx;
3433
extern crate mentat_tx_parser;
3534

35+
#[cfg(feature = "syncable")]
36+
extern crate mentat_tolstoy;
37+
3638
pub use mentat_core::{
3739
Attribute,
3840
Entid,
@@ -95,6 +97,13 @@ pub mod conn;
9597
pub mod query;
9698
pub mod entity_builder;
9799

100+
#[cfg(feature = "syncable")]
101+
pub mod sync;
102+
103+
pub fn get_name() -> String {
104+
return String::from("mentat");
105+
}
106+
98107
pub use query::{
99108
IntoResult,
100109
PlainSymbol,
@@ -115,7 +124,6 @@ pub use conn::{
115124
InProgress,
116125
Metadata,
117126
Queryable,
118-
Syncable,
119127
Store,
120128
};
121129

src/sync.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright 2016 Mozilla
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
4+
// this file except in compliance with the License. You may obtain a copy of the
5+
// License at http://www.apache.org/licenses/LICENSE-2.0
6+
// Unless required by applicable law or agreed to in writing, software distributed
7+
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
8+
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
9+
// specific language governing permissions and limitations under the License.
10+
11+
use uuid::Uuid;
12+
13+
use conn::Store;
14+
use errors::Result;
15+
16+
use mentat_core::KnownEntid;
17+
18+
use entity_builder::BuildTerms;
19+
20+
use mentat_tolstoy::{
21+
Syncer,
22+
SyncMetadataClient,
23+
TxMapper,
24+
};
25+
use mentat_tolstoy::metadata::HeadTrackable;
26+
27+
pub trait Syncable {
28+
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
29+
}
30+
31+
impl Syncable for Store {
32+
// TODO refactor this out of there, somehow!
33+
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
34+
let uuid = Uuid::parse_str(&user_uuid)?;
35+
let mut tx_list = None;
36+
{
37+
let mut db_tx = self.sqlite.transaction()?;
38+
if let Some(list) = Syncer::flow(&mut db_tx, server_uri, &uuid)? {
39+
tx_list = Some(list);
40+
}
41+
42+
// TODO this should be done _after_ any parts below are transacted!!
43+
// Commits any changes Syncer made (tu mappings during an upload!)
44+
db_tx.commit()?;
45+
}
46+
let mut last_tx_entid = None;
47+
let mut last_tx_uuid = None;
48+
49+
match tx_list {
50+
Some(list) => {
51+
for tx in list {
52+
let in_progress = self.begin_transaction()?;
53+
let mut builder = in_progress.builder();
54+
for part in tx.parts {
55+
if part.added {
56+
builder.add(KnownEntid(part.e), KnownEntid(part.a), part.v.clone())?;
57+
} else {
58+
builder.retract(KnownEntid(part.e), KnownEntid(part.a), part.v.clone())?;
59+
}
60+
}
61+
let report = builder.commit()?;
62+
last_tx_entid = Some(report.tx_id);
63+
last_tx_uuid = Some(tx.tx.clone());
64+
}
65+
()
66+
},
67+
None => ()
68+
}
69+
70+
if let Some(uuid) = last_tx_uuid {
71+
if let Some(entid) = last_tx_entid {
72+
let mut db_tx = self.sqlite.transaction()?;
73+
TxMapper::set_tx_uuid(&mut db_tx, entid, &uuid)?;
74+
SyncMetadataClient::set_remote_head(&mut db_tx, &uuid)?;
75+
db_tx.commit()?;
76+
}
77+
}
78+
79+
Ok(())
80+
}
81+
}

tests/tolstoy.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,12 @@ fn test_reader() {
9797
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
9898
{
9999
let db_tx = c.transaction().expect("db tx");
100-
// Don't inspect the bootstrap transaction, but we'd like to see it's there.
100+
// Ensure that the first (bootstrap) transaction is skipped over.
101101
let mut receiver = TxCountingReceiver::new();
102102
assert_eq!(false, receiver.is_done);
103103
Processor::process(&db_tx, None, &mut receiver).expect("processor");
104104
assert_eq!(true, receiver.is_done);
105-
assert_eq!(1, receiver.tx_count);
105+
assert_eq!(0, receiver.tx_count);
106106
}
107107

108108
let ids = conn.transact(&mut c, r#"[
@@ -112,7 +112,7 @@ fn test_reader() {
112112
]"#).expect("successful transaction").tempids;
113113
let numba_entity_id = ids.get("s").unwrap();
114114

115-
let mut bootstrap_tx = None;
115+
let first_tx;
116116
{
117117
let db_tx = c.transaction().expect("db tx");
118118
// Expect to see one more transaction of four parts (one for tx datom itself).
@@ -121,10 +121,10 @@ fn test_reader() {
121121

122122
println!("{:#?}", receiver);
123123

124-
assert_eq!(2, receiver.txes.keys().count());
125-
assert_tx_datoms_count(&receiver, 1, 4);
124+
assert_eq!(1, receiver.txes.keys().count());
125+
assert_tx_datoms_count(&receiver, 0, 4);
126126

127-
bootstrap_tx = Some(*receiver.txes.keys().nth(0).expect("bootstrap tx"));
127+
first_tx = Some(*receiver.txes.keys().nth(0).expect("first tx"));
128128
}
129129

130130
let ids = conn.transact(&mut c, r#"[
@@ -138,14 +138,14 @@ fn test_reader() {
138138
// Expect to see a single two part transaction
139139
let mut receiver = TestingReceiver::new();
140140

141-
// Note that we're asking for the bootstrap tx to be skipped by the processor.
142-
Processor::process(&db_tx, bootstrap_tx, &mut receiver).expect("processor");
141+
// Note that we're asking for the first transacted tx to be skipped by the processor.
142+
Processor::process(&db_tx, first_tx, &mut receiver).expect("processor");
143143

144-
assert_eq!(2, receiver.txes.keys().count());
145-
assert_tx_datoms_count(&receiver, 1, 2);
144+
assert_eq!(1, receiver.txes.keys().count());
145+
assert_tx_datoms_count(&receiver, 0, 2);
146146

147147
// Inspect the transaction part.
148-
let tx_id = receiver.txes.keys().nth(1).expect("tx");
148+
let tx_id = receiver.txes.keys().nth(0).expect("tx");
149149
let datoms = receiver.txes.get(tx_id).expect("datoms");
150150
let part = &datoms[0];
151151

tolstoy/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ pub mod tx_processor;
3838
pub mod errors;
3939
pub mod syncer;
4040
pub mod tx_mapper;
41+
pub use tx_mapper::TxMapper;
4142
pub use syncer::Syncer;
43+
pub use metadata::SyncMetadataClient;
4244
pub use errors::{
4345
Error,
4446
ErrorKind,

tolstoy/src/metadata.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,16 @@ mod tests {
5454

5555
#[test]
5656
fn test_get_remote_head_default() {
57-
let mut conn = schema::tests::setup_conn();
58-
let tx = conn.transaction().expect("db tx");
57+
let mut conn = schema::tests::setup_conn_bare();
58+
let tx = schema::tests::setup_tx(&mut conn);
5959
assert_eq!(Uuid::nil(), SyncMetadataClient::remote_head(&tx).expect("fetch succeeded"));
6060
}
6161

6262
#[test]
6363
fn test_set_and_get_remote_head() {
64-
let mut conn = schema::tests::setup_conn();
64+
let mut conn = schema::tests::setup_conn_bare();
65+
let tx = schema::tests::setup_tx(&mut conn);
6566
let uuid = Uuid::new_v4();
66-
let tx = conn.transaction().expect("db tx");
6767
SyncMetadataClient::set_remote_head(&tx, &uuid).expect("update succeeded");
6868
assert_eq!(uuid, SyncMetadataClient::remote_head(&tx).expect("fetch succeeded"));
6969
}

tolstoy/src/schema.rs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,21 @@ lazy_static! {
2424
};
2525
}
2626

27-
pub fn ensure_current_version(conn: &mut rusqlite::Connection) -> Result<()> {
28-
let tx = conn.transaction()?;
29-
27+
pub fn ensure_current_version(tx: &mut rusqlite::Transaction) -> Result<()> {
3028
for statement in (&SCHEMA_STATEMENTS).iter() {
3129
tx.execute(statement, &[])?;
3230
}
3331

3432
tx.execute("INSERT OR IGNORE INTO tolstoy_metadata (key, value) VALUES (?, zeroblob(16))", &[&REMOTE_HEAD_KEY])?;
35-
tx.commit().map_err(|e| e.into())
33+
Ok(())
3634
}
3735

3836
#[cfg(test)]
3937
pub mod tests {
4038
use super::*;
4139
use uuid::Uuid;
4240

43-
fn setup_conn_bare() -> rusqlite::Connection {
41+
pub fn setup_conn_bare() -> rusqlite::Connection {
4442
let conn = rusqlite::Connection::open_in_memory().unwrap();
4543

4644
conn.execute_batch("
@@ -54,19 +52,24 @@ pub mod tests {
5452
conn
5553
}
5654

57-
pub fn setup_conn() -> rusqlite::Connection {
58-
let mut conn = setup_conn_bare();
59-
ensure_current_version(&mut conn).expect("connection setup");
60-
conn
55+
pub fn setup_tx_bare<'a>(conn: &'a mut rusqlite::Connection) -> rusqlite::Transaction<'a> {
56+
conn.transaction().expect("tx")
57+
}
58+
59+
pub fn setup_tx<'a>(conn: &'a mut rusqlite::Connection) -> rusqlite::Transaction<'a> {
60+
let mut tx = conn.transaction().expect("tx");
61+
ensure_current_version(&mut tx).expect("connection setup");
62+
tx
6163
}
6264

6365
#[test]
6466
fn test_empty() {
6567
let mut conn = setup_conn_bare();
68+
let mut tx = setup_tx_bare(&mut conn);
6669

67-
assert!(ensure_current_version(&mut conn).is_ok());
70+
assert!(ensure_current_version(&mut tx).is_ok());
6871

69-
let mut stmt = conn.prepare("SELECT key FROM tolstoy_metadata WHERE value = zeroblob(16)").unwrap();
72+
let mut stmt = tx.prepare("SELECT key FROM tolstoy_metadata WHERE value = zeroblob(16)").unwrap();
7073
let mut keys_iter = stmt.query_map(&[], |r| r.get(0)).expect("query works");
7174

7275
let first: Result<String> = keys_iter.next().unwrap().map_err(|e| e.into());
@@ -82,27 +85,23 @@ pub mod tests {
8285
#[test]
8386
fn test_non_empty() {
8487
let mut conn = setup_conn_bare();
88+
let mut tx = setup_tx_bare(&mut conn);
8589

86-
assert!(ensure_current_version(&mut conn).is_ok());
90+
assert!(ensure_current_version(&mut tx).is_ok());
8791

8892
let test_uuid = Uuid::new_v4();
8993
{
90-
let tx = conn.transaction().unwrap();
9194
let uuid_bytes = test_uuid.as_bytes().to_vec();
9295
match tx.execute("UPDATE tolstoy_metadata SET value = ? WHERE key = ?", &[&uuid_bytes, &REMOTE_HEAD_KEY]) {
9396
Err(e) => panic!("Error running an update: {}", e),
9497
_ => ()
9598
}
96-
match tx.commit() {
97-
Err(e) => panic!("Error committing an update: {}", e),
98-
_ => ()
99-
}
10099
}
101100

102-
assert!(ensure_current_version(&mut conn).is_ok());
101+
assert!(ensure_current_version(&mut tx).is_ok());
103102

104103
// Check that running ensure_current_version on an initialized conn doesn't change anything.
105-
let mut stmt = conn.prepare("SELECT value FROM tolstoy_metadata").unwrap();
104+
let mut stmt = tx.prepare("SELECT value FROM tolstoy_metadata").unwrap();
106105
let mut values_iter = stmt.query_map(&[], |r| {
107106
let raw_uuid: Vec<u8> = r.get(0);
108107
Uuid::from_bytes(raw_uuid.as_slice()).unwrap()

0 commit comments

Comments
 (0)