Skip to content
This repository was archived by the owner on Sep 12, 2018. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std; // To refer to std::result::Result.
use std::collections::BTreeSet;

use rusqlite;
use uuid;

use edn;

Expand All @@ -30,6 +31,9 @@ use mentat_query_projector;
use mentat_query_pull;
use mentat_sql;

#[cfg(feature = "syncable")]
use mentat_tolstoy;

pub type Result<T> = std::result::Result<T, MentatError>;

#[macro_export]
Expand Down Expand Up @@ -80,6 +84,11 @@ pub enum MentatError {
#[fail(display = "provided value of type {} doesn't match attribute value type {}", _0, _1)]
ValueTypeMismatch(ValueType, ValueType),

/// We're just not done yet. Message that the feature is recognized but not yet
/// implemented.
#[fail(display = "not yet implemented: {}", _0)]
NotYetImplemented(String),

#[fail(display = "{}", _0)]
IoError(#[cause] std::io::Error),

Expand All @@ -103,8 +112,15 @@ pub enum MentatError {
#[fail(display = "{}", _0)]
PullError(#[cause] mentat_query_pull::PullError),

#[fail(display = "{}", _0)]
UuidError(#[cause] uuid::ParseError),

#[fail(display = "{}", _0)]
SQLError(#[cause] mentat_sql::SQLError),

#[cfg(feature = "syncable")]
#[fail(display = "{}", _0)]
TolstoyError(#[cause] mentat_tolstoy::TolstoyError),
}

impl From<std::io::Error> for MentatError {
Expand Down Expand Up @@ -154,3 +170,16 @@ impl From<mentat_sql::SQLError> for MentatError {
MentatError::SQLError(error)
}
}

#[cfg(feature = "syncable")]
impl From<mentat_tolstoy::TolstoyError> for MentatError {
fn from(error: mentat_tolstoy::TolstoyError) -> MentatError {
MentatError::TolstoyError(error)
}
}

impl From<uuid::ParseError> for MentatError {
fn from(error: uuid::ParseError) -> MentatError {
MentatError::UuidError(error)
}
}
2 changes: 1 addition & 1 deletion src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl Pullable for Store {
}

impl Syncable for Store {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::failure::Error> {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::errors::MentatError> {
let uuid = Uuid::parse_str(&user_uuid).map_err(|_| MentatError::BadUuid(user_uuid.clone()))?;
Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?)
}
Expand Down
142 changes: 142 additions & 0 deletions src/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

use uuid::Uuid;

use rusqlite;

use conn::{
Conn,
InProgress,
};

use errors::{
Result,
};

use mentat_core::{
KnownEntid,
};
use mentat_db::{
renumber,
PartitionMap,
};

use entity_builder::{
BuildTerms,
TermBuilder,
};

use mentat_tolstoy::{
Syncer,
SyncMetadataClient,
SyncResult,
Tx,
TxMapper,
TolstoyError,
};

pub trait Syncable {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::errors::MentatError>;
}

fn fast_forward_local<'a, 'c>(in_progress: &mut InProgress<'a, 'c>, txs: Vec<Tx>) -> Result<Option<PartitionMap>> {
let mut last_tx = None;

// During fast-forwarding, we will insert datoms with known entids
// which, by definition, fall outside of our user partition.
// Once we've done with insertion, we need to ensure that user
// partition's next allocation will not overlap with just-inserted datoms.
// To allow for "holes" in the user partition (due to data excision),
// we track the highest incoming entid we saw, and expand our
// local partition to match.
// In absence of excision and implementation bugs, this should work
// just as if we counted number of incoming entids and expanded by
// that number instead.
let mut last_encountered_partition_map = None;

for tx in txs {
let mut builder = TermBuilder::new();

last_encountered_partition_map = match tx.parts[0].partitions.clone() {
Some(parts) => Some(parts),
None => bail!(TolstoyError::BadServerState("Missing partition map in incoming transaction".to_string()))
};

for part in tx.parts {
if part.added {
builder.add(KnownEntid(part.e), KnownEntid(part.a), part.v)?;
} else {
builder.retract(KnownEntid(part.e), KnownEntid(part.a), part.v)?;
}
}

let report = in_progress.transact_builder(builder)?;

last_tx = Some((report.tx_id, tx.tx.clone()));
}

// We've just transacted a new tx, and generated a new tx entid. Map it to the corresponding
// incoming tx uuid, advance our "locally known remote head".
if let Some((entid, uuid)) = last_tx {
SyncMetadataClient::set_remote_head(&mut in_progress.transaction, &uuid)?;
TxMapper::set_tx_uuid(&mut in_progress.transaction, entid, &uuid)?;
}

Ok(last_encountered_partition_map)
}

impl Conn {
pub(crate) fn sync(&mut self,
sqlite: &mut rusqlite::Connection,
server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::errors::MentatError> {
let uuid = Uuid::parse_str(&user_uuid)?;

// Take an IMMEDIATE transaction right away. We have an SQL transaction, and complete
// control over the `Conn` metadata at this point, just like `transact()`.
let mut in_progress = self.begin_transaction(sqlite)?;

let sync_result = Syncer::flow(&mut in_progress.transaction, server_uri, &uuid)?;
let mut incoming_partition = None;

match sync_result {
SyncResult::EmptyServer => (),
SyncResult::NoChanges => (),
SyncResult::ServerFastForward => (),
SyncResult::Merge => bail!(TolstoyError::NotYetImplemented(
format!("Can't sync against diverged local.")
)),
SyncResult::LocalFastForward(txs) => {
incoming_partition = fast_forward_local(&mut in_progress, txs)?;
()
},
SyncResult::BadServerState => bail!(TolstoyError::NotYetImplemented(
format!("Bad server state.")
)),
SyncResult::AdoptedRemoteOnFirstSync => (),
SyncResult::IncompatibleBootstrapSchema => bail!(TolstoyError::NotYetImplemented(
format!("IncompatibleBootstrapSchema.")
)),
}

match incoming_partition {
Some(incoming) => {
let root = SyncMetadataClient::get_partitions(&in_progress.transaction, true)?;
let current = SyncMetadataClient::get_partitions(&in_progress.transaction, false)?;
let updated_db = renumber(&in_progress.transaction, &root, &current, &incoming)?;
in_progress.partition_map = updated_db.partition_map;
()
},
None => ()
}

in_progress.commit()
}
}
76 changes: 74 additions & 2 deletions tolstoy/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@

#![allow(dead_code)]

use failure::Error;
use std;
use rusqlite;
use uuid;
use hyper;
use serde_json;

use mentat_db;

#[macro_export]
macro_rules! bail {
Expand All @@ -19,7 +25,7 @@ macro_rules! bail {
)
}

pub type Result<T> = ::std::result::Result<T, Error>;
pub type Result<T> = ::std::result::Result<T, TolstoyError>;

#[derive(Debug, Fail)]
pub enum TolstoyError {
Expand All @@ -40,4 +46,70 @@ pub enum TolstoyError {

#[fail(display = "not yet implemented: {}", _0)]
NotYetImplemented(String),

#[fail(display = "{}", _0)]
DbError(#[cause] mentat_db::DbError),

#[fail(display = "{}", _0)]
SerializationError(#[cause] serde_json::Error),

// It would be better to capture the underlying `rusqlite::Error`, but that type doesn't
// implement many useful traits, including `Clone`, `Eq`, and `PartialEq`.
#[fail(display = "SQL error: _0")]
RusqliteError(String),

#[fail(display = "{}", _0)]
IoError(#[cause] std::io::Error),

#[fail(display = "{}", _0)]
UuidError(#[cause] uuid::ParseError),

#[fail(display = "{}", _0)]
NetworkError(#[cause] hyper::Error),

#[fail(display = "{}", _0)]
UriError(#[cause] hyper::error::UriError),
}

impl From<mentat_db::DbError> for TolstoyError {
fn from(error: mentat_db::DbError) -> TolstoyError {
TolstoyError::DbError(error)
}
}

impl From<serde_json::Error> for TolstoyError {
fn from(error: serde_json::Error) -> TolstoyError {
TolstoyError::SerializationError(error)
}
}

impl From<rusqlite::Error> for TolstoyError {
fn from(error: rusqlite::Error) -> TolstoyError {
TolstoyError::RusqliteError(error.to_string())
}
}

impl From<std::io::Error> for TolstoyError {
fn from(error: std::io::Error) -> TolstoyError {
TolstoyError::IoError(error)
}
}

impl From<uuid::ParseError> for TolstoyError {
fn from(error: uuid::ParseError) -> TolstoyError {
TolstoyError::UuidError(error)
}
}

impl From<hyper::Error> for TolstoyError {
fn from(error: hyper::Error) -> TolstoyError {
TolstoyError::NetworkError(error)
}
}

impl From<hyper::error::UriError> for TolstoyError {
fn from(error: hyper::error::UriError) -> TolstoyError {
TolstoyError::UriError(error)
}
}