From f08ebad961254d85732fbbd98b2b8ca254c8410c Mon Sep 17 00:00:00 2001 From: Seelam Balaji Nikitha Date: Wed, 12 Nov 2025 22:18:33 -0800 Subject: [PATCH 1/2] Added trasformation function with unit test cases --- .DS_Store | Bin 0 -> 6148 bytes .github/copilot-instructions.md | 97 ++++++++++++ .../test/utils/transforms/test_cleaning.py | 142 ++++++++++++++++++ .../test/utils/transforms/test_date.py | 72 +++++++++ .../test/utils/transforms/test_impute.py | 117 +++++++++++++++ .../test/utils/transforms/test_math.py | 129 ++++++++++++++++ airbyte_cdk/utils/transforms/__init__.py | 29 ++++ airbyte_cdk/utils/transforms/cleaning.py | 44 ++++++ airbyte_cdk/utils/transforms/date.py | 28 ++++ airbyte_cdk/utils/transforms/impute.py | 94 ++++++++++++ airbyte_cdk/utils/transforms/math.py | 50 ++++++ 11 files changed, 802 insertions(+) create mode 100644 .DS_Store create mode 100644 .github/copilot-instructions.md create mode 100644 airbyte_cdk/test/utils/transforms/test_cleaning.py create mode 100644 airbyte_cdk/test/utils/transforms/test_date.py create mode 100644 airbyte_cdk/test/utils/transforms/test_impute.py create mode 100644 airbyte_cdk/test/utils/transforms/test_math.py create mode 100644 airbyte_cdk/utils/transforms/__init__.py create mode 100644 airbyte_cdk/utils/transforms/cleaning.py create mode 100644 airbyte_cdk/utils/transforms/date.py create mode 100644 airbyte_cdk/utils/transforms/impute.py create mode 100644 airbyte_cdk/utils/transforms/math.py diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..fb38e850dcfff5f64ac9c611771f18c6ff331212 GIT binary patch literal 6148 zcmeHKF;2rk5S#@Uk 0 # Positive skew + assert _numeric_skewness([1, 2, 2]) < 0 # Negative skew + + # Test edge cases + assert _numeric_skewness([1, 1]) == 0.0 # Less than 3 values + assert _numeric_skewness([1, 1, 1]) == 0.0 # No variance + + # Test with floating point values + assert _numeric_skewness([1.0, 2.0, 3.0]) == pytest.approx(0.0, 
def test_choose_imputation_strategy():
    """Verify the strategy chosen for numeric, categorical, empty and mixed series."""
    # (series, expected strategy) pairs evaluated with default arguments.
    default_cases = [
        ([1, 2, 3], "mean"),  # numeric, low skew
        ([1, 1, 10], "median"),  # numeric, high skew
        (["a", "a", "b"], "mode"),  # non-numeric data is auto-detected
        ([], "mode"),  # empty series
        ([None, None], "mode"),  # nulls only
        ([1, "2", 3], "mode"),  # mixed types are treated as non-numeric
    ]
    for series, expected in default_cases:
        assert choose_imputation_strategy(series) == expected

    # Categorical data declared explicitly by the caller.
    assert choose_imputation_strategy(["a", "b", "c"], numeric=False) == "mode"

    # Repeated values with a custom threshold: unique ratio 0.5 is below 0.6.
    picked = choose_imputation_strategy([1, 1, 1, 2], unique_ratio_threshold=0.6)
    assert picked == "mode"


def test_compute_imputation_value():
    """Verify the fill value produced by each strategy, with and without nulls."""
    # (series, strategy, expected value) triples compared with ``==``.
    equality_cases = [
        ([1, 2, 3], "mean", 2.0),
        ([1.5, 2.5, 3.5], "mean", 2.5),
        ([1, 2, 3, 4], "median", 2.5),
        ([1, 2, 3], "median", 2.0),
        ([1, 1, 2], "mode", 1),
        (["a", "a", "b"], "mode", "a"),
        ([1, None, 3], "mean", 2.0),  # nulls are ignored
    ]
    for series, strategy, expected in equality_cases:
        assert compute_imputation_value(series, strategy) == expected

    # A series holding only nulls yields no fill value.
    only_nulls = compute_imputation_value([None, None], "mean")
    assert only_nulls is None

    # An unknown strategy name is rejected.
    with pytest.raises(ValueError):
        compute_imputation_value([1, 2, 3], "invalid")
