From dcefa2ecbf46d4be5935afe3308cb032bcb81500 Mon Sep 17 00:00:00 2001 From: turuslan Date: Wed, 21 Jan 2026 13:17:47 +0500 Subject: [PATCH] gossip async validate messages --- .../03-gossip/gossip_anonymous_example.cpp | 6 +- example/03-gossip/gossip_chat_example.cpp | 2 +- include/libp2p/protocol/gossip/config.hpp | 9 ++ include/libp2p/protocol/gossip/gossip.hpp | 15 ++- include/libp2p/protocol/gossip/score.hpp | 61 ++++++++++ src/protocol/gossip/gossip.cpp | 112 +++++++++++++++--- 6 files changed, 179 insertions(+), 26 deletions(-) diff --git a/example/03-gossip/gossip_anonymous_example.cpp b/example/03-gossip/gossip_anonymous_example.cpp index d29646a1..f4e467fe 100644 --- a/example/03-gossip/gossip_anonymous_example.cpp +++ b/example/03-gossip/gossip_anonymous_example.cpp @@ -77,8 +77,7 @@ int main(int argc, char **argv) { libp2p::injector::useTransportAdaptors< libp2p::transport::QuicTransport>()); - auto io_context = - injector.create>(); + auto io_context = injector.create>(); auto host = injector.create>(); auto gossip = injector.create>(); @@ -99,7 +98,7 @@ int main(int argc, char **argv) { }); // Subscribe to topic - auto topic = gossip->subscribe("example"); + auto topic = gossip->subscribe("example", false); // Receiver task libp2p::coroSpawn(*io_context, [&]() -> libp2p::Coro { @@ -133,4 +132,3 @@ int main(int argc, char **argv) { io_context->run(); return EXIT_SUCCESS; } - diff --git a/example/03-gossip/gossip_chat_example.cpp b/example/03-gossip/gossip_chat_example.cpp index 8f53e955..3ab9b491 100644 --- a/example/03-gossip/gossip_chat_example.cpp +++ b/example/03-gossip/gossip_chat_example.cpp @@ -95,7 +95,7 @@ int main(int argc, char **argv) { }); // Subscribe to a simple topic named "example". - auto topic = gossip->subscribe("example"); + auto topic = gossip->subscribe("example", false); // Receiver task: prints all messages arriving on the topic. libp2p::coroSpawn(*io_context, [&]() -> libp2p::Coro { diff --git a/include/libp2p/protocol/gossip/config.hpp b/include/libp2p/protocol/gossip/config.hpp index 7925bc2a..31ea5576 100644 --- a/include/libp2p/protocol/gossip/config.hpp +++ b/include/libp2p/protocol/gossip/config.hpp @@ -11,6 +11,7 @@ #include #include #include +#include namespace libp2p::protocol::gossip { using TopicHash = Bytes; @@ -51,6 +52,14 @@ namespace libp2p::protocol::gossip { std::optional signature; std::optional received_from; + std::optional message_id; + std::unordered_set duplicate_peers; + }; + + enum class ValidationResult { + Accept, + Reject, + Ignore, }; using MessageIdFn = std::function; diff --git a/include/libp2p/protocol/gossip/gossip.hpp b/include/libp2p/protocol/gossip/gossip.hpp index 36e7e8f9..2a7aeabf 100644 --- a/include/libp2p/protocol/gossip/gossip.hpp +++ b/include/libp2p/protocol/gossip/gossip.hpp @@ -118,6 +118,7 @@ namespace libp2p::protocol::gossip { */ struct MessageCacheEntry { MessagePtr message; + bool validated; std::unordered_map iwant; }; @@ -146,7 +147,7 @@ namespace libp2p::protocol::gossip { public: /** Receive next message published to this topic (awaitable). */ CoroOutcome receive(); - CoroOutcome receiveMessage(); + CoroOutcome receiveMessage(); /** Publish a payload to this topic (signed locally). */ void publish(BytesIn message); /** Count outbound peers currently in the mesh. */ @@ -154,8 +155,9 @@ namespace libp2p::protocol::gossip { std::weak_ptr weak_gossip_; TopicHash topic_hash_; + bool validate_ = false; bool publish_only_ = false; - CoroOutcomeChannel receive_channel_; + CoroOutcomeChannel receive_channel_; History history_; TopicBackoff backoff_; std::unordered_set peers_; // all peers subscribed @@ -259,9 +261,10 @@ namespace libp2p::protocol::gossip { void start(); /** Subscribe locally to a topic by hash (creates Topic if new). */ - std::shared_ptr subscribe(TopicHash topic_hash); + std::shared_ptr subscribe(TopicHash topic_hash, bool validate); /** Subscribe locally to a topic by string name. */ - std::shared_ptr subscribe(std::string_view topic_hash); + std::shared_ptr subscribe(std::string_view topic_hash, + bool validate); /** * Publish message to topic. @@ -272,13 +275,14 @@ namespace libp2p::protocol::gossip { /** Publish a payload to a topic (signed, deduped, broadcast). */ void publish(Topic &topic, BytesIn data); + void validate(MessagePtr message, ValidationResult result); + /** * Broadcast a message to peers following mesh/flood rules and optional * IDONTWANT side channel. */ void broadcast(Topic &topic, std::optional from, - const MessageId &message_id, const MessagePtr &message); /** Decode and process an inbound RPC frame from a peer. */ @@ -327,6 +331,7 @@ namespace libp2p::protocol::gossip { const gossipsub::pb::RPC &pb_message); std::shared_ptr getOrCreateTopic(const TopicHash &topic_hash, + bool validate, bool publish_only); // Dependencies and state diff --git a/include/libp2p/protocol/gossip/score.hpp b/include/libp2p/protocol/gossip/score.hpp index c533dcb5..8327e544 100644 --- a/include/libp2p/protocol/gossip/score.hpp +++ b/include/libp2p/protocol/gossip/score.hpp @@ -19,6 +19,22 @@ namespace libp2p::protocol::gossip::score { constexpr std::chrono::seconds kTimeCacheDuration{120}; + /// The reason a Gossipsub message has been rejected. + enum class RejectReason { + /// The message failed the configured validation during decoding. + ValidationError, + /// The message source is us. + SelfOrigin, + /// The peer that sent the message was blacklisted. + BlackListedPeer, + /// The source (from field) of the message was blacklisted. + BlackListedSource, + /// The validation was ignored. + ValidationIgnored, + /// The validation failed. + ValidationFailed, + }; + struct DeliveryStatusUnknown {}; struct DeliveryStatusValid { Time time; @@ -226,6 +242,51 @@ namespace libp2p::protocol::gossip::score { } } + void rejectInvalidMessage(const PeerId &from, const TopicHash &topic_hash) { + markInvalidMessageDelivery(from, topic_hash); + } + + // Reject a message. + void rejectMessage(const PeerId &from, + const MessageId &message_id, + const TopicHash &topic_hash, + RejectReason reason) { + // these messages are not tracked, but the peer is penalized as they are + // invalid + if (reason == RejectReason::ValidationError + or reason == RejectReason::SelfOrigin) { + rejectInvalidMessage(from, topic_hash); + return; + } + // we ignore those messages, so do nothing. + if (reason == RejectReason::BlackListedPeer + or reason == RejectReason::BlackListedSource) { + return; + } + auto &record = deliveries_.getOrDefault(message_id); + // Multiple peers can now reject the same message as we track which peers + // send us the message. If we have already updated the status, return. + if (not std::holds_alternative(record.status)) { + return; + } + if (reason == RejectReason::ValidationIgnored) { + // we were explicitly instructed by the validator to ignore the message + // but not penalize the peer + record.status = DeliveryStatusIgnored{}; + record.peers.clear(); + return; + } + // mark the message as invalid and penalize peers that have already + // forwarded it. + record.status = DeliveryStatusInvalid{}; + // release the delivery time tracking map to free some memory early + auto peers = std::exchange(record.peers, {}); + markInvalidMessageDelivery(from, topic_hash); + for (auto &peer_id : peers) { + markInvalidMessageDelivery(peer_id, topic_hash); + } + } + void connect(const PeerId &peer_id) { peer_stats_[peer_id].expires_at.reset(); } diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index 78c6514f..bd471c72 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -185,10 +185,10 @@ namespace libp2p::protocol::gossip { // Receive next message payload from a subscribed topic (for local consumer). CoroOutcome Topic::receive() { BOOST_OUTCOME_CO_TRY(auto message, co_await receiveMessage()); - co_return message.data; + co_return message->data; } - CoroOutcome Topic::receiveMessage() { + CoroOutcome Topic::receiveMessage() { co_return co_await receive_channel_.receive(); } @@ -390,17 +390,20 @@ namespace libp2p::protocol::gossip { // Subscribe locally to a topic: create Topic, announce to peers, and seed // mesh. - std::shared_ptr Gossip::subscribe(TopicHash topic_hash) { - return getOrCreateTopic(topic_hash, false); + std::shared_ptr Gossip::subscribe(TopicHash topic_hash, + bool validate) { + return getOrCreateTopic(topic_hash, validate, false); } std::shared_ptr Gossip::getOrCreateTopic(const TopicHash &topic_hash, + bool validate, bool publish_only) { auto topic_it = topics_.find(topic_hash); if (topic_it == topics_.end()) { auto topic = std::make_shared(Topic{ weak_from_this(), topic_hash, + false, publish_only, {*io_context_}, {config_.history_length}, @@ -414,6 +417,9 @@ namespace libp2p::protocol::gossip { } } auto &topic = topic_it->second; + if (validate) { + topic->validate_ = true; + } // upgrade publish only topic to subscribed topic if (not publish_only and topic->publish_only_) { topic->publish_only_ = false; @@ -446,12 +452,13 @@ namespace libp2p::protocol::gossip { return topic; } - std::shared_ptr Gossip::subscribe(std::string_view topic_hash) { - return subscribe(qtils::ByteVec(qtils::str2byte(topic_hash))); + std::shared_ptr Gossip::subscribe(std::string_view topic_hash, + bool validate) { + return subscribe(qtils::ByteVec(qtils::str2byte(topic_hash)), validate); } void Gossip::publish(const TopicHash &topic_hash, BytesIn data) { - auto topic = getOrCreateTopic(topic_hash, true); + auto topic = getOrCreateTopic(topic_hash, false, true); publish(*topic, data); } @@ -484,12 +491,58 @@ namespace libp2p::protocol::gossip { } } - auto message_id = config_.message_id_fn(*message); + message->message_id = config_.message_id_fn(*message); + auto &message_id = message->message_id.value(); if (duplicate_cache_.contains(message_id)) { return; } duplicate_cache_.insert(message_id); - broadcast(topic, std::nullopt, message_id, message); + message_cache_.emplace(message_id, + MessageCacheEntry{ + .message = message, + .validated = true, + }); + topic.history_.add(message_id); + broadcast(topic, std::nullopt, message); + } + + void Gossip::validate(MessagePtr message, ValidationResult result) { + auto topic_it = topics_.find(message->topic); + if (topic_it == topics_.end()) { + return; + } + auto &topic = topic_it->second; + if (not topic->validate_) { + return; + } + auto &message_id = message->message_id.value(); + auto cache_it = message_cache_.find(message_id); + if (cache_it == message_cache_.end()) { + return; + } + if (cache_it->second.validated) { + return; + } + if (result == ValidationResult::Accept) { + cache_it->second.validated = true; + broadcast(*topic, message->received_from.value(), message); + return; + } + message_cache_.erase(cache_it); + std::optional reject_reason; + if (result == ValidationResult::Reject) { + reject_reason = score::RejectReason::ValidationFailed; + } else if (result == ValidationResult::Ignore) { + reject_reason = score::RejectReason::ValidationIgnored; + } + auto reject = [&](const PeerId &peer_id) { + score_.rejectMessage( + peer_id, message_id, message->topic, reject_reason.value()); + }; + reject(message->received_from.value()); + for (auto &peer_id : message->duplicate_peers) { + reject(peer_id); + } } // Inbound RPC handler: subscriptions, publish, and control messages. @@ -600,15 +653,27 @@ namespace libp2p::protocol::gossip { continue; } auto &topic = topic_it->second; - auto message_id = config_.message_id_fn(*message); + message->message_id = config_.message_id_fn(*message); + auto &message_id = message->message_id.value(); if (not duplicate_cache_.insert(message_id)) { score_.duplicateMessage(peer->peer_id_, message_id, message->topic); + auto cache_it = message_cache_.find(message_id); + if (cache_it != message_cache_.end()) { + cache_it->second.message->duplicate_peers.emplace(peer->peer_id_); + } continue; } score_.validateMessage(peer->peer_id_, message_id, message->topic); - topic->receive_channel_.send(*message); - score_.deliver_message(peer->peer_id_, message_id, message->topic); - broadcast(*topic, peer->peer_id_, message_id, message); + message_cache_.emplace(message_id, + MessageCacheEntry{ + .message = message, + .validated = not topic->validate_, + }); + topic->history_.add(message_id); + if (not topic->validate_) { + broadcast(*topic, peer->peer_id_, message); + } + topic->receive_channel_.send(message); } // Handle GRAFT: accept (add to mesh) or PRUNE with backoff. @@ -689,6 +754,9 @@ namespace libp2p::protocol::gossip { auto message_id = qtils::ByteVec(qtils::str2byte(pb_message)); auto cache_it = message_cache_.find(message_id); if (cache_it != message_cache_.end()) { + if (not cache_it->second.validated) { + continue; + } auto &count = cache_it->second.iwant[peer->peer_id_]; ++count; if (count > config_.gossip_retransimission) { @@ -719,10 +787,12 @@ namespace libp2p::protocol::gossip { // Fanout to peers with mesh/flood rules and optional IDONTWANT side channel. void Gossip::broadcast(Topic &topic, std::optional from, - const MessageId &message_id, const MessagePtr &message) { - message_cache_.emplace(message_id, MessageCacheEntry{message}); - topic.history_.add(message_id); + auto &message_id = message->message_id.value(); + if (message->received_from.has_value()) { + score_.deliver_message( + message->received_from.value(), message_id, message->topic); + } gossip_promises_.remove(message_id); auto publish = not from.has_value(); @@ -747,6 +817,12 @@ namespace libp2p::protocol::gossip { if (message->from == peer->peer_id_) { return; } + if (message->received_from == peer->peer_id_) { + return; + } + if (message->duplicate_peers.contains(peer->peer_id_)) { + return; + } if (score_.below(peer->peer_id_, config_.score.publish_threshold)) { return; } @@ -1107,6 +1183,10 @@ namespace libp2p::protocol::gossip { void Gossip::emit_gossip() { for (auto &[topic_hash, topic] : topics_) { auto message_ids = topic->history_.get(config_.history_gossip); + qtils::retainIf(message_ids, [&](MessageId message_id) { + auto cache_it = message_cache_.find(message_id); + return cache_it != message_cache_.end() and cache_it->second.validated; + }); if (message_ids.empty()) { continue; }