Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
# Disable warnings as errors
rustflags: ""
- run: sudo apt-get update && sudo apt-get install -y libglib2.0-dev libgtk-3-dev libjavascriptcoregtk-4.1-dev libsoup-3.0 libwebkit2gtk-4.1-dev libxdo-dev
- run: cargo test --all-features
- run: cargo update -Z minimal-versions && cargo test --all-features

# Check formatting with rustfmt
formatting:
Expand Down
638 changes: 638 additions & 0 deletions ossa-core/docs/miniprotocols/store_sc_sync.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions ossa-core/src/protocol/store_bft_sync/v0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ where
}

fn run_client<S: Stream<Self::Message>>(
self,
mut self,
mut stream: S,
) -> impl Future<Output = ()> + Send {
async move {
debug!("StoreBFTSync client running!");
let mut bft_sync: Option<BFTSyncResponder> = None;
let mut bft_sync: Option<BFTSyncResponder<SHeaderId>> = None;

// TODO: Check when done.
loop {
Expand Down
94 changes: 69 additions & 25 deletions ossa-core/src/protocol/store_peer/dag_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub(crate) enum MsgDAGSyncResponse<HeaderId, Header> {
// Has initiative
pub(crate) struct DAGSyncInitiator<Hash, HeaderId, Header> {
have: Vec<HeaderId>,
sent_tips: BTreeSet<HeaderId>,
phantom: PhantomData<fn(Hash, HeaderId, Header)>,
}

Expand All @@ -98,6 +99,33 @@ impl<
Header: Debug + Send + Sync,
> DAGSyncInitiator<Hash, HeaderId, Header>
{
/// Select tips that haven't been sent yet, capped at `MAX_HAVE_HEADERS`.
///
/// Prunes `sent_tips` of entries no longer in the frontier, then collects
/// unsent tips (up to the cap) and records them as sent.
fn select_new_tips(
dag_state: &dag::UntypedState<HeaderId, Header>,
sent_tips: &mut BTreeSet<HeaderId>,
) -> Vec<HeaderId> {
let current_tips = dag_state.tips();

// Prune sent_tips: remove entries no longer in the frontier.
sent_tips.retain(|t| current_tips.contains(t));

// Collect unsent tips, capped at MAX_HAVE_HEADERS.
let new_tips: Vec<HeaderId> = current_tips
.iter()
.filter(|t| !sent_tips.contains(t))
.take(MAX_HAVE_HEADERS.into())
.cloned()
.collect();

// Record them as sent.
sent_tips.extend(new_tips.iter().cloned());

new_tips
}

async fn receive_response_helper<S: Stream<Msg>, Msg>(
stream: &mut S,
) -> (Vec<HeaderId>, Vec<(Header, RawDAGBody)>)
Expand Down Expand Up @@ -131,18 +159,18 @@ impl<
MsgDAGSyncRequest<HeaderId>: Into<Msg>,
Msg: TryInto<MsgDAGSyncResponse<HeaderId, Header>>,
{
// TODO: Limit on tips (128? 64? 32? MAX_HAVE_HEADERS)
warn!("TODO: Check request sizes.");
let req = MsgDAGSyncRequest::DAGInitialSync {
tips: dag_state.tips().iter().cloned().collect(),
};
let mut sent_tips = BTreeSet::new();
let tips = Self::select_new_tips(dag_state, &mut sent_tips);
let req = MsgDAGSyncRequest::DAGInitialSync { tips };
send(stream, req).await.expect("TODO");

// Receive response.
let (have, operations) = Self::receive_response_helper(stream).await;

let dag_sync = DAGSyncInitiator {
have,
sent_tips,
phantom: PhantomData,
};

Expand All @@ -168,10 +196,9 @@ impl<
}
}

// TODO: Limit on tips (128? 64? 32? MAX_HAVE_HEADERS)
warn!("TODO: Check request sizes.");
let tips = Self::select_new_tips(dag_state, &mut self.sent_tips);
let req = MsgDAGSyncRequest::DAGSync {
tips: dag_state.tips().iter().cloned().collect(), // JP: Why does this send all the tips again? TODO: Limit the tips... Maybe do this on the DAG construction side?
tips,
known: known_bitmap,
};
send(stream, req).await.expect("TODO");
Expand Down Expand Up @@ -207,8 +234,8 @@ pub(crate) fn mark_as_known_helper<HeaderId, Header>(
queue.push_back(header_id);

while let Some(header_id) = queue.pop_front() {
let contains = their_known.insert(header_id);
if !contains {
let newly_inserted = their_known.insert(header_id);
if newly_inserted {
if let Some(parents) = state.get_parents(&header_id) {
queue.extend(parents);
} else {
Expand Down Expand Up @@ -356,6 +383,31 @@ impl<Hash, HeaderId, Header> DAGSyncResponder<Hash, HeaderId, Header> {
.await;
}

// Send children of a node as long as their parents are known.
fn send_children(
&mut self,
ecg_state: &dag::UntypedState<HeaderId, Header>,
header_id: &HeaderId,
)
where
HeaderId: Ord + Copy,
{
let children = ecg_state
.get_children_with_depth(header_id)
.expect("Unreachable since we have this header.")
.into_iter()
// Skip if any children are unknown.
.filter(|c_id|
!ecg_state
.get_parents(&c_id.1)
.expect("We know this header.")
.iter()
.any(|p| !self.they_know(p))
)
.collect::<Vec<_>>();
self.send_queue.extend(children);
}

fn prepare_operations(
&mut self,
ecg_state: &dag::UntypedState<HeaderId, Header>,
Expand All @@ -366,9 +418,10 @@ impl<Hash, HeaderId, Header> DAGSyncResponder<Hash, HeaderId, Header> {
{
let mut operations = Vec::with_capacity(MAX_DELIVER_HEADERS as usize);

while let Some((_depth, header_id)) = self.send_queue.pop() {
while let Some((depth, header_id)) = self.send_queue.pop() {
// Skip if they already know this header.
let skip = self.they_know(&header_id);

if !skip {
// Send header to peer.
if let Some(node) = ecg_state.get_node(&header_id) {
Expand All @@ -382,10 +435,7 @@ impl<Hash, HeaderId, Header> DAGSyncResponder<Hash, HeaderId, Header> {
}

// Add children to queue.
let children = ecg_state
.get_children_with_depth(&header_id)
.expect("Unreachable since we proposed this header.");
self.send_queue.extend(children);
self.send_children(ecg_state, &header_id);

if operations.len() == MAX_DELIVER_HEADERS.into() {
return operations;
Expand Down Expand Up @@ -449,10 +499,7 @@ impl<Hash, HeaderId, Header> DAGSyncResponder<Hash, HeaderId, Header> {
self.mark_as_known(ecg_state, *header_id);

// Add children to send queue.
let children = ecg_state
.get_children_with_depth(header_id)
.expect("Unreachable since we have this header.");
self.send_queue.extend(children);
self.send_children(ecg_state, &header_id);
} else {
// Record header as known by them but not us.
self.our_unknown.insert(*header_id);
Expand Down Expand Up @@ -488,20 +535,17 @@ impl<Hash, HeaderId, Header> DAGSyncResponder<Hash, HeaderId, Header> {
) where
HeaderId: Copy + Ord,
{
for (i, header_id) in self.sent_haves.iter().cloned().enumerate() {
for (i, header_id) in self.sent_haves.clone().iter().enumerate() {
// Check if they claimed they know this header.
let they_know = *their_known
.get(i)
.expect("Unreachable since we're iterating on the headers we sent.");
if they_know {
// Mark header as known by them.
mark_as_known_helper(&mut self.their_known, ecg_state, header_id);
mark_as_known_helper(&mut self.their_known, ecg_state, *header_id);

// Send children if they know this node.
let children = ecg_state
.get_children_with_depth(&header_id)
.expect("Unreachable since we sent this header.");
self.send_queue.extend(children);
self.send_children(ecg_state, &header_id);
} else {
let parents = ecg_state
.get_parents(&header_id)
Expand All @@ -513,7 +557,7 @@ impl<Hash, HeaderId, Header> DAGSyncResponder<Hash, HeaderId, Header> {
let depth = ecg_state
.get_header_depth(&header_id)
.expect("Unreachable since we sent this header.");
self.send_queue.push((Reverse(depth), header_id));
self.send_queue.push((Reverse(depth), *header_id));
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions ossa-core/src/protocol/store_sc_dag/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
pub mod v0;

#[cfg(test)]
mod test;
Loading
Loading