Skip to content
This repository was archived by the owner on Sep 12, 2018. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ version = "0.7.0"
build = "build/version.rs"

[features]
default = ["bundled_sqlite3"]
default = ["bundled_sqlite3", "syncable"]
bundled_sqlite3 = ["rusqlite/bundled"]
syncable = ["mentat_tolstoy"]

[workspace]
members = ["tools/cli", "ffi"]
Expand Down Expand Up @@ -72,6 +73,7 @@ path = "query-translator"

[dependencies.mentat_tolstoy]
path = "tolstoy"
optional = true

[profile.release]
opt-level = 3
Expand Down
18 changes: 18 additions & 0 deletions db/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,7 @@ pub trait PartitionMapping {
fn allocate_entid<S: ?Sized + Ord + Display>(&mut self, partition: &S) -> i64 where String: Borrow<S>;
fn allocate_entids<S: ?Sized + Ord + Display>(&mut self, partition: &S, n: usize) -> Range<i64> where String: Borrow<S>;
fn contains_entid(&self, entid: Entid) -> bool;
fn expand_up_to<S: ?Sized + Ord + Display>(&mut self, partition: &S, entid: i64) where String: Borrow<S>;
}

impl PartitionMapping for PartitionMap {
Expand All @@ -1072,6 +1073,23 @@ impl PartitionMapping for PartitionMap {
}
}

fn expand_up_to<S: ?Sized + Ord + Display>(&mut self, partition: &S, entid: i64) where String: Borrow<S> {
match self.get_mut(partition) {
Some(partition) => {
// Don't honour requests to shrink the partition.
if partition.index > entid {
return ()
}
let new_index = entid + 1;
if partition.index != new_index {
partition.index = new_index;
}
},
// This is a programming error.
None => panic!("Cannot expand unknown partition: {}", partition),
}
}

fn contains_entid(&self, entid: Entid) -> bool {
self.values().any(|partition| partition.contains_entid(entid))
}
Expand Down
3 changes: 1 addition & 2 deletions db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ mod tx_checking;
pub mod types;
mod upsert_resolution;

