Skip to content
This repository was archived by the owner on Sep 12, 2018. It is now read-only.

Commit b90ccf8

Browse files
author
Grisha Kruglov
committed
WIP renumber partition when performing a fast-forward
1 parent a863509 commit b90ccf8

7 files changed

Lines changed: 44 additions & 35 deletions

File tree

db/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ pub use bootstrap::{
7171
CORE_SCHEMA_VERSION,
7272
};
7373

74+
pub use renumber::renumber;
75+
7476
use edn::symbols;
7577

7678
pub use entids::{
@@ -106,6 +108,7 @@ pub use tx_observer::{
106108
pub use types::{
107109
AttributeSet,
108110
DB,
111+
Partition,
109112
PartitionMap,
110113
TransactableValue,
111114
TxReport,

src/sync.rs

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,11 @@ use errors::{
2222
};
2323

2424
use mentat_core::{
25-
Entid,
2625
KnownEntid,
2726
};
2827
use mentat_db::{
29-
USER0,
30-
TX0,
31-
};
32-
use mentat_db::db;
33-
use mentat_db::db::{
34-
PartitionMapping,
28+
renumber,
29+
PartitionMap,
3530
};
3631

3732
use entity_builder::{
@@ -52,11 +47,7 @@ pub trait Syncable {
5247
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
5348
}
5449

55-
fn within_user_partition(entid: Entid) -> bool {
56-
entid >= USER0 && entid < TX0
57-
}
58-
59-
fn fast_forward_local<'a, 'c>(in_progress: &mut InProgress<'a, 'c>, txs: Vec<Tx>) -> Result<()> {
50+
fn fast_forward_local<'a, 'c>(in_progress: &mut InProgress<'a, 'c>, txs: Vec<Tx>) -> Result<Option<PartitionMap>> {
6051
let mut last_tx = None;
6152

6253
// During fast-forwarding, we will insert datoms with known entids
@@ -69,21 +60,22 @@ fn fast_forward_local<'a, 'c>(in_progress: &mut InProgress<'a, 'c>, txs: Vec<Tx>
6960
// In absence of excision and implementation bugs, this should work
7061
// just as if we counted number of incoming entids and expanded by
7162
// that number instead.
72-
let mut largest_entid_encountered = USER0;
63+
let mut last_encountered_partition_map = None;
7364

7465
for tx in txs {
7566
let mut builder = TermBuilder::new();
7667

68+
last_encountered_partition_map = match tx.parts[0].partitions.clone() {
69+
Some(parts) => Some(parts),
70+
None => bail!(TolstoyError::BadServerState("Missing partition map in incoming transaction".to_string()))
71+
};
72+
7773
for part in tx.parts {
7874
if part.added {
7975
builder.add(KnownEntid(part.e), KnownEntid(part.a), part.v)?;
8076
} else {
8177
builder.retract(KnownEntid(part.e), KnownEntid(part.a), part.v)?;
8278
}
83-
// Ignore datoms that fall outside of the user partition:
84-
if within_user_partition(part.e) && part.e > largest_entid_encountered {
85-
largest_entid_encountered = part.e;
86-
}
8779
}
8880

8981
let report = in_progress.transact_builder(builder)?;
@@ -96,15 +88,9 @@ fn fast_forward_local<'a, 'c>(in_progress: &mut InProgress<'a, 'c>, txs: Vec<Tx>
9688
if let Some((entid, uuid)) = last_tx {
9789
SyncMetadataClient::set_remote_head(&mut in_progress.transaction, &uuid)?;
9890
TxMapper::set_tx_uuid(&mut in_progress.transaction, entid, &uuid)?;
99-
100-
// We only need to advance the user partition, since we're using KnownEntid and partition
101-
// won't get auto-updated; shouldn't be a problem for tx partition, since we're relying on
102-
// the builder to create a tx and advance the partition for us.
103-
in_progress.partition_map.expand_up_to(":db.part/user", largest_entid_encountered);
104-
db::update_partition_map(&mut in_progress.transaction, &in_progress.partition_map)?;
10591
}
10692

107-
Ok(())
93+
Ok(last_encountered_partition_map)
10894
}
10995

11096
impl Conn {
@@ -118,6 +104,7 @@ impl Conn {
118104
let mut in_progress = self.begin_transaction(sqlite)?;
119105

120106
let sync_result = Syncer::flow(&mut in_progress.transaction, server_uri, &uuid)?;
107+
let mut incoming_partition = None;
121108

122109
match sync_result {
123110
SyncResult::EmptyServer => (),
@@ -126,7 +113,10 @@ impl Conn {
126113
SyncResult::Merge => bail!(TolstoyError::NotYetImplemented(
127114
format!("Can't sync against diverged local.")
128115
)),
129-
SyncResult::LocalFastForward(txs) => fast_forward_local(&mut in_progress, txs)?,
116+
SyncResult::LocalFastForward(txs) => {
117+
incoming_partition = fast_forward_local(&mut in_progress, txs)?;
118+
()
119+
},
130120
SyncResult::BadServerState => bail!(TolstoyError::NotYetImplemented(
131121
format!("Bad server state.")
132122
)),
@@ -136,6 +126,16 @@ impl Conn {
136126
)),
137127
}
138128

129+
match incoming_partition {
130+
Some(incoming) => {
131+
let root = SyncMetadataClient::get_partitions(&in_progress.transaction, true)?;
132+
let current = SyncMetadataClient::get_partitions(&in_progress.transaction, false)?;
133+
renumber(&in_progress.transaction, &root, &current, &incoming)?;
134+
()
135+
},
136+
None => ()
137+
}
138+
139139
in_progress.commit()
140140
}
141141
}

tolstoy/src/metadata.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,12 @@ impl SyncMetadataClient {
4747
Ok(())
4848
}
4949

50-
pub fn get_partitions(tx: &rusqlite::Transaction) -> Result<PartitionMap> {
51-
let mut stmt: ::rusqlite::Statement = tx.prepare("SELECT part, start, idx FROM tolstoy_parts")?;
50+
pub fn get_partitions(tx: &rusqlite::Transaction, core: bool) -> Result<PartitionMap> {
51+
let db_table = match core {
52+
true => "parts",
53+
false => "tolstoy_parts"
54+
};
55+
let mut stmt: ::rusqlite::Statement = tx.prepare(&format!("SELECT part, start, idx FROM {}", db_table))?;
5256
let m: Result<PartitionMap> = stmt.query_and_then(&[], |row| -> Result<(String, Partition)> {
5357
Ok((row.get_checked(0)?, Partition::new(row.get_checked(1)?, row.get_checked(2)?)))
5458
})?.collect();

tolstoy/src/schema.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ pub mod tests {
9797
(_, _) => { panic!("Wrong number of results."); },
9898
}
9999

100-
let partitions = SyncMetadataClient::get_partitions(&tx).unwrap();
100+
let partitions = SyncMetadataClient::get_partitions(&tx, false).unwrap();
101101

102102
assert_eq!(partitions.len(), 1);
103103

@@ -146,7 +146,7 @@ pub mod tests {
146146
(_, _) => { panic!("Wrong number of results."); },
147147
}
148148

149-
let partitions = SyncMetadataClient::get_partitions(&tx).unwrap();
149+
let partitions = SyncMetadataClient::get_partitions(&tx, false).unwrap();
150150

151151
assert_eq!(partitions.len(), 1);
152152

tolstoy/src/syncer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use mentat_core::{
1919
use mentat_db::{
2020
CORE_SCHEMA_VERSION,
2121
USER0,
22+
Partition,
23+
PartitionMap,
2224
};
2325
use bootstrap::{
2426
BootstrapHelper,
@@ -45,8 +47,6 @@ use tx_mapper::{
4547
TxMapper,
4648
};
4749
use types::{
48-
Partition,
49-
PartitionMap,
5050
Tx,
5151
TxPart,
5252
};
@@ -164,7 +164,7 @@ impl<'c> TxReceiver for UploadingTxReceiver<'c> {
164164
// so this is temporary and for development purposes only.
165165
let mut tx_partition_map = PartitionMap::default();
166166
tx_partition_map.insert(PARTITION_USER.to_string(), Partition::new(USER0, largest_e + 1));
167-
datoms[0].parts = Some(tx_partition_map);
167+
datoms[0].partitions = Some(tx_partition_map);
168168

169169
// Upload all chunks.
170170
for datom in &datoms {

tolstoy/src/tx_processor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ where T: Sized + Iterator<Item=Result<TxPart>> + 't {
9595
Err(_) => None,
9696
Ok(datom) => {
9797
Some(TxPart {
98-
parts: None,
98+
partitions: None,
9999
e: datom.e,
100100
a: datom.a,
101101
v: datom.v.clone(),
@@ -113,7 +113,7 @@ where T: Sized + Iterator<Item=Result<TxPart>> + 't {
113113

114114
fn to_tx_part(row: &rusqlite::Row) -> Result<TxPart> {
115115
Ok(TxPart {
116-
parts: None,
116+
partitions: None,
117117
e: row.get_checked(0)?,
118118
a: row.get_checked(1)?,
119119
v: TypedValue::from_sql_value_pair(row.get_checked(2)?, row.get_checked(3)?)?,

tolstoy/src/types.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use mentat_core::{
1515
TypedValue,
1616
};
1717

18+
use mentat_db::PartitionMap;
19+
1820
// For returning out of the downloader as an ordered list.
1921
#[derive(Debug, Clone)]
2022
pub struct Tx {
@@ -26,7 +28,7 @@ pub struct Tx {
2628
pub struct TxPart {
2729
// TODO this is a temporary for development. Only first TxPart in a chunk series should have a non-None 'parts'.
2830
// 'parts' should actually live in a transaction, but we do this now to avoid changing the server until dust settles.
29-
pub parts: Option<PartitionMap>,
31+
pub partitions: Option<PartitionMap>,
3032
pub e: Entid,
3133
pub a: Entid,
3234
pub v: TypedValue,

0 commit comments

Comments
 (0)