Skip to content

Commit f2af417

Browse files
authored
Merge pull request #5 from PROCYDE/SIEM-12834-rabbitmq-clery-hardening
Siem 12834 rabbitmq clery hardening
2 parents cea460d + 74920c1 commit f2af417

2 files changed

Lines changed: 183 additions & 1 deletion

File tree

source/app/configuration.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import configparser
2020
from app.logger import logger
21+
import json
2122
import logging
2223
import os
2324
import ssl
@@ -227,6 +228,144 @@ class CeleryConfig:
227228
worker_pool_restarts = True
228229
broker_connection_retry_on_startup = True
229230

231+
security_key = ''
232+
security_certificate = ''
233+
security_cert_store = ''
234+
security_digest = 'sha256'
235+
236+
237+
def _parse_bool(value):
238+
if isinstance(value, bool):
239+
return value
240+
return value.lower() in ('true', '1', 'yes', 'on')
241+
242+
def _parse_float(value):
243+
return float(value)
244+
245+
_celery_settings = [
246+
("accept_content", json.loads),
247+
("enable_utc", _parse_bool),
248+
("imports", json.loads),
249+
("include", json.loads),
250+
("timezone", str),
251+
("beat_max_loop_interval", int),
252+
("beat_schedule", json.loads),
253+
("beat_scheduler", str),
254+
("beat_schedule_filename", str),
255+
("beat_sync_every", int),
256+
("broker_url", str),
257+
("broker_transport", str),
258+
("broker_transport_options", json.loads),
259+
("broker_connection_timeout", int),
260+
("broker_connection_retry", _parse_bool),
261+
("broker_connection_max_retries", int),
262+
("broker_failover_strategy", str),
263+
("broker_heartbeat", _parse_float),
264+
("broker_login_method", str),
265+
("broker_pool_limit", int),
266+
("broker_use_ssl", _parse_bool),
267+
("cache_backend", str),
268+
("cache_backend_options", json.loads),
269+
("cassandra_table", str),
270+
("cassandra_entry_ttl", int),
271+
("cassandra_keyspace", str),
272+
("cassandra_port", int),
273+
("cassandra_read_consistency", str),
274+
("cassandra_servers", json.loads),
275+
("cassandra_write_consistency", str),
276+
("cassandra_options", json.loads),
277+
("s3_access_key_id", str),
278+
("s3_secret_access_key", str),
279+
("s3_bucket", str),
280+
("s3_base_path", str),
281+
("s3_endpoint_url", str),
282+
("s3_region", str),
283+
("couchbase_backend_settings", json.loads),
284+
("arangodb_backend_settings", json.loads),
285+
("mongodb_backend_settings", json.loads),
286+
("event_queue_expires", _parse_float),
287+
("event_queue_ttl", _parse_float),
288+
("event_queue_prefix", str),
289+
("event_serializer", str),
290+
("redis_db", str),
291+
("redis_host", str),
292+
("redis_max_connections", int),
293+
("redis_username", str),
294+
("redis_password", str),
295+
("redis_port", int),
296+
("redis_backend_use_ssl", json.loads),
297+
("result_backend", str),
298+
("result_cache_max", int),
299+
("result_compression", str),
300+
("result_exchange", str),
301+
("result_exchange_type", str),
302+
("result_expires", int),
303+
("result_persistent", _parse_bool),
304+
("result_serializer", str),
305+
("database_engine_options", json.loads),
306+
("database_short_lived_sessions", _parse_bool),
307+
("database_db_names", json.loads),
308+
("security_certificate", str),
309+
("security_cert_store", str),
310+
("security_key", str),
311+
("task_acks_late", _parse_bool),
312+
("task_acks_on_failure_or_timeout", _parse_bool),
313+
("task_always_eager", _parse_bool),
314+
("task_annotations", json.loads),
315+
("task_compression", str),
316+
("task_create_missing_queues", _parse_bool),
317+
("task_default_delivery_mode", str),
318+
("task_default_exchange", str),
319+
("task_default_exchange_type", str),
320+
("task_default_queue", str),
321+
("task_default_rate_limit", int),
322+
("task_default_routing_key", str),
323+
("task_eager_propagates", _parse_bool),
324+
("task_ignore_result", _parse_bool),
325+
("task_publish_retry", _parse_bool),
326+
("task_publish_retry_policy", json.loads),
327+
("task_queues", json.loads),
328+
("task_routes", json.loads),
329+
("task_send_sent_event", _parse_bool),
330+
("task_serializer", str),
331+
("task_soft_time_limit", int),
332+
("task_track_started", _parse_bool),
333+
("task_reject_on_worker_lost", _parse_bool),
334+
("task_time_limit", int),
335+
("worker_agent", str),
336+
("worker_autoscaler", str),
337+
("worker_concurrency", int),
338+
("worker_consumer", str),
339+
("worker_direct", _parse_bool),
340+
("worker_disable_rate_limits", _parse_bool),
341+
("worker_enable_remote_control", _parse_bool),
342+
("worker_log_color", _parse_bool),
343+
("worker_log_format", str),
344+
("worker_lost_wait", _parse_float),
345+
("worker_max_tasks_per_child", int),
346+
("worker_pool", str),
347+
("worker_pool_putlocks", _parse_bool),
348+
("worker_pool_restarts", _parse_bool),
349+
("worker_prefetch_multiplier", int),
350+
("worker_redirect_stdouts", _parse_bool),
351+
("worker_redirect_stdouts_level", str),
352+
("worker_send_task_events", _parse_bool),
353+
("worker_state_db", str),
354+
("worker_task_log_format", str),
355+
("worker_timer", str),
356+
("worker_timer_precision", _parse_float),
357+
]
358+
359+
for _setting, _parse in _celery_settings:
360+
_env_var = f"CELERY__{_setting}"
361+
if _env_var in os.environ:
362+
_value = os.environ[_env_var]
363+
try:
364+
_parsed_value = _parse(_value)
365+
setattr(CeleryConfig, _setting, _parsed_value)
366+
except (ValueError, SyntaxError) as e:
367+
logger.warning(f"Failed to parse {_env_var}: {e}")
368+
230369