def test_fill_nulls_record():
    """Verify record-level null filling against per-column sample data."""
    row = {"a": 1, "b": None, "c": "x"}
    column_samples = {"a": [1, 2, 3], "b": [4, 5, 6], "c": ["x", "y", "x"]}
    all_columns = ["a", "b", "c"]

    # Default strategies: only the null column receives a computed value.
    filled, reports = fill_nulls_record(row, all_columns, column_samples)
    assert filled["a"] == 1
    assert filled["b"] == 5.0  # mean of the "b" samples
    assert filled["c"] == "x"
    assert len(reports) == 3
    for report in reports:
        assert isinstance(report, ImputationReport)

    # An explicit per-column strategy overrides the automatic choice.
    filled, reports = fill_nulls_record(
        row, all_columns, column_samples, strategies={"b": "median"}
    )
    assert filled["b"] == 5.0  # median of the "b" samples

    # Without any samples there is nothing to impute from.
    filled, reports = fill_nulls_record(row, all_columns, {})
    assert filled["b"] is None

    # A requested column that the record lacks still appears in the output.
    filled, reports = fill_nulls_record(row, ["a", "d"], column_samples)
    assert "d" in filled
    assert len(reports) == 2
def test_zscore():
    """Check z-score results for regular inputs, zero sigma and float inputs."""
    cases = [
        ((10, 5, 2), 2.5),  # (10 - 5) / 2
        ((0, 5, 2), -2.5),  # (0 - 5) / 2
        ((10, 5, 0), 0.0),  # zero sigma must not divide by zero
        ((10.5, 5.0, 2.0), 2.75),  # float inputs
    ]
    for args, expected in cases:
        assert zscore(*args) == expected


def test_clip():
    """Check clipping inside, below and above the bounds, and with equal bounds."""
    cases = [
        ((5, 0, 10), 5),
        ((-1, 0, 10), 0),
        ((11, 0, 10), 10),
        ((5.5, 0.0, 10.0), 5.5),
        ((-1.5, 0.0, 10.0), 0.0),
        ((5, 3, 3), 3),  # low == high
    ]
    for args, expected in cases:
        assert clip(*args) == expected


def test_winsorize():
    """Check winsorization inside, below and above the bounds, and with equal bounds."""
    cases = [
        ((5, 0, 10), 5),
        ((-1, 0, 10), 0),
        ((11, 0, 10), 10),
        ((5.5, 0.0, 10.0), 5.5),
        ((5, 3, 3), 3),  # low == high
    ]
    for args, expected in cases:
        assert winsorize(*args) == expected


def test_log1p_safe():
    """Check log1p_safe for regular inputs, values at or below -1, and infinity."""
    assert log1p_safe(0) == 0.0

    # log1p is transcendental: the result for e - 1 lies within rounding
    # distance of 1.0, so compare with a tolerance instead of exact equality.
    assert math.isclose(log1p_safe(math.e - 1), 1.0, rel_tol=1e-12)

    # Negative values greater than -1 are still in the domain of log1p.
    assert math.isclose(log1p_safe(-0.5), math.log1p(-0.5), abs_tol=1e-10)

    # Values below -1 are outside the domain; the input is returned as a float.
    assert log1p_safe(-2) == -2.0

    # Infinity is passed through.
    assert log1p_safe(float("inf")) == float("inf")
def test_robust_percentile_scale():
    """Check percentile scaling for default and custom ranges, clipping and zero width."""
    # (positional arguments, expected value) pairs using default clipping.
    exact_cases = [
        ((5, 0, 10), 0.5),  # midpoint of the default range
        ((5, 0, 10, (0, 100)), 50.0),  # custom output range
        ((0, 0, 10), 0.0),  # lower edge
        ((10, 0, 10), 1.0),  # upper edge
        ((5, 0, 10, (-1, 1)), 0.0),  # symmetric output range
        ((-1, 0, 10), 0.0),  # outlier is clipped to the low percentile
        ((5, 5, 5), 0.5),  # high == low gives the middle of the output range
    ]
    for args, expected in exact_cases:
        assert robust_percentile_scale(*args) == expected

    # With clipping disabled an outlier falls below the output range.
    unclipped = robust_percentile_scale(-1, 0, 10, clip_outliers=False)
    assert unclipped < 0.0
