Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions csharp/src/BigQueryParameters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ internal class BigQueryParameters
public const string ClientSecret = "adbc.bigquery.client_secret";
public const string ClientTimeout = "adbc.bigquery.client.timeout";
public const string EvaluationKind = "adbc.bigquery.multiple_statement.evaluation_kind";
public const string DisableExplicitCancel = "adbc.bigquery.disable_explicit_cancel";
public const string GetQueryResultsOptionsTimeout = "adbc.bigquery.get_query_results_options.timeout";
public const string IncludeConstraintsWithGetObjects = "adbc.bigquery.include_constraints_getobjects";
public const string IncludePublicProjectId = "adbc.bigquery.include_public_project_id";
Expand Down
57 changes: 41 additions & 16 deletions csharp/src/BigQueryStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
activity?.AddBigQueryParameterTag(BigQueryParameters.GetQueryResultsOptionsTimeout, seconds);
}

using JobCancellationContext cancellationContext = new JobCancellationContext(cancellationRegistry, job);
bool disableExplicitCancel = Options?.ContainsKey(BigQueryParameters.DisableExplicitCancel) == true;
using ICancellationContext cancellationContext = CancellationContext.New(cancellationRegistry, disableExplicitCancel, job);

// We can't checkJobStatus; otherwise, the timeout in QueryResultsOptions is meaningless.
// When encountering a long-running job, it should be controlled by the timeout in the Google SDK instead of blocking in a while loop.
Expand Down Expand Up @@ -224,7 +225,8 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
IEnumerable<IArrowReader> readers = await ExecuteWithRetriesAsync(getArrowReadersFunc, activity, cancellationContext.CancellationToken).ConfigureAwait(false);

// Note: MultiArrowReader must dispose the cancellationContext.
IArrowArrayStream stream = new MultiArrowReader(this, TranslateSchema(results.Schema), readers, new CancellationContext(cancellationRegistry));
ICancellationContext cancellationContext1 = CancellationContext.New(cancellationRegistry, disableExplicitCancel);
IArrowArrayStream stream = new MultiArrowReader(this, TranslateSchema(results.Schema), readers, cancellationContext1);
activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, totalRows);
return new QueryResult(totalRows, stream);
});
Expand Down Expand Up @@ -285,7 +287,9 @@ private async Task<UpdateResult> ExecuteUpdateInternalAsync()

activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, SqlQuery, this.bigQueryConnection.IsSafeToTrace);

using JobCancellationContext context = new(cancellationRegistry);
bool disableExplicitCancel = Options?.ContainsKey(BigQueryParameters.DisableExplicitCancel) == true;
using ICancellationContext context = CancellationContext.New(cancellationRegistry, disableExplicitCancel);

// Cannot set destination table in jobs with DDL statements, otherwise an error will be prompted
Func<Task<BigQueryResults?>> getQueryResultsAsyncFunc = async () =>
{
Expand Down Expand Up @@ -574,9 +578,9 @@ private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action, Activity?
await RetryManager.ExecuteWithRetriesAsync<T>(this, action, activity, MaxRetryAttempts, RetryDelayMs, cancellationToken);

private async Task<T> ExecuteCancellableJobAsync<T>(
JobCancellationContext context,
ICancellationContext context,
Activity? activity,
Func<JobCancellationContext, Task<T>> func)
Func<ICancellationContext, Task<T>> func)
{
try
{
Expand Down Expand Up @@ -610,19 +614,40 @@ private async Task<T> ExecuteCancellableJobAsync<T>(
}
}

private class CancellationContext : IDisposable
/// <summary>
/// Abstraction over the per-operation cancellation state used by the statement, so that a
/// real context and a no-op context can be used interchangeably by callers.
/// </summary>
private interface ICancellationContext : IDisposable
{
/// <summary>
/// The BigQuery job associated with this context, or <c>null</c> when none has been assigned.
/// </summary>
BigQueryJob? Job { get; set; }
/// <summary>
/// The token handed to cancellable operations (for example, it is passed to ExecuteWithRetriesAsync).
/// </summary>
CancellationToken CancellationToken { get; }
/// <summary>
/// Requests cancellation for this context. Implementations may treat this as a no-op.
/// </summary>
void Cancel();
}

private class CancellationContext : ICancellationContext
{
private readonly CancellationRegistry cancellationRegistry;
private readonly CancellationTokenSource cancellationTokenSource;
private bool disposed;

public CancellationContext(CancellationRegistry cancellationRegistry)
public static readonly ICancellationContext Null = new NullCancellationContext();

public static ICancellationContext New(CancellationRegistry cancellationRegistry, bool disableExplicitCancel = false, BigQueryJob? job = default)
{
if (disableExplicitCancel)
{
return Null;
}
return new CancellationContext(cancellationRegistry, job);
}

private CancellationContext(CancellationRegistry cancellationRegistry, BigQueryJob? job = default)
{
cancellationTokenSource = new CancellationTokenSource();
this.Job = job;
this.cancellationRegistry = cancellationRegistry;
this.cancellationRegistry.Register(this);
}

public BigQueryJob? Job { get; set; }

public CancellationToken CancellationToken => cancellationTokenSource.Token;

public void Cancel()
Expand All @@ -641,15 +666,15 @@ public virtual void Dispose()
}
}

