Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions fastdeploy/cache_manager/cache_messager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
"""Module for Hackathon 10th Spring No.46.
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
Expand All @@ -18,6 +18,8 @@
import json
import math
import queue
import sys
import tempfile
import threading
import time
import traceback
Expand Down Expand Up @@ -163,7 +165,8 @@ def __init__(
if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM:
address = (pod_ip, engine_worker_queue_port)
else:
address = f"/dev/shm/fd_task_queue_{engine_worker_queue_port}.sock"
_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()
address = f"{_shm_dir}/fd_task_queue_{engine_worker_queue_port}.sock"
self.engine_worker_queue = EngineWorkerQueue(
address=address,
is_server=False,
Expand Down Expand Up @@ -505,7 +508,8 @@ def __init__(
if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM:
address = (pod_ip, engine_worker_queue_port)
else:
address = f"/dev/shm/fd_task_queue_{engine_worker_queue_port}.sock"
_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()
address = f"{_shm_dir}/fd_task_queue_{engine_worker_queue_port}.sock"
self.engine_worker_queue = EngineWorkerQueue(
address=address,
is_server=False,
Expand Down
8 changes: 5 additions & 3 deletions fastdeploy/cache_manager/prefix_cache_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
"""Module for Hackathon 10th Spring No.46.
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
Expand Down Expand Up @@ -335,7 +335,8 @@ def launch_cache_manager(
+ f" >{log_dir}/cache_manager_{int(device_ids[i])}.log 2>&1"
)
logger.info(f"Launch cache transfer manager, command:{launch_cmd}")
cache_manager_processes.append(subprocess.Popen(launch_cmd, shell=True, preexec_fn=os.setsid))
_popen_kwargs = {} if sys.platform == "win32" else {"preexec_fn": os.setsid}
cache_manager_processes.append(subprocess.Popen(launch_cmd, shell=True, **_popen_kwargs))

logger.info("PrefixCacheManager is waiting for cache transfer manager to be initialized.")
while np.sum(self.cache_transfer_inited_signal.value) != tensor_parallel_size:
Expand Down Expand Up @@ -430,7 +431,8 @@ def launch_cache_messager(
+ f" >{log_dir}/cache_messager_{i}.log 2>&1"
)
logger.info(f"Launch cache messager, command:{launch_cmd}")
cache_messager_processes.append(subprocess.Popen(launch_cmd, shell=True, preexec_fn=os.setsid))
_popen_kwargs = {} if sys.platform == "win32" else {"preexec_fn": os.setsid}
cache_messager_processes.append(subprocess.Popen(launch_cmd, shell=True, **_popen_kwargs))

