Replace SQLAlchemy with ibis-framework for all database operations #66
daniel-thom wants to merge 2 commits into main from
Conversation
Replace the SQLAlchemy ORM with ibis-framework to provide a cleaner multi-backend abstraction for DuckDB, SQLite, and Spark. This is a clean API break: engine -> backend, Connection params removed, ibis expressions replace SQLAlchemy select/join chains.

Key changes:
- New src/chronify/ibis/ module with IbisBackend ABC and DuckDB/SQLite/Spark implementations
- Remove SQLAlchemy, pyhive vendor code, Hive support, and related dependencies
- Migrate all source modules (store, mappers, checker, converters) to the ibis API
- Migrate all tests to use backend fixtures instead of engine fixtures
- Add ibis-framework[duckdb,sqlite] dependency, pyspark >= 4.0 for the spark extra

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
This PR migrates Chronify’s database layer from SQLAlchemy/Hive-specific code to an ibis-based backend abstraction, aiming to support multiple execution engines (DuckDB, SQLite, Spark) with a unified API.
Changes:
- Introduces src/chronify/ibis/ with backend implementations and I/O helpers (read_query, write_table, Parquet helpers).
- Refactors mappers/checkers/time zone utilities and tests to use IbisBackend instead of sqlalchemy.Engine/MetaData.
- Removes SQLAlchemy + Hive/pyhive vendor code and updates project dependencies accordingly.
Reviewed changes
Copilot reviewed 53 out of 56 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| tests/test_time_zone_localizer.py | Switches timezone localization tests from SQLAlchemy engine fixtures to IbisBackend fixtures. |
| tests/test_time_zone_converter.py | Switches timezone conversion tests to the ibis backend API. |
| tests/test_time_series_checker.py | Updates timestamp-checker tests to call check_timestamps(backend, table_name, schema). |
| tests/test_models.py | Updates dtype tests to use ibis datatypes (ibis.expr.datatypes). |
| tests/test_mapper_representative_time_to_datetime.py | Refactors representative-time mapper tests to use read_query/write_table. |
| tests/test_mapper_index_time_to_datetime.py | Refactors index-time mapper tests to use ibis backend querying. |
| tests/test_mapper_datetime_to_datetime.py | Updates datetime-to-datetime mapping tests and write-table duplicate-config behavior expectations. |
| tests/test_mapper_column_representative_to_datetime.py | Updates Store fixture + mapping assertions to use ibis querying instead of SQLAlchemy. |
| tests/test_csv_parser.py | Updates parser tests to construct Store(backend=...). |
| tests/test_checker_representative_time.py | Updates checker tests to use ibis backend and new check_timestamps signature. |
| tests/conftest.py | Replaces SQLAlchemy engine fixtures with ibis backend fixtures (iter_backends, make_backend). |
| src/chronify/utils/sqlalchemy_view.py | Removes SQLAlchemy view DDL utilities. |
| src/chronify/utils/sqlalchemy_table.py | Removes SQLAlchemy “CREATE TABLE AS SELECT” utilities and SQLite timestamp workaround. |
| src/chronify/time_zone_localizer.py | Refactors timezone localization to backend-driven querying + mapping. |
| src/chronify/time_zone_converter.py | Refactors timezone conversion to backend-driven querying + mapping. |
| src/chronify/time_series_mapper.py | Updates public mapping entrypoint to accept IbisBackend. |
| src/chronify/time_series_mapper_representative.py | Refactors mapper to fetch distinct time zones via ibis expressions. |
| src/chronify/time_series_mapper_index_time.py | Refactors index-time mapper to fetch distinct time zones via ibis expressions. |
| src/chronify/time_series_mapper_datetime.py | Refactors datetime mapper base wiring to use IbisBackend. |
| src/chronify/time_series_mapper_column_representative_to_datetime.py | Replaces SQLAlchemy-based intermediate table creation with ibis table creation + joins. |
| src/chronify/time_series_mapper_base.py | Reimplements mapping application as ibis join/aggregate + Parquet output support. |
| src/chronify/time_series_checker.py | Refactors timestamp checks to use backend SQL execution and ibis selects. |
| src/chronify/sqlalchemy/functions.py | Removes SQLAlchemy read/write optimization helpers. |
| src/chronify/schema_manager.py | Refactors schema persistence to a backend-managed table and DataFrame inserts. |
| src/chronify/models.py | Replaces SQLAlchemy dtype handling with ibis dtype handling and new conversion helpers. |
| src/chronify/ibis/base.py | Adds IbisBackend ABC and common helpers (raw SQL exec, “transaction” cleanup). |
| src/chronify/ibis/__init__.py | Exposes backends and make_backend factory. |
| src/chronify/ibis/duckdb_backend.py | Implements DuckDB backend wrapper. |
| src/chronify/ibis/sqlite_backend.py | Implements SQLite backend wrapper (including insert + Parquet handling). |
| src/chronify/ibis/spark_backend.py | Implements Spark backend wrapper and Spark-specific datetime preparation. |
| src/chronify/ibis/functions.py | Adds backend-agnostic read/write utilities (table/query, parquet, datetime conversions). |
| src/chronify/ibis/types.py | Adds DuckDB/ibis type conversion and dataframe schema inference helpers. |
| src/chronify/hive_functions.py | Removes Hive materialized-view workaround. |
| src/chronify/csv_io.py | Updates CSV dtype mapping to use ibis/duckdb conversion. |
| src/chronify/__init__.py | Removes pyhive/TCLIService module injection. |
| src/chronify/_vendor/kyuubi/* | Removes vendored pyhive/kyuubi code. |
| pyproject.toml | Drops SQLAlchemy/pyhive deps and adds ibis + pyspark extra. |
```diff
 if check_mapped_timestamps:
     if output_file is not None:
         output_file = to_path(output_file)
-        with engine.begin() as conn:
-            create_view_from_parquet(conn, to_schema.name, output_file)
-        metadata.reflect(engine, views=True)
+        create_view_from_parquet(backend, output_file, to_schema.name)
         created_tmp_view = True
-    mapped_table = Table(to_schema.name, metadata)
-    with engine.connect() as conn:
-        try:
-            check_timestamps(
-                conn,
-                mapped_table,
-                to_schema,
-                leap_day_adjustment=data_adjustment.leap_day_adjustment,
-            )
-        except Exception:
-            logger.exception(
-                "check_timestamps failed on mapped table {}. Drop it",
-                to_schema.name,
-            )
-            if output_file is None:
-                table_type = "VIEW" if engine.name == "hive" else "TABLE"
-                conn.execute(text(f"DROP {table_type} {to_schema.name}"))
-            raise
+    try:
+        check_timestamps(
+            backend,
+            to_schema.name,
+            to_schema,
+            leap_day_adjustment=data_adjustment.leap_day_adjustment,
+        )
+    except Exception:
+        logger.exception(
+            "check_timestamps failed on mapped table {}. Drop it",
+            to_schema.name,
+        )
+        if output_file is None:
+            backend.drop_table(to_schema.name)
+        raise
 finally:
-    with engine.begin() as conn:
-        table_type = "view" if engine.name == "hive" else "table"
-        conn.execute(text(f"DROP {table_type} IF EXISTS {mapping_schema.name}"))
-        if created_tmp_view:
-            conn.execute(text(f"DROP VIEW IF EXISTS {to_schema.name}"))
-            metadata.remove(Table(to_schema.name, metadata))
-    metadata.remove(Table(mapping_schema.name, metadata))
-    metadata.reflect(engine, views=True)
+    if backend.has_table(mapping_schema.name):
+        backend.drop_table(mapping_schema.name)
+    if created_tmp_view:
+        backend.drop_view(to_schema.name)
```
apply_mapping always cleans up the temporary object created by create_view_from_parquet via backend.drop_view(to_schema.name). This is incorrect for the SQLite backend, where SQLiteBackend.create_view_from_parquet currently materializes a table (it loads the Parquet into a table) rather than creating a view. That means the cleanup path can fail (or silently leave the temp table behind) and can also leave incorrect state when check_mapped_timestamps=True and output_file is used. Consider having create_view_from_parquet return the created object type (table vs view) or standardize backends to always create a view-like object, then drop accordingly (e.g., call drop_table for SQLite).
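A minimal sketch of the reviewer's first option. All class and method names below are illustrative, not chronify's actual API: `create_view_from_parquet` reports which kind of object it created, and the caller issues the matching DROP.

```python
from enum import Enum


class ObjectType(Enum):
    TABLE = "table"
    VIEW = "view"


class DuckDBBackendSketch:
    def create_view_from_parquet(self, path: str, name: str) -> ObjectType:
        # DuckDB can put a real view over read_parquet(...).
        return ObjectType.VIEW


class SQLiteBackendSketch:
    def create_view_from_parquet(self, path: str, name: str) -> ObjectType:
        # SQLite materializes the Parquet contents into a physical table.
        return ObjectType.TABLE


def drop_statement(name: str, created: ObjectType) -> str:
    # The caller can now drop exactly what was created, instead of
    # unconditionally assuming a view.
    if created is ObjectType.VIEW:
        return f"DROP VIEW IF EXISTS {name}"
    return f"DROP TABLE IF EXISTS {name}"
```

With this shape, the cleanup path in apply_mapping would record the returned ObjectType alongside created_tmp_view and drop accordingly.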
src/chronify/ibis/duckdb_backend.py
Outdated
```python
self._connection.raw_sql(
    f"CREATE VIEW {name} AS SELECT * FROM read_parquet('{path}')"
)
```
DuckDBBackend.create_view_from_parquet uses read_parquet('{path}') directly. The old implementation supported directory inputs (e.g., partitioned datasets) by globbing path/**/*.parquet. With the current code, passing a directory path is likely to fail or not read all files. Consider detecting Path(path).is_dir() and using a DuckDB glob ({path}/**/*.parquet) or DuckDB’s directory-reading behavior explicitly, so Store.create_view_from_parquet and timestamp checks work for partitioned outputs.
Suggested change:

```diff
-self._connection.raw_sql(
-    f"CREATE VIEW {name} AS SELECT * FROM read_parquet('{path}')"
+parquet_path = Path(path)
+read_path = (
+    str(parquet_path / "**" / "*.parquet").replace("\\", "/")
+    if parquet_path.is_dir()
+    else str(parquet_path).replace("\\", "/")
+)
+self._connection.raw_sql(
+    f"CREATE VIEW {name} AS SELECT * FROM read_parquet('{read_path}')"
```
```diff
 def _create_schemas_table(self) -> None:
+    import ibis
+
+    schema = ibis.schema({"name": "string", "schema": "string"})
+    self._backend.create_table(self.SCHEMAS_TABLE, schema=schema)

 def add_schema(self, schema: TableSchema) -> None:
     """Add the schema to the store."""
-    table = self._get_schema_table()
-    stmt = insert(table).values(name=schema.name, schema=schema.model_dump_json())
-    conn.execute(stmt)
     # If there is a rollback after this addition to cached, things _should_ still be OK.
     # The table will be deleted and any attempted reads will fail with an error.
     # There will be a stale entry in cache, but it will be overwritten if the user ever
     # adds a new table with the same name.
+    df = pd.DataFrame({"name": [schema.name], "schema": [schema.model_dump_json()]})
+    self._backend.insert(self.SCHEMAS_TABLE, df)
     self._cache[schema.name] = schema
```
The schemas table is created without any uniqueness constraint on name (previously it was unique=True). If add_schema is called twice for the same table name, duplicates will be inserted and rebuild_cache will later fail on assert name not in self._cache. Consider enforcing uniqueness at table creation time (primary key / unique index) and/or making add_schema an upsert (delete-then-insert) to keep the table consistent.
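Both halves of the suggestion can be sketched with plain sqlite3 (the table and index names here are illustrative): a unique index rejects duplicates at the database level, and a delete-then-insert makes add_schema an upsert.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE schemas (name TEXT, schema TEXT)")
# A unique index makes duplicate names fail loudly instead of silently
# accumulating rows that later break rebuild_cache.
conn.execute("CREATE UNIQUE INDEX idx_schemas_name ON schemas (name)")


def add_schema(name: str, schema_json: str) -> None:
    # Delete-then-insert turns a repeated add into an overwrite, so the
    # table never holds two rows for the same name.
    with conn:
        conn.execute("DELETE FROM schemas WHERE name = ?", (name,))
        conn.execute("INSERT INTO schemas VALUES (?, ?)", (name, schema_json))


add_schema("t1", '{"version": 1}')
add_schema("t1", '{"version": 2}')  # replaces the first row
```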
```diff
 def remove_schema(self, name: str) -> None:
     """Remove the schema from the store."""
-    table = self._get_schema_table()
-    if self._engine.name == "hive":
-        # Hive/Spark doesn't support delete, so we have to re-create the table without
-        # this one entry
-        stmt = select(table).where(table.c.name != name)
-        rows = conn.execute(stmt).fetchall()
-        conn.execute(text(f"DROP TABLE {self.SCHEMAS_TABLE}"))
-        conn.execute(text(f"CREATE TABLE {self.SCHEMAS_TABLE}(name STRING, schema STRING)"))
-        for row in rows:
-            params = {"name": row[0], "schema": row[1]}
-            conn.execute(
-                text(f"INSERT INTO {self.SCHEMAS_TABLE} VALUES(:name, :schema)"),
-                params,
-            )
-    else:
-        stmt2 = delete(table).where(table.c["name"] == name)
-        conn.execute(stmt2)
-
-    self._cache.pop(name)
+    self._backend.execute_sql(f"DELETE FROM {self.SCHEMAS_TABLE} WHERE name = '{name}'")
+    self._cache.pop(name, None)
```
remove_schema builds a DELETE statement via string interpolation (... WHERE name = '{name}'). This breaks when name contains quotes and also creates an avoidable SQL-injection vector. Use parameterized execution (backend-specific) or at least escape/quote identifiers safely; alternatively, implement a delete_where helper on IbisBackend for safe deletes across backends.
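The parameterized form can be shown with plain sqlite3 (chronify's backends would need an equivalent bound-parameter path through ibis): a bound `?` handles any characters in the name, where an f-string would produce broken SQL for a quoted name.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE schemas (name TEXT, schema TEXT)")
conn.execute("INSERT INTO schemas VALUES (?, ?)", ("o'brien_load", "{}"))


def remove_schema(name: str) -> None:
    # f-string interpolation would render ... WHERE name = 'o'brien_load'
    # (a syntax error) and invites injection; a bound parameter handles
    # quotes and any other characters safely.
    with conn:
        conn.execute("DELETE FROM schemas WHERE name = ?", (name,))


remove_schema("o'brien_load")
```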
```python
@pytest.fixture(params=BACKEND_NAMES)
def iter_backends(request) -> Generator[IbisBackend, None, None]:
    """Return an iterable of in-memory backends to test."""
    yield make_backend(request.param)
```
iter_backends yields a backend without disposing it. Since the backends maintain open connections, this can leak resources across tests (especially for file-based backends). Add a teardown after yield (e.g., backend.dispose()).
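The teardown pattern looks like this; `_StubBackend` and `make_backend` below are stand-ins so the lifecycle is self-contained, and in tests/conftest.py the generator would be wrapped with `@pytest.fixture(params=BACKEND_NAMES)`.

```python
class _StubBackend:
    """Stand-in for an IbisBackend; only the lifecycle matters here."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.disposed = False

    def dispose(self) -> None:
        self.disposed = True


def make_backend(name: str) -> _StubBackend:
    # Stand-in for chronify's backend factory.
    return _StubBackend(name)


def backend_fixture(name: str):
    # Fixture body: everything after the yield is teardown, and the
    # try/finally guarantees dispose() runs even if the test body raises.
    backend = make_backend(name)
    try:
        yield backend
    finally:
        backend.dispose()
```

The same shape fixes iter_backends_file; file-based backends benefit most, since disposing releases the lock on the database file.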
```diff
 @pytest.fixture(params=BACKEND_NAMES)
 def iter_backends_file(request, tmp_path) -> Generator[tuple[IbisBackend, str], None, None]:
     """Return an iterable of file-based backends to test."""
     file_path = tmp_path / "store.db"
-    url = engine["url"].replace(":memory:", str(file_path))
-    yield create_engine(url, *engine["connect_args"], **engine["kwargs"])
+    backend = make_backend(request.param, database=str(file_path))
+    yield backend, request.param
```
iter_backends_file yields a file-backed backend without disposing it. On Windows in particular, this can keep the DB file locked and interfere with cleanup. Add teardown logic after yield to call backend.dispose().
```diff
 def _intermediate_mapping_ymdp_to_ymdh(self) -> TableSchema:
     """Convert ymdp to ymdh for intermediate mapping."""
     mapping_table_name = "intermediate_ymdp_to_ymdh"
     period_col = self._from_time_config.hour_columns[0]
-    with self._engine.begin() as conn:
-        periods = read_database(
-            f"SELECT DISTINCT {period_col} FROM {self._from_schema.name}",
-            conn,
-            self._from_time_config,
-        )
-        df_mapping = generate_period_mapping(periods.iloc[:, 0])
-        write_database(
-            df_mapping,
-            conn,
-            mapping_table_name,
-            [self._from_time_config],
-            if_table_exists="replace",
-            scratch_dir=scratch_dir,
-        )
-
-    self._metadata.reflect(self._engine)
-    ymdp_table = sa.Table(self._from_schema.name, self._metadata)
-    mapping_table = sa.Table(mapping_table_name, self._metadata)
+    # Get distinct periods
+    df_periods = self._backend.execute_sql_to_df(
+        f"SELECT DISTINCT {period_col} FROM {self._from_schema.name}"
+    )
+    df_mapping = generate_period_mapping(df_periods.iloc[:, 0])
+    write_table(
+        self._backend,
+        df_mapping,
+        mapping_table_name,
+        [self._from_time_config],
+        if_exists="fail",
+    )

-    select_statement = [col for col in ymdp_table.columns if col.name != period_col]
-    select_statement.append(mapping_table.c["hour"])
-    query = (
-        sa.select(*select_statement)
-        .select_from(ymdp_table)
-        .join(mapping_table, ymdp_table.c[period_col] == mapping_table.c["from_period"])
-    )
+    # Build the join query using ibis
+    ymdp_table = self._backend.table(self._from_schema.name)
+    mapping_table = self._backend.table(mapping_table_name)
+
+    # Select all columns from ymdp except the period column, plus the hour column from mapping
+    ymdp_cols = [c for c in ymdp_table.columns if c != period_col]
+    select_exprs = [ymdp_table[c] for c in ymdp_cols] + [mapping_table["hour"]]
+
+    joined = ymdp_table.join(
+        mapping_table, ymdp_table[period_col] == mapping_table["from_period"]
+    )
+    result = joined.select(select_exprs)

     intermediate_ymdh_table_name = "intermediate_Ymdh"
-    create_table(intermediate_ymdh_table_name, query, self._engine, self._metadata)
+    self._backend.create_table(intermediate_ymdh_table_name, result)
+
+    # Clean up mapping table
+    self._backend.drop_table(mapping_table_name)
```
_intermediate_mapping_ymdp_to_ymdh uses fixed, global table names (intermediate_ymdp_to_ymdh, intermediate_Ymdh) and creates intermediate tables outside of a try/finally. If an exception occurs between creating these objects and the cleanup calls, the tables can be left behind and subsequent mappings in the same backend will fail (because create_table(..., overwrite=False) and if_exists="fail"). Consider generating unique temp names (e.g., with a UUID) and/or wrapping creation/cleanup in try/finally (or using IbisBackend.transaction) to guarantee cleanup on error.
src/chronify/ibis/sqlite_backend.py
Outdated
```python
def create_view_from_parquet(self, path: str, name: str) -> ir.Table:
    # SQLite can't read Parquet natively. Load into a table instead.
    df = pd.read_parquet(path)
    return self.create_table(name, obj=df)
```
SQLiteBackend.create_view_from_parquet claims to create a view, but it actually reads the Parquet file into a physical table via create_table. This mismatch matters because callers (e.g., apply_mapping / Store.create_view_from_parquet) may attempt to drop a view afterward, leaving the table behind. Either implement it as an actual view (if possible), or rename/adjust the interface so callers can correctly clean up the created object type.
src/chronify/ibis/duckdb_backend.py
Outdated
```python
con = self._connection.con  # raw duckdb connection
con.register("__insert_df", data)
try:
    con.execute(f"INSERT INTO {name} SELECT * FROM __insert_df")
```
insert relies on positional column order: it registers the DataFrame and runs INSERT INTO {name} SELECT * FROM __insert_df. If the DataFrame column order differs from the destination table’s column order, values will be inserted into the wrong columns. Consider explicitly selecting/reordering columns to match the destination table (and/or specifying the column list in the INSERT) before executing.
| con = self._connection.con # raw duckdb connection | |
| con.register("__insert_df", data) | |
| try: | |
| con.execute(f"INSERT INTO {name} SELECT * FROM __insert_df") | |
| target_columns = list(self.table(name).schema().names) | |
| missing_columns = [column for column in target_columns if column not in data.columns] | |
| extra_columns = [column for column in data.columns if column not in target_columns] | |
| if missing_columns or extra_columns: | |
| raise ValueError( | |
| f"Insert data columns do not match table {name!r}. " | |
| f"Missing columns: {missing_columns}. Extra columns: {extra_columns}." | |
| ) | |
| ordered_data = data.loc[:, target_columns] | |
| quoted_columns = ", ".join(f'"{column}"' for column in target_columns) | |
| con = self._connection.con # raw duckdb connection | |
| con.register("__insert_df", ordered_data) | |
| try: | |
| con.execute( | |
| f"INSERT INTO {name} ({quoted_columns}) " | |
| f"SELECT {quoted_columns} FROM __insert_df" | |
| ) |
```python
def insert(self, name: str, data: pd.DataFrame) -> None:
    # Use raw SQLite cursor for parameterized inserts
    con = self._connection.con  # raw sqlite3 connection
    table = self._connection.table(name)
    columns = table.columns
    placeholders = ", ".join(["?"] * len(columns))
    col_list = ", ".join(columns)
    sql = f"INSERT INTO {name} ({col_list}) VALUES ({placeholders})"

    arrow_table = pa.Table.from_pandas(data)
    cursor = con.cursor()
    for batch in arrow_table.to_batches():
        rows = [
            tuple(row[col].as_py() for col in range(batch.num_columns))
            for row in zip(*[batch.column(i) for i in range(batch.num_columns)])
        ]
        cursor.executemany(sql, rows)
    con.commit()
```
insert builds the INSERT statement with an explicit (col_list) but then constructs row tuples purely by positional order from the incoming DataFrame/Arrow table. If data.columns is not in the same order as table.columns, values will be bound to the wrong columns. Reorder data = data[columns] (or build rows by column name) before batching, so inserts are column-safe.
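The reordering step the reviewer asks for is a one-liner with pandas; `rows_in_table_order` is an illustrative helper, not chronify code, and it fails loudly (KeyError) if the DataFrame is missing a destination column.

```python
import pandas as pd


def rows_in_table_order(data: pd.DataFrame, table_columns: list[str]) -> list[tuple]:
    # Reorder the DataFrame by the destination table's column names so
    # positional binding cannot scramble values; .loc raises KeyError if
    # any destination column is absent from the data.
    ordered = data.loc[:, table_columns]
    return list(ordered.itertuples(index=False, name=None))


# DataFrame column order deliberately differs from the table's order.
df = pd.DataFrame({"value": [1.0], "timestamp": ["2020-01-01"]})
rows = rows_in_table_order(df, ["timestamp", "value"])
```

Feeding `rows` to `cursor.executemany(sql, rows)` then binds each value to the column named in the INSERT's explicit column list.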
- Fix create_view_from_parquet to return ObjectType so callers drop the correct object type (SQLite creates a table, not a view)
- Handle directory paths in DuckDB create_view_from_parquet for partitioned parquet datasets
- Add unique index on schemas table name column to prevent duplicates
- Escape single quotes in remove_schema to prevent SQL injection
- Add dispose() teardown to test fixtures to prevent resource leaks
- Use explicit column ordering in DuckDB insert to prevent column mismatch when DataFrame column order differs from table
- Clean up schema on failed ingestion rollback in all Store methods

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>