Skip to content

Commit de6b6fc

Browse files
author
Jade Wang
committed
Implement CircuitBreakerTelemetryExporter wrapper for ITelemetryExporter\n\nTask ID: task-1.2-circuit-breaker-telemetry-exporter
1 parent b2b7deb commit de6b6fc

3 files changed

Lines changed: 829 additions & 0 deletions

File tree

csharp/doc/telemetry-sprint-plan.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,8 @@ Implement the core telemetry infrastructure including feature flag management, p
342342
#### WI-3.3: CircuitBreakerTelemetryExporter
343343
**Description**: Wrapper that protects telemetry exporter with circuit breaker.
344344

345+
**Status**: ✅ **COMPLETED**
346+
345347
**Location**: `csharp/src/Telemetry/CircuitBreakerTelemetryExporter.cs`
346348

347349
**Input**:
@@ -360,6 +362,25 @@ Implement the core telemetry infrastructure including feature flag management, p
360362
| Unit | `CircuitBreakerTelemetryExporter_CircuitOpen_DropsMetrics` | Metrics list, circuit open | No export, no exception |
361363
| Unit | `CircuitBreakerTelemetryExporter_InnerExporterFails_CircuitBreakerTracksFailure` | Inner exporter throws | Circuit breaker failure count incremented |
362364

365+
**Implementation Notes**:
366+
- Implements `ITelemetryExporter` interface with `ExportAsync` method
367+
- Constructor takes `(string host, ITelemetryExporter innerExporter)` with input validation
368+
- Gets `CircuitBreaker` from `CircuitBreakerManager.GetInstance().GetCircuitBreaker(host)` for per-host isolation
369+
- Uses `CircuitBreaker.ExecuteAsync<bool>` to wrap inner exporter calls so failures are tracked
370+
- When inner exporter returns `false` (swallowed failure), throws internal `TelemetryExportFailedException` so the circuit breaker can track it as a failure
371+
- Catches `BrokenCircuitException` when circuit is open and returns `false` gracefully with DEBUG-level logging
372+
- Catches `OperationCanceledException` and re-throws (cancellation is not swallowed)
373+
- All other exceptions are swallowed after the circuit breaker has tracked them, logged at TRACE level
374+
- Null/empty logs bypass the circuit breaker and delegate directly to the inner exporter
375+
- Comprehensive test coverage with 22 unit tests including constructor validation, circuit closed/open behavior, failure tracking, cancellation propagation, per-host isolation, and recovery
376+
- Test file location: `csharp/test/Unit/Telemetry/CircuitBreakerTelemetryExporterTests.cs`
377+
378+
**Key Design Decisions**:
379+
1. **False-return tracking**: When the inner exporter returns `false` (indicating a swallowed failure), the wrapper throws a `TelemetryExportFailedException` inside `ExecuteAsync` so the circuit breaker can track the failure. This satisfies the requirement that "circuit breaker MUST see exceptions before they are swallowed."
380+
2. **Null/empty bypass**: Null or empty logs bypass the circuit breaker entirely and delegate to the inner exporter. This avoids unnecessary circuit breaker overhead for no-op calls.
381+
3. **Cancellation propagation**: `OperationCanceledException` is the only exception that propagates to the caller, matching the existing `DatabricksTelemetryExporter` behavior.
382+
4. **Per-host isolation**: Uses `CircuitBreakerManager` singleton to get per-host circuit breakers, ensuring failures on one host don't affect other hosts.
383+
363384
---
364385

