Skip to content
Open

ssd #35

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
836 changes: 836 additions & 0 deletions fluxon_doc_cn/design/kv_5_SSD存储设计.md

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions fluxon_rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fluxon_rs/fluxon_kv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ bytes = "1"
pprof = { version = "0.15", features = ["flamegraph"] }
hex = "0.4"
sha2 = "0.10"
io-uring = "0.7"
tokio-tungstenite = { version = "0.21", default-features = false, features = ["connect", "handshake"], optional = true }

sockudo-ws = { version = "^1.7.4", default-features = false, features = ["tokio-runtime", "fastrand"], optional = true }
Expand Down
117 changes: 107 additions & 10 deletions fluxon_rs/fluxon_kv/src/client_kv_api/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
cluster_manager::NodeID,
master_kv_router::msg_pack::{
GetAllocationMode, GetDoneReq, GetDoneResp, GetMetaReq, GetMetaResp, GetRevokeReq,
GetStartReq, GetStartResp,
GetSourceKind, GetStartReq, GetStartResp,
},
p2p::msg_pack::MsgPack,
rpcresp_kvresult_convert::msg_and_error::codes_api,
Expand All @@ -26,19 +26,27 @@ use std::sync::Arc;
pub struct RemoteGetInfo {
get_id: u64,
data_len: usize,
source_kind: GetSourceKind,
src_addr: u64,
target_addr: u64,
node_id: NodeID,
peer_is_src_or_target: bool,
}

impl RemoteGetInfo {
pub fn source_kind(&self) -> GetSourceKind {
self.source_kind
}
}

impl std::fmt::Display for RemoteGetInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"GetInfo{{ get_id: {}, data_len: {} bytes, src_addr: {:#x}, target_addr: {:#x}, node_id: {:?}, remote_transfer: {} }}",
"GetInfo{{ get_id: {}, data_len: {} bytes, source_kind: {:?}, src_addr: {:#x}, target_addr: {:#x}, node_id: {:?}, remote_transfer: {} }}",
self.get_id,
self.data_len,
self.source_kind,
self.src_addr,
self.target_addr,
self.node_id,
Expand Down Expand Up @@ -177,8 +185,80 @@ impl ClientKvApiInner {
);
}

let mut ssd_done_resp = None;
if resp.source_kind == GetSourceKind::Ssd {
let ssd_stage_len = resp.ssd_stage_len;
if ssd_stage_len < data_len as u64 {
#[cfg(test)]
{
self.test_record.remove_transfering_get(get_id);
}

self.get_revoke(get_id).await?;
return Err(KvError::Api(ApiError::InvalidArgument {
detail: format!(
"invalid ssd stage len for key={} get_id={} data_len={} ssd_stage_len={}",
key, get_id, data_len, ssd_stage_len
),
}));
}
let done_resp = match self
.stage_kv_from_ssd_source(
&resp.node_id,
key,
put_id,
get_id,
abs_src,
abs_target,
data_len as u64,
ssd_stage_len,
)
.await
{
Ok(done_resp) => done_resp,
Err(err) => {
tracing::warn!(
"kv get ssd stage failed: key={}, source_node={}, stage={:#x}, target={:#x}, len={}, ssd_stage_len={}, err={}",
key,
resp.node_id,
abs_src,
abs_target,
data_len,
ssd_stage_len,
err
);

#[cfg(test)]
{
self.test_record.remove_transfering_get(get_id);
}

obe_get_transfer_error(&metrics, &client_id, &node_role, key, data_len as u64);
self.get_revoke_ssd_source(get_id).await?;
return Err(err);
}
};
ssd_done_resp = Some(done_resp);
tracing::debug!(
"kv get ssd staged and pushed: key={}, source_node={}, stage={:#x}, target={:#x}, len={}, ssd_stage_len={}",
key,
resp.node_id,
abs_src,
abs_target,
data_len,
ssd_stage_len
);
}

// transfer data (skip if local and src==target to avoid redundant copy)
if peer_id.is_none() && abs_src == abs_target {
if resp.source_kind == GetSourceKind::Ssd {
tracing::debug!(
"kv get ssd owner push complete: key={}, target={:#x}, len={} (skip requester transfer)",
key,
abs_target,
data_len
);
} else if peer_id.is_none() && abs_src == abs_target {
tracing::debug!(
"kv get local no-op: src==target {:#x}, len={} (skip transfer)",
abs_target,
Expand Down Expand Up @@ -249,12 +329,17 @@ impl ClientKvApiInner {

// Removed post-transfer zero-header verification per request.

// Complete the get operation and get holder_id
let done_resp = match self.get_done(get_id).await {
Ok(resp) => resp,
Err(err) => {
obe_get_end_error_rpc(&metrics, &client_id, &node_role, key, data_len as u64);
return Err(err);
// Complete the get operation and get holder_id. SSD source already called
// get_done after pushing into the requester target.
let done_resp = if let Some(done_resp) = ssd_done_resp {
done_resp
} else {
match self.get_done(get_id).await {
Ok(resp) => resp,
Err(err) => {
obe_get_end_error_rpc(&metrics, &client_id, &node_role, key, data_len as u64);
return Err(err);
}
}
};
let end_handle_us = done_resp.server_process_us;
Expand Down Expand Up @@ -326,6 +411,7 @@ impl ClientKvApiInner {
let get_info = RemoteGetInfo {
get_id,
data_len,
source_kind: resp.source_kind,
src_addr: abs_src,
target_addr: abs_target,
node_id: resp.node_id.into(),
Expand Down Expand Up @@ -435,8 +521,19 @@ impl ClientKvApiInner {

/// 撤销 Get 操作,释放已分配的资源
pub async fn get_revoke(&self, get_id: u64) -> KvResult<()> {
self.get_revoke_inner(get_id, false).await
}

async fn get_revoke_ssd_source(&self, get_id: u64) -> KvResult<()> {
self.get_revoke_inner(get_id, true).await
}

async fn get_revoke_inner(&self, get_id: u64, drop_ssd_source: bool) -> KvResult<()> {
let req = MsgPack {
serialize_part: GetRevokeReq { get_id },
serialize_part: GetRevokeReq {
get_id,
drop_ssd_source,
},
raw_bytes: Vec::new(),
};

Expand Down
Loading
Loading