Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions clouddq/classes/dq_row_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from __future__ import annotations

from dataclasses import dataclass
from string import Template

from clouddq.utils import assert_not_none_or_empty

Expand Down Expand Up @@ -84,3 +85,16 @@ def dict_values(self: DqRowFilter) -> dict:
"""

return dict(self.to_dict().get(self.row_filter_id))


def resolve_sql_expr(self: DqRowFilter, arguments: dict | None) -> None:
    """Fill the ``$name`` placeholders of this row filter's SQL expression.

    ``self.filter_sql_expr`` is treated as a ``string.Template`` and is
    overwritten in place with the substituted text. Placeholders that have
    no matching key in ``arguments`` are left untouched (``safe_substitute``
    semantics), so an expression without parameters passes through as-is.

    Args:
        arguments: Mapping of placeholder name to replacement text. ``None``
            (a row filter referenced without parameters) is treated as an
            empty mapping.

    Raises:
        ValueError: If the substitution fails; the underlying exception is
            chained as ``__cause__``.
    """
    try:
        # `safe_substitute(None)` would fail with a TypeError as soon as the
        # template contains a placeholder, so normalise to an empty mapping.
        self.filter_sql_expr = Template(self.filter_sql_expr).safe_substitute(
            arguments or {}
        )
    except Exception as e:
        # Only mention the rule binding when this object actually carries
        # one; looking it up unconditionally could raise AttributeError here
        # and hide the real failure.
        rule_binding_id = getattr(self, "rule_binding_id", None)
        binding_context = (
            f"in rule_binding_id '{rule_binding_id}' "
            if rule_binding_id is not None
            else ""
        )
        raise ValueError(
            f"Failed to resolve row_filter_id '{self.row_filter_id}' "
            f"{binding_context}"
            f"with error:\n{e}"
        ) from e
61 changes: 47 additions & 14 deletions clouddq/classes/dq_rule_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class DqRuleBinding:
entity_id: str | None
entity_uri: EntityUri | None
column_id: str
row_filter_id: str
row_filter_ids: list
incremental_time_filter_column_id: str | None
rule_ids: list
reference_columns_id: str | None
Expand Down Expand Up @@ -95,13 +95,22 @@ def from_dict(
)
if column_id:
column_id.upper()
row_filter_id: str = get_from_dict_and_assert(
row_filter_config: dict = get_keys_from_dict_and_assert_oneof(
config_id=rule_binding_id,
kwargs=kwargs,
key="row_filter_id",
keys=["row_filter_id", "row_filter_ids"]
)
if row_filter_id:
row_filter_id.upper()
row_filter_ids = []
if "row_filter_id" in row_filter_config:
row_filter_ids.append(row_filter_config["row_filter_id"].upper())
if "row_filter_ids" in row_filter_config:
for row_filter in row_filter_config["row_filter_ids"]:
if type(row_filter) == str:
row_filter_ids.append(row_filter.upper())
if type(row_filter) == dict:
row_filter_ids.extend(
[id.upper() for id in row_filter]
)
rule_ids: list[str] = get_from_dict_and_assert(
config_id=rule_binding_id,
kwargs=kwargs,
Expand Down Expand Up @@ -130,7 +139,7 @@ def from_dict(
entity_id=entity_id,
entity_uri=entity_uri,
column_id=column_id,
row_filter_id=row_filter_id,
row_filter_ids=row_filter_ids,
incremental_time_filter_column_id=incremental_time_filter_column_id,
rule_ids=rule_ids,
reference_columns_id=reference_columns_id,
Expand All @@ -157,7 +166,7 @@ def to_dict(self: DqRuleBinding) -> dict:
"entity_id": self.entity_id,
"entity_uri": entity_uri,
"column_id": self.column_id,
"row_filter_id": self.row_filter_id,
"row_filter_ids": self.row_filter_ids,
"incremental_time_filter_column_id": self.incremental_time_filter_column_id, # noqa: E501
"rule_ids": self.rule_ids,
"reference_columns_id": self.reference_columns_id,
Expand Down Expand Up @@ -259,12 +268,35 @@ def resolve_rule_config_list(
)
return resolved_rule_config_list

def resolve_row_filter_config_list(
    self: DqRuleBinding,
    configs_cache: dq_configs_cache.DqConfigsCache,
) -> list[DqRowFilter]:
    """Look up and resolve every row filter referenced by this rule binding.

    Each entry of ``self.row_filter_ids`` is either a plain row filter id or
    a single-key mapping ``{row_filter_id: arguments}`` whose value supplies
    the placeholder arguments for that filter's SQL expression.

    Args:
        configs_cache: Cache used to fetch each row filter by id.

    Returns:
        The resolved row filters, in the order they are listed.

    Raises:
        ValueError: If a nested entry does not contain exactly one
            row_filter_id.
    """
    resolved_row_filter_config_list = []
    for row_filter in self.row_filter_ids:
        if isinstance(row_filter, dict):
            # An empty mapping would otherwise escape as StopIteration from
            # next(iter(...)); reject anything that is not a single pair.
            if len(row_filter) != 1:
                raise ValueError(
                    f"Rule Binding {self.rule_binding_id} has "
                    f"invalid configs in row_filter_ids. "
                    f"Each nested row_filter_id object must "
                    f"contain exactly one row_filter_id. "
                    f"Current value: \n {row_filter}"
                )
            row_filter_id, arguments = next(iter(row_filter.items()))
        else:
            row_filter_id, arguments = row_filter, None
        # Look ids up in upper case, as the single-filter lookup did.
        row_filter_config = configs_cache.get_row_filter_id(row_filter_id.upper())
        # NOTE(review): resolve_sql_expr overwrites filter_sql_expr on the
        # object returned by the cache. If the cache hands out a shared
        # instance, a second binding would see the already-substituted
        # text -- confirm and copy the config first if so.
        # A missing argument block is passed as an empty mapping so that
        # parameterless filters never reach the substitution with None.
        row_filter_config.resolve_sql_expr(arguments or {})
        resolved_row_filter_config_list.append(row_filter_config)
    assert_not_none_or_empty(
        resolved_row_filter_config_list,
        "Rule Binding must have non-empty row_filter list.",
    )
    return resolved_row_filter_config_list

def resolve_reference_columns_config(
self: DqRuleBinding,
Expand Down Expand Up @@ -333,7 +365,8 @@ def resolve_all_configs_to_dict(
rule_config["rule_sql_expr"] = rule_sql_expr
rule_configs_dict[rule_id] = rule_config
# Resolve filter configs
row_filter_config = self.resolve_row_filter_config(configs_cache)
row_filters_configs = {row_filter.row_filter_id: row_filter.dict_values()
for row_filter in self.resolve_row_filter_config_list(configs_cache)}
# resolve reference columns config
if self.reference_columns_id:
include_all_reference_columns = False
Expand Down Expand Up @@ -370,8 +403,8 @@ def resolve_all_configs_to_dict(
"column_configs": dict(column_configs.dict_values()),
"rule_ids": list(self.rule_ids),
"rule_configs_dict": rule_configs_dict,
"row_filter_id": self.row_filter_id,
"row_filter_configs": dict(row_filter_config.dict_values()),
"row_filter_ids": self.row_filter_ids,
"row_filter_configs": row_filters_configs,
"incremental_time_filter_column": incremental_time_filter_column,
"metadata": self.metadata,
}
Expand Down
16 changes: 10 additions & 6 deletions clouddq/templates/dbt/macros/create_rule_binding_view.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{%- macro create_rule_binding_view(configs, environment, dq_summary_table_name, metadata, configs_hashsum, progress_watermark, dq_summary_table_exists, high_watermark_value, current_timestamp_value) -%}
{% set rule_binding_id = configs.get('rule_binding_id') -%}
{% set rule_configs_dict = configs.get('rule_configs_dict') -%}
{% set filter_sql_expr = configs.get('row_filter_configs').get('filter_sql_expr') -%}
{% set row_filter_configs = configs.get('row_filter_configs') -%}
{% set column_name = configs.get('column_configs').get('name') -%}
{% set entity_configs = configs.get('entity_configs') -%}
{% set partition_fields = entity_configs.get('partition_fields')-%}
Expand Down Expand Up @@ -64,16 +64,20 @@ data AS (
CAST(d.{{ time_column_id }} AS TIMESTAMP)
BETWEEN CAST('{{ high_watermark_value }}' AS TIMESTAMP) AND CAST('{{ current_timestamp_value }}' AS TIMESTAMP)
AND
{{ filter_sql_expr }}
{% else %}
{%- else %}
WHERE
{{ filter_sql_expr }}
{% endif -%}
{%- endif %}
{%- for id, row_filter_config in row_filter_configs.items() %}
{{ row_filter_config.get('filter_sql_expr') -}}
{%- if loop.nextitem is defined %}
AND
{%- endif -%}
{%- endfor %}
{%- if partition_fields %}
{% for field in partition_fields %}
AND {{ field['name'] }} IS NOT NULL
{%- endfor -%}
{% endif -%}
{%- endif -%}
),
last_mod AS (
SELECT
Expand Down
4 changes: 3 additions & 1 deletion configs/row_filters/row-filters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ row_filters:
True

DATA_TYPE_EMAIL:
params: |-
column
filter_sql_expr: |-
contact_type = 'email'
$column = 'email'
3 changes: 2 additions & 1 deletion configs/rule_bindings/team-1-rule-bindings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ rule_bindings:
T1_DQ_1_VALUE_NOT_NULL:
entity_id: TEST_TABLE
column_id: VALUE
row_filter_id: NONE
row_filter_ids:
- NONE
reference_columns_id: CONTACT_DETAILS_REFERENCE_COLUMNS
rule_ids:
- NOT_NULL_SIMPLE
4 changes: 3 additions & 1 deletion configs/rule_bindings/team-2-rule-bindings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ rule_bindings:
T2_DQ_1_EMAIL:
entity_id: TEST_TABLE
column_id: VALUE
row_filter_id: DATA_TYPE_EMAIL
row_filter_ids:
- DATA_TYPE_EMAIL:
column: "contact_type"
reference_columns_id: CONTACT_DETAILS_REFERENCE_COLUMNS
rule_ids:
- NOT_NULL_SIMPLE
Expand Down
4 changes: 3 additions & 1 deletion configs/rule_bindings/team-3-rule-bindings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ rule_bindings:
T3_DQ_1_EMAIL_DUPLICATE:
entity_id: TEST_TABLE
column_id: VALUE
row_filter_id: DATA_TYPE_EMAIL
row_filter_ids:
- DATA_TYPE_EMAIL:
column: "contact_type"
reference_columns_id: CONTACT_DETAILS_REFERENCE_COLUMNS
# incremental_time_filter_column_id: TS
rule_ids:
Expand Down