diff --git a/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityExtensions.cs b/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityExtensions.cs
index 1d5a0beb1e..4fa9e405fd 100644
--- a/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityExtensions.cs
+++ b/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityExtensions.cs
@@ -30,6 +30,7 @@ public static class ActivityExtensions
/// The name of the event.
/// The optional list of tags to attach to the event.
/// for convenient chaining.
+ [Obsolete("This method is deprecated. Please use the methods from ActivityWithPii, instead.")]
public static Activity? AddEvent(this Activity? activity, string eventName, IReadOnlyList>? tags = default)
{
if (activity == null) return activity;
@@ -43,6 +44,7 @@ public static class ActivityExtensions
/// The traceParent id for the associated .
/// The optional list of tags to attach to the event.
/// for convenient chaining.
+ [Obsolete("This method is deprecated. Please use the methods from ActivityWithPii, instead.")]
public static Activity? AddLink(this Activity? activity, string traceParent, IReadOnlyList>? tags = default)
{
if (activity == null) return activity;
@@ -58,6 +60,7 @@ public static class ActivityExtensions
/// for convenient chaining.
/// The tag key name as a function
/// The tag value mapped to the input key as a function
+ [Obsolete("This method is deprecated. Please use the methods from ActivityWithPii, instead.")]
public static Activity? AddTag(this Activity? activity, string key, Func value)
{
return activity?.AddTag(key, value());
@@ -72,6 +75,7 @@ public static class ActivityExtensions
/// The tag key name as a function
/// The tag value mapped to the input key
/// /// The condition to check before adding the tag
+ [Obsolete("This method is deprecated. Please use the methods from ActivityWithPii, instead.")]
public static Activity? AddConditionalTag(this Activity? activity, string key, string? value, bool condition)
{
if (condition)
@@ -91,6 +95,7 @@ public static class ActivityExtensions
/// The tag key name
/// The tag value mapped to the input key as a function
/// The format indicator for 16-byte GUID arrays.
+ [Obsolete("This method is deprecated. Please use the methods from ActivityWithPii, instead.")]
public static Activity? AddTag(this Activity? activity, string key, byte[]? value, string? guidFormat)
{
if (value == null)
diff --git a/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs b/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs
index b332590a56..92173cab07 100644
--- a/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs
+++ b/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs
@@ -62,6 +62,8 @@ public ActivityTrace(string? activitySourceName = default, string? activitySourc
///
public string ActivitySourceName => ActivitySource.Name;
+ #region Obsolete methods
+
///
/// Invokes the delegate within the context of a new started .
///
@@ -75,6 +77,7 @@ public ActivityTrace(string? activitySourceName = default, string? activitySourc
/// status is set to and an Activity is added to the activity
/// and finally the exception is rethrown.
///
+ [Obsolete("This method is deprecated. Please use TraceActivity overloads that take ActivityWithPii instead of Activity to avoid accidentally logging PII data.")]
public void TraceActivity(Action call, [CallerMemberName] string? activityName = default, string? traceParent = default)
{
using Activity? activity = StartActivityInternal(activityName, ActivitySource, traceParent ?? TraceParent);
@@ -104,6 +107,7 @@ public void TraceActivity(Action call, [CallerMemberName] string? act
/// If an exception is thrown by the delegate, the Activity status is set to
/// and an Event is added to the activity and finally the exception is rethrown.
///
+ [Obsolete("This method is deprecated. Please use TraceActivity overloads that take ActivityWithPii instead of Activity to avoid accidentally logging PII data.")]
public T TraceActivity(Func call, [CallerMemberName] string? activityName = default, string? traceParent = default)
{
using Activity? activity = StartActivityInternal(activityName, ActivitySource, traceParent ?? TraceParent);
@@ -133,6 +137,7 @@ public T TraceActivity(Func call, [CallerMemberName] string? ac
/// If an exception is thrown by the delegate, the Activity status is set to
/// and an Event is added to the activity and finally the exception is rethrown.
///
+ [Obsolete("This method is deprecated. Please use TraceActivity overloads that take ActivityWithPii instead of Activity to avoid accidentally logging PII data.")]
public async Task TraceActivityAsync(Func call, [CallerMemberName] string? activityName = default, string? traceParent = default)
{
using Activity? activity = StartActivityInternal(activityName, ActivitySource, traceParent ?? TraceParent);
@@ -162,6 +167,7 @@ public async Task TraceActivityAsync(Func call, [CallerMemberNa
/// If an exception is thrown by the delegate, the Activity status is set to
/// and an Event is added to the activity and finally the exception is rethrown.
///
+ [Obsolete("This method is deprecated. Please use TraceActivity overloads that take ActivityWithPii instead of Activity to avoid accidentally logging PII data.")]
public async Task TraceActivityAsync(Func> call, [CallerMemberName] string? activityName = default, string? traceParent = default)
{
using Activity? activity = StartActivityInternal(activityName, ActivitySource, traceParent ?? TraceParent);
@@ -192,6 +198,7 @@ public async Task TraceActivityAsync(Func> call, [Calle
/// If an exception is thrown by the delegate, the Activity status is set to
/// and an Event is added to the activity and finally the exception is rethrown.
///
+ [Obsolete("This method is deprecated. Please use TraceActivity overloads that take ActivityWithPii instead of Activity to avoid accidentally logging PII data.")]
public static async Task TraceActivityAsync(ActivitySource activitySource, Func call, [CallerMemberName] string? activityName = default, string? traceParent = default)
{
using Activity? activity = StartActivityInternal(activityName, activitySource, traceParent);
@@ -222,6 +229,7 @@ public static async Task TraceActivityAsync(ActivitySource activitySource, Func<
/// If an exception is thrown by the delegate, the Activity status is set to
/// and an Event is added to the activity and finally the exception is rethrown.
///
+ [Obsolete("This method is deprecated. Please use TraceActivity overloads that take ActivityWithPii instead of Activity to avoid accidentally logging PII data.")]
public static async Task TraceActivityAsync(ActivitySource activitySource, Func> call, [CallerMemberName] string? activityName = default, string? traceParent = default)
{
using Activity? activity = StartActivityInternal(activityName, activitySource, traceParent);
@@ -238,6 +246,184 @@ public static async Task TraceActivityAsync(ActivitySource activitySource,
}
}
+ #endregion
+
+ ///
+ /// Invokes the delegate within the context of a new started .
+ ///
+ /// The delegate to call within the context of a newly started
+ /// The name of the method for the activity.
+ /// Returns a new object if there is any listener to the Activity, returns null otherwise
+ ///
+ /// Creates and starts a new object if there is any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If there are no exceptions thrown by the delegate the
+ /// Activity status is set to . If an exception is thrown by the delegate, the Activity
+ /// status is set to and an Activity is added to the activity
+ /// and finally the exception is rethrown.
+ ///
+ public void TraceActivity(Action call, [CallerMemberName] string? activityName = default, string? traceParent = default, bool exceptionHasPii = true)
+ {
+ using ActivityWithPii? activity = StartActivityWithPiiInternal(activityName, ActivitySource, traceParent ?? TraceParent);
+ try
+ {
+ call(activity);
+ if (activity?.Status == ActivityStatusCode.Unset) activity?.SetStatus(ActivityStatusCode.Ok);
+ }
+ catch (Exception ex)
+ {
+ TraceExceptionWithPii(ex, activity, exceptionHasPii);
+ throw;
+ }
+ }
+
+ ///
+ /// Invokes the delegate within the context of a new started .
+ ///
+ /// The return type for the delegate.
+ /// The delegate to call within the context of a newly started
+ /// The name of the method for the activity.
+ /// The result of the call to the delegate.
+ ///
+ /// Creates and starts a new object if there is any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If there are no exceptions thrown by the delegate the
+ /// Activity status is set to and the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is set to
+ /// and an Event is added to the activity and finally the exception is rethrown.
+ ///
+ public T TraceActivity(Func call, [CallerMemberName] string? activityName = default, string? traceParent = default, bool exceptionHasPii = true)
+ {
+ using ActivityWithPii? activity = StartActivityWithPiiInternal(activityName, ActivitySource, traceParent ?? TraceParent);
+ try
+ {
+ T? result = call(activity);
+ if (activity?.Status == ActivityStatusCode.Unset) activity?.SetStatus(ActivityStatusCode.Ok);
+ return result;
+ }
+ catch (Exception ex)
+ {
+ TraceExceptionWithPii(ex, activity, exceptionHasPii);
+ throw;
+ }
+ }
+
+ ///
+ /// Invokes the delegate within the context of a new started .
+ ///
+ /// The delegate to call within the context of a newly started
+ /// The name of the method for the activity.
+ ///
+ ///
+ /// Creates and starts a new object if there is any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If there are no exceptions thrown by the delegate the
+ /// Activity status is set to and the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is set to
+ /// and an Event is added to the activity and finally the exception is rethrown.
+ ///
+ public async Task TraceActivityAsync(Func call, [CallerMemberName] string? activityName = default, string? traceParent = default, bool exceptionHasPii = false)
+ {
+ using ActivityWithPii? activity = StartActivityWithPiiInternal(activityName, ActivitySource, traceParent ?? TraceParent);
+ try
+ {
+ await call(activity);
+ if (activity?.Status == ActivityStatusCode.Unset) activity?.SetStatus(ActivityStatusCode.Ok);
+ }
+ catch (Exception ex)
+ {
+ TraceExceptionWithPii(ex, activity, exceptionHasPii);
+ throw;
+ }
+ }
+
+ ///
+ /// Invokes the delegate within the context of a new started .
+ ///
+ /// The return type for the delegate.
+ /// The delegate to call within the context of a newly started
+ /// The name of the method for the activity.
+ /// The result of the call to the delegate.
+ ///
+ /// Creates and starts a new object if there is any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If there are no exceptions thrown by the delegate the
+ /// Activity status is set to and the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is set to
+ /// and an Event is added to the activity and finally the exception is rethrown.
+ ///
+ public async Task TraceActivityAsync(Func> call, [CallerMemberName] string? activityName = default, string? traceParent = default, bool exceptionHasPii = true)
+ {
+ using ActivityWithPii? activity = StartActivityWithPiiInternal(activityName, ActivitySource, traceParent ?? TraceParent);
+ try
+ {
+ T? result = await call(activity);
+ if (activity?.Status == ActivityStatusCode.Unset) activity?.SetStatus(ActivityStatusCode.Ok);
+ return result;
+ }
+ catch (Exception ex)
+ {
+ TraceExceptionWithPii(ex, activity, exceptionHasPii);
+ throw;
+ }
+ }
+
+ ///
+ /// Invokes the delegate within the context of a new started .
+ ///
+ /// The to start the on.
+ /// The delegate to call within the context of a newly started
+ /// The name of the method for the activity.
+ ///
+ ///
+ /// Creates and starts a new object if there is any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If there are no exceptions thrown by the delegate the
+ /// Activity status is set to and the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is set to
+ /// and an Event is added to the activity and finally the exception is rethrown.
+ ///
+ public static async Task TraceActivityAsync(ActivitySource activitySource, Func call, [CallerMemberName] string? activityName = default, string? traceParent = default, bool exceptionHasPii = true)
+ {
+ using ActivityWithPii? activity = StartActivityWithPiiInternal(activityName, activitySource, traceParent);
+ try
+ {
+ await call(activity);
+ if (activity?.Status == ActivityStatusCode.Unset) activity?.SetStatus(ActivityStatusCode.Ok);
+ }
+ catch (Exception ex)
+ {
+ TraceExceptionWithPii(ex, activity, exceptionHasPii);
+ throw;
+ }
+ }
+
+ ///
+ /// Invokes the delegate within the context of a new started .
+ ///
+ /// The return type for the delegate.
+ /// The to start the on.
+ /// The delegate to call within the context of a newly started
+ /// The name of the method for the activity.
+ /// The result of the call to the delegate.
+ ///
+ /// Creates and starts a new object if there is any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If there are no exceptions thrown by the delegate the
+ /// Activity status is set to and the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is set to
+ /// and an Event is added to the activity and finally the exception is rethrown.
+ ///
+ public static async Task TraceActivityAsync(ActivitySource activitySource, Func> call, [CallerMemberName] string? activityName = default, string? traceParent = default, bool exceptionHasPii = true)
+ {
+ using ActivityWithPii? activity = StartActivityWithPiiInternal(activityName, activitySource, traceParent);
+ try
+ {
+ T? result = await call(activity);
+ if (activity?.Status == ActivityStatusCode.Unset) activity?.SetStatus(ActivityStatusCode.Ok);
+ return result;
+ }
+ catch (Exception ex)
+ {
+ TraceExceptionWithPii(ex, activity, exceptionHasPii);
+ throw;
+ }
+ }
+
///
/// Gets or sets the trace parent context.
///
@@ -257,6 +443,20 @@ public static async Task TraceActivityAsync(ActivitySource activitySource,
private static void TraceException(Exception exception, Activity? activity) =>
WriteTraceException(exception, activity);
+ ///
+ /// Writes the exception to the trace by adding an exception event to the current activity (span).
+ ///
+ /// The exception to trace.
+ /// The current activity where the exception is caught.
+ ///
+ /// An indicator that should be set to true if the exception event is recorded
+ /// at a point where it is known that the exception is escaping the scope of the span/activity.
+ /// For example, escaped should be true if the exception is caught and re-thrown.
+ /// However, escaped should be set to false if execution continues in the current scope.
+ ///
+ private static void TraceExceptionWithPii(Exception exception, ActivityWithPii? activity, bool exceptionHasPii) =>
+ WriteTraceExceptionWithPii(exception, activity, exceptionHasPii);
+
///
public void Dispose()
{
@@ -269,12 +469,24 @@ private static void WriteTraceException(Exception exception, Activity? activity)
activity?.SetStatus(ActivityStatusCode.Error);
}
+ private static void WriteTraceExceptionWithPii(Exception exception, ActivityWithPii? activity, bool exceptionHasPii)
+ {
+ activity?.AddException(exception, exceptionHasPii: exceptionHasPii);
+ activity?.SetStatus(ActivityStatusCode.Error);
+ }
+
private static Activity? StartActivityInternal(string? activityName, ActivitySource activitySource, string? traceParent = default)
{
string fullActivityName = GetActivityName(activityName);
return StartActivity(activitySource, fullActivityName, traceParent);
}
+ private static ActivityWithPii? StartActivityWithPiiInternal(string? activityName, ActivitySource activitySource, string? traceParent = default)
+ {
+ string fullActivityName = GetActivityName(activityName);
+ return StartActivityWithPii(activitySource, fullActivityName, traceParent);
+ }
+
private static string GetActivityName(string? activityName)
{
if (string.IsNullOrWhiteSpace(activityName))
@@ -296,5 +508,19 @@ private static string GetActivityName(string? activityName)
? (activitySource.StartActivity(activityName, ActivityKind.Client, parentContext))
: (activitySource.StartActivity(activityName, ActivityKind.Client));
}
+
+ ///
+ /// Creates and starts a new object if there is any listener to the Activity, returns null otherwise.
+ ///
+ /// The from which to start the activity.
+ /// The name of the method for the activity
+ /// Returns a new object if there is any listener to the Activity, returns null otherwise
+ private static ActivityWithPii? StartActivityWithPii(ActivitySource activitySource, string activityName, string? traceParent = default)
+ {
+ Activity? activity = traceParent != null && ActivityContext.TryParse(traceParent, null, isRemote: true, out ActivityContext parentContext)
+ ? (activitySource.StartActivity(activityName, ActivityKind.Client, parentContext))
+ : (activitySource.StartActivity(activityName, ActivityKind.Client));
+ return ActivityWithPii.New(activity);
+ }
}
}
diff --git a/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityWithPii.cs b/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityWithPii.cs
new file mode 100644
index 0000000000..ae8530de03
--- /dev/null
+++ b/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityWithPii.cs
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Reflection;
+using System.Text;
+
+namespace Apache.Arrow.Adbc.Tracing
+{
+ public class ActivityWithPii : IDisposable
+ {
+ private readonly Activity _activity;
+ private readonly bool _shouldDisposeActivity;
+ private bool _disposed;
+
+ private ActivityWithPii(Activity activity, bool shouldDisposeActivity)
+ {
+ _activity = activity;
+ _shouldDisposeActivity = shouldDisposeActivity;
+ }
+
+ ///
+ /// Creates a new instance of from the given .The new instance
+ /// will call Dispose on the provided activity when this instance is disposed.
+ ///
+ /// Use the method in the context of , where you don't own the object and don't want
+ /// to dispose it. Use the method when starting a new activity via , which returns an that
+ /// should be disposed when the activity is stopped.
+ ///
+ ///
+ /// The to encapsulate.
+ /// A new instance of
+ public static ActivityWithPii? New(Activity? activity) => activity == null ? null : new ActivityWithPii(activity, shouldDisposeActivity: true);
+
+ ///
+ /// Creates a wrapper instance of for a given . It will not dispose the
+ /// provided activity.
+ ///
+ /// Use the method in the context of , where you don't own the object and don't want
+ /// to dispose it. Use the method when starting a new activity via , which returns an that
+ /// should be disposed when the activity is stopped.
+ ///
+ ///
+ /// The to wrap.
+ /// A new instance of
+ public static ActivityWithPii? Wrap(Activity? activity) => activity == null ? null : new ActivityWithPii(activity, shouldDisposeActivity: false);
+
+ ///
+ /// This is an ID that is specific to a particular request. Filtering
+ /// to a particular ID insures that you get only one request that matches.
+ /// Id has a hierarchical structure: '|root-id.id1_id2.id3_' Id is generated when
+ /// is called by appending suffix to Parent.Id
+ /// or ParentId; Activity has no Id until it started
+ ///
+ /// See for more details
+ ///
+ ///
+ /// Id looks like '|a000b421-5d183ab6.1.8e2d4c28_1.':
+ /// - '|a000b421-5d183ab6.' - Id of the first, top-most, Activity created
+ /// - '|a000b421-5d183ab6.1.' - Id of a child activity. It was started in the same process as the first activity and ends with '.'
+ /// - '|a000b421-5d183ab6.1.8e2d4c28_' - Id of the grand child activity. It was started in another process and ends with '_'
+ /// 'a000b421-5d183ab6' is a for the first Activity and all its children
+ ///
+ public string? Id => _activity.Id;
+
+ ///
+ /// Gets status code of the current activity object.
+ ///
+ public ActivityStatusCode Status => _activity.Status;
+
+ ///
+ /// Holds the W3C 'tracestate' header as a string.
+ ///
+ /// Tracestate is intended to carry information supplemental to trace identity contained
+ /// in traceparent. List of key value pairs carried by tracestate convey information
+ /// about request position in multiple distributed tracing graphs. It is typically used
+ /// by distributed tracing systems and should not be used as a general purpose baggage
+ /// as this use may break correlation of a distributed trace.
+ ///
+ /// Logically it is just a kind of baggage (if flows just like baggage), but because
+ /// it is expected to be special cased (it has its own HTTP header), it is more
+ /// convenient/efficient if it is not lumped in with other baggage.
+ ///
+ public string? TraceStateString
+ {
+ get => _activity.TraceStateString;
+ set => _activity.TraceStateString = value;
+ }
+
+ ///
+ /// Update the Activity to have baggage with an additional 'key' and value 'value'.
+ /// This shows up in the enumeration as well as the
+ /// method.
+ /// Baggage is meant for information that is needed for runtime control. For information
+ /// that is simply useful to show up in the log with the activity use .
+ /// Returns 'this' for convenient chaining.
+ ///
+ /// for convenient chaining.
+ public ActivityWithPii AddBaggage(string key, string? value, bool isPii = true)
+ {
+ bool shouldRedact = isPii;
+ _activity.AddBaggage(key, shouldRedact ? RedactedValue.DefaultValue : value);
+ return this;
+ }
+
+ public ActivityWithPii AddEvent(ActivityEvent e, bool isPii = true)
+ {
+ ActivityEvent clone = new(e.Name, e.Timestamp, [.. isPii ? CloneTagsWithRedaction(e.Tags) : e.Tags]);
+ _activity.AddEvent(clone);
+ return this;
+ }
+
+ ///
+ /// Add a new object to the list.
+ ///
+ /// The name of the event.
+ /// The optional list of tags to attach to the event.
+ /// for convenient chaining.
+ public ActivityWithPii AddEvent(string eventName, IReadOnlyList>? tags = default, bool isPii = true)
+ {
+ ActivityTagsCollection? tagsCollection = tags == null ? null : [.. tags];
+ return AddEvent(new ActivityEvent(eventName, tags: tagsCollection), isPii);
+ }
+
+ public ActivityWithPii AddException(Exception exception, in TagList tags = default, DateTimeOffset timestamp = default, bool exceptionHasPii = true)
+ {
+ if (exception == null)
+ {
+ throw new ArgumentNullException(nameof(exception));
+ }
+
+ if (!exceptionHasPii)
+ {
+ _activity.AddException(exception, tags, timestamp);
+ return this;
+ }
+
+ TagList exceptionTags = tags;
+ const string ExceptionMessageTag = "exception.message";
+ bool hasMessage = false;
+
+ for (int i = 0; i < exceptionTags.Count; i++)
+ {
+ if (exceptionTags[i].Key == ExceptionMessageTag)
+ {
+ exceptionTags[i] = new KeyValuePair(
+ exceptionTags[i].Key,
+ exceptionTags[i].Value is RedactedValue ? exceptionTags[i].Value : new RedactedValue(exceptionTags[i].Value));
+ hasMessage = true;
+ }
+
+ // TODO: Decide how to handle other unknown tags that may contain PII.
+ // For now, we only handle the well-known "exception.message" tag, but there may be other tags that also contain PII and should be redacted.
+ }
+
+ if (!hasMessage)
+ {
+ exceptionTags.Add(new KeyValuePair(ExceptionMessageTag, new RedactedValue(exception.Message)));
+ }
+
+ _activity.AddException(exception, exceptionTags, timestamp);
+ return this;
+ }
+
+ ///
+ /// Add an to the list.
+ ///
+ /// The to add.
+ /// for convenient chaining.
+ ///
+ /// For contexts that are available during span creation, adding links at span creation is preferred to calling later,
+ /// because head sampling decisions can only consider information present during span creation.
+ ///
+ public ActivityWithPii AddLink(ActivityLink link, bool isPii = true)
+ {
+ if (link.Tags == null)
+ {
+ _activity.AddLink(link);
+ return this;
+ }
+
+ ActivityLink clone = new(link.Context, [.. isPii ? CloneTagsWithRedaction(link.Tags) : link.Tags]);
+ _activity.AddLink(clone);
+ return this;
+ }
+
+ ///
+ /// Add an to the list.
+ ///
+ ///
+ ///
+ ///
+ ///
+ public ActivityWithPii AddLink(string traceParent, IReadOnlyList>? tags = default, bool isPii = true)
+ {
+ ActivityTagsCollection? tagsCollection = tags == null ? null : [.. tags];
+ return AddLink(new ActivityLink(ActivityContext.Parse(traceParent, null), tags: tagsCollection), isPii);
+ }
+
+ ///
+ /// Update the Activity to have a tag with an additional 'key' and value 'value'.
+ /// This shows up in the enumeration. It is meant for information that
+ /// is useful to log but not needed for runtime control (for the latter, )
+ ///
+ /// Note: Treating string tags the same as any other object.
+ ///
+ ///
+ /// for convenient chaining.
+ /// The tag key name
+ /// The tag value mapped to the input key
+ public ActivityWithPii AddTag(string key, object? value, bool isPii = true)
+ {
+ object? finalValue = value;
+ switch (value)
+ {
+ case Delegate:
+ MethodInfo methodInfo = value.GetType().GetMethod("Invoke")!;
+ if (methodInfo.GetParameters().Length != 0)
+ {
+ throw new NotSupportedException("Only parameterless delegates are supported as tag values");
+ }
+ object? returnValue = methodInfo.Invoke(value, []);
+ return AddTag(key, returnValue, isPii);
+ }
+
+ bool shouldRedact = (isPii && finalValue is not RedactedValue);
+ _activity.AddTag(key, shouldRedact ? new RedactedValue(finalValue) : finalValue);
+ return this;
+ }
+
+ ///
+ /// Add or update the Activity tag with the input key and value.
+ /// If the input value is null
+ /// - if the collection has any tag with the same key, then this tag will get removed from the collection.
+ /// - otherwise, nothing will happen and the collection will not change.
+ /// If the input value is not null
+ /// - if the collection has any tag with the same key, then the value mapped to this key will get updated with the new input value.
+ /// - otherwise, the key and value will get added as a new tag to the collection.
+ ///
+ /// The tag key name
+ /// The tag value mapped to the input key
+ /// for convenient chaining.
+ public ActivityWithPii SetTag(string key, object? value, bool isPii = true)
+ {
+ bool shouldRedact = (isPii && value is not RedactedValue);
+ _activity.SetTag(key, shouldRedact ? new RedactedValue(value) : value);
+ return this;
+ }
+
+ ///
+ /// Sets the status code and description on the current activity object.
+ ///
+ /// The status code
+ /// The error status description
+ /// for convenient chaining.
+ ///
+ /// When passing code value different than ActivityStatusCode.Error, the Activity.StatusDescription will reset to null value.
+ /// The description parameter will be respected only when passing ActivityStatusCode.Error value.
+ ///
+ public void SetStatus(ActivityStatusCode statusCode, string? description = null)
+ {
+ _activity.SetStatus(statusCode, description);
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!_disposed)
+ {
+ if (disposing)
+ {
+ if (_shouldDisposeActivity)
+ {
+ _activity.Dispose();
+ }
+ }
+
+ _disposed = true;
+ }
+ }
+
+ ///
+ public void Dispose()
+ {
+ // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
+ Dispose(disposing: true);
+ GC.SuppressFinalize(this);
+ }
+
+ private static IEnumerable> CloneTagsWithRedaction(IEnumerable> tags)
+ {
+ foreach (KeyValuePair tag in tags)
+ {
+ yield return new KeyValuePair(tag.Key, tag.Value is RedactedValue ? tag.Value : new RedactedValue(tag.Value));
+ }
+ }
+
+#if NET5_0_OR_GREATER
+ public static string ToHexString(byte[] value) => Convert.ToHexString(value);
+#else
+ public static string ToHexString(byte[] value)
+ {
+ if (value.Length == 16)
+ {
+ return new Guid(value).ToString("N");
+ }
+ StringBuilder hex = new(value.Length * 2);
+ foreach (byte b in value)
+ {
+ hex.AppendFormat("{0:x2}", b);
+ }
+ return hex.ToString();
+ }
+#endif
+ }
+}
diff --git a/csharp/src/Apache.Arrow.Adbc/Tracing/IActivityTracerExtensions.cs b/csharp/src/Apache.Arrow.Adbc/Tracing/IActivityTracerExtensions.cs
index a71b709e1d..1b10bcac96 100644
--- a/csharp/src/Apache.Arrow.Adbc/Tracing/IActivityTracerExtensions.cs
+++ b/csharp/src/Apache.Arrow.Adbc/Tracing/IActivityTracerExtensions.cs
@@ -24,6 +24,8 @@ namespace Apache.Arrow.Adbc.Tracing
{
public static class IActivityTracerExtensions
{
+ #region Obsolete methods
+
///
/// Invokes the delegate within the context of a new started .
///
@@ -37,6 +39,7 @@ public static class IActivityTracerExtensions
/// status is set to and an Activity is added to the activity
/// and finally the exception is rethrown.
///
+ [Obsolete("This method is deprecated. Please use TraceActivity overloads that take ActivityWithPii instead of Activity to avoid accidentally logging PII data.")]
public static void TraceActivity(this IActivityTracer tracer, Action call, [CallerMemberName] string? activityName = default, string? traceParent = default)
{
tracer.Trace.TraceActivity(call, activityName, traceParent ?? tracer.TraceParent);
@@ -56,6 +59,7 @@ public static void TraceActivity(this IActivityTracer tracer, Action
/// If an exception is thrown by the delegate, the Activity status is set to
/// and an Event is added to the activity and finally the exception is rethrown.
///
+ [Obsolete("This method is deprecated. Please use TraceActivity overloads that take ActivityWithPii instead of Activity to avoid accidentally logging PII data.")]
public static T TraceActivity(this IActivityTracer tracer, Func call, [CallerMemberName] string? activityName = null, string? traceParent = null)
{
Type type = typeof(T);
@@ -80,6 +84,7 @@ public static T TraceActivity(this IActivityTracer tracer, Func
/// If an exception is thrown by the delegate, the Activity status is set to
/// and an Event is added to the activity and finally the exception is rethrown.
///
+ [Obsolete("This method is deprecated. Please use TraceActivity overloads that take ActivityWithPii instead of Activity to avoid accidentally logging PII data.")]
public static Task TraceActivityAsync(this IActivityTracer tracer, Func call, [CallerMemberName] string? activityName = null, string? traceParent = null)
{
return tracer.Trace.TraceActivityAsync(call, activityName, traceParent ?? tracer.TraceParent);
@@ -99,9 +104,112 @@ public static Task TraceActivityAsync(this IActivityTracer tracer, Func
/// and an Event is added to the activity and finally the exception is rethrown.
///
+ [Obsolete("This method is deprecated. Please use TraceActivity overloads that take ActivityWithPii instead of Activity to avoid accidentally logging PII data.")]
public static Task TraceActivityAsync(this IActivityTracer tracer, Func> call, [CallerMemberName] string? activityName = null, string? traceParent = null)
{
return tracer.Trace.TraceActivityAsync(call, activityName, traceParent ?? tracer.TraceParent);
}
+
+ #endregion
+
+ ///
+ /// Invokes the delegate within the context of a new started .
+ ///
+ /// The delegate to call within the context of a newly started
+ /// The name of the method for the activity.
+ /// Returns a new object if there is any listener to the Activity, returns null otherwise
+ ///
+ /// Creates and starts a new object if there is any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If there are no exceptions thrown by the delegate the
+ /// Activity status is set to . If an exception is thrown by the delegate, the Activity
+ /// status is set to and an Activity is added to the activity
+ /// and finally the exception is rethrown.
+ ///
+ public static void TraceActivity(
+ this IActivityTracer tracer,
+ Action call,
+ [CallerMemberName] string? activityName = default,
+ string? traceParent = default,
+ bool exceptionHasPii = true)
+ {
+ tracer.Trace.TraceActivity(call, activityName, traceParent ?? tracer.TraceParent, exceptionHasPii);
+ }
+
+ ///
+ /// Invokes the delegate within the context of a new started .
+ ///
+ /// The return type for the delegate.
+ /// The delegate to call within the context of a newly started
+ /// The name of the method for the activity.
+ /// The result of the call to the delegate.
+ ///
+ /// Creates and starts a new object if there is any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If there are no exceptions thrown by the delegate the
+ /// Activity status is set to and the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is set to
+ /// and an Event is added to the activity and finally the exception is rethrown.
+ ///
+ public static T TraceActivity(
+ this IActivityTracer tracer,
+ Func call,
+ [CallerMemberName] string? activityName = null,
+ string? traceParent = null,
+ bool exceptionHasPii = true)
+ {
+ Type type = typeof(T);
+ if (type == typeof(Task) || (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Task<>)))
+ {
+ throw new InvalidOperationException($"Invalid return type ('{type.Name}') for synchronous method call. Please use {nameof(TraceActivityAsync)}");
+ }
+
+ return tracer.Trace.TraceActivity(call, activityName, traceParent ?? tracer.TraceParent, exceptionHasPii);
+ }
+
+ ///
+ /// Invokes the delegate within the context of a new started .
+ ///
+ /// The delegate to call within the context of a newly started
+ /// The name of the method for the activity.
+ ///
+ ///
+ /// Creates and starts a new object if there is any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If there are no exceptions thrown by the delegate the
+ /// Activity status is set to and the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is set to
+ /// and an Event is added to the activity and finally the exception is rethrown.
+ ///
+ public static Task TraceActivityAsync(
+ this IActivityTracer tracer,
+ Func call,
+ [CallerMemberName] string? activityName = null,
+ string? traceParent = null,
+ bool exceptionHasPii = true)
+ {
+ return tracer.Trace.TraceActivityAsync(call, activityName, traceParent ?? tracer.TraceParent, exceptionHasPii);
+ }
+
+ ///
+ /// Invokes the delegate within the context of a new started .
+ ///
+ /// The return type for the delegate.
+ /// The delegate to call within the context of a newly started
+ /// The name of the method for the activity.
+ /// The result of the call to the delegate.
+ ///
+ /// Creates and starts a new object if there is any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If there are no exceptions thrown by the delegate the
+ /// Activity status is set to and the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is set to
+ /// and an Event is added to the activity and finally the exception is rethrown.
+ ///
+ public static Task TraceActivityAsync(
+ this IActivityTracer tracer,
+ Func> call,
+ [CallerMemberName] string? activityName = null,
+ string? traceParent = null,
+ bool exceptionHasPii = true)
+ {
+ return tracer.Trace.TraceActivityAsync(call, traceParent ?? tracer.TraceParent, activityName, exceptionHasPii);
+ }
}
}
diff --git a/csharp/src/Apache.Arrow.Adbc/Tracing/RedactedValue.cs b/csharp/src/Apache.Arrow.Adbc/Tracing/RedactedValue.cs
new file mode 100644
index 0000000000..e32edf2fec
--- /dev/null
+++ b/csharp/src/Apache.Arrow.Adbc/Tracing/RedactedValue.cs
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Text.Json.Serialization;
+
+namespace Apache.Arrow.Adbc.Tracing
+{
+ ///
+ /// Stores a value that should be redacted when converted to string or serialized to JSON.
+ /// The value can still be retrieved using the GetValue method.
+ ///
+ ///
+ [JsonConverter(typeof(ToStringJsonConverter))]
+ public class RedactedValue(object? value)
+ {
+ private readonly object? _value = value;
+
+ public const string DefaultValue = "[REDACTED]";
+
+ ///
+ /// Returns a string representation of the redacted value. This will always return the default value
+ /// regardless of the actual value stored in the object.
+ ///
+ ///
+ public override string ToString() => DefaultValue;
+
+ ///
+ /// Gets the actual value stored in the object. This method can be used to retrieve the original value if needed,
+ /// but it should be used with caution as it may contain sensitive information.
+ ///
+ ///
+ public object? GetValue()
+ {
+ return _value;
+ }
+ }
+}
diff --git a/csharp/src/Apache.Arrow.Adbc/Tracing/ToStringJsonConverter.cs b/csharp/src/Apache.Arrow.Adbc/Tracing/ToStringJsonConverter.cs
new file mode 100644
index 0000000000..abd76b513b
--- /dev/null
+++ b/csharp/src/Apache.Arrow.Adbc/Tracing/ToStringJsonConverter.cs
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Text.Json;
+using System.Text.Json.Serialization;
+
+namespace Apache.Arrow.Adbc.Tracing
+{
+ ///
+ /// Converts an object to a JSON string using its ToString() method.
+ /// This is useful for types that do not have a default JSON converter
+ /// or when you want to control how the object is represented in JSON.
+ ///
+ ///
+ public class ToStringJsonConverter : JsonConverter
+ {
+ public override T? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) =>
+ throw new NotImplementedException();
+
+ public override void Write(Utf8JsonWriter writer, T value, JsonSerializerOptions _)
+ {
+ writer.WriteStringValue(value?.ToString());
+ }
+ }
+}
diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
index 5df4fdb551..91ddac63b3 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
@@ -346,7 +346,7 @@ internal TCLIService.IAsync Client
internal async Task OpenAsync()
{
- await this.TraceActivityAsync(async activity =>
+ await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
CancellationToken cancellationToken = ApacheUtility.GetCancellationToken(ConnectTimeoutMilliseconds, ApacheUtility.TimeUnit.Milliseconds);
try
@@ -453,7 +453,7 @@ private void ResetConnection()
_client = null;
}
- protected virtual Task HandleOpenSessionResponse(TOpenSessionResp? session, Activity? activity = default)
+ protected virtual Task HandleOpenSessionResponse(TOpenSessionResp? session, ActivityWithPii? activity = default)
{
// Explicitly check the session status
if (session == null)
@@ -500,7 +500,7 @@ internal abstract IArrowArrayStream NewReader(
public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? catalogPattern, string? dbSchemaPattern, string? tableNamePattern, IReadOnlyList? tableTypes, string? columnNamePattern)
{
- return this.TraceActivity(_ =>
+ return this.TraceActivity((ActivityWithPii? _) =>
{
if (SessionHandle == null)
{
@@ -693,7 +693,7 @@ public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? cata
public override IArrowArrayStream GetTableTypes()
{
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
TGetTableTypesReq req = new()
{
@@ -735,7 +735,7 @@ public override IArrowArrayStream GetTableTypes()
internal async Task PollForResponseAsync(TOperationHandle operationHandle, TCLIService.IAsync client, int pollTimeMilliseconds, CancellationToken cancellationToken = default)
{
- await this.TraceActivityAsync(async activity =>
+ await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
activity?.AddEvent("hive2.thrift.poll_start");
TGetOperationStatusResp? statusResponse = null;
@@ -767,7 +767,7 @@ await this.TraceActivityAsync(async activity =>
private string GetInfoTypeStringValue(TGetInfoType infoType)
{
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
TGetInfoReq req = new()
{
@@ -807,7 +807,7 @@ protected override void Dispose(bool disposing)
private void DisposeClient()
{
- this.TraceActivity(activity =>
+ this.TraceActivity((ActivityWithPii? activity) =>
{
if (_client != null && SessionHandle != null)
{
@@ -1060,7 +1060,7 @@ private static StructArray GetTableSchemas(
internal async Task GetCatalogsAsync(CancellationToken cancellationToken)
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
if (SessionHandle == null)
{
@@ -1082,7 +1082,7 @@ internal async Task GetSchemasAsync(
string? schemaName,
CancellationToken cancellationToken)
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
if (SessionHandle == null)
{
@@ -1114,7 +1114,7 @@ internal async Task GetTablesAsync(
List? tableTypes,
CancellationToken cancellationToken)
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
if (SessionHandle == null)
{
@@ -1154,7 +1154,7 @@ internal async Task GetColumnsAsync(
string? columnName,
CancellationToken cancellationToken)
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
if (SessionHandle == null)
{
@@ -1193,7 +1193,7 @@ internal async Task GetPrimaryKeysAsync(
string? tableName,
CancellationToken cancellationToken = default)
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
if (SessionHandle == null)
{
@@ -1231,7 +1231,7 @@ internal async Task GetCrossReferenceAsync(
string? foreignTableName,
CancellationToken cancellationToken = default)
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
if (SessionHandle == null)
{
@@ -1359,7 +1359,7 @@ private static StructArray GetColumnSchema(TableInfo tableInfo)
public override Schema GetTableSchema(string? catalog, string? dbSchema, string? tableName)
{
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
if (SessionHandle == null)
{
@@ -1471,7 +1471,7 @@ private static IArrowType GetArrowType(int columnTypeId, string typeName, bool i
internal async Task FetchResultsAsync(TOperationHandle operationHandle, long batchSize = BatchSizeDefault, CancellationToken cancellationToken = default)
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
await PollForResponseAsync(operationHandle, Client, PollTimeMillisecondsDefault, cancellationToken);
@@ -1482,7 +1482,10 @@ internal async Task FetchResultsAsync(TOperationHandle operationHandle,
.SetNativeError(fetchResp.Status.ErrorCode)
.SetSqlState(fetchResp.Status.SqlState);
}
- activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, HiveServer2Reader.GetRowCount(fetchResp.Results, fetchResp.Results.Columns.Count));
+ activity?.AddTag(
+ SemanticConventions.Db.Response.ReturnedRows,
+ HiveServer2Reader.GetRowCount(fetchResp.Results, fetchResp.Results.Columns.Count),
+ isPii: false);
return fetchResp.Results;
}, ClassName + "." + nameof(FetchResultsAsync));
}
@@ -1496,7 +1499,7 @@ private static async Task FetchNextAsync(TOperationHandle ope
public override IArrowArrayStream GetInfo(IReadOnlyList codes)
{
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
const int strValTypeID = 0;
const int boolValTypeId = 1;
@@ -1612,7 +1615,7 @@ public override IArrowArrayStream GetInfo(IReadOnlyList codes)
nullCount++;
break;
}
- Tracing.ActivityExtensions.AddTag(activity, tagKey, tagValue);
+ activity?.AddTag(tagKey, tagValue, isPii: false);
}
StructType entryType = new StructType(
@@ -1710,24 +1713,25 @@ public static bool IsOperationCanceledOrCancellationRequested(Exception ex, Canc
}
}
- internal static void HandleThriftResponse(TStatus status, Activity? activity)
+ internal static void HandleThriftResponse(TStatus status, ActivityWithPii? activity)
{
- if (ErrorHandlers.TryGetValue(status.StatusCode, out Action? handler))
+ if (ErrorHandlers.TryGetValue(status.StatusCode, out Action? handler))
{
handler(status, activity);
}
}
- private static IReadOnlyDictionary> ErrorHandlers => new Dictionary>()
+ private static IReadOnlyDictionary> ErrorHandlers => new Dictionary>()
{
[TStatusCode.ERROR_STATUS] = (status, _) => ThrowErrorResponse(status),
[TStatusCode.INVALID_HANDLE_STATUS] = (status, _) => ThrowErrorResponse(status),
[TStatusCode.STILL_EXECUTING_STATUS] = (status, _) => ThrowErrorResponse(status, AdbcStatusCode.InvalidState),
- [TStatusCode.SUCCESS_STATUS] = (status, activity) => activity?.AddTag(SemanticConventions.Db.Response.StatusCode, status.StatusCode),
+ [TStatusCode.SUCCESS_STATUS] = (status, activity) =>
+ activity?.AddTag(SemanticConventions.Db.Response.StatusCode, status.StatusCode, isPii: false),
[TStatusCode.SUCCESS_WITH_INFO_STATUS] = (status, activity) =>
{
- activity?.AddTag(SemanticConventions.Db.Response.StatusCode, status.StatusCode);
- activity?.AddTag(SemanticConventions.Db.Response.InfoMessages, string.Join(Environment.NewLine, status.InfoMessages));
+ activity?.AddTag(SemanticConventions.Db.Response.StatusCode, status.StatusCode, isPii: false);
+ activity?.AddTag(SemanticConventions.Db.Response.InfoMessages, string.Join(Environment.NewLine, status.InfoMessages), isPii: true);
},
};
@@ -1739,19 +1743,19 @@ private static void ThrowErrorResponse(TStatus status, AdbcStatusCode adbcStatus
protected TConfiguration GetTconfiguration()
{
var thriftConfig = new TConfiguration();
- Activity? activity = Activity.Current;
+ ActivityWithPii? activity = ActivityWithPii.Wrap(Activity.Current);
Properties.TryGetValue(ThriftTransportSizeConstants.MaxMessageSize, out string? maxMessageSize);
if (int.TryParse(maxMessageSize, out int maxMessageSizeValue) && maxMessageSizeValue > 0)
{
- activity?.AddTag(ActivityKeys.Thrift.MaxMessageSize, maxMessageSizeValue);
+ activity?.AddTag(ActivityKeys.Thrift.MaxMessageSize, maxMessageSizeValue, isPii: false);
thriftConfig.MaxMessageSize = maxMessageSizeValue;
}
Properties.TryGetValue(ThriftTransportSizeConstants.MaxFrameSize, out string? maxFrameSize);
if (int.TryParse(maxFrameSize, out int maxFrameSizeValue) && maxFrameSizeValue > 0)
{
- activity?.AddTag(ActivityKeys.Thrift.MaxFrameSize, maxFrameSizeValue);
+ activity?.AddTag(ActivityKeys.Thrift.MaxFrameSize, maxFrameSizeValue, isPii: false);
thriftConfig.MaxFrameSize = maxFrameSizeValue;
}
return thriftConfig;
diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs
index 530066a376..5598c63ff1 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs
@@ -25,6 +25,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Tracing;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift;
using Thrift.Protocol;
@@ -127,7 +128,7 @@ protected override void ValidateOptions()
protected override TTransport CreateTransport()
{
- Activity? activity = Activity.Current;
+ ActivityWithPii? activity = ActivityWithPii.Wrap(Activity.Current);
// Assumption: parameters have already been validated.
Properties.TryGetValue(HiveServer2Parameters.HostName, out string? hostName);
@@ -154,12 +155,11 @@ protected override TTransport CreateTransport()
httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
httpClient.DefaultRequestHeaders.ExpectContinue = false;
- activity?.AddTag(ActivityKeys.Encrypted, TlsOptions.IsTlsEnabled);
- activity?.AddTag(ActivityKeys.TransportType, baseAddress.Scheme);
- activity?.AddTag(ActivityKeys.AuthType, authTypeValue.ToString());
- activity?.AddTag(ActivityKeys.Http.UserAgent, s_userAgent);
- activity?.AddTag(ActivityKeys.Http.Uri, baseAddress);
-
+ activity?.AddTag(ActivityKeys.Encrypted, TlsOptions.IsTlsEnabled, isPii: false);
+ activity?.AddTag(ActivityKeys.TransportType, baseAddress.Scheme, isPii: false);
+ activity?.AddTag(ActivityKeys.AuthType, authTypeValue.ToString(), isPii: false);
+ activity?.AddTag(ActivityKeys.Http.UserAgent, s_userAgent, isPii: false);
+ activity?.AddTag(ActivityKeys.Http.Uri, baseAddress, isPii: true);
TConfiguration config = GetTconfiguration();
THttpTransport transport = new(httpClient, config)
{
@@ -173,21 +173,21 @@ protected override TTransport CreateTransport()
private static AuthenticationHeaderValue? GetAuthenticationHeaderValue(HiveServer2AuthType authType, string? username, string? password)
{
- Activity? activity = Activity.Current;
+ ActivityWithPii? activity = ActivityWithPii.Wrap(Activity.Current);
if (!string.IsNullOrEmpty(username) && !string.IsNullOrEmpty(password) && (authType == HiveServer2AuthType.Empty || authType == HiveServer2AuthType.Basic))
{
- activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme);
+ activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme, isPii: false);
return new AuthenticationHeaderValue(BasicAuthenticationScheme, Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}")));
}
else if (!string.IsNullOrEmpty(username) && (authType == HiveServer2AuthType.Empty || authType == HiveServer2AuthType.UsernameOnly))
{
- activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme);
+ activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme, isPii: false);
return new AuthenticationHeaderValue(BasicAuthenticationScheme, Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:")));
}
else if (authType == HiveServer2AuthType.None)
{
- activity?.AddTag(ActivityKeys.Http.AuthScheme, AnonymousAuthenticationScheme);
+ activity?.AddTag(ActivityKeys.Http.AuthScheme, AnonymousAuthenticationScheme, isPii: false);
return null;
}
else
diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
index 2c6ce3a9fd..f3c9d86adb 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
@@ -94,7 +94,7 @@ public HiveServer2Reader(
public override async ValueTask ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
// All records have been exhausted
if (_hasNoMoreData)
@@ -109,7 +109,7 @@ public HiveServer2Reader(
int columnCount = GetColumnCount(response.Results);
int rowCount = GetRowCount(response.Results, columnCount);
- activity?.AddEvent(SemanticConventions.Messaging.Batch.Response, [new(SemanticConventions.Db.Response.ReturnedRows, rowCount)]);
+ activity?.AddEvent(SemanticConventions.Messaging.Batch.Response, [new(SemanticConventions.Db.Response.ReturnedRows, rowCount)], isPii: false);
if ((_statement.EnableBatchSizeStopCondition && _statement.BatchSize > 0 && rowCount < _statement.BatchSize) || rowCount == 0)
{
diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2StandardConnection.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2StandardConnection.cs
index 79331a4b79..132a984c49 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2StandardConnection.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2StandardConnection.cs
@@ -23,6 +23,7 @@
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Tracing;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift;
using Thrift.Protocol;
@@ -96,7 +97,7 @@ protected override void ValidateOptions()
protected override TTransport CreateTransport()
{
- Activity? activity = Activity.Current;
+ ActivityWithPii? activity = ActivityWithPii.Wrap(Activity.Current);
// Required properties (validated previously)
Properties.TryGetValue(HiveServer2Parameters.HostName, out string? hostName);
@@ -137,13 +138,13 @@ protected override TTransport CreateTransport()
{
baseTransport = new TSocketTransport(hostName!, portValue, connectClient, config: thriftConfig);
}
- activity?.AddTag(ActivityKeys.Encrypted, TlsOptions.IsTlsEnabled);
+ activity?.AddTag(ActivityKeys.Encrypted, TlsOptions.IsTlsEnabled, isPii: false);
TBufferedTransport bufferedTransport = new TBufferedTransport(baseTransport);
switch (authTypeValue)
{
case HiveServer2AuthType.None:
- activity?.AddTag(ActivityKeys.TransportType, "buffered_socket");
+ activity?.AddTag(ActivityKeys.TransportType, "buffered_socket", isPii: false);
return bufferedTransport;
case HiveServer2AuthType.Basic:
@@ -157,7 +158,7 @@ protected override TTransport CreateTransport()
PlainSaslMechanism saslMechanism = new(username, password);
TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: thriftConfig);
- activity?.AddTag(ActivityKeys.TransportType, "sasl_buffered_socket");
+ activity?.AddTag(ActivityKeys.TransportType, "sasl_buffered_socket", isPii: false);
return new TFramedTransport(saslTransport);
default:
diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
index 8a68f03c6b..aae0c3be95 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
@@ -125,7 +125,7 @@ public override UpdateResult ExecuteUpdate()
private async Task ExecuteQueryAsyncInternal(CancellationToken cancellationToken = default)
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
if (IsMetadataCommand)
{
@@ -186,7 +186,7 @@ public override async ValueTask ExecuteQueryAsync()
private async Task ExecuteUpdateAsyncInternal(CancellationToken cancellationToken = default)
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
long? affectedRows = null;
try
@@ -233,14 +233,14 @@ private async Task ExecuteUpdateAsyncInternal(CancellationToken ca
}
finally
{
- activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, affectedRows ?? -1);
+ activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, affectedRows ?? -1, isPii: false);
}
}, ClassName + "." + nameof(ExecuteUpdateAsyncInternal));
}
public override async Task ExecuteUpdateAsync()
{
- return await this.TraceActivityAsync(async _ =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? _) =>
{
CancellationTokenSource ts = SetTokenSource();
try
@@ -330,19 +330,19 @@ public override void SetOption(string key, string value)
protected async Task ExecuteStatementAsync(CancellationToken cancellationToken = default)
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
if (Connection.SessionHandle == null)
{
throw new InvalidOperationException("Invalid session");
}
- activity?.AddTag(SemanticConventions.Db.Client.Connection.SessionId, Connection.SessionHandle.SessionId.Guid, "N");
+ activity?.AddTag(SemanticConventions.Db.Client.Connection.SessionId, () => ActivityWithPii.ToHexString(Connection.SessionHandle.SessionId.Guid), isPii: false);
TExecuteStatementReq executeRequest = new TExecuteStatementReq(Connection.SessionHandle, SqlQuery!);
SetStatementProperties(executeRequest);
IResponse response = await Connection.Client.ExecuteStatement(executeRequest, cancellationToken);
HiveServer2Connection.HandleThriftResponse(response.Status!, activity);
- activity?.AddTag(SemanticConventions.Db.Response.OperationId, response.OperationHandle!.OperationId.Guid, "N");
+ activity?.AddTag(SemanticConventions.Db.Response.OperationId, () => ActivityWithPii.ToHexString(response.OperationHandle!.OperationId.Guid), isPii: false);
// Capture direct results if they're available
if (response.DirectResults != null)
@@ -426,9 +426,10 @@ protected void ValidateOptions(IReadOnlyDictionary properties)
private async Task ExecuteMetadataCommandQuery(CancellationToken cancellationToken)
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
- activity?.AddTag(SemanticConventions.Db.Query.Text, SqlQuery ?? "");
+ // This is just the name of the metadata query and does not contain sensitive information, so safe to log as-is without redaction
+ activity?.AddTag(SemanticConventions.Db.Query.Text, SqlQuery ?? "", isPii: false);
return SqlQuery?.ToLowerInvariant() switch
{
GetCatalogsCommandName => await GetCatalogsAsync(cancellationToken),
@@ -567,7 +568,7 @@ private async Task GetResultSetSchemaAsync(TOperationHandle operationHan
private async Task GetQueryResult(IResponse response, CancellationToken cancellationToken)
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
HiveServer2Connection.HandleThriftResponse(response.Status!, activity);
if (Connection.TryGetDirectResults(response.DirectResults, out QueryResult? result))
@@ -1052,14 +1053,14 @@ public bool TryGetDirectResults(IResponse response, out TSparkDirectResults? dir
///
public override void Cancel()
{
- this.TraceActivity(_ =>
+ this.TraceActivity((ActivityWithPii? _) =>
{
// This will cancel any operation using the current token source
CancelTokenSource();
}, ClassName + "." + nameof(Cancel));
}
- private async Task CancelOperationAsync(Activity? activity, TOperationHandle? operationHandle)
+ private async Task CancelOperationAsync(ActivityWithPii? activity, TOperationHandle? operationHandle)
{
if (operationHandle == null)
{
@@ -1070,13 +1071,15 @@ private async Task CancelOperationAsync(Activity? activity, TOperationHandle? op
{
activity?.AddEvent(
"db.operation.cancel_operation.starting",
- [new(SemanticConventions.Db.Operation.OperationId, new Guid(operationHandle.OperationId.Guid).ToString("N"))]);
+ [new(SemanticConventions.Db.Operation.OperationId, new Guid(operationHandle.OperationId.Guid).ToString("N"))],
+ isPii: false);
TCancelOperationReq req = new(operationHandle);
TCancelOperationResp resp = await Client.CancelOperation(req, cancellationTokenSource.Token);
HiveServer2Connection.HandleThriftResponse(resp.Status, activity);
activity?.AddEvent(
"db.operation.cancel_operation.completed",
- [new(SemanticConventions.Db.Response.StatusCode, resp.Status.StatusCode.ToString())]);
+ [new(SemanticConventions.Db.Response.StatusCode, resp.Status.StatusCode.ToString())],
+ isPii: false);
}
catch (Exception ex)
{
diff --git a/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs b/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs
index 1b73da8b64..829089dd87 100644
--- a/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs
+++ b/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs
@@ -26,6 +26,7 @@
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift;
@@ -132,7 +133,7 @@ internal override IArrowArrayStream NewReader(T statement, Schema schema, IRe
protected override TTransport CreateTransport()
{
- Activity? activity = Activity.Current;
+ ActivityWithPii? activity = ActivityWithPii.Wrap(Activity.Current);
// Assumption: parameters have already been validated.
Properties.TryGetValue(ImpalaParameters.HostName, out string? hostName);
@@ -159,12 +160,11 @@ protected override TTransport CreateTransport()
httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
httpClient.DefaultRequestHeaders.ExpectContinue = false;
- activity?.AddTag(ActivityKeys.Encrypted, baseAddress.Scheme == Uri.UriSchemeHttps);
- activity?.AddTag(ActivityKeys.TransportType, baseAddress.Scheme);
- activity?.AddTag(ActivityKeys.AuthType, authTypeValue.ToString());
- activity?.AddTag(ActivityKeys.Http.UserAgent, s_userAgent);
- activity?.AddTag(ActivityKeys.Http.Uri, baseAddress);
-
+ activity?.AddTag(ActivityKeys.Encrypted, baseAddress.Scheme == Uri.UriSchemeHttps, isPii: false);
+ activity?.AddTag(ActivityKeys.TransportType, baseAddress.Scheme, isPii: false);
+ activity?.AddTag(ActivityKeys.AuthType, authTypeValue.ToString(), isPii: false);
+ activity?.AddTag(ActivityKeys.Http.UserAgent, s_userAgent, isPii: false);
+ activity?.AddTag(ActivityKeys.Http.Uri, baseAddress, isPii: true);
TConfiguration config = GetTconfiguration();
THttpTransport transport = new(httpClient, config)
{
@@ -178,7 +178,7 @@ protected override TTransport CreateTransport()
private static AuthenticationHeaderValue? GetAuthenticationHeaderValue(ImpalaAuthType authType, string? username, string? password)
{
- Activity? activity = Activity.Current;
+ ActivityWithPii? activity = ActivityWithPii.Wrap(Activity.Current);
if (!string.IsNullOrEmpty(username) && !string.IsNullOrEmpty(password) && (authType == ImpalaAuthType.Empty || authType == ImpalaAuthType.Basic))
{
diff --git a/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs b/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs
index c468bdbfe9..ecc6d02bbf 100644
--- a/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs
+++ b/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs
@@ -23,6 +23,7 @@
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift;
@@ -103,7 +104,7 @@ protected override void ValidateOptions()
protected override TTransport CreateTransport()
{
- Activity? activity = Activity.Current;
+ ActivityWithPii? activity = ActivityWithPii.Wrap(Activity.Current);
// Assumption: hostName and port have already been validated.
Properties.TryGetValue(ImpalaParameters.HostName, out string? hostName);
@@ -129,12 +130,12 @@ protected override TTransport CreateTransport()
{
transport = new TTlsSocketTransport(hostName!, int.Parse(port!), config: thriftConfig, 0, null, certValidator: certValidator);
}
- activity?.AddTag(ActivityKeys.Encrypted, true);
+ activity?.AddTag(ActivityKeys.Encrypted, true, isPii: false);
}
else
{
transport = new TSocketTransport(hostName!, int.Parse(port!), connectClient, config: thriftConfig);
- activity?.AddTag(ActivityKeys.Encrypted, false);
+ activity?.AddTag(ActivityKeys.Encrypted, false, isPii: false);
}
activity?.AddTag(ActivityKeys.Host, hostName);
activity?.AddTag(ActivityKeys.Port, port);
@@ -172,7 +173,7 @@ protected override async Task CreateProtocolAsync(TTransport transpor
protected override TOpenSessionReq CreateSessionRequest()
{
- Activity? activity = Activity.Current;
+ ActivityWithPii? activity = ActivityWithPii.Wrap(Activity.Current);
// Assumption: user name and password have already been validated.
Properties.TryGetValue(AdbcOptions.Username, out string? username);
diff --git a/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs b/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
index d19cc3c282..ba1cfe3dc6 100644
--- a/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
+++ b/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
@@ -26,6 +26,7 @@
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift;
@@ -162,7 +163,7 @@ protected virtual HttpMessageHandler CreateHttpHandler()
protected override TTransport CreateTransport()
{
- Activity? activity = Activity.Current;
+ ActivityWithPii? activity = ActivityWithPii.Wrap(Activity.Current);
// Assumption: parameters have already been validated.
Properties.TryGetValue(SparkParameters.HostName, out string? hostName);
@@ -187,12 +188,11 @@ protected override TTransport CreateTransport()
httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
httpClient.DefaultRequestHeaders.ExpectContinue = false;
- activity?.AddTag(ActivityKeys.Encrypted, baseAddress.Scheme == Uri.UriSchemeHttps);
- activity?.AddTag(ActivityKeys.TransportType, baseAddress.Scheme);
- activity?.AddTag(ActivityKeys.AuthType, authTypeValue.ToString());
- activity?.AddTag(ActivityKeys.Http.UserAgent, userAgent);
- activity?.AddTag(ActivityKeys.Http.Uri, baseAddress);
-
+ activity?.AddTag(ActivityKeys.Encrypted, baseAddress.Scheme == Uri.UriSchemeHttps, isPii: false);
+ activity?.AddTag(ActivityKeys.TransportType, baseAddress.Scheme, isPii: false);
+ activity?.AddTag(ActivityKeys.AuthType, authTypeValue.ToString(), isPii: false);
+ activity?.AddTag(ActivityKeys.Http.UserAgent, userAgent, isPii: false);
+ activity?.AddTag(ActivityKeys.Http.Uri, baseAddress, isPii: true);
TConfiguration config = GetTconfiguration();
THttpTransport transport = new(httpClient, config)
{
@@ -206,7 +206,7 @@ protected override TTransport CreateTransport()
protected virtual AuthenticationHeaderValue? GetAuthenticationHeaderValue(SparkAuthType authType)
{
- Activity? activity = Activity.Current;
+ ActivityWithPii? activity = ActivityWithPii.Wrap(Activity.Current);
Properties.TryGetValue(SparkParameters.Token, out string? token);
Properties.TryGetValue(SparkParameters.AccessToken, out string? access_token);
@@ -214,27 +214,27 @@ protected override TTransport CreateTransport()
Properties.TryGetValue(AdbcOptions.Password, out string? password);
if (!string.IsNullOrEmpty(token) && (authType == SparkAuthType.Empty || authType == SparkAuthType.Token))
{
- activity?.AddTag(ActivityKeys.Http.AuthScheme, BearerAuthenticationScheme);
+ activity?.AddTag(ActivityKeys.Http.AuthScheme, BearerAuthenticationScheme, isPii: false);
return new AuthenticationHeaderValue(BearerAuthenticationScheme, token);
}
else if (!string.IsNullOrEmpty(username) && !string.IsNullOrEmpty(password) && (authType == SparkAuthType.Empty || authType == SparkAuthType.Basic))
{
- activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme);
+ activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme, isPii: false);
return new AuthenticationHeaderValue(BasicAuthenticationScheme, Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}")));
}
else if (!string.IsNullOrEmpty(username) && (authType == SparkAuthType.Empty || authType == SparkAuthType.UsernameOnly))
{
- activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme);
+ activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme, isPii: false);
return new AuthenticationHeaderValue(BasicAuthenticationScheme, Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:")));
}
else if (!string.IsNullOrEmpty(access_token) && authType == SparkAuthType.OAuth)
{
- activity?.AddTag(ActivityKeys.Http.AuthScheme, BearerAuthenticationScheme);
+ activity?.AddTag(ActivityKeys.Http.AuthScheme, BearerAuthenticationScheme, isPii: false);
return new AuthenticationHeaderValue(BearerAuthenticationScheme, access_token);
}
else if (authType == SparkAuthType.None)
{
- activity?.AddTag(ActivityKeys.Http.AuthScheme, AnonymousAuthenticationScheme);
+ activity?.AddTag(ActivityKeys.Http.AuthScheme, AnonymousAuthenticationScheme, isPii: false);
return null;
}
else
diff --git a/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs b/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs
index 4ca8ce7d8b..f7d8888784 100644
--- a/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs
+++ b/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs
@@ -24,6 +24,7 @@
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift;
@@ -105,7 +106,7 @@ protected override void ValidateConnection()
protected override TTransport CreateTransport()
{
- Activity? activity = Activity.Current;
+ ActivityWithPii? activity = ActivityWithPii.Wrap(Activity.Current);
// Assumption: hostName and port have already been validated.
Properties.TryGetValue(SparkParameters.HostName, out string? hostName);
@@ -140,21 +141,21 @@ protected override TTransport CreateTransport()
{
baseTransport = new TTlsSocketTransport(hostName!, portValue, config: thriftConfig, 0, trustedCert, certValidator);
}
- activity?.AddTag(ActivityKeys.Encrypted, trustedCert != null);
+ activity?.AddTag(ActivityKeys.Encrypted, trustedCert != null, isPii: false);
}
else
{
baseTransport = new TSocketTransport(hostName!, portValue, connectClient, config: thriftConfig);
- activity?.AddTag(ActivityKeys.Encrypted, false);
+ activity?.AddTag(ActivityKeys.Encrypted, false, isPii: false);
}
- activity?.AddTag(ActivityKeys.Host, hostName);
- activity?.AddTag(ActivityKeys.Port, port);
+ activity?.AddTag(ActivityKeys.Host, hostName, isPii: true);
+ activity?.AddTag(ActivityKeys.Port, port, isPii: true);
TBufferedTransport bufferedTransport = new TBufferedTransport(baseTransport);
switch (authTypeValue)
{
case SparkAuthType.None:
- activity?.AddTag(ActivityKeys.TransportType, "buffered_socket");
+ activity?.AddTag(ActivityKeys.TransportType, "buffered_socket", isPii: false);
return bufferedTransport;
case SparkAuthType.Basic:
@@ -168,7 +169,7 @@ protected override TTransport CreateTransport()
PlainSaslMechanism saslMechanism = new(username, password);
TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: thriftConfig);
- activity?.AddTag(ActivityKeys.TransportType, "sasl_buffered_socket");
+ activity?.AddTag(ActivityKeys.TransportType, "sasl_buffered_socket", isPii: false);
return new TFramedTransport(saslTransport);
default:
@@ -184,7 +185,7 @@ protected override async Task CreateProtocolAsync(TTransport transpor
protected override TOpenSessionReq CreateSessionRequest()
{
- Activity? activity = Activity.Current;
+ ActivityWithPii? activity = ActivityWithPii.Wrap(Activity.Current);
// Assumption: user name and password have already been validated.
Properties.TryGetValue(AdbcOptions.Username, out string? username);
@@ -220,7 +221,7 @@ protected override TOpenSessionReq CreateSessionRequest()
? SparkAuthType.Basic
: SparkAuthType.UsernameOnly
: authTypeValue;
- activity?.AddTag(ActivityKeys.AuthType, authTypeValue.ToString());
+ activity?.AddTag(ActivityKeys.AuthType, authTypeValue.ToString(), isPii: false);
return request;
}
diff --git a/csharp/src/Drivers/BigQuery/ActivityExtensions.cs b/csharp/src/Drivers/BigQuery/ActivityExtensions.cs
index 11e1db79c6..95be77779d 100644
--- a/csharp/src/Drivers/BigQuery/ActivityExtensions.cs
+++ b/csharp/src/Drivers/BigQuery/ActivityExtensions.cs
@@ -25,27 +25,16 @@ internal static class ActivityExtensions
private const string bigQueryKeyPrefix = "adbc.bigquery.tracing.";
private const string bigQueryParameterKeyValueSuffix = ".value";
- public static Activity AddBigQueryTag(this Activity activity, string key, object? value)
+ public static ActivityWithPii AddBigQueryTag(this ActivityWithPii activity, string key, object? value, bool isPii = true)
{
string bigQueryKey = bigQueryKeyPrefix + key;
- return activity.AddTag(bigQueryKey, value);
+ return activity.AddTag(bigQueryKey, value, isPii);
}
- public static Activity AddConditionalBigQueryTag(this Activity activity, string key, string? value, bool condition)
+ public static ActivityWithPii AddBigQueryParameterTag(this ActivityWithPii activity, string parameterName, object? value)
{
- string bigQueryKey = bigQueryKeyPrefix + key;
- return activity.AddConditionalTag(key, value, condition)!;
- }
-
- public static Activity AddBigQueryParameterTag(this Activity activity, string parameterName, object? value)
- {
- if (BigQueryParameters.IsSafeToLog(parameterName))
- {
- string bigQueryParameterValueKey = parameterName + bigQueryParameterKeyValueSuffix;
- return activity.AddTag(bigQueryParameterValueKey, value);
- }
-
- return activity;
+ string bigQueryParameterValueKey = parameterName + bigQueryParameterKeyValueSuffix;
+ return activity.AddTag(bigQueryParameterValueKey, value, isPii: !BigQueryParameters.IsSafeToLog(parameterName));
}
}
}
diff --git a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs
index bc8957f005..5da1e848e4 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs
@@ -123,15 +123,6 @@ private bool TryInitTracerProvider(out FileActivityListener? fileActivityListene
return tags;
}
- ///
- /// Conditional used to determines if it is safe to trace
- ///
- ///
- /// It is safe to write to some output types (ie, files) but not others (ie, a shared resource).
- ///
- ///
- internal bool IsSafeToTrace => _fileActivityListener != null;
-
///
/// The function to call when updating the token.
///
@@ -161,7 +152,7 @@ private bool TryInitTracerProvider(out FileActivityListener? fileActivityListene
///
internal BigQueryClient Open(string? projectId = null)
{
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
string? billingProjectId = null;
TimeSpan? clientTimeout = null;
@@ -186,7 +177,7 @@ internal BigQueryClient Open(string? projectId = null)
if (projectId.Equals(BigQueryConstants.PublicProjectId, StringComparison.OrdinalIgnoreCase))
{
projectId = BigQueryConstants.DetectProjectId;
- activity?.AddBigQueryTag("change_public_projectId_to_detect_project_id", projectId);
+ activity?.AddBigQueryTag("change_public_projectId_to_detect_project_id", projectId, isPii: false);
}
}
@@ -236,7 +227,7 @@ internal BigQueryClient Open(string? projectId = null)
}
else
{
- activity?.AddBigQueryTag("client.default_location", null);
+ activity?.AddBigQueryTag("client.default_location", null, isPii: false);
}
BigQueryClient client = bigQueryClientBuilder.Build();
@@ -253,7 +244,7 @@ internal BigQueryClient Open(string? projectId = null)
internal void SetCredential()
{
- this.TraceActivity(activity =>
+ this.TraceActivity((ActivityWithPii? activity) =>
{
string? clientId = null;
string? clientSecret = null;
@@ -327,9 +318,9 @@ internal void SetCredential()
public override void SetOption(string key, string value)
{
- this.TraceActivity(activity =>
+ this.TraceActivity((ActivityWithPii? activity) =>
{
- activity?.AddTag(key + ".set", value);
+ activity?.AddTag(key + ".set", value, isPii: true);
this.properties[key] = value;
@@ -364,7 +355,7 @@ private GoogleCredential ApplyScopes(GoogleCredential credential)
public override IArrowArrayStream GetInfo(IReadOnlyList codes)
{
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
const int strValTypeID = 0;
@@ -453,7 +444,7 @@ public override IArrowArrayStream GetInfo(IReadOnlyList codes)
nullCount++;
break;
}
- activity?.AddTag(tagKey, tagValue);
+ activity?.AddTag(tagKey, tagValue, isPii: false);
}
StructType entryType = new StructType(
@@ -496,7 +487,7 @@ public override IArrowArrayStream GetObjects(
IReadOnlyList? tableTypes,
string? columnNamePattern)
{
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
IArrowArray[] dataArrays = GetCatalogs(depth, catalogPattern, dbSchemaPattern,
tableNamePattern, tableTypes, columnNamePattern);
@@ -519,7 +510,7 @@ internal void UpdateClientToken()
///
public bool TokenRequiresUpdate(Exception ex) => BigQueryUtils.TokenRequiresUpdate(ex);
- private async Task ExecuteWithRetriesAsync(Func> action, Activity? activity) => await RetryManager.ExecuteWithRetriesAsync(this, action, activity, MaxRetryAttempts, RetryDelayMs);
+ private async Task ExecuteWithRetriesAsync(Func> action, ActivityWithPii? activity) => await RetryManager.ExecuteWithRetriesAsync(this, action, activity, MaxRetryAttempts, RetryDelayMs);
///
/// Executes the query using the BigQueryClient.
@@ -536,9 +527,9 @@ internal void UpdateClientToken()
{
if (Client == null) { Client = Open(); }
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
- activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, sql, IsSafeToTrace);
+ activity?.AddTag(SemanticConventions.Db.Query.Text, sql, isPii: true);
Func> func = () => Client.ExecuteQueryAsync(sql, parameters ?? Enumerable.Empty(), queryOptions, resultsOptions);
BigQueryResults? result = ExecuteWithRetriesAsync(func, activity).GetAwaiter().GetResult();
@@ -555,7 +546,7 @@ private IArrowArray[] GetCatalogs(
IReadOnlyList? tableTypes,
string? columnNamePattern)
{
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
StringArray.Builder catalogNameBuilder = new StringArray.Builder();
List catalogDbSchemasValues = new List();
@@ -620,7 +611,7 @@ private StructArray GetDbSchemas(
IReadOnlyList? tableTypes,
string? columnNamePattern)
{
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
StringArray.Builder dbSchemaNameBuilder = new StringArray.Builder();
List dbSchemaTablesValues = new List();
@@ -684,7 +675,7 @@ private StructArray GetTableSchemas(
IReadOnlyList? tableTypes,
string? columnNamePattern)
{
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
StringArray.Builder tableNameBuilder = new StringArray.Builder();
StringArray.Builder tableTypeBuilder = new StringArray.Builder();
@@ -776,7 +767,7 @@ private StructArray GetColumnSchema(
string table,
string? columnNamePattern)
{
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
StringArray.Builder columnNameBuilder = new StringArray.Builder();
Int32Array.Builder ordinalPositionBuilder = new Int32Array.Builder();
@@ -893,7 +884,7 @@ private StructArray GetConstraintSchema(
string table,
string? columnNamePattern)
{
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
StringArray.Builder constraintNameBuilder = new StringArray.Builder();
StringArray.Builder constraintTypeBuilder = new StringArray.Builder();
@@ -964,7 +955,7 @@ private StringArray GetConstraintColumnNames(
string table,
string constraintName)
{
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE table_name = '{2}' AND constraint_name = '{3}' ORDER BY ordinal_position",
Sanitize(catalog), Sanitize(dbSchema), Sanitize(table), Sanitize(constraintName));
@@ -992,7 +983,7 @@ private StructArray GetConstraintsUsage(
string table,
string constraintName)
{
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
StringArray.Builder constraintFkCatalogBuilder = new StringArray.Builder();
StringArray.Builder constraintFkDbSchemaBuilder = new StringArray.Builder();
@@ -1139,7 +1130,7 @@ private XdbcDataType ToXdbcDataType(string type)
public override Schema GetTableSchema(string? catalog, string? dbSchema, string tableName)
{
- return this.TraceActivity(activity =>
+ return this.TraceActivity((ActivityWithPii? activity) =>
{
string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{2}'",
Sanitize(catalog), Sanitize(dbSchema), Sanitize(tableName));
diff --git a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
index 52bcc8e0ee..15cf4ce715 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
@@ -88,11 +88,11 @@ public override QueryResult ExecuteQuery()
private async Task ExecuteQueryInternalAsync()
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
QueryOptions queryOptions = ValidateOptions(activity);
- activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, SqlQuery, this.bigQueryConnection.IsSafeToTrace);
+ activity?.AddTag(SemanticConventions.Db.Query.Text, SqlQuery, isPii: true);
BigQueryJob job = await Client.CreateQueryJobAsync(SqlQuery, null, queryOptions);
JobReference jobReference = job.Reference;
@@ -217,7 +217,7 @@ private async Task ExecuteQueryInternalAsync()
// Note: MultiArrowReader must dispose the cancellationContext.
IArrowArrayStream stream = new MultiArrowReader(this, TranslateSchema(results.Schema), readers, new CancellationContext(cancellationRegistry));
- activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, totalRows);
+ activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, totalRows, isPii: false);
return new QueryResult(totalRows, stream);
});
}
@@ -227,7 +227,7 @@ private async Task> GetArrowReaders(
string table,
string projectId,
int maxStreamCount,
- Activity? activity,
+ ActivityWithPii? activity,
CancellationToken cancellationToken = default)
{
ReadSession rs = new ReadSession { Table = table, DataFormat = DataFormat.Arrow };
@@ -235,7 +235,7 @@ private async Task> GetArrowReaders(
ReadSession rrs = await bigQueryReadClient.CreateReadSessionAsync("projects/" + projectId, rs, maxStreamCount);
var readers = rrs.Streams
- .Select(s => ReadChunk(bigQueryReadClient, s.Name, activity, this.bigQueryConnection.IsSafeToTrace, cancellationToken))
+ .Select(s => ReadChunk(bigQueryReadClient, s.Name, activity, cancellationToken))
.Where(chunk => chunk != null)
.Cast();
@@ -249,7 +249,7 @@ public override UpdateResult ExecuteUpdate()
public override void Cancel()
{
- this.TraceActivity(_ =>
+ this.TraceActivity((ActivityWithPii? _) =>
{
this.cancellationRegistry.CancelAll();
});
@@ -263,7 +263,7 @@ public override void Dispose()
private async Task ExecuteUpdateInternalAsync()
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
GetQueryResultsOptions getQueryResultsOptions = new GetQueryResultsOptions();
@@ -275,7 +275,7 @@ private async Task ExecuteUpdateInternalAsync()
activity?.AddBigQueryParameterTag(BigQueryParameters.GetQueryResultsOptionsTimeout, seconds);
}
- activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, SqlQuery, this.bigQueryConnection.IsSafeToTrace);
+ activity?.AddTag(SemanticConventions.Db.Query.Text, SqlQuery, isPii: true);
using JobCancellationContext context = new(cancellationRegistry);
// Cannot set destination table in jobs with DDL statements, otherwise an error will be prompted
@@ -290,7 +290,7 @@ private async Task ExecuteUpdateInternalAsync()
BigQueryResults? result = await ExecuteWithRetriesAsync(getQueryResultsAsyncFunc, activity, context.CancellationToken);
long updatedRows = result?.NumDmlAffectedRows.HasValue == true ? result.NumDmlAffectedRows.Value : -1L;
- activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, updatedRows);
+ activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, updatedRows, isPii: false);
return new UpdateResult(updatedRows);
});
}
@@ -381,27 +381,27 @@ private IArrowType GetType(TableFieldSchema field, IArrowType type)
return type;
}
- private static IArrowReader? ReadChunk(BigQueryReadClient client, string streamName, Activity? activity, bool isSafeToTrace, CancellationToken cancellationToken = default)
+ private static IArrowReader? ReadChunk(BigQueryReadClient client, string streamName, ActivityWithPii? activity, CancellationToken cancellationToken = default)
{
// Ideally we wouldn't need to indirect through a stream, but the necessary APIs in Arrow
// are internal. (TODO: consider changing Arrow).
- activity?.AddConditionalBigQueryTag("read_stream", streamName, isSafeToTrace);
+ activity?.AddTag("read_stream", streamName, isPii: true);
BigQueryReadClient.ReadRowsStream readRowsStream = client.ReadRows(new ReadRowsRequest { ReadStream = streamName });
IAsyncEnumerator enumerator = readRowsStream.GetResponseStream().GetAsyncEnumerator(cancellationToken);
ReadRowsStream stream = new ReadRowsStream(enumerator);
- activity?.AddBigQueryTag("read_stream.has_rows", stream.HasRows);
+ activity?.AddBigQueryTag("read_stream.has_rows", stream.HasRows, isPii: false);
return stream.HasRows ? stream : null;
}
- private QueryOptions ValidateOptions(Activity? activity)
+ private QueryOptions ValidateOptions(ActivityWithPii? activity)
{
QueryOptions options = new QueryOptions();
if (Client.ProjectId == BigQueryConstants.DetectProjectId)
{
- activity?.AddBigQueryTag("client_project_id", BigQueryConstants.DetectProjectId);
+ activity?.AddBigQueryTag("client_project_id", BigQueryConstants.DetectProjectId, isPii: false);
// An error occurs when calling CreateQueryJob without the ID set,
// so use the first one that is found. This does not prevent from calling
@@ -420,7 +420,7 @@ private QueryOptions ValidateOptions(Activity? activity)
if (firstProjectId != null)
{
options.ProjectId = firstProjectId;
- activity?.AddBigQueryTag("detected_client_project_id", firstProjectId);
+ activity?.AddBigQueryTag("detected_client_project_id", firstProjectId, isPii: false);
// need to reopen the Client with the projectId specified
this.bigQueryConnection.Open(firstProjectId);
}
@@ -494,15 +494,15 @@ private QueryOptions ValidateOptions(Activity? activity)
///
/// The name of the dataset.
/// A to a randomly generated table name in the specified dataset.
- private TableReference TryGetLargeDestinationTableReference(string datasetId, Activity? activity)
+ private TableReference TryGetLargeDestinationTableReference(string datasetId, ActivityWithPii? activity)
{
BigQueryDataset? dataset = null;
try
{
- activity?.AddBigQueryTag("large_results.dataset.try_find", datasetId);
+ activity?.AddBigQueryTag("large_results.dataset.try_find", datasetId, isPii: false);
dataset = this.Client.GetDataset(datasetId);
- activity?.AddBigQueryTag("large_results.dataset.found", datasetId);
+ activity?.AddBigQueryTag("large_results.dataset.found", datasetId, isPii: false);
}
catch (GoogleApiException gaEx)
{
@@ -517,8 +517,8 @@ private TableReference TryGetLargeDestinationTableReference(string datasetId, Ac
{
try
{
- activity?.AddBigQueryTag("large_results.dataset.try_create", datasetId);
- activity?.AddBigQueryTag("large_results.dataset.try_create_region", this.Client.DefaultLocation);
+ activity?.AddBigQueryTag("large_results.dataset.try_create", datasetId, isPii: false);
+ activity?.AddBigQueryTag("large_results.dataset.try_create_region", this.Client.DefaultLocation, isPii: false);
DatasetReference reference = this.Client.GetDatasetReference(datasetId);
// The location is not set here because it will use the DefaultLocation from the client.
@@ -532,7 +532,7 @@ private TableReference TryGetLargeDestinationTableReference(string datasetId, Ac
});
dataset = this.Client.CreateDataset(datasetId, bigQueryDataset.Resource);
- activity?.AddBigQueryTag("large_results.dataset.created", datasetId);
+ activity?.AddBigQueryTag("large_results.dataset.created", datasetId, isPii: false);
}
catch (Exception ex)
{
@@ -554,7 +554,7 @@ private TableReference TryGetLargeDestinationTableReference(string datasetId, Ac
TableId = "lg_" + Guid.NewGuid().ToString().Replace("-", "")
};
- activity?.AddBigQueryTag("large_results.table_reference", reference.ToString());
+ activity?.AddBigQueryTag("large_results.table_reference", reference.ToString(), isPii: false);
return reference;
}
@@ -562,12 +562,12 @@ private TableReference TryGetLargeDestinationTableReference(string datasetId, Ac
public bool TokenRequiresUpdate(Exception ex) => BigQueryUtils.TokenRequiresUpdate(ex);
- private async Task ExecuteWithRetriesAsync(Func> action, Activity? activity, CancellationToken cancellationToken = default) =>
+ private async Task ExecuteWithRetriesAsync(Func> action, ActivityWithPii? activity, CancellationToken cancellationToken = default) =>
await RetryManager.ExecuteWithRetriesAsync(this, action, activity, MaxRetryAttempts, RetryDelayMs, cancellationToken);
private async Task ExecuteCancellableJobAsync(
JobCancellationContext context,
- Activity? activity,
+ ActivityWithPii? activity,
Func> func)
{
try
@@ -585,7 +585,7 @@ private async Task ExecuteCancellableJobAsync(
{
if (context.Job != null)
{
- activity?.AddBigQueryTag("job.cancel", context.Job.Reference.JobId);
+ activity?.AddBigQueryTag("job.cancel", context.Job.Reference.JobId, isPii: false);
await context.Job.CancelAsync().ConfigureAwait(false);
}
}
@@ -710,7 +710,7 @@ public MultiArrowReader(BigQueryStatement statement, Schema schema, IEnumerable<
public override async ValueTask ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
if (this.readers == null)
{
diff --git a/csharp/src/Drivers/BigQuery/RetryManager.cs b/csharp/src/Drivers/BigQuery/RetryManager.cs
index 7da26504dd..c3deca9163 100644
--- a/csharp/src/Drivers/BigQuery/RetryManager.cs
+++ b/csharp/src/Drivers/BigQuery/RetryManager.cs
@@ -20,6 +20,7 @@
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Tracing;
namespace Apache.Arrow.Adbc.Drivers.BigQuery
{
@@ -31,7 +32,7 @@ internal class RetryManager
public static async Task ExecuteWithRetriesAsync(
ITokenProtectedResource tokenProtectedResource,
Func> action,
- Activity? activity,
+ ActivityWithPii? activity,
int maxRetries = 5,
int initialDelayMilliseconds = 200,
CancellationToken cancellationToken = default)
@@ -55,7 +56,7 @@ public static async Task ExecuteWithRetriesAsync(
{
// Note: OperationCanceledException could be thrown from the call,
// but we only want to break out when the cancellation was requested from the caller.
- activity?.AddBigQueryTag("retry_attempt", retryCount);
+ activity?.AddBigQueryTag("retry_attempt", retryCount, isPii: false);
activity?.AddException(ex);
retryCount++;
@@ -65,7 +66,7 @@ public static async Task ExecuteWithRetriesAsync(
{
if (tokenProtectedResource?.TokenRequiresUpdate(ex) == true)
{
- activity?.AddBigQueryTag("update_token.status", "Expired");
+ activity?.AddBigQueryTag("update_token.status", "Expired", isPii: false);
throw new AdbcException($"Cannot update access token after {maxRetries} tries. Last exception: {ex.GetType().Name}: {ex.Message}", AdbcStatusCode.Unauthenticated, ex);
}
}
@@ -77,9 +78,9 @@ public static async Task ExecuteWithRetriesAsync(
{
if (tokenProtectedResource.TokenRequiresUpdate(ex) == true)
{
- activity?.AddBigQueryTag("update_token.status", "Required");
+ activity?.AddBigQueryTag("update_token.status", "Required", isPii: false);
await tokenProtectedResource.UpdateToken();
- activity?.AddBigQueryTag("update_token.status", "Completed");
+ activity?.AddBigQueryTag("update_token.status", "Completed", isPii: false);
}
}
diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index 507cc919ab..8379797be0 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -31,6 +31,7 @@
using Apache.Arrow.Adbc.Drivers.Apache.Spark;
using Apache.Arrow.Adbc.Drivers.Databricks.Auth;
using Apache.Arrow.Adbc.Drivers.Databricks.Reader;
+using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift.Protocol;
@@ -765,7 +766,7 @@ protected override TOpenSessionReq CreateSessionRequest()
return req;
}
- protected override async Task HandleOpenSessionResponse(TOpenSessionResp? session, Activity? activity = default)
+ protected override async Task HandleOpenSessionResponse(TOpenSessionResp? session, ActivityWithPii? activity = default)
{
await base.HandleOpenSessionResponse(session, activity);
if (session != null)
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
index f7bc3b201a..3f3e75ef67 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
@@ -200,7 +200,7 @@ public async Task StopAsync()
private async Task DownloadFilesAsync(CancellationToken cancellationToken)
{
- await this.TraceActivityAsync(async activity =>
+ await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
await Task.Yield();
@@ -268,7 +268,7 @@ await this.TraceActivityAsync(async activity =>
downloadResult.UpdateWithRefreshedLink(refreshedLink);
activity?.AddEvent("cloudfetch.url_refreshed_before_download", [
new("offset", refreshedLink.StartRowOffset)
- ]);
+ ], isPii: false);
}
}
@@ -352,7 +352,7 @@ await this.TraceActivityAsync(async activity =>
new("total_mb", totalBytes / 1024.0 / 1024.0),
new("total_time_ms", overallStopwatch.ElapsedMilliseconds),
new("total_time_sec", overallStopwatch.ElapsedMilliseconds / 1000.0)
- ]);
+ ], isPii: false);
// If there's an error, add the error to the result queue
if (HasError)
@@ -365,7 +365,7 @@ await this.TraceActivityAsync(async activity =>
private async Task DownloadFileAsync(IDownloadResult downloadResult, CancellationToken cancellationToken)
{
- await this.TraceActivityAsync(async activity =>
+ await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
string url = downloadResult.Link.FileLink;
string sanitizedUrl = SanitizeUrl(downloadResult.Link.FileLink);
@@ -375,20 +375,20 @@ await this.TraceActivityAsync(async activity =>
long size = downloadResult.Size;
// Add tags to the Activity for filtering/searching
- activity?.SetTag("cloudfetch.offset", downloadResult.Link.StartRowOffset);
- activity?.SetTag("cloudfetch.sanitized_url", sanitizedUrl);
- activity?.SetTag("cloudfetch.expected_size_bytes", size);
+ activity?.SetTag("cloudfetch.offset", downloadResult.Link.StartRowOffset, isPii: false);
+ activity?.SetTag("cloudfetch.sanitized_url", sanitizedUrl, isPii: false);
+ activity?.SetTag("cloudfetch.expected_size_bytes", size, isPii: false);
// Create a stopwatch to track download time
var stopwatch = Stopwatch.StartNew();
// Log download start
activity?.AddEvent("cloudfetch.download_start", [
- new("offset", downloadResult.Link.StartRowOffset),
+ new("offset", downloadResult.Link.StartRowOffset),
new("sanitized_url", sanitizedUrl),
new("expected_size_bytes", size),
new("expected_size_kb", size / 1024.0)
- ]);
+ ], isPii: false);
// Retry logic for downloading files
for (int retry = 0; retry < _maxRetries; retry++)
@@ -423,7 +423,7 @@ await this.TraceActivityAsync(async activity =>
activity?.AddEvent("cloudfetch.url_refreshed_after_auth_error", [
new("offset", refreshedLink.StartRowOffset),
new("sanitized_url", sanitizedUrl)
- ]);
+ ], isPii: false);
// Continue to the next retry attempt with the refreshed URL
continue;
@@ -446,7 +446,7 @@ await this.TraceActivityAsync(async activity =>
new("sanitized_url", sanitizedUrl),
new("content_length_bytes", contentLength.Value),
new("content_length_mb", contentLength.Value / 1024.0 / 1024.0)
- ]);
+ ], isPii: false);
}
// Read the file data
@@ -476,7 +476,7 @@ await this.TraceActivityAsync(async activity =>
new("sanitized_url", sanitizedUrl),
new("max_retries", _maxRetries),
new("elapsed_time_ms", stopwatch.ElapsedMilliseconds)
- ]);
+ ], isPii: false);
// Release the memory we acquired
_memoryManager.ReleaseMemory(size);
@@ -517,7 +517,7 @@ await this.TraceActivityAsync(async activity =>
new("decompressed_size_bytes", dataStream.Length),
new("decompressed_size_kb", dataStream.Length / 1024.0),
new("compression_ratio", compressionRatio)
- ]);
+ ], isPii: false);
actualSize = dataStream.Length;
}
@@ -551,14 +551,14 @@ await this.TraceActivityAsync(async activity =>
new("actual_size_kb", actualSize / 1024.0),
new("latency_ms", stopwatch.ElapsedMilliseconds),
new("throughput_mbps", throughputMBps)
- ]);
+ ], isPii: false);
// Set the download as completed with the original size
downloadResult.SetCompleted(dataStream, size);
}, activityName: "DownloadFile");
}
- private void SetError(Exception ex, Activity? activity = null)
+ private void SetError(Exception ex, ActivityWithPii? activity = null)
{
lock (_errorLock)
{
@@ -570,7 +570,7 @@ private void SetError(Exception ex, Activity? activity = null)
}
}
- private void CompleteWithError(Activity? activity = null)
+ private void CompleteWithError(ActivityWithPii? activity = null)
{
try
{
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
index 99c0b18e57..b5cebbc74d 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
@@ -91,7 +91,7 @@ public CloudFetchReader(
/// The next record batch, or null if there are no more batches.
public override async ValueTask ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
{
- return await this.TraceActivityAsync(async _ =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? _) =>
{
ThrowIfDisposed();
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
index 152516e427..263659fdb9 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
@@ -181,10 +181,10 @@ public async Task StopAsync()
var refreshedLink = response.Results.ResultLinks.FirstOrDefault(l => l.StartRowOffset == offset);
if (refreshedLink != null)
{
- Activity.Current?.AddEvent("cloudfetch.url_fetched", [
+ ActivityWithPii.Wrap(Activity.Current)?.AddEvent("cloudfetch.url_fetched", [
new("offset", offset),
new("url_length", refreshedLink.FileLink?.Length ?? 0)
- ]);
+ ], isPii: false);
// Create a download result for the refreshed link
var downloadResult = new DownloadResult(refreshedLink, _memoryManager);
@@ -194,7 +194,7 @@ public async Task StopAsync()
}
}
- Activity.Current?.AddEvent("cloudfetch.url_fetch_failed", [new("offset", offset)]);
+ ActivityWithPii.Wrap(Activity.Current)?.AddEvent("cloudfetch.url_fetch_failed", [new("offset", offset)], isPii: false);
return null;
}
finally
diff --git a/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs b/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
index 2fc6268b06..15207fa963 100644
--- a/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
@@ -50,7 +50,7 @@ public DatabricksReader(IHiveServer2Statement statement, Schema schema, IRespons
public override async ValueTask ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
{
- return await this.TraceActivityAsync(async activity =>
+ return await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
ThrowIfDisposed();
@@ -61,7 +61,9 @@ public DatabricksReader(IHiveServer2Statement statement, Schema schema, IRespons
RecordBatch? next = await this.reader.ReadNextRecordBatchAsync(cancellationToken);
if (next != null)
{
- activity?.AddEvent(SemanticConventions.Messaging.Batch.Response, [new(SemanticConventions.Db.Response.ReturnedRows, next.Length)]);
+ activity?.AddEvent(
+ SemanticConventions.Messaging.Batch.Response,
+ [new(SemanticConventions.Db.Response.ReturnedRows, next.Length)], isPii:false);
return next;
}
this.reader = null;
@@ -95,7 +97,7 @@ public DatabricksReader(IHiveServer2Statement statement, Schema schema, IRespons
this.batches = response.Results.ArrowBatches;
for (int i = 0; i < this.batches.Count; i++)
{
- activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, this.batches[i].RowCount);
+ activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, this.batches[i].RowCount, isPii: false);
}
this.hasNoMoreRows = !response.HasMoreRows;
diff --git a/csharp/src/Drivers/Databricks/TracingDelegatingHandler.cs b/csharp/src/Drivers/Databricks/TracingDelegatingHandler.cs
index f07c0dad8f..ddb6c7717f 100644
--- a/csharp/src/Drivers/Databricks/TracingDelegatingHandler.cs
+++ b/csharp/src/Drivers/Databricks/TracingDelegatingHandler.cs
@@ -60,7 +60,7 @@ public TracingDelegatingHandler(
protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
// Get the current activity or use the trace parent from the connection
- Activity? currentActivity = Activity.Current;
+ ActivityWithPii? currentActivity = ActivityWithPii.Wrap(Activity.Current);
string? traceParentValue = null;
string? traceStateValue = null;
diff --git a/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs b/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs
index 90c69abd9a..e7c2448f35 100644
--- a/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs
+++ b/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs
@@ -43,7 +43,11 @@ internal class FileExporter : BaseExporter
{
TypeInfoResolver = JsonTypeInfoResolver.Combine(
SerializableActivitySerializerContext.Default,
- new DefaultJsonTypeInfoResolver())
+ new DefaultJsonTypeInfoResolver()),
+ Converters = {
+ // Unredacts any redacted values in the trace when serializing to JSON, so that the full value is available in the file. This is needed since the file exporter is opt-in and users would expect to see the full value in the file.
+ new UnredactConverter(),
+ },
};
private readonly TracingFile _tracingFile;
diff --git a/csharp/src/Telemetry/Traces/Listeners/Apache.Arrow.Adbc.Telemetry.Traces.Listeners.csproj b/csharp/src/Telemetry/Traces/Listeners/Apache.Arrow.Adbc.Telemetry.Traces.Listeners.csproj
index d45d327253..e104827170 100644
--- a/csharp/src/Telemetry/Traces/Listeners/Apache.Arrow.Adbc.Telemetry.Traces.Listeners.csproj
+++ b/csharp/src/Telemetry/Traces/Listeners/Apache.Arrow.Adbc.Telemetry.Traces.Listeners.csproj
@@ -15,4 +15,8 @@
+
+
+
+
diff --git a/csharp/src/Telemetry/Traces/Listeners/FileListener/ActivityProcessor.cs b/csharp/src/Telemetry/Traces/Listeners/FileListener/ActivityProcessor.cs
index f34c79bfc5..2b3a8dcf75 100644
--- a/csharp/src/Telemetry/Traces/Listeners/FileListener/ActivityProcessor.cs
+++ b/csharp/src/Telemetry/Traces/Listeners/FileListener/ActivityProcessor.cs
@@ -34,7 +34,11 @@ internal sealed class ActivityProcessor : IDisposable
{
TypeInfoResolver = JsonTypeInfoResolver.Combine(
SerializableActivitySerializerContext.Default,
- new DefaultJsonTypeInfoResolver())
+ new DefaultJsonTypeInfoResolver()),
+ Converters = {
+ // Unredacts any redacted values in the trace when serializing to JSON, so that the full value is available in the file. This is needed since the file exporter is opt-in and users would expect to see the full value in the file.
+ new UnredactConverter(),
+ }
};
private Task? _processingTask;
private readonly Channel _channel;
diff --git a/csharp/src/Telemetry/Traces/Listeners/FileListener/SerializableActivity.cs b/csharp/src/Telemetry/Traces/Listeners/FileListener/SerializableActivity.cs
index 2af071a681..9134bb6e24 100644
--- a/csharp/src/Telemetry/Traces/Listeners/FileListener/SerializableActivity.cs
+++ b/csharp/src/Telemetry/Traces/Listeners/FileListener/SerializableActivity.cs
@@ -98,8 +98,8 @@ internal SerializableActivity(Activity activity) : this(
activity.ParentSpanId,
activity.IdFormat,
activity.TagObjects.ToDictionary(kv => kv.Key, kv => kv.Value),
- activity.Events.Select(e => (SerializableActivityEvent)e).ToArray(),
- activity.Links.Select(l => (SerializableActivityLink)l).ToArray(),
+ [.. activity.Events.Select(e => (SerializableActivityEvent)e)],
+ [.. activity.Links.Select(l => (SerializableActivityLink)l)],
activity.Baggage.ToDictionary(kv => kv.Key, kv => kv.Value))
{ }
@@ -148,7 +148,7 @@ public static implicit operator SerializableActivityEvent(ActivityEvent source)
{
Name = source.Name,
Timestamp = source.Timestamp,
- Tags = source.Tags.ToArray(),
+ Tags = [.. source.Tags],
};
}
}
diff --git a/csharp/src/Telemetry/Traces/Listeners/FileListener/SerializableActivitySerializerContext.cs b/csharp/src/Telemetry/Traces/Listeners/FileListener/SerializableActivitySerializerContext.cs
index 7e503bb52e..5637232fb5 100644
--- a/csharp/src/Telemetry/Traces/Listeners/FileListener/SerializableActivitySerializerContext.cs
+++ b/csharp/src/Telemetry/Traces/Listeners/FileListener/SerializableActivitySerializerContext.cs
@@ -17,6 +17,7 @@
using System;
using System.Text.Json.Serialization;
+using Apache.Arrow.Adbc.Tracing;
namespace Apache.Arrow.Adbc.Telemetry.Traces.Listeners.FileListener
{
@@ -55,6 +56,7 @@ namespace Apache.Arrow.Adbc.Telemetry.Traces.Listeners.FileListener
[JsonSerializable(typeof(bool[]))]
[JsonSerializable(typeof(Uri))]
+ [JsonSerializable(typeof(RedactedValue))]
internal partial class SerializableActivitySerializerContext : JsonSerializerContext
{
}
diff --git a/csharp/src/Telemetry/Traces/Listeners/FileListener/UnredactConverter.cs b/csharp/src/Telemetry/Traces/Listeners/FileListener/UnredactConverter.cs
new file mode 100644
index 0000000000..e5f771d726
--- /dev/null
+++ b/csharp/src/Telemetry/Traces/Listeners/FileListener/UnredactConverter.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Text.Json;
+using System.Text.Json.Serialization;
+using Apache.Arrow.Adbc.Tracing;
+
+namespace Apache.Arrow.Adbc.Telemetry.Traces.Listeners.FileListener
+{
+ public class UnredactConverter : JsonConverter
+ {
+ public override bool CanConvert(Type typeToConvert) => typeToConvert == typeof(RedactedValue);
+ public override RedactedValue? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) => throw new NotImplementedException();
+ public override void Write(Utf8JsonWriter writer, RedactedValue value, JsonSerializerOptions options)
+ {
+ writer.WriteRawValue(JsonSerializer.Serialize(value.GetValue(), options));
+ }
+ }
+}
diff --git a/csharp/test/Apache.Arrow.Adbc.Tests/Tracing/ActivityWithPiiTests.cs b/csharp/test/Apache.Arrow.Adbc.Tests/Tracing/ActivityWithPiiTests.cs
new file mode 100644
index 0000000000..89a65b7126
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Adbc.Tests/Tracing/ActivityWithPiiTests.cs
@@ -0,0 +1,170 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using Apache.Arrow.Adbc.Tracing;
+using Xunit;
+
+namespace Apache.Arrow.Adbc.Testing.Tracing
+{
+
+ public class ActivityWithPiiTests
+ {
+ private class RedactedValueTestData : TheoryData
+ {
+ public RedactedValueTestData()
+ {
+ Add(null, null);
+ Add((sbyte)1, (sbyte)1);
+ Add((byte)1, (byte)1);
+ Add((short)1, (short)1);
+ Add((ushort)1, (ushort)1);
+ Add((int)1, (int)1);
+ Add((uint)1, (uint)1);
+ Add((long)1, (long)1);
+ Add((ulong)1, (ulong)1);
+ Add((decimal)1, (decimal)1);
+ Add((float)1, (float)1);
+ Add((double)1, (double)1);
+ Add(true, true);
+ Add('A', 'A');
+ Add("string", "string" );
+ Add(new object?[] { null }, new object?[] { null });
+ Add(new object?[] { (sbyte)1 }, new object?[] { (sbyte)1 });
+ Add(new object?[] { (byte)1 }, new object?[] { (byte)1 });
+ Add(new object?[] { (short)1 }, new object?[] { (short)1 });
+ Add(new object?[] { (ushort)1 }, new object?[] { (ushort)1 });
+ Add(new object?[] { (int)1 }, new object?[] { (int)1 });
+ Add(new object?[] { (uint)1 }, new object?[] { (uint)1 });
+ Add(new object?[] { (long)1 }, new object?[] { (long)1 });
+ Add(new object?[] { (ulong)1 }, new object?[] { (ulong)1 });
+ Add(new object?[] { (decimal)1 }, new object?[] { (decimal)1 });
+ Add(new object?[] { (float)1 }, new object?[] { (float)1 });
+ Add(new object?[] { (double)1 }, new object?[] { (double)1 });
+ Add(new object?[] { true }, new object?[] { true });
+ Add(new object?[] { 'A' }, new object?[] { 'A' });
+ Add(new object?[] { "string" }, new object?[] { "string" });
+ }
+ }
+
+ [SkippableTheory]
+ [ClassData(typeof(RedactedValueTestData))]
+ public void CanAddTagDefaultIsPii(object? testValue, object? expectedValue)
+ {
+ string activityName = NewName();
+ Activity activity = new(activityName);
+ var activityWithPii = ActivityWithPii.New(activity);
+
+ Assert.NotNull(activityWithPii);
+ activityWithPii.AddTag("keyName", testValue);
+ var value = activity.TagObjects.First().Value as RedactedValue;
+ Assert.NotNull(value);
+ Assert.Equal(expectedValue, value.GetValue());
+ Assert.Equal(RedactedValue.DefaultValue, value.ToString());
+ }
+
+ [SkippableFact]
+ public void CanAddTagUsingDelegateDefaultIsPii()
+ {
+ List<(object? TestValue, object? ExpectedValue)> testData = [];
+ testData.Add((() => new object?[] { "string" }, new object?[] { "string" }));
+ testData.Add((() => new object?[] { 1 }, new object?[] { 1 }));
+ testData.Add((() => new object?[] { null }, new object?[] { null }));
+ testData.Add((getValue(1), "1"));
+ testData.Add((getAnotherValue(getValue(1)), "1"));
+
+ static Func getAnotherValue(Func anotherFunction) => () => anotherFunction();
+ static Func getValue(int localValue) => localValue.ToString;
+
+ string activityName = NewName();
+ Activity activity = new(activityName);
+ var activityWithPii = ActivityWithPii.New(activity);
+ int index = 0;
+ Assert.NotNull(activityWithPii);
+
+ foreach ((object? testValue, object? expectedValue) in testData)
+ {
+ string key = "keyName" + index++;
+ activityWithPii = activityWithPii.AddTag(key, testValue);
+ Assert.Contains(activity.TagObjects, e => e.Key == key);
+ var value = activity.TagObjects.Where(e => e.Key == key).First().Value as RedactedValue;
+ Assert.NotNull(value);
+ Assert.Equal(expectedValue, value.GetValue());
+ Assert.Equal(RedactedValue.DefaultValue, value.ToString());
+ }
+ }
+
+ [SkippableTheory]
+ [ClassData(typeof(RedactedValueTestData))]
+ public void CanAddTagIsPiiAsFalse(object? testValue, object? expectedValue)
+ {
+ string activityName = NewName();
+ Activity activity = new(activityName);
+ var activityWithPii = ActivityWithPii.New(activity);
+ Assert.NotNull(activityWithPii);
+
+ activityWithPii.AddTag("keyName", testValue, isPii: false);
+ object? value = activity.TagObjects.First().Value;
+ if (expectedValue == null)
+ {
+ Assert.Null(value);
+ }
+ else
+ {
+ Assert.NotNull(value);
+ Assert.IsNotType(value);
+ }
+ Assert.Equal(expectedValue, value);
+ }
+
+ [SkippableFact]
+ public void CanAddTagUsingDelegateIsPiiFalse()
+ {
+ List<(object? TestValue, object? ExpectedValue)> testData = [];
+ testData.Add((() => new object?[] { "string" }, new object?[] { "string" }));
+ testData.Add((() => new object?[] { 1 }, new object?[] { 1 }));
+ testData.Add((() => new object?[] { null }, new object?[] { null }));
+ testData.Add((getValue(1), "1"));
+ testData.Add((getAnotherValue(getValue(1)), "1"));
+
+ static Func getAnotherValue(Func anotherFunction) => () => anotherFunction();
+ static Func getValue(int localValue) => localValue.ToString;
+
+ string activityName = NewName();
+ Activity activity = new(activityName);
+ var activityWithPii = ActivityWithPii.New(activity);
+ int index = 0;
+ Assert.NotNull(activityWithPii);
+
+ foreach ((object? testValue, object? expectedValue) in testData)
+ {
+ string key = "keyName" + index++;
+ activityWithPii = activityWithPii.AddTag(key, testValue, false);
+ Assert.Contains(activity.TagObjects, e => e.Key == key);
+ object? value = activity.TagObjects.Where(e => e.Key == key).First().Value;
+ Assert.NotNull(value);
+ Assert.IsNotType(value);
+ Assert.Equal(expectedValue, value);
+ }
+ }
+
+ private static string NewName() => new Guid().ToString("N");
+ }
+}
diff --git a/csharp/test/Apache.Arrow.Adbc.Tests/Tracing/TracingTests.cs b/csharp/test/Apache.Arrow.Adbc.Tests/Tracing/TracingTests.cs
index 021eaed534..75af8afc34 100644
--- a/csharp/test/Apache.Arrow.Adbc.Tests/Tracing/TracingTests.cs
+++ b/csharp/test/Apache.Arrow.Adbc.Tests/Tracing/TracingTests.cs
@@ -28,7 +28,7 @@
using Xunit;
using Xunit.Abstractions;
-namespace Apache.Arrow.Adbc.Tests.Tracing
+namespace Apache.Arrow.Adbc.Testing.Tracing
{
public class TracingTests(ITestOutputHelper? outputHelper) : IDisposable
{
@@ -61,7 +61,7 @@ internal void CanStartActivity()
Assert.Equal(currLength, exportedActivities.Count);
int lineCount = 0;
- foreach (var exportedActivity in exportedActivities)
+ foreach (Activity exportedActivity in exportedActivities)
{
lineCount++;
Assert.NotNull(exportedActivity);
@@ -94,7 +94,7 @@ internal void CanAddEvent()
Assert.Equal(currLength, exportedActivities.Count);
int lineCount = 0;
- foreach (var exportedActivity in exportedActivities)
+ foreach (Activity exportedActivity in exportedActivities)
{
lineCount++;
Assert.NotNull(exportedActivity);
@@ -120,7 +120,7 @@ internal void CanAddActivityWithDepth()
testClass.MethodWithActivityRecursive(nameof(TraceProducer.MethodWithActivityRecursive), recurseCount);
int lineCount = 0;
- foreach (var exportedActivity in exportedActivities)
+ foreach (Activity exportedActivity in exportedActivities)
{
lineCount++;
Assert.NotNull(exportedActivity);
@@ -148,7 +148,7 @@ internal void CanAddTraceParent()
const string eventNameWithParent = "eventNameWithParent";
const string eventNameWithoutParent = "eventNameWithoutParent";
testClass.MethodWithActivity(eventNameWithoutParent);
- Assert.True(exportedActivities.Count() > 0);
+ Assert.True(exportedActivities.Count > 0);
const int withParentCountExpected = 10;
for (int i = 0; i < withParentCountExpected; i++)
@@ -157,12 +157,12 @@ internal void CanAddTraceParent()
}
testClass.MethodWithActivity(eventNameWithoutParent);
- Assert.True(exportedActivities.Count() > 0);
+ Assert.True(exportedActivities.Count > 0);
int lineCount = 0;
int withParentCount = 0;
int withoutParentCount = 0;
- foreach (var exportedActivity in exportedActivities)
+ foreach (Activity exportedActivity in exportedActivities)
{
lineCount++;
Assert.NotNull(exportedActivity);
@@ -279,7 +279,7 @@ internal void CanSetTraceParentOnStatement()
string eventName1 = NewName();
statement.MethodWithActivity(eventName1);
Assert.Single(exportedActivities);
- var activity1 = exportedActivities.First();
+ Activity activity1 = exportedActivities.First();
Assert.Equal(connectionTraceParent, activity1.ParentId);
// Test 2: Set statement-specific trace parent
@@ -291,7 +291,7 @@ internal void CanSetTraceParentOnStatement()
string eventName2 = NewName();
statement.MethodWithActivity(eventName2);
Assert.Single(exportedActivities);
- var activity2 = exportedActivities.First();
+ Activity activity2 = exportedActivities.First();
Assert.Equal(statementTraceParent, activity2.ParentId);
// Test 3: Set trace parent to null (should fall back to connection's trace parent)
@@ -302,7 +302,7 @@ internal void CanSetTraceParentOnStatement()
string eventName3 = NewName();
statement.MethodWithActivity(eventName3);
Assert.Single(exportedActivities);
- var activity3 = exportedActivities.First();
+ Activity activity3 = exportedActivities.First();
Assert.Equal(connectionTraceParent, activity3.ParentId);
}
@@ -344,12 +344,12 @@ internal void MethodWithNoInstrumentation()
internal void MethodWithActivity()
{
- _trace.TraceActivity(_ => { });
+ _trace.TraceActivity(_ => { }, exceptionHasPii: true);
}
internal void MethodWithActivity(string activityName, string? traceParent = default)
{
- _trace.TraceActivity(activity => { }, activityName: activityName, traceParent: traceParent);
+ _trace.TraceActivity(activity => { }, activityName: activityName, traceParent: traceParent, exceptionHasPii: true);
}
internal void MethodWithActivityRecursive(string activityName, int recurseCount)
@@ -361,12 +361,12 @@ internal void MethodWithActivityRecursive(string activityName, int recurseCount)
{
MethodWithActivityRecursive(activityName, recurseCount);
}
- }, activityName: activityName + recurseCount.ToString());
+ }, activityName: activityName + recurseCount.ToString(), exceptionHasPii: true);
}
internal void MethodWithEvent(string eventName)
{
- _trace.TraceActivity((activity) => activity?.AddEvent(eventName));
+ _trace.TraceActivity((ActivityWithPii? activity) => activity?.AddEvent(eventName));
}
internal void MethodWithAllProperties(
@@ -375,16 +375,16 @@ internal void MethodWithAllProperties(
IReadOnlyList> tags,
string traceParent)
{
- _trace.TraceActivity(activity =>
+ _trace.TraceActivity((activity) =>
{
foreach (KeyValuePair tag in tags)
{
activity?.AddTag(tag.Key, tag.Value)
.AddBaggage(tag.Key, tag.Value?.ToString());
}
- activity?.AddEvent(eventName, tags)
+ activity?.AddEvent(eventName, tags)?
.AddLink(traceParent, tags);
- }, activityName: activityName, traceParent: traceParent);
+ }, activityName: activityName, traceParent: traceParent, exceptionHasPii: true);
}
protected virtual void Dispose(bool disposing)
@@ -416,7 +416,7 @@ private class MyTracingConnection(IReadOnlyDictionary properties
public void MethodWithActivity()
{
- this.TraceActivity(activity =>
+ this.TraceActivity((ActivityWithPii? activity) =>
{
activity?.AddTag("exampleTag", "exampleValue")
.AddBaggage("exampleBaggage", "exampleBaggageValue")
@@ -428,7 +428,7 @@ public void MethodWithActivity()
public async Task MethodWithInvalidAsyncTraceActivity1()
{
// This method is intended to demonstrate incorrect usage of TraceActivity with async methods.
- return await this.TraceActivity(async activity =>
+ return await this.TraceActivity(async (ActivityWithPii? activity) =>
{
await Task.Delay(1);
return true;
@@ -438,7 +438,7 @@ public async Task MethodWithInvalidAsyncTraceActivity1()
public async Task MethodWithInvalidAsyncTraceActivity2()
{
// This method is intended to demonstrate incorrect usage of TraceActivity with async methods.
- await this.TraceActivity(async activity =>
+ await this.TraceActivity(async (ActivityWithPii? activity) =>
{
await Task.Delay(1);
return;
@@ -448,7 +448,7 @@ await this.TraceActivity(async activity =>
public async ValueTask MethodWithInvalidAsyncTraceActivity3()
{
// This method is intended to demonstrate incorrect usage of TraceActivity with async methods.
- return await this.TraceActivity(async activity =>
+ return await this.TraceActivity(async (ActivityWithPii? activity) =>
{
await Task.Delay(1);
return true;
@@ -458,7 +458,7 @@ public async ValueTask MethodWithInvalidAsyncTraceActivity3()
public async ValueTask MethodWithInvalidAsyncTraceActivity4()
{
// This method is intended to demonstrate incorrect usage of TraceActivity with async methods.
- await this.TraceActivity(async activity =>
+ await this.TraceActivity(async (ActivityWithPii? activity) =>
{
await Task.Delay(1);
return;
@@ -468,7 +468,7 @@ await this.TraceActivity(async activity =>
public async Task MethodWithInvalidAsyncTraceActivity5()
{
// This method is intended to demonstrate incorrect usage of TraceActivity with async methods.
- return await this.TraceActivity(async activity =>
+ return await this.TraceActivity(async (ActivityWithPii? activity) =>
{
await Task.Delay(1);
return await new AwaitableBool();
@@ -509,7 +509,7 @@ private class MyTracingStatement(TracingConnection connection) : TracingStatemen
public void MethodWithActivity(string activityName)
{
- this.TraceActivity(activity =>
+ this.TraceActivity((ActivityWithPii? activity) =>
{
activity?.AddTag("testTag", "testValue");
}, activityName);
diff --git a/csharp/test/Telemetry/Traces/Listeners/FileListener/FileActivityListenerTests.cs b/csharp/test/Telemetry/Traces/Listeners/FileListener/FileActivityListenerTests.cs
index f5bf6c1aa5..44d4cf325d 100644
--- a/csharp/test/Telemetry/Traces/Listeners/FileListener/FileActivityListenerTests.cs
+++ b/csharp/test/Telemetry/Traces/Listeners/FileListener/FileActivityListenerTests.cs
@@ -35,9 +35,18 @@ public class FileActivityListenerTests
[InlineData(ListenersOptions.Exporters.AdbcFile, true)]
public void TestTryActivateFileListener(string? exporterOption, bool expected)
{
- Assert.Equal(expected, FileActivityListener.TryActivateFileListener("TestSource", exporterOption, out FileActivityListener? listener));
- Assert.True(expected == (listener != null));
- listener?.Dispose();
+ string? previousEnvValue = Environment.GetEnvironmentVariable(ListenersOptions.Environment.Exporter);
+ try
+ {
+ Environment.SetEnvironmentVariable(ListenersOptions.Environment.Exporter, null);
+ Assert.Equal(expected, FileActivityListener.TryActivateFileListener("TestSource", exporterOption, out FileActivityListener? listener));
+ Assert.True(expected == (listener != null));
+ listener?.Dispose();
+ }
+ finally
+ {
+ Environment.SetEnvironmentVariable(ListenersOptions.Environment.Exporter, previousEnvValue);
+ }
}
[Fact]
@@ -122,7 +131,7 @@ public TestConnection(IReadOnlyDictionary properties)
public async Task EmulateWorkAsync(string key, string value)
{
- await this.TraceActivityAsync(async (activity) =>
+ await this.TraceActivityAsync(async (ActivityWithPii? activity) =>
{
activity?.AddTag(key, value);
// Simulate some work
diff --git a/csharp/test/Telemetry/Traces/Listeners/FileListener/SerializableActivityTests.cs b/csharp/test/Telemetry/Traces/Listeners/FileListener/SerializableActivityTests.cs
index d8e03c6fa9..9dea7f7ec5 100644
--- a/csharp/test/Telemetry/Traces/Listeners/FileListener/SerializableActivityTests.cs
+++ b/csharp/test/Telemetry/Traces/Listeners/FileListener/SerializableActivityTests.cs
@@ -20,6 +20,7 @@
using System.Text.Json;
using System.Text.Json.Serialization.Metadata;
using Apache.Arrow.Adbc.Telemetry.Traces.Listeners.FileListener;
+using Apache.Arrow.Adbc.Tracing;
using Xunit.Abstractions;
namespace Apache.Arrow.Adbc.Tests.Telemetry.Traces.Listeners.FileListener
@@ -93,7 +94,7 @@ public SerializableActivityTests(ITestOutputHelper output)
[Fact]
public async Task CannnotSerializeAnonymousObjectWithSerializerContext()
{
- Activity activity = new Activity("activity");
+ Activity activity = new("activity");
using (activity.Start())
{
activity.AddTag("key1", new { Field1 = "value1" });
@@ -163,7 +164,7 @@ await JsonSerializer.SerializeAsync(
[Fact]
public async Task CanSerializeAnonymousObjectWithDefaultTypeInfoResolver()
{
- Activity activity = new Activity("activity");
+ Activity activity = new("activity");
using (activity.Start())
{
activity.AddTag("key1", new { Field1 = "value1" });
@@ -184,5 +185,76 @@ await JsonSerializer.SerializeAsync(
_output.WriteLine("Serialized Activity: {0}", Encoding.UTF8.GetString(stream.ToArray()));
}
}
+
+ [Fact]
+ public async Task CanRedactValue()
+ {
+ string activityName = NewName();
+ Activity activity = new Activity(activityName).Start();
+ using ActivityWithPii? activityWithPii = ActivityWithPii.New(activity);
+ Assert.NotNull(activityWithPii);
+ string testValue = NewName();
+ activityWithPii.AddTag("keyName", new RedactedValue(testValue));
+ var value = activity.TagObjects.First().Value as RedactedValue;
+ Assert.NotNull(value);
+ Assert.Equal("[REDACTED]", value.ToString());
+ Assert.Equal(testValue, value.GetValue());
+
+ SerializableActivity serializableActivity = new(activity);
+ var stream = new MemoryStream();
+ JsonSerializerOptions serializerOptions = new()
+ {
+ TypeInfoResolver = JsonTypeInfoResolver.Combine(
+ SerializableActivitySerializerContext.Default,
+ new DefaultJsonTypeInfoResolver()),
+ };
+ await JsonSerializer.SerializeAsync(
+ stream,
+ serializableActivity,
+ serializerOptions);
+ Assert.NotNull(stream);
+ string actualString = Encoding.UTF8.GetString(stream.ToArray());
+ Assert.DoesNotContain(testValue, actualString);
+ Assert.Contains("[REDACTED]", actualString);
+ }
+
+ [Fact]
+ public async Task CanUnredactValue()
+ {
+ string activityName = NewName();
+ Activity activity = new Activity(activityName).Start();
+ using ActivityWithPii? activityWithPii = ActivityWithPii.New(activity);
+ Assert.NotNull(activityWithPii);
+ activity.Start();
+ string testValue = NewName();
+ activityWithPii.AddTag("keyName", new RedactedValue(testValue));
+ var value = activity.TagObjects.First().Value as RedactedValue;
+ Assert.NotNull(value);
+ Assert.Equal("[REDACTED]", value.ToString());
+ Assert.Equal(testValue, value.GetValue());
+
+ SerializableActivity serializableActivity = new(activity);
+ var stream = new MemoryStream();
+ JsonSerializerOptions serializerOptions = new()
+ {
+ TypeInfoResolver = JsonTypeInfoResolver.Combine(
+ SerializableActivitySerializerContext.Default,
+ new DefaultJsonTypeInfoResolver()),
+ Converters =
+ {
+ new UnredactConverter()
+ },
+ };
+ await JsonSerializer.SerializeAsync(
+ stream,
+ serializableActivity,
+ serializerOptions);
+ Assert.NotNull(stream);
+ string actualString = Encoding.UTF8.GetString(stream.ToArray());
+ Assert.Contains(testValue, actualString);
+ Assert.DoesNotContain("[REDACTED]", actualString);
+ }
+
+ private string NewName() => Guid.NewGuid().ToString("N");
}
}