Skip to content
This repository was archived by the owner on Sep 12, 2018. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 78 additions & 3 deletions db/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ use mentat_core::{
attribute,
Attribute,
AttributeBitFlags,
AttributeMap,
Entid,
FromMicros,
IdentMap,
SQLValueType,
Schema,
AttributeMap,
TypedValue,
ToMicros,
ValueType,
TypedValue,
ValueRc,
ValueType,
};

use errors::{ErrorKind, Result, ResultExt};
Expand Down Expand Up @@ -477,6 +478,80 @@ pub trait MentatStoring {
fn committed_metadata_assertions(&self, tx_id: Entid) -> Result<Vec<(Entid, Entid, TypedValue, bool)>>;
}

/// Make room in the local store for new entities allocated remotely, shifting local entities
/// to be numbered after remote entities. If successful, returns a new partition map that
/// reflects local and remote data.
/// TODO: also return a Schema and stuff, right?
fn renumber(conn: &rusqlite::Connection, root: &PartitionMap, local: &PartitionMap, remote: &PartitionMap) -> Result<PartitionMap> {
// For each partition present in both local and remote, that has advanced locally since remote,
// renumber local entities.
// Take each partition's largest value from either local or remote, if only one has advanced.
let mut output = remote.clone();
for (name, local_partition) in local {
if let Some(remote_partition) = remote.get(name) {
// The partition exists in both places.
if local_partition.start != remote_partition.start {
// Oh dear.
unimplemented!();
}

if let Some(root_partition) = root.get(name) {
let root_index = root_partition.index;
let advanced_locally_by = local_partition.index - root_index;
let advanced_remotely_by = remote_partition.index - root_index;
if advanced_locally_by > 0 && advanced_remotely_by > 0 {
// Both changed. Renumber locally.
// Note that we bound the set to include the max in order to avoid renumbering other partitions!
// TODO: we're going to renumber in the schema, so we will know precisely which attributes
// we might need to renumber. Save some work!
let lower = root_index;
let upper = lower + advanced_locally_by;
let increment = advanced_remotely_by;
let entity_tag = ValueType::Ref.value_type_tag();

let datoms_e = "UPDATE datoms SET e = e + ? WHERE e >= ? AND e < ?";
let datoms_a = "UPDATE datoms SET a = a + ? WHERE a >= ? AND a < ?";
let datoms_v = "UPDATE datoms SET v = v + ? WHERE value_type_tag = ? AND v >= ? AND v < ?";

// TODO: filter by tx, too -- no need to touch historical values, because they can't
// possibly be affected.
let tx_e = "UPDATE transactions SET e = e + ? WHERE e >= ? AND e < ?";
let tx_a = "UPDATE transactions SET a = a + ? WHERE a >= ? AND a < ?";
let tx_v = "UPDATE transactions SET v = v + ? WHERE value_type_tag = ? AND v >= ? AND v < ?";

// Well, this is awkward.
let ea_args: Vec<&ToSql> = vec![&increment, &lower, &upper];
let v_args: Vec<&ToSql> = vec![&increment, &entity_tag, &lower, &upper];
conn.execute(datoms_e, &ea_args)?;
conn.execute(datoms_a, &ea_args)?;
conn.execute(datoms_v, &v_args)?;
conn.execute(tx_e, &ea_args)?;
conn.execute(tx_a, &ea_args)?;
conn.execute(tx_v, &v_args)?;

// TODO: update `idents` and `schema`, too.

output.get_mut(name).unwrap().index += advanced_locally_by;
} else {
// Take the largest value. We already defaulted to remote….
if local_partition.index > remote_partition.index {
output.get_mut(name).unwrap().index = local_partition.index;
}
}
} else {
// There is no partition in the root! Oh dear.
// This must be implemented before we can add partitions.
unimplemented!();
}
} else {
// There is no conflict: there is no remote partition with this name.
output.insert(name.clone(), local_partition.clone());
}
}
// TODO: update parts to match the returned partition map.
Ok(output)
}

/// Take search rows and complete `temp.search_results`.
///
/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
Expand Down