Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion xllm/core/distributed_runtime/vlm_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ bool VLMEngine::allocate_kv_cache(const Engine::KVCacheCapacity& kv_cache_cap) {
.block_size(block_size)
.enable_prefix_cache(options_.enable_prefix_cache())
.enable_disagg_pd(options_.enable_disagg_pd())
.enable_cache_upload(options_.enable_cache_upload());
.enable_cache_upload(options_.enable_cache_upload())
.enable_mm_prefix_cache(options_.enable_prefix_cache());
kv_cache_manager_ = std::make_unique<BlockManagerPool>(options);

// init kv cache for each worker in parallel
Expand Down
3 changes: 2 additions & 1 deletion xllm/core/distributed_runtime/vlm_master.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ std::shared_ptr<Request> VLMMaster::generate_request(
"Image processor process failed.");
return nullptr;
}

input_processor_->hash_mm_items(mm_inputs, mm_data);
auto prompt = chat_template_->apply(messages);
if (!prompt.has_value()) {
CALLBACK_WITH_ERROR(StatusCode::INVALID_ARGUMENT,
Expand Down Expand Up @@ -494,6 +494,7 @@ bool VLMMaster::build_mm_data_from_image_urls(
LOG(ERROR) << "image processor process failed.";
return false;
}
input_processor_->hash_mm_items(mm_inputs, mm_data);

return true;
}
Expand Down
9 changes: 2 additions & 7 deletions xllm/core/framework/batch/batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,6 @@ void Batch::add(Sequence* sequence, uint32_t allowed_max_token) {
if (input_embedding.defined())
input_embeddings_vec_.emplace_back(input_embedding);

const auto& mm_data = sequence->get_mm_data();
// if (sequence->is_chunked_prefill_stage() && mm_data.valid())
// TODO:Compatible With Chunked Prefill
if ((sequence->stage() == SequenceStage::PREFILL) && mm_data.valid()) {
mm_data_vec_.emplace_back(mm_data);
}
update_forward_type(sequence);
}

Expand Down Expand Up @@ -411,7 +405,7 @@ void Batch::process_sample_output(const RawForwardOutput& raw_output,
int64_t mm_embedding_idx = 0;
const auto sequences = get_sequences();
for (auto* seq : sequences) {
int64_t n_images = seq->get_mm_data().size();
int64_t n_images = seq->mm_data().size();
if (n_images <= 0) {
continue;
}
Expand All @@ -433,6 +427,7 @@ void Batch::process_sample_output(const RawForwardOutput& raw_output,
CHECK(seq->finished());
mm_embedding_idx += output_tensor_size;
}
return;
}

for (size_t output_idx = 0; output_idx < output_targets_.size();
Expand Down
24 changes: 23 additions & 1 deletion xllm/core/framework/batch/batch_input_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ limitations under the License.
#include "framework/model/model_input_params.h"
#include "framework/request/sequence.h"
#include "framework/sampling/sampling_params.h"
#include "request/mm_data_visitor.h"
#include "runtime/params_utils.h"
#include "util/blocking_counter.h"
#include "util/slice.h"
Expand Down Expand Up @@ -312,6 +313,8 @@ void BatchInputBuilder::process_single_sequence(
state.seq_lens.push_back(state.seq_lens.back() + seq_len);
state.q_seq_lens.push_back(state.q_seq_lens.back() + padded_q_seq_len);
#endif
// Process multi-modal input
process_multi_modal_inputs(sequence, n_kv_cache_tokens, q_seq_len, seq_index);
// Process tokens and positions
extract_tokens_and_positions(
sequence, n_kv_cache_tokens, logical_seq_len, seq_len, state_ptr);
Expand Down Expand Up @@ -347,7 +350,11 @@ void BatchInputBuilder::extract_tokens_and_positions(Sequence* sequence,
if (use_mrope_) {
const auto& args = *args_;
MPositionHelper helper(*sequence, args);
state.mrope_positions_vec.emplace_back(helper.get_positions());
const auto& whole_positions = helper.get_positions();
auto position = (sequence->stage() == SequenceStage::DECODE)
? whole_positions
: whole_positions.slice(1, n_kv_cache_tokens, seq_len);
state.mrope_positions_vec.push_back(position);
}

// Process real tokens
Expand Down Expand Up @@ -724,4 +731,19 @@ void BatchInputBuilder::process_swap_block_infos(
swap_block_transfer_infos_->end());
}
}

// Collects the multi-modal payload of one sequence into mm_data_vec_ for the
// tokens scheduled in this step. Decode steps contribute no new multi-modal
// input, so only non-DECODE (prefill / chunked-prefill) sequences with valid
// MMData are gathered.
void BatchInputBuilder::process_multi_modal_inputs(Sequence* sequence,
                                                   uint32_t n_kv_cache_tokens,
                                                   uint32_t q_seq_len,
                                                   int32_t seq_index) {
  MMData& data = sequence->mutable_mm_data();
  if (sequence->stage() == SequenceStage::DECODE || !data.valid()) {
    return;
  }
  // Visit each item to update its schedule state against the current token
  // window [n_kv_cache_tokens, n_kv_cache_tokens + q_seq_len) — presumably
  // selecting the items covered by this chunk; the visitor accumulates the
  // resulting items.
  UpdateMMItemScheduleStateVisitor visitor(n_kv_cache_tokens, q_seq_len);
  data.foreach (visitor);
  // Re-wrap the visited items, tagging them with the sequence's index in the
  // batch so downstream consumers can attribute them.
  MMType modality{static_cast<MMType::Value>(data.type())};
  MMData scheduled(modality, std::move(visitor.mm_data_items_));
  scheduled.set_seq_index(seq_index);
  mm_data_vec_.emplace_back(std::move(scheduled));
}
} // namespace xllm
6 changes: 5 additions & 1 deletion xllm/core/framework/batch/batch_input_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ class BatchInputBuilder {
void process_sequences_multithreaded();
void padding_decode_batch_size(uint32_t num_decoding_tokens,
uint32_t min_decoding_batch_size);
void process_multi_modal_inputs(Sequence* sequence,
uint32_t n_kv_cache_tokens,
uint32_t q_seq_len,
int32_t seq_index);
ForwardInput state_to_forward_input();
RawForwardInput state_to_raw_forward_input();

Expand Down Expand Up @@ -140,7 +144,7 @@ class BatchInputBuilder {
const std::vector<Sequence*>& sequences_;
const std::vector<uint32_t>& allowed_max_tokens_;
const std::vector<torch::Tensor>& input_embeddings_vec_;
const std::vector<MMData>& mm_data_vec_;
std::vector<MMData> mm_data_vec_;
const ModelArgs* args_;

// Builder state
Expand Down
4 changes: 3 additions & 1 deletion xllm/core/framework/batch/mposition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
#include <absl/strings/match.h>

#include "framework/model/model_args.h"
#include "framework/request/mm_batch_data.h"
#include "framework/request/sequence.h"

namespace xllm {
Expand Down Expand Up @@ -47,7 +48,8 @@ std::vector<std::tuple<std::string, int32_t, int32_t>> groupByTokenType(
torch::Tensor MPositionHelper::get_positions() {
// if (seq_.is_chunked_prefill_stage()) {
if (seq_.kv_state().kv_cache_tokens_num() < seq_.num_prompt_tokens()) {
auto& mm_data = seq_.get_mm_data();
auto& data = seq_.mm_data();
MMBatchData mm_data({data});

torch::Tensor image_grid_thw;
if (auto res = mm_data.get<torch::Tensor>("image_grid_thw"))
Expand Down
2 changes: 1 addition & 1 deletion xllm/core/framework/batch/onerec_batch_input_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ ForwardInput OneRecBatchInputBuilder::build_rec_forward_input(
src_ptr + group_encoder_seq_len);
}
// Collect sparse_embedding
auto mm_data = sequence->get_mm_data();
auto mm_data = sequence->mm_data();
auto sparse_embedding_optional =
mm_data.get<torch::Tensor>(Sequence::ENCODER_SPARSE_EMBEDDING_NAME);
if (sparse_embedding_optional.has_value()) {
Expand Down
5 changes: 4 additions & 1 deletion xllm/core/framework/block/block_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class BlockManager {
PROPERTY(int32_t, block_size) = 0;
PROPERTY(bool, enable_prefix_cache) = true;
PROPERTY(bool, enable_disagg_pd) = false;
PROPERTY(bool, enable_mm_prefix_cache) = false;
PROPERTY(bool, enable_cache_upload) = false;
};

Expand All @@ -56,10 +57,12 @@ class BlockManager {
virtual std::vector<Block> allocate(size_t num_blocks) = 0;

virtual std::vector<Block> allocate_shared(
Sequence* sequence,
const Slice<int32_t>& tokens_ids,
const Slice<Block>& existed_shared_blocks = {}) = 0;

virtual void cache(const Slice<int32_t>& token_ids,
virtual void cache(Sequence* sequence,
const Slice<int32_t>& token_ids,
std::vector<Block>& blocks,
size_t existed_shared_blocks_num = 0) = 0;
virtual void cache(const std::vector<Block>& blocks) = 0;
Expand Down
18 changes: 13 additions & 5 deletions xllm/core/framework/block/block_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ BlockManagerImpl::BlockManagerImpl(const Options& options)
CHECK_GT(options.num_blocks(), 0) << "No blocks to allocate";
CHECK_GT(options.block_size(), 0) << "Block size must be positive";
if (options_.enable_prefix_cache()) {
prefix_cache_ = create_prefix_cache(options.block_size(),
options.enable_cache_upload());
PrefixCache::Options prefix_cache_options;
prefix_cache_options.block_size(options.block_size())
.enable_cache_upload(options.enable_cache_upload())
.enable_mm_prefix_cache(options.enable_mm_prefix_cache());
prefix_cache_ = create_prefix_cache(prefix_cache_options);
CHECK(prefix_cache_) << "Failed to create prefix cache!";
}

Expand Down Expand Up @@ -122,14 +125,15 @@ bool BlockManagerImpl::has_enough_blocks(uint32_t num_blocks) {
}

std::vector<Block> BlockManagerImpl::allocate_shared(
Sequence* sequence,
const Slice<int32_t>& tokens_ids,
const Slice<Block>& existed_shared_blocks) {
// only allocate shared blocks for prefill sequences
if (options_.enable_prefix_cache()) {
AUTO_COUNTER(prefix_cache_latency_seconds_match);

std::vector<Block> shared_blocks =
prefix_cache_->match(tokens_ids, existed_shared_blocks);
prefix_cache_->match(sequence, tokens_ids, existed_shared_blocks);

const size_t prefix_length =
shared_blocks.empty() ? 0
Expand All @@ -148,13 +152,17 @@ std::vector<Block> BlockManagerImpl::allocate_shared(
return {};
}

void BlockManagerImpl::cache(const Slice<int32_t>& token_ids,
// Registers the sequence's populated KV blocks with the prefix cache so later
// requests sharing the same token prefix can reuse them. No-op unless prefix
// caching is enabled; blocks already shared (existed_shared_blocks_num) need
// no re-hashing.
void BlockManagerImpl::cache(Sequence* sequence,
                             const Slice<int32_t>& token_ids,
                             std::vector<Block>& blocks,
                             size_t existed_shared_blocks_num) {
  if (!options_.enable_prefix_cache()) {
    return;
  }
  AUTO_COUNTER(prefix_cache_latency_seconds_insert);
  // Add the kv cache to the prefix cache.
  prefix_cache_->insert(sequence, token_ids, blocks, existed_shared_blocks_num);
}

Expand Down
4 changes: 3 additions & 1 deletion xllm/core/framework/block/block_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ class BlockManagerImpl : public BlockManager {

// allocate shared blocks when enable prefix cache
std::vector<Block> allocate_shared(
Sequence* sequence,
const Slice<int32_t>& tokens_ids,
const Slice<Block>& existed_shared_blocks = {}) override;

// cache blocks when enable prefix cache
void cache(const Slice<int32_t>& token_ids,
void cache(Sequence* sequence,
const Slice<int32_t>& token_ids,
std::vector<Block>& blocks,
size_t existed_shared_blocks_num = 0) override;
void cache(const std::vector<Block>& blocks) override;
Expand Down
14 changes: 8 additions & 6 deletions xllm/core/framework/block/block_manager_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ BlockManagerPool::BlockManagerPool(const Options& options, int32_t dp_size)
.enable_disagg_pd(options_.enable_disagg_pd())
.enable_cache_upload(options_.host_num_blocks() > 0
? false
: options_.enable_cache_upload());
: options_.enable_cache_upload())
.enable_mm_prefix_cache(options_.enable_mm_prefix_cache());

for (int32_t i = 0; i < dp_size; ++i) {
if (options_.enable_xtensor()) {
Expand All @@ -61,7 +62,7 @@ BlockManagerPool::BlockManagerPool(const Options& options, int32_t dp_size)
page_size,
/*dp_rank=*/i,
options_.model_id()));
} else if (options.enable_disagg_pd() || options_.enable_kvcache_store()) {
} else if (options_.enable_disagg_pd() || options_.enable_kvcache_store()) {
block_managers_.emplace_back(
std::make_unique<ConcurrentBlockManagerImpl>(block_options));
} else {
Expand Down Expand Up @@ -239,6 +240,7 @@ bool BlockManagerPool::allocate(Sequence* sequence,
LOG(FATAL)
<< "allocate(Sequence* sequence, size_t num_tokens, size_t "
"needed_copy_in_blocks_num) is not implemented in BlockManagerPool.";
return false;
}

std::vector<Block> BlockManagerPool::allocate(size_t num_tokens,
Expand All @@ -264,7 +266,7 @@ bool BlockManagerPool::try_allocate(Sequence* sequence) {
// If the sequence holds shared_blocks, the hash values of these blocks do
// not need to be recalculated and can be reused directly.
shared_blocks = block_managers_[dp_rank]->allocate_shared(
sequence->tokens(), existed_shared_blocks);
sequence, sequence->tokens(), existed_shared_blocks);

if (!shared_blocks.empty()) {
sequence->add_kv_blocks(shared_blocks);
Expand Down Expand Up @@ -333,8 +335,8 @@ void BlockManagerPool::allocate_shared(Sequence* sequence) {
// If the sequence holds shared_blocks, the hash values of these blocks do
// not need to be recalculated and can be reused directly.
std::vector<Block> shared_blocks =
block_managers_[dp_rank]->allocate_shared(sequence->tokens(),
existed_shared_blocks);
block_managers_[dp_rank]->allocate_shared(
sequence, sequence->tokens(), existed_shared_blocks);
sequence->add_shared_kv_blocks(std::move(shared_blocks));
}
}
Expand All @@ -345,7 +347,7 @@ void BlockManagerPool::cache(Sequence* sequence) {
auto* blocks = sequence->kv_state().mutable_kv_blocks();
auto existed_shared_blocks_num = sequence->kv_state().shared_kv_blocks_num();
block_managers_[dp_rank]->cache(
token_ids, *blocks, existed_shared_blocks_num);
sequence, token_ids, *blocks, existed_shared_blocks_num);
}

void BlockManagerPool::get_merged_kvcache_event(KvCacheEvent* event) const {
Expand Down
1 change: 1 addition & 0 deletions xllm/core/framework/block/block_manager_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class BlockManagerPool : public KVCacheManager {
PROPERTY(int64_t, num_layers) = 0; // Required when enable_xtensor is true
PROPERTY(int64_t, slot_size) = 0; // Memory size per slot (for xtensor)
PROPERTY(std::string, model_id); // Model ID for multi-model support
PROPERTY(bool, enable_mm_prefix_cache) = false;
};

explicit BlockManagerPool(const Options& options, int32_t dp_size = 1);
Expand Down
11 changes: 8 additions & 3 deletions xllm/core/framework/block/concurrent_block_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,22 @@ void ConcurrentBlockManagerImpl::deallocate(const Slice<Block>& blocks) {
}

// Thread-safe wrapper around BlockManagerImpl::allocate_shared.
//
// Bug fix: the previous implementation forwarded only (sequence, tokens_ids),
// silently dropping the caller-supplied existed_shared_blocks and falling back
// to the default empty slice. Callers (e.g. BlockManagerPool::try_allocate)
// pass the sequence's already-shared blocks precisely so their hashes are not
// recomputed — forward them through.
std::vector<Block> ConcurrentBlockManagerImpl::allocate_shared(
    Sequence* sequence,
    const Slice<int32_t>& tokens_ids,
    const Slice<Block>& existed_shared_blocks) {
  std::lock_guard<std::mutex> lock(mutex_);
  return BlockManagerImpl::allocate_shared(
      sequence, tokens_ids, existed_shared_blocks);
}

void ConcurrentBlockManagerImpl::cache(const Slice<int32_t>& token_ids,
// Thread-safe wrapper around BlockManagerImpl::cache: serializes concurrent
// prefix-cache insertions behind mutex_ and delegates to the base class.
void ConcurrentBlockManagerImpl::cache(Sequence* sequence,
                                       const Slice<int32_t>& token_ids,
                                       std::vector<Block>& blocks,
                                       size_t existed_shared_blocks_num) {
  const std::lock_guard<std::mutex> guard(mutex_);
  BlockManagerImpl::cache(
      sequence, token_ids, blocks, existed_shared_blocks_num);
}

void ConcurrentBlockManagerImpl::cache(const std::vector<Block>& blocks) {
Expand Down
4 changes: 3 additions & 1 deletion xllm/core/framework/block/concurrent_block_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ class ConcurrentBlockManagerImpl : public BlockManagerImpl {

// try to share blocks among sequences with the same prefix
std::vector<Block> allocate_shared(
Sequence* sequence,
const Slice<int32_t>& tokens_ids,
const Slice<Block>& existed_shared_blocks = {}) override;

// cache the blocks
void cache(const Slice<int32_t>& token_ids,
void cache(Sequence* sequence,
const Slice<int32_t>& token_ids,
std::vector<Block>& blocks,
size_t existed_shared_blocks_num = 0) override;
void cache(const std::vector<Block>& blocks) override;
Expand Down
6 changes: 4 additions & 2 deletions xllm/core/framework/block/embedding_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,14 @@ void EmbeddingManager::deallocate(const Slice<Block>& blocks) {
}

std::vector<Block> EmbeddingManager::allocate_shared(
const Slice<int32_t>& /*tokens_ids*/,
Sequence* /*sequence*/,
const Slice<int32_t>& /*token_ids*/,
const Slice<Block>& /*existed_shared_blocks*/) {
return {};
}

void EmbeddingManager::cache(const Slice<int32_t>& /*token_ids*/,
void EmbeddingManager::cache(Sequence* /*sequence*/,
const Slice<int32_t>& /*token_ids*/,
std::vector<Block>& /*blocks*/,
size_t /*existed_shared_blocks_num*/) {}

Expand Down
6 changes: 4 additions & 2 deletions xllm/core/framework/block/embedding_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ class EmbeddingManager : public BlockManager {
void deallocate(const Slice<Block>& blocks) override;

std::vector<Block> allocate_shared(
const Slice<int32_t>& tokens_ids,
Sequence* sequence,
const Slice<int32_t>& token_ids,
const Slice<Block>& existed_shared_blocks = {}) override;

void cache(const Slice<int32_t>& token_ids,
void cache(Sequence* sequence,
const Slice<int32_t>& token_ids,
std::vector<Block>& blocks,
size_t existed_shared_blocks_num = 0) override;
void cache(const std::vector<Block>& blocks) override;
Expand Down
5 changes: 3 additions & 2 deletions xllm/core/framework/block/hierarchy_block_manager_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ void HierarchyBlockManagerPool::allocate_host_shared(Sequence* sequence) {
if (options_.enable_prefix_cache()) {
int32_t dp_rank = BlockManagerPool::get_dp_rank(sequence);
std::vector<Block> shared_blocks =
host_block_managers_[dp_rank]->allocate_shared(sequence->tokens());
host_block_managers_[dp_rank]->allocate_shared(sequence,
sequence->tokens());
sequence->add_shared_host_kv_blocks(std::move(shared_blocks));
}
}
Expand All @@ -218,7 +219,7 @@ void HierarchyBlockManagerPool::prefetch_from_storage(
int32_t dp_rank = BlockManagerPool::get_dp_rank(prefill_sequence.get());
std::vector<Block> shared_blocks =
host_block_managers_[dp_rank]->allocate_shared(
prefill_sequence->tokens());
prefill_sequence.get(), prefill_sequence->tokens());
prefill_sequence->add_shared_host_kv_blocks(std::move(shared_blocks));

// round down to the nearest block number
Expand Down
Loading
Loading