Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

Commit 7444053

Browse files
authored
Merge branch 'master' into version-tracking
2 parents a9e3555 + 92c6274 commit 7444053

File tree

10 files changed

+64
-37
lines changed

10 files changed

+64
-37
lines changed

data_diff/__init__.py

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55
from .sqeleton.abcs import DbKey, DbTime, DbPath
66
from .diff_tables import Algorithm
77
from .hashdiff_tables import HashDiffer, DEFAULT_BISECTION_THRESHOLD, DEFAULT_BISECTION_FACTOR
8-
from .joindiff_tables import JoinDiffer
8+
from .joindiff_tables import JoinDiffer, TABLE_WRITE_LIMIT
99
from .table_segment import TableSegment
10+
from .utils import eval_name_template
1011

1112
def connect_to_table(
1213
db_info: Union[str, dict],
@@ -53,17 +54,27 @@ def diff_tables(
5354
# Start/end update_column values, used to restrict the segment
5455
min_update: DbTime = None,
5556
max_update: DbTime = None,
56-
# Algorithm
57-
algorithm: Algorithm = Algorithm.HASHDIFF,
58-
# Into how many segments to bisect per iteration (hashdiff only)
59-
bisection_factor: int = DEFAULT_BISECTION_FACTOR,
60-
# When should we stop bisecting and compare locally (in row count; hashdiff only)
61-
bisection_threshold: int = DEFAULT_BISECTION_THRESHOLD,
6257
# Enable/disable threaded diffing. Needed to take advantage of database threads.
6358
threaded: bool = True,
6459
# Maximum size of each threadpool. None = auto. Only relevant when threaded is True.
6560
# There may be many pools, so number of actual threads can be a lot higher.
6661
max_threadpool_size: Optional[int] = 1,
62+
# Algorithm
63+
algorithm: Algorithm = Algorithm.AUTO,
64+
# Into how many segments to bisect per iteration (hashdiff only)
65+
bisection_factor: int = DEFAULT_BISECTION_FACTOR,
66+
# When should we stop bisecting and compare locally (in row count; hashdiff only)
67+
bisection_threshold: int = DEFAULT_BISECTION_THRESHOLD,
68+
# Enable/disable validating that the key columns are unique. (joindiff only)
69+
validate_unique_key: bool = True,
70+
# Enable/disable sampling of exclusive rows. Creates a temporary table. (joindiff only)
71+
sample_exclusive_rows: bool = False,
72+
# Path of new table to write diff results to. Disabled if not provided. (joindiff only)
73+
materialize_to_table: Union[str, DbPath] = None,
74+
# Materialize every row, not just those that are different. (joindiff only)
75+
materialize_all_rows: bool = False,
76+
# Maximum number of rows to write when materializing, per thread. (joindiff only)
77+
table_write_limit: int = TABLE_WRITE_LIMIT,
6778
) -> Iterator:
6879
"""Finds the diff between table1 and table2.
6980
@@ -76,14 +87,21 @@ def diff_tables(
7687
max_key (:data:`DbKey`, optional): Highest key value, used to restrict the segment
7788
min_update (:data:`DbTime`, optional): Lowest update_column value, used to restrict the segment
7889
max_update (:data:`DbTime`, optional): Highest update_column value, used to restrict the segment
79-
algorithm (:class:`Algorithm`): Which diffing algorithm to use (`HASHDIFF` or `JOINDIFF`)
80-
bisection_factor (int): Into how many segments to bisect per iteration. (Used when algorithm is `HASHDIFF`)
81-
bisection_threshold (Number): Minimal row count of segment to bisect, otherwise download
82-
and compare locally. (Used when algorithm is `HASHDIFF`).
8390
threaded (bool): Enable/disable threaded diffing. Needed to take advantage of database threads.
8491
max_threadpool_size (int): Maximum size of each threadpool. ``None`` means auto.
8592
Only relevant when `threaded` is ``True``.
8693
There may be many pools, so number of actual threads can be a lot higher.
94+
algorithm (:class:`Algorithm`): Which diffing algorithm to use (`HASHDIFF` or `JOINDIFF`. Default=`AUTO`)
95+
bisection_factor (int): Into how many segments to bisect per iteration. (Used when algorithm is `HASHDIFF`)
96+
bisection_threshold (Number): Minimal row count of segment to bisect, otherwise download
97+
and compare locally. (Used when algorithm is `HASHDIFF`).
98+
validate_unique_key (bool): Enable/disable validating that the key columns are unique. (used for `JOINDIFF`. default: True)
99+
Single query, and can't be threaded, so it's very slow on non-cloud dbs.
100+
Future versions will detect UNIQUE constraints in the schema.
101+
sample_exclusive_rows (bool): Enable/disable sampling of exclusive rows. Creates a temporary table. (used for `JOINDIFF`. default: False)
102+
materialize_to_table (Union[str, DbPath], optional): Path of new table to write diff results to. Disabled if not provided. Used for `JOINDIFF`.
103+
materialize_all_rows (bool): Materialize every row, not just those that are different. (used for `JOINDIFF`. default: False)
104+
table_write_limit (int): Maximum number of rows to write when materializing, per thread.
87105
88106
Note:
89107
The following parameters are used to override the corresponding attributes of the given :class:`TableSegment` instances:
@@ -123,6 +141,9 @@ def diff_tables(
123141
segments = [t.new(**override_attrs) for t in tables] if override_attrs else tables
124142

125143
algorithm = Algorithm(algorithm)
144+
if algorithm == Algorithm.AUTO:
145+
algorithm = Algorithm.JOINDIFF if table1.database is table2.database else Algorithm.HASHDIFF
146+
126147
if algorithm == Algorithm.HASHDIFF:
127148
differ = HashDiffer(
128149
bisection_factor=bisection_factor,
@@ -131,9 +152,16 @@ def diff_tables(
131152
max_threadpool_size=max_threadpool_size,
132153
)
133154
elif algorithm == Algorithm.JOINDIFF:
155+
if isinstance(materialize_to_table, str):
156+
materialize_to_table = table1.database.parse_table_name(eval_name_template(materialize_to_table))
134157
differ = JoinDiffer(
135158
threaded=threaded,
136159
max_threadpool_size=max_threadpool_size,
160+
validate_unique_key=validate_unique_key,
161+
sample_exclusive_rows=sample_exclusive_rows,
162+
materialize_to_table=materialize_to_table,
163+
materialize_all_rows=materialize_all_rows,
164+
table_write_limit=table_write_limit,
137165
)
138166
else:
139167
raise ValueError(f"Unknown algorithm: {algorithm}")

data_diff/__main__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ def write_usage(self, prog: str, args: str = "", prefix: Optional[str] = None) -
123123
)
124124
@click.option(
125125
"-m",
126-
"--materialize",
126+
"--materialize-to-table",
127127
default=None,
128128
metavar="TABLE_NAME",
129129
help="(joindiff only) Materialize the diff results into a new table in the database. If a table exists by that name, it will be replaced.",
@@ -248,7 +248,7 @@ def _main(
248248
sample_exclusive_rows,
249249
materialize_all_rows,
250250
table_write_limit,
251-
materialize,
251+
materialize_to_table,
252252
threads1=None,
253253
threads2=None,
254254
__conf__=None,
@@ -340,7 +340,7 @@ def _main(
340340
sample_exclusive_rows=sample_exclusive_rows,
341341
materialize_all_rows=materialize_all_rows,
342342
table_write_limit=table_write_limit,
343-
materialize_to_table=materialize and db1.parse_table_name(eval_name_template(materialize)),
343+
materialize_to_table=materialize_to_table and db1.parse_table_name(eval_name_template(materialize_to_table)),
344344
)
345345
else:
346346
assert algorithm == Algorithm.HASHDIFF

data_diff/joindiff_tables.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,16 +123,18 @@ class JoinDiffer(TableDiffer):
123123
max_threadpool_size (int): Maximum size of each threadpool. ``None`` means auto.
124124
Only relevant when `threaded` is ``True``.
125125
There may be many pools, so number of actual threads can be a lot higher.
126-
validate_unique_key (bool): Enable/disable validating that the key columns are unique.
127-
Single query, and can't be threaded, so it's very slow on non-cloud dbs.
128-
Future versions will detect UNIQUE constraints in the schema.
129-
sample_exclusive_rows (bool): Enable/disable sampling of exclusive rows. Creates a temporary table.
126+
validate_unique_key (bool): Enable/disable validating that the key columns are unique. (default: True)
127+
If there are no UNIQUE constraints in the schema, it is done in a single query,
128+
and can't be threaded, so it's very slow on non-cloud dbs.
129+
sample_exclusive_rows (bool): Enable/disable sampling of exclusive rows. (default: False)
130+
Creates a temporary table.
130131
materialize_to_table (DbPath, optional): Path of new table to write diff results to. Disabled if not provided.
132+
materialize_all_rows (bool): Materialize every row, not just those that are different. (default: False)
131133
table_write_limit (int): Maximum number of rows to write when materializing, per thread.
132134
"""
133135

134136
validate_unique_key: bool = True
135-
sample_exclusive_rows: bool = True
137+
sample_exclusive_rows: bool = False
136138
materialize_to_table: DbPath = None
137139
materialize_all_rows: bool = False
138140
table_write_limit: int = TABLE_WRITE_LIMIT

data_diff/sqeleton/databases/connect.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from .duckdb import DuckDB
2222

2323

24-
2524
@dataclass
2625
class MatchUriPath:
2726
database_cls: Type[Database]

data_diff/sqeleton/queries/ast_classes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -786,6 +786,7 @@ class CurrentTimestamp(ExprNode):
786786
def compile(self, c: Compiler) -> str:
787787
return c.dialect.current_timestamp()
788788

789+
789790
# DDL
790791

791792

data_diff/sqeleton/queries/compiler.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
cv_params = contextvars.ContextVar("params")
1313

14+
1415
class Root:
1516
"Nodes inheriting from Root can be used as root statements in SQL (e.g. SELECT yes, RANDOM() no)"
1617

@@ -38,6 +39,7 @@ def compile(self, elem, params=None) -> str:
3839

3940
if self.root and isinstance(elem, Compilable) and not isinstance(elem, Root):
4041
from .ast_classes import Select
42+
4143
elem = Select(columns=[elem])
4244

4345
res = self._compile(elem)

tests/common.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def get_git_revision_short_hash() -> str:
8787
_database_instances = {}
8888

8989

90-
def get_conn(cls: type, shared: bool =True) -> Database:
90+
def get_conn(cls: type, shared: bool = True) -> Database:
9191
if shared:
9292
if cls not in _database_instances:
9393
_database_instances[cls] = get_conn(cls, shared=False)
@@ -181,6 +181,7 @@ def _test_per_database(cls):
181181

182182
return _test_per_database
183183

184+
184185
def table_segment(database, table_path, key_columns, *args, **kw):
185186
if isinstance(key_columns, str):
186187
key_columns = (key_columns,)

tests/sqeleton/test_sql.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ def test_compile_int(self):
1818
self.assertEqual("1", self.compiler.compile(1))
1919

2020
def test_compile_table_name(self):
21-
self.assertEqual("`marine_mammals`.`walrus`", self.compiler.replace(root=False).compile(table("marine_mammals", "walrus")))
21+
self.assertEqual(
22+
"`marine_mammals`.`walrus`", self.compiler.replace(root=False).compile(table("marine_mammals", "walrus"))
23+
)
2224

2325
def test_compile_select(self):
2426
expected_sql = "SELECT name FROM `marine_mammals`.`walrus`"

tests/test_api.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,14 @@ def test_api_get_stats_dict(self):
6767
"updated": 0,
6868
"unchanged": 4,
6969
"total": 1,
70-
"stats": {"rows_downloaded": 5},
70+
# "stats": {"rows_downloaded": 5},
7171
}
7272
t1 = connect_to_table(TEST_MYSQL_CONN_STRING, self.table_src_name)
7373
t2 = connect_to_table(TEST_MYSQL_CONN_STRING, self.table_dst_name)
7474
diff = diff_tables(t1, t2)
75-
output = diff.get_stats_dict()
7675

