Skip to content

Commit 7ec838d

Browse files
committed
adding logging lock file for debug purpose with file descriptor, avoiding race conditions
1 parent 8d246d2 commit 7ec838d

File tree

1 file changed

+127
-24
lines changed

1 file changed

+127
-24
lines changed

py/torch_tensorrt/_utils.py

Lines changed: 127 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import tensorrt as trt
1414
import torch
1515

16-
logger = logging.getLogger(__name__)
16+
logger = logging.getLogger("torch_tensorrt")
1717

1818
_WHL_CPYTHON_VERSION = "cp310"
1919
_TENSORRT_LLM_VERSION_ = "0.17.0.post1"
@@ -149,7 +149,9 @@ def _extracted_dir_trtllm(platform_system: str, platform_machine: str) -> Path:
149149
)
150150

151151

152-
def extract_wheel_file(wheel_path: Path, extract_dir: Path) -> None:
152+
def extract_wheel_file(
153+
wheel_path: Path, extract_dir: Path, plugin_lib_path: Path
154+
) -> None:
153155
"""
154156
Safely extract a wheel file to a directory with a lock to prevent concurrent extraction.
155157
"""
@@ -162,19 +164,84 @@ def extract_wheel_file(wheel_path: Path, extract_dir: Path) -> None:
162164
logger.debug(
163165
f"[Rank {rank}] Starting extraction of {wheel_path} to {extract_dir}"
164166
)
167+
# If another job already finished earlier, skip immediately
168+
if plugin_lib_path.exists():
169+
logger.debug(
170+
f"[Rank {rank}] Plugin already present at {plugin_lib_path}, skipping extraction"
171+
)
172+
return
173+
174+
logger.debug(f"[Rank {rank}] Checking wheel file exists: {wheel_path.exists()}")
175+
165176
try:
166177
import zipfile
178+
179+
logger.debug(f"[Rank {rank}] Successfully imported zipfile")
167180
except ImportError as e:
168181
raise ImportError(
169182
"zipfile module is required but not found. Please install zipfile"
170183
)
171-
# Create lock file to signal extraction in progress
172-
extract_dir.mkdir(parents=True, exist_ok=False)
173-
lock_file.touch(exist_ok=False)
184+
185+
# Create extraction directory first (needed for lock file)
186+
extract_dir.mkdir(parents=True, exist_ok=True)
187+
188+
# Acquire lock atomically, portable across different platforms x86/arm/windows
189+
# Only one process should be able to create the lock file with O_EXCL
190+
logger.debug(f"[Rank {rank}] Attempting to acquire lock: {lock_file}")
191+
acquire_start_time = time.time()
192+
while True:
193+
try:
194+
# Re-check in case extractor finished while we waited
195+
if plugin_lib_path.exists():
196+
logger.debug(
197+
f"[Rank {rank}] Plugin appeared at {plugin_lib_path} during acquire, skipping extraction"
198+
)
199+
return
200+
lock_fd = os.open(str(lock_file), os.O_CREAT | os.O_EXCL | os.O_RDWR)
201+
logger.debug(f"[Rank {rank}] Successfully acquired lock")
202+
# write lock owner metadata for race condition time logging
203+
try:
204+
lock_info = f"pid={os.getpid()} host={platform.node()} rank={rank} start={time.time()}\n"
205+
os.write(lock_fd, lock_info.encode("utf-8"))
206+
os.fsync(lock_fd)
207+
except Exception:
208+
# It's fine if we fail to write metadata
209+
pass
210+
break
211+
except FileExistsError:
212+
if time.time() - acquire_start_time > 300:
213+
if not plugin_lib_path.exists():
214+
logger.warning(
215+
f"[Rank {rank}] Timed out waiting for extraction lock at {lock_file} (>300s) and plugin not present at {plugin_lib_path}"
216+
)
217+
raise TimeoutError(
218+
f"[Rank {rank}] Timed out acquiring extraction lock at {lock_file}"
219+
)
220+
time.sleep(0.5)
221+
222+
if plugin_lib_path.exists():
223+
logger.debug(
224+
f"[Rank {rank}] Plugin already present at {plugin_lib_path} after acquire, skipping extraction"
225+
)
226+
return
227+
# With lock held, perform extraction
228+
logger.debug(
229+
f"[Rank {rank}] Lock acquired, starting extraction from {wheel_path}"
230+
)
174231
try:
175232
with zipfile.ZipFile(wheel_path) as zip_ref:
176233
zip_ref.extractall(extract_dir)
177234
logger.debug(f"[Rank {rank}] Extraction complete: {extract_dir}")
235+
236+
# Delete wheel file after successful extraction (only Rank 0)
237+
try:
238+
wheel_path.unlink(missing_ok=True)
239+
logger.debug(f"[Rank {rank}] Deleted wheel file: {wheel_path}")
240+
except Exception as e:
241+
logger.warning(
242+
f"[Rank {rank}] Could not delete wheel file {wheel_path}: {e}"
243+
)
244+
178245
except FileNotFoundError as e:
179246
logger.error(f"[Rank {rank}] Wheel file not found at {wheel_path}: {e}")
180247
raise RuntimeError(
@@ -191,16 +258,63 @@ def extract_wheel_file(wheel_path: Path, extract_dir: Path) -> None:
191258
"Unexpected error during extraction of TensorRT-LLM wheel"
192259
) from e
193260
finally:
194-
# Remove lock file to signal completion
195-
lock_file.unlink(missing_ok=True)
261+
# Release lock, close file descriptor and remove lock file to signal completion
262+
try:
263+
os.close(lock_fd)
264+
except Exception as e:
265+
logger.debug(
266+
f"[Rank {rank}] Failed to close lock fd for {lock_file}: {e}",
267+
exc_info=e,
268+
)
269+
try:
270+
lock_file.unlink(missing_ok=True)
271+
except Exception as e:
272+
logger.debug(
273+
f"[Rank {rank}] Failed to unlink lock file {lock_file}: {e}",
274+
exc_info=e,
275+
)
196276

