-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtasks.py
More file actions
145 lines (118 loc) · 3.25 KB
/
tasks.py
File metadata and controls
145 lines (118 loc) · 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""
tasks to kick-off the project
"""
from invoke import task
from ai21_celery import signature
@task
def start_services(tsk):
    """
    Bring up the dependent services in the background with Docker Compose.

    The services themselves are defined in the project's compose file
    (expected to be a Redis broker and a Flower instance -- confirm against
    that file). Existing containers are left untouched (``--no-recreate``).

    :param tsk: invoke context used to run the shell command
    """
    # Use the Compose V2 plugin form (`docker compose`), matching what
    # stop_services already invokes, so both tasks rely on the same tool.
    tsk.run("docker compose up -d --no-recreate", pty=True)
    print("Celery Flower is up at http://localhost:8888")
@task
def stop_services(tsk, delete_data=False):
    """
    Shut down the dependent services with ``docker compose down``.

    :param tsk: invoke context used to run the shell command
    :param delete_data: when truthy, ``-v`` is appended to the command
    """
    volume_flag = ""
    if delete_data:
        volume_flag = "-v"
    tsk.run(f"docker compose down {volume_flag}")
@task(pre=[start_services])
def run_worker(tsk):
    """
    Launch the connectors_sync_worker Celery worker process.

    ``start_services`` is declared as a pre-task, so it runs first.
    The worker is started with ``PORT`` and ``CELERY_URL`` set in its
    environment. NOTE(review): ai21-celery is said to expose a /metrics
    endpoint for prometheus; presumably ``PORT`` is where it listens -- confirm.

    :param tsk: invoke context used to run the shell command
    """
    env_prefix = 'PORT=8084 CELERY_URL="celery://localhost/connectors_sync_worker"'
    worker_cmd = (
        "celery -A connectors_sync_worker.tasks worker"
        " -n connectors_sync_worker --loglevel=INFO"
    )
    tsk.run(f"{env_prefix} {worker_cmd}")
@task(iterable=["args"])
def call(tsk, task, args):
    """
    Dispatch a Celery task by name and print its id, status and result.

    :param tsk: invoke context (unused here)
    :param task: name of the Celery task to call; inside this function it
        shadows the ``task`` decorator imported from invoke
    :param args: iterable of positional arguments forwarded to the task
    """
    broker_url = "celery://localhost/connectors_sync_worker"
    async_result = signature(task, url=broker_url).apply_async(
        args=tuple(args),
        expires=30,
    )
    print(f"Started task: {async_result.id}")
    # Status is printed before and after waiting on the result.
    print(f"Status: {async_result.status}")
    print(f"Result: {async_result.get()}")
    print(f"Status: {async_result.status}")
@task
def formatter(tsk, fix=False):
    """
    Run ``ruff format`` over the project.

    By default only a diff of the would-be changes is shown (``--diff``);
    pass ``fix`` to let ruff rewrite the files.

    :param tsk: invoke context used to run the shell command
    :param fix: when truthy, drop ``--diff`` so formatting is applied
    """
    # NOTE: the decorator must be applied exactly once. Stacking a second
    # bare ``@task`` hands invoke a Task object, which it treats as a
    # pre-task and answers with a decorator function instead of a Task.
    auto_fix = "" if fix else "--diff"
    cmd = " && ".join(
        [
            f"ruff format . {auto_fix}",
        ]
    )
    tsk.run(cmd, echo=True, pty=True)
@task
def lint(tsk, fix=False):
    """
    Run ``ruff check`` on the package and the tests.

    :param tsk: invoke context used to run the shell command
    :param fix: when truthy, ``--fix`` is appended to the command
    """
    fix_flag = ""
    if fix:
        fix_flag = "--fix"
    tsk.run(
        f"ruff check connectors_sync_worker/ tests/ {fix_flag}",
        echo=True,
        pty=True,
    )
@task(optional=["coverage"], help={"coverage": "[true|false]"})
def test(tsk, coverage=False):
    """
    Run the unit tests through ``poetry run pytest``.

    :param tsk: invoke context used to run the shell command
    :param coverage: when truthy, coverage options are added to pytest
    """
    pytest_cmd = "poetry run pytest "
    if coverage:
        pytest_cmd += "--cov --cov-report=term-missing"
    tsk.run(pytest_cmd, echo=True, pty=True)
@task
def audit(tsk):
    """
    Audit the dependent packages with ``safety check``.

    :param tsk: invoke context used to run the shell command
    """
    tsk.run("safety check --full-report", echo=True, pty=True)
@task
def staticcheck(tsk):
    """
    Run mypy on the package and the tests.

    :param tsk: invoke context used to run the shell command
    """
    tsk.run("mypy connectors_sync_worker tests", echo=True, pty=True)
@task
def build(tsk):
    """
    Build a package for connectors_sync_worker with ``poetry build``.

    :param tsk: invoke context used to run the shell command
    """
    tsk.run("poetry build", echo=True, pty=True)
@task
def update(tsk):
    """
    Update outdated packages with ``poetry update``.

    :param tsk: invoke context used to run the shell command
    """
    tsk.run("poetry update", echo=True, pty=True)
@task
def outdated(tsk):
    """
    List outdated top-level packages (nothing is updated).

    Runs ``poetry show --outdated --top-level``.

    :param tsk: invoke context used to run the shell command
    """
    cmd = "poetry show --outdated --top-level"
    tsk.run(cmd, echo=True, pty=True)
@task
def build_image(tsk):
    """
    Build the docker image for connectors_sync_worker.

    The shell pipeline does three things in sequence:
    1. writes a ``.netrc`` in the current directory containing the gcloud
       access token for ``us-python.pkg.dev``;
    2. restricts that file to mode 600;
    3. runs ``docker buildx build`` passing ``${HOME}/.ssh/id_rsa`` and the
       ``.netrc`` as build secrets.

    The command does not remove ``.netrc`` afterwards.

    :param tsk: invoke context used to run the shell command
    """
    # The three fragments are concatenated with no separator, exactly as written.
    write_netrc = "gcloud auth print-access-token | sed 's/\\(.*\\)/machine us-python.pkg.dev\\nlogin oauth2accesstoken\\npassword \\1/g' > .netrc &&"
    restrict_netrc = "chmod 600 .netrc && "
    docker_build = "docker buildx build --secret=id=key,src=${HOME}/.ssh/id_rsa --secret=id=netrc,src=.netrc ."
    tsk.run(write_netrc + restrict_netrc + docker_build, echo=True, pty=True)