|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | 3 | import logging |
| 4 | +import os |
4 | 5 | import typing as t |
5 | 6 | from contextlib import contextmanager |
6 | 7 | from functools import partial |
|
45 | 46 | logger = logging.getLogger(__name__) |
46 | 47 |
|
47 | 48 |
|
| 49 | +def _optimizer_trace_enabled(model_fqn: t.Optional[str]) -> bool: |
| 50 | + """Enable verbose optimizer tracing for selected model(s). |
| 51 | +
|
| 52 | + Controlled via env vars: |
| 53 | + - SQLMESH_OPTIMIZER_TRACE: truthy enables tracing |
| 54 | + - SQLMESH_OPTIMIZER_TRACE_MODEL: optional comma-separated substrings to match model fqn(s) |
| 55 | + """ |
| 56 | + from sqlmesh.utils import str_to_bool |
| 57 | + |
| 58 | + if not str_to_bool(os.environ.get("SQLMESH_OPTIMIZER_TRACE")): |
| 59 | + return False |
| 60 | + |
| 61 | + selector = (os.environ.get("SQLMESH_OPTIMIZER_TRACE_MODEL") or "").strip() |
| 62 | + if not selector: |
| 63 | + return True |
| 64 | + |
| 65 | + if not model_fqn: |
| 66 | + return False |
| 67 | + |
| 68 | + patterns = [p.strip() for p in selector.split(",") if p.strip()] |
| 69 | + return any(p in model_fqn for p in patterns) |
| 70 | + |
| 71 | + |
48 | 72 | class BaseExpressionRenderer: |
49 | 73 | def __init__( |
50 | 74 | self, |
@@ -652,39 +676,96 @@ def _optimize_query(self, query: exp.Query, all_deps: t.Set[str]) -> exp.Query: |
652 | 676 |
|
653 | 677 | # We don't want to normalize names in the schema because that's handled by the optimizer |
654 | 678 | original = query |
| 679 | + trace_enabled = _optimizer_trace_enabled(self._model_fqn) |
655 | 680 | missing_deps = set() |
656 | 681 | all_deps = all_deps - {self._model_fqn} |
657 | 682 | should_optimize = not self.schema.empty or not all_deps |
| 683 | + stage = "precheck" |
658 | 684 |
|
659 | 685 | for dep in all_deps: |
660 | 686 | if not self.schema.find(exp.to_table(dep)): |
661 | 687 | should_optimize = False |
662 | 688 | missing_deps.add(dep) |
663 | 689 |
|
| 690 | + if trace_enabled and not should_optimize: |
| 691 | + logger.info( |
| 692 | + "OPTIMIZER TRACE skip model=%s missing_deps=%s", |
| 693 | + self._model_fqn, |
| 694 | + sorted(missing_deps), |
| 695 | + ) |
| 696 | + |
664 | 697 | if self._model_fqn and not should_optimize and any(s.is_star for s in query.selects): |
665 | 698 | deps = ", ".join(f"'{dep}'" for dep in sorted(missing_deps)) |
666 | 699 | self._violated_rules[InvalidSelectStarExpansion] = deps |
667 | 700 |
|
668 | 701 | try: |
669 | 702 | if should_optimize: |
| 703 | + if trace_enabled: |
| 704 | + logger.info( |
| 705 | + "OPTIMIZER TRACE model=%s dialect=%s default_catalog=%s deps=%s", |
| 706 | + self._model_fqn, |
| 707 | + self._dialect, |
| 708 | + self._default_catalog, |
| 709 | + sorted(all_deps), |
| 710 | + ) |
| 711 | + for dep in sorted(all_deps): |
| 712 | + logger.info( |
| 713 | + "OPTIMIZER TRACE dep_schema dep=%s schema=%s", |
| 714 | + dep, |
| 715 | + self.schema.find(exp.to_table(dep)), |
| 716 | + ) |
| 717 | + logger.info( |
| 718 | + "OPTIMIZER TRACE stage=input sql=%s", |
| 719 | + original.sql(dialect=self._dialect, identify=True, comments=False), |
| 720 | + ) |
| 721 | + |
670 | 722 | query = query.copy() |
671 | | - simplify( |
672 | | - annotate_types( |
673 | | - qualify( |
674 | | - query, |
675 | | - dialect=self._dialect, |
676 | | - schema=self.schema, |
677 | | - infer_schema=False, |
678 | | - catalog=self._default_catalog, |
679 | | - quote_identifiers=self._quote_identifiers, |
680 | | - ), |
681 | | - schema=self.schema, |
682 | | - dialect=self._dialect, |
683 | | - ), |
| 723 | + stage = "qualify" |
| 724 | + qualified = qualify( |
| 725 | + query, |
684 | 726 | dialect=self._dialect, |
| 727 | + schema=self.schema, |
| 728 | + infer_schema=False, |
| 729 | + catalog=self._default_catalog, |
| 730 | + quote_identifiers=self._quote_identifiers, |
685 | 731 | ) |
| 732 | + if trace_enabled: |
| 733 | + logger.info( |
| 734 | + "OPTIMIZER TRACE stage=qualified sql=%s", |
| 735 | + qualified.sql(dialect=self._dialect, identify=True, comments=False), |
| 736 | + ) |
| 737 | + |
| 738 | + stage = "annotate_types" |
| 739 | + typed = annotate_types(qualified, schema=self.schema, dialect=self._dialect) |
| 740 | + if trace_enabled: |
| 741 | + logger.info( |
| 742 | + "OPTIMIZER TRACE stage=typed sql=%s", |
| 743 | + typed.sql(dialect=self._dialect, identify=True, comments=False), |
| 744 | + ) |
| 745 | + |
| 746 | + stage = "simplify" |
| 747 | + simplified = simplify(typed, dialect=self._dialect) |
| 748 | + if trace_enabled: |
| 749 | + logger.info( |
| 750 | + "OPTIMIZER TRACE stage=simplified sql=%s", |
| 751 | + simplified.sql(dialect=self._dialect, identify=True, comments=False), |
| 752 | + ) |
| 753 | + query = t.cast(exp.Query, simplified) |
686 | 754 | except SqlglotError as ex: |
687 | 755 | self._violated_rules[AmbiguousOrInvalidColumn] = ex |
| 756 | + if trace_enabled: |
| 757 | + logger.info( |
| 758 | + "OPTIMIZER TRACE failed model=%s stage=%s error=%s", |
| 759 | + self._model_fqn, |
| 760 | + stage, |
| 761 | + ex, |
| 762 | + exc_info=True, |
| 763 | + ) |
| 764 | + logger.info( |
| 765 | + "OPTIMIZER TRACE failed_sql model=%s sql=%s", |
| 766 | + self._model_fqn, |
| 767 | + original.sql(dialect=self._dialect, identify=True, comments=False), |
| 768 | + ) |
688 | 769 |
|
689 | 770 | query = original |
690 | 771 |
|
|
0 commit comments