Skip to content

Commit b5ae637

Browse files
committed
typetag serde for Storage
1 parent 4ecae74 commit b5ae637

File tree

11 files changed

+212
-19
lines changed

11 files changed

+212
-19
lines changed

Cargo.lock

Lines changed: 51 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ toml = "0.8"
128128
tracing = "0.1.41"
129129
tracing-subscriber = "0.3.20"
130130
typed-builder = "0.20"
131+
typetag = "0.2"
131132
url = "2.5.7"
132133
uuid = { version = "1.18", features = ["v7"] }
133134
volo = "0.10.6"

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ strum = { workspace = true, features = ["derive"] }
9090
thrift = { workspace = true }
9191
tokio = { workspace = true, optional = false, features = ["sync"] }
9292
typed-builder = { workspace = true }
93+
typetag = { workspace = true }
9394
url = { workspace = true }
9495
uuid = { workspace = true }
9596
zstd = { workspace = true }

crates/iceberg/src/io/file_io.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,7 @@ mod tests {
523523
use bytes::Bytes;
524524
use futures::AsyncReadExt;
525525
use futures::io::AllowStdIo;
526+
use serde::{Deserialize, Serialize};
526527
use tempfile::TempDir;
527528

528529
use super::{FileIO, FileIOBuilder};
@@ -533,12 +534,17 @@ mod tests {
533534
use crate::{Error, ErrorKind, Result};
534535

535536
// Test storage implementation that tracks write operations
536-
#[derive(Debug, Clone)]
537+
#[derive(Debug, Clone, Serialize, Deserialize)]
537538
struct TestStorage {
539+
#[serde(skip, default = "default_written")]
538540
written: Arc<Mutex<Vec<String>>>,
539541
received_props: HashMap<String, String>,
540542
}
541543

544+
fn default_written() -> Arc<Mutex<Vec<String>>> {
545+
Arc::new(Mutex::new(Vec::new()))
546+
}
547+
542548
#[allow(dead_code)]
543549
impl TestStorage {
544550
pub fn written(&self) -> MutexGuard<'_, Vec<String>> {
@@ -551,6 +557,7 @@ mod tests {
551557
}
552558

553559
#[async_trait]
560+
#[typetag::serde]
554561
impl Storage for TestStorage {
555562
async fn exists(&self, _path: &str) -> Result<bool> {
556563
Ok(true)

crates/iceberg/src/io/storage.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ use crate::Result;
3636
/// This trait defines the interface for all storage backends. Implementations
3737
/// provide access to different storage systems like S3, GCS, Azure, local filesystem, etc.
3838
///
39+
/// The trait supports serialization via `typetag`, allowing storage instances to be
40+
/// serialized and deserialized across process boundaries.
41+
///
3942
/// # Example
4043
///
4144
/// ```rust,ignore
@@ -51,6 +54,7 @@ use crate::Result;
5154
/// }
5255
/// ```
5356
#[async_trait]
57+
#[typetag::serde(tag = "type")]
5458
pub trait Storage: Debug + Send + Sync {
5559
/// Check if a file exists at the given path
5660
async fn exists(&self, path: &str) -> Result<bool>;

crates/iceberg/src/io/storage_azdls.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ pub(crate) fn azdls_create_operator<'a>(
132132
/// paths are expected to contain the `dfs` storage service.
133133
/// - `wasb[s]` is used to refer to files in Blob Storage directly; paths are
134134
/// expected to contain the `blob` storage service.
135-
#[derive(Debug, Clone, PartialEq)]
135+
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
136136
pub(crate) enum AzureStorageScheme {
137137
Abfs,
138138
Abfss,
@@ -606,7 +606,9 @@ mod tests {
606606
}
607607

608608
/// Azure Data Lake Storage implementation using OpenDAL
609-
#[derive(Debug, Clone)]
609+
///
610+
/// Stores configuration and creates operators on-demand.
611+
#[derive(Debug, Clone, Serialize, Deserialize)]
610612
pub struct OpenDALAzdlsStorage {
611613
/// Because Azdls accepts multiple possible schemes, we store the full
612614
/// passed scheme here to later validate schemes passed via paths.
@@ -625,6 +627,7 @@ impl OpenDALAzdlsStorage {
625627
}
626628

627629
#[async_trait]
630+
#[typetag::serde]
628631
impl Storage for OpenDALAzdlsStorage {
629632
async fn exists(&self, path: &str) -> Result<bool> {
630633
let (op, relative_path) = self.create_operator(path)?;

crates/iceberg/src/io/storage_fs.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use async_trait::async_trait;
2222
use bytes::Bytes;
2323
use opendal::Operator;
2424
use opendal::services::FsConfig;
25+
use serde::{Deserialize, Serialize};
2526

2627
use crate::Result;
2728
use crate::io::{
@@ -37,7 +38,9 @@ pub(crate) fn fs_config_build() -> Result<Operator> {
3738
}
3839

3940
/// Filesystem storage implementation using OpenDAL
40-
#[derive(Debug, Clone)]
41+
///
42+
/// This storage is stateless and creates operators on-demand.
43+
#[derive(Debug, Clone, Serialize, Deserialize)]
4144
pub struct OpenDALFsStorage;
4245

4346
impl OpenDALFsStorage {
@@ -52,6 +55,7 @@ impl OpenDALFsStorage {
5255
}
5356

5457
#[async_trait]
58+
#[typetag::serde]
5559
impl Storage for OpenDALFsStorage {
5660
async fn exists(&self, path: &str) -> Result<bool> {
5761
let relative_path = self.extract_relative_path(path);
@@ -131,3 +135,27 @@ impl StorageFactory for OpenDALFsStorageFactory {
131135
Ok(Arc::new(OpenDALFsStorage))
132136
}
133137
}
138+
139+
#[cfg(test)]
140+
mod tests {
141+
use super::*;
142+
use crate::io::Storage;
143+
144+
#[test]
145+
fn test_fs_storage_serialization() {
146+
// Create a filesystem storage instance using the factory
147+
let factory = OpenDALFsStorageFactory;
148+
let storage = factory
149+
.build(HashMap::new(), Extensions::default())
150+
.unwrap();
151+
152+
// Serialize the storage
153+
let serialized = serde_json::to_string(&storage).unwrap();
154+
155+
// Deserialize the storage
156+
let deserialized: Box<dyn Storage> = serde_json::from_str(&serialized).unwrap();
157+
158+
// Verify the type is correct
159+
assert!(format!("{:?}", deserialized).contains("OpenDALFsStorage"));
160+
}
161+
}

crates/iceberg/src/io/storage_gcs.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use async_trait::async_trait;
2323
use bytes::Bytes;
2424
use opendal::Operator;
2525
use opendal::services::GcsConfig;
26+
use serde::{Deserialize, Serialize};
2627
use url::Url;
2728

2829
use crate::io::{
@@ -112,7 +113,9 @@ pub(crate) fn gcs_config_build(cfg: &GcsConfig, path: &str) -> Result<Operator>
112113
}
113114

114115
/// GCS storage implementation using OpenDAL
115-
#[derive(Debug, Clone)]
116+
///
117+
/// Stores configuration and creates operators on-demand.
118+
#[derive(Debug, Clone, Serialize, Deserialize)]
116119
pub struct OpenDALGcsStorage {
117120
config: Arc<GcsConfig>,
118121
}
@@ -136,6 +139,7 @@ impl OpenDALGcsStorage {
136139
}
137140

138141
#[async_trait]
142+
#[typetag::serde]
139143
impl Storage for OpenDALGcsStorage {
140144
async fn exists(&self, path: &str) -> Result<bool> {
141145
let (op, relative_path) = self.create_operator(path)?;

0 commit comments

Comments
 (0)