Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from fastapi.params import Depends
from dependency_injector.wiring import inject
from dependency_injector.wiring import Provide
from db_repo_module.models.datasource import Datasource
from db_repo_module.repositories.sql_alchemy_repository import SQLAlchemyRepository
from plugins_module.plugins_container import PluginsContainer
from floware.services.config_service import ConfigService
from user_management_module.utils.user_utils import get_current_user, check_is_admin
from fastapi import HTTPException
Expand Down Expand Up @@ -74,6 +77,10 @@ async def get_config(
ConfigService,
Depends(Provide[ApplicationContainer.config_service]),
],
datasource_repository: Annotated[
SQLAlchemyRepository[Datasource],
Depends(Provide[PluginsContainer.datasource_repository]),
],
response_formatter: Annotated[
ResponseFormatter,
Depends(Provide[CommonContainer.response_formatter]),
Expand All @@ -84,16 +91,16 @@ async def get_config(
such as logo, table to query, etc.
"""
url, app_config = await config_service.get_app_config()
if not url:
return JSONResponse(
status_code=status.HTTP_200_OK,
content=response_formatter.buildSuccessResponse(
{'message': 'No config found'}
),
)
datasources = await datasource_repository.find()
datasource_ids = [str(datasource.id) for datasource in datasources]
Comment on lines +94 to +95
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

find() default limit silently truncates datasource IDs.

Line 94 calls datasource_repository.find() without limit; SQLAlchemyRepository.find() defaults to 100, so the response can omit datasources once count exceeds 100 (see wavefront/server/modules/db_repo_module/db_repo_module/repositories/sql_alchemy_repository.py, Line 75).
This makes the new datasources field incomplete under normal growth.

💡 Proposed fix
-    datasources = await datasource_repository.find()
+    # Avoid implicit repository default truncation (default limit=100)
+    datasources = await datasource_repository.find(limit=10_000)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
datasources = await datasource_repository.find()
datasource_ids = [str(datasource.id) for datasource in datasources]
# Avoid implicit repository default truncation (default limit=100)
datasources = await datasource_repository.find(limit=10_000)
datasource_ids = [str(datasource.id) for datasource in datasources]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/apps/floware/floware/controllers/config_controller.py`
around lines 94 - 95, The code calls datasource_repository.find() which uses
SQLAlchemyRepository.find() default limit (100) and can truncate results; change
the call to explicitly request all rows (e.g., call
datasource_repository.find(limit=None) or datasource_repository.find(limit=0)
depending on repository semantics) so datasources contains every datasource
before building datasource_ids = [str(datasource.id) for datasource in
datasources]; update that invocation in config_controller.py to pass the
explicit limit to eliminate silent truncation.


return JSONResponse(
status_code=status.HTTP_200_OK,
content=response_formatter.buildSuccessResponse(
{'app_icon': url, 'app_config': app_config}
{
'app_icon': url,
'app_config': app_config,
'datasources': datasource_ids,
}
Comment on lines +100 to +104
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Response shape can become inconsistent when config is missing.

config_service.get_app_config() can return app_config = None, so Line 102 may return null instead of an object. Prefer returning a stable shape ({}) for client safety.

💡 Proposed fix
             {
                 'app_icon': url,
-                'app_config': app_config,
+                'app_config': app_config or {},
                 'datasources': datasource_ids,
             }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
{
'app_icon': url,
'app_config': app_config,
'datasources': datasource_ids,
}
{
'app_icon': url,
'app_config': app_config or {},
'datasources': datasource_ids,
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/apps/floware/floware/controllers/config_controller.py`
around lines 100 - 104, The response currently injects app_config directly from
config_service.get_app_config() which can be None, causing 'app_config' to be
null; update the controller code that builds the response (where the dict
contains 'app_icon', 'app_config', 'datasources') to normalize app_config to a
stable object by replacing None with an empty dict (e.g., app_config =
app_config or {}) before inserting it into the response so clients always
receive an object for 'app_config'.

),
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from dependency_injector import providers

from floware.services.notification_service import NotificationService
from flo_cloud.cloud_storage import CloudStorageManager
from floware.services.config_service import ConfigService


Expand All @@ -22,20 +21,16 @@ class ApplicationContainer(containers.DeclarativeContainer):
notification_repository = providers.Dependency()
notification_user_repository = providers.Dependency()
config_repository = providers.Dependency()
cloud_storage_manager = providers.Dependency()

# services
notification_service = providers.Singleton(
NotificationService, notification_repository, notification_user_repository
)

cloud_manager = providers.Singleton(
CloudStorageManager,
provider=config.cloud_config.cloud_provider,
)

config_service = providers.Singleton(
ConfigService,
config_repository=config_repository,
cloud_manager=cloud_manager,
cloud_storage_manager=cloud_storage_manager,
config=config,
)
8 changes: 6 additions & 2 deletions wavefront/server/apps/floware/floware/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
from plugins_module.controllers.message_processor_controller import (
message_processor_router,
)
from plugins_module.controllers.cloud_storage_controller import cloud_storage_router

# API Services Module
from api_services_module.api_services_container import create_api_services_container
Expand All @@ -119,6 +120,7 @@

application_container = ApplicationContainer(
db_client=db_repo_container.db_client,
cloud_storage_manager=common_container.cloud_storage_manager,
email_repository=db_repo_container.email_repository,
oauth_credential_repository=db_repo_container.oauth_credential_repository,
user_repository=db_repo_container.user_repository,
Expand All @@ -138,7 +140,7 @@

plugins_container = PluginsContainer(
db_client=db_repo_container.db_client,
cloud_manager=common_container.cloud_storage_manager,
cloud_storage_manager=common_container.cloud_storage_manager,
dynamic_query_repository=db_repo_container.dynamic_query_repository,
cache_manager=db_repo_container.cache_manager,
)
Expand All @@ -162,7 +164,7 @@
knowledge_base_inference_repository=db_repo_container.knowledge_base_inference_repository,
message_processor_repository=plugins_container.message_processor_repository,
api_services_manager=api_services_container.api_service_manager,
cloud_manager=common_container.cloud_storage_manager,
cloud_storage_manager=common_container.cloud_storage_manager,
message_processor_bucket_name=bucket_name,
)

Expand Down Expand Up @@ -382,6 +384,7 @@ async def metrics(request: Request):
app.include_router(voice_agent_router, prefix='/floware')
app.include_router(tool_router, prefix='/floware')
app.include_router(message_processor_router, prefix='/floware')
app.include_router(cloud_storage_router, prefix='/floware')


@app.exception_handler(Exception)
Expand Down Expand Up @@ -495,6 +498,7 @@ async def global_exception_handler(request: Request, exc: Exception):
packages=[
'plugins_module.controllers',
'plugins_module.services',
'floware.controllers',
'user_management_module.controllers',
'user_management_module.authorization',
'tools_module.datasources',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ class ConfigService:
def __init__(
self,
config_repository: SQLAlchemyRepository[Config],
cloud_manager: CloudStorageManager,
cloud_storage_manager: CloudStorageManager,
config: dict[str, Any],
) -> None:
self.config_repository = config_repository
self.cloud_manager = cloud_manager
self.cloud_storage_manager = cloud_storage_manager
self.config = config

def _get_floware_credentials(self) -> dict[str, Any]:
Expand Down Expand Up @@ -46,7 +46,7 @@ async def store_app_config(

file_content = await file.read() if file else None
if file_content:
self.cloud_manager.save_small_file(
self.cloud_storage_manager.save_small_file(
file_content,
config_credentials['asset_storage_bucket'],
config_credentials['config_file_name'],
Expand Down Expand Up @@ -74,7 +74,7 @@ async def get_app_config(self):
config_path = config_record[0].value.get('app_icon')
config_credentials = self._get_floware_credentials()
# Generate new presigned URL
url = self.cloud_manager.generate_presigned_url(
url = self.cloud_storage_manager.generate_presigned_url(
config_credentials['asset_storage_bucket'],
config_path,
'get',
Expand Down
6 changes: 3 additions & 3 deletions wavefront/server/apps/floware/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def setup_containers(test_engine, test_session, test_user_id, test_session_id):


@pytest.fixture
def mock_cloud_manager():
def mock_cloud_storage_manager():
"""Mock CloudStorageManager for testing"""
mock_cloud = Mock(spec=CloudStorageManager)
mock_cloud.save_small_file = Mock()
Expand Down Expand Up @@ -203,13 +203,13 @@ async def mock_get_app_config():

@pytest.fixture
def setup_application_container(
mock_cloud_manager, mock_config_repository, mock_config, mock_config_service
mock_cloud_storage_manager, mock_config_repository, mock_config, mock_config_service
):
"""Setup ApplicationContainer with mocked dependencies"""
app_container = ApplicationContainer()

# Override the dependencies
app_container.cloud_manager.override(mock_cloud_manager)
app_container.cloud_storage_manager.override(mock_cloud_storage_manager)
app_container.config_repository.override(mock_config_repository)
app_container.config.override(mock_config)
app_container.config_service.override(mock_config_service)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

plugins_container = PluginsContainer(
db_client=db_repo_container.db_client,
cloud_manager=common_container.cloud_storage_manager,
cloud_storage_manager=common_container.cloud_storage_manager,
dynamic_query_repository=db_repo_container.dynamic_query_repository,
cache_manager=db_repo_container.cache_manager,
)
Expand All @@ -51,7 +51,7 @@
knowledge_base_inference_repository=db_repo_container.knowledge_base_inference_repository,
message_processor_repository=plugins_container.message_processor_repository,
api_services_manager=api_services_container.api_service_manager,
cloud_manager=common_container.cloud_storage_manager,
cloud_storage_manager=common_container.cloud_storage_manager,
message_processor_bucket_name=bucket_name,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .datasource_controller import datasource_router
from .authenticator_controller import authenticator_router
from .cloud_storage_controller import cloud_storage_router

__all__ = ['datasource_router', 'authenticator_router']
__all__ = ['datasource_router', 'authenticator_router', 'cloud_storage_router']
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Sort __all__ to satisfy Ruff (RUF022).

Current ordering triggers the static-analysis warning.

Proposed fix
-__all__ = ['datasource_router', 'authenticator_router', 'cloud_storage_router']
+__all__ = ['authenticator_router', 'cloud_storage_router', 'datasource_router']
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
__all__ = ['datasource_router', 'authenticator_router', 'cloud_storage_router']
__all__ = ['authenticator_router', 'cloud_storage_router', 'datasource_router']
🧰 Tools
🪛 Ruff (0.15.2)

[warning] 5-5: __all__ is not sorted

Apply an isort-style sorting to __all__

(RUF022)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/__init__.py`
at line 5, The __all__ list in __init__.py is unsorted and triggers Ruff RUF022;
reorder the entries in the __all__ variable so they are in alphabetical order
(e.g., 'authenticator_router', 'cloud_storage_router', 'datasource_router') to
satisfy the linter and keep the exported module names sorted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import json
from enum import Enum
from typing import Optional

from dependency_injector.wiring import inject, Provide
from fastapi import Depends, Query, status
from fastapi.responses import JSONResponse
from fastapi.routing import APIRouter

from common_module.common_container import CommonContainer
from common_module.response_formatter import ResponseFormatter
from flo_cloud.cloud_storage import CloudStorageManager
from plugins_module.plugins_container import PluginsContainer


cloud_storage_router = APIRouter()


class StorageFileType(str, Enum):
    """Parse formats accepted by the storage read endpoint's ``type`` query param.

    Inherits from ``str`` so members compare equal to their string values and
    serialize cleanly in query strings. Only JSON is supported at present.
    """

    # Only supported format; read_storage_file json-decodes the file when selected.
    json = 'json'

@cloud_storage_router.get('/v1/storage/signed-url')
@inject
async def get_resource_presigned_url(
resource_url: str = Query(..., description='The cloud storage URL of the resource'),
expires_in: int = Query(
300, description='Expiry time in seconds for the presigned URL'
),
Comment on lines +27 to +29
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Add lower-bound validation for expires_in.

expires_in currently accepts zero/negative values, which can generate invalid or immediately expired URLs.

Proposed fix
     expires_in: int = Query(
-        300, description='Expiry time in seconds for the presigned URL'
+        300,
+        ge=1,
+        description='Expiry time in seconds for the presigned URL',
     ),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
expires_in: int = Query(
300, description='Expiry time in seconds for the presigned URL'
),
expires_in: int = Query(
300,
ge=1,
description='Expiry time in seconds for the presigned URL',
),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py`
around lines 19 - 21, The expires_in query parameter currently allows zero or
negative values; update its Query declaration in cloud_storage_controller.py
(the expires_in: int = Query(...) parameter) to enforce a lower bound (e.g., add
ge=1) so only positive seconds are accepted, and adjust the description
accordingly; if you prefer explicit runtime checking instead, validate
expires_in at the start of the handler (the controller function using
expires_in) and return a 400/raise HTTPException when expires_in < 1.

response_formatter: ResponseFormatter = Depends(
Provide[CommonContainer.response_formatter]
),
cloud_storage_manager: CloudStorageManager = Depends(
Provide[PluginsContainer.cloud_storage_manager]
),
):
try:
bucket_name, key = cloud_storage_manager.get_bucket_key(resource_url)
presigned_url = cloud_storage_manager.generate_presigned_url(
bucket_name=bucket_name,
key=key,
type='GET',
expiresIn=expires_in,
)
Comment on lines +23 to +44
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for auth middleware and exemptions
echo "=== Searching for auth middleware and exemptions ==="
rg -n --type=py 'RequireAuthMiddleware|exclude|exempt|public|skip_auth|@protected|@auth' | head -50

echo ""
echo "=== Searching for the endpoint and related auth code ==="
rg -n --type=py 'get_resource_presigned_url' -B5 -A10 | head -100

echo ""
echo "=== Searching for CloudStorageManager implementation ==="
rg -n --type=py 'class CloudStorageManager' -A30 | head -100

echo ""
echo "=== Searching for get_bucket_key method ==="
rg -n --type=py 'def get_bucket_key' -B3 -A15

echo ""
echo "=== Searching for authorization check patterns ==="
rg -n --type=py 'check_auth|get_current_user|verify_access|authorize|is_admin' | head -30

Repository: rootflo/wavefront

Length of output: 28985


🏁 Script executed:

#!/bin/bash
# Search for cloud_storage_router registration and middleware setup
echo "=== Searching for cloud_storage_router registration and middleware ==="
rg -n --type=py 'cloud_storage_router|PluginsContainer' -B3 -A5 | head -80

echo ""
echo "=== Full cloud_storage_controller.py file ==="
cat wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py

echo ""
echo "=== Searching for current_user or authorization in plugins_module ==="
rg -n --type=py 'current_user|authorize|permission|scope' wavefront/server/modules/plugins_module/ | head -30

Repository: rootflo/wavefront

Length of output: 13600


Add authentication and authorization checks before generating presigned URLs.

The handler accepts a caller-supplied resource_url and signs it without verifying the caller's identity or access rights. Any request can generate presigned URLs for arbitrary buckets and keys, bypassing resource-level access controls.

Implement the same authorization pattern used elsewhere in the module (e.g., datasource_controller.py): call get_current_user(request) to extract user/role context, then validate that the caller has permission to access the specific bucket and key before signing the URL.

🧰 Tools
🪛 Ruff (0.15.2)

[warning] 22-24: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


[warning] 25-27: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py`
around lines 15 - 36, The get_resource_presigned_url handler currently signs any
caller-supplied resource_url without auth; before calling
cloud_storage_manager.get_bucket_key and generate_presigned_url, extract the
caller context using get_current_user(request) (same pattern as
datasource_controller) and enforce authorization for the resolved bucket/key
(e.g., call an existing permission check like
cloud_storage_manager.validate_access(user, bucket_name, key) or use the
module's authorization utility) to ensure the user has rights to access that
resource; only after successful validation proceed to call
cloud_storage_manager.generate_presigned_url with the bucket_name and key.

return JSONResponse(
status_code=status.HTTP_200_OK,
content=response_formatter.buildSuccessResponse(
{'resource_url': presigned_url}
),
)
except Exception as e:
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content=response_formatter.buildErrorResponse(str(e)),
)
Comment on lines +51 to +55
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Do not convert all failures into HTTP 400.

Unexpected internal failures should not be reported as client input errors. Keep 400 for validation errors and let unknown exceptions propagate as 500.

Proposed fix
-    except Exception as e:
+    except ValueError as e:
         return JSONResponse(
             status_code=status.HTTP_400_BAD_REQUEST,
             content=response_formatter.buildErrorResponse(str(e)),
         )
+    except Exception:
+        raise
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except Exception as e:
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content=response_formatter.buildErrorResponse(str(e)),
)
except ValueError as e:
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content=response_formatter.buildErrorResponse(str(e)),
)
except Exception:
raise
🧰 Tools
🪛 Ruff (0.15.2)

[warning] 43-43: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py`
around lines 43 - 47, The current broad except in cloud_storage_controller
catches all exceptions and returns HTTP 400; change it to only catch expected
client/validation errors (e.g., ValidationError or whatever specific exceptions
your input validation uses) and keep the JSONResponse with
status.HTTP_400_BAD_REQUEST and response_formatter.buildErrorResponse for those
specific exceptions, while allowing unexpected exceptions to propagate (or catch
Exception and return a 500 using status.HTTP_500_INTERNAL_SERVER_ERROR with an
appropriate generic error message) so internal failures are not reported as
client input errors; update the except block around the code that currently uses
JSONResponse, status.HTTP_400_BAD_REQUEST, and
response_formatter.buildErrorResponse accordingly (or re-raise the exception)
and ensure logging is added for internal errors if you choose to convert them to
500.



@cloud_storage_router.get('/v1/storage/read')
@inject
async def read_storage_file(
resource_url: str = Query(..., description='The cloud storage URL of the resource'),
type: StorageFileType = Query(StorageFileType.json, description='File type'),
projection: Optional[str] = Query(
None,
description='Comma-separated list of top-level fields to return from the parsed data',
),
response_formatter: ResponseFormatter = Depends(
Provide[CommonContainer.response_formatter]
),
cloud_storage_manager: CloudStorageManager = Depends(
Provide[PluginsContainer.cloud_storage_manager]
),
):
try:
bucket_name, key = cloud_storage_manager.get_bucket_key(resource_url)
file_content = cloud_storage_manager.read_file(bucket_name, key)

if type == StorageFileType.json:
data = json.loads(file_content)
if projection:
fields = {f.strip() for f in projection.split(',') if f.strip()}
if isinstance(data, dict):
data = {k: v for k, v in data.items() if k in fields}
elif isinstance(data, list):
data = [
{k: v for k, v in item.items() if k in fields}
if isinstance(item, dict)
else item
for item in data
]

return JSONResponse(
status_code=status.HTTP_200_OK,
content=response_formatter.buildSuccessResponse({'data': data}),
)
except (json.JSONDecodeError, ValueError) as e:
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=response_formatter.buildErrorResponse(
f'Failed to parse file as {type.value}: {str(e)}'
),
Comment on lines +96 to +101
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Separate URL/key validation errors from JSON parse errors.

ValueError is grouped with JSONDecodeError and reported as “Failed to parse file as json”, which is misleading when the actual failure is invalid resource_url/bucket-key parsing.

Proposed fix
-    except (json.JSONDecodeError, ValueError) as e:
+    except ValueError as e:
+        return JSONResponse(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            content=response_formatter.buildErrorResponse(str(e)),
+        )
+    except json.JSONDecodeError as e:
         return JSONResponse(
             status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
             content=response_formatter.buildErrorResponse(
                 f'Failed to parse file as {type.value}: {str(e)}'
             ),
         )
🧰 Tools
🪛 Ruff (0.15.2)

[warning] 100-100: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py`
around lines 96 - 101, The except block currently groups json.JSONDecodeError
and ValueError together in cloud_storage_controller.py, causing
resource_url/bucket-key validation ValueErrors to be reported as parse errors;
split into two except handlers: one except json.JSONDecodeError as e that
returns the JSON parse response (using JSONResponse,
status.HTTP_422_UNPROCESSABLE_ENTITY, response_formatter.buildErrorResponse) and
a separate except ValueError as e that returns a clearer validation error
message like "Invalid resource_url or bucket/key: {str(e)}" (same status and
response_formatter call), so callers can distinguish parsing vs URL/key
validation failures.

)
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,21 @@ class PluginsContainer(containers.DeclarativeContainer):
# dynamic query service
cloud_provider = config.cloud_config.cloud_provider

cloud_manager = providers.Singleton(
cloud_storage_manager = providers.Singleton(
CloudStorageManager, provider=config.cloud_config.cloud_provider
)

dynamic_query_service = providers.Singleton(
DynamicQueryService,
cloud_manager=cloud_manager,
cloud_storage_manager=cloud_storage_manager,
dynamic_query_repo=dynamic_query_repository,
bucket_name=config.floware.asset_storage_bucket,
)

message_processor_service = providers.Singleton(
MessageProcessorService,
cloud_manager=cloud_manager,
cloud_storage_manager=cloud_storage_manager,
message_processor_repository=message_processor_repository,
bucket_name=config.floware.asset_storage_bucket,
hermes_url=config.hermes.url,
bucket_name=config.floware.asset_storage_bucket,
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
from common_module.log.logger import logger
from flo_cloud.cloud_storage import CloudStorageManager
from db_repo_module.models.dynamic_query_yaml import DynamicQueryYaml
from typing import Optional


class DynamicQueryService:
def __init__(
self,
cloud_manager: CloudStorageManager,
cloud_storage_manager: CloudStorageManager,
dynamic_query_repo: SQLAlchemyRepository[DynamicQueryYaml],
bucket_name: str = None,
bucket_name: Optional[str] = None,
):
self.cloud_manager = cloud_manager
self.cloud_storage_manager = cloud_storage_manager
self.dynamic_query_repo = dynamic_query_repo
self.bucket_name = bucket_name
self.prefix = 'dynamic_query/v1'
Expand Down Expand Up @@ -40,7 +41,7 @@ async def store_yaml_to_bucket(self, yaml_content: dict, datasource_id: str):
file_content = yaml_string.encode('utf-8')

# storing to s3bucket
self.cloud_manager.save_small_file(
self.cloud_storage_manager.save_small_file(
file_content=file_content, bucket_name=self.bucket_name, key=file_key
)

Expand Down Expand Up @@ -70,7 +71,7 @@ async def retrive_dynamic_query_yaml(self, page_number, page_size):
Returns:
dict: Contains yamls list, pagination info, and total count
"""
files_keys, has_more = self.cloud_manager.list_files(
files_keys, has_more = self.cloud_storage_manager.list_files(
self.bucket_name, self.prefix, page_size, page_number
)
yamls = []
Expand Down Expand Up @@ -100,7 +101,7 @@ async def get_dynamic_yaml_query(self, query_id: str):
dict: Contains yaml query and their parameters
"""
file_key = f'{self.prefix}/{query_id}.yaml'
file_content = self.cloud_manager.read_file(self.bucket_name, file_key)
file_content = self.cloud_storage_manager.read_file(self.bucket_name, file_key)
yaml_query = yaml.safe_load(file_content.decode('utf-8'))
if not yaml_query:
raise ValueError('YAML file is invalid')
Expand Down Expand Up @@ -141,7 +142,7 @@ async def delete_dynamic_query(self, datasource_id: str, query_id: str):
raise ValueError(f'Query {query_id} not found')

# deleting the file from the cloud storage
self.cloud_manager.delete_file(self.bucket_name, query.file_path)
self.cloud_storage_manager.delete_file(self.bucket_name, query.file_path)
# deleting the record from the database
await self.dynamic_query_repo.delete_all(name=query_id)

Expand Down
Loading