365386
### Phase 4: Exception Handling
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright (c) 2025 ADBC Drivers Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
using System;
18+
using System.Collections.Generic;
19+
using System.Diagnostics;
20+
using System.Threading;
21+
using System.Threading.Tasks;
22+
using AdbcDrivers.Databricks.Telemetry.Models;
23+
using Polly.CircuitBreaker;
24+
25+
namespace AdbcDrivers.Databricks.Telemetry
26+
{
27+
/// <summary>
28+
/// Wraps an <see cref="ITelemetryExporter"/> with circuit breaker protection.
29+
/// </summary>
30+
/// <remarks>
31+
/// When the circuit is closed, delegates to the inner exporter through the circuit breaker
32+
/// so that failures are tracked. When the circuit is open, drops events gracefully
33+
/// (returns false, no exception). All exceptions are swallowed after the circuit breaker
34+
/// has had a chance to track them.
35+
///
36+
/// Critical design requirement: The circuit breaker MUST see exceptions before they are
37+
/// swallowed, so it can properly track failures and transition states.
38+
///
39+
/// JDBC Reference: CircuitBreakerTelemetryPushClient.java
40+
/// </remarks>
41+
internal sealed class CircuitBreakerTelemetryExporter : ITelemetryExporter
42+
{
43+
private readonly ITelemetryExporter _innerExporter;
44+
private readonly CircuitBreaker _circuitBreaker;
45+
private readonly string _host;
46+
47+
/// <summary>
48+
/// Creates a new <see cref="CircuitBreakerTelemetryExporter"/> that wraps the given exporter
49+
/// with circuit breaker protection for the specified host.
50+
/// </summary>
51+
/// <param name="host">The host identifier used to obtain a per-host circuit breaker.</param>
52+
/// <param name="innerExporter">The inner telemetry exporter to delegate to.</param>
53+
/// <exception cref="ArgumentNullException">
54+
/// Thrown when <paramref name="host"/> or <paramref name="innerExporter"/> is null.
55+
/// </exception>
56+
/// <exception cref="ArgumentException">
57+
/// Thrown when <paramref name="host"/> is empty or whitespace.
58+
/// </exception>
59+
public CircuitBreakerTelemetryExporter(string host, ITelemetryExporter innerExporter)
60+
{
61+
if (host == null)
62+
{
63+
throw new ArgumentNullException(nameof(host));
64+
}
65+
66+
if (string.IsNullOrWhiteSpace(host))
67+
{
68+
throw new ArgumentException("Host cannot be empty or whitespace.", nameof(host));
69+
}
70+
71+
_innerExporter = innerExporter ?? throw new ArgumentNullException(nameof(innerExporter));
72+
_host = host;
73+
_circuitBreaker = CircuitBreakerManager.GetInstance().GetCircuitBreaker(host);
74+
}
75+
76+
/// <summary>
77+
/// Export telemetry frontend logs through the circuit breaker.
78+
/// </summary>
79+
/// <param name="logs">The list of telemetry frontend logs to export.</param>
80+
/// <param name="ct">Cancellation token.</param>
81+
/// <returns>
82+
/// True if the export succeeded, false if the export failed or was dropped due to
83+
/// an open circuit. Returns true for empty/null logs (delegates to inner exporter).
84+
/// </returns>
85+
/// <remarks>
86+
/// This method never throws exceptions. When the circuit is open, events are dropped
87+
/// gracefully and logged at DEBUG level. When the inner exporter fails, the circuit
88+
/// breaker tracks the failure before the exception is swallowed.
89+
/// </remarks>
90+
public async Task<bool> ExportAsync(IReadOnlyList<TelemetryFrontendLog> logs, CancellationToken ct = default)
91+
{
92+
if (logs == null || logs.Count == 0)
93+
{
94+
return await _innerExporter.ExportAsync(logs, ct).ConfigureAwait(false);
95+
}
96+
97+
try
98+
{
99+
// Execute through the circuit breaker so it can track failures.
100+
// The inner exporter is called inside ExecuteAsync, meaning:
101+
// - If it throws, the circuit breaker sees the exception and tracks the failure
102+
// - If it returns false (swallowed failure), we throw to let the circuit breaker track it
103+
bool result = await _circuitBreaker.ExecuteAsync<bool>(async () =>
104+
{
105+
bool exportResult = await _innerExporter.ExportAsync(logs, ct).ConfigureAwait(false);
106+
107+
if (!exportResult)
108+
{
109+
// The inner exporter returned false (it swallowed the error internally).
110+
// Throw so the circuit breaker can track this as a failure.
111+
throw new TelemetryExportFailedException(
112+
"Inner telemetry exporter returned false, indicating export failure.");
113+
}
114+
115+
return exportResult;
116+
}).ConfigureAwait(false);
117+
118+
return result;
119+
}
120+
catch (BrokenCircuitException)
121+
{
122+
// Circuit is open - drop events gracefully
123+
Debug.WriteLine($"[DEBUG] Circuit breaker OPEN for host '{_host}' - dropping {logs.Count} telemetry event(s).");
124+
return false;
125+
}
126+
catch (OperationCanceledException)
127+
{
128+
// Cancellation should not be swallowed - propagate it
129+
throw;
130+
}
131+
catch (Exception ex)
132+
{
133+
// All other exceptions swallowed AFTER the circuit breaker has seen them
134+
Debug.WriteLine($"[TRACE] Telemetry export error for host '{_host}': {ex.Message}");
135+
return false;
136+
}
137+
}
138+
139+
/// <summary>
140+
/// Internal exception used to signal that the inner exporter returned false
141+
/// (a swallowed failure), so the circuit breaker can track it as a failure.
142+
/// </summary>
143+
internal sealed class TelemetryExportFailedException : Exception
144+
{
145+
public TelemetryExportFailedException(string message) : base(message)
146+
{
147+
}
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)