|
12 | 12 |
|
13 | 13 | from runtype import dataclass |
14 | 14 |
|
| 15 | +from .tracking import create_end_event_json, create_start_event_json, send_event_json, is_tracking_enabled |
| 16 | + |
| 17 | + |
15 | 18 | from .sql import Select, Checksum, Compare, DbPath, DbKey, DbTime, Count, TableName, Time, Value |
16 | 19 | from .utils import CaseAwareMapping, CaseInsensitiveDict, safezip, split_space, CaseSensitiveDict, ArithString |
17 | 20 | from .databases.base import Database |
@@ -257,6 +260,11 @@ def query_key_range(self) -> Tuple[int, int]: |
def is_bounded(self):
    """Tell whether this segment has both ends of its key range set.

    Returns:
        True when neither ``min_key`` nor ``max_key`` is ``None``;
        False as soon as either one is missing.
    """
    missing_bound = self.min_key is None or self.max_key is None
    return not missing_bound
259 | 262 |
|
def approximate_size(self):
    """Estimate the segment's size as the width of its key range.

    The estimate is simply ``max_key - min_key``; it is not a row count.

    Returns:
        The difference between ``max_key`` and ``min_key``.

    Raises:
        RuntimeError: if the segment is not bounded.
    """
    # NOTE(review): `is_bounded` is read as an attribute here, not called.
    # This is only a real check if `is_bounded` is a property -- confirm.
    if self.is_bounded:
        return self.max_key - self.min_key
    raise RuntimeError("Cannot approximate the size of an unbounded segment. Must have min_key and max_key.")
