-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
206 lines (149 loc) · 4.87 KB
/
main.py
File metadata and controls
206 lines (149 loc) · 4.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import pathlib
import click
from taskcrafter.logger import app_logger
from taskcrafter.util.file import get_file_content
from taskcrafter.job_loader import JobManager
from taskcrafter.hook_loader import HookManager
from taskcrafter.plugin_loader import plugin_list, init_plugins, plugin_lookup
from taskcrafter.scheduler import SchedulerManager
from taskcrafter.preview import (
rich_preview,
result_table,
plugin_info_preview,
plugin_list_preview,
)
from taskcrafter.config import app_config
from taskcrafter.util.validator import validate_hooks, validate_jobs, validate_schema
from taskcrafter.util.yaml import get_yaml_from_string
from taskcrafter.wizard import create_file_wizard
# Default path of the jobs definition file; used as the default of the
# --file/-f option on the root command.
JOBS_FILE = "jobs/jobs.yaml"

# Module-level handle to the active scheduler. It stays None until
# run_helper() creates a SchedulerManager and stores it here.
schedulerManager: SchedulerManager = None
# Root command group. Its callback runs before any subcommand and makes sure
# a jobs file exists (offering the creation wizard otherwise), then records
# the chosen path on app_config for the subcommands to read.
@click.group()
@click.option(
    "--file",
    "-f",
    type=click.Path(),
    default=JOBS_FILE,
    help="Name of the jobs file (yaml).",
)
def cli(file: str = JOBS_FILE):
    """CLI for TaskCrafter."""
    file_path = pathlib.Path(file)

    # Only fall back to the wizard when the file is missing; a falsy wizard
    # result aborts the program with exit status 1.
    if not file_path.is_file() and not create_file_wizard(file_path):
        # Raise SystemExit directly instead of calling the site-provided
        # exit() helper, which is meant for the interactive shell and is
        # not defined when the interpreter runs without the site module.
        raise SystemExit(1)

    app_config.jobs_file = file
# NOTE: the function name shadows the builtin help() inside this module. It is
# kept because click derives the command name ("help") from it.
@cli.command()
def help():
    """Display help information."""
    ctx = click.get_current_context()

    # Render the group's help with the group's own context. Using this
    # subcommand's context would put "help" into the usage line, because
    # click builds that line from the context's command path. Fall back to
    # the current context when there is no parent.
    group_ctx = ctx.parent if ctx.parent is not None else ctx
    click.echo(cli.get_help(group_ctx))

    click.echo("\n🔍 Examples:\n")

    # One example line per leaf command: groups are expanded into their
    # subcommands, plain commands are listed directly.
    for command_name, command_obj in cli.commands.items():
        if hasattr(command_obj, "commands"):
            for subcommand_name in command_obj.commands:
                click.echo(f" taskcrafter {command_name} {subcommand_name}")
        else:
            click.echo(f" taskcrafter {command_name}")

    click.echo("\nℹ️ Use --help with any command to get more details.")
    click.echo(" e.g., taskcrafter jobs run --help\n")
def validate_and_initialize(show_report: bool = False):
    """Load the configured jobs file and build the job and hook managers.

    The steps run in this order: read ``app_config.jobs_file``, parse it as
    YAML, validate the schema, initialize plugins, construct the
    ``JobManager`` and ``HookManager``, then validate jobs and hooks.

    Args:
        show_report: Passed through to ``validate_jobs`` and
            ``validate_hooks``.

    Returns:
        A ``(job_manager, hook_manager)`` tuple on success, or
        ``(None, None)`` when any step raised. The exception is logged
        through ``app_logger.error`` and not re-raised.
    """
    try:
        raw_text = get_file_content(app_config.jobs_file)
        parsed = get_yaml_from_string(raw_text)

        validate_schema(parsed)
        init_plugins(parsed)

        # Both managers are built from the raw file text, not the parsed YAML.
        job_manager = JobManager(raw_text)
        hook_manager = HookManager(raw_text, job_manager=job_manager)

        validate_jobs(job_manager.jobs, show_report=show_report)
        validate_hooks(hook_manager.hooks, show_report=show_report)
    except Exception as exc:
        # CLI boundary: report the failure and let the caller bail out.
        app_logger.error(f"{exc.__class__.__name__}: {exc}")
        return None, None
    else:
        return job_manager, hook_manager
