Skip to content

Commit fce8ed9

Browse files
committed
Track client addresses and add banned IPs
Attach client SocketAddr to connections and requests, enable per-server banned IP list, and improve worker wait/shutdown handling. - engine.rs: store banned_ips in Hteapot, add add_banned_ip(), initialize banned_ips in constructors, accept listener (stream, addr), drop connections from banned addresses, store (TcpStream, SocketAddr) in the worker pool, persist addr in SocketData, and use HttpRequestBuilder::new_with_addr() when creating a SocketStatus. Also refine cvar.wait_while logic to honor the shutdown signal while waiting. - request.rs: add addr: SocketAddr to HttpRequest, add new_with_addr() to HttpRequestBuilder to populate the request addr, and initialize default addr values in builders. These changes let the server know the remote address for each request (so handlers can inspect or log it) and allow banning specific client IPs before they enter the worker pool. The wait logic change ensures worker threads properly observe shutdown while waiting for new connections.
1 parent 62a08cf commit fce8ed9

5 files changed

Lines changed: 65 additions & 44 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hteapot"
3-
version = "0.6.5"
3+
version = "0.6.6"
44
edition = "2024"
55
authors = ["Alb Ruiz G. <me@albruiz.dev>"]
66
description = "HTeaPot is a lightweight HTTP server library designed to be easy to use and extend."

src/hteapot/engine.rs

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::collections::VecDeque;
22
use std::io::{self, Read, Write};
3-
use std::net::{Shutdown, TcpListener, TcpStream};
3+
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
44

55
use std::sync::atomic::{AtomicBool, Ordering};
66
use std::sync::{Arc, Condvar, Mutex};
@@ -29,6 +29,7 @@ pub struct Hteapot {
2929
port: u16,
3030
address: String,
3131
threads: u16,
32+
banned_ips: Vec<SocketAddr>,
3233
shutdown_signal: Option<Arc<AtomicBool>>,
3334
shutdown_hooks: Vec<Arc<dyn Fn() + Send + Sync + 'static>>,
3435
}
@@ -52,6 +53,7 @@ struct SocketStatus {
5253
/// Wraps a TCP stream and its associated state.
5354
struct SocketData {
5455
stream: TcpStream,
56+
_addr: SocketAddr,
5557
status: Option<SocketStatus>,
5658
}
5759

@@ -64,6 +66,10 @@ impl Hteapot {
6466
self.shutdown_signal.clone()
6567
}
6668

69+
pub fn add_banned_ip(&mut self, ip: SocketAddr) {
70+
self.banned_ips.push(ip);
71+
}
72+
6773
pub fn add_shutdown_hook<F>(&mut self, hook: F)
6874
where
6975
F: Fn() + Send + Sync + 'static,
@@ -81,6 +87,7 @@ impl Hteapot {
8187
port,
8288
address: address.to_string(),
8389
threads: 1,
90+
banned_ips: Vec::new(),
8491
shutdown_signal: None,
8592
shutdown_hooks: Vec::new(),
8693
}
@@ -91,6 +98,7 @@ impl Hteapot {
9198
port,
9299
address: address.to_string(),
93100
threads: if threads == 0 { 1 } else { threads },
101+
banned_ips: Vec::new(),
94102
shutdown_signal: None,
95103
shutdown_hooks: Vec::new(),
96104
}
@@ -110,7 +118,7 @@ impl Hteapot {
110118
}
111119
};
112120

