Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions docs/internals.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ The bump arena suits ArduinoJson's allocation pattern: during a parse the varian
├─ Process close/disconnect events (on_connection_lost)
├─ Process hello events (handshake completion, handoff decisions)
├─ Call loop() on current and pending connections
└─ Check hello retry timer
└─ Check per-connection hello retry timers

2. time_burst_->loop(conn) (skipped when no current connection)
├─ Send next time message if ready
Expand Down Expand Up @@ -394,7 +394,7 @@ Both are `std::shared_ptr<SendspinConnection>`, and on the ESP server path they
### Handshake and Handoff

1. A new connection (outbound or inbound) is started. If a current connection exists, the new one becomes `pending_connection_`.
2. The connection sends a CLIENT_HELLO. Retry with exponential backoff (100 ms base, 3 attempts).
2. The connection sends a CLIENT_HELLO. Retry with exponential backoff (100 ms base, 3 attempts). Each managed connection has its own retry entry in `ConnectionManager::hello_retries_`, so a handoff candidate arriving mid-handshake cannot clobber the current connection's pending hello (and vice versa).
3. SERVER_HELLO arrives on the network thread → enqueued into mutex-protected vector.
4. Main loop processes the hello event:
- Stores server info on the connection.
Expand Down Expand Up @@ -422,7 +422,7 @@ When a connection is lost (`on_connection_lost`):

`disconnect_and_release()` calls `conn->disconnect(reason, nullptr)` and lets the local `shared_ptr` go out of scope.

