76 changes: 67 additions & 9 deletions crates/fluss/src/client/admin.rs
@@ -17,20 +17,21 @@

 use crate::client::metadata::Metadata;
 use crate::metadata::{
-    DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, TableBucket, TableDescriptor,
-    TableInfo, TablePath,
+    DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, PartitionInfo, PartitionSpec,
+    TableBucket, TableDescriptor, TableInfo, TablePath,
 };
 use crate::rpc::message::{
-    CreateDatabaseRequest, CreateTableRequest, DatabaseExistsRequest, DropDatabaseRequest,
-    DropTableRequest, GetDatabaseInfoRequest, GetLatestLakeSnapshotRequest, GetTableRequest,
-    ListDatabasesRequest, ListTablesRequest, TableExistsRequest,
+    CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, DatabaseExistsRequest,
+    DropDatabaseRequest, DropPartitionRequest, DropTableRequest, GetDatabaseInfoRequest,
+    GetLatestLakeSnapshotRequest, GetTableRequest, ListDatabasesRequest, ListPartitionInfosRequest,
+    ListTablesRequest, TableExistsRequest,
 };
 use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
 use crate::rpc::{RpcClient, ServerConnection};

-use crate::BucketId;
 use crate::error::{Error, Result};
 use crate::proto::GetTableInfoResponse;
+use crate::{BucketId, PartitionId, TableId};
 use std::collections::HashMap;
 use std::slice::from_ref;
 use std::sync::Arc;
@@ -138,6 +139,63 @@ impl FlussAdmin {
         Ok(response.table_name)
     }

+    /// List all partitions in the given table.
+    pub async fn list_partition_infos(&self, table_path: &TablePath) -> Result<Vec<PartitionInfo>> {
+        self.list_partition_infos_with_spec(table_path, None).await
+    }
+
+    /// List partitions in the given table that match the partial partition spec.
+    pub async fn list_partition_infos_with_spec(
+        &self,
+        table_path: &TablePath,
+        partial_partition_spec: Option<&PartitionSpec>,
+    ) -> Result<Vec<PartitionInfo>> {
+        let response = self
+            .admin_gateway
+            .request(ListPartitionInfosRequest::new(
+                table_path,
+                partial_partition_spec,
+            ))
+            .await?;
+        Ok(response.get_partitions_info())
+    }
+
+    /// Create a new partition for a partitioned table.
+    pub async fn create_partition(
+        &self,
+        table_path: &TablePath,
+        partition_spec: &PartitionSpec,
+        ignore_if_exists: bool,
+    ) -> Result<()> {
+        let _response = self
+            .admin_gateway
+            .request(CreatePartitionRequest::new(
+                table_path,
+                partition_spec,
+                ignore_if_exists,
+            ))
+            .await?;
+        Ok(())
+    }
+
+    /// Drop a partition from a partitioned table.
+    pub async fn drop_partition(
+        &self,
+        table_path: &TablePath,
+        partition_spec: &PartitionSpec,
+        ignore_if_not_exists: bool,
+    ) -> Result<()> {
+        let _response = self
+            .admin_gateway
+            .request(DropPartitionRequest::new(
+                table_path,
+                partition_spec,
+                ignore_if_not_exists,
+            ))
+            .await?;
+        Ok(())
+    }
+
     /// Check if a table exists
     pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> {
         let response = self
@@ -263,13 +321,13 @@ impl FlussAdmin {

     fn prepare_list_offsets_requests(
         &self,
-        table_id: i64,
-        partition_id: Option<i64>,
+        table_id: TableId,
+        partition_id: Option<PartitionId>,
         buckets: &[BucketId],
         offset_spec: OffsetSpec,
     ) -> Result<HashMap<i32, ListOffsetsRequest>> {
         let cluster = self.metadata.get_cluster();
-        let mut node_for_bucket_list: HashMap<i32, Vec<i32>> = HashMap::new();
+        let mut node_for_bucket_list: HashMap<i32, Vec<BucketId>> = HashMap::new();

         for bucket_id in buckets {
             let table_bucket = TableBucket::new(table_id, *bucket_id);
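Taken together, the new admin surface can be exercised roughly as in the sketch below (not part of the diff). The FlussAdmin method names and parameters match the hunks above; the import paths, the way the admin handle is obtained, and the TablePath::new / PartitionSpec::new constructors are assumptions for illustration and may differ from the actual crate API.

// Hedged usage sketch for the new partition admin methods.
// Assumed public paths; the crate-internal paths in the diff are
// crate::client::admin::FlussAdmin and crate::metadata::{TablePath, PartitionSpec}.
use fluss::client::FlussAdmin;                    // assumed re-export
use fluss::error::Result;                         // matches crate::error::Result in the diff
use fluss::metadata::{PartitionSpec, TablePath};  // assumed public paths

async fn manage_partitions(admin: &FlussAdmin) -> Result<()> {
    let table_path = TablePath::new("my_db", "my_partitioned_table"); // assumed constructor
    let spec = PartitionSpec::new(vec![("dt", "2024-01-01")]);        // assumed constructor

    // Create the partition; `true` ignores the call if it already exists.
    admin.create_partition(&table_path, &spec, true).await?;

    // List all partitions, or only those matching a partial spec.
    let all = admin.list_partition_infos(&table_path).await?;
    let matching = admin
        .list_partition_infos_with_spec(&table_path, Some(&spec))
        .await?;
    println!("{} partitions total, {} match the spec", all.len(), matching.len());

    // Drop the partition; `true` ignores the call if it does not exist.
    admin.drop_partition(&table_path, &spec, true).await?;
    Ok(())
}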
3 changes: 2 additions & 1 deletion crates/fluss/src/client/table/scanner.rs
@@ -25,6 +25,7 @@ use std::sync::Arc;
 use std::time::Duration;
 use tempfile::TempDir;

+use crate::TableId;
 use crate::client::connection::FlussConnection;
 use crate::client::credentials::CredentialsCache;
 use crate::client::metadata::Metadata;
@@ -262,7 +263,7 @@ pub struct RecordBatchLogScanner {
 /// Private shared implementation for both scanner types
 struct LogScannerInner {
     table_path: TablePath,
-    table_id: i64,
+    table_id: TableId,
     metadata: Arc<Metadata>,
     log_scanner_status: Arc<LogScannerStatus>,
     log_fetcher: LogFetcher,
2 changes: 1 addition & 1 deletion crates/fluss/src/client/write/accumulator.rs
@@ -401,7 +401,7 @@ impl RecordAccumulator {
             ready_write_batch.write_batch.re_enqueued();
             let table_path = ready_write_batch.write_batch.table_path().clone();
             let bucket_id = ready_write_batch.table_bucket.bucket_id();
-            let table_id = u64::try_from(ready_write_batch.table_bucket.table_id()).unwrap_or(0);
+            let table_id = ready_write_batch.table_bucket.table_id();

            let dq = {
                let mut binding =
3 changes: 2 additions & 1 deletion crates/fluss/src/client/write/sender.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

+use crate::TableId;
 use crate::client::broadcast;
 use crate::client::metadata::Metadata;
 use crate::client::write::batch::WriteBatch;
@@ -144,7 +145,7 @@ impl Sender {
             return Ok(());
         }
         let mut records_by_bucket = HashMap::new();
-        let mut write_batch_by_table: HashMap<i64, Vec<TableBucket>> = HashMap::new();
+        let mut write_batch_by_table: HashMap<TableId, Vec<TableBucket>> = HashMap::new();

         for batch in batches {
             let table_bucket = batch.table_bucket.clone();
4 changes: 2 additions & 2 deletions crates/fluss/src/lib.rs
@@ -34,8 +34,8 @@ mod util;
 #[cfg(test)]
 mod test_utils;

-pub type TableId = u64;
-pub type PartitionId = u64;
+pub type TableId = i64;
+pub type PartitionId = i64;
 pub type BucketId = i32;

 pub mod proto {
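The alias switch in lib.rs (u64 to i64 for TableId and PartitionId) is what lets the accumulator hunk above drop its lossy u64::try_from(..).unwrap_or(0) conversion. A minimal sketch of the effect, assuming fluss::TableId and fluss::metadata::TableBucket are the public paths:

use fluss::TableId;                // now an alias for i64
use fluss::metadata::TableBucket;  // assumed public path

fn table_id_of(bucket: &TableBucket) -> TableId {
    // Before this change, TableId was u64 while TableBucket carried an i64 id,
    // so call sites converted and silently fell back to 0 on failure:
    //     u64::try_from(bucket.table_id()).unwrap_or(0)
    // With TableId = i64 the id is read through unchanged:
    bucket.table_id()
}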
2 changes: 2 additions & 0 deletions crates/fluss/src/metadata/mod.rs
@@ -19,10 +19,12 @@ mod data_lake_format;
 mod database;
 mod datatype;
 mod json_serde;
+mod partition;
 mod table;

 pub use data_lake_format::*;
 pub use database::*;
 pub use datatype::*;
 pub use json_serde::*;
+pub use partition::*;
 pub use table::*;