Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/docs/connector.md"))]

use crate::{
ConnectorFallible, ConnectorResult, Input, Output, ffi::NativeConnector,
ConnectorFallible, ConnectorResult, Input, Output, ffi::FfiConnector,
result::ErrorKind,
};
use std::{
Expand Down Expand Up @@ -99,7 +99,7 @@ pub struct Connector {
name: String,

/// The native connector instance, protected by a RwLock for thread-safe access.
native: RwLock<NativeConnector>,
native: RwLock<FfiConnector>,

/// Thread-safe holders for Input entities.
inputs: ThreadSafeEntityHolder<InputRecord>,
Expand Down Expand Up @@ -136,7 +136,7 @@ impl Connector {
static VERSION_STRING: &str = env!("CARGO_PKG_VERSION");

let (ndds_build_id_string, rtiddsconnector_build_id_string) =
NativeConnector::get_build_versions().unwrap_or((
FfiConnector::get_build_versions().unwrap_or((
"<Unknown RTI Connext version>".to_string(),
"<Unknown RTI Connector for Rust version>".to_string(),
));
Expand All @@ -149,22 +149,22 @@ impl Connector {

/// Get the last error message from the underlying RTI Connector C API.
pub(crate) fn get_last_error_message() -> Option<String> {
NativeConnector::get_last_error_message()
FfiConnector::get_last_error_message()
}

/// Create a new [`Connector`] from a named configuration contained
/// in an external XML file.
pub fn new(config_name: &str, config_file: &str) -> ConnectorResult<Connector> {
static NATIVE_CONNECTOR_CREATION_LOCK: Mutex<()> = Mutex::new(());

let native: NativeConnector = {
let native: FfiConnector = {
let _guard = NATIVE_CONNECTOR_CREATION_LOCK
.lock()
.inspect_err(|_| {
eprintln!("An error occurred while trying to lock the global native connector creation lock, continuing anyway...");
})
.unwrap_or_else(|poisoned| poisoned.into_inner());
NativeConnector::new(config_name, config_file)?
FfiConnector::new(config_name, config_file)?
};

Ok(Connector {
Expand Down Expand Up @@ -254,10 +254,10 @@ impl Connector {
self.outputs.release_entity(name)
}

/// Get immutable access to the [`NativeConnector`] (for read operations)
/// Get immutable access to the [`FfiConnector`] (for read operations)
pub(crate) fn native_ref(
&self,
) -> ConnectorResult<std::sync::RwLockReadGuard<'_, NativeConnector>> {
) -> ConnectorResult<std::sync::RwLockReadGuard<'_, FfiConnector>> {
self.native.read().map_err(|_| {
ErrorKind::lock_poisoned_error(
"Another thread panicked while holding the native connector lock",
Expand All @@ -266,10 +266,10 @@ impl Connector {
})
}

/// Get mutable access to the [`NativeConnector`] (for write operations)
/// Get mutable access to the [`FfiConnector`] (for write operations)
pub(crate) fn native_mut(
&self,
) -> ConnectorResult<std::sync::RwLockWriteGuard<'_, NativeConnector>> {
) -> ConnectorResult<std::sync::RwLockWriteGuard<'_, FfiConnector>> {
self.native.write().map_err(|_| {
ErrorKind::lock_poisoned_error(
"Another thread panicked while holding the native connector lock",
Expand Down
58 changes: 27 additions & 31 deletions src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use rtiddsconnector::ReturnCode;

use crate::result::ErrorKind;
use rtiddsconnector::{ConnectorIndex, NativeAllocatedString, NativeStringTrait};
use std::ffi::CString;
use std::{ffi::CString, ptr::NonNull};

/// Helper for converting a [`std::ffi::NulError`] into a [`ConnectorError`][crate::ConnectorError]
impl From<std::ffi::NulError> for crate::ConnectorError {
Expand Down Expand Up @@ -67,12 +67,12 @@ impl GlobalsDropGuard {

/// Newtype wrappers for native Sample pointers
#[allow(unused)]
pub struct NativeSample(*const rtiddsconnector::SamplePtr);
pub struct FfiSample(NonNull<rtiddsconnector::OpaqueSample>);

/// Newtype wrappers for native DataReader pointers
pub struct NativeInput(*const rtiddsconnector::DataReaderPtr);
pub struct FfiInput(NonNull<rtiddsconnector::OpaqueDataReader>);

impl NativeInput {
impl FfiInput {
pub fn wait_for_matched_publication(
&self,
timeout: Option<i32>,
Expand Down Expand Up @@ -106,9 +106,9 @@ impl NativeInput {
}

/// Newtype wrappers for native DataWriter pointers
pub struct NativeOutput(*const rtiddsconnector::DataWriterPtr);
pub struct FfiOutput(NonNull<rtiddsconnector::OpaqueDataWriter>);

impl NativeOutput {
impl FfiOutput {
pub fn wait_for_matched_subscription(
&self,
timeout: Option<i32>,
Expand Down Expand Up @@ -145,55 +145,52 @@ impl NativeOutput {
}

/// Newtype wrappers for native Connector pointers
pub struct NativeConnector(*const rtiddsconnector::ConnectorPtr);
pub struct FfiConnector(NonNull<rtiddsconnector::OpaqueConnector>);

impl Drop for NativeConnector {
impl Drop for FfiConnector {
fn drop(&mut self) {
if let Err(e) = self.delete() {
eprintln!("ERROR: failed to delete native participant: {}", e);
}
}
}

impl NativeConnector {
impl FfiConnector {
pub fn new(
connector_name: &str,
config_file: &str,
) -> crate::ConnectorResult<NativeConnector> {
) -> crate::ConnectorResult<FfiConnector> {
let config_name = CString::new(connector_name)?;
let config_file = CString::new(config_file)?;

unsafe {
NonNull::new(unsafe {
rtiddsconnector::RTI_Connector_new(
config_name.as_ptr(),
config_file.as_ptr(),
&rtiddsconnector::ConnectorOptions::default(),
)
.as_ref()
}
.map(|ptr| NativeConnector(ptr))
})
.map(FfiConnector)
.ok_or_else(|| ErrorKind::entity_not_found_error(connector_name).into())
}

pub fn get_output(&self, output_name: &str) -> crate::ConnectorResult<NativeOutput> {
pub fn get_output(&self, output_name: &str) -> crate::ConnectorResult<FfiOutput> {
let entity_name = CString::new(output_name)?;

unsafe {
NonNull::new(unsafe {
rtiddsconnector::RTI_Connector_get_datawriter(self.0, entity_name.as_ptr())
.as_ref()
}
.map(|ptr| NativeOutput(ptr))
})
.map(FfiOutput)
.ok_or_else(|| ErrorKind::entity_not_found_error(output_name).into())
}

pub fn get_input(&self, input_name: &str) -> crate::ConnectorResult<NativeInput> {
pub fn get_input(&self, input_name: &str) -> crate::ConnectorResult<FfiInput> {
let entity_name = CString::new(input_name)?;

unsafe {
NonNull::new(unsafe {
rtiddsconnector::RTI_Connector_get_datareader(self.0, entity_name.as_ptr())
.as_ref()
}
.map(|ptr| NativeInput(ptr))
})
.map(FfiInput)
.ok_or_else(|| ErrorKind::entity_not_found_error(input_name).into())
}

Expand All @@ -202,25 +199,24 @@ impl NativeConnector {
&self,
output_name: &str,
index: usize,
) -> crate::ConnectorResult<NativeSample> {
) -> crate::ConnectorResult<FfiSample> {
let entity_name: CString = CString::new(output_name)?;
let index: ConnectorIndex = index.try_into()?;

unsafe {
NonNull::new(unsafe {
rtiddsconnector::RTI_Connector_get_native_sample(
self.0,
entity_name.as_ptr(),
index,
)
.as_ref()
}
.map(|ptr| NativeSample(ptr))
})
.map(FfiSample)
.ok_or_else(|| ErrorKind::entity_not_found_error(output_name).into())
}

fn delete(&mut self) -> crate::ConnectorFallible {
InvokeResult::never_fails(|| unsafe {
rtiddsconnector::RTI_Connector_delete(self.0)
rtiddsconnector::RTI_Connector_delete(self.0.as_ptr())
})
.into()
}
Expand Down Expand Up @@ -649,7 +645,7 @@ impl NativeConnector {
pub fn get_native_instance(
&self,
entity_name: &str,
) -> crate::ConnectorResult<*const rtiddsconnector::SamplePtr> {
) -> crate::ConnectorResult<*const rtiddsconnector::OpaqueSample> {
unimplemented!();
}

Expand Down
Loading