Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 31 additions & 28 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -905,40 +905,43 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
}
}
}
for (const auto & column : requested_columns_list)
if (!file_meta_data.value()->columns_info.empty())
{
const auto & column_name = column.first;
for (const auto & column : requested_columns_list)
{
const auto & column_name = column.first;

if (file_meta_data.value()->columns_info.contains(column_name))
continue;
if (file_meta_data.value()->columns_info.contains(column_name))
continue;

if (!column.second.second.type->isNullable())
continue;
if (!column.second.second.type->isNullable())
continue;

/// With View over Iceberg table we have someting like 'materialize(time)' as column_name
/// Simple cheap check
if (column_name.starts_with("materialize(") && column_name.ends_with(")"))
continue;
/// With View over Iceberg table we have someting like 'materialize(time)' as column_name
/// Simple cheap check
if (column_name.starts_with("materialize(") && column_name.ends_with(")"))
continue;

/// Skip columns produced by prewhere or row-level filter expressions —
/// they are computed at read time, not stored in the file.
if (format_filter_info
&& ((format_filter_info->prewhere_info && column_name == format_filter_info->prewhere_info->prewhere_column_name)
|| (format_filter_info->row_level_filter && column_name == format_filter_info->row_level_filter->column_name)))
continue;
/// Skip columns produced by prewhere or row-level filter expressions —
/// they are computed at read time, not stored in the file.
if (format_filter_info
&& ((format_filter_info->prewhere_info && column_name == format_filter_info->prewhere_info->prewhere_column_name)
|| (format_filter_info->row_level_filter && column_name == format_filter_info->row_level_filter->column_name)))
continue;

/// Column is nullable and absent in file
constant_columns_with_values[column.second.first] =
ConstColumnWithValue{
column.second.second,
Field()
};
constant_columns.insert(column_name);

LOG_DEBUG(log, "In file {} constant column '{}' type '{}' with value 'NULL'",
object_info->getPath(),
column_name,
column.second.second.type);
/// Column is nullable and absent in file
constant_columns_with_values[column.second.first] =
ConstColumnWithValue{
column.second.second,
Field()
};
constant_columns.insert(column_name);

LOG_DEBUG(log, "In file {} constant column '{}' type '{}' with value 'NULL'",
object_info->getPath(),
column_name,
column.second.second.type);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1 alice 1.5
2 bob 2.5
3 carol 3.5
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on S3/MinIO

-- A stats-less Iceberg manifest (no per-column statistics) must return real
-- values, not NULLs: empty stats were misread as "all columns absent" (#1545).
SELECT * FROM icebergS3(s3_conn, filename = 'iceberg_no_column_stats') ORDER BY ALL
SETTINGS allow_experimental_iceberg_read_optimization = 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
## How this data is generated?

A tiny Iceberg v2 table with three nullable columns and three rows:

| id | name | value |
|----|-------|-------|
| 1 | alice | 1.5 |
| 2 | bob | 2.5 |
| 3 | carol | 3.5 |

The point of this fixture is that its manifest is **stats-less**: the `data_file`
entry carries no per-column statistics at all (`column_sizes`, `value_counts`,
`null_value_counts`, `lower_bounds`, `upper_bounds` are all empty). Such manifests
are produced by writers that do not collect metrics. They left ClickHouse's
`DataFileMetaInfo::columns_info` empty, which the Iceberg read optimization used
to misread as "every column is absent", returning `NULL` for all of them.

It is generated by `generate.py` (needs `pyiceberg`, `pyarrow`, `fastavro`):

```bash
python3 generate.py <output_dir>
```

The script creates the table via `pyiceberg` with
`write.metadata.metrics.default = none`, then post-processes the manifest avro to
drop every per-column statistic and rewrites all internal paths to the stable
`s3a://test/iceberg_no_column_stats` prefix used by the test bucket.

Used by `tests/queries/0_stateless/04302_iceberg_read_optimization_no_column_stats.sql`.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#!/usr/bin/env python3
"""Generate the stats-less Iceberg fixture for 04302_iceberg_read_optimization_no_column_stats.

Creates a 3-row table, strips every per-column statistic from the manifest so
ClickHouse's `DataFileMetaInfo::columns_info` is empty, and rewrites internal
paths to a stable `s3a://test/<name>` prefix. See README.md. Usage: generate.py <out_dir>.
"""
import json
import shutil
import sys
import tempfile
from pathlib import Path

import fastavro
import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, LongType, StringType, DoubleType

AVRO_RESERVED = {"avro.schema", "avro.codec"}

# Per-column statistics on a manifest data_file entry (all optional); clearing
# them all is what makes the manifest stats-less.
STAT_FIELDS = (
"column_sizes",
"value_counts",
"null_value_counts",
"nan_value_counts",
"lower_bounds",
"upper_bounds",
)


def deep_replace(obj, old, new):
if isinstance(obj, str):
return obj.replace(old, new)
if isinstance(obj, dict):
return {k: deep_replace(v, old, new) for k, v in obj.items()}
if isinstance(obj, list):
return [deep_replace(v, old, new) for v in obj]
return obj


def clear_stats(record):
df = record.get("data_file")
if isinstance(df, dict):
for field in STAT_FIELDS:
if field in df and df[field]:
df[field] = []
return record


def rewrite_avro(src: Path, dst: Path, old: str, new: str, strip_stats: bool):
with open(src, "rb") as f:
reader = fastavro.reader(f)
schema = reader.writer_schema
meta = {k: v for k, v in reader.metadata.items() if k not in AVRO_RESERVED}
records = [deep_replace(r, old, new) for r in reader]
if strip_stats:
records = [clear_stats(r) for r in records]
with open(dst, "wb") as f:
fastavro.writer(f, schema, records, metadata=meta)


def main(out_dir: str):
work = Path(tempfile.mkdtemp(prefix="iceberg_gen_"))
warehouse = work / "warehouse"
warehouse.mkdir(parents=True)

catalog = SqlCatalog(
"gen",
uri=f"sqlite:///{work}/catalog.db",
warehouse=f"file://{warehouse}",
)
catalog.create_namespace("ns")

schema = Schema(
NestedField(1, "id", LongType(), required=False),
NestedField(2, "name", StringType(), required=False),
NestedField(3, "value", DoubleType(), required=False),
)

# Reduces metrics, but pyiceberg still writes column_sizes (stripped below).
table = catalog.create_table(
"ns.no_stats",
schema=schema,
properties={"write.metadata.metrics.default": "none"},
)

data = pa.table(
{
"id": pa.array([1, 2, 3], type=pa.int64()),
"name": pa.array(["alice", "bob", "carol"], type=pa.string()),
"value": pa.array([1.5, 2.5, 3.5], type=pa.float64()),
}
)
table.append(data)

table_location = Path(table.location().replace("file://", ""))
old_prefix = table.location() # file:///tmp/.../ns.db/no_stats
out = Path(out_dir)
new_prefix = f"s3a://test/{out.name}" # s3a://test/iceberg_no_column_stats

if out.exists():
shutil.rmtree(out)
(out / "metadata").mkdir(parents=True)
(out / "data").mkdir(parents=True)

for f in (table_location / "data").rglob("*"):
if f.is_file():
rel = f.relative_to(table_location / "data")
target = out / "data" / rel
target.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(f, target)

meta_dir = table_location / "metadata"

# Keep only the latest metadata.json (the post-append snapshot). The empty
# create-time version and the history logs that reference it aren't read.
latest_json = max(
(f for f in meta_dir.iterdir() if f.name.endswith(".metadata.json")),
key=lambda f: int(f.name.split("-", 1)[0]),
)
meta = json.loads(latest_json.read_text().replace(old_prefix, new_prefix))
meta["metadata-log"] = []
meta["snapshot-log"] = []
(out / "metadata" / latest_json.name).write_text(json.dumps(meta, separators=(",", ":")))

for f in meta_dir.iterdir():
if f.name.endswith(".avro"):
# Only manifests carry data_file stats; the manifest list (snap-*) does not.
strip = not f.name.startswith("snap-")
rewrite_avro(f, out / "metadata" / f.name, old_prefix, new_prefix, strip)

print(f"old prefix: {old_prefix}")
print(f"new prefix: {new_prefix}")
print(f"table uuid: {table.metadata.table_uuid}")
print(f"copied to: {out}")
shutil.rmtree(work)


if __name__ == "__main__":
if len(sys.argv) != 2:
sys.exit(f"usage: {sys.argv[0]} <output_dir>")
main(sys.argv[1])
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"location":"s3a://test/iceberg_no_column_stats","table-uuid":"d0e63068-996d-41ee-9b7d-31f1fb17f1b1","last-updated-ms":1782851862557,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"id","type":"long","required":false},{"id":2,"name":"name","type":"string","required":false},{"id":3,"name":"value","type":"double","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":999,"properties":{"write.metadata.metrics.default":"none"},"current-snapshot-id":7564025723254944482,"snapshots":[{"snapshot-id":7564025723254944482,"sequence-number":1,"timestamp-ms":1782851862557,"manifest-list":"s3a://test/iceberg_no_column_stats/metadata/snap-7564025723254944482-0-39d4e713-69c8-49f9-ab8e-f887af4bcecb.avro","summary":{"operation":"append","added-files-size":"1346","added-data-files":"1","added-records":"3","total-data-files":"1","total-delete-files":"0","total-records":"3","total-files-size":"1346","total-position-deletes":"0","total-equality-deletes":"0"},"schema-id":0}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{"main":{"snapshot-id":7564025723254944482,"type":"branch"}},"statistics":[],"partition-statistics":[],"format-version":2,"last-sequence-number":1}
Binary file not shown.
Binary file not shown.
Loading