@@ -523,8 +523,10 @@ fn read_source_ordinals(
523523}
524524
525525/// Try to build a parameterized WHERE clause for the target read query.
526- /// Single-column key sets: WHERE t."col" = ANY($N::text::type[])
527- /// Multi-column key sets: WHERE (t."c1", t."c2") IN (SELECT c1, c2 FROM unnest($N1::text::type1[], $N2::text::type2[]) AS u(c1, c2))
526+ /// Uses = ANY() per column, enabling Bitmap Index Scan instead of Nested Loop.
527+ /// Single-column: WHERE t."col" = ANY($N::text::type[])
528+ /// Multi-column: WHERE (t."c1" = ANY($1::text::type1[]) AND t."c2" = ANY($2::text::type2[]))
529+ /// Nullable: WHERE (t."col" IS NOT NULL AND t."c1" = ANY($1) AND t."col" = ANY($2)) OR ...
528530/// Returns None only if any column type is unknown.
529531/// Returns Some(("", [])) for full-scan modes (no WHERE clause needed).
530532fn try_build_parameterized_filter (
@@ -583,16 +585,15 @@ fn try_build_parameterized_filter(
583585 return Some ( ( String :: new ( ) , vec ! [ ] ) ) ; // No filter needed
584586 }
585587
586- // Build parameterized WHERE clause — handles both single and multi- column key sets .
588+ // Build parameterized WHERE clause using = ANY() per column.
587589 // For key sets with nullable columns, uses partition-by-NULL optimization:
588- // generates separate EXISTS per nullable column with IS NOT NULL + = joins,
589- // enabling Hash/Merge joins instead of Nested Loop with IS NOT DISTINCT FROM.
590+ // generates separate (IS NOT NULL AND col = ANY(...)) per nullable column.
590591 let mut params = Vec :: new ( ) ;
591592 let mut where_parts = Vec :: new ( ) ;
592593 let mut param_index = 1usize ;
593594 let mut key_set_id = 0usize ;
594595
595- for ( _original_idx , ( cols, is_identity) ) in filter_key_sets. iter ( ) . enumerate ( ) {
596+ for ( cols, is_identity) in & filter_key_sets {
596597 // Verify all columns have known types
597598 for col in cols {
598599 if !target_col_types. contains_key ( col) {
@@ -606,9 +607,9 @@ fn try_build_parameterized_filter(
606607 . collect ( ) ;
607608
608609 if nullable_cols. is_empty ( ) {
609- // All columns are NOT NULL — use existing approach
610- if cols . len ( ) == 1 {
611- let col = & cols[ 0 ] ;
610+ // All columns NOT NULL — = ANY() per column
611+ let mut conditions = Vec :: new ( ) ;
612+ for col in cols {
612613 let pg_type = target_col_types. get ( col) . unwrap ( ) ;
613614 params. push ( FilterParam {
614615 col_name : col. clone ( ) ,
@@ -618,132 +619,55 @@ fn try_build_parameterized_filter(
618619 key_set_id,
619620 partition_not_null_cols : vec ! [ ] ,
620621 } ) ;
621- where_parts . push ( format ! (
622- "EXISTS (SELECT 1 FROM unnest (${idx}::text::{typ}[]) AS _u(v) WHERE t.{col} IS NOT DISTINCT FROM _u.v )" ,
622+ conditions . push ( format ! (
623+ "t.{col} = ANY (${idx}::text::{typ}[])" ,
623624 col = qi( col) ,
624625 idx = param_index,
625626 typ = pg_type,
626627 ) ) ;
627628 param_index += 1 ;
629+ }
630+ if conditions. len ( ) == 1 {
631+ where_parts. push ( conditions. into_iter ( ) . next ( ) . unwrap ( ) ) ;
628632 } else {
629- // Multi-column, all NOT NULL: EXISTS with IS NOT DISTINCT FROM
630- let mut unnest_calls = Vec :: new ( ) ;
631- let mut u_col_names = Vec :: new ( ) ;
632- let mut conditions = Vec :: new ( ) ;
633-
634- for ( ci, col) in cols. iter ( ) . enumerate ( ) {
635- let pg_type = target_col_types. get ( col) . unwrap ( ) ;
636- params. push ( FilterParam {
637- col_name : col. clone ( ) ,
638- pg_type : pg_type. clone ( ) ,
639- param_index,
640- is_identity : * is_identity,
641- key_set_id,
642- partition_not_null_cols : vec ! [ ] ,
643- } ) ;
644- let u_alias = format ! ( "_c{}" , ci) ;
645- unnest_calls. push ( format ! (
646- "unnest(${idx}::text::{typ}[])" ,
647- idx = param_index,
648- typ = pg_type,
649- ) ) ;
650- u_col_names. push ( u_alias. clone ( ) ) ;
651- conditions. push ( format ! (
652- "t.{col} IS NOT DISTINCT FROM _u.{u_alias}" ,
653- col = qi( col) ,
654- u_alias = u_alias,
655- ) ) ;
656- param_index += 1 ;
657- }
658-
659- where_parts. push ( format ! (
660- "EXISTS (SELECT 1 FROM ROWS FROM({fns}) AS _u({cols}) WHERE {conds})" ,
661- fns = unnest_calls. join( ", " ) ,
662- cols = u_col_names. join( ", " ) ,
663- conds = conditions. join( " AND " ) ,
664- ) ) ;
633+ where_parts. push ( format ! ( "({})" , conditions. join( " AND " ) ) ) ;
665634 }
666635 key_set_id += 1 ;
667636 } else {
668- // Has nullable columns — partition-by-NULL optimization.
669- // For each nullable column, generate a separate EXISTS clause with:
670- // - IS NOT NULL filter on target column
671- // - = equality joins (Hash/Merge join eligible)
672- // - Source params filtered to rows where that column IS NOT NULL
673- // Mirrors PL/pgSQL partition-by-NULL pattern (src/2k_temporal_merge_plan.sql:1183-1254).
637+ // Has nullable columns — partition-by-NULL with = ANY() per column.
674638 let notnull_cols: Vec < & String > = cols. iter ( )
675639 . filter ( |c| target_col_notnull. get ( * c) . copied ( ) . unwrap_or ( true ) )
676640 . collect ( ) ;
677641
678642 for nullable_col in & nullable_cols {
679- // Partition columns = NOT NULL cols + this nullable col
680643 let partition_cols: Vec < & String > = notnull_cols. iter ( )
681644 . copied ( )
682645 . chain ( std:: iter:: once ( * nullable_col) )
683646 . collect ( ) ;
684647
685- if partition_cols. len ( ) == 1 {
686- // Single-column partition (nullable col only, no NOT NULL cols)
687- let col = partition_cols[ 0 ] ;
688- let pg_type = target_col_types. get ( col) . unwrap ( ) ;
648+ let mut conditions = Vec :: new ( ) ;
649+ conditions. push ( format ! ( "t.{} IS NOT NULL" , qi( nullable_col) ) ) ;
650+
651+ for col in & partition_cols {
652+ let pg_type = target_col_types. get ( * col) . unwrap ( ) ;
689653 params. push ( FilterParam {
690- col_name : col. clone ( ) ,
654+ col_name : ( * col) . clone ( ) ,
691655 pg_type : pg_type. clone ( ) ,
692656 param_index,
693657 is_identity : * is_identity,
694658 key_set_id,
695659 partition_not_null_cols : vec ! [ ( * nullable_col) . clone( ) ] ,
696660 } ) ;
697- where_parts. push ( format ! (
698- "EXISTS (SELECT 1 FROM unnest(${idx}::text::{typ}[]) AS _u(v) \
699- WHERE t.{col} IS NOT NULL AND t.{col} = _u.v)",
661+ conditions. push ( format ! (
662+ "t.{col} = ANY(${idx}::text::{typ}[])" ,
700663 col = qi( col) ,
701664 idx = param_index,
702665 typ = pg_type,
703666 ) ) ;
704667 param_index += 1 ;
705- } else {
706- // Multi-column partition: ROWS FROM with = equality joins
707- let mut unnest_calls = Vec :: new ( ) ;
708- let mut u_col_names = Vec :: new ( ) ;
709- let mut conditions = Vec :: new ( ) ;
710-
711- // Add IS NOT NULL condition for the nullable column on target
712- conditions. push ( format ! ( "t.{} IS NOT NULL" , qi( nullable_col) ) ) ;
713-
714- for ( ci, col) in partition_cols. iter ( ) . enumerate ( ) {
715- let pg_type = target_col_types. get ( * col) . unwrap ( ) ;
716- params. push ( FilterParam {
717- col_name : ( * col) . clone ( ) ,
718- pg_type : pg_type. clone ( ) ,
719- param_index,
720- is_identity : * is_identity,
721- key_set_id,
722- partition_not_null_cols : vec ! [ ( * nullable_col) . clone( ) ] ,
723- } ) ;
724- let u_alias = format ! ( "_c{}" , ci) ;
725- unnest_calls. push ( format ! (
726- "unnest(${idx}::text::{typ}[])" ,
727- idx = param_index,
728- typ = pg_type,
729- ) ) ;
730- u_col_names. push ( u_alias. clone ( ) ) ;
731- // Use = for equality (all columns in partition are NOT NULL by construction)
732- conditions. push ( format ! (
733- "t.{col} = _u.{u_alias}" ,
734- col = qi( col) ,
735- u_alias = u_alias,
736- ) ) ;
737- param_index += 1 ;
738- }
739-
740- where_parts. push ( format ! (
741- "EXISTS (SELECT 1 FROM ROWS FROM({fns}) AS _u({cols}) WHERE {conds})" ,
742- fns = unnest_calls. join( ", " ) ,
743- cols = u_col_names. join( ", " ) ,
744- conds = conditions. join( " AND " ) ,
745- ) ) ;
746668 }
669+
670+ where_parts. push ( format ! ( "({})" , conditions. join( " AND " ) ) ) ;
747671 key_set_id += 1 ;
748672 }
749673 }
@@ -756,108 +680,54 @@ fn try_build_parameterized_filter(
756680/// Extract distinct filter values from source rows for parameterized target read.
757681/// Returns one PostgreSQL array literal string per FilterParam.
758682///
759- /// For single- column key sets: extracts distinct values for that column .
760- /// For multi-column key sets: extracts distinct tuples as parallel arrays,
761- /// ensuring the arrays stay aligned (same index = same tuple) .
683+ /// Each column collects its distinct values independently (no tuple pairing) .
684+ /// This matches the = ANY() per-column WHERE clause: the cross-product may
685+ /// over-fetch a few target rows, but the sweep handles extras at zero cost .
762686pub fn extract_filter_values (
763687 source_rows : & [ SourceRow ] ,
764688 filter_params : & [ FilterParam ] ,
765689) -> Vec < String > {
766- // Group params by key_set_id to handle multi-column correctly
767- let mut key_set_ids: Vec < usize > = filter_params. iter ( ) . map ( |p| p. key_set_id ) . collect ( ) ;
768- key_set_ids. sort_unstable ( ) ;
769- key_set_ids. dedup ( ) ;
770-
771- // For each key_set, extract distinct tuples (Option<String> to support NULLs)
772- let mut key_set_values: std:: collections:: HashMap < usize , Vec < Vec < Option < String > > > > =
773- std:: collections:: HashMap :: new ( ) ;
774-
775- for & ks_id in & key_set_ids {
776- let ks_params: Vec < & FilterParam > = filter_params
777- . iter ( )
778- . filter ( |p| p. key_set_id == ks_id)
779- . collect ( ) ;
780-
781- // Get partition NOT NULL filter columns (same for all params in a key_set)
782- let partition_not_null = ks_params. first ( )
783- . map ( |p| & p. partition_not_null_cols )
784- . cloned ( )
785- . unwrap_or_default ( ) ;
786-
787- let mut seen = std:: collections:: HashSet :: new ( ) ;
788- let n_cols = ks_params. len ( ) ;
789- let mut columns: Vec < Vec < Option < String > > > = vec ! [ Vec :: new( ) ; n_cols] ;
790-
791- for row in source_rows {
792- // Partition-by-NULL filter: skip rows where required columns are NULL
793- if !partition_not_null. is_empty ( ) {
794- let passes = partition_not_null. iter ( ) . all ( |col| {
795- let val = row. identity_keys . get ( col)
796- . or_else ( || row. lookup_keys . get ( col) ) ;
797- val. map_or ( false , |v| !v. is_null ( ) )
798- } ) ;
799- if !passes {
800- continue ;
690+ // Each FilterParam gets independent per-column distinct values.
691+ filter_params
692+ . iter ( )
693+ . map ( |param| {
694+ let mut distinct = std:: collections:: BTreeSet :: new ( ) ;
695+
696+ for row in source_rows {
697+ // Partition-by-NULL filter: skip rows where required columns are NULL
698+ if !param. partition_not_null_cols . is_empty ( ) {
699+ let passes = param. partition_not_null_cols . iter ( ) . all ( |col| {
700+ let val = row. identity_keys . get ( col)
701+ . or_else ( || row. lookup_keys . get ( col) ) ;
702+ val. map_or ( false , |v| !v. is_null ( ) )
703+ } ) ;
704+ if !passes {
705+ continue ;
706+ }
801707 }
802- }
803-
804- let mut tuple: Vec < Option < String > > = Vec :: with_capacity ( n_cols) ;
805708
806- for param in & ks_params {
807- // Check both maps: a column might be in identity_keys or lookup_keys
808- // depending on how PL/pgSQL wrapper classified it. For filter purposes,
809- // we just need the value from whichever map has it.
810- let val_opt = row
811- . identity_keys
812- . get ( & param. col_name )
709+ let val_opt = row. identity_keys . get ( & param. col_name )
813710 . or_else ( || row. lookup_keys . get ( & param. col_name ) ) ;
814711
815712 if let Some ( val) = val_opt {
816- if val. is_null ( ) {
817- tuple. push ( None ) ; // NULL — included for IS NOT DISTINCT FROM matching
818- } else {
713+ if !val. is_null ( ) {
819714 let text_val = match val {
820715 serde_json:: Value :: String ( s) => s. clone ( ) ,
821716 serde_json:: Value :: Number ( n) => n. to_string ( ) ,
822717 serde_json:: Value :: Bool ( b) => b. to_string ( ) ,
823718 _ => val. to_string ( ) ,
824719 } ;
825- tuple . push ( Some ( text_val) ) ;
720+ distinct . insert ( text_val) ;
826721 }
827- } else {
828- tuple. push ( None ) ; // Missing column treated as NULL
829722 }
830723 }
831724
832- // Deduplicate by tuple (represent NULL as sentinel for dedup key)
833- let tuple_key: Vec < String > = tuple. iter ( ) . map ( |v| match v {
834- Some ( s) => s. clone ( ) ,
835- None => "\x00 NULL\x00 " . to_string ( ) ,
836- } ) . collect ( ) ;
837- if seen. insert ( tuple_key) {
838- for ( i, val) in tuple. into_iter ( ) . enumerate ( ) {
839- columns[ i] . push ( val) ;
840- }
841- }
842- }
843-
844- key_set_values. insert ( ks_id, columns) ;
845- }
846-
847- // Map back to per-FilterParam array literals (in order)
848- filter_params
849- . iter ( )
850- . map ( |param| {
851- let columns = key_set_values. get ( & param. key_set_id ) . unwrap ( ) ;
852- let ks_params: Vec < & FilterParam > = filter_params
853- . iter ( )
854- . filter ( |p| p. key_set_id == param. key_set_id )
725+ // Convert to PostgreSQL array literal (no NULLs — = ANY() doesn't match NULLs)
726+ let values: Vec < Option < String > > = distinct
727+ . into_iter ( )
728+ . map ( Some )
855729 . collect ( ) ;
856- let col_idx = ks_params
857- . iter ( )
858- . position ( |p| p. param_index == param. param_index )
859- . unwrap ( ) ;
860- format_pg_array_literal ( & columns[ col_idx] )
730+ format_pg_array_literal ( & values)
861731 } )
862732 . collect ( )
863733}
0 commit comments