Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions example/03-gossip/gossip_anonymous_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ int main(int argc, char **argv) {
libp2p::injector::useTransportAdaptors<
libp2p::transport::QuicTransport>());

auto io_context =
injector.create<std::shared_ptr<boost::asio::io_context>>();
auto io_context = injector.create<std::shared_ptr<boost::asio::io_context>>();
auto host = injector.create<std::shared_ptr<libp2p::host::BasicHost>>();
auto gossip =
injector.create<std::shared_ptr<libp2p::protocol::gossip::Gossip>>();
Expand All @@ -99,7 +98,7 @@ int main(int argc, char **argv) {
});

// Subscribe to topic
auto topic = gossip->subscribe("example");
auto topic = gossip->subscribe("example", false);

// Receiver task
libp2p::coroSpawn(*io_context, [&]() -> libp2p::Coro<void> {
Expand Down Expand Up @@ -133,4 +132,3 @@ int main(int argc, char **argv) {
io_context->run();
return EXIT_SUCCESS;
}

2 changes: 1 addition & 1 deletion example/03-gossip/gossip_chat_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ int main(int argc, char **argv) {
});

// Subscribe to a simple topic named "example".
auto topic = gossip->subscribe("example");
auto topic = gossip->subscribe("example", false);

// Receiver task: prints all messages arriving on the topic.
libp2p::coroSpawn(*io_context, [&]() -> libp2p::Coro<void> {
Expand Down
9 changes: 9 additions & 0 deletions include/libp2p/protocol/gossip/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <libp2p/peer/stream_protocols.hpp>
#include <qtils/bytes_std_hash.hpp>
#include <unordered_map>
#include <unordered_set>

namespace libp2p::protocol::gossip {
using TopicHash = Bytes;
Expand Down Expand Up @@ -51,6 +52,14 @@ namespace libp2p::protocol::gossip {
std::optional<Bytes> signature;

std::optional<PeerId> received_from;
std::optional<MessageId> message_id;
std::unordered_set<PeerId> duplicate_peers;
};

enum class ValidationResult {
Accept,
Reject,
Ignore,
};

using MessageIdFn = std::function<MessageId(const Message &)>;
Expand Down
15 changes: 10 additions & 5 deletions include/libp2p/protocol/gossip/gossip.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ namespace libp2p::protocol::gossip {
*/
struct MessageCacheEntry {
MessagePtr message;
bool validated;
std::unordered_map<PeerId, size_t> iwant;
};

Expand Down Expand Up @@ -147,16 +148,17 @@ namespace libp2p::protocol::gossip {
public:
/** Receive next message published to this topic (awaitable). */
CoroOutcome<Bytes> receive();
CoroOutcome<Message> receiveMessage();
CoroOutcome<MessagePtr> receiveMessage();
/** Publish a payload to this topic (signed locally). */
void publish(BytesIn message);
/** Count outbound peers currently in the mesh. */
size_t meshOutCount() const;

std::weak_ptr<Gossip> weak_gossip_;
TopicHash topic_hash_;
bool validate_ = false;
bool publish_only_ = false;
CoroOutcomeChannel<Message> receive_channel_;
CoroOutcomeChannel<MessagePtr> receive_channel_;
History history_;
TopicBackoff backoff_;
std::unordered_set<PeerPtr> peers_; // all peers subscribed
Expand Down Expand Up @@ -260,9 +262,10 @@ namespace libp2p::protocol::gossip {
void start();

/** Subscribe locally to a topic by hash (creates Topic if new). */
std::shared_ptr<Topic> subscribe(TopicHash topic_hash);
std::shared_ptr<Topic> subscribe(TopicHash topic_hash, bool validate);
/** Subscribe locally to a topic by string name. */
std::shared_ptr<Topic> subscribe(std::string_view topic_hash);
std::shared_ptr<Topic> subscribe(std::string_view topic_hash,
bool validate);

/**
* Publish message to topic.
Expand All @@ -273,13 +276,14 @@ namespace libp2p::protocol::gossip {
/** Publish a payload to a topic (signed, deduped, broadcast). */
void publish(Topic &topic, BytesIn data);

void validate(MessagePtr message, ValidationResult result);

/**
* Broadcast a message to peers following mesh/flood rules and optional
* IDONTWANT side channel.
*/
void broadcast(Topic &topic,
std::optional<PeerId> from,
const MessageId &message_id,
const MessagePtr &message);

/** Decode and process an inbound RPC frame from a peer. */
Expand Down Expand Up @@ -328,6 +332,7 @@ namespace libp2p::protocol::gossip {
const gossipsub::pb::RPC &pb_message);

std::shared_ptr<Topic> getOrCreateTopic(const TopicHash &topic_hash,
bool validate,
bool publish_only);

// Dependencies and state
Expand Down
61 changes: 61 additions & 0 deletions include/libp2p/protocol/gossip/score.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,22 @@ namespace libp2p::protocol::gossip::score {

constexpr std::chrono::seconds kTimeCacheDuration{120};

/// The reason a Gossipsub message has been rejected.
enum class RejectReason {
/// The message failed the configured validation during decoding.
ValidationError,
/// The message source is us.
SelfOrigin,
/// The peer that sent the message was blacklisted.
BlackListedPeer,
/// The source (from field) of the message was blacklisted.
BlackListedSource,
/// The validation was ignored.
ValidationIgnored,
/// The validation failed.
ValidationFailed,
};

struct DeliveryStatusUnknown {};
struct DeliveryStatusValid {
Time time;
Expand Down Expand Up @@ -226,6 +242,51 @@ namespace libp2p::protocol::gossip::score {
}
}

void rejectInvalidMessage(const PeerId &from, const TopicHash &topic_hash) {
markInvalidMessageDelivery(from, topic_hash);
}

// Reject a message.
void rejectMessage(const PeerId &from,
const MessageId &message_id,
const TopicHash &topic_hash,
RejectReason reason) {
// these messages are not tracked, but the peer is penalized as they are
// invalid
if (reason == RejectReason::ValidationError
or reason == RejectReason::SelfOrigin) {
rejectInvalidMessage(from, topic_hash);
return;
}
// we ignore those messages, so do nothing.
if (reason == RejectReason::BlackListedPeer
or reason == RejectReason::BlackListedSource) {
return;
}
auto &record = deliveries_.getOrDefault(message_id);
// Multiple peers can now reject the same message as we track which peers
// send us the message. If we have already updated the status, return.
if (not std::holds_alternative<DeliveryStatusUnknown>(record.status)) {
return;
}
if (reason == RejectReason::ValidationIgnored) {
// we were explicitly instructed by the validator to ignore the message
// but not penalize the peer
record.status = DeliveryStatusIgnored{};
record.peers.clear();
return;
}
// mark the message as invalid and penalize peers that have already
// forwarded it.
record.status = DeliveryStatusInvalid{};
// release the delivery time tracking map to free some memory early
auto peers = std::exchange(record.peers, {});
markInvalidMessageDelivery(from, topic_hash);
for (auto &peer_id : peers) {
markInvalidMessageDelivery(peer_id, topic_hash);
}
}

void connect(const PeerId &peer_id) {
peer_stats_[peer_id].expires_at.reset();
}
Expand Down
Loading
Loading