diff --git a/client_internal_test.go b/client_internal_test.go new file mode 100644 index 0000000..68221ad --- /dev/null +++ b/client_internal_test.go @@ -0,0 +1,50 @@ +package quickwit + +import ( + "net/http" + "testing" +) + +// The default client's transport must size its idle-connection pool to the +// worker count, so raising SetConcurrent does not get throttled by the stdlib +// default of MaxIdleConnsPerHost = 2. +func TestHTTPClient_DefaultTransportSizedToConcurrent(t *testing.T) { + const concurrent = 16 + + c := NewClient("http://example/api/v1/test") + c.SetConcurrent(concurrent) + + got := c.httpClient() + tr, ok := got.Transport.(*http.Transport) + if !ok { + t.Fatalf("transport = %T, want *http.Transport", got.Transport) + } + if tr.MaxIdleConnsPerHost != concurrent { + t.Errorf("MaxIdleConnsPerHost = %d, want %d", tr.MaxIdleConnsPerHost, concurrent) + } + if tr.MaxIdleConns < concurrent { + t.Errorf("MaxIdleConns = %d, want >= %d", tr.MaxIdleConns, concurrent) + } +} + +// The default client is built once and reused, so workers share one connection +// pool rather than each creating its own. +func TestHTTPClient_DefaultClientCached(t *testing.T) { + c := NewClient("http://example/api/v1/test") + + if c.httpClient() != c.httpClient() { + t.Error("httpClient returned different default clients across calls") + } +} + +// A user-supplied client is used verbatim, untouched by the default tuning. +func TestHTTPClient_UserClientPassthrough(t *testing.T) { + custom := &http.Client{} + + c := NewClient("http://example/api/v1/test") + c.SetHTTPClient(custom) + + if c.httpClient() != custom { + t.Error("httpClient did not return the user-supplied client") + } +} diff --git a/quickwit.go b/quickwit.go index 1b1957c..acac19d 100644 --- a/quickwit.go +++ b/quickwit.go @@ -38,6 +38,8 @@ type Client struct { discard bool concurrent int ingestBuffer chan any + defaultClient *http.Client + onceDefaultClient sync.Once onceSetup sync.Once onceClose sync.Once stopWg sync.WaitGroup @@ -94,10 +96,33 @@ func (c *Client) OnDiscard(f OnDiscardFunc) { } func (c *Client) httpClient() *http.Client { - if c.client == nil { - return http.DefaultClient + if c.client != nil { + return c.client } - return c.client + // When no client is supplied, fall back to a tuned default instead of + // http.DefaultClient. Its transport's MaxIdleConnsPerHost defaults to 2, + // which would close (and force a fresh handshake on) every connection + // beyond the first two when SetConcurrent raises the worker count. Sizing + // the idle pool to the worker count lets each worker keep its connection + // hot for reuse. Built once, lazily, so the pool is shared across workers. + c.onceDefaultClient.Do(func() { + c.defaultClient = c.newDefaultClient() + }) + return c.defaultClient +} + +func (c *Client) newDefaultClient() *http.Client { + concurrent := c.getConcurrent() + + t := http.DefaultTransport.(*http.Transport).Clone() + t.MaxIdleConnsPerHost = concurrent + if t.MaxIdleConns < concurrent { + t.MaxIdleConns = concurrent + } + + // No client-level timeout: the per-request context deadline from + // getIngestTimeout already bounds each flush, matching prior behavior. + return &http.Client{Transport: t} } func (c *Client) getMaxDelay() time.Duration {