def run_helper(job_id: str):
    """Core logic for running jobs. Can be called programmatically.

    Validates the jobs file, optionally narrows the job list to a single
    job, schedules every remaining job, starts the scheduler and finally
    prints the result table.

    Args:
        job_id: Id of the only job to run. A falsy value (``None`` or an
            empty string) runs all jobs from the file.

    Returns:
        None. The function returns early, after logging, when validation
        fails or when ``job_id`` does not match a job.

    Side effects:
        Stores the created ``SchedulerManager`` in the module-level
        ``schedulerManager`` variable.
    """
    global schedulerManager

    jobManager, hookManager = validate_and_initialize()
    if jobManager is None or hookManager is None:
        return

    if job_id:
        try:
            job = jobManager.job_get_by_id(job_id)
        except ValueError:
            app_logger.error(f"Job {job_id} does not exist.")
            # Stop here: without this return the code below would use the
            # unbound name `job` and crash with UnboundLocalError.
            return
        # Restrict the run to the requested job only.
        jobManager.jobs = [job]

    schedulerManager = SchedulerManager(
        job_manager=jobManager, hook_manager=hookManager
    )

    for job in jobManager.jobs:
        schedulerManager.schedule_job(job)

    schedulerManager.start_scheduler()

    result_table(jobManager.executed_jobs)
# Command group for the job-related subcommands defined below
# (run, validate, list). The group callback itself does nothing; the
# docstring is the help text click shows for the group.
@click.group()
def jobs():
    """Manage jobs."""
# `jobs run`: thin click wrapper around run_helper(). The docstring is the
# user-facing help text. The "\b" marker must be the first line of its own
# paragraph (blank line before it) for click to keep the example lines
# unwrapped.
@jobs.command()
@click.option("--job", "-j", "job_id", help="Name of the job.")
def run(job_id: str):
    """
    Runs all jobs from YAML file. If a --job parameter is provided, it runs only that job.

    Examples:

    \b
    taskcrafter jobs run
    taskcrafter jobs run --job job1
    """
    run_helper(job_id)
# `jobs validate`: runs the full validation pipeline with reporting enabled.
@jobs.command()
def validate():
    """Validate jobs from YAML file."""
    # All the work happens inside validate_and_initialize(): show_report=True
    # is forwarded to validate_jobs/validate_hooks, and any failure is logged
    # there. The returned managers are not needed afterwards.
    validate_and_initialize(show_report=True)
# `jobs list`: previews the jobs and hooks loaded from the jobs file.
# NOTE: the function name shadows the builtin list() inside this module. It is
# kept because click derives the command name ("list") from it.
@jobs.command()
def list():
    """List all jobs from YAML file."""
    job_manager, hook_manager = validate_and_initialize()

    # validate_and_initialize() returns (None, None) after logging a failure;
    # only render the preview when both managers were built.
    if job_manager is not None and hook_manager is not None:
        rich_preview(job_manager.jobs, hook_manager.hooks)
# Command group for the plugin-related subcommands (list, info).
@click.group()
def plugins():
    """Manage TaskCrafter plugins."""
    # Runs the shared validation/initialization pipeline, which calls
    # init_plugins() on the parsed jobs file. The returned managers are
    # discarded; presumably this is done so the plugin_list()/plugin_lookup()
    # calls in the subcommands see the initialized plugins - TODO confirm.
    validate_and_initialize()
# `plugins list`: shows every plugin returned by plugin_list(). The docstring
# is the user-facing help text. The "\b" marker must be the first line of its
# own paragraph (blank line before it) for click to keep the example line
# unwrapped.
@plugins.command("list")
def plugins_list():
    """
    List all available plugins.

    Examples:

    \b
    taskcrafter plugins list
    """
    app_logger.info("Listing all available plugins...")

    # Named `available` rather than `plugins` so the local does not shadow
    # the module-level `plugins` command group.
    available = plugin_list()

    if not available:
        click.echo("No plugins found.")
        app_logger.warning("No plugins found.")
        return

    plugin_list_preview(available)
# `plugins info NAME`: shows the preview for one plugin, or logs an error
# when plugin_lookup() returns a falsy result. The docstring is the
# user-facing help text. The "\b" marker must be the first line of its own
# paragraph (blank line before it) for click to keep the example line
# unwrapped.
@plugins.command("info")
@click.argument("name")
def plugin_info(name):
    """
    Show detailed info about a specific plugin.

    Examples:

    \b
    taskcrafter plugins info echo
    """
    plugin = plugin_lookup(name)

    if not plugin:
        app_logger.error(f"Plugin {name} not found.")
        return

    plugin_info_preview(plugin)
# Register the command groups on the root CLI so they are reachable as
# `taskcrafter jobs ...` and `taskcrafter plugins ...`.
cli.add_command(jobs)
cli.add_command(plugins)

# Script entry point: hand control to click when run directly.
if __name__ == "__main__":
    cli()