Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ def __init__(self, organization_id, config_client, created_at):
super().__init__(organization_id, config_client, created_at)
self.option_ordered_map = OrderedDict({
'days_threshold': {'default': DEFAULT_DAYS_THRESHOLD},
'skip_cloud_accounts': {'default': []}
'skip_cloud_accounts': {'default': []},
'excluded_pools': {
'default': {},
'clean_func': self.clean_excluded_pools,
},
})

def get_snapshot_info_map(self, account_id_type_map, snapshot_ids,
Expand Down Expand Up @@ -81,15 +85,20 @@ def get_snapshot_info_aws(self, bulk_ids, account_ids,
snapshots = list(self.mongo_client.restapi.resources.find({
'cloud_account_id': {'$in': account_ids},
'cloud_resource_id': {'$in': bulk_ids},
}, ['cloud_resource_id']))
}, ['cloud_resource_id', 'pool_id']))
snapshot_pool_map = {}
for snapshot in snapshots:
snapshot_pool_map[
snapshot['cloud_resource_id']] = snapshot.pop('pool_id')
snapshot_expenses = self._get_snapshot_expenses(
snapshots, last_week_time)
snapshot_info_map = {}
for resource_id, cloud_resource_id, cost in snapshot_expenses:
snapshot_info_map[cloud_resource_id] = {
'cloud_resource_id': cloud_resource_id,
'resource_id': resource_id,
'cost': cost
'cost': cost,
'pool_id': snapshot_pool_map[cloud_resource_id]
}
return snapshot_info_map

Expand All @@ -113,7 +122,8 @@ def get_snapshot_info_alibaba(self, bulk_ids, account_ids,
'$project': {
'_id': '$_id',
'cloud_resource_id': '$cloud_resource_id',
'snapshots': '$meta.snapshots'
'snapshots': '$meta.snapshots',
'pool_id': '$pool_id',
}
},
{
Expand All @@ -129,6 +139,7 @@ def get_snapshot_info_alibaba(self, bulk_ids, account_ids,
snap_chains = self.mongo_client.restapi.resources.aggregate(
snapshot_chain_resources_pipeline)
sc_snapshot_map = {}
sc_pool_map = {}
external_table = []
for sc in snap_chains:
external_table.append({
Expand All @@ -137,14 +148,16 @@ def get_snapshot_info_alibaba(self, bulk_ids, account_ids,
})
sc_snapshot_map[sc['cloud_resource_id']] = sc['snapshots'][
'cloud_resource_id']
sc_pool_map[sc['cloud_resource_id']] = sc['pool_id']

snapshot_expenses = self._get_snapshot_expenses(
external_table, last_week_time)
for resource_id, cloud_resource_id, cost in snapshot_expenses:
snapshot_info_map[sc_snapshot_map[cloud_resource_id]] = {
'resource_id': resource_id,
'cloud_resource_id': cloud_resource_id,
'cost': cost
'cost': cost,
'pool_id': sc_pool_map[cloud_resource_id]
}
return snapshot_info_map

Expand Down Expand Up @@ -179,7 +192,8 @@ def process_images(cloud_account, generator):
return images_map

def _get(self):
(days_threshold, skip_cloud_accounts) = self.get_options_values()
(days_threshold, skip_cloud_accounts,
excluded_pools) = self.get_options_values()
cloud_account_map = self.get_cloud_accounts(
SUPPORTED_CLOUD_TYPES, skip_cloud_accounts)
cloud_accounts = list(cloud_account_map.values())
Expand Down Expand Up @@ -244,6 +258,8 @@ def _get(self):
snapshot_cost = snapshot_info.get('cost', 0) * days_in_month
candidate['saving'] = snapshot_cost
candidate['resource_id'] = snapshot_info['resource_id']
candidate['is_excluded'] = snapshot_info.get(
'pool_id') in excluded_pools
return [r for r in list(result.values()) if r['saving'] != 0]


Expand Down
41 changes: 36 additions & 5 deletions bumiworker/bumiworker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,7 @@ def _execute(self):

def can_continue(self, ex):
return not isinstance(
ex, BumiTaskTimeoutError
) and not isinstance(
ex, BumiTaskWaitError
ex, (BumiTaskTimeoutError, BumiTaskWaitError)
) and self.body['tries_count'] < self.body['max_retries']

@property
Expand Down Expand Up @@ -276,6 +274,14 @@ def module_type(self):
def update_task_state(self):
raise NotImplementedError

def list_modules(self, module_type):
modules = list_modules(module_type)
disabled = set(self.config_cl.disabled_recommendations() or [])
filtered = [m for m in modules if m not in disabled]
skipped = [m for m in modules if m in disabled]
LOG.info("[disabled modules] %s::%s", module_type, skipped)
return filtered

def _get_child_task(self, module):
return {
'last_update': utcnow_timestamp(),
Expand Down Expand Up @@ -307,7 +313,8 @@ def update_task_state(self):
self.body['state'] = TaskState.INITIALIZED_CHECKLIST

def _execute(self):
modules = list_modules(self.module_type)
modules = self.list_modules(self.module_type)

if not modules:
LOG.warning('No %s modules found', self.module_type)
# Nothing to do, update checklist & complete
Expand Down Expand Up @@ -455,7 +462,8 @@ def update_task_state(self):
self.body['state'] = TaskState.INITIALIZED_SERVICE

def _execute(self):
modules = list_modules(self.module_type)
modules = self.list_modules(self.module_type)

self.create_children_tasks(modules)
self.body['children_count'] = len(modules)
self.update_task_state()
Expand Down Expand Up @@ -520,6 +528,29 @@ def check_timeout(self):

class Process(HandleTaskTimeout):
def _execute(self):

# skip already-queued tasks for disabled modules
disabled = set(self.config_cl.disabled_recommendations() or [])
if self.body.get('module') in disabled:
LOG.info(
"Skipping disabled module %s (%s) for org %s (created_at %s)",
self.body.get('module'),
self.body.get('module_type'),
self.body.get('organization_id'),
self.body.get('created_at'),
)
module_data = {
'data': [],
'options': {},
'error': None,
'skipped': True,
'skip_reason': 'disabled_by_config',
}
self.save_result_to_file(module_data)
self.body['state'] = TaskState.PROCESSED
super()._execute()
return

try:
data, options, error = call_module(
self.body['module'], self.body['module_type'],
Expand Down
Loading