Skip to content

Commit b9dc1fb

Browse files
authored
Merge pull request #3 from somratdutta/add-nessie-tests-1
Add nessie tests 1
2 parents 150a4ef + a780d34 commit b9dc1fb

3 files changed

Lines changed: 177 additions & 14 deletions

File tree

tests/integration/compose/docker_compose_iceberg_nessie_catalog.yml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ services:
1111
- nessie.catalog.default-warehouse=warehouse
1212
- nessie.catalog.warehouses.warehouse.location=s3://warehouse-rest/
1313
- nessie.catalog.service.s3.default-options.endpoint=http://minio:9000/
14+
- nessie.catalog.service.s3.default-options.external-endpoint=http://127.0.0.1:9002/
1415
- nessie.catalog.service.s3.default-options.access-key=urn:nessie-secret:quarkus:nessie.catalog.secrets.access-key
1516
- nessie.catalog.service.s3.default-options.path-style-access=true
1617
- nessie.catalog.service.s3.default-options.auth-type=STATIC
@@ -35,10 +36,10 @@ services:
3536
networks:
3637
default:
3738
aliases:
38-
- warehouse.minio
39+
- warehouse-rest.minio
3940
ports:
40-
- 9001
41-
- 9000
41+
- "9001:9001"
42+
- "9002:9000"
4243
command: ["server", "/data", "--console-address", ":9001"]
4344

4445
# TODO: move this code to cluster.py
@@ -58,4 +59,4 @@ services:
5859
/usr/bin/mc mb minio/warehouse-rest --ignore-existing;
5960
/usr/bin/mc policy set public minio/warehouse-rest;
6061
tail -f /dev/null
61-
"
62+
"
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
<clickhouse>
2+
<backups>
3+
<allowed_path>/backups</allowed_path>
4+
</backups>
5+
</clickhouse>

tests/integration/test_database_iceberg_nessie_catalog/test.py