import re
import unicodedata
from typing import Any, Mapping, Optional, Union

Number = Union[int, float]

# Matches every character that is neither a word character nor whitespace.
_PUNCT_RE = re.compile(r"[^\w\s]", re.UNICODE)


def to_lower(s: Optional[str]) -> Optional[str]:
    """Return *s* lower-cased; ``None`` is passed through unchanged."""
    return None if s is None else s.lower()


def strip_whitespace(s: Optional[str]) -> Optional[str]:
    """Return *s* without leading/trailing whitespace; ``None`` is passed through."""
    return None if s is None else s.strip()


def squash_whitespace(s: Optional[str]) -> Optional[str]:
    """Collapse each whitespace run in *s* to one space and trim both ends.

    ``None`` is passed through unchanged.
    """
    if s is None:
        return None
    return re.sub(r"\s+", " ", s).strip()


def normalize_unicode(s: Optional[str], form: str = "NFKC") -> Optional[str]:
    """Return *s* normalized with ``unicodedata.normalize(form, s)``.

    ``None`` is passed through unchanged.
    """
    return None if s is None else unicodedata.normalize(form, s)


def remove_punctuation(s: Optional[str]) -> Optional[str]:
    """Delete every character of *s* that is not a word character or whitespace.

    ``None`` is passed through unchanged.
    """
    if s is None:
        return None
    return _PUNCT_RE.sub("", s)


def map_values(value: Any, mapping: Mapping[Any, Any], default: Any = None) -> Any:
    """Look *value* up in *mapping*, returning *default* when it is absent."""
    return mapping.get(value, default)


def cast_numeric(
    value: Any, on_error: str = "ignore", default: Optional[Number] = None
) -> Optional[Number]:
    """Convert *value* to ``int`` when it is integral, otherwise to ``float``.

    ``None`` and blank strings count as conversion failures. Integers and
    integer-looking strings are converted with ``int`` directly, so values
    beyond 2**53 are not rounded by a detour through ``float``.

    Args:
        value: The value to convert.
        on_error: What to do when conversion fails. ``"default"`` returns
            *default*, ``"none"`` returns ``None``, ``"raise"`` re-raises the
            conversion error, and any other value (including the default
            ``"ignore"``) returns *value* unchanged.
        default: Fallback used only with ``on_error="default"``.

    Returns:
        The converted number, or the fallback selected by *on_error*.

    Raises:
        ValueError, TypeError, ArithmeticError: Only with ``on_error="raise"``.
    """
    try:
        if value is None or (isinstance(value, str) and value.strip() == ""):
            raise ValueError("empty")
        if isinstance(value, int):
            # Also covers bool; int() keeps arbitrary precision.
            return int(value)
        if isinstance(value, str):
            try:
                return int(value)
            except ValueError:
                pass  # not a plain integer literal, e.g. "1.5" or "1e3"
        f = float(value)
        i = int(f)  # raises for inf / nan, which are reported as failures
        return i if i == f else f
    except (TypeError, ValueError, ArithmeticError):
        if on_error == "default":
            return default
        if on_error == "none":
            return None
        if on_error == "raise":
            raise
        return value
def choose_imputation_strategy(
    series: Sequence[Any],
    numeric: Optional[bool] = None,
    skew_threshold: float = 0.75,
    unique_ratio_threshold: float = 0.05,
) -> str:
    """Pick ``"mean"``, ``"median"`` or ``"mode"`` for filling nulls in *series*.

    Args:
        series: Sample values; ``None`` entries are ignored.
        numeric: Whether to treat the data as numeric. When ``None`` it is
            inferred: the data is numeric only if every non-null value is an
            ``int`` or ``float``.
        skew_threshold: Absolute skewness above which ``"median"`` is chosen
            over ``"mean"``.
        unique_ratio_threshold: When the share of distinct values is at or
            below this ratio, ``"mode"`` is chosen even for numeric data.

    Returns:
        ``"mode"`` for empty, non-numeric or low-cardinality data; otherwise
        ``"median"`` for skewed data and ``"mean"`` for the rest.
    """
    observed = [x for x in series if x is not None]
    if not observed:
        return "mode"
    if numeric is None:
        numeric = all(isinstance(x, (int, float)) for x in observed)
    if not numeric:
        return "mode"
    unique_ratio = len(set(observed)) / len(observed)
    if unique_ratio <= unique_ratio_threshold:
        return "mode"
    skew = abs(_numeric_skewness([float(x) for x in observed]))
    return "median" if skew > skew_threshold else "mean"


