Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ __pycache__

# misc
.vscode
.mcp.json
.claude
CLAUDE.md
.kli/**
110 changes: 76 additions & 34 deletions lightbug_http/server.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,60 @@ fn handle_connection[
break


struct SyncExecutor[T: HTTPService](Movable):
"""Single-threaded executor: handles each connection to completion before accepting the next.

This is the default executor used by `Server.serve`. It processes connections
sequentially — receive, parse, dispatch to handler, respond — then loops back
to accept the next connection.

Parameters:
T: The HTTP service type that handles incoming requests.
"""

var provision_pool: ProvisionPool
var config: ServerConfig
var server_address: String
var tcp_keep_alive: Bool

fn __init__(out self, config: ServerConfig, server_address: String, tcp_keep_alive: Bool):
self.provision_pool = ProvisionPool(config.max_connections, config)
self.config = config.copy()
self.server_address = server_address
self.tcp_keep_alive = tcp_keep_alive

fn execute(mut self, var conn: TCPConnection[NetworkType.tcp4], mut handler: Self.T):
var index: Int
try:
index = self.provision_pool.borrow()
except:
try:
conn^.teardown()
except:
pass
return

try:
handle_connection(
conn,
self.provision_pool.provisions[index],
handler,
self.config,
self.server_address,
self.tcp_keep_alive,
)
except:
pass
finally:
try:
conn^.teardown()
except:
pass
self.provision_pool.provisions[index].prepare_for_new_request()
self.provision_pool.provisions[index].keepalive_count = 0
self.provision_pool.release(index)


struct Server(Movable):
"""HTTP/1.1 Server implementation."""

Expand Down Expand Up @@ -543,7 +597,7 @@ struct Server(Movable):
raise server_err^

fn serve[T: HTTPService](self, ln: NoTLSListener[NetworkType.tcp4], mut handler: T) raises ServerError:
"""Serve HTTP requests from an existing listener.
"""Serve HTTP requests from an existing listener using the default single-threaded executor.

Parameters:
T: The type of HTTPService that handles incoming requests.
Expand All @@ -555,46 +609,34 @@ struct Server(Movable):
Raises:
ServerError: If accept fails or critical connection handling errors occur.
"""
var provision_pool = ProvisionPool(self.config.max_connections, self.config)
var executor = SyncExecutor[T](self.config, self._address, self.tcp_keep_alive)
self.serve_with_executor(ln, handler, executor)

fn serve_with_executor[
T: HTTPService,
](self, ln: NoTLSListener[NetworkType.tcp4], mut handler: T, mut executor: SyncExecutor[T]) raises ServerError:
"""Serve HTTP requests using a custom executor for connection dispatch.

Use this method to provide a custom execution model such as a thread pool
or async runtime for handling connections concurrently.

Parameters:
T: The type of HTTPService that handles incoming requests.
Args:
ln: TCP server that listens for incoming connections.
handler: An object that handles incoming HTTP requests.
executor: The executor that dispatches each accepted connection.

Raises:
ServerError: If accept fails or critical connection handling errors occur.
"""
while True:
var conn: TCPConnection[NetworkType.tcp4]
try:
conn = ln.accept()
except listener_err:
raise listener_err^

var index: Int
try:
index = provision_pool.borrow()
except provision_err:
# Pool exhausted - close the connection and continue
try:
conn^.teardown()
except:
pass
continue

try:
handle_connection(
conn,
provision_pool.provisions[index],
handler,
self.config,
self.address(),
self.tcp_keep_alive,
)
except socket_err:
# Connection handling failed - just close the connection
pass
finally:
try:
conn^.teardown()
except:
pass
provision_pool.provisions[index].prepare_for_new_request()
provision_pool.provisions[index].keepalive_count = 0
provision_pool.release(index)
executor.execute(conn^, handler)


fn _send_error_response(mut conn: TCPConnection[NetworkType.tcp4], var response: HTTPResponse):
Expand Down
Loading