File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change 1818class Scheduler :
1919 def __init__ (self ):
2020 self ._redis = settings .redis_settings .redis_client
21+ self ._rabbit = settings .rabbit_settings .rabbit_client
2122
2223 def loop (self ):
2324 sched = BackgroundScheduler ()
@@ -47,16 +48,7 @@ def queue_workflow(self, workflow_data):
4748 TODO The handling of KILL signal (e.g. to stop the workflow) is handled using Kafka.
4849 Once the job is picked by a worker, the worker starts listening for KILL signals through Kafka or similar.
4950 """
50- rabbit = pika .BlockingConnection (
51- pika .ConnectionParameters (
52- host = settings .rabbit_settings .rabbit_url ,
53- credentials = pika .PlainCredentials (
54- settings .rabbit_settings .rabbit_user ,
55- settings .rabbit_settings .rabbit_password ,
56- ),
57- )
58- )
59- channel = rabbit .channel ()
51+ channel = self ._rabbit .channel ()
6052 queue = workflow_data .get ("queue" , "default" )
6153 queue = "workflows" if queue == "default" else queue
6254
Original file line number Diff line number Diff line change 11import redis
22import boto3
3+ import pika
34from botocore import client
45
56from typing import Optional
@@ -28,6 +29,17 @@ class RabbitSettings(BaseSettings):
2829 rabbit_user : Optional [str ] = "admin"
2930 rabbit_password : Optional [str ] = "admin"
3031
32+ @computed_field (return_type = pika .BlockingConnection )
33+ def rabbit_client (self ):
34+ pika .BlockingConnection (
35+ pika .ConnectionParameters (
36+ host = self .rabbit_url ,
37+ credentials = pika .PlainCredentials (
38+ self .rabbit_user ,
39+ self .rabbit_password ,
40+ ),
41+ )
42+ )
3143
3244class S3Settings (BaseSettings ):
3345 s3_endpoint_url : Optional [str ] = "http://localhost:9000"
You can’t perform that action at this time.
0 commit comments