From f84cf4dd6179f5b079ac32001ada3201d476910a Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Fri, 20 Apr 2018 15:10:31 -0700 Subject: [PATCH] (tx) Implement :db/retractEntity. (#378) Datomic used to expose `:db.fn/retractEntity` and `:db.fn/retractAttribute`, but there latest Cloud offering has only :db/retractEntity. Since that's what I am interested in using, that's all I've implemented. This transformation doesn't follow the existing pattern of inserting into the temp.*_searches table. It instead populates temp.search_results directly from datoms, and in so doing it makes some assumptions about how the searches tables will be accessed. It might be even more efficient to have an entirely new temporary table just for these retractions. One advantage with the current scheme is that indexing restrictions placed on the search results table will apply to the datoms retracted by :db/retractEntity as well. There are a few remaining items TODO. TODO: ensure that we mark the schema as potentially modified when we :db/retractEntity. It's not clear to me how to do this efficiently. TODO: ensure that transaction watchers get the correct transacted datom stream. I didn't try to address this yet because it appears to me that the existing watcher implementation isn't quite correct: it tells watchers about datoms that are potentially to be transacted but not about datoms that actually are transacted. This, of course, matters when watching :db/retractEntity entities being transacted. --- db/src/db.rs | 110 ++++++++++++++++++++++++++++++++++++ db/src/internal_types.rs | 5 +- db/src/tx.rs | 20 +++++++ db/src/upsert_resolution.rs | 6 ++ src/entity_builder.rs | 10 ++++ tx-parser/src/lib.rs | 65 ++++++++++++++++++++- tx/src/entities.rs | 17 ++++++ 7 files changed, 229 insertions(+), 4 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index a0ed1ffbb..299bbee98 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -549,6 +549,9 @@ pub trait MentatStoring { fn insert_non_fts_searches<'a>(&self, entities: &'a [ReducedEntity], search_type: SearchType) -> Result<()>; fn insert_fts_searches<'a>(&self, entities: &'a [ReducedEntity], search_type: SearchType) -> Result<()>; + // :db/retractEntity the given entids. + fn insert_retract_entities<'a>(&self, entids: &'a [Entid]) -> Result<()>; + /// Finalize the underlying storage layer after a Mentat transaction. /// /// Use this to finalize temporary tables, complete indices, revert pragmas, etc, after the @@ -982,6 +985,48 @@ impl MentatStoring for rusqlite::Connection { results.map(|_| ()) } + /// Insert datoms corresponding to :db/retractEntity entities into the search results. + fn insert_retract_entities(&self, entids: &[Entid]) -> Result<()> { + let max_vars = self.limit(Limit::SQLITE_LIMIT_VARIABLE_NUMBER) as usize; + let bindings_per_statement = 2; + + let chunks: itertools::IntoChunks<_> = entids.into_iter().chunks(max_vars / bindings_per_statement); + + // We'd like to flat_map here, but it's not obvious how to flat_map across Result. + let results: Result> = chunks.into_iter().map(|chunk| -> Result<()> { + let mut count = 0; + let mut params: Vec<&ToSql> = chunk.flat_map(|e| { + count += 1; + once(e as &ToSql) + }).collect(); + + // Two copies, first for testing e, then for testing v. + // TODO: perhaps we can reference the same named parameter multiple times, making this + // more efficient. + let mut params2 = params.clone(); + params.append(&mut params2); + + let values: String = repeat_values(count, 1); + + // Note that the value for flags (-1) is nonsense. Since these rows are being removed + // from datoms, flags/flags0 is not referenced and this is fine. The value for the + // search type is also ignored. + let s: String = format!(r#" + INSERT INTO temp.search_results + SELECT d.e, d.a, d.v, d.value_type_tag, 0, -1, ':db.cardinality/many', d.rowid, d.v + FROM datoms AS d + WHERE d.e IN ({}) OR (d.value_type_tag = 0 AND d.v IN ({})) + "#, values, values); + + let mut stmt = self.prepare_cached(s.as_str())?; + stmt.execute(¶ms) + .map(|_c| ()) + .chain_err(|| "Could not retract entities!") + }).collect::>>(); + + results.and(Ok(())) + } + fn commit_transaction(&self, tx_id: Entid) -> Result<()> { search(&self)?; insert_transaction(&self, tx_id)?; @@ -2387,6 +2432,71 @@ mod tests { Err("EDN value \'1.23\' is not the expected Mentat value type Ref")); } + #[test] + fn test_retract_entity() { + let mut conn = TestConn::default(); + + // Start by installing a few attributes. + assert_transact!(conn, "[[:db/add 111 :db/ident :test/many] + [:db/add 111 :db/valueType :db.type/long] + [:db/add 111 :db/cardinality :db.cardinality/many] + [:db/add 222 :db/ident :test/component] + [:db/add 222 :db/isComponent true] + [:db/add 222 :db/valueType :db.type/ref] + [:db/add 333 :db/ident :test/dangling] + [:db/add 333 :db/valueType :db.type/ref]]"); + + // Verify that we can retract simple entities by entid. + assert_transact!(conn, "[[:db/retractEntity 111]]"); + assert_matches!(conn.last_transaction(), + "[[111 :db/ident :test/many ?tx false] + [111 :db/valueType :db.type/long ?tx false] + [111 :db/cardinality :db.cardinality/many ?tx false] + [?tx :db/txInstant ?ms ?tx true]]"); + + // Verify that we can retract entities that don't exist. + assert_transact!(conn, "[[:db/retractEntity 111]]"); + + // Verify that we can retract simple entities by keyword. + assert_transact!(conn, "[[:db/retractEntity :test/component]]"); + assert_matches!(conn.last_transaction(), + "[[222 :db/ident :test/component ?tx false] + [222 :db/valueType :db.type/ref ?tx false] + [222 :db/isComponent true ?tx false] + [?tx :db/txInstant ?ms ?tx true]]"); + + // Some data for testing associated references. + assert_transact!(conn, "[[:db/add 555 :test/dangling 666] + [:db/add 666 :test/dangling 777] + [:db/add 777 :test/dangling 777] + [:db/add 888 :test/dangling 999]]"); + + // Verify that associated references are also retracted. + assert_transact!(conn, "[[:db/retractEntity 666]]"); + assert_matches!(conn.last_transaction(), + "[[555 :test/dangling 666 ?tx false] + [666 :test/dangling 777 ?tx false] + [?tx :db/txInstant ?ms ?tx true]]"); + + // What happens if we have a self reference? + assert_transact!(conn, "[[:db/retractEntity 777]]"); + assert_matches!(conn.last_transaction(), + "[[777 :test/dangling 777 ?tx false] + [?tx :db/txInstant ?ms ?tx true]]"); + + // Verify that we can retract entities that aren't recognized, but that appear as dangling + // references. + assert_transact!(conn, "[[:db/retractEntity 999]]"); + assert_matches!(conn.last_transaction(), + "[[888 :test/dangling 999 ?tx false] + [?tx :db/txInstant ?ms ?tx true]]"); + + // Let's make sure we actually retracted from the datoms table. + assert_matches!(conn.datoms(), + "[[333 :db/ident :test/dangling] + [333 :db/valueType :db.type/ref]]"); + } + #[test] fn test_cardinality_one_violation_existing_entity() { let mut conn = TestConn::default(); diff --git a/db/src/internal_types.rs b/db/src/internal_types.rs index 111044996..e10efd74a 100644 --- a/db/src/internal_types.rs +++ b/db/src/internal_types.rs @@ -35,6 +35,7 @@ use mentat_tx::entities::{ #[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)] pub enum Term { AddOrRetract(OpType, E, Entid, V), + RetractEntity(E), } use self::Either::*; @@ -68,7 +69,9 @@ impl TermWithTempIds { pub(crate) fn unwrap(self) -> TermWithoutTempIds { match self { Term::AddOrRetract(op, Left(n), a, Left(v)) => Term::AddOrRetract(op, n, a, v), - _ => unreachable!(), + Term::AddOrRetract(_, _, _, _) => unreachable!(), + Term::RetractEntity(Left(e)) => Term::RetractEntity(e), + Term::RetractEntity(_) => unreachable!(), } } } diff --git a/db/src/tx.rs b/db/src/tx.rs index a08128f22..7d741a8d6 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -381,6 +381,11 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { } }, + Entity::RetractEntity(e) => { + let e = in_process.entity_e_into_term_e(e.into())?; + terms.push(Term::RetractEntity(e)); + }, + Entity::AddOrRetract { op, e, a, v } => { if let Some(reversed_a) = a.unreversed() { let reversed_e = in_process.entity_v_into_term_e(v, &a)?; @@ -519,6 +524,10 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { let v = replace_lookup_ref(&lookup_ref_map, v, |x| TypedValue::Ref(x))?; Ok(Term::AddOrRetract(op, e, a, v)) }, + Term::RetractEntity(e) => { + let e = replace_lookup_ref(&lookup_ref_map, e, |x| KnownEntid(x))?; + Ok(Term::RetractEntity(e)) + }, } }).collect::>>() } @@ -629,6 +638,9 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { // Assertions that are :db.cardinality/many and :db.fulltext. let mut fts_many: Vec = vec![]; + // :db/retractEntity entities. + let mut retract_entities: Vec = vec![]; + // We need to ensure that callers can't blindly transact entities that haven't been // allocated by this store. @@ -677,6 +689,12 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { (true, true) => fts_many.push(reduced), } }, + + Term::RetractEntity(KnownEntid(e)) => { + // TODO: Might update metadata? + // TODO: Watching/counting datoms? + retract_entities.push(e); + }, } } @@ -705,6 +723,8 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { self.store.insert_fts_searches(&fts_many[..], db::SearchType::Exact)?; } + self.store.insert_retract_entities(&retract_entities[..])?; + self.store.commit_transaction(self.tx_id)?; } diff --git a/db/src/upsert_resolution.rs b/db/src/upsert_resolution.rs index da44edc94..9abc29e5b 100644 --- a/db/src/upsert_resolution.rs +++ b/db/src/upsert_resolution.rs @@ -124,6 +124,9 @@ impl Generation { Term::AddOrRetract(op, Left(e), a, Left(v)) => { inert.push(Term::AddOrRetract(op, Left(e), a, Left(v))); }, + t @ Term::RetractEntity(_) => { + inert.push(t); + }, } } @@ -189,6 +192,7 @@ impl Generation { } }, Term::AddOrRetract(_, Left(_), _, Left(_)) => unreachable!(), + Term::RetractEntity(_) => unreachable!(), // This is a coding error -- these should not be in allocations. } } @@ -233,6 +237,7 @@ impl Generation { // [:db/retract ...] entities never allocate entids; they have to resolve due to // other upserts (or they fail the transaction). }, + &Term::RetractEntity(_) => unreachable!(), // This is a coding error -- these should not be in allocations. } } @@ -275,6 +280,7 @@ impl Generation { } }, Term::AddOrRetract(_, Left(_), _, Left(_)) => unreachable!(), // This is a coding error -- these should not be in allocations. + Term::RetractEntity(_) => unreachable!(), // This is a coding error -- these should not be in allocations. }; populations.allocated.push(allocated); } diff --git a/src/entity_builder.rs b/src/entity_builder.rs index 5cac5d340..cfa3b7b72 100644 --- a/src/entity_builder.rs +++ b/src/entity_builder.rs @@ -111,6 +111,7 @@ pub trait BuildTerms where Self: Sized { fn retract(&mut self, e: E, a: KnownEntid, v: V) -> Result<()> where E: IntoThing>, V: IntoThing>; + fn retract_entity(&mut self, e: KnownEntid) -> Result<()>; } impl BuildTerms for TermBuilder { @@ -147,6 +148,11 @@ impl BuildTerms for TermBuilder { self.terms.push(Term::AddOrRetract(OpType::Retract, e, a.into(), v)); Ok(()) } + + fn retract_entity(&mut self, e: KnownEntid) -> Result<()> { + self.terms.push(Term::RetractEntity(Either::Left(e.into()))); + Ok(()) + } } impl TermBuilder { @@ -253,6 +259,10 @@ impl<'a, 'c> BuildTerms for InProgressBuilder<'a, 'c> { V: IntoThing> { self.builder.retract(e, a, v) } + + fn retract_entity(&mut self, e: KnownEntid) -> Result<()> { + self.builder.retract_entity(e) + } } impl<'a, 'c> InProgressBuilder<'a, 'c> { diff --git a/tx-parser/src/lib.rs b/tx-parser/src/lib.rs index 111b5e653..899ff70cc 100644 --- a/tx-parser/src/lib.rs +++ b/tx-parser/src/lib.rs @@ -34,6 +34,7 @@ use combine::{ use mentat_tx::entities::{ AtomOrLookupRefOrVectorOrMapNotation, Entid, + EntidOrLookupRef, EntidOrLookupRefOrTempId, Entity, LookupRef, @@ -88,6 +89,11 @@ def_parser!(Tx, lookup_ref, LookupRef, { .map(|(a, v)| LookupRef { a: a, v: v.clone().without_spans() })) }); +def_parser!(Tx, entid_or_lookup_ref, EntidOrLookupRef, { + Tx::entid().map(EntidOrLookupRef::Entid) + .or(Tx::lookup_ref().map(EntidOrLookupRef::LookupRef)) +}); + def_parser!(Tx, entid_or_lookup_ref_or_temp_id, EntidOrLookupRefOrTempId, { Tx::db_tx().map(EntidOrLookupRefOrTempId::TempId) .or(Tx::entid().map(EntidOrLookupRefOrTempId::Entid)) @@ -127,8 +133,7 @@ def_matches_namespaced_keyword!(Tx, literal_db_add, "db", "add"); def_matches_namespaced_keyword!(Tx, literal_db_retract, "db", "retract"); def_parser!(Tx, add_or_retract, Entity, { - vector().of_exactly( - // TODO: This commits as soon as it sees :db/{add,retract}, but could use an improved error message. + try(vector().of_exactly( (Tx::literal_db_add().map(|_| OpType::Add).or(Tx::literal_db_retract().map(|_| OpType::Retract)), try((Tx::entid_or_lookup_ref_or_temp_id(), Tx::forward_entid(), @@ -144,7 +149,14 @@ def_parser!(Tx, add_or_retract, Entity, { a: a, v: v, } - })) + }))) +}); + +def_matches_namespaced_keyword!(Tx, literal_db_retract_entity, "db", "retractEntity"); + +def_parser!(Tx, retract_entity, Entity, { + try(vector().of_exactly( + (Tx::literal_db_retract_entity(), Tx::entid_or_lookup_ref()).map(|(_, e)| Entity::RetractEntity(e)))) }); def_parser!(Tx, map_notation, MapNotation, { @@ -157,6 +169,7 @@ def_parser!(Tx, map_notation, MapNotation, { def_parser!(Tx, entity, Entity, { Tx::add_or_retract() + .or(Tx::retract_entity()) .or(Tx::map_notation().map(Entity::MapNotation)) }); @@ -278,6 +291,52 @@ mod tests { })); } + + #[test] + fn test_retract_entity() { + let input = Value::Vector(vec![kw("db", "retractEntity"), + Value::Integer(101)]); + + let input = input.with_spans(); + let stream = input.atom_stream(); + let result = Tx::entity().parse(stream).map(|x| x.0); + + assert_eq!(result, + Ok(Entity::RetractEntity(EntidOrLookupRef::Entid(Entid::Entid(101))))); + } + + #[test] + fn test_retract_entity_kw() { + let input = Value::Vector(vec![kw("db", "retractEntity"), + kw("known", "ident")]); + + let input = input.with_spans(); + let stream = input.atom_stream(); + let result = Tx::entity().parse(stream).map(|x| x.0); + + assert_eq!(result, + Ok(Entity::RetractEntity(EntidOrLookupRef::Entid(Entid::Ident(NamespacedKeyword::new("known", "ident")))))); + } + + #[test] + fn test_retract_entity_lookup_ref() { + let input = Value::Vector(vec![kw("db", "retractEntity"), + Value::List(vec![Value::PlainSymbol(PlainSymbol::new("lookup-ref")), + kw("test", "a1"), + Value::Text("v1".into())].into_iter().collect()), + ]); + + let input = input.with_spans(); + let stream = input.atom_stream(); + let result = Tx::entity().parse(stream).map(|x| x.0); + + assert_eq!(result, + Ok(Entity::RetractEntity(EntidOrLookupRef::LookupRef(LookupRef { + a: Entid::Ident(NamespacedKeyword::new("test", "a1")), + v: Value::Text("v1".into()), + })))); + } + #[test] fn test_lookup_ref() { let input = Value::Vector(vec![kw("db", "add"), diff --git a/tx/src/entities.rs b/tx/src/entities.rs index e37814ff3..314b519bd 100644 --- a/tx/src/entities.rs +++ b/tx/src/entities.rs @@ -79,6 +79,12 @@ pub enum AtomOrLookupRefOrVectorOrMapNotation { MapNotation(MapNotation), } +#[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)] +pub enum EntidOrLookupRef { + Entid(Entid), + LookupRef(LookupRef), +} + #[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)] pub enum EntidOrLookupRefOrTempId { Entid(Entid), @@ -86,6 +92,15 @@ pub enum EntidOrLookupRefOrTempId { TempId(TempId), } +impl From for EntidOrLookupRefOrTempId { + fn from(k: EntidOrLookupRef) -> EntidOrLookupRefOrTempId { + match k { + EntidOrLookupRef::Entid(x) => EntidOrLookupRefOrTempId::Entid(x), + EntidOrLookupRef::LookupRef(x) => EntidOrLookupRefOrTempId::LookupRef(x), + } + } +} + #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)] pub enum OpType { Add, @@ -101,6 +116,8 @@ pub enum Entity { a: Entid, v: AtomOrLookupRefOrVectorOrMapNotation, }, + // Like [:db/retractEntity e]. + RetractEntity(EntidOrLookupRef), // Like {:db/id "tempid" a1 v1 a2 v2}. MapNotation(MapNotation), }