From a410500cc63659e11acb8c361c5a95e3a5e03380 Mon Sep 17 00:00:00 2001 From: Hilmar Falkenberg Date: Tue, 22 Jul 2025 16:07:06 +0200 Subject: [PATCH 1/4] errorConsumer in context Signed-off-by: Hilmar Falkenberg --- .../logs/export/BatchLogRecordProcessor.java | 31 +++++++++++++++---- .../BatchLogRecordProcessorBuilder.java | 21 ++++++++++++- .../sdk/logs/export/ExportErrorContext.java | 19 ++++++++++++ .../logs/export/SimpleLogRecordProcessor.java | 5 +++ 4 files changed, 69 insertions(+), 7 deletions(-) create mode 100644 sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/ExportErrorContext.java diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java index 601bfdfa208..369f4a05a14 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java @@ -17,6 +17,7 @@ import io.opentelemetry.sdk.logs.ReadWriteLogRecord; import io.opentelemetry.sdk.logs.data.LogRecordData; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Queue; @@ -26,8 +27,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; /** * Implementation of the {@link LogRecordProcessor} that batches logs exported by the SDK then @@ -71,7 +74,8 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord long scheduleDelayNanos, int maxQueueSize, int maxExportBatchSize, - long exporterTimeoutNanos) { + long exporterTimeoutNanos, + @Nullable Consumer> errorConsumer) { this.worker = new Worker( logRecordExporter, @@ -79,7 +83,8 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord scheduleDelayNanos, maxExportBatchSize, exporterTimeoutNanos, - new ArrayBlockingQueue<>(maxQueueSize)); // TODO: use JcTools.newFixedSizeQueue(..) + new ArrayBlockingQueue<>(maxQueueSize), // TODO: use JcTools.newFixedSizeQueue(..) + errorConsumer); Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); workerThread.start(); } @@ -89,7 +94,7 @@ public void onEmit(Context context, ReadWriteLogRecord logRecord) { if (logRecord == null) { return; } - worker.addLog(logRecord); + worker.addLog(logRecord, context); } @Override @@ -164,13 +169,16 @@ private static final class Worker implements Runnable { private volatile boolean continueWork = true; private final ArrayList batch; + @Nullable private final Consumer> errorConsumer; + private Worker( LogRecordExporter logRecordExporter, MeterProvider meterProvider, long scheduleDelayNanos, int maxExportBatchSize, long exporterTimeoutNanos, - Queue queue) { + Queue queue, + @Nullable Consumer> errorConsumer) { this.logRecordExporter = logRecordExporter; this.scheduleDelayNanos = scheduleDelayNanos; this.maxExportBatchSize = maxExportBatchSize; @@ -211,11 +219,16 @@ private Worker( false); this.batch = new ArrayList<>(this.maxExportBatchSize); + this.errorConsumer = errorConsumer; } - private void addLog(ReadWriteLogRecord logData) { + private void addLog(ReadWriteLogRecord logData, Context context) { if (!queue.offer(logData)) { processedLogsCounter.add(1, droppedAttrs); + Consumer consumer = context.get(ExportErrorContext.KEY); + if (consumer != null) { + consumer.accept(logData); + } } else { if (queue.size() >= logsNeeded.get()) { signal.offer(true); @@ -323,7 +336,13 @@ private void exportCurrentBatch() { if (result.isSuccess()) { processedLogsCounter.add(batch.size(), exportedAttrs); } else { - logger.log(Level.FINE, "Exporter failed"); + logger.log(Level.WARNING, "Exporter failed"); + if (errorConsumer != null) { + // If the exporter failed, we call the error consumer with the batch. + // This allows the user to handle the error, e.g., by logging it or sending it to a + // different exporter. + errorConsumer.accept(batch); + } } } catch (RuntimeException e) { logger.log(Level.WARNING, "Exporter threw an Exception", e); diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java index 5e17848c774..09b4a3b1b48 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java @@ -9,10 +9,14 @@ import static java.util.Objects.requireNonNull; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.sdk.logs.data.LogRecordData; import java.time.Duration; +import java.util.Collection; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; /** * Builder class for {@link BatchLogRecordProcessor}. @@ -39,6 +43,8 @@ public final class BatchLogRecordProcessorBuilder { private long exporterTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_EXPORT_TIMEOUT_MILLIS); private MeterProvider meterProvider = MeterProvider.noop(); + @Nullable private Consumer> errorConsumer; + BatchLogRecordProcessorBuilder(LogRecordExporter logRecordExporter) { this.logRecordExporter = requireNonNull(logRecordExporter, "logRecordExporter"); } @@ -142,6 +148,18 @@ public BatchLogRecordProcessorBuilder setMeterProvider(MeterProvider meterProvid return this; } + /** + * Sets the error consumer to handle failed log record exports. + * + * @param errorConsumer the consumer to handle collections of failed LogRecordData + * @return this builder + */ + public BatchLogRecordProcessorBuilder setErrorConsumer( + Consumer> errorConsumer) { + this.errorConsumer = errorConsumer; + return this; + } + // Visible for testing int getMaxExportBatchSize() { return maxExportBatchSize; @@ -167,6 +185,7 @@ public BatchLogRecordProcessor build() { scheduleDelayNanos, maxQueueSize, maxExportBatchSize, - exporterTimeoutNanos); + exporterTimeoutNanos, + errorConsumer); } } diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/ExportErrorContext.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/ExportErrorContext.java new file mode 100644 index 00000000000..18fd390dfc8 --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/ExportErrorContext.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs.export; + +import io.opentelemetry.context.ContextKey; +import io.opentelemetry.sdk.logs.ReadWriteLogRecord; +import java.util.function.Consumer; +import javax.annotation.concurrent.Immutable; + +@Immutable +public class ExportErrorContext { + public static final ContextKey> KEY = + ContextKey.named("export-error-consumer"); + + private ExportErrorContext() {} +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java index cc75b50ae1f..deb4296a284 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java @@ -17,6 +17,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -77,6 +78,10 @@ public void onEmit(Context context, ReadWriteLogRecord logRecord) { () -> { pendingExports.remove(result); if (!result.isSuccess()) { + Consumer consumer = context.get(ExportErrorContext.KEY); + if (consumer != null) { + consumer.accept(logRecord); + } logger.log(Level.FINE, "Exporter failed"); } }); From b35a5dac078112fdbb5d3fde9ff72d3ff6203f95 Mon Sep 17 00:00:00 2001 From: Hilmar Falkenberg Date: Wed, 23 Jul 2025 12:31:43 +0200 Subject: [PATCH 2/4] let's use always: Collection Signed-off-by: Hilmar Falkenberg --- .../sdk/logs/export/BatchLogRecordProcessor.java | 4 ++-- .../io/opentelemetry/sdk/logs/export/ExportErrorContext.java | 5 +++-- .../sdk/logs/export/SimpleLogRecordProcessor.java | 5 +++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java index 369f4a05a14..27d482b8238 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java @@ -225,9 +225,9 @@ private Worker( private void addLog(ReadWriteLogRecord logData, Context context) { if (!queue.offer(logData)) { processedLogsCounter.add(1, droppedAttrs); - Consumer consumer = context.get(ExportErrorContext.KEY); + Consumer> consumer = context.get(ExportErrorContext.KEY); if (consumer != null) { - consumer.accept(logData); + consumer.accept(Collections.singleton(logData.toLogRecordData())); } } else { if (queue.size() >= logsNeeded.get()) { diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/ExportErrorContext.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/ExportErrorContext.java index 18fd390dfc8..3198f2842a5 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/ExportErrorContext.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/ExportErrorContext.java @@ -6,13 +6,14 @@ package io.opentelemetry.sdk.logs.export; import io.opentelemetry.context.ContextKey; -import io.opentelemetry.sdk.logs.ReadWriteLogRecord; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import java.util.Collection; import java.util.function.Consumer; import javax.annotation.concurrent.Immutable; @Immutable public class ExportErrorContext { - public static final ContextKey> KEY = + public static final ContextKey>> KEY = ContextKey.named("export-error-consumer"); private ExportErrorContext() {} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java index deb4296a284..25b1e2b04fb 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java @@ -12,6 +12,7 @@ import io.opentelemetry.sdk.logs.LogRecordProcessor; import io.opentelemetry.sdk.logs.ReadWriteLogRecord; import io.opentelemetry.sdk.logs.data.LogRecordData; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; @@ -78,9 +79,9 @@ public void onEmit(Context context, ReadWriteLogRecord logRecord) { () -> { pendingExports.remove(result); if (!result.isSuccess()) { - Consumer consumer = context.get(ExportErrorContext.KEY); + Consumer> consumer = context.get(ExportErrorContext.KEY); if (consumer != null) { - consumer.accept(logRecord); + consumer.accept(Collections.singleton(logRecord.toLogRecordData())); } logger.log(Level.FINE, "Exporter failed"); } From 2909cbbcfd465e225fa66438ef57918edf8ce084 Mon Sep 17 00:00:00 2001 From: Hilmar Falkenberg Date: Wed, 6 Aug 2025 15:22:05 +0200 Subject: [PATCH 3/4] revert log-level change --- .../opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java index 27d482b8238..d7c53584a8c 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java @@ -336,7 +336,7 @@ private void exportCurrentBatch() { if (result.isSuccess()) { processedLogsCounter.add(batch.size(), exportedAttrs); } else { - logger.log(Level.WARNING, "Exporter failed"); + logger.log(Level.FINE, "Exporter failed"); if (errorConsumer != null) { // If the exporter failed, we call the error consumer with the batch. // This allows the user to handle the error, e.g., by logging it or sending it to a From 6eebdd179c99e20872336ddb5e98383076edc1ca Mon Sep 17 00:00:00 2001 From: Hilmar Falkenberg Date: Fri, 5 Sep 2025 16:16:44 +0200 Subject: [PATCH 4/4] add diff opentelemetry-sdk-logs.txt Signed-off-by: Hilmar Falkenberg --- .../apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt index 75bd1d0a6dd..f69e6d80f6d 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt @@ -1,2 +1,8 @@ Comparing source compatibility of opentelemetry-sdk-logs-1.54.0-SNAPSHOT.jar against opentelemetry-sdk-logs-1.53.0.jar -No changes. \ No newline at end of file +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.logs.export.BatchLogRecordProcessorBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.logs.export.BatchLogRecordProcessorBuilder setErrorConsumer(java.util.function.Consumer>) ++++ NEW CLASS: PUBLIC(+) io.opentelemetry.sdk.logs.export.ExportErrorContext (not serializable) + +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. + +++ NEW SUPERCLASS: java.lang.Object + +++ NEW FIELD: PUBLIC(+) STATIC(+) FINAL(+) io.opentelemetry.context.ContextKey>> KEY