Skip to content

Commit 4f13319

Browse files
authored
feat: implement PhysicalOptimizerRule in FFI crate (#20451)
## Which issue does this PR close?

- Closes #20450

## Rationale for this change

This PR is a pure addition that implements an FFI-safe `PhysicalOptimizerRule`. With this change, downstream projects such as `datafusion-python` can share optimizer rules across libraries.

## What changes are included in this PR?

Implement the physical optimizer rule following the same pattern as the rest of the FFI crate.

## Are these changes tested?

Both unit and integration tests are provided.

## Are there any user-facing changes?

None
1 parent 6980fcf commit 4f13319

8 files changed

Lines changed: 511 additions & 1 deletion

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/ffi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ datafusion-functions-table = { workspace = true, optional = true }
6161
datafusion-functions-window = { workspace = true, optional = true }
6262
datafusion-physical-expr = { workspace = true }
6363
datafusion-physical-expr-common = { workspace = true }
64+
datafusion-physical-optimizer = { workspace = true }
6465
datafusion-physical-plan = { workspace = true }
6566
datafusion-proto = { workspace = true }
6667
datafusion-proto-common = { workspace = true }

datafusion/ffi/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub mod execution_plan;
3434
pub mod expr;
3535
pub mod insert_op;
3636
pub mod physical_expr;
37+
pub mod physical_optimizer;
3738
pub mod plan_properties;
3839
pub mod proto;
3940
pub mod record_batch_stream;
Lines changed: 373 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,373 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::ffi::c_void;
19+
use std::sync::Arc;
20+
21+
use abi_stable::StableAbi;
22+
use abi_stable::std_types::{RResult, RStr};
23+
use async_trait::async_trait;
24+
use datafusion_common::config::ConfigOptions;
25+
use datafusion_common::error::Result;
26+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
27+
use datafusion_physical_plan::ExecutionPlan;
28+
use tokio::runtime::Handle;
29+
30+
use crate::config::FFI_ConfigOptions;
31+
use crate::execution_plan::FFI_ExecutionPlan;
32+
use crate::util::FFIResult;
33+
use crate::{df_result, rresult_return};
34+
35+
/// A stable struct for sharing [`PhysicalOptimizerRule`] across FFI boundaries.
36+
#[repr(C)]
37+
#[derive(Debug, StableAbi)]
38+
pub struct FFI_PhysicalOptimizerRule {
39+
pub optimize: unsafe extern "C" fn(
40+
&Self,
41+
plan: &FFI_ExecutionPlan,
42+
config: FFI_ConfigOptions,
43+
) -> FFIResult<FFI_ExecutionPlan>,
44+
45+
pub name: unsafe extern "C" fn(&Self) -> RStr,
46+
47+
pub schema_check: unsafe extern "C" fn(&Self) -> bool,
48+
49+
/// Used to create a clone on the rule. This should
50+
/// only need to be called by the receiver of the plan.
51+
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
52+
53+
/// Release the memory of the private data when it is no longer being used.
54+
pub release: unsafe extern "C" fn(arg: &mut Self),
55+
56+
/// Return the major DataFusion version number of this rule.
57+
pub version: unsafe extern "C" fn() -> u64,
58+
59+
/// Internal data. This is only to be accessed by the provider of the rule.
60+
/// A [`ForeignPhysicalOptimizerRule`] should never attempt to access this data.
61+
pub private_data: *mut c_void,
62+
63+
/// Utility to identify when FFI objects are accessed locally through
64+
/// the foreign interface.
65+
pub library_marker_id: extern "C" fn() -> usize,
66+
}
67+
68+
unsafe impl Send for FFI_PhysicalOptimizerRule {}
69+
unsafe impl Sync for FFI_PhysicalOptimizerRule {}
70+
71+
struct RulePrivateData {
72+
rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
73+
runtime: Option<Handle>,
74+
}
75+
76+
impl FFI_PhysicalOptimizerRule {
77+
fn inner(&self) -> &Arc<dyn PhysicalOptimizerRule + Send + Sync> {
78+
let private_data = self.private_data as *const RulePrivateData;
79+
unsafe { &(*private_data).rule }
80+
}
81+
82+
fn runtime(&self) -> Option<Handle> {
83+
let private_data = self.private_data as *const RulePrivateData;
84+
unsafe { (*private_data).runtime.clone() }
85+
}
86+
}
87+
88+
unsafe extern "C" fn optimize_fn_wrapper(
89+
rule: &FFI_PhysicalOptimizerRule,
90+
plan: &FFI_ExecutionPlan,
91+
config: FFI_ConfigOptions,
92+
) -> FFIResult<FFI_ExecutionPlan> {
93+
let runtime = rule.runtime();
94+
let rule = rule.inner();
95+
let plan: Arc<dyn ExecutionPlan> = rresult_return!(plan.try_into());
96+
let config = rresult_return!(ConfigOptions::try_from(config));
97+
let optimized_plan = rresult_return!(rule.optimize(plan, &config));
98+
99+
RResult::ROk(FFI_ExecutionPlan::new(optimized_plan, runtime))
100+
}
101+
102+
unsafe extern "C" fn name_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> RStr<'_> {
103+
let rule = rule.inner();
104+
rule.name().into()
105+
}
106+
107+
unsafe extern "C" fn schema_check_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> bool {
108+
rule.inner().schema_check()
109+
}
110+
111+
unsafe extern "C" fn release_fn_wrapper(rule: &mut FFI_PhysicalOptimizerRule) {
112+
unsafe {
113+
debug_assert!(!rule.private_data.is_null());
114+
let private_data = Box::from_raw(rule.private_data as *mut RulePrivateData);
115+
drop(private_data);
116+
rule.private_data = std::ptr::null_mut();
117+
}
118+
}
119+
120+
unsafe extern "C" fn clone_fn_wrapper(
121+
rule: &FFI_PhysicalOptimizerRule,
122+
) -> FFI_PhysicalOptimizerRule {
123+
let runtime = rule.runtime();
124+
let rule = Arc::clone(rule.inner());
125+
126+
let private_data =
127+
Box::into_raw(Box::new(RulePrivateData { rule, runtime })) as *mut c_void;
128+
129+
FFI_PhysicalOptimizerRule {
130+
optimize: optimize_fn_wrapper,
131+
name: name_fn_wrapper,
132+
schema_check: schema_check_fn_wrapper,
133+
clone: clone_fn_wrapper,
134+
release: release_fn_wrapper,
135+
version: super::version,
136+
private_data,
137+
library_marker_id: crate::get_library_marker_id,
138+
}
139+
}
140+
141+
impl Drop for FFI_PhysicalOptimizerRule {
142+
fn drop(&mut self) {
143+
unsafe { (self.release)(self) }
144+
}
145+
}
146+
147+
impl FFI_PhysicalOptimizerRule {
148+
/// Creates a new [`FFI_PhysicalOptimizerRule`].
149+
pub fn new(
150+
rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
151+
runtime: Option<Handle>,
152+
) -> Self {
153+
if let Some(rule) = (Arc::clone(&rule) as Arc<dyn std::any::Any>)
154+
.downcast_ref::<ForeignPhysicalOptimizerRule>()
155+
{
156+
return rule.0.clone();
157+
}
158+
159+
let private_data = Box::new(RulePrivateData { rule, runtime });
160+
let private_data = Box::into_raw(private_data) as *mut c_void;
161+
162+
Self {
163+
optimize: optimize_fn_wrapper,
164+
name: name_fn_wrapper,
165+
schema_check: schema_check_fn_wrapper,
166+
clone: clone_fn_wrapper,
167+
release: release_fn_wrapper,
168+
version: super::version,
169+
private_data,
170+
library_marker_id: crate::get_library_marker_id,
171+
}
172+
}
173+
}
174+
175+
/// This wrapper struct exists on the receiver side of the FFI interface, so it has
176+
/// no guarantees about being able to access the data in `private_data`. Any functions
177+
/// defined on this struct must only use the stable functions provided in
178+
/// FFI_PhysicalOptimizerRule to interact with the foreign rule.
179+
#[derive(Debug)]
180+
pub struct ForeignPhysicalOptimizerRule(pub FFI_PhysicalOptimizerRule);
181+
182+
unsafe impl Send for ForeignPhysicalOptimizerRule {}
183+
unsafe impl Sync for ForeignPhysicalOptimizerRule {}
184+
185+
impl From<&FFI_PhysicalOptimizerRule> for Arc<dyn PhysicalOptimizerRule + Send + Sync> {
186+
fn from(rule: &FFI_PhysicalOptimizerRule) -> Self {
187+
if (rule.library_marker_id)() == crate::get_library_marker_id() {
188+
return Arc::clone(rule.inner());
189+
}
190+
191+
Arc::new(ForeignPhysicalOptimizerRule(rule.clone()))
192+
as Arc<dyn PhysicalOptimizerRule + Send + Sync>
193+
}
194+
}
195+
196+
impl Clone for FFI_PhysicalOptimizerRule {
197+
fn clone(&self) -> Self {
198+
unsafe { (self.clone)(self) }
199+
}
200+
}
201+
202+
#[async_trait]
203+
impl PhysicalOptimizerRule for ForeignPhysicalOptimizerRule {
204+
fn optimize(
205+
&self,
206+
plan: Arc<dyn ExecutionPlan>,
207+
config: &ConfigOptions,
208+
) -> Result<Arc<dyn ExecutionPlan>> {
209+
let config_options: FFI_ConfigOptions = config.into();
210+
let plan = FFI_ExecutionPlan::new(plan, None);
211+
212+
let optimized_plan =
213+
unsafe { df_result!((self.0.optimize)(&self.0, &plan, config_options))? };
214+
(&optimized_plan).try_into()
215+
}
216+
217+
fn name(&self) -> &str {
218+
unsafe { (self.0.name)(&self.0).as_str() }
219+
}
220+
221+
fn schema_check(&self) -> bool {
222+
unsafe { (self.0.schema_check)(&self.0) }
223+
}
224+
}
225+
226+
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::config::ConfigOptions;
    use datafusion_common::error::Result;
    use datafusion_physical_optimizer::PhysicalOptimizerRule;
    use datafusion_physical_plan::ExecutionPlan;

    use super::*;
    use crate::execution_plan::tests::EmptyExec;

    /// Shorthand for the trait-object type used throughout these tests.
    type SharedRule = Arc<dyn PhysicalOptimizerRule + Send + Sync>;

    /// Rule that returns its input plan unchanged and reports a configurable
    /// `schema_check` value.
    #[derive(Debug)]
    struct NoOpRule {
        schema_check: bool,
    }

    impl PhysicalOptimizerRule for NoOpRule {
        fn optimize(
            &self,
            plan: Arc<dyn ExecutionPlan>,
            _config: &ConfigOptions,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(plan)
        }

        fn name(&self) -> &str {
            "no_op_rule"
        }

        fn schema_check(&self) -> bool {
            self.schema_check
        }
    }

    /// Builds a [`NoOpRule`] behind the shared trait-object type.
    fn no_op_rule(schema_check: bool) -> SharedRule {
        Arc::new(NoOpRule { schema_check })
    }

    /// Wraps `rule` for FFI, swaps in the mock foreign marker, and converts it
    /// back so the result goes through the foreign code path.
    fn into_foreign(rule: SharedRule) -> SharedRule {
        let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
        ffi_rule.library_marker_id = crate::mock_foreign_marker_id;
        (&ffi_rule).into()
    }

    fn create_test_plan() -> Arc<dyn ExecutionPlan> {
        let fields = vec![Field::new("a", DataType::Float32, false)];
        Arc::new(EmptyExec::new(Arc::new(Schema::new(fields))))
    }

    #[test]
    fn test_round_trip_ffi_physical_optimizer_rule() -> Result<()> {
        for expected_schema_check in [true, false] {
            let foreign_rule = into_foreign(no_op_rule(expected_schema_check));

            assert_eq!(foreign_rule.name(), "no_op_rule");
            assert_eq!(foreign_rule.schema_check(), expected_schema_check);
        }

        Ok(())
    }

    #[test]
    fn test_round_trip_optimize() -> Result<()> {
        let foreign_rule = into_foreign(no_op_rule(true));

        let optimized =
            foreign_rule.optimize(create_test_plan(), &ConfigOptions::new())?;
        assert_eq!(optimized.name(), "empty-exec");

        Ok(())
    }

    #[test]
    fn test_local_bypass() -> Result<()> {
        // Without mock marker, local bypass should return the original rule
        let ffi_rule = FFI_PhysicalOptimizerRule::new(no_op_rule(true), None);
        let recovered: SharedRule = (&ffi_rule).into();
        let any_ref: &dyn std::any::Any = &*recovered;
        assert!(any_ref.downcast_ref::<NoOpRule>().is_some());

        // With mock marker, should wrap in ForeignPhysicalOptimizerRule
        let recovered2 = into_foreign(no_op_rule(true));
        let any_ref2: &dyn std::any::Any = &*recovered2;
        assert!(
            any_ref2
                .downcast_ref::<ForeignPhysicalOptimizerRule>()
                .is_some()
        );

        Ok(())
    }

    #[test]
    fn test_clone() -> Result<()> {
        let ffi_rule = FFI_PhysicalOptimizerRule::new(no_op_rule(true), None);
        let cloned = ffi_rule.clone();

        let original_name = unsafe { (ffi_rule.name)(&ffi_rule).as_str() };
        let cloned_name = unsafe { (cloned.name)(&cloned).as_str() };
        assert_eq!(original_name, cloned_name);

        Ok(())
    }

    #[test]
    fn test_foreign_rule_rewrap_bypass() -> Result<()> {
        // When creating an FFI wrapper from a ForeignPhysicalOptimizerRule,
        // it should return the inner FFI rule rather than double-wrapping.
        let foreign_rule = into_foreign(no_op_rule(true));

        // Now wrap the foreign rule back into FFI - should not double-wrap
        let re_wrapped = FFI_PhysicalOptimizerRule::new(foreign_rule, None);
        let re_wrapped_name = unsafe { (re_wrapped.name)(&re_wrapped).as_str() };
        assert_eq!(re_wrapped_name, "no_op_rule");

        Ok(())
    }
}

0 commit comments

Comments
 (0)