def compute_imputation_value(series: Sequence[Any], strategy: str) -> Any:
    """Compute the value used to replace nulls in *series*.

    Args:
        series: Sample values; ``None`` entries are ignored.
        strategy: ``"mean"``, ``"median"`` or ``"mode"``.

    Returns:
        The fill value, or ``None`` when *series* has no non-null values (or,
        for ``"mean"``/``"median"``, no ``int``/``float`` values). For
        ``"mode"`` ties are broken by the smallest ``repr`` so the result is
        deterministic.

    Raises:
        ValueError: If *strategy* is not one of the supported names. This is
            checked before looking at the data, so a misspelled strategy is
            reported even when the series is empty.
    """
    if strategy not in ("mean", "median", "mode"):
        raise ValueError(f"Unknown strategy: {strategy}")

    clean = [x for x in series if x is not None]
    if not clean:
        return None

    if strategy == "mode":
        try:
            counts = Counter(clean)
        except TypeError:
            # Unhashable values (lists, dicts) cannot be dict keys; count them
            # by repr instead and return the first value carrying the winner.
            reprs = [repr(x) for x in clean]
            repr_counts = Counter(reprs)
            top = max(repr_counts.values())
            winner = min(r for r, c in repr_counts.items() if c == top)
            return clean[reprs.index(winner)]
        top = max(counts.values())
        # deterministic tie-break
        return min((k for k, c in counts.items() if c == top), key=repr)

    nums = [float(x) for x in clean if isinstance(x, (int, float))]
    if not nums:
        return None
    return mean(nums) if strategy == "mean" else median(nums)


def fill_nulls_column(
    series: Sequence[Any],
    explicit_strategy: Optional[str] = None,
    numeric: Optional[bool] = None,
    **choose_kwargs
) -> Tuple[List[Any], ImputationReport]:
    """Replace every ``None`` in *series* with a single imputed value.

    Args:
        series: Column values to fill.
        explicit_strategy: Strategy to use; when falsy one is chosen with
            ``choose_imputation_strategy``.
        numeric: Forwarded to ``choose_imputation_strategy``.
        **choose_kwargs: Extra keyword arguments forwarded to
            ``choose_imputation_strategy``.

    Returns:
        A new list with nulls replaced, and a report whose ``field`` is the
        empty string and which records the strategy and the value used.
    """
    strategy = explicit_strategy or choose_imputation_strategy(
        series, numeric=numeric, **choose_kwargs
    )
    fill_value = compute_imputation_value(series, strategy)
    filled = [fill_value if x is None else x for x in series]
    return filled, ImputationReport("", strategy, fill_value)
import math
from typing import Sequence, Tuple, Union

Number = Union[int, float]


def minmax_scale(
    x: Number,
    data_min: Number,
    data_max: Number,
    out_range: Tuple[Number, Number] = (0.0, 1.0),
) -> float:
    """Map *x* linearly from ``[data_min, data_max]`` onto *out_range*.

    When ``data_max == data_min`` the midpoint of *out_range* is returned, so
    no division by zero occurs. Values outside the data range are not clipped.
    """
    a, b = out_range
    if data_max == data_min:
        return float(a + (b - a) / 2.0)
    return ((float(x) - data_min) / (data_max - data_min)) * (b - a) + a


def zscore(x: Number, mu: float, sigma: float) -> float:
    """Return ``(x - mu) / sigma``, or ``0.0`` when *sigma* is zero."""
    return 0.0 if sigma == 0 else (float(x) - mu) / sigma


def clip(x: Number, low: Number, high: Number) -> Number:
    """Limit *x* to the interval ``[low, high]``."""
    return max(low, min(high, x))


def winsorize(x: Number, low_value: Number, high_value: Number) -> Number:
    """Limit *x* to ``[low_value, high_value]``; same computation as ``clip``."""
    return clip(x, low_value, high_value)


