Skip to content

Commit 00ef9d6

Browse files
committed
Implement physical optimizer rule
1 parent 263003d commit 00ef9d6

5 files changed

Lines changed: 225 additions & 1 deletion

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/ffi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ datafusion-functions-table = { workspace = true, optional = true }
6161
datafusion-functions-window = { workspace = true, optional = true }
6262
datafusion-physical-expr = { workspace = true }
6363
datafusion-physical-expr-common = { workspace = true }
64+
datafusion-physical-optimizer = { workspace = true }
6465
datafusion-physical-plan = { workspace = true }
6566
datafusion-proto = { workspace = true }
6667
datafusion-proto-common = { workspace = true }

datafusion/ffi/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub mod execution_plan;
3434
pub mod expr;
3535
pub mod insert_op;
3636
pub mod physical_expr;
37+
pub mod physical_optimizer;
3738
pub mod plan_properties;
3839
pub mod proto;
3940
pub mod record_batch_stream;
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::ffi::c_void;
19+
use std::sync::Arc;
20+
21+
use abi_stable::StableAbi;
22+
use abi_stable::std_types::{RResult, RStr};
23+
use async_trait::async_trait;
24+
use datafusion_common::config::ConfigOptions;
25+
use datafusion_common::error::Result;
26+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
27+
use datafusion_physical_plan::ExecutionPlan;
28+
use tokio::runtime::Handle;
29+
30+
use crate::config::FFI_ConfigOptions;
31+
use crate::execution_plan::FFI_ExecutionPlan;
32+
use crate::util::FFIResult;
33+
use crate::{df_result, rresult_return};
34+
35+
/// A stable struct for sharing [`PhysicalOptimizerRule`] across FFI boundaries.
36+
#[repr(C)]
37+
#[derive(Debug, StableAbi)]
38+
pub struct FFI_PhysicalOptimizerRule {
39+
pub optimize: unsafe extern "C" fn(
40+
&Self,
41+
plan: &FFI_ExecutionPlan,
42+
config: FFI_ConfigOptions,
43+
) -> FFIResult<FFI_ExecutionPlan>,
44+
45+
pub name: unsafe extern "C" fn(&Self) -> RStr,
46+
47+
pub schema_check: unsafe extern "C" fn(&Self) -> bool,
48+
49+
/// Used to create a clone on the provider of the execution plan. This should
50+
/// only need to be called by the receiver of the plan.
51+
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
52+
53+
/// Release the memory of the private data when it is no longer being used.
54+
pub release: unsafe extern "C" fn(arg: &mut Self),
55+
56+
/// Return the major DataFusion version number of this provider.
57+
pub version: unsafe extern "C" fn() -> u64,
58+
59+
/// Internal data. This is only to be accessed by the provider of the plan.
60+
/// A [`ForeignPhysicalOptimizerRule`] should never attempt to access this data.
61+
pub private_data: *mut c_void,
62+
63+
/// Utility to identify when FFI objects are accessed locally through
64+
/// the foreign interface.
65+
pub library_marker_id: extern "C" fn() -> usize,
66+
}
67+
68+
unsafe impl Send for FFI_PhysicalOptimizerRule {}
69+
unsafe impl Sync for FFI_PhysicalOptimizerRule {}
70+
71+
struct RulePrivateData {
72+
rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
73+
runtime: Option<Handle>,
74+
}
75+
76+
impl FFI_PhysicalOptimizerRule {
77+
fn inner(&self) -> &Arc<dyn PhysicalOptimizerRule + Send + Sync> {
78+
let private_data = self.private_data as *const RulePrivateData;
79+
unsafe { &(*private_data).rule }
80+
}
81+
82+
fn runtime(&self) -> Option<Handle> {
83+
let private_data = self.private_data as *const RulePrivateData;
84+
unsafe { (*private_data).runtime.clone() }
85+
}
86+
}
87+
88+
unsafe extern "C" fn optimize_fn_wrapper(
89+
rule: &FFI_PhysicalOptimizerRule,
90+
plan: &FFI_ExecutionPlan,
91+
config: FFI_ConfigOptions,
92+
) -> FFIResult<FFI_ExecutionPlan> {
93+
let runtime = rule.runtime();
94+
let rule = rule.inner();
95+
let plan: Arc<dyn ExecutionPlan> = rresult_return!(plan.try_into());
96+
let config = rresult_return!(ConfigOptions::try_from(config));
97+
let optimized_plan = rresult_return!(rule.optimize(plan, &config));
98+
99+
RResult::ROk(FFI_ExecutionPlan::new(optimized_plan, runtime))
100+
}
101+
102+
unsafe extern "C" fn name_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> RStr<'_> {
103+
let rule = rule.inner();
104+
rule.name().into()
105+
}
106+
107+
unsafe extern "C" fn schema_check_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> bool {
108+
rule.inner().schema_check()
109+
}
110+
111+
unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_PhysicalOptimizerRule) {
112+
let private_data =
113+
unsafe { Box::from_raw(provider.private_data as *mut RulePrivateData) };
114+
drop(private_data);
115+
}
116+
117+
unsafe extern "C" fn clone_fn_wrapper(
118+
rule: &FFI_PhysicalOptimizerRule,
119+
) -> FFI_PhysicalOptimizerRule {
120+
let runtime = rule.runtime();
121+
let rule = Arc::clone(rule.inner());
122+
123+
let private_data =
124+
Box::into_raw(Box::new(RulePrivateData { rule, runtime })) as *mut c_void;
125+
126+
FFI_PhysicalOptimizerRule {
127+
optimize: optimize_fn_wrapper,
128+
name: name_fn_wrapper,
129+
schema_check: schema_check_fn_wrapper,
130+
clone: clone_fn_wrapper,
131+
release: release_fn_wrapper,
132+
version: super::version,
133+
private_data,
134+
library_marker_id: crate::get_library_marker_id,
135+
}
136+
}
137+
138+
impl Drop for FFI_PhysicalOptimizerRule {
139+
fn drop(&mut self) {
140+
unsafe { (self.release)(self) }
141+
}
142+
}
143+
144+
impl FFI_PhysicalOptimizerRule {
145+
/// Creates a new [`FFI_PhysicalOptimizerRule`].
146+
pub fn new(
147+
rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
148+
runtime: Option<Handle>,
149+
) -> Self {
150+
if let Some(rule) = (Arc::clone(&rule) as Arc<dyn std::any::Any>)
151+
.downcast_ref::<ForeignPhysicalOptimizerRule>()
152+
{
153+
return rule.0.clone();
154+
}
155+
156+
let private_data = Box::new(RulePrivateData { rule, runtime });
157+
let private_data = Box::into_raw(private_data) as *mut c_void;
158+
159+
Self {
160+
optimize: optimize_fn_wrapper,
161+
name: name_fn_wrapper,
162+
schema_check: schema_check_fn_wrapper,
163+
clone: clone_fn_wrapper,
164+
release: release_fn_wrapper,
165+
version: super::version,
166+
private_data,
167+
library_marker_id: crate::get_library_marker_id,
168+
}
169+
}
170+
}
171+
172+
/// This wrapper struct exists on the receiver side of the FFI interface, so it has
173+
/// no guarantees about being able to access the data in `private_data`. Any functions
174+
/// defined on this struct must only use the stable functions provided in
175+
/// FFI_PhysicalOptimizerRule to interact with the foreign table provider.
176+
#[derive(Debug)]
177+
pub struct ForeignPhysicalOptimizerRule(pub FFI_PhysicalOptimizerRule);
178+
179+
unsafe impl Send for ForeignPhysicalOptimizerRule {}
180+
unsafe impl Sync for ForeignPhysicalOptimizerRule {}
181+
182+
impl From<&FFI_PhysicalOptimizerRule> for Arc<dyn PhysicalOptimizerRule + Send + Sync> {
183+
fn from(provider: &FFI_PhysicalOptimizerRule) -> Self {
184+
if (provider.library_marker_id)() == crate::get_library_marker_id() {
185+
return Arc::clone(provider.inner());
186+
}
187+
188+
Arc::new(ForeignPhysicalOptimizerRule(provider.clone()))
189+
as Arc<dyn PhysicalOptimizerRule + Send + Sync>
190+
}
191+
}
192+
193+
impl Clone for FFI_PhysicalOptimizerRule {
194+
fn clone(&self) -> Self {
195+
unsafe { (self.clone)(self) }
196+
}
197+
}
198+
199+
#[async_trait]
200+
impl PhysicalOptimizerRule for ForeignPhysicalOptimizerRule {
201+
fn optimize(
202+
&self,
203+
plan: Arc<dyn ExecutionPlan>,
204+
config: &ConfigOptions,
205+
) -> Result<Arc<dyn ExecutionPlan>> {
206+
let config_options: FFI_ConfigOptions = config.into();
207+
let plan = FFI_ExecutionPlan::new(plan, None);
208+
209+
let optimized_plan =
210+
unsafe { df_result!((self.0.optimize)(&self.0, &plan, config_options))? };
211+
(&optimized_plan).try_into()
212+
}
213+
214+
fn name(&self) -> &str {
215+
unsafe { (self.0.name)(&self.0).as_str() }
216+
}
217+
218+
fn schema_check(&self) -> bool {
219+
unsafe { (self.0.schema_check)(&self.0) }
220+
}
221+
}

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use datafusion_physical_plan::ExecutionPlan;
4848
/// `PhysicalOptimizerRule`s.
4949
///
5050
/// [`SessionState::add_physical_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_physical_optimizer_rule
51-
pub trait PhysicalOptimizerRule: Debug {
51+
pub trait PhysicalOptimizerRule: Debug + std::any::Any {
5252
/// Rewrite `plan` to an optimized form
5353
fn optimize(
5454
&self,

0 commit comments

Comments
 (0)