From 7fcdfc29d3f44e2afd6cb304351b475c9fd41af2 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Fri, 22 Jun 2018 16:01:37 -0700 Subject: [PATCH 01/18] Pre: Remove unused scope. --- db/src/tx.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/db/src/tx.rs b/db/src/tx.rs index 85010f4b7..c1526122d 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -714,7 +714,6 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { let mut aev_trie = into_aev_trie(&self.schema, final_populations, inert_terms)?; let tx_instant; - { // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function. // Assertions that are :db.cardinality/one and not :db.fulltext. let mut non_fts_one: Vec = vec![]; @@ -787,7 +786,6 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { } self.store.commit_transaction(self.tx_id)?; - } db::update_partition_map(self.store, &self.partition_map)?; self.watcher.done(&self.tx_id, self.schema)?; From 321bd1a0b81fae6f704d206a3c6f6361eb41e8be Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 11 Jul 2018 17:13:42 -0700 Subject: [PATCH 02/18] Pre: Sort debug::Datoms in Rust, not SQL. This makes `Datoms` own more of its own sorting and display (as `edn::Value`). The goal is to make it pleasant to create `debug::Datoms` instances that don't originate from an SQL query; the immediate consumer is a `TransactWatcher` that collects witnessed datoms... into a `debug::Datoms` instance, ready to compare with `assert_matches!`. --- db/src/debug.rs | 195 +++++++++++++++++++++++++++++------------------- 1 file changed, 119 insertions(+), 76 deletions(-) diff --git a/db/src/debug.rs b/db/src/debug.rs index 80cb1fe4d..ff6e00b9b 100644 --- a/db/src/debug.rs +++ b/db/src/debug.rs @@ -52,7 +52,15 @@ macro_rules! assert_transact { use std::borrow::Borrow; use std::collections::BTreeMap; -use std::io::{Write}; +use std::cmp::{ + Ordering, +}; +use std::io::{ + Write, +}; +use std::ops::{ + Deref, +}; use itertools::Itertools; use rusqlite; @@ -61,12 +69,21 @@ use rusqlite::types::{ToSql}; use tabwriter::TabWriter; use bootstrap; -use db::*; -use db::{read_attribute_map,read_ident_map}; -use edn; +use db::{ + TypedSQLValue, + ensure_current_version, + new_connection, + read_attribute_map, + read_ident_map, +}; +use edn::{ + self, + ValueRc, +}; use entids; use errors::Result; use mentat_core::{ + Entid, HasSchema, SQLValueType, TxReport, @@ -96,11 +113,10 @@ use watcher::NullWatcher; /// Represents a *datom* (assertion) in the store. #[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] pub struct Datom { - // TODO: generalize this. - pub e: EntidOrIdent, - pub a: EntidOrIdent, - pub v: edn::Value, - pub tx: i64, + pub e: Entid, + pub a: Entid, + pub v: TypedValue, + pub tx: Entid, pub added: Option, } @@ -109,7 +125,44 @@ pub struct Datom { /// To make comparision easier, we deterministically order. The ordering is the ascending tuple /// ordering determined by `(e, a, (value_type_tag, v), tx)`, where `value_type_tag` is an internal /// value that is not exposed but is deterministic. -pub struct Datoms(pub Vec); +pub struct Datoms { + pub schema: ValueRc, + pub datoms: Vec, +} + +/// Sort datoms by `[e a v]`, grouping by `tx` first if `added` is present. +fn datom_cmp(x: &Datom, y: &Datom) -> Ordering { + match x.added.is_some() { + true => + (&x.tx, &x.e, &x.a, x.v.value_type().value_type_tag(), &x.v, &x.added).cmp( + &(&y.tx, &y.e, &y.a, y.v.value_type().value_type_tag(), &y.v, &y.added)), + false => + (&x.e, &x.a, x.v.value_type().value_type_tag(), &x.v, &x.tx).cmp( + &(&y.e, &y.a, y.v.value_type().value_type_tag(), &y.v, &y.tx)), + } +} + +impl Datoms { + pub fn new(schema: I, datoms: Vec) -> Self where I: Into> { + let schema = schema.into(); + + let mut datoms = datoms; + datoms[..].sort_unstable_by(datom_cmp); + + Datoms { + schema, + datoms, + } + } +} + +impl Deref for Datoms { + type Target = [Datom]; + + fn deref(&self) -> &Self::Target { + self.datoms.deref() + } +} /// Represents an ordered sequence of transactions in the store. /// @@ -119,11 +172,19 @@ pub struct Datoms(pub Vec); /// retracted assertions appear before added assertions. pub struct Transactions(pub Vec); +impl Deref for Transactions { + type Target = [Datoms]; + + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} + /// Represents the fulltext values in the store. pub struct FulltextValues(pub Vec<(i64, String)>); impl Datom { - pub fn to_edn(&self) -> edn::Value { + pub fn to_edn(&self, schema: &Schema) -> edn::Value { let f = |entid: &EntidOrIdent| -> edn::Value { match *entid { EntidOrIdent::Entid(ref y) => edn::Value::Integer(y.clone()), @@ -131,7 +192,9 @@ impl Datom { } }; - let mut v = vec![f(&self.e), f(&self.a), self.v.clone()]; + let mut v = vec![edn::Value::Integer(self.e), + f(&to_entid_or_ident(schema, self.a)), + self.v.clone().map_ident(schema).to_edn_value_pair().0]; if let Some(added) = self.added { v.push(edn::Value::Integer(self.tx)); v.push(edn::Value::Boolean(added)); @@ -143,7 +206,7 @@ impl Datom { impl Datoms { pub fn to_edn(&self) -> edn::Value { - edn::Value::Vector((&self.0).into_iter().map(|x| x.to_edn()).collect()) + edn::Value::Vector((&self.datoms).into_iter().map(|x| x.to_edn(&self.schema)).collect()) } } @@ -175,97 +238,77 @@ impl ToIdent for TypedValue { } /// Convert a numeric entid to an ident `Entid` if possible, otherwise a numeric `Entid`. -pub fn to_entid(schema: &Schema, entid: i64) -> EntidOrIdent { +pub fn to_entid_or_ident(schema: &Schema, entid: Entid) -> EntidOrIdent { schema.get_ident(entid).map_or(EntidOrIdent::Entid(entid), |ident| EntidOrIdent::Ident(ident.clone())) } // /// Convert a symbolic ident to an ident `Entid` if possible, otherwise a numeric `Entid`. -// pub fn to_ident(schema: &Schema, entid: i64) -> Entid { +// pub fn to_ident(schema: &Schema, entid: Entid) -> Entid { // schema.get_ident(entid).map_or(Entid::Entid(entid), |ident| Entid::Ident(ident.clone())) // } /// Return the set of datoms in the store, ordered by (e, a, v, tx), but not including any datoms of /// the form [... :db/txInstant ...]. -pub fn datoms>(conn: &rusqlite::Connection, schema: &S) -> Result { +pub fn datoms(conn: &rusqlite::Connection, schema: &Schema) -> Result { datoms_after(conn, schema, bootstrap::TX0 - 1) } -/// Return the set of datoms in the store with transaction ID strictly greater than the given `tx`, -/// ordered by (e, a, v, tx). -/// -/// The datom set returned does not include any datoms of the form [... :db/txInstant ...]. -pub fn datoms_after>(conn: &rusqlite::Connection, schema: &S, tx: i64) -> Result { - let borrowed_schema = schema.borrow(); +/// Turn a row like `SELECT e, a, v, value_type_tag, tx[, added]?` into a `Datom`, optionally +/// filtering `:db/txInstant` datoms out. +fn row_to_datom(schema: &Schema, filter_tx_instant: bool, row: &rusqlite::Row) -> Result> { + let e: Entid = row.get_checked(0)?; + let a: Entid = row.get_checked(1)?; - let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx FROM datoms WHERE tx > ? ORDER BY e ASC, a ASC, value_type_tag ASC, v ASC, tx ASC")?; + if filter_tx_instant && a == entids::DB_TX_INSTANT { + return Ok(None); + } - let r: Result> = stmt.query_and_then(&[&tx], |row| { - let e: i64 = row.get_checked(0)?; - let a: i64 = row.get_checked(1)?; + let v: rusqlite::types::Value = row.get_checked(2)?; + let value_type_tag: i32 = row.get_checked(3)?; - if a == entids::DB_TX_INSTANT { - return Ok(None); - } + let attribute = schema.require_attribute_for_entid(a)?; + let value_type_tag = if !attribute.fulltext { value_type_tag } else { ValueType::Long.value_type_tag() }; - let v: rusqlite::types::Value = row.get_checked(2)?; - let value_type_tag: i32 = row.get_checked(3)?; + let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?; - let attribute = borrowed_schema.require_attribute_for_entid(a)?; - let value_type_tag = if !attribute.fulltext { value_type_tag } else { ValueType::Long.value_type_tag() }; + let tx: Entid = row.get_checked(4)?; + let added: Option = row.get_checked(5).ok(); - let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?.map_ident(borrowed_schema); - let (value, _) = typed_value.to_edn_value_pair(); + Ok(Some(Datom { + e, + a, + v: typed_value, + tx, + added, + })) +} - let tx: i64 = row.get_checked(4)?; +/// Return the set of datoms in the store with transaction ID strictly greater than the given `tx`, +/// ordered by (e, a, v, tx). +/// +/// The datom set returned does not include any datoms of the form [... :db/txInstant ...]. +pub(crate) fn datoms_after(conn: &rusqlite::Connection, schema: &Schema, tx: Entid) -> Result { + let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx FROM datoms WHERE tx > ?")?; - Ok(Some(Datom { - e: EntidOrIdent::Entid(e), - a: to_entid(borrowed_schema, a), - v: value, - tx: tx, - added: None, - })) - })?.collect(); + let r: Result> = stmt.query_and_then(&[&tx], |row| row_to_datom(schema, true, row))?.collect(); - Ok(Datoms(r?.into_iter().filter_map(|x| x).collect())) + Ok(Datoms::new(schema.clone(), r?.into_iter().filter_map(|x| x).collect())) } /// Return the sequence of transactions in the store with transaction ID strictly greater than the /// given `tx`, ordered by (tx, e, a, v). /// /// Each transaction returned includes the [(transaction-tx) :db/txInstant ...] datom. -pub fn transactions_after>(conn: &rusqlite::Connection, schema: &S, tx: i64) -> Result { - let borrowed_schema = schema.borrow(); - - let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx, added FROM transactions WHERE tx > ? ORDER BY tx ASC, e ASC, a ASC, value_type_tag ASC, v ASC, added ASC")?; - - let r: Result> = stmt.query_and_then(&[&tx], |row| { - let e: i64 = row.get_checked(0)?; - let a: i64 = row.get_checked(1)?; +pub fn transactions_after(conn: &rusqlite::Connection, schema: &Schema, tx: Entid) -> Result { + let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx, added FROM transactions WHERE tx > ?")?; - let v: rusqlite::types::Value = row.get_checked(2)?; - let value_type_tag: i32 = row.get_checked(3)?; + let r: Result> = stmt.query_and_then(&[&tx], |row| row_to_datom(schema, false, row))?.collect(); - let attribute = borrowed_schema.require_attribute_for_entid(a)?; - let value_type_tag = if !attribute.fulltext { value_type_tag } else { ValueType::Long.value_type_tag() }; - - let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?.map_ident(borrowed_schema); - let (value, _) = typed_value.to_edn_value_pair(); - - let tx: i64 = row.get_checked(4)?; - let added: bool = row.get_checked(5)?; - - Ok(Datom { - e: EntidOrIdent::Entid(e), - a: to_entid(borrowed_schema, a), - v: value, - tx: tx, - added: Some(added), - }) - })?.collect(); + let schema_rc: ValueRc = schema.clone().into(); // Group by tx. - let r: Vec = r?.into_iter().group_by(|x| x.tx).into_iter().map(|(_key, group)| Datoms(group.collect())).collect(); + let r: Vec = r?.into_iter().filter_map(|x| x).group_by(|x| x.tx).into_iter().map(|(_key, group)| Datoms::new(schema_rc.clone(), group.collect())).collect(); + Ok(Transactions(r)) } @@ -403,12 +446,12 @@ impl TestConn { // Does not include :db/txInstant. let datoms = datoms_after(&conn, &db.schema, 0).unwrap(); - assert_eq!(datoms.0.len(), 94); + assert_eq!(datoms.len(), 94); // Includes :db/txInstant. let transactions = transactions_after(&conn, &db.schema, 0).unwrap(); - assert_eq!(transactions.0.len(), 1); - assert_eq!(transactions.0[0].0.len(), 95); + assert_eq!(transactions.len(), 1); + assert_eq!(transactions[0].len(), 95); let mut parts = db.partition_map; From 9c34b6ffad084d2974102c6494ab7afe09a513e0 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 11 Jul 2018 17:49:12 -0700 Subject: [PATCH 03/18] Pre: Test transaction watchers at the `db` level. This adds testing machinery and uses it to at least assert the existing behaviour. (Whether the existing behaviour is sensible is a topic for another time.) --- db/src/db.rs | 157 +++++++++++++++++++++++++++++++++++++++++- db/src/debug.rs | 85 ++++++++++++++++++++--- tolstoy/src/syncer.rs | 5 +- 3 files changed, 236 insertions(+), 11 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index 93094eba8..9e6ce3fe9 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -1108,7 +1108,11 @@ mod tests { }; use super::*; - use debug::{TestConn,tempids}; + use debug::{ + self, + TestConn, + tempids, + }; use edn::{ self, InternSet, @@ -2613,4 +2617,155 @@ mod tests { // Run a basic test as a sanity check. run_test_add(TestConn::with_sqlite(sqlite)); } + + #[test] + fn test_transaction_watcher() { + let mut conn = TestConn::default(); + + // Insert a few :db.cardinality/one elements. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/add 100 :db.schema/version 1] + [:db/add 101 :db.schema/version 2]] + "#); + assert_matches!(conn.last_transaction(), + "[[100 :db.schema/version 1 ?tx true] + [101 :db.schema/version 2 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[100 :db.schema/version 1] + [101 :db.schema/version 2]]"); + assert_matches!(witnessed, + "[[100 :db.schema/version 1 ?tx true] + [101 :db.schema/version 2 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + + // And a few :db.cardinality/many elements. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/add 200 :db.schema/attribute 100] + [:db/add 200 :db.schema/attribute 101]] + "#); + assert_matches!(conn.last_transaction(), + "[[200 :db.schema/attribute 100 ?tx true] + [200 :db.schema/attribute 101 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[100 :db.schema/version 1] + [101 :db.schema/version 2] + [200 :db.schema/attribute 100] + [200 :db.schema/attribute 101]]"); + assert_matches!(witnessed, + "[[200 :db.schema/attribute 100 ?tx true] + [200 :db.schema/attribute 101 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + + + // Test replacing existing :db.cardinality/one elements. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/add 100 :db.schema/version 11] + [:db/add 101 :db.schema/version 22]] + "#); + assert_matches!(conn.last_transaction(), + "[[100 :db.schema/version 1 ?tx false] + [100 :db.schema/version 11 ?tx true] + [101 :db.schema/version 2 ?tx false] + [101 :db.schema/version 22 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[100 :db.schema/version 11] + [101 :db.schema/version 22] + [200 :db.schema/attribute 100] + [200 :db.schema/attribute 101]]"); + // Right now, transaction watchers do not "witness" all datoms that are implied by entities + // transacted. + // That is, transaction watchers are cheap to implement, not maximally useful. + assert_matches!(witnessed, + "[[100 :db.schema/version 11 ?tx true] + [101 :db.schema/version 22 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + + // Test that asserting existing :db.cardinality/one elements doesn't change the store. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/add 100 :db.schema/version 11] + [:db/add 101 :db.schema/version 22]] + "#); + assert_matches!(conn.last_transaction(), + "[[?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[100 :db.schema/version 11] + [101 :db.schema/version 22] + [200 :db.schema/attribute 100] + [200 :db.schema/attribute 101]]"); + // Right now, transaction watchers "witness" datoms that don't actually change the store. + // That is, transaction watchers are cheap to implement, not maximally useful. + assert_matches!(witnessed, + "[[100 :db.schema/version 11 ?tx true] + [101 :db.schema/version 22 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + + // Test that asserting existing :db.cardinality/many elements doesn't change the store. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/add 200 :db.schema/attribute 100] + [:db/add 200 :db.schema/attribute 101]] + "#); + assert_matches!(conn.last_transaction(), + "[[?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[100 :db.schema/version 11] + [101 :db.schema/version 22] + [200 :db.schema/attribute 100] + [200 :db.schema/attribute 101]]"); + // Right now, transaction watchers "witness" datoms that don't actually change the store. + // That is, transaction watchers are cheap to implement, not maximally useful. + assert_matches!(witnessed, + "[[200 :db.schema/attribute 100 ?tx true] + [200 :db.schema/attribute 101 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + + // Test that we can retract :db.cardinality/one elements. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/retract 100 :db.schema/version 11]] + "#); + assert_matches!(conn.last_transaction(), + "[[100 :db.schema/version 11 ?tx false] + [?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[101 :db.schema/version 22] + [200 :db.schema/attribute 100] + [200 :db.schema/attribute 101]]"); + assert_matches!(witnessed, + "[[100 :db.schema/version 11 ?tx false] + [?tx :db/txInstant ?ms ?tx true]]"); + + // Test that we can retract :db.cardinality/many elements. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/retract 200 :db.schema/attribute 100]] + "#); + assert_matches!(conn.last_transaction(), + "[[200 :db.schema/attribute 100 ?tx false] + [?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[101 :db.schema/version 22] + [200 :db.schema/attribute 101]]"); + assert_matches!(witnessed, + "[[200 :db.schema/attribute 100 ?tx false] + [?tx :db/txInstant ?ms ?tx true]]"); + + // Verify that retracting :db.cardinality/{one,many} elements that are not present doesn't + // change the store. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/retract 100 :db.schema/version 11] + [:db/retract 200 :db.schema/attribute 100]] + "#); + assert_matches!(conn.last_transaction(), + "[[?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[101 :db.schema/version 22] + [200 :db.schema/attribute 101]]"); + // Right now, transaction watchers "witness" datoms that don't actually change the store. + // That is, transaction watchers are cheap to implement, not maximally useful. + assert_matches!(witnessed, + "[[100 :db.schema/version 11 ?tx false] ; Not actually applied! + [200 :db.schema/attribute 100 ?tx false] ; Not actually applied! + [?tx :db/txInstant ?ms ?tx true]]"); + } } diff --git a/db/src/debug.rs b/db/src/debug.rs index ff6e00b9b..625f38b16 100644 --- a/db/src/debug.rs +++ b/db/src/debug.rs @@ -39,22 +39,52 @@ macro_rules! assert_matches { macro_rules! assert_transact { ( $conn: expr, $input: expr, $expected: expr ) => {{ trace!("assert_transact: {}", $input); - let result = $conn.transact($input).map_err(|e| e.to_string()); + let result = $conn.transact($input, NullWatcher()).map_err(|e| e.to_string()); assert_eq!(result, $expected.map_err(|e| e.to_string())); }}; ( $conn: expr, $input: expr ) => {{ trace!("assert_transact: {}", $input); - let result = $conn.transact($input); + let result = $conn.transact($input, NullWatcher()); assert!(result.is_ok(), "Expected Ok(_), got `{}`", result.unwrap_err()); result.unwrap() }}; } -use std::borrow::Borrow; -use std::collections::BTreeMap; +// Transact $input against the given $conn, expecting success. +// +// This unwraps safely and makes asserting errors pleasant. +#[macro_export] +macro_rules! assert_transact_witnessed { + ( $conn: expr, $input: expr ) => {{ + let witnessed = ::std::cell::RefCell::new(None); + let result = { // Scope borrow of witnessed. + let watcher = debug::CollectingWatcher::new(&witnessed); + + trace!("assert_transact: {}", $input); + let result = $conn.transact($input, watcher); + assert!(result.is_ok(), "Expected Ok(_), got `{}`", result.unwrap_err()); + result + }; + + let witnessed = witnessed.into_inner(); + assert!(witnessed.is_some(), "Expected Some(_), got `None`"); + + let (datoms, tx) = witnessed.unwrap(); + let datoms = debug::Datoms::new($conn.schema.clone(), datoms.into_iter().map(|(op, e, a, v)| debug::Datom { e, a, v, tx, added: Some(op == OpType::Add) }).collect()); + + (result.unwrap(), datoms) + }}; +} + +use std::cell::{ + RefCell, +}; use std::cmp::{ Ordering, }; +use std::collections::{ + BTreeMap, +}; use std::io::{ Write, }; @@ -95,6 +125,7 @@ use edn::{ }; use edn::entities::{ EntidOrIdent, + OpType, TempId, }; use internal_types::{ @@ -103,12 +134,19 @@ use internal_types::{ use schema::{ SchemaBuilding, }; -use types::*; use tx::{ transact, transact_terms, }; -use watcher::NullWatcher; +use types::{ + Partition, + PartitionMap, + Schema, +}; +pub use watcher::{ + NullWatcher, + TransactWatcher, +}; /// Represents a *datom* (assertion) in the store. #[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] @@ -372,16 +410,16 @@ impl TestConn { assert_eq!(materialized_schema, self.schema); } - pub fn transact(&mut self, transaction: I) -> Result where I: Borrow { + pub fn transact(&mut self, transaction: &str, watcher: W) -> Result where W: TransactWatcher { // Failure to parse the transaction is a coding error, so we unwrap. - let entities = edn::parse::entities(transaction.borrow()).expect(format!("to be able to parse {} into entities", transaction.borrow()).as_str()); + let entities = edn::parse::entities(transaction).expect(format!("to be able to parse {} into entities", transaction).as_str()); let details = { // The block scopes the borrow of self.sqlite. // We're about to write, so go straight ahead and get an IMMEDIATE transaction. let tx = self.sqlite.transaction_with_behavior(TransactionBehavior::Immediate)?; // Applying the transaction can fail, so we don't unwrap. - let details = transact(&tx, self.partition_map.clone(), &self.schema, &self.schema, NullWatcher(), entities)?; + let details = transact(&tx, self.partition_map.clone(), &self.schema, &self.schema, watcher, entities)?; tx.commit()?; details }; @@ -496,3 +534,32 @@ pub fn tempids(report: &TxReport) -> TempIds { } TempIds(edn::Value::Map(map)) } + +/// A `CollectingWatcher` accumulates witnessed datoms. +/// +/// The internal `RefCell` is how we get data _out_ of a `TransactWatcher`, which isn't well +/// supported right now. +pub struct CollectingWatcher<'a> { + pub cell: &'a RefCell, Entid)>>, + pub collection: Vec<(OpType, Entid, Entid, TypedValue)>, +} + +impl<'a> CollectingWatcher<'a> { + pub fn new(cell: &'a RefCell, Entid)>>) -> Self { + CollectingWatcher { cell, collection: Vec::new() } + } +} + +impl<'a> TransactWatcher for CollectingWatcher<'a> { + fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue) { + self.collection.push((op, e, a, v.clone())) + } + + fn done(&mut self, tx: &Entid, _schema: &Schema) -> Result<()> { + let mut temp = vec![]; + ::std::mem::swap(&mut temp, &mut self.collection); + self.cell.replace(Some((temp, *tx))); + + Ok(()) + } +} diff --git a/tolstoy/src/syncer.rs b/tolstoy/src/syncer.rs index 9bc90fb18..25abac8e1 100644 --- a/tolstoy/src/syncer.rs +++ b/tolstoy/src/syncer.rs @@ -418,7 +418,10 @@ mod tests { use edn; - use mentat_db::debug::{TestConn}; + use mentat_db::debug::{ + NullWatcher, + TestConn, + }; #[test] fn test_remote_client_bound_uri() { From b7dcb35701332242c6bace0e3ff76cdbbb766de6 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Sun, 24 Jun 2018 20:55:11 -0700 Subject: [PATCH 04/18] Part 1: Include excision details in core schema. Thinking about migrating forward makes my head hurt, so I'm ignoring it for now. --- db/src/bootstrap.rs | 14 +++++++++++++- db/src/debug.rs | 4 ++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/db/src/bootstrap.rs b/db/src/bootstrap.rs index 4584284d5..7dc9be646 100644 --- a/db/src/bootstrap.rs +++ b/db/src/bootstrap.rs @@ -92,11 +92,15 @@ lazy_static! { ] }; - static ref V1_CORE_SCHEMA: [(symbols::Keyword); 16] = { + static ref V1_CORE_SCHEMA: [(symbols::Keyword); 20] = { [(ns_keyword!("db", "ident")), (ns_keyword!("db.install", "partition")), (ns_keyword!("db.install", "valueType")), (ns_keyword!("db.install", "attribute")), + (ns_keyword!("db", "excise")), + (ns_keyword!("db.excise", "attrs")), + (ns_keyword!("db.excise", "beforeT")), + (ns_keyword!("db.excise", "before")), (ns_keyword!("db", "txInstant")), (ns_keyword!("db", "valueType")), (ns_keyword!("db", "cardinality")), @@ -124,6 +128,14 @@ lazy_static! { :db/cardinality :db.cardinality/many} :db.install/attribute {:db/valueType :db.type/ref :db/cardinality :db.cardinality/many} + :db/excise {:db/valueType :db.type/ref + :db/cardinality :db.cardinality/many} + :db.excise/attrs {:db/valueType :db.type/ref + :db/cardinality :db.cardinality/many} + :db.excise/beforeT {:db/valueType :db.type/ref + :db/cardinality :db.cardinality/one} + :db.excise/before {:db/valueType :db.type/instant + :db/cardinality :db.cardinality/one} ;; TODO: support user-specified functions in the future. ;; :db.install/function {:db/valueType :db.type/ref ;; :db/cardinality :db.cardinality/many} diff --git a/db/src/debug.rs b/db/src/debug.rs index 625f38b16..3cb432128 100644 --- a/db/src/debug.rs +++ b/db/src/debug.rs @@ -484,12 +484,12 @@ impl TestConn { // Does not include :db/txInstant. let datoms = datoms_after(&conn, &db.schema, 0).unwrap(); - assert_eq!(datoms.len(), 94); + assert_eq!(datoms.len(), 106); // Includes :db/txInstant. let transactions = transactions_after(&conn, &db.schema, 0).unwrap(); assert_eq!(transactions.len(), 1); - assert_eq!(transactions[0].len(), 95); + assert_eq!(transactions[0].len(), datoms.len() + 1); let mut parts = db.partition_map; From 03dd4a37a35602c5ee0256be8504f9672393d744 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Mon, 25 Jun 2018 09:00:13 -0700 Subject: [PATCH 05/18] Part 2: Extract excision details from transaction inputs. I've elected to do this by turning the AEV trie into an EAV trie. This is expensive and not always necessary, but I'm going for correctness first, performance second. I intend to follow-up by using the same EAV trie for the schema detail extraction, which right now is a little ad-hoc. --- db/src/db.rs | 63 ++++++++++++++++++++ db/src/errors.rs | 4 ++ db/src/excision.rs | 125 +++++++++++++++++++++++++++++++++++++++ db/src/internal_types.rs | 23 +++++++ db/src/lib.rs | 1 + db/src/tx.rs | 6 ++ 6 files changed, 222 insertions(+) create mode 100644 db/src/excision.rs diff --git a/db/src/db.rs b/db/src/db.rs index 9e6ce3fe9..8bc8866be 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -1097,6 +1097,10 @@ impl PartitionMap { pub(crate) fn contains_entid(&self, entid: Entid) -> bool { self.values().any(|partition| partition.contains_entid(entid)) } + + pub(crate) fn partition_for_entid(&self, entid: Entid) -> Option<&str> { + self.iter().find(|(_name, partition)| partition.contains_entid(entid)).map(|x| x.0.as_ref()) + } } #[cfg(test)] @@ -2768,4 +2772,63 @@ mod tests { [200 :db.schema/attribute 100 ?tx false] ; Not actually applied! [?tx :db/txInstant ?ms ?tx true]]"); } + + fn test_excision_bad_excisions() { + let mut conn = TestConn::default(); + + // Can't specify `:db.excise/before` at all. + assert_transact!(conn, r#"[ + {:db.excise/before #inst "2016-06-06T00:00:00.000Z"} + ]"#, + Err("bad excision: :db.excise/before")); + + // Must specify `:db/excise`. + assert_transact!(conn, r#"[ + {:db.excise/attrs [:db/ident :db/doc]} + ]"#, + Err("bad excision: no :db/excise")); + + assert_transact!(conn, r#"[ + {:db.excise/beforeT (transaction-tx)} + ]"#, + Err("bad excision: no :db/excise")); + + // Can't retract anything to do with excision. + assert_transact!(conn, r#"[ + [:db/retract 100 :db/excise 101] + ]"#, + Err("bad excision: retraction")); + + assert_transact!(conn, r#"[ + [:db/retract 100 :db.excise/beforeT (transaction-tx)] + ]"#, + Err("bad excision: retraction")); + + assert_transact!(conn, r#"[ + [:db/retract 100 :db.excise/attrs :db/ident] + ]"#, + Err("bad excision: retraction")); + + // Can't mutate the schema. This isn't completely implemented yet; right now, Mentat will + // prevent a consumer excising an attribute entity, but not a datom describing an attribute + // of an attribute. That is, you can't excise `:db/txInstant`, but you could remove the + // `:db/valueType :db.type/instant` from `:db/txInstant`. + assert_transact!(conn, r#"[ + {:db/excise :db/txInstant} + ]"#, + Err("bad excision: cannot mutate schema")); + + // Can't excise in the `:db.part/{db,tx}` partitions. + // TODO: test that we can't excise in the `:db.part/db` partition. + let report = assert_transact!(conn, r#"[ + ]"#); + + assert_transact!(conn, &format!(r#"[ + [:db/add "e" :db/excise {}] + ]"#, report.tx_id), + Err("bad excision: cannot target entity in partition :db.part/tx")); + + // TODO: Don't allow anything more than excisions in the excising transaction, except + // additional facts about the (transaction-tx). + } } diff --git a/db/src/errors.rs b/db/src/errors.rs index 5fcd743aa..b0a7967ce 100644 --- a/db/src/errors.rs +++ b/db/src/errors.rs @@ -217,6 +217,10 @@ pub enum DbErrorKind { #[fail(display = "bad schema assertion: {}", _0)] BadSchemaAssertion(String), + /// A bad excision was transacted. + #[fail(display = "bad excision: {}", _0)] + BadExcision(String), + /// An ident->entid mapping failed. #[fail(display = "no entid found for ident: {}", _0)] UnrecognizedIdent(String), diff --git a/db/src/excision.rs b/db/src/excision.rs new file mode 100644 index 000000000..d9ffddee4 --- /dev/null +++ b/db/src/excision.rs @@ -0,0 +1,125 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::collections::{ + BTreeSet, + BTreeMap, +}; + +use mentat_core::{ + Attribute, + Entid, + HasSchema, + Schema, +}; + +use entids; + +use errors::{ + DbErrorKind, + Result, +}; + +use internal_types::{ + AEVTrie, + filter_aev_to_eav, +}; + +use schema::{ + SchemaBuilding, +}; + +use types::{ + PartitionMap, +}; + +/// Details about an excision: +/// - a target to excise (for now, an entid); +/// - a possibly empty set of attributes to excise (the empty set means all attributes, not no +/// attributes); +/// - and a possibly omitted transaction ID to limit the excision before. (TODO: check whether +/// Datomic excises the last retraction before the first remaining assertion, and make our +/// behaviour agree.) +/// +/// `:db/before` doesn't make sense globally, since in Mentat, monotonically increasing +/// transaction IDs don't guarantee monotonically increasing txInstant values. Therefore, we +/// accept only `:db/beforeT` and allow consumers to turn `:db/before` timestamps into +/// transaction IDs in whatever way they see fit. +#[derive(Clone, Debug, Default)] +pub(crate) struct Excision { + pub(crate) target: Entid, + pub(crate) attrs: Option>, + pub(crate) before_tx: Option, +} + +pub(crate) type Excisions = BTreeMap; + +/// Extract excisions from the given transacted datoms. +pub(crate) fn excisions<'schema>(partition_map: &'schema PartitionMap, schema: &'schema Schema, aev_trie: &AEVTrie<'schema>) -> Result> { + let pair = |a: Entid| -> Result<(Entid, &'schema Attribute)> { + schema.require_attribute_for_entid(a).map(|attribute| (a, attribute)) + }; + + if aev_trie.contains_key(&pair(entids::DB_EXCISE_BEFORE)?) { + bail!(DbErrorKind::BadExcision(":db.excise/before".into())); // TODO: more details. + } + + let eav_trie = filter_aev_to_eav(aev_trie, |&(a, _)| + a == entids::DB_EXCISE || + a == entids::DB_EXCISE_ATTRS || + a == entids::DB_EXCISE_BEFORE_T); + + let mut excisions = BTreeMap::default(); + + for (&e, avs) in eav_trie.iter() { + for (&(_a, _attribute), ars) in avs { + if !ars.retract.is_empty() { + bail!(DbErrorKind::BadExcision("retraction".into())); // TODO: more details. + } + } + + let target = avs.get(&pair(entids::DB_EXCISE)?) + .and_then(|ars| ars.add.iter().next().cloned()) + .and_then(|v| v.into_entid()) + .ok_or_else(|| DbErrorKind::BadExcision("no :db/excise".into()))?; // TODO: more details. + + if schema.get_ident(target).is_some() { + bail!(DbErrorKind::BadExcision("cannot mutate schema".into())); // TODO: more details. + } + + let partition = partition_map.partition_for_entid(target) + .ok_or_else(|| DbErrorKind::BadExcision("target has no partition".into()))?; // TODO: more details. + // Right now, Mentat only supports `:db.part/{db,user,tx}`, and tests hack in `:db.part/fake`. + if partition == ":db.part/db" || partition == ":db.part/tx" { + bail!(DbErrorKind::BadExcision(format!("cannot target entity in partition {}", partition).into())); // TODO: more details. + } + + let before_tx = avs.get(&pair(entids::DB_EXCISE_BEFORE_T)?) + .and_then(|ars| ars.add.iter().next().cloned()) + .and_then(|v| v.into_entid()); + + let attrs = avs.get(&pair(entids::DB_EXCISE_ATTRS)?) + .map(|ars| ars.add.clone().into_iter().filter_map(|v| v.into_entid()).collect()); + + let excision = Excision { + target, + attrs, + before_tx, + }; + + excisions.insert(e, excision); + } + + if excisions.is_empty() { + Ok(None) + } else { + Ok(Some(excisions)) + } +} diff --git a/db/src/internal_types.rs b/db/src/internal_types.rs index 782ff22d6..4438cf2eb 100644 --- a/db/src/internal_types.rs +++ b/db/src/internal_types.rs @@ -214,3 +214,26 @@ pub(crate) struct AddAndRetract { // A trie-like structure mapping a -> e -> v that prefix compresses and makes uniqueness constraint // checking more efficient. BTree* for deterministic errors. pub(crate) type AEVTrie<'schema> = BTreeMap<(Entid, &'schema Attribute), BTreeMap>; + +// A trie-like structure mapping e -> a -> v that prefix compresses and makes accumulating fragments +// (schema fragments, say, or excision fragments) efficient. BTree* for deterministic errors. +pub(crate) type EAVTrie<'schema> = BTreeMap>; + +pub(crate) fn filter_aev_to_eav<'schema, P>(aev_trie: &AEVTrie<'schema>, mut attr_pair_predicate: P) -> EAVTrie<'schema> +where P: FnMut(&(Entid, &'schema Attribute)) -> bool { + let mut eav_trie: EAVTrie<'schema> = Default::default(); + + for (&a_pair, evs) in aev_trie.iter() { + if !attr_pair_predicate(&a_pair) { + continue + } + + for (&e, ars) in evs { + eav_trie + .entry(e).or_insert(BTreeMap::default()) + .insert(a_pair, ars.clone()); + } + } + + eav_trie +} diff --git a/db/src/lib.rs b/db/src/lib.rs index 27cd09438..eacdfebe5 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -46,6 +46,7 @@ pub mod cache; pub mod db; mod bootstrap; pub mod entids; +mod excision; pub mod internal_types; // pub because we need them for building entities programmatically. mod metadata; mod schema; diff --git a/db/src/tx.rs b/db/src/tx.rs index c1526122d..ba7da698b 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -106,6 +106,7 @@ use edn::entities::{ OpType, TempId, }; +use excision; use metadata; use rusqlite; use schema::{ @@ -740,6 +741,11 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { bail!(DbErrorKind::SchemaConstraintViolation(errors::SchemaConstraintViolation::CardinalityConflicts { conflicts: errors })); } + let excisions = excision::excisions(&self.partition_map, &self.schema, &aev_trie)?; + if !excisions.is_none() { + bail!(DbErrorKind::NotYetImplemented(format!("Excision not yet implemented: {:?}", excisions))); + } + // Pipeline stage 4: final terms (after rewriting) -> DB insertions. // Collect into non_fts_*. From d80e5d924efe97eca0ab30e2fa39b270934c5e17 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Mon, 25 Jun 2018 14:01:12 -0700 Subject: [PATCH 06/18] Part 3: First cut at handling excision. --- db/src/db.rs | 178 +++++++++++++++++++++++++++++++++++++++++++-- db/src/debug.rs | 4 +- db/src/excision.rs | 118 +++++++++++++++++++++++++++++- db/src/tx.rs | 11 ++- 4 files changed, 295 insertions(+), 16 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index 8bc8866be..2736d5ab1 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -14,7 +14,10 @@ use failure::{ ResultExt, }; -use std::collections::HashMap; +use std::collections::{ + BTreeMap, + HashMap, +}; use std::collections::hash_map::{ Entry, }; @@ -59,6 +62,15 @@ use errors::{ DbErrorKind, Result, }; +// use excision::{ +// excisions, +// Excision, +// ExcisionMap, +// }; +use internal_types::{ + AddAndRetract, + AEVTrie, +}; use metadata; use schema::{ SchemaBuilding, @@ -246,6 +258,16 @@ lazy_static! { r#"CREATE INDEX idx_schema_unique ON schema (e, a, v, value_type_tag)"#, // TODO: store entid instead of ident for partition name. r#"CREATE TABLE parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, end INTEGER NOT NULL, idx INTEGER NOT NULL, allow_excision SMALLINT NOT NULL)"#, + + // Excisions are transacted as data, so they end up in the transactions table; this + // materializes that view. Excisions are never removed from the transactions (or datoms) + // table so it's not required to include the `added` column, but let's do it so that if we + // grow a more uniform approach to materialized views of the transactions table, this is + // close to the shape of that table. + // + // `status` tracks whether an excision has been applied (0) or is pending (> 0). + r#"CREATE TABLE excisions (e INTEGER NOT NULL UNIQUE, target INTEGER NOT NULL, before_tx INTEGER, status INTEGER NOT NULL)"#, + r#"CREATE TABLE excision_attrs (e INTEGER NOT NULL, a SMALLINT NOT NULL, FOREIGN KEY (e) REFERENCES excisions(e))"#, ] }; } @@ -437,6 +459,39 @@ pub(crate) fn read_materialized_view(conn: &rusqlite::Connection, table: &str) - m } +/// Read an arbitrary [e a v value_type_tag tx added] materialized view from the given table in the +/// SQL store, returing an AEV trie. +pub(crate) fn read_materialized_transaction_aev_trie<'schema>(conn: &rusqlite::Connection, schema: &'schema Schema, table: &str) -> Result> { + let mut stmt: rusqlite::Statement = conn.prepare(format!("SELECT e, a, v, value_type_tag, tx, added FROM {}", table).as_str())?; + let m: Result> = stmt.query_and_then(&[], |row| { + let e: Entid = row.get_checked(0)?; + let a: Entid = row.get_checked(1)?; + let v: rusqlite::types::Value = row.get_checked(2)?; + let value_type_tag: i32 = row.get_checked(3)?; + let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?; + let tx: Entid = row.get_checked(4)?; + let added: bool = row.get_checked(5)?; + Ok((e, a, typed_value, tx, added)) + })?.collect(); + + let mut trie = AEVTrie::default(); + + for (e, a, v, _, added) in m? { + let attribute: &Attribute = schema.require_attribute_for_entid(a)?; + + let a_and_r = trie + .entry((a, attribute)).or_insert(BTreeMap::default()) + .entry(e).or_insert(AddAndRetract::default()); + + match added { + true => a_and_r.add.insert(v), + false => a_and_r.retract.insert(v), + }; + } + + Ok(trie) +} + /// Read the partition map materialized view from the given SQL store. fn read_partition_map(conn: &rusqlite::Connection) -> Result { let mut stmt: rusqlite::Statement = conn.prepare("SELECT part, start, end, idx, allow_excision FROM parts")?; @@ -494,7 +549,7 @@ pub enum SearchType { /// Right now, the only implementation of `MentatStoring` is the SQLite-specific SQL schema. In the /// future, we might consider other SQL engines (perhaps with different fulltext indexing), or /// entirely different data stores, say ones shaped like key-value stores. -pub trait MentatStoring { +pub(crate) trait MentatStoring { /// Given a slice of [a v] lookup-refs, look up the corresponding [e a v] triples. /// /// It is assumed that the attribute `a` in each lookup-ref is `:db/unique`, so that at most one @@ -1107,16 +1162,14 @@ impl PartitionMap { mod tests { extern crate env_logger; - use std::borrow::{ - Borrow, - }; - use super::*; use debug::{ self, TestConn, tempids, }; + use errors; + use excision; use edn::{ self, InternSet, @@ -1134,7 +1187,6 @@ mod tests { use std::collections::{ BTreeMap, }; - use errors; use internal_types::{ Term, }; @@ -2831,4 +2883,116 @@ mod tests { // TODO: Don't allow anything more than excisions in the excising transaction, except // additional facts about the (transaction-tx). } + + #[test] + fn test_excision() { + let mut conn = TestConn::default(); + + assert_transact!(conn, r#"[ + {:db/id 200 :db/ident :test/one :db/valueType :db.type/long :db/cardinality :db.cardinality/one} + {:db/id 201 :db/ident :test/many :db/valueType :db.type/long :db/cardinality :db.cardinality/many} + {:db/id 202 :db/ident :test/ref :db/valueType :db.type/ref :db/cardinality :db.cardinality/one} + ]"#); + + // Simplest case: just a `:db/excise` target, potentially including an inbound ref. + assert_transact!(conn, r#"[ + {:db/id 300 + :test/one 1000 + :test/many [2000 2001 2002]} + {:db/id 301 + :test/one 1001 + :test/ref 300} + ]"#); + + // Before. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [300 :test/one 1000] + [300 :test/many 2000] + [300 :test/many 2001] + [300 :test/many 2002] + [301 :test/one 1001] + [301 :test/ref 300]]"#); + + let report = assert_transact!(conn, r#"[ + {:db/id "e" :db/excise 300} + ]"#); + // This is implementation specific, but it should be deterministic. + assert_matches!(tempids(&report), + "{\"e\" 65536}"); + + // After. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [301 :test/one 1001] + [?e :db/excise 300]]"#); + + // We have enqueued a pending excision. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, ::std::iter::once((65536, excision::Excision { + target: 300, + attrs: None, + before_tx: None, + })).collect()); + + // Before processing the pending excision, we have full transactions in the transaction log. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[300 :test/one 1000 ?tx2 true] + [300 :test/many 2000 ?tx2 true] + [300 :test/many 2001 ?tx2 true] + [300 :test/many 2002 ?tx2 true] + [301 :test/one 1001 ?tx2 true] + [301 :test/ref 300 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[?e :db/excise 300 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]]]"#); + + // After processing the pending excision, we have nothing left pending. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, Default::default()); + + // After processing the pending excision, we have rewritten transactions in the transaction + // log to not refer to the target entity of the excision. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[301 :test/one 1001 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[?e :db/excise 300 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]]]"#); + } } diff --git a/db/src/debug.rs b/db/src/debug.rs index 3cb432128..28c62062d 100644 --- a/db/src/debug.rs +++ b/db/src/debug.rs @@ -13,7 +13,7 @@ /// Low-level functions for testing. -// Macro to parse a `Borrow` to an `edn::Value` and assert the given `edn::Value` `matches` +// Macro to parse a `&str` to an `edn::Value` and assert the given `edn::Value` `matches` // against it. // // This is a macro only to give nice line numbers when tests fail. @@ -21,7 +21,7 @@ macro_rules! assert_matches { ( $input: expr, $expected: expr ) => {{ // Failure to parse the expected pattern is a coding error, so we unwrap. - let pattern_value = edn::parse::value($expected.borrow()) + let pattern_value = edn::parse::value($expected) .expect(format!("to be able to parse expected {}", $expected).as_str()) .without_spans(); let input_value = $input.to_edn(); diff --git a/db/src/excision.rs b/db/src/excision.rs index d9ffddee4..a707a1648 100644 --- a/db/src/excision.rs +++ b/db/src/excision.rs @@ -13,6 +13,8 @@ use std::collections::{ BTreeMap, }; +use rusqlite; + use mentat_core::{ Attribute, Entid, @@ -52,17 +54,18 @@ use types::{ /// transaction IDs don't guarantee monotonically increasing txInstant values. Therefore, we /// accept only `:db/beforeT` and allow consumers to turn `:db/before` timestamps into /// transaction IDs in whatever way they see fit. -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, Eq, Hash, PartialEq)] pub(crate) struct Excision { pub(crate) target: Entid, pub(crate) attrs: Option>, pub(crate) before_tx: Option, } -pub(crate) type Excisions = BTreeMap; +/// Map from `entid` to excision details. `entid` is not the excision `target`! +pub(crate) type ExcisionMap = BTreeMap; /// Extract excisions from the given transacted datoms. -pub(crate) fn excisions<'schema>(partition_map: &'schema PartitionMap, schema: &'schema Schema, aev_trie: &AEVTrie<'schema>) -> Result> { +pub(crate) fn excisions<'schema>(partition_map: &'schema PartitionMap, schema: &'schema Schema, aev_trie: &AEVTrie<'schema>) -> Result> { let pair = |a: Entid| -> Result<(Entid, &'schema Attribute)> { schema.require_attribute_for_entid(a).map(|attribute| (a, attribute)) }; @@ -71,12 +74,14 @@ pub(crate) fn excisions<'schema>(partition_map: &'schema PartitionMap, schema: & bail!(DbErrorKind::BadExcision(":db.excise/before".into())); // TODO: more details. } + // TODO: Don't allow anything more than excisions in the excising transaction, except + // additional facts about the (transaction-tx). let eav_trie = filter_aev_to_eav(aev_trie, |&(a, _)| a == entids::DB_EXCISE || a == entids::DB_EXCISE_ATTRS || a == entids::DB_EXCISE_BEFORE_T); - let mut excisions = BTreeMap::default(); + let mut excisions = ExcisionMap::default(); for (&e, avs) in eav_trie.iter() { for (&(_a, _attribute), ars) in avs { @@ -123,3 +128,108 @@ pub(crate) fn excisions<'schema>(partition_map: &'schema PartitionMap, schema: & Ok(Some(excisions)) } } + +pub(crate) fn enqueue_pending_excisions(conn: &rusqlite::Connection, schema: &Schema, tx_id: Entid, excisions: ExcisionMap) -> Result<()> { + // excisions.into_iter().map(|(entid, excision)| enqueue_pending_excision(self, entid, excision)).collect().and(Ok(())) + + // if !excisions.is_none() { + // bail!(DbError::NotYetImplemented(format!("Excision not yet implemented: {:?}", excisions))); + // } + + let mut stmt1: rusqlite::Statement = conn.prepare("INSERT INTO excisions VALUES (?, ?, ?, ?)")?; + let mut stmt2: rusqlite::Statement = conn.prepare("INSERT INTO excision_attrs VALUES (?, ?)")?; + + for (entid, excision) in excisions { + stmt1.execute(&[&entid, &excision.target, &excision.before_tx, &excision.before_tx.unwrap_or(tx_id)])?; // XXX + if let Some(attrs) = excision.attrs { + // println!("attrs {:?}", attrs); + for attr in attrs { + stmt2.execute(&[&entid, &attr])?; + } + } + } + + // TODO: filter by attrs. + let mut stmt: rusqlite::Statement = conn.prepare(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d, excisions AS e WHERE e.status > 0 AND (e.target IS d.e OR (e.target IS d.v AND d.a IS NOT {}))) DELETE FROM datoms WHERE rowid IN ids", entids::DB_EXCISE).as_ref())?; + + stmt.execute(&[])?; + + Ok(()) +} + +pub(crate) fn pending_excisions(conn: &rusqlite::Connection, partition_map: &PartitionMap, schema: &Schema) -> Result { + let mut stmt1: rusqlite::Statement = conn.prepare("SELECT e, target, before_tx, status FROM excisions WHERE status > 0 ORDER BY e")?; + let mut stmt2: rusqlite::Statement = conn.prepare("SELECT a FROM excision_attrs WHERE e IS ?")?; + + let m: Result = stmt1.query_and_then(&[], |row| { + let e: Entid = row.get_checked(0)?; + let target: Entid = row.get_checked(1)?; + let before_tx: Option = row.get_checked(2)?; + + let attrs: Result> = stmt2.query_and_then(&[&e], |row| { + let a: Entid = row.get_checked(0)?; + Ok(a) + })?.collect(); + let attrs = attrs.map(|attrs| { + if attrs.is_empty() { + None + } else { + Some(attrs) + } + })?; + + let excision = Excision { + target, + before_tx, + attrs, + }; + + Ok((e, excision)) + })?.collect(); + + m + + // let aev_trie = read_materialized_transaction_aev_trie(&conn, schema, "excisions")?; + + // excisions(&partition_map, &schema, &aev_trie).map(|o| o.unwrap_or_default()) +} + +pub(crate) fn ensure_no_pending_excisions(conn: &rusqlite::Connection) -> Result<()> { + // let pending = pending_excisions(self)?; + + // WITH ids AS (SELECT rid + // FROM temp.search_results + // WHERE rid IS NOT NULL AND + // ((added0 IS 0) OR + // (added0 IS 1 AND search_type IS ':db.cardinality/one' AND v0 IS NOT v))) + // DELETE FROM datoms WHERE rowid IN ids"#; + + // TODO: filter by attrs. + let mut stmt: rusqlite::Statement = conn.prepare(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t, excisions AS e WHERE e.status > 0 AND t.tx <= e.status AND (e.target IS t.e OR (e.target IS t.v AND t.a IS NOT {}))) DELETE FROM transactions WHERE rowid IN ids", entids::DB_EXCISE).as_ref())?; + + stmt.execute(&[])?; + + let mut stmt: rusqlite::Statement = conn.prepare("UPDATE excisions SET status = 0")?; + + stmt.execute(&[])?; + + // let relevant_tx_ids: Result> = stmt.query_and_then(&[], |row| { + // let e: Entid = row.get_checked(0)?; + // let target: + // Ok(tx) + // })?.collect(); + + // println!("relevant_tx_ids: {:?}", relevant_tx_ids?); + + // let mut stmt: rusqlite::Statement = conn.prepare(format!("DELETE FROM transactions AS t WHERE t.tx = ? AND (t.e = ? OR (t.v = ? AND t.a != {}))", entids::DB_EXCISE).as_str()); + + // for tx_id in relevant_tx_ids { + + // } + + Ok(()) +} + +// fn enqueue_pending_excision(conn: &rusqlite::Connection, excision: Excision) -> Result<()> { +// Ok(()) +// } diff --git a/db/src/tx.rs b/db/src/tx.rs index ba7da698b..9c3588ae1 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -742,9 +742,6 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { } let excisions = excision::excisions(&self.partition_map, &self.schema, &aev_trie)?; - if !excisions.is_none() { - bail!(DbErrorKind::NotYetImplemented(format!("Excision not yet implemented: {:?}", excisions))); - } // Pipeline stage 4: final terms (after rewriting) -> DB insertions. // Collect into non_fts_*. @@ -809,12 +806,20 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { // regular transactor code paths, updating the schema and materialized views uniformly. // But, belt-and-braces: handle it gracefully. if new_schema != *self.schema_for_mutation { + if excisions.is_some() { + bail!(DbErrorKind::BadExcision("cannot mutate schema".into())); + } + let old_schema = (*self.schema_for_mutation).clone(); // Clone the original Schema for comparison. *self.schema_for_mutation.to_mut() = new_schema; // Store the new Schema. db::update_metadata(self.store, &old_schema, &*self.schema_for_mutation, &metadata_report)?; } } + if let Some(excisions) = excisions { + excision::enqueue_pending_excisions(self.store, self.schema, self.tx_id, excisions)?; + } + Ok(TxReport { tx_id: self.tx_id, tx_instant, From 2a5d5c38cdb3426d5350348b04c7b99d1e7fc8d5 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Mon, 25 Jun 2018 20:34:30 -0700 Subject: [PATCH 07/18] Part 4: Handle :db.excise/attrs. --- db/src/db.rs | 133 +++++++++++++++++++++++++++++++++++++++++++++ db/src/excision.rs | 112 ++++++++++++++++++++------------------ db/src/tx.rs | 2 +- 3 files changed, 192 insertions(+), 55 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index 2736d5ab1..521f52571 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -2973,6 +2973,8 @@ mod tests { [[?e :db/excise 300 ?tx3 true] [?tx3 :db/txInstant ?ms3 ?tx3 true]]]"#); + excision::ensure_no_pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("ensure_no_pending_excisions"); + // After processing the pending excision, we have nothing left pending. let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); assert_eq!(pending, Default::default()); @@ -2995,4 +2997,135 @@ mod tests { [[?e :db/excise 300 ?tx3 true] [?tx3 :db/txInstant ?ms3 ?tx3 true]]]"#); } + + #[test] + fn test_excision_with_attrs() { + let mut conn = TestConn::default(); + + assert_transact!(conn, r#"[ + {:db/id 200 :db/ident :test/one :db/valueType :db.type/long :db/cardinality :db.cardinality/one} + {:db/id 201 :db/ident :test/many :db/valueType :db.type/long :db/cardinality :db.cardinality/many} + {:db/id 202 :db/ident :test/ref :db/valueType :db.type/ref :db/cardinality :db.cardinality/one} + ]"#); + + // Simplest case: just a `:db/excise` target, potentially including an inbound ref. Also + // includes a self ref. + assert_transact!(conn, r#"[ + {:db/id 300 + :test/one 1000 + :test/many [2000 2001 2002] + :test/ref 300} + {:db/id 301 + :test/one 1001 + :test/ref 300} + ]"#); + + // Before. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [300 :test/one 1000] + [300 :test/many 2000] + [300 :test/many 2001] + [300 :test/many 2002] + [300 :test/ref 300] + [301 :test/one 1001] + [301 :test/ref 300]]"#); + + let report = assert_transact!(conn, r#"[ + {:db/id "e" :db/excise 300 :db.excise/attrs [:test/one :test/many]} + ]"#); + // This is implementation specific, but it should be deterministic. + assert_matches!(tempids(&report), + "{\"e\" 65536}"); + + // After. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [300 :test/ref 300] + [301 :test/one 1001] + [301 :test/ref 300] + [?e :db/excise 300] + [?e :db.excise/attrs :test/one] + [?e :db.excise/attrs :test/many]]"#); + + // We have enqueued a pending excision. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, ::std::iter::once((65536, excision::Excision { + target: 300, + // Ordered by keyword. + attrs: Some(::std::iter::once(conn.schema.require_entid(&Keyword::namespaced("test", "many")).unwrap().0) + .chain(::std::iter::once(conn.schema.require_entid(&Keyword::namespaced("test", "one")).unwrap().0)) + .collect()), + before_tx: None, + })).collect()); + + // Before processing the pending excision, we have full transactions in the transaction log. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[300 :test/one 1000 ?tx2 true] + [300 :test/many 2000 ?tx2 true] + [300 :test/many 2001 ?tx2 true] + [300 :test/many 2002 ?tx2 true] + [300 :test/ref 300 ?tx2 true] + [301 :test/one 1001 ?tx2 true] + [301 :test/ref 300 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[?e :db/excise 300 ?tx3 true] + [?e :db.excise/attrs :test/one ?tx3 true] + [?e :db.excise/attrs :test/many ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]]]"#); + + excision::ensure_no_pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("ensure_no_pending_excisions"); + + // After processing the pending excision, we have nothing left pending. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, Default::default()); + + // After processing the pending excision, we have rewritten transactions in the transaction + // log to not refer to the targeted attributes of the target entity. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[300 :test/ref 300 ?tx2 true] + [301 :test/one 1001 ?tx2 true] + [301 :test/ref 300 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[?e :db/excise 300 ?tx3 true] + [?e :db.excise/attrs :test/one ?tx3 true] + [?e :db.excise/attrs :test/many ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]]]"#); + } } diff --git a/db/src/excision.rs b/db/src/excision.rs index a707a1648..9c068c27b 100644 --- a/db/src/excision.rs +++ b/db/src/excision.rs @@ -13,6 +13,8 @@ use std::collections::{ BTreeMap, }; +use itertools::Itertools; + use rusqlite; use mentat_core::{ @@ -129,42 +131,71 @@ pub(crate) fn excisions<'schema>(partition_map: &'schema PartitionMap, schema: & } } -pub(crate) fn enqueue_pending_excisions(conn: &rusqlite::Connection, schema: &Schema, tx_id: Entid, excisions: ExcisionMap) -> Result<()> { - // excisions.into_iter().map(|(entid, excision)| enqueue_pending_excision(self, entid, excision)).collect().and(Ok(())) +fn excise_datoms(conn: &rusqlite::Connection, excision: &Excision) -> Result<()> { + match excision.attrs { + Some(ref attrs) => { + let s = attrs.iter().join(", "); + conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE d.e IS {} AND d.a IN ({})) DELETE FROM datoms WHERE rowid IN ids", + excision.target, s).as_ref(), &[])?; + }, + None => { + conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE (d.e IS {} OR (d.v IS {} AND d.a IS NOT {}))) DELETE FROM datoms WHERE rowid IN ids", + excision.target, excision.target, entids::DB_EXCISE).as_ref(), &[])?; + }, + } + + Ok(()) +} + +fn excise_transactions_before_tx(conn: &rusqlite::Connection, excision: &Excision, before_tx: Entid) -> Result<()> { + match excision.attrs { + Some(ref attrs) => { + let s = attrs.iter().join(", "); + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IS {} AND t.a IN ({})) DELETE FROM transactions WHERE rowid IN ids", + excision.target, s).as_ref(), &[])?; + }, + None => { + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.tx <= {} AND (t.e IS {} OR (t.v IS {} AND t.a IS NOT {}))) DELETE FROM transactions WHERE rowid IN ids", + before_tx, excision.target, excision.target, entids::DB_EXCISE).as_ref(), &[])?; + }, + } - // if !excisions.is_none() { - // bail!(DbError::NotYetImplemented(format!("Excision not yet implemented: {:?}", excisions))); - // } + Ok(()) +} +pub(crate) fn enqueue_pending_excisions(conn: &rusqlite::Connection, schema: &Schema, tx_id: Entid, excisions: &ExcisionMap) -> Result<()> { let mut stmt1: rusqlite::Statement = conn.prepare("INSERT INTO excisions VALUES (?, ?, ?, ?)")?; let mut stmt2: rusqlite::Statement = conn.prepare("INSERT INTO excision_attrs VALUES (?, ?)")?; for (entid, excision) in excisions { - stmt1.execute(&[&entid, &excision.target, &excision.before_tx, &excision.before_tx.unwrap_or(tx_id)])?; // XXX - if let Some(attrs) = excision.attrs { - // println!("attrs {:?}", attrs); + let status = excision.before_tx.unwrap_or(tx_id); + stmt1.execute(&[entid, &excision.target, &excision.before_tx, &status])?; + + if let Some(ref attrs) = excision.attrs { for attr in attrs { - stmt2.execute(&[&entid, &attr])?; + stmt2.execute(&[entid, attr])?; } } } - // TODO: filter by attrs. - let mut stmt: rusqlite::Statement = conn.prepare(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d, excisions AS e WHERE e.status > 0 AND (e.target IS d.e OR (e.target IS d.v AND d.a IS NOT {}))) DELETE FROM datoms WHERE rowid IN ids", entids::DB_EXCISE).as_ref())?; - - stmt.execute(&[])?; + // Might as well not interleave writes to "excisions" and "excision_attrs" with writes to + // "datoms". This also leaves open the door for a more efficient bulk operation. + for (_entid, excision) in excisions { + excise_datoms(conn, &excision)?; + } Ok(()) } -pub(crate) fn pending_excisions(conn: &rusqlite::Connection, partition_map: &PartitionMap, schema: &Schema) -> Result { +fn pending_excision_list(conn: &rusqlite::Connection, partition_map: &PartitionMap, schema: &Schema) -> Result> { let mut stmt1: rusqlite::Statement = conn.prepare("SELECT e, target, before_tx, status FROM excisions WHERE status > 0 ORDER BY e")?; let mut stmt2: rusqlite::Statement = conn.prepare("SELECT a FROM excision_attrs WHERE e IS ?")?; - let m: Result = stmt1.query_and_then(&[], |row| { + let m: Result> = stmt1.query_and_then(&[], |row| { let e: Entid = row.get_checked(0)?; let target: Entid = row.get_checked(1)?; let before_tx: Option = row.get_checked(2)?; + let status: Entid = row.get_checked(3)?; let attrs: Result> = stmt2.query_and_then(&[&e], |row| { let a: Entid = row.get_checked(0)?; @@ -184,52 +215,25 @@ pub(crate) fn pending_excisions(conn: &rusqlite::Connection, partition_map: &Par attrs, }; - Ok((e, excision)) + Ok((e, excision, status)) })?.collect(); m - - // let aev_trie = read_materialized_transaction_aev_trie(&conn, schema, "excisions")?; - - // excisions(&partition_map, &schema, &aev_trie).map(|o| o.unwrap_or_default()) } -pub(crate) fn ensure_no_pending_excisions(conn: &rusqlite::Connection) -> Result<()> { - // let pending = pending_excisions(self)?; - - // WITH ids AS (SELECT rid - // FROM temp.search_results - // WHERE rid IS NOT NULL AND - // ((added0 IS 0) OR - // (added0 IS 1 AND search_type IS ':db.cardinality/one' AND v0 IS NOT v))) - // DELETE FROM datoms WHERE rowid IN ids"#; - - // TODO: filter by attrs. - let mut stmt: rusqlite::Statement = conn.prepare(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t, excisions AS e WHERE e.status > 0 AND t.tx <= e.status AND (e.target IS t.e OR (e.target IS t.v AND t.a IS NOT {}))) DELETE FROM transactions WHERE rowid IN ids", entids::DB_EXCISE).as_ref())?; - - stmt.execute(&[])?; - - let mut stmt: rusqlite::Statement = conn.prepare("UPDATE excisions SET status = 0")?; - - stmt.execute(&[])?; - - // let relevant_tx_ids: Result> = stmt.query_and_then(&[], |row| { - // let e: Entid = row.get_checked(0)?; - // let target: - // Ok(tx) - // })?.collect(); - - // println!("relevant_tx_ids: {:?}", relevant_tx_ids?); +pub(crate) fn pending_excisions(conn: &rusqlite::Connection, partition_map: &PartitionMap, schema: &Schema) -> Result { + let list = pending_excision_list(conn, partition_map, schema)?; + Ok(list.into_iter().map(|(entity, excision, _status)| (entity, excision)).collect()) +} - // let mut stmt: rusqlite::Statement = conn.prepare(format!("DELETE FROM transactions AS t WHERE t.tx = ? AND (t.e = ? OR (t.v = ? AND t.a != {}))", entids::DB_EXCISE).as_str()); +pub(crate) fn ensure_no_pending_excisions(conn: &rusqlite::Connection, partition_map: &PartitionMap, schema: &Schema) -> Result { + let list = pending_excision_list(conn, partition_map, schema)?; - // for tx_id in relevant_tx_ids { + for (_entid, excision, status) in &list { + excise_transactions_before_tx(conn, &excision, *status)?; + } - // } + conn.execute("UPDATE excisions SET status = 0", &[])?; - Ok(()) + Ok(list.into_iter().map(|(entity, excision, _status)| (entity, excision)).collect()) } - -// fn enqueue_pending_excision(conn: &rusqlite::Connection, excision: Excision) -> Result<()> { -// Ok(()) -// } diff --git a/db/src/tx.rs b/db/src/tx.rs index 9c3588ae1..4afa69bbe 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -817,7 +817,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { } if let Some(excisions) = excisions { - excision::enqueue_pending_excisions(self.store, self.schema, self.tx_id, excisions)?; + excision::enqueue_pending_excisions(self.store, self.schema, self.tx_id, &excisions)?; } Ok(TxReport { From c81c51c56a42e75ee026d6c9a15480baeda51011 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Mon, 25 Jun 2018 21:03:07 -0700 Subject: [PATCH 08/18] Part 5: Handle :db.excise/beforeT. --- db/src/db.rs | 136 +++++++++++++++++++++++++++++++++++++++++++++ db/src/excision.rs | 30 ++++++++-- 2 files changed, 160 insertions(+), 6 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index 521f52571..9a036b76b 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -3128,4 +3128,140 @@ mod tests { [?e :db.excise/attrs :test/many ?tx3 true] [?tx3 :db/txInstant ?ms3 ?tx3 true]]]"#); } + + #[test] + fn test_excision_with_before_tx() { + let mut conn = TestConn::default(); + + assert_transact!(conn, r#"[ + {:db/id 200 :db/ident :test/one :db/valueType :db.type/long :db/cardinality :db.cardinality/one} + {:db/id 201 :db/ident :test/many :db/valueType :db.type/long :db/cardinality :db.cardinality/many} + {:db/id 202 :db/ident :test/ref :db/valueType :db.type/ref :db/cardinality :db.cardinality/one} + ]"#); + + // Simplest case: just a `:db/excise` target, potentially including an inbound ref. + let report = assert_transact!(conn, r#"[ + {:db/id 300 + :test/one 1000 + :test/many [2000 2001]} + {:db/id 301 + :test/ref 300} + ]"#); + + // Let's assert a new cardinality one value. + assert_transact!(conn, r#"[ + {:db/id 300 + :test/one 1001 + :test/many [2002 2003]} + ]"#); + + // Before. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [300 :test/one 1001] + [300 :test/many 2000] + [300 :test/many 2001] + [300 :test/many 2002] + [300 :test/many 2003] + [301 :test/ref 300]]"#); + + let tempid_report = assert_transact!(conn, format!(r#"[ + [:db/add "e" :db/excise 300] + [:db/add "e" :db.excise/beforeT {}] + ]"#, report.tx_id)); + // This is implementation specific, but it should be deterministic. + assert_matches!(tempids(&tempid_report), + "{\"e\" 65536}"); + + // After. + assert_matches!(conn.datoms(), format!(r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [300 :test/one 1001] + [300 :test/many 2002] + [300 :test/many 2003] + [301 :test/ref 300] + [?e :db/excise 300] + [?e :db.excise/beforeT {}]]"#, report.tx_id)); + + // We have enqueued a pending excision. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, ::std::iter::once((65536, excision::Excision { + target: 300, + // Ordered by keyword. + attrs: None, + before_tx: Some(report.tx_id), + })).collect()); + + // Before processing the pending excision, we have full transactions in the transaction log. + assert_matches!(conn.transactions(), format!(r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[300 :test/one 1000 ?tx2 true] + [300 :test/many 2000 ?tx2 true] + [300 :test/many 2001 ?tx2 true] + [301 :test/ref 300 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[300 :test/one 1000 ?tx3 false] + [300 :test/one 1001 ?tx3 true] + [300 :test/many 2002 ?tx3 true] + [300 :test/many 2003 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]] + [[?e :db/excise 300 ?tx4 true] + [?e :db.excise/beforeT {} ?tx4 true] + [?tx4 :db/txInstant ?ms4 ?tx4 true]]]"#, report.tx_id)); + + excision::ensure_no_pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("ensure_no_pending_excisions"); + + // After processing the pending excision, we have nothing left pending. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, Default::default()); + + // After processing the pending excision, we have rewritten transactions in the transaction + // log to not refer to the targeted attributes of the target entity. + assert_matches!(conn.transactions(), format!(r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[301 :test/ref 300 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[300 :test/one 1000 ?tx3 false] ; XXX Is this right? + [300 :test/one 1001 ?tx3 true] + [300 :test/many 2002 ?tx3 true] + [300 :test/many 2003 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]] + [[?e :db/excise 300 ?tx4 true] + [?e :db.excise/beforeT {} ?tx4 true] + [?tx4 :db/txInstant ?ms4 ?tx4 true]]]"#, report.tx_id)); + } } diff --git a/db/src/excision.rs b/db/src/excision.rs index 9c068c27b..146af889d 100644 --- a/db/src/excision.rs +++ b/db/src/excision.rs @@ -132,13 +132,22 @@ pub(crate) fn excisions<'schema>(partition_map: &'schema PartitionMap, schema: & } fn excise_datoms(conn: &rusqlite::Connection, excision: &Excision) -> Result<()> { - match excision.attrs { - Some(ref attrs) => { + match (excision.before_tx, &excision.attrs) { + (Some(before_tx), Some(ref attrs)) => { + let s = attrs.iter().join(", "); + conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE d.e IS {} AND d.a IN ({}) AND d.tx <= {}) DELETE FROM datoms WHERE rowid IN ids", + excision.target, s, before_tx).as_ref(), &[])?; + }, + (Some(before_tx), None) => { + conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE d.e IS {} AND d.tx <= {}) DELETE FROM datoms WHERE rowid IN ids", + excision.target, before_tx).as_ref(), &[])?; + }, + (None, Some(ref attrs)) => { let s = attrs.iter().join(", "); conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE d.e IS {} AND d.a IN ({})) DELETE FROM datoms WHERE rowid IN ids", excision.target, s).as_ref(), &[])?; }, - None => { + (None, None) => { conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE (d.e IS {} OR (d.v IS {} AND d.a IS NOT {}))) DELETE FROM datoms WHERE rowid IN ids", excision.target, excision.target, entids::DB_EXCISE).as_ref(), &[])?; }, @@ -148,13 +157,22 @@ fn excise_datoms(conn: &rusqlite::Connection, excision: &Excision) -> Result<()> } fn excise_transactions_before_tx(conn: &rusqlite::Connection, excision: &Excision, before_tx: Entid) -> Result<()> { - match excision.attrs { - Some(ref attrs) => { + match (excision.before_tx, &excision.attrs) { + (Some(before_tx), Some(ref attrs)) => { + let s = attrs.iter().join(", "); + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IS {} AND t.a IN ({}) AND t.tx <= {}) DELETE FROM transactions WHERE rowid IN ids", + excision.target, s, before_tx).as_ref(), &[])?; + }, + (Some(before_tx), None) => { + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IS {} AND t.tx <= {}) DELETE FROM transactions WHERE rowid IN ids", + excision.target, before_tx).as_ref(), &[])?; + }, + (None, Some(ref attrs)) => { let s = attrs.iter().join(", "); conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IS {} AND t.a IN ({})) DELETE FROM transactions WHERE rowid IN ids", excision.target, s).as_ref(), &[])?; }, - None => { + (None, None) => { conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.tx <= {} AND (t.e IS {} OR (t.v IS {} AND t.a IS NOT {}))) DELETE FROM transactions WHERE rowid IN ids", before_tx, excision.target, excision.target, entids::DB_EXCISE).as_ref(), &[])?; }, From 72293f94e37ca7a82c25a41adb766c0d1be26754 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Fri, 29 Jun 2018 10:20:39 -0700 Subject: [PATCH 09/18] Part 6: Vacuum the `fulltext_values` table after clearing excisions. --- db/src/db.rs | 115 +++++++++++++++++++++++++++++++++++++++++++++ db/src/excision.rs | 34 ++++++++++++++ 2 files changed, 149 insertions(+) diff --git a/db/src/db.rs b/db/src/db.rs index 9a036b76b..cf3d6194c 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -3264,4 +3264,119 @@ mod tests { [?e :db.excise/beforeT {} ?tx4 true] [?tx4 :db/txInstant ?ms4 ?tx4 true]]]"#, report.tx_id)); } + + #[test] + fn test_excision_fulltext() { + let mut conn = TestConn::default(); + + assert_transact!(conn, r#"[ + {:db/id 200 + :db/ident :test/fulltext + :db/valueType :db.type/string + :db/cardinality :db.cardinality/one + :db/fulltext true + :db/index true} + ]"#); + + assert_transact!(conn, r#"[ + {:db/id 300 + :test/fulltext "test1"} + {:db/id 301 + :test/fulltext "test2"} + ]"#); + + assert_transact!(conn, r#"[ + {:db/id 300 + :test/fulltext "test3"} + {:db/id 301 + :test/fulltext "test4"} + ]"#); + + // Before. + assert_matches!(conn.fulltext_values(), r#" + [[1 "test1"] + [2 "test2"] + [3 "test3"] + [4 "test4"]]"#); + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/fulltext] + [200 :db/valueType :db.type/string] + [200 :db/cardinality :db.cardinality/one] + [200 :db/index true] + [200 :db/fulltext true] + [300 :test/fulltext 3] + [301 :test/fulltext 4]]"#); + + let tempid_report = assert_transact!(conn, r#"[ + [:db/add "e" :db/excise 300] + ]"#); + // This is implementation specific, but it should be deterministic. + assert_matches!(tempids(&tempid_report), + "{\"e\" 65536}"); + + // After. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/fulltext] + [200 :db/valueType :db.type/string] + [200 :db/cardinality :db.cardinality/one] + [200 :db/index true] + [200 :db/fulltext true] + [301 :test/fulltext 4] + [?e :db/excise 300]]"#); + + // We have enqueued a pending excision. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, ::std::iter::once((65536, excision::Excision { + target: 300, + attrs: None, + before_tx: None, + })).collect()); + + // Before processing the pending excision, we have full transactions in the transaction log. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/fulltext ?tx1 true] + [200 :db/valueType :db.type/string ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [200 :db/index true ?tx1 true] + [200 :db/fulltext true ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[300 :test/fulltext 1 ?tx2 true] + [301 :test/fulltext 2 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[300 :test/fulltext 1 ?tx3 false] + [300 :test/fulltext 3 ?tx3 true] + [301 :test/fulltext 2 ?tx3 false] + [301 :test/fulltext 4 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]] + [[?e :db/excise 300 ?tx4 true] + [?tx4 :db/txInstant ?ms4 ?tx4 true]]]"#); + + excision::ensure_no_pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("ensure_no_pending_excisions"); + + // After processing the pending excision, we have nothing left pending. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, Default::default()); + + // After processing the pending excision, we have rewritten transactions in the transaction + // log to not refer to the targeted attributes of the target entity. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/fulltext ?tx1 true] + [200 :db/valueType :db.type/string ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [200 :db/index true ?tx1 true] + [200 :db/fulltext true ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[301 :test/fulltext 2 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[301 :test/fulltext 2 ?tx3 false] + [301 :test/fulltext 4 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]] + [[?e :db/excise 300 ?tx4 true] + [?tx4 :db/txInstant ?ms4 ?tx4 true]]]"#); + + // After processing the pending excision, we have vacuumed dangling fulltext values. + assert_matches!(conn.fulltext_values(), r#" + [[2 "test2"] + [4 "test4"]]"#); + } } diff --git a/db/src/excision.rs b/db/src/excision.rs index 146af889d..0a72ae9d1 100644 --- a/db/src/excision.rs +++ b/db/src/excision.rs @@ -22,6 +22,7 @@ use mentat_core::{ Entid, HasSchema, Schema, + TypedValue, }; use entids; @@ -253,5 +254,38 @@ pub(crate) fn ensure_no_pending_excisions(conn: &rusqlite::Connection, partition conn.execute("UPDATE excisions SET status = 0", &[])?; + // TODO: only vacuum fulltext if an excision (likely) impacted fulltext values, since this is + // very expensive. As always, correctness first, performance second. + vacuum_fulltext_table(conn)?; + Ok(list.into_iter().map(|(entity, excision, _status)| (entity, excision)).collect()) } + + +/// Delete fulltext values that are no longer refered to in the `datoms` or `transactions` table. +pub(crate) fn vacuum_fulltext_table(conn: &rusqlite::Connection) -> Result<()> { + let (true_value, true_value_type_tag) = TypedValue::Boolean(true).to_sql_value_pair(); + + // First, collect all `:db/fulltext true` attributes. This is easier than extracting them from + // a `Schema` (no need to execute multiple insertions for large collections), but less flexible. + conn.execute(r#"CREATE TABLE temp.fulltext_as (a SMALLINT NOT NULL)"# , &[])?; + conn.execute(r#"INSERT INTO temp.fulltext_as (a) + SELECT e FROM schema WHERE a = ? AND v = ? AND value_type_tag = ?"# , + &[&entids::DB_FULLTEXT, &true_value, &true_value_type_tag])?; + + // Next, purge values that aren't referenced. We're using that `:db/fulltext true` attributes + // always have `:db/index true`, so that we can use the `avet` index. + conn.execute(r#"DELETE FROM fulltext_values + WHERE rowid NOT IN + (SELECT v + FROM datoms + WHERE index_avet IS NOT 0 AND a IN temp.fulltext_as + UNION ALL + SELECT v + FROM transactions + WHERE a IN temp.fulltext_as)"#, &[])?; + + conn.execute(r#"DROP TABLE temp.fulltext_as"# , &[])?; + + Ok(()) +} From 00787cb392597e40a9a7b1eeea62e62f6baa396a Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Mon, 9 Jul 2018 14:47:37 -0700 Subject: [PATCH 10/18] Review comment: Handle multiple excision targets (like `{:db/excise [x y]}`). This was a fundamental error in the initial implementation; I made an error and didn't realize `:db/excise` was cardinality many. --- db/src/db.rs | 126 +++++++++++++++++++++++++++++++++++++++++++-- db/src/excision.rs | 123 ++++++++++++++++++++++++++----------------- 2 files changed, 197 insertions(+), 52 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index cf3d6194c..86ba2da93 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -266,7 +266,8 @@ lazy_static! { // close to the shape of that table. // // `status` tracks whether an excision has been applied (0) or is pending (> 0). - r#"CREATE TABLE excisions (e INTEGER NOT NULL UNIQUE, target INTEGER NOT NULL, before_tx INTEGER, status INTEGER NOT NULL)"#, + r#"CREATE TABLE excisions (e INTEGER NOT NULL UNIQUE, before_tx INTEGER, status INTEGER NOT NULL)"#, + r#"CREATE TABLE excision_targets (e INTEGER NOT NULL, target INTEGER NOT NULL, FOREIGN KEY (e) REFERENCES excisions(e))"#, r#"CREATE TABLE excision_attrs (e INTEGER NOT NULL, a SMALLINT NOT NULL, FOREIGN KEY (e) REFERENCES excisions(e))"#, ] }; @@ -2946,7 +2947,7 @@ mod tests { // We have enqueued a pending excision. let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); assert_eq!(pending, ::std::iter::once((65536, excision::Excision { - target: 300, + targets: vec![300].into_iter().collect(), attrs: None, before_tx: None, })).collect()); @@ -3067,7 +3068,7 @@ mod tests { // We have enqueued a pending excision. let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); assert_eq!(pending, ::std::iter::once((65536, excision::Excision { - target: 300, + targets: vec![300].into_iter().collect(), // Ordered by keyword. attrs: Some(::std::iter::once(conn.schema.require_entid(&Keyword::namespaced("test", "many")).unwrap().0) .chain(::std::iter::once(conn.schema.require_entid(&Keyword::namespaced("test", "one")).unwrap().0)) @@ -3202,7 +3203,7 @@ mod tests { // We have enqueued a pending excision. let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); assert_eq!(pending, ::std::iter::once((65536, excision::Excision { - target: 300, + targets: vec![300].into_iter().collect(), // Ordered by keyword. attrs: None, before_tx: Some(report.tx_id), @@ -3327,7 +3328,7 @@ mod tests { // We have enqueued a pending excision. let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); assert_eq!(pending, ::std::iter::once((65536, excision::Excision { - target: 300, + targets: vec![300].into_iter().collect(), attrs: None, before_tx: None, })).collect()); @@ -3379,4 +3380,119 @@ mod tests { [[2 "test2"] [4 "test4"]]"#); } + + #[test] + fn test_excision_multiple() { + let mut conn = TestConn::default(); + + assert_transact!(conn, r#"[ + {:db/id 200 :db/ident :test/one :db/valueType :db.type/long :db/cardinality :db.cardinality/one} + {:db/id 201 :db/ident :test/many :db/valueType :db.type/long :db/cardinality :db.cardinality/many} + {:db/id 202 :db/ident :test/ref :db/valueType :db.type/ref :db/cardinality :db.cardinality/one} + ]"#); + + // Simplest case: just a `:db/excise` target, potentially including an inbound ref. + assert_transact!(conn, r#"[ + {:db/id 300 + :test/one 1000 + :test/many [2000 2001 2002]} + {:db/id 301 + :test/one 1001 + :test/ref 300} + ]"#); + + // Before. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [300 :test/one 1000] + [300 :test/many 2000] + [300 :test/many 2001] + [300 :test/many 2002] + [301 :test/one 1001] + [301 :test/ref 300]]"#); + + let report = assert_transact!(conn, r#"[ + {:db/id "e" :db/excise [300 301]} + ]"#); + // This is implementation specific, but it should be deterministic. + assert_matches!(tempids(&report), + "{\"e\" 65536}"); + + // After. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [?e :db/excise 300] + [?e :db/excise 301]]"#); + + // We have enqueued a pending excision. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, ::std::iter::once((65536, excision::Excision { + targets: vec![300, 301].into_iter().collect(), + attrs: None, + before_tx: None, + })).collect()); + + // Before processing the pending excision, we have full transactions in the transaction log. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[300 :test/one 1000 ?tx2 true] + [300 :test/many 2000 ?tx2 true] + [300 :test/many 2001 ?tx2 true] + [300 :test/many 2002 ?tx2 true] + [301 :test/one 1001 ?tx2 true] + [301 :test/ref 300 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[?e :db/excise 300 ?tx3 true] + [?e :db/excise 301 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]]]"#); + + excision::ensure_no_pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("ensure_no_pending_excisions"); + + // After processing the pending excision, we have nothing left pending. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, Default::default()); + + // After processing the pending excision, we have rewritten transactions in the transaction + // log to not refer to the target entity of the excision. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[?e :db/excise 300 ?tx3 true] + [?e :db/excise 301 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]]]"#); + } } diff --git a/db/src/excision.rs b/db/src/excision.rs index 0a72ae9d1..a73f19fdb 100644 --- a/db/src/excision.rs +++ b/db/src/excision.rs @@ -22,7 +22,9 @@ use mentat_core::{ Entid, HasSchema, Schema, + SQLValueType, TypedValue, + ValueType, }; use entids; @@ -59,7 +61,7 @@ use types::{ /// transaction IDs in whatever way they see fit. #[derive(Clone, Debug, Default, Eq, Hash, PartialEq)] pub(crate) struct Excision { - pub(crate) target: Entid, + pub(crate) targets: BTreeSet, pub(crate) attrs: Option>, pub(crate) before_tx: Option, } @@ -93,32 +95,43 @@ pub(crate) fn excisions<'schema>(partition_map: &'schema PartitionMap, schema: & } } - let target = avs.get(&pair(entids::DB_EXCISE)?) - .and_then(|ars| ars.add.iter().next().cloned()) - .and_then(|v| v.into_entid()) - .ok_or_else(|| DbErrorKind::BadExcision("no :db/excise".into()))?; // TODO: more details. - - if schema.get_ident(target).is_some() { - bail!(DbErrorKind::BadExcision("cannot mutate schema".into())); // TODO: more details. - } - - let partition = partition_map.partition_for_entid(target) - .ok_or_else(|| DbErrorKind::BadExcision("target has no partition".into()))?; // TODO: more details. - // Right now, Mentat only supports `:db.part/{db,user,tx}`, and tests hack in `:db.part/fake`. - if partition == ":db.part/db" || partition == ":db.part/tx" { - bail!(DbErrorKind::BadExcision(format!("cannot target entity in partition {}", partition).into())); // TODO: more details. - } - let before_tx = avs.get(&pair(entids::DB_EXCISE_BEFORE_T)?) - .and_then(|ars| ars.add.iter().next().cloned()) + .and_then(|ars| { + assert_eq!(ars.add.len(), 1, "witnessed more than one :db.excise/beforeT"); + assert!(ars.retract.is_empty(), "witnessed [:db/retract ... :db.excise/beforeT ...]"); + ars.add.iter().next().cloned() + }) .and_then(|v| v.into_entid()); let attrs = avs.get(&pair(entids::DB_EXCISE_ATTRS)?) .map(|ars| ars.add.clone().into_iter().filter_map(|v| v.into_entid()).collect()); + let targets = avs.get(&pair(entids::DB_EXCISE)?) + .map(|ars| { + assert!(ars.retract.is_empty(), "witnessed [:db/retract ... :db/excise ...]"); + assert!(!ars.add.is_empty(), "witnessed empty :db/excise target set"); + let targets: BTreeSet<_> = ars.add.clone().into_iter().filter_map(|v| v.into_entid()).collect(); + assert_eq!(targets.len(), ars.add.len(), "witnessed non-entid :db/excise target"); + targets + }) + .ok_or_else(|| DbErrorKind::BadExcision("no :db/excise".into()))?; // TODO: more details. + + for target in &targets { + if schema.get_ident(*target).is_some() { + bail!(DbErrorKind::BadExcision("cannot mutate schema".into())); // TODO: more details. + } + + let partition = partition_map.partition_for_entid(*target) + .ok_or_else(|| DbErrorKind::BadExcision("target has no partition".into()))?; // TODO: more details. + // Right now, Mentat only supports `:db.part/{db,user,tx}`, and tests hack in `:db.part/fake`. + if partition == ":db.part/db" || partition == ":db.part/tx" { + bail!(DbErrorKind::BadExcision(format!("cannot target entity in partition {}", partition).into())); // TODO: more details. + } + } + let excision = Excision { - target, - attrs, + targets, + attrs: attrs.clone(), before_tx, }; @@ -133,24 +146,27 @@ pub(crate) fn excisions<'schema>(partition_map: &'schema PartitionMap, schema: & } fn excise_datoms(conn: &rusqlite::Connection, excision: &Excision) -> Result<()> { + let targets = excision.targets.iter().join(", "); + match (excision.before_tx, &excision.attrs) { (Some(before_tx), Some(ref attrs)) => { let s = attrs.iter().join(", "); - conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE d.e IS {} AND d.a IN ({}) AND d.tx <= {}) DELETE FROM datoms WHERE rowid IN ids", - excision.target, s, before_tx).as_ref(), &[])?; + conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE d.e IN ({}) AND d.a IN ({}) AND d.tx <= {}) DELETE FROM datoms WHERE rowid IN ids", + targets, s, before_tx).as_ref(), &[])?; }, (Some(before_tx), None) => { - conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE d.e IS {} AND d.tx <= {}) DELETE FROM datoms WHERE rowid IN ids", - excision.target, before_tx).as_ref(), &[])?; + conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE d.e IN ({}) AND d.tx <= {}) DELETE FROM datoms WHERE rowid IN ids", + targets, before_tx).as_ref(), &[])?; }, (None, Some(ref attrs)) => { let s = attrs.iter().join(", "); - conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE d.e IS {} AND d.a IN ({})) DELETE FROM datoms WHERE rowid IN ids", - excision.target, s).as_ref(), &[])?; + conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE d.e IN ({}) AND d.a IN ({})) DELETE FROM datoms WHERE rowid IN ids", + targets, s).as_ref(), &[])?; }, (None, None) => { - conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE (d.e IS {} OR (d.v IS {} AND d.a IS NOT {}))) DELETE FROM datoms WHERE rowid IN ids", - excision.target, excision.target, entids::DB_EXCISE).as_ref(), &[])?; + // TODO: Use AVET index to speed this up? + conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE (d.e IN ({}) OR (d.v IN ({}) AND d.value_type_tag IS {} AND d.a IS NOT {}))) DELETE FROM datoms WHERE rowid IN ids", + targets, targets, ValueType::Ref.value_type_tag(), entids::DB_EXCISE).as_ref(), &[])?; }, } @@ -158,24 +174,26 @@ fn excise_datoms(conn: &rusqlite::Connection, excision: &Excision) -> Result<()> } fn excise_transactions_before_tx(conn: &rusqlite::Connection, excision: &Excision, before_tx: Entid) -> Result<()> { + let targets = excision.targets.iter().join(", "); + match (excision.before_tx, &excision.attrs) { (Some(before_tx), Some(ref attrs)) => { let s = attrs.iter().join(", "); - conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IS {} AND t.a IN ({}) AND t.tx <= {}) DELETE FROM transactions WHERE rowid IN ids", - excision.target, s, before_tx).as_ref(), &[])?; + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IN ({}) AND t.a IN ({}) AND t.tx <= {}) DELETE FROM transactions WHERE rowid IN ids", + targets, s, before_tx).as_ref(), &[])?; }, (Some(before_tx), None) => { - conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IS {} AND t.tx <= {}) DELETE FROM transactions WHERE rowid IN ids", - excision.target, before_tx).as_ref(), &[])?; + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IN ({}) AND t.tx <= {}) DELETE FROM transactions WHERE rowid IN ids", + targets, before_tx).as_ref(), &[])?; }, (None, Some(ref attrs)) => { let s = attrs.iter().join(", "); - conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IS {} AND t.a IN ({})) DELETE FROM transactions WHERE rowid IN ids", - excision.target, s).as_ref(), &[])?; + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IN ({}) AND t.a IN ({})) DELETE FROM transactions WHERE rowid IN ids", + targets, s).as_ref(), &[])?; }, (None, None) => { - conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.tx <= {} AND (t.e IS {} OR (t.v IS {} AND t.a IS NOT {}))) DELETE FROM transactions WHERE rowid IN ids", - before_tx, excision.target, excision.target, entids::DB_EXCISE).as_ref(), &[])?; + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.tx <= {} AND (t.e IN ({}) OR (t.v IN ({}) AND t.value_type_tag IS {} AND t.a IS NOT {}))) DELETE FROM transactions WHERE rowid IN ids", + before_tx, targets, targets, ValueType::Ref.value_type_tag(), entids::DB_EXCISE).as_ref(), &[])?; }, } @@ -183,16 +201,21 @@ fn excise_transactions_before_tx(conn: &rusqlite::Connection, excision: &Excisio } pub(crate) fn enqueue_pending_excisions(conn: &rusqlite::Connection, schema: &Schema, tx_id: Entid, excisions: &ExcisionMap) -> Result<()> { - let mut stmt1: rusqlite::Statement = conn.prepare("INSERT INTO excisions VALUES (?, ?, ?, ?)")?; - let mut stmt2: rusqlite::Statement = conn.prepare("INSERT INTO excision_attrs VALUES (?, ?)")?; + let mut stmt1: rusqlite::Statement = conn.prepare("INSERT INTO excisions VALUES (?, ?, ?)")?; + let mut stmt2: rusqlite::Statement = conn.prepare("INSERT INTO excision_targets VALUES (?, ?)")?; + let mut stmt3: rusqlite::Statement = conn.prepare("INSERT INTO excision_attrs VALUES (?, ?)")?; for (entid, excision) in excisions { let status = excision.before_tx.unwrap_or(tx_id); - stmt1.execute(&[entid, &excision.target, &excision.before_tx, &status])?; + stmt1.execute(&[entid, &excision.before_tx, &status])?; + + for target in &excision.targets { + stmt2.execute(&[entid, target])?; + } if let Some(ref attrs) = excision.attrs { for attr in attrs { - stmt2.execute(&[entid, attr])?; + stmt3.execute(&[entid, attr])?; } } } @@ -207,16 +230,22 @@ pub(crate) fn enqueue_pending_excisions(conn: &rusqlite::Connection, schema: &Sc } fn pending_excision_list(conn: &rusqlite::Connection, partition_map: &PartitionMap, schema: &Schema) -> Result> { - let mut stmt1: rusqlite::Statement = conn.prepare("SELECT e, target, before_tx, status FROM excisions WHERE status > 0 ORDER BY e")?; - let mut stmt2: rusqlite::Statement = conn.prepare("SELECT a FROM excision_attrs WHERE e IS ?")?; + let mut stmt1: rusqlite::Statement = conn.prepare("SELECT e, before_tx, status FROM excisions WHERE status > 0 ORDER BY e")?; + let mut stmt2: rusqlite::Statement = conn.prepare("SELECT target FROM excision_targets WHERE e IS ?")?; + let mut stmt3: rusqlite::Statement = conn.prepare("SELECT a FROM excision_attrs WHERE e IS ?")?; let m: Result> = stmt1.query_and_then(&[], |row| { let e: Entid = row.get_checked(0)?; - let target: Entid = row.get_checked(1)?; - let before_tx: Option = row.get_checked(2)?; - let status: Entid = row.get_checked(3)?; + let before_tx: Option = row.get_checked(1)?; + let status: Entid = row.get_checked(2)?; + + let targets: Result> = stmt2.query_and_then(&[&e], |row| { + let target: Entid = row.get_checked(0)?; + Ok(target) + })?.collect(); + let targets = targets?; - let attrs: Result> = stmt2.query_and_then(&[&e], |row| { + let attrs: Result> = stmt3.query_and_then(&[&e], |row| { let a: Entid = row.get_checked(0)?; Ok(a) })?.collect(); @@ -229,7 +258,7 @@ fn pending_excision_list(conn: &rusqlite::Connection, partition_map: &PartitionM })?; let excision = Excision { - target, + targets, before_tx, attrs, }; From 4c5693f52945ba5632f79ff772b59be548c86986 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Mon, 9 Jul 2018 16:35:42 -0700 Subject: [PATCH 11/18] Review comment: Delete dangling retractions. --- db/src/db.rs | 3 ++- db/src/excision.rs | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/db/src/db.rs b/db/src/db.rs index 86ba2da93..14fff507c 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -3256,7 +3256,8 @@ mod tests { [?tx1 :db/txInstant ?ms ?tx1 true]] [[301 :test/ref 300 ?tx2 true] [?tx2 :db/txInstant ?ms2 ?tx2 true]] - [[300 :test/one 1000 ?tx3 false] ; XXX Is this right? + [; Observe that the "dangling retraction" commented out immediately below is not present! + ; [300 :test/one 1000 ?tx3 false] [300 :test/one 1001 ?tx3 true] [300 :test/many 2002 ?tx3 true] [300 :test/many 2003 ?tx3 true] diff --git a/db/src/excision.rs b/db/src/excision.rs index a73f19fdb..d72c5b4ec 100644 --- a/db/src/excision.rs +++ b/db/src/excision.rs @@ -281,6 +281,8 @@ pub(crate) fn ensure_no_pending_excisions(conn: &rusqlite::Connection, partition excise_transactions_before_tx(conn, &excision, *status)?; } + delete_dangling_retractions(conn)?; + conn.execute("UPDATE excisions SET status = 0", &[])?; // TODO: only vacuum fulltext if an excision (likely) impacted fulltext values, since this is @@ -318,3 +320,38 @@ pub(crate) fn vacuum_fulltext_table(conn: &rusqlite::Connection) -> Result<()> { Ok(()) } + +/// Delete dangling retractions from the transaction log. +/// +/// Suppose that `E` is a fixed entid and that the following transactions are transacted: +/// ```edn +/// [[:db/add E :db/doc "first"]] +/// [[:db/retract E :db/doc "first"]] +/// [[:db/add E :db/doc "second"]] +/// ``` +/// +/// If we excise just the first datom -- the datom corresponding to `[:db/add E :db/doc "first"]` -- +/// then there will be a "dangling retraction" in the log, which will look like: +/// ```edn +/// [[E :db/doc "first" TX1 false]] +/// [[E :db/doc "second" TX2 true]] +/// ``` +/// +/// This function purges such dangling retractions, so that a datom is always asserted before it is +/// retracted. +pub(crate) fn delete_dangling_retractions(conn: &rusqlite::Connection) -> Result<()> { + // We walk the transactions table. For each `[e a v]`, we find all of the log entries + // corresponding to the first transaction that it appeared in. We delete any entries that are + // retractions; it is not possible to retract an `[e a v]` not asserted in a prior transaction. + conn.execute(r#"WITH ids AS + (SELECT rowid FROM + (SELECT MIN(tx), added, rowid + FROM transactions + GROUP BY e, a, v, value_type_tag) + WHERE added = 0) + DELETE FROM transactions + WHERE rowid IN ids"#, + &[])?; + + Ok(()) +} From ba3be43a675f31c6d59b5e707725a536716eb52b Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Tue, 10 Jul 2018 11:04:24 -0700 Subject: [PATCH 12/18] Review comment: Use `Partition.allow_excision` rather than hard-coded partitions. --- db/src/db.rs | 12 ++++++------ db/src/excision.rs | 11 +++++++---- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index 14fff507c..727b1eb1c 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -1154,8 +1154,8 @@ impl PartitionMap { self.values().any(|partition| partition.contains_entid(entid)) } - pub(crate) fn partition_for_entid(&self, entid: Entid) -> Option<&str> { - self.iter().find(|(_name, partition)| partition.contains_entid(entid)).map(|x| x.0.as_ref()) + pub(crate) fn partition_for_entid(&self, entid: Entid) -> Option<(&str, &Partition)> { + self.iter().find(|(_name, partition)| partition.contains_entid(entid)).map(|p| (p.0.as_ref(), p.1)) } } @@ -3174,7 +3174,7 @@ mod tests { [300 :test/many 2003] [301 :test/ref 300]]"#); - let tempid_report = assert_transact!(conn, format!(r#"[ + let tempid_report = assert_transact!(conn, &format!(r#"[ [:db/add "e" :db/excise 300] [:db/add "e" :db.excise/beforeT {}] ]"#, report.tx_id)); @@ -3183,7 +3183,7 @@ mod tests { "{\"e\" 65536}"); // After. - assert_matches!(conn.datoms(), format!(r#" + assert_matches!(conn.datoms(), &format!(r#" [[200 :db/ident :test/one] [200 :db/valueType :db.type/long] [200 :db/cardinality :db.cardinality/one] @@ -3210,7 +3210,7 @@ mod tests { })).collect()); // Before processing the pending excision, we have full transactions in the transaction log. - assert_matches!(conn.transactions(), format!(r#" + assert_matches!(conn.transactions(), &format!(r#" [[[200 :db/ident :test/one ?tx1 true] [200 :db/valueType :db.type/long ?tx1 true] [200 :db/cardinality :db.cardinality/one ?tx1 true] @@ -3243,7 +3243,7 @@ mod tests { // After processing the pending excision, we have rewritten transactions in the transaction // log to not refer to the targeted attributes of the target entity. - assert_matches!(conn.transactions(), format!(r#" + assert_matches!(conn.transactions(), &format!(r#" [[[200 :db/ident :test/one ?tx1 true] [200 :db/valueType :db.type/long ?tx1 true] [200 :db/cardinality :db.cardinality/one ?tx1 true] diff --git a/db/src/excision.rs b/db/src/excision.rs index d72c5b4ec..f2e6d9d47 100644 --- a/db/src/excision.rs +++ b/db/src/excision.rs @@ -27,6 +27,10 @@ use mentat_core::{ ValueType, }; +use db::{ + TypedSQLValue, +}; + use entids; use errors::{ @@ -121,11 +125,10 @@ pub(crate) fn excisions<'schema>(partition_map: &'schema PartitionMap, schema: & bail!(DbErrorKind::BadExcision("cannot mutate schema".into())); // TODO: more details. } - let partition = partition_map.partition_for_entid(*target) + let (name, partition) = partition_map.partition_for_entid(*target) .ok_or_else(|| DbErrorKind::BadExcision("target has no partition".into()))?; // TODO: more details. - // Right now, Mentat only supports `:db.part/{db,user,tx}`, and tests hack in `:db.part/fake`. - if partition == ":db.part/db" || partition == ":db.part/tx" { - bail!(DbErrorKind::BadExcision(format!("cannot target entity in partition {}", partition).into())); // TODO: more details. + if !partition.allow_excision { + bail!(DbErrorKind::BadExcision(format!("cannot target entity in partition {}", name).into())); // TODO: more details. } } From d8ded2a7ffb075ed7c772e082679ab42cf2afafd Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Tue, 10 Jul 2018 11:33:00 -0700 Subject: [PATCH 13/18] Review comment: Cull inaccurate comment. --- db/src/db.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index 727b1eb1c..15a81ff36 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -62,11 +62,6 @@ use errors::{ DbErrorKind, Result, }; -// use excision::{ -// excisions, -// Excision, -// ExcisionMap, -// }; use internal_types::{ AddAndRetract, AEVTrie, @@ -260,10 +255,7 @@ lazy_static! { r#"CREATE TABLE parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, end INTEGER NOT NULL, idx INTEGER NOT NULL, allow_excision SMALLINT NOT NULL)"#, // Excisions are transacted as data, so they end up in the transactions table; this - // materializes that view. Excisions are never removed from the transactions (or datoms) - // table so it's not required to include the `added` column, but let's do it so that if we - // grow a more uniform approach to materialized views of the transactions table, this is - // close to the shape of that table. + // materializes that view. // // `status` tracks whether an excision has been applied (0) or is pending (> 0). r#"CREATE TABLE excisions (e INTEGER NOT NULL UNIQUE, before_tx INTEGER, status INTEGER NOT NULL)"#, From a646245359922eeab0c6b294aca93268c1a7d4ec Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Tue, 10 Jul 2018 13:18:51 -0700 Subject: [PATCH 14/18] Review comment: replace `status` with `last_tx_needing_rewrite`. --- db/src/db.rs | 13 ++++++++-- db/src/excision.rs | 60 +++++++++++++++++++++++++--------------------- db/src/tx.rs | 2 +- 3 files changed, 45 insertions(+), 30 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index 15a81ff36..f90048c79 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -257,8 +257,17 @@ lazy_static! { // Excisions are transacted as data, so they end up in the transactions table; this // materializes that view. // - // `status` tracks whether an excision has been applied (0) or is pending (> 0). - r#"CREATE TABLE excisions (e INTEGER NOT NULL UNIQUE, before_tx INTEGER, status INTEGER NOT NULL)"#, + // In practice, the transactions table will be rewritten after excision incrementally. The + // interval `(0, last_tx_needing_rewrite]` (which may be empty) tracks the status of + // incremental rewriting. Initially, `last_tx_needing_rewrite` is set to `before_tx - 1`, + // which itself defaults to `(tx-transaction) - 1` if `:db.excise/beforeT` is not specified. + // Each incremental step fixes a length N and rewrites the transactions with IDs in the + // interval `(last_tx_needing_rewrite - N, last_tx_needing_rewrite]`. Then + // `last_tx_needing_rewrite` is decremented by the length of the interval N. + // + // It follows that `last_tx_needing_rewrite` also tracks the whether an excision has + // been totally applied (0) or has transaction rewriting still pending (> 0). + r#"CREATE TABLE excisions (e INTEGER NOT NULL UNIQUE, before_tx INTEGER, last_tx_needing_rewrite INTEGER NOT NULL)"#, r#"CREATE TABLE excision_targets (e INTEGER NOT NULL, target INTEGER NOT NULL, FOREIGN KEY (e) REFERENCES excisions(e))"#, r#"CREATE TABLE excision_attrs (e INTEGER NOT NULL, a SMALLINT NOT NULL, FOREIGN KEY (e) REFERENCES excisions(e))"#, ] diff --git a/db/src/excision.rs b/db/src/excision.rs index f2e6d9d47..3ac1a1fa8 100644 --- a/db/src/excision.rs +++ b/db/src/excision.rs @@ -52,12 +52,10 @@ use types::{ }; /// Details about an excision: -/// - a target to excise (for now, an entid); +/// - targets to excise (for now, a non-empty set of entids); /// - a possibly empty set of attributes to excise (the empty set means all attributes, not no /// attributes); -/// - and a possibly omitted transaction ID to limit the excision before. (TODO: check whether -/// Datomic excises the last retraction before the first remaining assertion, and make our -/// behaviour agree.) +/// - and a possibly omitted transaction ID to limit the excision before. /// /// `:db/before` doesn't make sense globally, since in Mentat, monotonically increasing /// transaction IDs don't guarantee monotonically increasing txInstant values. Therefore, we @@ -70,7 +68,7 @@ pub(crate) struct Excision { pub(crate) before_tx: Option, } -/// Map from `entid` to excision details. `entid` is not the excision `target`! +/// Map from `entid` to excision details. `entid` is not one of the excision `targets`! pub(crate) type ExcisionMap = BTreeMap; /// Extract excisions from the given transacted datoms. @@ -176,41 +174,49 @@ fn excise_datoms(conn: &rusqlite::Connection, excision: &Excision) -> Result<()> Ok(()) } -fn excise_transactions_before_tx(conn: &rusqlite::Connection, excision: &Excision, before_tx: Entid) -> Result<()> { +/// Given an `excision`, rewrite transactions with IDs in the interval `(first_tx_needing_rewrite, last_tx_needing_rewrite]`. +fn excise_transactions_in_range(conn: &rusqlite::Connection, excision: &Excision, first_tx_needing_rewrite: Entid, last_tx_needing_rewrite: Entid) -> Result<()> { let targets = excision.targets.iter().join(", "); + // TODO: intersect the ranges ourselves to statically save SQLite doing some work. + let tx_where = format!("({} < t.tx AND t.tx <= {})", first_tx_needing_rewrite, last_tx_needing_rewrite); + match (excision.before_tx, &excision.attrs) { (Some(before_tx), Some(ref attrs)) => { let s = attrs.iter().join(", "); - conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IN ({}) AND t.a IN ({}) AND t.tx <= {}) DELETE FROM transactions WHERE rowid IN ids", - targets, s, before_tx).as_ref(), &[])?; + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IN ({}) AND t.a IN ({}) AND t.tx <= {} AND {}) DELETE FROM transactions WHERE rowid IN ids", + targets, s, before_tx, tx_where).as_ref(), &[])?; }, (Some(before_tx), None) => { - conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IN ({}) AND t.tx <= {}) DELETE FROM transactions WHERE rowid IN ids", - targets, before_tx).as_ref(), &[])?; + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IN ({}) AND t.tx <= {} AND {}) DELETE FROM transactions WHERE rowid IN ids", + targets, before_tx, tx_where).as_ref(), &[])?; }, (None, Some(ref attrs)) => { let s = attrs.iter().join(", "); - conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IN ({}) AND t.a IN ({})) DELETE FROM transactions WHERE rowid IN ids", - targets, s).as_ref(), &[])?; + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IN ({}) AND t.a IN ({}) AND {}) DELETE FROM transactions WHERE rowid IN ids", + targets, s, tx_where).as_ref(), &[])?; }, (None, None) => { - conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.tx <= {} AND (t.e IN ({}) OR (t.v IN ({}) AND t.value_type_tag IS {} AND t.a IS NOT {}))) DELETE FROM transactions WHERE rowid IN ids", - before_tx, targets, targets, ValueType::Ref.value_type_tag(), entids::DB_EXCISE).as_ref(), &[])?; + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE (t.e IN ({}) OR (t.v IN ({}) AND t.value_type_tag IS {} AND t.a IS NOT {})) AND {}) DELETE FROM transactions WHERE rowid IN ids", + targets, targets, ValueType::Ref.value_type_tag(), entids::DB_EXCISE, tx_where).as_ref(), &[])?; }, } Ok(()) } -pub(crate) fn enqueue_pending_excisions(conn: &rusqlite::Connection, schema: &Schema, tx_id: Entid, excisions: &ExcisionMap) -> Result<()> { +/// Record the given `excisions` as applying to the transaction with ID `tx_id`. +/// +/// This also starts processing the excision: right now, that means that the `datoms` table is +/// updated in place synchronously. +pub(crate) fn begin_excisions(conn: &rusqlite::Connection, schema: &Schema, tx_id: Entid, excisions: &ExcisionMap) -> Result<()> { let mut stmt1: rusqlite::Statement = conn.prepare("INSERT INTO excisions VALUES (?, ?, ?)")?; let mut stmt2: rusqlite::Statement = conn.prepare("INSERT INTO excision_targets VALUES (?, ?)")?; let mut stmt3: rusqlite::Statement = conn.prepare("INSERT INTO excision_attrs VALUES (?, ?)")?; for (entid, excision) in excisions { - let status = excision.before_tx.unwrap_or(tx_id); - stmt1.execute(&[entid, &excision.before_tx, &status])?; + let last_tx_needing_rewrite = excision.before_tx.unwrap_or(tx_id); + stmt1.execute(&[entid, &excision.before_tx, &last_tx_needing_rewrite])?; for target in &excision.targets { stmt2.execute(&[entid, target])?; @@ -223,8 +229,8 @@ pub(crate) fn enqueue_pending_excisions(conn: &rusqlite::Connection, schema: &Sc } } - // Might as well not interleave writes to "excisions" and "excision_attrs" with writes to - // "datoms". This also leaves open the door for a more efficient bulk operation. + // Might as well not interleave writes to "excisions{_attrs,_targets}" with writes to "datoms". + // This also leaves open the door for a more efficient bulk operation. for (_entid, excision) in excisions { excise_datoms(conn, &excision)?; } @@ -233,14 +239,14 @@ pub(crate) fn enqueue_pending_excisions(conn: &rusqlite::Connection, schema: &Sc } fn pending_excision_list(conn: &rusqlite::Connection, partition_map: &PartitionMap, schema: &Schema) -> Result> { - let mut stmt1: rusqlite::Statement = conn.prepare("SELECT e, before_tx, status FROM excisions WHERE status > 0 ORDER BY e")?; + let mut stmt1: rusqlite::Statement = conn.prepare("SELECT e, before_tx, last_tx_needing_rewrite FROM excisions WHERE last_tx_needing_rewrite > 0 ORDER BY e")?; let mut stmt2: rusqlite::Statement = conn.prepare("SELECT target FROM excision_targets WHERE e IS ?")?; let mut stmt3: rusqlite::Statement = conn.prepare("SELECT a FROM excision_attrs WHERE e IS ?")?; let m: Result> = stmt1.query_and_then(&[], |row| { let e: Entid = row.get_checked(0)?; let before_tx: Option = row.get_checked(1)?; - let status: Entid = row.get_checked(2)?; + let last_tx_needing_rewrite: Entid = row.get_checked(2)?; let targets: Result> = stmt2.query_and_then(&[&e], |row| { let target: Entid = row.get_checked(0)?; @@ -266,7 +272,7 @@ fn pending_excision_list(conn: &rusqlite::Connection, partition_map: &PartitionM attrs, }; - Ok((e, excision, status)) + Ok((e, excision, last_tx_needing_rewrite)) })?.collect(); m @@ -274,25 +280,25 @@ fn pending_excision_list(conn: &rusqlite::Connection, partition_map: &PartitionM pub(crate) fn pending_excisions(conn: &rusqlite::Connection, partition_map: &PartitionMap, schema: &Schema) -> Result { let list = pending_excision_list(conn, partition_map, schema)?; - Ok(list.into_iter().map(|(entity, excision, _status)| (entity, excision)).collect()) + Ok(list.into_iter().map(|(entity, excision, _last_tx_needing_rewrite)| (entity, excision)).collect()) } pub(crate) fn ensure_no_pending_excisions(conn: &rusqlite::Connection, partition_map: &PartitionMap, schema: &Schema) -> Result { let list = pending_excision_list(conn, partition_map, schema)?; - for (_entid, excision, status) in &list { - excise_transactions_before_tx(conn, &excision, *status)?; + for (_entid, excision, last_tx_needing_rewrite) in &list { + excise_transactions_in_range(conn, &excision, 0, *last_tx_needing_rewrite)?; } delete_dangling_retractions(conn)?; - conn.execute("UPDATE excisions SET status = 0", &[])?; + conn.execute("UPDATE excisions SET last_tx_needing_rewrite = 0", &[])?; // TODO: only vacuum fulltext if an excision (likely) impacted fulltext values, since this is // very expensive. As always, correctness first, performance second. vacuum_fulltext_table(conn)?; - Ok(list.into_iter().map(|(entity, excision, _status)| (entity, excision)).collect()) + Ok(list.into_iter().map(|(entity, excision, _last_tx_needing_rewrite)| (entity, excision)).collect()) } diff --git a/db/src/tx.rs b/db/src/tx.rs index 4afa69bbe..5c18c9f25 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -817,7 +817,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { } if let Some(excisions) = excisions { - excision::enqueue_pending_excisions(self.store, self.schema, self.tx_id, &excisions)?; + excision::begin_excisions(self.store, self.schema, self.tx_id, &excisions)?; } Ok(TxReport { From e29a98a4ff04b453a581325211b06c5cd569b2f4 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Tue, 10 Jul 2018 13:24:49 -0700 Subject: [PATCH 15/18] Part 7: VACUUM the database after ensuring there are no pending excisions. This is extraordinarily expensive, but it's also the only way to ensure that excised data doesn't remain in the SQLite database files after excision. We might want to make this a flag on the connection: an encrypted database file (i.e., one using SQLCipher) might not need vacuuming for some use cases. --- db/src/excision.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/db/src/excision.rs b/db/src/excision.rs index 3ac1a1fa8..1528c984d 100644 --- a/db/src/excision.rs +++ b/db/src/excision.rs @@ -298,6 +298,10 @@ pub(crate) fn ensure_no_pending_excisions(conn: &rusqlite::Connection, partition // very expensive. As always, correctness first, performance second. vacuum_fulltext_table(conn)?; + // This is very (!) expensive, but if we really want to ensure that excised data doesn't remain + // on the filesystem, it's necessary. + conn.execute("VACUUM", &[])?; + Ok(list.into_iter().map(|(entity, excision, _last_tx_needing_rewrite)| (entity, excision)).collect()) } From 94082bd8db7a1a7e7def303df70a649f5b97448c Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 11 Jul 2018 13:44:15 -0700 Subject: [PATCH 16/18] Part 8: Inform transaction watchers of excised datoms. This is awkward and expensive but an expedient way to support `Store` instances with active caches. There are other ways to update active caches: we could drop all caches entirely and repopulate them, or we could try to associate caches to impacted entities. All such things are follow-up work as we solidify our approach to cache invalidation and try to improve performance. --- db/src/db.rs | 52 ++++++++++++++++++++++++++--- db/src/excision.rs | 81 ++++++++++++++++++++++++++++++++++++---------- db/src/tx.rs | 19 ++++++----- 3 files changed, 122 insertions(+), 30 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index f90048c79..5e5833632 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -2924,12 +2924,21 @@ mod tests { [301 :test/one 1001] [301 :test/ref 300]]"#); - let report = assert_transact!(conn, r#"[ + let (report, witnessed) = assert_transact_witnessed!(conn, r#"[ {:db/id "e" :db/excise 300} ]"#); // This is implementation specific, but it should be deterministic. assert_matches!(tempids(&report), "{\"e\" 65536}"); + assert_matches!(witnessed, r#" + [[300 :test/one 1000 ?tx false] + [300 :test/many 2000 ?tx false] + [300 :test/many 2001 ?tx false] + [300 :test/many 2002 ?tx false] + [301 :test/ref 300 ?tx false] + [65536 :db/excise 300 ?tx true] + [?tx :db/txInstant ?ms ?tx true]] + "#); // After. assert_matches!(conn.datoms(), r#" @@ -3041,12 +3050,22 @@ mod tests { [301 :test/one 1001] [301 :test/ref 300]]"#); - let report = assert_transact!(conn, r#"[ + let (report, witnessed) = assert_transact_witnessed!(conn, r#"[ {:db/id "e" :db/excise 300 :db.excise/attrs [:test/one :test/many]} ]"#); // This is implementation specific, but it should be deterministic. assert_matches!(tempids(&report), "{\"e\" 65536}"); + assert_matches!(witnessed, r#" + [[300 :test/one 1000 ?tx false] + [300 :test/many 2000 ?tx false] + [300 :test/many 2001 ?tx false] + [300 :test/many 2002 ?tx false] + [65536 :db/excise 300 ?tx true] + [65536 :db.excise/attrs :test/one ?tx true] + [65536 :db.excise/attrs :test/many ?tx true] + [?tx :db/txInstant ?ms ?tx true]] + "#); // After. assert_matches!(conn.datoms(), r#" @@ -3175,13 +3194,20 @@ mod tests { [300 :test/many 2003] [301 :test/ref 300]]"#); - let tempid_report = assert_transact!(conn, &format!(r#"[ + let (tempid_report, witnessed) = assert_transact_witnessed!(conn, &format!(r#"[ [:db/add "e" :db/excise 300] [:db/add "e" :db.excise/beforeT {}] ]"#, report.tx_id)); // This is implementation specific, but it should be deterministic. assert_matches!(tempids(&tempid_report), "{\"e\" 65536}"); + assert_matches!(witnessed, format!(r#" + [[300 :test/many 2000 ?tx false] + [300 :test/many 2001 ?tx false] + [65536 :db/excise 300 ?tx true] + [65536 :db.excise/beforeT {} ?tx true] + [?tx :db/txInstant ?ms ?tx true]] + "#, report.tx_id)); // After. assert_matches!(conn.datoms(), &format!(r#" @@ -3310,12 +3336,17 @@ mod tests { [300 :test/fulltext 3] [301 :test/fulltext 4]]"#); - let tempid_report = assert_transact!(conn, r#"[ + let (tempid_report, witnessed) = assert_transact_witnessed!(conn, r#"[ [:db/add "e" :db/excise 300] ]"#); // This is implementation specific, but it should be deterministic. assert_matches!(tempids(&tempid_report), "{\"e\" 65536}"); + assert_matches!(witnessed, r#" + [[300 :test/fulltext "test3" ?tx false] + [65536 :db/excise 300 ?tx true] + [?tx :db/txInstant ?ms ?tx true]] + "#); // After. assert_matches!(conn.datoms(), r#" @@ -3421,12 +3452,23 @@ mod tests { [301 :test/one 1001] [301 :test/ref 300]]"#); - let report = assert_transact!(conn, r#"[ + let (report, witnessed) = assert_transact_witnessed!(conn, r#"[ {:db/id "e" :db/excise [300 301]} ]"#); // This is implementation specific, but it should be deterministic. assert_matches!(tempids(&report), "{\"e\" 65536}"); + assert_matches!(witnessed, r#" + [[300 :test/one 1000 ?tx false] + [300 :test/many 2000 ?tx false] + [300 :test/many 2001 ?tx false] + [300 :test/many 2002 ?tx false] + [301 :test/one 1001 ?tx false] + [301 :test/ref 300 ?tx false] + [65536 :db/excise 300 ?tx true] + [65536 :db/excise 301 ?tx true] + [?tx :db/txInstant ?ms ?tx true]] + "#); // After. assert_matches!(conn.datoms(), r#" diff --git a/db/src/excision.rs b/db/src/excision.rs index 1528c984d..e23d02def 100644 --- a/db/src/excision.rs +++ b/db/src/excision.rs @@ -17,6 +17,10 @@ use itertools::Itertools; use rusqlite; +use edn::entities::{ + OpType, +}; + use mentat_core::{ Attribute, Entid, @@ -51,6 +55,10 @@ use types::{ PartitionMap, }; +use watcher::{ + TransactWatcher, +}; + /// Details about an excision: /// - targets to excise (for now, a non-empty set of entids); /// - a possibly empty set of attributes to excise (the empty set means all attributes, not no @@ -146,31 +154,66 @@ pub(crate) fn excisions<'schema>(partition_map: &'schema PartitionMap, schema: & } } -fn excise_datoms(conn: &rusqlite::Connection, excision: &Excision) -> Result<()> { +fn excise_datoms_for_excision(conn: &rusqlite::Connection, watcher: &mut W, entid: Entid, excision: &Excision) -> Result<()> +where W: TransactWatcher { let targets = excision.targets.iter().join(", "); + // Each branch below collects rowids and datoms to excise: datoms for reporting to the given + // watcher and rowids for deleting. match (excision.before_tx, &excision.attrs) { (Some(before_tx), Some(ref attrs)) => { let s = attrs.iter().join(", "); - conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE d.e IN ({}) AND d.a IN ({}) AND d.tx <= {}) DELETE FROM datoms WHERE rowid IN ids", - targets, s, before_tx).as_ref(), &[])?; + conn.execute(format!("CREATE TABLE temp.excision_{} AS SELECT rowid, index_fulltext, e, a, v, value_type_tag FROM datoms WHERE e IN ({}) AND a IN ({}) AND tx <= {}", + entid, targets, s, before_tx).as_ref(), &[])?; }, (Some(before_tx), None) => { - conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE d.e IN ({}) AND d.tx <= {}) DELETE FROM datoms WHERE rowid IN ids", - targets, before_tx).as_ref(), &[])?; + conn.execute(format!("CREATE TABLE temp.excision_{} AS SELECT rowid, index_fulltext, e, a, v, value_type_tag FROM datoms WHERE e IN ({}) AND tx <= {}", + entid, targets, before_tx).as_ref(), &[])?; }, (None, Some(ref attrs)) => { let s = attrs.iter().join(", "); - conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE d.e IN ({}) AND d.a IN ({})) DELETE FROM datoms WHERE rowid IN ids", - targets, s).as_ref(), &[])?; + conn.execute(format!("CREATE TABLE temp.excision_{} AS SELECT rowid, index_fulltext, e, a, v, value_type_tag FROM datoms WHERE e IN ({}) AND a IN ({})", + entid, targets, s).as_ref(), &[])?; }, (None, None) => { // TODO: Use AVET index to speed this up? - conn.execute(format!("WITH ids AS (SELECT d.rowid FROM datoms AS d WHERE (d.e IN ({}) OR (d.v IN ({}) AND d.value_type_tag IS {} AND d.a IS NOT {}))) DELETE FROM datoms WHERE rowid IN ids", - targets, targets, ValueType::Ref.value_type_tag(), entids::DB_EXCISE).as_ref(), &[])?; + conn.execute(format!("CREATE TABLE temp.excision_{} AS SELECT rowid, index_fulltext, e, a, v, value_type_tag FROM datoms WHERE (e IN ({}) OR (v IN ({}) AND value_type_tag IS {} AND a IS NOT {}))", + entid, targets, targets, ValueType::Ref.value_type_tag(), entids::DB_EXCISE).as_ref(), &[])?; }, } + // Report all datoms (fulltext and non-fulltext) to the watcher. This is functionally + // equivalent to the `all_datoms` view, but that view doesn't pass through rowid, which is + // required for deleting. + let s = r#" + SELECT e, a, v, value_type_tag + FROM temp.excision_{} + WHERE index_fulltext IS 0 + + UNION ALL + + SELECT e, a, fulltext_values.text AS v, value_type_tag + FROM temp.excision_{}, fulltext_values + WHERE index_fulltext IS NOT 0 AND temp.excision_{}.v = fulltext_values.rowid + "#; + let mut stmt = conn.prepare(format!(s, entid, entid, entid).as_ref())?; + + let mut rows = stmt.query(&[])?; + + while let Some(row) = rows.next() { + let row = row?; + let e: Entid = row.get_checked(0)?; + let a: Entid = row.get_checked(1)?; + let value_type_tag: i32 = row.get_checked(3)?; + let v = TypedValue::from_sql_value_pair(row.get_checked(2)?, value_type_tag)?; + + watcher.datom(OpType::Retract, e, a, &v); + } + + conn.execute(format!("WITH rowids AS (SELECT rowid FROM temp.excision_{}) DELETE FROM datoms WHERE rowid IN rowids", entid).as_ref(), &[])?; + + conn.execute(format!("DROP TABLE temp.excision_{}", entid).as_ref(), &[])?; + Ok(()) } @@ -206,10 +249,7 @@ fn excise_transactions_in_range(conn: &rusqlite::Connection, excision: &Excision } /// Record the given `excisions` as applying to the transaction with ID `tx_id`. -/// -/// This also starts processing the excision: right now, that means that the `datoms` table is -/// updated in place synchronously. -pub(crate) fn begin_excisions(conn: &rusqlite::Connection, schema: &Schema, tx_id: Entid, excisions: &ExcisionMap) -> Result<()> { +pub(crate) fn enqueue_excisions(conn: &rusqlite::Connection, schema: &Schema, tx_id: Entid, excisions: &ExcisionMap) -> Result<()> { let mut stmt1: rusqlite::Statement = conn.prepare("INSERT INTO excisions VALUES (?, ?, ?)")?; let mut stmt2: rusqlite::Statement = conn.prepare("INSERT INTO excision_targets VALUES (?, ?)")?; let mut stmt3: rusqlite::Statement = conn.prepare("INSERT INTO excision_attrs VALUES (?, ?)")?; @@ -229,10 +269,17 @@ pub(crate) fn begin_excisions(conn: &rusqlite::Connection, schema: &Schema, tx_i } } - // Might as well not interleave writes to "excisions{_attrs,_targets}" with writes to "datoms". - // This also leaves open the door for a more efficient bulk operation. - for (_entid, excision) in excisions { - excise_datoms(conn, &excision)?; + Ok(()) +} + +/// Start processing the given `excisions`: update the `datoms` table in place, synchronously. +/// +/// Purged datoms are reported to the given `watcher`. +pub(crate) fn excise_datoms_for_excisions(conn: &rusqlite::Connection, watcher: &mut W, excisions: &ExcisionMap) -> Result<()> +where W: TransactWatcher { + // TODO: do this more efficiently. + for (entid, excision) in excisions { + excise_datoms_for_excision(conn, watcher, *entid, &excision)?; } Ok(()) diff --git a/db/src/tx.rs b/db/src/tx.rs index 5c18c9f25..d6ba89ae1 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -791,6 +791,17 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { self.store.commit_transaction(self.tx_id)?; db::update_partition_map(self.store, &self.partition_map)?; + + if let Some(excisions) = excisions { + if tx_might_update_metadata { + bail!(DbErrorKind::BadExcision("cannot mutate schema".into())); + } + + excision::enqueue_excisions(self.store, self.schema, self.tx_id, &excisions)?; + + excision::excise_datoms_for_excisions(self.store, &mut self.watcher, &excisions)?; + } + self.watcher.done(&self.tx_id, self.schema)?; if tx_might_update_metadata { @@ -806,20 +817,12 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { // regular transactor code paths, updating the schema and materialized views uniformly. // But, belt-and-braces: handle it gracefully. if new_schema != *self.schema_for_mutation { - if excisions.is_some() { - bail!(DbErrorKind::BadExcision("cannot mutate schema".into())); - } - let old_schema = (*self.schema_for_mutation).clone(); // Clone the original Schema for comparison. *self.schema_for_mutation.to_mut() = new_schema; // Store the new Schema. db::update_metadata(self.store, &old_schema, &*self.schema_for_mutation, &metadata_report)?; } } - if let Some(excisions) = excisions { - excision::begin_excisions(self.store, self.schema, self.tx_id, &excisions)?; - } - Ok(TxReport { tx_id: self.tx_id, tx_instant, From 8b9623ea61bf7eb60b3918e0f14688ca29bf2f61 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 18 Jul 2018 11:53:13 -0700 Subject: [PATCH 17/18] Fixes. --- db/src/db.rs | 2 +- db/src/excision.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index 5e5833632..8affc0c86 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -3201,7 +3201,7 @@ mod tests { // This is implementation specific, but it should be deterministic. assert_matches!(tempids(&tempid_report), "{\"e\" 65536}"); - assert_matches!(witnessed, format!(r#" + assert_matches!(witnessed, &format!(r#" [[300 :test/many 2000 ?tx false] [300 :test/many 2001 ?tx false] [65536 :db/excise 300 ?tx true] diff --git a/db/src/excision.rs b/db/src/excision.rs index e23d02def..adbf1ea0c 100644 --- a/db/src/excision.rs +++ b/db/src/excision.rs @@ -185,7 +185,7 @@ where W: TransactWatcher { // Report all datoms (fulltext and non-fulltext) to the watcher. This is functionally // equivalent to the `all_datoms` view, but that view doesn't pass through rowid, which is // required for deleting. - let s = r#" + let s = format!(r#" SELECT e, a, v, value_type_tag FROM temp.excision_{} WHERE index_fulltext IS 0 @@ -195,8 +195,8 @@ where W: TransactWatcher { SELECT e, a, fulltext_values.text AS v, value_type_tag FROM temp.excision_{}, fulltext_values WHERE index_fulltext IS NOT 0 AND temp.excision_{}.v = fulltext_values.rowid - "#; - let mut stmt = conn.prepare(format!(s, entid, entid, entid).as_ref())?; + "#, entid, entid, entid); + let mut stmt = conn.prepare(s.as_ref())?; let mut rows = stmt.query(&[])?; From 9db61e346b6ffa3a8d5a1d9906199aa3336baaf1 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 18 Jul 2018 11:55:25 -0700 Subject: [PATCH 18/18] [build] List all crates in the workspace. --- Cargo.toml | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index f408a849d..275f1915d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,21 @@ sqlcipher = ["rusqlite/sqlcipher", "mentat_db/sqlcipher"] syncable = ["mentat_tolstoy", "mentat_db/syncable"] [workspace] -members = ["tools/cli", "ffi"] +members = [ + "tools/cli", + "ffi", + "core", + "db", + "edn", + "query", + "query-algebrizer", + "query-projector", + "query-pull", + "query-sql", + "query-translator", + "sql", + "tolstoy", +] [build-dependencies] rustc_version = "0.2"