Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ The following details how each provenance model affects capturing and querying:
| `polynomial` | [Todd J. Green, Grigoris Karvounarakis, and Val Tannen. 2007. Provenance semirings. In Proceedings of the twenty-sixth ACM SIGMOD-SIGACT-SIGART symposium on Principles of database systems (PODS '07). Association for Computing Machinery, New York, NY, USA, 31–40. https://doi.org/10.1145/1265530.1265535](https://web.cs.ucdavis.edu/~green/papers/pods07.pdf) | Polynomial (referring to Provenance Polynomials) captures the same data as Lineage and only differs in querying. | Polynomial returns how each out_index was created in the form of a polynomial. Items multiplied together must co-occur, and those added together can either occur in order for the out_index to exist. In general, joins produce multiplication and aggregations product addition. |
| `ksemimodule` | [Yael Amsterdamer, Daniel Deutch, and Val Tannen. 2011. Provenance for aggregate queries. In Proceedings of the thirtieth ACM SIGMOD-SIGACT-SIGART symposium on Principles of database systems (PODS '11). Association for Computing Machinery, New York, NY, USA, 153–164. https://doi.org/10.1145/1989284.1989302](https://arxiv.org/pdf/1101.1110.pdf) | K-Semimodule captures the same data as Lineage, and also captures all query intermediates that are materialized before an aggregation. | Querying K-Semimodule lineage results in a partial recalculation of a base query aggregate over whichever input or output tuples are selected. We're expanding the set of aggregate functions and complexity of queries that are supported. |

Note that all provenance models turn off DuckDB short-circuiting to ensure complete lineage is captured. In particular, semi and anti joins are turned off within the optimizer, since otherwise not all input indexes that match the join condition would be considered. Let us know if you're interested in a provenance capture mode that supports short-circuiting.

## Data Import
For CSV files and Parquet files, data import is as simple as referencing the file in the FROM clause:

Expand Down
181 changes: 178 additions & 3 deletions src/execution/lineage/lineage_manager.cpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,184 @@
#ifdef LINEAGE
#include "duckdb/execution/lineage/lineage_manager.hpp"

#include "duckdb/catalog/catalog_entry/duck_table_entry.hpp"
#include "duckdb/execution/operator/helper/physical_execute.hpp"
#include "duckdb/execution/operator/join/physical_delim_join.hpp"
#include "duckdb/execution/operator/join/physical_join.hpp"
#include "duckdb/execution/operator/scan/physical_table_scan.hpp"
#include "duckdb/main/client_context.hpp"
#include "duckdb/parser/parsed_data/create_table_info.hpp"
#include "duckdb/parser/statement/create_statement.hpp"
#include "duckdb/planner/parsed_data/bound_create_table_info.hpp"
#include "duckdb/catalog/catalog_entry/duck_table_entry.hpp"
#include "duckdb/execution/operator/join/physical_delim_join.hpp"
#include "duckdb/execution/operator/helper/physical_execute.hpp"
#include <utility>

namespace duckdb {
class PhysicalDelimJoin;

struct Projections {
string in_index;
string alias;
string orig_table_name;
};

struct Join {
string left_table;
string right_table;
bool is_agg;
JoinType join_type;
};

struct Query {
vector<struct Projections> proj;
vector<struct Join> from;
string table_name;
bool is_agg_child;
};

vector<string> join_side = {".lhs_index", ".rhs_index"};

struct Query GetEndToEndQuery(PhysicalOperator* op, idx_t qid,
string parent_join_cond, JoinType join_type) {
if (op->type == PhysicalOperatorType::PROJECTION) {
return GetEndToEndQuery(op->children[0].get(), qid, parent_join_cond, join_type);
}

Query Q;

bool is_agg = false;
Q.is_agg_child = false;
if (op->type == PhysicalOperatorType::UNGROUPED_AGGREGATE) {
is_agg = true;
Q.is_agg_child = true;
// find a child that is not projection
PhysicalOperator* c = op->children[0].get();
while (c->type == PhysicalOperatorType::PROJECTION) {
c = op->children[0].get();
}
op = c;
}

// Example: LINEAGE_1_HASH_JOIN_3_0
string table_name = "LINEAGE_" + to_string(qid) + "_"
+ op->GetName() + "_0";
if (parent_join_cond.empty() ) // root
Q.from.push_back({"", table_name, is_agg, join_type});
else
Q.from.push_back({parent_join_cond, table_name, is_agg, join_type});
Q.table_name = table_name;
JoinType parent_join_type = JoinType::INNER;
if (op->type == PhysicalOperatorType::HASH_JOIN ||
op->type == PhysicalOperatorType::BLOCKWISE_NL_JOIN ||
op->type == PhysicalOperatorType::NESTED_LOOP_JOIN ||
op->type == PhysicalOperatorType::PIECEWISE_MERGE_JOIN) {
parent_join_type = dynamic_cast<PhysicalJoin*>(op)->join_type;
}
for (idx_t i = 0; i < op->children.size(); i++) {
string join_cond = table_name;
if (op->children.size() > 1) {
join_cond += join_side[i];
} else {
join_cond += ".in_index";
}

struct Query child = GetEndToEndQuery(op->children[i].get(), qid, join_cond, parent_join_type);
Q.proj.insert( Q.proj.end(), child.proj.begin(), child.proj.end() );
Q.from.insert( Q.from.end(), child.from.begin(), child.from.end() );
}

if (op->type == PhysicalOperatorType::TABLE_SCAN) {
string tname = op->lineage_op->table_name;
Q.proj.push_back({table_name+".in_index", tname, tname});
}

return Q;
}

string from_prefix(string model) {
if (model == "polynomial") return "string_agg(";
else if (model == "why") return "list([";
else return "";
}

string from_suffix(string model) {
if (model == "polynomial") return ", '+') AS prov";
else if (model == "why") return "]) AS prov";
else return "";
}

string query_suffix(string model, string out_index) {
if (model == "polynomial" || model == "why") return " GROUP BY " + out_index;
else return "";
}

string visit_from(string model, vector<struct Projections>& proj) {
string from = "";
if (model == "polynomial") {
for (idx_t i = 0; i < proj.size(); i++) {
if (i > 0) {
from += "|| '*' ||";
}
from += proj[i].in_index;
}
} else if (model == "why") {
for (idx_t i=0; i < proj.size(); i++) {
if (i > 0) {
from += ",";
}
from += proj[i].in_index;
}
} else {
for (idx_t i=0; i < proj.size(); i++) {
if (i > 0) {
from += ",";
}
from += proj[i].in_index + " AS " + proj[i].alias;
}
}
return from;
}

string LineageManager::Lineage(string model, idx_t qid) {
PhysicalOperator* op = queryid_to_plan[qid].get();
struct Query qobj= GetEndToEndQuery(op, qid, "", JoinType::INNER);
string query = "SELECT " + from_prefix(model);
query += visit_from(model, qobj.proj);
query += from_suffix(model);
string out_index;
if (qobj.is_agg_child) {
out_index = "0 as out_index";
} else {
out_index = qobj.table_name + ".out_index";
}

query += ", " + out_index + " FROM ";

for (idx_t i=0; i < qobj.from.size(); i++) {
if (i > 0) {
if (qobj.from[i].join_type != JoinType::INNER) {
query += " LEFT ";
}

query += " JOIN " + qobj.from[i].right_table + " ON "
+ qobj.from[i].left_table + " = ";
if (qobj.from[i].is_agg) {
query += "0";
} else {
query += qobj.from[i].right_table + ".out_index";
}
} else {
query += qobj.from[i].right_table;
}

}

if (!qobj.is_agg_child) {
query += query_suffix(model, out_index) + " order by " + out_index;
}

return query;
}

void LineageManager::CreateOperatorLineage(ClientContext &context,
PhysicalOperator *op,
bool trace_lineage) {
Expand Down Expand Up @@ -130,5 +295,15 @@ bool LineageManager::CheckIfShouldPersistForKSemimodule(PhysicalOperator *op) {
return persist_k_semimodule && op->child_of_aggregate;
}

void LineageManager::PostProcess(PhysicalOperator *op) {
// massage the data to make it easier to query
// for hash join, build hash table on the build side that map the address to id
// for group by, build hash table on the unique groups
if (op->lineage_op) {
op->lineage_op->PostProcess();
}
}


} // namespace duckdb
#endif
Loading