diff --git a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py
index c0fc1a2c..4260f6cc 100644
--- a/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py
+++ b/wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py
@@ -1,3 +1,4 @@
+from typing import Dict, Any
 from datasource.bigquery.config import BigQueryConfig
 from datasource.redshift.config import RedshiftConfig
 from dependency_injector.wiring import inject
@@ -33,11 +34,18 @@
 from plugins_module.plugins_container import PluginsContainer
 from user_management_module.user_container import UserContainer
 from user_management_module.services.user_service import UserService
+from flo_cloud.cloud_storage import CloudStorageManager
 from fastapi import HTTPException
 from user_management_module.utils.user_utils import get_current_user
 from plugins_module.services.dynamic_query_service import DynamicQueryService
 from db_repo_module.cache.cache_manager import CacheManager
-from ..utils.helper import generate_cache_key, validate_yaml_query
+from ..utils.helper import (
+    generate_cache_key,
+    generate_export_filename_hash,
+    validate_yaml_query,
+)
+import csv
+import io
 import yaml
 from ..utils.helper import DynamicQueryRequest
 from ..utils.helper import DynamicQueryExecuteRequest
@@ -47,6 +55,29 @@
 datasource_router = APIRouter()
 
 
+def _serialized_rows_to_csv(rows: list) -> bytes:
+    """Convert a list of serialized dicts (e.g. from execute_dynamic_query) to CSV bytes."""
+    if not rows:
+        return b''
+    out = io.StringIO()
+    fieldnames = list(rows[0].keys())
+    for row in rows[1:]:
+        for k in row:
+            if k not in fieldnames:
+                fieldnames.append(k)
+    writer = csv.DictWriter(out, fieldnames=fieldnames, extrasaction='ignore')
+
+    def _cell_value(v):
+        if isinstance(v, (dict, list)):
+            return json.dumps(v)
+        return v if v is None or isinstance(v, str) else str(v)
+
+    writer.writeheader()
+    for row in rows:
+        writer.writerow({k: _cell_value(row.get(k)) for k in fieldnames})
+    return out.getvalue().encode('utf-8-sig')
+
+
 @datasource_router.post('/v1/datasources')
 @inject
 async def add_datasource(
@@ -718,6 +749,188 @@ async def execute_dynamic_query(
     )
 
 
+EXPORT_RATE_LIMIT_SECONDS = 120  # 2 minutes between exports per user
+
+
+@datasource_router.post('/v1/{datasource_id}/dynamic-queries/{query_id}/export')
+@inject
+async def export_dynamic_query_csv(
+    request: Request,
+    datasource_id: str,
+    query_id: str,
+    filter: str | None = Query(None, alias='$filter'),
+    offset: int | None = 0,
+    limit: int | None = 100,
+    dynamic_query_params: DynamicQueryExecuteRequest = None,
+    response_formatter: ResponseFormatter = Depends(
+        Provide[CommonContainer.response_formatter]
+    ),
+    dynamic_query_yaml_service: DynamicQueryService = Depends(
+        Provide[PluginsContainer.dynamic_query_service]
+    ),
+    user_service: UserService = Depends(Provide[UserContainer.user_service]),
+    cloud_manager: CloudStorageManager = Depends(
+        Provide[PluginsContainer.cloud_manager]
+    ),
+    config: dict = Depends(Provide[PluginsContainer.config]),
+    cache_manager: CacheManager = Depends(Provide[PluginsContainer.cache_manager]),
+    force_fetch: int = Query(0),
+):
+    """Execute the dynamic query, write the result to cloud storage as CSV, and return a signed download URL."""
+    role_id, user_id, _ = get_current_user(request)
+
+    # Block multiple exports per user within a 2-minute window (Redis)
+    export_rate_key = f'dynamic_query_export_rate:{user_id}'
+    if not cache_manager.add(
+        export_rate_key, '1', expiry=EXPORT_RATE_LIMIT_SECONDS, nx=True
+    ):
+        return JSONResponse(
+            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
+            content=response_formatter.buildErrorResponse(
+                f'Export rate limit: one export per user every {EXPORT_RATE_LIMIT_SECONDS // 60} minutes. Please try again later.'
+            ),
+        )
+    datasource_type, datasource_config = await get_datasource_config(datasource_id)
+    if not datasource_config:
+        return JSONResponse(
+            status_code=status.HTTP_404_NOT_FOUND,
+            content=response_formatter.buildErrorResponse(
+                f'Datasource not found: {datasource_id}'
+            ),
+        )
+    yaml_query, _ = await dynamic_query_yaml_service.get_dynamic_yaml_query(query_id)
+    if not yaml_query:
+        return JSONResponse(
+            status_code=status.HTTP_404_NOT_FOUND,
+            content=response_formatter.buildErrorResponse(
+                f'Dynamic query not found: {query_id}'
+            ),
+        )
+
+    rls_filter_str = None
+    is_admin = await check_admin(role_id)
+    if not is_admin:
+        rls_filters = await user_service.get_user_resources(
+            user_id=user_id, scope=ResourceScope.DATA
+        )
+        if len(rls_filters) == 0:
+            return JSONResponse(
+                status_code=status.HTTP_403_FORBIDDEN,
+                content=response_formatter.buildErrorResponse(
+                    'Data access not set for non-admin user'
+                ),
+            )
+        rls_filters = fetch_data_filters(rls_filters)
+        rls_filter_str = f"{ ' $and '.join(rls_filters)}"
+
+    # Bucket and filename: hash of $filter, limit, offset, dynamic_query_params
+    provider = config['cloud_config']['cloud_provider']
+    bucket_name = (
+        config['aws']['aws_asset_storage_bucket']
+        if provider == 'aws'
+        else config['gcp']['gcp_asset_storage_bucket']
+    )
+    export_hash = generate_export_filename_hash(
+        filter=filter,
+        limit=limit,
+        offset=offset,
+        params=dynamic_query_params.params if dynamic_query_params else None,
+        rls_filter_str=rls_filter_str,
+    )
+    filename = f'export_{query_id}_{export_hash}.csv'
+    file_key = f'dynamic_query_exports/{filename}'
+
+    # If not force_fetch, return existing file from bucket if present
+    if not force_fetch:
+        existing_keys, _ = cloud_manager.list_files(
+            bucket_name=bucket_name,
+            prefix=file_key,
+            page_size=1,
+            page_number=1,
+        )
+        if existing_keys and existing_keys[0] == file_key:
+            signed_url = cloud_manager.generate_presigned_url(
+                bucket_name=bucket_name, key=file_key, type='GET'
+            )
+            return JSONResponse(
+                status_code=status.HTTP_200_OK,
+                content=response_formatter.buildSuccessResponse(
+                    {'export_url': signed_url}
+                ),
+            )
+
+    datasource_plugin = DatasourcePlugin(datasource_type, datasource_config)
+    res: Dict[str, Any] = await datasource_plugin.execute_dynamic_query(
+        yaml_query,
+        rls_filter_str,
+        filter,
+        offset,
+        limit,
+        dynamic_query_params.params if dynamic_query_params else None,
+    )
+
+    if not res:
+        return JSONResponse(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            content=response_formatter.buildErrorResponse(
+                f'Unexpected dynamic query result format for query_id {query_id}, no results'
+            ),
+        )
+
+    first_key = next(iter(res))
+    if res[first_key].get('status') != 'success':
+        return JSONResponse(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            content=response_formatter.buildErrorResponse(
+                f'Dynamic query execution failed for query_id {query_id}'
+            ),
+        )
+
+    serialized_res = serialize_values(res[first_key]['result'])
+
+    # Stream rows to CSV directly in GCS/S3 to avoid building the full CSV in memory
+    rows = serialized_res or []
+    if not isinstance(rows, list):
+        return JSONResponse(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            content=response_formatter.buildErrorResponse(
+                f'Unexpected dynamic query result format for query_id {query_id}, invalid rows'
+            ),
+        )
+
+    if rows:
+        fieldnames = list(rows[0].keys())
+        for row in rows[1:]:
+            for k in row:
+                if k not in fieldnames:
+                    fieldnames.append(k)
+    else:
+        fieldnames = []
+
+    def _cell_value(v):
+        if isinstance(v, (dict, list)):
+            return json.dumps(v)
+        return v if v is None or isinstance(v, str) else str(v)
+
+    with cloud_manager.open_text_writer(
+        bucket_name=bucket_name, key=file_key, content_type='text/csv'
+    ) as f:
+        if fieldnames:
+            writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
+            writer.writeheader()
+            for row in rows:
+                writer.writerow({k: _cell_value(row.get(k)) for k in fieldnames})
+
+    signed_url = cloud_manager.generate_presigned_url(
+        bucket_name=bucket_name, key=file_key, type='GET'
+    )
+
+    return JSONResponse(
+        status_code=status.HTTP_200_OK,
+        content=response_formatter.buildSuccessResponse({'export_url': signed_url}),
+    )
+
+
 @datasource_router.delete('/v1/{datasource_id}/dynamic-queries/{query_id}')
 @inject
 async def delete_dynamic_query(
diff --git a/wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py b/wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py
index 744d9913..61714608 100644
--- a/wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py
+++ b/wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py
@@ -53,6 +53,7 @@ def check_is_valid_resource(resource_id: str) -> bool:
         'rf_parsed_data_object',
         'rf_gold_data_object',
         'rf_gold_item_details',
+        'rf_gold_auditor_data',
     ]:
         return True
     return False
diff --git a/wavefront/server/modules/plugins_module/plugins_module/utils/helper.py b/wavefront/server/modules/plugins_module/plugins_module/utils/helper.py
index 6e51d2ea..9b71bcd0 100644
--- a/wavefront/server/modules/plugins_module/plugins_module/utils/helper.py
+++ b/wavefront/server/modules/plugins_module/plugins_module/utils/helper.py
@@ -53,6 +53,25 @@ def generate_cache_key(
     return f'dynamic_query:{hash_digest}'
 
 
+def generate_export_filename_hash(
+    filter: str = None,
+    limit: int = None,
+    offset: int = None,
+    params: dict[str, str] = None,
+    rls_filter_str: str = None,
+) -> str:
+    """Generate a short hash for export filename from $filter, limit, offset, dynamic_query params, and RLS scope."""
+    key_dict = {
+        'filter': filter,
+        'limit': limit,
+        'offset': offset,
+        'params': params or {},
+        'rls': rls_filter_str,
+    }
+    key_json = json.dumps(key_dict, sort_keys=True, separators=(',', ':'))
+    return hashlib.md5(key_json.encode()).hexdigest()
+
+
 def validate_yaml_query(yaml_query: dict) -> bool:
     """
     Validate the structure of a dynamic query YAML file.
diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/_types/cloud_storage.py b/wavefront/server/packages/flo_cloud/flo_cloud/_types/cloud_storage.py
index 175d5914..a9ade42e 100644
--- a/wavefront/server/packages/flo_cloud/flo_cloud/_types/cloud_storage.py
+++ b/wavefront/server/packages/flo_cloud/flo_cloud/_types/cloud_storage.py
@@ -1,5 +1,5 @@
 from abc import ABC, abstractmethod
-from typing import List, Tuple, Optional
+from typing import List, Tuple, Optional, IO, ContextManager
 
 
 class CloudStorageHandler(ABC):
@@ -103,3 +103,16 @@ def list_files(
             Exception: If listing fails
         """
         pass
+
+    @abstractmethod
+    def open_text_writer(
+        self, bucket_name: str, key: str, content_type: Optional[str] = None
+    ) -> ContextManager[IO[str]]:
+        """
+        Open a text writer stream to cloud storage for incremental writes.
+
+        Implementations should return a context manager that yields a file-like
+        object opened in text mode. Data written to this object must be
+        uploaded to the underlying storage when the context manager exits.
+        """
+        pass
diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/aws/s3.py b/wavefront/server/packages/flo_cloud/flo_cloud/aws/s3.py
index 689745be..53e75f17 100644
--- a/wavefront/server/packages/flo_cloud/flo_cloud/aws/s3.py
+++ b/wavefront/server/packages/flo_cloud/flo_cloud/aws/s3.py
@@ -1,7 +1,8 @@
 from itertools import islice
 import boto3
 import io
-from typing import Optional, List, Tuple
+from typing import Optional, List, Tuple, IO, ContextManager
+from contextlib import contextmanager
 from botocore.exceptions import ClientError
 from .._types import CloudStorageHandler
 from ..exceptions import CloudStorageFileNotFoundError
@@ -201,3 +202,27 @@ def delete_file(self, bucket_name: str, file_path: str) -> None:
             self.s3_client.delete_object(Bucket=bucket_name, Key=file_path)
         except Exception as e:
             raise Exception(f'Error deleting file from S3: {str(e)}')
+
+    def open_text_writer(
+        self, bucket_name: str, key: str, content_type: Optional[str] = None
+    ) -> ContextManager[IO[str]]:
+        """
+        Open a text-mode writer for S3 uploads.
+
+        Since boto3 does not provide a native streaming text writer API like
+        GCS, this implementation buffers content in memory and uploads it on
+        context exit. This keeps a consistent interface with GCS, while still
+        allowing incremental CSV writing logic to be shared.
+        """
+
+        @contextmanager
+        def _writer() -> IO[str]:
+            buffer = io.StringIO()
+            try:
+                yield buffer
+                data = buffer.getvalue().encode('utf-8')
+                self.save_large_file(data, bucket_name, key, content_type)
+            finally:
+                buffer.close()
+
+        return _writer()
diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/cloud_storage.py b/wavefront/server/packages/flo_cloud/flo_cloud/cloud_storage.py
index cc703b71..e3b926c4 100644
--- a/wavefront/server/packages/flo_cloud/flo_cloud/cloud_storage.py
+++ b/wavefront/server/packages/flo_cloud/flo_cloud/cloud_storage.py
@@ -1,5 +1,5 @@
 from io import BytesIO
-from typing import Union, List, Tuple, Optional
+from typing import Union, List, Tuple, Optional, IO, ContextManager
 from .aws.s3 import S3Storage
 from .gcp.gcs import GCSStorage
 from ._types import CloudStorageHandler, CloudProvider
@@ -187,3 +187,14 @@ def delete_file(self, bucket_name: str, file_path: str) -> None:
             file_path: Path to the file in bucket
         """
         return self.handler.delete_file(bucket_name, file_path)
+
+    def open_text_writer(
+        self, bucket_name: str, key: str, content_type: Optional[str] = None
+    ) -> ContextManager[IO[str]]:
+        """
+        Open a text-mode writer to cloud storage for incremental writes.
+
+        For GCS, this uses the native streaming blob.open API.
+        For S3, this buffers content in memory and uploads on close.
+        """
+        return self.handler.open_text_writer(bucket_name, key, content_type)
diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py
index 620ed7f3..8dc99684 100644
--- a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py
+++ b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py
@@ -3,7 +3,7 @@
 from itertools import islice
 from google.cloud import storage
 from google.cloud.exceptions import NotFound
-from typing import Optional, List, Tuple
+from typing import Optional, List, Tuple, IO, ContextManager
 from .._types import CloudStorageHandler
 from ..exceptions import CloudStorageFileNotFoundError
 import re
@@ -240,3 +240,21 @@ def delete_file(self, bucket_name: str, file_path: str) -> None:
             blob.delete()
         except Exception as e:
             raise Exception(f'Error deleting file from GCS: {str(e)}')
+
+    def open_text_writer(
+        self, bucket_name: str, key: str, content_type: Optional[str] = None
+    ) -> ContextManager[IO[str]]:
+        """
+        Open a text-mode writer to a GCS object using the native blob.open API.
+
+        Returns a context manager yielding a file-like object. Data written
+        to this object is streamed directly to GCS.
+        """
+        if not bucket_name:
+            raise ValueError('bucket_name cannot be None or empty')
+        if not key:
+            raise ValueError('key cannot be None or empty')
+
+        bucket = self.client.bucket(bucket_name)
+        blob = bucket.blob(key)
+        return blob.open('wt', content_type=content_type)
diff --git a/wavefront/server/plugins/datasource/datasource/__init__.py b/wavefront/server/plugins/datasource/datasource/__init__.py
index 6ebb9f23..90d4d718 100644
--- a/wavefront/server/plugins/datasource/datasource/__init__.py
+++ b/wavefront/server/plugins/datasource/datasource/__init__.py
@@ -104,12 +104,12 @@ async def execute_dynamic_query(
         offset: Optional[int] = 0,
         limit: Optional[int] = 100,
         params: Optional[Dict[str, Any]] = None,
-    ):
+    ) -> Dict[str, Any]:
         odata_filter, odata_params = self.odata_parser.prepare_odata_filter(filter)
         odata_data_filter, odata_data_params = self.odata_parser.prepare_odata_filter(
             rls_filter
         )
-        result_by_query = await self.datasource.execute_dynamic_query(
+        result_by_query: Dict[str, Any] = await self.datasource.execute_dynamic_query(
             query,
             offset,
             limit,
diff --git a/wavefront/server/plugins/datasource/datasource/bigquery/__init__.py b/wavefront/server/plugins/datasource/datasource/bigquery/__init__.py
index 40497bca..9931eedc 100644
--- a/wavefront/server/plugins/datasource/datasource/bigquery/__init__.py
+++ b/wavefront/server/plugins/datasource/datasource/bigquery/__init__.py
@@ -80,7 +80,7 @@ async def execute_dynamic_query(
         odata_data_filter: Optional[str] = None,
         odata_data_params: Optional[Dict[str, Any]] = None,
         params: Optional[Dict[str, Any]] = None,
-    ):
+    ) -> Dict[str, Any]:
         results = {}
         tasks = []
 
diff --git a/wavefront/server/plugins/datasource/datasource/types.py b/wavefront/server/plugins/datasource/datasource/types.py
index 074b6056..5aeb161e 100644
--- a/wavefront/server/plugins/datasource/datasource/types.py
+++ b/wavefront/server/plugins/datasource/datasource/types.py
@@ -91,7 +91,7 @@ async def execute_dynamic_query(
         odata_data_params: Optional[Dict[str, Any]] = None,
         offset: Optional[int] = 0,
         limit: Optional[int] = 100,
-    ):
+    ) -> Dict[str, Any]:
         pass
 
     @abstractmethod
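
For reference, a minimal caller-side sketch of the open_text_writer contract introduced above (not part of the patch itself). It assumes an already-constructed CloudStorageManager such as the one wired as PluginsContainer.cloud_manager; the bucket name, key, rows, and fieldnames below are made-up example values. The export endpoint in datasource_controller.py follows the same pattern.

import csv
from typing import Any, Dict, Iterable, List

from flo_cloud.cloud_storage import CloudStorageManager


def write_rows_as_csv(
    manager: CloudStorageManager,
    bucket_name: str,
    key: str,
    rows: Iterable[Dict[str, Any]],
    fieldnames: List[str],
) -> None:
    # Stream rows into cloud storage as CSV. The writer uploads when the
    # context manager exits: streamed natively for GCS, buffered in memory for S3.
    with manager.open_text_writer(
        bucket_name=bucket_name, key=key, content_type='text/csv'
    ) as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
        writer.writeheader()
        for row in rows:
            writer.writerow(row)


# Example call with made-up values:
# write_rows_as_csv(manager, 'example-bucket', 'dynamic_query_exports/example.csv',
#                   [{'id': 1}, {'id': 2}], ['id'])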