Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

Commit 948a87e

Browse files
authored
Merge branch 'master' into dx-821
2 parents fc7f266 + 533d88b commit 948a87e

File tree

7 files changed

+457
-19
lines changed

7 files changed

+457
-19
lines changed

data_diff/__main__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ def main(conf, run, **kw):
289289
project_dir_override=project_dir_override,
290290
is_cloud=kw["cloud"],
291291
dbt_selection=kw["select"],
292+
json_output=kw["json_output"],
292293
state=state,
293294
)
294295
else:

data_diff/dbt.py

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import os
23
import re
34
import time
@@ -17,6 +18,8 @@
1718
from . import connect_to_table, diff_tables, Algorithm
1819
from .cloud import DatafoldAPI, TCloudApiDataDiff, TCloudApiOrgMeta
1920
from .dbt_parser import DbtParser, TDatadiffConfig
21+
from .diff_tables import DiffResultWrapper
22+
from .format import jsonify, jsonify_error
2023
from .tracking import (
2124
bool_ask_for_email,
2225
create_email_signup_event_json,
@@ -54,13 +57,15 @@ class TDiffVars(pydantic.BaseModel):
5457
where_filter: Optional[str] = None
5558
include_columns: List[str]
5659
exclude_columns: List[str]
60+
dbt_model: Optional[str] = None
5761

5862

5963
def dbt_diff(
6064
profiles_dir_override: Optional[str] = None,
6165
project_dir_override: Optional[str] = None,
6266
is_cloud: bool = False,
6367
dbt_selection: Optional[str] = None,
68+
json_output: bool = False,
6469
state: Optional[str] = None,
6570
) -> None:
6671
print_version_info()
@@ -115,12 +120,25 @@ def dbt_diff(
115120
diff_thread = run_as_daemon(_cloud_diff, diff_vars, config.datasource_id, api, org_meta)
116121
diff_threads.append(diff_thread)
117122
else:
118-
_local_diff(diff_vars)
123+
_local_diff(diff_vars, json_output)
119124
else:
120-
rich.print(
121-
_diff_output_base(".".join(diff_vars.dev_path), ".".join(diff_vars.prod_path))
122-
+ "Skipped due to unknown primary key. Add uniqueness tests, meta, or tags.\n"
123-
)
125+
if json_output:
126+
print(
127+
json.dumps(
128+
jsonify_error(
129+
table1=diff_vars.prod_path,
130+
table2=diff_vars.dev_path,
131+
dbt_model=diff_vars.dbt_model,
132+
error="No primary key found. Add uniqueness tests, meta, or tags.",
133+
)
134+
),
135+
flush=True,
136+
)
137+
else:
138+
rich.print(
139+
_diff_output_base(".".join(diff_vars.dev_path), ".".join(diff_vars.prod_path))
140+
+ "Skipped due to unknown primary key. Add uniqueness tests, meta, or tags.\n"
141+
)
124142

125143
# wait for all threads
126144
if diff_threads:
@@ -155,6 +173,7 @@ def _get_diff_vars(
155173
datadiff_model_config = dbt_parser.get_datadiff_model_config(model.meta)
156174

157175
return TDiffVars(
176+
dbt_model=model.unique_id,
158177
dev_path=dev_qualified_list,
159178
prod_path=prod_qualified_list,
160179
primary_keys=primary_keys,
@@ -205,15 +224,15 @@ def _get_prod_path_from_manifest(model, prod_manifest) -> Union[Tuple[str, str],
205224
return prod_database, prod_schema
206225

207226

208-
def _local_diff(diff_vars: TDiffVars) -> None:
227+
def _local_diff(diff_vars: TDiffVars, json_output: bool = False) -> None:
209228
dev_qualified_str = ".".join(diff_vars.dev_path)
210229
prod_qualified_str = ".".join(diff_vars.prod_path)
211230
diff_output_str = _diff_output_base(dev_qualified_str, prod_qualified_str)
212231

213-
table1 = connect_to_table(diff_vars.connection, dev_qualified_str, tuple(diff_vars.primary_keys), diff_vars.threads)
214-
table2 = connect_to_table(
232+
table1 = connect_to_table(
215233
diff_vars.connection, prod_qualified_str, tuple(diff_vars.primary_keys), diff_vars.threads
216234
)
235+
table2 = connect_to_table(diff_vars.connection, dev_qualified_str, tuple(diff_vars.primary_keys), diff_vars.threads)
217236

218237
table1_columns = table1.get_schema()
219238
try:
@@ -228,11 +247,11 @@ def _local_diff(diff_vars: TDiffVars) -> None:
228247
table1_column_names = set(table1_columns.keys())
229248
table2_column_names = set(table2_columns.keys())
230249
column_set = table1_column_names.intersection(table2_column_names)
231-
columns_added = table1_column_names.difference(table2_column_names)
232-
columns_removed = table2_column_names.difference(table1_column_names)
250+
columns_added = table2_column_names.difference(table1_column_names)
251+
columns_removed = table1_column_names.difference(table2_column_names)
233252
# col type is i = 1 in tuple
234253
columns_type_changed = {
235-
k for k, v in table1_columns.items() if k in table2_columns and v[1] != table2_columns[k][1]
254+
k for k, v in table2_columns.items() if k in table1_columns and v[1] != table1_columns[k][1]
236255
}
237256

238257
if columns_added:
@@ -255,7 +274,7 @@ def _local_diff(diff_vars: TDiffVars) -> None:
255274

256275
extra_columns = tuple(column_set)
257276

258-
diff = diff_tables(
277+
diff: DiffResultWrapper = diff_tables(
259278
table1,
260279
table2,
261280
threaded=True,
@@ -264,6 +283,35 @@ def _local_diff(diff_vars: TDiffVars) -> None:
264283
where=diff_vars.where_filter,
265284
skip_null_keys=True,
266285
)
286+
if json_output:
287+
# drain the iterator to get accumulated stats in diff.info_tree
288+
try:
289+
list(diff)
290+
except Exception as e:
291+
print(
292+
json.dumps(
293+
jsonify_error(list(table1.table_path), list(table2.table_path), diff_vars.dbt_model, str(e))
294+
),
295+
flush=True,
296+
)
297+
return
298+
299+
print(
300+
json.dumps(
301+
jsonify(
302+
diff,
303+
dbt_model=diff_vars.dbt_model,
304+
with_summary=True,
305+
with_columns={
306+
"added": columns_added,
307+
"removed": columns_removed,
308+
"changed": columns_type_changed,
309+
},
310+
)
311+
),
312+
flush=True,
313+
)
314+
return
267315

268316
if list(diff):
269317
diff_output_str += f"{diff.get_stats_string(is_dbt=True)} \n"

0 commit comments

Comments
 (0)