diff --git a/crates/integration_tests/testdata/docker-compose.yaml b/crates/integration_tests/testdata/docker-compose.yaml
index cf0240d1a5..30fc3914f8 100644
--- a/crates/integration_tests/testdata/docker-compose.yaml
+++ b/crates/integration_tests/testdata/docker-compose.yaml
@@ -80,3 +80,29 @@ services:
       - AWS_ACCESS_KEY_ID=admin
       - AWS_SECRET_ACCESS_KEY=password
       - AWS_REGION=us-east-1
+    ports:
+      - 15002:15002 # Spark Connect
+      - 4040:4040 # Spark UI
+    healthcheck:
+      test: ["CMD", "sh", "-c", "netstat -an | grep 15002 | grep LISTEN"]
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 90s
+
+  provision:
+    image: python:3.12-slim
+    networks:
+      rest_bridge:
+    depends_on:
+      spark-iceberg:
+        condition: service_healthy
+    entrypoint: ["/bin/sh", "-c", "pip install -q 'pyspark[connect]==4.0.1' && python3 /opt/spark/provision.py && touch /tmp/provision_complete && tail -f /dev/null"]
+    volumes:
+      - ./spark/provision.py:/opt/spark/provision.py:ro
+    healthcheck:
+      test: ["CMD-SHELL", "[ -f /tmp/provision_complete ]"]
+      interval: 2s
+      timeout: 2s
+      retries: 90
+      start_period: 20s
diff --git a/crates/integration_tests/testdata/spark/Dockerfile b/crates/integration_tests/testdata/spark/Dockerfile
index 7de700bde2..4b486c9001 100644
--- a/crates/integration_tests/testdata/spark/Dockerfile
+++ b/crates/integration_tests/testdata/spark/Dockerfile
@@ -13,47 +13,55 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM python:3.9-bullseye
+ARG BASE_IMAGE_SPARK_VERSION=4.0.1
 
-RUN apt-get -qq update && \
-    apt-get -qq install -y --no-install-recommends sudo curl openjdk-11-jdk && \
-    apt-get -qq clean && \
-    rm -rf /var/lib/apt/lists/*
+FROM apache/spark:${BASE_IMAGE_SPARK_VERSION}
 
-ENV SPARK_HOME=${SPARK_HOME:-"/opt/spark"}
-ENV HADOOP_HOME=${HADOOP_HOME:-"/opt/hadoop"}
-ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH
+# Dependency versions - keep these compatible
+ARG ICEBERG_VERSION=1.10.1
+ARG ICEBERG_SPARK_RUNTIME_VERSION=4.0_2.13
+ARG HADOOP_VERSION=3.4.1
+ARG SCALA_VERSION=2.13
+ARG AWS_SDK_VERSION=2.24.6
+ARG MAVEN_MIRROR=https://repo.maven.apache.org/maven2
 
-RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/spark-events
+USER root
 WORKDIR ${SPARK_HOME}
 
-ENV SPARK_VERSION=3.5.8
-ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
-ENV ICEBERG_VERSION=1.10.0
-
-RUN curl --retry 5 -s -C - https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
-    && tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
-    && rm -rf spark-${SPARK_VERSION}-bin-hadoop3.tgz
-
-# Download iceberg spark runtime
-RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}/${ICEBERG_VERSION}/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar -Lo iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar \
-    && mv iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar /opt/spark/jars
-
-# Download AWS bundle
-RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar
+# Install curl for JAR downloads
+RUN apt-get update && \
+    apt-get install -y --no-install-recommends curl && \
+    rm -rf /var/lib/apt/lists/*
 
-COPY spark-defaults.conf /opt/spark/conf
-ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}"
+# Copy configuration (early for better caching)
+COPY --chown=spark:spark spark-defaults.conf ${SPARK_HOME}/conf/
 
-RUN chmod u+x /opt/spark/sbin/* && \
-    chmod u+x /opt/spark/bin/*
+# Create event log directory
+RUN mkdir -p /home/iceberg/spark-events && \
+    chown -R spark:spark /home/iceberg
 
-WORKDIR '/home/'
+# Required JAR dependencies
+ENV JARS_TO_DOWNLOAD="\
+    org/apache/iceberg/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}/${ICEBERG_VERSION}/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar \
+    org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar \
+    org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar \
+    software/amazon/awssdk/bundle/${AWS_SDK_VERSION}/bundle-${AWS_SDK_VERSION}.jar"
 
-COPY entrypoint.sh .
-COPY provision.py .
+# Download JARs with retry logic
+RUN set -e && \
+    cd "${SPARK_HOME}/jars" && \
+    for jar_path in ${JARS_TO_DOWNLOAD}; do \
+        jar_name=$(basename "${jar_path}") && \
+        echo "Downloading ${jar_name}..." && \
+        curl -fsSL --retry 3 --retry-delay 5 \
+            -o "${jar_name}" \
+            "${MAVEN_MIRROR}/${jar_path}" && \
+        echo "✓ Downloaded ${jar_name}"; \
+    done && \
+    chown -R spark:spark "${SPARK_HOME}/jars"
 
-HEALTHCHECK --retries=120 --interval=1s \
-    CMD ls /tmp/ready || exit 1
+USER spark
+WORKDIR ${SPARK_HOME}
 
-ENTRYPOINT ["./entrypoint.sh"]
+# Start Spark Connect server
+CMD ["sh", "-c", "SPARK_NO_DAEMONIZE=true ${SPARK_HOME}/sbin/start-connect-server.sh"]
diff --git a/crates/integration_tests/testdata/spark/entrypoint.sh b/crates/integration_tests/testdata/spark/entrypoint.sh
deleted file mode 100755
index 51ec329f03..0000000000
--- a/crates/integration_tests/testdata/spark/entrypoint.sh
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-set -e
-
-start-master.sh -p 7077
-start-worker.sh spark://spark-iceberg:7077
-
-echo "Starting provision"
-python3 ./provision.py
-
-echo "Finished provisioning"
-touch /tmp/ready
-
-echo "Print logs"
-tail -f $SPARK_HOME/logs/*
diff --git a/crates/integration_tests/testdata/spark/provision.py b/crates/integration_tests/testdata/spark/provision.py
index 899a49f6ac..c53a1dd842 100755
--- a/crates/integration_tests/testdata/spark/provision.py
+++ b/crates/integration_tests/testdata/spark/provision.py
@@ -18,17 +18,8 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import current_date, date_add, expr
 
-# The configuration is important, otherwise we get many small
-# parquet files with a single row. When a positional delete
-# hits the Parquet file with one row, the parquet file gets
-# dropped instead of having a merge-on-read delete file.
-spark = (
-    SparkSession
-    .builder
-    .config("spark.sql.shuffle.partitions", "1")
-    .config("spark.default.parallelism", "1")
-    .getOrCreate()
-)
+# Create SparkSession against the remote Spark Connect server
+spark = SparkSession.builder.remote("sc://spark-iceberg:15002").getOrCreate()
 
 spark.sql(f"""CREATE NAMESPACE IF NOT EXISTS rest.default""")
 
@@ -87,24 +78,25 @@
 """
 )
 
-spark.sql(
-    f"""
-INSERT INTO rest.default.test_positional_merge_on_read_double_deletes
-VALUES
-    (CAST('2023-03-01' AS date), 1, 'a'),
-    (CAST('2023-03-02' AS date), 2, 'b'),
-    (CAST('2023-03-03' AS date), 3, 'c'),
-    (CAST('2023-03-04' AS date), 4, 'd'),
-    (CAST('2023-03-05' AS date), 5, 'e'),
-    (CAST('2023-03-06' AS date), 6, 'f'),
-    (CAST('2023-03-07' AS date), 7, 'g'),
-    (CAST('2023-03-08' AS date), 8, 'h'),
-    (CAST('2023-03-09' AS date), 9, 'i'),
-    (CAST('2023-03-10' AS date), 10, 'j'),
-    (CAST('2023-03-11' AS date), 11, 'k'),
-    (CAST('2023-03-12' AS date), 12, 'l');
-"""
-)
+# Use coalesce(1) to write all data into a single parquet file.
+# This ensures that deleting individual rows creates positional delete files
+# instead of dropping entire files (which happens when a file has only one row).
+spark.sql("""
+    SELECT * FROM VALUES
+        (CAST('2023-03-01' AS date), 1, 'a'),
+        (CAST('2023-03-02' AS date), 2, 'b'),
+        (CAST('2023-03-03' AS date), 3, 'c'),
+        (CAST('2023-03-04' AS date), 4, 'd'),
+        (CAST('2023-03-05' AS date), 5, 'e'),
+        (CAST('2023-03-06' AS date), 6, 'f'),
+        (CAST('2023-03-07' AS date), 7, 'g'),
+        (CAST('2023-03-08' AS date), 8, 'h'),
+        (CAST('2023-03-09' AS date), 9, 'i'),
+        (CAST('2023-03-10' AS date), 10, 'j'),
+        (CAST('2023-03-11' AS date), 11, 'k'),
+        (CAST('2023-03-12' AS date), 12, 'l')
+    AS t(dt, number, letter)
+""").coalesce(1).writeTo("rest.default.test_positional_merge_on_read_double_deletes").append()
 
 # Creates two positional deletes that should be merged
 spark.sql(f"DELETE FROM rest.default.test_positional_merge_on_read_double_deletes WHERE number = 9")
diff --git a/crates/integration_tests/testdata/spark/spark-defaults.conf b/crates/integration_tests/testdata/spark/spark-defaults.conf
index 9db0c7da9e..b576f624b9 100644
--- a/crates/integration_tests/testdata/spark/spark-defaults.conf
+++ b/crates/integration_tests/testdata/spark/spark-defaults.conf
@@ -16,14 +16,23 @@
 #
 
 spark.sql.extensions                   org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+
+# Configure Iceberg REST catalog
 spark.sql.catalog.rest                 org.apache.iceberg.spark.SparkCatalog
 spark.sql.catalog.rest.type            rest
 spark.sql.catalog.rest.uri             http://rest:8181
 spark.sql.catalog.rest.io-impl         org.apache.iceberg.aws.s3.S3FileIO
 spark.sql.catalog.rest.warehouse       s3://warehouse/rest/
 spark.sql.catalog.rest.s3.endpoint     http://minio:9000
+spark.sql.catalog.rest.cache-enabled   false
+
 spark.sql.defaultCatalog               rest
+
+# Configure Spark UI and event logging
+spark.ui.enabled                       true
 spark.eventLog.enabled                 true
 spark.eventLog.dir                     /home/iceberg/spark-events
+
+spark.sql.ansi.enabled                 false
 spark.history.fs.logDirectory          /home/iceberg/spark-events
 spark.sql.catalogImplementation        in-memory
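For reference, a minimal sketch (not part of the patch) of how a client reaches the Spark Connect endpoint this change exposes: it assumes the compose stack above is running with the 15002:15002 port mapping, that pyspark[connect]==4.0.1 is installed locally, and it reuses the table name created by provision.py.

from pyspark.sql import SparkSession

# Connect to the Spark Connect server published by the spark-iceberg service.
# "sc://localhost:15002" assumes the port mapping from docker-compose.yaml above.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Sanity-check the table provisioned through the REST catalog.
spark.sql(
    "SELECT count(*) FROM rest.default.test_positional_merge_on_read_double_deletes"
).show()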