diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 39553f7554..ddbf6a4e01 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -41,9 +41,9 @@ use crate::client::{ HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error, }; use crate::types::{ - CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest, - ListNamespaceResponse, ListTableResponse, LoadTableResponse, NamespaceSerde, - RegisterTableRequest, RenameTableRequest, + CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest, + CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult, + NamespaceResponse, RegisterTableRequest, RenameTableRequest, }; /// REST catalog URI @@ -466,13 +466,7 @@ impl Catalog for RestCatalog { deserialize_catalog_response::(http_response) .await?; - let ns_identifiers = response - .namespaces - .into_iter() - .map(NamespaceIdent::from_vec) - .collect::>>()?; - - namespaces.extend(ns_identifiers); + namespaces.extend(response.namespaces); match response.next_page_token { Some(token) => next_token = Some(token), @@ -502,9 +496,9 @@ impl Catalog for RestCatalog { let request = context .client .request(Method::POST, context.config.namespaces_endpoint()) - .json(&NamespaceSerde { - namespace: namespace.as_ref().clone(), - properties: Some(properties), + .json(&CreateNamespaceRequest { + namespace: namespace.clone(), + properties, }) .build()?; @@ -513,8 +507,8 @@ impl Catalog for RestCatalog { match http_response.status() { StatusCode::OK => { let response = - deserialize_catalog_response::(http_response).await?; - Namespace::try_from(response) + deserialize_catalog_response::(http_response).await?; + Ok(Namespace::from(response)) } StatusCode::CONFLICT => Err(Error::new( ErrorKind::Unexpected, @@ -537,8 +531,8 @@ impl Catalog for RestCatalog { match http_response.status() { StatusCode::OK => { let response = - deserialize_catalog_response::(http_response).await?; - Namespace::try_from(response) + deserialize_catalog_response::(http_response).await?; + Ok(Namespace::from(response)) } StatusCode::NOT_FOUND => Err(Error::new( ErrorKind::Unexpected, @@ -614,7 +608,7 @@ impl Catalog for RestCatalog { match http_response.status() { StatusCode::OK => { let response = - deserialize_catalog_response::(http_response).await?; + deserialize_catalog_response::(http_response).await?; identifiers.extend(response.identifiers); @@ -661,11 +655,7 @@ impl Catalog for RestCatalog { partition_spec: creation.partition_spec, write_order: creation.sort_order, stage_create: Some(false), - properties: if creation.properties.is_empty() { - None - } else { - Some(creation.properties) - }, + properties: creation.properties, }) .build()?; @@ -673,7 +663,7 @@ impl Catalog for RestCatalog { let response = match http_response.status() { StatusCode::OK => { - deserialize_catalog_response::(http_response).await? + deserialize_catalog_response::(http_response).await? } StatusCode::NOT_FOUND => { return Err(Error::new( @@ -697,7 +687,6 @@ impl Catalog for RestCatalog { let config = response .config - .unwrap_or_default() .into_iter() .chain(self.user_config.props.clone()) .collect(); @@ -735,7 +724,7 @@ impl Catalog for RestCatalog { let response = match http_response.status() { StatusCode::OK | StatusCode::NOT_MODIFIED => { - deserialize_catalog_response::(http_response).await? + deserialize_catalog_response::(http_response).await? } StatusCode::NOT_FOUND => { return Err(Error::new( @@ -748,7 +737,6 @@ impl Catalog for RestCatalog { let config = response .config - .unwrap_or_default() .into_iter() .chain(self.user_config.props.clone()) .collect(); @@ -861,9 +849,9 @@ impl Catalog for RestCatalog { let http_response = context.client.query_catalog(request).await?; - let response: LoadTableResponse = match http_response.status() { + let response: LoadTableResult = match http_response.status() { StatusCode::OK => { - deserialize_catalog_response::(http_response).await? + deserialize_catalog_response::(http_response).await? } StatusCode::NOT_FOUND => { return Err(Error::new( @@ -905,7 +893,7 @@ impl Catalog for RestCatalog { context.config.table_endpoint(commit.identifier()), ) .json(&CommitTableRequest { - identifier: commit.identifier().clone(), + identifier: Some(commit.identifier().clone()), requirements: commit.take_requirements(), updates: commit.take_updates(), }) @@ -2428,7 +2416,7 @@ mod tests { )) .unwrap(); let reader = BufReader::new(file); - let resp = serde_json::from_reader::<_, LoadTableResponse>(reader).unwrap(); + let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap(); Table::builder() .metadata(resp.metadata) @@ -2568,7 +2556,7 @@ mod tests { )) .unwrap(); let reader = BufReader::new(file); - let resp = serde_json::from_reader::<_, LoadTableResponse>(reader).unwrap(); + let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap(); Table::builder() .metadata(resp.metadata) diff --git a/crates/catalog/rest/src/lib.rs b/crates/catalog/rest/src/lib.rs index 70cdeaabd0..3a7f8ba544 100644 --- a/crates/catalog/rest/src/lib.rs +++ b/crates/catalog/rest/src/lib.rs @@ -53,6 +53,6 @@ mod catalog; mod client; -mod types; +pub mod types; pub use catalog::*; diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs index 70ed72051a..bf1de2f054 100644 --- a/crates/catalog/rest/src/types.rs +++ b/crates/catalog/rest/src/types.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! Request and response types for the Iceberg REST API. + use std::collections::HashMap; use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec}; @@ -23,6 +25,10 @@ use iceberg::{ }; use serde_derive::{Deserialize, Serialize}; +mod events; + +pub use events::*; + #[derive(Clone, Debug, Serialize, Deserialize)] pub(super) struct CatalogConfig { pub(super) overrides: HashMap, @@ -30,7 +36,8 @@ pub(super) struct CatalogConfig { } #[derive(Debug, Serialize, Deserialize)] -pub(super) struct ErrorResponse { +/// Wrapper for all non-2xx error responses from the REST API +pub struct ErrorResponse { error: ErrorModel, } @@ -41,11 +48,16 @@ impl From for Error { } #[derive(Debug, Serialize, Deserialize)] -pub(super) struct ErrorModel { - pub(super) message: String, - pub(super) r#type: String, - pub(super) code: u16, - pub(super) stack: Option>, +/// Error payload returned in a response with further details on the error +pub struct ErrorModel { + /// Human-readable error message + pub message: String, + /// Internal type definition of the error + pub r#type: String, + /// HTTP response code + pub code: u16, + /// Optional error stack / context + pub stack: Option>, } impl From for Error { @@ -96,106 +108,255 @@ pub(super) struct TokenResponse { pub(super) issued_token_type: Option, } -#[derive(Debug, Serialize, Deserialize)] -pub(super) struct NamespaceSerde { - pub(super) namespace: Vec, - pub(super) properties: Option>, -} - -impl TryFrom for Namespace { - type Error = Error; - fn try_from(value: NamespaceSerde) -> std::result::Result { - Ok(Namespace::with_properties( - NamespaceIdent::from_vec(value.namespace)?, - value.properties.unwrap_or_default(), - )) - } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +/// Namespace response +pub struct NamespaceResponse { + /// Namespace identifier + pub namespace: NamespaceIdent, + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + /// Properties stored on the namespace, if supported by the server. + pub properties: HashMap, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +/// Create namespace request +pub struct CreateNamespaceRequest { + /// Name of the namespace to create + pub namespace: NamespaceIdent, + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + /// Properties to set on the namespace + pub properties: HashMap, } -impl From<&Namespace> for NamespaceSerde { +impl From<&Namespace> for NamespaceResponse { fn from(value: &Namespace) -> Self { Self { - namespace: value.name().as_ref().clone(), - properties: Some(value.properties().clone()), + namespace: value.name().clone(), + properties: value.properties().clone(), } } } -#[derive(Debug, Serialize, Deserialize)] +impl From for Namespace { + fn from(value: NamespaceResponse) -> Self { + Namespace::with_properties(value.namespace, value.properties) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] -pub(super) struct ListNamespaceResponse { - pub(super) namespaces: Vec>, - #[serde(default)] - pub(super) next_page_token: Option, +/// Response containing a list of namespace identifiers, with optional pagination support. +pub struct ListNamespaceResponse { + /// List of namespace identifiers returned by the server + pub namespaces: Vec, + /// Opaque token for pagination. If present, indicates there are more results available. + /// Use this value in subsequent requests to retrieve the next page. + pub next_page_token: Option, } -#[allow(dead_code)] -#[derive(Debug, Serialize, Deserialize)] -pub(super) struct UpdateNamespacePropsRequest { - removals: Option>, - updates: Option>, +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +/// Request to update properties on a namespace. +/// +/// Properties that are not in the request are not modified or removed by this call. +/// Server implementations are not required to support namespace properties. +pub struct UpdateNamespacePropertiesRequest { + /// List of property keys to remove from the namespace + pub removals: Option>, + /// Map of property keys to values to set or update on the namespace + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub updates: HashMap, } -#[allow(dead_code)] -#[derive(Debug, Serialize, Deserialize)] -pub(super) struct UpdateNamespacePropsResponse { - updated: Vec, - removed: Vec, - missing: Option>, +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +/// Response from updating namespace properties, indicating which properties were changed. +pub struct UpdateNamespacePropertiesResponse { + /// List of property keys that were added or updated + pub updated: Vec, + /// List of properties that were removed + pub removed: Vec, + /// List of properties requested for removal that were not found in the namespace's properties. + /// Represents a partial success response. Servers do not need to implement this. + #[serde(skip_serializing_if = "Option::is_none")] + pub missing: Option>, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] -pub(super) struct ListTableResponse { - pub(super) identifiers: Vec, +/// Response containing a list of table identifiers, with optional pagination support. +pub struct ListTablesResponse { + /// List of table identifiers under the requested namespace + pub identifiers: Vec, + /// Opaque token for pagination. If present, indicates there are more results available. + /// Use this value in subsequent requests to retrieve the next page. #[serde(default)] - pub(super) next_page_token: Option, + pub next_page_token: Option, } -#[derive(Debug, Serialize, Deserialize)] -pub(super) struct RenameTableRequest { - pub(super) source: TableIdent, - pub(super) destination: TableIdent, +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +/// Request to rename a table from one identifier to another. +/// +/// It's valid to move a table across namespaces, but the server implementation +/// is not required to support it. +pub struct RenameTableRequest { + /// Current table identifier to rename + pub source: TableIdent, + /// New table identifier to rename to + pub destination: TableIdent, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] -pub(super) struct LoadTableResponse { - pub(super) metadata_location: Option, - pub(super) metadata: TableMetadata, - pub(super) config: Option>, +/// Result returned when a table is successfully loaded or created. +/// +/// The table metadata JSON is returned in the `metadata` field. The corresponding file location +/// of table metadata should be returned in the `metadata_location` field, unless the metadata +/// is not yet committed. For example, a create transaction may return metadata that is staged +/// but not committed. +/// +/// The `config` map returns table-specific configuration for the table's resources, including +/// its HTTP client and FileIO. For example, config may contain a specific FileIO implementation +/// class for the table depending on its underlying storage. +pub struct LoadTableResult { + /// May be null if the table is staged as part of a transaction + pub metadata_location: Option, + /// The table's full metadata + pub metadata: TableMetadata, + /// Table-specific configuration overriding catalog configuration + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub config: HashMap, + /// Storage credentials for accessing table data. Clients should check this field + /// before falling back to credentials in the `config` field. + #[serde(skip_serializing_if = "Option::is_none")] + pub storage_credentials: Option>, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +/// Storage credential for a specific location prefix. +/// +/// Indicates a storage location prefix where the credential is relevant. Clients should +/// choose the most specific prefix (by selecting the longest prefix) if several credentials +/// of the same type are available. +pub struct StorageCredential { + /// Storage location prefix where this credential is relevant + pub prefix: String, + /// Configuration map containing credential information + pub config: HashMap, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] -pub(super) struct CreateTableRequest { - pub(super) name: String, - pub(super) location: Option, - pub(super) schema: Schema, - pub(super) partition_spec: Option, - pub(super) write_order: Option, - pub(super) stage_create: Option, - pub(super) properties: Option>, +/// Request to create a new table in a namespace. +/// +/// If `stage_create` is false, the table is created immediately. +/// If `stage_create` is true, the table is not created, but table metadata is initialized +/// and returned. The service should prepare as needed for a commit to the table commit +/// endpoint to complete the create transaction. +pub struct CreateTableRequest { + /// Name of the table to create + pub name: String, + /// Optional table location. If not provided, the server will choose a location. + pub location: Option, + /// Table schema + pub schema: Schema, + /// Optional partition specification. If not provided, the table will be unpartitioned. + pub partition_spec: Option, + /// Optional sort order for the table + pub write_order: Option, + /// Whether to stage the create for a transaction (true) or create immediately (false) + pub stage_create: Option, + /// Optional properties to set on the table + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub properties: HashMap, } -#[derive(Debug, Serialize, Deserialize)] -pub(super) struct CommitTableRequest { - pub(super) identifier: TableIdent, - pub(super) requirements: Vec, - pub(super) updates: Vec, +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +/// Request to commit updates to a table. +/// +/// Commits have two parts: requirements and updates. Requirements are assertions that will +/// be validated before attempting to make and commit changes. Updates are changes to make +/// to table metadata. +/// +/// Create table transactions that are started by createTable with `stage-create` set to true +/// are committed using this request. Transactions should include all changes to the table, +/// including table initialization, like AddSchemaUpdate and SetCurrentSchemaUpdate. +pub struct CommitTableRequest { + /// Table identifier to update; must be present for CommitTransactionRequest + #[serde(skip_serializing_if = "Option::is_none")] + pub identifier: Option, + /// List of requirements that must be satisfied before committing changes + pub requirements: Vec, + /// List of updates to apply to the table metadata + pub updates: Vec, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] -pub(super) struct CommitTableResponse { - pub(super) metadata_location: String, - pub(super) metadata: TableMetadata, +/// Response returned when a table is successfully updated. +/// +/// The table metadata JSON is returned in the metadata field. The corresponding file location +/// of table metadata must be returned in the metadata-location field. Clients can check whether +/// metadata has changed by comparing metadata locations. +pub struct CommitTableResponse { + /// Location of the updated table metadata file + pub metadata_location: String, + /// The table's updated metadata + pub metadata: TableMetadata, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] -pub(super) struct RegisterTableRequest { - pub(super) name: String, - pub(super) metadata_location: String, - pub(super) overwrite: Option, +/// Request to register a table using an existing metadata file location. +pub struct RegisterTableRequest { + /// Name of the table to register + pub name: String, + /// Location of the metadata file for the table + pub metadata_location: String, + /// Whether to overwrite table metadata if the table already exists + pub overwrite: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_namespace_response_serde() { + let json = serde_json::json!({ + "namespace": ["nested", "ns"], + "properties": { + "key1": "value1", + "key2": "value2" + } + }); + let ns_response: NamespaceResponse = + serde_json::from_value(json.clone()).expect("Deserialization failed"); + assert_eq!(ns_response, NamespaceResponse { + namespace: NamespaceIdent::from_vec(vec!["nested".to_string(), "ns".to_string()]) + .unwrap(), + properties: HashMap::from([ + ("key1".to_string(), "value1".to_string()), + ("key2".to_string(), "value2".to_string()), + ]), + }); + assert_eq!( + serde_json::to_value(&ns_response).expect("Serialization failed"), + json + ); + + // Without properties + let json_no_props = serde_json::json!({ + "namespace": ["db", "schema"] + }); + let ns_response_no_props: NamespaceResponse = + serde_json::from_value(json_no_props.clone()).expect("Deserialization failed"); + assert_eq!(ns_response_no_props, NamespaceResponse { + namespace: NamespaceIdent::from_vec(vec!["db".to_string(), "schema".to_string()]) + .unwrap(), + properties: HashMap::new(), + }); + assert_eq!( + serde_json::to_value(&ns_response_no_props).expect("Serialization failed"), + json_no_props + ); + } } diff --git a/crates/catalog/rest/src/types/events.rs b/crates/catalog/rest/src/types/events.rs new file mode 100644 index 0000000000..23e0de5c9b --- /dev/null +++ b/crates/catalog/rest/src/types/events.rs @@ -0,0 +1,1286 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the iceberg REST catalog types related to the Events API. + +use std::collections::HashMap; + +use chrono::{DateTime, Utc}; +use iceberg::{Error, NamespaceIdent, TableIdent, TableRequirement, TableUpdate, ViewUpdate}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::types::{NamespaceResponse, UpdateNamespacePropertiesResponse}; + +/// Reference to a named object in the catalog (namespace, table, or view). +#[derive(Debug, Hash, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd)] +#[serde(transparent)] +pub struct CatalogObjectIdentifier(Vec); + +impl CatalogObjectIdentifier { + /// Try to convert into a NamespaceIdent. + /// Errors if the identifier has zero parts. + pub fn into_namespace_ident(self) -> Result { + NamespaceIdent::from_vec(self.0) + } + + /// Try to convert into a TableIdent. + /// Errors if the identifier has fewer than two parts. + pub fn into_table_ident(self) -> Result { + TableIdent::from_strs(self.0) + } + + /// Returns inner strings. + pub fn inner(self) -> Vec { + self.0 + } + + /// Get the parent of this object. + /// Returns None if this object only has a single element and thus has no parent. + pub fn parent(&self) -> Option { + self.0.split_last().and_then(|(_, parent)| { + if parent.is_empty() { + None + } else { + Some(Self(parent.to_vec())) + } + }) + } + + /// Number of parts in the identifier. + pub fn len(&self) -> usize { + self.0.len() + } + + /// Check if the identifier is empty. + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + +/// Identify a table or view by its uuid. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct CatalogObjectUuid { + /// The UUID of the catalog object + pub uuid: Uuid, + /// The type of the catalog object + #[serde(rename = "type")] + pub object_type: CatalogObjectType, +} + +/// The type of a catalog object identified by UUID. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum CatalogObjectType { + /// Table object + Table, + /// View object + View, +} + +/// Type of operation in the catalog. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(untagged)] +#[serde(bound(deserialize = "T: for<'a> Deserialize<'a>"))] +pub enum OperationType +where T: CustomOperationType +{ + /// Standard operation types defined by the Iceberg REST API specification + Standard(StandardOperationType), + /// Custom operation type for catalog-specific extensions. + Custom(T), +} + +/// Trait for types that can be used as custom operation types. +pub trait CustomOperationType: + Clone + Serialize + for<'de> Deserialize<'de> + PartialEq + Eq +{ + /// Create a custom operation type, ensuring proper formatting (e.g., "x-" prefix for strings). + fn normalize(self) -> Self { + self + } +} + +impl CustomOperationType for String { + fn normalize(self) -> Self { + if self.starts_with("x-") { + self + } else { + format!("x-{}", self) + } + } +} + +impl OperationType { + /// Create a standard operation type. + pub fn new_standard(op: StandardOperationType) -> Self { + Self::Standard(op) + } + + /// Create a custom operation type. + /// For string-based types, the value will be automatically prefixed with "x-" if not already present. + pub fn new_custom(value: T) -> Self { + Self::Custom(value.normalize()) + } + + /// Check if the operation type is a standard well-known type. + pub fn is_standard(&self) -> bool { + matches!(self, OperationType::Standard(_)) + } + + /// Check if the operation type is custom. + pub fn is_custom(&self) -> bool { + matches!(self, OperationType::Custom(_)) + } +} + +impl From for OperationType { + fn from(op: StandardOperationType) -> Self { + OperationType::Standard(op) + } +} + +/// Standard operation types defined by the Iceberg REST API specification. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "kebab-case")] +pub enum StandardOperationType { + /// Create table operation + CreateTable, + /// Register table operation + RegisterTable, + /// Drop table operation + DropTable, + /// Update table operation + UpdateTable, + /// Rename table operation + RenameTable, + /// Create view operation + CreateView, + /// Drop view operation + DropView, + /// Update view operation + UpdateView, + /// Rename view operation + RenameView, + /// Create namespace operation + CreateNamespace, + /// Update namespace properties operation + UpdateNamespaceProperties, + /// Drop namespace operation + DropNamespace, +} + +/// Type of catalog object for filtering events. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum QueryEventsRequestObjectType { + /// Namespace object + Namespace, + /// Table object + Table, + /// View object + View, +} + +/// Trait for custom filter collections in event queries. +/// +/// Enables catalog implementations to extend filtering beyond standard Iceberg REST API filters. +/// Types must convert to/from a flat map for REST API serialization. +pub trait CustomFilterCollection: Default + Clone + PartialEq + Eq { + /// Convert into a filter map, which is used for serialization in + /// the REST API. + fn try_as_filter_map( + &self, + ) -> Result + Serialize, impl Serialize>, Error>; + + /// Create from a filter map, which is used for deserialization in + /// the REST API. + fn try_from_filter_map(map: HashMap) -> Result; + + /// Returns the number of custom filters. + fn len(&self) -> usize; + + /// Check if there are no custom filters. + fn is_empty(&self) -> bool; +} + +impl CustomFilterCollection for HashMap { + fn try_as_filter_map( + &self, + ) -> Result + Serialize, impl Serialize>, Error> { + Ok(self.clone()) + } + + fn try_from_filter_map(map: HashMap) -> Result { + Ok(map) + } + + fn len(&self) -> usize { + HashMap::len(self) + } + + fn is_empty(&self) -> bool { + HashMap::is_empty(self) + } +} + +/// Request to query catalog events. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, typed_builder::TypedBuilder)] +#[serde(rename_all = "kebab-case")] +pub struct QueryEventsRequest> +where F: CustomFilterCollection +{ + /// Opaque pagination token + #[builder(default)] + pub continuation_token: Option, + + /// The maximum number of events to return in a single response + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default)] + pub page_size: Option, + + /// The (server) timestamp to start consuming events from (inclusive). + /// During serialization/deserialization, this is represented as milliseconds since epoch. + #[serde( + default, + skip_serializing_if = "Option::is_none", + serialize_with = "serialize_optional_datetime_ms", + deserialize_with = "deserialize_optional_datetime_ms" + )] + #[serde(rename = "after-timestamp-ms")] + #[builder(default)] + pub after_timestamp: Option>, + + /// Filter events by the type of operation + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default)] + pub operation_types: Option>, + + /// Filter events by catalog objects referenced by name + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default)] + pub catalog_objects_by_name: Option>, + + /// Filter events by catalog objects referenced by UUID + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default)] + pub catalog_objects_by_id: Option>, + + /// Filter events by the type of catalog object + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default)] + pub object_types: Option>, + + /// Implementation-specific filter extensions + #[serde( + default, + skip_serializing_if = "CustomFilterCollection::is_empty", + serialize_with = "serialize_custom_filters::", + deserialize_with = "deserialize_custom_filters::" + )] + #[builder(default)] + pub custom_filters: F, +} + +/// Response to a query for catalog events. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "kebab-case")] +pub struct QueryEventsResponse> +where + C: CustomOperation, + A: Actor, +{ + /// Opaque pagination token for retrieving the next page of results + pub continuation_token: String, + + /// The highest event timestamp processed when generating this response. + /// This may not necessarily appear in the returned events if it was filtered out. + #[serde( + serialize_with = "serialize_datetime_ms", + deserialize_with = "deserialize_datetime_ms" + )] + #[serde(rename = "highest-processed-timestamp-ms")] + pub highest_processed_timestamp: DateTime, + + /// List of events matching the query criteria + pub events: Vec>, +} + +/// Trait for actor representation in events. +/// Enables catalog implementations to define custom actor types. +pub trait Actor: Default + Clone + PartialEq + Eq { + /// Convert into a property map, which is used for serialization in + /// the REST API. + fn try_as_property_map( + &self, + ) -> Result + Serialize, impl Serialize>, Error>; + + /// Create from a property map, which is used for deserialization in + /// the REST API. + fn try_from_property_map(map: HashMap) -> Result; +} + +impl Actor for HashMap { + fn try_as_property_map( + &self, + ) -> Result + Serialize, impl Serialize>, Error> { + Ok(self.clone()) + } + + fn try_from_property_map(map: HashMap) -> Result { + Ok(map) + } +} + +/// An event representing a change to a catalog object. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, typed_builder::TypedBuilder)] +#[serde(rename_all = "kebab-case")] +pub struct Event> +where + C: CustomOperation, + A: Actor, +{ + /// Unique ID of this event. Clients should perform deduplication based on this ID. + pub event_id: String, + + /// Opaque ID of the request this event belongs to. + /// This ID can be used to identify events that were part of the same request. + /// Servers generate this ID randomly. + pub request_id: String, + + /// Total number of events in this batch or request. + /// Some endpoints, such as "updateTable" and "commitTransaction", can perform multiple updates in a single atomic request. + /// Each update is modeled as a separate event. All events generated by the same request share the same `request-id`. + /// The `request-event-count` field indicates the total number of events generated by that request. + pub request_event_count: i32, + + /// Timestamp when this event occurred (epoch milliseconds). + /// Timestamps are not guaranteed to be unique. Typically all events in + /// a transaction will have the same timestamp. + #[serde( + serialize_with = "serialize_datetime_ms", + deserialize_with = "deserialize_datetime_ms" + )] + #[serde(rename = "timestamp-ms")] + pub timestamp: DateTime, + + /// The actor who performed the operation, such as a user or service account. + /// The content of this field is implementation specific. + #[serde( + default, + skip_serializing_if = "Option::is_none", + serialize_with = "serialize_custom_actor::", + deserialize_with = "deserialize_custom_actor::" + )] + #[builder(default)] + pub actor: Option, + + /// The operation that was performed. + /// Clients should discard events with unknown operation types. + #[serde(flatten)] + pub operation: Operation, +} + +/// Operation that was performed on a catalog object. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "operation-type", rename_all = "kebab-case")] +pub enum Operation +where C: CustomOperation +{ + /// Create table operation + CreateTable(CreateTableOperation), + /// Register table operation + RegisterTable(RegisterTableOperation), + /// Drop table operation + DropTable(DropTableOperation), + /// Update table operation + UpdateTable(UpdateTableOperation), + /// Rename table operation + RenameTable(RenameTableOperation), + /// Create view operation + CreateView(CreateViewOperation), + /// Drop view operation + DropView(DropViewOperation), + /// Update view operation + UpdateView(UpdateViewOperation), + /// Rename view operation + RenameView(RenameViewOperation), + /// Create namespace operation + CreateNamespace(CreateNamespaceOperation), + /// Update namespace properties operation + UpdateNamespaceProperties(UpdateNamespacePropertiesOperation), + /// Drop namespace operation + DropNamespace(DropNamespaceOperation), + /// Custom operation for catalog-specific extensions + Custom(C), +} + +/// Operation to create a new table in the catalog. +/// Events for this operation must be issued when the create is finalized and committed, +/// not when the create is staged. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, typed_builder::TypedBuilder)] +#[serde(rename_all = "kebab-case")] +pub struct CreateTableOperation { + /// Table identifier + pub identifier: TableIdent, + /// UUID of the table + pub table_uuid: Uuid, + /// Table updates applied during creation + #[builder(default)] + pub updates: Vec, +} + +/// Operation to register an existing table in the catalog. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, typed_builder::TypedBuilder)] +#[serde(rename_all = "kebab-case")] +pub struct RegisterTableOperation { + /// Table identifier + pub identifier: TableIdent, + /// UUID of the table + pub table_uuid: Uuid, + /// Optional table updates applied during registration + #[serde(default, skip_serializing_if = "Vec::is_empty")] + #[builder(default)] + pub updates: Vec, +} + +/// Operation to drop a table from the catalog. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, typed_builder::TypedBuilder)] +#[serde(rename_all = "kebab-case")] +pub struct DropTableOperation { + /// Table identifier + pub identifier: TableIdent, + /// UUID of the table + pub table_uuid: Uuid, + /// Whether the purge flag was set + #[serde(default, skip_serializing_if = "Option::is_none")] + #[builder(default)] + pub purge: Option, +} + +/// Operation to update a table in the catalog. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, typed_builder::TypedBuilder)] +#[serde(rename_all = "kebab-case")] +pub struct UpdateTableOperation { + /// Table identifier + pub identifier: TableIdent, + /// UUID of the table + pub table_uuid: Uuid, + /// Table updates to apply + pub updates: Vec, + /// Requirements that must be met for the update to succeed + #[serde(default, skip_serializing_if = "Vec::is_empty")] + #[builder(default)] + pub requirements: Vec, +} + +/// Operation to rename a table. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, typed_builder::TypedBuilder)] +#[serde(rename_all = "kebab-case")] +pub struct RenameTableOperation { + /// Current table identifier to rename + pub source: TableIdent, + /// New table identifier to rename to + pub destination: TableIdent, + /// UUID of the table + pub table_uuid: Uuid, +} + +/// Operation to rename a view. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, typed_builder::TypedBuilder)] +#[serde(rename_all = "kebab-case")] +pub struct RenameViewOperation { + /// UUID of the view + pub view_uuid: Uuid, + /// Current view identifier to rename + pub source: TableIdent, + /// New view identifier to rename to + pub destination: TableIdent, +} + +/// Operation to create a view in the catalog. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, typed_builder::TypedBuilder)] +#[serde(rename_all = "kebab-case")] +pub struct CreateViewOperation { + /// View identifier + pub identifier: TableIdent, + /// UUID of the view + pub view_uuid: Uuid, + /// View updates applied during creation + pub updates: Vec, +} + +/// Operation to drop a view from the catalog. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, typed_builder::TypedBuilder)] +#[serde(rename_all = "kebab-case")] +pub struct DropViewOperation { + /// View identifier + pub identifier: TableIdent, + /// UUID of the view + pub view_uuid: Uuid, +} + +/// Operation to update a view in the catalog. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, typed_builder::TypedBuilder)] +#[serde(rename_all = "kebab-case")] +pub struct UpdateViewOperation { + /// View identifier + pub identifier: TableIdent, + /// UUID of the view + pub view_uuid: Uuid, + /// View updates to apply + pub updates: Vec, + /// Requirements that must be met for the update to succeed + #[serde(default, skip_serializing_if = "Vec::is_empty")] + #[builder(default)] + pub requirements: Vec, +} + +/// Operation to create a namespace in the catalog. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, typed_builder::TypedBuilder)] +#[serde(rename_all = "kebab-case")] +pub struct CreateNamespaceOperation { + #[serde(flatten)] + /// Response containing the created namespace details + pub namespace_response: NamespaceResponse, +} + +/// Operation to update namespace properties. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, typed_builder::TypedBuilder)] +#[serde(rename_all = "kebab-case")] +pub struct UpdateNamespacePropertiesOperation { + /// Namespace identifier + pub namespace: NamespaceIdent, + #[serde(flatten)] + /// Response containing the updated namespace properties + pub update_namespace_properties_response: UpdateNamespacePropertiesResponse, +} + +/// Operation to drop a namespace from the catalog. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, typed_builder::TypedBuilder)] +#[serde(rename_all = "kebab-case")] +pub struct DropNamespaceOperation { + /// Namespace identifier + pub namespace: NamespaceIdent, +} + +/// Trait for custom operations in catalog events. +pub trait CustomOperation: Clone + PartialEq + Eq { + /// The type used for the custom operation type, typically an enum + /// with the supported custom operation variants. + type CustomOperationType: CustomOperationType; + + /// Convert to a generic custom operation representation. + /// This is used for serialization in the REST API. + fn try_into_raw( + &self, + ) -> Result< + RawCustomOperationSer< + '_, + Self::CustomOperationType, + impl AsRef + Serialize, + impl Serialize, + >, + Error, + >; + + /// Create from a generic custom operation representation. + /// This is used for deserialization in the REST API. + fn try_from_raw(raw: RawCustomOperation) -> Result; +} + +/// Custom operation for catalog-specific extensions not defined in the standard. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(bound(deserialize = "T: for<'a> Deserialize<'a>"))] +#[serde(rename_all = "kebab-case")] +pub struct RawCustomOperation +where T: CustomOperationType +{ + /// Custom operation type identifier + pub custom_type: T, + /// Optional table/view identifier + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "identifier")] + pub tabular_ident: Option, + /// Optional table UUID + #[serde(skip_serializing_if = "Option::is_none")] + pub table_uuid: Option, + /// Optional view UUID + #[serde(skip_serializing_if = "Option::is_none")] + pub view_uuid: Option, + /// Optional namespace identifier + #[serde(skip_serializing_if = "Option::is_none")] + pub namespace: Option, + /// Additional custom properties + #[serde(flatten)] + pub properties: HashMap, +} + +impl CustomOperation for RawCustomOperation { + type CustomOperationType = String; + + fn try_into_raw( + &self, + ) -> Result< + RawCustomOperationSer< + '_, + ::CustomOperationType, + impl AsRef + Serialize, + impl Serialize, + >, + iceberg::Error, + > { + Ok(RawCustomOperationSer { + custom_type: &self.custom_type, + tabular_ident: self.tabular_ident.as_ref(), + table_uuid: self.table_uuid, + view_uuid: self.view_uuid, + namespace: self.namespace.as_ref(), + properties: self.properties.clone(), + }) + } + + fn try_from_raw(generic: RawCustomOperation) -> Result { + Ok(Self { + custom_type: generic.custom_type, + tabular_ident: generic.tabular_ident, + table_uuid: generic.table_uuid, + view_uuid: generic.view_uuid, + namespace: generic.namespace, + properties: generic.properties, + }) + } +} + +/// Custom operation for catalog-specific extensions not defined in the standard. +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "kebab-case")] +pub struct RawCustomOperationSer<'s, T, PK, PV> +where + T: CustomOperationType, + PK: AsRef + Serialize, + PV: Serialize, +{ + /// Custom operation type identifier + pub custom_type: &'s T, + /// Optional table/view identifier + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "identifier")] + pub tabular_ident: Option<&'s TableIdent>, + /// Optional table UUID + // UUIDs are Copy, so we don't need a reference here + #[serde(skip_serializing_if = "Option::is_none")] + pub table_uuid: Option, + /// Optional view UUID + #[serde(skip_serializing_if = "Option::is_none")] + pub view_uuid: Option, + /// Optional namespace identifier + #[serde(skip_serializing_if = "Option::is_none")] + pub namespace: Option<&'s NamespaceIdent>, + /// Additional custom properties + // Custom Operation enums typically hold their filters typesafe, + // thus we must assume the HashMap is constructed just for serialization. + // Hence it is owned, but keys & values can be references. + #[serde(flatten)] + pub properties: HashMap, +} + +fn serialize_datetime_ms(dt: &DateTime, serializer: S) -> Result +where S: serde::Serializer { + serializer.serialize_i64(dt.timestamp_millis()) +} + +fn deserialize_datetime_ms<'de, D>(deserializer: D) -> Result, D::Error> +where D: serde::Deserializer<'de> { + let ms = i64::deserialize(deserializer)?; + DateTime::from_timestamp_millis(ms).ok_or_else(|| { + serde::de::Error::custom("invalid timestamp, expected milliseconds since epoch") + }) +} + +fn serialize_optional_datetime_ms( + dt: &Option>, + serializer: S, +) -> Result +where + S: serde::Serializer, +{ + match dt { + Some(dt) => serializer.serialize_some(&dt.timestamp_millis()), + None => serializer.serialize_none(), + } +} + +fn deserialize_optional_datetime_ms<'de, D>( + deserializer: D, +) -> Result>, D::Error> +where D: serde::Deserializer<'de> { + Option::::deserialize(deserializer)? + .map(|ms| { + DateTime::from_timestamp_millis(ms).ok_or_else(|| { + serde::de::Error::custom("invalid timestamp, expected milliseconds since epoch") + }) + }) + .transpose() +} + +fn serialize_custom_filters(filters: &F, serializer: S) -> Result +where + F: CustomFilterCollection, + S: serde::Serializer, +{ + filters + .try_as_filter_map() + .map_err(serde::ser::Error::custom)? + .serialize(serializer) +} + +fn deserialize_custom_filters<'de, F, D>(deserializer: D) -> Result +where + F: CustomFilterCollection, + D: serde::Deserializer<'de>, +{ + let map = HashMap::::deserialize(deserializer)?; + F::try_from_filter_map(map).map_err(serde::de::Error::custom) +} + +fn serialize_custom_actor(actor: &Option, serializer: S) -> Result +where + A: Actor, + S: serde::Serializer, +{ + match actor { + Some(actor) => actor + .try_as_property_map() + .map_err(serde::ser::Error::custom)? + .serialize(serializer), + None => serializer.serialize_none(), + } +} + +fn deserialize_custom_actor<'de, A, D>(deserializer: D) -> Result, D::Error> +where + A: Actor, + D: serde::Deserializer<'de>, +{ + let opt_map = Option::>::deserialize(deserializer)?; + match opt_map { + Some(map) => Ok(Some( + A::try_from_property_map(map).map_err(serde::de::Error::custom)?, + )), + None => Ok(None), + } +} + +#[cfg(test)] +mod tests { + use iceberg::ErrorKind; + + use super::*; + + #[test] + fn test_query_events_request_serde() { + let request = QueryEventsRequest { + continuation_token: Some("token123".to_string()), + page_size: Some(100), + after_timestamp: Some(DateTime::from_timestamp_millis(1625079600000).unwrap()), + operation_types: Some(vec![ + OperationType::Standard(StandardOperationType::CreateTable), + OperationType::Standard(StandardOperationType::DropTable), + ]), + catalog_objects_by_name: Some(vec![CatalogObjectIdentifier(vec![ + "namespace".to_string(), + "table_name".to_string(), + ])]), + catalog_objects_by_id: Some(vec![CatalogObjectUuid { + uuid: Uuid::parse_str("123e4567-e89b-12d3-a456-426614174000").unwrap(), + object_type: CatalogObjectType::Table, + }]), + object_types: Some(vec![ + QueryEventsRequestObjectType::Table, + QueryEventsRequestObjectType::View, + ]), + custom_filters: HashMap::from([( + "custom-key".to_string(), + serde_json::json!("custom-value"), + )]), + }; + + let json = serde_json::to_value(&request).unwrap(); + assert_eq!( + json, + serde_json::json!({ + "continuation-token": "token123", + "page-size": 100, + "after-timestamp-ms": 1625079600000i64, + "operation-types": ["create-table", "drop-table"], + "catalog-objects-by-name": [["namespace", "table_name"]], + "catalog-objects-by-id": [{ + "uuid": "123e4567-e89b-12d3-a456-426614174000", + "type": "table" + }], + "object-types": ["table", "view"], + "custom-filters": { + "custom-key": "custom-value" + } + }) + ); + + let deserialized: QueryEventsRequest = serde_json::from_value(json).unwrap(); + assert_eq!(request, deserialized); + } + + #[test] + fn test_query_events_request_minimal() { + let request = QueryEventsRequest::builder().build(); + let json = serde_json::to_value(&request).unwrap(); + // Page token should be serialized to show that client supports + // pagination, even if it's None. + assert_eq!( + json, + serde_json::json!({ + "continuation-token": null, + }) + ); + + let deserialized: QueryEventsRequest = + serde_json::from_value(serde_json::json!({})).unwrap(); + assert_eq!(request, deserialized); + } + + #[test] + fn test_operation_type_serialization() { + let standard = OperationType::from(StandardOperationType::CreateTable); + let json = serde_json::to_string(&standard).unwrap(); + assert_eq!(json, r#""create-table""#); + + let custom = OperationType::new_custom("custom-op".to_string()); + let json = serde_json::to_string(&custom).unwrap(); + assert_eq!(json, r#""x-custom-op""#); + } + + #[test] + fn test_operation_type_deserialization() { + let standard_json = r#""drop-view""#; + let standard: OperationType = serde_json::from_str(standard_json).unwrap(); + assert_eq!( + standard, + OperationType::from(StandardOperationType::DropView) + ); + let custom_json = r#""x-my-custom-op""#; + let custom: OperationType = serde_json::from_str(custom_json).unwrap(); + assert_eq!( + custom, + OperationType::new_custom("x-my-custom-op".to_string()) + ); + } + + #[test] + fn test_custom_operation_type_serde() { + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] + #[serde(rename_all = "kebab-case")] + pub enum MyCustomOp { + #[serde(rename = "x-grant-select")] + GrantSelect, + } + + impl CustomOperationType for MyCustomOp {} + + let custom = OperationType::new_custom(MyCustomOp::GrantSelect); + let json = serde_json::to_string(&custom).unwrap(); + assert_eq!(json, r#""x-grant-select""#); + let deserialized: OperationType = serde_json::from_str(&json).unwrap(); + assert_eq!(custom, deserialized); + } + + #[test] + fn test_custom_raw_operation_serde() { + let raw_op = RawCustomOperation { + custom_type: "x-custom-op".to_string(), + tabular_ident: Some( + TableIdent::from_strs(vec!["namespace".to_string(), "table".to_string()]).unwrap(), + ), + table_uuid: Some(Uuid::parse_str("123e4567-e89b-12d3-a456-426614174000").unwrap()), + view_uuid: None, + namespace: None, + properties: HashMap::from([( + "extra-info".to_string(), + serde_json::json!("some value"), + )]), + }; + + let json = serde_json::to_value(&raw_op).unwrap(); + assert_eq!( + json, + serde_json::json!({ + "custom-type": "x-custom-op", + "identifier": { + "namespace": ["namespace"], + "name": "table" + }, + "table-uuid": "123e4567-e89b-12d3-a456-426614174000", + "extra-info": "some value" + }) + ); + + let deserialized: RawCustomOperation = serde_json::from_value(json).unwrap(); + assert_eq!(raw_op, deserialized); + } + + #[test] + fn test_custom_operation_serde() { + #[derive(Clone, Hash, Debug, Serialize, Deserialize, PartialEq, Eq)] + pub enum MyCustomOperationTypes { + GrantModifyOnTable, + } + + impl CustomOperationType for MyCustomOperationTypes {} + + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] + pub enum MyCustomOperations { + GrantModifyOnTable { + table_ident: TableIdent, + table_uuid: Uuid, + to_principal: String, + }, + } + + impl CustomOperation for MyCustomOperations { + type CustomOperationType = MyCustomOperationTypes; + + fn try_into_raw( + &self, + ) -> Result< + RawCustomOperationSer< + '_, + Self::CustomOperationType, + impl AsRef + Serialize, + impl Serialize, + >, + Error, + > { + match self { + MyCustomOperations::GrantModifyOnTable { + table_ident, + table_uuid, + to_principal, + } => { + let mut properties = HashMap::new(); + properties.insert("to-principal".to_string(), to_principal.clone()); + Ok(RawCustomOperationSer { + custom_type: &MyCustomOperationTypes::GrantModifyOnTable, + tabular_ident: Some(table_ident), + table_uuid: Some(*table_uuid), + view_uuid: None, + namespace: None, + properties, + }) + } + } + } + + fn try_from_raw( + raw: RawCustomOperation, + ) -> Result { + match raw.custom_type { + MyCustomOperationTypes::GrantModifyOnTable => { + let table_ident = raw.tabular_ident.ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "missing table identifier") + })?; + let table_uuid = raw.table_uuid.ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "missing table UUID") + })?; + let to_principal = raw + .properties + .get("to-principal") + .and_then(|v| v.as_str()) + .ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "missing to-principal property") + })? + .to_string(); + Ok(MyCustomOperations::GrantModifyOnTable { + table_ident, + table_uuid, + to_principal, + }) + } + } + } + } + + let custom_op = MyCustomOperations::GrantModifyOnTable { + table_ident: TableIdent::from_strs(vec!["namespace", "my_table"]).unwrap(), + table_uuid: Uuid::parse_str("123e4567-e89b-12d3-a456-426614174000").unwrap(), + to_principal: "user@example.com".to_string(), + }; + + // Test serialization via the trait + let raw_ser = custom_op.try_into_raw().unwrap(); + let json = serde_json::to_value(&raw_ser).unwrap(); + + assert_eq!( + json, + serde_json::json!({ + "custom-type": "GrantModifyOnTable", + "identifier": { + "namespace": ["namespace"], + "name": "my_table" + }, + "table-uuid": "123e4567-e89b-12d3-a456-426614174000", + "to-principal": "user@example.com" + }) + ); + + // Test deserialization via the trait + let raw_custom_op: RawCustomOperation = RawCustomOperation { + custom_type: MyCustomOperationTypes::GrantModifyOnTable, + tabular_ident: Some(TableIdent::from_strs(vec!["namespace", "my_table"]).unwrap()), + table_uuid: Some(Uuid::parse_str("123e4567-e89b-12d3-a456-426614174000").unwrap()), + view_uuid: None, + namespace: None, + properties: HashMap::from([( + "to-principal".to_string(), + serde_json::json!("user@example.com"), + )]), + }; + + let deserialized = MyCustomOperations::try_from_raw(raw_custom_op).unwrap(); + assert_eq!(custom_op, deserialized); + } + + #[test] + fn test_query_events_response_serde() { + let response = QueryEventsResponse { + continuation_token: "next-page-token".to_string(), + highest_processed_timestamp: DateTime::from_timestamp_millis(1625079600000).unwrap(), + events: vec![ + Event::builder() + .event_id("event-1".to_string()) + .request_id("req-123".to_string()) + .request_event_count(2) + .timestamp(DateTime::from_timestamp_millis(1625079600000).unwrap()) + .operation(Operation::CreateTable(CreateTableOperation { + identifier: TableIdent::from_strs(vec!["namespace", "table1"]).unwrap(), + table_uuid: Uuid::parse_str("123e4567-e89b-12d3-a456-426614174000") + .unwrap(), + updates: vec![], + })) + .build(), + Event::builder() + .event_id("event-2".to_string()) + .request_id("req-123".to_string()) + .request_event_count(2) + .timestamp(DateTime::from_timestamp_millis(1625079600000).unwrap()) + .actor(Some(HashMap::from([( + "user".to_string(), + serde_json::json!("alice@example.com"), + )]))) + .operation(Operation::DropTable(DropTableOperation { + identifier: TableIdent::from_strs(vec!["namespace", "table2"]).unwrap(), + table_uuid: Uuid::parse_str("223e4567-e89b-12d3-a456-426614174000") + .unwrap(), + purge: Some(true), + })) + .build(), + ], + }; + + let json = serde_json::to_value(&response).unwrap(); + assert_eq!( + json, + serde_json::json!({ + "continuation-token": "next-page-token", + "highest-processed-timestamp-ms": 1625079600000i64, + "events": [ + { + "event-id": "event-1", + "request-id": "req-123", + "request-event-count": 2, + "timestamp-ms": 1625079600000i64, + "operation-type": "create-table", + "identifier": { + "namespace": ["namespace"], + "name": "table1" + }, + "table-uuid": "123e4567-e89b-12d3-a456-426614174000", + "updates": [] + }, + { + "event-id": "event-2", + "request-id": "req-123", + "request-event-count": 2, + "timestamp-ms": 1625079600000i64, + "actor": { + "user": "alice@example.com" + }, + "operation-type": "drop-table", + "identifier": { + "namespace": ["namespace"], + "name": "table2" + }, + "table-uuid": "223e4567-e89b-12d3-a456-426614174000", + "purge": true + } + ] + }) + ); + + let deserialized: QueryEventsResponse = serde_json::from_value(json).unwrap(); + assert_eq!(response, deserialized); + } + + #[test] + fn test_query_events_response_empty_events() { + let response = QueryEventsResponse { + continuation_token: "token".to_string(), + highest_processed_timestamp: DateTime::from_timestamp_millis(1625079600000).unwrap(), + events: vec![], + }; + + let json = serde_json::to_value(&response).unwrap(); + assert_eq!( + json, + serde_json::json!({ + "continuation-token": "token", + "highest-processed-timestamp-ms": 1625079600000i64, + "events": [] + }) + ); + + let deserialized: QueryEventsResponse = serde_json::from_value(json).unwrap(); + assert_eq!(response, deserialized); + } + + #[test] + fn test_query_events_response_with_namespace_operations() { + let response = QueryEventsResponse { + continuation_token: "page2".to_string(), + highest_processed_timestamp: DateTime::from_timestamp_millis(1625079700000).unwrap(), + events: vec![ + Event::builder() + .event_id("event-ns-1".to_string()) + .request_id("req-ns-1".to_string()) + .request_event_count(1) + .timestamp(DateTime::from_timestamp_millis(1625079700000).unwrap()) + .operation(Operation::CreateNamespace(CreateNamespaceOperation { + namespace_response: NamespaceResponse { + namespace: NamespaceIdent::from_vec(vec!["new_namespace".to_string()]) + .unwrap(), + properties: HashMap::from([( + "owner".to_string(), + "team-a".to_string(), + )]), + }, + })) + .build(), + Event::builder() + .event_id("event-ns-2".to_string()) + .request_id("req-ns-2".to_string()) + .request_event_count(1) + .timestamp(DateTime::from_timestamp_millis(1625079700000).unwrap()) + .operation(Operation::DropNamespace(DropNamespaceOperation { + namespace: NamespaceIdent::from_vec(vec!["old_namespace".to_string()]) + .unwrap(), + })) + .build(), + ], + }; + + let json = serde_json::to_value(&response).unwrap(); + let deserialized: QueryEventsResponse = serde_json::from_value(json).unwrap(); + assert_eq!(response, deserialized); + } + + #[test] + fn test_query_events_response_with_view_operations() { + let response = QueryEventsResponse { + continuation_token: "view-page".to_string(), + highest_processed_timestamp: DateTime::from_timestamp_millis(1625079800000).unwrap(), + events: vec![ + Event::builder() + .event_id("event-view-1".to_string()) + .request_id("req-view-1".to_string()) + .request_event_count(1) + .timestamp(DateTime::from_timestamp_millis(1625079800000).unwrap()) + .operation(Operation::CreateView(CreateViewOperation { + identifier: TableIdent::from_strs(vec!["namespace", "view1"]).unwrap(), + view_uuid: Uuid::parse_str("323e4567-e89b-12d3-a456-426614174000").unwrap(), + updates: vec![], + })) + .build(), + Event::builder() + .event_id("event-view-2".to_string()) + .request_id("req-view-2".to_string()) + .request_event_count(1) + .timestamp(DateTime::from_timestamp_millis(1625079800000).unwrap()) + .operation(Operation::DropView(DropViewOperation { + identifier: TableIdent::from_strs(vec!["namespace", "view2"]).unwrap(), + view_uuid: Uuid::parse_str("423e4567-e89b-12d3-a456-426614174000").unwrap(), + })) + .build(), + ], + }; + + let json = serde_json::to_value(&response).unwrap(); + let deserialized: QueryEventsResponse = serde_json::from_value(json).unwrap(); + assert_eq!(response, deserialized); + } + + #[test] + fn test_query_events_response_with_custom_operation() { + let response: QueryEventsResponse = QueryEventsResponse { + continuation_token: "custom-page".to_string(), + highest_processed_timestamp: DateTime::from_timestamp_millis(1625079900000).unwrap(), + events: vec![ + Event::builder() + .event_id("event-custom-1".to_string()) + .request_id("req-custom-1".to_string()) + .request_event_count(1) + .timestamp(DateTime::from_timestamp_millis(1625079900000).unwrap()) + .operation(Operation::Custom(RawCustomOperation { + custom_type: "x-custom-op".to_string(), + tabular_ident: Some( + TableIdent::from_strs(vec!["namespace", "table"]).unwrap(), + ), + table_uuid: Some( + Uuid::parse_str("523e4567-e89b-12d3-a456-426614174000").unwrap(), + ), + view_uuid: None, + namespace: None, + properties: HashMap::from([( + "custom-property".to_string(), + serde_json::json!("custom-value"), + )]), + })) + .build(), + ], + }; + + let json = serde_json::to_value(&response).unwrap(); + assert_eq!( + json["events"][0]["operation-type"], + serde_json::json!("custom") + ); + assert_eq!( + json["events"][0]["custom-type"], + serde_json::json!("x-custom-op") + ); + + let deserialized: QueryEventsResponse = serde_json::from_value(json).unwrap(); + assert_eq!(response, deserialized); + } +}