Skip to content
This repository was archived by the owner on Apr 30, 2025. It is now read-only.

Commit 2d4299c

Browse files
committed
Refactor task logger setup and TaskLogHandler
- Moved task logger setup from frinx_conductor_wrapper to task_logging. - Refactored the method to set task-specific information for threads.
1 parent b97af5d commit 2d4299c

2 files changed

Lines changed: 17 additions & 17 deletions

File tree

frinx/client/v2/frinx_conductor_wrapper.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,13 @@
2424

2525
from frinx.client.v2.conductor import WFClientMgr
2626
from frinx.common.frinx_rest import CONDUCTOR_URL_BASE
27-
from frinx.common.logging.task_logging import TaskLogHandler
27+
from frinx.common.logging.task_logging import task_log_handler
2828
from frinx.common.worker.worker import WorkerImpl
2929

3030
logger = logging.getLogger(__name__)
3131
hostname = socket.gethostname()
3232
RawTaskIO: TypeAlias = dict[str, Any]
3333

34-
task_logger = logging.getLogger('task_logger')
35-
task_logger.propagate = False
36-
task_log_handler = TaskLogHandler(max_capacity=100, max_message_length=15000)
37-
task_logger.addHandler(task_log_handler)
38-
3934

4035
@dataclass
4136
class RegisteredWorkerTask:
@@ -244,7 +239,7 @@ def register(self, task_blueprint: WorkerImpl) -> None:
244239

245240
def execute(self, task: RawTaskIO, task_blueprint: WorkerImpl) -> None:
246241
try:
247-
task_log_handler.set_taskname_for_thread(task['taskType'])
242+
task_log_handler.set_task_info_for_thread(str(task['taskType']), str(task['workflowInstanceId']))
248243
logger.info('Executing a task %s', task['taskId'])
249244
resp = task_blueprint.execute_wrapper(task)
250245

frinx/common/logging/task_logging.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ def __init__(self, max_capacity: int = 100, max_message_length: int = 15000, lev
2323
self.max_message_length: int = max_message_length
2424
self.thread_data = threading.local()
2525
self.thread_data.log_queue = deque(maxlen=self.max_capacity)
26-
self.thread_data.task_name = 'Unknown'
2726
self.formatter = logging.Formatter('%(levelname)s: %(message)s')
2827
self.console_formatter = logging.Formatter(
29-
'%(asctime)s | %(threadName)s | %(levelname)s | task: %(taskname)s | %(message)s', datefmt='%F %T')
28+
'%(asctime)s | %(threadName)s | %(levelname)s | %(task_info)s | %(message)s',
29+
datefmt='%F %T')
3030
self.console_handler = logging.StreamHandler()
3131
self.console_handler.setFormatter(self.console_formatter)
3232

@@ -38,10 +38,7 @@ def emit(self, record: logging.LogRecord) -> None:
3838
if not hasattr(self.thread_data, 'log_queue'):
3939
self._setup_thread_logging()
4040

41-
if not hasattr(self.thread_data, 'task_name'):
42-
self.thread_data.task_name = 'Unknown'
43-
44-
record.taskname = self.thread_data.task_name
41+
record.task_info = getattr(self.thread_data, 'task_info', 'Unknown')
4542

4643
formatted_record: str = self.format(record)
4744
truncated_record: str = self._truncate_message(formatted_record)
@@ -63,11 +60,12 @@ def _setup_thread_logging(self) -> None:
6360
"""
6461
self.thread_data.log_queue = deque(maxlen=self.max_capacity)
6562

66-
def set_taskname_for_thread(self, task_name: str) -> None:
63+
def set_task_info_for_thread(self, *args: str) -> None:
6764
"""
68-
Set the task name for the current thread.
65+
Set task-specific information for the current thread.
6966
"""
70-
self.thread_data.task_name = task_name
67+
delimiter: str = ' '
68+
self.thread_data.task_info = delimiter.join(str(arg) for arg in args)
7169

7270
def get_logs(self, clear: bool = True) -> list[str]:
7371
"""
@@ -89,4 +87,11 @@ def _clear_taskname_for_thread(self) -> None:
8987
Clear the task name for the current thread.
9088
"""
9189
if hasattr(self.thread_data, 'task_name'):
92-
del self.thread_data.task_name
90+
del self.thread_data.task_name
91+
92+
93+
# Task logger setup
94+
task_log_handler = TaskLogHandler(max_capacity=100, max_message_length=15000)
95+
task_logger = logging.getLogger('task_logger')
96+
task_logger.addHandler(task_log_handler)
97+
task_logger.propagate = False

0 commit comments

Comments
 (0)