Skip to content
This repository was archived by the owner on Sep 12, 2018. It is now read-only.

Commit 9d1f0fa

Browse files
author
Grisha Kruglov
committed
Sync atomically - within a single db transaction
... except for user partition changes, everything is performed in a single database transaction. It's expected that current way user partition is moved forward won't persist once renumbering work moves forward.
1 parent 9923671 commit 9d1f0fa

2 files changed

Lines changed: 66 additions & 45 deletions

File tree

src/conn.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ use rusqlite::{
4242

4343
use edn;
4444

45+
use uuid::Uuid;
46+
4547
use mentat_core::{
4648
Attribute,
4749
Entid,
@@ -98,6 +100,14 @@ use errors::{
98100
MentatError,
99101
};
100102

103+
#[cfg(feature = "syncable")]
104+
use mentat_tolstoy::syncer::{
105+
SyncResult,
106+
};
107+
108+
#[cfg(feature = "syncable")]
109+
use mentat_tolstoy::types::Tx;
110+
101111
use query::{
102112
Known,
103113
PreparedResult,
@@ -189,7 +199,7 @@ pub trait Pullable {
189199
/// A transaction is held open until you do so.
190200
/// Your changes will be implicitly dropped along with this struct.
191201
pub struct InProgress<'a, 'c> {
192-
transaction: rusqlite::Transaction<'c>,
202+
pub(crate) transaction: rusqlite::Transaction<'c>,
193203
mutex: &'a Mutex<Metadata>,
194204
generation: u64,
195205
partition_map: PartitionMap,
@@ -200,6 +210,12 @@ pub struct InProgress<'a, 'c> {
200210
tx_observer_watcher: InProgressObserverTransactWatcher,
201211
}
202212

213+
#[cfg(feature = "syncable")]
214+
pub trait Syncable {
215+
fn flow(&mut self, server_uri: &String, user_uuid: &Uuid) -> Result<SyncResult>;
216+
fn fast_forward_local(&mut self, txs: Vec<Tx>) -> Result<Entid>;
217+
}
218+
203219
/// Represents an in-progress set of reads to the store. Just like `InProgress`,
204220
/// which is read-write, but only allows for reads.
205221
pub struct InProgressRead<'a, 'c>(InProgress<'a, 'c>);

src/sync.rs

Lines changed: 49 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ use errors::{
1515
Result,
1616
};
1717

18+
use conn::{
19+
InProgress,
20+
Syncable as SyncableInProgress
21+
};
22+
use entity_builder::TermBuilder;
1823
use mentat_core::{
1924
Entid,
2025
KnownEntid,
@@ -37,15 +42,18 @@ use mentat_tolstoy::metadata::HeadTrackable;
3742

3843
pub trait Syncable {
3944
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
40-
fn fast_forward_local(&mut self, txs: Vec<Tx>) -> Result<()>;
4145
}
4246

4347
fn within_user_partition(entid: Entid) -> bool {
4448
entid >= db::USER0 && entid < db::TX0
4549
}
4650

47-
impl Syncable for Store {
48-
fn fast_forward_local(&mut self, txs: Vec<Tx>) -> Result<()> {
51+
impl<'a, 'c> SyncableInProgress for InProgress<'a, 'c> {
52+
fn flow(&mut self, server_uri: &String, user_uuid: &Uuid) -> Result<SyncResult> {
53+
Syncer::flow(&mut self.transaction, server_uri, user_uuid)
54+
}
55+
56+
fn fast_forward_local(&mut self, txs: Vec<Tx>) -> Result<Entid> {
4957
let mut last_tx_entid = None;
5058
let mut last_tx_uuid = None;
5159

@@ -62,8 +70,7 @@ impl Syncable for Store {
6270
let mut largest_endid_encountered = db::USER0;
6371

6472
for tx in txs {
65-
let in_progress = self.begin_transaction()?;
66-
let mut builder = in_progress.builder();
73+
let mut builder = TermBuilder::new();
6774
for part in tx.parts {
6875
if part.added {
6976
builder.add(KnownEntid(part.e), KnownEntid(part.a), part.v.clone())?;
@@ -75,7 +82,7 @@ impl Syncable for Store {
7582
largest_endid_encountered = part.e;
7683
}
7784
}
78-
let report = builder.commit()?;
85+
let report = self.transact_builder(builder)?;
7986
last_tx_entid = Some(report.tx_id);
8087
last_tx_uuid = Some(tx.tx.clone());
8188
}
@@ -85,56 +92,54 @@ impl Syncable for Store {
8592
// "locally known remote head".
8693
if let Some(uuid) = last_tx_uuid {
8794
if let Some(entid) = last_tx_entid {
88-
{
89-
let mut db_tx = self.sqlite.transaction()?;
90-
SyncMetadataClient::set_remote_head(&mut db_tx, &uuid)?;
91-
TxMapper::set_tx_uuid(&mut db_tx, entid, &uuid)?;
92-
db_tx.commit()?;
93-
}
94-
95-
// only need to advance the user partition, since we're using KnownEntid and partition won't
96-
// get auto-updated; shouldn't be a problem for tx partition, since we're relying on the builder
97-
// to create a tx and advance the partition for us.
98-
self.fast_forward_user_partition(largest_endid_encountered)?;
95+
SyncMetadataClient::set_remote_head(&mut self.transaction, &uuid)?;
96+
TxMapper::set_tx_uuid(&mut self.transaction, entid, &uuid)?;
9997
}
10098
}
10199

102-
Ok(())
100+
Ok(largest_endid_encountered)
103101
}
102+
}
104103

104+
impl Syncable for Store {
105105
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
106106
let uuid = Uuid::parse_str(&user_uuid)?;
107107

108-
let sync_result;
108+
let mut largest_endid_encountered = None;
109109
{
110-
let mut db_tx = self.sqlite.transaction()?;
111-
sync_result = Syncer::flow(&mut db_tx, server_uri, &uuid)?;
110+
let mut in_progress = self.begin_transaction()?;
111+
let sync_result = in_progress.flow(server_uri, &uuid)?;
112+
113+
match sync_result {
114+
SyncResult::Merge => bail!(TolstoyError::NotYetImplemented(
115+
format!("Can't sync against diverged local.")
116+
)),
117+
SyncResult::LocalFastForward(txs) => {
118+
largest_endid_encountered = Some(in_progress.fast_forward_local(txs)?);
119+
},
120+
SyncResult::BadServerState => bail!(TolstoyError::NotYetImplemented(
121+
format!("Bad server state.")
122+
)),
123+
SyncResult::IncompatibleBootstrapSchema => bail!(TolstoyError::NotYetImplemented(
124+
format!("IncompatibleBootstrapSchema.")
125+
)),
126+
_ => ()
127+
}
112128

113-
// TODO this should be done _after_ all of the operations below conclude!
114-
// Commits any changes Syncer made (schema, updated heads, tu mappings during an upload, etc)
115-
db_tx.commit()?;
129+
// All of the work we've done while syncing is committed at this point.
130+
in_progress.commit()?;
116131
}
117132

118-
// TODO These operations need to borrow self as mutable; but we already borrow it for db_tx above,
119-
// and so for now we split up sync into multiple db transactions /o\
120-
// Fixing this likely involves either implementing flow on InProgress, or changing flow to
121-
// take an InProgress instead of a raw sql transaction.
122-
123-
match sync_result {
124-
SyncResult::EmptyServer => Ok(()),
125-
SyncResult::NoChanges => Ok(()),
126-
SyncResult::ServerFastForward => Ok(()),
127-
SyncResult::Merge => bail!(TolstoyError::NotYetImplemented(
128-
format!("Can't sync against diverged local.")
129-
)),
130-
SyncResult::LocalFastForward(txs) => self.fast_forward_local(txs),
131-
SyncResult::BadServerState => bail!(TolstoyError::NotYetImplemented(
132-
format!("Bad server state.")
133-
)),
134-
SyncResult::AdoptedRemoteOnFirstSync => Ok(()),
135-
SyncResult::IncompatibleBootstrapSchema => bail!(TolstoyError::NotYetImplemented(
136-
format!("IncompatibleBootstrapSchema.")
137-
)),
133+
// See https://github.com/mozilla/mentat/pull/494 for "renumbering" work which is a generalized take on this.
134+
135+
// Only need to advance the user partition, since we're using KnownEntid and partition won't
136+
// get auto-updated; shouldn't be a problem for tx partition, since we're relying on the builder
137+
// to create a tx and advance the partition for us.
138+
match largest_endid_encountered {
139+
Some(v) => self.fast_forward_user_partition(v)?,
140+
None => ()
138141
}
142+
143+
Ok(())
139144
}
140145
}

0 commit comments

Comments
 (0)