Skip to content

Latest commit

 

History

History
481 lines (361 loc) · 12.3 KB

File metadata and controls

481 lines (361 loc) · 12.3 KB

Backend Streaming API Guide

Overview

As of v0.5.0, the Orbit Backend trait has been refactored to support streaming operations, eliminating memory bottlenecks when working with large files and directories.

Key Changes

write(): Now accepts AsyncRead instead of Bytes (no more OOM on large files) ✅ list(): Now returns Stream<DirEntry> instead of Vec<DirEntry> (constant memory for huge directories) ✅ S3 Multipart: Automatically used for files >5MB (supports up to 5TB) ✅ Download Optimization: Sliding window concurrency (better throughput on variable-latency networks)


Migration Guide

Old API (v0.4.x)

use orbit::backend::{Backend, LocalBackend, WriteOptions};
use bytes::Bytes;

// ❌ Old: Load entire file into memory
let data = tokio::fs::read("large_file.bin").await?;
let bytes = Bytes::from(data); // OOM if file is >RAM

backend.write(path, bytes, WriteOptions::default()).await?;

// ❌ Old: Load all directory entries into Vec
let entries = backend.list(path, options).await?;
for entry in entries { // OOM if millions of entries
    println!("{}", entry.path.display());
}

New API (v0.5.0+)

use orbit::backend::{Backend, LocalBackend, WriteOptions};
use tokio::fs::File;
use tokio::io::AsyncRead;
use futures::StreamExt;

// ✅ New: Stream file directly from disk
let file = File::open("large_file.bin").await?;
let reader: Box<dyn AsyncRead + Unpin + Send> = Box::new(file);
let size = metadata.len();

backend.write(path, reader, Some(size), WriteOptions::default()).await?;

// ✅ New: Stream directory entries
let mut stream = backend.list(path, options).await?;
while let Some(entry) = stream.next().await {
    let entry = entry?;
    println!("{}", entry.path.display());
}

Usage Examples

Example 1: Upload Large File to S3

Before (OOM on files >RAM):

// ❌ This loads entire 10GB file into memory
let data = tokio::fs::read("10GB.iso").await?;
s3_backend.write(path, Bytes::from(data), options).await?; // OOM!

After (Constant Memory):

use tokio::fs::File;

// ✅ Stream 10GB file with ~200MB RAM usage
let file = File::open("10GB.iso").await?;
let metadata = file.metadata().await?;
let reader: Box<dyn AsyncRead + Unpin + Send> = Box::new(file);

// S3Backend automatically uses multipart upload for files >5MB
let bytes_written = s3_backend
    .write(path, reader, Some(metadata.len()), WriteOptions::default())
    .await?;

println!("Uploaded {} bytes using multipart", bytes_written);

Key Points:

  • Files >5MB: Multipart upload (streams in 5MB chunks)
  • Files <5MB: Single PutObject (buffered for efficiency)
  • Maximum file size: 5TB (S3 limit)
  • Memory usage: ~200MB regardless of file size

Example 2: List Large S3 Bucket

Before (OOM on millions of objects):

// ❌ This loads all entries into memory
let entries = s3_backend.list(path, options).await?; // OOM if 10M objects!
for entry in entries {
    process(entry);
}

After (Constant Memory):

use futures::StreamExt;

// ✅ Stream entries lazily - constant memory
let mut stream = s3_backend.list(path, options).await?;

while let Some(entry_result) = stream.next().await {
    let entry = entry_result?;
    process(entry); // Process incrementally
}

Key Points:

  • S3 pages (max 1000 objects) fetched lazily
  • Memory usage: ~10MB regardless of bucket size
  • Can list millions of objects without OOM
  • Early termination possible (e.g., "find first match")

Example 3: Download with Optimized Concurrency

The new sliding window download ensures full pipeline utilization:

use orbit::protocol::s3::S3Client;

let client = S3Client::new(config).await?;

// Download with sliding window concurrency
client.download_file_resumable(
    "large-dataset.tar.gz",
    Path::new("./dataset.tar.gz"),
    0  // resume_offset
).await?;

