Skip to content
This repository was archived by the owner on Sep 12, 2018. It is now read-only.

Commit d9d2b3a

Browse files
author
Grisha Kruglov
committed
Replication syncing
1 parent 36d4551 commit d9d2b3a

16 files changed

Lines changed: 431 additions & 96 deletions

File tree

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ version = "0.6.1"
1616
build = "build/version.rs"
1717

1818
[features]
19-
default = ["bundled_sqlite3"]
19+
default = ["bundled_sqlite3", "syncable"]
2020
bundled_sqlite3 = ["rusqlite/bundled"]
21+
syncable = ["mentat_tolstoy"]
2122

2223
[workspace]
2324
members = ["tools/cli"]
@@ -78,6 +79,7 @@ path = "tx-parser"
7879

7980
[dependencies.mentat_tolstoy]
8081
path = "tolstoy"
82+
optional = true
8183

8284
[profile.release]
8385
debug = true

db/src/db.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1115,6 +1115,7 @@ pub trait PartitionMapping {
11151115
fn allocate_entid<S: ?Sized + Ord + Display>(&mut self, partition: &S) -> i64 where String: Borrow<S>;
11161116
fn allocate_entids<S: ?Sized + Ord + Display>(&mut self, partition: &S, n: usize) -> Range<i64> where String: Borrow<S>;
11171117
fn contains_entid(&self, entid: Entid) -> bool;
1118+
fn expand_up_to<S: ?Sized + Ord + Display>(&mut self, partition: &S, entid: i64) where String: Borrow<S>;
11181119
}
11191120

11201121
impl PartitionMapping for PartitionMap {
@@ -1136,6 +1137,23 @@ impl PartitionMapping for PartitionMap {
11361137
}
11371138
}
11381139

1140+
fn expand_up_to<S: ?Sized + Ord + Display>(&mut self, partition: &S, entid: i64) where String: Borrow<S> {
1141+
match self.get_mut(partition) {
1142+
Some(partition) => {
1143+
// Don't honour requests to shrink the partition.
1144+
if partition.index > entid {
1145+
return ()
1146+
}
1147+
let new_index = entid + 1;
1148+
if partition.index != new_index {
1149+
partition.index = new_index;
1150+
}
1151+
},
1152+
// This is a programming error.
1153+
None => panic!("Cannot expand unknown partition: {}", partition),
1154+
}
1155+
}
1156+
11391157
fn contains_entid(&self, entid: Entid) -> bool {
11401158
self.values().any(|partition| partition.contains_entid(entid))
11411159
}

db/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ mod tx;
4747
pub mod types;
4848
mod upsert_resolution;
4949

50-
// Export these for reference from tests. cfg(test) should work, but doesn't.
51-
// #[cfg(test)]
50+
// Export these for reference from sync code and tests.
5251
pub use bootstrap::{
5352
TX0,
5453
USER0,

src/conn.rs

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,14 @@ use mentat_db::{
5252
};
5353

5454
use mentat_db::internal_types::TermWithTempIds;
55+
use mentat_db::db::PartitionMapping;
5556

5657
use mentat_tx;
5758

5859
use mentat_tx::entities::TempId;
5960

6061
use mentat_tx_parser;
6162

62-
use mentat_tolstoy::Syncer;
63-
64-
use uuid::Uuid;
65-
6663
use entity_builder::{
6764
InProgressBuilder,
6865
};
@@ -129,8 +126,8 @@ pub struct Conn {
129126
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
130127
/// for applications that don't require complex connection management.
131128
pub struct Store {
129+
pub sqlite: rusqlite::Connection,
132130
conn: Conn,
133-
sqlite: rusqlite::Connection,
134131
}
135132

136133
impl Store {
@@ -157,6 +154,12 @@ impl Store {
157154
sqlite: connection,
158155
})
159156
}
157+
158+
pub fn fast_forward_user_partition(&mut self, new_head: Entid) -> Result<()> {
159+
let mut metadata = self.conn.metadata.lock().unwrap();
160+
metadata.partition_map.expand_up_to(":db.part/user", new_head);
161+
db::update_partition_map(&mut self.sqlite, &metadata.partition_map).map_err(|e| e.into())
162+
}
160163
}
161164

162165
pub trait Queryable {
@@ -172,10 +175,6 @@ pub trait Queryable {
172175
where E: Into<Entid>;
173176
}
174177

175-
pub trait Syncable {
176-
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
177-
}
178-
179178
/// Represents an in-progress, not yet committed, set of changes to the store.
180179
/// Call `commit` to commit your changes, or `rollback` to discard them.
181180
/// A transaction is held open until you do so.
@@ -493,13 +492,6 @@ pub enum CacheAction {
493492
Deregister,
494493
}
495494

496-
impl Syncable for Store {
497-
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
498-
let uuid = Uuid::parse_str(&user_uuid)?;
499-
Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?)
500-
}
501-
}
502-
503495
impl Conn {
504496
// Intentionally not public.
505497
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {

src/errors.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,5 +108,10 @@ error_chain! {
108108
description("provided value doesn't match value type")
109109
display("provided value of type {} doesn't match attribute value type {}", provided, expected)
110110
}
111+
112+
NotYetImplemented(t: String) {
113+
description("not yet implemented")
114+
display("not yet implemented: {}", t)
115+
}
111116
}
112117
}

