From 5a2f8dd8da0e2897a415fcdf2c4fa7ccf76c8b67 Mon Sep 17 00:00:00 2001 From: boby-cloudforge Date: Sun, 3 May 2026 15:45:13 +0200 Subject: [PATCH] =?UTF-8?q?[CI]=E3=80=90Hackathon=2010th=20Spring=20No.46?= =?UTF-8?q?=E3=80=91Windows=20Python=20runtime=20guards?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fastdeploy/cache_manager/cache_messager.py | 10 +- .../cache_manager/prefix_cache_manager.py | 8 +- fastdeploy/engine/common_engine.py | 32 +++++-- fastdeploy/engine/engine.py | 91 ++++++------------- fastdeploy/engine/expert_service.py | 14 ++- fastdeploy/eplb/async_expert_loader.py | 9 +- fastdeploy/inter_communicator/fmq.py | 6 +- fastdeploy/inter_communicator/zmq_client.py | 7 +- fastdeploy/inter_communicator/zmq_server.py | 12 ++- fastdeploy/worker/worker_process.py | 13 +-- 10 files changed, 101 insertions(+), 101 deletions(-) diff --git a/fastdeploy/cache_manager/cache_messager.py b/fastdeploy/cache_manager/cache_messager.py index 452d2d795cc..4ebb6fff03a 100644 --- a/fastdeploy/cache_manager/cache_messager.py +++ b/fastdeploy/cache_manager/cache_messager.py @@ -1,4 +1,4 @@ -""" +"""Module for Hackathon 10th Spring No.46. # Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License" @@ -18,6 +18,8 @@ import json import math import queue +import sys +import tempfile import threading import time import traceback @@ -163,7 +165,8 @@ def __init__( if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM: address = (pod_ip, engine_worker_queue_port) else: - address = f"/dev/shm/fd_task_queue_{engine_worker_queue_port}.sock" + _shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir() + address = f"{_shm_dir}/fd_task_queue_{engine_worker_queue_port}.sock" self.engine_worker_queue = EngineWorkerQueue( address=address, is_server=False, @@ -505,7 +508,8 @@ def __init__( if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM: address = (pod_ip, engine_worker_queue_port) else: - address = f"/dev/shm/fd_task_queue_{engine_worker_queue_port}.sock" + _shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir() + address = f"{_shm_dir}/fd_task_queue_{engine_worker_queue_port}.sock" self.engine_worker_queue = EngineWorkerQueue( address=address, is_server=False, diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py index b1e79834d92..a42cceea8fd 100644 --- a/fastdeploy/cache_manager/prefix_cache_manager.py +++ b/fastdeploy/cache_manager/prefix_cache_manager.py @@ -1,4 +1,4 @@ -""" +"""Module for Hackathon 10th Spring No.46. # Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License" @@ -335,7 +335,8 @@ def launch_cache_manager( + f" >{log_dir}/cache_manager_{int(device_ids[i])}.log 2>&1" ) logger.info(f"Launch cache transfer manager, command:{launch_cmd}") - cache_manager_processes.append(subprocess.Popen(launch_cmd, shell=True, preexec_fn=os.setsid)) + _popen_kwargs = {} if sys.platform == "win32" else {"preexec_fn": os.setsid} + cache_manager_processes.append(subprocess.Popen(launch_cmd, shell=True, **_popen_kwargs)) logger.info("PrefixCacheManager is waiting for cache transfer manager to be initialized.") while np.sum(self.cache_transfer_inited_signal.value) != tensor_parallel_size: @@ -430,7 +431,8 @@ def launch_cache_messager( + f" >{log_dir}/cache_messager_{i}.log 2>&1" ) logger.info(f"Launch cache messager, command:{launch_cmd}") - cache_messager_processes.append(subprocess.Popen(launch_cmd, shell=True, preexec_fn=os.setsid)) + _popen_kwargs = {} if sys.platform == "win32" else {"preexec_fn": os.setsid} + cache_messager_processes.append(subprocess.Popen(launch_cmd, shell=True, **_popen_kwargs)) logger.info("Waiting for cache ready...") while np.sum(self.cache_ready_signal.value) != tensor_parallel_size: diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 75586d09a3e..dc6ba33428d 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -1,4 +1,4 @@ -""" +"""Module for Hackathon 10th Spring No.46. # Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License" @@ -26,6 +26,7 @@ import signal import subprocess import sys +import tempfile import threading import time import traceback @@ -469,10 +470,13 @@ def start_worker_queue_service(self, start_queue): engine_worker_queue_address = (self.cfg.master_ip, self.cfg.parallel_config.local_engine_worker_queue_port) engine_cache_queue_address = (self.cfg.master_ip, self.cfg.cache_config.local_cache_queue_port) else: + _shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir() engine_worker_queue_address = ( - f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.local_engine_worker_queue_port}.sock" + f"{_shm_dir}/fd_task_queue_{self.cfg.parallel_config.local_engine_worker_queue_port}.sock" + ) + engine_cache_queue_address = ( + f"{_shm_dir}/fd_task_queue_{self.cfg.cache_config.local_cache_queue_port}.sock" ) - engine_cache_queue_address = f"/dev/shm/fd_task_queue_{self.cfg.cache_config.local_cache_queue_port}.sock" if self.cfg.host_ip == self.cfg.master_ip or self.cfg.master_ip == "0.0.0.0": if start_queue: @@ -2275,8 +2279,11 @@ def _exit_sub_services(self): if hasattr(self, "worker_proc") and self.worker_proc is not None: self.llm_logger.info("Cleaning up worker processes...") try: - pgid = os.getpgid(self.worker_proc.pid) - os.killpg(pgid, signal.SIGTERM) + if sys.platform != "win32": + pgid = os.getpgid(self.worker_proc.pid) + os.killpg(pgid, signal.SIGTERM) + else: + self.worker_proc.terminate() except Exception as e: self.llm_logger.error(f"Error extracting sub services: {e}, {str(traceback.format_exc())}") @@ -2288,8 +2295,11 @@ def _exit_sub_services(self): for p in self.cache_manager_processes: self.llm_logger.info(f"Killing cache manager process {p.pid}") try: - pgid = os.getpgid(p.pid) - os.killpg(pgid, signal.SIGTERM) + if sys.platform != "win32": + pgid = os.getpgid(p.pid) + os.killpg(pgid, signal.SIGTERM) + else: + p.terminate() except Exception as e: self.llm_logger.error( f"Error killing cache manager process {p.pid}: {e}, {str(traceback.format_exc())}" @@ -2586,7 +2596,6 @@ def _start_worker_service(self): "moe_gate_fp32": self.cfg.model_config.moe_gate_fp32, "enable_entropy": self.cfg.model_config.enable_entropy, "enable_overlap_schedule": self.cfg.scheduler_config.enable_overlap_schedule, - "enable_flashinfer_allreduce_fusion": self.cfg.parallel_config.enable_flashinfer_allreduce_fusion, } for worker_flag, value in worker_store_true_flag.items(): if value: @@ -2608,7 +2617,7 @@ def _start_worker_service(self): pd_cmd, stdout=subprocess.PIPE, shell=True, - preexec_fn=os.setsid, + **({} if sys.platform == "win32" else {"preexec_fn": os.setsid}), ) return p @@ -2676,7 +2685,10 @@ def launch_components(self): int(self.cfg.parallel_config.engine_worker_queue_port[i]), ) else: - address = f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock" + _shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir() + address = ( + f"{_shm_dir}/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock" + ) self.llm_logger.info(f"dp start queue service {address}") self.dp_engine_worker_queue_server.append( diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 3fbc311f142..9f5cb140c48 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -1,4 +1,4 @@ -""" +"""Module for Hackathon 10th Spring No.46. # Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License" @@ -24,6 +24,7 @@ import signal import subprocess import sys +import tempfile import threading import time import traceback @@ -44,11 +45,6 @@ from fastdeploy.engine.expert_service import start_data_parallel_service from fastdeploy.engine.request import Request from fastdeploy.inter_communicator import EngineWorkerQueue, IPCSignal -from fastdeploy.logger.request_logger import ( - RequestLogLevel, - log_request, - log_request_error, -) from fastdeploy.metrics.metrics import main_process_metrics from fastdeploy.platforms import current_platform from fastdeploy.utils import EngineError, console_logger, envs, llm_logger @@ -290,7 +286,7 @@ def add_requests(self, task, sampling_params=None, **kwargs): # Create Request struct after processing request = Request.from_dict(task) request.metrics.scheduler_recv_req_time = time.time() - log_request(RequestLogLevel.CONTENT, message="Receive request {request}", request=request) + llm_logger.info(f"Receive request {request}") request.metrics.preprocess_start_time = time.time() request.prompt_token_ids_len = len(request.prompt_token_ids) @@ -309,20 +305,12 @@ def add_requests(self, task, sampling_params=None, **kwargs): f"Input text is too long, length of prompt token({input_ids_len}) " f"+ min_dec_len ({min_tokens}) >= max_model_len " ) - log_request_error( - message="request[{request_id}] error: {error}", - request_id=request.get("request_id"), - error=error_msg, - ) + llm_logger.error(error_msg) raise EngineError(error_msg, error_code=400) if input_ids_len > self.cfg.model_config.max_model_len: error_msg = f"Length of input token({input_ids_len}) exceeds the limit max_model_len({self.cfg.model_config.max_model_len})." - log_request_error( - message="request[{request_id}] error: {error}", - request_id=request.get("request_id"), - error=error_msg, - ) + llm_logger.error(error_msg) raise EngineError(error_msg, error_code=400) if request.get("stop_seqs_len") is not None: @@ -333,11 +321,7 @@ def add_requests(self, task, sampling_params=None, **kwargs): f"Length of stop ({stop_seqs_len}) exceeds the limit max_stop_seqs_num({max_stop_seqs_num})." "Please reduce the number of stop or set a lager max_stop_seqs_num by `FD_MAX_STOP_SEQS_NUM`" ) - log_request_error( - message="request[{request_id}] error: {error}", - request_id=request.get("request_id"), - error=error_msg, - ) + llm_logger.error(error_msg) raise EngineError(error_msg, error_code=400) stop_seqs_max_len = envs.FD_STOP_SEQS_MAX_LEN for single_stop_seq_len in stop_seqs_len: @@ -346,11 +330,7 @@ def add_requests(self, task, sampling_params=None, **kwargs): f"Length of stop_seqs({single_stop_seq_len}) exceeds the limit stop_seqs_max_len({stop_seqs_max_len})." "Please reduce the length of stop sequences or set a larger stop_seqs_max_len by `FD_STOP_SEQS_MAX_LEN`" ) - log_request_error( - message="request[{request_id}] error: {error}", - request_id=request.get("request_id"), - error=error_msg, - ) + llm_logger.error(error_msg) raise EngineError(error_msg, error_code=400) if self._has_guided_input(request): @@ -363,22 +343,14 @@ def add_requests(self, task, sampling_params=None, **kwargs): request, err_msg = self.guided_decoding_checker.schema_format(request) if err_msg is not None: - log_request_error( - message="request[{request_id}] error: {error}", - request_id=request.get("request_id"), - error=err_msg, - ) + llm_logger.error(err_msg) raise EngineError(err_msg, error_code=400) request.metrics.preprocess_end_time = time.time() request.metrics.scheduler_recv_req_time = time.time() self.engine.scheduler.put_requests([request]) - log_request( - RequestLogLevel.STAGES, - message="Cache task with request_id ({request_id})", - request_id=request.get("request_id"), - ) - log_request(RequestLogLevel.FULL, message="cache task: {request}", request=request) + llm_logger.info(f"Cache task with request_id ({request.get('request_id')})") + llm_logger.debug(f"cache task: {request}") def _worker_processes_ready(self): """ @@ -465,8 +437,11 @@ def _exit_sub_services(self): for p in self.cache_manager_processes: llm_logger.info(f"Killing cache manager process {p.pid}") try: - pgid = os.getpgid(p.pid) - os.killpg(pgid, signal.SIGTERM) + if sys.platform != "win32": + pgid = os.getpgid(p.pid) + os.killpg(pgid, signal.SIGTERM) + else: + p.terminate() except Exception as e: console_logger.error( f"Error killing cache manager process {p.pid}: {e}, {str(traceback.format_exc())}" @@ -479,8 +454,11 @@ def _exit_sub_services(self): if hasattr(self, "worker_proc") and self.worker_proc is not None: try: - pgid = os.getpgid(self.worker_proc.pid) - os.killpg(pgid, signal.SIGTERM) + if sys.platform != "win32": + pgid = os.getpgid(self.worker_proc.pid) + os.killpg(pgid, signal.SIGTERM) + else: + self.worker_proc.terminate() except Exception as e: console_logger.error(f"Error extracting sub services: {e}, {str(traceback.format_exc())}") @@ -700,7 +678,6 @@ def _start_worker_service(self): "enable_entropy": self.cfg.model_config.enable_entropy, "ep_prefill_use_worst_num_tokens": self.cfg.parallel_config.ep_prefill_use_worst_num_tokens, "enable_overlap_schedule": self.cfg.scheduler_config.enable_overlap_schedule, - "enable_flashinfer_allreduce_fusion": self.cfg.parallel_config.enable_flashinfer_allreduce_fusion, } for worker_flag, value in worker_store_true_flag.items(): if value: @@ -722,7 +699,7 @@ def _start_worker_service(self): pd_cmd, stdout=subprocess.PIPE, shell=True, - preexec_fn=os.setsid, + **({} if sys.platform == "win32" else {"preexec_fn": os.setsid}), ) return p @@ -761,16 +738,11 @@ def generate(self, prompts, stream): Yields: dict: The generated response. """ - log_request(RequestLogLevel.CONTENT, message="Starting generation for prompt: {prompts}", prompts=prompts) + llm_logger.info(f"Starting generation for prompt: {prompts}") try: req_id = self._format_and_add_data(prompts) except Exception as e: - log_request_error( - message="request[{request_id}] error while adding request: {error}, {traceback}", - request_id=prompts.get("request_id"), - error=str(e), - traceback=traceback.format_exc(), - ) + llm_logger.error(f"Error happened while adding request, details={e}, {str(traceback.format_exc())}") raise EngineError(str(e), error_code=400) # Get the result of the current request @@ -789,7 +761,7 @@ def generate(self, prompts, stream): output = self.engine.data_processor.process_response_dict( result.to_dict(), stream=False, include_stop_str_in_output=False, direct_decode=not stream ) - log_request(RequestLogLevel.FULL, message="Generate result: {output}", output=output) + llm_logger.debug(f"Generate result: {output}") if not stream: yield output else: @@ -865,7 +837,10 @@ def launch_components(self): int(self.cfg.parallel_config.engine_worker_queue_port[i]), ) else: - address = f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock" + _shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir() + address = ( + f"{_shm_dir}/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock" + ) llm_logger.info(f"dp start queue service {address}") self.dp_engine_worker_queue_server.append( @@ -876,7 +851,7 @@ def launch_components(self): local_data_parallel_size=self.cfg.parallel_config.data_parallel_size, ) ) - ctx = multiprocessing.get_context("fork") + ctx = multiprocessing.get_context("spawn" if sys.platform == "win32" else "fork") cfg = copy.deepcopy(self.cfg) self.dp_processed.append( ctx.Process( @@ -893,14 +868,8 @@ def launch_components(self): + f" data parallel id {i}" ) self.dp_processed[-1].start() - - for i in range( - 1, - self.cfg.parallel_config.data_parallel_size // self.cfg.nnode, - ): - while self.launched_expert_service_signal.value[i] == 0: - time.sleep(0.1) + time.sleep(1) def check_worker_initialize_status(self): """ diff --git a/fastdeploy/engine/expert_service.py b/fastdeploy/engine/expert_service.py index 5958b3d9bd3..9dd30bd533c 100644 --- a/fastdeploy/engine/expert_service.py +++ b/fastdeploy/engine/expert_service.py @@ -1,4 +1,4 @@ -""" +"""Module for Hackathon 10th Spring No.46. # Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License" @@ -18,6 +18,7 @@ import os import signal +import sys import threading import time import traceback @@ -109,7 +110,7 @@ def start(self, ipc_signal_suffix, local_data_parallel_id): if envs.FD_ENABLE_RETURN_TEXT: self.engine.create_data_processor() if self.cfg.scheduler_config.name == "dp": - self.cfg.init_pd_info() + self.cfg.init_cache_info() self.engine.scheduler.start(local_data_parallel_id) if ipc_signal_suffix is not None: @@ -122,7 +123,7 @@ def start(self, ipc_signal_suffix, local_data_parallel_id): self.llm_logger.info(f"start expert service {local_data_parallel_id}") if self.cfg.scheduler_config.name == "splitwise": - self.cfg.init_pd_info() + self.cfg.init_cache_info() role = self.cfg.scheduler_config.splitwise_role host_ip = self.cfg.host_ip self.engine.scheduler.start(role, host_ip, self.cfg.register_info) @@ -201,8 +202,11 @@ def _exit_sub_services(self): for p in self.cache_manager_processes: self.llm_logger.info(f"Killing cache manager process {p.pid}") try: - pgid = os.getpgid(p.pid) - os.killpg(pgid, signal.SIGTERM) + if sys.platform != "win32": + pgid = os.getpgid(p.pid) + os.killpg(pgid, signal.SIGTERM) + else: + p.terminate() except Exception as e: console_logger.error( f"Error killing cache manager process {p.pid}: {e}, {str(traceback.format_exc())}" diff --git a/fastdeploy/eplb/async_expert_loader.py b/fastdeploy/eplb/async_expert_loader.py index 79c7fd623cb..fd280cacaaa 100644 --- a/fastdeploy/eplb/async_expert_loader.py +++ b/fastdeploy/eplb/async_expert_loader.py @@ -1,4 +1,4 @@ -""" +"""Module for Hackathon 10th Spring No.46. # Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,6 +16,8 @@ import ctypes import os +import sys +import tempfile import time import traceback from typing import List, Tuple @@ -76,6 +78,7 @@ MAIN_MODEL_REDUNDANT_SHM_SIZE = 5 MODEL_MAIN_NAME = "eplb_main" +_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir() def create_mmap(model_name: List, ep_rank: int, ep_size: int, shm_uuid: str, eplb_config: EPLBConfig, logger=None): @@ -92,7 +95,7 @@ def create_mmap(model_name: List, ep_rank: int, ep_size: int, shm_uuid: str, epl mmap_infos = {} for name in model_name: - expert_weight_file = f"/dev/shm/{name}_rank_{ep_rank}_expert_weight_{shm_uuid}" + expert_weight_file = f"{_shm_dir}/{name}_rank_{ep_rank}_expert_weight_{shm_uuid}" shm_size = main_size if not os.path.isfile(expert_weight_file): @@ -436,7 +439,7 @@ def load_model_weights_process( ) if success: model_name = MODEL_MAIN_NAME - file_path = f"/dev/shm/{model_name}_rank_{rank}_expert_weight_{shm_uuid}" + file_path = f"{_shm_dir}/{model_name}_rank_{rank}_expert_weight_{shm_uuid}" weight_infos = save_tensor_to_shm_mem(ep_loader.cached_weights, file_path, logger) logger.info( "redundant_expert: async load save_tensor_to_shm_mem, " diff --git a/fastdeploy/inter_communicator/fmq.py b/fastdeploy/inter_communicator/fmq.py index 9da40a1fe99..16157f0edff 100644 --- a/fastdeploy/inter_communicator/fmq.py +++ b/fastdeploy/inter_communicator/fmq.py @@ -1,4 +1,4 @@ -""" +"""Module for Hackathon 10th Spring No.46. # Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License" @@ -16,6 +16,8 @@ import asyncio import json +import sys +import tempfile import time import traceback import uuid @@ -79,7 +81,7 @@ class Endpoint: @dataclass class Config: - ipc_root: str = "/dev/shm" + ipc_root: str = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir() io_threads: int = 1 copy: bool = False endpoints: Dict[str, Endpoint] = field(default_factory=dict) diff --git a/fastdeploy/inter_communicator/zmq_client.py b/fastdeploy/inter_communicator/zmq_client.py index 9c7495fba72..6d55fade748 100644 --- a/fastdeploy/inter_communicator/zmq_client.py +++ b/fastdeploy/inter_communicator/zmq_client.py @@ -1,4 +1,4 @@ -""" +"""Module for Hackathon 10th Spring No.46. # Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License" @@ -14,6 +14,8 @@ # limitations under the License. """ +import sys +import tempfile import time from abc import ABC, abstractmethod from multiprocessing.reduction import ForkingPickler @@ -152,7 +154,8 @@ class ZmqIpcClient(ZmqClientBase): def __init__(self, name, mode): self.name = name self.mode = mode - self.file_name = f"/dev/shm/{name}.socket" + _shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir() + self.file_name = f"{_shm_dir}/{name}.socket" self.context = zmq.Context() self.socket = self.context.socket(self.mode) diff --git a/fastdeploy/inter_communicator/zmq_server.py b/fastdeploy/inter_communicator/zmq_server.py index 19cddf64928..ee112276e94 100644 --- a/fastdeploy/inter_communicator/zmq_server.py +++ b/fastdeploy/inter_communicator/zmq_server.py @@ -1,4 +1,4 @@ -""" +"""Module for Hackathon 10th Spring No.46. # Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License" @@ -15,6 +15,8 @@ """ import os +import sys +import tempfile import threading import time import traceback @@ -430,10 +432,11 @@ def __init__(self, name, mode): self.file_name = None return + _shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir() if mode == zmq.PULL: - self.file_name = f"/dev/shm/{name}.socket" + self.file_name = f"{_shm_dir}/{name}.socket" elif mode == zmq.ROUTER: - self.file_name = f"/dev/shm/router_{name}.ipc" + self.file_name = f"{_shm_dir}/router_{name}.ipc" else: raise ValueError(f"Unsupported ZMQ mode: {mode}") self._create_socket() @@ -456,7 +459,8 @@ def _get_worker_push_socket(self, worker_pid): sock = self.context.socket(zmq.PUSH) sock.setsockopt(zmq.SNDHWM, self.ZMQ_SNDHWM) sock.setsockopt(zmq.SNDTIMEO, -1) - address = f"ipc:///dev/shm/response_{self.push_name_prefix}_w{worker_pid}.pull" + _shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir() + address = f"ipc://{_shm_dir}/response_{self.push_name_prefix}_w{worker_pid}.pull" sock.connect(address) self.worker_push_sockets[worker_pid] = sock self.worker_push_addresses[worker_pid] = address diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 28a943cf9d4..7a33fe36fc4 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -1,4 +1,4 @@ -""" +"""Module for Hackathon 10th Spring No.46. # Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License" @@ -18,6 +18,8 @@ import asyncio import json import os +import sys +import tempfile import time import traceback from typing import Tuple @@ -726,7 +728,8 @@ def start_task_queue_service(self): self.parallel_config.local_engine_worker_queue_port, ) else: - task_address = f"/dev/shm/fd_task_queue_{self.parallel_config.local_engine_worker_queue_port}.sock" + _shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir() + task_address = f"{_shm_dir}/fd_task_queue_{self.parallel_config.local_engine_worker_queue_port}.sock" logger.info(f"connect task queue address {task_address}") self.task_queue = TaskQueue( address=task_address, @@ -833,12 +836,6 @@ def parse_args(): default=None, help="Configuration of SpeculativeConfig.", ) - parser.add_argument( - "--enable_flashinfer_allreduce_fusion", - action="store_true", - default=False, - help="Flag to enable all reduce fusion kernel in flashinfer.", - ) parser.add_argument( "--max_num_batched_tokens", type=int,