Lines changed: 167 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,7 @@ def create_clickhouse_iceberg_database(
9696

9797
def load_catalog_impl(started_cluster):
9898
minio_ip = started_cluster.get_instance_ip('minio')
99-
s3_endpoint = f"http://{minio_ip}:9000"
100-
101-
# Add minio hostname mapping so PyIceberg can resolve 'minio' hostnames in table metadata
102-
import subprocess
103-
try:
104-
subprocess.run(['bash', '-c', f'echo "{minio_ip} minio" >> /etc/hosts'], check=True)
105-
print(f"Added minio hostname mapping: {minio_ip} minio")
106-
except Exception as e:
107-
print(f"Failed to add hostname mapping: {e}")
99+
s3_endpoint = f"http://{minio_ip}:9002"
108100

109101
return RestCatalog(
110102
name="my_catalog",
@@ -127,7 +119,7 @@ def started_cluster():
127119
cluster = ClickHouseCluster(__file__)
128120
cluster.add_instance(
129121
"node1",
130-
main_configs=[],
122+
main_configs=["configs/backups.xml"],
131123
user_configs=[],
132124
stay_alive=True,
133125
with_iceberg_catalog=True,
@@ -345,3 +337,168 @@ def record(key):
345337

346338
assert 'aaa\naaa\naaa' == node.query(f"SELECT symbol FROM {CATALOG_NAME}.`{namespace}.{table_name}`").strip()
347339
assert 'bbb\nbbb\nbbb' == node.query(f"SELECT symbol FROM {CATALOG_NAME}.`{namespace}.{table_name_2}`").strip()
340+
341+
def test_backup_database(started_cluster):
    """BACKUP a DataLakeCatalog database to a File() destination, drop it,
    then RESTORE and verify the recreated DDL is byte-identical."""
    node = started_cluster.instances["node1"]
    create_clickhouse_iceberg_database(started_cluster, node, "backup_database")

    # Unique destination per run so repeated runs on the same node don't collide.
    run_token = uuid.uuid4().hex
    destination = f"File('/backups/test_backup_{run_token}/')"

    node.query(f"BACKUP DATABASE backup_database TO {destination}")
    node.query("DROP DATABASE backup_database SYNC")
    assert "backup_database" not in node.query("SHOW DATABASES")

    node.query(
        f"RESTORE DATABASE backup_database FROM {destination}",
        settings={"allow_experimental_database_iceberg": 1},
    )
    restored_ddl = node.query("SHOW CREATE DATABASE backup_database")
    expected_ddl = "CREATE DATABASE backup_database\\nENGINE = DataLakeCatalog(\\'http://nessie:19120/iceberg/\\', \\'minio\\', \\'[HIDDEN]\\')\\nSETTINGS catalog_type = \\'rest\\', warehouse = \\'warehouse\\', storage_endpoint = \\'http://minio:9000/warehouse-rest\\'\n"
    assert restored_ddl == expected_ddl
357+
358+
def test_timestamps(started_cluster):
    """Create an Iceberg table with two timestamp columns via PyIceberg,
    append one row, and check both the schema ClickHouse reports
    (DateTime64(6)) and the value it reads back.

    NOTE(review): both fields use TimestampType(); the column named
    "timestamptz" presumably was meant to be TimestamptzType() — confirm,
    since changing it would also change the expected DDL asserted below.
    """
    node = started_cluster.instances["node1"]

    test_ref = f"test_list_tables_{uuid.uuid4()}"
    table_name = f"{test_ref}_table"
    test_namespace = (f"{test_ref}_namespace",)
    test_table_identifier = (test_namespace[0], table_name)

    catalog = load_catalog_impl(started_cluster)
    catalog.create_namespace(test_namespace)

    simple_schema = Schema(
        NestedField(
            field_id=1, name="timestamp", field_type=TimestampType(), required=False
        ),
        NestedField(
            field_id=2, name="timestamptz", field_type=TimestampType(), required=False
        ),
    )

    table = catalog.create_table(
        test_table_identifier,
        schema=simple_schema,
        properties={"write.metadata.compression-codec": "none"},
    )

    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

    data = [
        {
            "timestamp": datetime(2024, 1, 1, hour=12, minute=0, second=0, microsecond=0),
            "timestamptz": datetime(2024, 1, 1, hour=12, minute=0, second=0, microsecond=0),
        }
    ]
    df = pa.Table.from_pylist(data)
    table.append(df)

    # The catalog reports the table location as a full S3 URI
    # (s3://warehouse-rest/<namespace>/<table_uuid>); ClickHouse's Iceberg
    # ENGINE needs only the path below the bucket.
    table_location = table.metadata.location
    # Fail with a clear message instead of the NameError the previous code
    # raised (extracted_table_path was only bound inside the `if`).
    assert "warehouse-rest/" in table_location, (
        f"unexpected table location, no 'warehouse-rest/' marker: {table_location}"
    )
    # maxsplit=1: keep everything after the first bucket marker intact.
    extracted_table_path = table_location.split("warehouse-rest/", 1)[1]

    result = node.query(f"SHOW CREATE TABLE {CATALOG_NAME}.`{test_namespace[0]}.{table_name}`")
    assert result == f"CREATE TABLE {CATALOG_NAME}.`{test_namespace[0]}.{table_name}`\\n(\\n `timestamp` Nullable(DateTime64(6)),\\n `timestamptz` Nullable(DateTime64(6))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse-rest/{extracted_table_path}/\\', \\'minio\\', \\'[HIDDEN]\\')\n"
    assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{test_namespace[0]}.{table_name}`") == "2024-01-01 12:00:00.000000\t2024-01-01 12:00:00.000000\n"
409+
410+
def test_insert(started_cluster):
    """Append rows to an Iceberg table through PyIceberg and read them back
    through the ClickHouse DataLakeCatalog database."""
    node = started_cluster.instances["node1"]
    catalog = load_catalog_impl(started_cluster)

    test_ref = f"test_insert_{uuid.uuid4().hex[:8]}"
    test_namespace = (f"{test_ref}_namespace",)
    catalog.create_namespace(test_namespace)

    test_table_name = f"{test_ref}_table"
    identifier = (test_namespace[0], test_table_name)

    columns = Schema(
        NestedField(field_id=1, name="id", field_type=DoubleType(), required=False),
        NestedField(field_id=2, name="data", field_type=StringType(), required=False),
    )

    table = catalog.create_table(
        identifier,
        schema=columns,
        properties={"write.metadata.compression-codec": "none"},
    )

    rows = [{"id": float(i), "data": f"data_{i}"} for i in range(10)]
    table.append(pa.Table.from_pylist(rows))

    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

    result = node.query(
        f"SELECT * FROM {CATALOG_NAME}.`{test_namespace[0]}.{test_table_name}` ORDER BY id"
    )
    expected = "\n".join(f"{i}\tdata_{i}" for i in range(10))
    assert result.strip() == expected
445+
446+
def test_create(started_cluster):
    """A table created through the PyIceberg catalog must show up in
    SHOW TABLES of the ClickHouse DataLakeCatalog database."""
    node = started_cluster.instances["node1"]
    catalog = load_catalog_impl(started_cluster)

    test_ref = f"test_create_{uuid.uuid4().hex[:8]}"
    test_namespace = (f"{test_ref}_namespace",)
    catalog.create_namespace(test_namespace)

    test_table_name = f"{test_ref}_table"
    identifier = (test_namespace[0], test_table_name)

    catalog.create_table(
        identifier,
        schema=Schema(
            NestedField(field_id=1, name="id", field_type=DoubleType(), required=False),
            NestedField(field_id=2, name="data", field_type=StringType(), required=False),
        ),
        properties={"write.metadata.compression-codec": "none"},
    )

    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

    listed = node.query(f"SHOW TABLES FROM {CATALOG_NAME}")
    assert test_table_name in listed
474+
475+
def test_drop_table(started_cluster):
    """A table dropped through the PyIceberg catalog must be absent from
    SHOW TABLES of the ClickHouse DataLakeCatalog database."""
    node = started_cluster.instances["node1"]
    catalog = load_catalog_impl(started_cluster)

    test_ref = f"test_drop_table_{uuid.uuid4().hex[:8]}"
    test_namespace = (f"{test_ref}_namespace",)
    catalog.create_namespace(test_namespace)

    test_table_name = f"{test_ref}_table"
    identifier = (test_namespace[0], test_table_name)

    catalog.create_table(
        identifier,
        schema=Schema(
            NestedField(field_id=1, name="id", field_type=DoubleType(), required=False),
            NestedField(field_id=2, name="data", field_type=StringType(), required=False),
        ),
        properties={"write.metadata.compression-codec": "none"},
    )

    # Drop via the catalog before ClickHouse ever attaches the database.
    catalog.drop_table(identifier)

    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

    listed = node.query(f"SHOW TABLES FROM {CATALOG_NAME}")
    assert test_table_name not in listed

0 commit comments

Comments
 (0)