Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 11 additions & 18 deletions datadog-ipc-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ fn gen_handler_trait(
quote! {
fn #name(
&self,
peer: datadog_ipc::PeerCredentials,
#(#params),*
) -> impl ::std::future::Future<Output = #ret> + Send + '_;
}
Expand All @@ -223,6 +222,9 @@ fn gen_handler_trait(
/// The serve loop uses this to track received payloads.
fn recv_counter(&self) -> &::std::sync::atomic::AtomicU64;

/// Storage for the connection to read from.
fn connection(&self) -> &datadog_ipc::ipc_server::OwnedServerConn;

#(#handler_methods)*
}
}
Expand Down Expand Up @@ -260,29 +262,29 @@ fn gen_serve_fn(
quote! {
#[cfg(target_os = "linux")]
if __pending_acks > 0 {
datadog_ipc::send_acks_async(&async_fd, __pending_acks).await;
datadog_ipc::send_acks_async(handler.connection().async_conn(), __pending_acks).await;
__pending_acks = 0;
}
let result = handler.#name(peer, #(#field_names),*).await;
let result = handler.#name(#(#field_names),*).await;
let __resp_data = datadog_ipc::codec::encode(&result);
datadog_ipc::send_raw_async(&async_fd, &__resp_data).await.ok();
datadog_ipc::send_raw_async(handler.connection().async_conn(), &__resp_data).await.ok();
}
} else {
// On Linux, buffer up to 20 acks and flush in a single
// sendmmsg(2) syscall; on other platforms send each ack immediately.
quote! {
handler.#name(peer, #(#field_names),*).await;
handler.#name(#(#field_names),*).await;
#[cfg(target_os = "linux")]
{
__pending_acks += 1;
if #force_flush || __pending_acks >= datadog_ipc::ACK_BUFFER_SIZE {
datadog_ipc::send_acks_async(&async_fd, __pending_acks).await;
datadog_ipc::send_acks_async(handler.connection().async_conn(), __pending_acks).await;
__pending_acks = 0;
}
}
#[cfg(not(target_os = "linux"))]
// 1-byte ack: distinguishable from EOF (0 bytes from recvmsg on closed socket).
datadog_ipc::send_raw_async(&async_fd, &[0u8]).await.ok();
datadog_ipc::send_raw_async(handler.connection().async_conn(), &[0u8]).await.ok();
}
};

