From ef91dcd09ce6a43c51591d4597a07cdf15801b67 Mon Sep 17 00:00:00 2001 From: m-dhellin Date: Tue, 26 Nov 2024 16:13:41 +0100 Subject: [PATCH 1/4] formating --- src/core/app/routes/task.py | 301 +++++++++++++++++++----------------- 1 file changed, 163 insertions(+), 138 deletions(-) diff --git a/src/core/app/routes/task.py b/src/core/app/routes/task.py index a88c468a..6172d1b3 100644 --- a/src/core/app/routes/task.py +++ b/src/core/app/routes/task.py @@ -1,19 +1,19 @@ -## Licensed to the Apache Software Foundation (ASF) under one -## or more contributor license agreements. See the NOTICE file -## distributed with this work for additional information -## regarding copyright ownership. The ASF licenses this file -## to you under the Apache License, Version 2.0 (the -## "License"); you may not use this file except in compliance -## with the License. You may obtain a copy of the License at +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at ## -## http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 ## -## Unless required by applicable law or agreed to in writing, -## software distributed under the License is distributed on an -## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -## KIND, either express or implied. See the License for the -## specific language governing permissions and limitations -## under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. #!/usr/bin/env python import os @@ -44,154 +44,179 @@ class restorebackup_start(BaseModel): - virtual_machine_id: str - backup_name: str - storage: Optional[str] = None - mode: Optional[str] = None - class Config: - schema_extra = { - "example": { - "virtual_machine_id": "3414b922-a39f-11ec-b909-0242ac120002", - "backup_name": "vda_VMDiskName_01092842912", - "storage": "path", - "mode": "simple" - } - } + virtual_machine_id: str + backup_name: str + storage: Optional[str] = None + mode: Optional[str] = None + + class Config: + schema_extra = { + "example": { + "virtual_machine_id": "3414b922-a39f-11ec-b909-0242ac120002", + "backup_name": "vda_VMDiskName_01092842912", + "storage": "path", + "mode": "simple" + } + } + @celery.task(name='restore_task_jobs') def retrieve_restore_task_jobs(): - single_vm_payload = {"taskname": "VM_Restore_Disk"} - single_vm_response = requests.get('http://flower:5555/api/tasks', params=single_vm_payload) - single_vm_task = json.loads(single_vm_response.content.decode('ascii')) - for key in single_vm_task: - json_key = single_vm_task[key] - json_key["args"] = json.dumps(task_handler.parse_task_args(json_key["args"])) - - vm_retore_path_payload = {"taskname": "VM_Restore_To_Path"} - vm_retore_path_response = requests.get('http://flower:5555/api/tasks', params=vm_retore_path_payload) - vm_retore_path_task = json.loads(vm_retore_path_response.content.decode('ascii')) - for key in vm_retore_path_task: - json_key = vm_retore_path_task[key] - json_key["args"] = json.dumps(task_handler.parse_task_args(json_key["args"])) - - single_vm_task.update(vm_retore_path_task) - return single_vm_task + single_vm_payload = {"taskname": "VM_Restore_Disk"} + single_vm_response = requests.get( + 'http://flower:5555/api/tasks', params=single_vm_payload) + single_vm_task = json.loads(single_vm_response.content.decode('ascii')) + for key in single_vm_task: + json_key = single_vm_task[key] + json_key["args"] = json.dumps( + task_handler.parse_task_args(json_key["args"])) + + vm_retore_path_payload = {"taskname": "VM_Restore_To_Path"} + vm_retore_path_response = requests.get( + 'http://flower:5555/api/tasks', params=vm_retore_path_payload) + vm_retore_path_task = json.loads( + vm_retore_path_response.content.decode('ascii')) + for key in vm_retore_path_task: + json_key = vm_retore_path_task[key] + json_key["args"] = json.dumps( + task_handler.parse_task_args(json_key["args"])) + + single_vm_task.update(vm_retore_path_task) + return single_vm_task + @celery.task(name='backuptask_jobs') def retrieve_backup_task_jobs(): - single_vm_payload = {"taskname": " Single_VM_Backup"} - single_vm_response = requests.get('http://flower:5555/api/tasks', params=single_vm_payload) - pool_payload = {"taskname": "Pool_VM_Backup"} - pool_response = requests.get('http://flower:5555/api/tasks', params=pool_payload) - subtask_payload = {"taskname": "backup_subtask"} - subtask_response = requests.get('http://flower:5555/api/tasks', params=subtask_payload) - single_vm_task = json.loads(single_vm_response.content.decode('ascii')) - pool_vm_task = json.loads(pool_response.content.decode('ascii')) - subtask = json.loads(subtask_response.content.decode('ascii')) - aggregated_jobs_list = single_vm_task.copy() - aggregated_jobs_list.update(pool_vm_task) - aggregated_jobs_list.update(subtask) - for key in aggregated_jobs_list: - json_key = aggregated_jobs_list[key] - json_key["args"] = json.dumps(task_handler.parse_task_args(json_key["args"])) - return aggregated_jobs_list + single_vm_payload = {"taskname": " Single_VM_Backup"} + single_vm_response = requests.get( + 'http://flower:5555/api/tasks', params=single_vm_payload) + pool_payload = {"taskname": "Pool_VM_Backup"} + pool_response = requests.get( + 'http://flower:5555/api/tasks', params=pool_payload) + subtask_payload = {"taskname": "backup_subtask"} + subtask_response = requests.get( + 'http://flower:5555/api/tasks', params=subtask_payload) + single_vm_task = json.loads(single_vm_response.content.decode('ascii')) + pool_vm_task = json.loads(pool_response.content.decode('ascii')) + subtask = json.loads(subtask_response.content.decode('ascii')) + aggregated_jobs_list = single_vm_task.copy() + aggregated_jobs_list.update(pool_vm_task) + aggregated_jobs_list.update(subtask) + for key in aggregated_jobs_list: + json_key = aggregated_jobs_list[key] + json_key["args"] = json.dumps( + task_handler.parse_task_args(json_key["args"])) + return aggregated_jobs_list + def get_task_logs(task_id): - response = requests.get(f'http://flower:5555/api/task/info/{task_id}') - return response.content + response = requests.get(f'http://flower:5555/api/task/info/{task_id}') + return response.content + @app.get('/api/v1/status/{task_id}', status_code=200) def retrieve_task_status(task_id, identity: Json = Depends(auth.valid_token)): - try: - uuid_obj = UUID(task_id) - except ValueError: - raise HTTPException(status_code=404, detail='Given uuid is not valid') - task = celery.AsyncResult(task_id) - if task.state == 'PENDING': - response = { - 'state': task.state, - 'current': 0, - 'total': 1, - 'status': 'Pending...' - } - elif task.state == 'PROGRESS': - response = { - 'state': task.state, - 'status': 'In progress...' - } - elif task.state != 'FAILURE': - response = { - 'state': task.state, - 'info': task.info - } - if task.info and 'result' in task.info: - response['result'] = task.info['result'] - else: - response = { - 'state': task.state, - 'current': 1, - 'total': 1, - 'status': str(task.info) - } - if "not found" in str(task.info): - raise HTTPException(status_code=404, detail=response) - else: - raise HTTPException(status_code=500, detail=response) - return response + try: + uuid_obj = UUID(task_id) + except ValueError: + raise HTTPException(status_code=404, detail='Given uuid is not valid') + task = celery.AsyncResult(task_id) + if task.state == 'PENDING': + response = { + 'state': task.state, + 'current': 0, + 'total': 1, + 'status': 'Pending...' + } + elif task.state == 'PROGRESS': + response = { + 'state': task.state, + 'status': 'In progress...' + } + elif task.state != 'FAILURE': + response = { + 'state': task.state, + 'info': task.info + } + if task.info and 'result' in task.info: + response['result'] = task.info['result'] + else: + response = { + 'state': task.state, + 'current': 1, + 'total': 1, + 'status': str(task.info) + } + if "not found" in str(task.info): + raise HTTPException(status_code=404, detail=response) + else: + raise HTTPException(status_code=500, detail=response) + return response + @app.get('/api/v1/logs/{task_id}', status_code=200) def retrieve_task_logs(task_id, identity: Json = Depends(auth.valid_token)): - try: - uuid_obj = UUID(task_id) - except ValueError: - raise HTTPException(status_code=404, detail='Given uuid is not valid') - return get_task_logs(task_id) + try: + uuid_obj = UUID(task_id) + except ValueError: + raise HTTPException(status_code=404, detail='Given uuid is not valid') + return get_task_logs(task_id) + @app.post('/api/v1/tasks/singlebackup/{virtual_machine_id}', status_code=202) def start_vm_single_backup(virtual_machine_id, identity: Json = Depends(auth.valid_token)): - try: - uuid_obj = UUID(virtual_machine_id) - except ValueError: - raise HTTPException(status_code=404, detail='Given uuid is not valid') - if not virtual_machine_id: raise HTTPException(status_code=404, detail='Virtual machine not found') - res = chain(host.retrieve_host.s(), virtual_machine.dmap.s(virtual_machine.parse_host.s()), virtual_machine.handle_results.s(), virtual_machine.filter_virtual_machine_list.s(virtual_machine_id), single_backup.single_vm_backup.s()).apply_async() - return {'Location': app.url_path_for('retrieve_task_status', task_id=res.id)} + try: + uuid_obj = UUID(virtual_machine_id) + except ValueError: + raise HTTPException(status_code=404, detail='Given uuid is not valid') + if not virtual_machine_id: + raise HTTPException( + status_code=404, detail='Virtual machine not found') + res = chain(host.retrieve_host.s(), virtual_machine.dmap.s(virtual_machine.parse_host.s()), virtual_machine.handle_results.s( + ), virtual_machine.filter_virtual_machine_list.s(virtual_machine_id), single_backup.single_vm_backup.s()).apply_async() + return {'Location': app.url_path_for('retrieve_task_status', task_id=res.id)} + @app.post('/api/v1/tasks/restore/{virtual_machine_id}', status_code=202) def start_vm_restore(virtual_machine_id, item: restorebackup_start, identity: Json = Depends(auth.valid_token)): - try: - uuid_obj = UUID(virtual_machine_id) - except ValueError: - raise HTTPException(status_code=404, detail='Given uuid is not valid') - virtual_machine_id = item.virtual_machine_id - print("DEBUG ID VM : " + virtual_machine_id) - #print("virtual_machine_id: " + virtual_machine_id) - backup_name = item.backup_name - #print("backup_name: " + backup_name) - #storage = item.storage - #print("storage: " + storage) - #mode = item.mode - #print("mode: " + mode) - print(uuid_obj) - res = chain(host.retrieve_host.s(), virtual_machine.dmap.s(virtual_machine.parse_host.s()), virtual_machine.handle_results.s(), virtual_machine.filter_virtual_machine_list.s(virtual_machine_id), restore.restore_disk_vm.s(backup_name, '', '')).apply_async() - #res = chain(host.retrieve_host.s(), virtual_machine.dmap.s(virtual_machine.parse_host.s()), virtual_machine.handle_results.s(), virtual_machine.filter_virtual_machine_list.s(virtual_machine_id), restore.restore_disk_vm.s(backup_name)).apply_async() - return {'Location': app.url_path_for('retrieve_task_status', task_id=res.id)} + try: + uuid_obj = UUID(virtual_machine_id) + except ValueError: + raise HTTPException(status_code=404, detail='Given uuid is not valid') + virtual_machine_id = item.virtual_machine_id + print("DEBUG ID VM : " + virtual_machine_id) + # print("virtual_machine_id: " + virtual_machine_id) + backup_name = item.backup_name + # print("backup_name: " + backup_name) + # storage = item.storage + # print("storage: " + storage) + # mode = item.mode + # print("mode: " + mode) + print(uuid_obj) + res = chain(host.retrieve_host.s(), virtual_machine.dmap.s(virtual_machine.parse_host.s()), virtual_machine.handle_results.s( + ), virtual_machine.filter_virtual_machine_list.s(virtual_machine_id), restore.restore_disk_vm.s(backup_name, '', '')).apply_async() + # res = chain(host.retrieve_host.s(), virtual_machine.dmap.s(virtual_machine.parse_host.s()), virtual_machine.handle_results.s(), virtual_machine.filter_virtual_machine_list.s(virtual_machine_id), restore.restore_disk_vm.s(backup_name)).apply_async() + return {'Location': app.url_path_for('retrieve_task_status', task_id=res.id)} + @app.post('/api/v1/tasks/restorespecificpath', status_code=202) def start_vm_restore_specific_path(item: restorebackup_start, identity: Json = Depends(auth.valid_token)): - virtual_machine_id = item.virtual_machine_id - backup_name = item.backup_name - storage = item.storage - mode = item.mode - #res = chain(host.retrieve_host.s(), virtual_machine.dmap.s(virtual_machine.parse_host.s()), virtual_machine.handle_results.s(), virtual_machine.filter_virtual_machine_list.s(virtual_machine_id), restore.restore_disk_vm.s(backup_name, storage, mode)).apply_async() - #res = chain(host.retrieve_host.s(), virtual_machine.dmap.s(virtual_machine.parse_host.s()), virtual_machine.handle_results.s(), virtual_machine.filter_virtual_machine_list.s(virtual_machine_id), restore.restore_disk_vm.s(backup_name)).apply_async() - res = chain(restore.restore_to_path_task.s(virtual_machine_id, backup_name, storage, mode)).apply_async() - return {'Location': app.url_path_for('retrieve_task_status', task_id=res.id)} + virtual_machine_id = item.virtual_machine_id + backup_name = item.backup_name + storage = item.storage + mode = item.mode + # res = chain(host.retrieve_host.s(), virtual_machine.dmap.s(virtual_machine.parse_host.s()), virtual_machine.handle_results.s(), virtual_machine.filter_virtual_machine_list.s(virtual_machine_id), restore.restore_disk_vm.s(backup_name, storage, mode)).apply_async() + # res = chain(host.retrieve_host.s(), virtual_machine.dmap.s(virtual_machine.parse_host.s()), virtual_machine.handle_results.s(), virtual_machine.filter_virtual_machine_list.s(virtual_machine_id), restore.restore_disk_vm.s(backup_name)).apply_async() + res = chain(restore.restore_to_path_task.s( + virtual_machine_id, backup_name, storage, mode)).apply_async() + return {'Location': app.url_path_for('retrieve_task_status', task_id=res.id)} + @app.get('/api/v1/tasks/backup', status_code=200) -def list_backup_tasks(identity: Json = Depends(auth.valid_token)): - return {'info': retrieve_backup_task_jobs()} +def list_backup_tasks(identity: Json = Depends(auth.valid_token)): + return {'info': retrieve_backup_task_jobs()} + @app.get('/api/v1/tasks/restore', status_code=200) -def list_restore_tasks(identity: Json = Depends(auth.valid_token)): - return {'info': retrieve_restore_task_jobs()} \ No newline at end of file +def list_restore_tasks(identity: Json = Depends(auth.valid_token)): + return {'info': retrieve_restore_task_jobs()} From cac459beb5d09d455e0613f0aac29278e4de4e85 Mon Sep 17 00:00:00 2001 From: m-dhellin Date: Tue, 26 Nov 2024 16:14:50 +0100 Subject: [PATCH 2/4] Printing task args from Redis --- src/core/app/routes/task.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/core/app/routes/task.py b/src/core/app/routes/task.py index 6172d1b3..be7b7d43 100644 --- a/src/core/app/routes/task.py +++ b/src/core/app/routes/task.py @@ -24,6 +24,7 @@ from pydantic import BaseModel, Json import requests from requests.auth import HTTPBasicAuth +import redis import json from typing import Optional @@ -106,6 +107,21 @@ def retrieve_backup_task_jobs(): json_key = aggregated_jobs_list[key] json_key["args"] = json.dumps( task_handler.parse_task_args(json_key["args"])) + + redis_client = redis.Redis(host="redis", port=6379, db=0) + for key in redis_client.keys(): + key = key.decode() + if key.startswith("celery-task-meta"): + task = json.loads(redis_client.get(key).decode()) + try: + print(f"{task.args=}") + except: + pass + try: + print(f"{task["args"]=}") + except: + pass + return aggregated_jobs_list From a53b692426b17bca29b10b5708b7b118aaa3aed2 Mon Sep 17 00:00:00 2001 From: ous16 Date: Fri, 17 Jan 2025 10:40:05 +0100 Subject: [PATCH 3/4] Configuration des connecteurs --- src/core/app/__init__.py | 4 +++ src/core/app/borg/borg_core.py | 1 + src/core/app/routes/connectors.py | 34 +++++++++++-------- src/core/app/routes/task.py | 14 ++++---- .../configuration/policies/PolicyForm.vue | 1 + .../virtualmachines/VirtualmachineDetails.vue | 21 +++++++----- .../virtualmachines/Virtualmachines.vue | 3 ++ .../src/pages/admin/tasks/backup/Backup.vue | 3 ++ .../src/pages/admin/tasks/restore/Restore.vue | 3 ++ 9 files changed, 56 insertions(+), 28 deletions(-) diff --git a/src/core/app/__init__.py b/src/core/app/__init__.py index 77928e2c..f1afbc38 100644 --- a/src/core/app/__init__.py +++ b/src/core/app/__init__.py @@ -95,6 +95,10 @@ def _unpack_chord_result( 'default_timeout': 60 * 60 * 12 } } +celery.conf.resultrepr_maxsize = 4096 + + + celery.conf.update(settings) celery.conf.update( diff --git a/src/core/app/borg/borg_core.py b/src/core/app/borg/borg_core.py index 8f28d5ed..32440811 100644 --- a/src/core/app/borg/borg_core.py +++ b/src/core/app/borg/borg_core.py @@ -298,6 +298,7 @@ def borg_list_backup(virtual_machine, repository): result = '{"archives": [], "state": "unlocked"}' else: result = request.stdout.decode("utf-8") + print(result) return result except ValueError as err: print(err.args[0]) diff --git a/src/core/app/routes/connectors.py b/src/core/app/routes/connectors.py index 89e5a602..1cdd13a4 100644 --- a/src/core/app/routes/connectors.py +++ b/src/core/app/routes/connectors.py @@ -31,7 +31,7 @@ from app import auth from app import database -from app.database import Hosts +from app.database import Pools from app.database import Connectors class items_create_connector(BaseModel): @@ -59,10 +59,10 @@ def filter_connector_by_id(connector_id): with Session(engine) as session: statement = select(Connectors).where(Connectors.id == ensure_uuid(connector_id)) results = session.exec(statement) - storage = results.one() - if not storage: + connector = results.one() + if not connector: raise ValueError(f'Connector with id {connector_id} not found') - return storage + return connector except Exception as e: raise ValueError(e) @@ -137,20 +137,26 @@ def api_delete_connector(connector_id): raise ValueError(e) records = [] with Session(engine) as session: - statement = select(Hosts).where(Hosts.connector_id == ensure_uuid(connector_id)) + statement = select(Pools).where( + Pools.connector_id == ensure_uuid(connector_id)) results = session.exec(statement) - for host in results: - records.append(host) + for pool in results: + records.append(pool) if len(records) > 0: - raise ValueError('One or more hosts are linked to this connector') - try: - connector = filter_connector_by_id(connector_id) - with Session(engine) as session: + raise ValueError('One or more pools are linked to this connector') + try: + statement2 = select(Connectors).where( + Connectors.id == ensure_uuid(connector_id)) + results = session.exec(statement2) + connector = results.one() + if not connector: + raise ValueError( + f'Backup policy with id {connector_id} not found') session.delete(connector) session.commit() - return {'state': 'SUCCESS'} - except Exception as e: - raise ValueError(e) + return {'state': 'SUCCESS'} + except Exception as e: + raise ValueError(e) @app.post('/api/v1/connectors', status_code=201) def create_connector(item: items_create_connector, identity: Json = Depends(auth.valid_token)): diff --git a/src/core/app/routes/task.py b/src/core/app/routes/task.py index be7b7d43..f97decfd 100644 --- a/src/core/app/routes/task.py +++ b/src/core/app/routes/task.py @@ -114,11 +114,13 @@ def retrieve_backup_task_jobs(): if key.startswith("celery-task-meta"): task = json.loads(redis_client.get(key).decode()) try: - print(f"{task.args=}") + #print(f"{task.args=}") + pass except: pass try: - print(f"{task["args"]=}") + #print(f"{task["args"]=}") + pass except: pass @@ -163,10 +165,10 @@ def retrieve_task_status(task_id, identity: Json = Depends(auth.valid_token)): 'total': 1, 'status': str(task.info) } - if "not found" in str(task.info): - raise HTTPException(status_code=404, detail=response) - else: - raise HTTPException(status_code=500, detail=response) + # if "not found" in str(task.info): + # raise HTTPException(status_code=404, detail=response) + # else: + # raise HTTPException(status_code=500, detail=response) return response diff --git a/src/ui/src/pages/admin/configuration/policies/PolicyForm.vue b/src/ui/src/pages/admin/configuration/policies/PolicyForm.vue index 6ae9c1bb..9d020ebc 100644 --- a/src/ui/src/pages/admin/configuration/policies/PolicyForm.vue +++ b/src/ui/src/pages/admin/configuration/policies/PolicyForm.vue @@ -29,6 +29,7 @@ :options="dayOptions" v-model="selectedDays" multiple + :rules="[(value) => value?.length > 0 || 'Field is required']" >