Skip to content

Commit 3aa6724

Browse files
committed
feat(execution plan): add reusable plan
This patch adds an execution plan wrapper that allows to bind parameters and reuse plans with placeholders.
1 parent bcb311f commit 3aa6724

8 files changed

Lines changed: 439 additions & 192 deletions

File tree

datafusion/core/benches/reset_plan_states.rs

Lines changed: 169 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,20 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::cell::OnceCell;
1819
use std::sync::{Arc, LazyLock};
1920

2021
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
22+
use criterion::measurement::WallTime;
2123
use criterion::{Criterion, criterion_group, criterion_main};
2224
use datafusion::prelude::SessionContext;
2325
use datafusion_catalog::MemTable;
26+
use datafusion_common::metadata::ScalarAndMetadata;
27+
use datafusion_common::{ParamValues, ScalarValue};
2428
use datafusion_physical_plan::ExecutionPlan;
2529
use datafusion_physical_plan::displayable;
26-
use datafusion_physical_plan::execution_plan::prepare_execution;
2730
use datafusion_physical_plan::execution_plan::reset_plan_states;
31+
use datafusion_physical_plan::reuse::ReusableExecutionPlan;
2832
use tokio::runtime::Runtime;
2933

3034
const NUM_FIELDS: usize = 1000;
@@ -38,6 +42,44 @@ static SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
3842
))
3943
});
4044