src/lib.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@ extern crate mentat_query_parser;
2929
extern crate mentat_query_projector;
3030
extern crate mentat_query_translator;
3131
extern crate mentat_sql;
32-
extern crate mentat_tolstoy;
3332
extern crate mentat_tx;
3433
extern crate mentat_tx_parser;
3534

35+
#[cfg(feature = "syncable")]
36+
extern crate mentat_tolstoy;
37+
3638
pub use mentat_core::{
3739
Attribute,
3840
Entid,
@@ -95,6 +97,13 @@ pub mod conn;
9597
pub mod query;
9698
pub mod entity_builder;
9799

100+
#[cfg(feature = "syncable")]
101+
pub mod sync;
102+
103+
pub fn get_name() -> String {
104+
return String::from("mentat");
105+
}
106+
98107
pub use query::{
99108
IntoResult,
100109
PlainSymbol,
@@ -115,7 +124,6 @@ pub use conn::{
115124
InProgress,
116125
Metadata,
117126
Queryable,
118-
Syncable,
119127
Store,
120128
};
121129

src/sync.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// Copyright 2016 Mozilla
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
4+
// this file except in compliance with the License. You may obtain a copy of the
5+
// License at http://www.apache.org/licenses/LICENSE-2.0
6+
// Unless required by applicable law or agreed to in writing, software distributed
7+
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
8+
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
9+
// specific language governing permissions and limitations under the License.
10+
11+
use uuid::Uuid;
12+
13+
use conn::Store;
14+
use errors::{
15+
Result,
16+
ErrorKind,
17+
};
18+
19+
use mentat_core::{
20+
Entid,
21+
KnownEntid,
22+
};
23+
use mentat_db as db;
24+
25+
use entity_builder::BuildTerms;
26+
27+
use mentat_tolstoy::{
28+
Syncer,
29+
SyncMetadataClient,
30+
TxMapper,
31+
};
32+
use mentat_tolstoy::syncer::{
33+
Tx,
34+
SyncResult,
35+
};
36+
use mentat_tolstoy::metadata::HeadTrackable;
37+
38+
pub trait Syncable {
39+
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
40+
fn fast_forward_local(&mut self, txs: Vec<Tx>) -> Result<()>;
41+
}
42+
43+
fn within_user_partition(entid: Entid) -> bool {
44+
entid >= db::USER0 && entid < db::TX0
45+
}
46+
47+
impl Syncable for Store {
48+
fn fast_forward_local(&mut self, txs: Vec<Tx>) -> Result<()> {
49+
let mut last_tx_entid = None;
50+
let mut last_tx_uuid = None;
51+
52+
// During fast-forwarding, we will insert datoms with known entids
53+
// which, by definition, fall outside of our user partition.
54+
// Once we've done with insertion, we need to ensure that user
55+
// partition's next allocation will not overlap with just-inserted datoms.
56+
// To allow for "holes" in the user partition (due to data excision),
57+
// we track the highest incoming entid we saw, and expand our
58+
// local partition to match.
59+
// In absence of excision and implementation bugs, this should work
60+
// just as if we counted number of incoming entids and expanded by
61+
// that number instead.
62+
let mut largest_endid_encountered = db::USER0;
63+
64+
for tx in txs {
65+
let in_progress = self.begin_transaction()?;
66+
let mut builder = in_progress.builder();
67+
for part in tx.parts {
68+
if part.added {
69+
builder.add(KnownEntid(part.e), KnownEntid(part.a), part.v.clone())?;
70+
} else {
71+
builder.retract(KnownEntid(part.e), KnownEntid(part.a), part.v.clone())?;
72+
}
73+
// Ignore datoms that fall outside of the user partition:
74+
if within_user_partition(part.e) && part.e > largest_endid_encountered {
75+
largest_endid_encountered = part.e;
76+
}
77+
}
78+
let report = builder.commit()?;
79+
last_tx_entid = Some(report.tx_id);
80+
last_tx_uuid = Some(tx.tx.clone());
81+
}
82+
83+
// We've just transacted a new tx, and generated a new tx entid.
84+
// Map it to the corresponding incoming tx uuid, advance our
85+
// "locally known remote head".
86+
if let Some(uuid) = last_tx_uuid {
87+
if let Some(entid) = last_tx_entid {
88+
{
89+
let mut db_tx = self.sqlite.transaction()?;
90+
SyncMetadataClient::set_remote_head(&mut db_tx, &uuid)?;
91+
TxMapper::set_tx_uuid(&mut db_tx, entid, &uuid)?;
92+
db_tx.commit()?;
93+
}
94+
95+
// only need to advance the user partition, since we're using KnownEntid and partition won't
96+
// get auto-updated; shouldn't be a problem for tx partition, since we're relying on the builder
97+
// to create a tx and advance the partition for us.
98+
self.fast_forward_user_partition(largest_endid_encountered)?;
99+
}
100+
}
101+
102+
Ok(())
103+
}
104+
105+
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
106+
let uuid = Uuid::parse_str(&user_uuid)?;
107+
108+
let sync_result;
109+
{
110+
let mut db_tx = self.sqlite.transaction()?;
111+
sync_result = Syncer::flow(&mut db_tx, server_uri, &uuid)?;
112+
113+
// TODO this should be done _after_ all of the operations below conclude!
114+
// Commits any changes Syncer made (schema, updated heads, tu mappings during an upload, etc)
115+
db_tx.commit()?;
116+
}
117+
118+
// TODO These operations need to borrow self as mutable; but we already borrow it for db_tx above,
119+
// and so for now we split up sync into multiple db transactions /o\
120+
// Fixing this likely involves either implementing flow on InProgress, or changing flow to
121+
// take an InProgress instead of a raw sql transaction.
122+
123+
match sync_result {
124+
SyncResult::EmptyServer => Ok(()),
125+
SyncResult::NoChanges => Ok(()),
126+
SyncResult::ServerFastForward => Ok(()),
127+
SyncResult::Merge => bail!(ErrorKind::NotYetImplemented(
128+
format!("Can't sync against diverged local.")
129+
)),
130+
SyncResult::LocalFastForward(txs) => self.fast_forward_local(txs)
131+
}
132+
}
133+
}

tests/tolstoy.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,12 @@ fn test_reader() {
9797
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
9898
{
9999
let db_tx = c.transaction().expect("db tx");
100-
// Don't inspect the bootstrap transaction, but we'd like to see it's there.
100+
// Ensure that the first (bootstrap) transaction is skipped over.
101101
let mut receiver = TxCountingReceiver::new();
102102
assert_eq!(false, receiver.is_done);
103103
Processor::process(&db_tx, None, &mut receiver).expect("processor");
104104
assert_eq!(true, receiver.is_done);
105-
assert_eq!(1, receiver.tx_count);
105+
assert_eq!(0, receiver.tx_count);
106106
}
107107

108108
let ids = conn.transact(&mut c, r#"[
@@ -112,7 +112,7 @@ fn test_reader() {
112112
]"#).expect("successful transaction").tempids;
113113
let numba_entity_id = ids.get("s").unwrap();
114114

115-
let mut bootstrap_tx = None;
115+
let first_tx;
116116
{
117117
let db_tx = c.transaction().expect("db tx");
118118
// Expect to see one more transaction of four parts (one for tx datom itself).
@@ -121,10 +121,10 @@ fn test_reader() {
121121

122122
println!("{:#?}", receiver);
123123

124-
assert_eq!(2, receiver.txes.keys().count());
125-
assert_tx_datoms_count(&receiver, 1, 4);
124+
assert_eq!(1, receiver.txes.keys().count());
125+
assert_tx_datoms_count(&receiver, 0, 4);
126126

127-
bootstrap_tx = Some(*receiver.txes.keys().nth(0).expect("bootstrap tx"));
127+
first_tx = Some(*receiver.txes.keys().nth(0).expect("first tx"));
128128
}
129129

130130
let ids = conn.transact(&mut c, r#"[
@@ -138,14 +138,14 @@ fn test_reader() {
138138
// Expect to see a single two part transaction
139139
let mut receiver = TestingReceiver::new();
140140

141-
// Note that we're asking for the bootstrap tx to be skipped by the processor.
142-
Processor::process(&db_tx, bootstrap_tx, &mut receiver).expect("processor");
141+
// Note that we're asking for the first transacted tx to be skipped by the processor.
142+
Processor::process(&db_tx, first_tx, &mut receiver).expect("processor");
143143

144-
assert_eq!(2, receiver.txes.keys().count());
145-
assert_tx_datoms_count(&receiver, 1, 2);
144+
assert_eq!(1, receiver.txes.keys().count());
145+
assert_tx_datoms_count(&receiver, 0, 2);
146146

147147
// Inspect the transaction part.
148-
let tx_id = receiver.txes.keys().nth(1).expect("tx");
148+
let tx_id = receiver.txes.keys().nth(0).expect("tx");
149149
let datoms = receiver.txes.get(tx_id).expect("datoms");
150150
let part = &datoms[0];
151151

tolstoy/src/errors.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,6 @@ error_chain! {
4949
display("encountered unexpected state: {}", t)
5050
}
5151

52-
NotYetImplemented(t: String) {
53-
description("not yet implemented")
54-
display("not yet implemented: {}", t)
55-
}
56-
5752
DuplicateMetadata(k: String) {
5853
description("encountered more than one metadata value for key")
5954
display("encountered more than one metadata value for key: {}", k)

tolstoy/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ pub mod tx_processor;
3939
pub mod errors;
4040
pub mod syncer;
4141
pub mod tx_mapper;
42+
pub use tx_mapper::TxMapper;
4243
pub use syncer::Syncer;
44+
pub use metadata::SyncMetadataClient;
4345
pub use errors::{
4446
Error,
4547
ErrorKind,

0 commit comments

Comments
 (0)