// Export these for reference from tests. cfg(test) should work, but doesn't.
// #[cfg(test)]
// Export these for reference from sync code and tests.
pub use bootstrap::{
TX0,
USER0,
Expand Down
42 changes: 10 additions & 32 deletions src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ use mentat_db::{
};

use mentat_db::internal_types::TermWithTempIds;
use mentat_db::db::PartitionMapping;

use mentat_query_pull::{
pull_attributes_for_entities,
Expand All @@ -88,10 +89,6 @@ use edn::entities::{
OpType,
};

use mentat_tolstoy::Syncer;

use uuid::Uuid;

use entity_builder::{
InProgressBuilder,
TermBuilder,
Expand Down Expand Up @@ -167,8 +164,8 @@ pub struct Conn {
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
/// for applications that don't require complex connection management.
pub struct Store {
pub sqlite: rusqlite::Connection,
conn: Conn,
sqlite: rusqlite::Connection,
}

impl Store {
Expand All @@ -182,28 +179,18 @@ impl Store {
})
}

/// Returns a totally blank store with no bootstrap schema. Use `open` instead.
pub fn open_empty(path: &str) -> Result<Store> {
if !path.is_empty() {
if Path::new(path).exists() {
bail!(ErrorKind::PathAlreadyExists(path.to_string()));
}
}

let mut connection = ::new_connection(path)?;
let conn = Conn::empty(&mut connection)?;
Ok(Store {
conn: conn,
sqlite: connection,
})
}

pub fn transact(&mut self, transaction: &str) -> Result<TxReport> {
let mut ip = self.begin_transaction()?;
let report = ip.transact(transaction)?;
ip.commit()?;
Ok(report)
}

    /// Advance the `:db.part/user` partition so that the next locally
    /// allocated entid is strictly greater than `new_head`, and persist
    /// the updated partition map to SQLite.
    ///
    /// NOTE(review): `lock().unwrap()` panics if the metadata mutex is
    /// poisoned; presumably acceptable here since poisoning implies a
    /// prior panic elsewhere — confirm against project conventions.
    pub fn fast_forward_user_partition(&mut self, new_head: Entid) -> Result<()> {
        let mut metadata = self.conn.metadata.lock().unwrap();
        metadata.partition_map.expand_up_to(":db.part/user", new_head);
        db::update_partition_map(&mut self.sqlite, &metadata.partition_map).map_err(|e| e.into())
    }
}

pub trait Queryable {
Expand All @@ -227,10 +214,6 @@ pub trait Pullable {
where A: IntoIterator<Item=Entid>;
}

pub trait Syncable {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
}

/// Represents an in-progress, not yet committed, set of changes to the store.
/// Call `commit` to commit your changes, or `rollback` to discard them.
/// A transaction is held open until you do so.
Expand Down Expand Up @@ -700,13 +683,6 @@ pub enum CacheAction {
Deregister,
}

impl Syncable for Store {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
let uuid = Uuid::parse_str(&user_uuid)?;
Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?)
}
}

impl Conn {
// Intentionally not public.
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
Expand Down Expand Up @@ -988,6 +964,8 @@ mod tests {
Instant,
};

use uuid::Uuid;

use mentat_core::{
CachedAttributes,
Binding,
Expand Down
5 changes: 5 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,10 @@ error_chain! {
description("provided value doesn't match value type")
display("provided value of type {} doesn't match attribute value type {}", provided, expected)
}

NotYetImplemented(t: String) {
description("not yet implemented")
display("not yet implemented: {}", t)
}
}
}
11 changes: 10 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ extern crate mentat_query_projector;
extern crate mentat_query_pull;
extern crate mentat_query_translator;
extern crate mentat_sql;

#[cfg(feature = "syncable")]
extern crate mentat_tolstoy;

pub use mentat_core::{
Expand Down Expand Up @@ -103,6 +105,14 @@ pub mod query;
pub mod entity_builder;
pub mod query_builder;

#[cfg(feature = "syncable")]
pub mod sync;

#[cfg(feature = "syncable")]
pub use sync::{
Syncable,
};

pub use query::{
IntoResult,
PlainSymbol,
Expand All @@ -129,7 +139,6 @@ pub use conn::{
Metadata,
Pullable,
Queryable,
Syncable,
Store,
};

Expand Down
133 changes: 133 additions & 0 deletions src/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

use uuid::Uuid;

use conn::Store;
use errors::{
Result,
ErrorKind,
};

use mentat_core::{
Entid,
KnownEntid,
};
use mentat_db as db;

use entity_builder::BuildTerms;

use mentat_tolstoy::{
Syncer,
SyncMetadataClient,
TxMapper,
};
use mentat_tolstoy::syncer::{
Tx,
SyncResult,
};
use mentat_tolstoy::metadata::HeadTrackable;

/// Sync operations available on a store when the `syncable` feature is
/// enabled.
pub trait Syncable {
    /// Sync the local store against the remote at `server_uri` for the
    /// account identified by `user_uuid` (a UUID in string form).
    /// NOTE(review): `&String` parameters are unidiomatic (`&str` would be
    /// preferred) but are kept as-is for API compatibility.
    fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
    /// Replay the remote transactions `txs` on top of the local head,
    /// advancing local partition and remote-head metadata accordingly.
    fn fast_forward_local(&mut self, txs: Vec<Tx>) -> Result<()>;
}

/// True when `entid` lies in the user partition: at or above `USER0`
/// and strictly below the transaction partition's base `TX0`.
fn within_user_partition(entid: Entid) -> bool {
    db::USER0 <= entid && entid < db::TX0
}

impl Syncable for Store {
    /// Replay remote transactions `txs` on top of the local store.
    ///
    /// Each incoming tx is transacted locally using its original (known)
    /// entids; afterwards the user partition is expanded past the largest
    /// user-partition entid seen, and the "locally known remote head" is
    /// advanced to the last incoming tx.
    fn fast_forward_local(&mut self, txs: Vec<Tx>) -> Result<()> {
        // Entid generated by the last local transact, and the remote uuid
        // of the corresponding incoming tx; used for head-mapping below.
        let mut last_tx_entid = None;
        let mut last_tx_uuid = None;

        // During fast-forwarding, we will insert datoms with known entids
        // which, by definition, fall outside of our user partition.
        // Once we've done with insertion, we need to ensure that user
        // partition's next allocation will not overlap with just-inserted datoms.
        // To allow for "holes" in the user partition (due to data excision),
        // we track the highest incoming entid we saw, and expand our
        // local partition to match.
        // In absence of excision and implementation bugs, this should work
        // just as if we counted number of incoming entids and expanded by
        // that number instead.
        let mut largest_endid_encountered = db::USER0;

        for tx in txs {
            // One local transaction per incoming remote tx.
            let in_progress = self.begin_transaction()?;
            let mut builder = in_progress.builder();
            for part in tx.parts {
                if part.added {
                    builder.add(KnownEntid(part.e), KnownEntid(part.a), part.v.clone())?;
                } else {
                    builder.retract(KnownEntid(part.e), KnownEntid(part.a), part.v.clone())?;
                }
                // Ignore datoms that fall outside of the user partition:
                if within_user_partition(part.e) && part.e > largest_endid_encountered {
                    largest_endid_encountered = part.e;
                }
            }
            let report = builder.commit()?;
            last_tx_entid = Some(report.tx_id);
            last_tx_uuid = Some(tx.tx.clone());
        }

        // We've just transacted a new tx, and generated a new tx entid.
        // Map it to the corresponding incoming tx uuid, advance our
        // "locally known remote head".
        if let Some(uuid) = last_tx_uuid {
            if let Some(entid) = last_tx_entid {
                {
                    // Inner scope so the SQLite transaction's mutable
                    // borrow of `self.sqlite` ends before the partition
                    // update below re-borrows `self`.
                    let mut db_tx = self.sqlite.transaction()?;
                    SyncMetadataClient::set_remote_head(&mut db_tx, &uuid)?;
                    TxMapper::set_tx_uuid(&mut db_tx, entid, &uuid)?;
                    db_tx.commit()?;
                }

                // only need to advance the user partition, since we're using KnownEntid and partition won't
                // get auto-updated; shouldn't be a problem for tx partition, since we're relying on the builder
                // to create a tx and advance the partition for us.
                self.fast_forward_user_partition(largest_endid_encountered)?;
            }
        }

        Ok(())
    }

    /// Run one sync flow against `server_uri` for account `user_uuid`,
    /// then perform whatever local follow-up the flow result requires.
    ///
    /// # Errors
    /// Fails on an unparseable uuid, on any underlying sync/storage error,
    /// and with `NotYetImplemented` when local and remote histories have
    /// diverged (merging is not supported yet).
    fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
        let uuid = Uuid::parse_str(&user_uuid)?;

        let sync_result;
        {
            let mut db_tx = self.sqlite.transaction()?;
            sync_result = Syncer::flow(&mut db_tx, server_uri, &uuid)?;

            // TODO this should be done _after_ all of the operations below conclude!
            // Commits any changes Syncer made (schema, updated heads, tu mappings during an upload, etc)
            db_tx.commit()?;
        }

        // TODO These operations need to borrow self as mutable; but we already borrow it for db_tx above,
        // and so for now we split up sync into multiple db transactions /o\
        // Fixing this likely involves either implementing flow on InProgress, or changing flow to
        // take an InProgress instead of a raw sql transaction.

        match sync_result {
            // Nothing further to do for these outcomes: either there was
            // nothing to sync, or the server absorbed our changes.
            SyncResult::EmptyServer => Ok(()),
            SyncResult::NoChanges => Ok(()),
            SyncResult::ServerFastForward => Ok(()),
            SyncResult::Merge => bail!(ErrorKind::NotYetImplemented(
                format!("Can't sync against diverged local.")
            )),
            SyncResult::LocalFastForward(txs) => self.fast_forward_local(txs)
        }
    }
}
22 changes: 11 additions & 11 deletions tests/tolstoy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,12 @@ fn test_reader() {
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
{
let db_tx = c.transaction().expect("db tx");
// Don't inspect the bootstrap transaction, but we'd like to see it's there.
// Ensure that the first (bootstrap) transaction is skipped over.
let mut receiver = TxCountingReceiver::new();
assert_eq!(false, receiver.is_done);
Processor::process(&db_tx, None, &mut receiver).expect("processor");
assert_eq!(true, receiver.is_done);
assert_eq!(1, receiver.tx_count);
assert_eq!(0, receiver.tx_count);
}

let ids = conn.transact(&mut c, r#"[
Expand All @@ -112,7 +112,7 @@ fn test_reader() {
]"#).expect("successful transaction").tempids;
let numba_entity_id = ids.get("s").unwrap();

let bootstrap_tx;
let first_tx;
{
let db_tx = c.transaction().expect("db tx");
// Expect to see one more transaction of four parts (one for tx datom itself).
Expand All @@ -121,10 +121,10 @@ fn test_reader() {

println!("{:#?}", receiver);

assert_eq!(2, receiver.txes.keys().count());
assert_tx_datoms_count(&receiver, 1, 4);
assert_eq!(1, receiver.txes.keys().count());
assert_tx_datoms_count(&receiver, 0, 4);

bootstrap_tx = Some(*receiver.txes.keys().nth(0).expect("bootstrap tx"));
first_tx = Some(*receiver.txes.keys().nth(0).expect("first tx"));
}

let ids = conn.transact(&mut c, r#"[
Expand All @@ -138,14 +138,14 @@ fn test_reader() {
// Expect to see a single two part transaction
let mut receiver = TestingReceiver::new();

// Note that we're asking for the bootstrap tx to be skipped by the processor.
Processor::process(&db_tx, bootstrap_tx, &mut receiver).expect("processor");
// Note that we're asking for the first transacted tx to be skipped by the processor.
Processor::process(&db_tx, first_tx, &mut receiver).expect("processor");

assert_eq!(2, receiver.txes.keys().count());
assert_tx_datoms_count(&receiver, 1, 2);
assert_eq!(1, receiver.txes.keys().count());
assert_tx_datoms_count(&receiver, 0, 2);

// Inspect the transaction part.
let tx_id = receiver.txes.keys().nth(1).expect("tx");
let tx_id = receiver.txes.keys().nth(0).expect("tx");
let datoms = receiver.txes.get(tx_id).expect("datoms");
let part = datoms.iter().find(|&part| &part.e == asserted_e).expect("to find asserted datom");

Expand Down
Loading