How It Works:

  1. Old: Queue 4 chunks → Wait for ALL 4 → Queue next 4 (wasted bandwidth if one chunk is slow)
  2. New: Queue 4 chunks → As EACH completes, queue another (pipeline stays full)

Performance Impact:

  • Variable latency networks: 30-50% faster
  • Stable networks: Similar performance
  • Better handling of transient slowdowns

Example 4: Stream Generated Data

You don't need a file - any AsyncRead works:

use tokio::io::AsyncRead;
use std::pin::Pin;
use std::task::{Context, Poll};

// Custom reader that generates data on-the-fly
struct DataGenerator {
    remaining: u64,
}

impl AsyncRead for DataGenerator {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        if self.remaining == 0 {
            return Poll::Ready(Ok(()));
        }

        let chunk_size = buf.remaining().min(self.remaining as usize);
        let data = vec![0x42; chunk_size]; // Generate 'B' bytes
        buf.put_slice(&data);
        self.remaining -= chunk_size as u64;

        Poll::Ready(Ok(()))
    }
}

// Upload generated data without ever storing it on disk
let generator = DataGenerator { remaining: 1_000_000_000 }; // 1GB
let reader: Box<dyn AsyncRead + Unpin + Send> = Box::new(generator);

s3_backend
    .write(path, reader, Some(1_000_000_000), WriteOptions::default())
    .await?;

Example 5: Early Termination with Streaming List

Find first match without listing entire directory:

use futures::StreamExt;

let mut stream = backend.list(path, ListOptions::recursive()).await?;

// Find first .txt file and stop
let first_txt = stream
    .filter_map(|result| async move {
        match result {
            Ok(entry) if entry.path.extension() == Some("txt".as_ref()) => Some(entry),
            _ => None,
        }
    })
    .next()
    .await;

if let Some(entry) = first_txt {
    println!("Found: {}", entry.path.display());
}
// Stream is dropped here - remaining entries never fetched!

Example 6: Concurrent Operations with Streaming

Process multiple large operations concurrently:

use futures::stream::{self, StreamExt};
use tokio::fs::File;

// Upload 10 large files concurrently
let files = vec![
    "file1.bin", "file2.bin", "file3.bin",
    "file4.bin", "file5.bin", "file6.bin",
    "file7.bin", "file8.bin", "file9.bin", "file10.bin",
];

let uploads = stream::iter(files)
    .map(|filename| async move {
        let file = File::open(filename).await?;
        let metadata = file.metadata().await?;
        let reader: Box<dyn AsyncRead + Unpin + Send> = Box::new(file);

        s3_backend.write(
            Path::new(&format!("uploads/{}", filename)),
            reader,
            Some(metadata.len()),
            WriteOptions::default(),
        ).await
    })
    .buffer_unordered(3) // 3 concurrent uploads
    .collect::<Vec<_>>()
    .await;

for result in uploads {
    match result {
        Ok(bytes) => println!("Uploaded {} bytes", bytes),
        Err(e) => eprintln!("Upload failed: {}", e),
    }
}

Performance Characteristics

Memory Usage

Operation v0.4.x (Old) v0.5.0 (New) Improvement
Upload 10GB file 10GB+ ~200MB 50x less
List 1M S3 objects ~500MB ~10MB 50x less
Download 5GB file 5GB+ ~100MB 50x less

Supported Scales

Resource v0.4.x Limit v0.5.0 Limit
Max upload file size ~RAM size 5TB
Max download file size ~RAM size Unlimited
Max directory entries ~RAM/1KB Unlimited
S3 bucket objects ~10,000 Millions

Backend-Specific Notes

LocalBackend

  • write(): Uses tokio::io::copy() for efficient streaming
  • list(): Collects then streams (acceptable - local FS rarely has millions of files)
  • Supports Unix permissions and timestamps

S3Backend

  • write():
    • <5MB: PutObject (single request)
    • ≥5MB: Multipart upload (5MB chunks, up to 10,000 parts = 5TB)
  • list(): True lazy pagination (fetches S3 pages on-demand)
  • Respects WriteOptions (content_type, metadata, server-side encryption)

