
chore: update redshift functions#235

Merged
vizsatiz merged 7 commits into develop from feat/redshift-query-update
Mar 7, 2026

Conversation

@vishnurk6247
Member

@vishnurk6247 vishnurk6247 commented Mar 3, 2026

Summary by CodeRabbit

  • New Features

    • Query multiple tables with optional joins and receive row results as dictionaries
    • Dynamic multi-query execution with OData-style filtering, parameter substitution, per-query results and error reporting
    • Asynchronous query execution for data endpoints
    • Cloud storage endpoints: presigned-URL retrieval and JSON file read (with optional field projection)
    • Config responses now include available datasource IDs
  • Bug Fixes

    • Improved query error handling and clearer connection-test logging
  • Chores

    • DI provider renamed to cloud_storage_manager across services and tests

@coderabbitai

coderabbitai bot commented Mar 3, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds dict-returning Redshift query helpers and multi-table join construction; makes Redshift plugin async, implements multi-query execution with OData substitutions and per-query orchestration; adds cloud storage presigned-url and read endpoints; renames DI/provider surface from cloud_manager to cloud_storage_manager across plugins, services, DI and tests.

Changes

  • Redshift client (wavefront/server/packages/flo_cloud/flo_cloud/aws/redshift.py): Added execute_query_as_dict returning List[Dict]; new private __get_join_query to build multi-table JOIN SELECTs; extended execute_query_to_dict to accept table_prefix, table_names, join_query, group_by, order_by, with validation and improved error/log handling; call sites switched to dict results.
  • Redshift plugin & orchestration (wavefront/server/plugins/datasource/datasource/redshift/__init__.py): Made test_connection async using asyncio.to_thread; fetch_data now accepts table_names and join_query and forwards them to the client; added execute_query (async wrapper) and implemented execute_dynamic_query to validate queries, apply OData substitutions ({{rls}}, {{filters}}), append LIMIT/OFFSET, execute each query via asyncio.to_thread, and aggregate statuses/results.
  • Floware config & server wiring (wavefront/server/apps/floware/floware/controllers/config_controller.py, wavefront/server/apps/floware/floware/server.py): get_config now depends on the datasource repository and always returns a datasources list; injected/registered cloud_storage_manager and mounted cloud_storage_router; added floware.controllers to plugin wiring.
  • Cloud storage router & exports (wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py, .../controllers/__init__.py): New cloud_storage_router with endpoints GET /v1/storage/signed-url (presigned GET URL) and GET /v1/storage/read (read JSON with optional projection); exported via controllers __all__.
  • PluginsContainer DI renames (wavefront/server/modules/plugins_module/plugins_module/plugins_container.py): Renamed provider/arg cloud_manager → cloud_storage_manager; updated providers that passed this dependency and reordered message_processor_service args to put hermes_url before bucket_name.
  • Services using cloud storage (wavefront/server/modules/plugins_module/plugins_module/services/dynamic_query_service.py, .../services/message_processor_service.py): Replaced cloud_manager references with cloud_storage_manager; made bucket_name optional and updated constructor signatures and internal attribute names.
  • Application DI & consumers (wavefront/server/apps/floware/floware/di/application_container.py, .../services/config_service.py, .../tests/conftest.py, .../workflow_job/main.py, .../tools_module/...): Added cloud_storage_manager dependency to ApplicationContainer and removed the old cloud_manager singleton; updated ConfigService, test fixtures, ToolsContainer, MessageProcessor wiring and callers to use cloud_storage_manager (constructor/attribute renames and overrides).
  • Tools & provider updates (wavefront/server/modules/tools_module/tools_module/message_processor/provider.py, .../tools_container.py): Renamed constructor parameter and attribute from cloud_manager to cloud_storage_manager; updated internal reads to use the new attribute.

Sequence Diagram

sequenceDiagram
    participant Caller as Client/API Caller
    participant Plugin as Redshift Plugin
    participant Thread as Async Thread
    participant ClientLib as Redshift Client
    participant DB as Redshift DB

    Caller->>Plugin: execute_dynamic_query(queries, odata_params)
    loop per query
        Plugin->>Plugin: validate query_id & params
        Plugin->>Plugin: apply OData substitutions ({{rls}}, {{filters}})
        Plugin->>Plugin: append LIMIT/OFFSET
        Plugin->>Thread: asyncio.to_thread(client.execute_query_as_dict, query, params)
        Thread->>ClientLib: execute_query_as_dict(query, params)
        ClientLib->>DB: run SELECT (joins/filters)
        DB-->>ClientLib: return rows
        ClientLib-->>Thread: rows as List[Dict]
        Thread-->>Plugin: result or error
        Plugin->>Plugin: record per-query status/result
    end
    Plugin-->>Caller: aggregated per-query outcomes

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Poem

🐇 I hopped through joins and async streams,

Dicts now sparkle in query dreams,
Filters stitched and limits met,
Signed URLs and data fetched yet,
A happy hop — code carrots set!

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

  • Title check ⚠️ Warning — The title states 'update redshift functions', but the PR significantly expands Redshift functionality with new methods, async handling, cloud storage support, and query-processing features across multiple modules. Resolution: revise the title to reflect the scope, e.g. 'feat: add async Redshift queries, cloud storage endpoints, and multi-table support'.
  • Docstring Coverage ⚠️ Warning — Docstring coverage is 48.72%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
✅ Passed checks (1 passed)
  • Description Check ✅ Passed — Check skipped; CodeRabbit's high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@wavefront/server/packages/flo_cloud/flo_cloud/aws/redshift.py`:
- Around line 220-237: The code is building SQL by concatenating unsafe
fragments (projection, table_names, where_clause, join_query, order_by,
group_by) which is SQL-injection prone; update the query construction in the
method that builds these queries (including __get_join_query and the SELECT/ELSE
branch shown) to stop interpolating raw fragments: enforce strict whitelists or
validate/quote identifiers (tables, columns, aliases) using a safe
identifier-quoting helper, build ORDER BY/GROUP BY only from validated column
names and directions, convert any value conditions in where_clause into
parameterized bind params (or let callers supply bind params instead of raw
SQL), and if complex joins are needed use a proper query builder (e.g.,
SQLAlchemy Core text with bindparams or a sanitized assembler) rather than
string concatenation. Ensure the updated logic applies to both the join path
(__get_join_query) and the non-join SELECT branch so all fragments are
validated/quoted and values are bound.
- Around line 268-273: The current alias list creation uses
string.ascii_lowercase which will IndexError once len(table_names) > 26; update
the alias generation used where aliases is created (and referenced in the loop
that iterates over table_names, join_query, and where_clause) to produce a
scalable sequence (a, b, ..., z, aa, ab, ...) instead of a fixed 26-item list;
implement a small helper (e.g., generate_alias(i) or an itertools-based
generator) and replace aliases = list(string.ascii_lowercase) with that
generator so alias = generate_alias(i) is safe for any number of tables and
avoids IndexError.
- Around line 274-279: The current string replace calls for
processed_join/processed_where can corrupt SQL (e.g., turning "LEFT JOIN" into
"LEFT LEFT JOIN" or altering unintended substrings); replace them with targeted
regex-based replacements that match whole-word JOIN occurrences and
table-qualified column references only: for the JOIN clause, match a
case-insensitive pattern like a word-boundary JOIN followed by the exact
table_name (and ensure it is not already preceded by LEFT/INNER/RIGHT via a
negative lookbehind or by checking the captured join keyword) and replace with
"LEFT JOIN {qualified} AS {alias}"; for column refs, replace occurrences of the
exact sequence "{table_name}." using a word-boundary or lookbehind so you only
change true table qualifiers (not substrings), applying these to processed_join
and processed_where and using the function/variables processed_join,
processed_where, table_name, alias, qualified to locate the code to update.
- Line 240: The current logger.debug call prints the full SQL string via
logger.debug(f'Executing query: {query}'), which may leak sensitive predicates;
change it to avoid logging raw SQL by replacing that call with a metadata-only
log: log a redaction or fingerprint of the SQL (e.g., SHA256 hash or truncated
preview), the statement type (SELECT/UPDATE/...), and any non-sensitive
parameter counts or keys (but not their values); update the code that references
the query variable and the logger.debug call so it emits only the redacted_query
or query_hash and parameter metadata instead of the full query string.

In `@wavefront/server/plugins/datasource/datasource/redshift/__init__.py`:
- Around line 80-86: The default for parameters in the block using
query_obj.get('parameters', {}) is wrong — change it to an empty list
(query_params = query_obj.get('parameters', [])) and validate its shape before
using it: ensure query_params is a list and each element is a mapping with a
'name' key (raise a ValueError or skip invalid entries). Update the logic that
builds params_key = [p['name'] for p in query_params] and params_to_execute to
handle an empty list safely and to fail-fast with a clear message if an item
lacks 'name' (reference symbols: query_obj, query_params, params_key,
params_to_execute).
- Around line 82-94: The current validation raises ValueError and aborts the
whole batch when a single query is invalid; instead, change the logic in the
block that checks query_id and builds params_to_execute (references: query_id,
query_params, params_to_execute, params) to collect validation errors per query
and skip execution for that query while allowing the loop to continue for
others. Concretely: if query_id is falsy or any expected param from params_key
is missing, do not raise; instead append an error entry (including query_id or
an index and a descriptive message) to the per-query results/errors aggregation
structure and continue to the next query; only build and use params_to_execute
for queries that pass validation so later per-query result aggregation remains
consistent.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 60ad6f1 and 20d52c5.

📒 Files selected for processing (2)
  • wavefront/server/packages/flo_cloud/flo_cloud/aws/redshift.py
  • wavefront/server/plugins/datasource/datasource/redshift/__init__.py

Comment on lines +220 to +237
        if join_query:
            query = self.__get_join_query(
                join_query,
                table_names,
                table_prefix,
                projection,
                where_clause,
                limit,
                offset,
                order_by,
                group_by=group_by,
            )
        else:
            query = (
                f'SELECT {projection} FROM {base_table} AS a '
                f'WHERE {where_clause} {group_by_clause} {order_by_clause} '
                f'LIMIT {limit} OFFSET {offset}'
            )

⚠️ Potential issue | 🔴 Critical

Critical: dynamic SQL assembly is injectable.

projection, table_names, where_clause, join_query, order_by, and group_by are concatenated directly into SQL. Parameter binding only protects values, not SQL fragments/identifiers.

Suggested hardening pattern
-        if join_query:
+        if join_query:
             query = self.__get_join_query(
                 join_query,
                 table_names,
                 table_prefix,
                 projection,
                 where_clause,
                 limit,
                 offset,
                 order_by,
                 group_by=group_by,
             )
         else:
             query = (
-                f'SELECT {projection} FROM {base_table} AS a '
-                f'WHERE {where_clause} {group_by_clause} {order_by_clause} '
-                f'LIMIT {limit} OFFSET {offset}'
+                f'SELECT {self._validate_projection(projection)} '
+                f'FROM {self._validate_identifier(base_table)} AS a '
+                f'WHERE {self._validate_sql_fragment(where_clause)} {group_by_clause} {order_by_clause} '
+                'LIMIT :limit OFFSET :offset'
             )
+        exec_params = {**(params or {}), 'limit': int(limit), 'offset': int(offset)}
 ...
-                cursor.execute(query, params)
+                cursor.execute(query, exec_params)

Also applies to: 252-289

🧰 Tools
🪛 Ruff (0.15.2)

[error] 234-236: Possible SQL injection vector through string-based query construction

(S608)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/packages/flo_cloud/flo_cloud/aws/redshift.py` around lines
220 - 237, The code is building SQL by concatenating unsafe fragments
(projection, table_names, where_clause, join_query, order_by, group_by) which is
SQL-injection prone; update the query construction in the method that builds
these queries (including __get_join_query and the SELECT/ELSE branch shown) to
stop interpolating raw fragments: enforce strict whitelists or validate/quote
identifiers (tables, columns, aliases) using a safe identifier-quoting helper,
build ORDER BY/GROUP BY only from validated column names and directions, convert
any value conditions in where_clause into parameterized bind params (or let
callers supply bind params instead of raw SQL), and if complex joins are needed
use a proper query builder (e.g., SQLAlchemy Core text with bindparams or a
sanitized assembler) rather than string concatenation. Ensure the updated logic
applies to both the join path (__get_join_query) and the non-join SELECT branch
so all fragments are validated/quoted and values are bound.
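The identifier-quoting helper suggested above can be sketched as follows. This is a minimal illustration only: `quote_identifier` and the regex are assumptions, and production code would also need to handle schema-qualified names and Redshift's exact quoting rules.

```python
import re

# A plain, unquoted SQL identifier: a letter or underscore,
# followed by letters, digits, or underscores.
_IDENT = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')

def quote_identifier(name: str) -> str:
    """Validate a single identifier against a strict whitelist, then quote it."""
    if not _IDENT.match(name):
        raise ValueError(f'Invalid SQL identifier: {name!r}')
    return f'"{name}"'
```

Anything containing whitespace, punctuation, or quotes is rejected outright, so injectable fragments never reach the SQL string.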

        )

        try:
            logger.debug(f'Executing query: {query}')

⚠️ Potential issue | 🟠 Major

Avoid logging full SQL text with dynamic filters.

Full query logging can leak sensitive predicates or identifiers in logs. Prefer metadata-only logging or redacted SQL.

Safer logging example
-            logger.debug(f'Executing query: {query}')
+            logger.debug(
+                'Executing Redshift query',
+                extra={
+                    'has_params': bool(params),
+                    'limit': limit,
+                    'offset': offset,
+                    'table_count': len(table_names),
+                },
+            )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/packages/flo_cloud/flo_cloud/aws/redshift.py` at line 240,
The current logger.debug call prints the full SQL string via
logger.debug(f'Executing query: {query}'), which may leak sensitive predicates;
change it to avoid logging raw SQL by replacing that call with a metadata-only
log: log a redaction or fingerprint of the SQL (e.g., SHA256 hash or truncated
preview), the statement type (SELECT/UPDATE/...), and any non-sensitive
parameter counts or keys (but not their values); update the code that references
the query variable and the logger.debug call so it emits only the redacted_query
or query_hash and parameter metadata instead of the full query string.
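The metadata-only logging described above could be implemented along these lines; `describe_query` is a hypothetical helper, not part of the codebase:

```python
import hashlib

def describe_query(query, params=None):
    """Build a metadata-only description of a SQL statement that is safe to log."""
    stripped = query.strip()
    return {
        # Fingerprint lets you correlate log lines without exposing predicates.
        'query_sha256': hashlib.sha256(stripped.encode('utf-8')).hexdigest()[:12],
        'statement': stripped.split()[0].upper() if stripped else '',
        'param_keys': sorted((params or {}).keys()),  # names only, never values
    }
```

The existing debug line would then become something like `logger.debug('Executing Redshift query', extra=describe_query(query, params))`.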

Comment on lines +268 to +273
        aliases = list(string.ascii_lowercase)
        processed_join = join_query
        processed_where = where_clause
        for i, table_name in enumerate(table_names):
            alias = aliases[i]
            qualified = f'{table_prefix}{table_name}'

⚠️ Potential issue | 🟡 Minor

Handle alias overflow when joining many tables.

string.ascii_lowercase supports only 26 aliases. More tables will fail with IndexError.

Boundary guard
         aliases = list(string.ascii_lowercase)
+        if len(table_names) > len(aliases):
+            raise ValueError('join_query supports at most 26 tables')
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/packages/flo_cloud/flo_cloud/aws/redshift.py` around lines
268 - 273, The current alias list creation uses string.ascii_lowercase which
will IndexError once len(table_names) > 26; update the alias generation used
where aliases is created (and referenced in the loop that iterates over
table_names, join_query, and where_clause) to produce a scalable sequence (a, b,
..., z, aa, ab, ...) instead of a fixed 26-item list; implement a small helper
(e.g., generate_alias(i) or an itertools-based generator) and replace aliases =
list(string.ascii_lowercase) with that generator so alias = generate_alias(i) is
safe for any number of tables and avoids IndexError.
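The scalable alias sequence described in the prompt can be sketched as a spreadsheet-style base-26 encoding (`generate_alias` is a hypothetical helper, not in the PR):

```python
def generate_alias(i: int) -> str:
    """Alias for table index i: 0 -> 'a', 25 -> 'z', 26 -> 'aa', 27 -> 'ab', ..."""
    chars = []
    n = i + 1  # shift to 1-based so the math mirrors spreadsheet column names
    while n > 0:
        n, rem = divmod(n - 1, 26)
        chars.append(chr(ord('a') + rem))
    return ''.join(reversed(chars))
```

With this, `alias = generate_alias(i)` never raises, regardless of `len(table_names)`.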

Comment on lines +274 to +279
            processed_join = processed_join.replace(
                f'JOIN {table_name}',
                f'LEFT JOIN {qualified} AS {alias}',
            )
            processed_join = processed_join.replace(f'{table_name}.', f'{alias}.')
            processed_where = processed_where.replace(f'{table_name}.', f'{alias}.')

⚠️ Potential issue | 🟠 Major

JOIN rewrite via replace() can generate invalid SQL.

If input already contains LEFT JOIN, replacing JOIN <table> yields LEFT LEFT JOIN .... Plain .replace() can also mutate unintended substrings.

Safer replacement approach
+import re
...
-            processed_join = processed_join.replace(
-                f'JOIN {table_name}',
-                f'LEFT JOIN {qualified} AS {alias}',
-            )
-            processed_join = processed_join.replace(f'{table_name}.', f'{alias}.')
-            processed_where = processed_where.replace(f'{table_name}.', f'{alias}.')
+            processed_join = re.sub(
+                rf'\b(?:LEFT|RIGHT|FULL|INNER|CROSS)?\s*JOIN\s+{re.escape(table_name)}\b',
+                f'LEFT JOIN {qualified} AS {alias}',
+                processed_join,
+            )
+            processed_join = re.sub(
+                rf'\b{re.escape(table_name)}\.',
+                f'{alias}.',
+                processed_join,
+            )
+            processed_where = re.sub(
+                rf'\b{re.escape(table_name)}\.',
+                f'{alias}.',
+                processed_where,
+            )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/packages/flo_cloud/flo_cloud/aws/redshift.py` around lines
274 - 279, The current string replace calls for processed_join/processed_where
can corrupt SQL (e.g., turning "LEFT JOIN" into "LEFT LEFT JOIN" or altering
unintended substrings); replace them with targeted regex-based replacements that
match whole-word JOIN occurrences and table-qualified column references only:
for the JOIN clause, match a case-insensitive pattern like a word-boundary JOIN
followed by the exact table_name (and ensure it is not already preceded by
LEFT/INNER/RIGHT via a negative lookbehind or by checking the captured join
keyword) and replace with "LEFT JOIN {qualified} AS {alias}"; for column refs,
replace occurrences of the exact sequence "{table_name}." using a word-boundary
or lookbehind so you only change true table qualifiers (not substrings),
applying these to processed_join and processed_where and using the
function/variables processed_join, processed_where, table_name, alias, qualified
to locate the code to update.
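A small, self-contained demonstration of the failure mode and the regex fix described above; the table name `orders` and prefix `schema.` are made up for illustration:

```python
import re

join_sql = 'LEFT JOIN orders ON orders.user_id = users.id'

# Naive replace duplicates the join keyword when one is already present:
naive = join_sql.replace('JOIN orders', 'LEFT JOIN schema.orders AS b')
# naive == 'LEFT LEFT JOIN schema.orders AS b ON orders.user_id = users.id'

# Whole-word regex consumes any existing join keyword before rewriting:
safe = re.sub(
    r'\b(?:LEFT|RIGHT|FULL|INNER|CROSS)?\s*JOIN\s+orders\b',
    'LEFT JOIN schema.orders AS b',
    join_sql,
)
# Word boundary ensures only true 'orders.' qualifiers are rewritten:
safe = re.sub(r'\borders\.', 'b.', safe)
# safe == 'LEFT JOIN schema.orders AS b ON b.user_id = users.id'
```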

Comment on lines +80 to +86
            query_params = query_obj.get('parameters', {})
            query_id = query_obj.get('id')
            if not query_id:
                raise ValueError('Query ID is required')

            params_key = [p['name'] for p in query_params]
            params_to_execute: Dict[str, Any] = {}

⚠️ Potential issue | 🔴 Critical

Critical: default parameters shape causes runtime failure.

When parameters is absent, defaulting to {} makes params_key = [p['name'] for p in query_params] fail because iteration yields dict keys, not parameter objects.

Type-safe default and validation
-            query_params = query_obj.get('parameters', {})
+            query_params = query_obj.get('parameters') or []
+            if not isinstance(query_params, list):
+                raise ValueError('`parameters` must be a list of objects')
             query_id = query_obj.get('id')
...
-            params_key = [p['name'] for p in query_params]
+            params_key = [p['name'] for p in query_params if 'name' in p]
🧰 Tools
🪛 Ruff (0.15.2)

[warning] 83-83: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/plugins/datasource/datasource/redshift/__init__.py` around
lines 80 - 86, The default for parameters in the block using
query_obj.get('parameters', {}) is wrong — change it to an empty list
(query_params = query_obj.get('parameters', [])) and validate its shape before
using it: ensure query_params is a list and each element is a mapping with a
'name' key (raise a ValueError or skip invalid entries). Update the logic that
builds params_key = [p['name'] for p in query_params] and params_to_execute to
handle an empty list safely and to fail-fast with a clear message if an item
lacks 'name' (reference symbols: query_obj, query_params, params_key,
params_to_execute).
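The type-safe default described above can be sketched as a small extraction helper (`extract_param_names` is a hypothetical name for illustration):

```python
def extract_param_names(query_obj):
    """Return declared parameter names; a missing 'parameters' key yields []."""
    params = query_obj.get('parameters') or []
    if not isinstance(params, list):
        # A dict here would otherwise iterate over its keys (plain strings),
        # making p['name'] raise TypeError downstream.
        raise ValueError('`parameters` must be a list of objects')
    return [p['name'] for p in params if isinstance(p, dict) and 'name' in p]
```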

Comment on lines +82 to +94
            if not query_id:
                raise ValueError('Query ID is required')

            params_key = [p['name'] for p in query_params]
            params_to_execute: Dict[str, Any] = {}

            if params is None:
                params = {}

            for key in params_key:
                if key not in params:
                    raise ValueError(f'Missing parameter: {key} for query {query_id}')
                params_to_execute[key] = params[key]

⚠️ Potential issue | 🟠 Major

One invalid query should not fail the whole batch.

Current ValueError paths abort all processing, which conflicts with the per-query result aggregation behavior later in the method.

Aggregate validation errors per query and continue
-            if not query_id:
-                raise ValueError('Query ID is required')
+            if not query_id:
+                results[f'query_{len(results)}'] = {
+                    'status': 'error',
+                    'error': 'Query ID is required',
+                    'description': 'Invalid query definition',
+                    'result': [],
+                }
+                continue
...
-            for key in params_key:
-                if key not in params:
-                    raise ValueError(f'Missing parameter: {key} for query {query_id}')
-                params_to_execute[key] = params[key]
+            missing = [key for key in params_key if key not in params]
+            if missing:
+                results[query_id] = {
+                    'status': 'error',
+                    'error': f'Missing parameter(s): {", ".join(missing)}',
+                    'description': f'Invalid input for query {query_id}',
+                    'result': [],
+                }
+                continue
+            for key in params_key:
+                params_to_execute[key] = params[key]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-            if not query_id:
-                raise ValueError('Query ID is required')
-
-            params_key = [p['name'] for p in query_params]
-            params_to_execute: Dict[str, Any] = {}
-
-            if params is None:
-                params = {}
-
-            for key in params_key:
-                if key not in params:
-                    raise ValueError(f'Missing parameter: {key} for query {query_id}')
-                params_to_execute[key] = params[key]
+            if not query_id:
+                results[f'query_{len(results)}'] = {
+                    'status': 'error',
+                    'error': 'Query ID is required',
+                    'description': 'Invalid query definition',
+                    'result': [],
+                }
+                continue
+
+            params_key = [p['name'] for p in query_params]
+            params_to_execute: Dict[str, Any] = {}
+
+            if params is None:
+                params = {}
+
+            missing = [key for key in params_key if key not in params]
+            if missing:
+                results[query_id] = {
+                    'status': 'error',
+                    'error': f'Missing parameter(s): {", ".join(missing)}',
+                    'description': f'Invalid input for query {query_id}',
+                    'result': [],
+                }
+                continue
+
+            for key in params_key:
+                params_to_execute[key] = params[key]
🧰 Tools
🪛 Ruff (0.15.2)

[warning] 83-83: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 93-93: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/plugins/datasource/datasource/redshift/__init__.py` around
lines 82 - 94, The current validation raises ValueError and aborts the whole
batch when a single query is invalid; instead, change the logic in the block
that checks query_id and builds params_to_execute (references: query_id,
query_params, params_to_execute, params) to collect validation errors per query
and skip execution for that query while allowing the loop to continue for
others. Concretely: if query_id is falsy or any expected param from params_key
is missing, do not raise; instead append an error entry (including query_id or
an index and a descriptive message) to the per-query results/errors aggregation
structure and continue to the next query; only build and use params_to_execute
for queries that pass validation so later per-query result aggregation remains
consistent.

Comment on lines +101 to +107
            query_to_execute = query_to_execute.replace(
                '{{rls}}', f'{odata_data_filter}' if odata_data_filter else 'TRUE'
            )
            query_to_execute = query_to_execute.replace(
                '{{filters}}', f'{odata_filter}' if odata_filter else 'TRUE'
            )
            query_to_execute += f' LIMIT {limit} OFFSET {offset}'

⚠️ Potential issue | 🔴 Critical

Critical: unsafe SQL interpolation in dynamic query path.

odata_filter, odata_data_filter, and LIMIT/OFFSET are inserted directly into SQL text. This enables injection if any of those fields are user-controlled.

Immediate mitigation pattern
+            if odata_filter and any(tok in odata_filter for tok in (';', '--', '/*', '*/')):
+                raise ValueError('Unsafe token in odata_filter')
+            if odata_data_filter and any(tok in odata_data_filter for tok in (';', '--', '/*', '*/')):
+                raise ValueError('Unsafe token in odata_data_filter')
...
-            query_to_execute += f' LIMIT {limit} OFFSET {offset}'
+            params_to_execute['limit'] = int(0 if limit is None else limit)
+            params_to_execute['offset'] = int(0 if offset is None else offset)
+            query_to_execute += ' LIMIT :limit OFFSET :offset'
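To complement the bound-parameter pattern above, LIMIT/OFFSET inputs can be coerced and range-checked before binding. This is a sketch only; `bounded_page` and the `max_limit` of 1000 are illustrative assumptions, not values from the codebase:

```python
def bounded_page(limit, offset, max_limit=1000):
    """Coerce paging inputs to safe ints and enforce sane bounds before binding."""
    limit = int(limit) if limit is not None else max_limit
    offset = int(offset) if offset is not None else 0
    if not (0 < limit <= max_limit) or offset < 0:
        raise ValueError('limit/offset out of range')
    return limit, offset
```

The returned integers are then safe to pass as bind parameters for `LIMIT :limit OFFSET :offset`.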


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@wavefront/server/apps/floware/floware/controllers/config_controller.py`:
- Around line 100-104: The response currently injects app_config directly from
config_service.get_app_config() which can be None, causing 'app_config' to be
null; update the controller code that builds the response (where the dict
contains 'app_icon', 'app_config', 'datasources') to normalize app_config to a
stable object by replacing None with an empty dict (e.g., app_config =
app_config or {}) before inserting it into the response so clients always
receive an object for 'app_config'.
- Around line 94-95: The code calls datasource_repository.find() which uses
SQLAlchemyRepository.find() default limit (100) and can truncate results; change
the call to explicitly request all rows (e.g., call
datasource_repository.find(limit=None) or datasource_repository.find(limit=0)
depending on repository semantics) so datasources contains every datasource
before building datasource_ids = [str(datasource.id) for datasource in
datasources]; update that invocation in config_controller.py to pass the
explicit limit to eliminate silent truncation.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 4b351173-ae56-4d8d-ba33-fefda2b04955

📥 Commits

Reviewing files that changed from the base of the PR and between 20d52c5 and 5dad7b2.

📒 Files selected for processing (2)
  • wavefront/server/apps/floware/floware/controllers/config_controller.py
  • wavefront/server/apps/floware/floware/server.py

Comment on lines +94 to +95
    datasources = await datasource_repository.find()
    datasource_ids = [str(datasource.id) for datasource in datasources]

⚠️ Potential issue | 🟠 Major

find() default limit silently truncates datasource IDs.

Line 94 calls datasource_repository.find() without limit; SQLAlchemyRepository.find() defaults to 100, so the response can omit datasources once count exceeds 100 (see wavefront/server/modules/db_repo_module/db_repo_module/repositories/sql_alchemy_repository.py, Line 75).
This silently leaves the new datasources field incomplete once the datasource count grows past 100.

💡 Proposed fix
-    datasources = await datasource_repository.find()
+    # Avoid implicit repository default truncation (default limit=100)
+    datasources = await datasource_repository.find(limit=10_000)
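An alternative to picking a magic number is to page through find() until a short page comes back. This is a self-contained sketch with a stub repository; the real SQLAlchemyRepository.find() signature (parameter names, default limit) may differ:

```python
import asyncio

class FakeRepository:
    """Stub standing in for SQLAlchemyRepository, which defaults to limit=100."""

    def __init__(self, rows):
        self._rows = rows

    async def find(self, limit=100, offset=0):
        return self._rows[offset:offset + limit]

async def find_all(repo, page_size=100):
    """Page through find() so results are never silently truncated."""
    results, offset = [], 0
    while True:
        page = await repo.find(limit=page_size, offset=offset)
        results.extend(page)
        if len(page) < page_size:  # a short page means we reached the end
            return results
        offset += page_size

repo = FakeRepository(list(range(250)))
all_rows = asyncio.run(find_all(repo))
print(len(all_rows))  # 250, where a bare find() would return only 100
```

A paginated loop also keeps each round-trip bounded and does not depend on the repository accepting limit=None or limit=0.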

Comment on lines +100 to +104
{
'app_icon': url,
'app_config': app_config,
'datasources': datasource_ids,
}

⚠️ Potential issue | 🟡 Minor

Response shape can become inconsistent when config is missing.

config_service.get_app_config() can return app_config = None, so Line 102 may return null instead of an object. Prefer returning a stable shape ({}) for client safety.

💡 Proposed fix
             {
                 'app_icon': url,
-                'app_config': app_config,
+                'app_config': app_config or {},
                 'datasources': datasource_ids,
             }
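The normalization is easy to unit-test in isolation; a minimal sketch (the function name is illustrative, not the actual controller):

```python
def build_config_response(url, app_config, datasource_ids):
    """Build the config payload, coercing a missing config to a stable {}."""
    return {
        'app_icon': url,
        'app_config': app_config or {},  # None (or any falsy config) becomes {}
        'datasources': datasource_ids,
    }

print(build_config_response('icon.png', None, ['1'])['app_config'])  # {}
```

One subtlety of `or {}`: it also replaces an explicitly empty config with a fresh dict, which is harmless here since both serialize to the same JSON object.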
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/apps/floware/floware/controllers/config_controller.py`
around lines 100 - 104, The response currently injects app_config directly from
config_service.get_app_config() which can be None, causing 'app_config' to be
null; update the controller code that builds the response (where the dict
contains 'app_icon', 'app_config', 'datasources') to normalize app_config to a
stable object by replacing None with an empty dict (e.g., app_config =
app_config or {}) before inserting it into the response so clients always
receive an object for 'app_config'.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
wavefront/server/apps/floware/floware/server.py (1)

139-143: ⚠️ Potential issue | 🔴 Critical

PluginsContainer override still uses removed provider name.

PluginsContainer now uses cloud_storage_manager; passing cloud_manager= here can break container initialization at startup.

Proposed fix
 plugins_container = PluginsContainer(
     db_client=db_repo_container.db_client,
-    cloud_manager=common_container.cloud_storage_manager,
+    cloud_storage_manager=common_container.cloud_storage_manager,
     dynamic_query_repository=db_repo_container.dynamic_query_repository,
     cache_manager=db_repo_container.cache_manager,
 )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/apps/floware/floware/server.py` around lines 139 - 143, The
PluginsContainer instantiation still passes the removed parameter name
cloud_manager; update the call to use the new parameter name by replacing
cloud_manager=common_container.cloud_storage_manager with
cloud_storage_manager=common_container.cloud_storage_manager so
PluginsContainer(...) receives the correct keyword argument; locate the
constructor call to PluginsContainer in server.py and make this single-argument
rename.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/__init__.py`:
- Line 5: The __all__ list in __init__.py is unsorted and triggers Ruff RUF022;
reorder the entries in the __all__ variable so they are in alphabetical order
(e.g., 'authenticator_router', 'cloud_storage_router', 'datasource_router') to
satisfy the linter and keep the exported module names sorted.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py`:
- Around line 19-21: The expires_in query parameter currently allows zero or
negative values; update its Query declaration in cloud_storage_controller.py
(the expires_in: int = Query(...) parameter) to enforce a lower bound (e.g., add
ge=1) so only positive seconds are accepted, and adjust the description
accordingly; if you prefer explicit runtime checking instead, validate
expires_in at the start of the handler (the controller function using
expires_in) and return a 400/raise HTTPException when expires_in < 1.
- Around line 43-47: The current broad except in cloud_storage_controller
catches all exceptions and returns HTTP 400; change it to only catch expected
client/validation errors (e.g., ValidationError or whatever specific exceptions
your input validation uses) and keep the JSONResponse with
status.HTTP_400_BAD_REQUEST and response_formatter.buildErrorResponse for those
specific exceptions, while allowing unexpected exceptions to propagate (or catch
Exception and return a 500 using status.HTTP_500_INTERNAL_SERVER_ERROR with an
appropriate generic error message) so internal failures are not reported as
client input errors; update the except block around the code that currently uses
JSONResponse, status.HTTP_400_BAD_REQUEST, and
response_formatter.buildErrorResponse accordingly (or re-raise the exception)
and ensure logging is added for internal errors if you choose to convert them to
500.
- Around line 25-31: The DI binding in cloud_storage_controller is referencing a
removed provider name; change the dependency injection target from
Provide[PluginsContainer.cloud_manager] to
Provide[PluginsContainer.cloud_storage_manager] so the CloudStorageManager
parameter in the controller (cloud_manager: CloudStorageManager = Depends(...))
is wired correctly; update the import or reference to PluginsContainer if
necessary and ensure usages like get_bucket_key and generate_presigned_url still
operate on the injected cloud_manager instance.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 943f4c70-00fa-4479-b420-cf2b5ca11998

📥 Commits

Reviewing files that changed from the base of the PR and between 5dad7b2 and 2308517.

📒 Files selected for processing (6)
  • wavefront/server/apps/floware/floware/server.py
  • wavefront/server/modules/plugins_module/plugins_module/controllers/__init__.py
  • wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py
  • wavefront/server/modules/plugins_module/plugins_module/plugins_container.py
  • wavefront/server/modules/plugins_module/plugins_module/services/dynamic_query_service.py
  • wavefront/server/modules/plugins_module/plugins_module/services/message_processor_service.py

from .cloud_storage_controller import cloud_storage_router

__all__ = ['datasource_router', 'authenticator_router']
__all__ = ['datasource_router', 'authenticator_router', 'cloud_storage_router']

⚠️ Potential issue | 🟡 Minor

Sort __all__ to satisfy Ruff (RUF022).

Current ordering triggers the static-analysis warning.

Proposed fix
-__all__ = ['datasource_router', 'authenticator_router', 'cloud_storage_router']
+__all__ = ['authenticator_router', 'cloud_storage_router', 'datasource_router']
🧰 Tools
🪛 Ruff (0.15.2)

[warning] 5-5: __all__ is not sorted

Apply an isort-style sorting to __all__

(RUF022)


Comment on lines +19 to +21
expires_in: int = Query(
300, description='Expiry time in seconds for the presigned URL'
),

⚠️ Potential issue | 🟡 Minor

Add lower-bound validation for expires_in.

expires_in currently accepts zero/negative values, which can generate invalid or immediately expired URLs.

Proposed fix
     expires_in: int = Query(
-        300, description='Expiry time in seconds for the presigned URL'
+        300,
+        ge=1,
+        description='Expiry time in seconds for the presigned URL',
     ),
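The same lower-bound rule can be expressed framework-free; this sketch mirrors what Query(300, ge=1) enforces at request-validation time (the function name is illustrative):

```python
def validate_expires_in(expires_in=300):
    """Mirror of Query(300, ge=1): accept only a positive number of seconds."""
    if expires_in < 1:
        raise ValueError('expires_in must be >= 1 second')
    return expires_in

print(validate_expires_in())      # 300 (default)
try:
    validate_expires_in(0)
except ValueError as err:
    print(f'rejected: {err}')
```

With the Query constraint in place, FastAPI performs this check before the handler runs and returns a 422 on violation, so the handler body never sees a non-positive value.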

Comment on lines +43 to +47
except Exception as e:
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content=response_formatter.buildErrorResponse(str(e)),
)

⚠️ Potential issue | 🟠 Major

Do not convert all failures into HTTP 400.

Unexpected internal failures should not be reported as client input errors. Keep 400 for validation errors and let unknown exceptions propagate as 500.

Proposed fix
-    except Exception as e:
+    except ValueError as e:
         return JSONResponse(
             status_code=status.HTTP_400_BAD_REQUEST,
             content=response_formatter.buildErrorResponse(str(e)),
         )
+    except Exception:
+        raise
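The error-mapping split can be sketched without FastAPI: 400 only when a known validation error fires, everything else propagates so the framework surfaces it as a 500. The parse_positive helper and dict-shaped responses are illustrative, not the real controller:

```python
def parse_positive(value):
    """Client-input parsing: raises ValueError on bad input."""
    n = int(value)  # non-numeric input raises ValueError
    if n < 1:
        raise ValueError('must be a positive integer')
    return n

def handle(value):
    try:
        return {'status': 200, 'body': parse_positive(value)}
    except ValueError as e:  # known client error -> 400
        return {'status': 400, 'body': str(e)}
    # anything unexpected propagates and becomes a 500 at the framework layer

print(handle('42')['status'])   # 200
print(handle('abc')['status'])  # 400
```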
🧰 Tools
🪛 Ruff (0.15.2)

[warning] 43-43: Do not catch blind exception: Exception

(BLE001)


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py (2)

19-21: ⚠️ Potential issue | 🟡 Minor

Enforce a positive lower bound for expires_in.

expires_in currently allows zero/negative values, which can yield invalid or immediately expired URLs.

Proposed fix
     expires_in: int = Query(
-        300, description='Expiry time in seconds for the presigned URL'
+        300,
+        ge=1,
+        description='Expiry time in seconds for the presigned URL',
     ),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py`
around lines 19 - 21, The expires_in Query parameter currently allows zero or
negative values; update the Query declaration for expires_in to enforce a
positive lower bound (e.g. add ge=1) so it becomes expires_in: int = Query(300,
ge=1, description='Expiry time in seconds for the presigned URL'), ensuring any
handler/function using expires_in (the Query param symbol) rejects non-positive
values at validation time.

43-47: ⚠️ Potential issue | 🟠 Major

Do not convert all failures to HTTP 400 or return raw internal error text.

The broad except Exception block misclassifies server errors as client errors and leaks internal details via str(e).

Proposed fix
-    except Exception as e:
+    except ValueError as e:
         return JSONResponse(
             status_code=status.HTTP_400_BAD_REQUEST,
             content=response_formatter.buildErrorResponse(str(e)),
         )
+    except Exception:
+        raise
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py`
around lines 43 - 47, Replace the broad "except Exception as e" in
cloud_storage_controller.py (the exception handling block that returns
JSONResponse using response_formatter.buildErrorResponse(str(e))) with targeted
exception handling: catch known client errors (e.g., ValidationError/ValueError
-> return status.HTTP_400_BAD_REQUEST), not-found errors (e.g.,
FileNotFoundError or custom NotFound -> status.HTTP_404_NOT_FOUND), and
permission/auth errors (PermissionError or custom AuthError ->
status.HTTP_403_FORBIDDEN); for all other exceptions log the full exception
internally (use the module logger) and return a generic 500 response
(status.HTTP_500_INTERNAL_SERVER_ERROR) with a non-sensitive message via
response_formatter.buildErrorResponse without including str(e). Ensure you
update the handlers around the same try/except in cloud_storage_controller.py
and remove any direct exposure of internal error text.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py`:
- Around line 15-36: The get_resource_presigned_url handler currently signs any
caller-supplied resource_url without auth; before calling
cloud_storage_manager.get_bucket_key and generate_presigned_url, extract the
caller context using get_current_user(request) (same pattern as
datasource_controller) and enforce authorization for the resolved bucket/key
(e.g., call an existing permission check like
cloud_storage_manager.validate_access(user, bucket_name, key) or use the
module's authorization utility) to ensure the user has rights to access that
resource; only after successful validation proceed to call
cloud_storage_manager.generate_presigned_url with the bucket_name and key.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 754c5dd7-246a-4af0-bc43-05aa28500537

📥 Commits

Reviewing files that changed from the base of the PR and between 2308517 and 65a1bbd.

📒 Files selected for processing (8)
  • wavefront/server/apps/floware/floware/di/application_container.py
  • wavefront/server/apps/floware/floware/server.py
  • wavefront/server/apps/floware/floware/services/config_service.py
  • wavefront/server/apps/floware/tests/conftest.py
  • wavefront/server/background_jobs/workflow_job/workflow_job/main.py
  • wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py
  • wavefront/server/modules/tools_module/tools_module/message_processor/provider.py
  • wavefront/server/modules/tools_module/tools_module/tools_container.py

Comment on lines +15 to +36
@cloud_storage_router.get('/v1/storage/signed-url')
@inject
async def get_resource_presigned_url(
resource_url: str = Query(..., description='The cloud storage URL of the resource'),
expires_in: int = Query(
300, description='Expiry time in seconds for the presigned URL'
),
response_formatter: ResponseFormatter = Depends(
Provide[CommonContainer.response_formatter]
),
cloud_storage_manager: CloudStorageManager = Depends(
Provide[PluginsContainer.cloud_storage_manager]
),
):
try:
bucket_name, key = cloud_storage_manager.get_bucket_key(resource_url)
presigned_url = cloud_storage_manager.generate_presigned_url(
bucket_name=bucket_name,
key=key,
type='GET',
expiresIn=expires_in,
)

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for auth middleware and exemptions
echo "=== Searching for auth middleware and exemptions ==="
rg -n --type=py 'RequireAuthMiddleware|exclude|exempt|public|skip_auth|@protected|@auth' | head -50

echo ""
echo "=== Searching for the endpoint and related auth code ==="
rg -n --type=py 'get_resource_presigned_url' -B5 -A10 | head -100

echo ""
echo "=== Searching for CloudStorageManager implementation ==="
rg -n --type=py 'class CloudStorageManager' -A30 | head -100

echo ""
echo "=== Searching for get_bucket_key method ==="
rg -n --type=py 'def get_bucket_key' -B3 -A15

echo ""
echo "=== Searching for authorization check patterns ==="
rg -n --type=py 'check_auth|get_current_user|verify_access|authorize|is_admin' | head -30

Repository: rootflo/wavefront

Length of output: 28985


🏁 Script executed:

#!/bin/bash
# Search for cloud_storage_router registration and middleware setup
echo "=== Searching for cloud_storage_router registration and middleware ==="
rg -n --type=py 'cloud_storage_router|PluginsContainer' -B3 -A5 | head -80

echo ""
echo "=== Full cloud_storage_controller.py file ==="
cat wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py

echo ""
echo "=== Searching for current_user or authorization in plugins_module ==="
rg -n --type=py 'current_user|authorize|permission|scope' wavefront/server/modules/plugins_module/ | head -30

Repository: rootflo/wavefront

Length of output: 13600


Add authentication and authorization checks before generating presigned URLs.

The handler accepts a caller-supplied resource_url and signs it without verifying the caller's identity or access rights. Any request can generate presigned URLs for arbitrary buckets and keys, bypassing resource-level access controls.

Implement the same authorization pattern used elsewhere in the module (e.g., datasource_controller.py): call get_current_user(request) to extract user/role context, then validate that the caller has permission to access the specific bucket and key before signing the URL.
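A check-before-sign flow could look like the following sketch. Forbidden, ACL, and validate_access are stand-ins for the module's real auth utilities (get_current_user and whatever permission model the datasource controller uses), not actual APIs:

```python
class Forbidden(Exception):
    """Raised when the caller lacks rights to the requested object."""

# Toy ACL: (bucket, key) -> set of users allowed to read it
ACL = {('media-bucket', 'public/logo.png'): {'alice'}}

def validate_access(user, bucket, key):
    if user not in ACL.get((bucket, key), set()):
        raise Forbidden(f'{user} may not read s3://{bucket}/{key}')

def presign(user, bucket, key, expires_in=300):
    validate_access(user, bucket, key)  # authorize first...
    # ...then sign (stubbed URL; a real impl would call the storage SDK)
    return f'https://{bucket}.example/{key}?ttl={expires_in}&sig=stub'

print(presign('alice', 'media-bucket', 'public/logo.png'))
try:
    presign('mallory', 'media-bucket', 'public/logo.png')
except Forbidden:
    print('forbidden')
```

The key property is ordering: the permission check runs before any signature is produced, so a failed check can never leak a usable URL.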

🧰 Tools
🪛 Ruff (0.15.2)

[warning] 22-24: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


[warning] 25-27: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (3)
wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py (3)

51-55: ⚠️ Potential issue | 🟠 Major

Do not collapse all runtime failures into HTTP 400.

Catching Exception and returning 400 misclassifies internal/storage failures as client input errors. Restrict 400 to known validation errors and let unexpected errors propagate (or map to 500).

Proposed fix
-    except Exception as e:
+    except ValueError as e:
         return JSONResponse(
             status_code=status.HTTP_400_BAD_REQUEST,
             content=response_formatter.buildErrorResponse(str(e)),
         )
+    except Exception:
+        raise
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py`
around lines 51 - 55, The current except Exception block in
cloud_storage_controller.py collapses all failures into an HTTP 400; replace it
by catching only known client/validation errors (e.g., specific ValidationError
or ValueError types used by your service) and return JSONResponse with
status.HTTP_400_BAD_REQUEST using response_formatter.buildErrorResponse for
those, while allowing unexpected exceptions to propagate or be converted to a
500 (use logging and re-raise or return JSONResponse with
status.HTTP_500_INTERNAL_SERVER_ERROR for other exceptions); remove the broad
except Exception, update the handler around the
JSONResponse/response_formatter.buildErrorResponse usage, and ensure
JSONResponse is only used for true client errors.

23-44: ⚠️ Potential issue | 🔴 Critical

Add authN/authZ before signing caller-supplied storage URLs.

The endpoint signs arbitrary resource_url input without verifying caller identity/permissions. This can allow unauthorized access via presigned URLs.

Suggested direction
 @cloud_storage_router.get('/v1/storage/signed-url')
 @inject
 async def get_resource_presigned_url(
+    request: Request,
     resource_url: str = Query(..., description='The cloud storage URL of the resource'),
@@
 ):
     try:
+        user = get_current_user(request)
         bucket_name, key = cloud_storage_manager.get_bucket_key(resource_url)
+        cloud_storage_manager.validate_access(user, bucket_name, key)
         presigned_url = cloud_storage_manager.generate_presigned_url(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py`
around lines 23 - 44, get_resource_presigned_url currently signs caller-supplied
resource_url without authZ/authN; before calling
cloud_storage_manager.get_bucket_key and generate_presigned_url, require and
verify the caller identity (e.g., inject current user via Depends like
get_current_user) and perform an authorization check (call a permission method
such as cloud_storage_manager.has_access(user, bucket, key) or an ACL check) to
ensure the user is allowed to access that specific resource; if the check fails,
return a 403/Unauthorized response and do not generate a presigned URL; also
validate/sanitize resource_url to ensure it maps to allowed buckets/keys before
signing.

27-29: ⚠️ Potential issue | 🟡 Minor

Enforce a positive lower bound for expires_in.

expires_in still accepts zero/negative values. Please constrain it at the query layer to avoid invalid/instantly expired URLs.

Proposed fix
     expires_in: int = Query(
-        300, description='Expiry time in seconds for the presigned URL'
+        300,
+        ge=1,
+        description='Expiry time in seconds for the presigned URL',
     ),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py`
around lines 27 - 29, The expires_in query parameter currently allows zero or
negative values; update its FastAPI Query declaration to enforce a positive
lower bound (e.g., use ge=1 or gt=0) so presigned URLs cannot be created with
non-positive expiry; locate the expires_in parameter in
cloud_storage_controller.py (the Query(...) call for expires_in) and add the
numeric constraint to the Query arguments while leaving the default and
description intact.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py`:
- Around line 96-101: The except block currently groups json.JSONDecodeError and
ValueError together in cloud_storage_controller.py, causing
resource_url/bucket-key validation ValueErrors to be reported as parse errors;
split into two except handlers: one except json.JSONDecodeError as e that
returns the JSON parse response (using JSONResponse,
status.HTTP_422_UNPROCESSABLE_ENTITY, response_formatter.buildErrorResponse) and
a separate except ValueError as e that returns a clearer validation error
message like "Invalid resource_url or bucket/key: {str(e)}" (same status and
response_formatter call), so callers can distinguish parsing vs URL/key
validation failures.
- Around line 58-76: read_storage_file accepts user-supplied resource_url but
performs no authN/authZ; replicate the same authorization checks used by the
signed-url endpoint before calling
cloud_storage_manager.get_bucket_key/read_file. Locate the signed-url handler
(the endpoint that returns signed URLs) and copy the authorization flow (e.g.,
validate caller identity, verify permissions/ownership for the resolved
bucket/key or call the same cloud_storage_manager.authorize_* method), then
invoke that check in read_storage_file immediately after resolving
bucket_name,key and before calling cloud_storage_manager.read_file; ensure
failures return the same error/HTTP status used by the signed-url flow.
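The parse-vs-validation split from the first bullet above can be sketched as follows; note that json.JSONDecodeError subclasses ValueError, so its handler must come first (the classify helper and error strings are illustrative):

```python
import json

def classify(raw_body, resource_url):
    """Distinguish JSON parse failures from resource_url validation failures."""
    try:
        if '://' not in resource_url:
            raise ValueError('resource_url must include a scheme')
        return {'ok': json.loads(raw_body)}
    except json.JSONDecodeError as e:  # subclass of ValueError: catch first
        return {'error': f'Invalid JSON: {e}'}
    except ValueError as e:
        return {'error': f'Invalid resource_url or bucket/key: {e}'}

print(classify('{"a": 1}', 's3://bucket/key'))  # {'ok': {'a': 1}}
print(classify('{oops', 's3://bucket/key')['error'][:12])
print(classify('{}', 'no-scheme')['error'][:20])
```

If the handlers were reversed, every parse failure would be reported with the URL/key message, which is exactly the ambiguity the review flags.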

---

Duplicate comments:
In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py`:
- Around line 51-55: The current except Exception block in
cloud_storage_controller.py collapses all failures into an HTTP 400; replace it
by catching only known client/validation errors (e.g., specific ValidationError
or ValueError types used by your service) and return JSONResponse with
status.HTTP_400_BAD_REQUEST using response_formatter.buildErrorResponse for
those, while allowing unexpected exceptions to propagate or be converted to a
500 (use logging and re-raise or return JSONResponse with
status.HTTP_500_INTERNAL_SERVER_ERROR for other exceptions); remove the broad
except Exception, update the handler around the
JSONResponse/response_formatter.buildErrorResponse usage, and ensure
JSONResponse is only used for true client errors.
- Around line 23-44: get_resource_presigned_url currently signs caller-supplied
resource_url without authZ/authN; before calling
cloud_storage_manager.get_bucket_key and generate_presigned_url, require and
verify the caller identity (e.g., inject current user via Depends like
get_current_user) and perform an authorization check (call a permission method
such as cloud_storage_manager.has_access(user, bucket, key) or an ACL check) to
ensure the user is allowed to access that specific resource; if the check fails,
return a 403/Unauthorized response and do not generate a presigned URL; also
validate/sanitize resource_url to ensure it maps to allowed buckets/keys before
signing.
- Around line 27-29: The expires_in query parameter currently allows zero or
negative values; update its FastAPI Query declaration to enforce a positive
lower bound (e.g., use ge=1 or gt=0) so presigned URLs cannot be created with
non-positive expiry; locate the expires_in parameter in
cloud_storage_controller.py (the Query(...) call for expires_in) and add the
numeric constraint to the Query arguments while leaving the default and
description intact.
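The check that `Query(..., ge=1)` performs can be mirrored outside FastAPI; a minimal stdlib sketch of the same rule (`validate_expires_in` is an illustrative name, not the project's code):

```python
def validate_expires_in(value: int, minimum: int = 1) -> int:
    """Reject non-positive expiries, mirroring Query(..., ge=1)."""
    if value < minimum:
        raise ValueError(f"expires_in must be >= {minimum}, got {value}")
    return value
```

With `expires_in: int = Query(3600, ge=1, ...)` FastAPI performs the equivalent check itself and answers 422 before the handler body runs, so no manual validation is needed in the endpoint.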

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 8881c5fa-0de5-4ab5-ab78-7c7ec978ada7

📥 Commits

Reviewing files that changed from the base of the PR and between 65a1bbd and 1dad3f1.

📒 Files selected for processing (1)
  • wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py

Comment on lines +58 to +76
@cloud_storage_router.get('/v1/storage/read')
@inject
async def read_storage_file(
resource_url: str = Query(..., description='The cloud storage URL of the resource'),
type: StorageFileType = Query(StorageFileType.json, description='File type'),
projection: Optional[str] = Query(
None,
description='Comma-separated list of top-level fields to return from the parsed data',
),
response_formatter: ResponseFormatter = Depends(
Provide[CommonContainer.response_formatter]
),
cloud_storage_manager: CloudStorageManager = Depends(
Provide[PluginsContainer.cloud_storage_manager]
),
):
try:
bucket_name, key = cloud_storage_manager.get_bucket_key(resource_url)
file_buffer = cloud_storage_manager.read_file(bucket_name, key)

⚠️ Potential issue | 🔴 Critical

Apply the same authN/authZ checks on /v1/storage/read.

read_storage_file also accepts caller-supplied resource_url and reads it directly without access checks. This is an authorization gap equivalent to the signed-url endpoint.

Suggested direction
 async def read_storage_file(
+    request: Request,
     resource_url: str = Query(..., description='The cloud storage URL of the resource'),
@@
 ):
     try:
+        user = get_current_user(request)
         bucket_name, key = cloud_storage_manager.get_bucket_key(resource_url)
+        cloud_storage_manager.validate_access(user, bucket_name, key)
         file_buffer = cloud_storage_manager.read_file(bucket_name, key)
🧰 Tools
🪛 Ruff (0.15.2)

[warning] 62-62: Do not perform function call Query in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


[warning] 67-69: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


[warning] 70-72: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)
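For context on the Ruff findings above: B008 fires because Python evaluates default-argument expressions once, at function definition time, not on each call. FastAPI's `Query`/`Depends` idiom deliberately relies on that behavior, so the warning is routinely suppressed for such endpoints. A stdlib illustration of the underlying pitfall (`next_id`, `make_bad`, `make_good` are illustrative names):

```python
counter = {"n": 0}

def next_id() -> int:
    counter["n"] += 1
    return counter["n"]

def make_bad(ident=next_id()) -> int:
    # next_id() ran exactly once, when the def statement executed;
    # every call without an argument reuses that single value.
    return ident

def make_good(ident=None) -> int:
    # Deferring the call into the body yields a fresh value per call.
    return next_id() if ident is None else ident
```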

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py`
around lines 58 - 76, read_storage_file accepts user-supplied resource_url but
performs no authN/authZ; replicate the same authorization checks used by the
signed-url endpoint before calling
cloud_storage_manager.get_bucket_key/read_file. Locate the signed-url handler
(the endpoint that returns signed URLs) and copy the authorization flow (e.g.,
validate caller identity, verify permissions/ownership for the resolved
bucket/key or call the same cloud_storage_manager.authorize_* method), then
invoke that check in read_storage_file immediately after resolving
bucket_name,key and before calling cloud_storage_manager.read_file; ensure
failures return the same error/HTTP status used by the signed-url flow.
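A minimal sketch of the kind of check the prompt describes, assuming a per-object allow list; `CloudStorageACL`, `validate_access`, and `AccessDenied` are hypothetical names, not existing APIs in this codebase:

```python
class AccessDenied(Exception):
    """The caller is not permitted to read the resolved bucket/key."""

class CloudStorageACL:
    def __init__(self, grants):
        # grants maps (bucket, key) -> set of user ids allowed to read
        self._grants = grants

    def validate_access(self, user_id, bucket, key):
        allowed = self._grants.get((bucket, key), set())
        if user_id not in allowed:
            raise AccessDenied(
                f"user {user_id!r} may not read s3://{bucket}/{key}"
            )
```

The handler would invoke such a check immediately after `get_bucket_key` and before `read_file`, translating the denial into the same 403 response the signed-url flow returns.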

Comment on lines +96 to +101
except (json.JSONDecodeError, ValueError) as e:
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=response_formatter.buildErrorResponse(
f'Failed to parse file as {type.value}: {str(e)}'
),

⚠️ Potential issue | 🟠 Major

Separate URL/key validation errors from JSON parse errors.

ValueError is grouped with JSONDecodeError and reported as “Failed to parse file as json”, which is misleading when the actual failure is invalid resource_url/bucket-key parsing.

Proposed fix (note that json.JSONDecodeError subclasses ValueError, so the JSONDecodeError handler must come first or it is unreachable)
-    except (json.JSONDecodeError, ValueError) as e:
+    except json.JSONDecodeError as e:
         return JSONResponse(
             status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
             content=response_formatter.buildErrorResponse(
                 f'Failed to parse file as {type.value}: {str(e)}'
             ),
         )
+    except ValueError as e:
+        return JSONResponse(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            content=response_formatter.buildErrorResponse(str(e)),
+        )
🧰 Tools
🪛 Ruff (0.15.2)

[warning] 100-100: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/cloud_storage_controller.py`
around lines 96 - 101, The except block currently groups json.JSONDecodeError
and ValueError together in cloud_storage_controller.py, causing
resource_url/bucket-key validation ValueErrors to be reported as parse errors;
split into two except handlers: one except json.JSONDecodeError as e that
returns the JSON parse response (using JSONResponse,
status.HTTP_422_UNPROCESSABLE_ENTITY, response_formatter.buildErrorResponse) and
a separate except ValueError as e that returns a clearer validation error
message like "Invalid resource_url or bucket/key: {str(e)}" (same status and
response_formatter call), so callers can distinguish parsing vs URL/key
validation failures.
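Handler order matters here because json.JSONDecodeError is a subclass of ValueError: the more specific handler has to come first or it never runs. A standalone sketch of the resulting control flow (`classify` and the `'bucket'` field are illustrative, not the controller's code):

```python
import json

def classify(raw: str):
    """Parse raw JSON, then validate it; report which stage failed."""
    try:
        data = json.loads(raw)
        if "bucket" not in data:
            raise ValueError("missing 'bucket' field")
        return ("ok", data)
    except json.JSONDecodeError as e:
        # Checked before ValueError, which it subclasses.
        return ("parse_error", str(e))
    except ValueError as e:
        return ("validation_error", str(e))
```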

vizsatiz
vizsatiz previously approved these changes Mar 7, 2026
@vizsatiz vizsatiz merged commit b1ab5c3 into develop Mar 7, 2026
9 checks passed
@vizsatiz vizsatiz deleted the feat/redshift-query-update branch March 7, 2026 10:42