Skip to content

Commit bd2472c

Browse files
Separate payload validation and sending
1 parent 30cc1f2 commit bd2472c

2 files changed

Lines changed: 298 additions & 164 deletions

File tree

src/net.rs

Lines changed: 87 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ use std::io::{self, IoSlice};
88
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs};
99
#[cfg(unix)]
1010
use std::os::fd::AsRawFd;
11-
use std::sync::atomic::{AtomicU16, AtomicU64, Ordering as AtomOrdering};
12-
use std::time::{Duration, Instant};
11+
use std::sync::atomic::{AtomicU16, Ordering as AtomOrdering};
12+
use std::time::Duration;
1313

1414
#[cfg(unix)]
15-
const DEST_ADDR_REQUIRED: i32 = libc::EDESTADDRREQ;
15+
pub const DEST_ADDR_REQUIRED: i32 = libc::EDESTADDRREQ;
1616
#[cfg(windows)]
17-
const DEST_ADDR_REQUIRED: i32 = 10039; // WSAEDESTADDRREQ
17+
pub const DEST_ADDR_REQUIRED: i32 = 10039; // WSAEDESTADDRREQ
1818

1919
const WORD_LO_U32: u32 = 0x0000_FFFF;
2020
const SWAP_LO_U32: u32 = 0xFF00_FF00;
@@ -25,6 +25,14 @@ const SWAP_HI_U64: u64 = 0x00FF_00FF_00FF_00FF;
2525
static REQUEST_ICMP_SEQ: AtomicU16 = AtomicU16::new(0);
2626
static REPLY_ICMP_SEQ: AtomicU16 = AtomicU16::new(0);
2727

28+
pub struct ValidatedPayload<'a> {
29+
src_is_icmp: bool,
30+
src_seq: u16,
31+
dst_proto: SupportedProtocol,
32+
payload: &'a [u8],
33+
pub len: usize,
34+
}
35+
2836
#[inline(always)]
2937
const fn be16_16(b0: u8, b1: u8) -> u16 {
3038
b1 as u16 | ((b0 as u16) << 8)
@@ -199,23 +207,14 @@ fn make_icmp_socket(domain: Domain, proto: Protocol, force_raw: bool) -> io::Res
199207
}
200208
}
201209

