From fb200826f0f47049621bd8aaa21f442d053ae4d4 Mon Sep 17 00:00:00 2001 From: tobmes Date: Wed, 18 Feb 2026 13:36:21 +0000 Subject: [PATCH 1/6] Add dynamic Celery configuration loading from environment variables --- source/app/configuration.py | 49 +++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/source/app/configuration.py b/source/app/configuration.py index 6d84a7d81..8fdbac24a 100644 --- a/source/app/configuration.py +++ b/source/app/configuration.py @@ -18,6 +18,7 @@ import configparser from app.logger import logger +import json import logging import os import ssl @@ -228,6 +229,54 @@ class CeleryConfig: broker_connection_retry_on_startup = True +_celery_settings = [ + ("accept_content", json.loads), + ("task_serializer", str), + ("result_serializer", str), + ("worker_max_tasks_per_child", int), + ("worker_prefetch_multiplier", int), + ("worker_concurrency", int), + ("worker_pool_restarts", bool), + ("enable_utc", bool), + ("task_acks_late", bool), + ("task_ignore_result", bool), + ("task_send_sent_event", bool), + ("broker_connection_retry_on_startup", bool), + ("beat_schedule", json.loads), + ("beat_scheduler", str), + ("timezone", str), + ("result_expires", int), + ("result_persistent", bool), + ("task_default_queue", str), + ("task_default_exchange", str), + ("task_default_routing_key", str), + ("broker_connection_timeout", int), + ("broker_connection_max_retries", int), + ("broker_heartbeat", int), + ("task_publish_retry", bool), + ("task_publish_retry_policy", json.loads), + ("task_queues", json.loads), + ("task_routes", json.loads), + ("task_compression", str), + ("result_compression", str), + ("result_backend_transport_options", json.loads), + ("redis_backend_use_ssl", bool), + ("cassandra_table", str), + ("cassandra_keyspace", str), + ("cassandra_servers", json.loads), +] + +for _setting, _parse in _celery_settings: + _env_var = f"CELERY__{_setting}" + if _env_var in os.environ: + _value = os.environ[_env_var] + try: + _parsed_value = _parse(_value) + setattr(CeleryConfig, _setting, _parsed_value) + except (ValueError, SyntaxError) as e: + logger.warning(f"Failed to parse {_env_var}: {e}") + + class Config: # Handled by bumpversion IRIS_VERSION = "v2.5.0-beta.1-dev-pr3" # DO NOT EDIT THIS LINE MANUALLY From dfc4364e6cd6f82b3f38ba4a6f0550b2bf876a16 Mon Sep 17 00:00:00 2001 From: tobmes Date: Thu, 19 Feb 2026 06:09:30 +0000 Subject: [PATCH 2/6] Add security configuration for Celery using environment variables --- source/app/configuration.py | 5 +++++ source/app/iris_engine/tasker/celery.py | 14 +++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/source/app/configuration.py b/source/app/configuration.py index 8fdbac24a..f1ca01895 100644 --- a/source/app/configuration.py +++ b/source/app/configuration.py @@ -228,6 +228,11 @@ class CeleryConfig: worker_pool_restarts = True broker_connection_retry_on_startup = True + security_key = os.environ.get('CELERY_SECURITY_KEY', '') + security_certificate = os.environ.get('CELERY_SECURITY_CERT', '') + security_cert_store = os.environ.get('CELERY_SECURITY_CERT_STORE', '') + security_digest = 'sha256' + _celery_settings = [ ("accept_content", json.loads), diff --git a/source/app/iris_engine/tasker/celery.py b/source/app/iris_engine/tasker/celery.py index 3eb377cb4..5083f19e7 100644 --- a/source/app/iris_engine/tasker/celery.py +++ b/source/app/iris_engine/tasker/celery.py @@ -17,15 +17,27 @@ # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. from celery import Celery +from celery.utils.security import setup_security from app.configuration import CeleryConfig def make_celery(name): - return Celery( + celery_app = Celery( name, config_source=CeleryConfig ) + if CeleryConfig.security_key and CeleryConfig.security_certificate: + setup_security( + allowed_serializers=['auth'], + key=CeleryConfig.security_key, + cert=CeleryConfig.security_certificate, + store=CeleryConfig.security_cert_store, + digest=CeleryConfig.security_digest + ) + + return celery_app + def set_celery_flask_context(celery: Celery, app): class ContextTask(celery.Task): From d4ad6c6051d23b5e0f2d2177e4bd9eef23076151 Mon Sep 17 00:00:00 2001 From: tobmes Date: Thu, 19 Feb 2026 06:09:30 +0000 Subject: [PATCH 3/6] Refactor Celery configuration to enhance security and add dynamic parsing for settings --- source/app/configuration.py | 140 +++++++++++++++++++----- source/app/iris_engine/tasker/celery.py | 14 ++- 2 files changed, 128 insertions(+), 26 deletions(-) diff --git a/source/app/configuration.py b/source/app/configuration.py index 8fdbac24a..c0d93c597 100644 --- a/source/app/configuration.py +++ b/source/app/configuration.py @@ -228,42 +228,132 @@ class CeleryConfig: worker_pool_restarts = True broker_connection_retry_on_startup = True + security_key = '' + security_certificate = '' + security_cert_store = '' + security_digest = 'sha256' + + +def _parse_bool(value): + if isinstance(value, bool): + return value + return value.lower() in ('true', '1', 'yes', 'on') + +def _parse_float(value): + return float(value) _celery_settings = [ ("accept_content", json.loads), - ("task_serializer", str), - ("result_serializer", str), - ("worker_max_tasks_per_child", int), - ("worker_prefetch_multiplier", int), - ("worker_concurrency", int), - ("worker_pool_restarts", bool), - ("enable_utc", bool), - ("task_acks_late", bool), - ("task_ignore_result", bool), - ("task_send_sent_event", bool), - ("broker_connection_retry_on_startup", bool), + ("enable_utc", _parse_bool), + ("imports", json.loads), + ("include", json.loads), + ("timezone", str), + ("beat_max_loop_interval", int), ("beat_schedule", json.loads), ("beat_scheduler", str), - ("timezone", str), + ("beat_schedule_filename", str), + ("beat_sync_every", int), + ("broker_url", str), + ("broker_transport", str), + ("broker_transport_options", json.loads), + ("broker_connection_timeout", int), + ("broker_connection_retry", _parse_bool), + ("broker_connection_max_retries", int), + ("broker_failover_strategy", str), + ("broker_heartbeat", _parse_float), + ("broker_login_method", str), + ("broker_pool_limit", int), + ("broker_use_ssl", _parse_bool), + ("cache_backend", str), + ("cache_backend_options", json.loads), + ("cassandra_table", str), + ("cassandra_entry_ttl", int), + ("cassandra_keyspace", str), + ("cassandra_port", int), + ("cassandra_read_consistency", str), + ("cassandra_servers", json.loads), + ("cassandra_write_consistency", str), + ("cassandra_options", json.loads), + ("s3_access_key_id", str), + ("s3_secret_access_key", str), + ("s3_bucket", str), + ("s3_base_path", str), + ("s3_endpoint_url", str), + ("s3_region", str), + ("couchbase_backend_settings", json.loads), + ("arangodb_backend_settings", json.loads), + ("mongodb_backend_settings", json.loads), + ("event_queue_expires", _parse_float), + ("event_queue_ttl", _parse_float), + ("event_queue_prefix", str), + ("event_serializer", str), + ("redis_db", str), + ("redis_host", str), + ("redis_max_connections", int), + ("redis_username", str), + ("redis_password", str), + ("redis_port", int), + ("redis_backend_use_ssl", json.loads), + ("result_backend", str), + ("result_cache_max", int), + ("result_compression", str), + ("result_exchange", str), + ("result_exchange_type", str), ("result_expires", int), - ("result_persistent", bool), - ("task_default_queue", str), + ("result_persistent", _parse_bool), + ("result_serializer", str), + ("database_engine_options", json.loads), + ("database_short_lived_sessions", _parse_bool), + ("database_db_names", json.loads), + ("security_certificate", str), + ("security_cert_store", str), + ("security_key", str), + ("task_acks_late", _parse_bool), + ("task_acks_on_failure_or_timeout", _parse_bool), + ("task_always_eager", _parse_bool), + ("task_annotations", json.loads), + ("task_compression", str), + ("task_create_missing_queues", _parse_bool), + ("task_default_delivery_mode", str), ("task_default_exchange", str), + ("task_default_exchange_type", str), + ("task_default_queue", str), + ("task_default_rate_limit", int), ("task_default_routing_key", str), - ("broker_connection_timeout", int), - ("broker_connection_max_retries", int), - ("broker_heartbeat", int), - ("task_publish_retry", bool), + ("task_eager_propagates", _parse_bool), + ("task_ignore_result", _parse_bool), + ("task_publish_retry", _parse_bool), ("task_publish_retry_policy", json.loads), ("task_queues", json.loads), ("task_routes", json.loads), - ("task_compression", str), - ("result_compression", str), - ("result_backend_transport_options", json.loads), - ("redis_backend_use_ssl", bool), - ("cassandra_table", str), - ("cassandra_keyspace", str), - ("cassandra_servers", json.loads), + ("task_send_sent_event", _parse_bool), + ("task_serializer", str), + ("task_soft_time_limit", int), + ("task_track_started", _parse_bool), + ("task_reject_on_worker_lost", _parse_bool), + ("task_time_limit", int), + ("worker_agent", str), + ("worker_autoscaler", str), + ("worker_concurrency", int), + ("worker_consumer", str), + ("worker_direct", _parse_bool), + ("worker_disable_rate_limits", _parse_bool), + ("worker_enable_remote_control", _parse_bool), + ("worker_log_color", _parse_bool), + ("worker_log_format", str), + ("worker_lost_wait", _parse_float), + ("worker_max_tasks_per_child", int), + ("worker_pool", str), + ("worker_pool_putlocks", _parse_bool), + ("worker_pool_restarts", _parse_bool), + ("worker_prefetch_multiplier", int), + ("worker_redirect_stdouts", _parse_bool), + ("worker_redirect_stdouts_level", str), + ("worker_send_task_events", _parse_bool), + ("worker_state_db", str), + ("worker_task_log_format", str), + ("worker_timer", str), + ("worker_timer_precision", _parse_float), ] for _setting, _parse in _celery_settings: diff --git a/source/app/iris_engine/tasker/celery.py b/source/app/iris_engine/tasker/celery.py index 3eb377cb4..5083f19e7 100644 --- a/source/app/iris_engine/tasker/celery.py +++ b/source/app/iris_engine/tasker/celery.py @@ -17,15 +17,27 @@ # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. from celery import Celery +from celery.utils.security import setup_security from app.configuration import CeleryConfig def make_celery(name): - return Celery( + celery_app = Celery( name, config_source=CeleryConfig ) + if CeleryConfig.security_key and CeleryConfig.security_certificate: + setup_security( + allowed_serializers=['auth'], + key=CeleryConfig.security_key, + cert=CeleryConfig.security_certificate, + store=CeleryConfig.security_cert_store, + digest=CeleryConfig.security_digest + ) + + return celery_app + def set_celery_flask_context(celery: Celery, app): class ContextTask(celery.Task): From c81d336c205984bd5218c16e9e66a4df6d608938 Mon Sep 17 00:00:00 2001 From: tobmes Date: Thu, 19 Feb 2026 08:39:11 +0000 Subject: [PATCH 4/6] Fix import path for Celery security setup --- source/app/iris_engine/tasker/celery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/app/iris_engine/tasker/celery.py b/source/app/iris_engine/tasker/celery.py index 5083f19e7..bcf19bef2 100644 --- a/source/app/iris_engine/tasker/celery.py +++ b/source/app/iris_engine/tasker/celery.py @@ -17,7 +17,7 @@ # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. from celery import Celery -from celery.utils.security import setup_security +from celery.security import setup_security from app.configuration import CeleryConfig From 95a91eff3282cb002683dddc55363b8b6bd08dff Mon Sep 17 00:00:00 2001 From: tobmes Date: Thu, 19 Feb 2026 08:39:11 +0000 Subject: [PATCH 5/6] Refactor authentication serializer registration in Celery setup --- source/app/iris_engine/tasker/celery.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/source/app/iris_engine/tasker/celery.py b/source/app/iris_engine/tasker/celery.py index 5083f19e7..f3f1b8705 100644 --- a/source/app/iris_engine/tasker/celery.py +++ b/source/app/iris_engine/tasker/celery.py @@ -17,11 +17,26 @@ # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. from celery import Celery -from celery.utils.security import setup_security +from celery.security import setup_security +from kombu.serialization import register from app.configuration import CeleryConfig +def _register_auth_serializer(): + import json + + def _encode_auth(data): + return json.dumps(data), 'application/auth' + + def _decode_auth(data): + return json.loads(data) + + register('auth', _encode_auth, _decode_auth, content_type='application/auth') + + def make_celery(name): + _register_auth_serializer() + celery_app = Celery( name, config_source=CeleryConfig From 74920c1b8263b5e562779fb4ddb557e440cdd804 Mon Sep 17 00:00:00 2001 From: tobmes Date: Thu, 19 Feb 2026 14:30:36 +0000 Subject: [PATCH 6/6] Remove unnecessary blank line in celery.py --- source/app/iris_engine/tasker/celery.py | 1 - 1 file changed, 1 deletion(-) diff --git a/source/app/iris_engine/tasker/celery.py b/source/app/iris_engine/tasker/celery.py index 4d8ef1523..bc353f9d8 100644 --- a/source/app/iris_engine/tasker/celery.py +++ b/source/app/iris_engine/tasker/celery.py @@ -21,7 +21,6 @@ from kombu.serialization import register from app.configuration import CeleryConfig - def _patch_celery_cert_datetime(): import datetime from celery.security.certificate import Certificate