Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
validate_yaml_query,
)
import csv
import io
import yaml
from ..utils.helper import DynamicQueryRequest
from ..utils.helper import DynamicQueryExecuteRequest
Expand All @@ -55,29 +54,6 @@
datasource_router = APIRouter()


def _serialized_rows_to_csv(rows: list) -> bytes:
"""Convert a list of serialized dicts (e.g. from execute_dynamic_query) to CSV bytes."""
if not rows:
return b''
out = io.StringIO()
fieldnames = list(rows[0].keys())
for row in rows[1:]:
for k in row:
if k not in fieldnames:
fieldnames.append(k)
writer = csv.DictWriter(out, fieldnames=fieldnames, extrasaction='ignore')

def _cell_value(v):
if isinstance(v, (dict, list)):
return json.dumps(v)
return v if v is None or isinstance(v, str) else str(v)

writer.writeheader()
for row in rows:
writer.writerow({k: _cell_value(row.get(k)) for k in fieldnames})
return out.getvalue().encode('utf-8-sig')


@datasource_router.post('/v1/datasources')
@inject
async def add_datasource(
Expand Down Expand Up @@ -769,8 +745,8 @@ async def export_dynamic_query_csv(
Provide[PluginsContainer.dynamic_query_service]
),
user_service: UserService = Depends(Provide[UserContainer.user_service]),
cloud_manager: CloudStorageManager = Depends(
Provide[PluginsContainer.cloud_manager]
cloud_storage_manager: CloudStorageManager = Depends(
Provide[PluginsContainer.cloud_storage_manager]
),
config: dict = Depends(Provide[PluginsContainer.config]),
cache_manager: CacheManager = Depends(Provide[PluginsContainer.cache_manager]),
Expand Down Expand Up @@ -837,14 +813,14 @@ async def export_dynamic_query_csv(

# If not force_fetch, return existing file from bucket if present
if not force_fetch:
existing_keys, _ = cloud_manager.list_files(
existing_keys, _ = cloud_storage_manager.list_files(
bucket_name=bucket_name,
prefix=file_key,
page_size=1,
page_number=1,
)
if existing_keys and existing_keys[0] == file_key:
signed_url = cloud_manager.generate_presigned_url(
signed_url = cloud_storage_manager.generate_presigned_url(
bucket_name=bucket_name, key=file_key, type='GET'
)
return JSONResponse(
Expand Down Expand Up @@ -907,7 +883,7 @@ def _cell_value(v):
return json.dumps(v)
return v if v is None or isinstance(v, str) else str(v)

with cloud_manager.open_text_writer(
with cloud_storage_manager.open_text_writer(
bucket_name=bucket_name, key=file_key, content_type='text/csv'
) as f:
if fieldnames:
Expand All @@ -916,7 +892,7 @@ def _cell_value(v):
for row in rows:
writer.writerow({k: _cell_value(row.get(k)) for k in fieldnames})

signed_url = cloud_manager.generate_presigned_url(
signed_url = cloud_storage_manager.generate_presigned_url(
bucket_name=bucket_name, key=file_key, type='GET'
)

Expand Down