202-
pub fn send_payload(
210+
#[inline]
211+
pub fn validate_payload<'a>(
203212
c2u: bool,
204-
worker_id: usize,
205-
t_start: Instant,
206-
t_recv: Instant,
207213
cfg: &Config,
208214
stats: &Stats,
209-
last_seen: &AtomicU64,
210-
sock: &Socket,
211-
buf: &[u8],
212-
sock_connected: bool,
213-
sock_type: Type,
214-
dest_sa: &SockAddr,
215-
dest_port_id: u16,
215+
buf: &'a [u8],
216216
recv_port_id: u16,
217-
log_drops: bool,
218-
) -> bool {
217+
) -> io::Result<ValidatedPayload<'a>> {
219218
// Determine source/destination protocol for this direction once.
220219
let (src_proto, dst_proto) = if c2u {
221220
(cfg.listen_proto, cfg.upstream_proto)
@@ -232,126 +231,107 @@ pub fn send_payload(
232231
_ => (false, true, buf, recv_port_id, 0u16, c2u),
233232
};
234233

235-
// Size check on the normalized payload.
236-
let len = payload.len();
237-
238-
// Handle forwarding errors
239234
if !icmp_success {
240-
log_debug_dir!(
241-
log_drops,
242-
worker_id,
243-
c2u,
244-
"Dropping packet: Invalid or truncated ICMP Echo header"
245-
);
235+
// Parsed as ICMP but the Echo header is missing or truncated.
246236
stats.drop_err(c2u);
247-
return true;
248-
} else if c2u != src_is_req || src_ident != recv_port_id {
237+
return Err(io::Error::new(
238+
io::ErrorKind::InvalidData,
239+
"invalid ICMP Echo header (missing or truncated)",
240+
));
241+
}
242+
243+
if c2u != src_is_req || src_ident != recv_port_id {
249244
// If this is the client->upstream direction and we received an ICMP Echo *reply* or
250245
// upstream->client and we received an ICMP Echo *request*, drop it to avoid feedback loops.
251246
// Also, ignore all packets with the wrong identity field.
247+
// This is a validation failure, not an I/O error on the socket.
248+
return Err(io::Error::new(
249+
io::ErrorKind::InvalidData,
250+
"ICMP Echo direction or identity mismatch",
251+
));
252+
}
252253

253-
// Not an error; just ignore replies from the client side.
254-
log_debug_dir!(
255-
log_drops,
256-
worker_id,
257-
c2u,
258-
"Dropping packet: Wrong direction or identity"
259-
);
260-
return true;
261-
} else if len > cfg.max_payload {
262-
log_debug_dir!(
263-
log_drops,
264-
worker_id,
265-
c2u,
266-
"Dropping packet: {} bytes exceeds max {}",
267-
len,
268-
cfg.max_payload
269-
);
254+
let len = payload.len();
255+
if len == 0 || len > cfg.max_payload {
256+
// Payload is well-formed but does not fit within configured bounds.
270257
stats.drop_oversize(c2u);
271-
return true;
258+
return Err(io::Error::new(
259+
io::ErrorKind::InvalidData,
260+
format!(
261+
"payload length {} is zero or exceeds max {}",
262+
len, cfg.max_payload
263+
),
264+
));
272265
}
273266

267+
Ok(ValidatedPayload {
268+
src_is_icmp,
269+
src_seq,
270+
dst_proto,
271+
payload,
272+
len,
273+
})
274+
}
275+
276+
pub fn send_payload(
277+
c2u: bool,
278+
validated: &ValidatedPayload<'_>,
279+
sock: &Socket,
280+
sock_connected: bool,
281+
sock_type: Type,
282+
dest_sa: &SockAddr,
283+
dest_port_id: u16,
284+
) -> io::Result<bool> {
274285
// Update ICMP reply sequence when we receive a request
275-
if src_is_icmp && c2u {
276-
REPLY_ICMP_SEQ.store(src_seq, AtomOrdering::Relaxed);
286+
if validated.src_is_icmp && c2u {
287+
REPLY_ICMP_SEQ.store(validated.src_seq, AtomOrdering::Relaxed);
277288
}
278289

279-
// Send according to destination protocol and connection state.
280-
let mut dest_addr_okay = true;
281-
let send_res = match dst_proto {
290+
// Send according to destination protocol and connection state
291+
let send_res = match validated.dst_proto {
282292
SupportedProtocol::ICMP => send_icmp_echo(
283293
sock,
284294
sock_connected,
285295
sock_type,
286296
dest_sa,
287297
dest_port_id,
288298
!c2u,
289-
payload,
299+
validated.payload,
290300
),
291301
_ => {
292302
if sock_connected {
293-
sock.send(payload)
303+
sock.send(validated.payload)
294304
} else {
295-
sock.send_to(payload, dest_sa)
305+
sock.send_to(validated.payload, dest_sa)
296306
}
297307
}
298308
};
299309

300-
if let Ok(_) = send_res {
301-
last_seen.store(Stats::dur_ns(t_start, t_recv), AtomOrdering::Relaxed);
302-
if cfg.stats_interval_mins != 0 {
303-
let t_send = Instant::now();
304-
stats.send_add(c2u, len as u64, t_recv, t_send);
305-
}
306-
} else if sock_connected && is_dest_addr_required(&send_res) {
307-
dest_addr_okay = false;
308-
309-
// Try sending one more time
310-
let retry_res = match dst_proto {
311-
SupportedProtocol::ICMP => {
312-
send_icmp_echo(sock, false, sock_type, dest_sa, dest_port_id, !c2u, payload)
313-
}
314-
_ => sock.send_to(payload, dest_sa),
315-
};
316-
317-
match retry_res {
318-
Ok(_) => {
319-
last_seen.store(Stats::dur_ns(t_start, t_recv), AtomOrdering::Relaxed);
320-
if cfg.stats_interval_mins != 0 {
321-
let t_send = Instant::now();
322-
stats.send_add(c2u, len as u64, t_recv, t_send);
323-
}
324-
}
325-
Err(e) => {
326-
log_debug_dir!(
327-
log_drops,
328-
worker_id,
329-
c2u,
330-
"send_to on dest_sa '{:?}' error: {}",
331-
dest_sa.as_socket(),
332-
e
333-
);
334-
stats.drop_err(c2u);
310+
match send_res {
311+
Ok(_) => Ok(true),
312+
Err(e) if sock_connected && e.raw_os_error() == Some(DEST_ADDR_REQUIRED) => {
313+
// Try sending one more time
314+
let retry_res = match validated.dst_proto {
315+
SupportedProtocol::ICMP => send_icmp_echo(
316+
sock,
317+
false,
318+
sock_type,
319+
dest_sa,
320+
dest_port_id,
321+
!c2u,
322+
validated.payload,
323+
),
324+
_ => sock.send_to(validated.payload, dest_sa),
325+
};
326+
327+
match retry_res {
328+
// Propagate DEST_ADDR_REQUIRED and dest_addr_okay=false to the handler to update socket connection status
329+
Ok(_) => Ok(false),
330+
Err(retry_err) => Err(retry_err),
335331
}
336332
}
337-
} else if let Err(e) = send_res {
338-
log_debug_dir!(
339-
log_drops,
340-
worker_id,
341-
c2u,
342-
"{} on dest_sa '{:?}' error: {}",
343-
if sock_connected { "send" } else { "send_to" },
344-
dest_sa.as_socket(),
345-
e
346-
);
347-
stats.drop_err(c2u);
333+
Err(e) => Err(e),
348334
}
349-
dest_addr_okay
350-
}
351-
352-
#[inline]
353-
fn is_dest_addr_required(res: &io::Result<usize>) -> bool {
354-
matches!(res, Err(e) if e.raw_os_error() == Some(DEST_ADDR_REQUIRED))
355335
}
356336

357337
/// Some OSes (notably Linux for IPv4 raw sockets) deliver the full IP header

0 commit comments

Comments
 (0)