231370
class Config:
232371
# Handled by bumpversion

source/app/iris_engine/tasker/celery.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,58 @@
1717
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
1818

1919
from celery import Celery
20+
from celery.security import setup_security
21+
from kombu.serialization import register
2022
from app.configuration import CeleryConfig
2123

24+
def _patch_celery_cert_datetime():
25+
import datetime
26+
from celery.security.certificate import Certificate
27+
28+
_original_has_expired = Certificate.has_expired
29+
30+
def _patched_has_expired(self):
31+
try:
32+
return _original_has_expired(self)
33+
except TypeError:
34+
not_valid_after = self._cert.not_valid_after_utc
35+
return datetime.datetime.now(datetime.timezone.utc) >= not_valid_after
36+
37+
Certificate.has_expired = _patched_has_expired
38+
39+
40+
def _register_auth_serializer():
41+
import json
42+
43+
def _encode_auth(data):
44+
return json.dumps(data), 'application/auth'
45+
46+
def _decode_auth(data):
47+
return json.loads(data)
48+
49+
register('auth', _encode_auth, _decode_auth, content_type='application/auth')
50+
2251

2352
def make_celery(name):
24-
return Celery(
53+
_register_auth_serializer()
54+
55+
celery_app = Celery(
2556
name,
2657
config_source=CeleryConfig
2758
)
2859

60+
if CeleryConfig.security_key and CeleryConfig.security_certificate:
61+
_patch_celery_cert_datetime()
62+
setup_security(
63+
allowed_serializers=['auth'],
64+
key=CeleryConfig.security_key,
65+
cert=CeleryConfig.security_certificate,
66+
store=CeleryConfig.security_cert_store,
67+
digest=CeleryConfig.security_digest
68+
)
69+
70+
return celery_app
71+
2972

3073
def set_celery_flask_context(celery: Celery, app):
3174
class ContextTask(celery.Task):

0 commit comments

Comments
 (0)