Skip to content

Commit b7fc243

Browse files
committed
Centralize connection creation
1 parent 2a21efb commit b7fc243

8 files changed

Lines changed: 87 additions & 21 deletions

File tree

examples/workflows_python/__init__.py

Whitespace-only changes.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
"""This module shows a simple workflow definition in Python."""
2+
from salt.framework import workflow, task
3+
4+
5+
class ExampleWorkflow(workflow.Workflow):
6+
@task.task
7+
def my_first_task(self):
8+
print("Hello!")
9+
10+
@task.task
11+
def my_second_task(self):
12+
print("World!")
13+
14+
def graph(self):
15+
self.my_first_task >> self.my_second_task
16+
17+
18+
def main():
19+
ExampleWorkflow().run()
20+
21+
22+
if __name__ == "__main__":
23+
ExampleWorkflow().run()
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
"""This module shows a Workflow with Tasks that return values."""
2+
from salt.framework import workflow, task
3+
4+
5+
class ExampleWorkflow(workflow.Workflow):
6+
@task.task
7+
def my_first_task(self):
8+
print("Hello!")
9+
10+
@task.task
11+
def my_second_task(self):
12+
print("World!")
13+
14+
def graph(self):
15+
self.my_first_task >> self.my_second_task
16+
17+
18+
def main():
19+
ExampleWorkflow().run()
20+
21+
22+
if __name__ == "__main__":
23+
ExampleWorkflow().run()

src/salt/backend/scheduler/scheduler.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import json
55
import uuid
66

7-
import redis
87
import pika
98

109
from apscheduler.schedulers.background import BackgroundScheduler
@@ -18,11 +17,7 @@
1817

1918
class Scheduler:
2019
def __init__(self):
21-
self._redis = redis.Redis(
22-
host=settings.redis_settings.redis_url,
23-
port=settings.redis_settings.redis_port,
24-
db=settings.redis_settings.redis_workflow_db,
25-
)
20+
self._redis = settings.redis_settings.redis_client
2621

2722
def loop(self):
2823
sched = BackgroundScheduler()

src/salt/backend/worker/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,19 @@ def my_dynamic_task
4141
```
4242

4343
Only the first one will be executed on the Worker that picked the workflow, while the others will be distributed.
44+
45+
# Task & Workflow Execution
46+
The Workflow Worker is the component that takes care of initializing, monitoring and concluding the execution of an entire workflow.
47+
48+
Tasks will be queued by the Workflow Worker that picked the workflow, using the information defined by the user:
49+
- type of task
50+
- task queues
51+
- arguments
52+
and so on.
53+
54+
Once the Workflow Worker iterates through all the defined tasks in the Workflow and builds the DAG locally, it will start
55+
queuing them to remote queues (remote tasks) or run them locally.
56+
57+
Workflows themselves can have queues, so users can define totally isolated fleets of Workflow Workers that do not affect each other.
58+
59+
Tasks are executed on Workflow Workers that have been configured to listen for updates on a specific Queue.

src/salt/backend/worker/workflow_worker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323

2424
class WorkflowWorker:
25+
2526
def __init__(self):
2627
self._rabbit = pika.BlockingConnection(
2728
pika.ConnectionParameters(

src/salt/backend/workflow_service/workflow_registry.py

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
"""The workflow registry class exposes utilities to manage Workflows in the workflow table."""
2-
32
import io
43
import pathlib
5-
6-
import redis
7-
import boto3
84
import json
95
import uuid
106

@@ -21,17 +17,8 @@
2117

2218
class WorkflowRegistry:
2319
def __init__(self):
24-
self._redis = redis.Redis(
25-
host=settings.redis_settings.redis_url,
26-
port=settings.redis_settings.redis_port,
27-
db=settings.redis_settings.redis_workflow_db,
28-
)
29-
self._s3 = boto3.client(
30-
"s3",
31-
endpoint_url=settings.s3_settings.s3_endpoint_url,
32-
aws_access_key_id=settings.s3_settings.s3_aws_access_key_id,
33-
aws_secret_access_key=settings.s3_settings.s3_aws_secret_access_key,
34-
)
20+
self._redis = settings.redis_settings.redis_client
21+
self._s3 = settings.s3_settings.s3_client
3522

3623
def register_workflow(self, request_iterator):
3724
"""This method registers a user uploaded workflow in the backend registry."""

src/salt/settings.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1+
import redis
2+
import boto3
3+
from botocore import client
4+
15
from typing import Optional
26

7+
from pydantic import computed_field
38
from pydantic_settings import BaseSettings
49

510

@@ -8,6 +13,14 @@ class Redis(BaseSettings):
813
redis_port: Optional[int] = "6379"
914
redis_workflow_db: Optional[int] = 0
1015

16+
@computed_field(return_type=redis.Redis)
17+
def redis_client(self):
18+
return redis.Redis(
19+
host=self.redis_url,
20+
port=self.redis_port,
21+
db=self.redis_workflow_db,
22+
)
23+
1124

1225
class RabbitSettings(BaseSettings):
1326
rabbit_url: Optional[str] = "localhost"
@@ -21,6 +34,14 @@ class S3Settings(BaseSettings):
2134
s3_aws_access_key_id: Optional[str] = "miniadmin"
2235
s3_aws_secret_access_key: Optional[str] = "miniadmin"
2336

37+
@computed_field(return_type=client.BaseClient)
38+
def s3_client(self):
39+
return boto3.client(
40+
"s3",
41+
endpoint_url=self.s3_endpoint_url,
42+
aws_access_key_id=self.s3_aws_access_key_id,
43+
aws_secret_access_key=self.s3_aws_secret_access_key,
44+
)
2445

2546
class WorkflowServiceSettings(BaseSettings):
2647
workflow_service_url: Optional[str] = "localhost"

0 commit comments

Comments
 (0)