logger.info("Waiting for cache ready...")
while np.sum(self.cache_ready_signal.value) != tensor_parallel_size:
Expand Down
32 changes: 22 additions & 10 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
"""Module for Hackathon 10th Spring No.46.
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
Expand Down Expand Up @@ -26,6 +26,7 @@
import signal
import subprocess
import sys
import tempfile
import threading
import time
import traceback
Expand Down Expand Up @@ -469,10 +470,13 @@ def start_worker_queue_service(self, start_queue):
engine_worker_queue_address = (self.cfg.master_ip, self.cfg.parallel_config.local_engine_worker_queue_port)
engine_cache_queue_address = (self.cfg.master_ip, self.cfg.cache_config.local_cache_queue_port)
else:
_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()
engine_worker_queue_address = (
f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.local_engine_worker_queue_port}.sock"
f"{_shm_dir}/fd_task_queue_{self.cfg.parallel_config.local_engine_worker_queue_port}.sock"
)
engine_cache_queue_address = (
f"{_shm_dir}/fd_task_queue_{self.cfg.cache_config.local_cache_queue_port}.sock"
)
engine_cache_queue_address = f"/dev/shm/fd_task_queue_{self.cfg.cache_config.local_cache_queue_port}.sock"

if self.cfg.host_ip == self.cfg.master_ip or self.cfg.master_ip == "0.0.0.0":
if start_queue:
Expand Down Expand Up @@ -2275,8 +2279,11 @@ def _exit_sub_services(self):
if hasattr(self, "worker_proc") and self.worker_proc is not None:
self.llm_logger.info("Cleaning up worker processes...")
try:
pgid = os.getpgid(self.worker_proc.pid)
os.killpg(pgid, signal.SIGTERM)
if sys.platform != "win32":
pgid = os.getpgid(self.worker_proc.pid)
os.killpg(pgid, signal.SIGTERM)
else:
self.worker_proc.terminate()
except Exception as e:
self.llm_logger.error(f"Error extracting sub services: {e}, {str(traceback.format_exc())}")

Expand All @@ -2288,8 +2295,11 @@ def _exit_sub_services(self):
for p in self.cache_manager_processes:
self.llm_logger.info(f"Killing cache manager process {p.pid}")
try:
pgid = os.getpgid(p.pid)
os.killpg(pgid, signal.SIGTERM)
if sys.platform != "win32":
pgid = os.getpgid(p.pid)
os.killpg(pgid, signal.SIGTERM)
else:
p.terminate()
except Exception as e:
self.llm_logger.error(
f"Error killing cache manager process {p.pid}: {e}, {str(traceback.format_exc())}"
Expand Down Expand Up @@ -2586,7 +2596,6 @@ def _start_worker_service(self):
"moe_gate_fp32": self.cfg.model_config.moe_gate_fp32,
"enable_entropy": self.cfg.model_config.enable_entropy,
"enable_overlap_schedule": self.cfg.scheduler_config.enable_overlap_schedule,
"enable_flashinfer_allreduce_fusion": self.cfg.parallel_config.enable_flashinfer_allreduce_fusion,
}
for worker_flag, value in worker_store_true_flag.items():
if value:
Expand All @@ -2608,7 +2617,7 @@ def _start_worker_service(self):
pd_cmd,
stdout=subprocess.PIPE,
shell=True,
preexec_fn=os.setsid,
**({} if sys.platform == "win32" else {"preexec_fn": os.setsid}),
)
return p

Expand Down Expand Up @@ -2676,7 +2685,10 @@ def launch_components(self):
int(self.cfg.parallel_config.engine_worker_queue_port[i]),
)
else:
address = f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock"
_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()
address = (
f"{_shm_dir}/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock"
)

self.llm_logger.info(f"dp start queue service {address}")
self.dp_engine_worker_queue_server.append(
Expand Down
91 changes: 30 additions & 61 deletions fastdeploy/engine/engine.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
"""Module for Hackathon 10th Spring No.46.
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
Expand All @@ -24,6 +24,7 @@
import signal
import subprocess
import sys
import tempfile
import threading
import time
import traceback
Expand All @@ -44,11 +45,6 @@
from fastdeploy.engine.expert_service import start_data_parallel_service
from fastdeploy.engine.request import Request
from fastdeploy.inter_communicator import EngineWorkerQueue, IPCSignal
from fastdeploy.logger.request_logger import (
RequestLogLevel,
log_request,
log_request_error,
)
from fastdeploy.metrics.metrics import main_process_metrics
from fastdeploy.platforms import current_platform
from fastdeploy.utils import EngineError, console_logger, envs, llm_logger
Expand Down Expand Up @@ -290,7 +286,7 @@ def add_requests(self, task, sampling_params=None, **kwargs):
# Create Request struct after processing
request = Request.from_dict(task)
request.metrics.scheduler_recv_req_time = time.time()
log_request(RequestLogLevel.CONTENT, message="Receive request {request}", request=request)
llm_logger.info(f"Receive request {request}")
request.metrics.preprocess_start_time = time.time()

request.prompt_token_ids_len = len(request.prompt_token_ids)
Expand All @@ -309,20 +305,12 @@ def add_requests(self, task, sampling_params=None, **kwargs):
f"Input text is too long, length of prompt token({input_ids_len}) "
f"+ min_dec_len ({min_tokens}) >= max_model_len "
)
log_request_error(
message="request[{request_id}] error: {error}",
request_id=request.get("request_id"),
error=error_msg,
)
llm_logger.error(error_msg)
raise EngineError(error_msg, error_code=400)

