Closed
48 changes: 48 additions & 0 deletions csharp/doc/telemetry-sprint-plan.md
@@ -294,6 +294,8 @@ Implement the core telemetry infrastructure including feature flag management, p
#### WI-3.2: CircuitBreakerManager
**Description**: Singleton that manages circuit breakers per host.

**Status**: ✅ **COMPLETED**

**Location**: `csharp/src/Telemetry/CircuitBreakerManager.cs`

**Input**:
@@ -310,11 +312,38 @@ Implement the core telemetry infrastructure including feature flag management, p
| Unit | `CircuitBreakerManager_GetCircuitBreaker_SameHost_ReturnsSameBreaker` | Same host twice | Same CircuitBreaker instance |
| Unit | `CircuitBreakerManager_GetCircuitBreaker_DifferentHosts_CreatesSeparateBreakers` | "host1", "host2" | Different CircuitBreaker instances |

**Implementation Notes**:
- Singleton pattern using `private static readonly` instance with `GetInstance()` method
- Uses `ConcurrentDictionary<string, CircuitBreaker>` with `StringComparer.OrdinalIgnoreCase` for case-insensitive host matching
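- `GetOrAdd` ensures every caller observes a single circuit breaker per host; under contention the factory delegate may run more than once, but only one result is stored and returned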
- Two overloads for `GetCircuitBreaker`: default config and custom config (failureThreshold + timeout)
- `RemoveCircuitBreaker(host)` for cleanup when last connection to a host is closed
- `Reset()` internal method for test isolation
- Input validation: throws `ArgumentNullException` for null, `ArgumentException` for empty/whitespace hosts
- Comprehensive test coverage with 23 unit tests including:
  - Singleton verification
  - Same host returns same instance (including case-insensitive)
  - Different hosts get separate instances with independent state
  - Thread safety with 10 concurrent threads for same host
  - Thread safety with 20 concurrent threads for different hosts
  - Mixed concurrent access (50 threads, 5 hosts, 10 threads per host)
  - Input validation, remove, and reset operations
- Test file location: `csharp/test/Unit/Telemetry/CircuitBreakerManagerTests.cs`
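
The same-host thread-safety checks listed above can be illustrated with a minimal sketch. It is not taken from `CircuitBreakerManagerTests.cs`; `DemoBreaker` and `ConcurrentAccessSketch` are hypothetical names, and `DemoBreaker` is only a stand-in for the real `CircuitBreaker` class.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the real CircuitBreaker class, which is not shown here.
public sealed class DemoBreaker { }

public static class ConcurrentAccessSketch
{
    // Returns the number of distinct breaker instances observed by all callers.
    public static int DistinctInstancesSeen(int callers, string host)
    {
        var breakers = new ConcurrentDictionary<string, DemoBreaker>(StringComparer.OrdinalIgnoreCase);
        var seen = new DemoBreaker[callers];

        // Each iteration plays the role of one connection asking for the breaker.
        Parallel.For(0, callers, i =>
        {
            // The factory may run more than once under contention,
            // but GetOrAdd hands every caller the single stored instance.
            seen[i] = breakers.GetOrAdd(host, _ => new DemoBreaker());
        });

        // DemoBreaker does not override Equals, so Distinct compares references.
        return seen.Distinct().Count();
    }
}
```

Calling `ConcurrentAccessSketch.DistinctInstancesSeen(10, "host1")` should yield `1`, because every caller receives the instance that was stored in the dictionary.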

**Key Design Decisions**:
1. **Case-insensitive host matching**: Uses `StringComparer.OrdinalIgnoreCase` since DNS hostnames are case-insensitive
2. **Lazy creation**: Circuit breakers are created on first access via `GetOrAdd`, not pre-allocated
3. **Config-ignored on existing**: When a breaker already exists for a host, the config overload returns the existing instance (config parameters are ignored)
4. **Polly-backed**: Each circuit breaker uses the existing Polly-based `CircuitBreaker` class
5. **No reference counting**: Unlike `TelemetryClientManager`, the circuit breaker manager uses simple add/remove, since circuit breakers are lightweight objects that hold only a small amount of state
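
Decisions 1 to 3 can be seen together in a small sketch. This is an illustration under stated assumptions, not the code in `CircuitBreakerManager.cs`: `SketchBreaker` and `SketchBreakerRegistry` are hypothetical names, and `SketchBreaker` only records its configuration instead of wrapping Polly.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for the real CircuitBreaker; it only records its configuration.
public sealed class SketchBreaker
{
    public int FailureThreshold { get; }
    public TimeSpan Timeout { get; }

    public SketchBreaker(int failureThreshold, TimeSpan timeout)
    {
        FailureThreshold = failureThreshold;
        Timeout = timeout;
    }
}

public sealed class SketchBreakerRegistry
{
    // OrdinalIgnoreCase makes "Host1" and "host1" map to the same entry.
    private readonly ConcurrentDictionary<string, SketchBreaker> _breakers =
        new ConcurrentDictionary<string, SketchBreaker>(StringComparer.OrdinalIgnoreCase);

    // Creates the breaker on first access; later calls return the stored instance
    // and silently ignore the configuration arguments.
    public SketchBreaker Get(string host, int failureThreshold, TimeSpan timeout) =>
        _breakers.GetOrAdd(host, _ => new SketchBreaker(failureThreshold, timeout));

    // Simple removal with no reference counting.
    public bool Remove(string host) => _breakers.TryRemove(host, out _);
}
```

With this sketch, `Get("Host1", 5, TimeSpan.FromSeconds(30))` followed by `Get("host1", 10, TimeSpan.FromSeconds(60))` returns the same object, and its `FailureThreshold` stays at 5. Callers that need a specific configuration therefore have to be the first to request the breaker for a host, or remove the existing one first.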

---

#### WI-3.3: CircuitBreakerTelemetryExporter
**Description**: Wrapper that protects telemetry exporter with circuit breaker.

**Status**: ✅ **COMPLETED**

**Location**: `csharp/src/Telemetry/CircuitBreakerTelemetryExporter.cs`

**Input**:
@@ -333,6 +362,25 @@ Implement the core telemetry infrastructure including feature flag management, p
| Unit | `CircuitBreakerTelemetryExporter_CircuitOpen_DropsMetrics` | Metrics list, circuit open | No export, no exception |
| Unit | `CircuitBreakerTelemetryExporter_InnerExporterFails_CircuitBreakerTracksFailure` | Inner exporter throws | Circuit breaker failure count incremented |

**Implementation Notes**:
- Implements `ITelemetryExporter` interface with `ExportAsync` method
- Constructor takes `(string host, ITelemetryExporter innerExporter)` with input validation
- Gets `CircuitBreaker` from `CircuitBreakerManager.GetInstance().GetCircuitBreaker(host)` for per-host isolation
- Uses `CircuitBreaker.ExecuteAsync<bool>` to wrap inner exporter calls so failures are tracked
- When inner exporter returns `false` (swallowed failure), throws internal `TelemetryExportFailedException` so the circuit breaker can track it as a failure
- Catches `BrokenCircuitException` when circuit is open and returns `false` gracefully with DEBUG-level logging
- Catches `OperationCanceledException` and re-throws (cancellation is not swallowed)
- All other exceptions are swallowed after the circuit breaker has tracked them, logged at TRACE level
- Null/empty logs bypass the circuit breaker and delegate directly to the inner exporter
- Comprehensive test coverage with 22 unit tests including constructor validation, circuit closed/open behavior, failure tracking, cancellation propagation, per-host isolation, and recovery
- Test file location: `csharp/test/Unit/Telemetry/CircuitBreakerTelemetryExporterTests.cs`

**Key Design Decisions**:
1. **False-return tracking**: When the inner exporter returns `false` (indicating a swallowed failure), the wrapper throws a `TelemetryExportFailedException` inside `ExecuteAsync` so the circuit breaker can track the failure. This satisfies the requirement that "circuit breaker MUST see exceptions before they are swallowed."
2. **Null/empty bypass**: Null or empty logs bypass the circuit breaker entirely and delegate to the inner exporter. This avoids unnecessary circuit breaker overhead for no-op calls.
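3. **Cancellation propagation**: On the circuit-breaker path, `OperationCanceledException` is the only exception that propagates to the caller, matching the existing `DatabricksTelemetryExporter` behavior. Null or empty logs take the bypass path, where the inner exporter is called directly without this exception handling.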
4. **Per-host isolation**: Uses `CircuitBreakerManager` singleton to get per-host circuit breakers, ensuring failures on one host don't affect other hosts.
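
The false-return tracking and drop-on-open behavior can be sketched without Polly. Everything here is an assumption made for illustration: `TinyBreaker`, `CircuitOpenException`, and `GuardedExporterSketch` are hypothetical names, the breaker counts consecutive failures only, never half-opens, and is not thread-safe. `CircuitOpenException` plays the role that Polly's `BrokenCircuitException` plays in the real wrapper.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for Polly's BrokenCircuitException.
public sealed class CircuitOpenException : Exception { }

// Deliberately tiny breaker: opens after N consecutive failures and never half-opens.
// Not thread-safe; the real CircuitBreaker is Polly-based and is not reproduced here.
public sealed class TinyBreaker
{
    private readonly int _threshold;
    private int _consecutiveFailures;

    public TinyBreaker(int threshold) { _threshold = threshold; }

    public int ConsecutiveFailures => _consecutiveFailures;

    public async Task<T> ExecuteAsync<T>(Func<Task<T>> action)
    {
        if (_consecutiveFailures >= _threshold) throw new CircuitOpenException();
        try
        {
            T result = await action().ConfigureAwait(false);
            _consecutiveFailures = 0;
            return result;
        }
        catch
        {
            _consecutiveFailures++; // the breaker sees the failure before anyone swallows it
            throw;
        }
    }
}

public sealed class GuardedExporterSketch
{
    private readonly TinyBreaker _breaker;
    private readonly Func<Task<bool>> _innerExport;

    public GuardedExporterSketch(TinyBreaker breaker, Func<Task<bool>> innerExport)
    {
        _breaker = breaker;
        _innerExport = innerExport;
    }

    public async Task<bool> ExportAsync()
    {
        try
        {
            return await _breaker.ExecuteAsync<bool>(async () =>
            {
                bool ok = await _innerExport().ConfigureAwait(false);
                // Convert a swallowed failure into an exception the breaker can count.
                if (!ok) throw new InvalidOperationException("inner exporter returned false");
                return ok;
            }).ConfigureAwait(false);
        }
        catch (CircuitOpenException)
        {
            return false; // circuit open: drop the batch without calling the exporter
        }
        catch (OperationCanceledException)
        {
            throw; // cancellation is propagated, not swallowed
        }
        catch (Exception)
        {
            return false; // swallowed only after the breaker has counted it
        }
    }
}
```

With a threshold of 2 and an inner delegate that always returns `false`, the first two `ExportAsync` calls reach the delegate and raise the failure count to 2. The third call returns `false` without invoking the delegate at all, which is the drop-on-open behavior described above.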

---

### Phase 4: Exception Handling
Empty file added csharp/doc/telemetry-tasks.json
Empty file.
126 changes: 126 additions & 0 deletions csharp/src/Telemetry/CircuitBreakerManager.cs
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2025 ADBC Drivers Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Concurrent;

namespace AdbcDrivers.Databricks.Telemetry
{
    /// <summary>
    /// Singleton that manages one <see cref="CircuitBreaker"/> instance per host.
    /// Prevents creating duplicate circuit breakers when multiple connections target the same host.
    /// </summary>
    /// <remarks>
    /// This manager uses a <see cref="ConcurrentDictionary{TKey, TValue}"/> internally to ensure
    /// thread-safe access from multiple connections. The same host always returns the same
    /// <see cref="CircuitBreaker"/> instance, while different hosts get separate instances
    /// for per-host isolation.
    /// </remarks>
    internal sealed class CircuitBreakerManager
    {
        private static readonly CircuitBreakerManager s_instance = new CircuitBreakerManager();

        private readonly ConcurrentDictionary<string, CircuitBreaker> _circuitBreakers =
            new ConcurrentDictionary<string, CircuitBreaker>(StringComparer.OrdinalIgnoreCase);

        /// <summary>
        /// Private constructor to enforce singleton pattern.
        /// </summary>
        private CircuitBreakerManager()
        {
        }

        /// <summary>
        /// Gets the singleton instance of <see cref="CircuitBreakerManager"/>.
        /// </summary>
        /// <returns>The singleton <see cref="CircuitBreakerManager"/> instance.</returns>
        public static CircuitBreakerManager GetInstance() => s_instance;

        /// <summary>
        /// Gets an existing <see cref="CircuitBreaker"/> for the specified host,
        /// or creates a new one with default configuration if none exists.
        /// </summary>
        /// <param name="host">The host identifier (e.g., "workspace.databricks.com").</param>
        /// <returns>The <see cref="CircuitBreaker"/> instance for the specified host.</returns>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="host"/> is null.</exception>
        /// <exception cref="ArgumentException">Thrown when <paramref name="host"/> is empty or whitespace.</exception>
        public CircuitBreaker GetCircuitBreaker(string host)
        {
            if (host == null)
            {
                throw new ArgumentNullException(nameof(host));
            }

            if (string.IsNullOrWhiteSpace(host))
            {
                throw new ArgumentException("Host cannot be empty or whitespace.", nameof(host));
            }

            return _circuitBreakers.GetOrAdd(host, _ => new CircuitBreaker());
        }

        /// <summary>
        /// Gets an existing <see cref="CircuitBreaker"/> for the specified host,
        /// or creates a new one with the specified configuration if none exists.
        /// If a breaker already exists for the host, the configuration parameters are ignored.
        /// </summary>
        /// <param name="host">The host identifier (e.g., "workspace.databricks.com").</param>
        /// <param name="failureThreshold">Number of failures before the circuit opens.</param>
        /// <param name="timeout">Duration the circuit stays open before transitioning to half-open.</param>
        /// <returns>The <see cref="CircuitBreaker"/> instance for the specified host.</returns>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="host"/> is null.</exception>
        /// <exception cref="ArgumentException">Thrown when <paramref name="host"/> is empty or whitespace.</exception>
        public CircuitBreaker GetCircuitBreaker(string host, int failureThreshold, TimeSpan timeout)
        {
            if (host == null)
            {
                throw new ArgumentNullException(nameof(host));
            }

            if (string.IsNullOrWhiteSpace(host))
            {
                throw new ArgumentException("Host cannot be empty or whitespace.", nameof(host));
            }

            return _circuitBreakers.GetOrAdd(host, _ => new CircuitBreaker(failureThreshold, timeout));
        }

        /// <summary>
        /// Removes the circuit breaker for the specified host, if one exists.
        /// This is intended for cleanup when the last connection to a host is closed.
        /// </summary>
        /// <param name="host">The host identifier.</param>
        /// <returns><c>true</c> if a circuit breaker was removed; otherwise, <c>false</c>.</returns>
        internal bool RemoveCircuitBreaker(string host)
        {
            if (string.IsNullOrWhiteSpace(host))
            {
                return false;
            }

            return _circuitBreakers.TryRemove(host, out _);
        }

        /// <summary>
        /// Resets the manager by removing all circuit breakers.
        /// This is primarily intended for testing purposes.
        /// </summary>
        internal void Reset()
        {
            _circuitBreakers.Clear();
        }
    }
}
150 changes: 150 additions & 0 deletions csharp/src/Telemetry/CircuitBreakerTelemetryExporter.cs
@@ -0,0 +1,150 @@
/*
* Copyright (c) 2025 ADBC Drivers Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using AdbcDrivers.Databricks.Telemetry.Models;
using Polly.CircuitBreaker;

namespace AdbcDrivers.Databricks.Telemetry
{
    /// <summary>
    /// Wraps an <see cref="ITelemetryExporter"/> with circuit breaker protection.
    /// </summary>
    /// <remarks>
    /// When the circuit is closed, delegates to the inner exporter through the circuit breaker
    /// so that failures are tracked. When the circuit is open, drops events gracefully
    /// (returns false, no exception). Exceptions other than
    /// <see cref="OperationCanceledException"/> are swallowed after the circuit breaker
    /// has had a chance to track them.
    ///
    /// Critical design requirement: The circuit breaker MUST see exceptions before they are
    /// swallowed, so it can properly track failures and transition states.
    ///
    /// JDBC Reference: CircuitBreakerTelemetryPushClient.java
    /// </remarks>
    internal sealed class CircuitBreakerTelemetryExporter : ITelemetryExporter
    {
        private readonly ITelemetryExporter _innerExporter;
        private readonly CircuitBreaker _circuitBreaker;
        private readonly string _host;

        /// <summary>
        /// Creates a new <see cref="CircuitBreakerTelemetryExporter"/> that wraps the given exporter
        /// with circuit breaker protection for the specified host.
        /// </summary>
        /// <param name="host">The host identifier used to obtain a per-host circuit breaker.</param>
        /// <param name="innerExporter">The inner telemetry exporter to delegate to.</param>
        /// <exception cref="ArgumentNullException">
        /// Thrown when <paramref name="host"/> or <paramref name="innerExporter"/> is null.
        /// </exception>
        /// <exception cref="ArgumentException">
        /// Thrown when <paramref name="host"/> is empty or whitespace.
        /// </exception>
        public CircuitBreakerTelemetryExporter(string host, ITelemetryExporter innerExporter)
        {
            if (host == null)
            {
                throw new ArgumentNullException(nameof(host));
            }

            if (string.IsNullOrWhiteSpace(host))
            {
                throw new ArgumentException("Host cannot be empty or whitespace.", nameof(host));
            }

            _innerExporter = innerExporter ?? throw new ArgumentNullException(nameof(innerExporter));
            _host = host;
            _circuitBreaker = CircuitBreakerManager.GetInstance().GetCircuitBreaker(host);
        }

        /// <summary>
        /// Export telemetry frontend logs through the circuit breaker.
        /// </summary>
        /// <param name="logs">The list of telemetry frontend logs to export.</param>
        /// <param name="ct">Cancellation token.</param>
        /// <returns>
        /// True if the export succeeded; false if the export failed or was dropped due to
        /// an open circuit. For null or empty logs, returns the inner exporter's result.
        /// </returns>
        /// <remarks>
        /// For non-empty logs, the only exception this method propagates is
        /// <see cref="OperationCanceledException"/>. When the circuit is open, events are dropped
        /// gracefully and logged at DEBUG level. When the inner exporter fails, the circuit
        /// breaker tracks the failure before the exception is swallowed. Null or empty logs are
        /// passed straight to the inner exporter, outside the circuit breaker and the
        /// exception handling below.
        /// </remarks>
        public async Task<bool> ExportAsync(IReadOnlyList<TelemetryFrontendLog> logs, CancellationToken ct = default)
        {
            if (logs == null || logs.Count == 0)
            {
                return await _innerExporter.ExportAsync(logs, ct).ConfigureAwait(false);
            }

            try
            {
                // Execute through the circuit breaker so it can track failures.
                // The inner exporter is called inside ExecuteAsync, meaning:
                // - If it throws, the circuit breaker sees the exception and tracks the failure
                // - If it returns false (swallowed failure), we throw to let the circuit breaker track it
                bool result = await _circuitBreaker.ExecuteAsync<bool>(async () =>
                {
                    bool exportResult = await _innerExporter.ExportAsync(logs, ct).ConfigureAwait(false);

                    if (!exportResult)
                    {
                        // The inner exporter returned false (it swallowed the error internally).
                        // Throw so the circuit breaker can track this as a failure.
                        throw new TelemetryExportFailedException(
                            "Inner telemetry exporter returned false, indicating export failure.");
                    }

                    return exportResult;
                }).ConfigureAwait(false);

                return result;
            }
            catch (BrokenCircuitException)
            {
                // Circuit is open - drop events gracefully
                Debug.WriteLine($"[DEBUG] Circuit breaker OPEN for host '{_host}' - dropping {logs.Count} telemetry event(s).");
                return false;
            }
            catch (OperationCanceledException)
            {
                // Cancellation should not be swallowed - propagate it
                throw;
            }
            catch (Exception ex)
            {
                // All other exceptions swallowed AFTER the circuit breaker has seen them
                Debug.WriteLine($"[TRACE] Telemetry export error for host '{_host}': {ex.Message}");
                return false;
            }
        }

        /// <summary>
        /// Internal exception used to signal that the inner exporter returned false
        /// (a swallowed failure), so the circuit breaker can track it as a failure.
        /// </summary>
        internal sealed class TelemetryExportFailedException : Exception
        {
            public TelemetryExportFailedException(string message) : base(message)
            {
            }
        }
    }
}