diff --git a/Cargo.lock b/Cargo.lock index 6528d13f65..4374316662 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4733,7 +4733,7 @@ dependencies = [ "md-5", "parking_lot", "percent-encoding", - "quick-xml 0.37.4", + "quick-xml", "rand 0.8.5", "reqwest", "ring", @@ -4756,9 +4756,9 @@ checksum = "d75b0bedcc4fe52caa0e03d9f1151a323e4aa5e2d78ba3580400cd3c9e2bc4bc" [[package]] name = "opendal" -version = "0.53.0" +version = "0.53.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5ebd1183902124c6b3ee0a9383683513dd8cca3d25a5d065593f969a44f979e" +checksum = "f947c4efbca344c1a125753366033c8107f552b2e3f8251815ed1908f116ca3e" dependencies = [ "anyhow", "async-trait", @@ -4770,10 +4770,11 @@ dependencies = [ "futures", "getrandom 0.2.15", "http 1.3.1", + "http-body 1.0.1", "log", "md-5", "percent-encoding", - "quick-xml 0.36.2", + "quick-xml", "reqsign", "reqwest", "serde", @@ -5333,26 +5334,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a651516ddc9168ebd67b24afd085a718be02f8858fe406591b013d101ce2f40" -[[package]] -name = "quick-xml" -version = "0.35.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86e446ed58cef1bbfe847bc2fda0e2e4ea9f0e57b90c507d4781292590d72a4e" -dependencies = [ - "memchr", - "serde", -] - -[[package]] -name = "quick-xml" -version = "0.36.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe" -dependencies = [ - "memchr", - "serde", -] - [[package]] name = "quick-xml" version = "0.37.4" @@ -5639,9 +5620,9 @@ checksum = "a35e8a6bf28cd121053a66aa2e6a2e3eaffad4a60012179f0e864aa5ffeff215" [[package]] name = "reqsign" -version = "0.16.1" +version = "0.16.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb0075a66c8bfbf4cc8b70dca166e722e1f55a3ea9250ecbb85f4d92a5f64149" +checksum = 
"9323c0afb30e54f793f4705b10c890395bccc87c6e6ea62c4e7e82d09a380dc6" dependencies = [ "anyhow", "async-trait", @@ -5657,7 +5638,7 @@ dependencies = [ "log", "once_cell", "percent-encoding", - "quick-xml 0.35.0", + "quick-xml", "rand 0.8.5", "reqwest", "rsa", diff --git a/Cargo.toml b/Cargo.toml index 3d12fc8774..cae0c85cb1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,7 +87,7 @@ motore-macros = "0.4.3" murmur3 = "0.5.2" num-bigint = "0.4.6" once_cell = "1.20" -opendal = "0.53.0" +opendal = "0.53.3" ordered-float = "4" parquet = "55" pilota = "0.11.2" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 3324a4e9e3..d1ddc82463 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -32,6 +32,7 @@ repository = { workspace = true } default = ["storage-memory", "storage-fs", "storage-s3", "tokio"] storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"] +storage-azdls = ["opendal/services-azdls"] storage-fs = ["opendal/services-fs"] storage-gcs = ["opendal/services-gcs"] storage-memory = ["opendal/services-memory"] diff --git a/crates/iceberg/README.md b/crates/iceberg/README.md index fe8ba5239e..14acaa2d22 100644 --- a/crates/iceberg/README.md +++ b/crates/iceberg/README.md @@ -62,17 +62,18 @@ async fn main() -> Result<()> { Iceberg Rust provides various storage backends through feature flags. 
Here are the currently supported storage backends: -| Storage Backend | Feature Flag | Status | Description | -|----------------|--------------|--------|-------------| -| Memory | `storage-memory` | โœ… Stable | In-memory storage for testing and development | -| Local Filesystem | `storage-fs` | โœ… Stable | Local filesystem storage | -| Amazon S3 | `storage-s3` | โœ… Stable | Amazon S3 storage | -| Google Cloud Storage | `storage-gcs` | โœ… Stable | Google Cloud Storage | -| Alibaba Cloud OSS | `storage-oss` | ๐Ÿงช Experimental | Alibaba Cloud Object Storage Service | +| Storage Backend | Feature Flag | Status | Description | +| -------------------- | ---------------- | -------------- | --------------------------------------------- | +| Memory | `storage-memory` | โœ… Stable | In-memory storage for testing and development | +| Local Filesystem | `storage-fs` | โœ… Stable | Local filesystem storage | +| Amazon S3 | `storage-s3` | โœ… Stable | Amazon S3 storage | +| Google Cloud Storage | `storage-gcs` | โœ… Stable | Google Cloud Storage | +| Alibaba Cloud OSS | `storage-oss` | ๐Ÿงช Experimental | Alibaba Cloud Object Storage Service | +| Azure Datalake | `storage-azdls` | ๐Ÿงช Experimental | Azure Datalake Storage v2 | You can enable all stable storage backends at once using the `storage-all` feature flag. -> Note that `storage-oss` is currently experimental and not included in `storage-all`. +> Note that `storage-oss` and `storage-azdls` are currently experimental and not included in `storage-all`. 
Example usage in `Cargo.toml`: diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 2f0ae1736c..389397ecae 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -35,12 +35,14 @@ use crate::{Error, ErrorKind, Result}; /// /// Supported storages: /// -/// | Storage | Feature Flag | Schemes | -/// |--------------------|-------------------|------------| -/// | Local file system | `storage-fs` | `file` | -/// | Memory | `storage-memory` | `memory` | -/// | S3 | `storage-s3` | `s3`, `s3a`| -/// | GCS | `storage-gcs` | `gs`, `gcs`| +/// | Storage | Feature Flag | Schemes | Expected Path Format | +/// |--------------------|-------------------|----------------------------------| ------------------------------| +/// | Local file system | `storage-fs` | `file` | `file://path/to/file` | +/// | Memory | `storage-memory` | `memory` | `memory://path/to/file` | +/// | S3 | `storage-s3` | `s3`, `s3a` | `s3://<bucket>/path/to/file` | +/// | GCS | `storage-gcs` | `gs`, `gcs` | `gs://<bucket>/path/to/file` | +/// | OSS | `storage-oss` | `oss` | `oss://<bucket>/path/to/file` | +/// | Azure Datalake | `storage-azdls` | `abfs`, `abfss`, `wasb`, `wasbs` | `abfs://<filesystem>@<account>.dfs.core.windows.net/path/to/file` or `wasb://<container>@<account>.blob.core.windows.net/path/to/file` | #[derive(Clone, Debug)] pub struct FileIO { builder: FileIOBuilder, diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index d442b1522e..5eb5964345 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -67,32 +67,36 @@ //! - `new_output`: Create output file for writing.
mod file_io; +mod storage; + pub use file_io::*; +pub(crate) mod object_cache; -mod storage; +#[cfg(feature = "storage-azdls")] +mod storage_azdls; +#[cfg(feature = "storage-fs")] +mod storage_fs; +#[cfg(feature = "storage-gcs")] +mod storage_gcs; #[cfg(feature = "storage-memory")] mod storage_memory; -#[cfg(feature = "storage-memory")] -use storage_memory::*; +#[cfg(feature = "storage-oss")] +mod storage_oss; #[cfg(feature = "storage-s3")] mod storage_s3; -#[cfg(feature = "storage-s3")] -pub use storage_s3::*; -pub(crate) mod object_cache; -#[cfg(feature = "storage-fs")] -mod storage_fs; +#[cfg(feature = "storage-azdls")] +pub use storage_azdls::*; #[cfg(feature = "storage-fs")] use storage_fs::*; #[cfg(feature = "storage-gcs")] -mod storage_gcs; -#[cfg(feature = "storage-gcs")] pub use storage_gcs::*; - -#[cfg(feature = "storage-oss")] -mod storage_oss; +#[cfg(feature = "storage-memory")] +use storage_memory::*; #[cfg(feature = "storage-oss")] pub use storage_oss::*; +#[cfg(feature = "storage-s3")] +pub use storage_s3::*; pub(crate) fn is_truthy(value: &str) -> bool { ["true", "t", "1", "on"].contains(&value.to_lowercase().as_str()) diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index e727be5f58..a847977e5b 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -18,6 +18,8 @@ use std::sync::Arc; use opendal::layers::RetryLayer; +#[cfg(feature = "storage-azdls")] +use opendal::services::AzdlsConfig; #[cfg(feature = "storage-gcs")] use opendal::services::GcsConfig; #[cfg(feature = "storage-oss")] @@ -26,6 +28,8 @@ use opendal::services::OssConfig; use opendal::services::S3Config; use opendal::{Operator, Scheme}; +#[cfg(feature = "storage-azdls")] +use super::AzureStorageScheme; use super::FileIOBuilder; use crate::{Error, ErrorKind}; @@ -36,17 +40,28 @@ pub(crate) enum Storage { Memory(Operator), #[cfg(feature = "storage-fs")] LocalFs, + /// Expects paths of the form `s3[a]:///`. 
#[cfg(feature = "storage-s3")] S3 { /// s3 storage could have `s3://` and `s3a://`. /// Storing the scheme string here to return the correct path. - scheme_str: String, + configured_scheme: String, config: Arc, }, - #[cfg(feature = "storage-oss")] - Oss { config: Arc }, #[cfg(feature = "storage-gcs")] Gcs { config: Arc }, + #[cfg(feature = "storage-oss")] + Oss { config: Arc }, + /// Expects paths of the form + /// `abfs[s]://@.dfs./` or + /// `wasb[s]://@.blob./`. + #[cfg(feature = "storage-azdls")] + Azdls { + /// Because Azdls accepts multiple possible schemes, we store the full + /// passed scheme here to later validate schemes passed via paths. + configured_scheme: AzureStorageScheme, + config: Arc, + }, } impl Storage { @@ -62,7 +77,7 @@ impl Storage { Scheme::Fs => Ok(Self::LocalFs), #[cfg(feature = "storage-s3")] Scheme::S3 => Ok(Self::S3 { - scheme_str, + configured_scheme: scheme_str, config: super::s3_config_parse(props)?.into(), }), #[cfg(feature = "storage-gcs")] @@ -73,6 +88,14 @@ impl Storage { Scheme::Oss => Ok(Self::Oss { config: super::oss_config_parse(props)?.into(), }), + #[cfg(feature = "storage-azdls")] + Scheme::Azdls => { + let scheme = scheme_str.parse::()?; + Ok(Self::Azdls { + config: super::azdls_config_parse(props)?.into(), + configured_scheme: scheme, + }) + } // Update doc on [`FileIO`] when adding new schemes. _ => Err(Error::new( ErrorKind::FeatureUnsupported, @@ -118,12 +141,15 @@ impl Storage { } } #[cfg(feature = "storage-s3")] - Storage::S3 { scheme_str, config } => { + Storage::S3 { + configured_scheme, + config, + } => { let op = super::s3_config_build(config, path)?; let op_info = op.info(); // Check prefix of s3 path. 
- let prefix = format!("{}://{}/", scheme_str, op_info.name()); + let prefix = format!("{}://{}/", configured_scheme, op_info.name()); if path.starts_with(&prefix) { Ok((op, &path[prefix.len()..])) } else { @@ -133,7 +159,6 @@ impl Storage { )) } } - #[cfg(feature = "storage-gcs")] Storage::Gcs { config } => { let operator = super::gcs_config_build(config, path)?; @@ -162,11 +187,17 @@ impl Storage { )) } } + #[cfg(feature = "storage-azdls")] + Storage::Azdls { + configured_scheme, + config, + } => super::azdls_create_operator(path, config, configured_scheme), #[cfg(all( not(feature = "storage-s3"), not(feature = "storage-fs"), not(feature = "storage-gcs"), - not(feature = "storage-oss") + not(feature = "storage-oss"), + not(feature = "storage-azdls"), ))] _ => Err(Error::new( ErrorKind::FeatureUnsupported, @@ -189,6 +220,7 @@ impl Storage { "s3" | "s3a" => Ok(Scheme::S3), "gs" | "gcs" => Ok(Scheme::Gcs), "oss" => Ok(Scheme::Oss), + "abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls), s => Ok(s.parse::()?), } } diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/storage_azdls.rs new file mode 100644 index 0000000000..2cc6f1e26c --- /dev/null +++ b/crates/iceberg/src/io/storage_azdls.rs @@ -0,0 +1,552 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt::Display; +use std::str::FromStr; + +use opendal::Configurator; +use opendal::services::AzdlsConfig; +use url::Url; + +use crate::{Error, ErrorKind, Result, ensure_data_valid}; + +/// A connection string. +/// +/// Note, this string is parsed first, and any other passed adls.* properties +/// will override values from the connection string. +const ADLS_CONNECTION_STRING: &str = "adls.connection-string"; + +/// The account that you want to connect to. +pub const ADLS_ACCOUNT_NAME: &str = "adls.account-name"; + +/// The key to authentication against the account. +pub const ADLS_ACCOUNT_KEY: &str = "adls.account-key"; + +/// The shared access signature. +pub const ADLS_SAS_TOKEN: &str = "adls.sas-token"; + +/// The tenant-id. +pub const ADLS_TENANT_ID: &str = "adls.tenant-id"; + +/// The client-id. +pub const ADLS_CLIENT_ID: &str = "adls.client-id"; + +/// The client-secret. +pub const ADLS_CLIENT_SECRET: &str = "adls.client-secret"; + +/// Parses adls.* prefixed configuration properties. 
+pub(crate) fn azdls_config_parse(mut properties: HashMap) -> Result { + let mut config = AzdlsConfig::default(); + + if let Some(_conn_str) = properties.remove(ADLS_CONNECTION_STRING) { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Azdls: connection string currently not supported", + )); + } + + if let Some(account_name) = properties.remove(ADLS_ACCOUNT_NAME) { + config.account_name = Some(account_name); + } + + if let Some(account_key) = properties.remove(ADLS_ACCOUNT_KEY) { + config.account_key = Some(account_key); + } + + if let Some(sas_token) = properties.remove(ADLS_SAS_TOKEN) { + config.sas_token = Some(sas_token); + } + + if let Some(tenant_id) = properties.remove(ADLS_TENANT_ID) { + config.tenant_id = Some(tenant_id); + } + + if let Some(client_id) = properties.remove(ADLS_CLIENT_ID) { + config.client_id = Some(client_id); + } + + if let Some(client_secret) = properties.remove(ADLS_CLIENT_SECRET) { + config.client_secret = Some(client_secret); + } + + Ok(config) +} + +/// Builds an OpenDAL operator from the AzdlsConfig and path. +/// +/// The path is expected to include the scheme in a format like: +/// `abfss://@.dfs.core.windows.net/mydir/myfile.parquet`. +pub(crate) fn azdls_create_operator<'a>( + absolute_path: &'a str, + config: &AzdlsConfig, + configured_scheme: &AzureStorageScheme, +) -> Result<(opendal::Operator, &'a str)> { + let path = absolute_path.parse::()?; + match_path_with_config(&path, config, configured_scheme)?; + + let op = azdls_config_build(config, &path)?; + + // Paths to files in ADLS tend to be written in fully qualified form, + // including their filesystem and account name. + // OpenDAL's operator methods expect only the relative path, so we split it + // off and save it for later use. 
+ let relative_path_len = path.path.len(); + let (_, relative_path) = absolute_path.split_at(absolute_path.len() - relative_path_len); + + Ok((op, relative_path)) +} + +/// Note that `abf[s]` and `wasb[s]` variants have different implications: +/// - `abfs[s]` is used to refer to files in ADLS Gen2, backed by blob storage; +/// paths are expected to contain the `dfs` storage service. +/// - `wasb[s]` is used to refer to files in Blob Storage directly; paths are +/// expected to contain the `blob` storage service. +#[derive(Debug, PartialEq)] +pub(crate) enum AzureStorageScheme { + Abfs, + Abfss, + Wasb, + Wasbs, +} + +impl AzureStorageScheme { + // Returns the respective encrypted or plain-text HTTP scheme. + pub fn as_http_scheme(&self) -> &str { + match self { + AzureStorageScheme::Abfs | AzureStorageScheme::Wasb => "http", + AzureStorageScheme::Abfss | AzureStorageScheme::Wasbs => "https", + } + } +} + +impl Display for AzureStorageScheme { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AzureStorageScheme::Abfs => write!(f, "abfs"), + AzureStorageScheme::Abfss => write!(f, "abfss"), + AzureStorageScheme::Wasb => write!(f, "wasb"), + AzureStorageScheme::Wasbs => write!(f, "wasbs"), + } + } +} + +impl FromStr for AzureStorageScheme { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s { + "abfs" => Ok(AzureStorageScheme::Abfs), + "abfss" => Ok(AzureStorageScheme::Abfss), + "wasb" => Ok(AzureStorageScheme::Wasb), + "wasbs" => Ok(AzureStorageScheme::Wasbs), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!("Unexpected Azure Storage scheme: {}", s), + )), + } + } +} + +/// Validates whether the given path matches what's configured for the backend. 
+fn match_path_with_config( + path: &AzureStoragePath, + config: &AzdlsConfig, + configured_scheme: &AzureStorageScheme, +) -> Result<()> { + ensure_data_valid!( + &path.scheme == configured_scheme, + "Storage::Azdls: Scheme mismatch: configured {}, passed {}", + configured_scheme, + path.scheme + ); + + if let Some(ref configured_account_name) = config.account_name { + ensure_data_valid!( + &path.account_name == configured_account_name, + "Storage::Azdls: Account name mismatch: configured {}, path {}", + configured_account_name, + path.account_name + ); + } + + if let Some(ref configured_endpoint) = config.endpoint { + let passed_http_scheme = path.scheme.as_http_scheme(); + ensure_data_valid!( + configured_endpoint.starts_with(passed_http_scheme), + "Storage::Azdls: Endpoint {} does not use the expected http scheme {}.", + configured_endpoint, + passed_http_scheme + ); + + let ends_with_expected_suffix = configured_endpoint + .trim_end_matches('/') + .ends_with(&path.endpoint_suffix); + ensure_data_valid!( + ends_with_expected_suffix, + "Storage::Azdls: Endpoint suffix {} used with configured endpoint {}.", + path.endpoint_suffix, + configured_endpoint, + ); + } + + Ok(()) +} + +fn azdls_config_build(config: &AzdlsConfig, path: &AzureStoragePath) -> Result { + let mut builder = config.clone().into_builder(); + + if config.endpoint.is_none() { + // If no endpoint is provided, we construct it from the fully-qualified path. + builder = builder.endpoint(&path.as_endpoint()); + } + builder = builder.filesystem(&path.filesystem); + + Ok(opendal::Operator::new(builder)?.finish()) +} + +/// Represents a fully qualified path to blob/ file in Azure Storage. +#[derive(Debug, PartialEq)] +struct AzureStoragePath { + /// The scheme of the URL, e.g., `abfss`, `abfs`, `wasbs`, or `wasb`. + scheme: AzureStorageScheme, + + /// Under Blob Storage, this is considered the _container_. 
+ filesystem: String, + + account_name: String, + + /// Either `blob` or `dfs` for Blob Storage and ADLSv2 respectively. + storage_service: String, + + /// The endpoint suffix, e.g., `core.windows.net` for the public cloud + /// endpoint. + endpoint_suffix: String, + + /// Path to the file. + /// + /// It is relative to the `root` of the `AzdlsConfig`. + path: String, +} + +impl AzureStoragePath { + /// Converts the AzureStoragePath into a full endpoint URL. + /// + /// This is possible because the path is fully qualified. + fn as_endpoint(&self) -> String { + format!( + "{}://{}.{}.{}", + self.scheme.as_http_scheme(), + self.account_name, + self.storage_service, + self.endpoint_suffix + ) + } +} + +impl FromStr for AzureStoragePath { + type Err = Error; + + fn from_str(path: &str) -> Result { + let url = Url::parse(path)?; + + let filesystem = url.username(); + ensure_data_valid!( + !filesystem.is_empty(), + "AzureStoragePath: No container or filesystem name in path: {}", + path + ); + + let (account_name, storage_service, endpoint_suffix) = parse_azure_storage_endpoint(&url)?; + let scheme = validate_storage_and_scheme(storage_service, url.scheme())?; + + Ok(AzureStoragePath { + scheme, + filesystem: filesystem.to_string(), + account_name: account_name.to_string(), + storage_service: storage_service.to_string(), + endpoint_suffix: endpoint_suffix.to_string(), + path: url.path().to_string(), + }) + } +} + +fn parse_azure_storage_endpoint(url: &Url) -> Result<(&str, &str, &str)> { + let host = url.host_str().ok_or(Error::new( + ErrorKind::DataInvalid, + "AzureStoragePath: No host", + ))?; + + let (account_name, endpoint) = host.split_once('.').ok_or(Error::new( + ErrorKind::DataInvalid, + "AzureStoragePath: No account name", + ))?; + if account_name.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "AzureStoragePath: No account name", + )); + } + + let (storage, endpoint_suffix) = endpoint.split_once('.').ok_or(Error::new( + ErrorKind::DataInvalid, 
+ "AzureStoragePath: No storage service", + ))?; + + Ok((account_name, storage, endpoint_suffix)) +} + +fn validate_storage_and_scheme( + storage_service: &str, + scheme_str: &str, +) -> Result { + let scheme = scheme_str.parse::()?; + match scheme { + AzureStorageScheme::Abfss | AzureStorageScheme::Abfs => { + ensure_data_valid!( + storage_service == "dfs", + "AzureStoragePath: Unexpected storage service for abfs[s]: {}", + storage_service + ); + Ok(scheme) + } + AzureStorageScheme::Wasbs | AzureStorageScheme::Wasb => { + ensure_data_valid!( + storage_service == "blob", + "AzureStoragePath: Unexpected storage service for wasb[s]: {}", + storage_service + ); + Ok(scheme) + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use opendal::services::AzdlsConfig; + + use super::{AzureStoragePath, AzureStorageScheme, azdls_create_operator}; + use crate::io::azdls_config_parse; + + #[test] + fn test_azdls_config_parse() { + let test_cases = vec![ + ( + "account name and key", + HashMap::from([ + (super::ADLS_ACCOUNT_NAME.to_string(), "test".to_string()), + (super::ADLS_ACCOUNT_KEY.to_string(), "secret".to_string()), + ]), + Some(AzdlsConfig { + account_name: Some("test".to_string()), + account_key: Some("secret".to_string()), + ..Default::default() + }), + ), + ( + "account name and SAS token", + HashMap::from([ + (super::ADLS_ACCOUNT_NAME.to_string(), "test".to_string()), + (super::ADLS_SAS_TOKEN.to_string(), "token".to_string()), + ]), + Some(AzdlsConfig { + account_name: Some("test".to_string()), + sas_token: Some("token".to_string()), + ..Default::default() + }), + ), + ( + "account name and ADD credentials", + HashMap::from([ + (super::ADLS_ACCOUNT_NAME.to_string(), "test".to_string()), + (super::ADLS_CLIENT_ID.to_string(), "abcdef".to_string()), + (super::ADLS_CLIENT_SECRET.to_string(), "secret".to_string()), + (super::ADLS_TENANT_ID.to_string(), "12345".to_string()), + ]), + Some(AzdlsConfig { + account_name: Some("test".to_string()), + 
client_id: Some("abcdef".to_string()), + client_secret: Some("secret".to_string()), + tenant_id: Some("12345".to_string()), + ..Default::default() + }), + ), + ]; + + for (name, properties, expected) in test_cases { + let config = azdls_config_parse(properties); + match expected { + Some(expected_config) => { + assert!(config.is_ok(), "Test case {} failed: {:?}", name, config); + assert_eq!(config.unwrap(), expected_config, "Test case: {}", name); + } + None => { + assert!(config.is_err(), "Test case {} expected error.", name); + } + } + } + } + + #[test] + fn test_azdls_create_operator() { + let test_cases = vec![ + ( + "basic", + ( + "abfss://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet", + AzdlsConfig { + account_name: Some("myaccount".to_string()), + endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()), + ..Default::default() + }, + AzureStorageScheme::Abfss, + ), + Some(("myfs", "/path/to/file.parquet")), + ), + ( + "different account", + ( + "abfss://myfs@anotheraccount.dfs.core.windows.net/path/to/file.parquet", + AzdlsConfig { + account_name: Some("myaccount".to_string()), + endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()), + ..Default::default() + }, + AzureStorageScheme::Abfss, + ), + None, + ), + ( + "different scheme", + ( + "wasbs://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet", + AzdlsConfig { + account_name: Some("myaccount".to_string()), + endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()), + ..Default::default() + }, + AzureStorageScheme::Abfss, + ), + None, + ), + ( + "incompatible scheme for endpoint", + ( + "abfs://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet", + AzdlsConfig { + account_name: Some("myaccount".to_string()), + endpoint: Some("http://myaccount.dfs.core.windows.net".to_string()), + ..Default::default() + }, + AzureStorageScheme::Abfss, + ), + None, + ), + ( + "different endpoint suffix", + ( + 
"abfss://somefs@myaccount.dfs.core.windows.net/path/to/file.parquet", + AzdlsConfig { + account_name: Some("myaccount".to_string()), + endpoint: Some("https://myaccount.dfs.core.chinacloudapi.cn".to_string()), + ..Default::default() + }, + AzureStorageScheme::Abfss, + ), + None, + ), + ( + "endpoint inferred from fully qualified path", + ( + "abfs://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet", + AzdlsConfig { + filesystem: "myfs".to_string(), + account_name: Some("myaccount".to_string()), + endpoint: None, + ..Default::default() + }, + AzureStorageScheme::Abfs, + ), + Some(("myfs", "/path/to/file.parquet")), + ), + ]; + + for (name, input, expected) in test_cases { + let result = azdls_create_operator(input.0, &input.1, &input.2); + match expected { + Some((expected_filesystem, expected_path)) => { + assert!(result.is_ok(), "Test case {} failed: {:?}", name, result); + + let (op, relative_path) = result.unwrap(); + assert_eq!(op.info().name(), expected_filesystem); + assert_eq!(relative_path, expected_path); + } + None => { + assert!(result.is_err(), "Test case {} expected error.", name); + } + } + } + } + + #[test] + fn test_parse_azure_storage_path() { + let test_cases = vec![ + ( + "succeeds", + "abfss://somefs@myaccount.dfs.core.windows.net/path/to/file.parquet", + Some(AzureStoragePath { + scheme: AzureStorageScheme::Abfss, + filesystem: "somefs".to_string(), + account_name: "myaccount".to_string(), + storage_service: "dfs".to_string(), + endpoint_suffix: "core.windows.net".to_string(), + path: "/path/to/file.parquet".to_string(), + }), + ), + ( + "unexpected scheme", + "adls://somefs@myaccount.dfs.core.windows.net/path/to/file.parquet", + None, + ), + ( + "no filesystem", + "abfss://myaccount.dfs.core.windows.net/path/to/file.parquet", + None, + ), + ( + "no account name", + "abfs://myfs@dfs.core.windows.net/path/to/file.parquet", + None, + ), + ]; + + for (name, input, expected) in test_cases { + let result = input.parse::(); + match expected { 
+ Some(expected_path) => { + assert!(result.is_ok(), "Test case {} failed: {:?}", name, result); + assert_eq!(result.unwrap(), expected_path, "Test case: {}", name); + } + None => { + assert!(result.is_err(), "Test case {} expected error.", name); + } + } + } + } +}