From f1b16cbc259c7afea4c2a0cc7e936d4b33b9abe9 Mon Sep 17 00:00:00 2001 From: James Brookes Date: Tue, 15 Feb 2022 16:45:45 +0000 Subject: [PATCH 1/2] Remove references to ActiveMQ Queue Client --- autoreduce_utils/clients/queue_client.py | 136 ------------ .../settings/client_settings_factory.py | 27 +-- .../tests/test_client_settings_factory.py | 25 --- .../clients/tests/test_queue_client.py | 195 ------------------ autoreduce_utils/credentials.py | 6 - 5 files changed, 1 insertion(+), 388 deletions(-) delete mode 100644 autoreduce_utils/clients/queue_client.py delete mode 100644 autoreduce_utils/clients/tests/test_queue_client.py diff --git a/autoreduce_utils/clients/queue_client.py b/autoreduce_utils/clients/queue_client.py deleted file mode 100644 index 0b2a0dd..0000000 --- a/autoreduce_utils/clients/queue_client.py +++ /dev/null @@ -1,136 +0,0 @@ -# ############################################################################### # -# Autoreduction Repository : https://github.com/autoreduction/autoreduce -# -# Copyright © 2020 ISIS Rutherford Appleton Laboratory UKRI -# SPDX - License - Identifier: GPL-3.0-or-later -# ############################################################################### # -""" -Client class for accessing queuing service -""" -import logging -import os -import uuid -import socket - -import stomp -from stomp.exception import ConnectFailedException - -from autoreduce_utils.message.message import Message -from autoreduce_utils.clients.connection_exception import ConnectionException - - -class QueueClient(): - """ - Class for client to access messaging service via python - """ - - def __init__(self, consumer_name='queue_client'): - self.activemq_host = os.getenv("ACTIVEMQ_HOST") - self.activemq_port = os.getenv("ACTIVEMQ_PORT") - self.activemq_user = os.getenv("ACTIVEMQ_USERNAME") - self._connection = None - self._consumer_name = consumer_name - self._logger = logging.getLogger(__package__) - - def connect(self): - """ - Create the connection if the connection has not been created - """ - - if self._connection is None or not self._connection.is_connected(): - self.disconnect() - self._create_connection() - - def _test_connection(self): - if self._connection.is_connected(): - return True - raise ConnectionException("ActiveMQ") - - def disconnect(self): - """ - Disconnect from queue service - """ - - self._logger.info("Starting disconnect from ActiveMQ...") - if self._connection is not None and self._connection.is_connected(): - # By passing a receipt Stomp will call stop on the transport layer - # which causes it to wait on the listener thread (if it's still - # running). Without this step we just drop out, so the behaviour - # is not guaranteed. UUID is used by Stomp if we don't pass in - # a receipt so this matches the behaviour under the hood - try: - self._connection.remove_listener("queue_processor") - except KeyError: - pass # If no listener was set for this client instance - self._connection.disconnect(receipt=str(uuid.uuid4())) - self._logger.info("Disconnected from ActiveMQ.") - self._connection = None - - def _create_connection(self): - """ - Create the connection to the queuing service and store as self._connection - """ - - inteded_for_production = "AUTOREDUCTION_PRODUCTION" in os.environ - - aimed_at_dev = False - if self.activemq_host.startswith("127") or "dev" in self.activemq_host or "activemq" in self.activemq_host: - aimed_at_dev = True - # Prevent unintentional submission to non-development envs - if not inteded_for_production and not aimed_at_dev: - raise RuntimeError( - f"Trying to submit to a potentially non-development environment at `{self.activemq_host}`. " - "You must declare AUTOREDUCTION_PRODUCTION in the environment " - "if you intend to submit to the production environment") - if inteded_for_production and aimed_at_dev: - raise RuntimeError(f"Trying to submit to production environment but host is `{self.activemq_host}`. " - "Remove AUTOREDUCTION_PRODUCTION if that is unintentional.") - - if self._connection is None or not self._connection.is_connected(): - try: - host_port = [(self.activemq_host, int(self.activemq_port))] - connection = stomp.Connection(host_and_ports=host_port) - self._logger.info("Starting connection to %s", host_port) - connection.connect(username=self.activemq_user, passcode=os.getenv("ACTIVEMQ_PASSWORD"), wait=True) - except ConnectFailedException as exp: - raise ConnectionException("ActiveMQ") from exp - self._connection = connection - - def subscribe(self, listener): - """ - Subscribes to the data_ready queue - :param listener: QueueListener object - """ - - self._logger.info("Subscribing to data ready queue") - self._connection.set_listener("queue_processor", listener) - self._connection.subscribe(destination='/queue/DataReady', - id=socket.getfqdn(), - ack="client-individual", - header={"activemq.prefetchSize": "1"}) - - def ack(self, message_id, subscription): - """ - Acknowledge receipt of a message - :param message_id: The identifier of the message - """ - # pylint:disable=no-value-for-parameter - self._connection.ack(message_id, subscription) - - def send(self, destination, message, persistent='true', priority='4', delay=None): - """ - Send a message via the open connection to a queue - :param destination: Queue to send to - :param message: Message instance OR json dump of dict containing message payload - :param persistent: should to message be persistent - :param priority: priority rating of the message - :param delay: time to wait before send - """ - self.connect() - - if isinstance(message, Message): - message_json_dump = message.serialize() - else: - message_json_dump = message - - self._connection.send(destination, message_json_dump, persistent=persistent, priority=priority, delay=delay) diff --git a/autoreduce_utils/clients/settings/client_settings_factory.py b/autoreduce_utils/clients/settings/client_settings_factory.py index 5d7ac64..64aa889 100644 --- a/autoreduce_utils/clients/settings/client_settings_factory.py +++ b/autoreduce_utils/clients/settings/client_settings_factory.py @@ -17,7 +17,7 @@ class ClientSettingsFactory: """ ignore_kwargs = ['username', 'password', 'host', 'port'] - valid_types = ['database', 'icat', 'queue', 'sftp', 'cycle'] + valid_types = ['database', 'icat', 'sftp', 'cycle'] def create(self, settings_type, username, password, host, port, **kwargs): """ @@ -47,8 +47,6 @@ def create(self, settings_type, username, password, host, port, **kwargs): settings = self._create_database(**kwargs) elif settings_type.lower() == 'icat': settings = self._create_icat(**kwargs) - elif settings_type.lower() == 'queue': - settings = self._create_queue(**kwargs) elif settings_type.lower() == 'sftp': settings = self._create_sftp(**kwargs) elif settings_type.lower() == 'cycle': @@ -63,14 +61,6 @@ def _create_database(self, **kwargs): self._test_kwargs(database_kwargs, kwargs) return MySQLSettings(**kwargs) - def _create_queue(self, **kwargs): - """ - :return: Queue compatible settings object - """ - queue_kwargs = ['data_ready'] - self._test_kwargs(queue_kwargs, kwargs) - return ActiveMQSettings(**kwargs) - def _create_icat(self, **kwargs): """ :return: Icat compatible settings object @@ -131,21 +121,6 @@ def get_full_connection_string(self): return f'mysql+mysqldb://{self.username}:{self.password}@{self.host}/{self.database}' -class ActiveMQSettings(ClientSettings): - """ - ActiveMq settings to be used as a Queue settings object - """ - data_ready = None - all_subscriptions = None - - def __init__(self, data_ready='/queue/DataReady', **kwargs): - # TODO explicitly state args - super(ActiveMQSettings, self).__init__(**kwargs) # pylint:disable=super-with-arguments - - self.data_ready = data_ready - self.all_subscriptions = [data_ready] - - class SFTPSettings(ClientSettings): """ SFTP settings object diff --git a/autoreduce_utils/clients/settings/tests/test_client_settings_factory.py b/autoreduce_utils/clients/settings/tests/test_client_settings_factory.py index 7e37018..4b5286d 100644 --- a/autoreduce_utils/clients/settings/tests/test_client_settings_factory.py +++ b/autoreduce_utils/clients/settings/tests/test_client_settings_factory.py @@ -34,20 +34,6 @@ def test_create_database(self): self.assertEqual(actual.database, 'test-name') self.assertEqual(actual.get_full_connection_string(), 'mysql+mysqldb://test-user:test-pass@test-host/test-name') - def test_create_queue(self): - actual = self.factory.create('queue', - username='test-user', - password='test-pass', - host='test-host', - port='test-port', - data_ready='test-dr') - self.assertIsInstance(actual, ActiveMQSettings) - self.assertEqual(actual.username, 'test-user') - self.assertEqual(actual.password, 'test-pass') - self.assertEqual(actual.host, 'test-host') - self.assertEqual(actual.port, 'test-port') - self.assertEqual(actual.data_ready, 'test-dr') - def test_create_icat(self): actual = self.factory.create('icat', username='test-user', @@ -78,17 +64,6 @@ def test_invalid_database_args(self): 'port', database_invalid='invalid') - def test_invalid_queue_args(self): - self.assertRaisesRegex(ValueError, - "queue_invalid is not a recognised key word argument.", - self.factory.create, - 'queue', - 'user', - 'pass', - 'host', - 'port', - queue_invalid='invalid') - def test_invalid_icat_args(self): self.assertRaisesRegex(ValueError, "icat_invalid is not a recognised key word argument.", diff --git a/autoreduce_utils/clients/tests/test_queue_client.py b/autoreduce_utils/clients/tests/test_queue_client.py deleted file mode 100644 index 42b9f66..0000000 --- a/autoreduce_utils/clients/tests/test_queue_client.py +++ /dev/null @@ -1,195 +0,0 @@ -# ############################################################################### # -# Autoreduction Repository : https://github.com/autoreduction/autoreduce -# -# Copyright © 2020 ISIS Rutherford Appleton Laboratory UKRI -# SPDX - License - Identifier: GPL-3.0-or-later -# ############################################################################### # -""" -Test functionality for the activemq client -""" -import os -import socket - -from unittest import TestCase, mock -from unittest.mock import patch, MagicMock - -from autoreduce_utils.message.message import Message -from autoreduce_utils.clients.connection_exception import ConnectionException -from autoreduce_utils.clients.queue_client import QueueClient - - -# pylint:disable=protected-access,no-self-use -class TestQueueClient(TestCase): - """ - Exercises the queue client - """ - - def test_default_init(self): - """ - Test: Class variables are created and set - When: QueueClient is initialised with default credentials - """ - - client = QueueClient() - self.assertIsNone(client._connection) - self.assertEqual('queue_client', client._consumer_name) - - def test_valid_connection(self): - """ - Test: Access is established with a valid connection - (This by proxy will also test the get_connection function) - When: connect is called while valid credentials are held - """ - - client = QueueClient() - client.connect() - self.assertTrue(client._connection.is_connected()) - - @mock.patch.dict(os.environ, { - "ACTIVEMQ_USERNAME": "not-user", - "ACTIVEMQ_PASSWORD": "not-pass", - "ACTIVEMQ_HOST": "127.does.not.exist", - "ACTIVEMQ_PORT": "1234" - }, - clear=True) - def test_connection_failed_invalid_credentials(self): - """ - Test: A ConnectionException is raised - When: _test_connection is called while invalid credentials are held - """ - client = QueueClient() - with self.assertRaises(ConnectionException): - client.connect() - - def test_stop_connection(self): - """ - Test: Connection is stopped and connection variables are set to None - When: disconnect is called while a valid connection is currently established - """ - - client = QueueClient() - mocked_connection = mock.Mock() - client._connection = mocked_connection - - with mock.patch("uuid.uuid4") as patched_uuid: - patched_uuid.return_value = 1 - client.disconnect() - - mocked_connection.disconnect.assert_called_with(receipt=str(1)) - self.assertIsNone(client._connection) - - @patch('stomp.connect.StompConnection11.send') - def test_send_with_raw_string(self, mock_stomp_send): - """ - Test: send sends the given data using stomp.send - When: send is called with a string argument for message - """ - - client = QueueClient() - client.send('dataready', 'raw_json_dump') - (args, _) = mock_stomp_send.call_args - self.assertEqual(args[0], 'dataready') - self.assertEqual(args[1], 'raw_json_dump') - - @patch('stomp.connect.StompConnection11.send') - def test_send_with_message_instance(self, mock_stomp_send): - """ - Test: send sends the given data using stomp.send - When: send is called with a Message instance argument for message - """ - - client = QueueClient() - message = Message(description="test-message") - client.send('dataready', message) - (args, _) = mock_stomp_send.call_args - self.assertEqual(args[0], 'dataready') - self.assertEqual(args[1], message.serialize()) - - @patch('stomp.connect.StompConnection11.ack') - def test_ack(self, mock_stomp_ack): - """ - Test: ack sends an ack frame using stomp.ack - When: ack is called while a valid connection is held - """ - - client = QueueClient() - client.connect() - client.ack("test", "subscription") - mock_stomp_ack.assert_called_once_with('test', "subscription") - - def test_create_connection_bad_development(self): - """ - Test: Exception raised - When: production host used in non production environment - """ - - client = QueueClient() - real_host = client.activemq_host - client.activemq_host = "production.domain.com" - self.assertRaisesRegex(RuntimeError, "non-development", client._create_connection) - client.activemq_host = real_host - - def test_create_connection_bad_production(self): - """ - Test: Exception raised - When: Local host used in production environment - """ - - client = QueueClient() - real_host = client.activemq_host - - os.environ["AUTOREDUCTION_PRODUCTION"] = "1" - client.activemq_host = "127.0.0.1" - self.assertRaisesRegex(RuntimeError, ".*production environment.*", client._create_connection) - - client.activemq_host = "somethingdev" - self.assertRaisesRegex(RuntimeError, ".*production environment.*", client._create_connection) - - client.activemq_host = "activemq" - self.assertRaisesRegex(RuntimeError, ".*production environment.*", client._create_connection) - - client.activemq_host = real_host - del os.environ["AUTOREDUCTION_PRODUCTION"] - - def test_test_connection_not_connected(self): - """ - Test: Exception raised - When: test_connection called when not connected - """ - - client = QueueClient() - mock_connection = MagicMock() - mock_connection.is_connected.return_value = False - client._connection = mock_connection - with self.assertRaises(ConnectionException): - client._test_connection() - - def test_test_connection_connected(self): - """ - Test: test_connection returns True - When: Connected - """ - - client = QueueClient() - mock_connection = MagicMock() - mock_connection.is_connected.return_value = True - client._connection = mock_connection - self.assertTrue(client._test_connection()) - - def test_subscribe(self): - """ - Test: correct calls made - When: subscribe is called - """ - - client = QueueClient() - mock_connection = MagicMock() - mock_listener = MagicMock() - client._connection = mock_connection - client.subscribe(mock_listener) - - mock_connection.set_listener.assert_called_with("queue_processor", mock_listener) - mock_connection.subscribe.assert_called_with(destination="/queue/DataReady", - id=socket.getfqdn(), - ack="client-individual", - header={"activemq.prefetchSize": "1"}) diff --git a/autoreduce_utils/credentials.py b/autoreduce_utils/credentials.py index 65f26e2..a23c7b2 100644 --- a/autoreduce_utils/credentials.py +++ b/autoreduce_utils/credentials.py @@ -27,12 +27,6 @@ port=os.getenv('DATABASE_PORT'), database_name=os.getenv('DATABASE_NAME')) -ACTIVEMQ_CREDENTIALS = SETTINGS_FACTORY.create('queue', - username=os.getenv('ACTIVEMQ_USERNAME'), - password=os.getenv('ACTIVEMQ_PASSWORD'), - host=os.getenv('ACTIVEMQ_HOST'), - port=os.getenv('ACTIVEMQ_PORT')) - CYCLE_SETTINGS = SETTINGS_FACTORY.create('cycle', username=os.getenv('CYCLE_USER'), password=os.getenv('CYCLE_PASSWORD'), From 7a74e2835d255318a9eee73f3c96bd4991506232 Mon Sep 17 00:00:00 2001 From: James Brookes Date: Tue, 15 Feb 2022 16:49:03 +0000 Subject: [PATCH 2/2] Remove unused Stomp --- .../clients/settings/tests/test_client_settings_factory.py | 2 +- autoreduce_utils/settings.py | 1 - setup.py | 3 +-- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/autoreduce_utils/clients/settings/tests/test_client_settings_factory.py b/autoreduce_utils/clients/settings/tests/test_client_settings_factory.py index 4b5286d..d7ae48d 100644 --- a/autoreduce_utils/clients/settings/tests/test_client_settings_factory.py +++ b/autoreduce_utils/clients/settings/tests/test_client_settings_factory.py @@ -10,7 +10,7 @@ import unittest from autoreduce_utils.clients.settings.client_settings_factory import (ClientSettingsFactory, MySQLSettings, - ICATSettings, ActiveMQSettings) + ICATSettings) # pylint:disable=missing-docstring diff --git a/autoreduce_utils/settings.py b/autoreduce_utils/settings.py index dff51e1..46d277a 100644 --- a/autoreduce_utils/settings.py +++ b/autoreduce_utils/settings.py @@ -31,7 +31,6 @@ level=LOG_LEVEL, handlers=[rotating_file_handler, stream_handler]) -logging.getLogger('stomp.py').setLevel("ERROR") ##################################################################################################### PROJECT_DEV_ROOT = os.path.join(AUTOREDUCE_HOME_ROOT, "dev") diff --git a/setup.py b/setup.py index 053418f..443a8b7 100644 --- a/setup.py +++ b/setup.py @@ -19,8 +19,7 @@ author='ISIS Autoreduction Team', url='https://github.com/autoreduction/autoreduce-utils/', install_requires=[ - 'pydantic==1.9.0', 'gitpython<=3.1.26', 'python-icat==0.20.1', 'suds-py3==1.4.5.0', 'stomp.py==8.0.0', - 'confluent-kafka==1.8.2' + 'pydantic==1.9.0', 'gitpython<=3.1.26', 'python-icat==0.20.1', 'suds-py3==1.4.5.0', 'confluent-kafka==1.8.2' ], packages=find_packages(), package_data={},