SshBackend

  • write(): Buffers in memory (ssh2 crate is synchronous)
  • list(): Collects then streams
  • ⚠️ Note: Limited by ssh2 library - true streaming planned for v0.6.0

SmbBackend

  • write(): Buffers in memory (SMB client is async but accepts full buffer)
  • list(): Collects then streams
  • Works with Windows shares, Samba, NAS devices

Error Handling

Streams can fail at any point during iteration:

use futures::StreamExt;

let mut stream = backend.list(path, options).await?;

while let Some(entry_result) = stream.next().await {
    match entry_result {
        Ok(entry) => {
            // Process successfully
            println!("{}", entry.path.display());
        }
        Err(e) => {
            // Handle error mid-stream
            eprintln!("Error listing entry: {}", e);
            // Can continue or break depending on requirements
        }
    }
}

Best Practices

1. Always Provide size_hint

// ✅ Good: Enables optimal upload strategy
let file = File::open(path).await?;
let size = file.metadata().await?.len();
backend.write(dest, Box::new(file), Some(size), options).await?;

// ⚠️ OK but suboptimal: Forces multipart for all files
backend.write(dest, Box::new(file), None, options).await?;

2. Use Appropriate Chunk Size for S3

// Default: 5MB chunks (good for most cases)
let config = S3Config {
    chunk_size: 5 * 1024 * 1024,
    parallel_operations: 4,
    ..Default::default()
};

// High-throughput networks: Larger chunks
let config = S3Config {
    chunk_size: 50 * 1024 * 1024, // 50MB
    parallel_operations: 8,
    ..Default::default()
};

3. Handle Stream Errors Gracefully

let mut stream = backend.list(path, options).await?;
let mut successful = 0;
let mut failed = 0;

while let Some(result) = stream.next().await {
    match result {
        Ok(_) => successful += 1,
        Err(e) => {
            failed += 1;
            tracing::warn!("Entry error: {}", e);
        }
    }
}

println!("Processed {} entries, {} errors", successful, failed);

4. Use Buffered Streams for I/O

use tokio::io::BufReader;

// ✅ Buffered read for better performance
let file = File::open(path).await?;
let buffered = BufReader::with_capacity(256 * 1024, file); // 256KB buffer
let reader: Box<dyn AsyncRead + Unpin + Send> = Box::new(buffered);

backend.write(dest, reader, size_hint, options).await?;

Troubleshooting

"Out of Memory" Errors

Symptom: Still getting OOM despite streaming API

Cause: Accidentally buffering somewhere

Solution: Check your code for:

// ❌ Don't do this - defeats streaming
let data = tokio::fs::read(path).await?; // Buffers entire file
backend.write(dest, Box::new(std::io::Cursor::new(data)), ...);

// ✅ Do this instead
let file = File::open(path).await?;
backend.write(dest, Box::new(file), ...);

Slow S3 Uploads

Symptom: Uploads slower than expected

Possible Causes:

  1. Small chunk_size: Too many API calls
    • Solution: Increase chunk_size to 10-50MB
  2. Low parallel_operations: Underutilized bandwidth
    • Solution: Increase parallel_operations to 8-16
  3. No size_hint: Forced multipart for small files
    • Solution: Always provide size_hint

Stream Never Completes

Symptom: stream.next().await hangs indefinitely

Cause: Backend error not being propagated

Solution: Check for error handling:

while let Some(entry) = stream.next().await {
    let entry = entry?; // ✅ Propagate errors
    process(entry);
}

Version Compatibility

Version write() list() Notes
v0.4.x Bytes Vec<DirEntry> Old API
v0.5.0 AsyncRead Stream<DirEntry> Breaking change
v0.6.0 (planned) AsyncRead Stream<DirEntry> SSH true streaming

Additional Resources


Feedback

Found an issue or have a suggestion? Please file an issue at: https://github.com/anthropics/orbit/issues