Skip to content

Commit b1ab5c3

Browse files
authored
chore: update redshift functions (#235)
* chore: update redshift functions * feat: add datasources in get config * feat: add get resource url api using cloud storage manager * chore: rename all cloud_manager to cloud_storage_manager * feat: add read api for cloud storage files * fix: fix storage read
1 parent 8062a9c commit b1ab5c3

16 files changed

Lines changed: 359 additions & 83 deletions

File tree

wavefront/server/apps/floware/floware/controllers/config_controller.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
from fastapi.params import Depends
99
from dependency_injector.wiring import inject
1010
from dependency_injector.wiring import Provide
11+
from db_repo_module.models.datasource import Datasource
12+
from db_repo_module.repositories.sql_alchemy_repository import SQLAlchemyRepository
13+
from plugins_module.plugins_container import PluginsContainer
1114
from floware.services.config_service import ConfigService
1215
from user_management_module.utils.user_utils import get_current_user, check_is_admin
1316
from fastapi import HTTPException
@@ -74,6 +77,10 @@ async def get_config(
7477
ConfigService,
7578
Depends(Provide[ApplicationContainer.config_service]),
7679
],
80+
datasource_repository: Annotated[
81+
SQLAlchemyRepository[Datasource],
82+
Depends(Provide[PluginsContainer.datasource_repository]),
83+
],
7784
response_formatter: Annotated[
7885
ResponseFormatter,
7986
Depends(Provide[CommonContainer.response_formatter]),
@@ -84,16 +91,16 @@ async def get_config(
8491
such as logo, table to query, etc.
8592
"""
8693
url, app_config = await config_service.get_app_config()
87-
if not url:
88-
return JSONResponse(
89-
status_code=status.HTTP_200_OK,
90-
content=response_formatter.buildSuccessResponse(
91-
{'message': 'No config found'}
92-
),
93-
)
94+
datasources = await datasource_repository.find()
95+
datasource_ids = [str(datasource.id) for datasource in datasources]
96+
9497
return JSONResponse(
9598
status_code=status.HTTP_200_OK,
9699
content=response_formatter.buildSuccessResponse(
97-
{'app_icon': url, 'app_config': app_config}
100+
{
101+
'app_icon': url,
102+
'app_config': app_config,
103+
'datasources': datasource_ids,
104+
}
98105
),
99106
)

wavefront/server/apps/floware/floware/di/application_container.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from dependency_injector import providers
33

44
from floware.services.notification_service import NotificationService
5-
from flo_cloud.cloud_storage import CloudStorageManager
65
from floware.services.config_service import ConfigService
76

87

@@ -22,20 +21,16 @@ class ApplicationContainer(containers.DeclarativeContainer):
2221
notification_repository = providers.Dependency()
2322
notification_user_repository = providers.Dependency()
2423
config_repository = providers.Dependency()
24+
cloud_storage_manager = providers.Dependency()
2525

2626
# services
2727
notification_service = providers.Singleton(
2828
NotificationService, notification_repository, notification_user_repository
2929
)
3030

31-
cloud_manager = providers.Singleton(
32-
CloudStorageManager,
33-
provider=config.cloud_config.cloud_provider,
34-
)
35-
3631
config_service = providers.Singleton(
3732
ConfigService,
3833
config_repository=config_repository,
39-
cloud_manager=cloud_manager,
34+
cloud_storage_manager=cloud_storage_manager,
4035
config=config,
4136
)

wavefront/server/apps/floware/floware/server.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
from plugins_module.controllers.message_processor_controller import (
9494
message_processor_router,
9595
)
96+
from plugins_module.controllers.cloud_storage_controller import cloud_storage_router
9697

9798
# API Services Module
9899
from api_services_module.api_services_container import create_api_services_container
@@ -119,6 +120,7 @@
119120

120121
application_container = ApplicationContainer(
121122
db_client=db_repo_container.db_client,
123+
cloud_storage_manager=common_container.cloud_storage_manager,
122124
email_repository=db_repo_container.email_repository,
123125
oauth_credential_repository=db_repo_container.oauth_credential_repository,
124126
user_repository=db_repo_container.user_repository,
@@ -138,7 +140,7 @@
138140

139141
plugins_container = PluginsContainer(
140142
db_client=db_repo_container.db_client,
141-
cloud_manager=common_container.cloud_storage_manager,
143+
cloud_storage_manager=common_container.cloud_storage_manager,
142144
dynamic_query_repository=db_repo_container.dynamic_query_repository,
143145
cache_manager=db_repo_container.cache_manager,
144146
)
@@ -162,7 +164,7 @@
162164
knowledge_base_inference_repository=db_repo_container.knowledge_base_inference_repository,
163165
message_processor_repository=plugins_container.message_processor_repository,
164166
api_services_manager=api_services_container.api_service_manager,
165-
cloud_manager=common_container.cloud_storage_manager,
167+
cloud_storage_manager=common_container.cloud_storage_manager,
166168
message_processor_bucket_name=bucket_name,
167169
)
168170

@@ -382,6 +384,7 @@ async def metrics(request: Request):
382384
app.include_router(voice_agent_router, prefix='/floware')
383385
app.include_router(tool_router, prefix='/floware')
384386
app.include_router(message_processor_router, prefix='/floware')
387+
app.include_router(cloud_storage_router, prefix='/floware')
385388

386389

387390
@app.exception_handler(Exception)
@@ -495,6 +498,7 @@ async def global_exception_handler(request: Request, exc: Exception):
495498
packages=[
496499
'plugins_module.controllers',
497500
'plugins_module.services',
501+
'floware.controllers',
498502
'user_management_module.controllers',
499503
'user_management_module.authorization',
500504
'tools_module.datasources',

wavefront/server/apps/floware/floware/services/config_service.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ class ConfigService:
1010
def __init__(
1111
self,
1212
config_repository: SQLAlchemyRepository[Config],
13-
cloud_manager: CloudStorageManager,
13+
cloud_storage_manager: CloudStorageManager,
1414
config: dict[str, Any],
1515
) -> None:
1616
self.config_repository = config_repository
17-
self.cloud_manager = cloud_manager
17+
self.cloud_storage_manager = cloud_storage_manager
1818
self.config = config
1919

2020
def _get_floware_credentials(self) -> dict[str, Any]:
@@ -46,7 +46,7 @@ async def store_app_config(
4646

4747
file_content = await file.read() if file else None
4848
if file_content:
49-
self.cloud_manager.save_small_file(
49+
self.cloud_storage_manager.save_small_file(
5050
file_content,
5151
config_credentials['asset_storage_bucket'],
5252
config_credentials['config_file_name'],
@@ -74,7 +74,7 @@ async def get_app_config(self):
7474
config_path = config_record[0].value.get('app_icon')
7575
config_credentials = self._get_floware_credentials()
7676
# Generate new presigned URL
77-
url = self.cloud_manager.generate_presigned_url(
77+
url = self.cloud_storage_manager.generate_presigned_url(
7878
config_credentials['asset_storage_bucket'],
7979
config_path,
8080
'get',

wavefront/server/apps/floware/tests/conftest.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ def setup_containers(test_engine, test_session, test_user_id, test_session_id):
153153

154154

155155
@pytest.fixture
156-
def mock_cloud_manager():
156+
def mock_cloud_storage_manager():
157157
"""Mock CloudStorageManager for testing"""
158158
mock_cloud = Mock(spec=CloudStorageManager)
159159
mock_cloud.save_small_file = Mock()
@@ -203,13 +203,13 @@ async def mock_get_app_config():
203203

204204
@pytest.fixture
205205
def setup_application_container(
206-
mock_cloud_manager, mock_config_repository, mock_config, mock_config_service
206+
mock_cloud_storage_manager, mock_config_repository, mock_config, mock_config_service
207207
):
208208
"""Setup ApplicationContainer with mocked dependencies"""
209209
app_container = ApplicationContainer()
210210

211211
# Override the dependencies
212-
app_container.cloud_manager.override(mock_cloud_manager)
212+
app_container.cloud_storage_manager.override(mock_cloud_storage_manager)
213213
app_container.config_repository.override(mock_config_repository)
214214
app_container.config.override(mock_config)
215215
app_container.config_service.override(mock_config_service)

wavefront/server/background_jobs/workflow_job/workflow_job/main.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333

3434
plugins_container = PluginsContainer(
3535
db_client=db_repo_container.db_client,
36-
cloud_manager=common_container.cloud_storage_manager,
36+
cloud_storage_manager=common_container.cloud_storage_manager,
3737
dynamic_query_repository=db_repo_container.dynamic_query_repository,
3838
cache_manager=db_repo_container.cache_manager,
3939
)
@@ -51,7 +51,7 @@
5151
knowledge_base_inference_repository=db_repo_container.knowledge_base_inference_repository,
5252
message_processor_repository=plugins_container.message_processor_repository,
5353
api_services_manager=api_services_container.api_service_manager,
54-
cloud_manager=common_container.cloud_storage_manager,
54+
cloud_storage_manager=common_container.cloud_storage_manager,
5555
message_processor_bucket_name=bucket_name,
5656
)
5757

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from .datasource_controller import datasource_router
22
from .authenticator_controller import authenticator_router
3+
from .cloud_storage_controller import cloud_storage_router
34

4-
__all__ = ['datasource_router', 'authenticator_router']
5+
__all__ = ['datasource_router', 'authenticator_router', 'cloud_storage_router']
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import json
2+
from enum import Enum
3+
from typing import Optional
4+
5+
from dependency_injector.wiring import inject, Provide
6+
from fastapi import Depends, Query, status
7+
from fastapi.responses import JSONResponse
8+
from fastapi.routing import APIRouter
9+
10+
from common_module.common_container import CommonContainer
11+
from common_module.response_formatter import ResponseFormatter
12+
from flo_cloud.cloud_storage import CloudStorageManager
13+
from plugins_module.plugins_container import PluginsContainer
14+
15+
16+
cloud_storage_router = APIRouter()
17+
18+
19+
class StorageFileType(str, Enum):
20+
json = 'json'
21+
22+
23+
@cloud_storage_router.get('/v1/storage/signed-url')
24+
@inject
25+
async def get_resource_presigned_url(
26+
resource_url: str = Query(..., description='The cloud storage URL of the resource'),
27+
expires_in: int = Query(
28+
300, description='Expiry time in seconds for the presigned URL'
29+
),
30+
response_formatter: ResponseFormatter = Depends(
31+
Provide[CommonContainer.response_formatter]
32+
),
33+
cloud_storage_manager: CloudStorageManager = Depends(
34+
Provide[PluginsContainer.cloud_storage_manager]
35+
),
36+
):
37+
try:
38+
bucket_name, key = cloud_storage_manager.get_bucket_key(resource_url)
39+
presigned_url = cloud_storage_manager.generate_presigned_url(
40+
bucket_name=bucket_name,
41+
key=key,
42+
type='GET',
43+
expiresIn=expires_in,
44+
)
45+
return JSONResponse(
46+
status_code=status.HTTP_200_OK,
47+
content=response_formatter.buildSuccessResponse(
48+
{'resource_url': presigned_url}
49+
),
50+
)
51+
except Exception as e:
52+
return JSONResponse(
53+
status_code=status.HTTP_400_BAD_REQUEST,
54+
content=response_formatter.buildErrorResponse(str(e)),
55+
)
56+
57+
58+
@cloud_storage_router.get('/v1/storage/read')
59+
@inject
60+
async def read_storage_file(
61+
resource_url: str = Query(..., description='The cloud storage URL of the resource'),
62+
type: StorageFileType = Query(StorageFileType.json, description='File type'),
63+
projection: Optional[str] = Query(
64+
None,
65+
description='Comma-separated list of top-level fields to return from the parsed data',
66+
),
67+
response_formatter: ResponseFormatter = Depends(
68+
Provide[CommonContainer.response_formatter]
69+
),
70+
cloud_storage_manager: CloudStorageManager = Depends(
71+
Provide[PluginsContainer.cloud_storage_manager]
72+
),
73+
):
74+
try:
75+
bucket_name, key = cloud_storage_manager.get_bucket_key(resource_url)
76+
file_content = cloud_storage_manager.read_file(bucket_name, key)
77+
78+
if type == StorageFileType.json:
79+
data = json.loads(file_content)
80+
if projection:
81+
fields = {f.strip() for f in projection.split(',') if f.strip()}
82+
if isinstance(data, dict):
83+
data = {k: v for k, v in data.items() if k in fields}
84+
elif isinstance(data, list):
85+
data = [
86+
{k: v for k, v in item.items() if k in fields}
87+
if isinstance(item, dict)
88+
else item
89+
for item in data
90+
]
91+
92+
return JSONResponse(
93+
status_code=status.HTTP_200_OK,
94+
content=response_formatter.buildSuccessResponse({'data': data}),
95+
)
96+
except (json.JSONDecodeError, ValueError) as e:
97+
return JSONResponse(
98+
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
99+
content=response_formatter.buildErrorResponse(
100+
f'Failed to parse file as {type.value}: {str(e)}'
101+
),
102+
)

wavefront/server/modules/plugins_module/plugins_module/plugins_container.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,21 +39,21 @@ class PluginsContainer(containers.DeclarativeContainer):
3939
# dynamic query service
4040
cloud_provider = config.cloud_config.cloud_provider
4141

42-
cloud_manager = providers.Singleton(
42+
cloud_storage_manager = providers.Singleton(
4343
CloudStorageManager, provider=config.cloud_config.cloud_provider
4444
)
4545

4646
dynamic_query_service = providers.Singleton(
4747
DynamicQueryService,
48-
cloud_manager=cloud_manager,
48+
cloud_storage_manager=cloud_storage_manager,
4949
dynamic_query_repo=dynamic_query_repository,
5050
bucket_name=config.floware.asset_storage_bucket,
5151
)
5252

5353
message_processor_service = providers.Singleton(
5454
MessageProcessorService,
55-
cloud_manager=cloud_manager,
55+
cloud_storage_manager=cloud_storage_manager,
5656
message_processor_repository=message_processor_repository,
57-
bucket_name=config.floware.asset_storage_bucket,
5857
hermes_url=config.hermes.url,
58+
bucket_name=config.floware.asset_storage_bucket,
5959
)

wavefront/server/modules/plugins_module/plugins_module/services/dynamic_query_service.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,17 @@
33
from common_module.log.logger import logger
44
from flo_cloud.cloud_storage import CloudStorageManager
55
from db_repo_module.models.dynamic_query_yaml import DynamicQueryYaml
6+
from typing import Optional
67

78

89
class DynamicQueryService:
910
def __init__(
1011
self,
11-
cloud_manager: CloudStorageManager,
12+
cloud_storage_manager: CloudStorageManager,
1213
dynamic_query_repo: SQLAlchemyRepository[DynamicQueryYaml],
13-
bucket_name: str = None,
14+
bucket_name: Optional[str] = None,
1415
):
15-
self.cloud_manager = cloud_manager
16+
self.cloud_storage_manager = cloud_storage_manager
1617
self.dynamic_query_repo = dynamic_query_repo
1718
self.bucket_name = bucket_name
1819
self.prefix = 'dynamic_query/v1'
@@ -40,7 +41,7 @@ async def store_yaml_to_bucket(self, yaml_content: dict, datasource_id: str):
4041
file_content = yaml_string.encode('utf-8')
4142

4243
# storing to s3bucket
43-
self.cloud_manager.save_small_file(
44+
self.cloud_storage_manager.save_small_file(
4445
file_content=file_content, bucket_name=self.bucket_name, key=file_key
4546
)
4647

@@ -70,7 +71,7 @@ async def retrive_dynamic_query_yaml(self, page_number, page_size):
7071
Returns:
7172
dict: Contains yamls list, pagination info, and total count
7273
"""
73-
files_keys, has_more = self.cloud_manager.list_files(
74+
files_keys, has_more = self.cloud_storage_manager.list_files(
7475
self.bucket_name, self.prefix, page_size, page_number
7576
)
7677
yamls = []
@@ -100,7 +101,7 @@ async def get_dynamic_yaml_query(self, query_id: str):
100101
dict: Contains yaml query and their parameters
101102
"""
102103
file_key = f'{self.prefix}/{query_id}.yaml'
103-
file_content = self.cloud_manager.read_file(self.bucket_name, file_key)
104+
file_content = self.cloud_storage_manager.read_file(self.bucket_name, file_key)
104105
yaml_query = yaml.safe_load(file_content.decode('utf-8'))
105106
if not yaml_query:
106107
raise ValueError('YAML file is invalid')
@@ -141,7 +142,7 @@ async def delete_dynamic_query(self, datasource_id: str, query_id: str):
141142
raise ValueError(f'Query {query_id} not found')
142143

143144
# deleting the file from the cloud storage
144-
self.cloud_manager.delete_file(self.bucket_name, query.file_path)
145+
self.cloud_storage_manager.delete_file(self.bucket_name, query.file_path)
145146
# deleting the record from the database
146147
await self.dynamic_query_repo.delete_all(name=query_id)
147148

0 commit comments

Comments
 (0)