Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 89 additions & 30 deletions src/connmgr/batch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (C) 2020-2023 Fanout, Inc.
* Copyright (C) 2023-2024 Fastly, Inc.
* Copyright (C) 2023-2026 Fastly, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,13 +28,13 @@ pub struct BatchKey {
nkey: usize,
}

pub struct BatchGroup<'a, 'b> {
addr: &'b [u8],
pub struct BatchGroup<'a> {
addr: &'a [u8],
use_router: bool,
ids: memorypool::ReusableVecHandle<'b, zhttppacket::Id<'a>>,
removed: &'a [(usize, bool)],
}

impl<'a> BatchGroup<'a, '_> {
impl BatchGroup<'_> {
pub fn addr(&self) -> &[u8] {
self.addr
}
Expand All @@ -43,9 +43,39 @@ impl<'a> BatchGroup<'a, '_> {
self.use_router
}

pub fn ids(&self) -> &[zhttppacket::Id<'a>] {
/// Returns slice of (ckey, included).
pub fn removed(&self) -> &[(usize, bool)] {
self.removed
}
}

pub struct BatchGroupWithIds<'a, 'b> {
inner: BatchGroup<'a>,
ids: memorypool::ReusableVecHandle<'b, zhttppacket::Id<'b>>,
}

impl<'a, 'b> BatchGroupWithIds<'a, 'b> {
pub fn addr(&self) -> &[u8] {
self.inner.addr()
}

pub fn use_router(&self) -> bool {
self.inner.use_router()
}

/// Returns slice of (ckey, included).
#[cfg(test)]
pub fn removed(&self) -> &[(usize, bool)] {
&self.inner.removed()
}

pub fn ids(&self) -> &[zhttppacket::Id<'b>] {
&self.ids
}

pub fn discard_ids(self) -> BatchGroup<'a> {
self.inner
}
}

