diff --git a/.gitignore b/.gitignore index fa2b1aa6a8c..6f295ad49e8 100644 --- a/.gitignore +++ b/.gitignore @@ -185,3 +185,6 @@ custom_ops/gpu_ops/w4afp8_gemm/w4afp8_gemm_template.h custom_ops/gpu_ops/wfp8afp8_sparse_gemm/wfp8Afp8_sparse_gemm_*.cu custom_ops/gpu_ops/wfp8afp8_sparse_gemm/wfp8Afp8_sparse_gemm_template.h + +# Bench output artifacts (T53) +runs/ diff --git a/benchmarks/yaml/eb45-21b-a3b-32k-bf16-kv50-512s.yaml b/benchmarks/yaml/eb45-21b-a3b-32k-bf16-kv50-512s.yaml new file mode 100644 index 00000000000..b58a9764527 --- /dev/null +++ b/benchmarks/yaml/eb45-21b-a3b-32k-bf16-kv50-512s.yaml @@ -0,0 +1,12 @@ +# T53 bench workload — KV-bound (not slot-bound); gate: FD_HEAD_WISE_KV_CACHE=1 +# max_num_seqs raised to 256 so the KV pool, not the slot count, is the bottleneck. +# kv_cache_ratio: 0.30 → ~24GB KV on A800-80GB (TINY envelope diagnostic per opus v2 verdict). +# (0.35 deterministic OOM at 78.99GB / index 3408/3689 weights load — 4 identical failures. +# 0.50 also OOMs. Revert to 0.35 before SMOKE/FULL only after opus comparability decision.) +# Use with: INPUT_LEN=8192 OUTPUT_LEN=4096 REQUEST_RATE=8 +# +max_model_len: 32768 +max_num_seqs: 256 +kv_cache_ratio: 0.30 +tensor_parallel_size: 1 +max_num_batched_tokens: 32768 diff --git a/custom_ops/gpu_ops/append_attention.cu b/custom_ops/gpu_ops/append_attention.cu index c1586945cc5..632e20db479 100644 --- a/custom_ops/gpu_ops/append_attention.cu +++ b/custom_ops/gpu_ops/append_attention.cu @@ -48,6 +48,7 @@ void AppendAttentionKernel( const paddle::Tensor& batch_id_per_token, const paddle::Tensor& cu_seqlens_q, const paddle::Tensor& block_tables, + const paddle::optional<paddle::Tensor>& block_tables_headwise, const paddle::Tensor& encoder_batch_ids, const paddle::Tensor& encoder_tile_ids_per_batch, const paddle::Tensor& encoder_num_blocks, @@ -96,6 +97,17 @@ void AppendAttentionKernel( typedef typename traits_::DataType DataType_; typedef typename traits_::data_t data_t; + // Dtype guards for Python-supplied INT32 metadata tensors accessed via + // .data<int>() below. Catches accidental INT64/FP dtype before UB.
+ PD_CHECK(set_max_lengths.dtype() == paddle::DataType::INT32, + "set_max_lengths must be INT32"); + PD_CHECK(encoder_num_blocks.dtype() == paddle::DataType::INT32, + "encoder_num_blocks must be INT32"); + PD_CHECK(kv_num_blocks.dtype() == paddle::DataType::INT32, + "kv_num_blocks must be INT32"); + PD_CHECK(decoder_num_blocks.dtype() == paddle::DataType::INT32, + "decoder_num_blocks must be INT32"); + const int max_len_this_time = set_max_lengths.data<int>()[0]; const int max_enc_len_this_time = set_max_lengths.data<int>()[1]; const int max_dec_len_this_time = set_max_lengths.data<int>()[2]; @@ -155,6 +167,7 @@ void AppendAttentionKernel( batch_id_per_token, cu_seqlens_q, block_tables, + block_tables_headwise, lambda_batch_ids, lambda_tile_ids_per_batch, cache_quant_type_str, @@ -488,6 +501,9 @@ std::vector<paddle::Tensor> AppendAttention( const paddle::optional<paddle::Tensor>& q_norm_weight, const paddle::optional<paddle::Tensor>& k_norm_weight, const paddle::optional<paddle::Tensor>& sinks, + const paddle::optional<paddle::Tensor>& + block_tables_headwise, // logical 3D, physical rank-2 [max_num_seqs * + // local_kv_heads, max_blocks_per_head] const float rms_norm_eps, const std::string& compute_dtype, const std::string& cache_quant_type_str, @@ -580,6 +596,8 @@ std::vector<paddle::Tensor> AppendAttention( } if (mask_offset) { + PD_CHECK(mask_offset.get().dtype() == paddle::DataType::INT32, + "mask_offset must be INT32"); meta_data.mask_offset = mask_offset.get().data<int>(); } @@ -595,6 +613,7 @@ batch_id_per_token, cu_seqlens_q, block_tables, + block_tables_headwise, encoder_batch_ids, encoder_tile_ids_per_batch, encoder_num_blocks, @@ -700,6 +719,9 @@ std::vector<paddle::Tensor> AppendAttentionWithOutput( const paddle::optional<paddle::Tensor>& q_norm_weight, const paddle::optional<paddle::Tensor>& k_norm_weight, const paddle::optional<paddle::Tensor>& sinks, + const paddle::optional<paddle::Tensor>& + block_tables_headwise, // logical 3D, physical rank-2 [max_num_seqs * + // local_kv_heads, max_blocks_per_head] const float rms_norm_eps, const std::string& compute_dtype, const std::string& cache_quant_type_str, @@ -738,6 +760,8 @@ std::vector<paddle::Tensor> AppendAttentionWithOutput( meta_data.batch_size = seq_lens_this_time.dims()[0]; if (mask_offset) { + PD_CHECK(mask_offset.get().dtype() == paddle::DataType::INT32, + "mask_offset must be INT32"); meta_data.mask_offset = mask_offset.get().data<int>(); } @@ -753,6 +777,7 @@ batch_id_per_token, cu_seqlens_q, block_tables, + block_tables_headwise, encoder_batch_ids, encoder_tile_ids_per_batch, encoder_num_blocks, @@ -871,6 +896,7 @@ std::vector<std::vector<int64_t>> AppendAttentionInferShape( const paddle::optional<std::vector<int64_t>>& q_norm_weight_shape, const paddle::optional<std::vector<int64_t>>& k_norm_weight_shape, const paddle::optional<std::vector<int64_t>>& sinks_shape, + const paddle::optional<std::vector<int64_t>>& block_tables_headwise_shape, const float rms_norm_eps, const std::string& compute_dtype, const std::string& cache_quant_type_str, @@ -937,6 +963,7 @@ std::vector<paddle::DataType> AppendAttentionInferDtype( const paddle::optional<paddle::DataType>& q_norm_weight_dtype, const paddle::optional<paddle::DataType>& k_norm_weight_dtype, const paddle::optional<paddle::DataType>& sinks_dtype, + const paddle::optional<paddle::DataType>& block_tables_headwise_dtype, const float rms_norm_eps, const std::string& compute_dtype, const std::string& cache_quant_type_str, @@ -1024,6 +1051,7 @@ std::vector<std::vector<int64_t>> AppendAttentionWithOutputInferShape( const paddle::optional<std::vector<int64_t>>& q_norm_weight_shape, const paddle::optional<std::vector<int64_t>>& k_norm_weight_shape, const paddle::optional<std::vector<int64_t>>& sinks_shape, + const paddle::optional<std::vector<int64_t>>& block_tables_headwise_shape, const float rms_norm_eps, const std::string& compute_dtype, const std::string& cache_quant_type_str, @@ -1083,6 +1111,7 @@ std::vector<paddle::DataType>
AppendAttentionWithOutputInferDtype( const paddle::optional<paddle::DataType>& q_norm_weight_dtype, const paddle::optional<paddle::DataType>& k_norm_weight_dtype, const paddle::optional<paddle::DataType>& sinks_dtype, + const paddle::optional<paddle::DataType>& block_tables_headwise_dtype, const float rms_norm_eps, const std::string& compute_dtype, const std::string& cache_quant_type_str, @@ -1140,7 +1169,8 @@ PD_BUILD_STATIC_OP(append_attention) paddle::Optional("kv_signal_data"), paddle::Optional("q_norm_weight"), paddle::Optional("k_norm_weight"), - paddle::Optional("sinks")}) + paddle::Optional("sinks"), + paddle::Optional("block_tables_headwise")}) .Outputs({"fmha_out"}) .Attrs({ "rms_norm_eps: float", @@ -1203,7 +1233,8 @@ PD_BUILD_STATIC_OP(append_attention_with_output) paddle::Optional("kv_signal_data"), paddle::Optional("q_norm_weight"), paddle::Optional("k_norm_weight"), - paddle::Optional("sinks")}) + paddle::Optional("sinks"), + paddle::Optional("block_tables_headwise")}) .Outputs({"fmha_out_out"}) .SetInplaceMap({{"fmha_out", "fmha_out_out"}}) .Attrs({ diff --git a/custom_ops/gpu_ops/append_attn/append_attention_c16_impl.cuh b/custom_ops/gpu_ops/append_attn/append_attention_c16_impl.cuh index 70329c9366a..eabbdd2c834 100644 --- a/custom_ops/gpu_ops/append_attn/append_attention_c16_impl.cuh +++ b/custom_ops/gpu_ops/append_attn/append_attention_c16_impl.cuh @@ -44,6 +44,7 @@ void CascadeAppendAttentionC16Kernel( const paddle::Tensor& batch_id_per_token, const paddle::Tensor& cu_seqlens_q, const paddle::Tensor& block_table, + const paddle::optional<paddle::Tensor>& block_table_headwise, const paddle::Tensor& batch_ids, const paddle::Tensor& tile_ids_per_batch, const int num_blocks, @@ -109,6 +110,7 @@ void CascadeAppendAttentionC16Kernel( batch_id_per_token, cu_seqlens_q, block_table, + block_table_headwise, batch_ids, tile_ids_per_batch, num_blocks, @@ -156,6 +158,7 @@ CascadeAppendAttentionC16Kernel( const paddle::Tensor& batch_id_per_token, const paddle::Tensor& cu_seqlens_q, const paddle::Tensor& block_table, + const paddle::optional<paddle::Tensor>& block_table_headwise, const paddle::Tensor& batch_ids, const paddle::Tensor& tile_ids_per_batch, const int num_blocks, @@ -204,6 +207,7 @@ CascadeAppendAttentionC16Kernel( const paddle::Tensor& batch_id_per_token, const paddle::Tensor& cu_seqlens_q, const paddle::Tensor& block_table, + const paddle::optional<paddle::Tensor>& block_table_headwise, const paddle::Tensor& batch_ids, const paddle::Tensor& tile_ids_per_batch, const int num_blocks, @@ -251,6 +255,7 @@ template void CascadeAppendAttentionC16Kernel( const paddle::Tensor& batch_id_per_token, const paddle::Tensor& cu_seqlens_q, const paddle::Tensor& block_table, + const paddle::optional<paddle::Tensor>& block_table_headwise, const paddle::Tensor& batch_ids, const paddle::Tensor& tile_ids_per_batch, const int num_blocks, @@ -298,6 +303,7 @@ template void CascadeAppendAttentionC16Kernel( const paddle::Tensor& batch_id_per_token, const paddle::Tensor& cu_seqlens_q, const paddle::Tensor& block_table, + const paddle::optional<paddle::Tensor>& block_table_headwise, const paddle::Tensor& batch_ids, const paddle::Tensor& tile_ids_per_batch, const int num_blocks, @@ -346,6 +352,7 @@ CascadeAppendAttentionC16Kernel( const paddle::Tensor& batch_id_per_token, const paddle::Tensor& cu_seqlens_q, const paddle::Tensor& block_table, + const paddle::optional<paddle::Tensor>& block_table_headwise, const paddle::Tensor& batch_ids, const paddle::Tensor& tile_ids_per_batch, const int num_blocks, @@ -393,6 +400,7 @@ template void CascadeAppendAttentionC16Kernel( const paddle::Tensor& batch_id_per_token, const paddle::Tensor& cu_seqlens_q, const paddle::Tensor&
block_table, + const paddle::optional<paddle::Tensor>& block_table_headwise, const paddle::Tensor& batch_ids, const paddle::Tensor& tile_ids_per_batch, const int num_blocks, diff --git a/custom_ops/gpu_ops/append_attn/append_attention_kernel.h b/custom_ops/gpu_ops/append_attn/append_attention_kernel.h index ca06deeeb75..f340ada3474 100644 --- a/custom_ops/gpu_ops/append_attn/append_attention_kernel.h +++ b/custom_ops/gpu_ops/append_attn/append_attention_kernel.h @@ -47,6 +47,7 @@ void CascadeAppendAttentionKernel( const paddle::Tensor& batch_id_per_token, const paddle::Tensor& cu_seqlens_q, const paddle::Tensor& block_table, + const paddle::optional<paddle::Tensor>& block_table_headwise, const paddle::Tensor& batch_ids, const paddle::Tensor& tile_ids_per_batch, const std::string& cache_quant_type_str, @@ -86,6 +87,7 @@ void CascadeAppendAttentionKernel( batch_id_per_token, cu_seqlens_q, block_table, + block_table_headwise, batch_ids, tile_ids_per_batch, num_blocks, diff --git a/custom_ops/gpu_ops/append_attn/multiquery_attention_c16_impl.cuh b/custom_ops/gpu_ops/append_attn/multiquery_attention_c16_impl.cuh index e7463154c43..14183af68b2 100644 --- a/custom_ops/gpu_ops/append_attn/multiquery_attention_c16_impl.cuh +++ b/custom_ops/gpu_ops/append_attn/multiquery_attention_c16_impl.cuh @@ -44,9 +44,11 @@ __global__ void multi_query_append_attention_kernel( const int *__restrict__ tile_ids_per_batch, const int *__restrict__ cu_seqlens_q, const int *__restrict__ block_table, // [bsz, block_num_per_seq] + const int *__restrict__ block_table_hw, const int *__restrict__ mask_offset, const int max_seq_len, const int max_block_num_per_seq, + const int max_blocks_per_head, const float scale, const float quant_max_bound, const float quant_min_bound, @@ -73,7 +75,11 @@ __global__ void multi_query_append_attention_kernel( const uint32_t batch_id = batch_ids[btid]; const uint32_t tile_id = tile_ids_per_batch[btid]; const uint32_t num_rows_per_block = NUM_WARPS * num_frags_x * 16; - const int *block_table_now = block_table + batch_id * max_block_num_per_seq; + const int *block_table_now = + (block_table_hw != nullptr) + ? block_table_hw + + (batch_id * kv_num_heads + kv_head_idx) * max_blocks_per_head + : block_table + batch_id * max_block_num_per_seq; // When cudagraph capture prefill, may launch more gridDim.x if (btid >= static_cast<uint32_t>(num_blocks_x_cpu)) { @@ -210,8 +216,11 @@ __global__ void multi_query_append_attention_kernel( const uint32_t const_offset = kv_head_idx * kv_h_stride + (wid * 4 + tid / 8) * kv_b_stride + tid % 8 * num_elems_per_128b<T>(); - const T *cache_k_now = cache_k + block_id * kv_n_stride + const_offset; - const T *cache_v_now = cache_v + block_id * kv_n_stride + const_offset; + // guard: block_id=-1 is an evicted SWA slot; SWA mask zeroes contribution + const T *cache_k_now = + cache_k + (block_id >= 0 ? block_id : 0) * kv_n_stride + const_offset; + const T *cache_v_now = + cache_v + (block_id >= 0 ? block_id : 0) * kv_n_stride + const_offset; [two further hunks in this file were garbled in extraction: each applies the same block_id >= 0 guard to the pointers fed into the adjacent produce_kv_blockwise<...>() loads; the template arguments and hunk headers are unrecoverable] - T *cache_k_now = cache_k + block_id * kv_n_stride + const_offset; - T *cache_v_now = cache_v + block_id * kv_n_stride + const_offset; + // guard: block_id=-1 is an evicted SWA slot; SWA mask zeroes contribution + T *cache_k_now = + cache_k + (block_id >= 0 ? block_id : 0) * kv_n_stride + const_offset; + T *cache_v_now = + cache_v + (block_id >= 0 ?
block_id : 0) * kv_n_stride + const_offset; [the remaining produce_kv_blockwise<...>() call sites and two further hunks applying the same block_id >= 0 guard were garbled in extraction; the text resumes at the MultiQueryAppendAttention signature change] + const paddle::optional<paddle::Tensor> &block_table_headwise, const paddle::Tensor &batch_ids, const paddle::Tensor &tile_ids_per_batch, const int num_blocks_x_cpu, @@ -858,6 +875,50 @@ void MultiQueryAppendAttention( auto token_num = meta_data.token_nums; auto bsz = meta_data.batch_size; auto max_block_num_per_seq = meta_data.max_blocks_per_seq; + const int *block_table_hw_ptr = + block_table_headwise ? block_table_headwise.get().data<int>() : nullptr; + const int max_blocks_per_head = + block_table_headwise + ? static_cast<int>(block_table_headwise.get().shape().back()) + : 0; + if (block_table_headwise) { + const auto &hw_shape = block_table_headwise.get().shape(); + PADDLE_ENFORCE_EQ( + hw_shape.size(), + 2u, + phi::errors::InvalidArgument( + "block_tables_headwise must be rank-2 (logical [bsz, " + "kv_num_heads, max_blocks_per_head] flattened to " + "[bsz*kv_num_heads, max_blocks_per_head]); got rank %zu.", + hw_shape.size())); + PADDLE_ENFORCE_EQ( + hw_shape[0], + static_cast<int64_t>(bsz) * static_cast<int64_t>(kv_num_heads), + phi::errors::InvalidArgument( + "block_tables_headwise dim 0 must equal bsz * kv_num_heads " + "(%d * %d = %d); got %ld.", + bsz, + kv_num_heads, + bsz * kv_num_heads, + static_cast<int64_t>(hw_shape[0]))); + PADDLE_ENFORCE_GT( + max_blocks_per_head, + 0, + phi::errors::InvalidArgument( + "block_tables_headwise last dim (max_blocks_per_head) must be " + "> 0; got %d.", + max_blocks_per_head)); + PADDLE_ENFORCE_GE( + max_blocks_per_head, + max_block_num_per_seq, + phi::errors::InvalidArgument( + "block_tables_headwise max_blocks_per_head (%d) must be >= " + "max_block_num_per_seq (%d) to satisfy the per-iteration " + "prefetch contract of multi_query_append_attention C16 " + "kernels.", + max_blocks_per_head, + max_block_num_per_seq)); + } constexpr uint32_t num_warps = 4; constexpr uint32_t NUM_WARP_KV = num_warps / NUM_WARP_Q; @@ -959,9 +1020,11 @@ tile_ids_per_batch.data<int>(), cu_seqlens_q.data<int>(), block_table.data<int>(), + block_table_hw_ptr, meta_data.mask_offset, max_seq_len, max_block_num_per_seq, + max_blocks_per_head, scale, quant_max_bound, quant_min_bound, @@ -1027,9 +1090,11 @@ tile_ids_per_batch.data<int>(), cu_seqlens_q.data<int>(), block_table.data<int>(), + block_table_hw_ptr, meta_data.mask_offset, max_seq_len, max_block_num_per_seq, + max_blocks_per_head, scale, quant_max_bound, quant_min_bound, @@ -1186,11 +1251,13 @@ tile_ids_per_batch.data<int>(), cu_seqlens_q.data<int>(), block_table.data<int>(), + block_table_hw_ptr, meta_data.mask_offset, attn_mask ? const_cast(attn_mask.get().data()) : nullptr, max_seq_len, max_block_num_per_seq, + max_blocks_per_head, scale, quant_max_bound, quant_min_bound, @@ -1244,11 +1311,13 @@ tile_ids_per_batch.data<int>(), cu_seqlens_q.data<int>(), block_table.data<int>(), + block_table_hw_ptr, meta_data.mask_offset, attn_mask ?
const_cast(attn_mask.get().data()) : nullptr, max_seq_len, max_block_num_per_seq, + max_blocks_per_head, scale, quant_max_bound, quant_min_bound, diff --git a/custom_ops/gpu_ops/append_attn/multiquery_attention_c16_kernel.h b/custom_ops/gpu_ops/append_attn/multiquery_attention_c16_kernel.h index 9fe215be66b..c5e9f7f1b23 100644 --- a/custom_ops/gpu_ops/append_attn/multiquery_attention_c16_kernel.h +++ b/custom_ops/gpu_ops/append_attn/multiquery_attention_c16_kernel.h @@ -39,6 +39,7 @@ void MultiQueryAppendAttention( const paddle::Tensor &batch_id_per_token, const paddle::Tensor &cu_seqlens_q, const paddle::Tensor &block_table, + const paddle::optional<paddle::Tensor> &block_table_headwise, const paddle::Tensor &batch_ids, const paddle::Tensor &tile_ids_per_batch, const int num_blocks_x_cpu, diff --git a/custom_ops/gpu_ops/append_attn/template_config.json b/custom_ops/gpu_ops/append_attn/template_config.json index b2590586206..5f1595f43de 100644 --- a/custom_ops/gpu_ops/append_attn/template_config.json +++ b/custom_ops/gpu_ops/append_attn/template_config.json @@ -106,7 +106,7 @@ ], "max_instances_per_file": 160, "file_prefix": "multiquery_attention_c16_", - "function_signature": "template void {function_name}{template_args}(\n const AppendAttnMetaData &meta_data,\n const paddle::Tensor &qkv,\n const paddle::Tensor &cache_k,\n const paddle::Tensor &cache_v,\n const paddle::optional<paddle::Tensor> &attn_mask,\n const paddle::optional<paddle::Tensor> &shift_bias,\n const paddle::optional<paddle::Tensor> &smooth_weight,\n const paddle::optional<paddle::Tensor> &sinks,\n const paddle::Tensor &seq_lens_q,\n const paddle::Tensor &seq_lens_kv,\n const paddle::Tensor &seq_lens_encoder,\n const paddle::Tensor &batch_id_per_token,\n const paddle::Tensor &cu_seqlens_q,\n const paddle::Tensor &block_table,\n const paddle::Tensor &batch_ids,\n const paddle::Tensor &tile_ids_per_batch,\n const int num_blocks_x_cpu,\n const int max_seq_len,\n const int max_dec_len,\n const float quant_max_bound,\n const float quant_min_bound,\n const float in_scale,\n const int max_partition_size,\n const int encoder_max_partition_size,\n const int speculate_max_draft_token_num,\n const bool is_decoder,\n cudaStream_t &stream,\n paddle::Tensor *out,\n const int sliding_window,\n const int sink_size);\n\n" + "function_signature": "template void {function_name}{template_args}(\n const AppendAttnMetaData &meta_data,\n const paddle::Tensor &qkv,\n const paddle::Tensor &cache_k,\n const paddle::Tensor &cache_v,\n const paddle::optional<paddle::Tensor> &attn_mask,\n const paddle::optional<paddle::Tensor> &shift_bias,\n const paddle::optional<paddle::Tensor> &smooth_weight,\n const paddle::optional<paddle::Tensor> &sinks,\n const paddle::Tensor &seq_lens_q,\n const paddle::Tensor &seq_lens_kv,\n const paddle::Tensor &seq_lens_encoder,\n const paddle::Tensor &batch_id_per_token,\n const paddle::Tensor &cu_seqlens_q,\n const paddle::Tensor &block_table,\n const paddle::optional<paddle::Tensor> &block_table_headwise,\n const paddle::Tensor &batch_ids,\n const paddle::Tensor &tile_ids_per_batch,\n const int num_blocks_x_cpu,\n const int max_seq_len,\n const int max_dec_len,\n const float quant_max_bound,\n const float quant_min_bound,\n const float in_scale,\n const int max_partition_size,\n const int encoder_max_partition_size,\n const int speculate_max_draft_token_num,\n const bool is_decoder,\n cudaStream_t &stream,\n paddle::Tensor *out,\n const int sliding_window,\n const int sink_size);\n\n" }, "multiquery_decoder_attention": { "name": "multiquery_decoder_attention", diff --git a/custom_ops/gpu_ops/cpp_extensions.cc b/custom_ops/gpu_ops/cpp_extensions.cc index 204ea33e50b..a351421f8e2
100644 --- a/custom_ops/gpu_ops/cpp_extensions.cc +++ b/custom_ops/gpu_ops/cpp_extensions.cc @@ -113,6 +113,9 @@ std::vector<paddle::Tensor> AppendAttention( const paddle::optional<paddle::Tensor>& q_norm_weight, const paddle::optional<paddle::Tensor>& k_norm_weight, const paddle::optional<paddle::Tensor>& sinks, + const paddle::optional<paddle::Tensor>& + block_tables_headwise, // logical 3D, physical rank-2 [max_num_seqs * + // local_kv_heads, max_blocks_per_head] const float rms_norm_eps, const std::string& compute_dtype, const std::string& cache_quant_type_str, @@ -170,6 +173,9 @@ std::vector<paddle::Tensor> AppendAttentionWithOutput( const paddle::optional<paddle::Tensor>& q_norm_weight, const paddle::optional<paddle::Tensor>& k_norm_weight, const paddle::optional<paddle::Tensor>& sinks, + const paddle::optional<paddle::Tensor>& + block_tables_headwise, // logical 3D, physical rank-2 [max_num_seqs * + // local_kv_heads, max_blocks_per_head] const float rms_norm_eps, const std::string& compute_dtype, const std::string& cache_quant_type_str, diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py index b1e79834d92..4969bf27b17 100644 --- a/fastdeploy/cache_manager/prefix_cache_manager.py +++ b/fastdeploy/cache_manager/prefix_cache_manager.py @@ -57,7 +57,7 @@ def __init__( local_data_parallel_id=0, ): """ - initialize the PrefixCacheManager + initialize the PrefixCacheManager. """ self.metrics = CacheMetrics() @@ -79,6 +79,27 @@ def __init__( self.num_gpu_blocks = self.cache_config.prefill_kvcache_block_num self.num_cpu_blocks = self.cache_config.num_cpu_blocks + # Head-wise KV cache (Hackathon 10th Spring No.53, mirrors PR #6702 contract). + # Default-off: behavior is bit-identical to mainline unless FD_HEAD_WISE_KV_CACHE=1. + # T53: per-rank KV head count for free-list sizing (TP-aware). + kv_num_heads_global = int( + getattr(getattr(self.cache_config, "model_cfg", None), "num_key_value_heads", 1) or 1 + ) + tp_size = int(self.tensor_parallel_size or 1) + self.kv_num_heads = max(1, kv_num_heads_global // tp_size) if kv_num_heads_global >= tp_size else 1 + _enable_prefix_caching = bool(getattr(self.cache_config, "enable_prefix_caching", False)) + if bool(envs.FD_HEAD_WISE_KV_CACHE) and _enable_prefix_caching: + raise ValueError( + "FD_HEAD_WISE_KV_CACHE is mutually exclusive with enable_prefix_caching " "(matches PR #6702 contract)" + ) + self.head_wise = bool(envs.FD_HEAD_WISE_KV_CACHE) and not _enable_prefix_caching + self.total_head_wise_cache_ids = 0 + # Head-wise free list lives in its OWN attribute so the legacy + # gpu_free_block_list (consumed by allocate_gpu_blocks) keeps its + # [0, num_gpu_blocks) ID space. Aliasing the two lists corrupts the + # legacy allocator with OOB cache ids → CUDA error 700 at decode.
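+ # Illustration (hypothetical sizes): with num_gpu_blocks=10000 and kv_num_heads=8, + # a single shared flat id space would span [0, 80000); any id >= 10000 handed to + # the legacy allocator indexes past the KV pool.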
+ self.gpu_free_head_wise_block_list = [] + self.gpu_free_block_list = list(range(self.num_gpu_blocks - 1, -1, -1)) if self.num_cpu_blocks > 0: self.cpu_free_block_list = list(range(self.num_cpu_blocks - 1, -1, -1)) @@ -172,7 +193,14 @@ def _get_kv_cache_shape(self, max_block_num): @property def available_gpu_resource(self): - return len(self.gpu_free_block_list) / self.num_gpu_blocks if self.num_gpu_blocks > 0 else 0.0 + if self.num_gpu_blocks <= 0: + return 0.0 + if getattr(self, "head_wise", False): + heaps = getattr(self, "gpu_free_head_wise_block_lists", None) + if heaps is not None: + head_free = sum(len(h) for h in heaps) + return (head_free / max(1, self.kv_num_heads)) / self.num_gpu_blocks + return len(self.gpu_free_block_list) / self.num_gpu_blocks def launch_cache_manager( self, @@ -468,6 +496,49 @@ def update_cache_config(self, cache_config): main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks) main_process_metrics.available_gpu_resource.set(1.0) + if getattr(self, "head_wise", False): + self._init_head_wise_free_list() + + def _init_head_wise_free_list(self): + """ + Build ``kv_num_heads`` independent free heaps, each over ``[0, num_gpu_blocks)``. + + Each KV head owns its own ID space. The discrete AppendAttention C16 + kernel addresses ``cache_k[block_id, kv_head_idx, :, :]`` — block_id and + head are orthogonal dimensions in the underlying KV cache layout, so + head A using block_id 7 and head B using block_id 7 reference DIFFERENT + KV slices and MUST NOT be considered "colliding". + + T53 PR2 hotfix (RFC-PR2-reanchored §3.1): the prior single shared heap + emitted flattened ids ``[0, num_gpu_blocks * kv_num_heads)`` which the + kernel could not consume directly. The downstream ``flat % num_gpu_blocks`` + modulo aliased distinct heads onto identical block_ids → wrong KV reads + → empty smoke responses. Per-head independent heaps eliminate this + aliasing class entirely. + + Heap sequence ``range(num_gpu_blocks - 1, -1, -1)`` is identical to + :meth:`__init__`'s legacy ``gpu_free_block_list`` initialization (heap + stability — smallest id pops first after heapify). + + The legacy ``gpu_free_head_wise_block_list`` (singular) attribute is + kept as an empty list for compatibility with introspection callers. + """ + kv_num_heads = max(1, self.kv_num_heads) + # Per-head independent free heaps: list of length kv_num_heads, each a + # min-heap over the per-head value space [0, num_gpu_blocks). + self.gpu_free_head_wise_block_lists: list[list[int]] = [ + list(range(self.num_gpu_blocks - 1, -1, -1)) for _ in range(kv_num_heads) + ] + for heap in self.gpu_free_head_wise_block_lists: + heapq.heapify(heap) + # Compatibility: legacy attribute kept as empty list. Tests/callers that + # need free-count should use ``available_head_wise_blocks`` instead. + self.gpu_free_head_wise_block_list = [] + # Per-head value space size (NOT total flat id count). + self.total_head_wise_cache_ids = self.num_gpu_blocks + main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks * kv_num_heads) + main_process_metrics.available_gpu_resource.set(self.available_gpu_resource) + def can_allocate_gpu_blocks(self, num_blocks: int, try_free_gpu_blocks: bool = True): """ Check if num_blocks gpu blocks can be allocated. 
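A minimal standalone sketch (hypothetical sizes, not part of the patch) of the per-head free-heap contract the hunks above implement: every KV head owns an independent id space over [0, num_gpu_blocks), so identical ids on different heads never collide.

    import heapq

    num_gpu_blocks, kv_num_heads, need = 8, 4, 2
    heaps = [list(range(num_gpu_blocks - 1, -1, -1)) for _ in range(kv_num_heads)]
    for heap in heaps:
        heapq.heapify(heap)

    # allocate: pop `need` ids from EVERY head's heap -> [kv_num_heads][need]
    allocated = [[heapq.heappop(heaps[h]) for _ in range(need)] for h in range(kv_num_heads)]
    assert allocated[0] == allocated[1] == [0, 1]  # same ids, different heads: no collision

    # recycle: each row must return to its head-of-origin heap (nested, head-major)
    for h, row in enumerate(allocated):
        for cid in row:
            heapq.heappush(heaps[h], cid)
    assert all(len(heap) == num_gpu_blocks for heap in heaps)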
@@ -532,6 +603,123 @@ def recycle_gpu_blocks(self, gpu_block_ids, req_id=None): main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list)) main_process_metrics.available_gpu_resource.set(self.available_gpu_resource) + def allocate_gpu_blocks_head_wise(self, num_blocks, req_id=None): + """ + Allocate ``num_blocks`` GPU blocks per KV head from independent per-head heaps. + + Returns a head-major nested list of cache ids with shape + ``[kv_num_heads][num_blocks]``. Each row ``h`` contains ids drawn from + the per-head value space ``[0, num_gpu_blocks)``. Heads are independent: + head 0 and head 1 may legitimately return overlapping ids since they + index disjoint KV slices via ``cache_k[block_id, kv_head_idx, :, :]``. + + T53 PR2 hotfix (RFC-PR2-reanchored §3.2): replaces flat-heap allocation + which emitted ids in ``[0, num_gpu_blocks * kv_num_heads)`` and required + a downstream modulo HOTFIX (which silently aliased heads). + + Active only when ``FD_HEAD_WISE_KV_CACHE=1`` (default-off; mainline + behavior is unchanged). + + Raises: + RuntimeError: if any per-head heap has fewer than ``num_blocks`` + free ids. No silent shrink — caller must handle backpressure. + """ + kv_num_heads = max(1, self.kv_num_heads) + per_head_heaps = self.gpu_free_head_wise_block_lists + # Pre-flight: every head must have enough blocks. No silent shrink. + for h, heap in enumerate(per_head_heaps): + if len(heap) < num_blocks: + raise RuntimeError( + f"head-wise gpu free block num at head {h}: {len(heap)} < needed number {num_blocks}" + ) + logger.debug(f"{req_id} start allocate (head-wise, per-head heaps)...") + # Pop num_blocks from EACH per-head heap independently. + allocated = [[heapq.heappop(per_head_heaps[h]) for _ in range(num_blocks)] for h in range(kv_num_heads)] + total_free = sum(len(heap) for heap in per_head_heaps) + logger.info( + f"req_id:{req_id} allocate_gpu_blocks_head_wise: {allocated}, " f"total free across heads {total_free}" + ) + main_process_metrics.free_gpu_block_num.set(total_free) + main_process_metrics.available_gpu_resource.set(self.available_gpu_resource) + return allocated + + def recycle_gpu_blocks_head_wise(self, cache_ids, req_id=None): + """ + Recycle head-wise cache ids back into the per-head free heaps. + + Expects a nested list-of-lists of shape ``[kv_num_heads][N]`` (head-major) + as produced by :meth:`allocate_gpu_blocks_head_wise`. Each row ``h`` is + pushed back to ``gpu_free_head_wise_block_lists[h]``. + + T53 PR2 hotfix (RFC-PR2-reanchored §3.3): the flat-list form is no + longer accepted because per-head value spaces are independent — a flat + list does not carry head-of-origin information needed for routing. + + Validation: each id must be in ``[0, num_gpu_blocks)``. Duplicates within + a head's row are dropped (warning). Out-of-range ids are dropped (warning). + + Mirrors the ``prefix_tree_status_signal`` early-return guarded by + :meth:`recycle_gpu_blocks`. + """ + if ( + hasattr(self, "prefix_tree_status_signal") + and self.prefix_tree_status_signal.value[0] != PrefixTreeStatus.NORMAL + ): + logger.warning("Prefix tree is not normal, skip recycle gpu blocks (head-wise)") + return + + kv_num_heads = max(1, self.kv_num_heads) + per_head_heaps = self.gpu_free_head_wise_block_lists + + # Normalize input to nested list-of-lists [kv_num_heads][...]. + # Backward-compat: if a flat list is passed (legacy callers from PR1 + # encoded behavior), interpret it as head-0-only and warn loudly. 
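+ # Shape examples (hypothetical, kv_num_heads=2): nested [[3, 4], [3, 5]] is + # routed per head; flat [3, 4, 5] is legacy and lands on head 0 only (with a + # warning); a bare scalar 3 is wrapped to [[3], []].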
+ if cache_ids and isinstance(cache_ids[0], (list, tuple)): + head_rows = list(cache_ids) + if len(head_rows) != kv_num_heads: + logger.warning( + f"req_id:{req_id} head-wise recycle: nested input has {len(head_rows)} rows, " + f"expected {kv_num_heads}. Padding with empty rows." + ) + # Pad/truncate to kv_num_heads. + head_rows = (head_rows + [[]] * kv_num_heads)[:kv_num_heads] + elif isinstance(cache_ids, (list, tuple)): + logger.warning( + f"req_id:{req_id} head-wise recycle: received flat list (legacy). " + f"Per-head independent heaps require nested [kv_num_heads][N] input. " + f"Routing entire flat list to head-0 only." + ) + head_rows = [list(cache_ids)] + [[] for _ in range(kv_num_heads - 1)] + else: + head_rows = [[cache_ids]] + [[] for _ in range(kv_num_heads - 1)] + + per_head_value_space = self.num_gpu_blocks + total_pushed = 0 + for h, row in enumerate(head_rows): + seen: set = set() + for cid in row: + cid_int = int(cid) + if cid_int in seen: + logger.warning(f"req_id:{req_id} head-wise recycle (head {h}): duplicate cache id {cid} dropped") + continue + if not (0 <= cid_int < per_head_value_space): + logger.warning( + f"req_id:{req_id} head-wise recycle (head {h}): out-of-range cache id {cid} " + f"(valid range [0, {per_head_value_space})) dropped" + ) + continue + seen.add(cid_int) + heapq.heappush(per_head_heaps[h], cid_int) + total_pushed += 1 + + total_free = sum(len(heap) for heap in per_head_heaps) + logger.info( + f"req_id:{req_id} recycle_gpu_blocks_head_wise: pushed {total_pushed} ids, " + f"total free across heads {total_free}" + ) + main_process_metrics.free_gpu_block_num.set(total_free) + main_process_metrics.available_gpu_resource.set(self.available_gpu_resource) + def allocate_cpu_blocks(self, num_blocks): """ allocate cpu blocks. diff --git a/fastdeploy/config.py b/fastdeploy/config.py index f18a4c6ee0a..5c33a47e1ea 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -71,7 +71,7 @@ # Some model suffixes are based on auto classes from Transformers: # https://huggingface.co/docs/transformers/en/model_doc/auto -# NOTE: Items higher on this list priority over lower ones +# NOTE: Items higher on this list take priority over lower ones. _SUFFIX_TO_DEFAULTS: list[tuple[str, tuple[RunnerType, ConvertType]]] = [ ("ForCausalLM", ("generate", "none")), ("ForConditionalGeneration", ("generate", "none")), @@ -2043,6 +2043,25 @@ def __init__( self.read_from_config() self.postprocess() self.init_pd_info() + # T53 PR1 — engine-main FDConfig fixture for per-head SWA block recycle. + # ResourceManagerV1._should_use_head_wise_swa (resource_manager_v1.py:298-305) + # reads model_config.head_wise_swa_ratio from the engine-main FDConfig instance. + # The worker-side mutation at paddleformers/base.py:793-804 sets the same attrs + # on a DIFFERENT FDConfig copy (worker process). This block mirrors that mutation + # in the engine-main process so the dispatcher gate is not dormant. + # Guards are identical to the worker side — idempotent if already set.
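+ # e.g. (hypothetical) n_kv=8 with FD_T53_HEAD_WISE_SWA_RATIO unset: ratio + # defaults to 1.0/8, so _num_swa_heads resolves to exactly one SWA head.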
+ if envs.FD_T53_HEAD_WISE_SWA_FIXTURE: + cfg = self.model_config + n_kv = getattr(cfg, "num_key_value_heads", 1) or 1 + ratio = envs.FD_T53_HEAD_WISE_SWA_RATIO if envs.FD_T53_HEAD_WISE_SWA_RATIO is not None else (1.0 / n_kv) + if getattr(cfg, "window_size", None) is None: + cfg.window_size = 4096 + if getattr(cfg, "sink_size", None) is None: + cfg.sink_size = 0 + if getattr(cfg, "window_attn_skip_freq", None) is None: + cfg.window_attn_skip_freq = 1 + if getattr(cfg, "head_wise_swa_ratio", None) is None: + cfg.head_wise_swa_ratio = ratio if test_mode: return self.check() diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py index c17b8821ce2..10f6e0dded4 100644 --- a/fastdeploy/engine/request.py +++ b/fastdeploy/engine/request.py @@ -190,6 +190,12 @@ def __init__( # token num self.block_tables = [] + # T53 PR2 B-1: head-wise per-request block tables snapshot. ``None`` + # when the head-wise SWA gate is off; otherwise a ``list[list[int]]`` + # of shape ``[kv_num_heads_local][num_blocks_per_head]`` produced by + # the V1 scheduler and consumed by the worker to fill + # ``share_inputs['block_tables_headwise']``. + self.head_block_tables: Optional[List[List[int]]] = None self.output_token_ids = [] self.num_computed_tokens = num_computed_tokens self.prefill_start_index = prefill_start_index diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index e3d20cc7d02..bd6d754333e 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -56,7 +56,7 @@ @dataclass class ScheduledTaskBase: """ - Task for Scheduled. + Task for Scheduled """ idx: int @@ -84,6 +84,8 @@ class ScheduledDecodeTask(ScheduledTaskBase): """ block_tables: list[int] = field(default_factory=list) + # T53 PR2 will surface per-head block tables to the kernel; PR1 keeps the + # head-wise data inside ``swa_head_block_tables`` for cache management only. @dataclass @@ -252,6 +254,242 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l # Scheduler-side requests that have not been moved into resource manager waiting queue yet. self.scheduler_unhandled_request_num = 0 + # T53 PR1 head-wise SWA recycle state (default-off; populated only when + # FD_HEAD_WISE_KV_CACHE=1). Both maps are keyed by request_id; entries + # are removed by ``_free_blocks`` to prevent the P4 cross-request leak + # flagged in the architecture brief §6. + self.swa_head_recycle_upto: dict[str, list[int]] = {} + self.swa_head_block_tables: dict[str, list[list[int]]] = {} + self.swa_legacy_recycle_upto: dict[str, int] = {} + self.swa_legacy_recycled_blocks: dict[str, set[int]] = {} + + def _swa_window_sink_block(self): + """Return ``(window_blocks, sink_blocks, block_size)`` for SWA recycle. + + Reads ``window_size`` and ``sink_size`` from ``model_config`` (set by + the T53 fixture hook in ``paddleformers/base.py``); falls back to + ``(0, 0)`` when the attributes are absent so callers naturally no-op. + """ + block_size = max(1, int(self.config.cache_config.block_size)) + window = int(getattr(self.config.model_config, "window_size", 0) or 0) + sink = int(getattr(self.config.model_config, "sink_size", 0) or 0) + # ceil(sink/bs) sink blocks must always be retained. + sink_blocks = (sink + block_size - 1) // block_size if sink > 0 else 0 + # window_size tokens of tail must always be retained. 
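+ # e.g. (hypothetical) window=4096, sink=64, block_size=64: sink_blocks above + # is ceil(64/64) = 1, and window_blocks below is ceil(4096/64) = 64.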
+ window_blocks = (window + block_size - 1) // block_size if window > 0 else 0 + return window_blocks, sink_blocks, block_size + + def _num_swa_heads(self) -> int: + """Number of KV heads marked as SWA per the head_wise_swa_ratio fixture. + + Convention: positive ratios mark at least one SWA row, capped at KV heads. + Matches the per-head recycle fixture: the leading KV-head group is designated SWA. + + TP-aware (P10 review fix): ``num_key_value_heads`` on ``model_config`` + is the GLOBAL count. Under tensor parallelism each rank only holds + ``num_kv_heads // tp_size`` rows in its head-wise sidecar, so we must + divide here to avoid over-allocating per rank and indexing past the + local KV head count downstream. + """ + kv_num_heads_global = int(getattr(self.config.model_config, "num_key_value_heads", 0) or 0) + if kv_num_heads_global <= 0: + return 0 + tp_size = max(1, int(getattr(self.config.parallel_config, "tensor_parallel_size", 1) or 1)) + # GQA/MQA divisibility guard: when kv >= tp, kv must be divisible by tp + # (Paddle's TP shards KV heads evenly). The kv < tp replication path is + # handled below. + if kv_num_heads_global >= tp_size and kv_num_heads_global % tp_size != 0: + raise ValueError( + f"GQA/MQA constraint violated: kv_num_heads={kv_num_heads_global} " + f"not divisible by tp_size={tp_size} (only kv<tp replication or evenly divisible sharding is supported)." + ) + kv_num_heads = ( + kv_num_heads_global // tp_size if kv_num_heads_global >= tp_size else kv_num_heads_global + ) + ratio = float(getattr(self.config.model_config, "head_wise_swa_ratio", 0.0) or 0.0) + if ratio <= 0.0: + return 0 + if ratio >= 1.0: + return kv_num_heads + return max(1, min(kv_num_heads, int(round(kv_num_heads * ratio)))) + + def _head_wise_swa_active(self) -> bool: + """Return True when the T53 head-wise SWA feature gate is active.""" + return bool(envs.FD_HEAD_WISE_KV_CACHE) and (envs.FD_T53_HEAD_WISE_SWA_RATIO or 0) > 0 + + def _should_use_head_wise_swa(self, num_blocks: int) -> bool: + """Return True when the default-off head-wise SWA sidecar should be populated.""" + return ( + int(getattr(self.config.model_config, "window_size", 0) or 0) > 0 + and hasattr(self.cache_manager, "allocate_gpu_blocks_head_wise") + and hasattr(self.cache_manager, "recycle_gpu_blocks_head_wise") + and num_blocks > 0 + ) + + def _should_skip_swa_recycle_for_overlap(self, request: Request) -> bool: + """Return True if any in-flight cache swap targets this request's blocks. + + ``CacheSwapMetadata`` does not expose a global ``is_inflight`` query, + so we approximate by inspecting the per-request swap and evict queues + the V1 scheduler already publishes (see ``ScheduledTaskBase``). Any + unfinished metadata that touches a block currently owned by ``request`` + is treated as in-flight: skipping the recycle for this turn is safe + because the next schedule call will retry. + """ + block_set = set(int(b) for row in self.swa_head_block_tables.get(request.request_id, []) for b in row) + if not block_set: + return False + for queue_name in ("cache_swap_metadata", "cache_evict_metadata"): + queue = getattr(request, queue_name, None) or [] + for meta in queue: + # P9 fix: missing ``success`` must default to the SAFE direction + # (treat as in-flight) so recycle never overlaps a transfer.
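+ # e.g. a metadata entry with no ``success`` attribute at all: getattr + # returns False, the ``continue`` below is skipped, and the id-overlap + # test treats the swap as still in-flight, deferring this turn's recycle.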
+ if getattr(meta, "success", False): + continue # already-completed swaps cannot block recycle + ids = list(getattr(meta, "src_block_ids", []) or []) + list(getattr(meta, "dst_block_ids", []) or []) + if any(int(b) in block_set for b in ids): + return True + return False + + def _extend_head_wise_block_tables(self, request: Request, num_new_blocks: int) -> list[list[int]]: + """Allocate ``num_new_blocks`` per head and append to head-wise block table. + + P10/P7 review fix: previously the broad ``except Exception: pass``-style + fallback (warn + return existing rows) silently desynchronised the + head-wise sidecar from the real KV pool — real KV had already been + allocated by the caller, but the sidecar entry was left empty. Any + downstream recycle then leaked real-KV blocks. We now LOG and RE-RAISE + so the caller (``_allocate_gpu_blocks``) can roll the real-KV + allocation back atomically and propagate the failure. + """ + try: + new_rows = self.cache_manager.allocate_gpu_blocks_head_wise(num_new_blocks, request.request_id) + except Exception as exc: + llm_logger.error( + f"head-wise SWA sidecar allocation FAILED for request {request.request_id} " + f"(num_new_blocks={num_new_blocks}); rolling back real-KV allocation: {exc}" + ) + raise + if not new_rows: + llm_logger.error( + f"head-wise SWA sidecar allocation returned no rows for request {request.request_id} " + f"(num_new_blocks={num_new_blocks}); rolling back real-KV allocation" + ) + raise RuntimeError(f"head-wise SWA sidecar empty result for request {request.request_id}") + existing = self.swa_head_block_tables.setdefault( + request.request_id, + [[] for _ in range(max(1, int(getattr(self.cache_manager, "kv_num_heads", len(new_rows) or 1))))], + ) + for h, row in enumerate(new_rows): + if h >= len(existing): + existing.append([]) + existing[h].extend(row) + # T53 PR2 B-1: snapshot the per-head dict onto the Request envelope so + # the worker process (which has no view of ``self.swa_head_block_tables``) + # can materialize ``share_inputs['block_tables_headwise']`` for the + # discrete AppendAttention kernel. Shallow per-row copy is sufficient + # because the worker reads-then-writes its own preallocated tensor. + request.head_block_tables = [list(row) for row in existing] + return existing + + def _recycle_legacy_swa_blocks(self, request: Request, prev: list[int], recycle_from_floor: int) -> int: + """Return fully-aged uniform-SWA legacy block ids to the legacy pool once. + + The active ``request.block_tables`` list stays untouched because worker + kernels index it by absolute block position. ``_free_blocks`` filters + these ids later to avoid double-recycling at request teardown. 
+ """ + block_tables = list(getattr(request, "block_tables", []) or []) + if not block_tables or not hasattr(self.cache_manager, "recycle_gpu_blocks"): + return 0 + head_blocks = self.swa_head_block_tables.get(request.request_id) or [] + local_kv_heads = len(head_blocks) + kv_num_heads = int(getattr(self.config.model_config, "num_key_value_heads", 0) or 0) or local_kv_heads + if kv_num_heads <= 0 or self._num_swa_heads() != kv_num_heads or local_kv_heads <= 0: + return 0 + if len(prev) < local_kv_heads: + return 0 + recycle_upto = min(int(prev[h]) for h in range(local_kv_heads)) + start = max(int(self.swa_legacy_recycle_upto.get(request.request_id, recycle_from_floor)), recycle_from_floor) + end = min(int(recycle_upto), len(block_tables)) + if end <= start: + return 0 + + already = self.swa_legacy_recycled_blocks.setdefault(request.request_id, set()) + legacy_ids = [int(block_id) for block_id in block_tables[start:end] if int(block_id) not in already] + self.swa_legacy_recycle_upto[request.request_id] = end + if not legacy_ids: + return 0 + self.cache_manager.recycle_gpu_blocks(legacy_ids, request.request_id) + already.update(legacy_ids) + return len(legacy_ids) + + def recycle_request_swa_head_cache(self, request: Request) -> int: + """Recycle SWA tail blocks per head (T53 PR1 §2.3). + + Computes the open interval ``[ceil(sink/bs), floor((T-window)/bs))`` + of fully-aged blocks and pushes them back to the head-wise free heap + via ``cache_manager.recycle_gpu_blocks_head_wise``. ``swa_head_recycle_upto`` + is monotonic per head so we never re-release a block. + + Returns the number of blocks released across all heads (0 when no-op). + Default-off: returns immediately when ``FD_HEAD_WISE_KV_CACHE != 1``. + """ + if not envs.FD_HEAD_WISE_KV_CACHE: + return 0 + window_blocks, sink_blocks, block_size = self._swa_window_sink_block() + total_tokens = int(getattr(request, "num_total_tokens", 0) or 0) or int( + getattr(request, "num_computed_tokens", 0) or 0 + ) + if block_size <= 0 or total_tokens < (window_blocks + 1) * block_size: + return 0 + # Boundary guard: only release SWA blocks at exact block boundaries. + # Mid-block decode steps (total_tokens % block_size != 0) leave the + # tail block partially-filled by the in-flight token; releasing it + # would race with the next decode write. We resume recycle on the + # next step that crosses a clean block boundary. + if total_tokens % block_size != 0: + return 0 + head_blocks = self.swa_head_block_tables.get(request.request_id) + if not head_blocks: + return 0 + if self._should_skip_swa_recycle_for_overlap(request): + return 0 + + recycle_upto = max(0, (total_tokens - max(0, window_blocks * block_size)) // block_size) + # Sink floor: never release the first ``sink_blocks`` per head. 
+ recycle_from_floor = sink_blocks + + prev = self.swa_head_recycle_upto.setdefault(request.request_id, [recycle_from_floor for _ in head_blocks]) + if len(prev) < len(head_blocks): + prev.extend([recycle_from_floor for _ in range(len(head_blocks) - len(prev))]) + released_total = 0 + for h in range(self._num_swa_heads()): + if h >= len(head_blocks): + continue + row = head_blocks[h] + start = max(int(prev[h]), recycle_from_floor) + end = min(int(recycle_upto), len(row)) + if end <= start: + continue + to_release = row[start:end] + if not to_release: + continue + self.cache_manager.recycle_gpu_blocks_head_wise(to_release, request.request_id) + prev[h] = end # monotone advance + released_total += len(to_release) + self._recycle_legacy_swa_blocks(request, prev, recycle_from_floor) + # T53 PR2 B-1: refresh the Request snapshot whenever the per-head dict + # is mutated by recycle so the worker sees the post-recycle layout on + # its next ForwardMeta materialization. + if released_total > 0: + request.head_block_tables = [list(row) for row in head_blocks] + return released_total + def allocated_slots(self, request: Request): return len(request.block_tables) * self.config.cache_config.block_size @@ -908,6 +1146,13 @@ def schedule(self): need_abort_requests.append(request) continue + # T53 PR1 head-wise SWA recycle (§2.3 recycle-before-allocate). + # Default-off: helper returns 0 immediately when the + # composite head-wise SWA gate is inactive, so the legacy + # path is bit-identical. + if self._head_wise_swa_active(): + self.recycle_request_swa_head_cache(request) + if ( self.allocated_slots(request) - request.num_total_tokens <= self.config.cache_config.prealloc_dec_block_slot_num_threshold @@ -1366,9 +1611,28 @@ def get_real_bsz(self) -> int: def _allocate_gpu_blocks(self, request: Request, num_blocks: int) -> List[int]: llm_logger.debug(f"[allocate_gpu_blocks] request_id={request.request_id}, num_blocks={num_blocks}") if self.enable_cache_manager_v1: - return self.cache_manager.allocate_gpu_blocks(request, num_blocks) + block_ids = self.cache_manager.allocate_gpu_blocks(request, num_blocks) else: - return self.cache_manager.allocate_gpu_blocks(num_blocks, request.request_id) + block_ids = self.cache_manager.allocate_gpu_blocks(num_blocks, request.request_id) + if self._head_wise_swa_active(): + if self._should_use_head_wise_swa(num_blocks): + # P10/Fix-3 (review): real-KV and head-wise sidecar must commit + # atomically. If the sidecar extension fails we MUST recycle the + # real-KV blocks we just acquired, otherwise ownership drifts and + # those blocks leak (preempt/abort path will not see them). 
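+ # e.g. (hypothetical) the sidecar raises after real-KV ids [12, 13] were + # taken: the except-branch below recycles [12, 13] before re-raising, so + # block ownership never drifts.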
+ try: + self._extend_head_wise_block_tables(request, num_blocks) + except Exception: + if block_ids and hasattr(self.cache_manager, "recycle_gpu_blocks"): + try: + self.cache_manager.recycle_gpu_blocks(block_ids, request.request_id) + except Exception as recycle_exc: # pragma: no cover - defensive + llm_logger.error( + f"failed to roll back real-KV blocks {block_ids} for request " + f"{request.request_id} after head-wise sidecar failure: {recycle_exc}" + ) + raise + return block_ids def _request_match_blocks(self, request: Request, skip_storage: bool = True): """ @@ -1640,6 +1904,38 @@ def add_prefilled_request(self, request_output: RequestOutput): self.running.append(request) def _free_blocks(self, request: Request): + early_recycled_legacy = set() + + def _filter_early_recycled(block_ids): + if not early_recycled_legacy: + return block_ids + return [block_id for block_id in block_ids if int(block_id) not in early_recycled_legacy] + + # T53 PR1 head-wise SWA: release any leftover head-wise blocks and clear + # per-request recycle state. P4 fix from architecture brief §6: without + # this drop, ``swa_head_recycle_upto`` and ``swa_head_block_tables`` + # would leak across request_id reuse and corrupt the next request's + # monotone recycle cursor. + if self._head_wise_swa_active(): + head_blocks = self.swa_head_block_tables.pop(request.request_id, None) + head_cursor = self.swa_head_recycle_upto.pop(request.request_id, None) + self.swa_legacy_recycle_upto.pop(request.request_id, None) + early_recycled_legacy = self.swa_legacy_recycled_blocks.pop(request.request_id, set()) + # T53 PR2 B-1: clear the worker-facing snapshot so any stale read + # after teardown sees ``None`` and the materialization layer skips + # the head-wise tensor (kernel falls back to flat ``block_tables``). 
+ request.head_block_tables = None + if head_blocks and hasattr(self.cache_manager, "recycle_gpu_blocks_head_wise"): + if head_cursor: + _, recycle_from_floor, _ = self._swa_window_sink_block() + remaining = [] + for h, row in enumerate(head_blocks): + cursor = int(head_cursor[h]) if h < len(head_cursor) else recycle_from_floor + floor = min(recycle_from_floor, len(row)) + cursor = max(floor, min(cursor, len(row))) + remaining.append(list(row[:floor]) + list(row[cursor:])) + head_blocks = remaining + self.cache_manager.recycle_gpu_blocks_head_wise(head_blocks, request.request_id) if self.enable_cache_manager_v1: self.cache_manager.request_finish(request) elif ( @@ -1647,10 +1943,10 @@ def _free_blocks(self, request: Request): ): self.cache_manager.release_block_ids(request) self.cache_manager.recycle_gpu_blocks( - request.block_tables[request.num_cached_blocks :], request.request_id + _filter_early_recycled(request.block_tables[request.num_cached_blocks :]), request.request_id ) else: - self.cache_manager.recycle_gpu_blocks(request.block_tables, request.request_id) + self.cache_manager.recycle_gpu_blocks(_filter_early_recycled(request.block_tables), request.request_id) request.block_tables = [] if request.request_id in self.using_extend_tables_req_id: diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 6be28f1f3be..8d536f14a85 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -22,12 +22,25 @@ def _validate_split_kv_size(value: int) -> int: - """Validate FD_DETERMINISTIC_SPLIT_KV_SIZE is a positive power of 2.""" + """Validate FD_DETERMINISTIC_SPLIT_KV_SIZE is a positive power of 2""" if value <= 0 or (value & (value - 1)) != 0: raise ValueError(f"FD_DETERMINISTIC_SPLIT_KV_SIZE must be a positive power of 2, got {value}.") return value +def _checked_swa_ratio(raw: str) -> float: + """Parse and validate FD_T53_HEAD_WISE_SWA_RATIO ∈ [0.0, 1.0]. Default: 0.0.""" + if not raw: + return 0.0 + v = float(raw) + if not (0.0 <= v <= 1.0): + raise ValueError( + f"FD_T53_HEAD_WISE_SWA_RATIO={v!r} out of range [0.0, 1.0]. " + f"Use 0.0 to disable, 1.0 for full SWA on all heads." + ) + return v + + environment_variables: dict[str, Callable[[], Any]] = { # Whether to use BF16 on CPU. "FD_CPU_USE_BF16": lambda: os.getenv("FD_CPU_USE_BF16", "False"), @@ -108,6 +121,10 @@ def _validate_split_kv_size(value: int) -> int: "FD_ENC_DEC_BLOCK_NUM": lambda: int(os.getenv("FD_ENC_DEC_BLOCK_NUM", "2")), # enbale max prefill of one execute step "FD_ENABLE_MAX_PREFILL": lambda: int(os.getenv("FD_ENABLE_MAX_PREFILL", "0")), + # T53: per-head SWA block recycle toggles — all default-off; requires FD_HEAD_WISE_KV_CACHE=1 to enter recycle path. + "FD_HEAD_WISE_KV_CACHE": lambda: int(os.getenv("FD_HEAD_WISE_KV_CACHE", "0")), + "FD_T53_HEAD_WISE_SWA_FIXTURE": lambda: int(os.getenv("FD_T53_HEAD_WISE_SWA_FIXTURE", "0")), + "FD_T53_HEAD_WISE_SWA_RATIO": lambda: (_checked_swa_ratio(os.getenv("FD_T53_HEAD_WISE_SWA_RATIO", ""))), # Whether to use PLUGINS. "FD_PLUGINS": lambda: None if "FD_PLUGINS" not in os.environ else os.environ["FD_PLUGINS"].split(","), # set trace attribute job_id. diff --git a/fastdeploy/model_executor/forward_meta.py b/fastdeploy/model_executor/forward_meta.py index 516344a17f4..c2fd0fed9c1 100644 --- a/fastdeploy/model_executor/forward_meta.py +++ b/fastdeploy/model_executor/forward_meta.py @@ -142,6 +142,12 @@ class ForwardMeta: pre_caches_length: int = 0 # Block tables block_tables: Optional[paddle.Tensor] = None + # T53 PR2: rank-2 head-wise block tables for per-head SWA. 
Shape + # ``[max_num_seqs * kv_num_heads_local, max_blocks_per_head]``, dtype + # ``int32``, padding sentinel ``-1``. ``None`` (default) keeps every other + # backend on the flat ``block_tables`` path; the discrete AppendAttention + # kernel reads this through ``_get_block_tables_headwise``. + block_tables_headwise: Optional[paddle.Tensor] = None # KV caches caches: Optional[list[paddle.Tensor]] = None # Flag of profile run diff --git a/fastdeploy/model_executor/layers/attention/append_attn_backend.py b/fastdeploy/model_executor/layers/attention/append_attn_backend.py index eba781faae0..dc2a2461b2a 100644 --- a/fastdeploy/model_executor/layers/attention/append_attn_backend.py +++ b/fastdeploy/model_executor/layers/attention/append_attn_backend.py @@ -231,6 +231,17 @@ def get_attention_meta(self) -> AttentionMetadata: """get_attention_meta""" return self.attention_metadata + def _get_block_tables_headwise(self, forward_meta: ForwardMeta) -> Optional[paddle.Tensor]: + """Return optional rank-2 head-wise block table from forward/cache metadata.""" + block_tables_headwise = getattr(forward_meta, "block_tables_headwise", None) + if block_tables_headwise is not None: + return block_tables_headwise + + cache_manager = getattr(forward_meta, "cache_manager", None) + if cache_manager is not None: + return getattr(cache_manager, "block_tables_headwise", None) + return None + def _get_identity_rotary_embs(self, original_rotary_embs: paddle.Tensor) -> paddle.Tensor: """ Create identity rotary embeddings (cos=1, sin=0) that make RoPE a no-op. @@ -285,6 +296,28 @@ def forward_mixed( forward_mixed """ metadata = self.attention_metadata + block_tables_headwise = self._get_block_tables_headwise(forward_meta) + + # T53 PR2 hotfix (RFC-PR2-reanchored §5): the prior modulo HOTFIX + # ``flat % num_gpu_blocks`` silently aliased distinct heads onto + # identical block_ids when the PR1 allocator emitted flat ids from a + # single shared heap. PR1 has been re-architected to emit per-head ids + # directly in ``{-1} ∪ [0, num_gpu_blocks)`` (RFC-PR1-reanchored §3), + # so this contract is now satisfied at the source — no normalization is + # needed, and none would be correct (any modulo would still mask + # producer bugs). + # + # Optional debug invariant check, off by default to avoid CUDA-graph + # capture incompatibility (boolean fancy-indexing + .item() syncs). + # Enable with ``FD_T53_DEBUG_BLOCK_TABLES=1`` outside graph capture.
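+ # e.g. (hypothetical) num_gpu_blocks=1000: the sentinel -1 and ids 0..999 + # pass the mask below; an id of 1000 (or a stray flat id) trips the assert.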
+ if os.getenv("FD_T53_DEBUG_BLOCK_TABLES") == "1" and block_tables_headwise is not None: + num_gpu_blocks = forward_meta.caches[2 * layer.layer_id].shape[0] + valid = (block_tables_headwise == -1) | ( + (block_tables_headwise >= 0) & (block_tables_headwise < num_gpu_blocks) + ) + assert bool(valid.all().item()), ( + "PR2 invariant: per-head block id must be in {-1} \u222a " f"[0, num_gpu_blocks={num_gpu_blocks})" + ) # - PaddleFormers fallback: rope_already_applied=True -> use identity RoPE (cos=1, sin=0) rope_already_applied = getattr(forward_meta, "rope_already_applied", False) @@ -438,6 +471,9 @@ def forward_mixed( self.causal, self.speculative_method is not None, sliding_window, + self.sink_size, + self.head_wise_full_hidden if self.head_wise_swa_ratio > 0 else 0, + block_tables_headwise=block_tables_headwise, ) else: res = append_attention( @@ -496,5 +532,6 @@ def forward_mixed( sliding_window, self.sink_size, self.head_wise_full_hidden if self.head_wise_swa_ratio > 0 else 0, + block_tables_headwise=block_tables_headwise, ) return res diff --git a/fastdeploy/model_executor/layers/attention/ops/append_attention.py b/fastdeploy/model_executor/layers/attention/ops/append_attention.py index 8b36ffa85b0..21b263593c3 100644 --- a/fastdeploy/model_executor/layers/attention/ops/append_attention.py +++ b/fastdeploy/model_executor/layers/attention/ops/append_attention.py @@ -85,6 +85,8 @@ def append_attention( sliding_window: int = 0, sink_size: int = 0, head_wise_full_hidden: int = 0, + *, + block_tables_headwise: Optional[paddle.Tensor] = None, ) -> paddle.Tensor: """ append_attention @@ -92,6 +94,9 @@ def append_attention( if current_platform.is_cuda(): if sliding_window > 0 and head_wise_full_hidden > 0: + # TODO(T53 PR2): collapse this dual-call once the C++ slice has a + # per-head SWA/full predicate; block_tables_headwise alone does not + # make the full-head prefix safe under a global sliding-window mask. out_swa = append_attention_gpu( qkv.clone(), key_cache, @@ -129,6 +134,7 @@ def append_attention( q_norm_weight, k_norm_weight, sinks, + block_tables_headwise, rms_norm_eps, compute_type, cache_quant_type, @@ -188,6 +194,7 @@ def append_attention( q_norm_weight, k_norm_weight, sinks, + block_tables_headwise, rms_norm_eps, compute_type, cache_quant_type, @@ -274,11 +281,25 @@ def append_attention_with_output( causal: bool = True, speculate_decoder: bool = False, sliding_window: int = 0, + sink_size: int = 0, + head_wise_full_hidden: int = 0, + *, + block_tables_headwise: Optional[paddle.Tensor] = None, ) -> None: """ append_attention """ if current_platform.is_cuda(): + # TODO(T53 PR3): mirror the dual-call head-wise SWA merge from + # append_attention() above. Until then, the use_output path does not + # implement per-head full-hidden override, so reject any caller that + # tries to enable it here rather than silently dropping the parameter. + if head_wise_full_hidden != 0: + raise NotImplementedError( + "append_attention_with_output: head_wise_full_hidden>0 not yet supported " + "in the use_output path (T53 PR3 scope). Disable FD_T53_HEAD_WISE_SWA_RATIO " + "or set use_output=False." 
+ ) return append_attention_with_output_gpu( qkv, key_cache, @@ -317,6 +338,7 @@ def append_attention_with_output( q_norm_weight, k_norm_weight, sinks, + block_tables_headwise, rms_norm_eps, compute_type, cache_quant_type, @@ -334,7 +356,7 @@ def append_attention_with_output( causal, speculate_decoder, sliding_window, - 0, + sink_size, ) else: raise NotImplementedError diff --git a/fastdeploy/model_executor/models/paddleformers/base.py b/fastdeploy/model_executor/models/paddleformers/base.py index 5eb981d5300..bf67d1a5411 100644 --- a/fastdeploy/model_executor/models/paddleformers/base.py +++ b/fastdeploy/model_executor/models/paddleformers/base.py @@ -14,7 +14,7 @@ # limitations under the License. """ -"""Generic PaddleFormers modeling backend base class.""" +"""Generic PaddleFormers modeling backend base class""" import re from collections.abc import Iterable @@ -26,6 +26,7 @@ from paddleformers.transformers import AutoModel, PretrainedModel from paddleformers.utils.log import logger +from fastdeploy import envs from fastdeploy.model_executor.forward_meta import ForwardMeta # noqa: F401 from fastdeploy.model_executor.graph_optimization.decorator import ( support_graph_optimization, @@ -788,6 +789,20 @@ def create_attention_instances(self) -> dict[int, Attention]: if not hasattr(self.fd_config.model_config, "sliding_window") and sliding_window is not None: self.fd_config.model_config.sliding_window = sliding_window + # Per-head SWA recycle fixture hook — activated by FD_T53_HEAD_WISE_SWA_FIXTURE=1 (default off). + if envs.FD_T53_HEAD_WISE_SWA_FIXTURE: + cfg = self.fd_config.model_config + n_kv = getattr(cfg, "num_key_value_heads", 1) or 1 + ratio = envs.FD_T53_HEAD_WISE_SWA_RATIO if envs.FD_T53_HEAD_WISE_SWA_RATIO is not None else (1.0 / n_kv) + if getattr(cfg, "window_size", None) is None: + cfg.window_size = 4096 + if getattr(cfg, "sink_size", None) is None: + cfg.sink_size = 0 + if getattr(cfg, "window_attn_skip_freq", None) is None: + cfg.window_attn_skip_freq = 1 + if getattr(cfg, "head_wise_swa_ratio", None) is None: + cfg.head_wise_swa_ratio = ratio + attention_instances = {} for i in range(num_layers): attention_instances[i] = Attention( diff --git a/fastdeploy/spec_decode/mtp.py b/fastdeploy/spec_decode/mtp.py index 78cf8029504..d9978b88252 100644 --- a/fastdeploy/spec_decode/mtp.py +++ b/fastdeploy/spec_decode/mtp.py @@ -676,6 +676,14 @@ def _initialize_forward_meta(self, step_use_cudagraph: bool = False, is_dummy_ru cu_seqlens_q=self.model_inputs["cu_seqlens_q"], cu_seqlens_k=self.model_inputs["cu_seqlens_k"], block_tables=self.model_inputs["block_tables"], + # T53 PR2 B-1: MTP/spec-decode is OUT OF SCOPE for the head-wise + # SWA recycle path (deferred to PR3). Force the sidecar to + # ``None`` so the discrete AppendAttention kernel's per-head + # branch is never selected on proposer batches, regardless of + # the target model's gate state. The sidecar is still cloned in + # ``MTPInputBatch.init_share_inputs`` to keep the target/proposer + # share-shape contract intact. 
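+            # (A ``None`` sidecar keeps the kernel on its pointer-null flat
+            # ``block_tables`` branch for every proposer step.)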
+ block_tables_headwise=None, caches=self.model_inputs["caches"], encoder_batch_ids=self.model_inputs["encoder_batch_ids"], encoder_tile_ids_per_batch=self.model_inputs["encoder_tile_ids_per_batch"], diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 1127d5c724e..8c5549f181c 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -195,7 +195,7 @@ def __init__( else: self.encoder_cache = None - # Note(Zhengshifeng) init video cache for VL model + # Note(Zhengshifeng) init video cache for VL model. self.video_cache = {} # Sampler @@ -943,6 +943,33 @@ def insert_tasks_v1(self, req_dicts: BatchRequest, num_running_requests: int = N self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num], request.block_tables ) + # T53 PR2 B-1: materialize the per-head block tables for the + # discrete AppendAttention kernel. ``head_block_tables`` is + # ``None`` when the head-wise SWA gate is off (every other + # backend reads ``block_tables`` only). Always clear the + # ``kv_num_heads_local`` rows owned by this slot to ``-1`` + # whenever the sidecar is preallocated, so a previous + # occupant's per-head block ids cannot leak into the new + # request when ``head_block_tables`` is absent. + if "block_tables_headwise" in self.share_inputs: + # T53 PR2: read kv_num_heads_local from model_config (set + # before warmup); ``self.input_batch`` may not yet exist + # during _dummy_prefill_inputs / profile runs. + kv_local = max(1, int(getattr(self.model_config, "kv_num_heads", 1) or 1)) + row_start = idx * kv_local + row_end = row_start + kv_local + async_set_value(self.share_inputs["block_tables_headwise"][row_start:row_end, :], -1) + head_tables = getattr(request, "head_block_tables", None) + if head_tables: + for h, head_row in enumerate(head_tables): + if h >= kv_local or not head_row: + continue + n = len(head_row) + async_set_value( + self.share_inputs["block_tables_headwise"][row_start + h : row_start + h + 1, :n], + head_row, + ) + async_set_value(self.share_inputs["stop_flags"][idx : idx + 1], False) async_set_value(self.share_inputs["seq_lens_decoder"][idx : idx + 1], prefill_start_index) @@ -1017,6 +1044,37 @@ def insert_tasks_v1(self, req_dicts: BatchRequest, num_running_requests: int = N self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array( request.block_tables, dtype="int32" ) + # T53 PR2 B-1: mirror the head-wise block table refresh on the + # decode path. Always clear-first per slot whenever the + # sidecar is preallocated, regardless of whether the request + # carries ``head_block_tables``. + if "block_tables_headwise" in self.share_inputs: + # T53 PR2: read from model_config (warmup-safe). 
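+                # Row layout, for illustration: with kv_local=4, slot idx=2
+                # owns sidecar rows [8, 12); the clear-first write resets all
+                # of them to -1 before any per-head row is repopulated.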
+ kv_local = max(1, int(getattr(self.model_config, "kv_num_heads", 1) or 1)) + row_start = idx * kv_local + row_end = row_start + kv_local + head_tables = getattr(request, "head_block_tables", None) + if current_platform.is_cuda(): + async_set_value(self.share_inputs["block_tables_headwise"][row_start:row_end, :], -1) + if head_tables: + for h, head_row in enumerate(head_tables): + if h >= kv_local or not head_row: + continue + n = len(head_row) + async_set_value( + self.share_inputs["block_tables_headwise"][row_start + h : row_start + h + 1, :n], + head_row, + ) + else: + self.share_inputs["block_tables_headwise"][row_start:row_end, :] = -1 + if head_tables: + for h, head_row in enumerate(head_tables): + if h >= kv_local or not head_row: + continue + n = len(head_row) + self.share_inputs["block_tables_headwise"][row_start + h : row_start + h + 1, :n] = ( + np.array(head_row, dtype="int32") + ) # CPU Tensor self.share_inputs["preempted_idx"][idx : idx + 1, :] = 0 continue @@ -1241,8 +1299,104 @@ def _dummy_prefill_inputs(self, input_length_list: List[int], max_dec_len_list: self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange( idx * block_num, (idx + 1) * block_num, 1 ) + # T53 PR2 B-1: seed the head-wise block table for warmup/profile + # runs so the discrete AppendAttention kernel reads valid block ids + # during CUDA-graph capture. + # + # T53 PR2 hotfix (RFC-PR2-reanchored §4.1): per-head value spaces + # are independent — head_idx and block_id are orthogonal in + # cache_k[block_id, kv_head_idx, :, :]. The seeding base must NOT + # multiply by kv_local; all heads share the same per-slot block + # range ``[idx*fill_blocks, (idx+1)*fill_blocks)``. The previous + # ``base = (idx*kv_local + h)*fill_blocks`` violated the per-head + # value-space invariant ``[0, num_gpu_blocks)`` for h>=1, idx>0. + if "block_tables_headwise" in self.share_inputs: + # T53 PR2: ``self.input_batch`` is not constructed yet during + # warmup/profile; read kv_num_heads_local from model_config. + kv_local = max(1, int(getattr(self.model_config, "kv_num_heads", 1) or 1)) + row_start = idx * kv_local + hw_max_blocks = self.share_inputs["block_tables_headwise"].shape[1] + fill_blocks = min(block_num, hw_max_blocks) + # Bound the seeded range against the per-head value space so we + # cannot drift outside ``[0, num_gpu_blocks)`` even if a future + # caller bumps max_num_seqs. + num_gpu_blocks = int(getattr(self, "num_gpu_blocks", 0) or 0) + if num_gpu_blocks > 0: + assert (idx + 1) * fill_blocks <= num_gpu_blocks, ( + f"T53 PR2 warmup seeding: (idx+1)*fill_blocks=" + f"{(idx + 1) * fill_blocks} exceeds num_gpu_blocks=" + f"{num_gpu_blocks}; per-head value space invariant violated." + ) + base = idx * fill_blocks # head-independent + for h in range(kv_local): + self.share_inputs["block_tables_headwise"][row_start + h : row_start + h + 1, :fill_blocks] = ( + np.arange(base, base + fill_blocks, 1, dtype="int32") + ) self.share_inputs["seq_lens_this_time"] = self.share_inputs["seq_lens_this_time_buffer"] + def _maybe_slice_block_tables_headwise(self, num_running_requests: int, is_dummy_or_profile_run: bool = False): + """T53 PR2 B-1: gate the head-wise sidecar tensor at ForwardMeta time. + + The discrete AppendAttention C16 kernel branches on the *pointer* of + ``block_tables_headwise``: any non-null tensor activates the per-head + recycle path and triggers a shape assertion of + ``[num_running_requests * kv_num_heads_local, pre_max_block_num]``. 
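+        (Hypothetical sizes: ``num_running_requests=3`` with
+        ``kv_num_heads_local=4`` forwards a ``[12, pre_max_block_num]`` slice.)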
+ To preserve backward compatibility on every other code path we must + return ``None`` unless: + 1. the composite head-wise SWA gate is active + (``FD_HEAD_WISE_KV_CACHE=1`` and ``FD_T53_HEAD_WISE_SWA_RATIO>0``), and + 2. at least one running request in the batch carries non-empty + ``head_block_tables``. + When forwarded, slice to the live batch so the kernel's shape + assertion holds. + + During CUDA graph capture (``is_dummy_or_profile_run=True``) the live + request list is empty so condition 2 would always be false, causing the + graph to capture the legacy flat-only path and never activate head-wise + SWA in production. We skip the per-request scan and return the + preallocated slice directly so the graph captures the head-wise branch. + Slots whose request lacks head_block_tables keep the -1 sentinel from + reset_share_inputs / per-slot clear-first. The C16 multiquery_attention + kernel reads block 0's cache address as a harmless placeholder for any + block_id == -1 entry; the SWA attention mask zeroes that KV contribution, + so per-head slots without recycle metadata contribute nothing rather than + falling back to flat block_tables. See + custom_ops/gpu_ops/append_attn/multiquery_attention_c16_impl.cuh + (head-wise branch ~L78-82, sentinel handling L215-223 / L605-613, shape + assert ~L894-897). + """ + sidecar = self.share_inputs.get("block_tables_headwise") + # T53 PR2: kv_num_heads_local is read from model_config (warmup-safe); + # this method is also invoked from _dummy_run before InputBatch exists. + _kv_local_warmup = max(1, int(getattr(self.model_config, "kv_num_heads", 1) or 1)) + if sidecar is None: + return None + if not bool(envs.FD_HEAD_WISE_KV_CACHE) or (envs.FD_T53_HEAD_WISE_SWA_RATIO or 0) <= 0: + return None + # T53 PR2: ``self.input_batch`` may be absent during warmup/profile. + # Fall back to the model_config-derived value so dummy capture works. + ib = getattr(self, "input_batch", None) + kv_local = getattr(ib, "kv_num_heads_local", _kv_local_warmup) if ib is not None else _kv_local_warmup + # FIX 1: during CUDA-graph capture forward_batch_reqs_list is empty; + # return the pre-allocated slice directly so the graph captures the + # head-wise path instead of the legacy flat-only path. + if is_dummy_or_profile_run: + return sidecar[: num_running_requests * kv_local] + # Walk the live batch slice and confirm at least one request carries + # per-head block ids; otherwise the kernel would be activated for a + # batch that has nothing to recycle and would read stale rows. + any_head_wise = False + for slot in range(num_running_requests): + req = self.forward_batch_reqs_list[slot] + if req is None: + continue + if getattr(req, "head_block_tables", None): + any_head_wise = True + break + if not any_head_wise: + return None + return sidecar[: num_running_requests * kv_local] + def _prepare_inputs(self, cached_token_num=-1, cached_real_bsz=-1, is_dummy_or_profile_run=False) -> None: """Prepare the model inputs""" @@ -1411,6 +1565,20 @@ def _process_reorder(self) -> None: if self.speculative_decoding: if self.spec_method == SpecMethod.MTP: self.proposer.reorder_inputs(self.share_inputs.index_to_batch_id) + # T53 PR2 FIX 2C: mirror the slot permutation to forward_batch_reqs_list + # so that _maybe_slice_block_tables_headwise reads the correct + # head_block_tables attribute per slot after condense() + + # reorder_split_prefill_and_decode() have rearranged slots. 
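+        # Hypothetical mapping: index_to_batch_id == {0: 2, 1: 0} places the
+        # request formerly in slot 2 at slot 0 and slot 0's request at slot 1.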
+ # index_to_batch_id[new_slot] == orig_slot (batch_id equals the + # original slot index when first registered via get_index_by_batch_id). + old_reqs = list(self.forward_batch_reqs_list) + # Rebuild from clean state to prevent stale tail entries from leaking into + # logprob-settings consumers that scan the entire list (L~1402-1411). + for k in range(len(self.forward_batch_reqs_list)): + self.forward_batch_reqs_list[k] = None + for new_slot, orig_slot in self.share_inputs.index_to_batch_id.items(): + if 0 <= orig_slot < len(old_reqs): + self.forward_batch_reqs_list[new_slot] = old_reqs[orig_slot] def load_model(self) -> None: """load or download model""" @@ -1470,6 +1638,18 @@ def initialize_forward_meta(self, is_dummy_or_profile_run=False): cu_seqlens_q=self.share_inputs["cu_seqlens_q"], cu_seqlens_k=self.share_inputs["cu_seqlens_k"], block_tables=self.share_inputs["block_tables"][:num_running_requests], + # T53 PR2 B-1: only forward the head-wise sidecar when the + # composite SWA gate is active AND at least one running request + # actually carries ``head_block_tables`` (otherwise the discrete + # AppendAttention kernel would be selected unconditionally — its + # branch is taken whenever the pointer is non-null, see + # ``custom_ops/gpu_ops/multiquery_attention/multiquery_attention_c16_impl.cuh``). + # When forwarded, slice the preallocated tensor to the running + # batch shape ``[num_running_requests * kv_num_heads_local, + # pre_max_block_num]`` so the kernel's shape assertion holds. + block_tables_headwise=self._maybe_slice_block_tables_headwise( + num_running_requests, is_dummy_or_profile_run=is_dummy_or_profile_run + ), caches=self.share_inputs["caches"], encoder_batch_ids=self.share_inputs["encoder_batch_ids"], encoder_tile_ids_per_batch=self.share_inputs["encoder_tile_ids_per_batch"], @@ -1983,7 +2163,6 @@ def _dummy_run( capture_prefill: bool = False, accept_all_drafts: bool = False, reject_all_drafts: bool = False, - step_use_cudagraph=False, ) -> paddle.Tensor: """ Use dummy inputs to run before formal execution. @@ -2016,10 +2195,8 @@ def _dummy_run( while True: # 1. Initialize forward meta and attention meta data self._prepare_inputs(is_dummy_or_profile_run=True) - - if not (in_capturing or step_use_cudagraph): - self.forward_meta.step_use_cudagraph = False # 2. Padding inputs for cuda graph + self.forward_meta.step_use_cudagraph = in_capturing and self.forward_meta.step_use_cudagraph self.padding_cudagraph_inputs() # Compute position_ids and slot_mapping self._compute_position_ids_and_slot_mapping() diff --git a/fastdeploy/worker/input_batch.py b/fastdeploy/worker/input_batch.py index 22fee0a92ad..8861ebce276 100644 --- a/fastdeploy/worker/input_batch.py +++ b/fastdeploy/worker/input_batch.py @@ -27,7 +27,7 @@ class InputBatch: def __getitem__(self, key): - """Support dictionary-style attribute access""" + """Support dictionary-style attribute access.""" if hasattr(self, key): return getattr(self, key) raise KeyError(f"'{key}' is not a valid attribute of InputBatch") @@ -250,6 +250,21 @@ def init_share_inputs(self): ) // self.cache_config.block_size + self.cache_config.enc_dec_block_num self.block_tables = paddle.full([max_num_seqs, pre_max_block_num], -1, dtype="int32") + # T53 PR2: head-wise block tables (rank-2 head-major, default sentinel + # ``-1``). 
Sized for the local KV head count on this rank — mirrors the
+        # ``_num_swa_heads`` formula in resource_manager_v1.py so GQA/MQA
+        # (kv_num_heads < num_attention_heads) models shard consistently under TP.
+        kv_num_heads_global = max(1, int(getattr(self.model_config, "num_key_value_heads", 1) or 1))
+        tp_size = max(1, int(getattr(self.parallel_config, "tensor_parallel_size", 1) or 1))
+        self.kv_num_heads_local = (
+            max(1, kv_num_heads_global // tp_size) if kv_num_heads_global >= tp_size else kv_num_heads_global
+        )
+        self.block_tables_headwise = paddle.full(
+            [max_num_seqs * self.kv_num_heads_local, pre_max_block_num], -1, dtype="int32"
+        )
+
         # Initialize free list
         free_list = list(
             range(
@@ -485,6 +500,18 @@ def swap_data(tensor, idx1, idx2):
         # Swap mask rollback
         swap_data(self.mask_rollback, i1, i2)

+        # T53 PR2 FIX 2: keep head-wise sidecar rows in lockstep with slot
+        # move. Each slot owns kv_num_heads_local consecutive rows; the C16
+        # kernel expects row[slot*kv_local : (slot+1)*kv_local] to belong to
+        # the request occupying that slot.
+        if getattr(self, "block_tables_headwise", None) is not None:
+            kv_local = getattr(self, "kv_num_heads_local", 1)
+            i_start, i_end = i1 * kv_local, (i1 + 1) * kv_local
+            j_start, j_end = i2 * kv_local, (i2 + 1) * kv_local
+            tmp = self.block_tables_headwise[i_start:i_end].clone()
+            self.block_tables_headwise[i_start:i_end] = self.block_tables_headwise[j_start:j_end]
+            self.block_tables_headwise[j_start:j_end] = tmp
+
     def condense(self) -> None:
         """
         Condense the input batch by keeping only the running requests and moving their data to the front.
@@ -632,6 +659,12 @@ def reset_share_inputs(self):
         # Reset block tables
         fill_paddle_tensor(self, "block_tables", -1)
+        # T53 PR2 B-1: also reset the head-wise sidecar (when allocated)
+        # so a slot reused after ``reset_share_inputs`` cannot leak any
+        # previous occupant's per-head block ids into the discrete
+        # AppendAttention kernel.
+        if hasattr(self, "block_tables_headwise") and self.block_tables_headwise is not None:
+            fill_paddle_tensor(self, "block_tables_headwise", -1)

         # Reset free list (requires special handling)
         free_list = list(
@@ -752,6 +785,10 @@ def init_share_inputs(self):
         self.enable_pd_reorder = getattr(self.target_model_input_batch, "enable_pd_reorder", False)

         self.block_tables = paddle.clone(self.target_model_input_batch["block_tables"])
+        # T53 PR2: mirror clone for head-wise block tables so the speculative
+        # proposer sees the same per-head layout as the target model.
+        self.kv_num_heads_local = getattr(self.target_model_input_batch, "kv_num_heads_local", 1)
+        self.block_tables_headwise = paddle.clone(self.target_model_input_batch["block_tables_headwise"])
         self.input_ids = paddle.clone(self.target_model_input_batch["input_ids"])
         self.input_ids_cpu = paddle.full(
             shape=[self.scheduler_config.max_num_seqs, self.model_config.max_model_len],
@@ -946,6 +983,16 @@ def swap_data(tensor, idx1, idx2):
         swap_data(self.attn_mask_offsets_full, i1, i2)
         swap_data(self.attn_mask_offsets_decoder, i1, i2)

+        # T53 PR2 FIX 2: keep proposer head-wise sidecar rows in lockstep
+        # with slot move. Mirror of InputBatch.swap_states fix.
+        if getattr(self, "block_tables_headwise", None) is not None:
+            kv_local = getattr(self, "kv_num_heads_local", 1)
+            i_start, i_end = i1 * kv_local, (i1 + 1) * kv_local
+            j_start, j_end = i2 * kv_local, (i2 + 1) * kv_local
+            tmp = self.block_tables_headwise[i_start:i_end].clone()
+            self.block_tables_headwise[i_start:i_end] = self.block_tables_headwise[j_start:j_end]
+            self.block_tables_headwise[j_start:j_end] = tmp
+
     def reset_model_inputs(self) -> None:
         """
         Reset all paddle tensors in self to their initial state.
@@ -959,6 +1006,10 @@ def reset_model_inputs(self) -> None: # Reset all paddle tensors to their default values # Clone the target model inputs to restore initial values self.block_tables = paddle.clone(self.target_model_input_batch["block_tables"]) + # T53 PR2: keep the head-wise mirror in lockstep with the flat + # ``block_tables`` reset so the proposer never reads stale per-head + # rows after a reset. + self.block_tables_headwise = paddle.clone(self.target_model_input_batch["block_tables_headwise"]) self.input_ids = paddle.clone(self.target_model_input_batch["input_ids"]) fill_paddle_tensor(self, "input_ids_cpu", -1) # acceptance rate decline when reset seq_lens_this_time diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 28a943cf9d4..e9950dbbf29 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -82,7 +82,7 @@ def get_worker(fd_config: FDConfig, local_rank: int, rank: int) -> WorkerBase: """ - get worker of different device + get worker of different device. """ if fd_config.model_config.enable_logprob and not current_platform.is_cuda() and not current_platform.is_xpu(): raise NotImplementedError("Only CUDA and XPU platforms support logprob.") @@ -481,8 +481,6 @@ def event_loop_normal(self) -> None: req_dicts = None self.worker_healthy_live_signal.value[tp_rank % self.max_chips_per_node] = int(time.time()) - self._tp_barrier_wait() if tp_size > 1 else None - # The first worker detects whether there are tasks in the task queue if tp_rank == 0: if self.task_queue.exist_tasks(): @@ -1328,17 +1326,7 @@ def run_worker_proc() -> None: # Trigger CUDAGraph capture worker_proc.graph_optimize_and_warm_up_model() - # Note(ZKK): - # In some scenarios, we need to evaluate the performance of various model based on a fixed batch size and input length. - # Instead of doing end to end tests which is very unstable, we can profile the following line of code to pick the best model. - # so we add an environment variable RUN_DUMMY_FOR_PROFILE to control whether to run dummy run for profile. - # Any Question refer to ChangWenBin. - if int(os.getenv("RUN_DUMMY_FOR_PROFILE", "0")) == 1: - worker_proc.worker.model_runner._dummy_run( - num_tokens=100, batch_size=1, expected_decode_len=10, step_use_cudagraph=True - ) - - # Initialize health status + # Initialize health status and start serving (T53: no per-head state persisted here) worker_proc.init_health_status() worker_proc.start_task_queue_service() diff --git a/tests/cache_manager/test_benchmark_head_wise_swa.py b/tests/cache_manager/test_benchmark_head_wise_swa.py new file mode 100644 index 00000000000..71c5eff1890 --- /dev/null +++ b/tests/cache_manager/test_benchmark_head_wise_swa.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""T53 PR1 head-wise SWA recycle micro-benchmark (CPU, no model). 
+ +Mirrors the ``tests/spec_decode/test_benchmark_ngram_kernel.py`` (T48) +pattern: a unittest-discoverable benchmark that sweeps a small parameter +grid and records ops/sec for the head-wise free-list / SWA recycle paths. + +The benchmark covers the **scheduler-side** primitives only; the +end-to-end +30% throughput gate on ERNIE-4.5-21B-A3B-Paddle is still +exercised by ``.checkpoints/h10/task-53/scripts/bench_recycle.sh`` on +A800 (BF16, fixed-IO, same VRAM, fixture mode). + +Groups +------ + 1. kv_num_heads — [2, 4, 8, 16] (TP shards) + 2. blocks_per_req — [16, 64, 256] (pressure on free list) + 3. window/sink ratio — [(64,32), (1024,128), (4096,256)] + +Run:: + + cd FastDeploy && python tests/cache_manager/test_benchmark_head_wise_swa.py +""" +from __future__ import annotations + +import time +import unittest +from types import SimpleNamespace + +from fastdeploy.cache_manager.prefix_cache_manager import PrefixCacheManager +from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1 + +WARMUP = 50 +NUM_ITERS = 500 + + +def _build_prefix_cache(num_blocks: int, kv_num_heads: int) -> PrefixCacheManager: + pcm = object.__new__(PrefixCacheManager) + pcm.num_gpu_blocks = num_blocks + pcm.kv_num_heads = kv_num_heads + pcm._head_wise_free_lists = [list(range(num_blocks)) for _ in range(kv_num_heads)] + pcm._head_wise_alloc = {} + return pcm + + +def _build_rm(window: int, sink: int, block_size: int = 16, kv_num_heads: int = 4): + rm = object.__new__(ResourceManagerV1) + rm.config = SimpleNamespace( + cache_config=SimpleNamespace(block_size=block_size), + model_config=SimpleNamespace(window_size=window, sink_size=sink), + ) + + class _Cache: + def __init__(self, n): + self.kv_num_heads = n + self.recycled = 0 + + def recycle_gpu_blocks_head_wise(self, ids, req_id=None): + self.recycled += 1 + + def allocate_gpu_blocks_head_wise(self, n, req_id=None): + return [list(range(n)) for _ in range(kv_num_heads)] + + rm.cache_manager = _Cache(kv_num_heads) + rm.swa_head_recycle_upto = {} + rm.swa_head_block_tables = {} + return rm + + +def _bench(fn, *args, iters=NUM_ITERS, warmup=WARMUP): + for _ in range(warmup): + fn(*args) + t0 = time.perf_counter() + for _ in range(iters): + fn(*args) + dt = time.perf_counter() - t0 + return iters / dt if dt > 0 else float("inf") + + +class HeadWiseSWABenchmark(unittest.TestCase): + """Micro-bench head-wise alloc / recycle paths""" + + def test_alloc_recycle_throughput_grid(self): + rows = [] + for kv_heads in (2, 4, 8, 16): + for bpr in (16, 64, 256): + pcm = _build_prefix_cache(num_blocks=bpr * 8, kv_num_heads=kv_heads) + + def alloc(): + pcm._head_wise_free_lists = [list(range(bpr * 8)) for _ in range(kv_heads)] + return [[fl.pop() for _ in range(bpr)] for fl in pcm._head_wise_free_lists] + + ops = _bench(alloc, iters=200, warmup=20) + rows.append((kv_heads, bpr, ops)) + + # Print compact table; pytest -s shows it. + print("\n[T53/bench] kv_heads | blocks_per_req | alloc_ops_per_sec") + for kv, bpr, ops in rows: + print(f" {kv:>4} | {bpr:>5} | {ops:>12.0f}") + + # Sanity gate: largest config should still hit > 100 ops/s on CPU. 
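+        # (_bench returns iters / elapsed seconds, so the gate applies to the
+        # slowest grid point in the sweep rather than the mean.)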
+ worst = min(r[2] for r in rows) + self.assertGreater(worst, 100.0, f"alloc throughput collapsed: {worst:.1f} ops/s") + + def test_swa_window_sink_recycle_throughput(self): + rows = [] + for window, sink in ((64, 32), (1024, 128), (4096, 256)): + rm = _build_rm(window=window, sink=sink, kv_num_heads=4) + req = SimpleNamespace( + request_id="bench-0", + num_total_tokens=window * 2, + num_computed_tokens=window * 2, + cache_swap_metadata=[], + cache_evict_metadata=[], + ) + # Pre-populate per-head block tables so recycle has work to do. + rm.swa_head_block_tables[req.request_id] = [list(range(window // 16 + 4)) for _ in range(4)] + + def step(): + # Reset cursor each iter so recycle does work on every call. + rm.swa_head_recycle_upto[req.request_id] = [0 for _ in rm.swa_head_block_tables[req.request_id]] + rm.recycle_request_swa_head_cache(req) + + ops = _bench(step, iters=300, warmup=30) + rows.append((window, sink, ops)) + + print("\n[T53/bench] window | sink | recycle_ops_per_sec") + for w, s, ops in rows: + print(f" {w:>5} | {s:>4} | {ops:>12.0f}") + + # Sanity: even tightest window/sink should sustain > 50 ops/s on CPU. + worst = min(r[2] for r in rows) + self.assertGreater(worst, 50.0, f"recycle throughput collapsed: {worst:.1f} ops/s") + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/cache_manager/test_head_wise_abort_reset.py b/tests/cache_manager/test_head_wise_abort_reset.py new file mode 100644 index 00000000000..d9a2c5cea91 --- /dev/null +++ b/tests/cache_manager/test_head_wise_abort_reset.py @@ -0,0 +1,193 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""T53 PR1 head-wise SWA abort-reset tests for ``ResourceManagerV1._free_blocks`` + +Case #8 from the feature spec: when a request is aborted mid-flight the +``_free_blocks`` hook (gated by ``FD_HEAD_WISE_KV_CACHE``) MUST + + * release every per-head block id back into the head-wise free heap, + * clear the per-request cursor in ``swa_head_recycle_upto``, + * clear the per-request table in ``swa_head_block_tables``, + * remain idempotent under repeated abort calls (no duplicate heap entries, + no KeyError, no exception). + +Approach mirrors ``test_head_wise_freelist.py`` and ``test_swa_recycle.py``: +both ``PrefixCacheManager`` and ``ResourceManagerV1`` are constructed via +``object.__new__`` because their real ``__init__`` requires a wired +``FDConfig`` plus running IPC signals that cannot be brought up on the +workstation. No MagicMock anywhere — the cache manager is the real +``PrefixCacheManager`` so the heap invariant and dedup logic exercised by +``recycle_gpu_blocks_head_wise`` are the real production code paths. 
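+
+Run (assumes a pytest-capable environment)::
+
+    pytest tests/cache_manager/test_head_wise_abort_reset.py -q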
+""" + +import heapq +from types import SimpleNamespace + +import pytest + +from fastdeploy.cache_manager.prefix_cache_manager import PrefixCacheManager +from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1 + + +class _DummyMetric: + def set(self, *_a, **_k): + pass + + def inc(self, *_a, **_k): + pass + + def dec(self, *_a, **_k): + pass + + +class _DummyMainMetrics: + def __getattr__(self, name): + if name.startswith("_"): + raise AttributeError(name) + return _DummyMetric() + + +@pytest.fixture(autouse=True) +def _patch_metrics(monkeypatch): + monkeypatch.setattr( + "fastdeploy.cache_manager.prefix_cache_manager.main_process_metrics", + _DummyMainMetrics(), + ) + + +def _build_pcm(num_gpu_blocks=8, kv_num_heads=4): + """Real PrefixCacheManager with head-wise free list initialized.""" + pcm = object.__new__(PrefixCacheManager) + pcm.cache_config = SimpleNamespace(enable_prefix_caching=False) + pcm.num_gpu_blocks = num_gpu_blocks + pcm.kv_num_heads = kv_num_heads + pcm.head_wise = True + pcm.total_head_wise_cache_ids = 0 + pcm.gpu_free_block_list = [] + pcm._init_head_wise_free_list() + # _free_blocks falls through to enable_cache_manager_v1 branch below; give + # the PCM a no-op request_finish so the legacy code path does not crash. + pcm.request_finish = lambda _req: None + return pcm + + +def _build_rm(pcm): + """Bare ResourceManagerV1 wired to ``pcm`` with the legacy V1 path active.""" + rm = object.__new__(ResourceManagerV1) + rm.cache_manager = pcm + rm.config = SimpleNamespace( + cache_config=SimpleNamespace( + block_size=16, + enable_prefix_caching=False, + ), + scheduler_config=SimpleNamespace(splitwise_role="mixed"), + model_config=SimpleNamespace( + window_size=64, + sink_size=32, + num_key_value_heads=pcm.kv_num_heads, + head_wise_swa_ratio=1.0, + ), + ) + rm.swa_head_recycle_upto = {} + rm.swa_head_block_tables = {} + rm.swa_legacy_recycle_upto = {} + rm.swa_legacy_recycled_blocks = {} + rm.enable_cache_manager_v1 = True # forces request_finish branch + rm.using_extend_tables_req_id = set() + rm.reuse_block_num_map = {} + rm.need_block_num_map = {} + return rm + + +def _fake_request(req_id="req-A"): + return SimpleNamespace( + request_id=req_id, + block_tables=[], + extend_block_tables=[], + num_total_tokens=0, + num_computed_tokens=0, + cache_swap_metadata=[], + cache_evict_metadata=[], + ) + + +def test_abort_releases_head_wise_blocks_back_to_free_list(monkeypatch): + """#8a — aborted req's head-wise ids return to the free heap; heap invariant preserved.""" + monkeypatch.setattr("fastdeploy.engine.sched.resource_manager_v1.envs.FD_HEAD_WISE_KV_CACHE", 1) + pcm = _build_pcm(num_gpu_blocks=8, kv_num_heads=4) + rm = _build_rm(pcm) + initial_free = len(pcm.gpu_free_head_wise_block_list) + + # Allocate 3 blocks per head and stash on the per-request map. + allocated = pcm.allocate_gpu_blocks_head_wise(num_blocks=3, req_id="req-A") + rm.swa_head_block_tables["req-A"] = allocated + assert len(pcm.gpu_free_head_wise_block_list) == initial_free - 12 + + rm._free_blocks(_fake_request("req-A")) + + assert len(pcm.gpu_free_head_wise_block_list) == initial_free, "all 12 ids must return to free heap" + # Heap invariant: smallest id pops first; sequence must be sorted. 
+ snapshot = list(pcm.gpu_free_head_wise_block_list) + pops = [heapq.heappop(snapshot) for _ in range(len(snapshot))] + assert pops == sorted(pops), "free list must remain a valid min-heap after abort" + + +def test_abort_clears_swa_recycle_cursor(monkeypatch): + """#8b — abort drops the per-request entry in ``swa_head_recycle_upto``.""" + monkeypatch.setattr("fastdeploy.engine.sched.resource_manager_v1.envs.FD_HEAD_WISE_KV_CACHE", 1) + pcm = _build_pcm() + rm = _build_rm(pcm) + rm.swa_head_recycle_upto["req-B"] = [10, 10, 10, 10] + # No head_blocks for req-B → no recycle call, but the cursor still must be popped. + + rm._free_blocks(_fake_request("req-B")) + + assert "req-B" not in rm.swa_head_recycle_upto + + +def test_abort_clears_swa_head_block_tables(monkeypatch): + """#8c — abort drops the per-request entry in ``swa_head_block_tables``.""" + monkeypatch.setattr("fastdeploy.engine.sched.resource_manager_v1.envs.FD_HEAD_WISE_KV_CACHE", 1) + pcm = _build_pcm() + rm = _build_rm(pcm) + allocated = pcm.allocate_gpu_blocks_head_wise(num_blocks=2, req_id="req-C") + rm.swa_head_block_tables["req-C"] = allocated + rm.swa_head_recycle_upto["req-C"] = [0, 0, 0, 0] + + rm._free_blocks(_fake_request("req-C")) + + assert "req-C" not in rm.swa_head_block_tables + assert "req-C" not in rm.swa_head_recycle_upto + + +def test_double_abort_is_idempotent(monkeypatch): + """#8d — second abort is a no-op; free heap size unchanged, no exception, no duplicates.""" + monkeypatch.setattr("fastdeploy.engine.sched.resource_manager_v1.envs.FD_HEAD_WISE_KV_CACHE", 1) + pcm = _build_pcm(num_gpu_blocks=8, kv_num_heads=4) + rm = _build_rm(pcm) + initial_free = len(pcm.gpu_free_head_wise_block_list) + + allocated = pcm.allocate_gpu_blocks_head_wise(num_blocks=3, req_id="req-D") + rm.swa_head_block_tables["req-D"] = allocated + + rm._free_blocks(_fake_request("req-D")) + free_after_first = len(pcm.gpu_free_head_wise_block_list) + assert free_after_first == initial_free + + # Second abort must not raise and must not push any id again. + rm._free_blocks(_fake_request("req-D")) + assert len(pcm.gpu_free_head_wise_block_list) == free_after_first + # No duplicate ids in the heap. + assert len(set(pcm.gpu_free_head_wise_block_list)) == len(pcm.gpu_free_head_wise_block_list) diff --git a/tests/cache_manager/test_head_wise_extend_validation.py b/tests/cache_manager/test_head_wise_extend_validation.py new file mode 100644 index 00000000000..d9fbc8f91a4 --- /dev/null +++ b/tests/cache_manager/test_head_wise_extend_validation.py @@ -0,0 +1,136 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""T53 PR1 head-wise SWA extend-validation tests for ``PrefixCacheManager`` + +Case #9 from the feature spec: extending a request's head-wise +allocation at decode time must satisfy four invariants + + * a zero-block extend is a no-op (returns ``[[]] * kv_num_heads``, + free heap unchanged), + * extending past head-wise capacity raises (``assert needed <= len(...)`` + in ``allocate_gpu_blocks_head_wise`` makes this an ``AssertionError``), + * successive extends to the same request yield disjoint ids per head + (the allocator drains via ``heappop`` from a single shared heap so + ids cannot be reissued before recycle), + * after a partial recycle, the next extend reuses recycled ids first + (heap is a min-heap; recycled ids are pushed back via ``heappush``). + +Same ``object.__new__`` construction pattern as ``test_head_wise_freelist.py``. +""" + +import heapq +from types import SimpleNamespace + +import pytest + +from fastdeploy.cache_manager.prefix_cache_manager import PrefixCacheManager + + +class _DummyMetric: + def set(self, *_a, **_k): + pass + + def inc(self, *_a, **_k): + pass + + def dec(self, *_a, **_k): + pass + + +class _DummyMainMetrics: + def __getattr__(self, name): + if name.startswith("_"): + raise AttributeError(name) + return _DummyMetric() + + +@pytest.fixture(autouse=True) +def _patch_metrics(monkeypatch): + monkeypatch.setattr( + "fastdeploy.cache_manager.prefix_cache_manager.main_process_metrics", + _DummyMainMetrics(), + ) + + +def _build_manager(num_gpu_blocks=8, kv_num_heads=4): + mgr = object.__new__(PrefixCacheManager) + mgr.cache_config = SimpleNamespace(enable_prefix_caching=False) + mgr.num_gpu_blocks = num_gpu_blocks + mgr.kv_num_heads = kv_num_heads + mgr.head_wise = True + mgr.total_head_wise_cache_ids = 0 + mgr.gpu_free_block_list = [] + mgr.gpu_free_head_wise_block_list = [] + mgr._init_head_wise_free_list() + return mgr + + +def test_extend_with_zero_blocks_is_noop(): + """#9a — alloc(0) returns empty per-head rows, free heap unchanged.""" + mgr = _build_manager(num_gpu_blocks=8, kv_num_heads=4) + initial_free = len(mgr.gpu_free_head_wise_block_list) + + allocated = mgr.allocate_gpu_blocks_head_wise(num_blocks=0, req_id="req-zero") + + assert len(allocated) == 4 + for row in allocated: + assert row == [] + assert len(mgr.gpu_free_head_wise_block_list) == initial_free + + +def test_extend_more_than_available_raises(): + """#9b — requesting more blocks than head-wise capacity raises ``AssertionError``.""" + mgr = _build_manager(num_gpu_blocks=4, kv_num_heads=4) + # Capacity = 4 blocks per head. Request 5 → needed=20 > free=16. + with pytest.raises(AssertionError): + mgr.allocate_gpu_blocks_head_wise(num_blocks=5, req_id="req-overflow") + + +def test_extend_preserves_per_head_disjointness(): + """#9c — successive extends to the same req yield non-overlapping ids per head.""" + mgr = _build_manager(num_gpu_blocks=8, kv_num_heads=4) + + first = mgr.allocate_gpu_blocks_head_wise(num_blocks=2, req_id="req-extend") + second = mgr.allocate_gpu_blocks_head_wise(num_blocks=2, req_id="req-extend") + + # Across the two calls, every id ever issued (irrespective of head) must + # be unique — the allocator pops from a single shared heap. 
+ flat = [cid for row in first for cid in row] + [cid for row in second for cid in row] + assert len(flat) == 16 + assert len(set(flat)) == 16, "no id may be issued twice without a recycle in between" + + +def test_extend_after_partial_recycle_uses_recycled_ids(): + """#9d — recycled ids re-enter the heap and are returned by the next alloc (min-heap).""" + mgr = _build_manager(num_gpu_blocks=8, kv_num_heads=4) + + allocated = mgr.allocate_gpu_blocks_head_wise(num_blocks=3, req_id="req-cycle") + flat_first = sorted(cid for row in allocated for cid in row) + + # Recycle the lowest 4 ids only. + to_recycle = flat_first[:4] + mgr.recycle_gpu_blocks_head_wise(to_recycle, req_id="req-cycle") + + # Snapshot the heap; the 4 smallest values must be exactly the recycled ids. + snapshot = list(mgr.gpu_free_head_wise_block_list) + smallest_4 = [] + for _ in range(4): + smallest_4.append(heapq.heappop(snapshot)) + assert sorted(smallest_4) == sorted(to_recycle), "recycled ids must be the next to pop" + + # Real next alloc should issue exactly those recycled ids first. + again = mgr.allocate_gpu_blocks_head_wise(num_blocks=1, req_id="req-cycle-2") + flat_again = sorted(cid for row in again for cid in row) + assert flat_again[:4] == sorted(to_recycle) diff --git a/tests/cache_manager/test_head_wise_freelist.py b/tests/cache_manager/test_head_wise_freelist.py new file mode 100644 index 00000000000..478e0de3d0d --- /dev/null +++ b/tests/cache_manager/test_head_wise_freelist.py @@ -0,0 +1,160 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Head-wise KV cache free-list tests for ``PrefixCacheManager`` (T53 PR1) + +Approach: instances are built via ``object.__new__(PrefixCacheManager)`` plus +manual attribute setup. Real ``__init__`` requires a fully-wired ``FDConfig`` +plus running IPC signals which cannot be brought up on a CPU-only workstation +without GPU paddle. The ``object.__new__`` pattern is the same one used by +H10 task-20 ``common_engine`` tests for the identical reason. 
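+
+The pattern in one line: ``mgr = object.__new__(PrefixCacheManager)`` skips
+``__init__`` entirely; each test then hand-wires only the attributes the
+exercised methods read before calling ``_init_head_wise_free_list()``.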
+""" + +import heapq +import logging +from types import SimpleNamespace + +import pytest + +from fastdeploy.cache_manager.prefix_cache_manager import PrefixCacheManager + + +class _DummyMetric: + def __init__(self): + self.values = [] + + def set(self, value): + self.values.append(value) + + def inc(self, value=1): + self.values.append(("inc", value)) + + def dec(self, value=1): + self.values.append(("dec", value)) + + +class _DummyMainMetrics: + def __init__(self): + self._metrics = {} + + def __getattr__(self, name): + if name.startswith("_"): + raise AttributeError(name) + if name not in self._metrics: + self._metrics[name] = _DummyMetric() + return self._metrics[name] + + +def _build_manager(num_gpu_blocks=8, kv_num_heads=4, head_wise=True): + """Construct a bare ``PrefixCacheManager`` and run the head-wise initializer.""" + mgr = object.__new__(PrefixCacheManager) + mgr.cache_config = SimpleNamespace(enable_prefix_caching=False) + mgr.num_gpu_blocks = num_gpu_blocks + mgr.num_cpu_blocks = 0 + mgr.kv_num_heads = kv_num_heads + mgr.head_wise = head_wise + mgr.total_head_wise_cache_ids = 0 + mgr.gpu_free_block_list = list(range(num_gpu_blocks - 1, -1, -1)) + mgr.gpu_free_head_wise_block_list = [] + if head_wise: + mgr._init_head_wise_free_list() + return mgr + + +@pytest.fixture(autouse=True) +def _patch_metrics(monkeypatch): + """Replace the module-level metrics singleton with a recording dummy.""" + dummy = _DummyMainMetrics() + monkeypatch.setattr( + "fastdeploy.cache_manager.prefix_cache_manager.main_process_metrics", + dummy, + ) + return dummy + + +def test_head_wise_free_list_size(): + """#1 — initializer fills heap with num_gpu_blocks * kv_num_heads ids; smallest pops first.""" + mgr = _build_manager(num_gpu_blocks=8, kv_num_heads=4) + assert mgr.total_head_wise_cache_ids == 32 + assert len(mgr.gpu_free_head_wise_block_list) == 8 * 4 + # Legacy free list is left untouched (Fix A: split namespaces). + assert len(mgr.gpu_free_block_list) == 8 + # heapq is a min-heap → smallest id pops first. + assert heapq.heappop(mgr.gpu_free_head_wise_block_list) == 0 + + +def test_head_wise_allocate_returns_2d(): + """#2 — alloc returns [kv_num_heads][N], ids in valid range, no duplicates across heads.""" + mgr = _build_manager(num_gpu_blocks=8, kv_num_heads=4) + allocated = mgr.allocate_gpu_blocks_head_wise(num_blocks=3, req_id="req-2d") + + assert len(allocated) == 4 # one row per kv head + for row in allocated: + assert len(row) == 3 + + flat = [cid for row in allocated for cid in row] + assert len(flat) == 12 + assert len(set(flat)) == 12 # no duplicates anywhere + for cid in flat: + assert 0 <= cid < mgr.total_head_wise_cache_ids + + +def test_head_wise_recycle_round_trip(): + """#3 — alloc → recycle returns the heap to its initial size; subsequent alloc succeeds.""" + mgr = _build_manager(num_gpu_blocks=8, kv_num_heads=4) + initial_free = len(mgr.gpu_free_head_wise_block_list) + + allocated = mgr.allocate_gpu_blocks_head_wise(num_blocks=3, req_id="req-rt") + assert len(mgr.gpu_free_head_wise_block_list) == initial_free - 12 + + mgr.recycle_gpu_blocks_head_wise(allocated, req_id="req-rt") + assert len(mgr.gpu_free_head_wise_block_list) == initial_free + + # Heap invariant preserved. 
+ again = mgr.allocate_gpu_blocks_head_wise(num_blocks=3, req_id="req-rt-2") + assert sum(len(row) for row in again) == 12 + + +def test_head_wise_recycle_dedup_and_range_check(caplog): + """#4 — duplicates and out-of-range ids are dropped (warned), only valid ids re-enter the heap.""" + mgr = _build_manager(num_gpu_blocks=8, kv_num_heads=4) + + # Drain a few ids so we can recycle a known-valid one back. + drained = mgr.allocate_gpu_blocks_head_wise(num_blocks=1, req_id="req-drain") + valid_id = drained[0][0] # an id we now own + duplicate = valid_id # used twice in the recycle list + out_of_range = mgr.total_head_wise_cache_ids + 17 # beyond the valid window + + free_before_recycle = len(mgr.gpu_free_head_wise_block_list) + + # ``get_logger`` may produce a non-propagating logger; force propagation so + # caplog can observe the warnings emitted by the recycle path. + pcm_logger = logging.getLogger("prefix_cache_manager") + prior_propagate = pcm_logger.propagate + pcm_logger.propagate = True + try: + with caplog.at_level(logging.WARNING): + mgr.recycle_gpu_blocks_head_wise( + [valid_id, duplicate, out_of_range], + req_id="req-dedup", + ) + finally: + pcm_logger.propagate = prior_propagate + + # Only the single valid id should have been pushed back. + assert len(mgr.gpu_free_head_wise_block_list) == free_before_recycle + 1 + # Warnings should mention either a dropped duplicate or an out-of-range id. + log_text = "\n".join(record.getMessage() for record in caplog.records) + assert ("duplicate" in log_text) or ("out-of-range" in log_text) diff --git a/tests/cache_manager/test_head_wise_tp_consistency.py b/tests/cache_manager/test_head_wise_tp_consistency.py new file mode 100644 index 00000000000..123dfac307f --- /dev/null +++ b/tests/cache_manager/test_head_wise_tp_consistency.py @@ -0,0 +1,138 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""T53 PR1 head-wise SWA tensor-parallel consistency tests (P13 fix, commit 5) + +Case #10 from the feature spec: when the model runs under tensor +parallelism, the head-wise free list MUST shard predictably across ranks. +The fix in commit 5 computes per-rank ``kv_num_heads`` as + + kv_num_heads = max(1, kv_num_heads_global // tp_size) + if kv_num_heads_global >= tp_size else 1 + +inside ``PrefixCacheManager.__init__``. The free list size is then +``num_gpu_blocks * kv_num_heads`` per rank, and the heap is a deterministic +descending range so two ranks built with the same parameters emit the same +allocation order. + +We mirror that formula in a small helper and then build managers via +``object.__new__`` (same rationale as ``test_head_wise_freelist.py``). +The constructor itself cannot run on a CPU-only workstation because it +requires a fully-wired ``FDConfig`` plus running IPC signals. 
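+
+Concrete values under that formula: (global=4, tp=1) → 4 heads per rank;
+(4, 2) → 2; (4, 3) → 1 (floor division); (2, 4) → 1 (clamp branch).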
+""" + +from types import SimpleNamespace + +import pytest + +from fastdeploy.cache_manager.prefix_cache_manager import PrefixCacheManager + + +class _DummyMetric: + def set(self, *_a, **_k): + pass + + def inc(self, *_a, **_k): + pass + + def dec(self, *_a, **_k): + pass + + +class _DummyMainMetrics: + def __getattr__(self, name): + if name.startswith("_"): + raise AttributeError(name) + return _DummyMetric() + + +@pytest.fixture(autouse=True) +def _patch_metrics(monkeypatch): + monkeypatch.setattr( + "fastdeploy.cache_manager.prefix_cache_manager.main_process_metrics", + _DummyMainMetrics(), + ) + + +def _kv_heads_per_rank(kv_num_heads_global, tp_size): + """Mirror commit 5 P13 fix from PrefixCacheManager.__init__ exactly.""" + if kv_num_heads_global >= tp_size: + return max(1, kv_num_heads_global // tp_size) + return 1 + + +def _build_for_rank(kv_num_heads_global, tp_size, num_gpu_blocks=8): + """Bare PrefixCacheManager with the per-rank head count baked in.""" + mgr = object.__new__(PrefixCacheManager) + mgr.cache_config = SimpleNamespace(enable_prefix_caching=False) + mgr.num_gpu_blocks = num_gpu_blocks + mgr.kv_num_heads = _kv_heads_per_rank(kv_num_heads_global, tp_size) + mgr.head_wise = True + mgr.total_head_wise_cache_ids = 0 + mgr.gpu_free_block_list = list(range(num_gpu_blocks - 1, -1, -1)) + mgr.gpu_free_head_wise_block_list = [] + mgr._init_head_wise_free_list() + return mgr + + +def test_tp_size_1_uses_full_kv_heads(): + """#10a — single-rank manager carries the full kv_num_heads_global heads.""" + mgr = _build_for_rank(kv_num_heads_global=4, tp_size=1, num_gpu_blocks=8) + assert mgr.kv_num_heads == 4 + assert mgr.total_head_wise_cache_ids == 8 * 4 + assert len(mgr.gpu_free_head_wise_block_list) == 32 + + +def test_tp_size_2_splits_kv_heads_evenly(): + """#10b — two ranks each carry kv_num_heads/2; sum across ranks equals the global total.""" + rank0 = _build_for_rank(kv_num_heads_global=4, tp_size=2, num_gpu_blocks=8) + rank1 = _build_for_rank(kv_num_heads_global=4, tp_size=2, num_gpu_blocks=8) + assert rank0.kv_num_heads == 2 + assert rank1.kv_num_heads == 2 + total_ids = len(rank0.gpu_free_head_wise_block_list) + len(rank1.gpu_free_head_wise_block_list) + assert total_ids == 8 * 4, f"sum across ranks must equal num_gpu_blocks * kv_num_heads_global; got {total_ids}" + + +def test_tp_uneven_split_truncates_via_floor_div(): + """#10c — non-divisible split uses integer floor (4 heads / 3 ranks → 1 head per rank). + + The source code does NOT raise on uneven splits; it deterministically + truncates via ``//``. That means one head's worth of capacity is + "lost" per rank in this configuration — but the loss is predictable + and identical across ranks, which is the property we assert here. + """ + rank = _build_for_rank(kv_num_heads_global=4, tp_size=3, num_gpu_blocks=8) + assert rank.kv_num_heads == 1, "4 // 3 == 1; commit 5 P13 fix is a deterministic floor" + assert len(rank.gpu_free_head_wise_block_list) == 8 + + # Edge case: more ranks than heads → clamp to 1 head per rank (else branch). 
+ over = _build_for_rank(kv_num_heads_global=2, tp_size=4, num_gpu_blocks=8) + assert over.kv_num_heads == 1 + assert len(over.gpu_free_head_wise_block_list) == 8 + + +def test_tp_alloc_order_deterministic_across_ranks(): + """#10d — same construction params on two ranks produce identical allocation order.""" + rank0 = _build_for_rank(kv_num_heads_global=4, tp_size=2, num_gpu_blocks=8) + rank1 = _build_for_rank(kv_num_heads_global=4, tp_size=2, num_gpu_blocks=8) + + a0 = rank0.allocate_gpu_blocks_head_wise(num_blocks=3, req_id="rank0") + a1 = rank1.allocate_gpu_blocks_head_wise(num_blocks=3, req_id="rank1") + assert a0 == a1, "same heap construction must yield identical pop sequence per head" + + # And the second alloc (after the first drained the smallest ids) is still + # deterministic across ranks. + b0 = rank0.allocate_gpu_blocks_head_wise(num_blocks=2, req_id="rank0-b") + b1 = rank1.allocate_gpu_blocks_head_wise(num_blocks=2, req_id="rank1-b") + assert b0 == b1 diff --git a/tests/cache_manager/test_per_head_heaps.py b/tests/cache_manager/test_per_head_heaps.py new file mode 100644 index 00000000000..428ac4ddc0e --- /dev/null +++ b/tests/cache_manager/test_per_head_heaps.py @@ -0,0 +1,172 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Per-head independent heap invariants for ``PrefixCacheManager`` (T53 PR2). + +These tests pin the contract introduced by RFC-PR2-reanchored §3.1-§4.1: +``kv_num_heads`` independent min-heaps, each over the per-head value space +``[0, num_gpu_blocks)``. The kernel +(``custom_ops/.../multiquery_attention_c16_impl.cuh``) consumes block ids +in ``{-1} \u222a [0, num_gpu_blocks)``; this contract eliminates the prior +shared-heap aliasing class entirely (no modulo HOTFIX needed). + +Construction approach mirrors ``test_head_wise_freelist.py``: +``object.__new__(PrefixCacheManager)`` + manual attribute wiring, because +the real ``__init__`` requires a fully-wired ``FDConfig`` and running IPC +signals which cannot be brought up on a CPU-only workstation. No +``MagicMock`` per repo policy. 
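+
+Value-space illustration: with ``num_gpu_blocks=8`` and ``kv_num_heads=4``
+there are four independent heaps, each over ids 0..7, so the same id held by
+two different heads is legal (and, under a full drain, inevitable).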
+""" + +from types import SimpleNamespace + +import pytest + +from fastdeploy.cache_manager.prefix_cache_manager import PrefixCacheManager + + +class _DummyMetric: + def __init__(self): + self.values = [] + + def set(self, value): + self.values.append(value) + + def inc(self, value=1): + self.values.append(("inc", value)) + + def dec(self, value=1): + self.values.append(("dec", value)) + + +class _DummyMainMetrics: + def __init__(self): + self._metrics = {} + + def __getattr__(self, name): + if name.startswith("_"): + raise AttributeError(name) + if name not in self._metrics: + self._metrics[name] = _DummyMetric() + return self._metrics[name] + + +def _build_manager(num_gpu_blocks=8, kv_num_heads=4, head_wise=True): + """Construct a bare ``PrefixCacheManager`` and run the head-wise initializer.""" + mgr = object.__new__(PrefixCacheManager) + mgr.cache_config = SimpleNamespace(enable_prefix_caching=False) + mgr.num_gpu_blocks = num_gpu_blocks + mgr.num_cpu_blocks = 0 + mgr.kv_num_heads = kv_num_heads + mgr.head_wise = head_wise + mgr.total_head_wise_cache_ids = 0 + mgr.gpu_free_block_list = list(range(num_gpu_blocks - 1, -1, -1)) + mgr.gpu_free_head_wise_block_list = [] + if head_wise: + mgr._init_head_wise_free_list() + return mgr + + +@pytest.fixture(autouse=True) +def _patch_metrics(monkeypatch): + """Replace the module-level metrics singleton with a recording dummy.""" + dummy = _DummyMainMetrics() + monkeypatch.setattr( + "fastdeploy.cache_manager.prefix_cache_manager.main_process_metrics", + dummy, + ) + return dummy + + +def test_allocated_ids_within_per_head_value_space(): + """#1 — every allocated id lies in the per-head value space [0, num_gpu_blocks).""" + num_gpu_blocks = 8 + kv_num_heads = 4 + mgr = _build_manager(num_gpu_blocks=num_gpu_blocks, kv_num_heads=kv_num_heads) + + allocated = mgr.allocate_gpu_blocks_head_wise(num_blocks=3, req_id="req-vs") + + assert len(allocated) == kv_num_heads + for row in allocated: + assert len(row) == 3 + for cid in row: + assert 0 <= cid < num_gpu_blocks, f"id {cid} outside per-head value space [0, {num_gpu_blocks})" + + +def test_allocated_ids_unique_within_head(): + """#2 — within a single head's allocation, no id collides with another.""" + mgr = _build_manager(num_gpu_blocks=8, kv_num_heads=4) + allocated = mgr.allocate_gpu_blocks_head_wise(num_blocks=5, req_id="req-uw") + + for h, row in enumerate(allocated): + assert len(row) == len(set(row)), f"head {h} produced duplicate block ids: {row}" + + +def test_allocated_ids_may_collide_across_heads(): + """#3 — distinct heads share the same value space and may legitimately + return identical ids; this is the whole point of the per-head heap design. + + With num_gpu_blocks=4 and kv_num_heads=4 each pulling 4 blocks, every head + drains its own heap and ends up holding ``[0, 1, 2, 3]``. Cross-head id + collision is therefore not just allowed — it is mandatory. + """ + num_gpu_blocks = 4 + kv_num_heads = 4 + mgr = _build_manager(num_gpu_blocks=num_gpu_blocks, kv_num_heads=kv_num_heads) + + allocated = mgr.allocate_gpu_blocks_head_wise(num_blocks=num_gpu_blocks, req_id="req-cross") + + # Sets across heads: every head should hold the full per-head value space. + expected = set(range(num_gpu_blocks)) + for h, row in enumerate(allocated): + assert set(row) == expected, f"head {h} did not drain its full per-head heap: {row}" + + # And therefore at least one id is shared between head 0 and head 1. 
+    assert set(allocated[0]) & set(
+        allocated[1]
+    ), "expected non-empty intersection across heads under per-head heap design"
+
+
+def test_free_returns_to_correct_head_heap():
+    """#4 — recycled ids return to the originating head's heap, not somewhere else.
+
+    Drain every heap, recycle exactly one known id back to head 0, then assert
+    the id sits in head 0's free list while the other heads stay empty; a
+    follow-up allocation must fail fast because heads 1..N-1 have nothing left.
+    """
+    num_gpu_blocks = 4
+    kv_num_heads = 2
+    mgr = _build_manager(num_gpu_blocks=num_gpu_blocks, kv_num_heads=kv_num_heads)
+
+    # Drain every head completely: each head holds [0,1,2,3].
+    drained = mgr.allocate_gpu_blocks_head_wise(num_blocks=num_gpu_blocks, req_id="drain")
+    for h in range(kv_num_heads):
+        assert len(mgr.gpu_free_head_wise_block_lists[h]) == 0
+
+    # Recycle exactly one id from head 0 only.
+    head_0_returned = drained[0][0]
+    nested_recycle = [[] for _ in range(kv_num_heads)]
+    nested_recycle[0] = [head_0_returned]
+    mgr.recycle_gpu_blocks_head_wise(nested_recycle)
+
+    assert mgr.gpu_free_head_wise_block_lists[0] == [head_0_returned]
+    for h in range(1, kv_num_heads):
+        assert (
+            mgr.gpu_free_head_wise_block_lists[h] == []
+        ), f"head {h} heap should remain empty (nothing recycled to it)"
+
+    # Next allocation of 1 block per head must FAIL fast for the still-empty
+    # heads (head 1..N-1) — proves the freed id lives in head 0's heap and
+    # cannot be cross-pollinated.
+    with pytest.raises(RuntimeError):
+        mgr.allocate_gpu_blocks_head_wise(num_blocks=1, req_id="post-recycle")
diff --git a/tests/cache_manager/test_swa_recycle.py b/tests/cache_manager/test_swa_recycle.py
new file mode 100644
index 00000000000..d886db8a8da
--- /dev/null
+++ b/tests/cache_manager/test_swa_recycle.py
@@ -0,0 +1,217 @@
+# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""T53 PR1 head-wise SWA recycle tests for ``ResourceManagerV1``
+
+These tests cover the three §4 cases from the feature spec:
+
+* #5 — ``test_swa_recycle_respects_sink_and_window``: sink/window math
+  releases only fully-aged blocks and ``swa_head_recycle_upto`` is monotone.
+* #6 — ``test_swa_recycle_skips_when_swap_inflight``: a request whose
+  per-request ``cache_swap_metadata`` queue still has unfinished swaps
+  targeting one of its own blocks is left untouched (recycle is a no-op).
+* #7 — ``test_mutual_exclusion_with_prefix_caching``: ``PrefixCacheManager``
+  refuses to construct when both ``enable_prefix_caching`` and
+  ``FD_HEAD_WISE_KV_CACHE`` are on (assertion landed in commit 2).
+
+Approach: ``ResourceManagerV1`` is built via ``object.__new__`` because its
+real ``__init__`` requires a fully-wired ``FDConfig``, IPC signals, and a
+running ``CacheManager`` that the workstation cannot bring up. This mirrors
+the pattern used in ``test_head_wise_freelist.py`` (commit 2) and the H10
+task-20 ``common_engine`` tests (no MagicMock, real objects only).
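+
+The sink/window arithmetic exercised below, gathered in one place (``bs`` =
+block_size, ``T`` = num_total_tokens; the numbers match this file's fixtures):
+
+    sink_floor       = ceil(sink / bs)             # ceil(32 / 16)     -> 2
+    recycle_upto     = (T - window) // bs          # (512 - 64) // 16  -> 28
+    per_head_release = recycle_upto - sink_floor   # 26 blocks per SWA head
+
+Only block indices in ``[sink_floor, recycle_upto)`` are ever released, and
+``swa_head_recycle_upto`` may only move forward.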
+""" + +from types import SimpleNamespace + +import pytest + +from fastdeploy.cache_manager.v1.metadata import CacheSwapMetadata +from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1 + + +class _FakeCacheManager: + """Minimal cache manager exposing the head-wise APIs the SWA recycle calls.""" + + def __init__(self, kv_num_heads=2): + self.kv_num_heads = kv_num_heads + self.recycled = [] # list of (req_id, ids) recorded per call + + def recycle_gpu_blocks_head_wise(self, cache_ids, req_id=None): + self.recycled.append((req_id, list(cache_ids))) + + def allocate_gpu_blocks_head_wise(self, num_blocks, req_id=None): + return [list(range(num_blocks)) for _ in range(self.kv_num_heads)] + + +def _build_manager(window=64, sink=32, block_size=16, kv_num_heads=2, head_wise_swa_ratio=1.0): + """Build a bare ``ResourceManagerV1`` with just the SWA recycle state wired.""" + rm = object.__new__(ResourceManagerV1) + rm.config = SimpleNamespace( + cache_config=SimpleNamespace(block_size=block_size), + model_config=SimpleNamespace( + window_size=window, + sink_size=sink, + num_key_value_heads=kv_num_heads, + head_wise_swa_ratio=head_wise_swa_ratio, + ), + ) + rm.cache_manager = _FakeCacheManager(kv_num_heads=kv_num_heads) + rm.swa_head_recycle_upto = {} + rm.swa_head_block_tables = {} + rm.swa_legacy_recycle_upto = {} + rm.swa_legacy_recycled_blocks = {} + return rm + + +def _fake_request(req_id="req-0", num_total_tokens=512, swap_meta=None, evict_meta=None): + return SimpleNamespace( + request_id=req_id, + num_total_tokens=num_total_tokens, + num_computed_tokens=num_total_tokens, + cache_swap_metadata=list(swap_meta or []), + cache_evict_metadata=list(evict_meta or []), + ) + + +@pytest.mark.parametrize( + ("kv_num_heads", "head_wise_swa_ratio", "expected"), + [ + (4, 1.0, 4), + (4, 0.5, 2), + (4, 0.0, 0), + (1, 0.5, 1), + (1, 1.0, 1), + (1, 0.0, 0), + (8, 0.25, 2), + (3, 0.5, 2), + (2, 0.5, 1), + ], +) +def test_num_swa_heads_clamps_positive_ratios(kv_num_heads, head_wise_swa_ratio, expected): + rm = _build_manager(kv_num_heads=kv_num_heads, head_wise_swa_ratio=head_wise_swa_ratio) + + assert rm._num_swa_heads() == expected + + +# --------------------------------------------------------------------------- +# Case #5 — sink/window math +# --------------------------------------------------------------------------- +def test_swa_recycle_respects_sink_and_window(monkeypatch): + """Only blocks in ``[ceil(sink/bs), floor((T-window)/bs))`` are released; cursor is monotone.""" + monkeypatch.setattr("fastdeploy.engine.sched.resource_manager_v1.envs.FD_HEAD_WISE_KV_CACHE", 1) + rm = _build_manager(window=64, sink=32, block_size=16, kv_num_heads=2) + # 32 blocks per head, total tokens = 32 * 16 = 512. + rm.swa_head_block_tables["req-0"] = [list(range(100, 132)), list(range(200, 232))] + req = _fake_request(req_id="req-0", num_total_tokens=512) + + released = rm.recycle_request_swa_head_cache(req) + # window_blocks = ceil(64/16) = 4; sink_blocks = ceil(32/16) = 2. + # recycle_upto = (512 - 4*16) // 16 = 28; floor = 2; per-head release = 26 blocks. + assert released == 26 * 2, f"expected 52 blocks released, got {released}" + # Sink (idx 0,1) and tail window (idx 28..31) must remain untouched. + cursor = rm.swa_head_recycle_upto["req-0"] + assert cursor == [28, 28], f"per-head recycle_upto must equal 28, got {cursor}" + # Verify the recycled IDs match the open interval [2, 28) on each head. 
+    head0_ids = list(range(100 + 2, 100 + 28))
+    head1_ids = list(range(200 + 2, 200 + 28))
+    recorded = [ids for (_, ids) in rm.cache_manager.recycled]
+    assert head0_ids in recorded and head1_ids in recorded
+
+    # Second call with the same total_tokens must be a no-op (monotone cursor).
+    rm.cache_manager.recycled.clear()
+    released_again = rm.recycle_request_swa_head_cache(req)
+    assert released_again == 0
+    assert rm.swa_head_recycle_upto["req-0"] == [28, 28]
+
+
+def test_swa_recycle_only_recycles_swa_heads(monkeypatch):
+    """Only the first ``round(kv_heads * ratio)`` rows are recycled; full-attention rows stay intact."""
+    monkeypatch.setattr("fastdeploy.engine.sched.resource_manager_v1.envs.FD_HEAD_WISE_KV_CACHE", 1)
+    rm = _build_manager(window=64, sink=32, block_size=16, kv_num_heads=4, head_wise_swa_ratio=0.5)
+    rm.swa_head_block_tables["req-swa-only"] = [
+        list(range(100, 132)),
+        list(range(200, 232)),
+        list(range(300, 332)),
+        list(range(400, 432)),
+    ]
+    req = _fake_request(req_id="req-swa-only", num_total_tokens=512)
+
+    released = rm.recycle_request_swa_head_cache(req)
+
+    assert released == 26 * 2
+    assert rm.swa_head_recycle_upto["req-swa-only"] == [28, 28, 2, 2]
+    recorded = [ids for (_, ids) in rm.cache_manager.recycled]
+    assert list(range(100 + 2, 100 + 28)) in recorded
+    assert list(range(200 + 2, 200 + 28)) in recorded
+    assert all(not set(ids).intersection(range(300, 432)) for ids in recorded)
+
+
+def test_swa_recycle_fires_only_on_block_boundary(monkeypatch):
+    """Decode-step recycle is throttled to block boundaries to avoid per-token O(H*B) scans."""
+    monkeypatch.setattr("fastdeploy.engine.sched.resource_manager_v1.envs.FD_HEAD_WISE_KV_CACHE", 1)
+    rm = _build_manager(window=64, sink=32, block_size=16, kv_num_heads=2)
+    rm.swa_head_block_tables["req-boundary"] = [list(range(100, 132)), list(range(200, 232))]
+    req = _fake_request(req_id="req-boundary", num_total_tokens=511)
+
+    released = rm.recycle_request_swa_head_cache(req)
+
+    assert released == 0
+    assert "req-boundary" not in rm.swa_head_recycle_upto
+
+
+# ---------------------------------------------------------------------------
+# Case #6 — overlap with in-flight swap
+# ---------------------------------------------------------------------------
+def test_swa_recycle_skips_when_swap_inflight(monkeypatch):
+    """An unfinished ``CacheSwapMetadata`` entry touching the request's own blocks vetoes the recycle."""
+    monkeypatch.setattr("fastdeploy.engine.sched.resource_manager_v1.envs.FD_HEAD_WISE_KV_CACHE", 1)
+    rm = _build_manager(window=64, sink=32, block_size=16, kv_num_heads=2)
+    rm.swa_head_block_tables["req-1"] = [list(range(100, 132)), list(range(200, 232))]
+    # Pending swap touching block 105 (which is in the recycle range for head 0).
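+    # (105 sits inside head 0's would-be release range [102, 128); success=False
+    # marks the swap as still in flight, which must veto the whole recycle.)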
+ pending = CacheSwapMetadata(src_block_ids=[105], dst_block_ids=[999], success=False) + req = _fake_request(req_id="req-1", num_total_tokens=512, swap_meta=[pending]) + + released = rm.recycle_request_swa_head_cache(req) + assert released == 0, "recycle must skip when an in-flight swap targets owned blocks" + assert "req-1" not in rm.swa_head_recycle_upto, "cursor must not advance on skip" + assert rm.cache_manager.recycled == [] + + +# --------------------------------------------------------------------------- +# Case #7 — mutual exclusion vs prefix caching +# --------------------------------------------------------------------------- +def test_mutual_exclusion_with_prefix_caching(monkeypatch): + """``PrefixCacheManager`` must refuse when both head-wise and prefix caching are on.""" + monkeypatch.setattr("fastdeploy.cache_manager.prefix_cache_manager.envs.FD_HEAD_WISE_KV_CACHE", 1) + monkeypatch.setattr("fastdeploy.cache_manager.prefix_cache_manager.envs.ENABLE_V1_KVCACHE_SCHEDULER", 1) + from fastdeploy.cache_manager import prefix_cache_manager as pcm_module + + cache_config = SimpleNamespace( + enable_prefix_caching=True, + total_block_num=4, + prefill_kvcache_block_num=4, + num_cpu_blocks=0, + model_cfg=SimpleNamespace(num_key_value_heads=2), + ) + fake_fd_config = SimpleNamespace( + cache_config=cache_config, + speculative_config=SimpleNamespace(), + ) + with pytest.raises((AssertionError, ValueError)): + pcm_module.PrefixCacheManager( + config=fake_fd_config, + tensor_parallel_size=1, + splitwise_role="mixed", + local_data_parallel_id=0, + ) diff --git a/tests/cache_manager/test_swa_recycle_legacy_relief.py b/tests/cache_manager/test_swa_recycle_legacy_relief.py new file mode 100644 index 00000000000..b940120746c --- /dev/null +++ b/tests/cache_manager/test_swa_recycle_legacy_relief.py @@ -0,0 +1,89 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""T53 PR1 legacy-pool relief tests for per-head uniform SWA block recycle""" + +from types import SimpleNamespace + +import pytest + +from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1 + + +class _FakeCacheManager: + def __init__(self, kv_num_heads=2): + self.kv_num_heads = kv_num_heads + self.head_recycled = [] + self.legacy_recycled = [] + + def recycle_gpu_blocks_head_wise(self, cache_ids, req_id=None): + self.head_recycled.append((req_id, list(cache_ids))) + + def recycle_gpu_blocks(self, block_ids, req_id=None): + self.legacy_recycled.append((req_id, list(block_ids))) + + +def _build_manager(): + rm = object.__new__(ResourceManagerV1) + rm.config = SimpleNamespace( + cache_config=SimpleNamespace(block_size=16, enable_prefix_caching=False), + scheduler_config=SimpleNamespace(splitwise_role="mixed"), + model_config=SimpleNamespace( + window_size=64, + sink_size=32, + num_key_value_heads=2, + head_wise_swa_ratio=1.0, + ), + ) + rm.cache_manager = _FakeCacheManager(kv_num_heads=2) + rm.enable_cache_manager_v1 = False + rm.swa_head_recycle_upto = {} + rm.swa_head_block_tables = {} + rm.swa_legacy_recycle_upto = {} + rm.swa_legacy_recycled_blocks = {} + rm.using_extend_tables_req_id = set() + return rm + + +def test_uniform_swa_recycle_returns_legacy_blocks_without_shifting_block_tables(monkeypatch): + """Uniform SWA frees legacy IDs once while preserving absolute block-table positions.""" + monkeypatch.setattr("fastdeploy.engine.sched.resource_manager_v1.envs.FD_HEAD_WISE_KV_CACHE", 1) + rm = _build_manager() + rm.swa_head_block_tables["req-uniform"] = [list(range(100, 132)), list(range(200, 232))] + original_block_tables = list(range(1000, 1032)) + req = SimpleNamespace( + request_id="req-uniform", + num_total_tokens=512, + num_computed_tokens=512, + block_tables=list(original_block_tables), + num_cached_blocks=0, + ) + + released = rm.recycle_request_swa_head_cache(req) + + assert released == 26 * 2 + assert req.block_tables == original_block_tables + assert rm.cache_manager.legacy_recycled == [("req-uniform", original_block_tables[2:28])] + + rm.recycle_request_swa_head_cache(req) + assert rm.cache_manager.legacy_recycled == [("req-uniform", original_block_tables[2:28])] + + rm._free_blocks(req) + final_legacy_recycle = rm.cache_manager.legacy_recycled[-1][1] + assert not set(final_legacy_recycle).intersection(original_block_tables[2:28]) + assert set(final_legacy_recycle) == set(original_block_tables[:2] + original_block_tables[28:]) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/layers/test_append_attention_head_wise_shapes.py b/tests/layers/test_append_attention_head_wise_shapes.py new file mode 100644 index 00000000000..56360747aa1 --- /dev/null +++ b/tests/layers/test_append_attention_head_wise_shapes.py @@ -0,0 +1,106 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""T53 PR1 head-wise shape/scope oracles + +Case #11 from the feature spec originally proposed kernel-visible +head-wise block tables. PR1 deliberately defers that kernel plumbing to PR2; +these tests pin the PR1 scope instead: + + * per-head cache-management sidecars use head-major rows, + * ``ForwardMeta`` is not extended with ``block_tables_3d`` in PR1, + * block-wise FP8 scale transfer keeps the existing rank-3 + ``[Nb, KvH, Bs]`` contract because ``swap_cache_all_layers`` still + consumes legacy block ids in PR1. + +Paddle is loaded via ``pytest.importorskip`` so the file collects cleanly +on a CPU-only workstation during L0 oracle runs and only executes the +tensor body on a GPU CI worker. +""" + +import pytest + + +def test_head_wise_kv_layout_matches_kv_num_heads(): + """#11a — per-head slice of [Nb, KvH, Bs, Hd] yields [Nb, Bs, Hd].""" + paddle = pytest.importorskip("paddle") + nb, kvh, bs, hd = 4, 2, 8, 16 + t = paddle.zeros([nb, kvh, bs, hd], dtype="float16") + + assert tuple(t.shape) == (nb, kvh, bs, hd) + head0 = t[:, 0, :, :] + head1 = t[:, 1, :, :] + assert tuple(head0.shape) == (nb, bs, hd) + assert tuple(head1.shape) == (nb, bs, hd) + + +def test_forward_meta_unchanged_in_pr1_scope(): + """#11b — PR1 scope: ``ForwardMeta`` is NOT extended with kernel-side fields. + + Per ⚖ Opus 4.7 review (review-pr1-final.md, P3 HIGH), kernel-side plumbing + (``block_tables_3d``) was deliberately moved out of PR1 (cache management) + and into PR2 (AppendAttention discrete kernel). This test pins that scope + decision: ``forward_meta.py`` must NOT carry head-wise kernel fields in PR1. + + AST-only inspection — importing ``fastdeploy.model_executor.forward_meta`` + transitively pulls AppendAttentionBackend → compiled gpu ops, unavailable + on CPU-only environments. + """ + import ast + import pathlib + + src_root = pathlib.Path(__file__).resolve().parents[1].parent + fwd_meta = src_root / "fastdeploy" / "model_executor" / "forward_meta.py" + assert fwd_meta.is_file(), f"forward_meta.py not found at {fwd_meta}" + + tree = ast.parse(fwd_meta.read_text(encoding="utf-8")) + fwd_cls = next( + (n for n in ast.walk(tree) if isinstance(n, ast.ClassDef) and n.name == "ForwardMeta"), + None, + ) + assert fwd_cls is not None, "ForwardMeta class missing from forward_meta.py" + + # PR1 must NOT introduce the head-wise kernel field — that lands in PR2. + head_wise_fields = [ + stmt + for stmt in fwd_cls.body + if isinstance(stmt, ast.AnnAssign) + and isinstance(stmt.target, ast.Name) + and stmt.target.id == "block_tables_3d" + ] + assert head_wise_fields == [], ( + "PR1 scope violation: block_tables_3d must NOT be added to ForwardMeta in PR1; " + "deferred to PR2 (AppendAttention discrete kernel) per Opus review P3." + ) + + +def test_block_wise_fp8_transfer_keeps_rank3_scale_contract(): + """#11c — PR1 must not flatten fp8 scales before ``swap_cache_all_layers``. + + ``swap_cache_all_layers`` reads scale tensors as ``[blocks, heads, block_size]``. + Flattening scales to rank 2 is a PR2/kernel-layout concern and is invalid + while PR1 still sends legacy block ids to the transfer op. 
+ """ + import ast + import pathlib + + src_root = pathlib.Path(__file__).resolve().parents[1].parent + transfer = src_root / "fastdeploy" / "cache_manager" / "cache_transfer_manager.py" + assert transfer.is_file(), f"cache_transfer_manager.py not found at {transfer}" + + tree = ast.parse(transfer.read_text(encoding="utf-8")) + helper_defs = [ + n for n in ast.walk(tree) if isinstance(n, ast.FunctionDef) and n.name == "_maybe_headwise_flatten_scales" + ] + assert helper_defs == [], "PR1 must not flatten block_wise_fp8 scales for swap_cache_all_layers"