From 2896f454202733ad487ec4efd60fd2503e568661 Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Tue, 24 Feb 2026 17:26:39 +0530 Subject: [PATCH 1/8] feature to export dynamic queries --- .../controllers/datasource_controller.py | 128 ++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py index c0fc1a2c..fe0b1ca0 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py +++ b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py @@ -8,6 +8,7 @@ from fastapi import Request from fastapi import status from fastapi.responses import JSONResponse +from fastapi.responses import StreamingResponse from fastapi.routing import APIRouter from common_module.common_container import CommonContainer @@ -38,6 +39,8 @@ from plugins_module.services.dynamic_query_service import DynamicQueryService from db_repo_module.cache.cache_manager import CacheManager from ..utils.helper import generate_cache_key, validate_yaml_query +import csv +import io import yaml from ..utils.helper import DynamicQueryRequest from ..utils.helper import DynamicQueryExecuteRequest @@ -47,6 +50,29 @@ datasource_router = APIRouter() +def _serialized_rows_to_csv(rows: list) -> bytes: + """Convert a list of serialized dicts (e.g. 
from execute_dynamic_query) to CSV bytes.""" + if not rows: + return b'' + out = io.StringIO() + fieldnames = list(rows[0].keys()) + for row in rows[1:]: + for k in row: + if k not in fieldnames: + fieldnames.append(k) + writer = csv.DictWriter(out, fieldnames=fieldnames, extrasaction='ignore') + + def _cell_value(v): + if isinstance(v, (dict, list)): + return json.dumps(v) + return v if v is None or isinstance(v, str) else str(v) + + writer.writeheader() + for row in rows: + writer.writerow({k: _cell_value(row.get(k)) for k in fieldnames}) + return out.getvalue().encode('utf-8-sig') + + @datasource_router.post('/v1/datasources') @inject async def add_datasource( @@ -718,6 +744,108 @@ async def execute_dynamic_query( ) +@datasource_router.post('/v1/{datasource_id}/dynamic-queries/{query_id}/export') +@inject +async def export_dynamic_query_csv( + request: Request, + datasource_id: str, + query_id: str, + filter: str | None = Query(None, alias='$filter'), + offset: int | None = 0, + limit: int | None = 100, + dynamic_query_params: DynamicQueryExecuteRequest = None, + response_formatter: ResponseFormatter = Depends( + Provide[CommonContainer.response_formatter] + ), + dynamic_query_yaml_service: DynamicQueryService = Depends( + Provide[PluginsContainer.dynamic_query_service] + ), + user_service: UserService = Depends(Provide[UserContainer.user_service]), + cache_manager: CacheManager = Depends(Provide[PluginsContainer.cache_manager]), + force_fetch: int = Query(0), +): + """Execute the dynamic query and return results as a downloadable CSV file.""" + role_id, user_id, _ = get_current_user(request) + datasource_type, datasource_config = await get_datasource_config(datasource_id) + if not datasource_config: + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content=response_formatter.buildErrorResponse( + f'Datasource not found: {datasource_id}' + ), + ) + yaml_query, _ = await dynamic_query_yaml_service.get_dynamic_yaml_query(query_id) + if not 
yaml_query: + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content=response_formatter.buildErrorResponse( + f'Dynamic query not found: {query_id}' + ), + ) + + rls_filter_str = None + is_admin = await check_admin(role_id) + if not is_admin: + rls_filters = await user_service.get_user_resources( + user_id=user_id, scope=ResourceScope.DATA + ) + if len(rls_filters) == 0: + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content=response_formatter.buildErrorResponse( + 'Data access not set for non-admin user' + ), + ) + rls_filters = fetch_data_filters(rls_filters) + rls_filter_str = f"{ ' $and '.join(rls_filters)}" + + datasource_plugin = DatasourcePlugin(datasource_type, datasource_config) + cache_key = generate_cache_key( + query_id, + filter, + rls_filter_str, + limit, + offset, + dynamic_query_params.params if dynamic_query_params else None, + ) + if not force_fetch: + cached_result = cache_manager.get_str(cache_key) + if cached_result: + serialized_res = json.loads(cached_result) + else: + res = await datasource_plugin.execute_dynamic_query( + yaml_query, + rls_filter_str, + filter, + offset, + limit, + dynamic_query_params.params if dynamic_query_params else None, + ) + serialized_res = serialize_values(res) + cache_manager.add(cache_key, json.dumps(serialized_res), expiry=60 * 2) + else: + res = await datasource_plugin.execute_dynamic_query( + yaml_query, + rls_filter_str, + filter, + offset, + limit, + dynamic_query_params.params if dynamic_query_params else None, + ) + serialized_res = serialize_values(res) + cache_manager.add(cache_key, json.dumps(serialized_res), expiry=60 * 2) + + csv_bytes = _serialized_rows_to_csv(serialized_res) + filename = f'export_{query_id}.csv' + return StreamingResponse( + iter([csv_bytes]), + media_type='text/csv', + headers={ + 'Content-Disposition': f'attachment; filename="{filename}"', + }, + ) + + @datasource_router.delete('/v1/{datasource_id}/dynamic-queries/{query_id}') @inject async def 
delete_dynamic_query( From 154d0a6ce8133cc314b393f72aa777ae099e455e Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Wed, 4 Mar 2026 13:31:07 +0530 Subject: [PATCH 2/8] fix(floware): dynamic query based export --- .../controllers/datasource_controller.py | 46 ++++++------------- .../plugins/datasource/datasource/__init__.py | 4 +- .../datasource/bigquery/__init__.py | 2 +- .../plugins/datasource/datasource/types.py | 2 +- 4 files changed, 19 insertions(+), 35 deletions(-) diff --git a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py index fe0b1ca0..0395b6d7 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py +++ b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py @@ -1,3 +1,4 @@ +from typing import Dict, Any from datasource.bigquery.config import BigQueryConfig from datasource.redshift.config import RedshiftConfig from dependency_injector.wiring import inject @@ -761,8 +762,6 @@ async def export_dynamic_query_csv( Provide[PluginsContainer.dynamic_query_service] ), user_service: UserService = Depends(Provide[UserContainer.user_service]), - cache_manager: CacheManager = Depends(Provide[PluginsContainer.cache_manager]), - force_fetch: int = Query(0), ): """Execute the dynamic query and return results as a downloadable CSV file.""" role_id, user_id, _ = get_current_user(request) @@ -800,40 +799,25 @@ async def export_dynamic_query_csv( rls_filter_str = f"{ ' $and '.join(rls_filters)}" datasource_plugin = DatasourcePlugin(datasource_type, datasource_config) - cache_key = generate_cache_key( - query_id, - filter, + res: Dict[str, Any] = await datasource_plugin.execute_dynamic_query( + yaml_query, rls_filter_str, - limit, + filter, offset, + limit, dynamic_query_params.params if dynamic_query_params else None, ) - if not force_fetch: - 
cached_result = cache_manager.get_str(cache_key) - if cached_result: - serialized_res = json.loads(cached_result) - else: - res = await datasource_plugin.execute_dynamic_query( - yaml_query, - rls_filter_str, - filter, - offset, - limit, - dynamic_query_params.params if dynamic_query_params else None, - ) - serialized_res = serialize_values(res) - cache_manager.add(cache_key, json.dumps(serialized_res), expiry=60 * 2) - else: - res = await datasource_plugin.execute_dynamic_query( - yaml_query, - rls_filter_str, - filter, - offset, - limit, - dynamic_query_params.params if dynamic_query_params else None, + + if len(res.keys()) < 1 or res[res.keys()[0]]['status'] != 'success': + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content=response_formatter.buildErrorResponse( + f'Unexpected dynamic query result format for query_id {query_id}, no results' + ), ) - serialized_res = serialize_values(res) - cache_manager.add(cache_key, json.dumps(serialized_res), expiry=60 * 2) + + first_key = res.keys()[0] + serialized_res = serialize_values(res[first_key]['result']) csv_bytes = _serialized_rows_to_csv(serialized_res) filename = f'export_{query_id}.csv' diff --git a/wavefront/server/plugins/datasource/datasource/__init__.py b/wavefront/server/plugins/datasource/datasource/__init__.py index 6ebb9f23..90d4d718 100644 --- a/wavefront/server/plugins/datasource/datasource/__init__.py +++ b/wavefront/server/plugins/datasource/datasource/__init__.py @@ -104,12 +104,12 @@ async def execute_dynamic_query( offset: Optional[int] = 0, limit: Optional[int] = 100, params: Optional[Dict[str, Any]] = None, - ): + ) -> Dict[str, Any]: odata_filter, odata_params = self.odata_parser.prepare_odata_filter(filter) odata_data_filter, odata_data_params = self.odata_parser.prepare_odata_filter( rls_filter ) - result_by_query = await self.datasource.execute_dynamic_query( + result_by_query: Dict[str, Any] = await self.datasource.execute_dynamic_query( query, offset, 
limit, diff --git a/wavefront/server/plugins/datasource/datasource/bigquery/__init__.py b/wavefront/server/plugins/datasource/datasource/bigquery/__init__.py index 40497bca..9931eedc 100644 --- a/wavefront/server/plugins/datasource/datasource/bigquery/__init__.py +++ b/wavefront/server/plugins/datasource/datasource/bigquery/__init__.py @@ -80,7 +80,7 @@ async def execute_dynamic_query( odata_data_filter: Optional[str] = None, odata_data_params: Optional[Dict[str, Any]] = None, params: Optional[Dict[str, Any]] = None, - ): + ) -> Dict[str, Any]: results = {} tasks = [] diff --git a/wavefront/server/plugins/datasource/datasource/types.py b/wavefront/server/plugins/datasource/datasource/types.py index 074b6056..5aeb161e 100644 --- a/wavefront/server/plugins/datasource/datasource/types.py +++ b/wavefront/server/plugins/datasource/datasource/types.py @@ -91,7 +91,7 @@ async def execute_dynamic_query( odata_data_params: Optional[Dict[str, Any]] = None, offset: Optional[int] = 0, limit: Optional[int] = 100, - ): + ) -> Dict[str, Any]: pass @abstractmethod From baa0d53cf286b89008cb219e7121e3135bc3d23f Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Wed, 4 Mar 2026 14:18:08 +0530 Subject: [PATCH 3/8] fix(floware): dynamic query based export --- .../controllers/datasource_controller.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py index 0395b6d7..e5993132 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py +++ b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py @@ -808,7 +808,16 @@ async def export_dynamic_query_csv( dynamic_query_params.params if dynamic_query_params else None, ) - if len(res.keys()) < 1 or res[res.keys()[0]]['status'] != 'success': + if not res: + 
return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content=response_formatter.buildErrorResponse( + f'Unexpected dynamic query result format for query_id {query_id}, no results' + ), + ) + + first_key = next(iter(res)) + if res[first_key].get('status') != 'success': return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=response_formatter.buildErrorResponse( @@ -816,7 +825,6 @@ async def export_dynamic_query_csv( ), ) - first_key = res.keys()[0] serialized_res = serialize_values(res[first_key]['result']) csv_bytes = _serialized_rows_to_csv(serialized_res) From 461407687e4e9259871ab73ca0213aaa030466ec Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Wed, 4 Mar 2026 16:15:58 +0530 Subject: [PATCH 4/8] Export using CSV signed url --- .../controllers/datasource_controller.py | 43 ++++++++++++++++--- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py index e5993132..cd64f860 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py +++ b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py @@ -9,7 +9,6 @@ from fastapi import Request from fastapi import status from fastapi.responses import JSONResponse -from fastapi.responses import StreamingResponse from fastapi.routing import APIRouter from common_module.common_container import CommonContainer @@ -35,6 +34,7 @@ from plugins_module.plugins_container import PluginsContainer from user_management_module.user_container import UserContainer from user_management_module.services.user_service import UserService +from flo_cloud.cloud_storage import CloudStorageManager from fastapi import HTTPException from user_management_module.utils.user_utils import get_current_user from 
plugins_module.services.dynamic_query_service import DynamicQueryService @@ -762,6 +762,12 @@ async def export_dynamic_query_csv( Provide[PluginsContainer.dynamic_query_service] ), user_service: UserService = Depends(Provide[UserContainer.user_service]), + cloud_manager: CloudStorageManager = Depends( + Provide[PluginsContainer.cloud_manager] + ), + config: PluginsContainer.config.provided = Depends( + Provide[PluginsContainer.config] + ), ): """Execute the dynamic query and return results as a downloadable CSV file.""" role_id, user_id, _ = get_current_user(request) @@ -827,14 +833,37 @@ async def export_dynamic_query_csv( serialized_res = serialize_values(res[first_key]['result']) + # Convert rows to CSV bytes csv_bytes = _serialized_rows_to_csv(serialized_res) filename = f'export_{query_id}.csv' - return StreamingResponse( - iter([csv_bytes]), - media_type='text/csv', - headers={ - 'Content-Disposition': f'attachment; filename="{filename}"', - }, + + # Store CSV in main application bucket under dynamic_query_updates folder + # and return a signed URL to the file + provider = config.cloud_config.cloud_provider + bucket_name = ( + config.aws.aws_asset_storage_bucket + if provider == 'aws' + else config.gcp.gcp_asset_storage_bucket + ) + + file_key = f'dynamic_query_updates/{filename}' + + # Save the CSV to cloud storage + cloud_manager.save_small_file( + file_content=csv_bytes, + bucket_name=bucket_name, + key=file_key, + content_type='text/csv', + ) + + # Generate a signed URL for downloading the file + signed_url = cloud_manager.generate_presigned_url( + bucket_name=bucket_name, key=file_key, type='GET' + ) + + return JSONResponse( + status_code=status.HTTP_200_OK, + content=response_formatter.buildSuccessResponse({'export_url': signed_url}), ) From 9a3bdbd1e47263e7c82ff1a5331d53da25f5eb8e Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Wed, 4 Mar 2026 16:51:30 +0530 Subject: [PATCH 5/8] Export using CSV signed url --- .../controllers/datasource_controller.py | 63 
+++++++++++++------ .../plugins_module/utils/helper.py | 17 +++++ 2 files changed, 60 insertions(+), 20 deletions(-) diff --git a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py index cd64f860..99d387fe 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py +++ b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py @@ -39,7 +39,11 @@ from user_management_module.utils.user_utils import get_current_user from plugins_module.services.dynamic_query_service import DynamicQueryService from db_repo_module.cache.cache_manager import CacheManager -from ..utils.helper import generate_cache_key, validate_yaml_query +from ..utils.helper import ( + generate_cache_key, + generate_export_filename_hash, + validate_yaml_query, +) import csv import io import yaml @@ -765,9 +769,8 @@ async def export_dynamic_query_csv( cloud_manager: CloudStorageManager = Depends( Provide[PluginsContainer.cloud_manager] ), - config: PluginsContainer.config.provided = Depends( - Provide[PluginsContainer.config] - ), + config: dict = Depends(Provide[PluginsContainer.config]), + force_fetch: int = Query(0), ): """Execute the dynamic query and return results as a downloadable CSV file.""" role_id, user_id, _ = get_current_user(request) @@ -804,6 +807,41 @@ async def export_dynamic_query_csv( rls_filters = fetch_data_filters(rls_filters) rls_filter_str = f"{ ' $and '.join(rls_filters)}" + # Bucket and filename: hash of $filter, limit, offset, dynamic_query_params + provider = config['cloud_config']['cloud_provider'] + bucket_name = ( + config['aws']['aws_asset_storage_bucket'] + if provider == 'aws' + else config['gcp']['gcp_asset_storage_bucket'] + ) + export_hash = generate_export_filename_hash( + filter=filter, + limit=limit, + offset=offset, + params=dynamic_query_params.params 
if dynamic_query_params else None, + ) + filename = f'export_{query_id}_{export_hash}.csv' + file_key = f'dynamic_query_exports/{filename}' + + # If not force_fetch, return existing file from bucket if present + if not force_fetch: + existing_keys, _ = cloud_manager.list_files( + bucket_name=bucket_name, + prefix=file_key, + page_size=1, + page_number=1, + ) + if existing_keys and existing_keys[0] == file_key: + signed_url = cloud_manager.generate_presigned_url( + bucket_name=bucket_name, key=file_key, type='GET' + ) + return JSONResponse( + status_code=status.HTTP_200_OK, + content=response_formatter.buildSuccessResponse( + {'export_url': signed_url} + ), + ) + datasource_plugin = DatasourcePlugin(datasource_type, datasource_config) res: Dict[str, Any] = await datasource_plugin.execute_dynamic_query( yaml_query, @@ -833,22 +871,8 @@ async def export_dynamic_query_csv( serialized_res = serialize_values(res[first_key]['result']) - # Convert rows to CSV bytes + # Convert rows to CSV bytes and store in bucket csv_bytes = _serialized_rows_to_csv(serialized_res) - filename = f'export_{query_id}.csv' - - # Store CSV in main application bucket under dynamic_query_updates folder - # and return a signed URL to the file - provider = config.cloud_config.cloud_provider - bucket_name = ( - config.aws.aws_asset_storage_bucket - if provider == 'aws' - else config.gcp.gcp_asset_storage_bucket - ) - - file_key = f'dynamic_query_updates/{filename}' - - # Save the CSV to cloud storage cloud_manager.save_small_file( file_content=csv_bytes, bucket_name=bucket_name, @@ -856,7 +880,6 @@ async def export_dynamic_query_csv( content_type='text/csv', ) - # Generate a signed URL for downloading the file signed_url = cloud_manager.generate_presigned_url( bucket_name=bucket_name, key=file_key, type='GET' ) diff --git a/wavefront/server/modules/plugins_module/plugins_module/utils/helper.py b/wavefront/server/modules/plugins_module/plugins_module/utils/helper.py index 6e51d2ea..55f60d18 100644 
--- a/wavefront/server/modules/plugins_module/plugins_module/utils/helper.py +++ b/wavefront/server/modules/plugins_module/plugins_module/utils/helper.py @@ -53,6 +53,23 @@ def generate_cache_key( return f'dynamic_query:{hash_digest}' +def generate_export_filename_hash( + filter: str = None, + limit: int = None, + offset: int = None, + params: dict[str, str] = None, +) -> str: + """Generate a short hash for export filename from $filter, limit, offset, and dynamic_query params.""" + key_dict = { + 'filter': filter, + 'limit': limit, + 'offset': offset, + 'params': params or {}, + } + key_json = json.dumps(key_dict, sort_keys=True, separators=(',', ':')) + return hashlib.md5(key_json.encode()).hexdigest() + + def validate_yaml_query(yaml_query: dict) -> bool: """ Validate the structure of a dynamic query YAML file. From d68ab88f54cb262d0d76502e3e1ec49c5bcfd6fc Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Wed, 4 Mar 2026 17:59:30 +0530 Subject: [PATCH 6/8] fix(gold): rls filter to download api --- .../plugins_module/controllers/datasource_controller.py | 1 + .../modules/plugins_module/plugins_module/utils/helper.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py index 99d387fe..cb5fd2de 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py +++ b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py @@ -819,6 +819,7 @@ async def export_dynamic_query_csv( limit=limit, offset=offset, params=dynamic_query_params.params if dynamic_query_params else None, + rls_filter_str=rls_filter_str, ) filename = f'export_{query_id}_{export_hash}.csv' file_key = f'dynamic_query_exports/{filename}' diff --git a/wavefront/server/modules/plugins_module/plugins_module/utils/helper.py 
b/wavefront/server/modules/plugins_module/plugins_module/utils/helper.py index 55f60d18..9b71bcd0 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/utils/helper.py +++ b/wavefront/server/modules/plugins_module/plugins_module/utils/helper.py @@ -58,13 +58,15 @@ def generate_export_filename_hash( limit: int = None, offset: int = None, params: dict[str, str] = None, + rls_filter_str: str = None, ) -> str: - """Generate a short hash for export filename from $filter, limit, offset, and dynamic_query params.""" + """Generate a short hash for export filename from $filter, limit, offset, dynamic_query params, and RLS scope.""" key_dict = { 'filter': filter, 'limit': limit, 'offset': offset, 'params': params or {}, + 'rls': rls_filter_str, } key_json = json.dumps(key_dict, sort_keys=True, separators=(',', ':')) return hashlib.md5(key_json.encode()).hexdigest() From 077779afc63c5533cdededa402a248a6ead82516 Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Thu, 5 Mar 2026 16:05:01 +0530 Subject: [PATCH 7/8] fix(floware): support for more tables --- .../plugins_module/services/datasource_services.py | 1 + 1 file changed, 1 insertion(+) diff --git a/wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py b/wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py index 744d9913..61714608 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py +++ b/wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py @@ -53,6 +53,7 @@ def check_is_valid_resource(resource_id: str) -> bool: 'rf_parsed_data_object', 'rf_gold_data_object', 'rf_gold_item_details', + 'rf_gold_auditor_data', ]: return True return False From f18373af7344be019d0087432b780f2fd7946f56 Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Fri, 6 Mar 2026 16:41:54 +0530 Subject: [PATCH 8/8] fix(floware): use streaming write to GCS and also blocking multi-export per user --- 
.../controllers/datasource_controller.py | 56 ++++++++++++++++--- .../flo_cloud/_types/cloud_storage.py | 15 ++++- .../packages/flo_cloud/flo_cloud/aws/s3.py | 27 ++++++++- .../flo_cloud/flo_cloud/cloud_storage.py | 13 ++++- .../packages/flo_cloud/flo_cloud/gcp/gcs.py | 20 ++++++- 5 files changed, 119 insertions(+), 12 deletions(-) diff --git a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py index cb5fd2de..4260f6cc 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py +++ b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py @@ -749,6 +749,9 @@ async def execute_dynamic_query( ) +EXPORT_RATE_LIMIT_SECONDS = 120 # 2 minutes between exports per user + + @datasource_router.post('/v1/{datasource_id}/dynamic-queries/{query_id}/export') @inject async def export_dynamic_query_csv( @@ -770,10 +773,23 @@ async def export_dynamic_query_csv( Provide[PluginsContainer.cloud_manager] ), config: dict = Depends(Provide[PluginsContainer.config]), + cache_manager: CacheManager = Depends(Provide[PluginsContainer.cache_manager]), force_fetch: int = Query(0), ): """Execute the dynamic query and return results as a downloadable CSV file.""" role_id, user_id, _ = get_current_user(request) + + # Block multiple exports per user within a 2-minute window (Redis) + export_rate_key = f'dynamic_query_export_rate:{user_id}' + if not cache_manager.add( + export_rate_key, '1', expiry=EXPORT_RATE_LIMIT_SECONDS, nx=True + ): + return JSONResponse( + status_code=status.HTTP_429_TOO_MANY_REQUESTS, + content=response_formatter.buildErrorResponse( + f'Export rate limit: one export per user every {EXPORT_RATE_LIMIT_SECONDS // 60} minutes. Please try again later.' 
+ ), + ) datasource_type, datasource_config = await get_datasource_config(datasource_id) if not datasource_config: return JSONResponse( @@ -872,14 +888,38 @@ async def export_dynamic_query_csv( serialized_res = serialize_values(res[first_key]['result']) - # Convert rows to CSV bytes and store in bucket - csv_bytes = _serialized_rows_to_csv(serialized_res) - cloud_manager.save_small_file( - file_content=csv_bytes, - bucket_name=bucket_name, - key=file_key, - content_type='text/csv', - ) + # Stream rows to CSV directly in GCS/S3 to avoid building the full CSV in memory + rows = serialized_res or [] + if not isinstance(rows, list): + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content=response_formatter.buildErrorResponse( + f'Unexpected dynamic query result format for query_id {query_id}, invalid rows' + ), + ) + + if rows: + fieldnames = list(rows[0].keys()) + for row in rows[1:]: + for k in row: + if k not in fieldnames: + fieldnames.append(k) + else: + fieldnames = [] + + def _cell_value(v): + if isinstance(v, (dict, list)): + return json.dumps(v) + return v if v is None or isinstance(v, str) else str(v) + + with cloud_manager.open_text_writer( + bucket_name=bucket_name, key=file_key, content_type='text/csv' + ) as f: + if fieldnames: + writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore') + writer.writeheader() + for row in rows: + writer.writerow({k: _cell_value(row.get(k)) for k in fieldnames}) signed_url = cloud_manager.generate_presigned_url( bucket_name=bucket_name, key=file_key, type='GET' diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/_types/cloud_storage.py b/wavefront/server/packages/flo_cloud/flo_cloud/_types/cloud_storage.py index 175d5914..a9ade42e 100644 --- a/wavefront/server/packages/flo_cloud/flo_cloud/_types/cloud_storage.py +++ b/wavefront/server/packages/flo_cloud/flo_cloud/_types/cloud_storage.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import List, Tuple, 
Optional +from typing import List, Tuple, Optional, IO, ContextManager class CloudStorageHandler(ABC): @@ -103,3 +103,16 @@ def list_files( Exception: If listing fails """ pass + + @abstractmethod + def open_text_writer( + self, bucket_name: str, key: str, content_type: Optional[str] = None + ) -> ContextManager[IO[str]]: + """ + Open a text writer stream to cloud storage for incremental writes. + + Implementations should return a context manager that yields a file-like + object opened in text mode. Data written to this object must be + uploaded to the underlying storage when the context manager exits. + """ + pass diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/aws/s3.py b/wavefront/server/packages/flo_cloud/flo_cloud/aws/s3.py index 689745be..53e75f17 100644 --- a/wavefront/server/packages/flo_cloud/flo_cloud/aws/s3.py +++ b/wavefront/server/packages/flo_cloud/flo_cloud/aws/s3.py @@ -1,7 +1,8 @@ from itertools import islice import boto3 import io -from typing import Optional, List, Tuple +from typing import Optional, List, Tuple, IO, ContextManager +from contextlib import contextmanager from botocore.exceptions import ClientError from .._types import CloudStorageHandler from ..exceptions import CloudStorageFileNotFoundError @@ -201,3 +202,27 @@ def delete_file(self, bucket_name: str, file_path: str) -> None: self.s3_client.delete_object(Bucket=bucket_name, Key=file_path) except Exception as e: raise Exception(f'Error deleting file from S3: {str(e)}') + + def open_text_writer( + self, bucket_name: str, key: str, content_type: Optional[str] = None + ) -> ContextManager[IO[str]]: + """ + Open a text-mode writer for S3 uploads. + + Since boto3 does not provide a native streaming text writer API like + GCS, this implementation buffers content in memory and uploads it on + context exit. This keeps a consistent interface with GCS, while still + allowing incremental CSV writing logic to be shared. 
+ """ + + @contextmanager + def _writer() -> IO[str]: + buffer = io.StringIO() + try: + yield buffer + data = buffer.getvalue().encode('utf-8') + self.save_large_file(data, bucket_name, key, content_type) + finally: + buffer.close() + + return _writer() diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/cloud_storage.py b/wavefront/server/packages/flo_cloud/flo_cloud/cloud_storage.py index cc703b71..e3b926c4 100644 --- a/wavefront/server/packages/flo_cloud/flo_cloud/cloud_storage.py +++ b/wavefront/server/packages/flo_cloud/flo_cloud/cloud_storage.py @@ -1,5 +1,5 @@ from io import BytesIO -from typing import Union, List, Tuple, Optional +from typing import Union, List, Tuple, Optional, IO, ContextManager from .aws.s3 import S3Storage from .gcp.gcs import GCSStorage from ._types import CloudStorageHandler, CloudProvider @@ -187,3 +187,14 @@ def delete_file(self, bucket_name: str, file_path: str) -> None: file_path: Path to the file in bucket """ return self.handler.delete_file(bucket_name, file_path) + + def open_text_writer( + self, bucket_name: str, key: str, content_type: Optional[str] = None + ) -> ContextManager[IO[str]]: + """ + Open a text-mode writer to cloud storage for incremental writes. + + For GCS, this uses the native streaming blob.open API. + For S3, this buffers content in memory and uploads on close. 
+ """ + return self.handler.open_text_writer(bucket_name, key, content_type) diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py index 620ed7f3..8dc99684 100644 --- a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py +++ b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py @@ -3,7 +3,7 @@ from itertools import islice from google.cloud import storage from google.cloud.exceptions import NotFound -from typing import Optional, List, Tuple +from typing import Optional, List, Tuple, IO, ContextManager from .._types import CloudStorageHandler from ..exceptions import CloudStorageFileNotFoundError import re @@ -240,3 +240,21 @@ def delete_file(self, bucket_name: str, file_path: str) -> None: blob.delete() except Exception as e: raise Exception(f'Error deleting file from GCS: {str(e)}') + + def open_text_writer( + self, bucket_name: str, key: str, content_type: Optional[str] = None + ) -> ContextManager[IO[str]]: + """ + Open a text-mode writer to a GCS object using the native blob.open API. + + Returns a context manager yielding a file-like object. Data written + to this object is streamed directly to GCS. + """ + if not bucket_name: + raise ValueError('bucket_name cannot be None or empty') + if not key: + raise ValueError('key cannot be None or empty') + + bucket = self.client.bucket(bucket_name) + blob = bucket.blob(key) + return blob.open('wt', content_type=content_type)