struct AddrItem {
Expand All @@ -59,7 +89,7 @@ pub struct Batch {
addrs: Vec<AddrItem>,
addr_index: usize,
group_ids: memorypool::ReusableVec,
last_group_ckeys: Vec<usize>,
group_removed: Vec<(usize, bool)>,
}

impl Batch {
Expand All @@ -69,7 +99,7 @@ impl Batch {
addrs: Vec::with_capacity(capacity),
addr_index: 0,
group_ids: memorypool::ReusableVec::new::<zhttppacket::Id>(capacity),
last_group_ckeys: Vec::with_capacity(capacity),
group_removed: Vec::with_capacity(capacity),
}
}

Expand Down Expand Up @@ -147,13 +177,29 @@ impl Batch {
self.nodes.remove(key.nkey);
}

pub fn take_group<'a, 'b: 'a, F>(&'a mut self, get_id: F) -> Option<BatchGroup<'a, 'b>>
/// Returns a set of IDs for connections in the batch that have the same
/// peer. The caller can then easily send a single packet addressed to
/// all of them. The caller should repeatedly call `take_group` until all
/// the connections are drained from the batch. Returns None when there
/// are no connections in the batch.
///
/// This method works by removing connections from the batch one at a
/// time, and calling the `include` function for each one with its ckey.
/// If `include` returns Some((ID, seq)), then it is included in the
/// returned set of IDs. If it returns None, then the connection is
/// excluded from the set.
///
/// If the batch has connections and `include` returns None for all of
/// them, then this method will return an empty set of IDs.
pub fn take_group<'a: 'b, 'b, F>(&'a mut self, include: F) -> Option<BatchGroupWithIds<'a, 'b>>
where
F: Fn(usize) -> Option<(&'b [u8], u32)>,
{
let addrs = &mut self.addrs;
let mut ids = self.group_ids.get_as_new();

self.group_removed.clear();

while ids.is_empty() {
// Find the next addr with items
while self.addr_index < addrs.len() && addrs[self.addr_index].keys.is_empty() {
Expand All @@ -163,14 +209,11 @@ impl Batch {
// If all are empty, we're done
if self.addr_index == addrs.len() {
assert!(self.nodes.is_empty());
return None;
break;
}

let keys = &mut addrs[self.addr_index].keys;

self.last_group_ckeys.clear();
ids.clear();

// Get ids/seqs
while ids.len() < zhttppacket::IDS_MAX {
let nkey = match keys.pop_front(&mut self.nodes) {
Expand All @@ -179,27 +222,42 @@ impl Batch {
};

let ckey = self.nodes[nkey].value;
self.nodes.remove(nkey);

if let Some((id, seq)) = get_id(ckey) {
self.last_group_ckeys.push(ckey);
let included = if let Some((id, seq)) = include(ckey) {
ids.push(zhttppacket::Id { id, seq: Some(seq) });
}

true
} else {
false
};

self.nodes.remove(nkey);
self.group_removed.push((ckey, included));
}
}

let ai = &addrs[self.addr_index];
if self.group_removed.is_empty() {
assert!(ids.is_empty());
return None;
}

let (addr, use_router): (&[u8], bool) = if !ids.is_empty() {
let ai = &addrs[self.addr_index];

(&ai.addr, ai.use_router)
} else {
(b"", false)
};

Some(BatchGroup {
addr: &ai.addr,
use_router: ai.use_router,
Some(BatchGroupWithIds {
inner: BatchGroup {
addr,
use_router,
removed: &self.group_removed,
},
ids,
})
}

pub fn last_group_ckeys(&self) -> &[usize] {
&self.last_group_ckeys
}
}

#[cfg(test)]
Expand All @@ -213,7 +271,6 @@ mod tests {

assert_eq!(batch.capacity(), 4);
assert_eq!(batch.len(), 0);
assert!(batch.last_group_ckeys().is_empty());

assert!(batch.add(b"addr-a", false, 1).is_ok());
assert!(batch.add(b"addr-a", false, 2).is_ok());
Expand All @@ -235,9 +292,9 @@ mod tests {
assert_eq!(group.ids()[1].seq, Some(0));
assert_eq!(group.addr(), b"addr-a");
assert!(!group.use_router());
assert_eq!(group.removed(), &[(1, true), (2, true)]);
drop(group);
assert_eq!(batch.is_empty(), false);
assert_eq!(batch.last_group_ckeys(), &[1, 2]);

let group = batch
.take_group(|ckey| Some((ids[ckey - 1].as_bytes(), 0)))
Expand All @@ -247,9 +304,9 @@ mod tests {
assert_eq!(group.ids()[0].seq, Some(0));
assert_eq!(group.addr(), b"addr-b");
assert!(!group.use_router());
assert_eq!(group.removed(), &[(3, true)]);
drop(group);
assert_eq!(batch.is_empty(), false);
assert_eq!(batch.last_group_ckeys(), &[3]);

let group = batch
.take_group(|ckey| Some((ids[ckey - 1].as_bytes(), 0)))
Expand All @@ -259,14 +316,13 @@ mod tests {
assert_eq!(group.ids()[0].seq, Some(0));
assert_eq!(group.addr(), b"addr-b");
assert!(group.use_router());
assert_eq!(group.removed(), &[(4, true)]);
drop(group);
assert_eq!(batch.is_empty(), true);
assert_eq!(batch.last_group_ckeys(), &[4]);

assert!(batch
.take_group(|ckey| Some((ids[ckey - 1].as_bytes(), 0)))
.is_none());
assert_eq!(batch.last_group_ckeys(), &[4]);
}

#[test]
Expand All @@ -287,6 +343,7 @@ mod tests {
assert_eq!(group.ids()[0].id, b"id-2");
assert_eq!(group.ids()[0].seq, Some(0));
assert_eq!(group.addr(), b"addr-b");
assert_eq!(group.removed(), &[(2, true)]);
drop(group);
assert_eq!(batch.is_empty(), true);

Expand All @@ -301,6 +358,7 @@ mod tests {
assert_eq!(group.ids()[0].id, b"id-3");
assert_eq!(group.ids()[0].seq, Some(0));
assert_eq!(group.addr(), b"addr-a");
assert_eq!(group.removed(), &[(3, true)]);
drop(group);
assert_eq!(batch.is_empty(), true);
}
Expand All @@ -327,6 +385,7 @@ mod tests {
assert_eq!(group.ids()[0].id, b"id-3");
assert_eq!(group.ids()[0].seq, Some(0));
assert_eq!(group.addr(), b"addr-b");
assert_eq!(group.removed(), &[(1, false), (2, false), (3, true)]);
drop(group);
assert_eq!(batch.is_empty(), true);
}
Expand Down
78 changes: 46 additions & 32 deletions src/connmgr/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (C) 2023 Fanout, Inc.
* Copyright (C) 2023-2025 Fastly, Inc.
* Copyright (C) 2023-2026 Fastly, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,7 @@
* limitations under the License.
*/

use crate::connmgr::batch::{Batch, BatchKey};
use crate::connmgr::batch::{Batch, BatchGroupWithIds, BatchKey};
use crate::connmgr::connection::{
client_req_connection, client_stream_connection, make_zhttp_response, ConnectionPool,
StreamSharedData,
Expand Down Expand Up @@ -116,6 +116,7 @@ fn async_local_channel<T>(
(s, r)
}

#[derive(Copy, Clone)]
enum BatchType {
KeepAlive,
Cancel,
Expand Down Expand Up @@ -145,6 +146,29 @@ impl<T> ChannelPool<T> {
}
}

fn make_batch_response(
from: &str,
btype: BatchType,
group: &BatchGroupWithIds,
) -> Result<(Option<ArrayVec<u8, 64>>, zmq::Message), io::Error> {
assert!(group.ids().len() <= zhttppacket::IDS_MAX);

let zresp = zhttppacket::Response {
from: from.as_bytes(),
ids: group.ids(),
multi: true,
ptype: match btype {
BatchType::KeepAlive => zhttppacket::ResponsePacket::KeepAlive,
BatchType::Cancel => zhttppacket::ResponsePacket::Cancel,
},
ptype_str: "",
};

let mut scratch = [0; BULK_PACKET_SIZE_MAX];

make_zhttp_response(group.addr(), group.use_router(), zresp, &mut scratch)
}

struct ConnectionDone {
ckey: usize,
}
Expand Down Expand Up @@ -399,7 +423,8 @@ impl Connections {
let nodes = &mut items.nodes;
let batch = &mut items.batch;

while !batch.is_empty() {
loop {
// Wrap in a block to avoid lifetime extension
let group = {
let group = batch.take_group(|ckey| {
let ci = &nodes[ckey].value;
Expand All @@ -418,47 +443,36 @@ impl Connections {

match group {
Some(group) => group,
None => continue,
None => break,
}
};

let count = group.ids().len();
let mut to_send = None;

assert!(count <= zhttppacket::IDS_MAX);

let zresp = zhttppacket::Response {
from: from.as_bytes(),
ids: group.ids(),
multi: true,
ptype: match btype {
BatchType::KeepAlive => zhttppacket::ResponsePacket::KeepAlive,
BatchType::Cancel => zhttppacket::ResponsePacket::Cancel,
},
ptype_str: "",
};
if count > 0 {
match make_batch_response(from, btype, &group) {
Ok(ret) => to_send = Some(ret),
Err(e) => error!("failed to serialize batched packet with {count} ids: {e}"),
}
}

let mut scratch = [0; BULK_PACKET_SIZE_MAX];
let group = group.discard_ids();

let (addr, msg) =
match make_zhttp_response(group.addr(), group.use_router(), zresp, &mut scratch) {
Ok(resp) => resp,
Err(e) => {
error!(
"failed to serialize keep-alive packet with {} ids: {}",
count, e
);
continue;
}
};
// Before we do anything that might fail, let's clear the batch keys
for &(ckey, _) in group.removed() {
let ci = &mut nodes[ckey].value;
ci.batch_key = None;
}

drop(group);
let Some((addr, msg)) = to_send else {
continue;
};

for &ckey in batch.last_group_ckeys() {
for &(ckey, _) in group.removed().iter().filter(|(_, included)| *included) {
let ci = &mut nodes[ckey].value;
let cshared = ci.shared.as_ref().unwrap();

cshared.inc_out_seq();
ci.batch_key = None;
}

return Some((count, addr, msg));
Expand Down
Loading
Loading