-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscheduler.py
More file actions
155 lines (127 loc) · 5.5 KB
/
scheduler.py
File metadata and controls
155 lines (127 loc) · 5.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import os
import schedule
import time
import threading
import logging
from datetime import datetime
import glob
class MarkemImagesCleanupScheduler:
"""Background scheduler to clean marked_images folder daily at 2 AM"""
def __init__(self, marked_images_dir="marked_images"):
self.marked_images_dir = marked_images_dir
self.is_running = False
self.scheduler_thread = None
self.setup_logging()
def setup_logging(self):
"""Setup logging for cleanup operations"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('cleanup_scheduler.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger('MarkemImagesCleanupScheduler')
def cleanup_marked_images(self):
"""Clean up all files in the marked_images directory"""
try:
if not os.path.exists(self.marked_images_dir):
self.logger.info(f"Directory {self.marked_images_dir} does not exist. Creating it.")
os.makedirs(self.marked_images_dir, exist_ok=True)
return
# Get all files in the marked_images directory
files_pattern = os.path.join(self.marked_images_dir, "*")
files_to_delete = glob.glob(files_pattern)
if not files_to_delete:
self.logger.info(f"No files found in {self.marked_images_dir} directory.")
return
deleted_count = 0
failed_count = 0
for file_path in files_to_delete:
try:
if os.path.isfile(file_path):
os.remove(file_path)
deleted_count += 1
self.logger.info(f"Deleted file: {os.path.basename(file_path)}")
elif os.path.isdir(file_path):
# Skip subdirectories for safety
self.logger.warning(f"Skipped subdirectory: {os.path.basename(file_path)}")
except Exception as e:
failed_count += 1
self.logger.error(f"Failed to delete {os.path.basename(file_path)}: {str(e)}")
self.logger.info(f"Cleanup completed. Deleted: {deleted_count} files, Failed: {failed_count} files")
except Exception as e:
self.logger.error(f"Error during cleanup operation: {str(e)}")
def schedule_cleanup(self):
"""Schedule the cleanup to run daily at 2 AM"""
schedule.every().day.at("02:00").do(self.cleanup_marked_images)
self.logger.info("Scheduled daily cleanup at 2:00 AM")
def run_scheduler(self):
"""Run the scheduler in a background thread"""
self.logger.info("Starting cleanup scheduler...")
while self.is_running:
schedule.run_pending()
time.sleep(60) # Check every minute
self.logger.info("Cleanup scheduler stopped")
def start(self):
"""Start the background scheduler"""
if self.is_running:
self.logger.warning("Scheduler is already running")
return
self.is_running = True
self.schedule_cleanup()
# Start scheduler in a background thread
self.scheduler_thread = threading.Thread(target=self.run_scheduler, daemon=True)
self.scheduler_thread.start()
self.logger.info("Background cleanup scheduler started successfully")
def stop(self):
"""Stop the background scheduler"""
if not self.is_running:
self.logger.warning("Scheduler is not running")
return
self.is_running = False
if self.scheduler_thread:
self.scheduler_thread.join(timeout=5)
schedule.clear()
self.logger.info("Background cleanup scheduler stopped")
def run_cleanup_now(self):
"""Manually trigger cleanup operation"""
self.logger.info("Manual cleanup triggered")
self.cleanup_marked_images()
def get_status(self):
"""Get current scheduler status"""
return {
'is_running': self.is_running,
'next_run': str(schedule.next_run()) if schedule.jobs else None,
'total_jobs': len(schedule.jobs),
'marked_images_dir': self.marked_images_dir
}
# Global scheduler instance
cleanup_scheduler = MarkemImagesCleanupScheduler()
def start_cleanup_scheduler():
"""Function to start the cleanup scheduler"""
cleanup_scheduler.start()
def stop_cleanup_scheduler():
"""Function to stop the cleanup scheduler"""
cleanup_scheduler.stop()
def manual_cleanup():
"""Function to manually trigger cleanup"""
cleanup_scheduler.run_cleanup_now()
def get_scheduler_status():
"""Function to get scheduler status"""
return cleanup_scheduler.get_status()
if __name__ == "__main__":
# Test the scheduler
print("Starting cleanup scheduler test...")
cleanup_scheduler.start()
try:
# Keep the main thread alive for testing
while True:
status = cleanup_scheduler.get_status()
print(f"Scheduler Status: {status}")
time.sleep(30) # Print status every 30 seconds
except KeyboardInterrupt:
print("\nStopping scheduler...")
cleanup_scheduler.stop()
print("Scheduler stopped.")