diff --git a/Cargo.toml b/Cargo.toml index f408a849d..275f1915d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,21 @@ sqlcipher = ["rusqlite/sqlcipher", "mentat_db/sqlcipher"] syncable = ["mentat_tolstoy", "mentat_db/syncable"] [workspace] -members = ["tools/cli", "ffi"] +members = [ + "tools/cli", + "ffi", + "core", + "db", + "edn", + "query", + "query-algebrizer", + "query-projector", + "query-pull", + "query-sql", + "query-translator", + "sql", + "tolstoy", +] [build-dependencies] rustc_version = "0.2" diff --git a/db/src/bootstrap.rs b/db/src/bootstrap.rs index 4584284d5..7dc9be646 100644 --- a/db/src/bootstrap.rs +++ b/db/src/bootstrap.rs @@ -92,11 +92,15 @@ lazy_static! { ] }; - static ref V1_CORE_SCHEMA: [(symbols::Keyword); 16] = { + static ref V1_CORE_SCHEMA: [(symbols::Keyword); 20] = { [(ns_keyword!("db", "ident")), (ns_keyword!("db.install", "partition")), (ns_keyword!("db.install", "valueType")), (ns_keyword!("db.install", "attribute")), + (ns_keyword!("db", "excise")), + (ns_keyword!("db.excise", "attrs")), + (ns_keyword!("db.excise", "beforeT")), + (ns_keyword!("db.excise", "before")), (ns_keyword!("db", "txInstant")), (ns_keyword!("db", "valueType")), (ns_keyword!("db", "cardinality")), @@ -124,6 +128,14 @@ lazy_static! { :db/cardinality :db.cardinality/many} :db.install/attribute {:db/valueType :db.type/ref :db/cardinality :db.cardinality/many} + :db/excise {:db/valueType :db.type/ref + :db/cardinality :db.cardinality/many} + :db.excise/attrs {:db/valueType :db.type/ref + :db/cardinality :db.cardinality/many} + :db.excise/beforeT {:db/valueType :db.type/ref + :db/cardinality :db.cardinality/one} + :db.excise/before {:db/valueType :db.type/instant + :db/cardinality :db.cardinality/one} ;; TODO: support user-specified functions in the future. ;; :db.install/function {:db/valueType :db.type/ref ;; :db/cardinality :db.cardinality/many} diff --git a/db/src/db.rs b/db/src/db.rs index 93094eba8..8affc0c86 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -14,7 +14,10 @@ use failure::{ ResultExt, }; -use std::collections::HashMap; +use std::collections::{ + BTreeMap, + HashMap, +}; use std::collections::hash_map::{ Entry, }; @@ -59,6 +62,10 @@ use errors::{ DbErrorKind, Result, }; +use internal_types::{ + AddAndRetract, + AEVTrie, +}; use metadata; use schema::{ SchemaBuilding, @@ -246,6 +253,23 @@ lazy_static! { r#"CREATE INDEX idx_schema_unique ON schema (e, a, v, value_type_tag)"#, // TODO: store entid instead of ident for partition name. r#"CREATE TABLE parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, end INTEGER NOT NULL, idx INTEGER NOT NULL, allow_excision SMALLINT NOT NULL)"#, + + // Excisions are transacted as data, so they end up in the transactions table; this + // materializes that view. + // + // In practice, the transactions table will be rewritten after excision incrementally. The + // interval `(0, last_tx_needing_rewrite]` (which may be empty) tracks the status of + // incremental rewriting. Initially, `last_tx_needing_rewrite` is set to `before_tx - 1`, + // which itself defaults to `(tx-transaction) - 1` if `:db.excise/beforeT` is not specified. + // Each incremental step fixes a length N and rewrites the transactions with IDs in the + // interval `(last_tx_needing_rewrite - N, last_tx_needing_rewrite]`. Then + // `last_tx_needing_rewrite` is decremented by the length of the interval N. + // + // It follows that `last_tx_needing_rewrite` also tracks the whether an excision has + // been totally applied (0) or has transaction rewriting still pending (> 0). + r#"CREATE TABLE excisions (e INTEGER NOT NULL UNIQUE, before_tx INTEGER, last_tx_needing_rewrite INTEGER NOT NULL)"#, + r#"CREATE TABLE excision_targets (e INTEGER NOT NULL, target INTEGER NOT NULL, FOREIGN KEY (e) REFERENCES excisions(e))"#, + r#"CREATE TABLE excision_attrs (e INTEGER NOT NULL, a SMALLINT NOT NULL, FOREIGN KEY (e) REFERENCES excisions(e))"#, ] }; } @@ -437,6 +461,39 @@ pub(crate) fn read_materialized_view(conn: &rusqlite::Connection, table: &str) - m } +/// Read an arbitrary [e a v value_type_tag tx added] materialized view from the given table in the +/// SQL store, returing an AEV trie. +pub(crate) fn read_materialized_transaction_aev_trie<'schema>(conn: &rusqlite::Connection, schema: &'schema Schema, table: &str) -> Result> { + let mut stmt: rusqlite::Statement = conn.prepare(format!("SELECT e, a, v, value_type_tag, tx, added FROM {}", table).as_str())?; + let m: Result> = stmt.query_and_then(&[], |row| { + let e: Entid = row.get_checked(0)?; + let a: Entid = row.get_checked(1)?; + let v: rusqlite::types::Value = row.get_checked(2)?; + let value_type_tag: i32 = row.get_checked(3)?; + let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?; + let tx: Entid = row.get_checked(4)?; + let added: bool = row.get_checked(5)?; + Ok((e, a, typed_value, tx, added)) + })?.collect(); + + let mut trie = AEVTrie::default(); + + for (e, a, v, _, added) in m? { + let attribute: &Attribute = schema.require_attribute_for_entid(a)?; + + let a_and_r = trie + .entry((a, attribute)).or_insert(BTreeMap::default()) + .entry(e).or_insert(AddAndRetract::default()); + + match added { + true => a_and_r.add.insert(v), + false => a_and_r.retract.insert(v), + }; + } + + Ok(trie) +} + /// Read the partition map materialized view from the given SQL store. fn read_partition_map(conn: &rusqlite::Connection) -> Result { let mut stmt: rusqlite::Statement = conn.prepare("SELECT part, start, end, idx, allow_excision FROM parts")?; @@ -494,7 +551,7 @@ pub enum SearchType { /// Right now, the only implementation of `MentatStoring` is the SQLite-specific SQL schema. In the /// future, we might consider other SQL engines (perhaps with different fulltext indexing), or /// entirely different data stores, say ones shaped like key-value stores. -pub trait MentatStoring { +pub(crate) trait MentatStoring { /// Given a slice of [a v] lookup-refs, look up the corresponding [e a v] triples. /// /// It is assumed that the attribute `a` in each lookup-ref is `:db/unique`, so that at most one @@ -1097,18 +1154,24 @@ impl PartitionMap { pub(crate) fn contains_entid(&self, entid: Entid) -> bool { self.values().any(|partition| partition.contains_entid(entid)) } + + pub(crate) fn partition_for_entid(&self, entid: Entid) -> Option<(&str, &Partition)> { + self.iter().find(|(_name, partition)| partition.contains_entid(entid)).map(|p| (p.0.as_ref(), p.1)) + } } #[cfg(test)] mod tests { extern crate env_logger; - use std::borrow::{ - Borrow, - }; - use super::*; - use debug::{TestConn,tempids}; + use debug::{ + self, + TestConn, + tempids, + }; + use errors; + use excision; use edn::{ self, InternSet, @@ -1126,7 +1189,6 @@ mod tests { use std::collections::{ BTreeMap, }; - use errors; use internal_types::{ Term, }; @@ -2613,4 +2675,868 @@ mod tests { // Run a basic test as a sanity check. run_test_add(TestConn::with_sqlite(sqlite)); } + + #[test] + fn test_transaction_watcher() { + let mut conn = TestConn::default(); + + // Insert a few :db.cardinality/one elements. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/add 100 :db.schema/version 1] + [:db/add 101 :db.schema/version 2]] + "#); + assert_matches!(conn.last_transaction(), + "[[100 :db.schema/version 1 ?tx true] + [101 :db.schema/version 2 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[100 :db.schema/version 1] + [101 :db.schema/version 2]]"); + assert_matches!(witnessed, + "[[100 :db.schema/version 1 ?tx true] + [101 :db.schema/version 2 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + + // And a few :db.cardinality/many elements. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/add 200 :db.schema/attribute 100] + [:db/add 200 :db.schema/attribute 101]] + "#); + assert_matches!(conn.last_transaction(), + "[[200 :db.schema/attribute 100 ?tx true] + [200 :db.schema/attribute 101 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[100 :db.schema/version 1] + [101 :db.schema/version 2] + [200 :db.schema/attribute 100] + [200 :db.schema/attribute 101]]"); + assert_matches!(witnessed, + "[[200 :db.schema/attribute 100 ?tx true] + [200 :db.schema/attribute 101 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + + + // Test replacing existing :db.cardinality/one elements. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/add 100 :db.schema/version 11] + [:db/add 101 :db.schema/version 22]] + "#); + assert_matches!(conn.last_transaction(), + "[[100 :db.schema/version 1 ?tx false] + [100 :db.schema/version 11 ?tx true] + [101 :db.schema/version 2 ?tx false] + [101 :db.schema/version 22 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[100 :db.schema/version 11] + [101 :db.schema/version 22] + [200 :db.schema/attribute 100] + [200 :db.schema/attribute 101]]"); + // Right now, transaction watchers do not "witness" all datoms that are implied by entities + // transacted. + // That is, transaction watchers are cheap to implement, not maximally useful. + assert_matches!(witnessed, + "[[100 :db.schema/version 11 ?tx true] + [101 :db.schema/version 22 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + + // Test that asserting existing :db.cardinality/one elements doesn't change the store. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/add 100 :db.schema/version 11] + [:db/add 101 :db.schema/version 22]] + "#); + assert_matches!(conn.last_transaction(), + "[[?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[100 :db.schema/version 11] + [101 :db.schema/version 22] + [200 :db.schema/attribute 100] + [200 :db.schema/attribute 101]]"); + // Right now, transaction watchers "witness" datoms that don't actually change the store. + // That is, transaction watchers are cheap to implement, not maximally useful. + assert_matches!(witnessed, + "[[100 :db.schema/version 11 ?tx true] + [101 :db.schema/version 22 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + + // Test that asserting existing :db.cardinality/many elements doesn't change the store. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/add 200 :db.schema/attribute 100] + [:db/add 200 :db.schema/attribute 101]] + "#); + assert_matches!(conn.last_transaction(), + "[[?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[100 :db.schema/version 11] + [101 :db.schema/version 22] + [200 :db.schema/attribute 100] + [200 :db.schema/attribute 101]]"); + // Right now, transaction watchers "witness" datoms that don't actually change the store. + // That is, transaction watchers are cheap to implement, not maximally useful. + assert_matches!(witnessed, + "[[200 :db.schema/attribute 100 ?tx true] + [200 :db.schema/attribute 101 ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + + // Test that we can retract :db.cardinality/one elements. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/retract 100 :db.schema/version 11]] + "#); + assert_matches!(conn.last_transaction(), + "[[100 :db.schema/version 11 ?tx false] + [?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[101 :db.schema/version 22] + [200 :db.schema/attribute 100] + [200 :db.schema/attribute 101]]"); + assert_matches!(witnessed, + "[[100 :db.schema/version 11 ?tx false] + [?tx :db/txInstant ?ms ?tx true]]"); + + // Test that we can retract :db.cardinality/many elements. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/retract 200 :db.schema/attribute 100]] + "#); + assert_matches!(conn.last_transaction(), + "[[200 :db.schema/attribute 100 ?tx false] + [?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[101 :db.schema/version 22] + [200 :db.schema/attribute 101]]"); + assert_matches!(witnessed, + "[[200 :db.schema/attribute 100 ?tx false] + [?tx :db/txInstant ?ms ?tx true]]"); + + // Verify that retracting :db.cardinality/{one,many} elements that are not present doesn't + // change the store. + let (_, witnessed) = assert_transact_witnessed!(conn, r#" + [[:db/retract 100 :db.schema/version 11] + [:db/retract 200 :db.schema/attribute 100]] + "#); + assert_matches!(conn.last_transaction(), + "[[?tx :db/txInstant ?ms ?tx true]]"); + assert_matches!(conn.datoms(), + "[[101 :db.schema/version 22] + [200 :db.schema/attribute 101]]"); + // Right now, transaction watchers "witness" datoms that don't actually change the store. + // That is, transaction watchers are cheap to implement, not maximally useful. + assert_matches!(witnessed, + "[[100 :db.schema/version 11 ?tx false] ; Not actually applied! + [200 :db.schema/attribute 100 ?tx false] ; Not actually applied! + [?tx :db/txInstant ?ms ?tx true]]"); + } + + fn test_excision_bad_excisions() { + let mut conn = TestConn::default(); + + // Can't specify `:db.excise/before` at all. + assert_transact!(conn, r#"[ + {:db.excise/before #inst "2016-06-06T00:00:00.000Z"} + ]"#, + Err("bad excision: :db.excise/before")); + + // Must specify `:db/excise`. + assert_transact!(conn, r#"[ + {:db.excise/attrs [:db/ident :db/doc]} + ]"#, + Err("bad excision: no :db/excise")); + + assert_transact!(conn, r#"[ + {:db.excise/beforeT (transaction-tx)} + ]"#, + Err("bad excision: no :db/excise")); + + // Can't retract anything to do with excision. + assert_transact!(conn, r#"[ + [:db/retract 100 :db/excise 101] + ]"#, + Err("bad excision: retraction")); + + assert_transact!(conn, r#"[ + [:db/retract 100 :db.excise/beforeT (transaction-tx)] + ]"#, + Err("bad excision: retraction")); + + assert_transact!(conn, r#"[ + [:db/retract 100 :db.excise/attrs :db/ident] + ]"#, + Err("bad excision: retraction")); + + // Can't mutate the schema. This isn't completely implemented yet; right now, Mentat will + // prevent a consumer excising an attribute entity, but not a datom describing an attribute + // of an attribute. That is, you can't excise `:db/txInstant`, but you could remove the + // `:db/valueType :db.type/instant` from `:db/txInstant`. + assert_transact!(conn, r#"[ + {:db/excise :db/txInstant} + ]"#, + Err("bad excision: cannot mutate schema")); + + // Can't excise in the `:db.part/{db,tx}` partitions. + // TODO: test that we can't excise in the `:db.part/db` partition. + let report = assert_transact!(conn, r#"[ + ]"#); + + assert_transact!(conn, &format!(r#"[ + [:db/add "e" :db/excise {}] + ]"#, report.tx_id), + Err("bad excision: cannot target entity in partition :db.part/tx")); + + // TODO: Don't allow anything more than excisions in the excising transaction, except + // additional facts about the (transaction-tx). + } + + #[test] + fn test_excision() { + let mut conn = TestConn::default(); + + assert_transact!(conn, r#"[ + {:db/id 200 :db/ident :test/one :db/valueType :db.type/long :db/cardinality :db.cardinality/one} + {:db/id 201 :db/ident :test/many :db/valueType :db.type/long :db/cardinality :db.cardinality/many} + {:db/id 202 :db/ident :test/ref :db/valueType :db.type/ref :db/cardinality :db.cardinality/one} + ]"#); + + // Simplest case: just a `:db/excise` target, potentially including an inbound ref. + assert_transact!(conn, r#"[ + {:db/id 300 + :test/one 1000 + :test/many [2000 2001 2002]} + {:db/id 301 + :test/one 1001 + :test/ref 300} + ]"#); + + // Before. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [300 :test/one 1000] + [300 :test/many 2000] + [300 :test/many 2001] + [300 :test/many 2002] + [301 :test/one 1001] + [301 :test/ref 300]]"#); + + let (report, witnessed) = assert_transact_witnessed!(conn, r#"[ + {:db/id "e" :db/excise 300} + ]"#); + // This is implementation specific, but it should be deterministic. + assert_matches!(tempids(&report), + "{\"e\" 65536}"); + assert_matches!(witnessed, r#" + [[300 :test/one 1000 ?tx false] + [300 :test/many 2000 ?tx false] + [300 :test/many 2001 ?tx false] + [300 :test/many 2002 ?tx false] + [301 :test/ref 300 ?tx false] + [65536 :db/excise 300 ?tx true] + [?tx :db/txInstant ?ms ?tx true]] + "#); + + // After. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [301 :test/one 1001] + [?e :db/excise 300]]"#); + + // We have enqueued a pending excision. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, ::std::iter::once((65536, excision::Excision { + targets: vec![300].into_iter().collect(), + attrs: None, + before_tx: None, + })).collect()); + + // Before processing the pending excision, we have full transactions in the transaction log. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[300 :test/one 1000 ?tx2 true] + [300 :test/many 2000 ?tx2 true] + [300 :test/many 2001 ?tx2 true] + [300 :test/many 2002 ?tx2 true] + [301 :test/one 1001 ?tx2 true] + [301 :test/ref 300 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[?e :db/excise 300 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]]]"#); + + excision::ensure_no_pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("ensure_no_pending_excisions"); + + // After processing the pending excision, we have nothing left pending. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, Default::default()); + + // After processing the pending excision, we have rewritten transactions in the transaction + // log to not refer to the target entity of the excision. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[301 :test/one 1001 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[?e :db/excise 300 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]]]"#); + } + + #[test] + fn test_excision_with_attrs() { + let mut conn = TestConn::default(); + + assert_transact!(conn, r#"[ + {:db/id 200 :db/ident :test/one :db/valueType :db.type/long :db/cardinality :db.cardinality/one} + {:db/id 201 :db/ident :test/many :db/valueType :db.type/long :db/cardinality :db.cardinality/many} + {:db/id 202 :db/ident :test/ref :db/valueType :db.type/ref :db/cardinality :db.cardinality/one} + ]"#); + + // Simplest case: just a `:db/excise` target, potentially including an inbound ref. Also + // includes a self ref. + assert_transact!(conn, r#"[ + {:db/id 300 + :test/one 1000 + :test/many [2000 2001 2002] + :test/ref 300} + {:db/id 301 + :test/one 1001 + :test/ref 300} + ]"#); + + // Before. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [300 :test/one 1000] + [300 :test/many 2000] + [300 :test/many 2001] + [300 :test/many 2002] + [300 :test/ref 300] + [301 :test/one 1001] + [301 :test/ref 300]]"#); + + let (report, witnessed) = assert_transact_witnessed!(conn, r#"[ + {:db/id "e" :db/excise 300 :db.excise/attrs [:test/one :test/many]} + ]"#); + // This is implementation specific, but it should be deterministic. + assert_matches!(tempids(&report), + "{\"e\" 65536}"); + assert_matches!(witnessed, r#" + [[300 :test/one 1000 ?tx false] + [300 :test/many 2000 ?tx false] + [300 :test/many 2001 ?tx false] + [300 :test/many 2002 ?tx false] + [65536 :db/excise 300 ?tx true] + [65536 :db.excise/attrs :test/one ?tx true] + [65536 :db.excise/attrs :test/many ?tx true] + [?tx :db/txInstant ?ms ?tx true]] + "#); + + // After. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [300 :test/ref 300] + [301 :test/one 1001] + [301 :test/ref 300] + [?e :db/excise 300] + [?e :db.excise/attrs :test/one] + [?e :db.excise/attrs :test/many]]"#); + + // We have enqueued a pending excision. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, ::std::iter::once((65536, excision::Excision { + targets: vec![300].into_iter().collect(), + // Ordered by keyword. + attrs: Some(::std::iter::once(conn.schema.require_entid(&Keyword::namespaced("test", "many")).unwrap().0) + .chain(::std::iter::once(conn.schema.require_entid(&Keyword::namespaced("test", "one")).unwrap().0)) + .collect()), + before_tx: None, + })).collect()); + + // Before processing the pending excision, we have full transactions in the transaction log. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[300 :test/one 1000 ?tx2 true] + [300 :test/many 2000 ?tx2 true] + [300 :test/many 2001 ?tx2 true] + [300 :test/many 2002 ?tx2 true] + [300 :test/ref 300 ?tx2 true] + [301 :test/one 1001 ?tx2 true] + [301 :test/ref 300 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[?e :db/excise 300 ?tx3 true] + [?e :db.excise/attrs :test/one ?tx3 true] + [?e :db.excise/attrs :test/many ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]]]"#); + + excision::ensure_no_pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("ensure_no_pending_excisions"); + + // After processing the pending excision, we have nothing left pending. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, Default::default()); + + // After processing the pending excision, we have rewritten transactions in the transaction + // log to not refer to the targeted attributes of the target entity. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[300 :test/ref 300 ?tx2 true] + [301 :test/one 1001 ?tx2 true] + [301 :test/ref 300 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[?e :db/excise 300 ?tx3 true] + [?e :db.excise/attrs :test/one ?tx3 true] + [?e :db.excise/attrs :test/many ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]]]"#); + } + + #[test] + fn test_excision_with_before_tx() { + let mut conn = TestConn::default(); + + assert_transact!(conn, r#"[ + {:db/id 200 :db/ident :test/one :db/valueType :db.type/long :db/cardinality :db.cardinality/one} + {:db/id 201 :db/ident :test/many :db/valueType :db.type/long :db/cardinality :db.cardinality/many} + {:db/id 202 :db/ident :test/ref :db/valueType :db.type/ref :db/cardinality :db.cardinality/one} + ]"#); + + // Simplest case: just a `:db/excise` target, potentially including an inbound ref. + let report = assert_transact!(conn, r#"[ + {:db/id 300 + :test/one 1000 + :test/many [2000 2001]} + {:db/id 301 + :test/ref 300} + ]"#); + + // Let's assert a new cardinality one value. + assert_transact!(conn, r#"[ + {:db/id 300 + :test/one 1001 + :test/many [2002 2003]} + ]"#); + + // Before. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [300 :test/one 1001] + [300 :test/many 2000] + [300 :test/many 2001] + [300 :test/many 2002] + [300 :test/many 2003] + [301 :test/ref 300]]"#); + + let (tempid_report, witnessed) = assert_transact_witnessed!(conn, &format!(r#"[ + [:db/add "e" :db/excise 300] + [:db/add "e" :db.excise/beforeT {}] + ]"#, report.tx_id)); + // This is implementation specific, but it should be deterministic. + assert_matches!(tempids(&tempid_report), + "{\"e\" 65536}"); + assert_matches!(witnessed, &format!(r#" + [[300 :test/many 2000 ?tx false] + [300 :test/many 2001 ?tx false] + [65536 :db/excise 300 ?tx true] + [65536 :db.excise/beforeT {} ?tx true] + [?tx :db/txInstant ?ms ?tx true]] + "#, report.tx_id)); + + // After. + assert_matches!(conn.datoms(), &format!(r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [300 :test/one 1001] + [300 :test/many 2002] + [300 :test/many 2003] + [301 :test/ref 300] + [?e :db/excise 300] + [?e :db.excise/beforeT {}]]"#, report.tx_id)); + + // We have enqueued a pending excision. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, ::std::iter::once((65536, excision::Excision { + targets: vec![300].into_iter().collect(), + // Ordered by keyword. + attrs: None, + before_tx: Some(report.tx_id), + })).collect()); + + // Before processing the pending excision, we have full transactions in the transaction log. + assert_matches!(conn.transactions(), &format!(r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[300 :test/one 1000 ?tx2 true] + [300 :test/many 2000 ?tx2 true] + [300 :test/many 2001 ?tx2 true] + [301 :test/ref 300 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[300 :test/one 1000 ?tx3 false] + [300 :test/one 1001 ?tx3 true] + [300 :test/many 2002 ?tx3 true] + [300 :test/many 2003 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]] + [[?e :db/excise 300 ?tx4 true] + [?e :db.excise/beforeT {} ?tx4 true] + [?tx4 :db/txInstant ?ms4 ?tx4 true]]]"#, report.tx_id)); + + excision::ensure_no_pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("ensure_no_pending_excisions"); + + // After processing the pending excision, we have nothing left pending. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, Default::default()); + + // After processing the pending excision, we have rewritten transactions in the transaction + // log to not refer to the targeted attributes of the target entity. + assert_matches!(conn.transactions(), &format!(r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[301 :test/ref 300 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [; Observe that the "dangling retraction" commented out immediately below is not present! + ; [300 :test/one 1000 ?tx3 false] + [300 :test/one 1001 ?tx3 true] + [300 :test/many 2002 ?tx3 true] + [300 :test/many 2003 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]] + [[?e :db/excise 300 ?tx4 true] + [?e :db.excise/beforeT {} ?tx4 true] + [?tx4 :db/txInstant ?ms4 ?tx4 true]]]"#, report.tx_id)); + } + + #[test] + fn test_excision_fulltext() { + let mut conn = TestConn::default(); + + assert_transact!(conn, r#"[ + {:db/id 200 + :db/ident :test/fulltext + :db/valueType :db.type/string + :db/cardinality :db.cardinality/one + :db/fulltext true + :db/index true} + ]"#); + + assert_transact!(conn, r#"[ + {:db/id 300 + :test/fulltext "test1"} + {:db/id 301 + :test/fulltext "test2"} + ]"#); + + assert_transact!(conn, r#"[ + {:db/id 300 + :test/fulltext "test3"} + {:db/id 301 + :test/fulltext "test4"} + ]"#); + + // Before. + assert_matches!(conn.fulltext_values(), r#" + [[1 "test1"] + [2 "test2"] + [3 "test3"] + [4 "test4"]]"#); + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/fulltext] + [200 :db/valueType :db.type/string] + [200 :db/cardinality :db.cardinality/one] + [200 :db/index true] + [200 :db/fulltext true] + [300 :test/fulltext 3] + [301 :test/fulltext 4]]"#); + + let (tempid_report, witnessed) = assert_transact_witnessed!(conn, r#"[ + [:db/add "e" :db/excise 300] + ]"#); + // This is implementation specific, but it should be deterministic. + assert_matches!(tempids(&tempid_report), + "{\"e\" 65536}"); + assert_matches!(witnessed, r#" + [[300 :test/fulltext "test3" ?tx false] + [65536 :db/excise 300 ?tx true] + [?tx :db/txInstant ?ms ?tx true]] + "#); + + // After. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/fulltext] + [200 :db/valueType :db.type/string] + [200 :db/cardinality :db.cardinality/one] + [200 :db/index true] + [200 :db/fulltext true] + [301 :test/fulltext 4] + [?e :db/excise 300]]"#); + + // We have enqueued a pending excision. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, ::std::iter::once((65536, excision::Excision { + targets: vec![300].into_iter().collect(), + attrs: None, + before_tx: None, + })).collect()); + + // Before processing the pending excision, we have full transactions in the transaction log. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/fulltext ?tx1 true] + [200 :db/valueType :db.type/string ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [200 :db/index true ?tx1 true] + [200 :db/fulltext true ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[300 :test/fulltext 1 ?tx2 true] + [301 :test/fulltext 2 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[300 :test/fulltext 1 ?tx3 false] + [300 :test/fulltext 3 ?tx3 true] + [301 :test/fulltext 2 ?tx3 false] + [301 :test/fulltext 4 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]] + [[?e :db/excise 300 ?tx4 true] + [?tx4 :db/txInstant ?ms4 ?tx4 true]]]"#); + + excision::ensure_no_pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("ensure_no_pending_excisions"); + + // After processing the pending excision, we have nothing left pending. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, Default::default()); + + // After processing the pending excision, we have rewritten transactions in the transaction + // log to not refer to the targeted attributes of the target entity. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/fulltext ?tx1 true] + [200 :db/valueType :db.type/string ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [200 :db/index true ?tx1 true] + [200 :db/fulltext true ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[301 :test/fulltext 2 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[301 :test/fulltext 2 ?tx3 false] + [301 :test/fulltext 4 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]] + [[?e :db/excise 300 ?tx4 true] + [?tx4 :db/txInstant ?ms4 ?tx4 true]]]"#); + + // After processing the pending excision, we have vacuumed dangling fulltext values. + assert_matches!(conn.fulltext_values(), r#" + [[2 "test2"] + [4 "test4"]]"#); + } + + #[test] + fn test_excision_multiple() { + let mut conn = TestConn::default(); + + assert_transact!(conn, r#"[ + {:db/id 200 :db/ident :test/one :db/valueType :db.type/long :db/cardinality :db.cardinality/one} + {:db/id 201 :db/ident :test/many :db/valueType :db.type/long :db/cardinality :db.cardinality/many} + {:db/id 202 :db/ident :test/ref :db/valueType :db.type/ref :db/cardinality :db.cardinality/one} + ]"#); + + // Simplest case: just a `:db/excise` target, potentially including an inbound ref. + assert_transact!(conn, r#"[ + {:db/id 300 + :test/one 1000 + :test/many [2000 2001 2002]} + {:db/id 301 + :test/one 1001 + :test/ref 300} + ]"#); + + // Before. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [300 :test/one 1000] + [300 :test/many 2000] + [300 :test/many 2001] + [300 :test/many 2002] + [301 :test/one 1001] + [301 :test/ref 300]]"#); + + let (report, witnessed) = assert_transact_witnessed!(conn, r#"[ + {:db/id "e" :db/excise [300 301]} + ]"#); + // This is implementation specific, but it should be deterministic. + assert_matches!(tempids(&report), + "{\"e\" 65536}"); + assert_matches!(witnessed, r#" + [[300 :test/one 1000 ?tx false] + [300 :test/many 2000 ?tx false] + [300 :test/many 2001 ?tx false] + [300 :test/many 2002 ?tx false] + [301 :test/one 1001 ?tx false] + [301 :test/ref 300 ?tx false] + [65536 :db/excise 300 ?tx true] + [65536 :db/excise 301 ?tx true] + [?tx :db/txInstant ?ms ?tx true]] + "#); + + // After. + assert_matches!(conn.datoms(), r#" + [[200 :db/ident :test/one] + [200 :db/valueType :db.type/long] + [200 :db/cardinality :db.cardinality/one] + [201 :db/ident :test/many] + [201 :db/valueType :db.type/long] + [201 :db/cardinality :db.cardinality/many] + [202 :db/ident :test/ref] + [202 :db/valueType :db.type/ref] + [202 :db/cardinality :db.cardinality/one] + [?e :db/excise 300] + [?e :db/excise 301]]"#); + + // We have enqueued a pending excision. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, ::std::iter::once((65536, excision::Excision { + targets: vec![300, 301].into_iter().collect(), + attrs: None, + before_tx: None, + })).collect()); + + // Before processing the pending excision, we have full transactions in the transaction log. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[300 :test/one 1000 ?tx2 true] + [300 :test/many 2000 ?tx2 true] + [300 :test/many 2001 ?tx2 true] + [300 :test/many 2002 ?tx2 true] + [301 :test/one 1001 ?tx2 true] + [301 :test/ref 300 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[?e :db/excise 300 ?tx3 true] + [?e :db/excise 301 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]]]"#); + + excision::ensure_no_pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("ensure_no_pending_excisions"); + + // After processing the pending excision, we have nothing left pending. + let pending = excision::pending_excisions(&conn.sqlite, &conn.partition_map, &conn.schema).expect("pending_excisions"); + assert_eq!(pending, Default::default()); + + // After processing the pending excision, we have rewritten transactions in the transaction + // log to not refer to the target entity of the excision. + assert_matches!(conn.transactions(), r#" + [[[200 :db/ident :test/one ?tx1 true] + [200 :db/valueType :db.type/long ?tx1 true] + [200 :db/cardinality :db.cardinality/one ?tx1 true] + [201 :db/ident :test/many ?tx1 true] + [201 :db/valueType :db.type/long ?tx1 true] + [201 :db/cardinality :db.cardinality/many ?tx1 true] + [202 :db/ident :test/ref ?tx1 true] + [202 :db/valueType :db.type/ref ?tx1 true] + [202 :db/cardinality :db.cardinality/one ?tx1 true] + [?tx1 :db/txInstant ?ms ?tx1 true]] + [[?tx2 :db/txInstant ?ms2 ?tx2 true]] + [[?e :db/excise 300 ?tx3 true] + [?e :db/excise 301 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]]]"#); + } } diff --git a/db/src/debug.rs b/db/src/debug.rs index 80cb1fe4d..28c62062d 100644 --- a/db/src/debug.rs +++ b/db/src/debug.rs @@ -13,7 +13,7 @@ /// Low-level functions for testing. -// Macro to parse a `Borrow` to an `edn::Value` and assert the given `edn::Value` `matches` +// Macro to parse a `&str` to an `edn::Value` and assert the given `edn::Value` `matches` // against it. // // This is a macro only to give nice line numbers when tests fail. @@ -21,7 +21,7 @@ macro_rules! assert_matches { ( $input: expr, $expected: expr ) => {{ // Failure to parse the expected pattern is a coding error, so we unwrap. - let pattern_value = edn::parse::value($expected.borrow()) + let pattern_value = edn::parse::value($expected) .expect(format!("to be able to parse expected {}", $expected).as_str()) .without_spans(); let input_value = $input.to_edn(); @@ -39,20 +39,58 @@ macro_rules! assert_matches { macro_rules! assert_transact { ( $conn: expr, $input: expr, $expected: expr ) => {{ trace!("assert_transact: {}", $input); - let result = $conn.transact($input).map_err(|e| e.to_string()); + let result = $conn.transact($input, NullWatcher()).map_err(|e| e.to_string()); assert_eq!(result, $expected.map_err(|e| e.to_string())); }}; ( $conn: expr, $input: expr ) => {{ trace!("assert_transact: {}", $input); - let result = $conn.transact($input); + let result = $conn.transact($input, NullWatcher()); assert!(result.is_ok(), "Expected Ok(_), got `{}`", result.unwrap_err()); result.unwrap() }}; } -use std::borrow::Borrow; -use std::collections::BTreeMap; -use std::io::{Write}; +// Transact $input against the given $conn, expecting success. +// +// This unwraps safely and makes asserting errors pleasant. +#[macro_export] +macro_rules! assert_transact_witnessed { + ( $conn: expr, $input: expr ) => {{ + let witnessed = ::std::cell::RefCell::new(None); + let result = { // Scope borrow of witnessed. + let watcher = debug::CollectingWatcher::new(&witnessed); + + trace!("assert_transact: {}", $input); + let result = $conn.transact($input, watcher); + assert!(result.is_ok(), "Expected Ok(_), got `{}`", result.unwrap_err()); + result + }; + + let witnessed = witnessed.into_inner(); + assert!(witnessed.is_some(), "Expected Some(_), got `None`"); + + let (datoms, tx) = witnessed.unwrap(); + let datoms = debug::Datoms::new($conn.schema.clone(), datoms.into_iter().map(|(op, e, a, v)| debug::Datom { e, a, v, tx, added: Some(op == OpType::Add) }).collect()); + + (result.unwrap(), datoms) + }}; +} + +use std::cell::{ + RefCell, +}; +use std::cmp::{ + Ordering, +}; +use std::collections::{ + BTreeMap, +}; +use std::io::{ + Write, +}; +use std::ops::{ + Deref, +}; use itertools::Itertools; use rusqlite; @@ -61,12 +99,21 @@ use rusqlite::types::{ToSql}; use tabwriter::TabWriter; use bootstrap; -use db::*; -use db::{read_attribute_map,read_ident_map}; -use edn; +use db::{ + TypedSQLValue, + ensure_current_version, + new_connection, + read_attribute_map, + read_ident_map, +}; +use edn::{ + self, + ValueRc, +}; use entids; use errors::Result; use mentat_core::{ + Entid, HasSchema, SQLValueType, TxReport, @@ -78,6 +125,7 @@ use edn::{ }; use edn::entities::{ EntidOrIdent, + OpType, TempId, }; use internal_types::{ @@ -86,21 +134,27 @@ use internal_types::{ use schema::{ SchemaBuilding, }; -use types::*; use tx::{ transact, transact_terms, }; -use watcher::NullWatcher; +use types::{ + Partition, + PartitionMap, + Schema, +}; +pub use watcher::{ + NullWatcher, + TransactWatcher, +}; /// Represents a *datom* (assertion) in the store. #[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] pub struct Datom { - // TODO: generalize this. - pub e: EntidOrIdent, - pub a: EntidOrIdent, - pub v: edn::Value, - pub tx: i64, + pub e: Entid, + pub a: Entid, + pub v: TypedValue, + pub tx: Entid, pub added: Option, } @@ -109,7 +163,44 @@ pub struct Datom { /// To make comparision easier, we deterministically order. The ordering is the ascending tuple /// ordering determined by `(e, a, (value_type_tag, v), tx)`, where `value_type_tag` is an internal /// value that is not exposed but is deterministic. -pub struct Datoms(pub Vec); +pub struct Datoms { + pub schema: ValueRc, + pub datoms: Vec, +} + +/// Sort datoms by `[e a v]`, grouping by `tx` first if `added` is present. +fn datom_cmp(x: &Datom, y: &Datom) -> Ordering { + match x.added.is_some() { + true => + (&x.tx, &x.e, &x.a, x.v.value_type().value_type_tag(), &x.v, &x.added).cmp( + &(&y.tx, &y.e, &y.a, y.v.value_type().value_type_tag(), &y.v, &y.added)), + false => + (&x.e, &x.a, x.v.value_type().value_type_tag(), &x.v, &x.tx).cmp( + &(&y.e, &y.a, y.v.value_type().value_type_tag(), &y.v, &y.tx)), + } +} + +impl Datoms { + pub fn new(schema: I, datoms: Vec) -> Self where I: Into> { + let schema = schema.into(); + + let mut datoms = datoms; + datoms[..].sort_unstable_by(datom_cmp); + + Datoms { + schema, + datoms, + } + } +} + +impl Deref for Datoms { + type Target = [Datom]; + + fn deref(&self) -> &Self::Target { + self.datoms.deref() + } +} /// Represents an ordered sequence of transactions in the store. /// @@ -119,11 +210,19 @@ pub struct Datoms(pub Vec); /// retracted assertions appear before added assertions. pub struct Transactions(pub Vec); +impl Deref for Transactions { + type Target = [Datoms]; + + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} + /// Represents the fulltext values in the store. pub struct FulltextValues(pub Vec<(i64, String)>); impl Datom { - pub fn to_edn(&self) -> edn::Value { + pub fn to_edn(&self, schema: &Schema) -> edn::Value { let f = |entid: &EntidOrIdent| -> edn::Value { match *entid { EntidOrIdent::Entid(ref y) => edn::Value::Integer(y.clone()), @@ -131,7 +230,9 @@ impl Datom { } }; - let mut v = vec![f(&self.e), f(&self.a), self.v.clone()]; + let mut v = vec![edn::Value::Integer(self.e), + f(&to_entid_or_ident(schema, self.a)), + self.v.clone().map_ident(schema).to_edn_value_pair().0]; if let Some(added) = self.added { v.push(edn::Value::Integer(self.tx)); v.push(edn::Value::Boolean(added)); @@ -143,7 +244,7 @@ impl Datom { impl Datoms { pub fn to_edn(&self) -> edn::Value { - edn::Value::Vector((&self.0).into_iter().map(|x| x.to_edn()).collect()) + edn::Value::Vector((&self.datoms).into_iter().map(|x| x.to_edn(&self.schema)).collect()) } } @@ -175,97 +276,77 @@ impl ToIdent for TypedValue { } /// Convert a numeric entid to an ident `Entid` if possible, otherwise a numeric `Entid`. -pub fn to_entid(schema: &Schema, entid: i64) -> EntidOrIdent { +pub fn to_entid_or_ident(schema: &Schema, entid: Entid) -> EntidOrIdent { schema.get_ident(entid).map_or(EntidOrIdent::Entid(entid), |ident| EntidOrIdent::Ident(ident.clone())) } // /// Convert a symbolic ident to an ident `Entid` if possible, otherwise a numeric `Entid`. -// pub fn to_ident(schema: &Schema, entid: i64) -> Entid { +// pub fn to_ident(schema: &Schema, entid: Entid) -> Entid { // schema.get_ident(entid).map_or(Entid::Entid(entid), |ident| Entid::Ident(ident.clone())) // } /// Return the set of datoms in the store, ordered by (e, a, v, tx), but not including any datoms of /// the form [... :db/txInstant ...]. -pub fn datoms>(conn: &rusqlite::Connection, schema: &S) -> Result { +pub fn datoms(conn: &rusqlite::Connection, schema: &Schema) -> Result { datoms_after(conn, schema, bootstrap::TX0 - 1) } -/// Return the set of datoms in the store with transaction ID strictly greater than the given `tx`, -/// ordered by (e, a, v, tx). -/// -/// The datom set returned does not include any datoms of the form [... :db/txInstant ...]. -pub fn datoms_after>(conn: &rusqlite::Connection, schema: &S, tx: i64) -> Result { - let borrowed_schema = schema.borrow(); +/// Turn a row like `SELECT e, a, v, value_type_tag, tx[, added]?` into a `Datom`, optionally +/// filtering `:db/txInstant` datoms out. +fn row_to_datom(schema: &Schema, filter_tx_instant: bool, row: &rusqlite::Row) -> Result> { + let e: Entid = row.get_checked(0)?; + let a: Entid = row.get_checked(1)?; - let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx FROM datoms WHERE tx > ? ORDER BY e ASC, a ASC, value_type_tag ASC, v ASC, tx ASC")?; + if filter_tx_instant && a == entids::DB_TX_INSTANT { + return Ok(None); + } - let r: Result> = stmt.query_and_then(&[&tx], |row| { - let e: i64 = row.get_checked(0)?; - let a: i64 = row.get_checked(1)?; + let v: rusqlite::types::Value = row.get_checked(2)?; + let value_type_tag: i32 = row.get_checked(3)?; - if a == entids::DB_TX_INSTANT { - return Ok(None); - } + let attribute = schema.require_attribute_for_entid(a)?; + let value_type_tag = if !attribute.fulltext { value_type_tag } else { ValueType::Long.value_type_tag() }; - let v: rusqlite::types::Value = row.get_checked(2)?; - let value_type_tag: i32 = row.get_checked(3)?; + let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?; - let attribute = borrowed_schema.require_attribute_for_entid(a)?; - let value_type_tag = if !attribute.fulltext { value_type_tag } else { ValueType::Long.value_type_tag() }; + let tx: Entid = row.get_checked(4)?; + let added: Option = row.get_checked(5).ok(); - let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?.map_ident(borrowed_schema); - let (value, _) = typed_value.to_edn_value_pair(); + Ok(Some(Datom { + e, + a, + v: typed_value, + tx, + added, + })) +} - let tx: i64 = row.get_checked(4)?; +/// Return the set of datoms in the store with transaction ID strictly greater than the given `tx`, +/// ordered by (e, a, v, tx). +/// +/// The datom set returned does not include any datoms of the form [... :db/txInstant ...]. +pub(crate) fn datoms_after(conn: &rusqlite::Connection, schema: &Schema, tx: Entid) -> Result { + let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx FROM datoms WHERE tx > ?")?; - Ok(Some(Datom { - e: EntidOrIdent::Entid(e), - a: to_entid(borrowed_schema, a), - v: value, - tx: tx, - added: None, - })) - })?.collect(); + let r: Result> = stmt.query_and_then(&[&tx], |row| row_to_datom(schema, true, row))?.collect(); - Ok(Datoms(r?.into_iter().filter_map(|x| x).collect())) + Ok(Datoms::new(schema.clone(), r?.into_iter().filter_map(|x| x).collect())) } /// Return the sequence of transactions in the store with transaction ID strictly greater than the /// given `tx`, ordered by (tx, e, a, v). /// /// Each transaction returned includes the [(transaction-tx) :db/txInstant ...] datom. -pub fn transactions_after>(conn: &rusqlite::Connection, schema: &S, tx: i64) -> Result { - let borrowed_schema = schema.borrow(); - - let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx, added FROM transactions WHERE tx > ? ORDER BY tx ASC, e ASC, a ASC, value_type_tag ASC, v ASC, added ASC")?; - - let r: Result> = stmt.query_and_then(&[&tx], |row| { - let e: i64 = row.get_checked(0)?; - let a: i64 = row.get_checked(1)?; +pub fn transactions_after(conn: &rusqlite::Connection, schema: &Schema, tx: Entid) -> Result { + let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx, added FROM transactions WHERE tx > ?")?; - let v: rusqlite::types::Value = row.get_checked(2)?; - let value_type_tag: i32 = row.get_checked(3)?; + let r: Result> = stmt.query_and_then(&[&tx], |row| row_to_datom(schema, false, row))?.collect(); - let attribute = borrowed_schema.require_attribute_for_entid(a)?; - let value_type_tag = if !attribute.fulltext { value_type_tag } else { ValueType::Long.value_type_tag() }; - - let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?.map_ident(borrowed_schema); - let (value, _) = typed_value.to_edn_value_pair(); - - let tx: i64 = row.get_checked(4)?; - let added: bool = row.get_checked(5)?; - - Ok(Datom { - e: EntidOrIdent::Entid(e), - a: to_entid(borrowed_schema, a), - v: value, - tx: tx, - added: Some(added), - }) - })?.collect(); + let schema_rc: ValueRc = schema.clone().into(); // Group by tx. - let r: Vec = r?.into_iter().group_by(|x| x.tx).into_iter().map(|(_key, group)| Datoms(group.collect())).collect(); + let r: Vec = r?.into_iter().filter_map(|x| x).group_by(|x| x.tx).into_iter().map(|(_key, group)| Datoms::new(schema_rc.clone(), group.collect())).collect(); + Ok(Transactions(r)) } @@ -329,16 +410,16 @@ impl TestConn { assert_eq!(materialized_schema, self.schema); } - pub fn transact(&mut self, transaction: I) -> Result where I: Borrow { + pub fn transact(&mut self, transaction: &str, watcher: W) -> Result where W: TransactWatcher { // Failure to parse the transaction is a coding error, so we unwrap. - let entities = edn::parse::entities(transaction.borrow()).expect(format!("to be able to parse {} into entities", transaction.borrow()).as_str()); + let entities = edn::parse::entities(transaction).expect(format!("to be able to parse {} into entities", transaction).as_str()); let details = { // The block scopes the borrow of self.sqlite. // We're about to write, so go straight ahead and get an IMMEDIATE transaction. let tx = self.sqlite.transaction_with_behavior(TransactionBehavior::Immediate)?; // Applying the transaction can fail, so we don't unwrap. - let details = transact(&tx, self.partition_map.clone(), &self.schema, &self.schema, NullWatcher(), entities)?; + let details = transact(&tx, self.partition_map.clone(), &self.schema, &self.schema, watcher, entities)?; tx.commit()?; details }; @@ -403,12 +484,12 @@ impl TestConn { // Does not include :db/txInstant. let datoms = datoms_after(&conn, &db.schema, 0).unwrap(); - assert_eq!(datoms.0.len(), 94); + assert_eq!(datoms.len(), 106); // Includes :db/txInstant. let transactions = transactions_after(&conn, &db.schema, 0).unwrap(); - assert_eq!(transactions.0.len(), 1); - assert_eq!(transactions.0[0].0.len(), 95); + assert_eq!(transactions.len(), 1); + assert_eq!(transactions[0].len(), datoms.len() + 1); let mut parts = db.partition_map; @@ -453,3 +534,32 @@ pub fn tempids(report: &TxReport) -> TempIds { } TempIds(edn::Value::Map(map)) } + +/// A `CollectingWatcher` accumulates witnessed datoms. +/// +/// The internal `RefCell` is how we get data _out_ of a `TransactWatcher`, which isn't well +/// supported right now. +pub struct CollectingWatcher<'a> { + pub cell: &'a RefCell, Entid)>>, + pub collection: Vec<(OpType, Entid, Entid, TypedValue)>, +} + +impl<'a> CollectingWatcher<'a> { + pub fn new(cell: &'a RefCell, Entid)>>) -> Self { + CollectingWatcher { cell, collection: Vec::new() } + } +} + +impl<'a> TransactWatcher for CollectingWatcher<'a> { + fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue) { + self.collection.push((op, e, a, v.clone())) + } + + fn done(&mut self, tx: &Entid, _schema: &Schema) -> Result<()> { + let mut temp = vec![]; + ::std::mem::swap(&mut temp, &mut self.collection); + self.cell.replace(Some((temp, *tx))); + + Ok(()) + } +} diff --git a/db/src/errors.rs b/db/src/errors.rs index 5fcd743aa..b0a7967ce 100644 --- a/db/src/errors.rs +++ b/db/src/errors.rs @@ -217,6 +217,10 @@ pub enum DbErrorKind { #[fail(display = "bad schema assertion: {}", _0)] BadSchemaAssertion(String), + /// A bad excision was transacted. + #[fail(display = "bad excision: {}", _0)] + BadExcision(String), + /// An ident->entid mapping failed. #[fail(display = "no entid found for ident: {}", _0)] UnrecognizedIdent(String), diff --git a/db/src/excision.rs b/db/src/excision.rs new file mode 100644 index 000000000..adbf1ea0c --- /dev/null +++ b/db/src/excision.rs @@ -0,0 +1,417 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::collections::{ + BTreeSet, + BTreeMap, +}; + +use itertools::Itertools; + +use rusqlite; + +use edn::entities::{ + OpType, +}; + +use mentat_core::{ + Attribute, + Entid, + HasSchema, + Schema, + SQLValueType, + TypedValue, + ValueType, +}; + +use db::{ + TypedSQLValue, +}; + +use entids; + +use errors::{ + DbErrorKind, + Result, +}; + +use internal_types::{ + AEVTrie, + filter_aev_to_eav, +}; + +use schema::{ + SchemaBuilding, +}; + +use types::{ + PartitionMap, +}; + +use watcher::{ + TransactWatcher, +}; + +/// Details about an excision: +/// - targets to excise (for now, a non-empty set of entids); +/// - a possibly empty set of attributes to excise (the empty set means all attributes, not no +/// attributes); +/// - and a possibly omitted transaction ID to limit the excision before. +/// +/// `:db/before` doesn't make sense globally, since in Mentat, monotonically increasing +/// transaction IDs don't guarantee monotonically increasing txInstant values. Therefore, we +/// accept only `:db/beforeT` and allow consumers to turn `:db/before` timestamps into +/// transaction IDs in whatever way they see fit. +#[derive(Clone, Debug, Default, Eq, Hash, PartialEq)] +pub(crate) struct Excision { + pub(crate) targets: BTreeSet, + pub(crate) attrs: Option>, + pub(crate) before_tx: Option, +} + +/// Map from `entid` to excision details. `entid` is not one of the excision `targets`! +pub(crate) type ExcisionMap = BTreeMap; + +/// Extract excisions from the given transacted datoms. +pub(crate) fn excisions<'schema>(partition_map: &'schema PartitionMap, schema: &'schema Schema, aev_trie: &AEVTrie<'schema>) -> Result> { + let pair = |a: Entid| -> Result<(Entid, &'schema Attribute)> { + schema.require_attribute_for_entid(a).map(|attribute| (a, attribute)) + }; + + if aev_trie.contains_key(&pair(entids::DB_EXCISE_BEFORE)?) { + bail!(DbErrorKind::BadExcision(":db.excise/before".into())); // TODO: more details. + } + + // TODO: Don't allow anything more than excisions in the excising transaction, except + // additional facts about the (transaction-tx). + let eav_trie = filter_aev_to_eav(aev_trie, |&(a, _)| + a == entids::DB_EXCISE || + a == entids::DB_EXCISE_ATTRS || + a == entids::DB_EXCISE_BEFORE_T); + + let mut excisions = ExcisionMap::default(); + + for (&e, avs) in eav_trie.iter() { + for (&(_a, _attribute), ars) in avs { + if !ars.retract.is_empty() { + bail!(DbErrorKind::BadExcision("retraction".into())); // TODO: more details. + } + } + + let before_tx = avs.get(&pair(entids::DB_EXCISE_BEFORE_T)?) + .and_then(|ars| { + assert_eq!(ars.add.len(), 1, "witnessed more than one :db.excise/beforeT"); + assert!(ars.retract.is_empty(), "witnessed [:db/retract ... :db.excise/beforeT ...]"); + ars.add.iter().next().cloned() + }) + .and_then(|v| v.into_entid()); + + let attrs = avs.get(&pair(entids::DB_EXCISE_ATTRS)?) + .map(|ars| ars.add.clone().into_iter().filter_map(|v| v.into_entid()).collect()); + + let targets = avs.get(&pair(entids::DB_EXCISE)?) + .map(|ars| { + assert!(ars.retract.is_empty(), "witnessed [:db/retract ... :db/excise ...]"); + assert!(!ars.add.is_empty(), "witnessed empty :db/excise target set"); + let targets: BTreeSet<_> = ars.add.clone().into_iter().filter_map(|v| v.into_entid()).collect(); + assert_eq!(targets.len(), ars.add.len(), "witnessed non-entid :db/excise target"); + targets + }) + .ok_or_else(|| DbErrorKind::BadExcision("no :db/excise".into()))?; // TODO: more details. + + for target in &targets { + if schema.get_ident(*target).is_some() { + bail!(DbErrorKind::BadExcision("cannot mutate schema".into())); // TODO: more details. + } + + let (name, partition) = partition_map.partition_for_entid(*target) + .ok_or_else(|| DbErrorKind::BadExcision("target has no partition".into()))?; // TODO: more details. + if !partition.allow_excision { + bail!(DbErrorKind::BadExcision(format!("cannot target entity in partition {}", name).into())); // TODO: more details. + } + } + + let excision = Excision { + targets, + attrs: attrs.clone(), + before_tx, + }; + + excisions.insert(e, excision); + } + + if excisions.is_empty() { + Ok(None) + } else { + Ok(Some(excisions)) + } +} + +fn excise_datoms_for_excision(conn: &rusqlite::Connection, watcher: &mut W, entid: Entid, excision: &Excision) -> Result<()> +where W: TransactWatcher { + let targets = excision.targets.iter().join(", "); + + // Each branch below collects rowids and datoms to excise: datoms for reporting to the given + // watcher and rowids for deleting. + match (excision.before_tx, &excision.attrs) { + (Some(before_tx), Some(ref attrs)) => { + let s = attrs.iter().join(", "); + conn.execute(format!("CREATE TABLE temp.excision_{} AS SELECT rowid, index_fulltext, e, a, v, value_type_tag FROM datoms WHERE e IN ({}) AND a IN ({}) AND tx <= {}", + entid, targets, s, before_tx).as_ref(), &[])?; + }, + (Some(before_tx), None) => { + conn.execute(format!("CREATE TABLE temp.excision_{} AS SELECT rowid, index_fulltext, e, a, v, value_type_tag FROM datoms WHERE e IN ({}) AND tx <= {}", + entid, targets, before_tx).as_ref(), &[])?; + }, + (None, Some(ref attrs)) => { + let s = attrs.iter().join(", "); + conn.execute(format!("CREATE TABLE temp.excision_{} AS SELECT rowid, index_fulltext, e, a, v, value_type_tag FROM datoms WHERE e IN ({}) AND a IN ({})", + entid, targets, s).as_ref(), &[])?; + }, + (None, None) => { + // TODO: Use AVET index to speed this up? + conn.execute(format!("CREATE TABLE temp.excision_{} AS SELECT rowid, index_fulltext, e, a, v, value_type_tag FROM datoms WHERE (e IN ({}) OR (v IN ({}) AND value_type_tag IS {} AND a IS NOT {}))", + entid, targets, targets, ValueType::Ref.value_type_tag(), entids::DB_EXCISE).as_ref(), &[])?; + }, + } + + // Report all datoms (fulltext and non-fulltext) to the watcher. This is functionally + // equivalent to the `all_datoms` view, but that view doesn't pass through rowid, which is + // required for deleting. + let s = format!(r#" + SELECT e, a, v, value_type_tag + FROM temp.excision_{} + WHERE index_fulltext IS 0 + + UNION ALL + + SELECT e, a, fulltext_values.text AS v, value_type_tag + FROM temp.excision_{}, fulltext_values + WHERE index_fulltext IS NOT 0 AND temp.excision_{}.v = fulltext_values.rowid + "#, entid, entid, entid); + let mut stmt = conn.prepare(s.as_ref())?; + + let mut rows = stmt.query(&[])?; + + while let Some(row) = rows.next() { + let row = row?; + let e: Entid = row.get_checked(0)?; + let a: Entid = row.get_checked(1)?; + let value_type_tag: i32 = row.get_checked(3)?; + let v = TypedValue::from_sql_value_pair(row.get_checked(2)?, value_type_tag)?; + + watcher.datom(OpType::Retract, e, a, &v); + } + + conn.execute(format!("WITH rowids AS (SELECT rowid FROM temp.excision_{}) DELETE FROM datoms WHERE rowid IN rowids", entid).as_ref(), &[])?; + + conn.execute(format!("DROP TABLE temp.excision_{}", entid).as_ref(), &[])?; + + Ok(()) +} + +/// Given an `excision`, rewrite transactions with IDs in the interval `(first_tx_needing_rewrite, last_tx_needing_rewrite]`. +fn excise_transactions_in_range(conn: &rusqlite::Connection, excision: &Excision, first_tx_needing_rewrite: Entid, last_tx_needing_rewrite: Entid) -> Result<()> { + let targets = excision.targets.iter().join(", "); + + // TODO: intersect the ranges ourselves to statically save SQLite doing some work. + let tx_where = format!("({} < t.tx AND t.tx <= {})", first_tx_needing_rewrite, last_tx_needing_rewrite); + + match (excision.before_tx, &excision.attrs) { + (Some(before_tx), Some(ref attrs)) => { + let s = attrs.iter().join(", "); + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IN ({}) AND t.a IN ({}) AND t.tx <= {} AND {}) DELETE FROM transactions WHERE rowid IN ids", + targets, s, before_tx, tx_where).as_ref(), &[])?; + }, + (Some(before_tx), None) => { + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IN ({}) AND t.tx <= {} AND {}) DELETE FROM transactions WHERE rowid IN ids", + targets, before_tx, tx_where).as_ref(), &[])?; + }, + (None, Some(ref attrs)) => { + let s = attrs.iter().join(", "); + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE t.e IN ({}) AND t.a IN ({}) AND {}) DELETE FROM transactions WHERE rowid IN ids", + targets, s, tx_where).as_ref(), &[])?; + }, + (None, None) => { + conn.execute(format!("WITH ids AS (SELECT t.rowid FROM transactions AS t WHERE (t.e IN ({}) OR (t.v IN ({}) AND t.value_type_tag IS {} AND t.a IS NOT {})) AND {}) DELETE FROM transactions WHERE rowid IN ids", + targets, targets, ValueType::Ref.value_type_tag(), entids::DB_EXCISE, tx_where).as_ref(), &[])?; + }, + } + + Ok(()) +} + +/// Record the given `excisions` as applying to the transaction with ID `tx_id`. +pub(crate) fn enqueue_excisions(conn: &rusqlite::Connection, schema: &Schema, tx_id: Entid, excisions: &ExcisionMap) -> Result<()> { + let mut stmt1: rusqlite::Statement = conn.prepare("INSERT INTO excisions VALUES (?, ?, ?)")?; + let mut stmt2: rusqlite::Statement = conn.prepare("INSERT INTO excision_targets VALUES (?, ?)")?; + let mut stmt3: rusqlite::Statement = conn.prepare("INSERT INTO excision_attrs VALUES (?, ?)")?; + + for (entid, excision) in excisions { + let last_tx_needing_rewrite = excision.before_tx.unwrap_or(tx_id); + stmt1.execute(&[entid, &excision.before_tx, &last_tx_needing_rewrite])?; + + for target in &excision.targets { + stmt2.execute(&[entid, target])?; + } + + if let Some(ref attrs) = excision.attrs { + for attr in attrs { + stmt3.execute(&[entid, attr])?; + } + } + } + + Ok(()) +} + +/// Start processing the given `excisions`: update the `datoms` table in place, synchronously. +/// +/// Purged datoms are reported to the given `watcher`. +pub(crate) fn excise_datoms_for_excisions(conn: &rusqlite::Connection, watcher: &mut W, excisions: &ExcisionMap) -> Result<()> +where W: TransactWatcher { + // TODO: do this more efficiently. + for (entid, excision) in excisions { + excise_datoms_for_excision(conn, watcher, *entid, &excision)?; + } + + Ok(()) +} + +fn pending_excision_list(conn: &rusqlite::Connection, partition_map: &PartitionMap, schema: &Schema) -> Result> { + let mut stmt1: rusqlite::Statement = conn.prepare("SELECT e, before_tx, last_tx_needing_rewrite FROM excisions WHERE last_tx_needing_rewrite > 0 ORDER BY e")?; + let mut stmt2: rusqlite::Statement = conn.prepare("SELECT target FROM excision_targets WHERE e IS ?")?; + let mut stmt3: rusqlite::Statement = conn.prepare("SELECT a FROM excision_attrs WHERE e IS ?")?; + + let m: Result> = stmt1.query_and_then(&[], |row| { + let e: Entid = row.get_checked(0)?; + let before_tx: Option = row.get_checked(1)?; + let last_tx_needing_rewrite: Entid = row.get_checked(2)?; + + let targets: Result> = stmt2.query_and_then(&[&e], |row| { + let target: Entid = row.get_checked(0)?; + Ok(target) + })?.collect(); + let targets = targets?; + + let attrs: Result> = stmt3.query_and_then(&[&e], |row| { + let a: Entid = row.get_checked(0)?; + Ok(a) + })?.collect(); + let attrs = attrs.map(|attrs| { + if attrs.is_empty() { + None + } else { + Some(attrs) + } + })?; + + let excision = Excision { + targets, + before_tx, + attrs, + }; + + Ok((e, excision, last_tx_needing_rewrite)) + })?.collect(); + + m +} + +pub(crate) fn pending_excisions(conn: &rusqlite::Connection, partition_map: &PartitionMap, schema: &Schema) -> Result { + let list = pending_excision_list(conn, partition_map, schema)?; + Ok(list.into_iter().map(|(entity, excision, _last_tx_needing_rewrite)| (entity, excision)).collect()) +} + +pub(crate) fn ensure_no_pending_excisions(conn: &rusqlite::Connection, partition_map: &PartitionMap, schema: &Schema) -> Result { + let list = pending_excision_list(conn, partition_map, schema)?; + + for (_entid, excision, last_tx_needing_rewrite) in &list { + excise_transactions_in_range(conn, &excision, 0, *last_tx_needing_rewrite)?; + } + + delete_dangling_retractions(conn)?; + + conn.execute("UPDATE excisions SET last_tx_needing_rewrite = 0", &[])?; + + // TODO: only vacuum fulltext if an excision (likely) impacted fulltext values, since this is + // very expensive. As always, correctness first, performance second. + vacuum_fulltext_table(conn)?; + + // This is very (!) expensive, but if we really want to ensure that excised data doesn't remain + // on the filesystem, it's necessary. + conn.execute("VACUUM", &[])?; + + Ok(list.into_iter().map(|(entity, excision, _last_tx_needing_rewrite)| (entity, excision)).collect()) +} + + +/// Delete fulltext values that are no longer refered to in the `datoms` or `transactions` table. +pub(crate) fn vacuum_fulltext_table(conn: &rusqlite::Connection) -> Result<()> { + let (true_value, true_value_type_tag) = TypedValue::Boolean(true).to_sql_value_pair(); + + // First, collect all `:db/fulltext true` attributes. This is easier than extracting them from + // a `Schema` (no need to execute multiple insertions for large collections), but less flexible. + conn.execute(r#"CREATE TABLE temp.fulltext_as (a SMALLINT NOT NULL)"# , &[])?; + conn.execute(r#"INSERT INTO temp.fulltext_as (a) + SELECT e FROM schema WHERE a = ? AND v = ? AND value_type_tag = ?"# , + &[&entids::DB_FULLTEXT, &true_value, &true_value_type_tag])?; + + // Next, purge values that aren't referenced. We're using that `:db/fulltext true` attributes + // always have `:db/index true`, so that we can use the `avet` index. + conn.execute(r#"DELETE FROM fulltext_values + WHERE rowid NOT IN + (SELECT v + FROM datoms + WHERE index_avet IS NOT 0 AND a IN temp.fulltext_as + UNION ALL + SELECT v + FROM transactions + WHERE a IN temp.fulltext_as)"#, &[])?; + + conn.execute(r#"DROP TABLE temp.fulltext_as"# , &[])?; + + Ok(()) +} + +/// Delete dangling retractions from the transaction log. +/// +/// Suppose that `E` is a fixed entid and that the following transactions are transacted: +/// ```edn +/// [[:db/add E :db/doc "first"]] +/// [[:db/retract E :db/doc "first"]] +/// [[:db/add E :db/doc "second"]] +/// ``` +/// +/// If we excise just the first datom -- the datom corresponding to `[:db/add E :db/doc "first"]` -- +/// then there will be a "dangling retraction" in the log, which will look like: +/// ```edn +/// [[E :db/doc "first" TX1 false]] +/// [[E :db/doc "second" TX2 true]] +/// ``` +/// +/// This function purges such dangling retractions, so that a datom is always asserted before it is +/// retracted. +pub(crate) fn delete_dangling_retractions(conn: &rusqlite::Connection) -> Result<()> { + // We walk the transactions table. For each `[e a v]`, we find all of the log entries + // corresponding to the first transaction that it appeared in. We delete any entries that are + // retractions; it is not possible to retract an `[e a v]` not asserted in a prior transaction. + conn.execute(r#"WITH ids AS + (SELECT rowid FROM + (SELECT MIN(tx), added, rowid + FROM transactions + GROUP BY e, a, v, value_type_tag) + WHERE added = 0) + DELETE FROM transactions + WHERE rowid IN ids"#, + &[])?; + + Ok(()) +} diff --git a/db/src/internal_types.rs b/db/src/internal_types.rs index 782ff22d6..4438cf2eb 100644 --- a/db/src/internal_types.rs +++ b/db/src/internal_types.rs @@ -214,3 +214,26 @@ pub(crate) struct AddAndRetract { // A trie-like structure mapping a -> e -> v that prefix compresses and makes uniqueness constraint // checking more efficient. BTree* for deterministic errors. pub(crate) type AEVTrie<'schema> = BTreeMap<(Entid, &'schema Attribute), BTreeMap>; + +// A trie-like structure mapping e -> a -> v that prefix compresses and makes accumulating fragments +// (schema fragments, say, or excision fragments) efficient. BTree* for deterministic errors. +pub(crate) type EAVTrie<'schema> = BTreeMap>; + +pub(crate) fn filter_aev_to_eav<'schema, P>(aev_trie: &AEVTrie<'schema>, mut attr_pair_predicate: P) -> EAVTrie<'schema> +where P: FnMut(&(Entid, &'schema Attribute)) -> bool { + let mut eav_trie: EAVTrie<'schema> = Default::default(); + + for (&a_pair, evs) in aev_trie.iter() { + if !attr_pair_predicate(&a_pair) { + continue + } + + for (&e, ars) in evs { + eav_trie + .entry(e).or_insert(BTreeMap::default()) + .insert(a_pair, ars.clone()); + } + } + + eav_trie +} diff --git a/db/src/lib.rs b/db/src/lib.rs index 27cd09438..eacdfebe5 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -46,6 +46,7 @@ pub mod cache; pub mod db; mod bootstrap; pub mod entids; +mod excision; pub mod internal_types; // pub because we need them for building entities programmatically. mod metadata; mod schema; diff --git a/db/src/tx.rs b/db/src/tx.rs index 85010f4b7..d6ba89ae1 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -106,6 +106,7 @@ use edn::entities::{ OpType, TempId, }; +use excision; use metadata; use rusqlite; use schema::{ @@ -714,7 +715,6 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { let mut aev_trie = into_aev_trie(&self.schema, final_populations, inert_terms)?; let tx_instant; - { // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function. // Assertions that are :db.cardinality/one and not :db.fulltext. let mut non_fts_one: Vec = vec![]; @@ -741,6 +741,8 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { bail!(DbErrorKind::SchemaConstraintViolation(errors::SchemaConstraintViolation::CardinalityConflicts { conflicts: errors })); } + let excisions = excision::excisions(&self.partition_map, &self.schema, &aev_trie)?; + // Pipeline stage 4: final terms (after rewriting) -> DB insertions. // Collect into non_fts_*. @@ -787,9 +789,19 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { } self.store.commit_transaction(self.tx_id)?; - } db::update_partition_map(self.store, &self.partition_map)?; + + if let Some(excisions) = excisions { + if tx_might_update_metadata { + bail!(DbErrorKind::BadExcision("cannot mutate schema".into())); + } + + excision::enqueue_excisions(self.store, self.schema, self.tx_id, &excisions)?; + + excision::excise_datoms_for_excisions(self.store, &mut self.watcher, &excisions)?; + } + self.watcher.done(&self.tx_id, self.schema)?; if tx_might_update_metadata { diff --git a/tolstoy/src/syncer.rs b/tolstoy/src/syncer.rs index 9bc90fb18..25abac8e1 100644 --- a/tolstoy/src/syncer.rs +++ b/tolstoy/src/syncer.rs @@ -418,7 +418,10 @@ mod tests { use edn; - use mentat_db::debug::{TestConn}; + use mentat_db::debug::{ + NullWatcher, + TestConn, + }; #[test] fn test_remote_client_bound_uri() {