diff --git a/CLAUDE.md b/CLAUDE.md index 2b7a1c9..6ea9685 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -26,6 +26,7 @@ Key behaviors: - **Retries**: failed flushes retry with exponential backoff indefinitely during normal operation; `Close()` retries up to 5 times then discards. - **Setup is lazy**: the background goroutines start on the first `Ingest` call via `sync.Once`. - **`Close()`** is idempotent and safe to call before the first `Ingest`. +- **Gzip**: off by default; `SetGzip(true)` compresses each batch and sends `Content-Encoding: gzip` (the endpoint must accept gzip-encoded ingest). ### Search (sync) diff --git a/ingest_test.go b/ingest_test.go index 81b2369..f178cba 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -1,6 +1,7 @@ package quickwit_test import ( + "compress/gzip" "encoding/json" "io" "net/http" @@ -240,6 +241,64 @@ func TestIngest_AppliesAuthHook(t *testing.T) { } } +// Core: with gzip enabled, the request carries Content-Encoding: gzip and the +// decompressed body still delivers every record in order. +func TestIngest_GzipCompressesBody(t *testing.T) { + const numItems = 250 + + var mu sync.Mutex + var received []int + sawGzip := true // require every request to be gzip-encoded + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Content-Encoding") != "gzip" { + mu.Lock() + sawGzip = false + mu.Unlock() + w.WriteHeader(http.StatusBadRequest) + return + } + gr, err := gzip.NewReader(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + body, _ := io.ReadAll(gr) + gr.Close() + + mu.Lock() + received = append(received, parseIndices(body)...) + mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + c := quickwit.NewClient(server.URL + "/api/v1/test") + c.SetConcurrent(1) + c.SetBatchSize(50) + c.SetGzip(true) + + for i := 0; i < numItems; i++ { + c.Ingest(map[string]any{"index": i}) + } + c.Close() + + mu.Lock() + defer mu.Unlock() + + if !sawGzip { + t.Fatal("a request arrived without Content-Encoding: gzip") + } + if len(received) != numItems { + t.Fatalf("received %d items, want %d", len(received), numItems) + } + for i, idx := range received { + if idx != i { + t.Errorf("position %d: got index %d, want %d", i, idx, i) + } + } +} + // Core: with multiple concurrent workers, every record is delivered exactly once // (no loss, no duplication). func TestIngest_ConcurrentWorkersDeliverEachItemOnce(t *testing.T) { diff --git a/quickwit.go b/quickwit.go index acac19d..a56155c 100644 --- a/quickwit.go +++ b/quickwit.go @@ -2,6 +2,7 @@ package quickwit import ( "bytes" + "compress/gzip" "context" "encoding/json" "fmt" @@ -46,6 +47,7 @@ type Client struct { closeSignal chan struct{} onDiscard OnDiscardFunc autoReduceBatchSize bool + gzipEnabled bool } func NewClient(endpoint string) *Client { @@ -91,6 +93,13 @@ func (c *Client) SetAutoReduceBatchSize(autoReduceBatchSize bool) { c.autoReduceBatchSize = autoReduceBatchSize } +// SetGzip enables gzip compression of the ingest request body. When enabled, +// each batch is compressed and sent with a Content-Encoding: gzip header. +// The Quickwit endpoint must accept gzip-encoded ingest requests. +func (c *Client) SetGzip(enabled bool) { + c.gzipEnabled = enabled +} + func (c *Client) OnDiscard(f OnDiscardFunc) { c.onDiscard = f } @@ -212,6 +221,15 @@ func (c *Client) loop() { var buf bytes.Buffer jsonEnc := json.NewEncoder(&buf) + // gzip state is per-worker and reused across flushes via Reset to avoid + // reallocating the compressor on every batch. + useGzip := c.gzipEnabled + var gzBuf bytes.Buffer + var gzw *gzip.Writer + if useGzip { + gzw = gzip.NewWriter(&gzBuf) + } + batchSize := c.getBatchSize() buffer := make([]any, 0, batchSize) var resetBatchSizeAfter time.Time @@ -245,16 +263,36 @@ func (c *Client) loop() { return true } + // body holds the NDJSON payload, gzip-compressed when enabled. Writing + // to a bytes.Buffer cannot fail, so gzw errors are not expected here. + body := buf.Bytes() + if useGzip { + gzBuf.Reset() + gzw.Reset(&gzBuf) + if _, err := gzw.Write(buf.Bytes()); err != nil { + slog.Error("quickwit: failed to gzip ingest body", "error", err) + return false + } + if err := gzw.Close(); err != nil { + slog.Error("quickwit: failed to finalize gzip ingest body", "error", err) + return false + } + body = gzBuf.Bytes() + } + ctx := context.Background() cancel := context.CancelFunc(func() {}) if t := c.getIngestTimeout(); t != 0 { ctx, cancel = context.WithTimeout(ctx, t) } defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(buf.Bytes())) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) if err != nil { return false } + if useGzip { + req.Header.Set("Content-Encoding", "gzip") + } c.doAuth(req) resp, err := c.httpClient().Do(req)