Skip to content

Commit a55e472

Browse files
DragonFive, maxiaolong.maxwell
authored and committed
feat: add rec scheduler master and engine.
1 parent cc2ef9e commit a55e472

26 files changed

+1245
-24
lines changed

xllm/api_service/api_service.cpp

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -27,8 +27,7 @@ limitations under the License.
2727
#include "core/common/metrics.h"
2828
#include "core/distributed_runtime/dit_master.h"
2929
#include "core/distributed_runtime/llm_master.h"
30-
// TODO. add following when next pr.
31-
// #include "core/runtime/rec_master.h"
30+
#include "core/distributed_runtime/rec_master.h"
3231
#include "core/distributed_runtime/vlm_master.h"
3332
#include "core/util/closure_guard.h"
3433
#include "embedding.pb.h"
@@ -73,8 +72,6 @@ APIService::APIService(Master* master,
7372
std::make_unique<ImageGenerationServiceImpl>(
7473
dynamic_cast<DiTMaster*>(master), model_names);
7574
} else if (FLAGS_backend == "rec") {
76-
// TODO. delete this when next pr.
77-
using RecMaster = LLMMaster;
7875
rec_completion_service_impl_ = std::make_unique<RecCompletionServiceImpl>(
7976
dynamic_cast<RecMaster*>(master), model_names);
8077
}

xllm/api_service/rec_completion_service_impl.cpp

Lines changed: 3 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -27,10 +27,9 @@ limitations under the License.
2727
#include "common/instance_name.h"
2828
#include "completion.pb.h"
2929
#include "core/distributed_runtime/llm_master.h"
30+
#include "core/distributed_runtime/rec_master.h"
3031
#include "core/framework/request/mm_data.h"
3132
#include "core/framework/request/request_output.h"
32-
// TODO. add following when next pr.
33-
// #include "core/runtime/rec_master.h"
3433
#include "core/util/utils.h"
3534

