From 3cdb8d999eef643068ac13201d65eb5ba413f299 Mon Sep 17 00:00:00 2001 From: Daniel Alejandro Coll Tejeda <62675074+macarronesc@users.noreply.github.com> Date: Fri, 2 Aug 2024 08:40:36 +0000 Subject: [PATCH 1/4] [k8s] Fixed bug between threads when there are multiple executions --- lithops/serverless/backends/k8s/entry_point.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lithops/serverless/backends/k8s/entry_point.py b/lithops/serverless/backends/k8s/entry_point.py index a4c6eea63..e3b576f05 100644 --- a/lithops/serverless/backends/k8s/entry_point.py +++ b/lithops/serverless/backends/k8s/entry_point.py @@ -24,8 +24,7 @@ import time import requests from functools import partial -from multiprocessing import Value -from threading import Thread +from multiprocessing import Value, Process from lithops.version import __version__ from lithops.utils import setup_lithops_logger, b64str_to_dict @@ -178,7 +177,7 @@ def callback_work_queue(ch, method, properties, body): with running_jobs.get_lock(): running_jobs.value -= processes_to_start - Thread(target=run_job_k8s_rabbitmq, args=([message])).start() + Process(target=run_job_k8s_rabbitmq, args=(message,)).start() ch.basic_ack(delivery_tag=method.delivery_tag) From 390f899b0b0a584b6e1239052584ffff6969fbb3 Mon Sep 17 00:00:00 2001 From: Daniel Alejandro Coll Tejeda <62675074+macarronesc@users.noreply.github.com> Date: Fri, 2 Aug 2024 11:40:04 +0000 Subject: [PATCH 2/4] Updated CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c61c4291..aa60d8dba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ ### Fixed - [Storage] Fixed "KeyError: 'monitoring_interval'" error when instantiating Storage() class +- [k8s] Fixed bug between threads when there are multiple executions ## [v3.4.1] From 089e912a5ed691bdfe38d78a6cfd4c434c32219a Mon Sep 17 00:00:00 2001 From: Daniel Alejandro Coll Tejeda <62675074+macarronesc@users.noreply.github.com> Date: Wed, 26 Feb 2025 11:09:21 +0000 Subject: [PATCH 3/4] [K8s] Fixed error with first execution [Singularity] Fixed error with first execution --- lithops/serverless/backends/k8s/k8s.py | 4 ++++ lithops/serverless/backends/singularity/singularity.py | 5 ++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/lithops/serverless/backends/k8s/k8s.py b/lithops/serverless/backends/k8s/k8s.py index ad421a05e..81dcb0c9e 100644 --- a/lithops/serverless/backends/k8s/k8s.py +++ b/lithops/serverless/backends/k8s/k8s.py @@ -100,6 +100,7 @@ def __init__(self, k8s_config, internal_storage): params = pika.URLParameters(self.amqp_url) self.connection = pika.BlockingConnection(params) self.channel = self.connection.channel() + self.channel.queue_declare(queue='task_queue', durable=True) # Define some needed variables self._get_nodes() @@ -275,6 +276,9 @@ def clean(self, all=False): pass except ApiException: pass + + if self.rabbitmq_executor: + self.channel.queue_delete(queue='task_queue') def clear(self, job_keys=None): """ diff --git a/lithops/serverless/backends/singularity/singularity.py b/lithops/serverless/backends/singularity/singularity.py index 4704da3c5..10c62882b 100644 --- a/lithops/serverless/backends/singularity/singularity.py +++ b/lithops/serverless/backends/singularity/singularity.py @@ -49,6 +49,7 @@ def __init__(self, singularity_config, internal_storage): params = pika.URLParameters(self.amqp_url) self.connection = pika.BlockingConnection(params) self.channel = self.connection.channel() + self.channel.queue_declare(queue='task_queue', durable=True) msg = COMPUTE_CLI_MSG.format('Singularity') logger.info(f"{msg}") @@ -140,9 +141,7 @@ def clean(self, all=False): Deletes all jobs """ logger.debug('Cleaning RabbitMQ queues') - delete_queues = ['task_queue', 'status_queue'] - for queue in delete_queues: - self.channel.queue_delete(queue=queue) + self.channel.queue_delete(queue='task_queue') def list_runtimes(self, singularity_image_name='all'): """ From 17a279e0dc051675f386d755b4936dba02c59e72 Mon Sep 17 00:00:00 2001 From: Daniel Alejandro Coll Tejeda <62675074+macarronesc@users.noreply.github.com> Date: Thu, 27 Feb 2025 10:36:50 +0000 Subject: [PATCH 4/4] Fixed linting --- lithops/serverless/backends/k8s/k8s.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lithops/serverless/backends/k8s/k8s.py b/lithops/serverless/backends/k8s/k8s.py index 81dcb0c9e..d0f726840 100644 --- a/lithops/serverless/backends/k8s/k8s.py +++ b/lithops/serverless/backends/k8s/k8s.py @@ -276,7 +276,7 @@ def clean(self, all=False): pass except ApiException: pass - + if self.rabbitmq_executor: self.channel.queue_delete(queue='task_queue')