if input_ids_len > self.cfg.model_config.max_model_len:
error_msg = f"Length of input token({input_ids_len}) exceeds the limit max_model_len({self.cfg.model_config.max_model_len})."
log_request_error(
message="request[{request_id}] error: {error}",
request_id=request.get("request_id"),
error=error_msg,
)
llm_logger.error(error_msg)
raise EngineError(error_msg, error_code=400)

if request.get("stop_seqs_len") is not None:
Expand All @@ -333,11 +321,7 @@ def add_requests(self, task, sampling_params=None, **kwargs):
f"Length of stop ({stop_seqs_len}) exceeds the limit max_stop_seqs_num({max_stop_seqs_num})."
"Please reduce the number of stop or set a lager max_stop_seqs_num by `FD_MAX_STOP_SEQS_NUM`"
)
log_request_error(
message="request[{request_id}] error: {error}",
request_id=request.get("request_id"),
error=error_msg,
)
llm_logger.error(error_msg)
raise EngineError(error_msg, error_code=400)
stop_seqs_max_len = envs.FD_STOP_SEQS_MAX_LEN
for single_stop_seq_len in stop_seqs_len:
Expand All @@ -346,11 +330,7 @@ def add_requests(self, task, sampling_params=None, **kwargs):
f"Length of stop_seqs({single_stop_seq_len}) exceeds the limit stop_seqs_max_len({stop_seqs_max_len})."
"Please reduce the length of stop sequences or set a larger stop_seqs_max_len by `FD_STOP_SEQS_MAX_LEN`"
)
log_request_error(
message="request[{request_id}] error: {error}",
request_id=request.get("request_id"),
error=error_msg,
)
llm_logger.error(error_msg)
raise EngineError(error_msg, error_code=400)

if self._has_guided_input(request):
Expand All @@ -363,22 +343,14 @@ def add_requests(self, task, sampling_params=None, **kwargs):
request, err_msg = self.guided_decoding_checker.schema_format(request)

if err_msg is not None:
log_request_error(
message="request[{request_id}] error: {error}",
request_id=request.get("request_id"),
error=err_msg,
)
llm_logger.error(err_msg)
raise EngineError(err_msg, error_code=400)

request.metrics.preprocess_end_time = time.time()
request.metrics.scheduler_recv_req_time = time.time()
self.engine.scheduler.put_requests([request])
log_request(
RequestLogLevel.STAGES,
message="Cache task with request_id ({request_id})",
request_id=request.get("request_id"),
)
log_request(RequestLogLevel.FULL, message="cache task: {request}", request=request)
llm_logger.info(f"Cache task with request_id ({request.get('request_id')})")
llm_logger.debug(f"cache task: {request}")

