From 20d52c5a80d8da82c4f8aeed9b745a96337817b9 Mon Sep 17 00:00:00 2001 From: vishnu r kumar Date: Tue, 3 Mar 2026 17:52:33 +0530 Subject: [PATCH 1/6] chore: update redshift functions --- .../flo_cloud/flo_cloud/aws/redshift.py | 136 +++++++++++++++--- .../datasource/redshift/__init__.py | 95 ++++++++++-- 2 files changed, 199 insertions(+), 32 deletions(-) diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/aws/redshift.py b/wavefront/server/packages/flo_cloud/flo_cloud/aws/redshift.py index 24c44ab3..dd26a5fd 100644 --- a/wavefront/server/packages/flo_cloud/flo_cloud/aws/redshift.py +++ b/wavefront/server/packages/flo_cloud/flo_cloud/aws/redshift.py @@ -1,4 +1,5 @@ import os +import string import logging from typing import List, Dict, Any, Optional, Tuple from contextlib import contextmanager @@ -157,39 +158,135 @@ def execute_query( logger.error(f'Query execution error: {e}') raise + def execute_query_as_dict( + self, query: str, params: Optional[Dict[str, Any]] = None + ) -> List[Dict[str, Any]]: + """ + Execute a raw SQL query and return results as dictionaries. + + Args: + query: SQL query to execute + params: Query parameters (optional) + + Returns: + List of dictionaries containing query results + """ + with self.get_cursor() as cursor: + try: + cursor.execute(query, params) + columns = [desc[0] for desc in cursor.description] + return [dict(zip(columns, row)) for row in cursor.fetchall()] + except RedshiftError as e: + logger.error(f'Query execution error: {e}') + raise + def execute_query_to_dict( self, projection: str = '*', - table_name: str = '', + table_prefix: str = '', + table_names: List[str] = [], where_clause: str = 'true', + join_query: Optional[str] = None, params: Optional[Dict[str, Any]] = None, - limit: int = 20, + limit: int = 10, offset: int = 0, order_by: Optional[str] = None, group_by: Optional[str] = None, ) -> List[Dict[str, Any]]: """ - Execute a SELECT query and return results as dictionaries. + Execute a SELECT query and return results as dictionaries, similar to the + BigQuery implementation. Args: projection: Projection of the query - table_name: Table name + table_prefix: Prefix for table names (e.g. 'db.schema.') + table_names: List of table names where_clause: Where clause of the query + join_query: Join query string (optional) params: Query parameters (optional) limit: Maximum number of rows to return offset: Number of rows to skip + Returns: List of dictionaries containing query results """ - query = f'SELECT {projection} FROM {table_name} WHERE {where_clause} {group_by} {order_by} LIMIT {limit} OFFSET {offset}' - with self.get_cursor() as cursor: - try: + if not table_names: + raise ValueError('At least one table name must be provided') + + base_table = f'{table_prefix}{table_names[0]}' + group_by_clause = f'GROUP BY {group_by}' if group_by else '' + order_by_clause = f'ORDER BY {order_by}' if order_by else '' + + if join_query: + query = self.__get_join_query( + join_query, + table_names, + table_prefix, + projection, + where_clause, + limit, + offset, + order_by, + group_by=group_by, + ) + else: + query = ( + f'SELECT {projection} FROM {base_table} AS a ' + f'WHERE {where_clause} {group_by_clause} {order_by_clause} ' + f'LIMIT {limit} OFFSET {offset}' + ) + + try: + logger.debug(f'Executing query: {query}') + with self.get_cursor() as cursor: cursor.execute(query, params) columns = [desc[0] for desc in cursor.description] return [dict(zip(columns, row)) for row in cursor.fetchall()] - except RedshiftError as e: - logger.error(f'Query execution error: {e}') - raise + except RedshiftError as e: + logger.error(f'Redshift query execution error: {e}') + raise + except Exception as e: + logger.error(f'Unexpected error executing Redshift query: {e}') + raise + + def __get_join_query( + self, + join_query: str, + table_names: List[str], + table_prefix: str, + projection: str, + where_clause: str, + limit: int, + offset: int, + order_by: Optional[str] = None, + group_by: Optional[str] = None, + ) -> str: + """ + Build a join query with table prefix and aliases, mirroring BigQuery's + __get_join_query. Returns a flat SELECT (no nested ARRAY_AGG/STRUCT). + """ + aliases = list(string.ascii_lowercase) + processed_join = join_query + processed_where = where_clause + for i, table_name in enumerate(table_names): + alias = aliases[i] + qualified = f'{table_prefix}{table_name}' + processed_join = processed_join.replace( + f'JOIN {table_name}', + f'LEFT JOIN {qualified} AS {alias}', + ) + processed_join = processed_join.replace(f'{table_name}.', f'{alias}.') + processed_where = processed_where.replace(f'{table_name}.', f'{alias}.') + + group_by_clause = f'GROUP BY {group_by}' if group_by else '' + order_by_clause = f'ORDER BY {order_by}' if order_by else '' + base_table = f'{table_prefix}{table_names[0]}' + return ( + f'SELECT {projection} FROM {base_table} AS {aliases[0]} ' + f'{processed_join} WHERE {processed_where} ' + f'{group_by_clause} {order_by_clause} ' + f'LIMIT {limit} OFFSET {offset}' + ) def execute_command( self, command: str, params: Optional[Dict[str, Any]] = None @@ -398,7 +495,7 @@ def get_table_info(self, table_name: str) -> Dict[str, Any]: ORDER BY c.ordinal_position """ - columns = self.execute_query_to_dict(query, {'table_name': table_name}) + columns = self.execute_query_as_dict(query, {'table_name': table_name}) # Get table statistics stats_query = """ @@ -413,7 +510,7 @@ def get_table_info(self, table_name: str) -> Dict[str, Any]: WHERE tablename = :tablename """ - stats = self.execute_query_to_dict(stats_query, {'tablename': table_name}) + stats = self.execute_query_as_dict(stats_query, {'tablename': table_name}) return {'table_name': table_name, 'columns': columns, 'statistics': stats} @@ -460,7 +557,7 @@ def get_table_size(self, table_name: str, schema: str = 'public') -> Dict[str, A WHERE tablename = :tablename AND schemaname = :schemaname """ - results = self.execute_query_to_dict( + results = self.execute_query_as_dict( query, {'tablename': table_name, 'schemaname': schema} ) @@ -471,7 +568,7 @@ def get_table_size(self, table_name: str, schema: str = 'public') -> Dict[str, A pg_total_relation_size(:tablename) as size_bytes """ - size_results = self.execute_query_to_dict( + size_results = self.execute_query_as_dict( size_query, { 'tablename': f'{schema}.{table_name}', @@ -582,7 +679,7 @@ def get_active_queries(self) -> List[Dict[str, Any]]: ORDER BY starttime DESC """ - return self.execute_query_to_dict(query) + return self.execute_query_as_dict(query) def get_query_history(self, limit: int = 100) -> List[Dict[str, Any]]: """ @@ -608,7 +705,7 @@ def get_query_history(self, limit: int = 100) -> List[Dict[str, Any]]: LIMIT :limit """ - return self.execute_query_to_dict(query, {'limit': limit}) + return self.execute_query_as_dict(query, {'limit': limit}) def test_connection(self) -> bool: """ @@ -619,9 +716,12 @@ def test_connection(self) -> bool: """ try: result = self.execute_query('SELECT 1') - return len(result) > 0 and result[0][0] == 1 + success = len(result) > 0 and result[0][0] == 1 + if success: + logger.info('Redshift connection test successful') + return success except Exception as e: - logger.error(f'Connection test failed: {e}') + logger.error(f'Redshift connection test failed: {e}') return False def get_cluster_info(self) -> Dict[str, Any]: diff --git a/wavefront/server/plugins/datasource/datasource/redshift/__init__.py b/wavefront/server/plugins/datasource/datasource/redshift/__init__.py index 3b26ae26..7de460ee 100644 --- a/wavefront/server/plugins/datasource/datasource/redshift/__init__.py +++ b/wavefront/server/plugins/datasource/datasource/redshift/__init__.py @@ -1,3 +1,4 @@ +import asyncio from typing import Any, Dict, List, Optional from ..types import DataSourceABC @@ -17,8 +18,8 @@ def __init__(self, config: RedshiftConfig): ) self.db_name = f'{config.database}.public' - def test_connection(self) -> bool: - return self.client.test_connection() + async def test_connection(self) -> bool: + return await asyncio.to_thread(self.client.test_connection) def get_schema(self) -> dict: return self.client.get_table_info() @@ -28,31 +29,39 @@ def get_table_names(self, **kwargs) -> list[str]: def fetch_data( self, - table_name: str, - projection: Optional[str] = None, - where_clause: Optional[str] = None, + table_names: List[str], + projection: str = '*', + where_clause: str = 'true', + join_query: Optional[str] = None, params: Optional[Dict[str, Any]] = None, - offset: Optional[int] = None, - limit: Optional[int] = None, + offset: int = 0, + limit: int = 10, order_by: Optional[str] = None, group_by: Optional[str] = None, ) -> List[Dict[str, Any]]: - result = self.client.execute_query_to_dict( + return self.client.execute_query_to_dict( projection=projection, - table_name=f'{self.db_name}.{table_name}', + table_prefix=f'{self.db_name}.', + table_names=table_names, where_clause=where_clause, + join_query=join_query, params=params, limit=limit, offset=offset, order_by=order_by, group_by=group_by, ) - return result def insert_rows_json(self, table_name: str, data): pass - def execute_dynamic_query( + async def execute_query( + self, query: str, use_legacy_sql: bool = False, dry_run: bool = False, **kwargs + ) -> Any: + params = kwargs.get('params') + return await asyncio.to_thread(self.client.execute_query_as_dict, query, params) + + async def execute_dynamic_query( self, query: List[Dict[str, Any]], odata_filter: Optional[str] = None, @@ -63,9 +72,67 @@ def execute_dynamic_query( limit: Optional[int] = 100, params: Optional[Dict[str, Any]] = None, ): - # TODO: Implement RLS filter support for Redshift - # For now, just execute the query without RLS filter - pass + results = {} + tasks = [] + + for query_obj in query: + query_to_execute = query_obj.get('query', '') + query_params = query_obj.get('parameters', {}) + query_id = query_obj.get('id') + if not query_id: + raise ValueError('Query ID is required') + + params_key = [p['name'] for p in query_params] + params_to_execute: Dict[str, Any] = {} + + if params is None: + params = {} + + for key in params_key: + if key not in params: + raise ValueError(f'Missing parameter: {key} for query {query_id}') + params_to_execute[key] = params[key] + + if odata_params: + params_to_execute.update(odata_params) + if odata_data_params: + params_to_execute.update(odata_data_params) + + query_to_execute = query_to_execute.replace( + '{{rls}}', f'{odata_data_filter}' if odata_data_filter else 'TRUE' + ) + query_to_execute = query_to_execute.replace( + '{{filters}}', f'{odata_filter}' if odata_filter else 'TRUE' + ) + query_to_execute += f' LIMIT {limit} OFFSET {offset}' + + task = asyncio.create_task( + asyncio.to_thread( + self.client.execute_query_as_dict, + query_to_execute, + params_to_execute, + ) + ) + tasks.append((query_id, task)) + + for query_id, task in tasks: + try: + formatted_result = await task + results[query_id] = { + 'status': 'success', + 'error': None, + 'description': f'Query {query_id} executed successfully', + 'result': formatted_result, + } + except Exception as e: + results[query_id] = { + 'status': 'error', + 'error': str(e), + 'description': f'Error executing query {query_id}', + 'result': [], + } + + return results __all__ = ['RedshiftPlugin', 'RedshiftConfig'] From 5dad7b2d1de3924c08bb21503315cbad2ffe6e24 Mon Sep 17 00:00:00 2001 From: vishnu r kumar Date: Wed, 4 Mar 2026 13:50:41 +0530 Subject: [PATCH 2/6] feat: add datasources in get config --- .../floware/controllers/config_controller.py | 23 ++++++++++++------- .../server/apps/floware/floware/server.py | 1 + 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/wavefront/server/apps/floware/floware/controllers/config_controller.py b/wavefront/server/apps/floware/floware/controllers/config_controller.py index 027e30a5..4cbd85de 100644 --- a/wavefront/server/apps/floware/floware/controllers/config_controller.py +++ b/wavefront/server/apps/floware/floware/controllers/config_controller.py @@ -8,6 +8,9 @@ from fastapi.params import Depends from dependency_injector.wiring import inject from dependency_injector.wiring import Provide +from db_repo_module.models.datasource import Datasource +from db_repo_module.repositories.sql_alchemy_repository import SQLAlchemyRepository +from plugins_module.plugins_container import PluginsContainer from floware.services.config_service import ConfigService from user_management_module.utils.user_utils import get_current_user, check_is_admin from fastapi import HTTPException @@ -74,6 +77,10 @@ async def get_config( ConfigService, Depends(Provide[ApplicationContainer.config_service]), ], + datasource_repository: Annotated[ + SQLAlchemyRepository[Datasource], + Depends(Provide[PluginsContainer.datasource_repository]), + ], response_formatter: Annotated[ ResponseFormatter, Depends(Provide[CommonContainer.response_formatter]), @@ -84,16 +91,16 @@ async def get_config( such as logo, table to query, etc. """ url, app_config = await config_service.get_app_config() - if not url: - return JSONResponse( - status_code=status.HTTP_200_OK, - content=response_formatter.buildSuccessResponse( - {'message': 'No config found'} - ), - ) + datasources = await datasource_repository.find() + datasource_ids = [str(datasource.id) for datasource in datasources] + return JSONResponse( status_code=status.HTTP_200_OK, content=response_formatter.buildSuccessResponse( - {'app_icon': url, 'app_config': app_config} + { + 'app_icon': url, + 'app_config': app_config, + 'datasources': datasource_ids, + } ), ) diff --git a/wavefront/server/apps/floware/floware/server.py b/wavefront/server/apps/floware/floware/server.py index 9858acbf..bb1a29d7 100644 --- a/wavefront/server/apps/floware/floware/server.py +++ b/wavefront/server/apps/floware/floware/server.py @@ -499,6 +499,7 @@ async def global_exception_handler(request: Request, exc: Exception): packages=[ 'plugins_module.controllers', 'plugins_module.services', + 'floware.controllers', 'user_management_module.controllers', 'user_management_module.authorization', 'tools_module.datasources', From 2308517c84f1da86f51017872e2d2652d710b369 Mon Sep 17 00:00:00 2001 From: vishnu r kumar Date: Wed, 4 Mar 2026 16:51:18 +0530 Subject: [PATCH 3/6] feat: add get resource url api using cloud storage manager --- .../server/apps/floware/floware/server.py | 2 + .../plugins_module/controllers/__init__.py | 3 +- .../controllers/cloud_storage_controller.py | 47 +++++++++++++++++++ .../plugins_module/plugins_container.py | 8 ++-- .../services/dynamic_query_service.py | 15 +++--- .../services/message_processor_service.py | 12 ++--- 6 files changed, 69 insertions(+), 18 deletions(-) create mode 100644 wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py diff --git a/wavefront/server/apps/floware/floware/server.py b/wavefront/server/apps/floware/floware/server.py index bb1a29d7..e72ac9fe 100644 --- a/wavefront/server/apps/floware/floware/server.py +++ b/wavefront/server/apps/floware/floware/server.py @@ -93,6 +93,7 @@ from plugins_module.controllers.message_processor_controller import ( message_processor_router, ) +from plugins_module.controllers.cloud_storage_controller import cloud_storage_router # API Services Module from api_services_module.api_services_container import create_api_services_container @@ -386,6 +387,7 @@ async def metrics(request: Request): app.include_router(voice_agent_router, prefix='/floware') app.include_router(tool_router, prefix='/floware') app.include_router(message_processor_router, prefix='/floware') +app.include_router(cloud_storage_router, prefix='/floware') @app.exception_handler(Exception) diff --git a/wavefront/server/modules/plugins_module/plugins_module/controllers/__init__.py b/wavefront/server/modules/plugins_module/plugins_module/controllers/__init__.py index dcaa2bad..58ce2507 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/controllers/__init__.py +++ b/wavefront/server/modules/plugins_module/plugins_module/controllers/__init__.py @@ -1,4 +1,5 @@ from .datasource_controller import datasource_router from .authenticator_controller import authenticator_router +from .cloud_storage_controller import cloud_storage_router -__all__ = ['datasource_router', 'authenticator_router'] +__all__ = ['datasource_router', 'authenticator_router', 'cloud_storage_router'] diff --git a/wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py b/wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py new file mode 100644 index 00000000..a1bc2935 --- /dev/null +++ b/wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py @@ -0,0 +1,47 @@ +from dependency_injector.wiring import inject, Provide +from fastapi import Depends, Query, status +from fastapi.responses import JSONResponse +from fastapi.routing import APIRouter + +from common_module.common_container import CommonContainer +from common_module.response_formatter import ResponseFormatter +from flo_cloud.cloud_storage import CloudStorageManager +from plugins_module.plugins_container import PluginsContainer + + +cloud_storage_router = APIRouter() + + +@cloud_storage_router.get('/v1/storage/signed-url') +@inject +async def get_resource_presigned_url( + resource_url: str = Query(..., description='The cloud storage URL of the resource'), + expires_in: int = Query( + 300, description='Expiry time in seconds for the presigned URL' + ), + response_formatter: ResponseFormatter = Depends( + Provide[CommonContainer.response_formatter] + ), + cloud_manager: CloudStorageManager = Depends( + Provide[PluginsContainer.cloud_manager] + ), +): + try: + bucket_name, key = cloud_manager.get_bucket_key(resource_url) + presigned_url = cloud_manager.generate_presigned_url( + bucket_name=bucket_name, + key=key, + type='GET', + expiresIn=expires_in, + ) + return JSONResponse( + status_code=status.HTTP_200_OK, + content=response_formatter.buildSuccessResponse( + {'resource_url': presigned_url} + ), + ) + except Exception as e: + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content=response_formatter.buildErrorResponse(str(e)), + ) diff --git a/wavefront/server/modules/plugins_module/plugins_module/plugins_container.py b/wavefront/server/modules/plugins_module/plugins_module/plugins_container.py index e7e56f66..a9039b3c 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/plugins_container.py +++ b/wavefront/server/modules/plugins_module/plugins_module/plugins_container.py @@ -39,13 +39,13 @@ class PluginsContainer(containers.DeclarativeContainer): # dynamic query service cloud_provider = config.cloud_config.cloud_provider - cloud_manager = providers.Singleton( + cloud_storage_manager = providers.Singleton( CloudStorageManager, provider=config.cloud_config.cloud_provider ) dynamic_query_service = providers.Singleton( DynamicQueryService, - cloud_manager=cloud_manager, + cloud_storage_manager=cloud_storage_manager, dynamic_query_repo=dynamic_query_repository, bucket_name=config.aws.aws_asset_storage_bucket if cloud_provider == 'aws' @@ -54,10 +54,10 @@ class PluginsContainer(containers.DeclarativeContainer): message_processor_service = providers.Singleton( MessageProcessorService, - cloud_manager=cloud_manager, + cloud_storage_manager=cloud_storage_manager, message_processor_repository=message_processor_repository, + hermes_url=config.hermes.url, bucket_name=config.aws.aws_asset_storage_bucket if cloud_provider == 'aws' else config.gcp.gcp_asset_storage_bucket, - hermes_url=config.hermes.url, ) diff --git a/wavefront/server/modules/plugins_module/plugins_module/services/dynamic_query_service.py b/wavefront/server/modules/plugins_module/plugins_module/services/dynamic_query_service.py index 2dda5a78..28356d39 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/services/dynamic_query_service.py +++ b/wavefront/server/modules/plugins_module/plugins_module/services/dynamic_query_service.py @@ -3,16 +3,17 @@ from common_module.log.logger import logger from flo_cloud.cloud_storage import CloudStorageManager from db_repo_module.models.dynamic_query_yaml import DynamicQueryYaml +from typing import Optional class DynamicQueryService: def __init__( self, - cloud_manager: CloudStorageManager, + cloud_storage_manager: CloudStorageManager, dynamic_query_repo: SQLAlchemyRepository[DynamicQueryYaml], - bucket_name: str = None, + bucket_name: Optional[str] = None, ): - self.cloud_manager = cloud_manager + self.cloud_storage_manager = cloud_storage_manager self.dynamic_query_repo = dynamic_query_repo self.bucket_name = bucket_name self.prefix = 'dynamic_query/v1' @@ -40,7 +41,7 @@ async def store_yaml_to_bucket(self, yaml_content: dict, datasource_id: str): file_content = yaml_string.encode('utf-8') # storing to s3bucket - self.cloud_manager.save_small_file( + self.cloud_storage_manager.save_small_file( file_content=file_content, bucket_name=self.bucket_name, key=file_key ) @@ -70,7 +71,7 @@ async def retrive_dynamic_query_yaml(self, page_number, page_size): Returns: dict: Contains yamls list, pagination info, and total count """ - files_keys, has_more = self.cloud_manager.list_files( + files_keys, has_more = self.cloud_storage_manager.list_files( self.bucket_name, self.prefix, page_size, page_number ) yamls = [] @@ -100,7 +101,7 @@ async def get_dynamic_yaml_query(self, query_id: str): dict: Contains yaml query and their parameters """ file_key = f'{self.prefix}/{query_id}.yaml' - file_content = self.cloud_manager.read_file(self.bucket_name, file_key) + file_content = self.cloud_storage_manager.read_file(self.bucket_name, file_key) yaml_query = yaml.safe_load(file_content.decode('utf-8')) if not yaml_query: raise ValueError('YAML file is invalid') @@ -141,7 +142,7 @@ async def delete_dynamic_query(self, datasource_id: str, query_id: str): raise ValueError(f'Query {query_id} not found') # deleting the file from the cloud storage - self.cloud_manager.delete_file(self.bucket_name, query.file_path) + self.cloud_storage_manager.delete_file(self.bucket_name, query.file_path) # deleting the record from the database await self.dynamic_query_repo.delete_all(name=query_id) diff --git a/wavefront/server/modules/plugins_module/plugins_module/services/message_processor_service.py b/wavefront/server/modules/plugins_module/plugins_module/services/message_processor_service.py index 78dc3ae9..44bb048d 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/services/message_processor_service.py +++ b/wavefront/server/modules/plugins_module/plugins_module/services/message_processor_service.py @@ -80,12 +80,12 @@ class MessageProcessorService: def __init__( self, - cloud_manager: CloudStorageManager, + cloud_storage_manager: CloudStorageManager, message_processor_repository: SQLAlchemyRepository[MessageProcessors], - bucket_name: str, hermes_url: str, + bucket_name: Optional[str] = None, ): - self.cloud_manager = cloud_manager + self.cloud_storage_manager = cloud_storage_manager self.message_processor_repository = message_processor_repository self.bucket_name = bucket_name self.prefix = 'message_processors/v1' @@ -126,7 +126,7 @@ async def save_message_processor_yaml( # Store YAML file in bucket yaml_bytes = yaml_content.encode('utf-8') - self.cloud_manager.save_small_file( + self.cloud_storage_manager.save_small_file( file_content=yaml_bytes, bucket_name=self.bucket_name, key=file_path ) logger.info(f'Stored YAML file at {self.bucket_name}/{file_path}') @@ -138,7 +138,7 @@ async def get_message_processor_yaml_content( self, processor: MessageProcessors ) -> str: filepath = f'{self.prefix}/{processor.source}' - yaml_bytes = self.cloud_manager.read_file(self.bucket_name, filepath) + yaml_bytes = self.cloud_storage_manager.read_file(self.bucket_name, filepath) return yaml_bytes.decode('utf-8') async def list_message_processors(self) -> List[MessageProcessors]: @@ -171,7 +171,7 @@ async def delete_message_processor(self, processor_id: str) -> bool: return False file_path = f'{self.prefix}/{processor.source}' - self.cloud_manager.delete_file(self.bucket_name, file_path) + self.cloud_storage_manager.delete_file(self.bucket_name, file_path) logger.info(f'Deleted YAML file at {self.bucket_name}/{file_path}') await self.message_processor_repository.delete_all(id=processor_id) From 65a1bbd97e8b714f22792bf5f51b5a9dea2e9443 Mon Sep 17 00:00:00 2001 From: vishnu r kumar Date: Wed, 4 Mar 2026 17:35:33 +0530 Subject: [PATCH 4/6] chore: rename all cloud_manager to cloud_storage_manager --- .../apps/floware/floware/di/application_container.py | 9 ++------- wavefront/server/apps/floware/floware/server.py | 5 +++-- .../apps/floware/floware/services/config_service.py | 8 ++++---- wavefront/server/apps/floware/tests/conftest.py | 6 +++--- .../background_jobs/workflow_job/workflow_job/main.py | 4 ++-- .../controllers/cloud_storage_controller.py | 8 ++++---- .../tools_module/message_processor/provider.py | 6 +++--- .../modules/tools_module/tools_module/tools_container.py | 4 ++-- 8 files changed, 23 insertions(+), 27 deletions(-) diff --git a/wavefront/server/apps/floware/floware/di/application_container.py b/wavefront/server/apps/floware/floware/di/application_container.py index e0b94952..571202ab 100644 --- a/wavefront/server/apps/floware/floware/di/application_container.py +++ b/wavefront/server/apps/floware/floware/di/application_container.py @@ -2,7 +2,6 @@ from dependency_injector import providers from floware.services.notification_service import NotificationService -from flo_cloud.cloud_storage import CloudStorageManager from floware.services.config_service import ConfigService @@ -22,20 +21,16 @@ class ApplicationContainer(containers.DeclarativeContainer): notification_repository = providers.Dependency() notification_user_repository = providers.Dependency() config_repository = providers.Dependency() + cloud_storage_manager = providers.Dependency() # services notification_service = providers.Singleton( NotificationService, notification_repository, notification_user_repository ) - cloud_manager = providers.Singleton( - CloudStorageManager, - provider=config.cloud_config.cloud_provider, - ) - config_service = providers.Singleton( ConfigService, config_repository=config_repository, - cloud_manager=cloud_manager, + cloud_storage_manager=cloud_storage_manager, config=config, ) diff --git a/wavefront/server/apps/floware/floware/server.py b/wavefront/server/apps/floware/floware/server.py index e72ac9fe..079209fe 100644 --- a/wavefront/server/apps/floware/floware/server.py +++ b/wavefront/server/apps/floware/floware/server.py @@ -119,6 +119,7 @@ application_container = ApplicationContainer( db_client=db_repo_container.db_client, + cloud_storage_manager=common_container.cloud_storage_manager, email_repository=db_repo_container.email_repository, oauth_credential_repository=db_repo_container.oauth_credential_repository, user_repository=db_repo_container.user_repository, @@ -138,7 +139,7 @@ plugins_container = PluginsContainer( db_client=db_repo_container.db_client, - cloud_manager=common_container.cloud_storage_manager, + cloud_storage_manager=common_container.cloud_storage_manager, dynamic_query_repository=db_repo_container.dynamic_query_repository, cache_manager=db_repo_container.cache_manager, ) @@ -167,7 +168,7 @@ knowledge_base_inference_repository=db_repo_container.knowledge_base_inference_repository, message_processor_repository=plugins_container.message_processor_repository, api_services_manager=api_services_container.api_service_manager, - cloud_manager=common_container.cloud_storage_manager, + cloud_storage_manager=common_container.cloud_storage_manager, message_processor_bucket_name=bucket_name, ) diff --git a/wavefront/server/apps/floware/floware/services/config_service.py b/wavefront/server/apps/floware/floware/services/config_service.py index 901774da..2446ab36 100644 --- a/wavefront/server/apps/floware/floware/services/config_service.py +++ b/wavefront/server/apps/floware/floware/services/config_service.py @@ -10,11 +10,11 @@ class ConfigService: def __init__( self, config_repository: SQLAlchemyRepository[Config], - cloud_manager: CloudStorageManager, + cloud_storage_manager: CloudStorageManager, config: dict[str, Any], ) -> None: self.config_repository = config_repository - self.cloud_manager = cloud_manager + self.cloud_storage_manager = cloud_storage_manager self.config = config def _get_gcp_credentials(self) -> dict[str, Any]: @@ -42,7 +42,7 @@ async def store_app_config( file_content = await file.read() if file else None if file_content: - self.cloud_manager.save_small_file( + self.cloud_storage_manager.save_small_file( file_content, config_credentials['gcp_asset_storage_bucket'], config_credentials['config_file_name'], @@ -70,7 +70,7 @@ async def get_app_config(self): config_path = config_record[0].value.get('app_icon') config_credentials = self._get_gcp_credentials() # Generate new presigned URL - url = self.cloud_manager.generate_presigned_url( + url = self.cloud_storage_manager.generate_presigned_url( config_credentials['gcp_asset_storage_bucket'], config_path, 'get', diff --git a/wavefront/server/apps/floware/tests/conftest.py b/wavefront/server/apps/floware/tests/conftest.py index 9b700cbe..bd4e884d 100644 --- a/wavefront/server/apps/floware/tests/conftest.py +++ b/wavefront/server/apps/floware/tests/conftest.py @@ -153,7 +153,7 @@ def setup_containers(test_engine, test_session, test_user_id, test_session_id): @pytest.fixture -def mock_cloud_manager(): +def mock_cloud_storage_manager(): """Mock CloudStorageManager for testing""" mock_cloud = Mock(spec=CloudStorageManager) mock_cloud.save_small_file = Mock() @@ -203,13 +203,13 @@ async def mock_get_app_config(): @pytest.fixture def setup_application_container( - mock_cloud_manager, mock_config_repository, mock_config, mock_config_service + mock_cloud_storage_manager, mock_config_repository, mock_config, mock_config_service ): """Setup ApplicationContainer with mocked dependencies""" app_container = ApplicationContainer() # Override the dependencies - app_container.cloud_manager.override(mock_cloud_manager) + app_container.cloud_storage_manager.override(mock_cloud_storage_manager) app_container.config_repository.override(mock_config_repository) app_container.config.override(mock_config) app_container.config_service.override(mock_config_service) diff --git a/wavefront/server/background_jobs/workflow_job/workflow_job/main.py b/wavefront/server/background_jobs/workflow_job/workflow_job/main.py index 816dfc2b..34292c16 100644 --- a/wavefront/server/background_jobs/workflow_job/workflow_job/main.py +++ b/wavefront/server/background_jobs/workflow_job/workflow_job/main.py @@ -33,7 +33,7 @@ plugins_container = PluginsContainer( db_client=db_repo_container.db_client, - cloud_manager=common_container.cloud_storage_manager, + cloud_storage_manager=common_container.cloud_storage_manager, dynamic_query_repository=db_repo_container.dynamic_query_repository, cache_manager=db_repo_container.cache_manager, ) @@ -51,7 +51,7 @@ knowledge_base_inference_repository=db_repo_container.knowledge_base_inference_repository, message_processor_repository=plugins_container.message_processor_repository, api_services_manager=api_services_container.api_service_manager, - cloud_manager=common_container.cloud_storage_manager, + cloud_storage_manager=common_container.cloud_storage_manager, message_processor_bucket_name=bucket_name, ) diff --git a/wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py b/wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py index a1bc2935..c9ec5bb5 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py +++ b/wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py @@ -22,13 +22,13 @@ async def get_resource_presigned_url( response_formatter: ResponseFormatter = Depends( Provide[CommonContainer.response_formatter] ), - cloud_manager: CloudStorageManager = Depends( - Provide[PluginsContainer.cloud_manager] + cloud_storage_manager: CloudStorageManager = Depends( + Provide[PluginsContainer.cloud_storage_manager] ), ): try: - bucket_name, key = cloud_manager.get_bucket_key(resource_url) - presigned_url = cloud_manager.generate_presigned_url( + bucket_name, key = cloud_storage_manager.get_bucket_key(resource_url) + presigned_url = cloud_storage_manager.generate_presigned_url( bucket_name=bucket_name, key=key, type='GET', diff --git a/wavefront/server/modules/tools_module/tools_module/message_processor/provider.py b/wavefront/server/modules/tools_module/tools_module/message_processor/provider.py index 24ea7e09..ce3079d3 100644 --- a/wavefront/server/modules/tools_module/tools_module/message_processor/provider.py +++ b/wavefront/server/modules/tools_module/tools_module/message_processor/provider.py @@ -14,11 +14,11 @@ class MessageProcessorToolDetailsProvider(ToolDetailsProvider): def __init__( self, message_processor_repository: SQLAlchemyRepository[MessageProcessors], - cloud_manager: CloudStorageManager, + cloud_storage_manager: CloudStorageManager, message_processor_bucket_name: str, ): self.message_processor_repository = message_processor_repository - self.cloud_manager = cloud_manager + self.cloud_storage_manager = cloud_storage_manager self.message_processor_bucket_name = message_processor_bucket_name self.prefix = 'message_processors/v1' @@ -95,7 +95,7 @@ async def get_tool_details( def _load_yaml_content(self, processor: MessageProcessors) -> str: """Load YAML content from cloud storage""" filepath = f'{self.prefix}/{processor.source}' - yaml_bytes = self.cloud_manager.read_file( + yaml_bytes = self.cloud_storage_manager.read_file( self.message_processor_bucket_name, filepath ) return yaml_bytes.decode('utf-8') diff --git a/wavefront/server/modules/tools_module/tools_module/tools_container.py b/wavefront/server/modules/tools_module/tools_module/tools_container.py index 82cd4a6c..928eda9f 100644 --- a/wavefront/server/modules/tools_module/tools_module/tools_container.py +++ b/wavefront/server/modules/tools_module/tools_module/tools_container.py @@ -19,7 +19,7 @@ class ToolsContainer(containers.DeclarativeContainer): knowledge_base_inference_repository = providers.Dependency() message_processor_repository = providers.Dependency() api_services_manager = providers.Dependency() - cloud_manager = providers.Dependency() + cloud_storage_manager = providers.Dependency() message_processor_bucket_name = providers.Dependency() # Tool loader tool_loader = providers.Singleton( @@ -41,7 +41,7 @@ class ToolsContainer(containers.DeclarativeContainer): message_processor_tool_provider = providers.Singleton( MessageProcessorToolDetailsProvider, message_processor_repository=message_processor_repository, - cloud_manager=cloud_manager, + cloud_storage_manager=cloud_storage_manager, message_processor_bucket_name=message_processor_bucket_name, ) From 1dad3f186a2e06b0789d030aebc46f31e3bdbe1b Mon Sep 17 00:00:00 2001 From: vishnu r kumar Date: Thu, 5 Mar 2026 18:00:36 +0530 Subject: [PATCH 5/6] feat: add read api for cloud storage files --- .../controllers/cloud_storage_controller.py | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py b/wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py index c9ec5bb5..6ca1045b 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py +++ b/wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py @@ -1,3 +1,7 @@ +import json +from enum import Enum +from typing import Optional + from dependency_injector.wiring import inject, Provide from fastapi import Depends, Query, status from fastapi.responses import JSONResponse @@ -12,6 +16,10 @@ cloud_storage_router = APIRouter() +class StorageFileType(str, Enum): + json = 'json' + + @cloud_storage_router.get('/v1/storage/signed-url') @inject async def get_resource_presigned_url( @@ -45,3 +53,50 @@ async def get_resource_presigned_url( status_code=status.HTTP_400_BAD_REQUEST, content=response_formatter.buildErrorResponse(str(e)), ) + + +@cloud_storage_router.get('/v1/storage/read') +@inject +async def read_storage_file( + resource_url: str = Query(..., description='The cloud storage URL of the resource'), + type: StorageFileType = Query(StorageFileType.json, description='File type'), + projection: Optional[str] = Query( + None, + description='Comma-separated list of top-level fields to return from the parsed data', + ), + response_formatter: ResponseFormatter = Depends( + Provide[CommonContainer.response_formatter] + ), + cloud_storage_manager: CloudStorageManager = Depends( + Provide[PluginsContainer.cloud_storage_manager] + ), +): + try: + bucket_name, key = cloud_storage_manager.get_bucket_key(resource_url) + file_buffer = cloud_storage_manager.read_file(bucket_name, key) + + if type == StorageFileType.json: + data = json.loads(file_buffer.read()) + if projection: + fields = {f.strip() for f in projection.split(',') if f.strip()} + if isinstance(data, dict): + data = {k: v for k, v in data.items() if k in fields} + elif isinstance(data, list): + data = [ + {k: v for k, v in item.items() if k in fields} + if isinstance(item, dict) + else item + for item in data + ] + + return JSONResponse( + status_code=status.HTTP_200_OK, + content=response_formatter.buildSuccessResponse({'data': data}), + ) + except (json.JSONDecodeError, ValueError) as e: + return JSONResponse( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + content=response_formatter.buildErrorResponse( + f'Failed to parse file as {type.value}: {str(e)}' + ), + ) From 6360fa2168a90bcfbb4588efd9afbf9f60b8db12 Mon Sep 17 00:00:00 2001 From: vishnu r kumar Date: Thu, 5 Mar 2026 18:51:44 +0530 Subject: [PATCH 6/6] fix: fix storage read --- .../plugins_module/controllers/cloud_storage_controller.py | 4 ++-- .../server/packages/flo_cloud/flo_cloud/cloud_storage.py | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py b/wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py index 6ca1045b..8f8af884 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py +++ b/wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py @@ -73,10 +73,10 @@ async def read_storage_file( ): try: bucket_name, key = cloud_storage_manager.get_bucket_key(resource_url) - file_buffer = cloud_storage_manager.read_file(bucket_name, key) + file_content = cloud_storage_manager.read_file(bucket_name, key) if type == StorageFileType.json: - data = json.loads(file_buffer.read()) + data = json.loads(file_content) if projection: fields = {f.strip() for f in projection.split(',') if f.strip()} if isinstance(data, dict): diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/cloud_storage.py b/wavefront/server/packages/flo_cloud/flo_cloud/cloud_storage.py index cc703b71..6527270c 100644 --- a/wavefront/server/packages/flo_cloud/flo_cloud/cloud_storage.py +++ b/wavefront/server/packages/flo_cloud/flo_cloud/cloud_storage.py @@ -1,4 +1,3 @@ -from io import BytesIO from typing import Union, List, Tuple, Optional from .aws.s3 import S3Storage from .gcp.gcs import GCSStorage @@ -83,7 +82,7 @@ def _convert_to_valid_type(self, type: str) -> str: return 'POST' raise ValueError(f"Unsupported type '{type}' for provider '{self.provider}'") - def read_file(self, bucket_name: str, file_path: str) -> BytesIO: + def read_file(self, bucket_name: str, file_path: str) -> bytes: """ Read file from cloud storage