45+
/// Decides when to generate placeholders, helping to form a query
46+
/// with a certain placeholders percent.
47+
struct PlaceholderCounter {
48+
placeholders_percent: usize,
49+
c: usize,
50+
placeholder_idx: usize,
51+
num_placeholders: usize,
52+
}
53+
54+
impl PlaceholderCounter {
55+
fn new(placeholders_percent: usize) -> Self {
56+
Self {
57+
placeholders_percent,
58+
c: 0,
59+
placeholder_idx: 0,
60+
num_placeholders: 0,
61+
}
62+
}
63+
64+
fn placeholder(&mut self) -> Option<String> {
65+
let is_placeholder = self.c < self.placeholders_percent;
66+
self.c += 1;
67+
if self.c >= 100 {
68+
self.c = 0;
69+
}
70+
if is_placeholder {
71+
self.num_placeholders += 1;
72+
Some("$1".to_owned())
73+
} else {
74+
None
75+
}
76+
}
77+
78+
fn placeholder_or(&mut self, f: impl FnOnce() -> String) -> String {
79+
self.placeholder().unwrap_or_else(f)
80+
}
81+
}
82+
4183
fn col_name(i: usize) -> String {
4284
format!("x_{i}")
4385
}
@@ -48,7 +90,7 @@ fn aggr_name(i: usize) -> String {
4890

4991
fn physical_plan(
5092
ctx: &SessionContext,
51-
rt: &Runtime,
93+
rt: &tokio::runtime::Handle,
5294
sql: &str,
5395
) -> Arc<dyn ExecutionPlan> {
5496
rt.block_on(async {
@@ -61,15 +103,16 @@ fn physical_plan(
61103
})
62104
}
63105

64-
fn predicate(col_name: impl Fn(usize) -> String, len: usize) -> String {
106+
fn predicate(mut comparee: impl FnMut(usize) -> (String, String), len: usize) -> String {
65107
let mut predicate = String::new();
66108
for i in 0..len {
67109
if i > 0 {
68110
predicate.push_str(" AND ");
69111
}
70-
predicate.push_str(&col_name(i));
112+
let (lhs, rhs) = comparee(i);
113+
predicate.push_str(&lhs);
71114
predicate.push_str(" = ");
72-
predicate.push_str(&i.to_string());
115+
predicate.push_str(&rhs);
73116
}
74117
predicate
75118
}
@@ -84,23 +127,46 @@ fn predicate(col_name: impl Fn(usize) -> String, len: usize) -> String {
84127
///
85128
/// Where `p1` and `p2` some long predicates.
86129
///
87-
fn query1() -> String {
130+
fn query0(placeholders_percent: usize) -> (String, usize) {
131+
let mut plc = PlaceholderCounter::new(placeholders_percent);
88132
let mut query = String::new();
89133
query.push_str("SELECT ");
90134
for i in 0..NUM_FIELDS {
91135
if i > 0 {
92136
query.push_str(", ");
93137
}
94138
query.push_str("AVG(");
95-
query.push_str(&col_name(i));
139+
140+
if let Some(placeholder) = plc.placeholder() {
141+
query.push_str(&format!("{}+{}", placeholder, col_name(i)));
142+
} else {
143+
query.push_str(&col_name(i));
144+
}
145+
96146
query.push_str(") AS ");
97147
query.push_str(&aggr_name(i));
98148
}
99149
query.push_str(" FROM t WHERE ");
100-
query.push_str(&predicate(col_name, PREDICATE_LEN));
150+
query.push_str(&predicate(
151+
|i| {
152+
(
153+
plc.placeholder_or(|| col_name(i)),
154+
plc.placeholder_or(|| col_name(i + 1)),
155+
)
156+
},
157+
PREDICATE_LEN,
158+
));
101159
query.push_str(" HAVING ");
102-
query.push_str(&predicate(aggr_name, PREDICATE_LEN));
103-
query
160+
query.push_str(&predicate(
161+
|i| {
162+
(
163+
plc.placeholder_or(|| aggr_name(i)),
164+
plc.placeholder_or(|| aggr_name(i + 1)),
165+
)
166+
},
167+
PREDICATE_LEN,
168+
));
169+
(query, plc.num_placeholders)
104170
}
105171

106172
/// Returns a typical plan for the query like:
@@ -110,27 +176,35 @@ fn query1() -> String {
110176
/// WHERE p1
111177
/// ```
112178
///
113-
fn query2() -> String {
179+
fn query1(placeholders_percent: usize) -> (String, usize) {
180+
let mut plc = PlaceholderCounter::new(placeholders_percent);
114181
let mut query = String::new();
115182
query.push_str("SELECT ");
116183
for i in (0..NUM_FIELDS).step_by(2) {
117184
if i > 0 {
118185
query.push_str(", ");
119186
}
120-
if (i / 2) % 2 == 0 {
121-
query.push_str(&format!("t.{}", col_name(i)));
187+
let col = if (i / 2) % 2 == 0 {
188+
format!("t.{}", col_name(i))
122189
} else {
123-
query.push_str(&format!("v.{}", col_name(i)));
124-
}
190+
format!("v.{}", col_name(i))
191+
};
192+
let add = plc.placeholder_or(|| "1".to_owned());
193+
let proj = format!("{col} + {add}");
194+
query.push_str(&proj);
125195
}
126196
query.push_str(" FROM t JOIN v ON t.x_0 = v.x_0 WHERE ");
127197

128-
fn qualified_name(i: usize) -> String {
129-
format!("t.{}", col_name(i))
130-
}
131-
132-
query.push_str(&predicate(qualified_name, PREDICATE_LEN));
133-
query
198+
query.push_str(&predicate(
199+
|i| {
200+
(
201+
plc.placeholder_or(|| format!("t.{}", col_name(i))),
202+
plc.placeholder_or(|| i.to_string()),
203+
)
204+
},
205+
PREDICATE_LEN,
206+
));
207+
(query, plc.num_placeholders)
134208
}
135209

136210
/// Returns a typical plan for the query like:
@@ -140,7 +214,8 @@ fn query2() -> String {
140214
/// WHERE p
141215
/// ```
142216
///
143-
fn query3() -> String {
217+
fn query2(placeholders_percent: usize) -> (String, usize) {
218+
let mut plc = PlaceholderCounter::new(placeholders_percent);
144219
let mut query = String::new();
145220
query.push_str("SELECT ");
146221

@@ -151,24 +226,23 @@ fn query3() -> String {
151226
}
152227
query.push_str(&col_name(i * 2));
153228
query.push_str(" + ");
154-
query.push_str(&col_name(i * 2 + 1));
229+
query.push_str(&plc.placeholder_or(|| col_name(i * 2 + 1)));
155230
}
156231

157232
query.push_str(" FROM t WHERE ");
158-
query.push_str(&predicate(col_name, PREDICATE_LEN));
159-
query
233+
query.push_str(&predicate(
234+
|i| {
235+
(
236+
plc.placeholder_or(|| col_name(i)),
237+
plc.placeholder_or(|| i.to_string()),
238+
)
239+
},
240+
PREDICATE_LEN,
241+
));
242+
(query, plc.num_placeholders)
160243
}
161244

162-
fn run_reset_states(b: &mut criterion::Bencher, plan: &Arc<dyn ExecutionPlan>) {
163-
b.iter(|| std::hint::black_box(reset_plan_states(Arc::clone(plan)).unwrap()));
164-
}
165-
166-
/// Benchmark is intended to measure overhead of actions, required to perform
167-
/// making an independent instance of the execution plan to re-execute it, avoiding
168-
/// re-planning stage.
169-
fn bench_reset_plan_states(c: &mut Criterion) {
170-
env_logger::init();
171-
245+
fn init() -> (SessionContext, Runtime) {
172246
let rt = Runtime::new().unwrap();
173247
let ctx = SessionContext::new();
174248
ctx.register_table(
@@ -182,56 +256,73 @@ fn bench_reset_plan_states(c: &mut Criterion) {
182256
Arc::new(MemTable::try_new(Arc::clone(&SCHEMA), vec![vec![], vec![]]).unwrap()),
183257
)
184258
.unwrap();
185-
186-
macro_rules! bench_query {
187-
($query_producer: expr) => {{
188-
let sql = $query_producer();
189-
let plan = physical_plan(&ctx, &rt, &sql);
190-
log::debug!("plan:\n{}", displayable(plan.as_ref()).indent(true));
191-
move |b| run_reset_states(b, &plan)
192-
}};
193-
}
194-
195-
c.bench_function("query1", bench_query!(query1));
196-
c.bench_function("query2", bench_query!(query2));
197-
c.bench_function("query3", bench_query!(query3));
259+
(ctx, rt)
198260
}
199261

200-
fn run_prepare_execution(b: &mut criterion::Bencher, plan: &Arc<dyn ExecutionPlan>) {
201-
b.iter(|| std::hint::black_box(prepare_execution(Arc::clone(plan), None).unwrap()));
262+
/// Benchmark is intended to measure overhead of actions, required to perform
263+
/// making an independent instance of the execution plan to re-execute it, avoiding
264+
/// re-planning stage.
265+
fn bench_reset(
266+
g: &mut criterion::BenchmarkGroup<'_, WallTime>,
267+
query_fn: impl FnOnce() -> String,
268+
) {
269+
let (ctx, rt) = init();
270+
let query = query_fn();
271+
let rt = rt.handle();
272+
let plan: OnceCell<Arc<dyn ExecutionPlan>> = OnceCell::new();
273+
g.bench_function("reset", |b| {
274+
let plan = plan.get_or_init(|| {
275+
log::info!("sql:\n{}\n\n", query);
276+
let plan = physical_plan(&ctx, rt, &query);
277+
log::info!("plan:\n{}", displayable(plan.as_ref()).indent(true));
278+
plan
279+
});
280+
b.iter(|| std::hint::black_box(reset_plan_states(Arc::clone(&plan)).unwrap()))
281+
});
202282
}
203283

204-
/// Benchmark is intended to measure overhead of actions, required to perform
205-
/// making an independent instance of the execution plan to re-execute it with placeholders,
206-
/// avoiding re-planning stage.
207-
fn bench_prepare_execution(c: &mut Criterion) {
208-
let rt = Runtime::new().unwrap();
209-
let ctx = SessionContext::new();
210-
ctx.register_table(
211-
"t",
212-
Arc::new(MemTable::try_new(Arc::clone(&SCHEMA), vec![vec![], vec![]]).unwrap()),
213-
)
214-
.unwrap();
284+
/// The same as [`bench_reset`] for placeholdered plans.
285+
/// `placeholders_percent` is a percent of placeholders that must be used in generated queries.
286+
fn bench_bind(
287+
g: &mut criterion::BenchmarkGroup<'_, WallTime>,
288+
placeholders_percent: usize,
289+
query_fn: impl FnOnce(usize) -> (String, usize),
290+
) {
291+
let (ctx, rt) = init();
292+
let params = ParamValues::List(vec![ScalarAndMetadata::new(
293+
ScalarValue::Int64(Some(42)),
294+
None,
295+
)]);
296+
let (query, num_placeholders) = query_fn(placeholders_percent);
297+
let rt = rt.handle();
298+
let plan: OnceCell<ReusableExecutionPlan> = OnceCell::new();
299+
g.bench_function(format!("{num_placeholders}_placeholders"), move |b| {
300+
let plan = plan.get_or_init(|| {
301+
log::info!("sql:\n{}\n\n", query);
302+
let plan = physical_plan(&ctx, rt, &query);
303+
log::info!("plan:\n{}", displayable(plan.as_ref()).indent(true));
304+
plan.into()
305+
});
306+
b.iter(|| std::hint::black_box(plan.bind(Some(&params))))
307+
});
308+
}
215309

216-
ctx.register_table(
217-
"v",
218-
Arc::new(MemTable::try_new(Arc::clone(&SCHEMA), vec![vec![], vec![]]).unwrap()),
219-
)
220-
.unwrap();
310+
fn criterion_benchmark(c: &mut Criterion) {
311+
env_logger::init();
221312

222-
macro_rules! bench_query {
223-
($query_producer: expr) => {{
224-
let sql = $query_producer();
225-
let plan = physical_plan(&ctx, &rt, &sql);
226-
log::debug!("plan:\n{}", displayable(plan.as_ref()).indent(true));
227-
move |b| run_prepare_execution(b, &plan)
228-
}};
313+
for (query_idx, query_fn) in [query0, query1, query2].iter().enumerate() {
314+
{
315+
let mut g = c.benchmark_group(format!("reset_query{query_idx}"));
316+
bench_reset(&mut g, || query_fn(0).0);
317+
}
318+
{
319+
let mut g = c.benchmark_group(format!("bind_query{query_idx}"));
320+
for placeholders_percent in [0, 1, 10, 50, 100] {
321+
bench_bind(&mut g, placeholders_percent, query_fn);
322+
}
323+
}
229324
}
230-
231-
c.bench_function("query1", bench_query!(query1));
232-
c.bench_function("query2", bench_query!(query2));
233-
c.bench_function("query3", bench_query!(query3));
234325
}
235326

236-
criterion_group!(benches, bench_reset_plan_states, bench_prepare_execution);
327+
criterion_group!(benches, criterion_benchmark);
237328
criterion_main!(benches);

0 commit comments

Comments
 (0)