def _worker_processes_ready(self):
"""
Expand Down Expand Up @@ -465,8 +437,11 @@ def _exit_sub_services(self):
for p in self.cache_manager_processes:
llm_logger.info(f"Killing cache manager process {p.pid}")
try:
pgid = os.getpgid(p.pid)
os.killpg(pgid, signal.SIGTERM)
if sys.platform != "win32":
pgid = os.getpgid(p.pid)
os.killpg(pgid, signal.SIGTERM)
else:
p.terminate()
except Exception as e:
console_logger.error(
f"Error killing cache manager process {p.pid}: {e}, {str(traceback.format_exc())}"
Expand All @@ -479,8 +454,11 @@ def _exit_sub_services(self):

if hasattr(self, "worker_proc") and self.worker_proc is not None:
try:
pgid = os.getpgid(self.worker_proc.pid)
os.killpg(pgid, signal.SIGTERM)
if sys.platform != "win32":
pgid = os.getpgid(self.worker_proc.pid)
os.killpg(pgid, signal.SIGTERM)
else:
self.worker_proc.terminate()
except Exception as e:
console_logger.error(f"Error extracting sub services: {e}, {str(traceback.format_exc())}")

Expand Down Expand Up @@ -700,7 +678,6 @@ def _start_worker_service(self):
"enable_entropy": self.cfg.model_config.enable_entropy,
"ep_prefill_use_worst_num_tokens": self.cfg.parallel_config.ep_prefill_use_worst_num_tokens,
"enable_overlap_schedule": self.cfg.scheduler_config.enable_overlap_schedule,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议:`enable_flashinfer_allreduce_fusion` 已从此处 `worker_store_true_flag` 与 `worker_process.py` 的 argparse 参数中删除,但 `model_executor/layers/normalization.py` 和 `linear.py` 仍从 config 读取该字段控制融合行为。

这导致 flag 无法再从 engine 侧传递到 worker 子进程命令行,model_executor 层将始终以 config 的默认值(False)运行;若用户通过 EngineArgs 开启了该 flag,实际效果将静默失效。

建议:如果是有意移除,需同步删除 `config.py`、`args_utils.py`、`normalization.py` 和 `linear.py` 中的相关代码;如果不是有意移除,则需恢复这三处删除。

"enable_flashinfer_allreduce_fusion": self.cfg.parallel_config.enable_flashinfer_allreduce_fusion,
}
for worker_flag, value in worker_store_true_flag.items():
if value:
Expand All @@ -722,7 +699,7 @@ def _start_worker_service(self):
pd_cmd,
stdout=subprocess.PIPE,
shell=True,
preexec_fn=os.setsid,
**({} if sys.platform == "win32" else {"preexec_fn": os.setsid}),
)
return p

Expand Down Expand Up @@ -761,16 +738,11 @@ def generate(self, prompts, stream):
Yields:
dict: The generated response.
"""
log_request(RequestLogLevel.CONTENT, message="Starting generation for prompt: {prompts}", prompts=prompts)
llm_logger.info(f"Starting generation for prompt: {prompts}")
try:
req_id = self._format_and_add_data(prompts)
except Exception as e:
log_request_error(
message="request[{request_id}] error while adding request: {error}, {traceback}",
request_id=prompts.get("request_id"),
error=str(e),
traceback=traceback.format_exc(),
)
llm_logger.error(f"Error happened while adding request, details={e}, {str(traceback.format_exc())}")
raise EngineError(str(e), error_code=400)

# Get the result of the current request
Expand All @@ -789,7 +761,7 @@ def generate(self, prompts, stream):
output = self.engine.data_processor.process_response_dict(
result.to_dict(), stream=False, include_stop_str_in_output=False, direct_decode=not stream
)
log_request(RequestLogLevel.FULL, message="Generate result: {output}", output=output)
llm_logger.debug(f"Generate result: {output}")
if not stream:
yield output
else:
Expand Down Expand Up @@ -865,7 +837,10 @@ def launch_components(self):
int(self.cfg.parallel_config.engine_worker_queue_port[i]),
)
else:
address = f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock"
_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()
address = (
f"{_shm_dir}/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock"
)

llm_logger.info(f"dp start queue service {address}")
self.dp_engine_worker_queue_server.append(
Expand All @@ -876,7 +851,7 @@ def launch_components(self):
local_data_parallel_size=self.cfg.parallel_config.data_parallel_size,
)
)
ctx = multiprocessing.get_context("fork")
ctx = multiprocessing.get_context("spawn" if sys.platform == "win32" else "fork")
cfg = copy.deepcopy(self.cfg)
self.dp_processed.append(
ctx.Process(
Expand All @@ -893,14 +868,8 @@ def launch_components(self):
+ f" data parallel id {i}"
)
self.dp_processed[-1].start()

for i in range(
1,
self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
):

while self.launched_expert_service_signal.value[i] == 0:
time.sleep(0.1)
time.sleep(1)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug:dp worker 初始化逻辑从「并行启动→批量等待」被意外改为「串行启动→逐个等待」。

原代码设计:先 start() 所有 dp 子进程(允许并发加载权重),再独立循环等待所有就绪,总耗时约等于最慢进程的时间。

改动后:每启动一个进程都立即等待其就绪,多 dp 场景总启动耗时线性增加为所有进程初始化时间之和。

建议恢复原始两循环结构:

for i in range(1, self.cfg.parallel_config.data_parallel_size // self.cfg.nnode):
    # ... 创建并 start() 进程
    self.dp_processed[-1].start()

for i in range(1, self.cfg.parallel_config.data_parallel_size // self.cfg.nnode):
    while self.launched_expert_service_signal.value[i] == 0:
        time.sleep(0.1)


def check_worker_initialize_status(self):
"""
Expand Down
Loading
Loading