def log1p_safe(x: Number) -> float:
    """Return ``log(1 + x)``, or *x* itself as a float when that is undefined.

    ``math.log1p`` is only defined for values greater than -1. Inputs at or
    below -1 are returned unchanged (as ``float``) instead of raising. The
    boundary is tested explicitly rather than by catching the domain error, so
    unrelated failures are not hidden.
    """
    value = float(x)
    if value <= -1.0:
        return value
    return math.log1p(value)


def bucketize(x: Number, edges: Sequence[Number]) -> int:
    """Return the index of the first edge that is greater than or equal to *x*.

    Edges are scanned in the order given. If *x* exceeds every edge (or
    *edges* is empty) ``len(edges)`` is returned.
    """
    for i, e in enumerate(edges):
        if x <= e:
            return i
    return len(edges)


def robust_percentile_scale(
    x: Number,
    p_low_value: Number,
    p_high_value: Number,
    out_range: Tuple[Number, Number] = (0.0, 1.0),
    clip_outliers: bool = True,
) -> float:
    """Map *x* linearly from ``[p_low_value, p_high_value]`` onto *out_range*.

    Args:
        x: Value to scale.
        p_low_value: Input value mapped to the start of *out_range*.
        p_high_value: Input value mapped to the end of *out_range*.
        out_range: Target interval ``(a, b)``.
        clip_outliers: When true, *x* is first clipped to the input interval,
            so the result stays inside *out_range*.

    Returns:
        The scaled value, or the midpoint of *out_range* when the two input
        bounds are equal.
    """
    a, b = out_range
    lo, hi = float(p_low_value), float(p_high_value)
    if clip_outliers:
        x = clip(float(x), lo, hi)
    width = hi - lo
    if width == 0:
        return float(a + (b - a) / 2.0)
    return ((float(x) - lo) / width) * (b - a) + a
Key types: - - `HttpStream` - Base class for HTTP API streams - - `DefaultStream` - Used with declarative sources - - Concurrent streams in `airbyte_cdk.sources.streams.concurrent` - -### Data Flow -1. Sources expose one or more Stream implementations -2. Streams define schema, state management, and record extraction -3. Records flow through the Airbyte protocol via standardized message types - -## Development Conventions - -### Testing Patterns - -- Unit tests use pytest with scenarios pattern (`unit_tests/sources/**/test_*.py`) -- Mock HTTP responses with `HttpMocker` and response builders -- Standard test suite base classes in `airbyte_cdk.test.standard_tests` -- Use `@pytest.mark.parametrize` for test variations - -### Source Implementation - -- Prefer declarative manifests using `SourceDeclarativeManifest` for simple API connectors -- Extend base classes for custom logic: - ```python - from airbyte_cdk.sources import AbstractSource - from airbyte_cdk.sources.streams import Stream - - class MySource(AbstractSource): - def check_connection(...): - # Verify credentials/connectivity - - def streams(self, config): - return [MyStream(config)] - ``` - -### State Management - -- Use `ConnectorStateManager` for handling incremental sync state -- Implement cursor fields in streams for incremental syncs -- State is persisted as JSON-serializable objects - -## Common Workflows - -### Building a New Connector - -1. Start with [Connector Builder UI](https://docs.airbyte.com/connector-development/connector-builder-ui/overview) -2. For complex cases, use low-code CDK with manifest files -3. 
Custom Python implementation only when necessary - -### Testing - -```bash -pytest unit_tests/ # Run all tests -pytest unit_tests/sources/my_connector/ # Test specific connector -``` - -### Dependencies - -- Manage with Poetry (`pyproject.toml`) -- Core requirements locked in `poetry.lock` -- Optional features via extras in `pyproject.toml` - -## Integration Points - -- Airbyte Protocol: Messages must conform to protocol models in `airbyte_cdk.models` -- External APIs: Use `HttpStream` with proper rate limiting -- Vector DBs: Implement destination logic using `destinations.vector_db_based` - -## Key Files - -- `airbyte_cdk/sources/abstract_source.py`: Base source implementation -- `airbyte_cdk/sources/streams/http/http.py`: HTTP stream base class -- `airbyte_cdk/sources/declarative/`: Low-code CDK components -- `unit_tests/sources/`: Test examples and patterns \ No newline at end of file