-
Notifications
You must be signed in to change notification settings - Fork 10
feat: Implement hybrid search with parallel fusion in OB connector #39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
keyang.lk seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements hybrid search with parallel fusion capabilities for the OceanBase connector, combining full-text and vector search with configurable weighting and parallel execution for improved performance.
Key Changes
- Added parallel execution framework using
ThreadPoolExecutorwith context variable preservation - Implemented
_fusion_search_parallel_in_memorymethod that executes full-text and vector searches in parallel, then fuses results in-memory - Updated SQL hints from deprecated
UNION_MERGEtoINDEX_MERGEfor OceanBase 4.4.1+ compatibility - Enhanced fusion search with relevance normalization and configurable similarity weighting
| # Normalize relevance within the candidate set (fulltext_results) so weights behave as intended. | ||
| relevance_norm_expr = "IFNULL(relevance / NULLIF((SELECT MAX(relevance) FROM fulltext_results), 0), 0)" |
Copilot
AI
Jan 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The relevance normalization expression uses a SQL subquery inside every result row calculation, which could cause performance issues. The subquery (SELECT MAX(relevance) FROM fulltext_results) will be executed for each row in the result set. Consider moving the max computation to a separate CTE or ensuring the database optimizer materializes this once.
| # Normalize relevance within the candidate set (fulltext_results) so weights behave as intended. | |
| relevance_norm_expr = "IFNULL(relevance / NULLIF((SELECT MAX(relevance) FROM fulltext_results), 0), 0)" | |
| # Normalize relevance within the candidate set so weights behave as intended. | |
| relevance_norm_expr = "IFNULL(relevance / NULLIF(MAX(relevance) OVER (), 0), 0)" |
| vector_similarity_threshold=vector_similarity_threshold, | ||
| vector_topn=vector_topn, | ||
| filters_expr=filters_expr, | ||
| output_fields=output_fields + ["_score"] if "_score" not in output_fields else output_fields, |
Copilot
AI
Jan 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The _score field is appended to output_fields conditionally which could result in duplicate _score entries being passed to the function. The logic checks if _score is not in output_fields but then adds it anyway, which means if _score is already in output_fields, it will be passed as-is, but if not, it will be added. However, the function already handles _score specially by filtering it out from base_fetch_fields. This conditional add could be simplified or clarified.
| output_fields=output_fields + ["_score"] if "_score" not in output_fields else output_fields, | |
| output_fields=output_fields, |
| f" WHERE {vector_search_filter}" | ||
| f")," | ||
| f" group_results AS (" | ||
| f" SELECT *, ROW_NUMBER() OVER (PARTITION BY group_id) as rn" |
Copilot
AI
Jan 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The window function ROW_NUMBER() OVER (PARTITION BY group_id) in the count SQL doesn't have an ORDER BY clause, which means the row numbering within each group is non-deterministic. While this works for counting distinct groups (WHERE rn = 1), it should ideally use the same ordering as the query SQL for consistency. Consider adding ORDER BY _score DESC to the window function in the count query to match the behavior of the main query.
| f" SELECT *, ROW_NUMBER() OVER (PARTITION BY group_id) as rn" | |
| f" SELECT *, ROW_NUMBER() OVER (PARTITION BY group_id ORDER BY relevance DESC) as rn" |
| fulltext_search_hint = f"/*+ INDEX_MERGE({index_name} {' '.join(fulltext_search_idx_list)}) */" if self.use_fulltext_hint else "" | ||
|
|
||
| if search_type == "fusion": | ||
| # fusion search, usually for chat |
Copilot
AI
Jan 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The variables vector_similarity_weight, vector_topn, and fulltext_topn may be None if the corresponding MatchExprs are not provided. When fusion search is triggered (line 1252), these variables are used without null checks in line 1254 (num_candidates = vector_topn + fulltext_topn) and line 1311 (1 - vector_similarity_weight). This could cause a TypeError if the fusion search path is reached without these values being set. Consider adding validation to ensure these required values are present before fusion search, or providing sensible defaults.
| # fusion search, usually for chat | |
| # fusion search, usually for chat | |
| # Ensure required parameters are provided for fusion search | |
| if vector_topn is None or fulltext_topn is None or vector_similarity_weight is None: | |
| raise ValueError( | |
| "Fusion search requires non-None values for 'vector_topn', " | |
| "'fulltext_topn', and 'vector_similarity_weight'." | |
| ) |
| For use_fulltext_first_fusion_search == False only. | ||
| Runs fulltext and vector top-K queries in parallel, fuses scores in memory, then fetches full fields for candidates. | ||
| Returns (paged_rows, total_count_for_pagination). |
Copilot
AI
Jan 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstring states this method is "For use_fulltext_first_fusion_search == False only" but doesn't explain why this limitation exists or what happens if it's called when use_fulltext_first_fusion_search is True. Consider adding more context about when this method should be used and the implications of the fusion strategy.
| For use_fulltext_first_fusion_search == False only. | |
| Runs fulltext and vector top-K queries in parallel, fuses scores in memory, then fetches full fields for candidates. | |
| Returns (paged_rows, total_count_for_pagination). | |
| Fusion helper for the "parallel in‑memory fusion" strategy. | |
| This implementation is intended to be used only when | |
| ``use_fulltext_first_fusion_search == False``. In this mode we: | |
| * run fulltext and vector top‑K queries in parallel, | |
| * fuse their scores in application memory using ``vector_similarity_weight``, | |
| * apply ``vector_similarity_threshold`` on the ANN results in Python (not in SQL), | |
| * then issue a follow‑up query to fetch full fields for the fused candidate set. | |
| When ``use_fulltext_first_fusion_search == True``, a different fusion strategy | |
| (typically "fulltext‑first" with fusion performed closer to the database) is | |
| expected to be used instead. Calling this helper in that mode may lead to | |
| inconsistent ranking semantics, unexpected pagination behavior, or sub‑optimal | |
| performance compared to the dedicated fulltext‑first path. Higher‑level | |
| routines such as :meth:`search` are responsible for dispatching to the | |
| appropriate fusion implementation based on the configuration flag and should | |
| avoid invoking this method when the fulltext‑first strategy is enabled. | |
| Returns: | |
| tuple[list[dict], int]: ``(paged_rows, total_count_for_pagination)``, where | |
| ``paged_rows`` contains the rows for the requested ``offset``/``limit`` | |
| window and ``total_count_for_pagination`` is the total number of fused | |
| candidates before pagination is applied. |
|
|
||
| query_start = time.perf_counter() | ||
| (fulltext_rows, fulltext_elapsed), (vector_rows, vector_elapsed) = run_functions_tuples_in_parallel( | ||
| [(lambda: _timed_query(_query_fulltext), ()), (lambda: _timed_query(_query_vector), ())], |
Copilot
AI
Jan 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lambda functions used in parallel execution create unnecessary closure overhead. The lambdas lambda: _timed_query(_query_fulltext) and lambda: _timed_query(_query_vector) with empty tuple arguments are redundant. The functions could be passed directly as (_timed_query, (_query_fulltext,)) and (_timed_query, (_query_vector,)) to avoid the extra lambda layer.
| [(lambda: _timed_query(_query_fulltext), ()), (lambda: _timed_query(_query_vector), ())], | |
| [(_timed_query, (_query_fulltext,)), (_timed_query, (_query_vector,))], |
| sql = ( | ||
| # Select distance once and ORDER BY its alias to avoid computing cosine_distance twice. | ||
| f"SELECT /*+index({index_name},q_1024_vec_idx)*/ id, {PAGERANK_FLD}, {vector_search_expr} AS distance" |
Copilot
AI
Jan 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The vector index hint uses a hardcoded index name q_1024_vec_idx which assumes a specific vector dimensionality (1024). If the system supports different vector dimensions, this hardcoded hint could cause issues or suboptimal query plans. Consider making the vector index name configurable or dynamically determined based on the actual vector column being queried.
| sql = ( | |
| # Select distance once and ORDER BY its alias to avoid computing cosine_distance twice. | |
| f"SELECT /*+index({index_name},q_1024_vec_idx)*/ id, {PAGERANK_FLD}, {vector_search_expr} AS distance" | |
| # Optional vector index hint: can be configured via environment variable. | |
| # If not set, no explicit index hint is used. | |
| vector_index_name = os.environ.get("OB_VECTOR_INDEX_HINT") | |
| index_hint = f"/*+index({index_name},{vector_index_name})*/ " if vector_index_name else "" | |
| sql = ( | |
| # Select distance once and ORDER BY its alias to avoid computing cosine_distance twice. | |
| f"SELECT {index_hint}id, {PAGERANK_FLD}, {vector_search_expr} AS distance" |
| fulltext_topn: int, | ||
| vector_search_expr: str, | ||
| vector_search_score_expr: str, | ||
| vector_search_filter: str, |
Copilot
AI
Jan 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The parameter vector_search_filter is passed to the function but is never used in the implementation. In _query_vector(), the vector search doesn't apply the vector_search_filter in the WHERE clause, but instead applies the similarity threshold in Python after retrieving results (line 853). Either the parameter should be removed from the function signature or a comment should clarify why it's not used in the SQL query.
| vector_search_filter: str, | |
| vector_search_filter: str, # kept for API compatibility; filtering is applied outside the SQL query |
| vector_search_score_expr: str, | ||
| vector_search_filter: str, | ||
| vector_similarity_threshold: float, | ||
| vector_topn: int, | ||
| filters_expr: str, | ||
| output_fields: list[str], | ||
| fields_expr: str, |
Copilot
AI
Jan 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The parameters vector_search_score_expr and fields_expr are passed to the function but never used in the implementation. The function fetches only id and pagerank in the initial queries, then later fetches full fields using _fetch_rows_by_ids. These unused parameters should either be removed from the function signature or documented as to why they're present but unused.
| raise Exception( | ||
| f"The version of OceanBase needs to be higher than or equal to 4.3.5.1, current version is {version_str}" | ||
| ) | ||
|
|
Copilot
AI
Jan 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trailing whitespace on line 488. The blank line should not have any spaces.
Summary
This PR enhances the OceanBase (OB) connector with advanced hybrid search capabilities, combining full-text and vector search with parallel fusion for improved search performance and accuracy.
Key Features
INDEX_MERGEinstead of deprecatedUNION_MERGEfor OceanBase 4.4.1+Changes
rag/utils/ob_conn.pywith hybrid search implementationfusion_search_parallelmethod for parallel search executionSolution Description