From 453a7931fcb3602b0bde4b68d3e43072db01ea89 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Fri, 12 Jun 2026 14:13:28 +0700 Subject: [PATCH] Flink: Avoid per-row field getter allocation in RowDataWrapper Co-Authored-By: Claude Opus 4.8 (1M context) --- .../org/apache/iceberg/flink/RowDataWrapper.java | 14 +++++++++----- .../org/apache/iceberg/flink/RowDataWrapper.java | 14 +++++++++----- .../org/apache/iceberg/flink/RowDataWrapper.java | 14 +++++++++----- 3 files changed, 27 insertions(+), 15 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java index 920e44b24b31..f92095963255 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java @@ -48,7 +48,14 @@ public RowDataWrapper(RowType rowType, Types.StructType struct) { for (int i = 0; i < size; i++) { types[i] = rowType.getTypeAt(i); - getters[i] = buildGetter(types[i], struct.fields().get(i).type()); + PositionalGetter getter = buildGetter(types[i], struct.fields().get(i).type()); + if (getter == null) { + // Pre-build the Flink field getter once instead of recreating it on every access. + RowData.FieldGetter fieldGetter = FlinkRowData.createFieldGetter(types[i], i); + getter = (row, pos) -> fieldGetter.getFieldOrNull(row); + } + + getters[i] = getter; } } @@ -66,12 +73,9 @@ public int size() { public T get(int pos, Class javaClass) { if (rowData.isNullAt(pos)) { return null; - } else if (getters[pos] != null) { - return javaClass.cast(getters[pos].get(rowData, pos)); } - Object value = FlinkRowData.createFieldGetter(types[pos], pos).getFieldOrNull(rowData); - return javaClass.cast(value); + return javaClass.cast(getters[pos].get(rowData, pos)); } @Override diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java index 920e44b24b31..f92095963255 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java @@ -48,7 +48,14 @@ public RowDataWrapper(RowType rowType, Types.StructType struct) { for (int i = 0; i < size; i++) { types[i] = rowType.getTypeAt(i); - getters[i] = buildGetter(types[i], struct.fields().get(i).type()); + PositionalGetter getter = buildGetter(types[i], struct.fields().get(i).type()); + if (getter == null) { + // Pre-build the Flink field getter once instead of recreating it on every access. + RowData.FieldGetter fieldGetter = FlinkRowData.createFieldGetter(types[i], i); + getter = (row, pos) -> fieldGetter.getFieldOrNull(row); + } + + getters[i] = getter; } } @@ -66,12 +73,9 @@ public int size() { public T get(int pos, Class javaClass) { if (rowData.isNullAt(pos)) { return null; - } else if (getters[pos] != null) { - return javaClass.cast(getters[pos].get(rowData, pos)); } - Object value = FlinkRowData.createFieldGetter(types[pos], pos).getFieldOrNull(rowData); - return javaClass.cast(value); + return javaClass.cast(getters[pos].get(rowData, pos)); } @Override diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java index 920e44b24b31..f92095963255 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java @@ -48,7 +48,14 @@ public RowDataWrapper(RowType rowType, Types.StructType struct) { for (int i = 0; i < size; i++) { types[i] = rowType.getTypeAt(i); - getters[i] = buildGetter(types[i], struct.fields().get(i).type()); + PositionalGetter getter = buildGetter(types[i], struct.fields().get(i).type()); + if (getter == null) { + // Pre-build the Flink field getter once instead of recreating it on every access. + RowData.FieldGetter fieldGetter = FlinkRowData.createFieldGetter(types[i], i); + getter = (row, pos) -> fieldGetter.getFieldOrNull(row); + } + + getters[i] = getter; } } @@ -66,12 +73,9 @@ public int size() { public T get(int pos, Class javaClass) { if (rowData.isNullAt(pos)) { return null; - } else if (getters[pos] != null) { - return javaClass.cast(getters[pos].get(rowData, pos)); } - Object value = FlinkRowData.createFieldGetter(types[pos], pos).getFieldOrNull(rowData); - return javaClass.cast(value); + return javaClass.cast(getters[pos].get(rowData, pos)); } @Override