76+
output = diff.get_stats_dict()
77+
output.pop('stats')
7778
self.assertEqual(expected_dict, output)
7879
self.assertIsNotNone(diff)
7980
assert len(list(diff)) == 1

tests/test_diff_tables.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
test_each_database: Callable = test_each_database_in_list(TEST_DATABASES)
3232

3333

34-
3534
class TestUtils(unittest.TestCase):
3635
def test_split_space(self):
3736
for i in range(0, 10):
@@ -90,24 +89,16 @@ def test_basic(self):
9089
def test_offset(self):
9190
differ = HashDiffer(bisection_factor=2, bisection_threshold=10)
9291
sec1 = self.now.shift(seconds=-3).datetime
93-
a = table_segment(
94-
self.connection, self.table_src_path, "id", "datetime", max_update=sec1, case_sensitive=False
95-
)
96-
b = table_segment(
97-
self.connection, self.table_dst_path, "id", "datetime", max_update=sec1, case_sensitive=False
98-
)
92+
a = table_segment(self.connection, self.table_src_path, "id", "datetime", max_update=sec1, case_sensitive=False)
93+
b = table_segment(self.connection, self.table_dst_path, "id", "datetime", max_update=sec1, case_sensitive=False)
9994
assert a.count() == 4, a.count()
10095
assert b.count() == 3
10196

10297
assert not list(differ.diff_tables(a, a))
10398
self.assertEqual(len(list(differ.diff_tables(a, b))), 1)
10499

105-
a = table_segment(
106-
self.connection, self.table_src_path, "id", "datetime", min_update=sec1, case_sensitive=False
107-
)
108-
b = table_segment(
109-
self.connection, self.table_dst_path, "id", "datetime", min_update=sec1, case_sensitive=False
110-
)
100+
a = table_segment(self.connection, self.table_src_path, "id", "datetime", min_update=sec1, case_sensitive=False)
101+
b = table_segment(self.connection, self.table_dst_path, "id", "datetime", min_update=sec1, case_sensitive=False)
111102
assert a.count() == 2
112103
assert b.count() == 2
113104
assert not list(differ.diff_tables(a, b))

0 commit comments

Comments (0)