3635
#define likely(x) __builtin_expect(!!(x), 1)
@@ -89,9 +88,7 @@ bool send_result_to_client_brpc_rec(std::shared_ptr<CompletionCall> call,
8988
// Add rec specific output tensors
9089
auto output_tensor = response.mutable_output_tensors()->Add();
9190
output_tensor->set_name("rec_result");
92-
// TODO: add following when next pr.
93-
// if (FLAGS_enable_constrained_decoding) {
94-
if (true) {
91+
if (FLAGS_enable_constrained_decoding) {
9592
output_tensor->set_datatype(proto::DataType::INT64);
9693
output_tensor->mutable_shape()->Add(req_output.outputs.size());
9794
output_tensor->mutable_shape()->Add(1); // Single item per output
@@ -190,11 +187,8 @@ void RecCompletionServiceImpl::process_async_impl(
190187
master_->handle_request(
191188
std::move(rpc_request_ref.prompt()),
192189
std::move(prompt_tokens),
193-
// TODO. add following when next pr.
194-
// std::move(mm_data),
190+
std::move(mm_data),
195191
std::move(request_params),
196-
// TODO. delete this when next pr.
197-
call.get(),
198192
[call,
199193
model,
200194
master = master_,

xllm/api_service/rec_completion_service_impl.h

Lines changed: 2 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ limitations under the License.
1919

2020
#include "api_service_impl.h"
2121
#include "completion.pb.h"
22+
#include "core/distributed_runtime/rec_master.h"
2223
#include "rec.pb.h"
2324
#include "stream_call.h"
2425

@@ -27,10 +28,6 @@ namespace xllm {
2728
using CompletionCall =
2829
StreamCall<proto::CompletionRequest, proto::CompletionResponse>;
2930

30-
// TODO. add following when next pr.
31-
// class RecMaster;
32-
using RecMaster = LLMMaster;
33-
3431
// a class to handle completion requests
3532
class RecCompletionServiceImpl final : public APIServiceImpl<CompletionCall> {
3633
public:
@@ -45,4 +42,4 @@ class RecCompletionServiceImpl final : public APIServiceImpl<CompletionCall> {
4542
RecMaster* master_ = nullptr;
4643
};
4744

48-
} // namespace xllm
45+
} // namespace xllm

xllm/core/common/global_flags.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,11 @@ DEFINE_int32(
164164
256,
165165
"Max decode token per sequence which used for ZeroEvictionScheduler.");
166166

167+
// For the rec backend, a value of 100 is recommended.
168+
DEFINE_int32(request_queue_size,
169+
100000,
170+
"The request queue size of the scheduler");
171+
167172
// --- parallel config ---
168173

169174
DEFINE_int32(dp_size, 1, "Data parallel size for MLA attention.");

xllm/core/common/global_flags.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ DECLARE_bool(enable_latency_aware_schedule);
187187

188188
DECLARE_int32(profile_max_prompt_length);
189189

190+
DECLARE_int32(request_queue_size);
191+
190192
DECLARE_bool(enable_profile_kv_blocks);
191193

192194
DECLARE_bool(disable_ttft_profiling);

xllm/core/distributed_runtime/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ cc_library(
2121
vlm_engine.h
2222
vlm_master.h
2323
speculative_engine.h
24+
rec_engine.h
25+
rec_master.h
2426
disagg_pd_service.h
2527
disagg_pd_service_impl.h
2628
pd_ooc_service.h
@@ -40,6 +42,8 @@ cc_library(
4042
vlm_engine.cpp
4143
vlm_master.cpp
4244
speculative_engine.cpp
45+
rec_engine.cpp
46+
rec_master.cpp
4347
disagg_pd_service.cpp
4448
disagg_pd_service_impl.cpp
4549
pd_ooc_service.cpp

xllm/core/distributed_runtime/master.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ limitations under the License.
3434
#include "llm_engine.h"
3535
#include "llm_master.h"
3636
#include "models/model_registry.h"
37+
#include "rec_engine.h"
38+
#include "rec_master.h"
3739
#include "speculative_engine.h"
3840
#include "util/device_name_utils.h"
3941
#include "util/scope_guard.h"
@@ -231,6 +233,35 @@ Master::Master(const Options& options, EngineType type) : options_(options) {
231233
eng_options.device_ip(options_.device_ip().value());
232234
}
233235
engine_ = std::make_unique<LLMEngine>(eng_options);
236+
} else if (type == EngineType::REC) {
237+
options_.enable_schedule_overlap(false);
238+
LOG(WARNING) << "Force to disable schedule overlap for REC model, not "
239+
"supported yet.";
240+
runtime::Options eng_options;
241+
eng_options.model_path(options_.model_path())
242+
.devices(devices)
243+
.backend(options_.backend())
244+
.block_size(options_.block_size())
245+
.max_cache_size(options_.max_cache_size())
246+
.max_memory_utilization(options_.max_memory_utilization())
247+
.enable_prefix_cache(options_.enable_prefix_cache())
248+
.task_type(options_.task_type())
249+
.enable_chunked_prefill(options_.enable_chunked_prefill())
250+
.enable_offline_inference(options_.enable_offline_inference())
251+
.spawn_worker_path(options_.spawn_worker_path())
252+
.enable_shm(options_.enable_shm())
253+
.is_local(options_.is_local())
254+
.enable_schedule_overlap(options_.enable_schedule_overlap())
255+
.master_node_addr(options_.master_node_addr())
256+
.nnodes(options_.nnodes())
257+
.node_rank(options_.node_rank())
258+
.dp_size(options_.dp_size())
259+
.ep_size(options_.ep_size())
260+
.max_seqs_per_batch(options_.max_seqs_per_batch())
261+
.max_tokens_per_chunk_for_prefill(
262+
options_.max_tokens_per_chunk_for_prefill());
263+
264+
engine_ = std::make_unique<RecEngine>(eng_options);
234265
} else {
235266
LOG(WARNING) << "Not supported llm engine type: "
236267
<< static_cast<size_t>(type);
@@ -246,6 +277,9 @@ std::unique_ptr<Master> create_master(const std::string& backend,
246277
} else if (backend == "dit") {
247278
LOG(INFO) << "creating dit master";
248279
return std::make_unique<DiTMaster>(options);
280+
} else if (backend == "rec") {
281+
LOG(INFO) << "creating rec master";
282+
return std::make_unique<RecMaster>(options);
249283
} else {
250284
LOG(FATAL) << "Failed to create master, backend is" << backend;
251285
return nullptr;

0 commit comments

Comments
 (0)