- **ESP server**: the goodbye text is queued as an httpd worker job; the worker looks the connection up via `httpd_sess_get_ctx`, sends the frame, then runs the completion lambda that calls `trigger_close()`. The session slot installed in `open_callback` keeps the connection alive across that whole sequence even after `ConnectionManager`'s observer `shared_ptr` is dropped. The session is finally freed when httpd invokes the slot's `free_fn` (see [Server connection ownership (ESP)](#server-connection-ownership-esp)). The completion lambda captures a `weak_ptr` to make this lifetime explicit — `trigger_close()` is skipped if the conn has already been freed.
- **ESP server**: the goodbye text is queued as an httpd worker job. The worker resolves the connection by `lock()`ing the `weak_ptr` captured in the queued arg when the goodbye was enqueued; if it resolves it sends the frame, then runs the completion lambda that calls `trigger_close()`. The session slot installed in `open_callback` keeps the connection alive across that whole sequence even after `ConnectionManager`'s observer `shared_ptr` is dropped. The session is finally freed when httpd invokes the slot's `free_fn` (see [Server connection ownership (ESP)](#server-connection-ownership-esp)). The completion lambda also captures a `weak_ptr` to make this lifetime explicit — `trigger_close()` is skipped if the conn has already been freed. Goodbye is one of the two messages that pass `allow_before_hello=true`, so it is not blocked by the pre-hello send gate (a rejected connection is told to leave before it ever sends a hello).
- **Host client**: the IXWebSocket send is synchronous, so the goodbye and close have both completed by the time `disconnect()` returns and the `shared_ptr` drops the last reference.

### Server connection ownership (ESP)
Expand All @@ -431,10 +431,12 @@ On the ESP build, `SendspinServerConnection` lifetime is pinned to the httpd ses

1. `SendspinWsServer::open_callback` (the httpd `open_fn`) creates the `shared_ptr<SendspinServerConnection>`, heap-allocates a `shared_ptr*` slot, and calls `httpd_sess_set_ctx(handle, sockfd, slot, free_fn)` with a deleter that `delete`s the slot. That slot is the authoritative reference.
2. The same shared_ptr is forwarded into `ConnectionManager::on_new_connection`, which stores it in `current_connection_` or `pending_connection_` as a *secondary observer*.
3. The httpd WebSocket handler (`websocket_handler`) and queued workers (`async_send_text`, `async_send_time_text`) look the connection up by `httpd_sess_get_ctx(handle, sockfd)` at run time, copying the slot's `shared_ptr` for the duration of their work. They never assume the manager's observer slot is alive.
3. The httpd WebSocket handler (`websocket_handler`) looks the connection up by `httpd_sess_get_ctx(handle, sockfd)` at run time, copying the slot's `shared_ptr` for the duration of its work; it never assumes the manager's observer slot is alive. The queued send workers (`async_send_text`, `async_send_time_text`) instead capture a `weak_ptr<SendspinServerConnection>` to the originating connection and `lock()` it when they run.
4. When the socket closes, httpd calls the `close_fn` first (which fires `connection_closed_callback_` so `ConnectionManager` can drop its observer in the next `loop()`), then later calls the slot's `free_fn` to release the authoritative reference once no workers are queued for that session.

Queued workers carry only a `{httpd_handle_t, int sockfd}` pair (`AsyncRespArg` for text sends, `SessionLookup` for time sends) rather than capturing the connection directly. If the session has already been torn down by the time a worker runs, the lookup returns null and the worker no-ops cleanly. Both arg structs are allocated through `platform_malloc` / `platform_malloc_internal` and freed in the worker.
Queued send workers capture a `weak_ptr<SendspinServerConnection>` to the originating connection — `AsyncRespArg` for text sends, `SessionLookup` for time sends — and `lock()` it when the worker runs. This is deliberately **not** a `{httpd_handle_t, int sockfd}` pair: identifying the target by sockfd risked binding to a *different* connection that had recycled the same fd after the original closed, sending a frame to the wrong peer. The `weak_ptr` resolves to the exact connection that queued the work, or to null if it has since been destroyed, in which case the worker no-ops cleanly. Because these structs now hold non-trivial members (the `weak_ptr`, and `AsyncRespArg`'s completion `std::function`), they are constructed with placement-new and explicitly destroyed before `platform_free` rather than treated as POD. Both are allocated through `platform_malloc` / `platform_malloc_internal`.

The send workers also enforce the protocol's "hello is always first" rule: a frame is dropped unless `client_hello_sent_` is set on the resolved connection, *unless* the caller passed `allow_before_hello=true`. Exactly two callers do — the `client/hello` itself (which would otherwise gate its own send and deadlock) and `goodbye` — so a stale or out-of-order frame can never precede the handshake. The `weak_ptr` guards identity; the gate guards ordering; the two are independent.

The host build does not need this scheme: `SendspinWsServer` (host) routes IXWebSocket messages by calling `find_connection_callback_` to resolve a synthetic sockfd back to the connection that `ConnectionManager` is holding. The ESP build keeps the `set_find_connection_callback()` setter as a no-op stub for symmetry; see the comment at the call site in `ConnectionManager::init_server`.

Expand Down
5 changes: 4 additions & 1 deletion src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ void SendspinConnection::init_time_filter() {

SsErr SendspinConnection::send_goodbye_reason(SendspinGoodbyeReason reason,
SendCompleteCallback on_complete) {
return this->send_text_message(format_client_goodbye_message(reason), std::move(on_complete));
// Goodbye is a control message that may legitimately be sent before the client/hello (e.g.,
// when rejecting an excess connection), so it bypasses the pre-hello send gate.
return this->send_text_message(format_client_goodbye_message(reason), std::move(on_complete),
/*allow_before_hello=*/true);
}

// ============================================================================
Expand Down
26 changes: 20 additions & 6 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,19 @@ class SendspinConnection : public std::enable_shared_from_this<SendspinConnectio
}

/// @brief Sends a text message to the server with a completion callback
/// @param msg The message string to send.
/// @param cb Callback invoked after send completes.
/// @param message The message string to send.
/// @param cb Callback invoked with the send result. On asynchronous transports it is not
/// guaranteed to fire: if the connection is torn down before the queued send runs, or
/// the message is dropped by the pre-hello gate, the callback is skipped. Treat it as a
/// best-effort completion notification, not an unconditional "send finished" signal.
/// @param allow_before_hello If true, the message may be sent before the client/hello has been
/// sent on this connection (used for the hello itself and for goodbye). If false (the
/// default), platform transports that send asynchronously drop the message when no
/// client/hello has been sent yet, preserving the "hello is always first" protocol
/// invariant. Synchronous transports ignore this flag.
/// @return SsErr::OK if queued successfully, error code otherwise.
virtual SsErr send_text_message(const std::string& message, SendCompleteCallback cb) = 0;
virtual SsErr send_text_message(const std::string& message, SendCompleteCallback cb,
bool allow_before_hello = false) = 0;

/// @brief Sends a client/time synchronization message
///
Expand Down Expand Up @@ -362,8 +371,10 @@ class SendspinConnection : public std::enable_shared_from_this<SendspinConnectio

// 8-bit fields

/// Hello handshake state.
bool client_hello_sent_{false};
/// Hello handshake state. Atomic because it is set from the send-completion callback (the httpd
/// worker thread on ESP) and the disconnect handlers (network thread), while
/// is_handshake_complete() and the pre-hello send gate read it from other threads.
std::atomic<bool> client_hello_sent_{false};

/// true if the current message being assembled is text, false if binary
/// Needed because WebSocket continuation frames do not carry the original frame type
Expand All @@ -375,7 +386,10 @@ class SendspinConnection : public std::enable_shared_from_this<SendspinConnectio

/// Time message state.
bool pending_time_message_{false};
bool server_hello_received_{false};

/// Atomic for the same reason as client_hello_sent_: written on network threads, read from the
/// main loop via is_handshake_complete().
std::atomic<bool> server_hello_received_{false};

/// Memory placement preference for `websocket_payload_` allocations (ESP-IDF only).
MemoryLocation websocket_payload_location_{MemoryLocation::PREFER_EXTERNAL};
Expand Down
136 changes: 90 additions & 46 deletions src/connection_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ static constexpr int64_t HELLO_INITIAL_DELAY_US = HELLO_INITIAL_DELAY_MS * US_PE
ConnectionManager::ConnectionManager(SendspinClient* client) : client_(client) {}

ConnectionManager::~ConnectionManager() {
{
std::lock_guard<std::mutex> lock(this->conn_ptr_mutex_);
this->current_connection_.reset();
this->pending_connection_.reset();
}
this->hello_retry_.conn.reset();
std::lock_guard<std::mutex> lock(this->conn_ptr_mutex_);
this->current_connection_.reset();
this->pending_connection_.reset();
// Clear under the same lock that guards every other access to hello_retries_, keeping the
// locking discipline uniform.
this->hello_retries_.clear();
}

// ============================================================================
Expand Down Expand Up @@ -225,28 +225,41 @@ void ConnectionManager::loop() {
pending_copy->loop();
}

// Check hello retry timer
if (this->hello_retry_.retry_time_us > 0 &&
platform_time_us() >= this->hello_retry_.retry_time_us) {
this->hello_retry_.retry_time_us = 0;

// Check hello retry timers (one entry per managed connection, so a second connection arriving
// mid-handshake cannot clobber the first connection's pending hello).
{
std::lock_guard<std::mutex> lock(this->conn_ptr_mutex_);
// Verify connection is still valid
if (this->hello_retry_.conn == this->current_connection_ ||
this->hello_retry_.conn == this->pending_connection_) {
if (!this->send_hello_message(this->hello_retry_.attempts - 1,
this->hello_retry_.conn.get())) {
// Transient failure - retry with exponential backoff
if (this->hello_retry_.attempts > 1) {
this->hello_retry_.delay_ms *= 2;
this->hello_retry_.attempts--;
this->hello_retry_.retry_time_us =
platform_time_us() +
static_cast<int64_t>(this->hello_retry_.delay_ms) * US_PER_MS;
}
const int64_t now_us = platform_time_us();
for (auto it = this->hello_retries_.begin(); it != this->hello_retries_.end();) {
HelloRetryState& retry = *it;

// Drop retries whose connection is no longer managed.
if (retry.conn != this->current_connection_ &&
retry.conn != this->pending_connection_) {
it = this->hello_retries_.erase(it);
continue;
}

if (retry.retry_time_us == 0 || now_us < retry.retry_time_us) {
++it;
continue;
}

if (this->send_hello_message(retry.attempts - 1, retry.conn.get())) {
// Sent successfully (or connection no longer valid) - retry is complete.
it = this->hello_retries_.erase(it);
continue;
}

// Transient failure - retry with exponential backoff until attempts are exhausted.
if (retry.attempts > 1) {
retry.delay_ms *= 2;
retry.attempts--;
retry.retry_time_us = now_us + static_cast<int64_t>(retry.delay_ms) * US_PER_MS;
++it;
} else {
it = this->hello_retries_.erase(it);
}
} else {
this->hello_retry_.conn.reset();
}
}
}
Expand Down Expand Up @@ -343,11 +356,40 @@ void ConnectionManager::on_new_connection(std::shared_ptr<SendspinServerConnecti

void ConnectionManager::initiate_hello(SendspinConnection* conn) {
// Note: caller must hold conn_ptr_mutex_
// Set up retry state: initial delay, 3 attempts
this->hello_retry_.conn = conn->shared_from_this();
this->hello_retry_.delay_ms = HelloRetryState::INITIAL_RETRY_DELAY_MS;
this->hello_retry_.attempts = 3;
this->hello_retry_.retry_time_us = platform_time_us() + HELLO_INITIAL_DELAY_US;
// Arm a per-connection hello retry: initial delay, 3 attempts. If an entry for this connection
// already exists (a duplicate connected event for the same connection would land here twice),
// re-arm it in place instead of pushing a second one, so a connection never gets two timers.
auto conn_sp = conn->shared_from_this();
const int64_t retry_time_us = platform_time_us() + HELLO_INITIAL_DELAY_US;

for (auto& retry : this->hello_retries_) {
if (retry.conn == conn_sp) {
retry.delay_ms = HelloRetryState::INITIAL_RETRY_DELAY_MS;
retry.attempts = 3;
retry.retry_time_us = retry_time_us;
return;
}
}

HelloRetryState retry;
retry.conn = std::move(conn_sp);
retry.delay_ms = HelloRetryState::INITIAL_RETRY_DELAY_MS;
retry.attempts = 3;
retry.retry_time_us = retry_time_us;
this->hello_retries_.push_back(std::move(retry));
}

void ConnectionManager::remove_hello_retry(SendspinConnection* conn) {
// Note: caller must hold conn_ptr_mutex_
// Safe to call unconditionally: a no-op if conn never had a retry entry (e.g. a connection
// rejected before initiate_hello, or one whose hello already sent and cleared its entry).
for (auto it = this->hello_retries_.begin(); it != this->hello_retries_.end();) {
if (it->conn.get() == conn) {
it = this->hello_retries_.erase(it);
} else {
++it;
}
}
}

bool ConnectionManager::send_hello_message(uint8_t remaining_attempts, SendspinConnection* conn) {
Expand All @@ -364,13 +406,16 @@ bool ConnectionManager::send_hello_message(uint8_t remaining_attempts, SendspinC

std::string hello_message = this->client_->build_hello_message();

SsErr err = conn->send_text_message(hello_message, [conn](bool success) {
if (success) {
conn->set_client_hello_sent(true);
} else {
SS_LOGW(TAG, "Hello message send failed");
}
});
SsErr err = conn->send_text_message(
hello_message,
[conn](bool success) {
if (success) {
conn->set_client_hello_sent(true);
} else {
SS_LOGW(TAG, "Hello message send failed");
}
},
/*allow_before_hello=*/true);

if (err == SsErr::OK) {
return true; // Successfully queued
Expand Down Expand Up @@ -400,10 +445,7 @@ void ConnectionManager::on_connection_lost(SendspinConnection* conn) {
conn->disable_message_dispatch();
this->client_->time_burst_->reset();
this->client_->cleanup_connection_state();
if (this->hello_retry_.conn.get() == conn) {
this->hello_retry_.conn.reset();
this->hello_retry_.retry_time_us = 0;
}
this->remove_hello_retry(conn);
this->current_connection_.reset();

if (this->pending_connection_ != nullptr) {
Expand All @@ -412,10 +454,7 @@ void ConnectionManager::on_connection_lost(SendspinConnection* conn) {
}
} else if (this->pending_connection_ != nullptr && this->pending_connection_.get() == conn) {
SS_LOGD(TAG, "Pending connection lost");
if (this->hello_retry_.conn.get() == conn) {
this->hello_retry_.conn.reset();
this->hello_retry_.retry_time_us = 0;
}
this->remove_hello_retry(conn);
this->pending_connection_.reset();
}
}
Expand Down Expand Up @@ -468,6 +507,10 @@ void ConnectionManager::complete_handoff(bool switch_to_new) {
this->client_->cleanup_connection_state();
auto old_current = std::move(this->current_connection_);
this->current_connection_ = std::move(this->pending_connection_);
// Drop any retry entry now that this connection is leaving the managed set, mirroring
// on_connection_lost. (loop() also prunes orphaned entries, but removing it here closes
// the window where a retry could fire against an already-handed-off connection.)
this->remove_hello_retry(old_current.get());
this->disconnect_and_release(std::move(old_current),
SendspinGoodbyeReason::ANOTHER_SERVER);
} else {
Expand All @@ -476,6 +519,7 @@ void ConnectionManager::complete_handoff(bool switch_to_new) {
} else {
SS_LOGD(TAG, "Completing handoff: keeping current server");
if (this->pending_connection_ != nullptr) {
this->remove_hello_retry(this->pending_connection_.get());
this->disconnect_and_release(std::move(this->pending_connection_),
SendspinGoodbyeReason::ANOTHER_SERVER);
}
Expand Down
Loading
Loading