Skip to content

Commit 89b6d02

Browse files
zhyassdantengsky
andauthored
fix: query error when stream_consume_batch_size_hint is not 0 (#19074)
* fix: query error when stream_consume_batch_size_hint is not 0 * fix * add logic test for issue #19073 --------- Co-authored-by: dantengsky <dantengsky@gmail.com>
1 parent 4618b41 commit 89b6d02

10 files changed

+119
-72
lines changed

src/query/service/src/sessions/query_ctx.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1457,34 +1457,32 @@ impl TableContext for QueryContext {
14571457
table: &str,
14581458
max_batch_size: Option<u64>,
14591459
) -> Result<Arc<dyn Table>> {
1460-
let max_batch_size = {
1461-
match max_batch_size {
1462-
Some(v) => {
1463-
// use the batch size specified in the statement
1460+
let final_batch_size = match max_batch_size {
1461+
Some(v) => {
1462+
// use the batch size specified in the statement
1463+
Some(v)
1464+
}
1465+
None => {
1466+
if let Some(v) = self.get_settings().get_stream_consume_batch_size_hint()? {
1467+
info!("Overriding stream max_batch_size with setting value: {}", v);
14641468
Some(v)
1465-
}
1466-
None => {
1467-
if let Some(v) = self.get_settings().get_stream_consume_batch_size_hint()? {
1468-
info!("Overriding stream max_batch_size with setting value: {}", v);
1469-
Some(v)
1470-
} else {
1471-
None
1472-
}
1469+
} else {
1470+
None
14731471
}
14741472
}
14751473
};
14761474

14771475
let table = self
1478-
.get_table_from_shared(catalog, database, table, max_batch_size)
1476+
.get_table_from_shared(catalog, database, table, final_batch_size)
14791477
.await?;
14801478
if table.is_stream() {
14811479
let stream = StreamTable::try_from_table(table.as_ref())?;
14821480
let actual_batch_limit = stream.max_batch_size();
1483-
if actual_batch_limit != max_batch_size {
1481+
if actual_batch_limit != final_batch_size {
14841482
return Err(ErrorCode::StorageUnsupported(
14851483
format!(
14861484
"Stream batch size must be consistent within transaction: actual={:?}, requested={:?}",
1487-
actual_batch_limit, max_batch_size
1485+
actual_batch_limit, final_batch_size
14881486
)
14891487
));
14901488
}

tests/sqllogictests/suites/ee/06_ee_stream/06_0000_stream.test

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ drop stream s3
172172
statement ok
173173
drop table t2 all
174174

175-
query TTTTT
175+
query TTTTTT
176176
select catalog, database, name, mode, table_name, comment from system.streams where database='test_stream' order by name
177177
----
178178
default test_stream s append_only test_stream.t (empty)
@@ -194,7 +194,7 @@ drop stream t1
194194
statement ok
195195
drop table t1 all
196196

197-
query T
197+
query TTTTAA
198198
show columns from s2
199199
----
200200
a int YES (empty) NULL NULL
@@ -210,12 +210,12 @@ s2 Change tracking is not enabled on table 'test_stream'.'t'
210210
statement ok
211211
drop table t all
212212

213-
query TTT
213+
query TT
214214
select name, invalid_reason from system.streams where database='test_stream' order by name
215215
----
216216
s2 Unknown table 't'
217217

218-
query T
218+
query TTTTAA
219219
show columns from s2
220220
----
221221

@@ -535,7 +535,7 @@ statement ok
535535
create stream s8_1 on table t7
536536

537537
# test merge into insert only
538-
query T
538+
query I
539539
merge into t7 using (select a, b from t8) as s on t7.a=s.a when not matched then insert *
540540
----
541541
1
@@ -546,7 +546,7 @@ select a, b, change$action, change$is_update from s8_1 order by a, b
546546
3 3 INSERT 0
547547

548548
# test merge into matched only
549-
query TT
549+
query II
550550
merge into t7 using (select a, b+1 as b from t8) as s on t7.a=s.a when matched and s.a=2 then update * when matched and s.a=4 then delete
551551
----
552552
1 1
@@ -572,7 +572,7 @@ statement ok
572572
create stream s9_1 on table t7
573573

574574
# test merge into full operation
575-
query TTT
575+
query III
576576
merge into t7 using t8 on t7.a=t8.a when matched and t8.a=2 then update set t7.b=t8.b when matched and t8.a=3 then delete when not matched then insert (a,b) values(t8.a,t8.b)
577577
----
578578
1 1 1
@@ -673,7 +673,7 @@ merge into t9 a using (select 10,'a') b(id,c1) on a.id=b.id when matched then up
673673
----
674674
1 0
675675

676-
query T
676+
query ITTB
677677
select id, c1, change$action, change$is_update from stream_t9;
678678
----
679679
10 a INSERT 0
@@ -693,7 +693,7 @@ merge into t9 a using (select 10,'a') b(id,c1) on a.id=b.id when matched then up
693693
----
694694
0 1
695695

696-
query T
696+
query ITTTB
697697
select * from stream_t9;
698698
----
699699

@@ -810,15 +810,15 @@ insert into t1 values(1,1),(2,3),(3,3);
810810
statement ok
811811
explain merge into t1 using t2 on t1.a=t2.a when matched then update set t1.b=t2.b when not matched then insert *;
812812

813-
query TT
813+
query II
814814
merge into t1 using t2 on t1.a=t2.a when matched then update set t1.b=t2.b when not matched then insert *;
815815
----
816816
1 2
817817

818818
statement ok
819819
alter table t1 rename to t1_1
820820

821-
query T
821+
query TT
822822
select name, invalid_reason from system.streams where database='test_stream' order by name
823823
----
824824
s1 (empty)

tests/sqllogictests/suites/ee/06_ee_stream/06_0001_stream_status.test

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,45 +39,45 @@ alter table t set options(change_tracking=true)
3939
statement ok
4040
create stream if not exists s on table t
4141

42-
query I
42+
query B
4343
select * from stream_status('s')
4444
----
4545
0
4646

47-
query I
47+
query B
4848
select * from stream_status('test_stream_status.s')
4949
----
5050
0
5151

52-
query I
52+
query B
5353
select * from stream_status('default.test_stream_status.s')
5454
----
5555
0
5656

57-
query I
57+
query B
5858
call system$stream_status('s')
5959
----
6060
0
6161

62-
query I
62+
query B
6363
call system$stream_status('test_stream_status.s')
6464
----
6565
0
6666

67-
query I
67+
query B
6868
call system$stream_status('default.test_stream_status.s')
6969
----
7070
0
7171

7272
statement ok
7373
insert into t values(2)
7474

75-
query I
75+
query B
7676
select * from stream_status('s')
7777
----
7878
1
7979

80-
query I
80+
query B
8181
call system$stream_status('s')
8282
----
8383
1

tests/sqllogictests/suites/ee/06_ee_stream/06_0002_stream_txn_consume.test

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ insert into t_1 (str) values ('a'), ('b');
371371
statement ok
372372
begin;
373373

374-
query I
374+
query T
375375
select str from s_1 with consume order by str;
376376
----
377377
a
@@ -383,7 +383,7 @@ select 1/0;
383383
statement ok
384384
commit;
385385

386-
query I
386+
query T
387387
select str from s_1 order by str;
388388
----
389389
a
@@ -394,20 +394,20 @@ b
394394
statement ok
395395
begin;
396396

397-
query I
397+
query T
398398
select str from s_1 with consume order by str;
399399
----
400400
a
401401
b
402402

403403
# inside txn, s_1 is consumed, expects empty result set
404-
query I
404+
query T
405405
select str from s_1;
406406

407407
statement ok
408408
rollback;
409409

410-
query I
410+
query T
411411
select str from s_1 order by str;
412412
----
413413
a
@@ -428,7 +428,7 @@ statement ok
428428
insert into tmp_sink select str from s_1;
429429

430430
# changes should not be consumed
431-
query I
431+
query T
432432
select str from s_1 order by str;
433433
----
434434
a
@@ -459,7 +459,7 @@ select count() from tmp_sink
459459
statement ok
460460
commit;
461461

462-
query I
462+
query T
463463
select str from s_1 order by str;
464464
----
465465

@@ -486,7 +486,7 @@ statement ok
486486
explain select str from s_1;
487487

488488
# explain should not consume the stream
489-
query I
489+
query T
490490
select str from s_1 order by str;
491491
----
492492
a
@@ -508,7 +508,7 @@ begin;
508508
statement ok
509509
insert into target_1 select str from s_1;
510510

511-
query I
511+
query T
512512
select str from s_1 with consume order by str;
513513
----
514514
a
@@ -520,13 +520,13 @@ insert into target_2 select str from s_1;
520520
statement ok
521521
commit;
522522

523-
query I
523+
query T
524524
select str from target_1 order by str;
525525
----
526526
a
527527
b
528528

529-
query I
529+
query T
530530
select str from target_2 order by str;
531531
----
532532

tests/sqllogictests/suites/ee/06_ee_stream/06_0003_stream_multi_table_insert.test

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414

1515
# multi table INSERT
1616
statement ok
17-
create or replace database test_txn_stream;
17+
create or replace database test_txn_stream_1;
1818

1919
statement ok
20-
use test_txn_stream;
20+
use test_txn_stream_1;
2121

2222
statement ok
2323
CREATE TABLE t_append_only(a INT);
@@ -194,4 +194,4 @@ SELECT * FROM t_consume_append_only_6_1; -- empty
194194
----
195195

196196
statement ok
197-
drop database test_txn_stream;
197+
drop database test_txn_stream_1;

0 commit comments

Comments
 (0)