Skip to content

Commit a9c0901

Browse files
authored
Add support for FFI config extensions (#19469)
## Which issue does this PR close? This addresses part of #17035 This is also a blocker for #20450 ## Rationale for this change Currently we cannot support user defined configuration extensions via FFI. This is because much of the infrastructure on how to add and extract custom extensions relies on knowing concrete types of the extensions. This is not supported in FFI. This PR adds an implementation of configuration extensions that can be used across a FFI boundary. ## What changes are included in this PR? - Implement `FFI_ExtensionOptions`. - Update `ConfigOptions` to check if a `datafusion_ffi` namespace exists when setting values - Add unit test ## Are these changes tested? Unit test added. Also tested against `datafusion-python` locally. With this code I have the following test that passes. I have created a simple python exposed `MyConfig`: ```python from datafusion import SessionConfig from datafusion_ffi_example import MyConfig def test_catalog_provider(): config = MyConfig() config = SessionConfig().with_extension(config) config.set("my_config.baz_count", "42") ``` ## Are there any user-facing changes? New addition only.
1 parent 4a41587 commit a9c0901

File tree

8 files changed

+678
-35
lines changed

8 files changed

+678
-35
lines changed

datafusion/common/src/config.rs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,7 +1256,7 @@ impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions
12561256
}
12571257

12581258
/// A key value pair, with a corresponding description
1259-
#[derive(Debug, Hash, PartialEq, Eq)]
1259+
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
12601260
pub struct ConfigEntry {
12611261
/// A unique string to identify this config value
12621262
pub key: String,
@@ -1352,6 +1352,10 @@ impl ConfigField for ConfigOptions {
13521352
}
13531353
}
13541354

