Skip to content
40 changes: 5 additions & 35 deletions crates/fluss/tests/integration/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,55 +33,25 @@ static SHARED_FLUSS_CLUSTER: LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
#[after_all]
mod admin_test {
use super::SHARED_FLUSS_CLUSTER;
use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
use crate::integration::fluss_cluster::FlussTestingCluster;
use crate::integration::utils::{get_cluster, start_cluster, stop_cluster};
use fluss::error::FlussError;
use fluss::metadata::{
DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema, TableDescriptor,
TablePath,
};
use std::sync::Arc;
use std::thread;

fn before_all() {
// Create a new tokio runtime in a separate thread
let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
rt.block_on(async {
let cluster = FlussTestingClusterBuilder::new("test-admin").build().await;
let mut guard = cluster_guard.write();
*guard = Some(cluster);
});
})
.join()
.expect("Failed to create cluster");
// wait for 20 seconds to avoid the error like
// CoordinatorEventProcessor is not initialized yet
thread::sleep(std::time::Duration::from_secs(20));
start_cluster("test-admin", SHARED_FLUSS_CLUSTER.clone());
}

fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
let cluster_guard = SHARED_FLUSS_CLUSTER.read();
if cluster_guard.is_none() {
panic!("Fluss cluster not initialized. Make sure before_all() was called.");
}
Arc::new(cluster_guard.as_ref().unwrap().clone())
get_cluster(&SHARED_FLUSS_CLUSTER)
}

fn after_all() {
// Create a new tokio runtime in a separate thread
let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
rt.block_on(async {
let mut guard = cluster_guard.write();
if let Some(cluster) = guard.take() {
cluster.stop().await;
}
});
})
.join()
.expect("Failed to cleanup cluster");
stop_cluster(SHARED_FLUSS_CLUSTER.clone());
}

#[tokio::test]
Expand Down
Loading
Loading