113-
let pool: Arc<(Mutex<VecDeque<TcpStream>>, Condvar)> =
121+
let pool: Arc<(Mutex<VecDeque<(TcpStream, SocketAddr)>>, Condvar)> =
114122
Arc::new((Mutex::new(VecDeque::new()), Condvar::new()));
115123
let priority_list: Arc<Mutex<Vec<usize>>> =
116124
Arc::new(Mutex::new(vec![0; self.threads as usize]));
@@ -120,10 +128,10 @@ impl Hteapot {
120128
let shutdown_signal = self.shutdown_signal.clone();
121129
let shutdown_hooks = Arc::new(self.shutdown_hooks.clone());
122130

123-
for thread_index in 0..self.threads {
131+
for _thread_index in 0..self.threads {
124132
let pool_clone = pool.clone();
125133
let action_clone = arc_action.clone();
126-
let priority_list_clone = priority_list.clone();
134+
let _priority_list_clone = priority_list.clone();
127135
let shutdown_signal_clone = shutdown_signal.clone();
128136

129137
thread::spawn(move || {
@@ -132,28 +140,33 @@ impl Hteapot {
132140
{
133141
let (lock, cvar) = &*pool_clone;
134142
let mut pool = lock.lock().expect("Error locking pool");
143+
135144
if streams_to_handle.is_empty() {
136145
// Store the returned guard back into pool
137-
pool = cvar
138-
.wait_while(pool, |pool| pool.is_empty())
139-
.expect("Error waiting on cvar");
140-
}
141-
//TODO: move this to allow process the last request
142-
if let Some(signal) = &shutdown_signal_clone {
143-
if !signal.load(Ordering::SeqCst) {
144-
break; // Exit the server loop
145-
}
146+
pool = if let Some(signal) = &shutdown_signal_clone {
147+
if !signal.load(Ordering::SeqCst) {
148+
break;
149+
}
150+
cvar.wait_while(pool, |pool| {
151+
pool.is_empty() && signal.load(Ordering::SeqCst)
152+
})
153+
.expect("Error waiting on cvar")
154+
} else {
155+
cvar.wait_while(pool, |pool| pool.is_empty())
156+
.expect("Error waiting on cvar")
157+
};
146158
}
147159

148-
while let Some(stream) = pool.pop_back() {
160+
while let Some((stream, addr)) = pool.pop_back() {
149161
let socket_status = SocketStatus {
150162
ttl: Instant::now(),
151163
status: Status::Read,
152164
response: Box::new(EmptyHttpResponse {}),
153-
request: HttpRequestBuilder::new(),
165+
request: HttpRequestBuilder::new_with_addr(addr),
154166
index_writed: 0,
155167
};
156168
let socket_data = SocketData {
169+
_addr: addr,
157170
stream,
158171
status: Some(socket_status),
159172
};
@@ -190,10 +203,15 @@ impl Hteapot {
190203
break;
191204
}
192205
}
193-
let stream = match listener.accept() {
194-
Ok((stream, _)) => stream,
206+
207+
let (stream, addr) = match listener.accept() {
208+
Ok((stream, addr)) => (stream, addr),
195209
Err(_) => continue,
196210
};
211+
if self.banned_ips.contains(&addr) {
212+
let _ = stream.shutdown(Shutdown::Both);
213+
continue;
214+
}
197215

198216
if stream.set_nonblocking(true).is_err() {
199217
eprintln!("Error setting non-blocking mode on stream");
@@ -209,7 +227,7 @@ impl Hteapot {
209227
let mut pool = lock.lock().expect("Error locking pool");
210228

211229
// Add the connection to the pool for the least-loaded thread
212-
pool.push_front(stream);
230+
pool.push_front((stream, addr));
213231
cvar.notify_one();
214232
}
215233
}

src/hteapot/request.rs

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use super::HttpHeaders;
1111
use super::HttpMethod;
1212
use std::hash::Hash;
13+
use std::net::SocketAddr;
1314
use std::{cmp::min, collections::HashMap, net::TcpStream, str};
1415

1516
const MAX_HEADER_SIZE: usize = 1024 * 16;
@@ -25,6 +26,7 @@ pub struct HttpRequest {
2526
pub args: HashMap<String, String>,
2627
pub headers: HttpHeaders,
2728
pub body: Vec<u8>,
29+
pub addr: SocketAddr,
2830
stream: Option<TcpStream>,
2931
}
3032

@@ -60,6 +62,8 @@ impl HttpRequest {
6062
args: HashMap::new(),
6163
headers: HttpHeaders::new(),
6264
body: Vec::new(),
65+
addr: SocketAddr::from(([0, 0, 0, 0], 0)),
66+
6367
stream: None,
6468
};
6569
}
@@ -71,6 +75,8 @@ impl HttpRequest {
7175
path: String::new(),
7276
args: HashMap::new(),
7377
headers: HttpHeaders::new(),
78+
addr: SocketAddr::from(([0, 0, 0, 0], 0)),
79+
7480
body: Vec::new(),
7581
stream: None,
7682
}
@@ -83,6 +89,7 @@ impl HttpRequest {
8389
path: self.path.clone(),
8490
args: self.args.clone(),
8591
headers: self.headers.clone(),
92+
addr: self.addr.clone(),
8693
body: self.body.clone(),
8794
stream: None,
8895
};
@@ -135,6 +142,25 @@ impl HttpRequestBuilder {
135142
args: HashMap::new(),
136143
headers: HttpHeaders::new(),
137144
body: Vec::new(),
145+
addr: SocketAddr::from(([0, 0, 0, 0], 0)),
146+
stream: None,
147+
},
148+
chunked: false,
149+
state: State::Init,
150+
body_size: 0,
151+
buffer: Vec::new(),
152+
};
153+
}
154+
155+
pub fn new_with_addr(addr: SocketAddr) -> Self {
156+
return HttpRequestBuilder {
157+
request: HttpRequest {
158+
method: HttpMethod::GET,
159+
path: String::new(),
160+
args: HashMap::new(),
161+
headers: HttpHeaders::new(),
162+
body: Vec::new(),
163+
addr,
138164
stream: None,
139165
},
140166
chunked: false,
@@ -152,30 +178,6 @@ impl HttpRequestBuilder {
152178
}
153179
}
154180

155-
/// Reads bytes into the request body based on `Content-Length`.
156-
fn read_body_len(&mut self) -> Option<()> {
157-
let body_left = self.body_size.saturating_sub(self.request.body.len());
158-
159-
let body_left = self.body_size.saturating_sub(self.request.body.len());
160-
161-
if body_left > 0 {
162-
return None;
163-
} else {
164-
return Some(());
165-
}
166-
}
167-
168-
/// Placeholder for future support of chunked body parsing.
169-
fn _read_body_chunk(&mut self) -> Option<()> {
170-
//TODO: this will support chunked body in the future
171-
todo!()
172-
}
173-
174-
/// Main entry point for reading the request body.
175-
fn read_body(&mut self) -> Option<()> {
176-
return self.read_body_len();
177-
}
178-
179181
pub fn done(&self) -> bool {
180182
self.state == State::Finish
181183
}

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ fn main() {
218218

219219
let response = handlers.get_handler(&ctx);
220220
if response.is_none() {
221+
logger.error("No handler found for request".to_string());
221222
return HttpResponse::new(HttpStatus::InternalServerError, "content", None);
222223
}
223224
let response = response.unwrap().run(&mut ctx);

0 commit comments

Comments
 (0)