diff --git a/Cargo.lock b/Cargo.lock
index 2d464709fd..4d1fa9bd43 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2632,6 +2632,17 @@ version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
 
+[[package]]
+name = "erased-serde"
+version = "0.4.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "89e8918065695684b2b0702da20382d5ae6065cf3327bc2d6436bd49a71ce9f3"
+dependencies = [
+ "serde",
+ "serde_core",
+ "typeid",
+]
+
 [[package]]
 name = "errno"
 version = "0.3.14"
@@ -3398,6 +3409,7 @@ dependencies = [
  "tempfile",
  "tokio",
  "typed-builder",
+ "typetag",
  "url",
  "uuid",
  "zstd",
@@ -3786,6 +3798,15 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "inventory"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc61209c082fbeb19919bee74b176221b27223e27b65d781eb91af24eb1fb46e"
+dependencies = [
+ "rustversion",
+]
+
 [[package]]
 name = "ipnet"
 version = "2.11.0"
@@ -7110,12 +7131,42 @@ dependencies = [
  "syn 2.0.111",
 ]
 
+[[package]]
+name = "typeid"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c"
+
 [[package]]
 name = "typenum"
 version = "1.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
 
+[[package]]
+name = "typetag"
+version = "0.2.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "be2212c8a9b9bcfca32024de14998494cf9a5dfa59ea1b829de98bac374b86bf"
+dependencies = [
+ "erased-serde",
+ "inventory",
+ "once_cell",
+ "serde",
+ "typetag-impl",
+]
+
+[[package]]
+name = "typetag-impl"
+version = "0.2.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "27a7a9b72ba121f6f1f6c3632b85604cac41aedb5ddc70accbebb6cac83de846"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.111",
+]
+
 [[package]]
 name = "typify"
 version = "0.5.0"
diff --git a/Cargo.toml b/Cargo.toml
index ded3aedecb..c56c808f1c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -127,6 +127,7 @@ toml = "0.8"
 tracing = "0.1.41"
 tracing-subscriber = "0.3.20"
 typed-builder = "0.20"
+typetag = "0.2"
 url = "2.5.7"
 uuid = { version = "1.18", features = ["v7"] }
 volo = "0.10.6"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 6f1332a444..9a5de7736d 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -86,6 +86,7 @@ serde_with = { workspace = true }
 strum = { workspace = true, features = ["derive"] }
 tokio = { workspace = true, optional = false, features = ["sync"] }
 typed-builder = { workspace = true }
+typetag = { workspace = true }
 url = { workspace = true }
 uuid = { workspace = true }
 zstd = { workspace = true }
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 6209c1e261..1f0f70094d 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -447,7 +447,7 @@ impl ArrowReader {
         file_io: FileIO,
         should_load_page_index: bool,
         arrow_reader_options: Option<ArrowReaderOptions>,
-    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead>>> {
+    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
         // Get the metadata for the Parquet file we need to read and build
         // a reader for the data within
         let parquet_file = file_io.new_input(data_file_path)?;
@@ -1646,18 +1646,18 @@ impl BoundPredicateVisitor for PredicateConverter<'_> {
 }
 
 /// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader.
-pub struct ArrowFileReader<R: FileRead> {
+pub struct ArrowFileReader {
     meta: FileMetadata,
     preload_column_index: bool,
     preload_offset_index: bool,
     preload_page_index: bool,
     metadata_size_hint: Option<usize>,
-    r: R,
+    r: Box<dyn FileRead>,
 }
 
-impl<R: FileRead> ArrowFileReader<R> {
+impl ArrowFileReader {
     /// Create a new ArrowFileReader
-    pub fn new(meta: FileMetadata, r: R) -> Self {
+    pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
         Self {
             meta,
             preload_column_index: false,
@@ -1696,7 +1696,7 @@ impl<R: FileRead> ArrowFileReader<R> {
     }
 }
 
-impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
+impl AsyncFileReader for ArrowFileReader {
     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
         Box::pin(
             self.r
diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index 24e91ca8a4..e20cea2ed2 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -17,14 +17,15 @@
 use std::any::{Any, TypeId};
 use std::collections::HashMap;
+use std::fmt::Debug;
 use std::ops::Range;
 use std::sync::Arc;
 
 use bytes::Bytes;
-use opendal::Operator;
 use url::Url;
 
-use super::storage::Storage;
+use super::storage::OpenDalStorage;
+pub use super::storage::Storage;
 use crate::{Error, ErrorKind, Result};
 
 /// FileIO implementation, used to manipulate files in underlying storage.
@@ -47,8 +48,7 @@ use crate::{Error, ErrorKind, Result};
 #[derive(Clone, Debug)]
 pub struct FileIO {
     builder: FileIOBuilder,
-
-    inner: Arc<Storage>,
+    inner: Arc<dyn Storage>,
 }
 
 impl FileIO {
@@ -89,8 +89,7 @@ impl FileIO {
     ///
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        Ok(op.delete(relative_path).await?)
+        self.inner.delete(path.as_ref()).await
     }
 
     /// Remove the path and all nested dirs and files recursively.
@@ -104,14 +103,8 @@
     /// - If the path is a file or not exist, this function will be no-op.
     /// - If the path is a empty directory, this function will remove the directory itself.
     /// - If the path is a non-empty directory, this function will remove the directory and all nested files and directories.
-    pub async fn remove_dir_all(&self, path: impl AsRef<str>) -> Result<()> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        let path = if relative_path.ends_with('/') {
-            relative_path.to_string()
-        } else {
-            format!("{relative_path}/")
-        };
-        Ok(op.remove_all(&path).await?)
+    pub async fn delete_prefix(&self, path: impl AsRef<str>) -> Result<()> {
+        self.inner.delete_prefix(path.as_ref()).await
     }
 
     /// Check file exists.
@@ -120,8 +113,7 @@
     ///
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        Ok(op.exists(relative_path).await?)
+        self.inner.exists(path.as_ref()).await
     }
 
     /// Creates input file.
@@ -130,14 +122,7 @@
     ///
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        let path = path.as_ref().to_string();
-        let relative_path_pos = path.len() - relative_path.len();
-        Ok(InputFile {
-            op,
-            path,
-            relative_path_pos,
-        })
+        self.inner.new_input(path.as_ref())
     }
 
     /// Creates output file.
@@ -146,14 +131,7 @@ impl FileIO {
     ///
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        let path = path.as_ref().to_string();
-        let relative_path_pos = path.len() - relative_path.len();
-        Ok(OutputFile {
-            op,
-            path,
-            relative_path_pos,
-        })
+        self.inner.new_output(path.as_ref())
     }
 }
 
@@ -262,7 +240,8 @@ impl FileIOBuilder {
 
     /// Builds [`FileIO`].
     pub fn build(self) -> Result<FileIO> {
-        let storage = Storage::build(self.clone())?;
+        let storage = OpenDalStorage::build(self.clone())?;
+
         Ok(FileIO {
             builder: self,
             inner: Arc::new(storage),
@@ -293,7 +272,7 @@ pub trait FileRead: Send + Sync + Unpin + 'static {
 
 #[async_trait::async_trait]
 impl FileRead for opendal::Reader {
-    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
+    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
         Ok(opendal::Reader::read(self, range).await?.to_bytes())
     }
 }
@@ -301,49 +280,48 @@ impl FileRead for opendal::Reader {
 /// Input file is used for reading from files.
 #[derive(Debug)]
 pub struct InputFile {
-    op: Operator,
-    // Absolution path of file.
+    storage: Arc<dyn Storage>,
     path: String,
-    // Relative path of file to uri, starts at [`relative_path_pos`]
-    relative_path_pos: usize,
 }
 
 impl InputFile {
+    /// Creates a new input file.
+    ///
+    /// # Arguments
+    ///
+    /// * `storage` - The storage backend to use
+    /// * `path` - Absolute path to the file
+    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
+        Self { storage, path }
+    }
+
     /// Absolute path to root uri.
     pub fn location(&self) -> &str {
         &self.path
     }
 
     /// Check if file exists.
-    pub async fn exists(&self) -> crate::Result<bool> {
-        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
+    pub async fn exists(&self) -> Result<bool> {
+        self.storage.exists(&self.path).await
     }
 
     /// Fetch and returns metadata of file.
-    pub async fn metadata(&self) -> crate::Result<FileMetadata> {
-        let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
-
-        Ok(FileMetadata {
-            size: meta.content_length(),
-        })
+    pub async fn metadata(&self) -> Result<FileMetadata> {
+        self.storage.metadata(&self.path).await
    }
 
     /// Read and returns whole content of file.
     ///
     /// For continuous reading, use [`Self::reader`] instead.
-    pub async fn read(&self) -> crate::Result<Bytes> {
-        Ok(self
-            .op
-            .read(&self.path[self.relative_path_pos..])
-            .await?
-            .to_bytes())
+    pub async fn read(&self) -> Result<Bytes> {
+        self.storage.read(&self.path).await
     }
 
     /// Creates [`FileRead`] for continuous reading.
     ///
     /// For one-time reading, use [`Self::read`] instead.
-    pub async fn reader(&self) -> crate::Result<impl FileRead + use<>> {
-        Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
+    pub async fn reader(&self) -> Result<Box<dyn FileRead>> {
+        self.storage.reader(&self.path).await
     }
 }
 
@@ -354,16 +332,16 @@ impl InputFile {
 /// It's possible for us to remove the async_trait, but we need to figure
 /// out how to handle the object safety.
 #[async_trait::async_trait]
-pub trait FileWrite: Send + Unpin + 'static {
+pub trait FileWrite: Send + Sync + Unpin + 'static {
     /// Write bytes to file.
     ///
     /// TODO: we can support writing non-contiguous bytes in the future.
-    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
+    async fn write(&mut self, bs: Bytes) -> Result<()>;
 
     /// Close file.
     ///
     /// Calling close on closed file will generate an error.
-    async fn close(&mut self) -> crate::Result<()>;
+    async fn close(&mut self) -> Result<()>;
 }
 
 #[async_trait::async_trait]
@@ -389,17 +367,24 @@ impl FileWrite for Box<dyn FileWrite> {
     }
 }
 
-/// Output file is used for writing to files..
+/// Output file is used for writing to files.
 #[derive(Debug)]
 pub struct OutputFile {
-    op: Operator,
-    // Absolution path of file.
+    storage: Arc<dyn Storage>,
     path: String,
-    // Relative path of file to uri, starts at [`relative_path_pos`]
-    relative_path_pos: usize,
 }
 
 impl OutputFile {
+    /// Creates a new output file.
+    ///
+    /// # Arguments
+    ///
+    /// * `storage` - The storage backend to use
+    /// * `path` - Absolute path to the file
+    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
+        Self { storage, path }
+    }
+
     /// Relative path to root uri.
     pub fn location(&self) -> &str {
         &self.path
@@ -407,23 +392,19 @@ impl OutputFile {
 
     /// Checks if file exists.
     pub async fn exists(&self) -> Result<bool> {
-        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
+        self.storage.exists(&self.path).await
     }
 
     /// Deletes file.
     ///
     /// If the file does not exist, it will not return error.
     pub async fn delete(&self) -> Result<()> {
-        Ok(self.op.delete(&self.path[self.relative_path_pos..]).await?)
+        self.storage.delete(&self.path).await
     }
 
     /// Converts into [`InputFile`].
     pub fn to_input_file(self) -> InputFile {
-        InputFile {
-            op: self.op,
-            path: self.path,
-            relative_path_pos: self.relative_path_pos,
-        }
+        InputFile::new(self.storage, self.path)
     }
 
     /// Create a new output file with given bytes.
@@ -433,9 +414,7 @@ impl OutputFile {
     /// Calling `write` will overwrite the file if it exists.
     /// For continuous writing, use [`Self::writer`].
     pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
-        let mut writer = self.writer().await?;
-        writer.write(bs).await?;
-        writer.close().await
+        self.storage.write(self.path.as_str(), bs).await
     }
 
     /// Creates output file for continuous writing.
@@ -444,9 +423,7 @@
     ///
     /// For one-time writing, use [`Self::write`] instead.
     pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
-        Ok(Box::new(
-            self.op.writer(&self.path[self.relative_path_pos..]).await?,
-        ))
+        Ok(Box::new(self.storage.writer(&self.path).await?))
     }
 }
 
@@ -517,14 +494,14 @@ mod tests {
         assert!(file_io.exists(&a_path).await.unwrap());
 
         // Remove a file should be no-op.
-        file_io.remove_dir_all(&a_path).await.unwrap();
+        file_io.delete_prefix(&a_path).await.unwrap();
         assert!(file_io.exists(&a_path).await.unwrap());
 
         // Remove a not exist dir should be no-op.
-        file_io.remove_dir_all("not_exists/").await.unwrap();
+        file_io.delete_prefix("not_exists/").await.unwrap();
 
         // Remove a dir should remove all files in it.
-        file_io.remove_dir_all(&sub_dir_path).await.unwrap();
+        file_io.delete_prefix(&sub_dir_path).await.unwrap();
         assert!(!file_io.exists(&b_path).await.unwrap());
         assert!(!file_io.exists(&c_path).await.unwrap());
         assert!(file_io.exists(&a_path).await.unwrap());
@@ -543,7 +520,7 @@
         let file_io = create_local_file_io();
         assert!(!file_io.exists(&full_path).await.unwrap());
         assert!(file_io.delete(&full_path).await.is_ok());
-        assert!(file_io.remove_dir_all(&full_path).await.is_ok());
+        assert!(file_io.delete_prefix(&full_path).await.is_ok());
     }
 
     #[tokio::test]
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index 5eb5964345..e582fe2e9b 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -88,11 +88,11 @@ mod storage_s3;
 #[cfg(feature = "storage-azdls")]
 pub use storage_azdls::*;
 #[cfg(feature = "storage-fs")]
-use storage_fs::*;
+pub(crate) use storage_fs::*;
 #[cfg(feature = "storage-gcs")]
 pub use storage_gcs::*;
 #[cfg(feature = "storage-memory")]
-use storage_memory::*;
+pub(crate) use storage_memory::*;
 #[cfg(feature = "storage-oss")]
 pub use storage_oss::*;
 #[cfg(feature = "storage-s3")]
diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs
index 03e43600dd..e754c7e16a 100644
--- a/crates/iceberg/src/io/storage.rs
+++ b/crates/iceberg/src/io/storage.rs
@@ -15,14 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#[cfg(any(
-    feature = "storage-s3",
-    feature = "storage-gcs",
-    feature = "storage-oss",
-    feature = "storage-azdls",
-))]
+//! Storage interfaces of Iceberg
+
+use std::fmt::Debug;
 use std::sync::Arc;
 
+use async_trait::async_trait;
+use bytes::Bytes;
 use opendal::layers::RetryLayer;
 #[cfg(feature = "storage-azdls")]
 use opendal::services::AzdlsConfig;
@@ -33,34 +32,89 @@ use opendal::services::OssConfig;
 #[cfg(feature = "storage-s3")]
 use opendal::services::S3Config;
 use opendal::{Operator, Scheme};
+use serde::{Deserialize, Serialize};
 
-#[cfg(feature = "storage-azdls")]
-use super::AzureStorageScheme;
-use super::FileIOBuilder;
-#[cfg(feature = "storage-s3")]
-use crate::io::CustomAwsCredentialLoader;
-use crate::{Error, ErrorKind};
+use super::{FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, OutputFile};
+use crate::{Error, ErrorKind, Result};
+
+/// Trait for storage operations in Iceberg.
+///
+/// The trait supports serialization via `typetag`, allowing storage instances to be
+/// serialized and deserialized across process boundaries.
+///
+/// Third-party implementations can implement this trait to provide custom storage backends.
+#[async_trait]
+#[typetag::serde(tag = "type")]
+pub trait Storage: Debug + Send + Sync {
+    /// Check if a file exists at the given path
+    async fn exists(&self, path: &str) -> Result<bool>;
+
+    /// Get metadata from an input path
+    async fn metadata(&self, path: &str) -> Result<FileMetadata>;
+
+    /// Read bytes from a path
+    async fn read(&self, path: &str) -> Result<Bytes>;
+
+    /// Get FileRead from a path
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>>;
+
+    /// Write bytes to an output path
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()>;
+
+    /// Get FileWrite from a path
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>>;
 
-/// The storage carries all supported storage services in iceberg
-#[derive(Debug)]
-pub(crate) enum Storage {
+    /// Delete a file at the given path
+    async fn delete(&self, path: &str) -> Result<()>;
+
+    /// Delete all files with the given prefix
+    async fn delete_prefix(&self, path: &str) -> Result<()>;
+
+    /// Create a new input file for reading
+    fn new_input(&self, path: &str) -> Result<InputFile>;
+
+    /// Create a new output file for writing
+    fn new_output(&self, path: &str) -> Result<OutputFile>;
+}
+
+/// Unified OpenDAL-based storage implementation.
+///
+/// This storage handles all supported schemes (S3, GCS, Azure, filesystem, memory)
+/// through OpenDAL, creating operators on-demand based on the path scheme.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub(crate) enum OpenDalStorage {
+    /// In-memory storage, useful for testing
     #[cfg(feature = "storage-memory")]
-    Memory(Operator),
+    Memory(#[serde(skip, default = "default_memory_op")] Operator),
+    /// Local filesystem storage
     #[cfg(feature = "storage-fs")]
     LocalFs,
+    /// Amazon S3 storage
     /// Expects paths of the form `s3[a]://<bucket>/<path>`.
     #[cfg(feature = "storage-s3")]
     S3 {
         /// s3 storage could have `s3://` and `s3a://`.
         /// Storing the scheme string here to return the correct path.
         configured_scheme: String,
+        /// S3 configuration
         config: Arc<S3Config>,
-        customized_credential_load: Option<CustomAwsCredentialLoader>,
+        /// Optional custom credential loader
+        #[serde(skip)]
+        customized_credential_load: Option<super::CustomAwsCredentialLoader>,
     },
+    /// Google Cloud Storage
     #[cfg(feature = "storage-gcs")]
-    Gcs { config: Arc<GcsConfig> },
+    Gcs {
+        /// GCS configuration
+        config: Arc<GcsConfig>,
+    },
+    /// Alibaba Cloud OSS
     #[cfg(feature = "storage-oss")]
-    Oss { config: Arc<OssConfig> },
+    Oss {
+        /// OSS configuration
+        config: Arc<OssConfig>,
+    },
+    /// Azure Data Lake Storage
     /// Expects paths of the form
     /// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
     /// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
@@ -68,14 +122,20 @@
     Azdls {
         /// Because Azdls accepts multiple possible schemes, we store the full
         /// passed scheme here to later validate schemes passed via paths.
-        configured_scheme: AzureStorageScheme,
+        configured_scheme: super::AzureStorageScheme,
+        /// Azure DLS configuration
         config: Arc<AzdlsConfig>,
     },
 }
 
-impl Storage {
-    /// Convert iceberg config to opendal config.
-    pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> {
+#[cfg(feature = "storage-memory")]
+fn default_memory_op() -> Operator {
+    super::memory_config_build().expect("Failed to build memory operator")
+}
+
+impl OpenDalStorage {
+    /// Build storage from FileIOBuilder
+    pub fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
         let (scheme_str, props, extensions) = file_io_builder.into_parts();
         let _ = (&props, &extensions);
         let scheme = Self::parse_scheme(&scheme_str)?;
 
         match scheme {
             #[cfg(feature = "storage-memory")]
             Scheme::Memory => Ok(Self::Memory(super::memory_config_build()?)),
+
             #[cfg(feature = "storage-fs")]
             Scheme::Fs => Ok(Self::LocalFs),
+
             #[cfg(feature = "storage-s3")]
             Scheme::S3 => Ok(Self::S3 {
                 configured_scheme: scheme_str,
                 config: super::s3_config_parse(props)?.into(),
                 customized_credential_load: extensions
-                    .get::<CustomAwsCredentialLoader>()
+                    .get::<super::CustomAwsCredentialLoader>()
                     .map(Arc::unwrap_or_clone),
             }),
+
             #[cfg(feature = "storage-gcs")]
             Scheme::Gcs => Ok(Self::Gcs {
                 config: super::gcs_config_parse(props)?.into(),
             }),
+
             #[cfg(feature = "storage-oss")]
             Scheme::Oss => Ok(Self::Oss {
                 config: super::oss_config_parse(props)?.into(),
             }),
+
             #[cfg(feature = "storage-azdls")]
             Scheme::Azdls => {
-                let scheme = scheme_str.parse::<AzureStorageScheme>()?;
+                let configured_scheme = scheme_str.parse::<super::AzureStorageScheme>()?;
                 Ok(Self::Azdls {
+                    configured_scheme,
                     config: super::azdls_config_parse(props)?.into(),
-                    configured_scheme: scheme,
                 })
             }
             // Update doc on [`FileIO`] when adding new schemes.
@@ -129,23 +194,19 @@
     ///
     /// * An [`opendal::Operator`] instance used to operate on file.
     /// * Relative path to the root uri of [`opendal::Operator`].
-    pub(crate) fn create_operator<'a>(
-        &self,
-        path: &'a impl AsRef<str>,
-    ) -> crate::Result<(Operator, &'a str)> {
-        let path = path.as_ref();
-        let _ = path;
+    fn create_operator<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> {
         let (operator, relative_path): (Operator, &str) = match self {
             #[cfg(feature = "storage-memory")]
-            Storage::Memory(op) => {
+            Self::Memory(op) => {
                 if let Some(stripped) = path.strip_prefix("memory:/") {
                     Ok::<_, crate::Error>((op.clone(), stripped))
                 } else {
                     Ok::<_, crate::Error>((op.clone(), &path[1..]))
                 }
             }
+
             #[cfg(feature = "storage-fs")]
-            Storage::LocalFs => {
+            Self::LocalFs => {
                 let op = super::fs_config_build()?;
 
                 if let Some(stripped) = path.strip_prefix("file:/") {
@@ -154,8 +215,9 @@
                     Ok::<_, crate::Error>((op, &path[1..]))
                 }
             }
+
             #[cfg(feature = "storage-s3")]
-            Storage::S3 {
+            Self::S3 {
                 configured_scheme,
                 config,
                 customized_credential_load,
@@ -174,8 +236,9 @@
                     ))
                 }
             }
+
             #[cfg(feature = "storage-gcs")]
-            Storage::Gcs { config } => {
+            Self::Gcs { config } => {
                 let operator = super::gcs_config_build(config, path)?;
                 let prefix = format!("gs://{}/", operator.info().name());
                 if path.starts_with(&prefix) {
                     Ok((operator.clone(), &path[prefix.len()..]))
                 } else {
                     Err(Error::new(
                         ErrorKind::DataInvalid,
                         format!("Invalid gcs url: {}, should start with {}", path, prefix),
                     ))
                 }
             }
+
             #[cfg(feature = "storage-oss")]
-            Storage::Oss { config } => {
+            Self::Oss { config } => {
                 let op = super::oss_config_build(config, path)?;
-
-                // Check prefix of oss path.
                 let prefix = format!("oss://{}/", op.info().name());
 
                 if path.starts_with(&prefix) {
@@ -202,8 +265,9 @@
                     ))
                 }
             }
+
             #[cfg(feature = "storage-azdls")]
-            Storage::Azdls {
+            Self::Azdls {
                 configured_scheme,
                 config,
             } => super::azdls_create_operator(path, config, configured_scheme),
@@ -240,3 +304,109 @@
         }
     }
 }
+
+#[async_trait]
+#[typetag::serde]
+impl Storage for OpenDalStorage {
+    async fn exists(&self, path: &str) -> Result<bool> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(op.exists(relative_path).await?)
+    }
+
+    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
+        let (op, relative_path) = self.create_operator(path)?;
+        let meta = op.stat(relative_path).await?;
+        Ok(FileMetadata {
+            size: meta.content_length(),
+        })
+    }
+
+    async fn read(&self, path: &str) -> Result<Bytes> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(op.read(relative_path).await?.to_bytes())
+    }
+
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(Box::new(op.reader(relative_path).await?))
+    }
+
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
+        let mut writer = self.writer(path).await?;
+        writer.write(bs).await?;
+        writer.close().await
+    }
+
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(Box::new(op.writer(relative_path).await?))
+    }
+
+    async fn delete(&self, path: &str) -> Result<()> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(op.delete(relative_path).await?)
+    }
+
+    async fn delete_prefix(&self, path: &str) -> Result<()> {
+        let (op, relative_path) = self.create_operator(path)?;
+        let path = if relative_path.ends_with('/') {
+            relative_path.to_string()
+        } else {
+            format!("{relative_path}/")
+        };
+        Ok(op.remove_all(&path).await?)
+    }
+
+    fn new_input(&self, path: &str) -> Result<InputFile> {
+        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
+    }
+
+    fn new_output(&self, path: &str) -> Result<OutputFile> {
+        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::io::FileIOBuilder;
+
+    #[test]
+    #[cfg(feature = "storage-memory")]
+    fn test_opendal_storage_memory() {
+        let builder = FileIOBuilder::new("memory");
+        let storage = OpenDalStorage::build(builder).unwrap();
+        assert!(matches!(storage, OpenDalStorage::Memory { .. }));
+    }
+
+    #[test]
+    #[cfg(feature = "storage-fs")]
+    fn test_opendal_storage_fs() {
+        let builder = FileIOBuilder::new("file");
+        let storage = OpenDalStorage::build(builder).unwrap();
+        assert!(matches!(storage, OpenDalStorage::LocalFs));
+    }
+
+    #[test]
+    #[cfg(feature = "storage-s3")]
+    fn test_opendal_storage_s3() {
+        let builder = FileIOBuilder::new("s3");
+        let storage = OpenDalStorage::build(builder).unwrap();
+        assert!(matches!(storage, OpenDalStorage::S3 { .. }));
+    }
+
+    #[test]
+    #[cfg(feature = "storage-memory")]
+    fn test_storage_serialization() {
+        let builder = FileIOBuilder::new("memory");
+        let storage = OpenDalStorage::build(builder).unwrap();
+
+        // Serialize
+        let serialized = serde_json::to_string(&storage).unwrap();
+
+        // Deserialize
+        let deserialized: OpenDalStorage = serde_json::from_str(&serialized).unwrap();
+
+        assert!(matches!(deserialized, OpenDalStorage::Memory { .. }));
+    }
+}
diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/storage_azdls.rs
index 5abb0cd6e0..ec11db6c07 100644
--- a/crates/iceberg/src/io/storage_azdls.rs
+++ b/crates/iceberg/src/io/storage_azdls.rs
@@ -125,7 +125,7 @@ pub(crate) fn azdls_create_operator<'a>(
 /// paths are expected to contain the `dfs` storage service.
 /// - `wasb[s]` is used to refer to files in Blob Storage directly; paths are
 ///   expected to contain the `blob` storage service.
-#[derive(Debug, PartialEq)]
+#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
 pub(crate) enum AzureStorageScheme {
     Abfs,
     Abfss,
@@ -135,7 +135,7 @@
 
 impl AzureStorageScheme {
     // Returns the respective encrypted or plain-text HTTP scheme.
-    pub fn as_http_scheme(&self) -> &str {
+    pub(crate) fn as_http_scheme(&self) -> &str {
         match self {
             AzureStorageScheme::Abfs | AzureStorageScheme::Wasb => "http",
             AzureStorageScheme::Abfss | AzureStorageScheme::Wasbs => "https",
diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs
index 15a8e9b33a..a4b7464704 100644
--- a/crates/iceberg/src/puffin/metadata.rs
+++ b/crates/iceberg/src/puffin/metadata.rs
@@ -285,10 +285,13 @@ impl FileMetadata {
         let input_file_length = input_file.metadata().await?.size;
 
         let footer_payload_length =
-            FileMetadata::read_footer_payload_length(&file_read, input_file_length).await?;
-        let footer_bytes =
-            FileMetadata::read_footer_bytes(&file_read, input_file_length, footer_payload_length)
-                .await?;
+            FileMetadata::read_footer_payload_length(file_read.as_ref(), input_file_length).await?;
+        let footer_bytes = FileMetadata::read_footer_bytes(
+            file_read.as_ref(),
+            input_file_length,
+            footer_payload_length,
+        )
+        .await?;
 
         let magic_length = FileMetadata::MAGIC_LENGTH as usize;
 
         // check first four bytes of footer
diff --git a/crates/iceberg/src/puffin/reader.rs b/crates/iceberg/src/puffin/reader.rs
index dce53d93f0..8941d4a8fc 100644
--- a/crates/iceberg/src/puffin/reader.rs
+++ b/crates/iceberg/src/puffin/reader.rs
@@ -18,7 +18,7 @@
 use tokio::sync::OnceCell;
 
 use crate::Result;
-use crate::io::{FileRead, InputFile};
+use crate::io::InputFile;
 use crate::puffin::blob::Blob;
 use crate::puffin::metadata::{BlobMetadata, FileMetadata};
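
The new `Storage` trait is the extension point this diff advertises for third-party backends. Below is a minimal sketch of what such an implementation could look like. `StaticStorage` (a serializable, read-only map of preloaded files), its `files` field, and the `BytesRead` helper are hypothetical; the method set and the `#[async_trait]`/`#[typetag::serde]` attribute pairing mirror the trait definition in `crates/iceberg/src/io/storage.rs` above, and the sketch assumes the `pub use super::storage::Storage;` re-export lands so the trait (plus `FileMetadata` with its public `size` field, `FileRead`, `FileWrite`, `InputFile`, `OutputFile`) is reachable under `iceberg::io`.

```rust
use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use iceberg::io::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage};
use iceberg::{Error, ErrorKind, Result};
use serde::{Deserialize, Serialize};

/// Hypothetical read-only backend: a serializable map of preloaded files.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StaticStorage {
    files: HashMap<String, Vec<u8>>,
}

impl StaticStorage {
    fn read_only() -> Error {
        Error::new(ErrorKind::FeatureUnsupported, "StaticStorage is read-only")
    }
}

/// Simple `FileRead` over an owned buffer, returned by `Storage::reader`.
#[derive(Debug)]
struct BytesRead(Bytes);

#[async_trait]
impl FileRead for BytesRead {
    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
        Ok(self.0.slice(range.start as usize..range.end as usize))
    }
}

#[async_trait]
#[typetag::serde] // registers the impl so `Box<dyn Storage>` round-trips via serde
impl Storage for StaticStorage {
    async fn exists(&self, path: &str) -> Result<bool> {
        Ok(self.files.contains_key(path))
    }

    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
        let bytes = self.read(path).await?;
        Ok(FileMetadata {
            size: bytes.len() as u64,
        })
    }

    async fn read(&self, path: &str) -> Result<Bytes> {
        self.files
            .get(path)
            .map(|v| Bytes::copy_from_slice(v))
            .ok_or_else(|| Error::new(ErrorKind::Unexpected, format!("not found: {path}")))
    }

    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
        Ok(Box::new(BytesRead(self.read(path).await?)))
    }

    // All write-side operations reject, since this backend is read-only.
    async fn write(&self, _path: &str, _bs: Bytes) -> Result<()> {
        Err(Self::read_only())
    }

    async fn writer(&self, _path: &str) -> Result<Box<dyn FileWrite>> {
        Err(Self::read_only())
    }

    async fn delete(&self, _path: &str) -> Result<()> {
        Err(Self::read_only())
    }

    async fn delete_prefix(&self, _path: &str) -> Result<()> {
        Err(Self::read_only())
    }

    fn new_input(&self, path: &str) -> Result<InputFile> {
        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
    }

    fn new_output(&self, _path: &str) -> Result<OutputFile> {
        Err(Self::read_only())
    }
}
```

Because the trait is declared with `#[typetag::serde(tag = "type")]`, a serialized `Box<dyn Storage>` holding this type carries a `"type": "StaticStorage"` marker, which is what allows deserialization to pick the right implementation in another process without knowing the concrete type up front.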
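Among the user-visible surface changes, `FileIO::remove_dir_all` becomes `delete_prefix` with the same prefix semantics. A short usage sketch follows, assuming the `storage-memory` feature and a Tokio runtime; the `memory:/...` paths follow the conventions in this diff's tests:

```rust
use bytes::Bytes;
use iceberg::Result;
use iceberg::io::FileIOBuilder;

#[tokio::main]
async fn main() -> Result<()> {
    let file_io = FileIOBuilder::new("memory").build()?;

    // Writes and reads now route through the `Storage` trait object in FileIO.
    let out = file_io.new_output("memory:/warehouse/tmp/a.txt")?;
    out.write(Bytes::from_static(b"hello")).await?;
    assert!(file_io.exists("memory:/warehouse/tmp/a.txt").await?);

    // Formerly `remove_dir_all`; deletes everything under the prefix.
    file_io.delete_prefix("memory:/warehouse/tmp/").await?;
    assert!(!file_io.exists("memory:/warehouse/tmp/a.txt").await?);
    Ok(())
}
```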