Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion bec_lib/bec_lib/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import os
import time
import warnings
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -163,13 +164,19 @@ def compile_file_components(
return (file_path_component, file_extension)


def get_full_path(scan_status_msg: ScanStatusMessage, name: str, create_dir: bool = True) -> str:
def get_full_path(
scan_status_msg: ScanStatusMessage,
name: str,
create_dir: bool = True,
log_if_dir_does_not_exist: bool = True,
) -> str:
"""Get the full file path for a given scan status message and additional name.

Args:
scan_status_msg (ScanStatusMessage): Scan status message
name (str): Additional name (i.e. device name) to add to the file path
create_dir (bool, optional): Create the directory if it does not exist. Defaults to True.
log_if_dir_does_not_exist (bool, optional): Log a warning if the directory does not exist. Defaults to True.
"""
Comment thread
wakonig marked this conversation as resolved.

if name == "":
Expand All @@ -192,10 +199,33 @@ def get_full_path(scan_status_msg: ScanStatusMessage, name: str, create_dir: boo
# Compile full file path
full_path = f"{file_base_path}_{name}.{file_extension}"
if create_dir:
if log_if_dir_does_not_exist and not os.path.exists(os.path.dirname(full_path)):
logger.warning(f"Directory {os.path.dirname(full_path)} does not exist. Creating it.")
os.makedirs(os.path.dirname(full_path), exist_ok=True)
return full_path
Comment thread
wakonig marked this conversation as resolved.


def wait_for_directory(path: str, timeout: float = 10.0, interval: float = 0.1) -> None:
"""
Wait for a directory to be created.

Args:
path (str): Path to the directory to wait for.
timeout (float, optional): Maximum time to wait in seconds. Defaults to 10.
interval (float, optional): Time to wait between checks in seconds. Defaults to 0.1.

Raises:
FileWriterError: If the timeout is reached before the directory is created.

"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if os.path.isdir(path):
return
time.sleep(interval)
raise FileWriterError(f"Timeout reached while waiting for directory {path} to be created.")


class FileWriter:
"""FileWriter for creating file paths and directories for services and devices."""

Expand Down
27 changes: 27 additions & 0 deletions bec_lib/tests/test_file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

# pylint: skip-file
import os
import threading
import time
from unittest import mock

import pytest
Expand All @@ -15,6 +17,7 @@
LogWriter,
compile_file_components,
get_full_path,
wait_for_directory,
)
from bec_lib.messages import ScanStatusMessage
from bec_lib.tests.utils import ConnectorMock
Expand Down Expand Up @@ -259,6 +262,30 @@ def test_compile_file_components_valid_paths(kwargs, expected_path, description)
assert file_path == expected_path, description


def test_wait_for_directory_returns_when_directory_appears(tmpdir):
"""wait_for_directory should stop polling once the directory exists."""
dir_path = tmpdir.join("created-later")

def _create_directory():
time.sleep(0.02)
dir_path.mkdir()

creator = threading.Thread(target=_create_directory)
creator.start()
try:
wait_for_directory(str(dir_path), timeout=1.0, interval=0.01)
finally:
creator.join()


def test_wait_for_directory_raises_on_timeout(tmpdir):
"""wait_for_directory should raise when the directory never appears."""
dir_path = tmpdir.join("never-created")

with pytest.raises(FileWriterError, match="Timeout reached while waiting for directory"):
wait_for_directory(str(dir_path), timeout=0.05, interval=0.01)


@pytest.mark.parametrize(
"scan_info",
[
Expand Down
8 changes: 6 additions & 2 deletions bec_server/bec_server/file_writer/file_writer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def update_scan_storage_with_status(self, msg: messages.ScanStatusMessage) -> No
if status == "open" and not scan_storage.start_time:
scan_storage.start_time = msg.content.get("timestamp")
scan_storage.async_writer = AsyncWriter(
get_full_path(scan_status_msg=msg, name="master"),
get_full_path(scan_status_msg=msg, name="master", log_if_dir_does_not_exist=False),
Comment thread
wakonig marked this conversation as resolved.
scan_id=scan_id,
scan_number=msg.scan_number,
connector=self.connector,
Expand Down Expand Up @@ -391,7 +391,11 @@ def write_file(self, scan_id: str) -> None:
start_time = time.time()

try:
file_path = get_full_path(scan_status_msg=storage.status_msg, name=file_suffix)
file_path = get_full_path(
scan_status_msg=storage.status_msg,
name=file_suffix,
log_if_dir_does_not_exist=False,
)
Comment thread
wakonig marked this conversation as resolved.
successful = True

# If we've already written device data, we need to append to the file
Expand Down
Loading