From 0506c1e026b8b1bad6ddf1d9da1d911e59c3dea3 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Mon, 24 Nov 2025 16:03:25 -0800
Subject: [PATCH 1/8] rfc: Making Storage a trait

---
 docs/rfcs/0002_storage_trait.md | 334 ++++++++++++++++++++++++++++++++
 1 file changed, 334 insertions(+)
 create mode 100644 docs/rfcs/0002_storage_trait.md

diff --git a/docs/rfcs/0002_storage_trait.md b/docs/rfcs/0002_storage_trait.md
new file mode 100644
index 0000000000..050e0fa3b1
--- /dev/null
+++ b/docs/rfcs/0002_storage_trait.md
@@ -0,0 +1,334 @@
+# Making Storage a Trait in Iceberg-rust

## Background

### Existing Implementation
The existing code implements storage functionality through a concrete Storage enum that handles different storage backends (S3, local filesystem, GCS, etc.). This implementation is tightly coupled with OpenDAL as the underlying storage layer. The FileIO struct wraps this Storage enum and provides a high-level API for file operations.

Original structure:

- **FileIO:** Main interface for file operations
- **Storage:** Enum with variants for different storage backends
- **InputFile / OutputFile:** Concrete structs for reading and writing files

All storage operations are implemented directly in these concrete types, making it hard to extend or customize the storage layer without modifying the core codebase.

### Problem Statement
The original design has several limitations:

- **Tight Coupling** – All storage logic depends on OpenDAL, limiting flexibility. Users cannot easily opt for alternative storage implementations such as `object_store`.
- **Customization Barriers** – Users cannot easily add custom behaviors or optimizations.

As discussed in Issue #1314, making Storage a trait would allow pluggable storage and better integration with existing systems.

## Design

### New Architecture

The new architecture uses trait-based abstractions with a registry pattern and separate storage crates:

```
┌─────────────────────────────────────────────────────┐
│ crates/iceberg/src/io/ │
│ ┌───────────────────────────────────────────────┐ │
│ │ Storage Trait & Registry │ │
│ │ - pub trait Storage │ │
│ │ - pub trait StorageBuilder │ │
│ │ - pub struct StorageBuilderRegistry │ │
│ │ - pub struct InputFile │ │
│ │ - pub struct OutputFile │ │
│ └───────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
                        ▲
                        │
        ┌───────────────┼───────────────┬───────────────┐
        │               │               │               │
┌───────┴────────────┐  │  ┌────────────┴──────┐  ┌────┴──────────┐
│ crates/storage/    │  │  │ crates/storage/   │  │ Third-Party   │
│ opendal/           │  │  │ object_store/     │  │ Crates        │
│ ┌────────────────┐ │  │  │ ┌───────────────┐ │  │ ┌───────────┐ │
│ │ opendal-s3     │ │  │  │ │ objstore-s3   │ │  │ │ custom    │ │
│ │ impl Storage   │ │  │  │ │ impl Storage  │ │  │ │ storage   │ │
│ │ impl Builder   │ │  │  │ │ impl Builder  │ │  │ │impl traits│ │
│ └────────────────┘ │  │  │ └───────────────┘ │  │ └───────────┘ │
│ ┌────────────────┐ │  │  │ ┌───────────────┐ │  └───────────────┘
│ │ opendal-fs     │ │  │  │ │ objstore-gcs  │ │
│ │ impl Storage   │ │  │  │ │ impl Storage  │ │
│ │ impl Builder   │ │  │  │ │ impl Builder  │ │
│ └────────────────┘ │  │  │ └───────────────┘ │
│ ┌────────────────┐ │  │  │ ┌───────────────┐ │
│ │ opendal-gcs    │ │  │  │ │ objstore-azure│ │
│ │ impl Storage   │ │  │  │ │ impl Storage  │ │
│ │ impl Builder   │ │  │  │ │ impl Builder  │ │
│ └────────────────┘ │  │  │ └───────────────┘ │
│ ... (oss, azure,   │  │  └───────────────────┘
│      memory)       │  │
└────────────────────┘  │
```

### Storage Trait
The Storage trait defines the interface for all storage operations. This implementation uses Option 2 from the initial design: Storage as a trait with concrete `InputFile` and `OutputFile` structs.

```rust
#[async_trait]
pub trait Storage: Debug + Send + Sync {
    // File existence and metadata
    async fn exists(&self, path: &str) -> Result<bool>;
    async fn metadata(&self, path: &str) -> Result<FileMetadata>;

    // Reading operations
    async fn read(&self, path: &str) -> Result<Bytes>;
    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>>;

    // Writing operations
    async fn write(&self, path: &str, bs: Bytes) -> Result<()>;
    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>>;

    // Deletion operations
    async fn delete(&self, path: &str) -> Result<()>;
    async fn remove_dir_all(&self, path: &str) -> Result<()>;

    // File object creation
    fn new_input(&self, path: &str) -> Result<InputFile>;
    fn new_output(&self, path: &str) -> Result<OutputFile>;
}
```

### InputFile and OutputFile Structs

`InputFile` and `OutputFile` are concrete structs that contain a reference to the storage:

```rust
pub struct InputFile {
    pub storage: Arc<dyn Storage>,
    pub path: String,
}

pub struct OutputFile {
    pub storage: Arc<dyn Storage>,
    pub path: String,
}
```

Functions in `InputFile` and `OutputFile` delegate operations to the underlying Storage:

```rust
impl InputFile {
    pub async fn exists(&self) -> Result<bool> {
        self.storage.exists(&self.path).await
    }

    pub async fn read(&self) -> Result<Bytes> {
        self.storage.read(&self.path).await
    }
    // ... other methods
}
```

Benefits of this approach:

- Simpler and easier to maintain
- Less trait object overhead
- Clear delegation pattern
- Sufficient flexibility for most use cases

## StorageBuilder and Registry

### StorageBuilder Trait
The `StorageBuilder` trait defines how storage backends are constructed:

```rust
pub trait StorageBuilder: Debug + Send + Sync {
    fn build(
        &self,
        props: HashMap<String, String>,
        extensions: Extensions,
    ) -> Result<Arc<dyn Storage>>;
}
```

Key design decisions:

- Uses `&self` instead of `self` - builders are reusable
- No associated type - returns `Arc<dyn Storage>` directly
- `Send + Sync` for thread safety

### StorageBuilderRegistry
The registry manages storage builders and provides lookup by scheme:

```rust
#[derive(Debug, Clone)]
pub struct StorageBuilderRegistry {
    builders: HashMap<String, Arc<dyn StorageBuilder>>,
}

impl StorageBuilderRegistry {
    pub fn new() -> Self { /* ... */ }
    pub fn register(&mut self, scheme: impl Into<String>, builder: Arc<dyn StorageBuilder>);
    pub fn get_builder(&self, scheme: &str) -> Result<Arc<dyn StorageBuilder>>;
    pub fn supported_types(&self) -> Vec<String>;
}
```

Features:

- Automatic registration based on enabled cargo features
- Runtime registration of custom builders
- Case-insensitive scheme lookup
- Thread-safe and cloneable
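To make the first two features concrete, here is a minimal sketch of what `StorageBuilderRegistry::new()` might do, pre-registering the builders named in the implementation table further below behind their cargo features. The exact wiring is an assumption for illustration, not settled API:

```rust
impl StorageBuilderRegistry {
    pub fn new() -> Self {
        let mut builders: HashMap<String, Arc<dyn StorageBuilder>> = HashMap::new();

        // Each backend is compiled (and registered) only when its
        // cargo feature is enabled.
        #[cfg(feature = "storage-s3")]
        {
            let s3: Arc<dyn StorageBuilder> = Arc::new(OpenDALS3StorageBuilder);
            builders.insert("s3".to_string(), s3.clone());
            builders.insert("s3a".to_string(), s3);
        }
        #[cfg(feature = "storage-memory")]
        builders.insert("memory".to_string(), Arc::new(OpenDALMemoryStorageBuilder));

        Self { builders }
    }

    fn lookup(&self, scheme: &str) -> Option<&Arc<dyn StorageBuilder>> {
        // Case-insensitive lookup: schemes are normalized to lowercase.
        self.builders.get(&scheme.to_ascii_lowercase())
    }
}
```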
## Example Usage

```rust
use iceberg::io::FileIOBuilder;

// Basic usage (same as the existing code)
let file_io = FileIOBuilder::new("s3")
    .with_prop("s3.region", "us-west-2")
    .build()?;

// Custom storage registration
use iceberg::io::{StorageBuilderRegistry, StorageBuilder};

let mut registry = StorageBuilderRegistry::new();
registry.register("custom", Arc::new(MyCustomStorageBuilder));

// Check supported types
println!("Supported: {:?}", registry.supported_types());
```

## Storage Implementations
Each storage backend has its own implementation:

| Storage | File | Struct | Builder | Schemes |
|---------|------|--------|---------|---------|
| S3 | `storage_s3.rs` | `OpenDALS3Storage` | `OpenDALS3StorageBuilder` | `s3`, `s3a` |
| GCS | `storage_gcs.rs` | `OpenDALGcsStorage` | `OpenDALGcsStorageBuilder` | `gs`, `gcs` |
| OSS | `storage_oss.rs` | `OpenDALOssStorage` | `OpenDALOssStorageBuilder` | `oss` |
| Azure | `storage_azdls.rs` | `OpenDALAzdlsStorage` | `OpenDALAzdlsStorageBuilder` | `abfs`, `abfss`, `wasb`, `wasbs` |
| Filesystem | `storage_fs.rs` | `OpenDALFsStorage` | `OpenDALFsStorageBuilder` | `file`, `""` |
| Memory | `storage_memory.rs` | `OpenDALMemoryStorage` | `OpenDALMemoryStorageBuilder` | `memory` |

### Implementation Pattern
Each storage follows this consistent pattern:

```rust
// 1. Storage struct with configuration
#[derive(Debug, Clone)]
pub struct OpenDALXxxStorage {
    config: Arc<XxxConfig>,
}

// 2. Helper method to create operator
impl OpenDALXxxStorage {
    fn create_operator<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> {
        // Create OpenDAL operator with retry layer
    }
}

// 3. Storage trait implementation
#[async_trait]
impl Storage for OpenDALXxxStorage {
    async fn exists(&self, path: &str) -> Result<bool> { /* ... */ }
    async fn metadata(&self, path: &str) -> Result<FileMetadata> { /* ... */ }

    // ... implement all 10 methods
}

// 4. Builder struct
#[derive(Debug)]
pub struct OpenDALXxxStorageBuilder;

// 5. Builder trait implementation
impl StorageBuilder for OpenDALXxxStorageBuilder {
    fn build(&self, props, extensions) -> Result<Arc<dyn Storage>> {
        // Parse configuration and create storage
    }
}
```

### Component Flow

```
User Code
    ↓
FileIOBuilder::new("s3")
    ↓
FileIOBuilder::build()
    ↓
StorageBuilderRegistry::new()
    ↓
registry.get_builder("s3")
    ↓
OpenDALS3StorageBuilder
    ↓
builder.build(props, extensions)
    ↓
OpenDALS3Storage (implements Storage)
    ↓
FileIO { inner: Arc<dyn Storage> }
```
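This flow can be exercised end to end with the memory backend. A hypothetical test — the `memory://` path shape and the `tokio::test` harness are assumptions, not part of the RFC:

```rust
use bytes::Bytes;
use iceberg::io::FileIOBuilder;

#[tokio::test]
async fn roundtrip_through_memory_storage() -> iceberg::Result<()> {
    // FileIOBuilder -> registry lookup ("memory") -> builder -> Storage -> FileIO
    let file_io = FileIOBuilder::new("memory").build()?;

    let path = "memory://bucket/test.txt";
    file_io.new_output(path)?.write(Bytes::from("hello")).await?;

    let read_back = file_io.new_input(path)?.read().await?;
    assert_eq!(read_back, Bytes::from("hello"));
    Ok(())
}
```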
## Custom Storage Implementation
To implement a custom storage backend:

```rust
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use iceberg::io::{
    Storage, StorageBuilder, Extensions, FileMetadata,
    FileRead, FileWrite, InputFile, OutputFile
};
use iceberg::Result;

// 1. Define your storage struct
#[derive(Debug, Clone)]
struct MyCustomStorage {
    // Your configuration
}

// 2. Implement the Storage trait
#[async_trait]
impl Storage for MyCustomStorage {
    async fn exists(&self, path: &str) -> Result<bool> {
        // Your implementation
    }
    // ... implement all 10 methods
}

// 3. Define your builder
#[derive(Debug)]
struct MyCustomStorageBuilder;

// 4. Implement StorageBuilder
impl StorageBuilder for MyCustomStorageBuilder {
    fn build(
        &self,
        props: HashMap<String, String>,
        extensions: Extensions,
    ) -> Result<Arc<dyn Storage>> {
        // Parse configuration from props
        Ok(Arc::new(MyCustomStorage::new(props)?))
    }
}

// 5. Register your builder
let mut registry = StorageBuilderRegistry::new();
registry.register("custom", Arc::new(MyCustomStorageBuilder));
```

## Implementation Plan

- **Phase 1 (Initial Implementation):**
  - Define core traits (Storage, optionally InputFile/OutputFile)
  - Implement StorageBuilder + StorageLoader
  - Refactor OpenDAL to use traits
- **Phase 2:** Split storage backends into separate crates (crates/storage/opendal)
- **Phase 3:** Replace FileIO with StorageBuilderRegistry in Catalog structs
- **Phase 4:** Add object_store + other backends

From 16bea1975e08c51dfde7111cfaa6ae62020e70ca Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Mon, 24 Nov 2025 16:07:34 -0800
Subject: [PATCH 2/8] add license header

---
 docs/rfcs/0002_storage_trait.md | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/docs/rfcs/0002_storage_trait.md b/docs/rfcs/0002_storage_trait.md
index 050e0fa3b1..5502bdff4c 100644
--- a/docs/rfcs/0002_storage_trait.md
+++ b/docs/rfcs/0002_storage_trait.md
@@ -1,3 +1,22 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
 # Making Storage a Trait in Iceberg-rust
 
 ## Background

From e7d67d82aacfef76069ba960e42b00bde535f42b Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Tue, 25 Nov 2025 10:02:50 -0800
Subject: [PATCH 3/8] Update docs/rfcs/0002_storage_trait.md

Co-authored-by: Renjie Liu
---
 docs/rfcs/0002_storage_trait.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/rfcs/0002_storage_trait.md b/docs/rfcs/0002_storage_trait.md
index 5502bdff4c..6eb8f7d890 100644
--- a/docs/rfcs/0002_storage_trait.md
+++ b/docs/rfcs/0002_storage_trait.md
@@ -92,7 +92,7 @@ The Storage trait defines the interface for all storage operations. This impleme
 #[async_trait]
 pub trait Storage: Debug + Send + Sync {
     // File existence and metadata
-    async fn exists(&self, path: &str) -> Result<bool>;
+    async fn exists(&self, path: AsRef<str>) -> Result<bool>;
     async fn metadata(&self, path: &str) -> Result<FileMetadata>;
 
     // Reading operations

From fe59361b43297f3712275ad583f54e16a4923df3 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Tue, 25 Nov 2025 10:18:30 -0800
Subject: [PATCH 4/8] Revert "Update docs/rfcs/0002_storage_trait.md"

This reverts commit e7d67d82aacfef76069ba960e42b00bde535f42b.

---
 docs/rfcs/0002_storage_trait.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/rfcs/0002_storage_trait.md b/docs/rfcs/0002_storage_trait.md
index 6eb8f7d890..5502bdff4c 100644
--- a/docs/rfcs/0002_storage_trait.md
+++ b/docs/rfcs/0002_storage_trait.md
@@ -92,7 +92,7 @@ The Storage trait defines the interface for all storage operations. This impleme
 #[async_trait]
 pub trait Storage: Debug + Send + Sync {
     // File existence and metadata
-    async fn exists(&self, path: AsRef<str>) -> Result<bool>;
+    async fn exists(&self, path: &str) -> Result<bool>;
     async fn metadata(&self, path: &str) -> Result<FileMetadata>;
 
     // Reading operations

From 075933507f572160020667ba8c7d35d1f1fe29fd Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Wed, 3 Dec 2025 14:39:44 -0800
Subject: [PATCH 5/8] add open questions section, update implementation plan

---
 docs/rfcs/0002_storage_trait.md | 227 +++++++++++++++++++++++++-------
 1 file changed, 177 insertions(+), 50 deletions(-)

diff --git a/docs/rfcs/0002_storage_trait.md b/docs/rfcs/0002_storage_trait.md
index 5502bdff4c..d332c3d101 100644
--- a/docs/rfcs/0002_storage_trait.md
+++ b/docs/rfcs/0002_storage_trait.md
@@ -52,8 +52,8 @@ The new architecture uses trait-based abstractions with a registry pattern and s
 │ ┌───────────────────────────────────────────────┐ │
 │ │ Storage Trait & Registry │ │
 │ │ - pub trait Storage │ │
-│ │ - pub trait StorageBuilder │ │
-│ │ - pub struct StorageBuilderRegistry │ │
+│ │ - pub trait StorageFactory │ │
+│ │ - pub struct StorageRegistry │ │
 │ │ - pub struct InputFile │ │
 │ │ - pub struct OutputFile │ │
 │ └───────────────────────────────────────────────┘ │
@@ -68,17 +68,17 @@
 │ ┌────────────────┐ │ │ │ ┌───────────────┐ │ │ ┌───────────┐ │
 │ │ opendal-s3 │ │ │ │ │ objstore-s3 │ │ │ │ custom │ │
 │ │ impl Storage │ │ │ │ │ impl Storage │ │ │ │ storage │ │
-│ │ impl Builder │ │ │ │ │ impl Builder │ │ │ │impl traits│ │
+│ │ impl Factory │ │ │ │ │ impl Factory │ │ │ │impl traits│ │
 │ └────────────────┘ │ │ │ └───────────────┘ │ │ └───────────┘ │
 │ ┌────────────────┐ │ │ │ ┌───────────────┐ │ └───────────────┘
 │ │ opendal-fs │ │ │ │ │ objstore-gcs │ │
 │ │ impl Storage │ │ │ │ │ impl Storage │ │
-│ │ impl Builder │ │ │ │ │ impl Builder │ │
+│ │ impl Factory │ │ │ │ │ impl Factory │ │
 │ └────────────────┘ │ │ │ └───────────────┘ │
 │ ┌────────────────┐ │ │ │ ┌───────────────┐ │
 │ │ opendal-gcs │ │ │ │ │ objstore-azure│ │
 │ │ impl Storage │ │ │ │ │ impl Storage │ │
-│ │ impl Builder │ │ │ │ │ impl Builder │ │
+│ │ impl Factory │ │ │ │ │ impl Factory │ │
 │ └────────────────┘ │ │ │ └───────────────┘ │
 │ ... (oss, azure,   │  │  └───────────────────┘
 │      memory)       │  │
@@ -105,7 +105,7 @@ pub trait Storage: Debug + Send + Sync {
     // Deletion operations
     async fn delete(&self, path: &str) -> Result<()>;
-    async fn remove_dir_all(&self, path: &str) -> Result<()>;
+    async fn delete_prefix(&self, path: &str) -> Result<()>;
 
     // File object creation
     fn new_input(&self, path: &str) -> Result<InputFile>;
     fn new_output(&self, path: &str) -> Result<OutputFile>;
@@ -153,13 +153,13 @@
 - Clear delegation pattern
 - Sufficient flexibility for most use cases
 
-## StorageBuilder and Registry
+## StorageFactory and Registry
 
-### StorageBuilder Trait
-The `StorageBuilder` trait defines how storage backends are constructed:
+### StorageFactory Trait
+The `StorageFactory` trait defines how storage backends are constructed:
 
 ```rust
-pub trait StorageBuilder: Debug + Send + Sync {
+pub trait StorageFactory: Debug + Send + Sync {
     fn build(
         &self,
         props: HashMap<String, String>,
@@ -170,24 +170,24 @@
 
 Key design decisions:
 
-- Uses `&self` instead of `self` - builders are reusable
+- Uses `&self` instead of `self` - factories are reusable
 - No associated type - returns `Arc<dyn Storage>` directly
 - `Send + Sync` for thread safety
 
-### StorageBuilderRegistry
-The registry manages storage builders and provides lookup by scheme:
+### StorageRegistry
+The registry manages storage factories and provides lookup by scheme:
 
 ```rust
 #[derive(Debug, Clone)]
-pub struct StorageBuilderRegistry {
-    builders: HashMap<String, Arc<dyn StorageBuilder>>,
+pub struct StorageRegistry {
+    factories: HashMap<String, Arc<dyn StorageFactory>>,
 }
 
-impl StorageBuilderRegistry {
+impl StorageRegistry {
     pub fn new() -> Self { /* ... */ }
-    pub fn register(&mut self, scheme: impl Into<String>, builder: Arc<dyn StorageBuilder>);
-    pub fn get_builder(&self, scheme: &str) -> Result<Arc<dyn StorageBuilder>>;
-    pub fn supported_types(&self) -> Vec<String>;
+    pub fn register(&mut self, scheme: impl Into<String>, factory: Arc<dyn StorageFactory>);
+    pub fn get_factory(&self, scheme: &str) -> Result<Arc<dyn StorageFactory>>;
+    pub fn supported_types(&self) -> impl Iterator<Item = &String>;
 }
 ```
@@ -209,26 +209,27 @@
 .build()?;
 
 // Custom storage registration
-use iceberg::io::{StorageBuilderRegistry, StorageBuilder};
+use iceberg::io::{StorageRegistry, StorageFactory};
 
-let mut registry = StorageBuilderRegistry::new();
-registry.register("custom", Arc::new(MyCustomStorageBuilder));
+let mut registry = StorageRegistry::new();
+registry.register("custom", Arc::new(MyCustomStorageFactory));
 
 // Check supported types
-println!("Supported: {:?}", registry.supported_types());
+let supported: Vec<_> = registry.supported_types().collect();
+println!("Supported: {:?}", supported);
 ```
 
 ## Storage Implementations
 Each storage backend has its own implementation:
 
 | Storage | File | Struct | Factory | Schemes |
 |---------|------|--------|---------|---------|
 | S3 | `storage_s3.rs` | `OpenDALS3Storage` | `OpenDALS3StorageFactory` | `s3`, `s3a` |
 | GCS | `storage_gcs.rs` | `OpenDALGcsStorage` | `OpenDALGcsStorageFactory` | `gs`, `gcs` |
 | OSS | `storage_oss.rs` | `OpenDALOssStorage` | `OpenDALOssStorageFactory` | `oss` |
 | Azure | `storage_azdls.rs` | `OpenDALAzdlsStorage` | `OpenDALAzdlsStorageFactory` | `abfs`, `abfss`, `wasb`, `wasbs` |
 | Filesystem | `storage_fs.rs` | `OpenDALFsStorage` | `OpenDALFsStorageFactory` | `file`, `""` |
 | Memory | `storage_memory.rs` | `OpenDALMemoryStorage` | `OpenDALMemoryStorageFactory` | `memory` |
@@ -256,12 +257,12 @@
 
-// 4. Builder struct
+// 4. Factory struct
 #[derive(Debug)]
-pub struct OpenDALXxxStorageBuilder;
+pub struct OpenDALXxxStorageFactory;
 
-// 5. Builder trait implementation
-impl StorageBuilder for OpenDALXxxStorageBuilder {
+// 5. Factory trait implementation
+impl StorageFactory for OpenDALXxxStorageFactory {
     fn build(&self, props, extensions) -> Result<Arc<dyn Storage>> {
         // Parse configuration and create storage
     }
@@ -277,13 +278,13 @@
 FileIOBuilder::new("s3")
     ↓
 FileIOBuilder::build()
     ↓
-StorageBuilderRegistry::new()
+StorageRegistry::new()
     ↓
-registry.get_builder("s3")
+registry.get_factory("s3")
     ↓
-OpenDALS3StorageBuilder
+OpenDALS3StorageFactory
     ↓
-builder.build(props, extensions)
+factory.build(props, extensions)
     ↓
 OpenDALS3Storage (implements Storage)
     ↓
 FileIO { inner: Arc<dyn Storage> }
@@ -299,7 +300,7 @@
 use iceberg::io::{
-    Storage, StorageBuilder, Extensions, FileMetadata,
+    Storage, StorageFactory, Extensions, FileMetadata,
     FileRead, FileWrite, InputFile, OutputFile
 };
 
@@ -320,12 +321,12 @@
 
-// 3. Define your builder
+// 3. Define your factory
 #[derive(Debug)]
-struct MyCustomStorageBuilder;
+struct MyCustomStorageFactory;
 
-// 4. Implement StorageBuilder
-impl StorageBuilder for MyCustomStorageBuilder {
+// 4. Implement StorageFactory
+impl StorageFactory for MyCustomStorageFactory {
     fn build(
         &self,
         props: HashMap<String, String>,
@@ -337,17 +338,143 @@
 }
 
-// 5. Register your builder
-let mut registry = StorageBuilderRegistry::new();
-registry.register("custom", Arc::new(MyCustomStorageBuilder));
+// 5. Register your factory
+let mut registry = StorageRegistry::new();
+registry.register("custom", Arc::new(MyCustomStorageFactory));
 ```

## Open Questions

These design decisions need to be resolved during implementation:

### 1. Storage Granularity (Phase 1)

**Question:** Should we have a general `Storage` implementation that works with multiple schemes, or specific implementations per scheme?

**Option A: General Storage (e.g., `OpenDALStorage`)**
- Single implementation handles all schemes (s3, gcs, fs, etc.)
- Scheme detection happens at runtime (sketched below)
- Simpler codebase with less duplication

```rust
#[derive(Debug, Clone)]
pub struct OpenDALStorage {
    operator: Operator,
    scheme: String,
}
```

**Option B: Scheme-Specific Storage (e.g., `OpenDALS3Storage`, `OpenDALGcsStorage`)**
- Each storage backend has its own implementation
- Type-safe configuration per backend
- Better compile-time guarantees
- More explicit and easier to optimize per backend

```rust
#[derive(Debug, Clone)]
pub struct OpenDALS3Storage {
    config: Arc<S3Config>,
}

#[derive(Debug, Clone)]
pub struct OpenDALGcsStorage {
    config: Arc<GcsConfig>,
}
```

**Current Implementation:** The RFC describes Option B (scheme-specific).
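For contrast with Option B, a rough sketch of Option A's runtime scheme dispatch. The helper names are hypothetical and only illustrate the shape of the routing, not a proposed API:

```rust
use iceberg::{Error, ErrorKind, Result};

/// Extract the URL scheme; paths without "://" fall through to the fs arm.
fn scheme_of(path: &str) -> &str {
    path.split("://").next().unwrap_or("")
}

impl OpenDALStorage {
    fn create_operator(&self, path: &str) -> Result<Operator> {
        match scheme_of(path) {
            "s3" | "s3a" => self.s3_operator(path),  // hypothetical helper
            "gs" | "gcs" => self.gcs_operator(path), // hypothetical helper
            "file" | "" => self.fs_operator(path),   // hypothetical helper
            other => Err(Error::new(
                ErrorKind::FeatureUnsupported,
                format!("Unsupported scheme: {}", other),
            )),
        }
    }
}
```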
### 2. Registry Location (Phase 1)

**Question:** Where should the `StorageRegistry` live?

This question depends on the answer to Question 1:

**If Option A (General Storage):**
- Do we even need a registry? A single `OpenDALStorage` could handle all schemes internally
- Registration would happen when the crate is loaded, and a registry would not be necessary

**If Option B (Scheme-Specific Storage):**
- **Option 2a: Global Static Registry** - Single process-wide registry with lazy initialization (sketched after Question 3)
  - Pros: Simple to use, no need to pass registry around
  - Cons: Global state, harder to test, potential initialization ordering issues

- **Option 2b: Catalog-Owned Registry** - Each catalog instance owns its registry
  - Pros: Better encapsulation, easier testing, no global state
  - Cons: More complex API, need to pass registry through layers

### 3. Error Handling Strategy (Phase 2)

**Question:** How should storage errors be represented?

**Option A: Enum-Based Errors**
```rust
#[derive(Debug, thiserror::Error)]
pub enum StorageError {
    #[error("File not found: {path}")]
    NotFound { path: String },

    #[error("Permission denied: {path}")]
    PermissionDenied { path: String },

    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    #[error("Backend error: {0}")]
    Backend(Box<dyn std::error::Error + Send + Sync>),
}
```
- Pros: Type-safe, pattern matching, clear error categories
- Cons: Need to map all backend errors to enum variants

**Option B: Trait-Based Errors with Into/From**
```rust
pub trait StorageError: std::error::Error + Send + Sync + 'static {
    fn is_not_found(&self) -> bool;
    fn is_permission_denied(&self) -> bool;
}

// Each backend implements its own error type
impl StorageError for OpenDALError { /* ... */ }
impl StorageError for ObjectStoreError { /* ... */ }
```
- Pros: Flexible, preserves original error information, easier for backends
- Cons: Less type-safe, harder to handle errors uniformly

**Option C: Hybrid Approach**
```rust
#[derive(Debug, thiserror::Error)]
pub enum StorageError {
    #[error("File not found: {path}")]
    NotFound { path: String },

    #[error(transparent)]
    Other(#[from] Box<dyn std::error::Error + Send + Sync>),
}
```
- Pros: Common errors are typed, uncommon errors are wrapped
- Cons: Some loss of type information for wrapped errors

**Recommendation:** Option C (Hybrid) provides a good balance, with common errors like `NotFound` being strongly typed while allowing backends to preserve their specific error details.
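For reference, a minimal sketch of what Option 2a from Question 2 could look like, assuming `std::sync::LazyLock` (Rust 1.80+) and the `StorageRegistry` API above. This is illustrative only, not a proposed implementation:

```rust
use std::sync::{Arc, LazyLock, RwLock};

/// Option 2a sketch: one lazily-initialized, process-wide registry.
static GLOBAL_REGISTRY: LazyLock<RwLock<StorageRegistry>> =
    LazyLock::new(|| RwLock::new(StorageRegistry::new()));

/// Register a custom factory, typically at process startup.
pub fn register_storage_factory(scheme: &str, factory: Arc<dyn StorageFactory>) {
    GLOBAL_REGISTRY.write().unwrap().register(scheme, factory);
}

/// Look up a factory anywhere in the process, without threading a registry through.
pub fn storage_factory(scheme: &str) -> Result<Arc<dyn StorageFactory>> {
    GLOBAL_REGISTRY.read().unwrap().get_factory(scheme)
}
```

The initialization-ordering concern listed under "Cons" shows up here directly: any registration that happens after the first lookup is invisible to callers that cached a factory.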
## Implementation Plan

 - **Phase 1 (Initial Implementation):**
   - Define core traits (Storage, optionally InputFile/OutputFile)
-  - Implement StorageBuilder + StorageLoader
+  - Implement StorageFactory + StorageRegistry
   - Refactor OpenDAL to use traits
-- **Phase 2:** Split storage backends into separate crates (crates/storage/opendal)
-- **Phase 3:** Replace FileIO with StorageBuilderRegistry in Catalog structs
-- **Phase 4:** Add object_store + other backends
+  - **Resolve:** Question 1 (Storage granularity) and Question 2 (Registry location)
+  - **Decision needed:** Choose between general vs. scheme-specific storage implementations
+
+- **Phase 2 (Stabilize Storage Trait):**
+  - Move concrete storage implementations to a separate crate `iceberg-storage`
+  - Add new API: `delete_iter` to provide batch delete operations
+  - Remove `FileIOBuilder` and `Extensions` - serializable custom Storage implementations should handle their use cases
+  - **Resolve:** Question 3 (Error handling strategy)
+  - **Decision needed:** Define `StorageError` type and conversion strategy if needed
+  - Stabilize the Storage trait API for long-term compatibility
+
+- **Phase 3:** Add object_store + other backends
+  - Implement `object_store`-based storage backends in separate crates
+  - Validate error handling works across different backend implementations
+  - Ensure the stabilized Storage trait works well with alternative implementations

From 4ba2cd725fb7f95a09d4c8790426b09a97db1206 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Wed, 3 Dec 2025 14:48:42 -0800
Subject: [PATCH 6/8] Add a section for typetag serialization

---
 docs/rfcs/0002_storage_trait.md | 90 ++++++++++++++++++++++++++++-----
 1 file changed, 77 insertions(+), 13 deletions(-)

diff --git a/docs/rfcs/0002_storage_trait.md b/docs/rfcs/0002_storage_trait.md
index d332c3d101..3305592471 100644
--- a/docs/rfcs/0002_storage_trait.md
+++ b/docs/rfcs/0002_storage_trait.md
@@ -198,6 +198,74 @@

### Serialization with typetag

To enable serialization of `Storage` and `StorageFactory` trait objects, we use the `typetag` crate. This allows dynamic trait objects to be serialized and deserialized, which is essential for:

- Persisting storage configurations
- Sending storage instances across process boundaries
- Supporting custom storage implementations in distributed systems

The traits are annotated with `#[typetag::serde]`:

```rust
#[async_trait]
#[typetag::serde(tag = "type")]
pub trait Storage: Debug + Send + Sync {
    // ... trait methods
}

#[typetag::serde(tag = "type")]
pub trait StorageFactory: Debug + Send + Sync {
    fn build(
        &self,
        props: HashMap<String, String>,
        extensions: Extensions,
    ) -> Result<Arc<dyn Storage>>;
}
```

Each implementation must also use the `#[typetag::serde]` attribute:

```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OpenDALS3Storage {
    config: Arc<S3Config>,
}

#[async_trait]
#[typetag::serde]
impl Storage for OpenDALS3Storage {
    // ... implementation
}

#[derive(Debug, Serialize, Deserialize)]
pub struct OpenDALS3StorageFactory;

#[typetag::serde]
impl StorageFactory for OpenDALS3StorageFactory {
    // ... implementation
}
```

Benefits:

- **Type-safe serialization** - Each implementation is tagged with its type name
- **Extensibility** - Custom implementations can be serialized without modifying core code
- **Cross-language support** - Serialized format is JSON-compatible
- **Replaces FileIOBuilder/Extensions** - Serializable storage implementations eliminate the need for separate builder patterns

Example usage:

```rust
// Serialize a storage instance
let storage: Arc<dyn Storage> = Arc::new(OpenDALS3Storage::new(config));
let json = serde_json::to_string(&storage)?;

// Deserialize back to trait object
let storage: Arc<dyn Storage> = serde_json::from_str(&json)?;
```

## Example Usage

@@ -409,19 +477,15 @@

 **Option A: Enum-Based Errors**
 ```rust
-#[derive(Debug, thiserror::Error)]
-pub enum StorageError {
-    #[error("File not found: {path}")]
-    NotFound { path: String },
-
-    #[error("Permission denied: {path}")]
-    PermissionDenied { path: String },
-
-    #[error("IO error: {0}")]
-    Io(#[from] std::io::Error),
-
-    #[error("Backend error: {0}")]
-    Backend(Box<dyn std::error::Error + Send + Sync>),
+pub enum IoErrorKind {
+    FileNotFound,
+    CredentialExpired,
+}
+
+pub enum ErrorKind {
+    // Existing variants
+    ...
+    Io(IoErrorKind)
 }
 ```

From 5e7e4b81840756d27d28f1d14a5bfbffc64bef98 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Tue, 16 Dec 2025 13:25:31 -0800
Subject: [PATCH 7/8] update rfc to have concrete phase 1 implementation and
 open to discuss phase 2

---
 docs/rfcs/0002_storage_trait.md | 1043 +++++++++++++++++++++----------
 1 file changed, 701 insertions(+), 342 deletions(-)

diff --git a/docs/rfcs/0002_storage_trait.md b/docs/rfcs/0002_storage_trait.md
index 3305592471..a54f0d6730 100644
--- a/docs/rfcs/0002_storage_trait.md
+++ b/docs/rfcs/0002_storage_trait.md

## Background

### Existing Implementation

The existing code implements storage functionality through a concrete `Storage` enum that handles different storage backends (S3, local filesystem, GCS, etc.). This implementation is tightly coupled with OpenDAL as the underlying storage layer. The `FileIO` struct wraps this `Storage` enum and provides a high-level API for file operations.

```rust
// Current: Concrete enum with variants for each backend
pub(crate) enum Storage {
    #[cfg(feature = "storage-memory")]
    Memory(Operator),
    #[cfg(feature = "storage-fs")]
    LocalFs,
    #[cfg(feature = "storage-s3")]
    S3 {
        configured_scheme: String,
        config: Arc<S3Config>,
        customized_credential_load: Option<CustomizedCredentialLoad>,
    },
    #[cfg(feature = "storage-gcs")]
    Gcs { config: Arc<GcsConfig> },
    // ... other variants
}

impl Storage {
    pub(crate) fn create_operator<'a>(&self, path: &'a impl AsRef<str>)
        -> crate::Result<(Operator, &'a str)> {
        match self {
            #[cfg(feature = "storage-s3")]
            Storage::S3 { configured_scheme, config, customized_credential_load } => {
                // S3-specific operator creation
            }
            // ... other match arms
        }
    }
}
```

Current structure:

- **FileIO:** Main interface for file operations, wraps `Arc<Storage>`
- **Storage:** Enum with variants for different storage backends
- **InputFile / OutputFile:** Concrete structs that hold an `Operator` and path

### Problem Statement

The original design has several limitations:

- **Tight Coupling** – All storage logic depends on OpenDAL, limiting flexibility. Users cannot easily opt for alternative storage implementations such as `object_store`.
- **Customization Barriers** – Users cannot easily add custom behaviors or optimizations.
- **No Extensibility** – Adding new backends requires modifying the core enum in the `iceberg` crate.

As discussed in Issue #1314, making Storage a trait would allow pluggable storage and better integration with existing systems.

### New Trait-Based Design

The new design replaces the concrete `Storage` enum with a `Storage` trait:

```rust
// New: Trait-based abstraction (defined in iceberg crate)
#[async_trait]
pub trait Storage: Debug + Send + Sync {
    ...
}
```

This enables:

- **Pluggable backends** – Users can implement custom storage backends
- **Multiple libraries** – Support for OpenDAL, object_store, or custom implementations
- **Separate crates** – Storage implementations can live in separate crates

---

## Design (Phase 1): Storage Trait and Core Types

Phase 1 focuses on defining the `Storage` trait and updating `InputFile`/`OutputFile` to use trait-based storage.

### Storage Trait

The `Storage` trait is defined in the `iceberg` crate and defines the interface for all storage operations:

```rust
#[async_trait]
pub trait Storage: Debug + Send + Sync {
    /// Check if a file exists at the given path
    async fn exists(&self, path: &str) -> Result<bool>;

    /// Get metadata for a file
    async fn metadata(&self, path: &str) -> Result<FileMetadata>;

    /// Read entire file content
    async fn read(&self, path: &str) -> Result<Bytes>;

    /// Create a reader for streaming reads
    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>>;

    /// Write bytes to a file (overwrites if exists)
    async fn write(&self, path: &str, bs: Bytes) -> Result<()>;

    /// Create a writer for streaming writes
    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>>;

    /// Delete a single file
    async fn delete(&self, path: &str) -> Result<()>;

    /// Delete all files under a prefix (directory)
    async fn delete_prefix(&self, path: &str) -> Result<()>;

    /// Create an InputFile handle for the given path
    fn new_input(&self, path: &str) -> Result<InputFile>;

    /// Create an OutputFile handle for the given path
    fn new_output(&self, path: &str) -> Result<OutputFile>;
}
```

Note:
- All paths are absolute paths with scheme (e.g., `s3://bucket/path`)

### InputFile and OutputFile Changes

`InputFile` and `OutputFile` change from holding an `Operator` directly to holding a reference to the `Storage` trait:

**Before (current implementation):**
```rust
pub struct InputFile {
    op: Operator,
    path: String,
    relative_path_pos: usize,
}

impl InputFile {
    pub async fn read(&self) -> crate::Result<Bytes> {
        Ok(self.op.read(&self.path[self.relative_path_pos..]).await?.to_bytes())
    }
}
```

**After (new design):**
```rust
pub struct InputFile {
    storage: Arc<dyn Storage>,
    path: String,
}

impl InputFile {
    pub fn location(&self) -> &str {
        &self.path
    }

    pub async fn exists(&self) -> Result<bool> {
        self.storage.exists(&self.path).await
    }

    pub async fn read(&self) -> Result<Bytes> {
        self.storage.read(&self.path).await
    }

    pub async fn metadata(&self) -> Result<FileMetadata> {
        self.storage.metadata(&self.path).await
    }

    pub async fn reader(&self) -> Result<Box<dyn FileRead>> {
        self.storage.reader(&self.path).await
    }
}
```

Similarly for `OutputFile`:

```rust
pub struct OutputFile {
    storage: Arc<dyn Storage>,
    path: String,
}

impl OutputFile {
    pub fn location(&self) -> &str { &self.path }

    pub async fn exists(&self) -> Result<bool> {
        self.storage.exists(&self.path).await
    }

    pub async fn write(&self, bs: Bytes) -> Result<()> {
        self.storage.write(&self.path, bs).await
    }

    pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
        self.storage.writer(&self.path).await
    }

    pub fn to_input_file(self) -> InputFile {
        InputFile { storage: self.storage, path: self.path }
    }
}
```

### FileIO Changes

`FileIO` wraps an `Arc<dyn Storage>` instead of the `Arc<Storage>` enum:

```rust
#[derive(Clone, Debug)]
pub struct FileIO {
    inner: Arc<dyn Storage>,
}

impl FileIO {
    /// Create a new FileIO with the given storage implementation
    pub fn new(storage: Arc<dyn Storage>) -> Self {
        Self { inner: storage }
    }

    pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
        self.inner.delete(path.as_ref()).await
    }

    pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
        self.inner.exists(path.as_ref()).await
    }

    pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
        self.inner.new_input(path.as_ref())
    }

    pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
        self.inner.new_output(path.as_ref())
    }
}
```

---
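To show what the Phase 1 surface buys, here is a hypothetical test double: a trivial `Storage` handed straight to `FileIO`. The `InputFile::new`/`OutputFile::new` constructors follow the shape used in the Phase 2 examples below; the error kinds chosen and the `tokio::test` harness are assumptions:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use bytes::Bytes;
use iceberg::io::{FileIO, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage};
use iceberg::{Error, ErrorKind, Result};

/// Toy in-memory backend; only the methods exercised below do real work.
#[derive(Debug, Clone, Default)]
struct FakeStorage {
    files: Arc<Mutex<HashMap<String, Bytes>>>,
}

#[async_trait]
impl Storage for FakeStorage {
    async fn exists(&self, path: &str) -> Result<bool> {
        Ok(self.files.lock().unwrap().contains_key(path))
    }

    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
        // Not needed for this sketch.
        Err(Error::new(ErrorKind::FeatureUnsupported, format!("metadata({path})")))
    }

    async fn read(&self, path: &str) -> Result<Bytes> {
        self.files.lock().unwrap().get(path).cloned().ok_or_else(|| {
            Error::new(ErrorKind::DataInvalid, format!("not found: {path}"))
        })
    }

    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
        Err(Error::new(ErrorKind::FeatureUnsupported, format!("reader({path})")))
    }

    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
        self.files.lock().unwrap().insert(path.to_string(), bs);
        Ok(())
    }

    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
        Err(Error::new(ErrorKind::FeatureUnsupported, format!("writer({path})")))
    }

    async fn delete(&self, path: &str) -> Result<()> {
        self.files.lock().unwrap().remove(path);
        Ok(())
    }

    async fn delete_prefix(&self, path: &str) -> Result<()> {
        self.files.lock().unwrap().retain(|k, _| !k.starts_with(path));
        Ok(())
    }

    fn new_input(&self, path: &str) -> Result<InputFile> {
        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
    }

    fn new_output(&self, path: &str) -> Result<OutputFile> {
        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
    }
}

#[tokio::test]
async fn file_io_works_with_injected_storage() -> Result<()> {
    let file_io = FileIO::new(Arc::new(FakeStorage::default()));
    let path = "fake://warehouse/t/metadata.json";

    file_io.new_output(path)?.write(Bytes::from("{}")).await?;
    assert!(file_io.exists(path).await?);
    assert_eq!(file_io.new_input(path)?.read().await?, Bytes::from("{}"));
    Ok(())
}
```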
## Design (Phase 2): Storage Architecture Options

Phase 2 addresses how storage implementations are organized and how users interact with them. There are two architectural options under consideration.

### Crate Structure (Common to Both Options)

Both options use the same crate structure. The `Storage` trait remains in the `iceberg` crate (as it's an API/interface), while concrete implementations move to `iceberg-storage`:

```
crates/
├── iceberg/                 # Core Iceberg functionality (APIs/interfaces)
│   └── src/
│       └── io/
│           ├── mod.rs
│           ├── storage.rs   # Storage trait definition (stays here)
│           └── file_io.rs   # FileIO, InputFile, OutputFile
│
└── iceberg-storage/         # Concrete storage implementations
    └── src/
        ├── lib.rs           # Re-exports
        └── opendal/         # OpenDAL-based implementations
            └── ...
```

The `iceberg-storage` crate depends on the `iceberg` crate (for the `Storage` trait, `Result`, `Error`, etc.).

---

### Option 1: Unified Storage (Multi-Scheme)

In this option, a single `Storage` implementation (e.g., `OpenDalStorage`) handles multiple URL schemes internally. There is no need for a registry since scheme routing happens within the storage implementation itself.

#### OpenDalStorage Implementation

```rust
/// Unified OpenDAL-based storage implementation.
///
/// This storage handles all supported schemes (S3, GCS, Azure, filesystem, memory)
/// through OpenDAL, creating operators on-demand based on the path scheme.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OpenDalStorage {
    /// In-memory storage, useful for testing
    #[cfg(feature = "storage-memory")]
    Memory(#[serde(skip, default = "default_memory_op")] Operator),

    /// Local filesystem storage
    #[cfg(feature = "storage-fs")]
    LocalFs,

    /// Amazon S3 storage
    /// Expects paths of the form `s3[a]://<bucket>/<path>`.
    #[cfg(feature = "storage-s3")]
    S3 {
        configured_scheme: String,
        config: Arc<S3Config>,
        #[serde(skip)]
        customized_credential_load: Option<CustomizedCredentialLoad>,
    },

    /// Google Cloud Storage
    #[cfg(feature = "storage-gcs")]
    Gcs { config: Arc<GcsConfig> },

    /// Alibaba Cloud OSS
    #[cfg(feature = "storage-oss")]
    Oss { config: Arc<OssConfig> },

    /// Azure Data Lake Storage
    #[cfg(feature = "storage-azdls")]
    Azdls {
        configured_scheme: AzureStorageScheme,
        config: Arc<AzdlsConfig>,
    },
}

impl OpenDalStorage {
    /// Build storage from FileIOBuilder
    pub fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
        let (scheme_str, props, extensions) = file_io_builder.into_parts();
        let scheme = Self::parse_scheme(&scheme_str)?;

        match scheme {
            #[cfg(feature = "storage-memory")]
            Scheme::Memory => Ok(Self::Memory(memory_config_build()?)),
            #[cfg(feature = "storage-fs")]
            Scheme::Fs => Ok(Self::LocalFs),
            #[cfg(feature = "storage-s3")]
            Scheme::S3 => Ok(Self::S3 {
                configured_scheme: scheme_str,
                config: s3_config_parse(props)?.into(),
                customized_credential_load: extensions
                    .get::<CustomizedCredentialLoad>()
                    .map(Arc::unwrap_or_clone),
            }),
            // ... other schemes
        }
    }

    fn create_operator<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> {
        match self {
            #[cfg(feature = "storage-memory")]
            Self::Memory(op) => { /* ... */ }
            #[cfg(feature = "storage-fs")]
            Self::LocalFs => { /* ... */ }
            #[cfg(feature = "storage-s3")]
            Self::S3 { configured_scheme, config, customized_credential_load } => { /* ... */ }
            // ... other variants
        }
    }
}

#[async_trait]
impl Storage for OpenDalStorage {
    async fn exists(&self, path: &str) -> Result<bool> {
        let (op, rel_path) = self.create_operator(path)?;
        Ok(op.exists(rel_path).await?)
    }

    async fn read(&self, path: &str) -> Result<Bytes> {
        let (op, rel_path) = self.create_operator(path)?;
        Ok(op.read(rel_path).await?.to_bytes())
    }

    // ... implement other Storage trait methods
}
```

#### StorageRouter Helper

For users who want to compose multiple storage implementations (e.g., use OpenDAL for S3 but a custom storage for a proprietary system), we provide a `StorageRouter` helper. Note: `StorageRouter` does NOT implement the `Storage` trait; it's a helper for resolving paths within custom storage implementations.

```rust
/// Helper for routing paths to different storage implementations.
/// This is NOT a Storage implementation itself, but a utility for building custom storages.
#[derive(Debug, Clone)]
pub struct StorageRouter {
    routes: DashMap<String, Arc<dyn Storage>>, // prefix -> storage
}

impl StorageRouter {
    pub fn new() -> Self {
        Self { routes: DashMap::new() }
    }

    /// Register a storage for a URL prefix (e.g., "s3://", "s3://my-bucket/", "gs://")
    pub fn register(&self, prefix: impl Into<String>, storage: Arc<dyn Storage>) {
        self.routes.insert(prefix.into(), storage);
    }

    /// Resolve which storage should handle the given path.
    /// Returns the storage registered with the longest matching prefix.
    pub fn resolve(&self, path: &str) -> Result<Arc<dyn Storage>> {
        let mut best_match: Option<(usize, Arc<dyn Storage>)> = None;

        for entry in self.routes.iter() {
            if path.starts_with(entry.key()) {
                let prefix_len = entry.key().len();
                if best_match.is_none() || prefix_len > best_match.as_ref().unwrap().0 {
                    best_match = Some((prefix_len, entry.value().clone()));
                }
            }
        }

        best_match
            .map(|(_, storage)| storage)
            .ok_or_else(|| Error::new(
                ErrorKind::FeatureUnsupported,
                format!("No storage configured for path: {}", path),
            ))
    }
}
```

#### Example: Custom Storage Using StorageRouter

```rust
use iceberg::io::{Storage, FileMetadata, FileRead, FileWrite, InputFile, OutputFile};
use iceberg::{Result, Error, ErrorKind};
use iceberg_storage::{StorageRouter, OpenDalStorage};
use async_trait::async_trait;
use bytes::Bytes;
use std::sync::Arc;

/// Custom storage that routes S3 to OpenDAL and proprietary URLs to a custom backend
#[derive(Debug, Clone)]
pub struct MyCompanyStorage {
    router: StorageRouter,
}

impl MyCompanyStorage {
    pub fn new(s3_config: S3Config, prop_client: ProprietaryClient) -> Result<Self> {
        let router = StorageRouter::new();

        // Route S3 URLs to OpenDAL
        let opendal = Arc::new(OpenDalStorage::S3 {
            configured_scheme: "s3".to_string(),
            config: Arc::new(s3_config),
            customized_credential_load: None,
        });
        router.register("s3://", opendal.clone());
        router.register("s3a://", opendal);

        // Route proprietary URLs to custom storage
        let prop_storage = Arc::new(ProprietaryStorage::new(prop_client));
        router.register("prop://", prop_storage);

        Ok(Self { router })
    }
}

#[async_trait]
impl Storage for MyCompanyStorage {
    async fn exists(&self, path: &str) -> Result<bool> {
        self.router.resolve(path)?.exists(path).await
    }

    async fn read(&self, path: &str) -> Result<Bytes> {
        self.router.resolve(path)?.read(path).await
    }

    // ... delegate all methods to router.resolve(path)?

    fn new_input(&self, path: &str) -> Result<InputFile> {
        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
    }

    fn new_output(&self, path: &str) -> Result<OutputFile> {
        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
    }
}

// Usage
fn create_file_io() -> Result<FileIO> {
    let storage = MyCompanyStorage::new(s3_config, prop_client)?;
    Ok(FileIO::new(Arc::new(storage)))
}
```

---

### Option 2: Scheme-Specific Storage with Registry

In this option, each `Storage` implementation handles only one scheme (or a small set of related schemes like `s3`/`s3a`). A `StorageRegistry` maps URL schemes/prefixes to their corresponding storage implementations.

#### Crate Structure for Option 2

```
crates/iceberg-storage/
└── src/
    ├── lib.rs
    ├── registry.rs      # StorageRegistry
    └── opendal/
        ├── mod.rs
        ├── s3.rs        # OpenDalS3Storage
        ├── gcs.rs       # OpenDalGcsStorage
        ├── fs.rs        # OpenDalFsStorage
        ├── memory.rs    # OpenDalMemoryStorage
        ├── oss.rs       # OpenDalOssStorage
        └── azdls.rs     # OpenDalAzdlsStorage
```

#### StorageRegistry

The registry maps URL prefixes to storage implementations using `DashMap` for thread-safe concurrent access. Unlike `StorageRouter`, which is a helper for custom `Storage` implementations, `StorageRegistry` is designed to be passed directly to `Catalog` implementations.

```rust
/// Registry that maps URL prefixes to storage implementations.
/// Can be passed directly to Catalog implementations.
#[derive(Debug, Clone, Default)]
pub struct StorageRegistry {
    storages: DashMap<String, Arc<dyn Storage>>,
}

impl StorageRegistry {
    pub fn new() -> Self {
        Self::default()
    }

    /// Register a storage for a URL prefix (e.g., "s3://", "s3://my-bucket/", "gs://")
    pub fn register(&self, prefix: impl Into<String>, storage: Arc<dyn Storage>) {
        self.storages.insert(prefix.into(), storage);
    }

    /// Get storage for a path.
    /// Returns the storage registered with the longest matching prefix.
    pub fn get(&self, path: &str) -> Result<Arc<dyn Storage>> {
        let mut best_match: Option<(usize, Arc<dyn Storage>)> = None;

        for entry in self.storages.iter() {
            if path.starts_with(entry.key()) {
                let prefix_len = entry.key().len();
                if best_match.is_none() || prefix_len > best_match.as_ref().unwrap().0 {
                    best_match = Some((prefix_len, entry.value().clone()));
                }
            }
        }

        best_match
            .map(|(_, storage)| storage)
            .ok_or_else(|| Error::new(
                ErrorKind::FeatureUnsupported,
                format!("No storage registered for path '{}'. Registered prefixes: {:?}",
                    path, self.list_prefixes()),
            ))
    }

    /// List all registered prefixes
    pub fn list_prefixes(&self) -> Vec<String> {
        self.storages.iter().map(|e| e.key().clone()).collect()
    }
}
```

#### Scheme-Specific Storage Implementation

```rust
/// S3-specific storage implementation using OpenDAL
#[derive(Debug, Clone)]
pub struct OpenDalS3Storage {
    config: Arc<S3Config>,
    credential_loader: Option<Arc<CustomizedCredentialLoad>>,
}

impl OpenDalS3Storage {
    pub fn new(props: HashMap<String, String>) -> Result<Self> {
        let config = parse_s3_config(&props)?;
        Ok(Self {
            config: Arc::new(config),
            credential_loader: None,
        })
    }

    pub fn with_credential_loader(mut self, loader: Arc<CustomizedCredentialLoad>) -> Self {
        self.credential_loader = Some(loader);
        self
    }

    fn create_operator(&self, path: &str) -> Result<(Operator, &str)> {
        // Validate scheme
        if !path.starts_with("s3://") && !path.starts_with("s3a://") {
            return Err(Error::new(ErrorKind::DataInvalid,
                format!("OpenDalS3Storage only handles s3:// URLs, got: {}", path)));
        }

        let bucket = extract_bucket(path)?;
        let mut builder = S3::from_config((*self.config).clone());
        builder.bucket(&bucket);

        let op = Operator::new(builder)?.layer(RetryLayer::new()).finish();
        let rel_path = extract_relative_path(path)?;
        Ok((op, rel_path))
    }
}

#[async_trait]
impl Storage for OpenDalS3Storage {
    async fn exists(&self, path: &str) -> Result<bool> {
        let (op, rel_path) = self.create_operator(path)?;
        Ok(op.exists(rel_path).await?)
    }

    async fn read(&self, path: &str) -> Result<Bytes> {
        let (op, rel_path) = self.create_operator(path)?;
        Ok(op.read(rel_path).await?.to_bytes())
    }
implement other Storage trait methods + + fn new_input(&self, path: &str) -> Result { + if !path.starts_with("s3://") && !path.starts_with("s3a://") { + return Err(Error::new(ErrorKind::DataInvalid, + format!("Invalid S3 path: {}", path))); + } + Ok(InputFile::new(Arc::new(self.clone()), path.to_string())) + } + + fn new_output(&self, path: &str) -> Result { + if !path.starts_with("s3://") && !path.starts_with("s3a://") { + return Err(Error::new(ErrorKind::DataInvalid, + format!("Invalid S3 path: {}", path))); + } + Ok(OutputFile::new(Arc::new(self.clone()), path.to_string())) } } ``` -### Component Flow +#### Example: Using StorageRegistry with Catalog -``` -User Code - ↓ -FileIOBuilder::new("s3") - ↓ -FileIOBuilder::build() - ↓ -StorageRegistry::new() - ↓ -registry.get_factory("s3") - ↓ -OpenDALS3StorageFactory - ↓ -factory.build(props, extensions) - ↓ -OpenDALS3Storage (implements Storage) - ↓ -FileIO { inner: Arc } +```rust +use iceberg_storage::{StorageRegistry, OpenDalS3Storage, OpenDalGcsStorage, OpenDalFsStorage}; +use std::sync::Arc; + +fn create_catalog_with_registry() -> Result { + let registry = StorageRegistry::new(); + + // Register scheme-specific storages + let s3_storage = Arc::new(OpenDalS3Storage::new(s3_props)?); + registry.register("s3://", s3_storage.clone()); + registry.register("s3a://", s3_storage); + + let gcs_storage = Arc::new(OpenDalGcsStorage::new(gcs_props)?); + registry.register("gs://", gcs_storage.clone()); + registry.register("gcs://", gcs_storage); + + let fs_storage = Arc::new(OpenDalFsStorage::new()?); + registry.register("file://", fs_storage); + + // Pass registry directly to catalog + GlueCatalog::new(config, registry).await +} ``` -## Custom Storage Implementation -To implement a custom storage backend: +#### Example: Per-Bucket Configuration ```rust -use std::collections::HashMap; -use std::sync::Arc; -use async_trait::async_trait; -use bytes::Bytes; -use iceberg::io::{ - Storage, StorageFactory, Extensions, FileMetadata, - FileRead, FileWrite, InputFile, OutputFile -}; +fn create_multi_account_catalog() -> Result { + let registry = StorageRegistry::new(); + + // Different S3 configurations for different buckets + let prod_s3 = Arc::new(OpenDalS3Storage::new(prod_s3_props)?); + let staging_s3 = Arc::new(OpenDalS3Storage::new(staging_s3_props)?); + + // Register specific bucket prefixes (longest match wins) + registry.register("s3://prod-bucket/", prod_s3.clone()); + registry.register("s3://staging-bucket/", staging_s3); + + // Fallback for other S3 URLs + registry.register("s3://", prod_s3); + + // Pass registry directly to catalog + GlueCatalog::new(config, registry).await +} +``` -use iceberg::Result; +--- -// 1. Define your storage struct -#[derive(Debug, Clone)] -struct MyCustomStorage { - // Your configuration -} +### Comparison of Options -// 2. Implement the Storage trait -#[async_trait] -impl Storage for MyCustomStorage { - async fn exists(&self, path: &str) -> Result { - // Your implementation - } - // ... 
-### Component Flow
+#### Example: Using StorageRegistry with Catalog
-```
-User Code
-    ↓
-FileIOBuilder::new("s3")
-    ↓
-FileIOBuilder::build()
-    ↓
-StorageRegistry::new()
-    ↓
-registry.get_factory("s3")
-    ↓
-OpenDALS3StorageFactory
-    ↓
-factory.build(props, extensions)
-    ↓
-OpenDALS3Storage (implements Storage)
-    ↓
-FileIO { inner: Arc<dyn Storage> }
+
+```rust
+use iceberg_storage::{StorageRegistry, OpenDalS3Storage, OpenDalGcsStorage, OpenDalFsStorage};
+use std::sync::Arc;
+
+fn create_catalog_with_registry() -> Result<GlueCatalog> {
+    let registry = StorageRegistry::new();
+
+    // Register scheme-specific storages
+    let s3_storage = Arc::new(OpenDalS3Storage::new(s3_props)?);
+    registry.register("s3://", s3_storage.clone());
+    registry.register("s3a://", s3_storage);
+
+    let gcs_storage = Arc::new(OpenDalGcsStorage::new(gcs_props)?);
+    registry.register("gs://", gcs_storage.clone());
+    registry.register("gcs://", gcs_storage);
+
+    let fs_storage = Arc::new(OpenDalFsStorage::new()?);
+    registry.register("file://", fs_storage);
+
+    // Pass registry directly to catalog
+    GlueCatalog::new(config, registry).await
+}
 ```
-## Custom Storage Implementation
-To implement a custom storage backend:
+
+#### Example: Per-Bucket Configuration
+
 ```rust
-use std::collections::HashMap;
-use std::sync::Arc;
-use async_trait::async_trait;
-use bytes::Bytes;
-use iceberg::io::{
-    Storage, StorageFactory, Extensions, FileMetadata,
-    FileRead, FileWrite, InputFile, OutputFile
-};
+fn create_multi_account_catalog() -> Result<GlueCatalog> {
+    let registry = StorageRegistry::new();
+
+    // Different S3 configurations for different buckets
+    let prod_s3 = Arc::new(OpenDalS3Storage::new(prod_s3_props)?);
+    let staging_s3 = Arc::new(OpenDalS3Storage::new(staging_s3_props)?);
+
+    // Register specific bucket prefixes (longest match wins)
+    registry.register("s3://prod-bucket/", prod_s3.clone());
+    registry.register("s3://staging-bucket/", staging_s3);
+
+    // Fallback for other S3 URLs
+    registry.register("s3://", prod_s3);
+
+    // Pass registry directly to catalog
+    GlueCatalog::new(config, registry).await
+}
+```
-use iceberg::Result;
+
+---
-// 1. Define your storage struct
-#[derive(Debug, Clone)]
-struct MyCustomStorage {
-    // Your configuration
-}
+
+### Comparison of Options
-// 2. Implement the Storage trait
-#[async_trait]
-impl Storage for MyCustomStorage {
-    async fn exists(&self, path: &str) -> Result<bool> {
-        // Your implementation
-    }
-    // ... implement all 10 methods
-}
+
+| Aspect | Option 1: Unified Storage | Option 2: Scheme-Specific + Registry |
+|--------|---------------------------|--------------------------------------|
+| **Crate Structure** | Single `OpenDalStorage` enum | Multiple `OpenDalXxxStorage` structs |
+| **Routing** | Internal (via `StorageRouter`) | External (via `StorageRegistry`) |
+| **Custom Storage** | Implement all schemes in one type | Implement one scheme per type |
+| **Composition** | Use `StorageRouter` helper within Storage impl | Pass `StorageRegistry` to Catalog |
+| **Type Safety** | Less (runtime scheme checking) | More (compile-time per storage) |
+| **Code Organization** | Centralized | Modular |
-// 3. Define your factory
-#[derive(Debug)]
-struct MyCustomStorageFactory;
+
+---
-// 4. Implement StorageFactory
-impl StorageFactory for MyCustomStorageFactory {
-    fn build(
-        &self,
-        props: HashMap<String, String>,
-        extensions: Extensions,
-    ) -> Result<Arc<dyn Storage>> {
-        // Parse configuration from props
-        Ok(Arc::new(MyCustomStorage::new(props)?))
-    }
+
+## Catalog API Changes
-}
+
+The `Catalog` trait currently does not expose storage directly. However, catalogs internally use `FileIO` to read and write table metadata. With the storage trait abstraction, catalogs will need to be updated to work with the new `FileIO` that wraps `Arc<dyn Storage>`.
-// 5. Register your factory
-let mut registry = StorageRegistry::new();
-registry.register("custom", Arc::new(MyCustomStorageFactory));
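+
+For orientation, the `FileIO::new(storage)` calls in the examples below assume `FileIO` becomes a thin wrapper over the trait object, roughly like this sketch (not the final API):
+
+```rust
+/// Sketch: FileIO holding a Storage trait object.
+#[derive(Debug, Clone)]
+pub struct FileIO {
+    inner: Arc<dyn Storage>,
+}
+
+impl FileIO {
+    pub fn new(inner: Arc<dyn Storage>) -> Self {
+        Self { inner }
+    }
+
+    /// File creation simply delegates to the underlying Storage.
+    pub fn new_input(&self, path: &str) -> Result<InputFile> {
+        self.inner.new_input(path)
+    }
+
+    pub fn new_output(&self, path: &str) -> Result<OutputFile> {
+        self.inner.new_output(path)
+    }
+}
+```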
+
+### Current Catalog Usage
-## Open Questions
+
+```rust
+// Current: Catalog implementations create FileIO internally
+impl GlueCatalog {
+    pub async fn new(config: GlueCatalogConfig) -> Result<Self> {
+        let file_io = FileIOBuilder::new(&config.warehouse)
+            .with_props(config.props.clone())
+            .build()?;
+        // ...
+    }
+}
+```
-These design decisions need to be resolved during implementation:
+
+### Updated Catalog Usage
-### 1. Storage Granularity (Phase 1)
+
+The catalog API changes differ between the two architecture options:
-**Question:** Should we have a general `Storage` implementation that works with multiple schemes, or specific implementations per scheme?
+
+#### For Option 1 (Unified Storage)
-**Option A: General Storage (e.g., `OpenDALStorage`)**
-- Single implementation handles all schemes (s3, gcs, fs, etc.)
-- Scheme detection happens at runtime
-- Simpler codebase with less duplication
+
+Catalogs accept `Arc<dyn Storage>` directly:
+
 ```rust
-#[derive(Debug, Clone)]
-pub struct OpenDALStorage {
-    operator: Operator,
-    scheme: String,
+// Catalog accepts Storage directly
+impl GlueCatalog {
+    pub async fn new(config: GlueCatalogConfig, storage: Arc<dyn Storage>) -> Result<Self> {
+        let file_io = FileIO::new(storage);
+        // ...
+    }
+}
+
+// Usage
+let storage = Arc::new(OpenDalStorage::build(builder)?);
+let catalog = GlueCatalog::new(config, storage).await?;
 ```
-**Option B: Scheme-Specific Storage (e.g., `OpenDALS3Storage`, `OpenDALGcsStorage`)**
-- Each storage backend has its own implementation
-- Type-safe configuration per backend
-- Better compile-time guarantees
-- More explicit and easier to optimize per backend
+
+#### For Option 2 (Scheme-Specific + Registry)
+
+Catalogs accept a `StorageRegistry` directly, which lets the catalog resolve the appropriate storage for each table location:
+
 ```rust
-#[derive(Debug, Clone)]
-pub struct OpenDALS3Storage {
-    config: Arc<S3Config>,
+// Catalog accepts StorageRegistry
+impl GlueCatalog {
+    pub async fn new(config: GlueCatalogConfig, registry: StorageRegistry) -> Result<Self> {
+        // Registry is stored and used to resolve storage for each table location
+        // ...
+    }
+
+    pub async fn load_table(&self, table: &TableIdent) -> Result<Table> {
+        let metadata_location = self.get_metadata_location(table)?;
+        // Use registry to get the right storage for this table's location
+        let storage = self.registry.get(&metadata_location)?;
+        let file_io = FileIO::new(storage);
+        // ...
+    }
+}
-#[derive(Debug, Clone)]
-pub struct OpenDALGcsStorage {
-    config: Arc<GcsConfig>,
+
+// Usage
+let registry = StorageRegistry::new();
+registry.register("s3://", Arc::new(OpenDalS3Storage::new(s3_props)?));
+registry.register("gs://", Arc::new(OpenDalGcsStorage::new(gcs_props)?));
+let catalog = GlueCatalog::new(config, registry).await?;
 ```
+
+### CatalogBuilder Changes
+
+The `CatalogBuilder` trait may need to accept storage configuration:
+
+```rust
+pub trait CatalogBuilder: Default + Debug + Send + Sync {
+    type C: Catalog;
+
+    /// Build catalog with default storage (from props)
+    fn build(self, name: impl Into<String>, props: HashMap<String, String>)
+        -> impl Future<Output = Result<Self::C>> + Send;
+
+    /// Build catalog with custom storage (Option 1)
+    fn build_with_storage(
+        self,
+        name: impl Into<String>,
+        props: HashMap<String, String>,
+        storage: Arc<dyn Storage>,
+    ) -> impl Future<Output = Result<Self::C>> + Send;
+
+    /// Build catalog with storage registry (Option 2)
+    fn build_with_registry(
+        self,
+        name: impl Into<String>,
+        props: HashMap<String, String>,
+        registry: StorageRegistry,
+    ) -> impl Future<Output = Result<Self::C>> + Send;
+}
+```
-**Current Implementation:** The RFC describes Option B (scheme-specific)
+
+This allows users to:
+1. Use default storage behavior (backward compatible)
+2. Inject custom storage implementations for testing or custom backends (see the call-site sketch below)
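+
+A hypothetical call site for the registry variant, assuming a `GlueCatalogBuilder` that implements this trait (the builder name and the `props`/`s3_props` maps are illustrative):
+
+```rust
+// Sketch: constructing a catalog with an explicit storage registry (Option 2).
+let registry = StorageRegistry::new();
+registry.register("s3://", Arc::new(OpenDalS3Storage::new(s3_props)?));
+
+let catalog = GlueCatalogBuilder::default()
+    .build_with_registry("glue", props, registry)
+    .await?;
+```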
+
+---
-### 2. Registry Location (Phase 1)
+
+## Open Questions
+
+1. **Which architecture option should we adopt?**
+   - Option 1 (Unified Storage) is simpler and closer to the current implementation
+   - Option 2 (Scheme-Specific + Registry) is more modular and type-safe
+
+2. **How should catalogs receive storage?**
+   - Accept `Arc<dyn Storage>` directly?
+   - Accept `FileIO`?
+   - Accept `StorageRegistry` (Option 2)?
+   - Keep the current `FileIOBuilder` approach for backward compatibility?
+
+3. **Should `StorageRouter`/`StorageRegistry` support prefix-based routing?**
+   - e.g., different storage for `s3://bucket-a/` vs `s3://bucket-b/`
+   - The current design supports this, but is it needed?
-**Question:** Where should the `StorageRegistry` live?
+
+4. **How should credentials/extensions be passed?**
+   - The current design uses the `Extensions` type in `FileIOBuilder`
+   - Should this be part of storage construction or a separate concern?
-This question depends on the answer to Question 1:
+
+5. **Should we provide default implementations?**
+   - e.g., an `OpenDalStorage` that handles all schemes out of the box
+   - For users who don't need customization
-**If Option A (General Storage):**
-- Do we even need a registry? A single `OpenDALStorage` could handle all schemes internally
-- Registration should happen when the crate is loaded and a registry will not be necessary
+
+---
-**If Option B (Scheme-Specific Storage):**
-- **Option 2a: Global Static Registry** - Single process-wide registry with lazy initialization
-  - Pros: Simple to use, no need to pass registry around
-  - Cons: Global state, harder to test, potential initialization ordering issues
-
-- **Option 2b: Catalog-Owned Registry** - Each catalog instance owns its registry
-  - Pros: Better encapsulation, easier testing, no global state
-  - Cons: More complex API, need to pass registry through layers
+
+## Other Considerations
-### 3. Error Handling Strategy (Phase 2)
+
+### Storage Error Handling
-**Question:** How should storage errors be represented?
+
+After the storage trait is stabilized, we may want to introduce more specific error handling for storage operations. Currently, storage errors are wrapped in the general `Error` type with `ErrorKind`. A future enhancement could add storage-specific error kinds to better distinguish the failure scenarios of different backends.
-**Option A: Enum-Based Errors**
+
 ```rust
+/// Storage-specific error kinds
 pub enum IoErrorKind {
+    /// File or object not found
     FileNotFound,
+    /// Credentials expired or invalid
     CredentialExpired,
+    /// Permission denied
+    PermissionDenied,
+    /// Network or connectivity error
+    NetworkError,
+    /// Storage service unavailable
+    ServiceUnavailable,
+    /// Rate limit exceeded
+    RateLimitExceeded,
 }
 
 pub enum ErrorKind {
-    // Existing variants
-    ...
-    Io(IoErrorKind)
+    // Existing variants...
+    DataInvalid,
+    FeatureUnsupported,
+    // ...
+
+    /// Storage I/O error with specific kind
+    Io(IoErrorKind),
 }
 ```
-- Pros: Type-safe, pattern matching, clear error categories
-- Cons: Need to map all backend errors to enum variants
-
-**Option B: Trait-Based Errors with Into/From**
-```rust
-pub trait StorageError: std::error::Error + Send + Sync + 'static {
-    fn is_not_found(&self) -> bool;
-    fn is_permission_denied(&self) -> bool;
-}
-// Each backend implements its own error type
-impl StorageError for OpenDALError { /* ... */ }
-impl StorageError for ObjectStoreError { /* ... */ }
-```
-- Pros: Flexible, preserves original error information, easier for backends
-- Cons: Less type-safe, harder to handle errors uniformly
+
+This would allow users to handle specific storage errors more precisely:
-
-**Option C: Hybrid Approach**
+
 ```rust
-#[derive(Debug, thiserror::Error)]
-pub enum StorageError {
-    #[error("File not found: {path}")]
-    NotFound { path: String },
-
-    #[error(transparent)]
-    Other(#[from] Box<dyn std::error::Error + Send + Sync>),
+match result {
+    Err(e) if matches!(e.kind(), ErrorKind::Io(IoErrorKind::FileNotFound)) => {
+        // Handle missing file
+    }
+    Err(e) if matches!(e.kind(), ErrorKind::Io(IoErrorKind::CredentialExpired)) => {
+        // Refresh credentials and retry
+    }
+    Err(e) => {
+        // Handle other errors
+    }
+    Ok(data) => { /* ... */ }
 }
 ```
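+
+Backends would still need to map their native errors onto these kinds. A rough sketch of what that could look like for an OpenDAL-backed storage, assuming a `From` conversion is the chosen mechanism (the exact mapping is an open detail):
+
+```rust
+// Sketch: translating a backend error into the proposed IoErrorKind.
+impl From<opendal::Error> for Error {
+    fn from(e: opendal::Error) -> Self {
+        let kind = match e.kind() {
+            opendal::ErrorKind::NotFound => ErrorKind::Io(IoErrorKind::FileNotFound),
+            opendal::ErrorKind::PermissionDenied => ErrorKind::Io(IoErrorKind::PermissionDenied),
+            opendal::ErrorKind::RateLimited => ErrorKind::Io(IoErrorKind::RateLimitExceeded),
+            // Remaining backend kinds fall back to an existing generic kind.
+            _ => ErrorKind::Unexpected,
+        };
+        Error::new(kind, e.to_string())
+    }
+}
+```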
-- Pros: Common errors are typed, uncommon errors are wrapped
-- Cons: Some loss of type information for wrapped errors
-
-**Recommendation:** Option C (Hybrid) provides a good balance, with common errors like `NotFound` being strongly typed while allowing backends to preserve their specific error details.
+
+---
 
 ## Implementation Plan
 
-- **Phase 1 (Initial Implementation):**
-  - Define core traits (Storage, optionally InputFile/OutputFile)
-  - Implement StorageFactory + StorageRegistry
-  - Refactor OpenDAL to use traits
-  - **Resolve:** Question 1 (Storage granularity) and Question 2 (Registry location)
-  - **Decision needed:** Choose between general vs. scheme-specific storage implementations
-
-- **Phase 2 (Stabilize Storage Trait):**
-  - Move concrete storage implementations to a separate crate `iceberg-storage`
-  - Add new API: `delete_iter` to provide batch delete operations
-  - Remove `FileIOBuilder` and `Extensions` - serializable custom Storage implementations should handle their use cases
-  - **Resolve:** Question 3 (Error handling strategy)
-  - **Decision needed:** Define `StorageError` type and conversion strategy if needed
-  - Stabilize the Storage trait API for long-term compatibility
-
-- **Phase 3:** Add object_store + other backends
-  - Implement `object_store`-based storage backends in separate crates
-  - Validate error handling works across different backend implementations
-  - Ensure the stabilized Storage trait works well with alternative implementations
+### Phase 1: Storage Trait (Current Branch)
+- Define `Storage` trait in `iceberg` crate
+- Update `InputFile`/`OutputFile` to use `Arc<dyn Storage>`
+- Update `FileIO` to wrap `Arc<dyn Storage>`
+- Implement `Storage` for existing `Storage` enum (backward compatibility)
+
+### Phase 2: Concrete Implementations
+- Create `iceberg-storage` crate
+- Move/implement `OpenDalStorage` (Option 1) or scheme-specific storages (Option 2)
+- Update catalog implementations to accept storage
+- Consider introducing `IoErrorKind` for storage-specific error handling
+
+### Phase 3: Additional Backends
+- Add `object_store`-based implementations
+- Document custom storage implementation guide

From 8d9ec3a99270f95e6ef6cbdaa1c70410222ffdd4 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Tue, 16 Dec 2025 13:30:24 -0800
Subject: [PATCH 8/8] minor

---
 docs/rfcs/0002_storage_trait.md | 18 +++---------------
 1 file changed, 3 insertions(+), 15 deletions(-)

diff --git a/docs/rfcs/0002_storage_trait.md b/docs/rfcs/0002_storage_trait.md
index a54f0d6730..d81c57c562 100644
--- a/docs/rfcs/0002_storage_trait.md
+++ b/docs/rfcs/0002_storage_trait.md
@@ -42,19 +42,6 @@ pub(crate) enum Storage {
     Gcs { config: Arc<GcsConfig> },
     // ... other variants
 }
-
-impl Storage {
-    pub(crate) fn create_operator<'a>(&self, path: &'a impl AsRef<str>)
-        -> crate::Result<(Operator, &'a str)> {
-        match self {
-            #[cfg(feature = "storage-s3")]
-            Storage::S3 { configured_scheme, config, customized_credential_load } => {
-                // S3-specific operator creation
-            }
-            // ... other match arms
-        }
-    }
-}
 ```
 
 Current structure:
@@ -886,13 +873,14 @@ match result {
 
 ## Implementation Plan
 
-### Phase 1: Storage Trait (Current Branch)
+### Phase 1: Storage Trait (Initial cut and stabilization)
 - Define `Storage` trait in `iceberg` crate
 - Update `InputFile`/`OutputFile` to use `Arc<dyn Storage>`
 - Update `FileIO` to wrap `Arc<dyn Storage>`
 - Implement `Storage` for existing `Storage` enum (backward compatibility)
+- Potential new API like `delete_iter`
 
-### Phase 2: Concrete Implementations
+### Phase 2: Separate trait and concrete implementations
 - Create `iceberg-storage` crate
 - Move/implement `OpenDalStorage` (Option 1) or scheme-specific storages (Option 2)
 - Update catalog implementations to accept storage