From de02eaf1c2e6597f0a3bd57288849c7c66fdba5e Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sat, 24 Jan 2026 11:30:51 +0000 Subject: [PATCH 1/4] Implement Admin APIs for partitioning --- crates/fluss/src/client/admin.rs | 68 ++- crates/fluss/src/metadata/mod.rs | 2 + crates/fluss/src/metadata/partition.rs | 477 ++++++++++++++++++ crates/fluss/src/metadata/table.rs | 47 +- crates/fluss/src/proto/fluss_api.proto | 36 +- crates/fluss/src/rpc/api_key.rs | 12 + .../fluss/src/rpc/message/create_partition.rs | 59 +++ .../fluss/src/rpc/message/drop_partition.rs | 59 +++ .../src/rpc/message/list_partition_infos.rs | 63 +++ crates/fluss/src/rpc/message/mod.rs | 6 + crates/fluss/tests/integration/admin.rs | 135 ++++- 11 files changed, 952 insertions(+), 12 deletions(-) create mode 100644 crates/fluss/src/metadata/partition.rs create mode 100644 crates/fluss/src/rpc/message/create_partition.rs create mode 100644 crates/fluss/src/rpc/message/drop_partition.rs create mode 100644 crates/fluss/src/rpc/message/list_partition_infos.rs diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index 6646f97c..12e2e624 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -17,13 +17,14 @@ use crate::client::metadata::Metadata; use crate::metadata::{ - DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, TableBucket, TableDescriptor, - TableInfo, TablePath, + DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, PartitionInfo, PartitionSpec, + TableBucket, TableDescriptor, TableInfo, TablePath, }; use crate::rpc::message::{ - CreateDatabaseRequest, CreateTableRequest, DatabaseExistsRequest, DropDatabaseRequest, - DropTableRequest, GetDatabaseInfoRequest, GetLatestLakeSnapshotRequest, GetTableRequest, - ListDatabasesRequest, ListTablesRequest, TableExistsRequest, + CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, DatabaseExistsRequest, + DropDatabaseRequest, DropPartitionRequest, DropTableRequest, GetDatabaseInfoRequest, + GetLatestLakeSnapshotRequest, GetTableRequest, ListDatabasesRequest, ListPartitionInfosRequest, + ListTablesRequest, TableExistsRequest, }; use crate::rpc::message::{ListOffsetsRequest, OffsetSpec}; use crate::rpc::{RpcClient, ServerConnection}; @@ -138,6 +139,63 @@ impl FlussAdmin { Ok(response.table_name) } + /// List all partitions in the given table. + pub async fn list_partition_infos(&self, table_path: &TablePath) -> Result> { + self.list_partition_infos_with_spec(table_path, None).await + } + + /// List partitions in the given table that match the partial partition spec. + pub async fn list_partition_infos_with_spec( + &self, + table_path: &TablePath, + partial_partition_spec: Option<&PartitionSpec>, + ) -> Result> { + let response = self + .admin_gateway + .request(ListPartitionInfosRequest::new( + table_path, + partial_partition_spec, + )) + .await?; + Ok(response.get_partitions_info()) + } + + /// Create a new partition for a partitioned table. + pub async fn create_partition( + &self, + table_path: &TablePath, + partition_spec: &PartitionSpec, + ignore_if_exists: bool, + ) -> Result<()> { + let _response = self + .admin_gateway + .request(CreatePartitionRequest::new( + table_path, + partition_spec, + ignore_if_exists, + )) + .await?; + Ok(()) + } + + /// Drop a partition from a partitioned table. + pub async fn drop_partition( + &self, + table_path: &TablePath, + partition_spec: &PartitionSpec, + ignore_if_not_exists: bool, + ) -> Result<()> { + let _response = self + .admin_gateway + .request(DropPartitionRequest::new( + table_path, + partition_spec, + ignore_if_not_exists, + )) + .await?; + Ok(()) + } + /// Check if a table exists pub async fn table_exists(&self, table_path: &TablePath) -> Result { let response = self diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs index 9c0b1b47..0ca654a6 100644 --- a/crates/fluss/src/metadata/mod.rs +++ b/crates/fluss/src/metadata/mod.rs @@ -19,10 +19,12 @@ mod data_lake_format; mod database; mod datatype; mod json_serde; +mod partition; mod table; pub use data_lake_format::*; pub use database::*; pub use datatype::*; pub use json_serde::*; +pub use partition::*; pub use table::*; diff --git a/crates/fluss/src/metadata/partition.rs b/crates/fluss/src/metadata/partition.rs new file mode 100644 index 00000000..020e07ec --- /dev/null +++ b/crates/fluss/src/metadata/partition.rs @@ -0,0 +1,477 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::PartitionId; +use crate::error::{Error, Result}; +use crate::proto::{PbKeyValue, PbPartitionInfo, PbPartitionSpec}; +use std::collections::HashMap; +use std::fmt::{Display, Formatter}; + +/// Represents a partition spec in fluss. Partition columns and values are NOT of strict order, and +/// they need to be re-arranged to the correct order by comparing with a list of strictly ordered +/// partition keys. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PartitionSpec { + partition_spec: HashMap, +} + +impl PartitionSpec { + pub fn new(partition_spec: HashMap) -> Self { + Self { partition_spec } + } + + pub fn get_spec_map(&self) -> &HashMap { + &self.partition_spec + } + + pub fn to_pb(&self) -> PbPartitionSpec { + PbPartitionSpec { + partition_key_values: self + .partition_spec + .iter() + .map(|(k, v)| PbKeyValue { + key: k.clone(), + value: v.clone(), + }) + .collect(), + } + } + + pub fn from_pb(pb: &PbPartitionSpec) -> Self { + let partition_spec = pb + .partition_key_values + .iter() + .map(|kv| (kv.key.clone(), kv.value.clone())) + .collect(); + Self { partition_spec } + } +} + +impl Display for PartitionSpec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "PartitionSpec{{{:?}}}", self.partition_spec) + } +} + +/// Represents a partition, which is the resolved version of PartitionSpec. The partition +/// spec is re-arranged into the correct order by comparing it with a list of strictly ordered +/// partition keys. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ResolvedPartitionSpec { + partition_keys: Vec, + partition_values: Vec, +} + +pub const PARTITION_SPEC_SEPARATOR: &str = "$"; + +impl ResolvedPartitionSpec { + pub fn new(partition_keys: Vec, partition_values: Vec) -> Result { + if partition_keys.len() != partition_values.len() { + return Err(Error::IllegalArgument { + message: "The number of partition keys and partition values should be the same." + .to_string(), + }); + } + Ok(Self { + partition_keys, + partition_values, + }) + } + + pub fn from_partition_spec( + partition_keys: Vec, + partition_spec: &PartitionSpec, + ) -> Self { + let partition_values = + Self::get_reordered_partition_values(&partition_keys, partition_spec); + Self { + partition_keys, + partition_values, + } + } + + pub fn from_partition_value(partition_key: String, partition_value: String) -> Self { + Self { + partition_keys: vec![partition_key], + partition_values: vec![partition_value], + } + } + + pub fn from_partition_name(partition_keys: Vec, partition_name: &str) -> Self { + let partition_values: Vec = + partition_name.split('$').map(|s| s.to_string()).collect(); + Self { + partition_keys, + partition_values, + } + } + + pub fn from_partition_qualified_name(qualified_partition_name: &str) -> Result { + let mut keys = Vec::new(); + let mut values = Vec::new(); + + for pair in qualified_partition_name.split('/') { + let parts: Vec<&str> = pair.splitn(2, '=').collect(); + if parts.len() != 2 { + return Err(Error::IllegalArgument { + message: format!( + "Invalid partition name format. Expected key=value, got: {}", + pair + ), + }); + } + keys.push(parts[0].to_string()); + values.push(parts[1].to_string()); + } + + Ok(Self { + partition_keys: keys, + partition_values: values, + }) + } + + pub fn get_partition_keys(&self) -> &[String] { + &self.partition_keys + } + + pub fn get_partition_values(&self) -> &[String] { + &self.partition_values + } + + pub fn to_partition_spec(&self) -> PartitionSpec { + let mut spec_map = HashMap::new(); + for (i, key) in self.partition_keys.iter().enumerate() { + spec_map.insert(key.clone(), self.partition_values[i].clone()); + } + PartitionSpec::new(spec_map) + } + + /// Generate the partition name for a partition table of specify partition values. + /// + /// The partition name is in the following format: value1$value2$...$valueN + pub fn get_partition_name(&self) -> String { + self.partition_values.join(PARTITION_SPEC_SEPARATOR) + } + + /// Returns the qualified partition name for a partition spec. + /// The format is: key1=value1/key2=value2/.../keyN=valueN + pub fn get_partition_qualified_name(&self) -> String { + let mut sb = String::new(); + for (i, key) in self.partition_keys.iter().enumerate() { + sb.push_str(key); + sb.push('='); + sb.push_str(&self.partition_values[i]); + if i != self.partition_keys.len() - 1 { + sb.push('/'); + } + } + sb + } + + pub fn contains(&self, other: &ResolvedPartitionSpec) -> Result { + let other_partition_keys = other.get_partition_keys(); + let other_partition_values = other.get_partition_values(); + + let mut expected_partition_values = Vec::new(); + for other_partition_key in other_partition_keys { + let key_index = self + .partition_keys + .iter() + .position(|k| k == other_partition_key); + match key_index { + Some(idx) => expected_partition_values.push(self.partition_values[idx].clone()), + None => { + return Err(Error::IllegalArgument { + message: format!( + "table don't contains this partitionKey: {}", + other_partition_key + ), + }); + } + } + } + + let expected_partition_name = expected_partition_values.join(PARTITION_SPEC_SEPARATOR); + let other_partition_name = other_partition_values.join(PARTITION_SPEC_SEPARATOR); + + Ok(expected_partition_name == other_partition_name) + } + + pub fn to_pb(&self) -> PbPartitionSpec { + PbPartitionSpec { + partition_key_values: self + .partition_keys + .iter() + .zip(self.partition_values.iter()) + .map(|(k, v)| PbKeyValue { + key: k.clone(), + value: v.clone(), + }) + .collect(), + } + } + + pub fn from_pb(pb: &PbPartitionSpec) -> Self { + let partition_keys = pb + .partition_key_values + .iter() + .map(|kv| kv.key.clone()) + .collect(); + let partition_values = pb + .partition_key_values + .iter() + .map(|kv| kv.value.clone()) + .collect(); + Self { + partition_keys, + partition_values, + } + } + + fn get_reordered_partition_values( + partition_keys: &[String], + partition_spec: &PartitionSpec, + ) -> Vec { + let partition_spec_map = partition_spec.get_spec_map(); + partition_keys + .iter() + .map(|key| partition_spec_map.get(key).cloned().unwrap_or_default()) + .collect() + } +} + +impl Display for ResolvedPartitionSpec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.get_partition_qualified_name()) + } +} + +/// Information of a partition metadata, includes the partition's name and the partition id that +/// represents the unique identifier of the partition. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct PartitionInfo { + partition_id: i64, + partition_spec: ResolvedPartitionSpec, +} + +impl PartitionInfo { + pub fn new(partition_id: i64, partition_spec: ResolvedPartitionSpec) -> Self { + Self { + partition_id, + partition_spec, + } + } + + /// Get the partition id. The id is globally unique in the Fluss cluster. + pub fn get_partition_id(&self) -> i64 { + self.partition_id + } + + /// Get the partition name. + pub fn get_partition_name(&self) -> String { + self.partition_spec.get_partition_name() + } + + pub fn get_resolved_partition_spec(&self) -> &ResolvedPartitionSpec { + &self.partition_spec + } + + pub fn get_partition_spec(&self) -> PartitionSpec { + self.partition_spec.to_partition_spec() + } + + pub fn to_pb(&self) -> PbPartitionInfo { + PbPartitionInfo { + partition_id: self.partition_id, + partition_spec: self.partition_spec.to_pb(), + } + } + + pub fn from_pb(pb: &PbPartitionInfo) -> Self { + Self { + partition_id: pb.partition_id, + partition_spec: ResolvedPartitionSpec::from_pb(&pb.partition_spec), + } + } +} + +impl Display for PartitionInfo { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Partition{{name='{}', id={}}}", + self.get_partition_name(), + self.partition_id + ) + } +} + +/// A class to identify a table partition, containing the table id and the partition id. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct TablePartition { + table_id: i64, + partition_id: PartitionId, +} + +impl TablePartition { + pub fn new(table_id: i64, partition_id: PartitionId) -> Self { + Self { + table_id, + partition_id, + } + } + + pub fn get_table_id(&self) -> i64 { + self.table_id + } + + pub fn get_partition_id(&self) -> PartitionId { + self.partition_id + } +} + +impl Display for TablePartition { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "TablePartition{{tableId={}, partitionId={}}}", + self.table_id, self.partition_id + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_partition_spec() { + let mut map = HashMap::new(); + map.insert("date".to_string(), "2024-01-15".to_string()); + map.insert("region".to_string(), "US".to_string()); + + let spec = PartitionSpec::new(map.clone()); + assert_eq!(spec.get_spec_map(), &map); + } + + #[test] + fn test_resolved_partition_spec_name() { + let spec = ResolvedPartitionSpec::new( + vec!["date".to_string(), "region".to_string()], + vec!["2024-01-15".to_string(), "US".to_string()], + ) + .unwrap(); + + assert_eq!(spec.get_partition_name(), "2024-01-15$US"); + assert_eq!( + spec.get_partition_qualified_name(), + "date=2024-01-15/region=US" + ); + } + + #[test] + fn test_resolved_partition_spec_from_partition_name() { + let spec = ResolvedPartitionSpec::from_partition_name( + vec!["date".to_string(), "region".to_string()], + "2024-01-15$US", + ); + + assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]); + } + + #[test] + fn test_resolved_partition_spec_from_qualified_name() { + let spec = + ResolvedPartitionSpec::from_partition_qualified_name("date=2024-01-15/region=US") + .unwrap(); + + assert_eq!(spec.get_partition_keys(), &["date", "region"]); + assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]); + } + + #[test] + fn test_resolved_partition_spec_mismatched_lengths() { + let result = ResolvedPartitionSpec::new( + vec!["date".to_string(), "region".to_string()], + vec!["2024-01-15".to_string()], + ); + + assert!(result.is_err()); + } + + #[test] + fn test_partition_info() { + let spec = + ResolvedPartitionSpec::new(vec!["date".to_string()], vec!["2024-01-15".to_string()]) + .unwrap(); + + let info = PartitionInfo::new(42, spec); + assert_eq!(info.get_partition_id(), 42); + assert_eq!(info.get_partition_name(), "2024-01-15"); + } + + #[test] + fn test_table_partition() { + let tp = TablePartition::new(100, 42); + assert_eq!(tp.get_table_id(), 100); + assert_eq!(tp.get_partition_id(), 42); + } + + #[test] + fn test_partition_spec_pb_roundtrip() { + let mut map = HashMap::new(); + map.insert("date".to_string(), "2024-01-15".to_string()); + let spec = PartitionSpec::new(map); + + let pb = spec.to_pb(); + let restored = PartitionSpec::from_pb(&pb); + + assert_eq!( + spec.get_spec_map().get("date"), + restored.get_spec_map().get("date") + ); + } + + #[test] + fn test_partition_info_pb_roundtrip() { + let spec = + ResolvedPartitionSpec::new(vec!["date".to_string()], vec!["2024-01-15".to_string()]) + .unwrap(); + let info = PartitionInfo::new(42, spec); + + let pb = info.to_pb(); + let restored = PartitionInfo::from_pb(&pb); + + assert_eq!(info.get_partition_id(), restored.get_partition_id()); + assert_eq!(info.get_partition_name(), restored.get_partition_name()); + } + + #[test] + fn test_contains() { + let full_spec = ResolvedPartitionSpec::new( + vec!["date".to_string(), "region".to_string()], + vec!["2024-01-15".to_string(), "US".to_string()], + ) + .unwrap(); + + let partial_spec = + ResolvedPartitionSpec::new(vec!["date".to_string()], vec!["2024-01-15".to_string()]) + .unwrap(); + + assert!(full_spec.contains(&partial_spec).unwrap()); + } +} diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index f4cf972d..3b5c8618 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -697,26 +697,65 @@ impl TablePath { } } +/// A database name, table name and partition name combo. It's used to represent the physical path of +/// a bucket. If the bucket belongs to a partition (i.e., the table is a partitioned table), the +/// partition_name will be not null, otherwise null. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct PhysicalTablePath { table_path: TablePath, - #[allow(dead_code)] - partition: Option, + partition_name: Option, } impl PhysicalTablePath { pub fn of(table_path: TablePath) -> Self { Self { table_path, - partition: None, + partition_name: None, } } - // TODO: support partition + pub fn of_partitioned(table_path: TablePath, partition_name: Option) -> Self { + Self { + table_path, + partition_name, + } + } + + pub fn of_with_names( + database_name: String, + table_name: String, + partition_name: Option, + ) -> Self { + Self { + table_path: TablePath::new(database_name, table_name), + partition_name, + } + } pub fn get_table_path(&self) -> &TablePath { &self.table_path } + + pub fn get_database_name(&self) -> &str { + self.table_path.database() + } + + pub fn get_table_name(&self) -> &str { + self.table_path.table() + } + + pub fn get_partition_name(&self) -> Option<&String> { + self.partition_name.as_ref() + } +} + +impl Display for PhysicalTablePath { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match &self.partition_name { + Some(partition) => write!(f, "{}(p={})", self.table_path, partition), + None => write!(f, "{}", self.table_path), + } + } } #[derive(Debug, Clone)] diff --git a/crates/fluss/src/proto/fluss_api.proto b/crates/fluss/src/proto/fluss_api.proto index eaee94c3..7a7f9c2a 100644 --- a/crates/fluss/src/proto/fluss_api.proto +++ b/crates/fluss/src/proto/fluss_api.proto @@ -373,4 +373,38 @@ message PbLookupRespForBucket { message PbValue { optional bytes values = 1; -} \ No newline at end of file +} + +message PbPartitionSpec { + repeated PbKeyValue partition_key_values = 1; +} + +message PbPartitionInfo { + required int64 partition_id = 1; + required PbPartitionSpec partition_spec = 2; +} + +message ListPartitionInfosRequest { + required PbTablePath table_path = 1; + optional PbPartitionSpec partial_partition_spec = 2; +} + +message ListPartitionInfosResponse { + repeated PbPartitionInfo partitions_info = 1; +} + +message CreatePartitionRequest { + required PbTablePath table_path = 1; + required PbPartitionSpec partition_spec = 2; + required bool ignore_if_exists = 3; +} + +message CreatePartitionResponse {} + +message DropPartitionRequest { + required PbTablePath table_path = 1; + required PbPartitionSpec partition_spec = 2; + required bool ignore_if_not_exists = 3; +} + +message DropPartitionResponse {} \ No newline at end of file diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs index 66e4beb8..f6009c07 100644 --- a/crates/fluss/src/rpc/api_key.rs +++ b/crates/fluss/src/rpc/api_key.rs @@ -27,6 +27,7 @@ pub enum ApiKey { DropTable, GetTable, ListTables, + ListPartitionInfos, TableExists, MetaData, ProduceLog, @@ -37,6 +38,8 @@ pub enum ApiKey { GetFileSystemSecurityToken, GetDatabaseInfo, GetLatestLakeSnapshot, + CreatePartition, + DropPartition, Unknown(i16), } @@ -51,6 +54,7 @@ impl From for ApiKey { 1006 => ApiKey::DropTable, 1007 => ApiKey::GetTable, 1008 => ApiKey::ListTables, + 1009 => ApiKey::ListPartitionInfos, 1010 => ApiKey::TableExists, 1012 => ApiKey::MetaData, 1014 => ApiKey::ProduceLog, @@ -61,6 +65,8 @@ impl From for ApiKey { 1025 => ApiKey::GetFileSystemSecurityToken, 1032 => ApiKey::GetLatestLakeSnapshot, 1035 => ApiKey::GetDatabaseInfo, + 1036 => ApiKey::CreatePartition, + 1037 => ApiKey::DropPartition, _ => Unknown(key), } } @@ -77,6 +83,7 @@ impl From for i16 { ApiKey::DropTable => 1006, ApiKey::GetTable => 1007, ApiKey::ListTables => 1008, + ApiKey::ListPartitionInfos => 1009, ApiKey::TableExists => 1010, ApiKey::MetaData => 1012, ApiKey::ProduceLog => 1014, @@ -87,6 +94,8 @@ impl From for i16 { ApiKey::GetFileSystemSecurityToken => 1025, ApiKey::GetLatestLakeSnapshot => 1032, ApiKey::GetDatabaseInfo => 1035, + ApiKey::CreatePartition => 1036, + ApiKey::DropPartition => 1037, Unknown(x) => x, } } @@ -107,6 +116,7 @@ mod tests { (1006, ApiKey::DropTable), (1007, ApiKey::GetTable), (1008, ApiKey::ListTables), + (1009, ApiKey::ListPartitionInfos), (1010, ApiKey::TableExists), (1012, ApiKey::MetaData), (1014, ApiKey::ProduceLog), @@ -117,6 +127,8 @@ mod tests { (1025, ApiKey::GetFileSystemSecurityToken), (1032, ApiKey::GetLatestLakeSnapshot), (1035, ApiKey::GetDatabaseInfo), + (1036, ApiKey::CreatePartition), + (1037, ApiKey::DropPartition), ]; for (raw, key) in cases { diff --git a/crates/fluss/src/rpc/message/create_partition.rs b/crates/fluss/src/rpc/message/create_partition.rs new file mode 100644 index 00000000..93dbf70d --- /dev/null +++ b/crates/fluss/src/rpc/message/create_partition.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::{PartitionSpec, TablePath}; +use crate::proto::CreatePartitionResponse; +use crate::rpc::api_key::ApiKey; +use crate::rpc::api_version::ApiVersion; +use crate::rpc::convert::to_table_path; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType}; +use crate::{impl_read_version_type, impl_write_version_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct CreatePartitionRequest { + pub inner_request: proto::CreatePartitionRequest, +} + +impl CreatePartitionRequest { + pub fn new( + table_path: &TablePath, + partition_spec: &PartitionSpec, + ignore_if_exists: bool, + ) -> Self { + CreatePartitionRequest { + inner_request: proto::CreatePartitionRequest { + table_path: to_table_path(table_path), + partition_spec: partition_spec.to_pb(), + ignore_if_exists, + }, + } + } +} + +impl RequestBody for CreatePartitionRequest { + type ResponseBody = CreatePartitionResponse; + + const API_KEY: ApiKey = ApiKey::CreatePartition; + + const REQUEST_VERSION: ApiVersion = ApiVersion(0); +} + +impl_write_version_type!(CreatePartitionRequest); +impl_read_version_type!(CreatePartitionResponse); diff --git a/crates/fluss/src/rpc/message/drop_partition.rs b/crates/fluss/src/rpc/message/drop_partition.rs new file mode 100644 index 00000000..ddc97d83 --- /dev/null +++ b/crates/fluss/src/rpc/message/drop_partition.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::{PartitionSpec, TablePath}; +use crate::proto::DropPartitionResponse; +use crate::rpc::api_key::ApiKey; +use crate::rpc::api_version::ApiVersion; +use crate::rpc::convert::to_table_path; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType}; +use crate::{impl_read_version_type, impl_write_version_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct DropPartitionRequest { + pub inner_request: proto::DropPartitionRequest, +} + +impl DropPartitionRequest { + pub fn new( + table_path: &TablePath, + partition_spec: &PartitionSpec, + ignore_if_not_exists: bool, + ) -> Self { + DropPartitionRequest { + inner_request: proto::DropPartitionRequest { + table_path: to_table_path(table_path), + partition_spec: partition_spec.to_pb(), + ignore_if_not_exists, + }, + } + } +} + +impl RequestBody for DropPartitionRequest { + type ResponseBody = DropPartitionResponse; + + const API_KEY: ApiKey = ApiKey::DropPartition; + + const REQUEST_VERSION: ApiVersion = ApiVersion(0); +} + +impl_write_version_type!(DropPartitionRequest); +impl_read_version_type!(DropPartitionResponse); diff --git a/crates/fluss/src/rpc/message/list_partition_infos.rs b/crates/fluss/src/rpc/message/list_partition_infos.rs new file mode 100644 index 00000000..ab693671 --- /dev/null +++ b/crates/fluss/src/rpc/message/list_partition_infos.rs @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::{PartitionInfo, PartitionSpec, TablePath}; +use crate::proto::ListPartitionInfosResponse; +use crate::rpc::api_key::ApiKey; +use crate::rpc::api_version::ApiVersion; +use crate::rpc::convert::to_table_path; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType}; +use crate::{impl_read_version_type, impl_write_version_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct ListPartitionInfosRequest { + pub inner_request: proto::ListPartitionInfosRequest, +} + +impl ListPartitionInfosRequest { + pub fn new(table_path: &TablePath, partial_partition_spec: Option<&PartitionSpec>) -> Self { + ListPartitionInfosRequest { + inner_request: proto::ListPartitionInfosRequest { + table_path: to_table_path(table_path), + partial_partition_spec: partial_partition_spec.map(|s| s.to_pb()), + }, + } + } +} + +impl RequestBody for ListPartitionInfosRequest { + type ResponseBody = ListPartitionInfosResponse; + + const API_KEY: ApiKey = ApiKey::ListPartitionInfos; + + const REQUEST_VERSION: ApiVersion = ApiVersion(0); +} + +impl_write_version_type!(ListPartitionInfosRequest); +impl_read_version_type!(ListPartitionInfosResponse); + +impl ListPartitionInfosResponse { + pub fn get_partitions_info(&self) -> Vec { + self.partitions_info + .iter() + .map(PartitionInfo::from_pb) + .collect() + } +} diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs index 881a64f6..addb97a1 100644 --- a/crates/fluss/src/rpc/message/mod.rs +++ b/crates/fluss/src/rpc/message/mod.rs @@ -21,9 +21,11 @@ use crate::rpc::frame::{ReadError, WriteError}; use bytes::{Buf, BufMut}; mod create_database; +mod create_partition; mod create_table; mod database_exists; mod drop_database; +mod drop_partition; mod drop_table; mod fetch; mod get_database_info; @@ -33,6 +35,7 @@ mod get_table; mod header; mod list_databases; mod list_offsets; +mod list_partition_infos; mod list_tables; mod lookup; mod produce_log; @@ -42,9 +45,11 @@ mod update_metadata; pub use crate::rpc::RpcError; pub use create_database::*; +pub use create_partition::*; pub use create_table::*; pub use database_exists::*; pub use drop_database::*; +pub use drop_partition::*; pub use drop_table::*; pub use fetch::*; pub use get_database_info::*; @@ -54,6 +59,7 @@ pub use get_table::*; pub use header::*; pub use list_databases::*; pub use list_offsets::*; +pub use list_partition_infos::*; pub use list_tables::*; pub use lookup::*; pub use produce_log::*; diff --git a/crates/fluss/tests/integration/admin.rs b/crates/fluss/tests/integration/admin.rs index fbdb295d..a3f32096 100644 --- a/crates/fluss/tests/integration/admin.rs +++ b/crates/fluss/tests/integration/admin.rs @@ -37,9 +37,10 @@ mod admin_test { use crate::integration::utils::{get_cluster, start_cluster, stop_cluster}; use fluss::error::FlussError; use fluss::metadata::{ - DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema, TableDescriptor, - TablePath, + DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, PartitionSpec, Schema, + TableDescriptor, TablePath, }; + use std::collections::HashMap; use std::sync::Arc; fn before_all() { @@ -223,6 +224,136 @@ mod admin_test { assert_eq!(admin.database_exists(test_db_name).await.unwrap(), false); } + #[tokio::test] + async fn test_partition_apis() { + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection + .get_admin() + .await + .expect("Failed to get admin client"); + + let test_db_name = "test_partition_apis_db"; + let db_descriptor = DatabaseDescriptorBuilder::default() + .comment("Database for test_partition_apis") + .build(); + + admin + .create_database(test_db_name, true, Some(&db_descriptor)) + .await + .expect("Failed to create test database"); + + let test_table_name = "partitioned_table"; + let table_path = TablePath::new(test_db_name.to_string(), test_table_name.to_string()); + + let table_schema = Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .column("dt", DataTypes::string()) + .primary_key(vec!["id".to_string(), "dt".to_string()]) + .build() + .expect("Failed to build table schema"); + + let table_descriptor = TableDescriptor::builder() + .schema(table_schema) + .distributed_by(Some(3), vec!["id".to_string()]) + .partitioned_by(vec!["dt".to_string()]) + .property("table.replication.factor", "1") + .log_format(LogFormat::ARROW) + .kv_format(KvFormat::COMPACTED) + .build() + .expect("Failed to build table descriptor"); + + admin + .create_table(&table_path, &table_descriptor, true) + .await + .expect("Failed to create partitioned table"); + + let partitions = admin + .list_partition_infos(&table_path) + .await + .expect("Failed to list partitions"); + assert!( + partitions.is_empty(), + "Expected no partitions initially, found {}", + partitions.len() + ); + + let mut partition_values = HashMap::new(); + partition_values.insert("dt".to_string(), "2024-01-15".to_string()); + let partition_spec = PartitionSpec::new(partition_values); + + admin + .create_partition(&table_path, &partition_spec, false) + .await + .expect("Failed to create partition"); + + let partitions = admin + .list_partition_infos(&table_path) + .await + .expect("Failed to list partitions"); + assert_eq!( + partitions.len(), + 1, + "Expected exactly one partition after creation" + ); + assert_eq!( + partitions[0].get_partition_name(), + "2024-01-15", + "Partition name mismatch" + ); + + // list with partial spec filter - should find the partition + let partitions_with_spec = admin + .list_partition_infos_with_spec(&table_path, Some(&partition_spec)) + .await + .expect("Failed to list partitions with spec"); + assert_eq!( + partitions_with_spec.len(), + 1, + "Expected one partition matching the spec" + ); + assert_eq!( + partitions_with_spec[0].get_partition_name(), + "2024-01-15", + "Partition name mismatch with spec filter" + ); + + // list with non-matching spec - should find no partitions + let mut non_matching_values = HashMap::new(); + non_matching_values.insert("dt".to_string(), "2024-01-16".to_string()); + let non_matching_spec = PartitionSpec::new(non_matching_values); + let partitions_non_matching = admin + .list_partition_infos_with_spec(&table_path, Some(&non_matching_spec)) + .await + .expect("Failed to list partitions with non-matching spec"); + assert!( + partitions_non_matching.is_empty(), + "Expected no partitions for non-matching spec" + ); + + admin + .drop_partition(&table_path, &partition_spec, false) + .await + .expect("Failed to drop partition"); + + let partitions = admin + .list_partition_infos(&table_path) + .await + .expect("Failed to list partitions"); + assert!( + partitions.is_empty(), + "Expected no partitions after drop, found {}", + partitions.len() + ); + + admin + .drop_table(&table_path, true) + .await + .expect("Failed to drop table"); + admin.drop_database(test_db_name, true, true).await; + } + #[tokio::test] async fn test_fluss_error_response() { let cluster = get_fluss_cluster(); From c9c175cf83aac0bce2267768dbf3112435196e0a Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sat, 24 Jan 2026 12:35:27 +0000 Subject: [PATCH 2/4] Remove vestigial unit test --- crates/fluss/src/metadata/partition.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/crates/fluss/src/metadata/partition.rs b/crates/fluss/src/metadata/partition.rs index 020e07ec..6aaf61ab 100644 --- a/crates/fluss/src/metadata/partition.rs +++ b/crates/fluss/src/metadata/partition.rs @@ -358,16 +358,6 @@ impl Display for TablePartition { mod tests { use super::*; - #[test] - fn test_partition_spec() { - let mut map = HashMap::new(); - map.insert("date".to_string(), "2024-01-15".to_string()); - map.insert("region".to_string(), "US".to_string()); - - let spec = PartitionSpec::new(map.clone()); - assert_eq!(spec.get_spec_map(), &map); - } - #[test] fn test_resolved_partition_spec_name() { let spec = ResolvedPartitionSpec::new( From d9a7a532f90fad1583c3792e60fb78ebb34df5e6 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sat, 24 Jan 2026 17:17:27 +0000 Subject: [PATCH 3/4] Address copilot comments --- crates/fluss/src/lib.rs | 2 +- crates/fluss/src/metadata/partition.rs | 10 ++++++---- crates/fluss/src/metadata/table.rs | 4 ++-- crates/fluss/tests/integration/admin.rs | 20 +++++++++++++++----- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs index e8d822fb..f0650057 100644 --- a/crates/fluss/src/lib.rs +++ b/crates/fluss/src/lib.rs @@ -35,7 +35,7 @@ mod util; mod test_utils; pub type TableId = u64; -pub type PartitionId = u64; +pub type PartitionId = i64; pub type BucketId = i32; pub mod proto { diff --git a/crates/fluss/src/metadata/partition.rs b/crates/fluss/src/metadata/partition.rs index 6aaf61ab..0e251035 100644 --- a/crates/fluss/src/metadata/partition.rs +++ b/crates/fluss/src/metadata/partition.rs @@ -112,8 +112,10 @@ impl ResolvedPartitionSpec { } pub fn from_partition_name(partition_keys: Vec, partition_name: &str) -> Self { - let partition_values: Vec = - partition_name.split('$').map(|s| s.to_string()).collect(); + let partition_values: Vec = partition_name + .split(PARTITION_SPEC_SEPARATOR) + .map(|s| s.to_string()) + .collect(); Self { partition_keys, partition_values, @@ -160,7 +162,7 @@ impl ResolvedPartitionSpec { PartitionSpec::new(spec_map) } - /// Generate the partition name for a partition table of specify partition values. + /// Generate the partition name for a partition table with specified partition values. /// /// The partition name is in the following format: value1$value2$...$valueN pub fn get_partition_name(&self) -> String { @@ -197,7 +199,7 @@ impl ResolvedPartitionSpec { None => { return Err(Error::IllegalArgument { message: format!( - "table don't contains this partitionKey: {}", + "table does not contain partitionKey: {}", other_partition_key ), }); diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 3b5c8618..1b90ac30 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -698,8 +698,8 @@ impl TablePath { } /// A database name, table name and partition name combo. It's used to represent the physical path of -/// a bucket. If the bucket belongs to a partition (i.e., the table is a partitioned table), the -/// partition_name will be not null, otherwise null. +/// a bucket. If the bucket belongs to a partition (i.e., the table is a partitioned table), +/// `partition_name` will be `Some(...)`; otherwise, it will be `None`. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct PhysicalTablePath { table_path: TablePath, diff --git a/crates/fluss/tests/integration/admin.rs b/crates/fluss/tests/integration/admin.rs index a3f32096..9842a5aa 100644 --- a/crates/fluss/tests/integration/admin.rs +++ b/crates/fluss/tests/integration/admin.rs @@ -250,14 +250,19 @@ mod admin_test { .column("id", DataTypes::int()) .column("name", DataTypes::string()) .column("dt", DataTypes::string()) - .primary_key(vec!["id".to_string(), "dt".to_string()]) + .column("region", DataTypes::string()) + .primary_key(vec![ + "id".to_string(), + "dt".to_string(), + "region".to_string(), + ]) .build() .expect("Failed to build table schema"); let table_descriptor = TableDescriptor::builder() .schema(table_schema) .distributed_by(Some(3), vec!["id".to_string()]) - .partitioned_by(vec!["dt".to_string()]) + .partitioned_by(vec!["dt".to_string(), "region".to_string()]) .property("table.replication.factor", "1") .log_format(LogFormat::ARROW) .kv_format(KvFormat::COMPACTED) @@ -281,6 +286,7 @@ mod admin_test { let mut partition_values = HashMap::new(); partition_values.insert("dt".to_string(), "2024-01-15".to_string()); + partition_values.insert("region".to_string(), "EMEA".to_string()); let partition_spec = PartitionSpec::new(partition_values); admin @@ -299,13 +305,17 @@ mod admin_test { ); assert_eq!( partitions[0].get_partition_name(), - "2024-01-15", + "2024-01-15$EMEA", "Partition name mismatch" ); // list with partial spec filter - should find the partition + let mut partition_values = HashMap::new(); + partition_values.insert("dt".to_string(), "2024-01-15".to_string()); + let partial_partition_spec = PartitionSpec::new(partition_values); + let partitions_with_spec = admin - .list_partition_infos_with_spec(&table_path, Some(&partition_spec)) + .list_partition_infos_with_spec(&table_path, Some(&partial_partition_spec)) .await .expect("Failed to list partitions with spec"); assert_eq!( @@ -315,7 +325,7 @@ mod admin_test { ); assert_eq!( partitions_with_spec[0].get_partition_name(), - "2024-01-15", + "2024-01-15$EMEA", "Partition name mismatch with spec filter" ); From d7735a1a15495192e0d51d5faa10e98a540e67b3 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sat, 24 Jan 2026 18:24:51 +0000 Subject: [PATCH 4/4] Use proper id types --- crates/fluss/src/client/admin.rs | 8 ++++---- crates/fluss/src/client/table/scanner.rs | 3 ++- crates/fluss/src/client/write/accumulator.rs | 2 +- crates/fluss/src/client/write/sender.rs | 3 ++- crates/fluss/src/lib.rs | 2 +- crates/fluss/src/metadata/partition.rs | 6 +++--- crates/fluss/src/metadata/table.rs | 19 ++++++++++--------- crates/fluss/src/rpc/message/list_offsets.rs | 10 ++++++---- crates/fluss/src/util/mod.rs | 3 ++- 9 files changed, 31 insertions(+), 25 deletions(-) diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index 12e2e624..bffe0f51 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -29,9 +29,9 @@ use crate::rpc::message::{ use crate::rpc::message::{ListOffsetsRequest, OffsetSpec}; use crate::rpc::{RpcClient, ServerConnection}; -use crate::BucketId; use crate::error::{Error, Result}; use crate::proto::GetTableInfoResponse; +use crate::{BucketId, PartitionId, TableId}; use std::collections::HashMap; use std::slice::from_ref; use std::sync::Arc; @@ -321,13 +321,13 @@ impl FlussAdmin { fn prepare_list_offsets_requests( &self, - table_id: i64, - partition_id: Option, + table_id: TableId, + partition_id: Option, buckets: &[BucketId], offset_spec: OffsetSpec, ) -> Result> { let cluster = self.metadata.get_cluster(); - let mut node_for_bucket_list: HashMap> = HashMap::new(); + let mut node_for_bucket_list: HashMap> = HashMap::new(); for bucket_id in buckets { let table_bucket = TableBucket::new(table_id, *bucket_id); diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index afa44f35..9ad424c0 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use std::time::Duration; use tempfile::TempDir; +use crate::TableId; use crate::client::connection::FlussConnection; use crate::client::credentials::CredentialsCache; use crate::client::metadata::Metadata; @@ -262,7 +263,7 @@ pub struct RecordBatchLogScanner { /// Private shared implementation for both scanner types struct LogScannerInner { table_path: TablePath, - table_id: i64, + table_id: TableId, metadata: Arc, log_scanner_status: Arc, log_fetcher: LogFetcher, diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs index 46c822c1..96114fb0 100644 --- a/crates/fluss/src/client/write/accumulator.rs +++ b/crates/fluss/src/client/write/accumulator.rs @@ -401,7 +401,7 @@ impl RecordAccumulator { ready_write_batch.write_batch.re_enqueued(); let table_path = ready_write_batch.write_batch.table_path().clone(); let bucket_id = ready_write_batch.table_bucket.bucket_id(); - let table_id = u64::try_from(ready_write_batch.table_bucket.table_id()).unwrap_or(0); + let table_id = ready_write_batch.table_bucket.table_id(); let dq = { let mut binding = diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs index ceed2456..1ffda582 100644 --- a/crates/fluss/src/client/write/sender.rs +++ b/crates/fluss/src/client/write/sender.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::TableId; use crate::client::broadcast; use crate::client::metadata::Metadata; use crate::client::write::batch::WriteBatch; @@ -144,7 +145,7 @@ impl Sender { return Ok(()); } let mut records_by_bucket = HashMap::new(); - let mut write_batch_by_table: HashMap> = HashMap::new(); + let mut write_batch_by_table: HashMap> = HashMap::new(); for batch in batches { let table_bucket = batch.table_bucket.clone(); diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs index f0650057..f079db28 100644 --- a/crates/fluss/src/lib.rs +++ b/crates/fluss/src/lib.rs @@ -34,7 +34,7 @@ mod util; #[cfg(test)] mod test_utils; -pub type TableId = u64; +pub type TableId = i64; pub type PartitionId = i64; pub type BucketId = i32; diff --git a/crates/fluss/src/metadata/partition.rs b/crates/fluss/src/metadata/partition.rs index 0e251035..1ecc0dcd 100644 --- a/crates/fluss/src/metadata/partition.rs +++ b/crates/fluss/src/metadata/partition.rs @@ -266,12 +266,12 @@ impl Display for ResolvedPartitionSpec { /// represents the unique identifier of the partition. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct PartitionInfo { - partition_id: i64, + partition_id: PartitionId, partition_spec: ResolvedPartitionSpec, } impl PartitionInfo { - pub fn new(partition_id: i64, partition_spec: ResolvedPartitionSpec) -> Self { + pub fn new(partition_id: PartitionId, partition_spec: ResolvedPartitionSpec) -> Self { Self { partition_id, partition_spec, @@ -279,7 +279,7 @@ impl PartitionInfo { } /// Get the partition id. The id is globally unique in the Fluss cluster. - pub fn get_partition_id(&self) -> i64 { + pub fn get_partition_id(&self) -> PartitionId { self.partition_id } diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 1b90ac30..c4a91954 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -20,6 +20,7 @@ use crate::error::Error::{IllegalArgument, InvalidTableError}; use crate::error::{Error, Result}; use crate::metadata::DataLakeFormat; use crate::metadata::datatype::{DataField, DataType, RowType}; +use crate::{BucketId, PartitionId, TableId}; use core::fmt; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; @@ -761,7 +762,7 @@ impl Display for PhysicalTablePath { #[derive(Debug, Clone)] pub struct TableInfo { pub table_path: TablePath, - pub table_id: i64, + pub table_id: TableId, pub schema_id: i32, pub schema: Schema, pub row_type: RowType, @@ -858,7 +859,7 @@ impl TableInfo { #[allow(clippy::too_many_arguments)] pub fn new( table_path: TablePath, - table_id: i64, + table_id: TableId, schema_id: i32, schema: Schema, bucket_keys: Vec, @@ -1039,13 +1040,13 @@ impl Display for TableInfo { #[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)] pub struct TableBucket { - table_id: i64, - partition_id: Option, - bucket: i32, + table_id: TableId, + partition_id: Option, + bucket: BucketId, } impl TableBucket { - pub fn new(table_id: i64, bucket: i32) -> Self { + pub fn new(table_id: TableId, bucket: BucketId) -> Self { TableBucket { table_id, partition_id: None, @@ -1053,15 +1054,15 @@ impl TableBucket { } } - pub fn table_id(&self) -> i64 { + pub fn table_id(&self) -> TableId { self.table_id } - pub fn bucket_id(&self) -> i32 { + pub fn bucket_id(&self) -> BucketId { self.bucket } - pub fn partition_id(&self) -> Option { + pub fn partition_id(&self) -> Option { self.partition_id } } diff --git a/crates/fluss/src/rpc/message/list_offsets.rs b/crates/fluss/src/rpc/message/list_offsets.rs index fcecb418..262645a6 100644 --- a/crates/fluss/src/rpc/message/list_offsets.rs +++ b/crates/fluss/src/rpc/message/list_offsets.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. -use crate::{impl_read_version_type, impl_write_version_type, proto}; +use crate::{ + BucketId, PartitionId, TableId, impl_read_version_type, impl_write_version_type, proto, +}; use crate::error::Result as FlussResult; use crate::error::{Error, FlussError}; @@ -74,9 +76,9 @@ pub struct ListOffsetsRequest { impl ListOffsetsRequest { pub fn new( - table_id: i64, - partition_id: Option, - bucket_ids: Vec, + table_id: TableId, + partition_id: Option, + bucket_ids: Vec, offset_spec: OffsetSpec, ) -> Self { ListOffsetsRequest { diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs index 30424e5d..28681919 100644 --- a/crates/fluss/src/util/mod.rs +++ b/crates/fluss/src/util/mod.rs @@ -18,6 +18,7 @@ pub mod murmur_hash; pub mod varint; +use crate::TableId; use crate::metadata::TableBucket; use linked_hash_map::LinkedHashMap; use std::collections::{HashMap, HashSet}; @@ -154,7 +155,7 @@ impl FairBucketStatusMap { self.map.clear(); // Group buckets by table ID - let mut table_to_buckets: LinkedHashMap> = LinkedHashMap::new(); + let mut table_to_buckets: LinkedHashMap> = LinkedHashMap::new(); for bucket in bucket_to_status.keys() { table_to_buckets .entry(bucket.table_id())