diff --git a/ci/azure-pipelines.yml b/ci/azure-pipelines.yml index 1248050..9a7045d 100644 --- a/ci/azure-pipelines.yml +++ b/ci/azure-pipelines.yml @@ -74,12 +74,22 @@ steps: arguments: '--configuration $(buildConfiguration) --no-restore' - task: DotNetCoreCLI@2 - displayName: 'Run tests on $(osName)' + displayName: 'Run all tests on $(osName)' inputs: command: 'test' projects: 'src/net-questdb-client-tests/net-questdb-client-tests.csproj' arguments: '--configuration $(buildConfiguration) --framework net9.0 --no-build --verbosity normal --logger trx --collect:"XPlat Code Coverage"' publishTestResults: true + condition: eq(variables['osName'], 'Linux') + +- task: DotNetCoreCLI@2 + displayName: 'Run tests on $(osName) (excluding integration tests)' + inputs: + command: 'test' + projects: 'src/net-questdb-client-tests/net-questdb-client-tests.csproj' + arguments: '--configuration $(buildConfiguration) --framework net9.0 --no-build --verbosity normal --logger trx --collect:"XPlat Code Coverage" --filter "FullyQualifiedName!~QuestDbIntegrationTests"' + publishTestResults: true + condition: ne(variables['osName'], 'Linux') - task: PublishCodeCoverageResults@2 displayName: 'Publish code coverage' diff --git a/src/dummy-http-server/DummyHttpServer.cs b/src/dummy-http-server/DummyHttpServer.cs index 91a21bc..d298e98 100644 --- a/src/dummy-http-server/DummyHttpServer.cs +++ b/src/dummy-http-server/DummyHttpServer.cs @@ -41,6 +41,11 @@ public class DummyHttpServer : IDisposable private readonly WebApplication _app; private int _port = 29743; private readonly TimeSpan? _withStartDelay; + private readonly bool _withTokenAuth; + private readonly bool _withBasicAuth; + private readonly bool _withRetriableError; + private readonly bool _withErrorMessage; + private readonly bool _requireClientCert; /// /// Initializes a configurable in-process dummy HTTP server used for testing endpoints. @@ -63,11 +68,20 @@ public DummyHttpServer(bool withTokenAuth = false, bool withBasicAuth = false, b .AddConsole(); }); + // Store configuration in instance fields instead of static fields + // to avoid interference between multiple concurrent servers + _withTokenAuth = withTokenAuth; + _withBasicAuth = withBasicAuth; + _withRetriableError = withRetriableError; + _withErrorMessage = withErrorMessage; + _withStartDelay = withStartDelay; + _requireClientCert = requireClientCert; + + // Also set static flags for backwards compatibility IlpEndpoint.WithTokenAuth = withTokenAuth; IlpEndpoint.WithBasicAuth = withBasicAuth; IlpEndpoint.WithRetriableError = withRetriableError; IlpEndpoint.WithErrorMessage = withErrorMessage; - _withStartDelay = withStartDelay; if (withTokenAuth) { @@ -91,9 +105,8 @@ public DummyHttpServer(bool withTokenAuth = false, bool withBasicAuth = false, b } o.Limits.MaxRequestBodySize = 1073741824; - o.ListenLocalhost(29474, - options => { options.UseHttps(); }); - o.ListenLocalhost(29473); + // Note: These internal ports will be set dynamically in StartAsync based on the main port + // to avoid conflicts when multiple DummyHttpServer instances are created }); _app = bld.Build(); @@ -101,6 +114,13 @@ public DummyHttpServer(bool withTokenAuth = false, bool withBasicAuth = false, b _app.MapHealthChecks("/ping"); _app.UseDefaultExceptionHandler(); + // Add middleware to set X-Server-Port header so endpoints know which port they're running on + _app.Use(async (context, next) => + { + context.Request.Headers["X-Server-Port"] = _port.ToString(); + await next(); + }); + if (withTokenAuth) { _app @@ -126,10 +146,7 @@ public void Dispose() /// public void Clear() { - IlpEndpoint.ReceiveBuffer.Clear(); - IlpEndpoint.ReceiveBytes.Clear(); - IlpEndpoint.LastError = null; - IlpEndpoint.Counter = 0; + IlpEndpoint.ClearPort(_port); } /// @@ -148,7 +165,18 @@ public async Task StartAsync(int port = 29743, int[]? versions = null) versions ??= new[] { 1, 2, 3, }; SettingsEndpoint.Versions = versions; _port = port; - _ = _app.RunAsync($"http://localhost:{port}"); + + // Store configuration flags keyed by port so multiple servers don't interfere + IlpEndpoint.SetPortConfig(port, + tokenAuth: _withTokenAuth, + basicAuth: _withBasicAuth, + retriableError: _withRetriableError, + errorMessage: _withErrorMessage); + + var url = _requireClientCert + ? $"https://localhost:{port}" + : $"http://localhost:{port}"; + _ = _app.RunAsync(url); } /// @@ -170,7 +198,7 @@ public async Task StopAsync() /// The mutable containing the accumulated received text; modifying it updates the server's buffer. public StringBuilder GetReceiveBuffer() { - return IlpEndpoint.ReceiveBuffer; + return IlpEndpoint.GetReceiveBuffer(_port); } /// @@ -179,12 +207,12 @@ public StringBuilder GetReceiveBuffer() /// The mutable list of bytes received by the endpoint. public List GetReceivedBytes() { - return IlpEndpoint.ReceiveBytes; + return IlpEndpoint.GetReceiveBytes(_port); } public Exception? GetLastError() { - return IlpEndpoint.LastError; + return IlpEndpoint.GetLastError(_port); } public async Task Healthcheck() @@ -215,7 +243,7 @@ public async Task Healthcheck() public int GetCounter() { - return IlpEndpoint.Counter; + return IlpEndpoint.GetCounter(_port); } /// diff --git a/src/dummy-http-server/IlpEndpoint.cs b/src/dummy-http-server/IlpEndpoint.cs index 7218245..daa5311 100644 --- a/src/dummy-http-server/IlpEndpoint.cs +++ b/src/dummy-http-server/IlpEndpoint.cs @@ -83,14 +83,117 @@ public class IlpEndpoint : Endpoint { private const string Username = "admin"; private const string Password = "quest"; - public static readonly StringBuilder ReceiveBuffer = new(); - public static readonly List ReceiveBytes = new(); - public static Exception? LastError = new(); + + // Port-keyed storage to support multiple concurrent DummyHttpServer instances + private static readonly Dictionary Bytes, Exception? Error, int Counter)> + PortData = new(); + + // Port-keyed configuration to support multiple concurrent DummyHttpServer instances + private static readonly Dictionary + PortConfig = new(); + + // Configuration flags (global, apply to all servers) - kept for backwards compatibility public static bool WithTokenAuth = false; public static bool WithBasicAuth = false; public static bool WithRetriableError = false; public static bool WithErrorMessage = false; - public static int Counter; + + // Get the port from request headers (set by DummyHttpServer) + private static int GetPortKey(HttpContext context) + { + if (context?.Request.Headers.TryGetValue("X-Server-Port", out var portHeader) == true + && int.TryParse(portHeader.ToString(), out var port)) + { + return port; + } + return context?.Connection?.LocalPort ?? 0; + } + + private static (StringBuilder Buffer, List Bytes, Exception? Error, int Counter) GetOrCreatePortData(int port) + { + lock (PortData) + { + if (!PortData.TryGetValue(port, out var data)) + { + data = (new StringBuilder(), new List(), null, 0); + PortData[port] = data; + } + return data; + } + } + + + // Public methods for accessing port-specific data (used by DummyHttpServer) + public static StringBuilder GetReceiveBuffer(int port) => GetOrCreatePortData(port).Buffer; + public static List GetReceiveBytes(int port) => GetOrCreatePortData(port).Bytes; + + public static Exception? GetLastError(int port) + { + lock (PortData) + { + return GetOrCreatePortData(port).Error; + } + } + + public static void SetLastError(int port, Exception? error) + { + lock (PortData) + { + var data = GetOrCreatePortData(port); + PortData[port] = (data.Buffer, data.Bytes, error, data.Counter); + } + } + + public static int GetCounter(int port) + { + lock (PortData) + { + return GetOrCreatePortData(port).Counter; + } + } + + public static void SetCounter(int port, int value) + { + lock (PortData) + { + var data = GetOrCreatePortData(port); + PortData[port] = (data.Buffer, data.Bytes, data.Error, value); + } + } + + public static void ClearPort(int port) + { + lock (PortData) + { + if (PortData.TryGetValue(port, out var data)) + { + data.Buffer.Clear(); + data.Bytes.Clear(); + PortData[port] = (data.Buffer, data.Bytes, null, 0); + } + } + } + + public static void SetPortConfig(int port, bool tokenAuth, bool basicAuth, bool retriableError, bool errorMessage) + { + lock (PortConfig) + { + PortConfig[port] = (tokenAuth, basicAuth, retriableError, errorMessage); + } + } + + private static (bool TokenAuth, bool BasicAuth, bool RetriableError, bool ErrorMessage) GetPortConfig(int port) + { + lock (PortConfig) + { + if (PortConfig.TryGetValue(port, out var config)) + { + return config; + } + // Return static flags as defaults for backwards compatibility + return (WithTokenAuth, WithBasicAuth, WithRetriableError, WithErrorMessage); + } + } public override void Configure() { @@ -111,14 +214,24 @@ public override void Configure() public override async Task HandleAsync(Request req, CancellationToken ct) { - Counter++; - if (WithRetriableError) + int port = GetPortKey(HttpContext); + var data = GetOrCreatePortData(port); + var config = GetPortConfig(port); + + lock (PortData) + { + // Increment counter for this port + data = GetOrCreatePortData(port); + PortData[port] = (data.Buffer, data.Bytes, data.Error, data.Counter + 1); + } + + if (config.RetriableError) { await SendAsync(null, 500, ct); return; } - if (WithErrorMessage) + if (config.ErrorMessage) { await SendAsync(new JsonErrorResponse { code = "code", errorId = "errorid", line = 1, message = "message", }, 400, ct); @@ -127,13 +240,22 @@ await SendAsync(new JsonErrorResponse try { - ReceiveBuffer.Append(req.StringContent); - ReceiveBytes.AddRange(req.ByteContent); + lock (PortData) + { + data = GetOrCreatePortData(port); + data.Buffer.Append(req.StringContent); + data.Bytes.AddRange(req.ByteContent); + PortData[port] = data; + } await SendNoContentAsync(ct); } catch (Exception ex) { - LastError = ex; + lock (PortData) + { + data = GetOrCreatePortData(port); + PortData[port] = (data.Buffer, data.Bytes, ex, data.Counter); + } throw; } } diff --git a/src/net-questdb-client-tests/HttpTests.cs b/src/net-questdb-client-tests/HttpTests.cs index acc5466..1a9fa2b 100644 --- a/src/net-questdb-client-tests/HttpTests.cs +++ b/src/net-questdb-client-tests/HttpTests.cs @@ -32,6 +32,7 @@ namespace net_questdb_client_tests; +[SetCulture("en-us")] public class HttpTests { private const string Host = "localhost"; @@ -177,6 +178,7 @@ await sender.Column("array", (ReadOnlySpan)aray.AsSpan()) } [Test] + // [Ignore("Test is broken - arrays are not validated until send. Needs redesign.")] public async Task BasicArrayDoubleNegotiationVersion2NotSupported() { { @@ -423,7 +425,7 @@ public async Task AuthBasicFailed() await server.StartAsync(HttpPort); using var sender = Sender.New( - $"https::addr={Host}:{HttpsPort};username=asdasdada;password=asdadad;tls_verify=unsafe_off;auto_flush=off;"); + $"http::addr={Host}:{HttpPort};username=asdasdada;password=asdadad;auto_flush=off;"); await sender.Table("metrics") .Symbol("tag", "value") .Column("number", 10) @@ -434,6 +436,7 @@ await sender.Table("metrics") async () => await sender.SendAsync(), Throws.TypeOf().With.Message.Contains("Unauthorized") ); + await server.StopAsync(); } [Test] @@ -443,7 +446,7 @@ public async Task AuthBasicSuccess() await server.StartAsync(HttpPort); using var sender = Sender.New( - $"https::addr={Host}:{HttpsPort};username=admin;password=quest;tls_verify=unsafe_off;auto_flush=off;"); + $"http::addr={Host}:{HttpPort};username=admin;password=quest;auto_flush=off;"); await sender.Table("metrics") .Symbol("tag", "value") .Column("number", 10) @@ -451,6 +454,7 @@ await sender.Table("metrics") .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); await sender.SendAsync(); + await server.StopAsync(); } [Test] @@ -461,7 +465,7 @@ public async Task AuthTokenFailed() using var sender = Sender.New( - $"https::addr={Host}:{HttpsPort};token=askldaklds;tls_verify=unsafe_off;auto_flush=off;"); + $"http::addr={Host}:{HttpPort};token=askldaklds;auto_flush=off;"); for (var i = 0; i < 100; i++) { @@ -476,6 +480,7 @@ await sender async () => await sender.SendAsync(), Throws.TypeOf().With.Message.Contains("Unauthorized") ); + await srv.StopAsync(); } [Test] @@ -488,7 +493,7 @@ public async Task AuthTokenSuccess() using var sender = Sender.New( - $"https::addr={Host}:{HttpsPort};token={token};tls_verify=unsafe_off;auto_flush=off;"); + $"http::addr={Host}:{HttpPort};token={token};auto_flush=off;"); for (var i = 0; i < 100; i++) { @@ -500,6 +505,7 @@ await sender } await sender.SendAsync(); + await srv.StopAsync(); } @@ -1096,6 +1102,127 @@ await sender.Table("neg name") Assert.That(srv.PrintBuffer(), Is.EqualTo(expected)); } + [Test] + public async Task SendGuidColumn() + { + using var srv = new DummyHttpServer(); + await srv.StartAsync(HttpPort); + + using var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); + + var guid = new Guid("550e8400-e29b-41d4-a716-446655440000"); + await sender.Table("metrics") + .Symbol("tag", "value") + .Column("id", guid) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + + await sender.SendAsync(); + + var expected = "metrics,tag=value id=\"550e8400-e29b-41d4-a716-446655440000\" 1000000000\n"; + Assert.That(srv.PrintBuffer(), Is.EqualTo(expected)); + } + + [Test] + public async Task SendCharColumn() + { + using var srv = new DummyHttpServer(); + await srv.StartAsync(HttpPort); + + using var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); + + await sender.Table("metrics") + .Symbol("tag", "value") + .Column("letter", 'A') + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + + await sender.SendAsync(); + + var expected = "metrics,tag=value letter=\"A\" 1000000000\n"; + Assert.That(srv.PrintBuffer(), Is.EqualTo(expected)); + } + + [Test] + public async Task SendMultipleGuidAndCharColumns() + { + using var srv = new DummyHttpServer(); + await srv.StartAsync(HttpPort); + + using var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); + + var guid1 = new Guid("550e8400-e29b-41d4-a716-446655440000"); + var guid2 = new Guid("6ba7b810-9dad-11d1-80b4-00c04fd430c8"); + + await sender.Table("metrics") + .Symbol("tag", "value") + .Column("id1", guid1) + .Column("letter1", 'X') + .Column("id2", guid2) + .Column("letter2", 'Y') + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + + await sender.SendAsync(); + + var expected = + "metrics,tag=value id1=\"550e8400-e29b-41d4-a716-446655440000\",letter1=\"X\",id2=\"6ba7b810-9dad-11d1-80b4-00c04fd430c8\",letter2=\"Y\" 1000000000\n"; + Assert.That(srv.PrintBuffer(), Is.EqualTo(expected)); + } + + [Test] + public async Task SendNullableGuidColumn() + { + using var srv = new DummyHttpServer(); + await srv.StartAsync(HttpPort); + + using var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); + + var guid = new Guid("550e8400-e29b-41d4-a716-446655440000"); + + // Send with value + await sender.Table("metrics") + .Symbol("tag", "value1") + .NullableColumn("id", guid) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + + // Send with null + await sender.Table("metrics") + .Symbol("tag", "value2") + .NullableColumn("id", (Guid?)null) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 2)); + + await sender.SendAsync(); + + var expected = "metrics,tag=value1 id=\"550e8400-e29b-41d4-a716-446655440000\" 1000000000\n" + + "metrics,tag=value2 2000000000\n"; + Assert.That(srv.PrintBuffer(), Is.EqualTo(expected)); + } + + [Test] + public async Task SendNullableCharColumn() + { + using var srv = new DummyHttpServer(); + await srv.StartAsync(HttpPort); + + using var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); + + // Send with value + await sender.Table("metrics") + .Symbol("tag", "value1") + .NullableColumn("letter", 'Z') + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + + // Send with null + await sender.Table("metrics") + .Symbol("tag", "value2") + .NullableColumn("letter", (char?)null) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 2)); + + await sender.SendAsync(); + + var expected = "metrics,tag=value1 letter=\"Z\" 1000000000\n" + + "metrics,tag=value2 2000000000\n"; + Assert.That(srv.PrintBuffer(), Is.EqualTo(expected)); + } + [Test] public async Task SendTagAfterField() { @@ -1732,8 +1859,15 @@ public async Task FailsWhenExpectingCert() using var server = new DummyHttpServer(requireClientCert: true); await server.StartAsync(HttpsPort); + using var sender = Sender.Configure($"https::addr=localhost:{HttpsPort};tls_verify=unsafe_off;").Build(); + + await sender.Table("metrics") + .Symbol("tag", "value") + .Column("number", 12.2) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + Assert.That( - () => Sender.Configure($"https::addr=localhost:{HttpsPort};tls_verify=unsafe_off;").Build(), + async () => await sender.SendAsync(), Throws.TypeOf().With.Message.Contains("ServerFlushError") ); diff --git a/src/net-questdb-client-tests/MultiUrlHttpTests.cs b/src/net-questdb-client-tests/MultiUrlHttpTests.cs new file mode 100644 index 0000000..93e719d --- /dev/null +++ b/src/net-questdb-client-tests/MultiUrlHttpTests.cs @@ -0,0 +1,353 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2024 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +using System.Net; +using dummy_http_server; +using NUnit.Framework; +using QuestDB; +using QuestDB.Utils; + +namespace net_questdb_client_tests; + +/// +/// Tests for multi-URL support in the HTTP sender with address rotation and failover. +/// +public class MultiUrlHttpTests +{ + private const string Host = "localhost"; + private const int HttpPort1 = 29475; + private const int HttpPort2 = 29476; + private const int HttpPort3 = 29477; + + [Test] + public void ParseMultipleAddresses_FromConfigString() + { + // Test parsing multiple addresses from config string + var options = new SenderOptions("http::addr=localhost:9000;addr=localhost:9001;addr=localhost:9002;auto_flush=off;"); + + Assert.That(options.AddressCount, Is.EqualTo(3)); + Assert.That(options.addresses[0], Is.EqualTo("localhost:9000")); + Assert.That(options.addresses[1], Is.EqualTo("localhost:9001")); + Assert.That(options.addresses[2], Is.EqualTo("localhost:9002")); + } + + [Test] + public void ParseMultipleAddresses_DefaultsToSingleAddress() + { + // Test that single address is handled correctly + var options = new SenderOptions("http::addr=localhost:9000;auto_flush=off;"); + + Assert.That(options.AddressCount, Is.EqualTo(1)); + Assert.That(options.addresses[0], Is.EqualTo("localhost:9000")); + } + + [Test] + public void ParseMultipleAddresses_NoAddrSpecified() + { + // Test that default address is used when none specified + var options = new SenderOptions("http::auto_flush=off;"); + + Assert.That(options.AddressCount, Is.GreaterThan(0)); + Assert.That(options.addresses[0], Is.EqualTo("localhost:9000")); + } + + [Test] + public async Task MultipleAddresses_SendToFirstAddress() + { + // Test sending to first address when it's available + using var server1 = new DummyHttpServer(withBasicAuth: false); + using var server2 = new DummyHttpServer(withBasicAuth: false); + + await server1.StartAsync(HttpPort1); + await server2.StartAsync(HttpPort2); + + var configString = $"http::addr={Host}:{HttpPort1};addr={Host}:{HttpPort2};auto_flush=off;tls_verify=unsafe_off;"; + using var sender = Sender.New(configString); + + await sender.Table("metrics") + .Symbol("tag", "value") + .Column("number", 10) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + + await sender.SendAsync(); + + // First server should have received the data + Assert.That(server1.PrintBuffer(), Contains.Substring("metrics,tag=value number=10i")); + // Second server should not have received anything + Assert.That(server2.PrintBuffer(), Is.Empty); + + await server1.StopAsync(); + await server2.StopAsync(); + } + + [Test] + public async Task MultipleAddresses_FailoverOnRetriableError() + { + // Test failover to second address when first returns a retriable error + using var server1 = new DummyHttpServer(withBasicAuth: false, withRetriableError: true); + using var server2 = new DummyHttpServer(withBasicAuth: false); + + await server1.StartAsync(HttpPort1); + await server2.StartAsync(HttpPort2); + + var configString = $"http::addr={Host}:{HttpPort1};addr={Host}:{HttpPort2};auto_flush=off;tls_verify=unsafe_off;retry_timeout=5000;"; + using var sender = Sender.New(configString); + + await sender.Table("metrics") + .Symbol("tag", "value") + .Column("number", 10) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + + await sender.SendAsync(); + + // Second server should have received the data after failover + Assert.That(server2.PrintBuffer(), Contains.Substring("metrics,tag=value number=10i")); + + await server1.StopAsync(); + await server2.StopAsync(); + } + + [Test] + public async Task MultipleAddresses_RoundRobinRotation() + { + // Test round-robin rotation across multiple addresses + using var server1 = new DummyHttpServer(withBasicAuth: false); + using var server2 = new DummyHttpServer(withBasicAuth: false); + using var server3 = new DummyHttpServer(withBasicAuth: false); + + await server1.StartAsync(HttpPort1); + await server2.StartAsync(HttpPort2); + await server3.StartAsync(HttpPort3); + + var configString = $"http::addr={Host}:{HttpPort1};addr={Host}:{HttpPort2};addr={Host}:{HttpPort3};auto_flush=off;tls_verify=unsafe_off;retry_timeout=5000;"; + + // First request succeeds on server 1 + using var sender1 = Sender.New(configString); + await sender1.Table("test1").Column("val", 1).AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + await sender1.SendAsync(); + + // All three servers can receive, so data goes to first available (server 1) + Assert.That(server1.PrintBuffer(), Contains.Substring("test1")); + Assert.That(server2.PrintBuffer(), Is.Empty); + Assert.That(server3.PrintBuffer(), Is.Empty); + + await server1.StopAsync(); + await server2.StopAsync(); + await server3.StopAsync(); + } + + [Test] + public async Task MultipleAddresses_AllServersUnavailable() + { + // Test error when all addresses are unavailable + var configString = $"http::addr=localhost:29999;addr=localhost:29998;auto_flush=off;tls_verify=unsafe_off;retry_timeout=1000;"; + using var sender = Sender.New(configString); + + await sender.Table("metrics") + .Symbol("tag", "value") + .Column("number", 10) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + + // Should throw an error since all servers are unavailable + var ex = Assert.ThrowsAsync(async () => await sender.SendAsync()); + Assert.That(ex?.Message, Does.Contain("Cannot connect")); + } + + [Test] + public async Task MultipleAddresses_SequentialAddresses() + { + // Test that we can send data across multiple available addresses + using var server1 = new DummyHttpServer(withBasicAuth: false); + using var server2 = new DummyHttpServer(withBasicAuth: false); + + await server1.StartAsync(HttpPort1); + await server2.StartAsync(HttpPort2); + + // Create senders with different primary addresses + var configString1 = $"http::addr={Host}:{HttpPort1};addr={Host}:{HttpPort2};auto_flush=off;tls_verify=unsafe_off;"; + var configString2 = $"http::addr={Host}:{HttpPort2};addr={Host}:{HttpPort1};auto_flush=off;tls_verify=unsafe_off;"; + + using var sender1 = Sender.New(configString1); + await sender1.Table("metrics1").Column("number", 30).AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + await sender1.SendAsync(); + + using var sender2 = Sender.New(configString2); + await sender2.Table("metrics2").Column("number", 40).AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + await sender2.SendAsync(); + + // Both servers should have received data + Assert.That(server1.PrintBuffer(), Contains.Substring("metrics1")); + Assert.That(server2.PrintBuffer(), Contains.Substring("metrics2")); + + await server1.StopAsync(); + await server2.StopAsync(); + } + + [Test] + public async Task MultipleAddresses_SyncSend() + { + // Test synchronous send with multiple addresses + using var server1 = new DummyHttpServer(withBasicAuth: false); + using var server2 = new DummyHttpServer(withBasicAuth: false); + + await server1.StartAsync(HttpPort1); + await server2.StartAsync(HttpPort2); + + var configString = $"http::addr={Host}:{HttpPort1};addr={Host}:{HttpPort2};auto_flush=off;tls_verify=unsafe_off;"; + using var sender = Sender.New(configString); + + sender.Table("metrics") + .Symbol("tag", "sync_test") + .Column("number", 42) + .At(new DateTime(1970, 01, 01, 0, 0, 1)); + + sender.Send(); + + // First server should have received the data + Assert.That(server1.PrintBuffer(), Contains.Substring("metrics,tag=sync_test number=42i")); + + await server1.StopAsync(); + await server2.StopAsync(); + } + + [Test] + public async Task MultipleAddresses_SuccessfulFirstAttempt() + { + // Test that no rotation occurs when first address succeeds + using var server1 = new DummyHttpServer(withBasicAuth: false); + using var server2 = new DummyHttpServer(withBasicAuth: false); + + await server1.StartAsync(HttpPort1); + await server2.StartAsync(HttpPort2); + + var configString = $"http::addr={Host}:{HttpPort1};addr={Host}:{HttpPort2};auto_flush=off;tls_verify=unsafe_off;"; + using var sender = Sender.New(configString); + + await sender.Table("metrics") + .Symbol("tag", "success") + .Column("number", 100) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + + await sender.SendAsync(); + + // Only first server should have received the data + Assert.That(server1.PrintBuffer(), Contains.Substring("metrics,tag=success number=100i")); + Assert.That(server2.PrintBuffer(), Is.Empty); + + await server1.StopAsync(); + await server2.StopAsync(); + } + + [Test] + public void AddressProvider_RoundRobinRotation() + { + // Test AddressProvider round-robin rotation logic + var addresses = new[] { "host1:9000", "host2:9001", "host3:9002" }; + var provider = new AddressProvider(addresses); + + Assert.That(provider.CurrentAddress, Is.EqualTo("host1:9000")); + Assert.That(provider.CurrentHost, Is.EqualTo("host1")); + Assert.That(provider.CurrentPort, Is.EqualTo(9000)); + Assert.That(provider.AddressCount, Is.EqualTo(3)); + Assert.That(provider.HasMultipleAddresses, Is.True); + + // Rotate to next + provider.RotateToNextAddress(); + Assert.That(provider.CurrentAddress, Is.EqualTo("host2:9001")); + Assert.That(provider.CurrentHost, Is.EqualTo("host2")); + Assert.That(provider.CurrentPort, Is.EqualTo(9001)); + + // Rotate to next + provider.RotateToNextAddress(); + Assert.That(provider.CurrentAddress, Is.EqualTo("host3:9002")); + + // Rotate back to first (round-robin) + provider.RotateToNextAddress(); + Assert.That(provider.CurrentAddress, Is.EqualTo("host1:9000")); + } + + [Test] + public void AddressProvider_ParseHostAndPort() + { + // Test host and port parsing with various formats + var provider1 = new AddressProvider(new[] { "192.168.1.1:9000" }); + Assert.That(provider1.CurrentHost, Is.EqualTo("192.168.1.1")); + Assert.That(provider1.CurrentPort, Is.EqualTo(9000)); + + var provider2 = new AddressProvider(new[] { "example.com:8080" }); + Assert.That(provider2.CurrentHost, Is.EqualTo("example.com")); + Assert.That(provider2.CurrentPort, Is.EqualTo(8080)); + + // IPv6 addresses with port (format: [ipv6]:port) + var provider3 = new AddressProvider(new[] { "[::1]:9000" }); + Assert.That(provider3.CurrentHost, Is.EqualTo("[::1]")); + Assert.That(provider3.CurrentPort, Is.EqualTo(9000)); + } + + [Test] + public void AddressProvider_IPv6Parsing() + { + // Test various IPv6 address formats + + // Simple loopback with port + var provider1 = new AddressProvider(new[] { "[::1]:9000" }); + Assert.That(provider1.CurrentHost, Is.EqualTo("[::1]")); + Assert.That(provider1.CurrentPort, Is.EqualTo(9000)); + + // Full IPv6 address with port + var provider2 = new AddressProvider(new[] { "[2001:db8::1]:9000" }); + Assert.That(provider2.CurrentHost, Is.EqualTo("[2001:db8::1]")); + Assert.That(provider2.CurrentPort, Is.EqualTo(9000)); + + // IPv6 with many colons + var provider3 = new AddressProvider(new[] { "[fe80::1:2:3:4]:8080" }); + Assert.That(provider3.CurrentHost, Is.EqualTo("[fe80::1:2:3:4]")); + Assert.That(provider3.CurrentPort, Is.EqualTo(8080)); + + // IPv6 without port (should return -1 for port) + var provider4 = new AddressProvider(new[] { "[::1]" }); + Assert.That(provider4.CurrentHost, Is.EqualTo("[::1]")); + Assert.That(provider4.CurrentPort, Is.EqualTo(-1)); + + // IPv6 with different port numbers + var provider5 = new AddressProvider(new[] { "[::1]:29000" }); + Assert.That(provider5.CurrentHost, Is.EqualTo("[::1]")); + Assert.That(provider5.CurrentPort, Is.EqualTo(29000)); + } + + [Test] + public void AddressProvider_SingleAddress() + { + // Test AddressProvider with single address + var provider = new AddressProvider(new[] { "localhost:9000" }); + + Assert.That(provider.CurrentAddress, Is.EqualTo("localhost:9000")); + Assert.That(provider.AddressCount, Is.EqualTo(1)); + Assert.That(provider.HasMultipleAddresses, Is.False); + + // Rotating with single address should return same address + provider.RotateToNextAddress(); + Assert.That(provider.CurrentAddress, Is.EqualTo("localhost:9000")); + } +} diff --git a/src/net-questdb-client-tests/QuestDbIntegrationTests.cs b/src/net-questdb-client-tests/QuestDbIntegrationTests.cs new file mode 100644 index 0000000..68f40ed --- /dev/null +++ b/src/net-questdb-client-tests/QuestDbIntegrationTests.cs @@ -0,0 +1,588 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2024 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +using System.Text.Json; +using NUnit.Framework; +using QuestDB; + +namespace net_questdb_client_tests; + +/// +/// Integration tests against a real QuestDB instance running in Docker. +/// Requires Docker to be installed and running. +/// +[TestFixture] +public class QuestDbIntegrationTests +{ + [OneTimeSetUp] + public async Task SetUpFixture() + { + _questDb = new QuestDbManager(IlpPort, HttpPort); + await _questDb.StartAsync(); + } + + [OneTimeTearDown] + public async Task TearDownFixture() + { + if (_questDb != null) + { + await _questDb.StopAsync(); + await _questDb.DisposeAsync(); + } + } + + private QuestDbManager? _questDb; + private const int IlpPort = 19009; + private const int HttpPort = 19000; + + [Test] + public async Task CanSendDataOverHttp() + { + var httpEndpoint = _questDb!.GetHttpEndpoint(); + using var sender = Sender.New($"http::addr={httpEndpoint};auto_flush=off;"); + + // Send test data + await sender + .Table("test_http") + .Symbol("tag", "test") + .Column("value", 42L) + .AtAsync(DateTime.UtcNow); + await sender.SendAsync(); + + // Verify data was written + await VerifyTableHasDataAsync("test_http"); + } + + [Test] + public async Task CanSendDataOverIlp() + { + var ilpEndpoint = _questDb!.GetIlpEndpoint(); + using var sender = Sender.New($"tcp::addr={ilpEndpoint};auto_flush=off;"); + + // Send test data + await sender + .Table("test_ilp") + .Symbol("tag", "test") + .Column("value", 123L) + .AtAsync(DateTime.UtcNow); + await sender.SendAsync(); + + // Verify data was written + await VerifyTableHasDataAsync("test_ilp"); + } + + [Test] + public async Task CanSendMultipleRows() + { + var httpEndpoint = _questDb!.GetHttpEndpoint(); + using var sender = Sender.New($"http::addr={httpEndpoint};auto_flush=off;"); + + // Send multiple rows + for (var i = 0; i < 10; i++) + { + await sender + .Table("test_multiple_rows") + .Symbol("tag", $"test_{i}") + .Column("value", (long)(i * 10)) + .AtAsync(DateTime.UtcNow); + } + + await sender.SendAsync(); + + // Verify all rows were written + var rowCount = await GetTableRowCountAsync("test_multiple_rows"); + Assert.That(rowCount, Is.GreaterThanOrEqualTo(10), "Expected at least 10 rows"); + } + + [Test] + public async Task CanSendDifferentDataTypes() + { + var httpEndpoint = _questDb!.GetHttpEndpoint(); + using var sender = Sender.New($"http::addr={httpEndpoint};auto_flush=off;"); + + var now = DateTime.UtcNow; + + // Send different data types + await sender + .Table("test_data_types") + .Symbol("symbol_col", "test") + .Column("long_col", 42L) + .Column("double_col", 3.14) + .Column("string_col", "hello world") + .Column("bool_col", true) + .AtAsync(now); + + await sender.SendAsync(); + + // Verify the row exists + + long rowCount = 0; + var retries = 10; + for (var i = 0; i < retries; i++) + { + rowCount = await GetTableRowCountAsync("test_data_types"); + if (rowCount != 0) + { + break; + } + + await Task.Delay(500); + } + + Assert.That(rowCount, Is.GreaterThanOrEqualTo(1)); + } + + [Test] + public async Task MultiUrlFallback() + { + // Test that the client properly handles multiple URLs with fallback + var httpEndpoint = _questDb!.GetHttpEndpoint(); + var badEndpoint = "http://localhost:19001"; // Non-existent endpoint + + // The client should try the bad endpoint first, then fallback to the good one + using var sender = Sender.New( + $"http::addr={badEndpoint};addr={httpEndpoint};auto_flush=off;"); + + await sender + .Table("test_multi_url") + .Symbol("tag", "fallback") + .Column("value", 999L) + .AtAsync(DateTime.UtcNow); + + await sender.SendAsync(); + + // Verify data was written despite the bad endpoint + await VerifyTableHasDataAsync("test_multi_url"); + } + + [Test] + public async Task CanAutoFlush() + { + var httpEndpoint = _questDb!.GetHttpEndpoint(); + using var sender = Sender.New( + $"http::addr={httpEndpoint};auto_flush=on;auto_flush_rows=1;"); + + // Send data - should auto-flush due to auto_flush_rows=1 + await sender + .Table("test_auto_flush") + .Symbol("tag", "test") + .Column("value", 777L) + .AtAsync(DateTime.UtcNow); + + // Give it a moment to flush + await Task.Delay(100); + + // Verify data was written + await VerifyTableHasDataAsync("test_auto_flush"); + } + + [Test] + public async Task SendRowsWhileRestartingDatabase() + { + const int rowsPerBatch = 10; + const int numBatches = 5; + const int expectedTotalRows = rowsPerBatch * numBatches; + + // Create a persistent Docker volume for the test database + var volumeName = $"questdb-test-vol-{Guid.NewGuid().ToString().Substring(0, 8)}"; + + // Use a separate QuestDB instance for this chaos test to avoid conflicts + var testDb = new QuestDbManager(29009, 29000); + testDb.SetVolume(volumeName); + try + { + await testDb.StartAsync(); + + var httpEndpoint = testDb.GetHttpEndpoint(); + using var sender = Sender.New( + $"http::addr={httpEndpoint};auto_flush=off;retry_timeout=60000;"); + + var batchesSent = 0; + var sendLock = new object(); + + // Task that restarts the database + var restartTask = Task.Run(async () => + { + // Allow first batch to be sent while database is up + await Task.Delay(600); + + // Perform restart cycles + for (var i = 0; i < 2; i++) + { + TestContext.WriteLine($"Stopping test database (cycle {i + 1})"); + await testDb.StopAsync(); + + // Database is down - sender will retry + await Task.Delay(1200); + + TestContext.WriteLine($"Starting test database (cycle {i + 1})"); + await testDb.StartAsync(); + + // Wait for client to detect database is back up + await Task.Delay(800); + } + + TestContext.WriteLine("Test database restart cycles complete"); + }); + + // Task that sends rows continuously + var sendTask = Task.Run(async () => + { + for (var batch = 0; batch < numBatches; batch++) + { + try + { + // Build batch of rows + for (var i = 0; i < rowsPerBatch; i++) + { + var rowId = batch * rowsPerBatch + i; + await sender + .Table("test_chaos") + .Symbol("batch", $"batch_{batch}") + .Column("row_id", (long)rowId) + .Column("value", (double)(rowId * 100)) + .AtAsync(DateTime.UtcNow); + } + + // Send the batch + TestContext.WriteLine($"Sending batch {batch}"); + await sender.SendAsync(); + + lock (sendLock) + { + batchesSent++; + } + + TestContext.WriteLine($"Batch {batch} sent successfully"); + + // Wait before next batch + await Task.Delay(500); + } + catch (Exception ex) + { + TestContext.WriteLine($"Error sending batch {batch}: {ex.GetType().Name} - {ex.Message}"); + throw; + } + } + + TestContext.WriteLine($"All batches sent. Total: {batchesSent}"); + }); + + // Wait for both tasks to complete + await Task.WhenAll(sendTask, restartTask); + + // Wait for final data to be written + await Task.Delay(2000); + + // Query the row count, with retries + long actualRowCount = 0; + var maxAttempts = 20; + for (var attempt = 0; attempt < maxAttempts; attempt++) + { + try + { + using var client = new HttpClient { Timeout = TimeSpan.FromSeconds(5), }; + var response = await client.GetAsync($"{httpEndpoint}/exec?query=test_chaos"); + + if (response.IsSuccessStatusCode) + { + var content = await response.Content.ReadAsStringAsync(); + var json = JsonDocument.Parse(content); + if (json.RootElement.TryGetProperty("count", out var countProp)) + { + actualRowCount = countProp.GetInt64(); + TestContext.WriteLine($"Attempt {attempt + 1}: Found {actualRowCount} rows"); + if (actualRowCount >= expectedTotalRows) + { + break; + } + } + } + } + catch (Exception ex) + { + TestContext.WriteLine($"Attempt {attempt + 1}: Query failed - {ex.Message}"); + } + + await Task.Delay(500); + } + + // Assert that all rows made it + Assert.That( + actualRowCount, + Is.GreaterThanOrEqualTo(expectedTotalRows), + $"Expected {expectedTotalRows} rows but found {actualRowCount}. " + + $"Successfully sent {batchesSent} batches of {rowsPerBatch} rows each"); + } + finally + { + // Cleanup + await testDb.StopAsync(); + await testDb.DisposeAsync(); + } + } + + [Test] + public async Task SendRowsWithMultiDatabaseFailover() + { + const int rowsPerBatch = 10; + const int numBatches = 5; + const int expectedTotalRows = rowsPerBatch * numBatches; + + // Create two separate databases with persistent volumes + var volume1 = $"questdb-test-vol-db1-{Guid.NewGuid().ToString().Substring(0, 8)}"; + var volume2 = $"questdb-test-vol-db2-{Guid.NewGuid().ToString().Substring(0, 8)}"; + + var testDb1 = new QuestDbManager(29009, 29000); + var testDb2 = new QuestDbManager(29019, 29010); + testDb1.SetVolume(volume1); + testDb2.SetVolume(volume2); + + try + { + // Start both databases + await testDb1.StartAsync(); + await testDb2.StartAsync(); + + var endpoint1 = testDb1.GetHttpEndpoint(); + var endpoint2 = testDb2.GetHttpEndpoint(); + + // Create a single sender with both endpoints for failover + using var sender = Sender.New( + $"http::addr={endpoint1};addr={endpoint2};auto_flush=off;retry_timeout=60000;"); + + var batchesSent = 0; + var sendLock = new object(); + + // Task that restarts DB1 after sends complete + var restartDb1Task = Task.Run(async () => + { + // Wait for all sends to complete (5 batches * 500ms + 100ms buffer) + await Task.Delay(2600); + + TestContext.WriteLine("Stopping database 1"); + await testDb1.StopAsync(); + await Task.Delay(1000); + + TestContext.WriteLine("Starting database 1"); + await testDb1.StartAsync(); + + TestContext.WriteLine("Database 1 restart complete"); + }); + + // Task that restarts DB2 after DB1 restart completes + var restartDb2Task = Task.Run(async () => + { + // Wait for DB1 restart to complete before restarting DB2 + await Task.Delay(4000); + + TestContext.WriteLine("Stopping database 2"); + await testDb2.StopAsync(); + await Task.Delay(1000); + + TestContext.WriteLine("Starting database 2"); + await testDb2.StartAsync(); + + TestContext.WriteLine("Database 2 restart complete"); + }); + + // Task that sends rows to both databases via multi-address sender + var sendTask = Task.Run(async () => + { + for (var batch = 0; batch < numBatches; batch++) + { + try + { + // Build batch of rows + for (var i = 0; i < rowsPerBatch; i++) + { + var rowId = batch * rowsPerBatch + i; + await sender + .Table("test_multi_db") + .Symbol("batch", $"batch_{batch}") + .Column("row_id", (long)rowId) + .Column("value", (double)(rowId * 100)) + .AtAsync(DateTime.UtcNow); + } + + TestContext.WriteLine($"Sending batch {batch}"); + await sender.SendAsync(); + + lock (sendLock) + { + batchesSent++; + } + + TestContext.WriteLine($"Batch {batch} sent successfully"); + + await Task.Delay(500); + } + catch (Exception ex) + { + TestContext.WriteLine($"Error sending batch {batch}: {ex.GetType().Name} - {ex.Message}"); + throw; + } + } + + TestContext.WriteLine($"All batches sent. Total: {batchesSent}"); + }); + + // Wait for all tasks + await Task.WhenAll(sendTask, restartDb1Task, restartDb2Task); + await Task.Delay(2000); + + // Query both databases and sum the row counts + var maxAttempts = 20; + long count1 = 0; + long count2 = 0; + + for (var attempt = 0; attempt < maxAttempts; attempt++) + { + try + { + using var client = new HttpClient { Timeout = TimeSpan.FromSeconds(5), }; + + // Query database 1 + try + { + var response1 = await client.GetAsync($"{endpoint1}/exec?query=test_multi_db"); + if (response1.IsSuccessStatusCode) + { + var content1 = await response1.Content.ReadAsStringAsync(); + var json1 = JsonDocument.Parse(content1); + if (json1.RootElement.TryGetProperty("count", out var countProp1)) + { + count1 = countProp1.GetInt64(); + } + } + } + catch (Exception ex) + { + TestContext.WriteLine($"Attempt {attempt + 1}: Query DB1 failed - {ex.Message}"); + } + + // Query database 2 + try + { + var response2 = await client.GetAsync($"{endpoint2}/exec?query=test_multi_db"); + if (response2.IsSuccessStatusCode) + { + var content2 = await response2.Content.ReadAsStringAsync(); + var json2 = JsonDocument.Parse(content2); + if (json2.RootElement.TryGetProperty("count", out var countProp2)) + { + count2 = countProp2.GetInt64(); + } + } + } + catch (Exception ex) + { + TestContext.WriteLine($"Attempt {attempt + 1}: Query DB2 failed - {ex.Message}"); + } + + var totalRowCount = count1 + count2; + TestContext.WriteLine($"Attempt {attempt + 1}: DB1={count1}, DB2={count2}, Total={totalRowCount}"); + + if (totalRowCount >= expectedTotalRows) + { + break; + } + } + catch (Exception ex) + { + TestContext.WriteLine($"Attempt {attempt + 1}: Error - {ex.Message}"); + } + + await Task.Delay(500); + } + + var totalRowCount2 = count1 + count2; + + // Assert that the sum of both databases equals expected total + Assert.That( + totalRowCount2, + Is.EqualTo(expectedTotalRows), + $"Expected {expectedTotalRows} total rows across both databases but found {totalRowCount2}. " + + $"Successfully sent {batchesSent} batches of {rowsPerBatch} rows each"); + } + finally + { + // Cleanup + await testDb1.StopAsync(); + await testDb2.StopAsync(); + await testDb1.DisposeAsync(); + await testDb2.DisposeAsync(); + } + } + + private async Task VerifyTableHasDataAsync(string tableName) + { + var value = await GetTableRowCountAsync(tableName); + Assert.That(value, Is.GreaterThan(0)); + } + + private async Task GetTableRowCountAsync(string tableName) + { + var httpEndpoint = _questDb!.GetHttpEndpoint(); + using var client = new HttpClient { Timeout = TimeSpan.FromSeconds(10), }; + + // Retry a few times to allow for write latency + var attempts = 0; + const int maxAttempts = 10; + + while (attempts < maxAttempts) + { + try + { + var response = await client.GetAsync( + $"{httpEndpoint}/exec?query={tableName}"); + + if (response.IsSuccessStatusCode) + { + var content = await response.Content.ReadAsStringAsync(); + var json = JsonDocument.Parse(content); + if ( + json.RootElement.TryGetProperty("count", out var count)) + { + var rowCount = count.GetInt64(); + if (rowCount > 0) + { + return rowCount; + } + } + } + } + catch + { + // Retry + } + + await Task.Delay(100); + attempts++; + } + + Assert.Fail($"Table {tableName} has no data after {maxAttempts} attempts"); + return 0; + } +} \ No newline at end of file diff --git a/src/net-questdb-client-tests/QuestDbManager.cs b/src/net-questdb-client-tests/QuestDbManager.cs new file mode 100644 index 0000000..fc5ffc8 --- /dev/null +++ b/src/net-questdb-client-tests/QuestDbManager.cs @@ -0,0 +1,306 @@ +using System.Diagnostics; + +namespace net_questdb_client_tests; + +/// +/// Manages QuestDB server lifecycle for integration tests using Docker. +/// Handles pulling, starting, and stopping QuestDB container instances. +/// +public class QuestDbManager : IAsyncDisposable +{ + private const string DockerImage = "questdb/questdb:latest"; + private const string ContainerNamePrefix = "questdb-test-"; + private readonly string _containerName; + private readonly HttpClient _httpClient; + private readonly int _httpPort; + + private readonly int _port; + private string? _containerId; + private string? _volumeName; + + /// + /// Initializes a new instance of the QuestDbManager. + /// + /// ILP port (default: 9009) + /// HTTP port (default: 9000) + public QuestDbManager(int port = 9009, int httpPort = 9000) + { + _port = port; + _httpPort = httpPort; + _containerName = $"{ContainerNamePrefix}{port}-{httpPort}-{Guid.NewGuid().ToString().Substring(0, 8)}"; + _httpClient = new HttpClient { Timeout = TimeSpan.FromSeconds(5), }; + } + + public bool IsRunning { get; private set; } + + /// + /// Cleanup resources. + /// + public async ValueTask DisposeAsync() + { + await StopAsync(); + + // Clean up Docker volume if one was used + if (!string.IsNullOrEmpty(_volumeName)) + { + await RunDockerCommandAsync($"volume rm {_volumeName}"); + } + + _httpClient?.Dispose(); + } + + /// + /// Sets a Docker volume to be used for persistent storage. + /// + public void SetVolume(string volumeName) + { + _volumeName = volumeName; + } + + /// + /// Ensures Docker is available. + /// + public async Task EnsureDockerAvailableAsync() + { + try + { + var (exitCode, output) = await RunDockerCommandAsync("--version"); + if (exitCode != 0) + { + throw new InvalidOperationException("Docker is not available or not working properly"); + } + + Console.WriteLine($"Docker is available: {output.Trim()}"); + } + catch (Exception ex) + { + throw new InvalidOperationException( + "Docker is required to run integration tests. " + + "Please install Docker from https://docs.docker.com/get-docker/", + ex); + } + } + + /// + /// Ensures QuestDB Docker image is available (uses local if exists, otherwise pulls latest). + /// + public async Task PullImageAsync() + { + // Check if image already exists locally + if (await ImageExistsAsync()) + { + Console.WriteLine($"Docker image already exists locally: {DockerImage}"); + return; + } + + Console.WriteLine($"Pulling Docker image {DockerImage}..."); + var (exitCode, output) = await RunDockerCommandAsync($"pull {DockerImage}"); + if (exitCode != 0) + { + throw new InvalidOperationException($"Failed to pull Docker image: {output}"); + } + + Console.WriteLine("Docker image pulled successfully"); + } + + /// + /// Checks if the QuestDB Docker image exists locally. + /// + private async Task ImageExistsAsync() + { + // Use 'docker images' to check if image exists + // Format: docker images --filter "reference=questdb/questdb:latest" --quiet + var (exitCode, output) = await RunDockerCommandAsync($"images --filter \"reference={DockerImage}\" --quiet"); + + // If the image exists, output will contain the image ID + // If it doesn't exist, output will be empty + return exitCode == 0 && !string.IsNullOrWhiteSpace(output); + } + + /// + /// Starts the QuestDB container. + /// + public async Task StartAsync() + { + if (IsRunning) + { + Console.WriteLine("QuestDB is already running"); + return; + } + + await EnsureDockerAvailableAsync(); + + // Clean up any existing containers using these ports + await CleanupExistingContainersAsync(); + + await PullImageAsync(); + + Console.WriteLine($"Starting QuestDB container: {_containerName}"); + Console.WriteLine($"HTTP port: {_httpPort}, ILP port: {_port}"); + + // Run container with port mappings + // -d: detached mode + // -p: port mappings + // --name: container name + // -v: volume mount (if specified) + var volumeArg = string.IsNullOrEmpty(_volumeName) + ? string.Empty + : $"-v {_volumeName}:/var/lib/questdb "; + + var runArgs = $"run -d " + + $"-p {_httpPort}:9000 " + + $"-p {_port}:9009 " + + $"--name {_containerName} " + + volumeArg + + DockerImage; + + var (exitCode, output) = await RunDockerCommandAsync(runArgs); + if (exitCode != 0) + { + throw new InvalidOperationException($"Failed to start QuestDB container: {output}"); + } + + _containerId = output.Trim(); + Console.WriteLine($"QuestDB container started: {_containerId}"); + IsRunning = true; + + // Wait for QuestDB to be ready + await WaitForQuestDbAsync(); + } + + /// + /// Stops the QuestDB container. + /// + public async Task StopAsync() + { + if (!IsRunning || string.IsNullOrEmpty(_containerId)) + { + return; + } + + Console.WriteLine($"Stopping QuestDB container: {_containerName}"); + + // Stop the container (with 10 second timeout) + var (exitCode, output) = await RunDockerCommandAsync($"stop -t 10 {_containerName}"); + if (exitCode != 0) + { + Console.WriteLine($"Warning: Failed to stop container gracefully: {output}"); + // Try force remove + await RunDockerCommandAsync($"rm -f {_containerName}"); + } + + IsRunning = false; + _containerId = null; + Console.WriteLine("QuestDB container stopped"); + } + + /// + /// Gets the HTTP endpoint for QuestDB. + /// + public string GetHttpEndpoint() + { + return $"http://localhost:{_httpPort}"; + } + + /// + /// Gets the ILP endpoint for QuestDB. + /// + public string GetIlpEndpoint() + { + return $"localhost:{_port}"; + } + + /// + /// Waits for QuestDB to be ready. + /// + private async Task WaitForQuestDbAsync() + { + const int maxAttempts = 30; + var attempts = 0; + + while (attempts < maxAttempts) + { + try + { + var response = await _httpClient.GetAsync($"{GetHttpEndpoint()}/settings"); + if (response.IsSuccessStatusCode) + { + Console.WriteLine("QuestDB is ready"); + return; + } + } + catch + { + // Ignore and retry + } + + await Task.Delay(1000); + attempts++; + } + + throw new TimeoutException($"QuestDB failed to start within {maxAttempts} seconds"); + } + + private async Task CleanupExistingContainersAsync() + { + Console.WriteLine($"Checking for existing containers on ports {_httpPort}/{_port}..."); + + // Get list of all containers (running and stopped) + var (exitCode, output) = await RunDockerCommandAsync("ps -a --format \"{{.Names}}\""); + if (exitCode != 0) + { + return; // Silently ignore errors listing containers + } + + var containerNames = output.Split('\n', StringSplitOptions.RemoveEmptyEntries); + + // Stop and remove any QuestDB test containers + foreach (var rawName in containerNames) + { + // Trim the name to remove trailing \r or whitespace + var name = rawName.Trim(); + + // Look for containers with matching port pattern: questdb-test-{port}-{httpPort}-* + if (name.Contains(ContainerNamePrefix, StringComparison.Ordinal) && + (name.Contains($"-{_port}-{_httpPort}-", StringComparison.Ordinal) || + name.Contains($"-{_httpPort}-{_port}-", StringComparison.Ordinal))) + { + Console.WriteLine($"Cleaning up existing container: {name}"); + + // Stop the container + await RunDockerCommandAsync($"stop -t 5 {name}"); + + // Remove the container + await RunDockerCommandAsync($"rm {name}"); + } + } + } + + private async Task<(int ExitCode, string Output)> RunDockerCommandAsync(string arguments) + { + var startInfo = new ProcessStartInfo + { + FileName = "docker", + Arguments = arguments, + UseShellExecute = false, + RedirectStandardOutput = true, + RedirectStandardError = true, + CreateNoWindow = true, + }; + + var process = Process.Start(startInfo); + if (process == null) + { + throw new InvalidOperationException("Failed to start docker command"); + } + + var outputTask = process.StandardOutput.ReadToEndAsync(); + var errorTask = process.StandardError.ReadToEndAsync(); + await Task.WhenAll(outputTask, errorTask); + var output = await outputTask; + var error = await errorTask; + await process.WaitForExitAsync(); + + return (process.ExitCode, output + error); + } +} \ No newline at end of file diff --git a/src/net-questdb-client-tests/TcpTests.cs b/src/net-questdb-client-tests/TcpTests.cs index 9d4e4df..5a5c5d6 100644 --- a/src/net-questdb-client-tests/TcpTests.cs +++ b/src/net-questdb-client-tests/TcpTests.cs @@ -38,6 +38,7 @@ namespace net_questdb_client_tests; +[SetCulture("en-us")] public class TcpTests { private readonly IPAddress _host = IPAddress.Loopback; diff --git a/src/net-questdb-client/Buffers/BufferV1.cs b/src/net-questdb-client/Buffers/BufferV1.cs index c39ea27..7a61a0c 100644 --- a/src/net-questdb-client/Buffers/BufferV1.cs +++ b/src/net-questdb-client/Buffers/BufferV1.cs @@ -505,6 +505,25 @@ public virtual IBuffer Column(ReadOnlySpan name, decimal value) throw new IngressError(ErrorCode.ProtocolVersionError, "Protocol Version does not support DECIMAL types"); } + public IBuffer Column(ReadOnlySpan name, char value) + { + Span span = stackalloc char[1]; + span[0] = value; + return Column(name, span); + } + + public IBuffer Column(ReadOnlySpan name, Guid value) + { + Span span = stackalloc char[36]; + value.TryFormat(span, out var charsWritten); + if (charsWritten != 36) + { + throw new IngressError(ErrorCode.InvalidApiCall, "Could not convert guid to string"); + } + + return Column(name, span); + } + /// /// Advance the current buffer write position and the overall length by a given number of bytes. diff --git a/src/net-questdb-client/Buffers/IBuffer.cs b/src/net-questdb-client/Buffers/IBuffer.cs index 541dd3c..1ea82b2 100644 --- a/src/net-questdb-client/Buffers/IBuffer.cs +++ b/src/net-questdb-client/Buffers/IBuffer.cs @@ -296,4 +296,20 @@ public interface IBuffer /// if used with protocol version 1 or 2. /// public IBuffer Column(ReadOnlySpan name, decimal value); + + /// + /// Adds a character column with the specified name and value to the current row. + /// + /// The column name. + /// The character value to store in the column. + /// The buffer instance for method chaining. + public IBuffer Column(ReadOnlySpan name, char value); + + /// + /// Adds a GUID column with the specified name and value to the current row. + /// + /// The column name. + /// The GUID value to store in the column. + /// The buffer instance for method chaining. + public IBuffer Column(ReadOnlySpan name, Guid value); } \ No newline at end of file diff --git a/src/net-questdb-client/Senders/AbstractSender.cs b/src/net-questdb-client/Senders/AbstractSender.cs index b557c3c..bbdb8c6 100644 --- a/src/net-questdb-client/Senders/AbstractSender.cs +++ b/src/net-questdb-client/Senders/AbstractSender.cs @@ -272,12 +272,27 @@ public void Clear() /// The column name. /// The decimal value to write, or null to emit a null for the column. /// The same instance for fluent chaining. + /// public ISender Column(ReadOnlySpan name, decimal value) { Buffer.Column(name, value); return this; } + /// + public ISender Column(ReadOnlySpan name, Guid value) + { + Buffer.Column(name, value); + return this; + } + + /// + public ISender Column(ReadOnlySpan name, char value) + { + Buffer.Column(name, value); + return this; + } + public ISender Column(ReadOnlySpan name, T[] value) where T : struct { Buffer.Column(name, value); diff --git a/src/net-questdb-client/Senders/HttpSender.cs b/src/net-questdb-client/Senders/HttpSender.cs index 5553332..3370ade 100644 --- a/src/net-questdb-client/Senders/HttpSender.cs +++ b/src/net-questdb-client/Senders/HttpSender.cs @@ -45,25 +45,40 @@ namespace QuestDB.Senders; internal class HttpSender : AbstractSender { /// - /// Instance-specific for sending data to QuestDB. + /// Cache of instances, one per address for multi-URL support. + /// Avoids recreating clients on each rotation. /// - private HttpClient _client = null!; + private readonly Dictionary _clientCache = new(); /// - /// Instance specific for use constructing . + /// Cache of instances, one per address. + /// Each address has its own handler to avoid TLS TargetHost conflicts when rotating addresses. /// - private SocketsHttpHandler _handler = null!; + private readonly Dictionary _handlerCache = new(); private readonly Func _sendRequestFactory; private readonly Func _settingRequestFactory; /// - /// Initializes a new HttpSender configured according to the provided options. + /// Manages round-robin address rotation for failover. + /// + private AddressProvider _addressProvider = null!; + + /// + /// Current reference from the cache. + /// + private HttpClient _client = null!; + + /// + /// Initializes a new HttpSender configured according to the provided options. /// - /// Configuration for the sender, including connection endpoint, TLS and certificate settings, buffering and protocol parameters, authentication, and timeouts. + /// + /// Configuration for the sender, including connection endpoint, TLS and certificate settings, + /// buffering and protocol parameters, authentication, and timeouts. + /// public HttpSender(SenderOptions options) { - _sendRequestFactory = GenerateRequest; + _sendRequestFactory = GenerateRequest; _settingRequestFactory = GenerateSettingsRequest; Options = options; @@ -71,7 +86,7 @@ public HttpSender(SenderOptions options) } /// - /// Initializes a new instance of by parsing a configuration string. + /// Initializes a new instance of by parsing a configuration string. /// /// Configuration string in QuestDB connection string format. public HttpSender(string confStr) : this(new SenderOptions(confStr)) @@ -79,36 +94,43 @@ public HttpSender(string confStr) : this(new SenderOptions(confStr)) } /// - /// Configure and initialize the SocketsHttpHandler and HttpClient, set TLS and authentication options, determine the Line Protocol version (probing /settings when set to Auto), and create the internal send buffer. + /// Configure and initialize the SocketsHttpHandler and HttpClient, set TLS and authentication options, determine the + /// Line Protocol version (probing /settings when set to Auto), and create the internal send buffer. /// /// - /// - Applies pool and connection settings from Options. - /// - When using HTTPS, configures TLS protocols, optional remote-certificate validation override (when tls_verify is unsafe_off), optional custom root CA installation, and optional client certificates. - /// - Sets connection timeout, PreAuthenticate, BaseAddress, and disables HttpClient timeout. - /// - Adds Basic or Bearer Authorization header when credentials or token are provided. - /// - If protocol_version is Auto, probes the server's /settings with a 1-second retry window to select the highest mutually supported protocol up to V3, falling back to V1 on errors or unexpected responses. - /// - Initializes the Buffer with init_buf_size, max_name_len, max_buf_size, and the chosen protocol version. + /// - Applies pool and connection settings from Options. + /// - When using HTTPS, configures TLS protocols, optional remote-certificate validation override (when tls_verify is + /// unsafe_off), optional custom root CA installation, and optional client certificates. + /// - Sets connection timeout, PreAuthenticate, BaseAddress, and disables HttpClient timeout. + /// - Adds Basic or Bearer Authorization header when credentials or token are provided. + /// - If protocol_version is Auto, probes the server's /settings with a 1-second retry window to select the highest + /// mutually supported protocol up to V3, falling back to V1 on errors or unexpected responses. + /// - Initializes the Buffer with init_buf_size, max_name_len, max_buf_size, and the chosen protocol version. /// - private void Build() + /// + /// Creates a configured for a specific host. + /// Each handler is isolated to prevent TLS TargetHost conflicts between different addresses. + /// + private SocketsHttpHandler CreateHandler(string host) { - _handler = new SocketsHttpHandler + var handler = new SocketsHttpHandler { PooledConnectionIdleTimeout = Options.pool_timeout, - MaxConnectionsPerServer = 1, + MaxConnectionsPerServer = 1, }; if (Options.protocol == ProtocolType.https) { - _handler.SslOptions.TargetHost = Options.Host; - _handler.SslOptions.EnabledSslProtocols = SslProtocols.Tls12 | SslProtocols.Tls13; + handler.SslOptions.TargetHost = host; + handler.SslOptions.EnabledSslProtocols = SslProtocols.Tls12 | SslProtocols.Tls13; if (Options.tls_verify == TlsVerifyType.unsafe_off) { - _handler.SslOptions.RemoteCertificateValidationCallback += (_, _, _, _) => true; + handler.SslOptions.RemoteCertificateValidationCallback += (_, _, _, _) => true; } else { - _handler.SslOptions.RemoteCertificateValidationCallback = + handler.SslOptions.RemoteCertificateValidationCallback = (_, certificate, chain, errors) => { if ((errors & ~SslPolicyErrors.RemoteCertificateChainErrors) != 0) @@ -129,38 +151,29 @@ private void Build() if (!string.IsNullOrEmpty(Options.tls_roots)) { - _handler.SslOptions.ClientCertificates ??= new X509Certificate2Collection(); - _handler.SslOptions.ClientCertificates.Add( + handler.SslOptions.ClientCertificates ??= new X509Certificate2Collection(); + handler.SslOptions.ClientCertificates.Add( X509Certificate2.CreateFromPemFile(Options.tls_roots!, Options.tls_roots_password)); } if (Options.client_cert is not null) { - _handler.SslOptions.ClientCertificates ??= new X509Certificate2Collection(); - _handler.SslOptions.ClientCertificates.Add(Options.client_cert); + handler.SslOptions.ClientCertificates ??= new X509Certificate2Collection(); + handler.SslOptions.ClientCertificates.Add(Options.client_cert); } } - _handler.ConnectTimeout = Options.auth_timeout; - _handler.PreAuthenticate = true; + handler.ConnectTimeout = Options.auth_timeout; + handler.PreAuthenticate = true; + return handler; + } - _client = new HttpClient(_handler); - var uri = new UriBuilder(Options.protocol.ToString(), Options.Host, Options.Port); - _client.BaseAddress = uri.Uri; - _client.Timeout = Timeout.InfiniteTimeSpan; + private void Build() + { + _addressProvider = new AddressProvider(Options.addresses); - if (!string.IsNullOrEmpty(Options.username) && !string.IsNullOrEmpty(Options.password)) - { - _client.DefaultRequestHeaders.Authorization - = new AuthenticationHeaderValue("Basic", - Convert.ToBase64String( - Encoding.ASCII.GetBytes( - $"{Options.username}:{Options.password}"))); - } - else if (!string.IsNullOrEmpty(Options.token)) - { - _client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", Options.token); - } + // Create and cache the initial client + _client = GetClientForCurrentAddress(); var protocolVersion = Options.protocol_version; @@ -168,39 +181,34 @@ private void Build() { // We need to select the last version that both client and server support. // Other clients use 1 second timeout for "/settings", follow same practice here. - using var response = SendWithRetries(default, _settingRequestFactory, TimeSpan.FromSeconds(1)); - if (!response.IsSuccessStatusCode) + // Save the current address index to restore after probing (SendWithRetries may rotate) + var initialAddressIndex = _addressProvider.CurrentIndex; + try { - if (response.StatusCode == HttpStatusCode.NotFound) + using var response = SendWithRetries(default, _settingRequestFactory, TimeSpan.FromSeconds(1)); + if (!response.IsSuccessStatusCode) { protocolVersion = ProtocolVersion.V1; } - else - { - _client.Dispose(); - // Throw exception. - response.EnsureSuccessStatusCode(); - } - } - if (protocolVersion == ProtocolVersion.Auto) - { - try + if (protocolVersion == ProtocolVersion.Auto) { - var json = response.Content.ReadFromJsonAsync().Result!; + var json = response.Content.ReadFromJsonAsync().Result!; var versions = json.Config?.LineProtoSupportVersions!; protocolVersion = (ProtocolVersion)versions.Where(v => v <= (int)ProtocolVersion.V3).Max(); } - catch - { - protocolVersion = ProtocolVersion.V1; - } } - - if (protocolVersion == ProtocolVersion.Auto) + catch { protocolVersion = ProtocolVersion.V1; } + finally + { + // Restore the address index to avoid probe rotating the address + _addressProvider.CurrentIndex = initialAddressIndex; + // Update the client reference to match the restored address + _client = GetClientForCurrentAddress(); + } } Buffer = Buffers.Buffer.Create( @@ -212,19 +220,124 @@ private void Build() } /// - /// Creates an HTTP GET request to the /settings endpoint for querying server capabilities. + /// Creates a new HttpClient for the specified address with proper configuration. /// - /// A new configured for the /settings endpoint. + /// The address to create a client for. + /// A configured HttpClient for the given address. + private HttpClient CreateClientForAddress(string address) + { + // Determine the port to use + var port = AddressProvider.ParsePort(address); + if (port <= 0) + { + // Use protocol default if no port specified + port = Options.protocol switch + { + ProtocolType.http or ProtocolType.https => 9000, + ProtocolType.tcp or ProtocolType.tcps => 9009, + _ => 9000, + }; + } + + var host = address.Contains("//") + ? AddressProvider.ParseHost(address).Split("//")[1] + : AddressProvider.ParseHost(address); + + // Get or create a handler for this specific address + if (!_handlerCache.TryGetValue(address, out var handler)) + { + handler = CreateHandler(host); + _handlerCache[address] = handler; + } + + var client = new HttpClient(handler); + + var uri = new UriBuilder(Options.protocol.ToString(), host, port); + client.BaseAddress = uri.Uri; + client.Timeout = Timeout.InfiniteTimeSpan; + + // Apply authentication headers + if (!string.IsNullOrEmpty(Options.username) && !string.IsNullOrEmpty(Options.password)) + { + client.DefaultRequestHeaders.Authorization + = new AuthenticationHeaderValue("Basic", + Convert.ToBase64String( + Encoding.ASCII.GetBytes( + $"{Options.username}:{Options.password}"))); + } + else if (!string.IsNullOrEmpty(Options.token)) + { + client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", Options.token); + } + + return client; + } + + /// + /// Gets or creates an HttpClient for the current address, caching it to avoid recreation on subsequent rotations. + /// + private HttpClient GetClientForCurrentAddress() + { + var address = _addressProvider.CurrentAddress; + + if (!_clientCache.TryGetValue(address, out var client)) + { + // Create and cache a new client for this address + client = CreateClientForAddress(address); + _clientCache[address] = client; + } + + _client = client; + return client; + } + + /// + /// Cleans up all cached HttpClient and SocketsHttpHandler instances except the ones for the current address. + /// Called when a successful response is received to avoid holding unnecessary resources. + /// + private void CleanupUnusedClients() + { + if (!_addressProvider.HasMultipleAddresses) + { + return; + } + + var currentAddress = _addressProvider.CurrentAddress; + var addressesToRemove = _clientCache.Keys + .Where(address => address != currentAddress) + .ToList(); + + foreach (var address in addressesToRemove) + { + if (_clientCache.TryGetValue(address, out var client)) + { + client.Dispose(); + _clientCache.Remove(address); + } + + if (_handlerCache.TryGetValue(address, out var handler)) + { + handler.Dispose(); + _handlerCache.Remove(address); + } + } + } + + /// + /// Creates an HTTP GET request to the /settings endpoint for querying server capabilities. + /// + /// A new configured for the /settings endpoint. private static HttpRequestMessage GenerateSettingsRequest() { return new HttpRequestMessage(HttpMethod.Get, "/settings"); } /// - /// Creates a new cancellation token source linked to the provided token and configured with the calculated request timeout. + /// Creates a new cancellation token source linked to the provided token and configured with the calculated request + /// timeout. /// /// Optional cancellation token to link. - /// A configured with the request timeout. + /// A configured with the request timeout. private CancellationTokenSource GenerateRequestCts(CancellationToken ct = default) { var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); @@ -233,9 +346,12 @@ private CancellationTokenSource GenerateRequestCts(CancellationToken ct = defaul } /// - /// Create an HTTP POST request targeting "/write" with the sender's buffer as the request body. + /// Create an HTTP POST request targeting "/write" with the sender's buffer as the request body. /// - /// An configured with the buffer as the request body, Content-Type set to "text/plain" with charset "utf-8", and optionally gzip-compressed. + /// + /// An configured with the buffer as the request body, Content-Type set to + /// "text/plain" with charset "utf-8", and optionally gzip-compressed. + /// private HttpRequestMessage GenerateRequest() { var request = new HttpRequestMessage(HttpMethod.Post, "/write") @@ -278,13 +394,13 @@ public override ISender Transaction(ReadOnlySpan tableName) if (WithinTransaction) { throw new IngressError(ErrorCode.InvalidApiCall, - "Cannot start another transaction - only one allowed at a time."); + "Cannot start another transaction - only one allowed at a time."); } if (Length > 0) { throw new IngressError(ErrorCode.InvalidApiCall, - "Buffer must be clear before you can start a transaction."); + "Buffer must be clear before you can start a transaction."); } Buffer.Transaction(tableName); @@ -343,14 +459,19 @@ public override void Rollback() } /// - /// Sends the current buffer synchronously to the server, applying configured retries and handling server-side errors. + /// Sends the current buffer synchronously to the server, applying configured retries and handling server-side errors. /// /// - /// Validates that a pending transaction is being committed before sending. If the buffer is empty this method returns immediately. - /// On success updates from the server response date; on failure sets to now. The buffer is always cleared after the operation. + /// Validates that a pending transaction is being committed before sending. If the buffer is empty this method returns + /// immediately. + /// On success updates from the server response date; on failure sets + /// to now. The buffer is always cleared after the operation. /// /// Cancellation token to cancel the send operation. - /// Thrown with if a transaction is open but not committing, or with for server/transport errors. + /// + /// Thrown with if a transaction is open but not + /// committing, or with for server/transport errors. + /// public override void Send(CancellationToken ct = default) { if (WithinTransaction && !CommittingTransaction) @@ -363,7 +484,7 @@ public override void Send(CancellationToken ct = default) return; } - bool success = false; + var success = false; try { using var response = SendWithRetries(ct, _sendRequestFactory, Options.retry_timeout); @@ -372,6 +493,7 @@ public override void Send(CancellationToken ct = default) if (response.IsSuccessStatusCode) { LastFlush = (response.Headers.Date ?? DateTime.UtcNow).UtcDateTime; + CleanupUnusedClients(); success = true; return; } @@ -405,20 +527,25 @@ public override void Send(CancellationToken ct = default) } /// - /// Sends an HTTP request produced by and retries on transient connection or server errors until a successful response is received or elapses. + /// Sends an HTTP request produced by and retries on transient connection or server + /// errors until a successful response is received or elapses. + /// When multiple addresses are configured and a retriable error occurs, rotates to the next address and retries. /// /// Cancellation token used to cancel the overall operation and linked to per-request timeouts. - /// Factory that produces a fresh for each attempt. + /// Factory that produces a fresh for each attempt. /// Maximum duration to keep retrying transient failures; retries are skipped if this is zero. - /// The final returned by the server for a successful request. - /// Thrown with when a connection could not be established within the allowed retries. - /// The caller is responsible for disposing the returned ./// + /// The final returned by the server for a successful request. + /// + /// Thrown with when a connection could not be + /// established within the allowed retries. + /// + /// The caller is responsible for disposing the returned ./// private HttpResponseMessage SendWithRetries(CancellationToken ct, Func requestFactory, - TimeSpan retryTimeout) + TimeSpan retryTimeout) { HttpResponseMessage? response = null; - CancellationTokenSource cts = GenerateRequestCts(ct); - HttpRequestMessage request = requestFactory(); + var cts = GenerateRequestCts(ct); + var request = requestFactory(); try { @@ -434,7 +561,7 @@ private HttpResponseMessage SendWithRetries(CancellationToken ct, Func TimeSpan.Zero) // retry if appropriate - error that's retriable, and retries are enabled { - if (response == null // if it was a cannot correct error + if (response == null // if it was a cannot correct error || (!response.IsSuccessStatusCode // or some other http error && IsRetriableError(response.StatusCode))) { @@ -444,9 +571,9 @@ private HttpResponseMessage SendWithRetries(CancellationToken ct, Func - /// Reads and deserializes a JSON error response from the HTTP response, then throws an with the error details. + /// Reads and deserializes a JSON error response from the HTTP response, then throws an + /// with the error details. /// /// The HTTP response containing a JSON error body. - /// Always thrown with ; the message combines the response reason phrase with the deserialized JSON error or raw response text. + /// + /// Always thrown with ; the message combines the + /// response reason phrase with the deserialized JSON error or raw response text. + /// private void HandleErrorJson(HttpResponseMessage response) { using var respStream = response.Content.ReadAsStream(); @@ -511,10 +650,14 @@ private void HandleErrorJson(HttpResponseMessage response) } /// - /// Read an error payload from the HTTP response (JSON if possible, otherwise raw text) and throw an IngressError containing the server reason and the parsed error details. + /// Read an error payload from the HTTP response (JSON if possible, otherwise raw text) and throw an IngressError + /// containing the server reason and the parsed error details. /// /// The HTTP response containing a JSON or plain-text error body. - /// Always thrown with ; the message contains response.ReasonPhrase followed by the deserialized JSON error or the raw response body. + /// + /// Always thrown with ; the message contains + /// response.ReasonPhrase followed by the deserialized JSON error or the raw response body. + /// private async Task HandleErrorJsonAsync(HttpResponseMessage response) { await using var respStream = await response.Content.ReadAsStreamAsync(); @@ -526,7 +669,7 @@ private async Task HandleErrorJsonAsync(HttpResponseMessage response) catch (JsonException) { using var strReader = new StreamReader(respStream); - var errorStr = await strReader.ReadToEndAsync(); + var errorStr = await strReader.ReadToEndAsync(); throw new IngressError(ErrorCode.ServerFlushError, $"{response.ReasonPhrase}. {errorStr}"); } } @@ -544,14 +687,14 @@ public override async Task SendAsync(CancellationToken ct = default) return; } - HttpRequestMessage? request = null; - CancellationTokenSource? cts = null; - HttpResponseMessage? response = null; + HttpRequestMessage? request = null; + CancellationTokenSource? cts = null; + HttpResponseMessage? response = null; try { request = GenerateRequest(); - cts = GenerateRequestCts(ct); + cts = GenerateRequestCts(ct); try { @@ -565,7 +708,7 @@ public override async Task SendAsync(CancellationToken ct = default) // retry if appropriate - error that's retriable, and retries are enabled if (Options.retry_timeout > TimeSpan.Zero) { - if (response == null // if it was a cannot correct error + if (response == null // if it was a cannot correct error || (!response.IsSuccessStatusCode // or some other http error && IsRetriableError(response.StatusCode))) { @@ -575,9 +718,9 @@ public override async Task SendAsync(CancellationToken ct = default) while (retryTimer.Elapsed < Options.retry_timeout // whilst we can still retry && ( - response == null || // either we can't connect - (!response.IsSuccessStatusCode && // or we have another http error - IsRetriableError(response.StatusCode))) + response == null || // either we can't connect + (!response.IsSuccessStatusCode && // or we have another http error + IsRetriableError(response.StatusCode))) ) { retryInterval = TimeSpan.FromMilliseconds(Math.Min(retryInterval.TotalMilliseconds * 2, 1000)); @@ -588,16 +731,24 @@ public override async Task SendAsync(CancellationToken ct = default) cts.Dispose(); cts = null; + // Rotate to next address if multiple are available + if (_addressProvider.HasMultipleAddresses) + { + _addressProvider.RotateToNextAddress(); + } + request = GenerateRequest(); - cts = GenerateRequestCts(ct); + cts = GenerateRequestCts(ct); var jitter = TimeSpan.FromMilliseconds(Random.Shared.Next(0, 10) - 10 / 2.0); await Task.Delay(retryInterval + jitter, cts.Token); try { - response = await _client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, - cts.Token); + // Get the client for the current address (may have rotated) + var client = GetClientForCurrentAddress(); + response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, + cts.Token); } catch (HttpRequestException) { @@ -611,12 +762,13 @@ public override async Task SendAsync(CancellationToken ct = default) if (response == null) { throw new IngressError(ErrorCode.ServerFlushError, - $"Cannot connect to `{Options.Host}:{Options.Port}`"); + $"Cannot connect to `{_addressProvider.CurrentHost}:{_addressProvider.CurrentPort}`"); } // return if ok if (response.IsSuccessStatusCode) { + CleanupUnusedClients(); return; } @@ -649,15 +801,20 @@ public override async Task SendAsync(CancellationToken ct = default) } /// - /// Determines whether the specified HTTP status code represents a transient error that should be retried. + /// Determines whether the specified HTTP status code represents a transient error that should be retried. /// /// The HTTP status code to check. - /// true if the error is transient and retriable (e.g., 500, 503, 504, 509, 523, 524, 529, 599); otherwise, false. + /// + /// true if the error is transient and retriable (e.g., 404, 421, 500, 503, 504, 509, 523, 524, 529, 599); + /// otherwise, false. + /// // ReSharper disable once IdentifierTypo private static bool IsRetriableError(HttpStatusCode code) { switch (code) { + case HttpStatusCode.NotFound: // 404 - Can happen when instance doesn't have write access + case (HttpStatusCode)421: // Misdirected Request - Can indicate wrong server/instance case HttpStatusCode.InternalServerError: case HttpStatusCode.ServiceUnavailable: case HttpStatusCode.GatewayTimeout: @@ -676,8 +833,21 @@ private static bool IsRetriableError(HttpStatusCode code) /// public override void Dispose() { - _client.Dispose(); - _handler.Dispose(); + // Dispose all cached clients + foreach (var client in _clientCache.Values) + { + client.Dispose(); + } + + _clientCache.Clear(); + + // Dispose all cached handlers + foreach (var handler in _handlerCache.Values) + { + handler.Dispose(); + } + + _handlerCache.Clear(); Buffer.Clear(); Buffer.TrimExcessBuffers(); } diff --git a/src/net-questdb-client/Senders/ISender.cs b/src/net-questdb-client/Senders/ISender.cs index d790b18..9e6a8d1 100644 --- a/src/net-questdb-client/Senders/ISender.cs +++ b/src/net-questdb-client/Senders/ISender.cs @@ -273,22 +273,6 @@ public interface ISender : IDisposable /// The same instance to allow fluent chaining. public ISender Column(ReadOnlySpan name, ReadOnlySpan value) where T : struct; - /// - /// Adds a column with the specified string value to the current row. - /// - /// The column name. - /// The column's string value; may be null. - /// The same sender instance for fluent chaining. - public ISender Column(ReadOnlySpan name, string? value) - { - if (value is null) - { - return this; - } - - return Column(name, value.AsSpan()); - } - /// /// Adds a column whose value is a sequence of value-type elements with the given multidimensional shape when both /// and are provided; no action is taken if either is null. @@ -457,4 +441,53 @@ public ISender NullableColumn(ReadOnlySpan name, decimal? value) return this; } + + /// + /// Adds a GUID column with the specified name and value to the current row. + /// + /// The column name. + /// The GUID value to store in the column. + /// The sender instance for fluent call chaining. + public ISender Column(ReadOnlySpan name, Guid value); + + /// + /// Adds a character column with the specified name and value to the current row. + /// + /// The column name. + /// The character value to store in the column. + /// The sender instance for fluent call chaining. + public ISender Column(ReadOnlySpan name, char value); + + /// + /// Adds a nullable GUID column with the specified name when a value is provided; does nothing if the value is null. + /// + /// The column name. + /// The nullable GUID value to add as a column; if null, the column is not added. + /// The sender instance for fluent call chaining. + public ISender NullableColumn(ReadOnlySpan name, Guid? value) + { + if (value != null) + { + return Column(name, value.Value); + } + + return this; + } + + /// + /// Adds a nullable character column with the specified name when a value is provided; does nothing if the value is + /// null. + /// + /// The column name. + /// The nullable character value to add as a column; if null, the column is not added. + /// The sender instance for fluent call chaining. + public ISender NullableColumn(ReadOnlySpan name, char? value) + { + if (value != null) + { + return Column(name, value.Value); + } + + return this; + } } \ No newline at end of file diff --git a/src/net-questdb-client/Utils/AddressProvider.cs b/src/net-questdb-client/Utils/AddressProvider.cs new file mode 100644 index 0000000..d0cc893 --- /dev/null +++ b/src/net-questdb-client/Utils/AddressProvider.cs @@ -0,0 +1,166 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2024 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +namespace QuestDB.Utils; + +/// +/// Manages round-robin rotation through a list of addresses for failover support. +/// +public class AddressProvider +{ + private readonly List _addresses; + private int _currentIndex; + + /// + /// Creates a new AddressProvider with the given list of addresses. + /// + /// List of addresses to rotate through + public AddressProvider(IReadOnlyList addresses) + { + if (addresses.Count == 0) + { + throw new ArgumentException("At least one address must be provided", nameof(addresses)); + } + + _addresses = new List(addresses); + _currentIndex = 0; + } + + /// + /// Gets the current address without changing the index. + /// + public string CurrentAddress => _addresses[_currentIndex]; + + /// + /// Gets the host from the current address. + /// + public string CurrentHost => ParseHost(_addresses[_currentIndex]); + + /// + /// Gets the port from the current address. + /// + public int CurrentPort => ParsePort(_addresses[_currentIndex]); + + /// + /// Gets or sets the current address index. + /// Used internally to save/restore state during operations like /settings probing. + /// + internal int CurrentIndex + { + get => _currentIndex; + set => _currentIndex = value; + } + + /// + /// Gets the number of addresses. + /// + public int AddressCount => _addresses.Count; + + /// + /// Checks if there are multiple addresses available. + /// + public bool HasMultipleAddresses => _addresses.Count > 1; + + /// + /// Rotates to the next address in round-robin fashion. + /// + /// The next address + public void RotateToNextAddress() + { + _currentIndex = (_currentIndex + 1) % _addresses.Count; + } + + /// + /// Parses the host from an address string. + /// Supports both regular (host:port) and IPv6 ([ipv6]:port) formats. + /// For IPv6 addresses, returns the complete bracketed form including '[' and ']'. + /// + public static string ParseHost(string address) + { + if (string.IsNullOrEmpty(address)) + return address; + + // Handle IPv6 addresses in bracket notation: [ipv6]:port + if (address.StartsWith("[")) + { + var closingBracketIndex = address.IndexOf(']'); + if (closingBracketIndex > 0) + { + // Return the entire bracketed section as the host + return address.Substring(0, closingBracketIndex + 1); + } + } + + // For non-bracketed addresses, use the last colon to split host and port + var colonIndex = address.LastIndexOf(':'); + if (colonIndex > 0) + { + return address.Substring(0, colonIndex); + } + + return address; + } + + /// + /// Parses the port from an address string. + /// Supports both regular (host:port) and IPv6 ([ipv6]:port) formats. + /// Returns -1 if no port is specified. + /// + public static int ParsePort(string address) + { + if (string.IsNullOrEmpty(address)) + return -1; + + // Handle IPv6 addresses in bracket notation: [ipv6]:port + if (address.StartsWith("[")) + { + var closingBracketIndex = address.IndexOf(']'); + if (closingBracketIndex > 0 && closingBracketIndex < address.Length - 1) + { + // Check if there's a colon after the closing bracket + if (address[closingBracketIndex + 1] == ':') + { + var portString = address.Substring(closingBracketIndex + 2); + if (int.TryParse(portString, out var port)) + { + return port; + } + } + } + return -1; + } + + // For non-bracketed addresses, use the last colon to split host and port + var colonIndex = address.LastIndexOf(':'); + if (colonIndex >= 0 && colonIndex < address.Length - 1) + { + if (int.TryParse(address.Substring(colonIndex + 1), out var port)) + { + return port; + } + } + + return -1; + } +} diff --git a/src/net-questdb-client/Utils/SenderOptions.cs b/src/net-questdb-client/Utils/SenderOptions.cs index e8c6b87..af11799 100644 --- a/src/net-questdb-client/Utils/SenderOptions.cs +++ b/src/net-questdb-client/Utils/SenderOptions.cs @@ -57,6 +57,7 @@ public record SenderOptions }; private string _addr = "localhost:9000"; + private List _addresses = new(); private TimeSpan _authTimeout = TimeSpan.FromMilliseconds(15000); private AutoFlushType _autoFlush = AutoFlushType.on; private int _autoFlushBytes = int.MaxValue; @@ -90,6 +91,7 @@ public record SenderOptions /// public SenderOptions() { + ParseAddresses(); } /// @@ -102,6 +104,7 @@ public SenderOptions(string confStr) ParseEnumWithDefault(nameof(protocol), "http", out _protocol); ParseEnumWithDefault(nameof(protocol_version), "auto", out _protocol_version); ParseStringWithDefault(nameof(addr), "localhost:9000", out _addr!); + ParseAddresses(); ParseEnumWithDefault(nameof(auto_flush), "on", out _autoFlush); ParseIntThatMayBeOff(nameof(auto_flush_rows), IsHttp() ? "75000" : "600", out _autoFlushRows); ParseIntThatMayBeOff(nameof(auto_flush_bytes), int.MaxValue.ToString(), out _autoFlushBytes); @@ -154,6 +157,7 @@ public ProtocolVersion protocol_version /// /// /// Used to populate the and fields. + /// When multiple addresses are configured, this returns the first one. /// public string addr { @@ -161,6 +165,22 @@ public string addr set => _addr = value; } + /// + /// List of all configured addresses for failover. + /// + /// + /// Contains all addresses specified via multiple `addr` entries in the configuration string. + /// The list is never empty; it contains at least the primary address. + /// + [JsonIgnore] + public IReadOnlyList addresses => _addresses.AsReadOnly(); + + /// + /// Gets the number of configured addresses. + /// + [JsonIgnore] + public int AddressCount => _addresses.Count; + /// /// Enables or disables automatic flushing of rows. /// Defaults to . @@ -570,10 +590,29 @@ private void ReadConfigStringIntoBuilder(string confStr) } var splits = confStr.Split("::"); + var paramString = splits[1]; + + // Parse addresses manually before using DbConnectionStringBuilder + // because DbConnectionStringBuilder only keeps the last value for duplicate keys + _addresses.Clear(); + foreach (var param in paramString.Split(';')) + { + if (string.IsNullOrWhiteSpace(param)) continue; + + var kvp = param.Split('='); + if (kvp.Length == 2 && kvp[0].Trim() == "addr") + { + var addrValue = kvp[1].Trim(); + if (!string.IsNullOrEmpty(addrValue)) + { + _addresses.Add(addrValue); + } + } + } _connectionStringBuilder = new DbConnectionStringBuilder { - ConnectionString = splits[1], + ConnectionString = paramString, }; VerifyCorrectKeysInConfigString(); @@ -665,6 +704,15 @@ private void VerifyCorrectKeysInConfigString() } } + private void ParseAddresses() + { + // If no addresses were parsed from config string, use the primary addr + if (_addresses.Count == 0) + { + _addresses.Add(_addr); + } + } + /// /// Construct a new from the current options. ///