diff --git a/api/src/main/java/org/apache/iceberg/udf/SQLUdfRepresentation.java b/api/src/main/java/org/apache/iceberg/udf/SQLUdfRepresentation.java
new file mode 100644
index 000000000000..00a90bf1a07b
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/udf/SQLUdfRepresentation.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+/** A SQL representation of a UDF, containing the SQL expression and dialect. */
+public interface SQLUdfRepresentation extends UdfRepresentation {
+
+ @Override
+ default String type() {
+ return Type.SQL;
+ }
+
+ /** The SQL expression text that defines the function body. */
+ String sql();
+
+ /** The SQL dialect identifier (e.g., "spark", "trino"). */
+ String dialect();
+}
diff --git a/api/src/main/java/org/apache/iceberg/udf/UdfParameter.java b/api/src/main/java/org/apache/iceberg/udf/UdfParameter.java
new file mode 100644
index 000000000000..f79ab59a345a
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/udf/UdfParameter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+import javax.annotation.Nullable;
+
+/** A parameter declared in a UDF definition. */
+public interface UdfParameter {
+
+ /** The parameter name. */
+ String name();
+
+ /** The parameter data type. */
+ UdfType type();
+
+ /** A documentation string for the parameter. */
+ @Nullable
+ String doc();
+}
diff --git a/api/src/main/java/org/apache/iceberg/udf/UdfRepresentation.java b/api/src/main/java/org/apache/iceberg/udf/UdfRepresentation.java
new file mode 100644
index 000000000000..e58210f5f205
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/udf/UdfRepresentation.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+/**
+ * Describes how a UDF's logic is expressed, for example as a SQL body with a specific dialect. A
+ * UDF definition version may carry one or more representations so that engines can pick a form they
+ * understand.
+ */
+public interface UdfRepresentation {
+
+ /** Standard representation type names used in UDF metadata. */
+ class Type {
+ private Type() {}
+
+ /** A SQL body representation, see {@link SQLUdfRepresentation}. */
+ public static final String SQL = "sql";
+ }
+
+ /** Returns the representation type, e.g., {@link Type#SQL}. */
+ String type();
+}
diff --git a/api/src/main/java/org/apache/iceberg/udf/UdfType.java b/api/src/main/java/org/apache/iceberg/udf/UdfType.java
new file mode 100644
index 000000000000..60442f09e110
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/udf/UdfType.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+/**
+ * Represents a UDF data type as defined in the UDF spec. UDF types are based on Iceberg types but
+ * intentionally omit field IDs and element nullability. Concrete implementations live as static
+ * nested classes on {@link UdfTypes}: {@link UdfTypes.PrimitiveType} for primitive and
+ * semi-structured types, and the nested types {@link UdfTypes.ListType}, {@link UdfTypes.MapType},
+ * and {@link UdfTypes.StructType}.
+ */
+public interface UdfType {
+
+ enum TypeID {
+ PRIMITIVE,
+ LIST,
+ MAP,
+ STRUCT
+ }
+
+ TypeID typeId();
+
+ default boolean isPrimitiveType() {
+ return typeId() == TypeID.PRIMITIVE;
+ }
+
+ default boolean isListType() {
+ return typeId() == TypeID.LIST;
+ }
+
+ default boolean isMapType() {
+ return typeId() == TypeID.MAP;
+ }
+
+ default boolean isStructType() {
+ return typeId() == TypeID.STRUCT;
+ }
+
+ default UdfTypes.PrimitiveType asPrimitiveType() {
+ throw new IllegalArgumentException("Not a primitive type: " + this);
+ }
+
+ default UdfTypes.ListType asListType() {
+ throw new IllegalArgumentException("Not a list type: " + this);
+ }
+
+ default UdfTypes.MapType asMapType() {
+ throw new IllegalArgumentException("Not a map type: " + this);
+ }
+
+ default UdfTypes.StructType asStructType() {
+ throw new IllegalArgumentException("Not a struct type: " + this);
+ }
+}
diff --git a/api/src/main/java/org/apache/iceberg/udf/UdfTypes.java b/api/src/main/java/org/apache/iceberg/udf/UdfTypes.java
new file mode 100644
index 000000000000..8a0dcf3a9d74
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/udf/UdfTypes.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Concrete implementations of {@link UdfType}: {@link PrimitiveType} for primitive and
+ * semi-structured types and {@link ListType}, {@link MapType}, {@link StructType} for nested types.
+ * {@link NestedField} represents a named field inside a {@link StructType}.
+ */
+public class UdfTypes {
+
+ private UdfTypes() {}
+
+ /**
+ * A UDF primitive or semi-structured type, encoded as a type string (e.g., {@code int}, {@code
+ * string}, {@code decimal(9, 2)}, {@code variant}).
+ *
+ *
The type string must be a recognized Iceberg primitive or semi-structured type as understood
+ * by {@link Types#fromTypeName(String)}. The input is canonicalized to Iceberg's standard form
+ * (lowercase, normalized whitespace), so {@code PrimitiveType.of("INT")} and {@code
+ * PrimitiveType.of("Decimal( 9 , 2 )")} produce {@code int} and {@code decimal(9, 2)}
+ * respectively.
+ */
+ public static final class PrimitiveType implements UdfType {
+
+ private final String typeString;
+
+ public static PrimitiveType of(String typeString) {
+ Preconditions.checkArgument(typeString != null, "Invalid primitive type: null");
+ // Validate against Iceberg's primitive/semi-structured type vocabulary and use the parsed
+ // type's canonical toString() so callers don't have to worry about casing or whitespace.
+ String canonical = Types.fromTypeName(typeString).toString();
+ return new PrimitiveType(canonical);
+ }
+
+ private PrimitiveType(String typeString) {
+ this.typeString = typeString;
+ }
+
+ @Override
+ public TypeID typeId() {
+ return TypeID.PRIMITIVE;
+ }
+
+ @Override
+ public PrimitiveType asPrimitiveType() {
+ return this;
+ }
+
+ /** The primitive or semi-structured type string. */
+ public String typeString() {
+ return typeString;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof PrimitiveType)) {
+ return false;
+ }
+
+ return Objects.equals(typeString, ((PrimitiveType) o).typeString);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(PrimitiveType.class, typeString);
+ }
+
+ @Override
+ public String toString() {
+ return typeString;
+ }
+ }
+
+ /** A UDF list type with an element type. */
+ public static final class ListType implements UdfType {
+
+ private final UdfType elementType;
+
+ public static ListType of(UdfType elementType) {
+ Preconditions.checkArgument(elementType != null, "Invalid element type: null");
+ return new ListType(elementType);
+ }
+
+ private ListType(UdfType elementType) {
+ this.elementType = elementType;
+ }
+
+ @Override
+ public TypeID typeId() {
+ return TypeID.LIST;
+ }
+
+ @Override
+ public ListType asListType() {
+ return this;
+ }
+
+ public UdfType elementType() {
+ return elementType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof ListType)) {
+ return false;
+ }
+
+ return Objects.equals(elementType, ((ListType) o).elementType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(ListType.class, elementType);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("list<%s>", elementType);
+ }
+ }
+
+ /** A UDF map type with key and value types. */
+ public static final class MapType implements UdfType {
+
+ private final UdfType keyType;
+ private final UdfType valueType;
+
+ public static MapType of(UdfType keyType, UdfType valueType) {
+ Preconditions.checkArgument(keyType != null, "Invalid key type: null");
+ Preconditions.checkArgument(valueType != null, "Invalid value type: null");
+ return new MapType(keyType, valueType);
+ }
+
+ private MapType(UdfType keyType, UdfType valueType) {
+ this.keyType = keyType;
+ this.valueType = valueType;
+ }
+
+ @Override
+ public TypeID typeId() {
+ return TypeID.MAP;
+ }
+
+ @Override
+ public MapType asMapType() {
+ return this;
+ }
+
+ public UdfType keyType() {
+ return keyType;
+ }
+
+ public UdfType valueType() {
+ return valueType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof MapType)) {
+ return false;
+ }
+
+ MapType that = (MapType) o;
+ return Objects.equals(keyType, that.keyType) && Objects.equals(valueType, that.valueType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(MapType.class, keyType, valueType);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("map<%s,%s>", keyType, valueType);
+ }
+ }
+
+ /**
+ * A UDF struct type with an ordered list of named fields. Based on Iceberg struct types but
+ * intentionally omits field IDs and element nullability.
+ */
+ public static final class StructType implements UdfType {
+
+ private final List fields;
+
+ public static StructType of(NestedField... fields) {
+ Preconditions.checkArgument(fields != null, "Invalid fields: null");
+ return of(Arrays.asList(fields));
+ }
+
+ public static StructType of(List fields) {
+ Preconditions.checkArgument(fields != null, "Invalid fields: null");
+ return new StructType(ImmutableList.copyOf(fields));
+ }
+
+ private StructType(List fields) {
+ this.fields = fields;
+ }
+
+ @Override
+ public TypeID typeId() {
+ return TypeID.STRUCT;
+ }
+
+ @Override
+ public StructType asStructType() {
+ return this;
+ }
+
+ public List fields() {
+ return fields;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof StructType)) {
+ return false;
+ }
+
+ return Objects.equals(fields, ((StructType) o).fields);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(StructType.class, fields);
+ }
+
+ @Override
+ public String toString() {
+ return fields.stream()
+ .map(NestedField::toString)
+ .collect(Collectors.joining(",", "struct<", ">"));
+ }
+ }
+
+ /** A field within a {@link StructType}, with a name and a type. */
+ public static final class NestedField {
+
+ private final String name;
+ private final UdfType type;
+
+ public static NestedField of(String name, UdfType type) {
+ Preconditions.checkArgument(name != null, "Invalid field name: null");
+ Preconditions.checkArgument(type != null, "Invalid field type: null");
+ return new NestedField(name, type);
+ }
+
+ private NestedField(String name, UdfType type) {
+ this.name = name;
+ this.type = type;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public UdfType type() {
+ return type;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof NestedField)) {
+ return false;
+ }
+
+ NestedField that = (NestedField) o;
+ return Objects.equals(name, that.name) && Objects.equals(type, that.type);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, type);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s:%s", name, type);
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/udf/BaseSQLUdfRepresentation.java b/core/src/main/java/org/apache/iceberg/udf/BaseSQLUdfRepresentation.java
new file mode 100644
index 000000000000..43df1e243fd5
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/udf/BaseSQLUdfRepresentation.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+import org.immutables.value.Value;
+
+@Value.Immutable
+@Value.Include(value = SQLUdfRepresentation.class)
+@SuppressWarnings("ImmutablesStyle")
+@Value.Style(
+ typeImmutable = "ImmutableSQLUdfRepresentation",
+ visibilityString = "PUBLIC",
+ builderVisibilityString = "PUBLIC")
+interface BaseSQLUdfRepresentation extends SQLUdfRepresentation {}
diff --git a/core/src/main/java/org/apache/iceberg/udf/BaseUdfParameter.java b/core/src/main/java/org/apache/iceberg/udf/BaseUdfParameter.java
new file mode 100644
index 000000000000..8531a92cbc15
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/udf/BaseUdfParameter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+import javax.annotation.Nullable;
+import org.immutables.value.Value;
+
+@Value.Immutable
+@SuppressWarnings("ImmutablesStyle")
+@Value.Style(
+ typeImmutable = "ImmutableUdfParameter",
+ visibilityString = "PUBLIC",
+ builderVisibilityString = "PUBLIC")
+interface BaseUdfParameter extends UdfParameter {
+ @Override
+ @Nullable
+ String doc();
+}
diff --git a/core/src/main/java/org/apache/iceberg/udf/SQLUdfRepresentationParser.java b/core/src/main/java/org/apache/iceberg/udf/SQLUdfRepresentationParser.java
new file mode 100644
index 000000000000..7174c8e6de7c
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/udf/SQLUdfRepresentationParser.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+class SQLUdfRepresentationParser {
+ private static final String SQL = "sql";
+ private static final String DIALECT = "dialect";
+
+ private SQLUdfRepresentationParser() {}
+
+ static String toJson(SQLUdfRepresentation representation) {
+ return JsonUtil.generate(gen -> toJson(representation, gen), false);
+ }
+
+ static void toJson(SQLUdfRepresentation representation, JsonGenerator generator)
+ throws IOException {
+ Preconditions.checkArgument(representation != null, "Invalid SQL UDF representation: null");
+ generator.writeStartObject();
+ generator.writeStringField(UdfRepresentationParser.TYPE, representation.type());
+ generator.writeStringField(SQL, representation.sql());
+ generator.writeStringField(DIALECT, representation.dialect());
+ generator.writeEndObject();
+ }
+
+ static SQLUdfRepresentation fromJson(String json) {
+ return JsonUtil.parse(json, SQLUdfRepresentationParser::fromJson);
+ }
+
+ static SQLUdfRepresentation fromJson(JsonNode node) {
+ Preconditions.checkArgument(
+ node != null, "Cannot parse SQL UDF representation from null object");
+ Preconditions.checkArgument(
+ node.isObject(), "Cannot parse SQL UDF representation from non-object: %s", node);
+ return ImmutableSQLUdfRepresentation.builder()
+ .sql(JsonUtil.getString(SQL, node))
+ .dialect(JsonUtil.getString(DIALECT, node))
+ .build();
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/udf/UdfParameterParser.java b/core/src/main/java/org/apache/iceberg/udf/UdfParameterParser.java
new file mode 100644
index 000000000000..bd9df272c950
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/udf/UdfParameterParser.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+class UdfParameterParser {
+ private static final String NAME = "name";
+ private static final String TYPE = "type";
+ private static final String DOC = "doc";
+
+ private UdfParameterParser() {}
+
+ static String toJson(UdfParameter parameter) {
+ return JsonUtil.generate(gen -> toJson(parameter, gen), false);
+ }
+
+ static void toJson(UdfParameter parameter, JsonGenerator generator) throws IOException {
+ Preconditions.checkArgument(parameter != null, "Invalid UDF parameter: null");
+ generator.writeStartObject();
+ generator.writeStringField(NAME, parameter.name());
+ UdfTypeUtil.writeType(TYPE, parameter.type(), generator);
+ if (parameter.doc() != null) {
+ generator.writeStringField(DOC, parameter.doc());
+ }
+
+ generator.writeEndObject();
+ }
+
+ static UdfParameter fromJson(String json) {
+ return JsonUtil.parse(json, UdfParameterParser::fromJson);
+ }
+
+ static UdfParameter fromJson(JsonNode node) {
+ Preconditions.checkArgument(node != null, "Cannot parse UDF parameter from null object");
+ Preconditions.checkArgument(
+ node.isObject(), "Cannot parse UDF parameter from non-object: %s", node);
+
+ ImmutableUdfParameter.Builder builder =
+ ImmutableUdfParameter.builder()
+ .name(JsonUtil.getString(NAME, node))
+ .type(UdfTypeUtil.readType(node.get(TYPE)));
+
+ if (node.has(DOC)) {
+ builder.doc(JsonUtil.getString(DOC, node));
+ }
+
+ return builder.build();
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/udf/UdfRepresentationParser.java b/core/src/main/java/org/apache/iceberg/udf/UdfRepresentationParser.java
new file mode 100644
index 000000000000..54252732212b
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/udf/UdfRepresentationParser.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.Locale;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+class UdfRepresentationParser {
+ static final String TYPE = "type";
+
+ private UdfRepresentationParser() {}
+
+ static void toJson(UdfRepresentation representation, JsonGenerator generator) throws IOException {
+ Preconditions.checkArgument(representation != null, "Invalid UDF representation: null");
+ switch (representation.type().toLowerCase(Locale.ROOT)) {
+ case UdfRepresentation.Type.SQL ->
+ SQLUdfRepresentationParser.toJson((SQLUdfRepresentation) representation, generator);
+ default ->
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot serialize unsupported UDF representation: %s", representation.type()));
+ }
+ }
+
+ static String toJson(UdfRepresentation representation) {
+ return JsonUtil.generate(gen -> toJson(representation, gen), false);
+ }
+
+ static UdfRepresentation fromJson(String json) {
+ return JsonUtil.parse(json, UdfRepresentationParser::fromJson);
+ }
+
+ static UdfRepresentation fromJson(JsonNode node) {
+ Preconditions.checkArgument(node != null, "Cannot parse UDF representation from null object");
+ Preconditions.checkArgument(
+ node.isObject(), "Cannot parse UDF representation from non-object: %s", node);
+ String type = JsonUtil.getString(TYPE, node).toLowerCase(Locale.ROOT);
+ return switch (type) {
+ case UdfRepresentation.Type.SQL -> SQLUdfRepresentationParser.fromJson(node);
+ default -> ImmutableUnknownUdfRepresentation.builder().type(type).build();
+ };
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/udf/UdfTypeUtil.java b/core/src/main/java/org/apache/iceberg/udf/UdfTypeUtil.java
new file mode 100644
index 000000000000..1dedca1f8a6c
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/udf/UdfTypeUtil.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.JsonUtil;
+
+/**
+ * Utility for reading and writing UDF types. A type is either a primitive type string (e.g., "int",
+ * "string", "variant") or a JSON object for nested types (struct, list, map).
+ */
+class UdfTypeUtil {
+
+ private static final String TYPE = "type";
+ private static final String LIST = "list";
+ private static final String MAP = "map";
+ private static final String STRUCT = "struct";
+ private static final String ELEMENT = "element";
+ private static final String KEY = "key";
+ private static final String VALUE = "value";
+ private static final String FIELDS = "fields";
+ private static final String NAME = "name";
+
+ private UdfTypeUtil() {}
+
+ /** Reads a UDF type from a JSON node. */
+ static UdfType readType(JsonNode node) {
+ Preconditions.checkArgument(node != null, "Cannot read type from null node");
+
+ if (node.isTextual()) {
+ return UdfTypes.PrimitiveType.of(node.asText());
+ } else if (node.isObject()) {
+ String typeName = JsonUtil.getString(TYPE, node).toLowerCase(Locale.ROOT);
+ return switch (typeName) {
+ case LIST -> UdfTypes.ListType.of(readType(JsonUtil.get(ELEMENT, node)));
+ case MAP ->
+ UdfTypes.MapType.of(
+ readType(JsonUtil.get(KEY, node)), readType(JsonUtil.get(VALUE, node)));
+ case STRUCT -> readStruct(node);
+ default ->
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot parse UDF type from object with unknown type %s: %s", typeName, node));
+ };
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Cannot parse UDF type from node: %s", node));
+ }
+ }
+
+ private static UdfTypes.StructType readStruct(JsonNode node) {
+ JsonNode fieldsNode = JsonUtil.get(FIELDS, node);
+ Preconditions.checkArgument(
+ fieldsNode.isArray(), "Cannot parse struct type from non-array fields: %s", fieldsNode);
+
+ ImmutableList.Builder fields = ImmutableList.builder();
+ for (JsonNode fieldNode : fieldsNode) {
+ Preconditions.checkArgument(
+ fieldNode.isObject(), "Cannot parse struct field from non-object: %s", fieldNode);
+ fields.add(
+ UdfTypes.NestedField.of(
+ JsonUtil.getString(NAME, fieldNode), readType(JsonUtil.get(TYPE, fieldNode))));
+ }
+
+ return UdfTypes.StructType.of(fields.build());
+ }
+
+ /** Writes a UDF type to a JSON generator under the given field name. */
+ static void writeType(String fieldName, UdfType type, JsonGenerator generator)
+ throws IOException {
+ Preconditions.checkArgument(type != null, "Invalid type: null");
+ generator.writeFieldName(fieldName);
+ writeTypeValue(type, generator);
+ }
+
+ private static void writeTypeValue(UdfType type, JsonGenerator generator) throws IOException {
+ switch (type.typeId()) {
+ case PRIMITIVE -> generator.writeString(type.asPrimitiveType().typeString());
+ case LIST -> {
+ generator.writeStartObject();
+ generator.writeStringField(TYPE, LIST);
+ writeType(ELEMENT, type.asListType().elementType(), generator);
+ generator.writeEndObject();
+ }
+ case MAP -> {
+ UdfTypes.MapType mapType = type.asMapType();
+ generator.writeStartObject();
+ generator.writeStringField(TYPE, MAP);
+ writeType(KEY, mapType.keyType(), generator);
+ writeType(VALUE, mapType.valueType(), generator);
+ generator.writeEndObject();
+ }
+ case STRUCT -> {
+ List fields = type.asStructType().fields();
+ generator.writeStartObject();
+ generator.writeStringField(TYPE, STRUCT);
+ generator.writeArrayFieldStart(FIELDS);
+ for (UdfTypes.NestedField field : fields) {
+ generator.writeStartObject();
+ generator.writeStringField(NAME, field.name());
+ writeType(TYPE, field.type(), generator);
+ generator.writeEndObject();
+ }
+ generator.writeEndArray();
+ generator.writeEndObject();
+ }
+ default -> throw new IllegalArgumentException("Unknown UDF type: " + type);
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/udf/UnknownUdfRepresentation.java b/core/src/main/java/org/apache/iceberg/udf/UnknownUdfRepresentation.java
new file mode 100644
index 000000000000..f8d9fd28fe4d
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/udf/UnknownUdfRepresentation.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+import org.immutables.value.Value;
+
+@Value.Immutable
+@Value.Style(
+ typeImmutable = "ImmutableUnknownUdfRepresentation",
+ visibilityString = "PACKAGE",
+ builderVisibilityString = "PACKAGE")
+interface UnknownUdfRepresentation extends UdfRepresentation {}
diff --git a/core/src/test/java/org/apache/iceberg/udf/TestSQLUdfRepresentationParser.java b/core/src/test/java/org/apache/iceberg/udf/TestSQLUdfRepresentationParser.java
new file mode 100644
index 000000000000..518e67229520
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/udf/TestSQLUdfRepresentationParser.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.jupiter.api.Test;
+
+class TestSQLUdfRepresentationParser {
+
+ @Test
+ void parseSqlUdfRepresentation() {
+ String json =
+ """
+ {"type":"sql", "sql": "x + 1", "dialect": "spark"}""";
+ SQLUdfRepresentation representation =
+ ImmutableSQLUdfRepresentation.builder().sql("x + 1").dialect("spark").build();
+
+ assertThat(SQLUdfRepresentationParser.fromJson(json))
+ .as("Should be able to parse valid SQL UDF representation")
+ .isEqualTo(representation);
+ }
+
+ @Test
+ void parseMissingRequiredFields() {
+ String missingDialect =
+ """
+ {"type":"sql", "sql": "x + 1"}""";
+ assertThatThrownBy(() -> UdfRepresentationParser.fromJson(missingDialect))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse missing string: dialect");
+
+ String missingSql =
+ """
+ {"type":"sql", "dialect": "spark"}""";
+ assertThatThrownBy(() -> UdfRepresentationParser.fromJson(missingSql))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse missing string: sql");
+
+ String missingType =
+ """
+ {"sql":"x + 1","dialect":"spark"}""";
+ assertThatThrownBy(() -> UdfRepresentationParser.fromJson(missingType))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse missing string: type");
+ }
+
+ @Test
+ void roundTripSerialization() {
+ SQLUdfRepresentation representation =
+ ImmutableSQLUdfRepresentation.builder().sql("x + 1").dialect("spark").build();
+
+ String serialized = UdfRepresentationParser.toJson(representation);
+ assertThat(UdfRepresentationParser.fromJson(serialized)).isEqualTo(representation);
+ }
+
+ @Test
+ void roundTripWithTrinoDialect() {
+ SQLUdfRepresentation representation =
+ ImmutableSQLUdfRepresentation.builder().sql("x + 1.0").dialect("trino").build();
+
+ String serialized = UdfRepresentationParser.toJson(representation);
+ assertThat(UdfRepresentationParser.fromJson(serialized)).isEqualTo(representation);
+ }
+
+ @Test
+ void nullSqlUdfRepresentation() {
+ assertThatThrownBy(() -> SQLUdfRepresentationParser.toJson(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid SQL UDF representation: null");
+
+ assertThatThrownBy(() -> SQLUdfRepresentationParser.fromJson((JsonNode) null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse SQL UDF representation from null object");
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/udf/TestUdfParameterParser.java b/core/src/test/java/org/apache/iceberg/udf/TestUdfParameterParser.java
new file mode 100644
index 000000000000..c86bc3eeb780
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/udf/TestUdfParameterParser.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.jupiter.api.Test;
+
+class TestUdfParameterParser {
+
+ @Test
+ void parsePrimitiveTypeParameter() {
+ String json =
+ """
+ {"name":"x","type":"int"}""";
+ UdfParameter expected =
+ ImmutableUdfParameter.builder().name("x").type(UdfTypes.PrimitiveType.of("int")).build();
+
+ assertThat(UdfParameterParser.fromJson(json)).isEqualTo(expected);
+ }
+
+ @Test
+ void parseParameterWithDoc() {
+ String json =
+ """
+ {"name":"x","type":"int","doc":"Input integer"}""";
+ UdfParameter expected =
+ ImmutableUdfParameter.builder()
+ .name("x")
+ .type(UdfTypes.PrimitiveType.of("int"))
+ .doc("Input integer")
+ .build();
+
+ assertThat(UdfParameterParser.fromJson(json)).isEqualTo(expected);
+ }
+
+ @Test
+ void parseListTypeParameter() {
+ String json =
+ """
+ {
+ "name": "items",
+ "type": {
+ "type": "list",
+ "element": "string"
+ }
+ }""";
+ UdfParameter expected =
+ ImmutableUdfParameter.builder()
+ .name("items")
+ .type(UdfTypes.ListType.of(UdfTypes.PrimitiveType.of("string")))
+ .build();
+
+ assertThat(UdfParameterParser.fromJson(json)).isEqualTo(expected);
+ }
+
+ @Test
+ void parseMapTypeParameter() {
+ String json =
+ """
+ {
+ "name": "lookup",
+ "type": {
+ "type": "map",
+ "key": "string",
+ "value": "int"
+ }
+ }""";
+ UdfParameter expected =
+ ImmutableUdfParameter.builder()
+ .name("lookup")
+ .type(
+ UdfTypes.MapType.of(
+ UdfTypes.PrimitiveType.of("string"), UdfTypes.PrimitiveType.of("int")))
+ .build();
+
+ assertThat(UdfParameterParser.fromJson(json)).isEqualTo(expected);
+ }
+
+ @Test
+ void parseStructTypeParameter() {
+ String json =
+ """
+ {
+ "name": "row",
+ "type": {
+ "type": "struct",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "label", "type": "string"}
+ ]
+ }
+ }""";
+ UdfParameter expected =
+ ImmutableUdfParameter.builder()
+ .name("row")
+ .type(
+ UdfTypes.StructType.of(
+ UdfTypes.NestedField.of("id", UdfTypes.PrimitiveType.of("int")),
+ UdfTypes.NestedField.of("label", UdfTypes.PrimitiveType.of("string"))))
+ .build();
+
+ assertThat(UdfParameterParser.fromJson(json)).isEqualTo(expected);
+ }
+
+ @Test
+ void parseNestedListOfStruct() {
+ String json =
+ """
+ {
+ "name": "records",
+ "type": {
+ "type": "list",
+ "element": {
+ "type": "struct",
+ "fields": [
+ {"name": "id", "type": "int"}
+ ]
+ }
+ }
+ }""";
+ UdfParameter expected =
+ ImmutableUdfParameter.builder()
+ .name("records")
+ .type(
+ UdfTypes.ListType.of(
+ UdfTypes.StructType.of(
+ UdfTypes.NestedField.of("id", UdfTypes.PrimitiveType.of("int")))))
+ .build();
+
+ assertThat(UdfParameterParser.fromJson(json)).isEqualTo(expected);
+ }
+
+ @Test
+ void roundTripPrimitiveType() {
+ UdfParameter parameter =
+ ImmutableUdfParameter.builder()
+ .name("x")
+ .type(UdfTypes.PrimitiveType.of("int"))
+ .doc("Input integer")
+ .build();
+
+ String serialized = UdfParameterParser.toJson(parameter);
+ assertThat(UdfParameterParser.fromJson(serialized)).isEqualTo(parameter);
+ }
+
+ @Test
+ void roundTripListType() {
+ UdfParameter parameter =
+ ImmutableUdfParameter.builder()
+ .name("items")
+ .type(UdfTypes.ListType.of(UdfTypes.PrimitiveType.of("string")))
+ .build();
+
+ String serialized = UdfParameterParser.toJson(parameter);
+ assertThat(UdfParameterParser.fromJson(serialized)).isEqualTo(parameter);
+ }
+
+ @Test
+ void roundTripMapType() {
+ UdfParameter parameter =
+ ImmutableUdfParameter.builder()
+ .name("lookup")
+ .type(
+ UdfTypes.MapType.of(
+ UdfTypes.PrimitiveType.of("string"), UdfTypes.PrimitiveType.of("int")))
+ .build();
+
+ String serialized = UdfParameterParser.toJson(parameter);
+ assertThat(UdfParameterParser.fromJson(serialized)).isEqualTo(parameter);
+ }
+
+ @Test
+ void roundTripStructType() {
+ UdfParameter parameter =
+ ImmutableUdfParameter.builder()
+ .name("row")
+ .type(
+ UdfTypes.StructType.of(
+ UdfTypes.NestedField.of("id", UdfTypes.PrimitiveType.of("int")),
+ UdfTypes.NestedField.of("label", UdfTypes.PrimitiveType.of("string"))))
+ .build();
+
+ String serialized = UdfParameterParser.toJson(parameter);
+ assertThat(UdfParameterParser.fromJson(serialized)).isEqualTo(parameter);
+ }
+
+ @Test
+ void nullParameter() {
+ assertThatThrownBy(() -> UdfParameterParser.toJson(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid UDF parameter: null");
+
+ assertThatThrownBy(() -> UdfParameterParser.fromJson((JsonNode) null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse UDF parameter from null object");
+ }
+
+ @Test
+ void missingRequiredFields() {
+ String missingName =
+ """
+ {"type":"int"}""";
+ assertThatThrownBy(() -> UdfParameterParser.fromJson(missingName))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse missing string: name");
+
+ String missingType =
+ """
+ {"name":"x"}""";
+ assertThatThrownBy(() -> UdfParameterParser.fromJson(missingType))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot read type from null node");
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/udf/TestUdfRepresentationParser.java b/core/src/test/java/org/apache/iceberg/udf/TestUdfRepresentationParser.java
new file mode 100644
index 000000000000..0536cc2ad942
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/udf/TestUdfRepresentationParser.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.junit.jupiter.api.Test;
+
+class TestUdfRepresentationParser {
+
+ @Test
+ void parseUnknownRepresentation() {
+ String json =
+ """
+ {"type":"python"}""";
+ UdfRepresentation unknownRepresentation = UdfRepresentationParser.fromJson(json);
+ assertThat(unknownRepresentation)
+ .isEqualTo(ImmutableUnknownUdfRepresentation.builder().type("python").build());
+
+ assertThatThrownBy(() -> UdfRepresentationParser.toJson(unknownRepresentation))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Cannot serialize unsupported UDF representation: python");
+ }
+
+ @Test
+ void nullRepresentation() {
+ assertThatThrownBy(() -> UdfRepresentationParser.toJson(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid UDF representation: null");
+ }
+
+ @Test
+ void missingType() {
+ assertThatThrownBy(
+ () ->
+ UdfRepresentationParser.fromJson(
+ """
+ {"sql":"x + 1"}"""))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse missing string: type");
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/udf/TestUdfTypeUtil.java b/core/src/test/java/org/apache/iceberg/udf/TestUdfTypeUtil.java
new file mode 100644
index 000000000000..51072f406901
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/udf/TestUdfTypeUtil.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.udf;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.iceberg.util.JsonUtil;
+import org.junit.jupiter.api.Test;
+
+class TestUdfTypeUtil {
+
+ @Test
+ void readPrimitiveType() {
+ JsonNode node = JsonUtil.mapper().valueToTree("int");
+ UdfType type = UdfTypeUtil.readType(node);
+ assertThat(type).isEqualTo(UdfTypes.PrimitiveType.of("int"));
+ }
+
+ @Test
+ void readDecimalType() {
+ JsonNode node = JsonUtil.mapper().valueToTree("decimal(9,2)");
+ UdfType type = UdfTypeUtil.readType(node);
+ assertThat(type).isEqualTo(UdfTypes.PrimitiveType.of("decimal(9,2)"));
+ }
+
+ @Test
+ void readVariantType() {
+ JsonNode node = JsonUtil.mapper().valueToTree("variant");
+ UdfType type = UdfTypeUtil.readType(node);
+ assertThat(type).isEqualTo(UdfTypes.PrimitiveType.of("variant"));
+ }
+
+ @Test
+ void readListType() {
+ String json =
+ """
+ {"type":"list","element":"string"}""";
+ JsonNode node = JsonUtil.parse(json, n -> n);
+ UdfType type = UdfTypeUtil.readType(node);
+ assertThat(type).isEqualTo(UdfTypes.ListType.of(UdfTypes.PrimitiveType.of("string")));
+ }
+
+ @Test
+ void readMapType() {
+ String json =
+ """
+ {"type":"map","key":"string","value":"int"}""";
+ JsonNode node = JsonUtil.parse(json, n -> n);
+ UdfType type = UdfTypeUtil.readType(node);
+ assertThat(type)
+ .isEqualTo(
+ UdfTypes.MapType.of(
+ UdfTypes.PrimitiveType.of("string"), UdfTypes.PrimitiveType.of("int")));
+ }
+
+ @Test
+ void readStructType() {
+ String structJson =
+ """
+ {
+ "type": "struct",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"}
+ ]
+ }""";
+ JsonNode node = JsonUtil.parse(structJson, n -> n);
+ UdfType expected =
+ UdfTypes.StructType.of(
+ UdfTypes.NestedField.of("id", UdfTypes.PrimitiveType.of("int")),
+ UdfTypes.NestedField.of("name", UdfTypes.PrimitiveType.of("string")));
+
+ assertThat(UdfTypeUtil.readType(node)).isEqualTo(expected);
+ }
+
+ @Test
+ void readNestedListOfMap() {
+ String json =
+ """
+ {
+ "type": "list",
+ "element": {
+ "type": "map",
+ "key": "string",
+ "value": "int"
+ }
+ }""";
+ JsonNode node = JsonUtil.parse(json, n -> n);
+ UdfType expected =
+ UdfTypes.ListType.of(
+ UdfTypes.MapType.of(
+ UdfTypes.PrimitiveType.of("string"), UdfTypes.PrimitiveType.of("int")));
+
+ assertThat(UdfTypeUtil.readType(node)).isEqualTo(expected);
+ }
+
+ @Test
+ void primitiveTypeRejectsUnknownVocabulary() {
+ assertThatThrownBy(() -> UdfTypes.PrimitiveType.of("foo"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot parse type string to primitive: foo");
+
+ assertThatThrownBy(() -> UdfTypes.PrimitiveType.of("struct"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot parse type string to primitive: struct");
+ }
+
+ @Test
+ void readPrimitiveTypeIsCaseInsensitive() {
+ JsonNode upper = JsonUtil.mapper().valueToTree("INT");
+ JsonNode mixed = JsonUtil.mapper().valueToTree("Decimal(9,2)");
+ assertThat(UdfTypeUtil.readType(upper)).isEqualTo(UdfTypes.PrimitiveType.of("int"));
+ assertThat(UdfTypeUtil.readType(mixed)).isEqualTo(UdfTypes.PrimitiveType.of("decimal(9,2)"));
+ }
+
+ @Test
+ void readNestedTypeNameIsCaseInsensitive() {
+ String listJson =
+ """
+ {"type":"LIST","element":"string"}""";
+ assertThat(UdfTypeUtil.readType(JsonUtil.parse(listJson, n -> n)))
+ .isEqualTo(UdfTypes.ListType.of(UdfTypes.PrimitiveType.of("string")));
+
+ String mapJson =
+ """
+ {"type":"Map","key":"string","value":"int"}""";
+ assertThat(UdfTypeUtil.readType(JsonUtil.parse(mapJson, n -> n)))
+ .isEqualTo(
+ UdfTypes.MapType.of(
+ UdfTypes.PrimitiveType.of("string"), UdfTypes.PrimitiveType.of("int")));
+
+ String structJson =
+ """
+ {"type":"STRUCT","fields":[{"name":"id","type":"int"}]}""";
+ assertThat(UdfTypeUtil.readType(JsonUtil.parse(structJson, n -> n)))
+ .isEqualTo(
+ UdfTypes.StructType.of(
+ UdfTypes.NestedField.of("id", UdfTypes.PrimitiveType.of("int"))));
+ }
+
+ @Test
+ void readNullNode() {
+ assertThatThrownBy(() -> UdfTypeUtil.readType(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot read type from null node");
+ }
+
+ @Test
+ void readArrayNode() {
+ JsonNode node = JsonUtil.mapper().valueToTree(new int[] {1, 2, 3});
+ assertThatThrownBy(() -> UdfTypeUtil.readType(node))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot parse UDF type from node:");
+ }
+
+ @Test
+ void readUnknownNestedType() {
+ String json =
+ """
+ {"type":"set"}""";
+ JsonNode node = JsonUtil.parse(json, n -> n);
+ assertThatThrownBy(() -> UdfTypeUtil.readType(node))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse UDF type from object with unknown type set: {\"type\":\"set\"}");
+ }
+
+ @Test
+ void readListMissingElement() {
+ JsonNode node = JsonUtil.parse("{\"type\":\"list\"}", n -> n);
+ assertThatThrownBy(() -> UdfTypeUtil.readType(node))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse missing field: element");
+ }
+
+ @Test
+ void readMapMissingKeyOrValue() {
+ JsonNode missingKey = JsonUtil.parse("{\"type\":\"map\",\"value\":\"int\"}", n -> n);
+ assertThatThrownBy(() -> UdfTypeUtil.readType(missingKey))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse missing field: key");
+
+ JsonNode missingValue = JsonUtil.parse("{\"type\":\"map\",\"key\":\"string\"}", n -> n);
+ assertThatThrownBy(() -> UdfTypeUtil.readType(missingValue))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse missing field: value");
+ }
+
+ @Test
+ void readStructWithInvalidField() {
+ JsonNode missingName =
+ JsonUtil.parse("{\"type\":\"struct\",\"fields\":[{\"type\":\"int\"}]}", n -> n);
+ assertThatThrownBy(() -> UdfTypeUtil.readType(missingName))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse missing string: name");
+
+ JsonNode missingType =
+ JsonUtil.parse("{\"type\":\"struct\",\"fields\":[{\"name\":\"id\"}]}", n -> n);
+ assertThatThrownBy(() -> UdfTypeUtil.readType(missingType))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse missing field: type");
+ }
+
+ @Test
+ void readStructFieldNotObject() {
+ JsonNode node = JsonUtil.parse("{\"type\":\"struct\",\"fields\":[\"oops\"]}", n -> n);
+ assertThatThrownBy(() -> UdfTypeUtil.readType(node))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot parse struct field from non-object:");
+ }
+
+ @Test
+ void writePrimitiveType() {
+ String json =
+ JsonUtil.generate(
+ gen -> {
+ gen.writeStartObject();
+ UdfTypeUtil.writeType("return-type", UdfTypes.PrimitiveType.of("int"), gen);
+ gen.writeEndObject();
+ },
+ false);
+
+ assertThat(json)
+ .isEqualTo(
+ """
+ {"return-type":"int"}""");
+ }
+
+ @Test
+ void writeListType() {
+ UdfType listType = UdfTypes.ListType.of(UdfTypes.PrimitiveType.of("string"));
+ String json =
+ JsonUtil.generate(
+ gen -> {
+ gen.writeStartObject();
+ UdfTypeUtil.writeType("return-type", listType, gen);
+ gen.writeEndObject();
+ },
+ false);
+
+ assertThat(json)
+ .isEqualTo(
+ """
+ {"return-type":{"type":"list","element":"string"}}""");
+ }
+
+ @Test
+ void writeMapType() {
+ UdfType mapType =
+ UdfTypes.MapType.of(UdfTypes.PrimitiveType.of("string"), UdfTypes.PrimitiveType.of("int"));
+ String json =
+ JsonUtil.generate(
+ gen -> {
+ gen.writeStartObject();
+ UdfTypeUtil.writeType("return-type", mapType, gen);
+ gen.writeEndObject();
+ },
+ false);
+
+ assertThat(json)
+ .isEqualTo(
+ """
+ {"return-type":{"type":"map","key":"string","value":"int"}}""");
+ }
+
+ @Test
+ void writeStructType() {
+ UdfType structType =
+ UdfTypes.StructType.of(
+ UdfTypes.NestedField.of("id", UdfTypes.PrimitiveType.of("int")),
+ UdfTypes.NestedField.of("name", UdfTypes.PrimitiveType.of("string")));
+ String json =
+ JsonUtil.generate(
+ gen -> {
+ gen.writeStartObject();
+ UdfTypeUtil.writeType("return-type", structType, gen);
+ gen.writeEndObject();
+ },
+ false);
+
+ assertThat(json)
+ .isEqualTo(
+ """
+ {"return-type":{"type":"struct","fields":[\
+ {"name":"id","type":"int"},\
+ {"name":"name","type":"string"}]}}""");
+ }
+
+ @Test
+ void writeNullType() {
+ assertThatThrownBy(
+ () ->
+ JsonUtil.generate(
+ gen -> {
+ gen.writeStartObject();
+ UdfTypeUtil.writeType("type", null, gen);
+ gen.writeEndObject();
+ },
+ false))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid type: null");
+ }
+
+ @Test
+ void roundTripStructWithListAndMap() {
+ UdfType structType =
+ UdfTypes.StructType.of(
+ UdfTypes.NestedField.of("id", UdfTypes.PrimitiveType.of("int")),
+ UdfTypes.NestedField.of(
+ "tags", UdfTypes.ListType.of(UdfTypes.PrimitiveType.of("string"))),
+ UdfTypes.NestedField.of(
+ "props",
+ UdfTypes.MapType.of(
+ UdfTypes.PrimitiveType.of("string"), UdfTypes.PrimitiveType.of("int"))));
+
+ String json =
+ JsonUtil.generate(
+ gen -> {
+ gen.writeStartObject();
+ UdfTypeUtil.writeType("type", structType, gen);
+ gen.writeEndObject();
+ },
+ false);
+
+ JsonNode node = JsonUtil.parse(json, n -> n);
+ UdfType deserialized = UdfTypeUtil.readType(node.get("type"));
+ assertThat(deserialized).isEqualTo(structType);
+ }
+}