diff --git a/bumiworker/bumiworker/modules/recommendations/snapshots_with_non_used_images.py b/bumiworker/bumiworker/modules/recommendations/snapshots_with_non_used_images.py
index 6aacb1ad7..276fafd03 100644
--- a/bumiworker/bumiworker/modules/recommendations/snapshots_with_non_used_images.py
+++ b/bumiworker/bumiworker/modules/recommendations/snapshots_with_non_used_images.py
@@ -23,7 +23,11 @@ def __init__(self, organization_id, config_client, created_at):
         super().__init__(organization_id, config_client, created_at)
         self.option_ordered_map = OrderedDict({
             'days_threshold': {'default': DEFAULT_DAYS_THRESHOLD},
-            'skip_cloud_accounts': {'default': []}
+            'skip_cloud_accounts': {'default': []},
+            'excluded_pools': {
+                'default': {},
+                'clean_func': self.clean_excluded_pools,
+            },
         })

     def get_snapshot_info_map(self, account_id_type_map, snapshot_ids,
@@ -81,7 +85,11 @@ def get_snapshot_info_aws(self, bulk_ids, account_ids,
         snapshots = list(self.mongo_client.restapi.resources.find({
             'cloud_account_id': {'$in': account_ids},
             'cloud_resource_id': {'$in': bulk_ids},
-        }, ['cloud_resource_id']))
+        }, ['cloud_resource_id', 'pool_id']))
+        snapshot_pool_map = {}
+        for snapshot in snapshots:
+            snapshot_pool_map[
+                snapshot['cloud_resource_id']] = snapshot.pop('pool_id')
         snapshot_expenses = self._get_snapshot_expenses(
             snapshots, last_week_time)
         snapshot_info_map = {}
@@ -89,7 +97,8 @@ def get_snapshot_info_aws(self, bulk_ids, account_ids,
             snapshot_info_map[cloud_resource_id] = {
                 'cloud_resource_id': cloud_resource_id,
                 'resource_id': resource_id,
-                'cost': cost
+                'cost': cost,
+                'pool_id': snapshot_pool_map[cloud_resource_id]
             }
         return snapshot_info_map

@@ -113,7 +122,8 @@ def get_snapshot_info_alibaba(self, bulk_ids, account_ids,
                 '$project': {
                     '_id': '$_id',
                     'cloud_resource_id': '$cloud_resource_id',
-                    'snapshots': '$meta.snapshots'
+                    'snapshots': '$meta.snapshots',
+                    'pool_id': '$pool_id',
                 }
             },
             {
@@ -129,6 +139,7 @@ def get_snapshot_info_alibaba(self, bulk_ids, account_ids,
         snap_chains = self.mongo_client.restapi.resources.aggregate(
             snapshot_chain_resources_pipeline)
         sc_snapshot_map = {}
+        sc_pool_map = {}
         external_table = []
         for sc in snap_chains:
             external_table.append({
@@ -137,6 +148,7 @@ def get_snapshot_info_alibaba(self, bulk_ids, account_ids,
             })
             sc_snapshot_map[sc['cloud_resource_id']] = sc['snapshots'][
                 'cloud_resource_id']
+            sc_pool_map[sc['cloud_resource_id']] = sc['pool_id']
         snapshot_expenses = self._get_snapshot_expenses(
             external_table, last_week_time)
@@ -144,7 +156,8 @@ def get_snapshot_info_alibaba(self, bulk_ids, account_ids,
             snapshot_info_map[sc_snapshot_map[cloud_resource_id]] = {
                 'resource_id': resource_id,
                 'cloud_resource_id': cloud_resource_id,
-                'cost': cost
+                'cost': cost,
+                'pool_id': sc_pool_map[cloud_resource_id]
             }
         return snapshot_info_map

@@ -179,7 +192,8 @@ def process_images(cloud_account, generator):
         return images_map

     def _get(self):
-        (days_threshold, skip_cloud_accounts) = self.get_options_values()
+        (days_threshold, skip_cloud_accounts,
+         excluded_pools) = self.get_options_values()
         cloud_account_map = self.get_cloud_accounts(
             SUPPORTED_CLOUD_TYPES, skip_cloud_accounts)
         cloud_accounts = list(cloud_account_map.values())
@@ -244,6 +258,8 @@ def _get(self):
             snapshot_cost = snapshot_info.get('cost', 0) * days_in_month
             candidate['saving'] = snapshot_cost
             candidate['resource_id'] = snapshot_info['resource_id']
+            candidate['is_excluded'] = snapshot_info.get(
+                'pool_id') in excluded_pools

         return [r for r in list(result.values()) if r['saving'] != 0]
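Since the new excluded_pools option defaults to a mapping keyed by pool id, marking a candidate reduces to a key-membership test on the snapshot's pool_id. A minimal standalone sketch of that step, assuming snapshot_info dicts shaped as built above; the pool ids and values are hypothetical, not taken from this diff:

    # Sketch of the 'is_excluded' derivation; pool ids are made up.
    excluded_pools = {"pool-dev": {}, "pool-sandbox": {}}  # option value: pool_id -> {}
    snapshot_info = {"resource_id": "res-1", "cost": 1.5, "pool_id": "pool-dev"}

    candidate = {"saving": snapshot_info["cost"] * 30}
    candidate["is_excluded"] = snapshot_info.get("pool_id") in excluded_pools
    print(candidate["is_excluded"])  # True: membership is tested against the dict keys
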
diff --git a/bumiworker/bumiworker/tasks.py b/bumiworker/bumiworker/tasks.py
index e0aefc088..9ebc19921 100644
--- a/bumiworker/bumiworker/tasks.py
+++ b/bumiworker/bumiworker/tasks.py
@@ -89,9 +89,7 @@ def _execute(self):

     def can_continue(self, ex):
         return not isinstance(
-            ex, BumiTaskTimeoutError
-        ) and not isinstance(
-            ex, BumiTaskWaitError
+            ex, (BumiTaskTimeoutError, BumiTaskWaitError)
         ) and self.body['tries_count'] < self.body['max_retries']

     @property
@@ -276,6 +274,14 @@ def module_type(self):
     def update_task_state(self):
         raise NotImplementedError

+    def list_modules(self, module_type):
+        modules = list_modules(module_type)
+        disabled = set(self.config_cl.disabled_recommendations() or [])
+        filtered = [m for m in modules if m not in disabled]
+        skipped = [m for m in modules if m in disabled]
+        LOG.info("[disabled modules] %s::%s", module_type, skipped)
+        return filtered
+
     def _get_child_task(self, module):
         return {
             'last_update': utcnow_timestamp(),
@@ -307,7 +313,8 @@ def update_task_state(self):
         self.body['state'] = TaskState.INITIALIZED_CHECKLIST

     def _execute(self):
-        modules = list_modules(self.module_type)
+        modules = self.list_modules(self.module_type)
+
         if not modules:
             LOG.warning('No %s modules found', self.module_type)
             # Nothing to do, update checklist & complete
@@ -455,7 +462,8 @@ def update_task_state(self):
         self.body['state'] = TaskState.INITIALIZED_SERVICE

     def _execute(self):
-        modules = list_modules(self.module_type)
+        modules = self.list_modules(self.module_type)
+
         self.create_children_tasks(modules)
         self.body['children_count'] = len(modules)
         self.update_task_state()
@@ -520,6 +528,29 @@ def check_timeout(self):

 class Process(HandleTaskTimeout):
     def _execute(self):
+
+        # skip already-queued tasks for disabled modules
+        disabled = set(self.config_cl.disabled_recommendations() or [])
+        if self.body.get('module') in disabled:
+            LOG.info(
+                "Skipping disabled module %s (%s) for org %s (created_at %s)",
+                self.body.get('module'),
+                self.body.get('module_type'),
+                self.body.get('organization_id'),
+                self.body.get('created_at'),
+            )
+            module_data = {
+                'data': [],
+                'options': {},
+                'error': None,
+                'skipped': True,
+                'skip_reason': 'disabled_by_config',
+            }
+            self.save_result_to_file(module_data)
+            self.body['state'] = TaskState.PROCESSED
+            super()._execute()
+            return
+
         try:
             data, options, error = call_module(
                 self.body['module'], self.body['module_type'],
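Both new code paths in tasks.py consult the same etcd-backed set: the list_modules() wrapper filters modules before child tasks are created, and Process._execute() skips tasks that were queued before a module was disabled. A standalone sketch of the shared filtering logic, with made-up module names:

    # Mirrors the filtering inside the new list_modules() wrapper;
    # module names and the disabled value here are illustrative only.
    def split_disabled(modules, disabled_value):
        disabled = set(disabled_value or [])
        filtered = [m for m in modules if m not in disabled]
        skipped = [m for m in modules if m in disabled]
        return filtered, skipped

    mods = ["obsolete_images", "snapshots_with_non_used_images"]
    print(split_disabled(mods, ["obsolete_images"]))
    # (['snapshots_with_non_used_images'], ['obsolete_images'])

Note the `or []` guard: it keeps the filter a no-op when disabled_recommendations() returns an empty list or the key is absent.
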
diff --git a/docker_images/configurator/configurator.py b/docker_images/configurator/configurator.py
index fc5fc8806..4e4dcf6db 100644
--- a/docker_images/configurator/configurator.py
+++ b/docker_images/configurator/configurator.py
@@ -1,6 +1,6 @@
+import argparse
 import logging
 import os
-import sys
 import time

 from retrying import retry
@@ -15,121 +15,141 @@
 from influxdb import InfluxDBClient
 from optscale_client.config_client.client import Client as EtcdClient

-LOG = logging.getLogger(__name__)
-ETCD_KEYS_TO_DELETE = ['/logstash_host', '/optscale_meter_enabled']
+ETCD_KEYS_TO_DELETE = ["/logstash_host", "/optscale_meter_enabled"]
 RETRY_ARGS = dict(stop_max_attempt_number=300, wait_fixed=500)
 RABBIT_PRECONDIFITON_FAILED_CODE = 406
 CH_HTTP_PORT = 8123
 CH_LOCAL_NAME = "clickhouse"
+logger = logging.getLogger(__name__)


 class Configurator(object):
-    def __init__(self, config_path='config.yml', host='etcd', port=2379):
-        self.config = yaml.safe_load(open(config_path, 'r'))
+    def __init__(self, config_path: str, host="etcd", port=2379):
+        self.config = yaml.safe_load(open(config_path, "r"))
         self.etcd_cl = EtcdClient(host=host, port=port)
-        config = self.config['etcd']
-
-        conn_str = 'mysql+mysqlconnector://{user}:{password}@{host}:{port}'
-        self.engine = create_engine(conn_str.format(
-            user=config['restdb']['user'],
-            password=config['restdb']['password'],
-            host=config['restdb']['host'],
-            port=config['restdb']['port'])
+        config = self.config["etcd"]
+
+        conn_str = "mysql+mysqlconnector://{user}:{password}@{host}:{port}"
+        self.engine = create_engine(
+            conn_str.format(
+                user=config["restdb"]["user"],
+                password=config["restdb"]["password"],
+                host=config["restdb"]["host"],
+                port=config["restdb"]["port"],
+            )
         )
         if "url" in config["mongo"]:
             mongo_url = config["mongo"]["url"]
         else:
             mongo_url = "mongodb://%s:%s@%s:%s" % (
-                config['mongo']['user'], config['mongo']['pass'],
-                config['mongo']['host'], config['mongo']['port']
+                config["mongo"]["user"],
+                config["mongo"]["pass"],
+                config["mongo"]["host"],
+                config["mongo"]["port"],
             )
         self.mongo_client = MongoClient(mongo_url)
-        rabbit_config = config['rabbit']
-        credentials = pika.PlainCredentials(rabbit_config['user'],
-                                            rabbit_config['pass'])
+        rabbit_config = config["rabbit"]
+        credentials = pika.PlainCredentials(
+            rabbit_config["user"], rabbit_config["pass"]
+        )
         rabbit_connection_parameters = pika.ConnectionParameters(
-            host=rabbit_config['host'],
-            port=int(rabbit_config['port']),
+            host=rabbit_config["host"],
+            port=int(rabbit_config["port"]),
             credentials=credentials,
             connection_attempts=100,
-            retry_delay=2
+            retry_delay=2,
         )
-        self.rabbit_client = pika.BlockingConnection(
-            rabbit_connection_parameters)
+        self.rabbit_client = pika.BlockingConnection(rabbit_connection_parameters)
         self.influx_client = InfluxDBClient(
-            config['influxdb']['host'],
-            config['influxdb']['port'],
-            config['influxdb']['user'],
-            config['influxdb']['pass'],
-            config['influxdb']['database'],
+            config["influxdb"]["host"],
+            config["influxdb"]["port"],
+            config["influxdb"]["user"],
+            config["influxdb"]["pass"],
+            config["influxdb"]["database"],
         )
-        s3_params = config['minio']
+        s3_params = config["minio"]
         self.s3_client = boto3.client(
-            's3',
-            endpoint_url='http://{}:{}'.format(
-                s3_params['host'], s3_params['port']),
-            aws_access_key_id=s3_params['access'],
-            aws_secret_access_key=s3_params['secret'],
-            config=BotoConfig(s3={'addressing_style': 'path'})
+            "s3",
+            endpoint_url="http://{}:{}".format(s3_params["host"], s3_params["port"]),
+            aws_access_key_id=s3_params["access"],
+            aws_secret_access_key=s3_params["secret"],
+            config=BotoConfig(s3={"addressing_style": "path"}),
         )

     @retry(**RETRY_ARGS, retry_on_exception=lambda x: True)
     def configure_influx(self):
-        self.influx_client.create_database(
-            self.config['etcd']['influxdb']['database'])
+        self.influx_client.create_database(self.config["etcd"]["influxdb"]["database"])

     def stitch_ch_to_http(self):
         try:
-            ch_host = self.etcd_cl.get('/clickhouse/host').value
-            ch_port = self.etcd_cl.get('/clickhouse/port').value
+            ch_host = self.etcd_cl.get("/clickhouse/host").value
+            ch_port = self.etcd_cl.get("/clickhouse/port").value
             # switch to http port only for local host
-            LOG.info("Ch host: %s", ch_host)
-            LOG.info("Ch port: %s", ch_port)
+            logger.info("Ch host: %s", ch_host)
+            logger.info("Ch port: %s", ch_port)
             if ch_host == CH_LOCAL_NAME and str(ch_port) != str(CH_HTTP_PORT):
-                LOG.info("Updating clickhouse port to %s", CH_HTTP_PORT)
-                self.etcd_cl.write(
-                    "/clickhouse/port",
-                    CH_HTTP_PORT
-                )
+                logger.info("Updating clickhouse port to %s", CH_HTTP_PORT)
+                self.etcd_cl.write("/clickhouse/port", CH_HTTP_PORT)
         except etcd.EtcdKeyNotFound:
-            LOG.info("Skipping update ch port due to missing key")
+            logger.info("Skipping update ch port due to missing key")

     def commit_config(self):
-        LOG.info("Creating /configured key")
-        self.etcd_cl.write('/configured', time.time())
+        logger.info("Creating /configured key")
+        self.etcd_cl.write("/configured", time.time())

     def pre_configure(self):
-        LOG.info("Creating databases")
+        logger.info("Creating databases")
         self.create_databases()
+        logger.debug("Databases created.")
+        logger.debug("Creating InfluxDB databases.")
         self.configure_influx()
+        logger.debug("InfluxDB databases created.")
+        logger.debug("Stitching ClickHouse to the HTTP port.")
         self.stitch_ch_to_http()
+        logger.debug("ClickHouse port configured.")
+        logger.debug("Configuring Thanos.")
         self.configure_thanos()
+        logger.debug("Thanos configured.")
         # setting to 0 to block updates until update is finished
         # and new images pushed into registry
-        self.etcd_cl.write('/registry_ready', 0)
-
-        config = self.config.get('etcd')
-        if self.config.get('skip_config_update', False):
-            LOG.info('Only making structure updates')
-            self.etcd_cl.update_structure('/', config)
+        logger.debug("Writing etcd /registry_ready.")
+        self.etcd_cl.write("/registry_ready", 0)
+        logger.debug("etcd /registry_ready written.")
+        config = self.config.get("etcd")
+        if self.config.get("skip_config_update", False):
+            logger.info("Only making structure updates.")
+            logger.debug("Updating etcd structure.")
+            self.etcd_cl.update_structure("/", config)
+            logger.debug("etcd structure updated.")
+            logger.debug("Committing config.")
             self.commit_config()
             return
-        LOG.info("Writing default etcd keys")
+        logger.info("Writing default etcd keys")
         for key in ETCD_KEYS_TO_DELETE:
             try:
+                logger.debug("Deleting key %s from etcd", key)
                 self.etcd_cl.delete(key)
             except etcd.EtcdKeyNotFound:
                 pass
-        self.etcd_cl.write_branch('/', config, overwrite_lists=True)
-        LOG.info("Configuring database server")
+        self.etcd_cl.write_branch("/", config, overwrite_lists=True)
+        logger.info("Configuring database server")
         self.configure_databases()
+        logger.debug("Databases configured.")
+        logger.debug("Configuring auth salt.")
         self.configure_auth_salt()
+        logger.debug("Auth salt configured.")
+        logger.debug("Configuring mongo.")
        self.configure_mongo()
+        logger.debug("Mongo configured.")
+        logger.debug("Configuring RabbitMQ.")
         self.configure_rabbit()
+        logger.debug("RabbitMQ configured.")
+        logger.debug("Committing config.")
         self.commit_config()
+        logger.debug("Configuration completed.")

     def _create_auth_salt_key(self):
         salt = ""
@@ -148,10 +168,8 @@ def configure_auth_salt(self):
             self._create_auth_salt_key()

     def _declare_events_queue(self, channel):
-        LOG.info('declaring queue')
-        channel.queue_declare(
-            self.config['etcd']['events_queue'], durable=True
-        )
+        logger.info("declaring queue")
+        channel.queue_declare(self.config["etcd"]["events_queue"], durable=True)

     def configure_rabbit(self):
         channel = self.rabbit_client.channel()
@@ -159,10 +177,9 @@ def configure_rabbit(self):
         try:
             self._declare_events_queue(channel)
         except pika.exceptions.ChannelClosed as e:
             if e.args and e.args[0] == RABBIT_PRECONDIFITON_FAILED_CODE:
-                LOG.info(
-                    'failed to declare queue - %s. Deleting existing queue', e)
+                logger.info("failed to declare queue - %s. Deleting existing queue", e)
                 channel = self.rabbit_client.channel()
-                channel.queue_delete(self.config['etcd']['events_queue'])
+                channel.queue_delete(self.config["etcd"]["events_queue"])
                 self._declare_events_queue(channel)
             else:
                 raise
@@ -175,45 +192,90 @@ def configure_mongo(self):
         http://api.mongodb.com/python/current/tutorial.html#getting-a-database
         :return:
         """
-        _ = self.mongo_client[self.config['etcd']['mongo']['database']]
+        _ = self.mongo_client[self.config["etcd"]["mongo"]["database"]]

     @retry(**RETRY_ARGS, retry_on_exception=lambda x: True)
     def configure_databases(self):
         # in case of foreman model changes recreate db
-        if self.config.get('drop_tasks_db'):
+        if self.config.get("drop_tasks_db"):
             self.engine.execute("DROP DATABASE IF EXISTS tasks")

     @retry(**RETRY_ARGS, retry_on_exception=lambda x: True)
     def create_databases(self):
-        for db in self.config.get('databases'):
-            # http://dev.mysql.com/doc/refman/5.6/en/innodb-row-format-dynamic.html NOQA
-            self.engine.execute(
-                "CREATE DATABASE IF NOT EXISTS `{0}` "
-                "DEFAULT CHARACTER SET `utf8mb4` "
-                "DEFAULT COLLATE `utf8mb4_unicode_ci`".format(db))
+        for db in self.config.get("databases"):
+            # heat migrations fail with utf8mb4
+            if db != "heat":
+                # http://dev.mysql.com/doc/refman/5.6/en/innodb-row-format-dynamic.html NOQA
+                self.engine.execute(
+                    "CREATE DATABASE IF NOT EXISTS `{0}` "
+                    "DEFAULT CHARACTER SET `utf8mb4` "
+                    "DEFAULT COLLATE `utf8mb4_unicode_ci`".format(db)
+                )
+            else:
+                self.engine.execute("CREATE DATABASE IF NOT EXISTS `{0}`".format(db))

     @retry(**RETRY_ARGS, retry_on_exception=lambda x: True)
     def configure_thanos(self):
-        bucket_name = 'thanos'
-        prefix = 'data'
+        bucket_name = "thanos"
+        prefix = "data"
         try:
             self.s3_client.create_bucket(Bucket=bucket_name)
-            LOG.info('Created %s bucket in minio', bucket_name)
-            self.s3_client.put_object(
-                Bucket=bucket_name, Body='', Key='%s/' % prefix)
-            LOG.info('Created %s folder in %s bucket', prefix, bucket_name)
+            logger.info("Created %s bucket in minio", bucket_name)
+            self.s3_client.put_object(Bucket=bucket_name, Body="", Key="%s/" % prefix)
+            logger.info("Created %s folder in %s bucket", prefix, bucket_name)
         except self.s3_client.exceptions.BucketAlreadyOwnedByYou:
-            LOG.info('Skipping bucket %s creation. Bucket already exists',
-                     bucket_name)
+            logger.info(
+                "Skipping bucket %s creation. Bucket already exists", bucket_name
+            )


 if __name__ == "__main__":
-    logging.basicConfig(level=logging.INFO)
-    etcd_host = os.environ.get('HX_ETCD_HOST')
-    etcd_port = int(os.environ.get('HX_ETCD_PORT'))
-    if len(sys.argv) > 1:
-        conf = Configurator(sys.argv[1], host=etcd_host, port=etcd_port)
-    else:
-        conf = Configurator(host=etcd_host, port=etcd_port)
-    stage = os.environ.get('HX_CONFIG_STAGE')
+    etcd_host = os.environ.get("HX_ETCD_HOST")
+    etcd_port = int(os.environ.get("HX_ETCD_PORT"))
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "config_path",
+        nargs="?",
+        default="config.yml",
+        help="Path to the configuration file (default: config.yml)",
+    )
+    parser.add_argument(
+        "--log-level",
+        default="INFO",
+        choices=[
+            "DEBUG",
+            "INFO",
+            "WARNING",
+            "ERROR",
+            "debug",
+            "info",
+            "warning",
+            "error",
+        ],
+        help="Set logging level (default: INFO)",
+    )
+    parser.add_argument(
+        "--log-format",
+        default="%(levelname)s:%(name)s:%(message)s",
+        help="Logging format string "
+             "(default: %%(levelname)s:%%(name)s:%%(message)s)",
+    )
+
+    args = parser.parse_args()
+    numeric_level = getattr(logging, args.log_level.upper())
+    logger.setLevel(numeric_level)
+    if not logger.hasHandlers():
+        handler = logging.StreamHandler()
+        handler.setLevel(numeric_level)
+        formatter = logging.Formatter(
+            fmt=args.log_format, datefmt="%Y-%m-%d %H:%M:%S:%z"
+        )
+        handler.setFormatter(formatter)
+        logger.addHandler(handler)
+    log_level = args.log_level.upper()
+
+    conf = Configurator(config_path=args.config_path, host=etcd_host, port=etcd_port)
+    stage = os.environ.get("HX_CONFIG_STAGE")
+    logger.info(
+        "Starting Configurator with config: %s and loglevel: %s",
+        args.config_path, log_level,
+    )
     conf.pre_configure()
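One caveat in the new CLI wiring above: argparse %-formats help strings when rendering --help, so literal percent signs in help text must be doubled — which is why the --log-format help shown above escapes its default. A small sketch demonstrating the rule, using the same argument the diff adds:

    # argparse interpolates help text with '%', so a literal '%(levelname)s'
    # must be written '%%(levelname)s' or rendering '--help' raises an error.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--log-format",
        default="%(levelname)s:%(name)s:%(message)s",
        help="Logging format string (default: %%(levelname)s:%%(name)s:%%(message)s)",
    )
    print(parser.parse_args([]).log_format)  # the default value itself needs no escaping
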
diff --git a/ngui/ui/src/containers/RecommendationsOverviewContainer/RecommendationsOverview.tsx b/ngui/ui/src/containers/RecommendationsOverviewContainer/RecommendationsOverview.tsx
index d6fb70340..54f783237 100644
--- a/ngui/ui/src/containers/RecommendationsOverviewContainer/RecommendationsOverview.tsx
+++ b/ngui/ui/src/containers/RecommendationsOverviewContainer/RecommendationsOverview.tsx
@@ -93,6 +93,9 @@ const RecommendationsOverview = ({
     .filter(serviceFilter(service))
     .filter(searchFilter(search))
     .filter(appliedDataSourcesFilter(selectedDataSourceTypes))
+    // TODO: remove the obsolete_images recommendation here but keep it in archived recommendations;
+    // discuss solutions
+    .filter((rec) => rec.type !== "obsolete_images")
     .sort(sortRecommendation);

   return (
diff --git a/ngui/ui/src/containers/RecommendationsOverviewContainer/recommendations/SnapshotsWithNonUsedImages.tsx b/ngui/ui/src/containers/RecommendationsOverviewContainer/recommendations/SnapshotsWithNonUsedImages.tsx
index 860a7b69f..a49d8e0b4 100644
--- a/ngui/ui/src/containers/RecommendationsOverviewContainer/recommendations/SnapshotsWithNonUsedImages.tsx
+++ b/ngui/ui/src/containers/RecommendationsOverviewContainer/recommendations/SnapshotsWithNonUsedImages.tsx
@@ -75,6 +75,8 @@ class SnapshotsWithNonUsedImages extends BaseRecommendation {

   dismissible = true;

+  withExclusions = true;
+
   static resourceDescriptionMessageId = "snapshotWithNonUsedImagesRecommendation";

   get previewItems() {
"This snapshot is locked by an unused AMI. Consider AMI and corresponding snapshot cleanup.", "snapshots": "Snapshots", + "snapshotsWithNonUsedImages": "Snapshots with non-used images", "snapshotsWithNonUsedImagesDescription": "Some snapshots are locked by the AMIs which have not been used for instance creation for the last {daysThreshold} {daysThreshold, plural,\n =1 {day}\n other {days}\n}. Consider AMIs and corresponding snapshots cleanup.", "snapshotsWithNonUsedImagesLastUsedHelp": "Last time an image was seen used for instance creation", "snapshotsWithNonUsedImagesTitle": "Snapshots with non-used Images", diff --git a/optscale-deploy/optscale/templates/ngui.yaml b/optscale-deploy/optscale/templates/ngui.yaml index 7013d17d5..18c58319e 100644 --- a/optscale-deploy/optscale/templates/ngui.yaml +++ b/optscale-deploy/optscale/templates/ngui.yaml @@ -78,4 +78,6 @@ spec: value: {{ $config.env.finops_in_practice_portal_overview }} - name: VITE_BILLING_INTEGRATION value: {{ $config.env.billing_integration }} + - name: VITE_APP_THEME + value: {{ $config.env.app_theme }} {{ include "ready_probe" $config | indent 8 }} diff --git a/optscale-deploy/optscale/values.yaml b/optscale-deploy/optscale/values.yaml index 84f49faac..8b3bacb33 100644 --- a/optscale-deploy/optscale/values.yaml +++ b/optscale-deploy/optscale/values.yaml @@ -232,6 +232,7 @@ ngui: on_initialize_organization_setup_mode: finops_in_practice_portal_overview: billing_integration: + app_theme: minio: name: &minio_name minio diff --git a/optscale-deploy/overlay/user_template.yml b/optscale-deploy/overlay/user_template.yml index a0b548434..625a16c9d 100644 --- a/optscale-deploy/overlay/user_template.yml +++ b/optscale-deploy/overlay/user_template.yml @@ -110,6 +110,7 @@ ngui: # "enabled" - the billing integration is enabled # otherwise - the billing integration is disabled billing_integration: "" + app_theme: "" elk: env: htpasswd_user: userforelk diff --git a/optscale_client/config_client/client.py b/optscale_client/config_client/client.py index 66708afeb..3acad1587 100644 --- a/optscale_client/config_client/client.py +++ b/optscale_client/config_client/client.py @@ -1,6 +1,7 @@ import os import etcd +import json import logging import subprocess from retrying import retry @@ -603,3 +604,39 @@ def stripe_settings(self): 'webhook_secret': branch['webhook_secret'], 'enabled': True if branch['enabled'].lower() == 'true' else False } + + def disabled_recommendations(self): + """ + Get settings for disabled recommendations. + Accepts: + - "r1,r2,r3" + - "r1, r2, r3" + - multiline: "r1,r2\nr3" + - JSON list: ["r1","r2"] + Returns: List[str] + """ + try: + dr = self.get('/disabled_recommendations').value + except etcd.EtcdKeyNotFound: + return [] + + if dr is None: + return [] + + if isinstance(dr, (bytes, bytearray)): + dr = dr.decode("utf-8", errors="replace") + + dr = str(dr).strip() + if not dr: + return [] + + if dr.startswith("["): + try: + val = json.loads(dr) + if isinstance(val, list): + return [str(x).strip() for x in val if str(x).strip()] + except Exception: + pass + + dr = dr.replace("\n", ",").replace(";", ",") + return [x.strip() for x in dr.split(",") if x.strip()]