private class JobCancellationContext : CancellationContext
// No-op ICancellationContext: Job defaults to null, CancellationToken is the default token,
// and Cancel()/Dispose() do nothing.
// NOTE(review): CancellationContext.Null exposes a single shared instance of this type, yet Job
// has a public setter — a job assigned through it would be retained by that shared instance;
// confirm this is intended.
// NOTE(review): the JobCancellationContext constructor and the second Job property below look
// like the removed side of the diff interleaved with the new class — confirm against the real file.
private class NullCancellationContext : ICancellationContext
{
public JobCancellationContext(CancellationRegistry cancellationRegistry, BigQueryJob? job = default)
: base(cancellationRegistry)
{
Job = job;
}
public BigQueryJob? Job { get; set; } = null;

public BigQueryJob? Job { get; set; }
public CancellationToken CancellationToken { get; } = default;

public void Cancel() { }

public void Dispose() { }
}

private sealed class CancellationRegistry : IDisposable
Expand Down Expand Up @@ -698,12 +723,12 @@ private class MultiArrowReader : TracingReader
private static readonly string s_assemblyVersion = BigQueryUtils.GetAssemblyVersion(typeof(BigQueryStatement));

readonly Schema schema;
readonly CancellationContext cancellationContext;
readonly ICancellationContext cancellationContext;
IEnumerator<IArrowReader>? readers;
IArrowReader? reader;
bool disposed;

public MultiArrowReader(BigQueryStatement statement, Schema schema, IEnumerable<IArrowReader> readers, CancellationContext cancellationContext) : base(statement)
public MultiArrowReader(BigQueryStatement statement, Schema schema, IEnumerable<IArrowReader> readers, ICancellationContext cancellationContext) : base(statement)
{
this.schema = schema;
this.readers = readers.GetEnumerator();
Expand Down
69 changes: 69 additions & 0 deletions csharp/test/StatementTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,75 @@ public async Task CanCancelStreamFromStatement()
}
}

/// <summary>
/// Verifies that when <see cref="BigQueryParameters.DisableExplicitCancel"/> is set on the statement,
/// calling Cancel() while a query is running does not cause the query to throw.
/// </summary>
[Fact]
public async Task CanDisableExplicitCancelStatement()
{
    foreach (BigQueryTestEnvironment environment in _environments)
    {
        // Dispose the connection and statement when this environment's iteration ends.
        using AdbcConnection adbcConnection = GetAdbcConnection(environment.Name);
        using AdbcStatement statement = adbcConnection.CreateStatement();
        statement.SetOption(BigQueryParameters.DisableExplicitCancel, "true");

        // Execute the query/cancel multiple times to validate consistent behavior
        const int iterations = 3;
        for (int i = 0; i < iterations; i++)
        {
            _outputHelper?.WriteLine($"Iteration {i + 1} of {iterations}");
            // Generate unique column names so query will not be served from cache
            string columnName1 = Guid.NewGuid().ToString("N");
            string columnName2 = Guid.NewGuid().ToString("N");
            statement.SqlQuery = $"SELECT GENERATE_ARRAY(`{columnName2}`, 10000) AS `{columnName1}` FROM UNNEST(GENERATE_ARRAY(0, 100000)) AS `{columnName2}`";
            _outputHelper?.WriteLine($"Query: {statement.SqlQuery}");

            // Expect this to take about 10 seconds without cancellation
            Task<QueryResult> queryTask = Task.Run(statement.ExecuteQuery);

            await Task.Yield();
            statement.Cancel();

            // Awaiting the query should not throw OperationCanceledException
            QueryResult queryResult = await queryTask;

            // Release the result stream once the query has completed.
            using IArrowArrayStream? stream = queryResult.Stream;
            Assert.NotNull(stream);
        }
    }
}

/// <summary>
/// Verifies that when <see cref="BigQueryParameters.DisableExplicitCancel"/> is set on the statement,
/// calling Cancel() after queries have returned does not cause reads from their result streams to throw.
/// </summary>
[Fact]
public async Task CanDisableExplicitCancelStreamFromStatement()
{
    foreach (BigQueryTestEnvironment environment in _environments)
    {
        using AdbcConnection adbcConnection = GetAdbcConnection(environment.Name);
        using AdbcStatement statement = adbcConnection.CreateStatement();
        statement.SetOption(BigQueryParameters.DisableExplicitCancel, "true");

        // Execute the query/cancel multiple times to validate consistent behavior
        const int iterations = 3;
        QueryResult[] results = new QueryResult[iterations];
        for (int i = 0; i < iterations; i++)
        {
            _outputHelper?.WriteLine($"Iteration {i + 1} of {iterations}");
            // Generate unique column names so query will not be served from cache
            string columnName1 = Guid.NewGuid().ToString("N");
            string columnName2 = Guid.NewGuid().ToString("N");
            statement.SqlQuery = $"SELECT `{columnName2}` AS `{columnName1}` FROM UNNEST(GENERATE_ARRAY(1, 100)) AS `{columnName2}`";
            _outputHelper?.WriteLine($"Query: {statement.SqlQuery}");

            // Collect every result first; the streams are read only after Cancel() below.
            results[i] = statement.ExecuteQuery();
        }

        // Cancel once all queries have returned but before any stream is read.
        statement.Cancel();

        for (int index = 0; index < iterations; index++)
        {
            QueryResult queryResult = results[index];
            using IArrowArrayStream? stream = queryResult.Stream;
            Assert.NotNull(stream);

            // Reading should not throw OperationCanceledException.
            // The batch is disposable, so release it as soon as the read has succeeded.
            using RecordBatch? batch = await stream.ReadNextRecordBatchAsync();
        }
    }
}

private AdbcConnection GetAdbcConnection(string? environmentName)
{
if (string.IsNullOrEmpty(environmentName))
Expand Down
Loading