| 267 | + |
260 | 268 |
|
261 | 269 | def diff_sets(a: set, b: set) -> Iterator: |
262 | 270 | s1 = set(a) |
@@ -322,45 +330,72 @@ def diff_tables(self, table1: TableSegment, table2: TableSegment) -> DiffResult: |
322 | 330 | if self.bisection_factor < 2: |
323 | 331 | raise ValueError("Must have at least two segments per iteration (i.e. bisection_factor >= 2)") |
324 | 332 |
|
325 | | - # Query and validate schema |
326 | | - table1, table2 = self._threaded_call("with_schema", [table1, table2]) |
327 | | - self._validate_and_adjust_columns(table1, table2) |
| 333 | + if is_tracking_enabled(): |
| 334 | + options = dict(self) |
| 335 | + event_json = create_start_event_json(options) |
| 336 | + send_event_json(event_json) |
328 | 337 |
|
329 | | - key_type = table1._schema[table1.key_column] |
330 | | - key_type2 = table2._schema[table2.key_column] |
331 | | - if not isinstance(key_type, IKey): |
332 | | - raise NotImplementedError(f"Cannot use column of type {key_type} as a key") |
333 | | - if not isinstance(key_type2, IKey): |
334 | | - raise NotImplementedError(f"Cannot use column of type {key_type2} as a key") |
335 | | - assert key_type.python_type is key_type2.python_type |
336 | | - |
337 | | - # Query min/max values |
338 | | - key_ranges = self._threaded_call_as_completed("query_key_range", [table1, table2]) |
| 338 | + self.stats["diff_count"] = 0 |
| 339 | + start = time.time() |
| 340 | + try: |
339 | 341 |
|
340 | | - # Start with the first completed value, so we don't waste time waiting |
341 | | - min_key1, max_key1 = self._parse_key_range_result(key_type, next(key_ranges)) |
| 342 | + # Query and validate schema |
| 343 | + table1, table2 = self._threaded_call("with_schema", [table1, table2]) |
| 344 | + self._validate_and_adjust_columns(table1, table2) |
342 | 345 |
|
343 | | - table1, table2 = [t.new(min_key=min_key1, max_key=max_key1) for t in (table1, table2)] |
| 346 | + key_type = table1._schema[table1.key_column] |
| 347 | + key_type2 = table2._schema[table2.key_column] |
| 348 | + if not isinstance(key_type, IKey): |
| 349 | + raise NotImplementedError(f"Cannot use column of type {key_type} as a key") |
| 350 | + if not isinstance(key_type2, IKey): |
| 351 | + raise NotImplementedError(f"Cannot use column of type {key_type2} as a key") |
| 352 | + assert key_type.python_type is key_type2.python_type |
344 | 353 |
|
345 | | - logger.info( |
346 | | - f"Diffing tables | segments: {self.bisection_factor}, bisection threshold: {self.bisection_threshold}. " |
347 | | - f"key-range: {table1.min_key}..{table2.max_key}, " |
348 | | - f"size: {table2.max_key-table1.min_key}" |
349 | | - ) |
| 354 | + # Query min/max values |
| 355 | + key_ranges = self._threaded_call_as_completed("query_key_range", [table1, table2]) |
350 | 356 |
|
351 | | - # Bisect (split) the table into segments, and diff them recursively. |
352 | | - yield from self._bisect_and_diff_tables(table1, table2) |
| 357 | + # Start with the first completed value, so we don't waste time waiting |
| 358 | + min_key1, max_key1 = self._parse_key_range_result(key_type, next(key_ranges)) |
353 | 359 |
|
354 | | - # Now we check for the second min-max, to diff the portions we "missed". |
355 | | - min_key2, max_key2 = self._parse_key_range_result(key_type, next(key_ranges)) |
| 360 | + table1, table2 = [t.new(min_key=min_key1, max_key=max_key1) for t in (table1, table2)] |
356 | 361 |
|
357 | | - if min_key2 < min_key1: |
358 | | - pre_tables = [t.new(min_key=min_key2, max_key=min_key1) for t in (table1, table2)] |
359 | | - yield from self._bisect_and_diff_tables(*pre_tables) |
| 362 | + logger.info( |
| 363 | + f"Diffing tables | segments: {self.bisection_factor}, bisection threshold: {self.bisection_threshold}. " |
| 364 | + f"key-range: {table1.min_key}..{table2.max_key}, " |
| 365 | + f"size: {table1.approximate_size()}" |
| 366 | + ) |
360 | 367 |
|
361 | | - if max_key2 > max_key1: |
362 | | - post_tables = [t.new(min_key=max_key1, max_key=max_key2) for t in (table1, table2)] |
363 | | - yield from self._bisect_and_diff_tables(*post_tables) |
| 368 | + # Bisect (split) the table into segments, and diff them recursively. |
| 369 | + yield from self._bisect_and_diff_tables(table1, table2) |
| 370 | + |
| 371 | + # Now we check for the second min-max, to diff the portions we "missed". |
| 372 | + min_key2, max_key2 = self._parse_key_range_result(key_type, next(key_ranges)) |
| 373 | + |
| 374 | + if min_key2 < min_key1: |
| 375 | + pre_tables = [t.new(min_key=min_key2, max_key=min_key1) for t in (table1, table2)] |
| 376 | + yield from self._bisect_and_diff_tables(*pre_tables) |
| 377 | + |
| 378 | + if max_key2 > max_key1: |
| 379 | + post_tables = [t.new(min_key=max_key1, max_key=max_key2) for t in (table1, table2)] |
| 380 | + yield from self._bisect_and_diff_tables(*post_tables) |
| 381 | + |
| 382 | + error = None |
| 383 | + except BaseException as e: # Catch KeyboardInterrupt too |
| 384 | + error = e |
| 385 | + finally: |
| 386 | + if is_tracking_enabled(): |
| 387 | + runtime = time.time() - start |
| 388 | + table1_count = self.stats.get("table1_count") |
| 389 | + table2_count = self.stats.get("table2_count") |
| 390 | + diff_count = self.stats.get("diff_count") |
| 391 | + err_message = str(error)[:20] # Truncate possibly sensitive information. |
| 392 | + event_json = create_end_event_json( |
| 393 | + error is None, runtime, table1.database.name, table2.database.name, table1_count, table2_count, diff_count, err_message |
| 394 | + ) |
| 395 | + send_event_json(event_json) |
| 396 | + |
| 397 | + if error: |
| 398 | + raise error |
364 | 399 |
|
365 | 400 | def _parse_key_range_result(self, key_type, key_range): |
366 | 401 | mn, mx = key_range |
@@ -438,6 +473,8 @@ def _bisect_and_diff_tables(self, table1, table2, level=0, max_rows=None): |
438 | 473 | self.stats["table1_count"] = len(rows1) |
439 | 474 | self.stats["table2_count"] = len(rows2) |
440 | 475 |
|
| 476 | + self.stats["diff_count"] += len(diff) |
| 477 | + |
441 | 478 | logger.info(". " * level + f"Diff found {len(diff)} different rows.") |
442 | 479 | self.stats["rows_downloaded"] = self.stats.get("rows_downloaded", 0) + max(len(rows1), len(rows2)) |
443 | 480 | yield from diff |
|
0 commit comments