1355+
/// This namespace is reserved for interacting with Foreign Function Interface
1356+
/// (FFI) based configuration extensions.
1357+
pub const DATAFUSION_FFI_CONFIG_NAMESPACE: &str = "datafusion_ffi";
1358+
13551359
impl ConfigOptions {
13561360
/// Creates a new [`ConfigOptions`] with default values
13571361
pub fn new() -> Self {
@@ -1366,12 +1370,12 @@ impl ConfigOptions {
13661370

13671371
/// Set a configuration option
13681372
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1369-
let Some((prefix, key)) = key.split_once('.') else {
1373+
let Some((mut prefix, mut inner_key)) = key.split_once('.') else {
13701374
return _config_err!("could not find config namespace for key \"{key}\"");
13711375
};
13721376

13731377
if prefix == "datafusion" {
1374-
if key == "optimizer.enable_dynamic_filter_pushdown" {
1378+
if inner_key == "optimizer.enable_dynamic_filter_pushdown" {
13751379
let bool_value = value.parse::<bool>().map_err(|e| {
13761380
DataFusionError::Configuration(format!(
13771381
"Failed to parse '{value}' as bool: {e}",
@@ -1386,13 +1390,23 @@ impl ConfigOptions {
13861390
}
13871391
return Ok(());
13881392
}
1389-
return ConfigField::set(self, key, value);
1393+
return ConfigField::set(self, inner_key, value);
1394+
}
1395+
1396+
if !self.extensions.0.contains_key(prefix)
1397+
&& self
1398+
.extensions
1399+
.0
1400+
.contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE)
1401+
{
1402+
inner_key = key;
1403+
prefix = DATAFUSION_FFI_CONFIG_NAMESPACE;
13901404
}
13911405

13921406
let Some(e) = self.extensions.0.get_mut(prefix) else {
13931407
return _config_err!("Could not find config namespace \"{prefix}\"");
13941408
};
1395-
e.0.set(key, value)
1409+
e.0.set(inner_key, value)
13961410
}
13971411

13981412
/// Create new [`ConfigOptions`], taking values from environment variables
@@ -2157,7 +2171,7 @@ impl TableOptions {
21572171
///
21582172
/// A result indicating success or failure in setting the configuration option.
21592173
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
2160-
let Some((prefix, _)) = key.split_once('.') else {
2174+
let Some((mut prefix, _)) = key.split_once('.') else {
21612175
return _config_err!("could not find config namespace for key \"{key}\"");
21622176
};
21632177

@@ -2169,6 +2183,15 @@ impl TableOptions {
21692183
return Ok(());
21702184
}
21712185

2186+
if !self.extensions.0.contains_key(prefix)
2187+
&& self
2188+
.extensions
2189+
.0
2190+
.contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE)
2191+
{
2192+
prefix = DATAFUSION_FFI_CONFIG_NAMESPACE;
2193+
}
2194+
21722195
let Some(e) = self.extensions.0.get_mut(prefix) else {
21732196
return _config_err!("Could not find config namespace \"{prefix}\"");
21742197
};
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::collections::HashMap;
20+
use std::ffi::c_void;
21+
22+
use abi_stable::StableAbi;
23+
use abi_stable::std_types::{RResult, RStr, RString, RVec, Tuple2};
24+
use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions};
25+
use datafusion_common::{Result, exec_err};
26+
27+
use crate::df_result;
28+
29+
/// A stable struct for sharing [`ExtensionOptions`] across FFI boundaries.
30+
///
31+
/// Unlike other FFI structs in this crate, we do not construct a foreign
32+
/// variant of this object. This is due to the typical method for interacting
33+
/// with extension options is by creating a local struct of your concrete type.
34+
/// To support this methodology use the `to_extension` method instead.
35+
///
36+
/// When using [`FFI_ExtensionOptions`] with multiple extensions, all extension
37+
/// values are stored on a single [`FFI_ExtensionOptions`] object. The keys
38+
/// are stored with the full path prefix to avoid overwriting values when using
39+
/// multiple extensions.
40+
#[repr(C)]
41+
#[derive(Debug, StableAbi)]
42+
pub struct FFI_ExtensionOptions {
43+
/// Return a deep clone of this [`ExtensionOptions`]
44+
pub cloned: unsafe extern "C" fn(&Self) -> FFI_ExtensionOptions,
45+
46+
/// Set the given `key`, `value` pair
47+
pub set:
48+
unsafe extern "C" fn(&mut Self, key: RStr, value: RStr) -> RResult<(), RString>,
49+
50+
/// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
51+
pub entries: unsafe extern "C" fn(&Self) -> RVec<Tuple2<RString, RString>>,
52+
53+
/// Release the memory of the private data when it is no longer being used.
54+
pub release: unsafe extern "C" fn(&mut Self),
55+
56+
/// Internal data. This is only to be accessed by the provider of the options.
57+
pub private_data: *mut c_void,
58+
}
59+
60+
unsafe impl Send for FFI_ExtensionOptions {}
61+
unsafe impl Sync for FFI_ExtensionOptions {}
62+
63+
pub struct ExtensionOptionsPrivateData {
64+
pub options: HashMap<String, String>,
65+
}
66+
67+
impl FFI_ExtensionOptions {
68+
#[inline]
69+
fn inner_mut(&mut self) -> &mut HashMap<String, String> {
70+
let private_data = self.private_data as *mut ExtensionOptionsPrivateData;
71+
unsafe { &mut (*private_data).options }
72+
}
73+
74+
#[inline]
75+
fn inner(&self) -> &HashMap<String, String> {
76+
let private_data = self.private_data as *const ExtensionOptionsPrivateData;
77+
unsafe { &(*private_data).options }
78+
}
79+
}
80+
81+
unsafe extern "C" fn cloned_fn_wrapper(
82+
options: &FFI_ExtensionOptions,
83+
) -> FFI_ExtensionOptions {
84+
options
85+
.inner()
86+
.iter()
87+
.map(|(k, v)| (k.to_owned(), v.to_owned()))
88+
.collect::<HashMap<String, String>>()
89+
.into()
90+
}
91+
92+
unsafe extern "C" fn set_fn_wrapper(
93+
options: &mut FFI_ExtensionOptions,
94+
key: RStr,
95+
value: RStr,
96+
) -> RResult<(), RString> {
97+
let _ = options.inner_mut().insert(key.into(), value.into());
98+
RResult::ROk(())
99+
}
100+
101+
unsafe extern "C" fn entries_fn_wrapper(
102+
options: &FFI_ExtensionOptions,
103+
) -> RVec<Tuple2<RString, RString>> {
104+
options
105+
.inner()
106+
.iter()
107+
.map(|(key, value)| (key.to_owned().into(), value.to_owned().into()).into())
108+
.collect()
109+
}
110+
111+
unsafe extern "C" fn release_fn_wrapper(options: &mut FFI_ExtensionOptions) {
112+
unsafe {
113+
debug_assert!(!options.private_data.is_null());
114+
let private_data =
115+
Box::from_raw(options.private_data as *mut ExtensionOptionsPrivateData);
116+
drop(private_data);
117+
options.private_data = std::ptr::null_mut();
118+
}
119+
}
120+
121+
impl Default for FFI_ExtensionOptions {
122+
fn default() -> Self {
123+
HashMap::new().into()
124+
}
125+
}
126+
127+
impl From<HashMap<String, String>> for FFI_ExtensionOptions {
128+
fn from(options: HashMap<String, String>) -> Self {
129+
let private_data = ExtensionOptionsPrivateData { options };
130+
131+
Self {
132+
cloned: cloned_fn_wrapper,
133+
set: set_fn_wrapper,
134+
entries: entries_fn_wrapper,
135+
release: release_fn_wrapper,
136+
private_data: Box::into_raw(Box::new(private_data)) as *mut c_void,
137+
}
138+
}
139+
}
140+
141+
impl Drop for FFI_ExtensionOptions {
142+
fn drop(&mut self) {
143+
unsafe { (self.release)(self) }
144+
}
145+
}
146+
147+
impl Clone for FFI_ExtensionOptions {
148+
fn clone(&self) -> Self {
149+
unsafe { (self.cloned)(self) }
150+
}
151+
}
152+
153+
impl ConfigExtension for FFI_ExtensionOptions {
154+
const PREFIX: &'static str =
155+
datafusion_common::config::DATAFUSION_FFI_CONFIG_NAMESPACE;
156+
}
157+
158+
impl ExtensionOptions for FFI_ExtensionOptions {
159+
fn as_any(&self) -> &dyn Any {
160+
self
161+
}
162+
163+
fn as_any_mut(&mut self) -> &mut dyn Any {
164+
self
165+
}
166+
167+
fn cloned(&self) -> Box<dyn ExtensionOptions> {
168+
let ffi_options = unsafe { (self.cloned)(self) };
169+
Box::new(ffi_options)
170+
}
171+
172+
fn set(&mut self, key: &str, value: &str) -> Result<()> {
173+
if key.split_once('.').is_none() {
174+
return exec_err!("Unable to set FFI config value without namespace set");
175+
};
176+
177+
df_result!(unsafe { (self.set)(self, key.into(), value.into()) })
178+
}
179+
180+
fn entries(&self) -> Vec<ConfigEntry> {
181+
unsafe {
182+
(self.entries)(self)
183+
.into_iter()
184+
.map(|entry_tuple| ConfigEntry {
185+
key: entry_tuple.0.into(),
186+
value: Some(entry_tuple.1.into()),
187+
description: "ffi_config_options",
188+
})
189+
.collect()
190+
}
191+
}
192+
}
193+
194+
impl FFI_ExtensionOptions {
195+
/// Add all of the values in a concrete configuration extension to the
196+
/// FFI variant. This is safe to call on either side of the FFI
197+
/// boundary.
198+
pub fn add_config<C: ConfigExtension>(&mut self, config: &C) -> Result<()> {
199+
for entry in config.entries() {
200+
if let Some(value) = entry.value {
201+
let key = format!("{}.{}", C::PREFIX, entry.key);
202+
self.set(key.as_str(), value.as_str())?;
203+
}
204+
}
205+
206+
Ok(())
207+
}
208+
209+
/// Merge another `FFI_ExtensionOptions` configurations into this one.
210+
/// This is safe to call on either side of the FFI boundary.
211+
pub fn merge(&mut self, other: &FFI_ExtensionOptions) -> Result<()> {
212+
for entry in other.entries() {
213+
if let Some(value) = entry.value {
214+
self.set(entry.key.as_str(), value.as_str())?;
215+
}
216+
}
217+
Ok(())
218+
}
219+
220+
/// Create a concrete extension type from the FFI variant.
221+
/// This is safe to call on either side of the FFI boundary.
222+
pub fn to_extension<C: ConfigExtension + Default>(&self) -> Result<C> {
223+
let mut result = C::default();
224+
225+
unsafe {
226+
for entry in (self.entries)(self) {
227+
let key = entry.0.as_str();
228+
let value = entry.1.as_str();
229+
230+
if let Some((prefix, inner_key)) = key.split_once('.')
231+
&& prefix == C::PREFIX
232+
{
233+
result.set(inner_key, value)?;
234+
}
235+
}
236+
}
237+
238+
Ok(result)
239+
}
240+
}
241+
242+
#[cfg(test)]
243+
mod tests {
244+
use datafusion_common::config::{ConfigExtension, ConfigOptions};
245+
use datafusion_common::extensions_options;
246+
247+
use crate::config::extension_options::FFI_ExtensionOptions;
248+
249+
// Define a new configuration struct using the `extensions_options` macro
250+
extensions_options! {
251+
/// My own config options.
252+
pub struct MyConfig {
253+
/// Should "foo" be replaced by "bar"?
254+
pub foo_to_bar: bool, default = true
255+
256+
/// How many "baz" should be created?
257+
pub baz_count: usize, default = 1337
258+
}
259+
}
260+
261+
impl ConfigExtension for MyConfig {
262+
const PREFIX: &'static str = "my_config";
263+
}
264+
265+
#[test]
266+
fn round_trip_ffi_extension_options() {
267+
// set up config struct and register extension
268+
let mut config = ConfigOptions::default();
269+
let mut ffi_options = FFI_ExtensionOptions::default();
270+
ffi_options.add_config(&MyConfig::default()).unwrap();
271+
272+
config.extensions.insert(ffi_options);
273+
274+
// overwrite config default
275+
config.set("my_config.baz_count", "42").unwrap();
276+
277+
// check config state
278+
let returned_ffi_config =
279+
config.extensions.get::<FFI_ExtensionOptions>().unwrap();
280+
let my_config: MyConfig = returned_ffi_config.to_extension().unwrap();
281+
282+
// check default value
283+
assert!(my_config.foo_to_bar);
284+
285+
// check overwritten value
286+
assert_eq!(my_config.baz_count, 42);
287+
}
288+
}

0 commit comments

Comments
 (0)