Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Key behaviors:
- **Retries**: failed flushes retry with exponential backoff indefinitely during normal operation; `Close()` retries up to 5 times then discards.
- **Setup is lazy**: the background goroutines start on the first `Ingest` call via `sync.Once`.
- **`Close()`** is idempotent and safe to call before the first `Ingest`.
- **Gzip**: off by default; `SetGzip(true)` compresses each batch and sends `Content-Encoding: gzip` (the endpoint must accept gzip-encoded ingest).

### Search (sync)

Expand Down
59 changes: 59 additions & 0 deletions ingest_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package quickwit_test

import (
"compress/gzip"
"encoding/json"
"io"
"net/http"
Expand Down Expand Up @@ -240,6 +241,64 @@ func TestIngest_AppliesAuthHook(t *testing.T) {
}
}

// Core: with gzip enabled, the request carries Content-Encoding: gzip and the
// decompressed body still delivers every record in order.
func TestIngest_GzipCompressesBody(t *testing.T) {
const numItems = 250

var mu sync.Mutex
var received []int
sawGzip := true // require every request to be gzip-encoded

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Content-Encoding") != "gzip" {
mu.Lock()
sawGzip = false
mu.Unlock()
w.WriteHeader(http.StatusBadRequest)
return
}
gr, err := gzip.NewReader(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
body, _ := io.ReadAll(gr)
gr.Close()

mu.Lock()
received = append(received, parseIndices(body)...)
mu.Unlock()
w.WriteHeader(http.StatusOK)
}))
defer server.Close()

c := quickwit.NewClient(server.URL + "/api/v1/test")
c.SetConcurrent(1)
c.SetBatchSize(50)
c.SetGzip(true)

for i := 0; i < numItems; i++ {
c.Ingest(map[string]any{"index": i})
}
c.Close()

mu.Lock()
defer mu.Unlock()

if !sawGzip {
t.Fatal("a request arrived without Content-Encoding: gzip")
}
if len(received) != numItems {
t.Fatalf("received %d items, want %d", len(received), numItems)
}
for i, idx := range received {
if idx != i {
t.Errorf("position %d: got index %d, want %d", i, idx, i)
}
}
}

// Core: with multiple concurrent workers, every record is delivered exactly once
// (no loss, no duplication).
func TestIngest_ConcurrentWorkersDeliverEachItemOnce(t *testing.T) {
Expand Down
40 changes: 39 additions & 1 deletion quickwit.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package quickwit

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -46,6 +47,7 @@ type Client struct {
closeSignal chan struct{}
onDiscard OnDiscardFunc
autoReduceBatchSize bool
gzipEnabled bool
}

func NewClient(endpoint string) *Client {
Expand Down Expand Up @@ -91,6 +93,13 @@ func (c *Client) SetAutoReduceBatchSize(autoReduceBatchSize bool) {
c.autoReduceBatchSize = autoReduceBatchSize
}

// SetGzip enables gzip compression of the ingest request body. When enabled,
// each batch is compressed and sent with a Content-Encoding: gzip header.
// The Quickwit endpoint must accept gzip-encoded ingest requests.
func (c *Client) SetGzip(enabled bool) {
c.gzipEnabled = enabled
}

func (c *Client) OnDiscard(f OnDiscardFunc) {
c.onDiscard = f
}
Expand Down Expand Up @@ -212,6 +221,15 @@ func (c *Client) loop() {
var buf bytes.Buffer
jsonEnc := json.NewEncoder(&buf)

// gzip state is per-worker and reused across flushes via Reset to avoid
// reallocating the compressor on every batch.
useGzip := c.gzipEnabled
var gzBuf bytes.Buffer
var gzw *gzip.Writer
if useGzip {
gzw = gzip.NewWriter(&gzBuf)
}

batchSize := c.getBatchSize()
buffer := make([]any, 0, batchSize)
var resetBatchSizeAfter time.Time
Expand Down Expand Up @@ -245,16 +263,36 @@ func (c *Client) loop() {
return true
}

// body holds the NDJSON payload, gzip-compressed when enabled. Writing
// to a bytes.Buffer cannot fail, so gzw errors are not expected here.
body := buf.Bytes()
if useGzip {
gzBuf.Reset()
gzw.Reset(&gzBuf)
if _, err := gzw.Write(buf.Bytes()); err != nil {
slog.Error("quickwit: failed to gzip ingest body", "error", err)
return false
}
if err := gzw.Close(); err != nil {
slog.Error("quickwit: failed to finalize gzip ingest body", "error", err)
return false
}
body = gzBuf.Bytes()
}

ctx := context.Background()
cancel := context.CancelFunc(func() {})
if t := c.getIngestTimeout(); t != 0 {
ctx, cancel = context.WithTimeout(ctx, t)
}
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(buf.Bytes()))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
if err != nil {
return false
}
if useGzip {
req.Header.Set("Content-Encoding", "gzip")
}
c.doAuth(req)

resp, err := c.httpClient().Do(req)
Expand Down
Loading