Merged (18 commits)
Commits (all authored by laughingman7743 on Feb 28, 2026):

- `edcea63` Add result_set_type_hints parameter for precise complex type conversion
- `ce38540` Add pull request template
- `5d05791` Refactor: Extract type parsing and typed conversion into parser.py
- `2d0f222` Update test expected values for native format string conversion behavior
- `9cd535f` Move parser tests from test_converter.py to test_parser.py
- `cd9230e` Reorder parser.py: move TypeNode after _split_array_items
- `2bc66a6` Add test conventions to CLAUDE.md
- `40a6d93` Add integration test for result_set_type_hints with default cursor
- `a2155a6` Rename _split_top_level to _split_type_args for clarity
- `73ce2f0` Fix edge cases in typed conversion: backward compat, JSON nesting, nu…
- `13a3805` Clean up PR: fix docstrings, remove unused param, improve type annota…
- `5f5e6d5` Use name-based type matching in JSON struct conversion path
- `3140978` Add documentation for result_set_type_hints feature
- `2ff410c` Optimize hot path performance for type hint conversion
- `311f039` Skip type hint path when no complex type columns exist
- `989aab2` Fix nested array JSON detection and pre-compute column metadata
- `ab33667` Add Hive syntax support, type aliases, parse fallback, and index-base…
- `add20b4` Fix parser robustness: matching paren lookup and unnamed struct split
5 changes: 5 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
@@ -0,0 +1,5 @@
## WHAT
<!-- (Write the change being made with this pull request) -->

## WHY
<!-- (Write the motivation why you submit this pull request) -->
13 changes: 13 additions & 0 deletions CLAUDE.md
@@ -42,6 +42,19 @@ export $(cat .env | xargs) && uv run pytest tests/pyathena/test_file.py -v
- Use pytest fixtures from `conftest.py`
- New features require tests; changes to SQLAlchemy dialects must pass `make test-sqla`

#### Test Conventions
- **Class-based tests** for integration tests that use fixtures (cursors, engines): `class TestCursor:` with methods like `def test_fetchone(self, cursor):`
- **Standalone functions** for unit tests of pure logic (converters, parsers, utils): `def test_to_struct_json_formats(input_value, expected):`
- Test file naming mirrors source: `pyathena/parser.py` → `tests/pyathena/test_parser.py`
- **Fixtures**: Cursor/engine fixtures are defined in `conftest.py` and injected by name (e.g., `cursor`, `engine`, `async_cursor`). Use `indirect=True` parametrization to pass connection options:
```python
@pytest.mark.parametrize("engine", [{"driver": "rest"}], indirect=True)
def test_query(self, engine):
engine, conn = engine
```
- **Parametrize** with `@pytest.mark.parametrize(("input", "expected"), [...])` for data-driven tests
- **Integration tests** (need AWS) use cursor/engine fixtures with real Athena queries; **unit tests** (no AWS) call functions directly with test data
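
As an illustration of the standalone-function style with parametrized data, a unit test might look like the following (the helper under test is a hypothetical stand-in, not a function from this codebase):

```python
import pytest


def outer_type_name(type_sig: str) -> str:
    # Hypothetical helper: return the outermost type token of a DDL signature.
    return type_sig.split("(", 1)[0]


@pytest.mark.parametrize(
    ("input_value", "expected"),
    [
        ("array(integer)", "array"),
        ("map(varchar, integer)", "map"),
        ("row(a integer, b integer)", "row"),
    ],
)
def test_outer_type_name(input_value, expected):
    assert outer_type_name(input_value) == expected
```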

## Architecture — Key Design Decisions

These are non-obvious conventions that can't be discovered by reading code alone.
106 changes: 106 additions & 0 deletions docs/usage.md
@@ -389,6 +389,112 @@ The `on_start_query_execution` callback is supported by the following cursor types
Note: `AsyncCursor` and its variants do not support this callback as they already
return the query ID immediately through their different execution model.

## Type hints for complex types

*New in version 3.30.0.*

The Athena API does not return element-level type information for complex types
(array, map, row/struct). PyAthena parses the string representation returned by
Athena, but without type metadata the converter can only apply heuristics, which
may produce incorrect Python types for nested values (e.g. integers left as strings
inside a struct).

The `result_set_type_hints` parameter solves this by letting you provide Athena DDL
type signatures for specific columns. The converter then uses precise, recursive
type-aware conversion instead of heuristics.

```python
from pyathena import connect

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute(
"SELECT col_array, col_map, col_struct FROM one_row_complex",
result_set_type_hints={
"col_array": "array(integer)",
"col_map": "map(integer, integer)",
"col_struct": "row(a integer, b integer)",
},
)
row = cursor.fetchone()
# col_struct values are now integers, not strings:
# {"a": 1, "b": 2} instead of {"a": "1", "b": "2"}
```

Column name matching is case-insensitive. Type hints support arbitrarily nested types:

```python
cursor.execute(
"""
SELECT CAST(
ROW(ROW('2024-01-01', 123), 4.736, 0.583)
AS ROW(header ROW(stamp VARCHAR, seq INTEGER), x DOUBLE, y DOUBLE)
) AS positions
""",
result_set_type_hints={
"positions": "row(header row(stamp varchar, seq integer), x double, y double)",
},
)
row = cursor.fetchone()
positions = row[0]
# positions["header"]["seq"] == 123 (int, not "123")
# positions["x"] == 4.736 (float, not "4.736")
```

### Hive-style syntax

You can paste type signatures from Hive DDL or `DESCRIBE TABLE` output directly.
Hive-style angle brackets and colons are automatically converted to Trino-style syntax:

```python
# Both are equivalent:
result_set_type_hints={"col": "array(struct(a integer, b varchar))"} # Trino
result_set_type_hints={"col": "array<struct<a:int,b:varchar>>"} # Hive
```

The `int` alias is also supported and resolves to `integer`.
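
As a rough sketch of what this normalization involves (illustrative only, not PyAthena's actual implementation):

```python
import re


def hive_to_trino(type_sig: str) -> str:
    """Illustrative sketch of Hive-to-Trino type signature rewriting.

    Converts angle brackets to parentheses, struct field colons to
    spaces, and the bare "int" alias to "integer".
    """
    s = type_sig.replace("<", "(").replace(">", ")")
    s = s.replace(":", " ")  # "a:int" -> "a int"
    return re.sub(r"\bint\b", "integer", s)  # alias resolution
```

For example, `hive_to_trino("array<struct<a:int,b:varchar>>")` yields `"array(struct(a integer,b varchar))"`.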

### Index-based hints for duplicate column names

When a query produces columns with the same alias (e.g. `SELECT a AS x, b AS x`),
name-based hints cannot distinguish between them. Use integer keys to specify hints
by zero-based column position:

```python
cursor.execute(
"SELECT a AS x, b AS x FROM my_table",
result_set_type_hints={
0: "array(integer)", # first "x" column
1: "map(varchar, integer)", # second "x" column
},
)
```

Integer (index-based) hints take priority over string (name-based) hints for the same
column. You can mix both styles in the same dictionary.

### Constraints

* **Nested arrays in native format** — Athena's native (non-JSON) string representation
does not clearly delimit nested arrays. If your query returns nested arrays
(e.g. `array(array(integer))`), use `CAST(... AS JSON)` in your query to get
JSON-formatted output, which is parsed reliably.
* **Arrow, Pandas, and Polars cursors** — These cursors accept `result_set_type_hints`
but their converters do not currently use the hints because they rely on their own
type systems. The parameter is passed through for forward compatibility and for
result sets that fall back to the default conversion path.

### Breaking change in 3.30.0

Prior to 3.30.0, PyAthena attempted to infer Python types for scalar values inside
complex types using heuristics (e.g. `"123"` → `123`). Starting with 3.30.0, values
inside complex types are **kept as strings** unless `result_set_type_hints` is provided.
This change avoids silent misconversion but means existing code that relied on the
heuristic behavior may see string values where it previously saw integers or floats.

To restore typed conversion, pass `result_set_type_hints` with the appropriate type
signatures for the affected columns.

## Environment variables

Support [Boto3 environment variables](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#using-environment-variables).
5 changes: 5 additions & 0 deletions pyathena/aio/cursor.py
@@ -79,6 +79,7 @@ async def execute( # type: ignore[override]
result_reuse_enable: bool | None = None,
result_reuse_minutes: int | None = None,
paramstyle: str | None = None,
result_set_type_hints: dict[str | int, str] | None = None,
**kwargs,
) -> AioCursor:
"""Execute a SQL query asynchronously.
@@ -93,6 +94,9 @@ async def execute( # type: ignore[override]
result_reuse_enable: Enable result reuse (optional).
result_reuse_minutes: Result reuse duration in minutes (optional).
paramstyle: Parameter style to use (optional).
result_set_type_hints: Optional dictionary mapping column names to
Athena DDL type signatures for precise type conversion within
complex types.
**kwargs: Additional execution parameters.
Returns:
@@ -119,6 +123,7 @@
query_execution,
self.arraysize,
self._retry_config,
result_set_type_hints=result_set_type_hints,
)
else:
raise OperationalError(query_execution.state_change_reason)
14 changes: 13 additions & 1 deletion pyathena/aio/result_set.py
@@ -35,6 +35,7 @@ def __init__(
query_execution: AthenaQueryExecution,
arraysize: int,
retry_config: RetryConfig,
result_set_type_hints: dict[str | int, str] | None = None,
) -> None:
super().__init__(
connection=connection,
@@ -43,6 +44,7 @@ def __init__(
arraysize=arraysize,
retry_config=retry_config,
_pre_fetch=False,
result_set_type_hints=result_set_type_hints,
)

@classmethod
@@ -53,6 +55,7 @@
query_execution: AthenaQueryExecution,
arraysize: int,
retry_config: RetryConfig,
result_set_type_hints: dict[str | int, str] | None = None,
) -> AthenaAioResultSet:
"""Async factory method.
@@ -64,11 +67,20 @@
query_execution: Query execution metadata.
arraysize: Number of rows to fetch per request.
retry_config: Retry configuration for API calls.
result_set_type_hints: Optional dictionary mapping column names to
Athena DDL type signatures for precise type conversion.
Returns:
A fully initialized ``AthenaAioResultSet``.
"""
result_set = cls(connection, converter, query_execution, arraysize, retry_config)
result_set = cls(
connection,
converter,
query_execution,
arraysize,
retry_config,
result_set_type_hints=result_set_type_hints,
)
if result_set.state == AthenaQueryExecution.STATE_SUCCEEDED:
await result_set._async_pre_fetch()
return result_set
4 changes: 4 additions & 0 deletions pyathena/arrow/async_cursor.py
@@ -149,6 +149,7 @@ def arraysize(self, value: int) -> None:
def _collect_result_set(
self,
query_id: str,
result_set_type_hints: dict[str | int, str] | None = None,
unload_location: str | None = None,
kwargs: dict[str, Any] | None = None,
) -> AthenaArrowResultSet:
@@ -165,6 +166,7 @@
unload_location=unload_location,
connect_timeout=self._connect_timeout,
request_timeout=self._request_timeout,
result_set_type_hints=result_set_type_hints,
**kwargs,
)

@@ -179,6 +181,7 @@ def execute(
result_reuse_enable: bool | None = None,
result_reuse_minutes: int | None = None,
paramstyle: str | None = None,
result_set_type_hints: dict[str | int, str] | None = None,
**kwargs,
) -> tuple[str, Future[AthenaArrowResultSet | Any]]:
operation, unload_location = self._prepare_unload(operation, s3_staging_dir)
@@ -198,6 +201,7 @@
self._executor.submit(
self._collect_result_set,
query_id,
result_set_type_hints,
unload_location,
kwargs,
),
4 changes: 2 additions & 2 deletions pyathena/arrow/converter.py
@@ -90,7 +90,7 @@ def _dtypes(self) -> dict[str, type[Any]]:
}
return self.__dtypes

def convert(self, type_: str, value: str | None) -> Any | None:
def convert(self, type_: str, value: str | None, type_hint: str | None = None) -> Any | None:
converter = self.get(type_)
return converter(value)

@@ -114,5 +114,5 @@ def __init__(self) -> None:
default=_to_default,
)

def convert(self, type_: str, value: str | None) -> Any | None:
def convert(self, type_: str, value: str | None, type_hint: str | None = None) -> Any | None:
pass
5 changes: 5 additions & 0 deletions pyathena/arrow/cursor.py
@@ -137,6 +137,7 @@ def execute(
result_reuse_minutes: int | None = None,
paramstyle: str | None = None,
on_start_query_execution: Callable[[str], None] | None = None,
result_set_type_hints: dict[str | int, str] | None = None,
**kwargs,
) -> ArrowCursor:
"""Execute a SQL query and return results as Apache Arrow Tables.
@@ -156,6 +157,9 @@
result_reuse_minutes: Minutes to reuse cached results.
paramstyle: Parameter style ('qmark' or 'pyformat').
on_start_query_execution: Callback called when query starts.
result_set_type_hints: Optional dictionary mapping column names to
Athena DDL type signatures for precise type conversion within
complex types.
**kwargs: Additional execution parameters.
Returns:
@@ -197,6 +201,7 @@ def execute(
unload_location=unload_location,
connect_timeout=self._connect_timeout,
request_timeout=self._request_timeout,
result_set_type_hints=result_set_type_hints,
**kwargs,
)
else:
2 changes: 2 additions & 0 deletions pyathena/arrow/result_set.py
@@ -91,6 +91,7 @@ def __init__(
unload_location: str | None = None,
connect_timeout: float | None = None,
request_timeout: float | None = None,
result_set_type_hints: dict[str | int, str] | None = None,
**kwargs,
) -> None:
super().__init__(
@@ -99,6 +100,7 @@
query_execution=query_execution,
arraysize=1, # Fetch one row to retrieve metadata
retry_config=retry_config,
result_set_type_hints=result_set_type_hints,
)
self._rows.clear() # Clear pre_fetch data
self._arraysize = arraysize
15 changes: 13 additions & 2 deletions pyathena/async_cursor.py
@@ -144,14 +144,19 @@ def poll(self, query_id: str) -> Future[AthenaQueryExecution]:
"""
return cast("Future[AthenaQueryExecution]", self._executor.submit(self._poll, query_id))

def _collect_result_set(self, query_id: str) -> AthenaResultSet:
def _collect_result_set(
self,
query_id: str,
result_set_type_hints: dict[str | int, str] | None = None,
) -> AthenaResultSet:
query_execution = cast(AthenaQueryExecution, self._poll(query_id))
return self._result_set_class(
connection=self._connection,
converter=self._converter,
query_execution=query_execution,
arraysize=self._arraysize,
retry_config=self._retry_config,
result_set_type_hints=result_set_type_hints,
)

def execute(
@@ -165,6 +170,7 @@
result_reuse_enable: bool | None = None,
result_reuse_minutes: int | None = None,
paramstyle: str | None = None,
result_set_type_hints: dict[str | int, str] | None = None,
**kwargs,
) -> tuple[str, Future[AthenaResultSet | Any]]:
"""Execute a SQL query asynchronously.
@@ -183,6 +189,9 @@
result_reuse_enable: Enable result reuse for identical queries (optional).
result_reuse_minutes: Result reuse duration in minutes (optional).
paramstyle: Parameter style to use (optional).
result_set_type_hints: Optional dictionary mapping column names to
Athena DDL type signatures for precise type conversion within
complex types.
**kwargs: Additional execution parameters.
Returns:
@@ -207,7 +216,9 @@
result_reuse_minutes=result_reuse_minutes,
paramstyle=paramstyle,
)
return query_id, self._executor.submit(self._collect_result_set, query_id)
return query_id, self._executor.submit(
self._collect_result_set, query_id, result_set_type_hints
)

def executemany(
self,