Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions client_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package quickwit

import (
"net/http"
"testing"
)

// The default client's transport must size its idle-connection pool to the
// worker count, so raising SetConcurrent does not get throttled by the stdlib
// default of MaxIdleConnsPerHost = 2.
func TestHTTPClient_DefaultTransportSizedToConcurrent(t *testing.T) {
const concurrent = 16

c := NewClient("http://example/api/v1/test")
c.SetConcurrent(concurrent)

got := c.httpClient()
tr, ok := got.Transport.(*http.Transport)
if !ok {
t.Fatalf("transport = %T, want *http.Transport", got.Transport)
}
if tr.MaxIdleConnsPerHost != concurrent {
t.Errorf("MaxIdleConnsPerHost = %d, want %d", tr.MaxIdleConnsPerHost, concurrent)
}
if tr.MaxIdleConns < concurrent {
t.Errorf("MaxIdleConns = %d, want >= %d", tr.MaxIdleConns, concurrent)
}
}

// The default client is built once and reused, so workers share one connection
// pool rather than each creating its own.
func TestHTTPClient_DefaultClientCached(t *testing.T) {
c := NewClient("http://example/api/v1/test")

if c.httpClient() != c.httpClient() {
t.Error("httpClient returned different default clients across calls")
}
}

// A user-supplied client is used verbatim, untouched by the default tuning.
func TestHTTPClient_UserClientPassthrough(t *testing.T) {
custom := &http.Client{}

c := NewClient("http://example/api/v1/test")
c.SetHTTPClient(custom)

if c.httpClient() != custom {
t.Error("httpClient did not return the user-supplied client")
}
}
31 changes: 28 additions & 3 deletions quickwit.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Client struct {
discard bool
concurrent int
ingestBuffer chan any
defaultClient *http.Client
onceDefaultClient sync.Once
onceSetup sync.Once
onceClose sync.Once
stopWg sync.WaitGroup
Expand Down Expand Up @@ -94,10 +96,33 @@ func (c *Client) OnDiscard(f OnDiscardFunc) {
}

func (c *Client) httpClient() *http.Client {
if c.client == nil {
return http.DefaultClient
if c.client != nil {
return c.client
}
return c.client
// When no client is supplied, fall back to a tuned default instead of
// http.DefaultClient. Its transport's MaxIdleConnsPerHost defaults to 2,
// which would close (and force a fresh handshake on) every connection
// beyond the first two when SetConcurrent raises the worker count. Sizing
// the idle pool to the worker count lets each worker keep its connection
// hot for reuse. Built once, lazily, so the pool is shared across workers.
c.onceDefaultClient.Do(func() {
c.defaultClient = c.newDefaultClient()
})
return c.defaultClient
}

func (c *Client) newDefaultClient() *http.Client {
concurrent := c.getConcurrent()

t := http.DefaultTransport.(*http.Transport).Clone()
t.MaxIdleConnsPerHost = concurrent
if t.MaxIdleConns < concurrent {
t.MaxIdleConns = concurrent
}

// No client-level timeout: the per-request context deadline from
// getIngestTimeout already bounds each flush, matching prior behavior.
return &http.Client{Transport: t}
}

func (c *Client) getMaxDelay() time.Duration {
Expand Down
Loading