Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 30 additions & 11 deletions worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1348,18 +1348,28 @@ def send_worker_heartbeat(worker_id: str) -> bool:
def heartbeat_worker_thread(worker_id: str, stop_event: threading.Event, interval: int = 20):
"""Background thread that sends heartbeats every 20 seconds."""

headless_logger.essential(f"✅ Starting heartbeat thread for worker: {worker_id}")
try:
# Use print initially in case headless_logger isn't ready yet
print(f"[HEARTBEAT THREAD] Started for worker: {worker_id}")

while not stop_event.wait(interval):
try:
if send_worker_heartbeat(worker_id):
headless_logger.debug(f"✅ [HEARTBEAT] ✅ Sent for worker {worker_id}")
else:
headless_logger.error(f"❌ [HEARTBEAT] Failed for worker {worker_id}")
except Exception as e:
headless_logger.error(f"[HEARTBEAT THREAD] Exception: {e}")
while not stop_event.wait(interval):
try:
print(f"[HEARTBEAT THREAD] Sending heartbeat for {worker_id}...")
if send_worker_heartbeat(worker_id):
print(f"[HEARTBEAT THREAD] ✅ Sent for worker {worker_id}")
else:
print(f"[HEARTBEAT THREAD] ❌ Failed for worker {worker_id}")
except Exception as e:
print(f"[HEARTBEAT THREAD] Exception during heartbeat: {e}")
import traceback
traceback.print_exc()

headless_logger.essential(f"🛑 Heartbeat thread stopped for worker: {worker_id}")
print(f"[HEARTBEAT THREAD] Stopped for worker: {worker_id}")

except Exception as e:
print(f"[HEARTBEAT THREAD] FATAL ERROR: {e}")
import traceback
traceback.print_exc()


# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -1562,6 +1572,8 @@ def close(self):
print(f"Polling interval: {cli_args.poll_interval} seconds.")

# Start heartbeat thread if worker ID is provided
print(f"[DEBUG] Heartbeat check: worker_id='{cli_args.worker}', db_type='{db_ops.DB_TYPE}'")

if cli_args.worker and db_ops.DB_TYPE == "supabase":
print(f"Starting heartbeat thread for worker: {cli_args.worker}")
heartbeat_thread = threading.Thread(
Expand All @@ -1571,8 +1583,15 @@ def close(self):
)
heartbeat_thread.start()
print(f"✅ Heartbeat thread started (20-second intervals)")

# Verify thread is actually running
if heartbeat_thread.is_alive():
print(f"✅ Heartbeat thread confirmed alive")
else:
print(f"❌ Heartbeat thread failed to start!")

elif cli_args.worker:
print(f"⚠️ Worker ID provided but SQLite mode - heartbeat not needed")
print(f"⚠️ Worker ID provided ({cli_args.worker}) but DB type is {db_ops.DB_TYPE} - heartbeat not needed")
else:
print(f"⚠️ No worker ID provided - heartbeat disabled")

Expand Down