197277
else:
198-
# Other ranks wait for extraction to complete
199-
while lock_file.exists():
200-
logger.debug(
201-
f"[Rank {rank}] Waiting for extraction to finish at {extract_dir}..."
202-
)
278+
# Other ranks wait for extraction to complete.
279+
# only check lock file - don't check plugin existence during extraction
280+
# because plugin file may be partially written before extraction completes
281+
observed_lock = False
282+
wait_start_time = time.time()
283+
while True:
284+
if lock_file.exists():
285+
observed_lock = True
286+
logger.debug(
287+
f"[Rank {rank}] Waiting for extraction to finish at {extract_dir}..."
288+
)
289+
else:
290+
if observed_lock:
291+
# Lock was present and now gone -> extraction finished
292+
logger.debug(
293+
f"[Rank {rank}] Lock file removed, extraction complete"
294+
)
295+
break
296+
else:
297+
# Lock file never appeared - check if plugin already exists from previous run
298+
if plugin_lib_path.exists():
299+
logger.debug(
300+
f"[Rank {rank}] Plugin already exists from previous run, no extraction needed"
301+
)
302+
break
303+
# Lock not seen yet, keep waiting for Rank 0 to start
304+
logger.debug(
305+
f"[Rank {rank}] Waiting for extraction to start (no lock file yet)..."
306+
)
307+
203308
time.sleep(0.5)
309+
if time.time() - wait_start_time > 600:
310+
# 10 minute safeguard to avoid indefinite waits
311+
logger.warning(
312+
f"[Rank {rank}] Timed out (>600s) waiting for extraction to finish at {extract_dir}; "
313+
f"lock_present={lock_file.exists()} plugin_present={plugin_lib_path.exists()}"
314+
)
315+
raise TimeoutError(
316+
f"[Rank {rank}] Timed out waiting for extraction to finish at {extract_dir}"
317+
)
204318

205319

206320
def download_and_get_plugin_lib_path() -> Optional[str]:
@@ -251,18 +365,7 @@ def download_and_get_plugin_lib_path() -> Optional[str]:
251365
except OSError as e:
252366
logger.error(f"Local file write error: {e}")
253367

254-
extract_wheel_file(wheel_path, extract_dir)
255-
256-
try:
257-
wheel_path.unlink(missing_ok=True)
258-
logger.debug(f"Deleted wheel file: {wheel_path}")
259-
except Exception as e:
260-
logger.warning(f"Could not delete wheel file {wheel_path}: {e}")
261-
if not plugin_lib_path.exists():
262-
logger.error(
263-
f"Plugin library not found at expected location: {plugin_lib_path}"
264-
)
265-
return None
368+
extract_wheel_file(wheel_path, extract_dir, plugin_lib_path)
266369

267370
return str(plugin_lib_path)
268371

0 commit comments

Comments
 (0)