Expand All @@ -296,23 +298,14 @@ fn gen_serve_fn(

quote! {
pub async fn #serve_fn<H: #trait_name>(
conn: datadog_ipc::SeqpacketConn,
handler: ::std::sync::Arc<H>,
) {
let peer = conn.peer_credentials().unwrap_or_default();
let async_fd = match conn.into_async_conn() {
Ok(fd) => fd,
Err(e) => {
::tracing::error!("IPC serve: into_async_conn failed: {e}");
return;
}
};
// Pending 1-byte acks for fire-and-forget methods, flushed via sendmmsg(2) on Linux.
#[cfg(target_os = "linux")]
let mut __pending_acks: u32 = 0;
loop {
let (mut req, fds) = match datadog_ipc::recv_raw_async(
&async_fd,
&handler.connection().async_conn(),
|buf| datadog_ipc::codec::decode::<#enum_name>(buf),
).await {
Ok((Ok(req), fds)) => (req, fds),
Expand All @@ -334,7 +327,7 @@ fn gen_serve_fn(
break;
}
let recv_counter = handler.recv_counter().load(::std::sync::atomic::Ordering::Relaxed) + 1;
::tracing::trace!(recv_counter, ?req, pid = peer.pid, "IPC recv");
::tracing::trace!(recv_counter, ?req, pid = handler.connection().peer().pid, "IPC recv");

match req {
#(#match_arms)*
Expand Down
67 changes: 34 additions & 33 deletions datadog-ipc/src/example_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
};

use super::platform::{FileBackedHandle, PlatformHandle, ShmHandle};
use crate::ipc_server::OwnedServerConn;

extern crate self as datadog_ipc;

Expand All @@ -30,6 +31,7 @@ pub trait ExampleInterface {
async fn echo_len(payload: Vec<u8>) -> u32;
}

/// Shared server state. Cloned into a per-connection [`ExampleConnectionHandler`] on accept.
#[derive(Default, Clone)]
pub struct ExampleServer {
req_cnt: Arc<AtomicU64>,
Expand All @@ -38,70 +40,69 @@ pub struct ExampleServer {

impl ExampleServer {
pub async fn accept_connection(self, conn: crate::SeqpacketConn) {
serve_example_interface_connection(conn, Arc::new(self)).await
let connection = match OwnedServerConn::new(conn) {
Ok(c) => c,
Err(e) => {
::tracing::error!("ExampleServer: failed to set up connection: {e}");
return;
}
};
serve_example_interface_connection(Arc::new(ExampleConnectionHandler {
server: self,
connection,
}))
.await
}
}

impl ExampleInterface for ExampleServer {
/// Per-connection handler: owns the connection and serves requests received on it.
struct ExampleConnectionHandler {
server: ExampleServer,
connection: OwnedServerConn,
}

impl ExampleInterface for ExampleConnectionHandler {
fn recv_counter(&self) -> &AtomicU64 {
&self.req_cnt
&self.server.req_cnt
}

fn notify(
&self,
_peer: datadog_ipc::PeerCredentials,
) -> impl std::future::Future<Output = ()> + Send + '_ {
fn connection(&self) -> &OwnedServerConn {
&self.connection
}

fn notify(&self) -> impl std::future::Future<Output = ()> + Send + '_ {
std::future::ready(())
}

fn ping(
&self,
_peer: datadog_ipc::PeerCredentials,
) -> impl std::future::Future<Output = ()> + Send + '_ {
fn ping(&self) -> impl std::future::Future<Output = ()> + Send + '_ {
std::future::ready(())
}

fn time_now(
&self,
_peer: datadog_ipc::PeerCredentials,
) -> impl std::future::Future<Output = Duration> + Send + '_ {
fn time_now(&self) -> impl std::future::Future<Output = Duration> + Send + '_ {
std::future::ready(Instant::now().elapsed())
}

fn req_cnt(
&self,
_peer: datadog_ipc::PeerCredentials,
) -> impl std::future::Future<Output = u32> + Send + '_ {
std::future::ready(self.req_cnt.load(Ordering::Relaxed) as u32)
fn req_cnt(&self) -> impl std::future::Future<Output = u32> + Send + '_ {
std::future::ready(self.server.req_cnt.load(Ordering::Relaxed) as u32)
}

fn store_file(
&self,
_peer: datadog_ipc::PeerCredentials,
file: PlatformHandle<File>,
) -> impl std::future::Future<Output = ()> + Send + '_ {
#[allow(clippy::unwrap_used)]
self.stored_files.lock().unwrap().push(file);
self.server.stored_files.lock().unwrap().push(file);
std::future::ready(())
}

async fn shm_sum(
&self,
_peer: datadog_ipc::PeerCredentials,
handle: ShmHandle,
len: usize,
) -> u64 {
async fn shm_sum(&self, handle: ShmHandle, len: usize) -> u64 {
match handle.map() {
Ok(mapped) => mapped.as_slice()[..len].iter().map(|&b| b as u64).sum(),
Err(_) => u64::MAX,
}
}

fn echo_len(
&self,
_peer: datadog_ipc::PeerCredentials,
payload: Vec<u8>,
) -> impl std::future::Future<Output = u32> + Send + '_ {
fn echo_len(&self, payload: Vec<u8>) -> impl std::future::Future<Output = u32> + Send + '_ {
std::future::ready(payload.len() as u32)
}
}
32 changes: 32 additions & 0 deletions datadog-ipc/src/ipc_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::{PeerCredentials, SeqpacketConn};
Comment thread
bwoebi marked this conversation as resolved.
use std::io;

pub struct OwnedServerConn {
connection: crate::AsyncConn,
peer: PeerCredentials,
}

impl OwnedServerConn {
pub fn new(conn: SeqpacketConn) -> io::Result<Self> {
let peer = conn.peer_credentials().unwrap_or_default();
let connection = conn.into_async_conn()?;
Ok(Self { connection, peer })
}

/// Construct from an already-async connection and a known peer. Useful for callers that have
/// already wrapped the fd (and for tests).
pub fn from_async(connection: crate::AsyncConn, peer: PeerCredentials) -> Self {
Self { connection, peer }
}

pub fn async_conn(&self) -> &crate::AsyncConn {
&self.connection
}

pub fn peer(&self) -> &PeerCredentials {
&self.peer
}
}
2 changes: 2 additions & 0 deletions datadog-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub mod shm_stats;
mod atomic_option;
pub mod client;
pub mod codec;
pub mod ipc_server;

pub use atomic_option::AtomicOption;

pub use client::IpcClientConn;
Expand Down
16 changes: 0 additions & 16 deletions datadog-sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use datadog_live_debugger::debugger_defs::DebuggerPayload;
use datadog_sidecar::agent_remote_config::{new_reader, reader_from_shm, AgentRemoteConfigWriter};
use datadog_sidecar::config;
use datadog_sidecar::config::LogMethod;
use datadog_sidecar::crashtracker::crashtracker_unix_socket_path;
use datadog_sidecar::service::agent_info::AgentInfoReader;
use datadog_sidecar::service::telemetry::InternalTelemetryAction;
use datadog_sidecar::service::{
Expand Down Expand Up @@ -1614,21 +1613,6 @@ pub extern "C" fn ddog_sidecar_reconnect(
transport.reconnect(|| unsafe { factory() });
}

/// Return the path of the crashtracker unix domain socket.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_get_crashtracker_unix_socket_path() -> ffi::CharSlice<'static>
{
let socket_path = crashtracker_unix_socket_path();
let str = socket_path.to_str().unwrap_or_default();

let size = str.len();
let malloced = libc::malloc(size) as *mut u8;
let buf = slice::from_raw_parts_mut(malloced, size);
buf.copy_from_slice(str.as_bytes());
ffi::CharSlice::from_raw_parts(malloced as *mut c_char, size)
}

/// Gets an agent info reader.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
Expand Down
2 changes: 1 addition & 1 deletion datadog-sidecar/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl AppSecConfig {
pub struct FromEnv {}

impl FromEnv {
fn ipc_mode() -> IpcMode {
pub fn ipc_mode() -> IpcMode {
let mode = std::env::var(ENV_SIDECAR_IPC_MODE).unwrap_or_default();

match mode.as_str() {
Expand Down
Loading
Loading