vortex is a zero-dependency Go 1.23 library that brings lazy evaluation, structured concurrency, and fault tolerance to data pipeline development.
Built on Go 1.23's iter.Seq and iter.Seq2 interfaces, vortex treats every data source, database cursors, CSV streams, JSONL files, HTTP response as a unified lazy sequence. Transformations compose without intermediate allocations. Pipelines cancel cleanly through context propagation. Workers coordinate without leaking goroutines.
The result is pipelines that scale from a single row to a billion rows with flat memory, predictable latency, and production-grade error handling all without leaving idiomatic Go.
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────────┐
│ Source │ ──► │ Transformation │ ──► │ Transformation │ ──► │ Terminal │
│ (iter.Seq) │ │ (Filter/Map) │ │ (Take/Chunk) │ │ (Drain/Range) │
└─────────────────┘ └─────────────────┘ └─────────────────┘ └──────────────────┘
▲ ▲ ▲ ▲
│ │ │ │
slices.Values iterx.FilterSeq iterx.TakeSeq iterx.DrainSeq
sources.CSVRows iterx.Filter iterx.Take iterx.Drain
sources.DBRows iterx.Map iterx.Chunk iterx.ForEach
slices.Values parallel.BatchMap parallel.ParallelMap for v := range
go get github.com/MostafaMagdSalama/vortex@latestGo 1.23 or later.
| Package | What it does |
|---|---|
vortex/iterx |
Lazy sequences — Filter, Map, Take, FlatMap |
vortex/parallel |
Parallel processing for iter.Seq and iter.Seq2 |
vortex/resilience |
Fault tolerance — Retry, Backoff, CircuitBreaker |
vortex/sources |
Data sources — CSVRows, DBRows, Lines, FileLines |
iterx now includes paired APIs: use *Seq helpers like FilterSeq and MapSeq with plain iter.Seq[T], and use the original names like Filter and Map with iter.Seq2[T, error].
parallel follows the same pattern: use ParallelMapSeq, BatchMapSeq, and OrderedParallelMapSeq with plain iter.Seq[T], and use ParallelMap, BatchMap, and OrderedParallelMap with iter.Seq2[T, error].
| Approach | Peak memory | Rows read | Notes |
|---|---|---|---|
| Eager (load all) | 287 MB | 1,000,000 | loads entire file into RAM |
| Lazy (vortex) | 3 MB | 1,000,000 | streams one row at a time |
95x less memory with lazy processing.
View detailed benchmark scaling
╔══════════════════════════════════════════╗
║ memory scaling data ║
╚══════════════════════════════════════════╝
file size eager peak vortex peak
────────── ────────── ───────────
1M rows 287 MB 3 MB
10M rows ~2.8 GB 3 MB
100M rows out of memory 3 MB
| Approach | Peak memory | Rows read | Notes |
|---|---|---|---|
| Eager (load all) | 247 MB | 1,000,000 | loads all rows before processing |
| Lazy (vortex) | ~397 KB | 10 | stops the moment it has what it needs |
636x less memory with lazy processing.
View detailed benchmark output
╔══════════════════════════════════════════╗
║ with vortex (lazy) ║
╚══════════════════════════════════════════╝
memory after creating source: 393 KB
memory after defining filter: 393 KB
memory after defining map: 393 KB
memory after defining take: 393 KB
peak memory: 397 KB
rows read: 10 out of 1,000,000
╔══════════════════════════════════════════╗
║ without vortex (eager) ║
╚══════════════════════════════════════════╝
memory after loading all rows: 134 MB
memory after filtering: 204 MB
memory after extracting names: 247 MB
rows loaded: 1,000,000
The lazy approach stops reading from the database the moment it has enough results — it never touches the remaining 999,990 rows.
| Approach | Peak memory | Time | Notes |
|---|---|---|---|
| Eager (load all) | 194 MB | ~909 ms | decodes the entire file into memory before processing |
| Lazy (vortex) | 1 MB | ~24 ms | streams one line at a time |
194x less memory and ~37x faster with lazy processing.
View detailed benchmark output
╔══════════════════════════════════════════╗
║ with vortex (lazy) ║
╚══════════════════════════════════════════╝
memory before: 3 MB
memory after open: 3 MB
memory after JSONLines: 3 MB
memory after unwrap: 3 MB
memory after Filter: 3 MB
memory after Take: 3 MB
memory before range: 3 MB
memory after range: 1 MB
result:
errors found: 100
time: 24.7103ms
peak memory: 1 MB
╔══════════════════════════════════════════╗
║ without vortex (eager) ║
╚══════════════════════════════════════════╝
memory before: 274 KB
memory after ReadFile: 57 MB
memory after Split: 72 MB
memory after decode all: 168 MB
memory after filter: 194 MB
memory after take: 194 MB
result:
errors found: 100
total lines: 1000000
time: 909.2772ms
peak memory: 194 MB
| Approach | Peak memory | Time | Notes |
|---|---|---|---|
| Eager (load all) | 440 MB | ~12.6s | loads all rows into RAM, then filters, then writes |
| Lazy (vortex) | 77 MB | ~3.7s | streams one row at a time directly into the Excel file |
5.7x less memory and 3.4x faster with lazy processing.
View detailed benchmark output
╔══════════════════════════════════════════╗
║ with vortex (lazy) ║
╚══════════════════════════════════════════╝
rows written : 996667
time : 3.7112285s
mem before : 0.73 MB
mem after : 77.65 MB
mem delta : +76.92 MB
╔══════════════════════════════════════════╗
║ without vortex (eager) ║
╚══════════════════════════════════════════╝
mem after loading all rows : 213.58 MB
mem after filtering : 315.11 MB
rows written : 996667
time : 12.595819s
mem before : 53.61 MB
mem after : 493.71 MB
mem delta : +440.10 MB
The eager approach holds three copies of the data in RAM simultaneously — the original slice, the filtered slice, and the Excel in-memory XML tree. The lazy approach holds one row at a time regardless of table size.
Vortex sources return iter.Seq2[T, error]. That includes sources.CSVRows, sources.DBRows, sources.Lines, and sources.JSONLines.
When your pipeline starts from vortex/sources, use the iter.Seq2-aware helpers like iterx.Filter, iterx.Map, iterx.Take, iterx.FlatMap, iterx.Validate, iterx.Drain, and iterx.ForEach.
This is how Vortex preserves error handling across the pipeline: read failures, decode failures, scan failures, and cancellation errors stay attached to the stream instead of being lost at the source boundary.
Vortex also keeps iterx.FilterSeq, iterx.MapSeq, iterx.TakeSeq, iterx.DrainSeq, and the other *Seq helpers for plain iter.Seq[T] values such as slices.Values(...) or custom generators that do not yield errors.
The split exists so iter.Seq2 pipelines stay safe and explicit, while ordinary iter.Seq pipelines remain simple and ergonomic.
Vortex provides a unified error handling architecture to ensure safety and transparency across pipelines. All library packages bubble up errors rather than failing silently.
All underlying errors (like network failures or database disconnects) are wrapped in vortex.Error. You can use errors.As to retrieve the original error and the operation that failed:
import (
"errors"
"github.com/MostafaMagdSalama/vortex"
)
// inside your pipeline execution
err := iterx.Drain(ctx, mySeq, processor)
var vErr *vortex.Error
if errors.As(err, &vErr) {
fmt.Printf("Failed operation: %s\n", vErr.Op)
fmt.Printf("Underlying root cause: %v\n", vErr.Err)
}Vortex exposes common failure states as sentinel errors in the root package. You can check for them using errors.Is:
vortex.ErrCancelled: Returned when the pipeline's context is cancelled.vortex.ErrValidation: Returned when validation conditions fail (e.g.iterx.Validate).vortex.ErrCircuitOpen: Returned byresilience.CircuitBreakerwhen the service is rejecting traffic.
if errors.Is(err, vortex.ErrCircuitOpen) {
// Serve from cache instead
}import (
"slices"
"github.com/MostafaMagdSalama/vortex/iterx"
)
numbers := slices.Values([]int{1, 2, 3, 4, 5})
for v := range iterx.FilterSeq(context.Background(), numbers, func(n int) bool { return n > 2 }) {
fmt.Println(v) // 3, 4, 5
}import (
"slices"
"github.com/MostafaMagdSalama/vortex/parallel"
)
numbers := slices.Values([]int{1, 2, 3, 4, 5})
for v := range parallel.ParallelMapSeq(context.Background(), numbers, func(n int) int {
return n * 2
}, 4) {
fmt.Println(v) // 2, 4, 6, 8, 10 (unordered)
}for v := range parallel.OrderedParallelMapSeq(context.Background(), numbers, func(n int) int {
return n * 2
}, 4) {
fmt.Println(v) // 2, 4, 6, 8, 10 (strictly ordered)
}for v := range parallel.BatchMapSeq(context.Background(), numbers, func(batch []int) []int {
results := make([]int, len(batch))
for i, v := range batch {
results[i] = v * 2
}
return results
}, 3) {
fmt.Println(v)
}To see real-world, runnable examples for both the *Seq helpers and the error-aware iter.Seq2 helpers, visit the pkg.go.dev documentation or explore iterx/example_test.go in the repository.
sources.CSVRows accepts any io.Reader and returns a lazy sequence of rows.
The source is always streamed - never fully loaded into memory.
file, err := os.Open("users.csv")
if err != nil {
log.Fatal(err)
}
defer file.Close()
for row, err := range sources.CSVRows(ctx, file) {
if err != nil {
log.Fatal(err)
}
fmt.Println(row)
}func uploadHandler(w http.ResponseWriter, r *http.Request) {
file, _, err := r.FormFile("csv")
if err != nil {
http.Error(w, err.Error(), 400)
return
}
defer file.Close()
for row, err := range sources.CSVRows(r.Context(), file) {
if err != nil {
http.Error(w, err.Error(), 500)
return
}
fmt.Println(row)
}
}req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://s3.amazonaws.com/bucket/file.csv", nil)
if err != nil {
log.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
for row, err := range sources.CSVRows(ctx, resp.Body) {
if err != nil {
log.Fatal(err)
}
fmt.Println(row)
}file, _ := os.Open("users.csv")
defer file.Close()
// skip header row, filter active users, take first 10 names
rows := sources.CSVRows(ctx, file)
filtered := iterx.Filter(ctx, rows, func(row []string) bool { ... })
taken := iterx.Take(ctx, filtered, 10)
names := iterx.Map(ctx, taken, func(row []string) string { return row[1] })
for name, err := range names {
if err != nil { log.Fatal(err) }
fmt.Println(name)
}All three sources satisfy io.Reader. CSVRows reads one record at a
time regardless of whether the source is a file, an HTTP upload, or a network
stream.
multipart upload -> io.Reader -> CSVRows -> one row at a time
presigned URL -> io.Reader -> CSVRows -> one row at a time
local file -> io.Reader -> CSVRows -> one row at a time
import (
"github.com/MostafaMagdSalama/vortex/iterx"
"github.com/MostafaMagdSalama/vortex/sources"
)
// reads one row at a time — stops as soon as Take is satisfied
rows := sources.DBRows(ctx, db, "SELECT id, name, email, status FROM users", scanUser)
filtered := iterx.Filter(ctx, rows, func(u User) bool { return u.Status == "active" })
taken := iterx.Take(ctx, filtered, 5)
names := iterx.Map(ctx, taken, func(u User) string { return u.Name })
for name, err := range names {
if err != nil { log.Fatal(err) }
fmt.Println(name)
}import (
"context"
"github.com/MostafaMagdSalama/vortex/resilience"
)
err := resilience.Retry(context.Background(), resilience.DefaultRetry, func(attempt int) error {
return callSomeAPI()
})cb := resilience.NewCircuitBreaker(5, 10*time.Second)
err := cb.Execute(ctx, func(ctx context.Context) error {
return callSomeAPI(ctx)
})
if errors.Is(err, resilience.ErrCircuitOpen) {
// service is down, circuit is open
}cb := resilience.NewCircuitBreaker(5, 10*time.Second)
err := resilience.Retry(ctx, resilience.DefaultRetry, func(attempt int) error {
return cb.Execute(ctx, func(ctx context.Context) error {
return callSomeAPI(ctx)
})
})MIT