nominal-streaming/src/stream.rs (44 additions & 21 deletions)
@@ -13,6 +13,7 @@ use std::time::UNIX_EPOCH;
 
 use conjure_object::BearerToken;
 use conjure_object::ResourceIdentifier;
+use inner::SeriesBufferInner;
 use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
 use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
 use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
@@ -24,7 +25,6 @@ use nominal_api::tonic::io::nominal::scout::api::proto::Series;
 use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
 use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
 use parking_lot::Condvar;
-use parking_lot::Mutex;
 use parking_lot::MutexGuard;
 use tracing::debug;
 use tracing::error;
@@ -495,8 +495,7 @@ impl NominalStringWriter<'_> {
 }
 
 struct SeriesBuffer {
-    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
-    count: AtomicUsize,
+    inner: SeriesBufferInner,
     flush_time: AtomicU64,
     condvar: Condvar,
     max_capacity: usize,
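The two fields that previously had to be kept in sync by hand, the points map and its atomic count, are replaced by a single `SeriesBufferInner` (defined in the new `mod inner` later in this diff). Because that module's fields are private, nothing outside it can read or write the count except through methods that pair it with the map. A minimal, self-contained sketch of this encapsulation pattern, using illustrative names and `std` primitives rather than the crate's actual types:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

mod counted {
    use super::*;

    /// Map and cached length are private: the only way to mutate one
    /// is through methods that update both, so they cannot drift apart.
    pub struct CountedMap {
        map: Mutex<HashMap<String, Vec<f64>>>,
        len: AtomicUsize,
    }

    impl CountedMap {
        pub fn new() -> Self {
            Self {
                map: Mutex::new(HashMap::new()),
                len: AtomicUsize::new(0),
            }
        }

        /// Lock-free read of the cached point count.
        pub fn len(&self) -> usize {
            self.len.load(Ordering::Acquire)
        }

        pub fn insert(&self, key: String, points: Vec<f64>) {
            let mut map = self.map.lock().unwrap();
            // Update the count while the map lock is held so readers
            // never see the two disagree for long.
            self.len.fetch_add(points.len(), Ordering::Release);
            map.entry(key).or_default().extend(points);
        }
    }
}

fn main() {
    let buf = counted::CountedMap::new();
    buf.insert("channel-a".to_string(), vec![1.0, 2.0, 3.0]);
    // The `len` *field* is not visible here; only the accessor is.
    assert_eq!(buf.len(), 3);
}
```

Code outside the module that tried to touch the count field directly would fail to compile, which appears to be exactly the invariant this change enforces.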
@@ -639,8 +638,7 @@ impl PartialOrd for SeriesBuffer {
 impl SeriesBuffer {
     fn new(capacity: usize) -> Self {
         Self {
-            points: Mutex::new(HashMap::new()),
-            count: AtomicUsize::new(0),
+            inner: SeriesBufferInner::new(),
             flush_time: AtomicU64::new(0),
             condvar: Condvar::new(),
             max_capacity: capacity,
@@ -652,24 +650,21 @@ impl SeriesBuffer {
     /// larger than MAX_POINTS_PER_RECORD is inserted while the buffer is empty. This avoids needing
     /// to handle splitting batches of points across multiple requests.
     fn has_capacity(&self, new_points_count: usize) -> bool {
-        let count = self.count.load(Ordering::Acquire);
+        let count = self.count();
         count == 0 || count + new_points_count <= self.max_capacity
     }
 
     fn lock(&self) -> SeriesBufferGuard<'_> {
-        SeriesBufferGuard {
-            sb: self.points.lock(),
-            count: &self.count,
-        }
+        self.inner.lock()
     }
 
     fn take(&self) -> (usize, Vec<Series>) {
-        let mut points = self.lock();
+        let mut guard = self.lock();
         self.flush_time.store(
             UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
             Ordering::Release,
         );
-        let result = points
+        let result = guard
             .sb
             .drain()
             .map(|(ChannelDescriptor { name, tags }, points)| {
@@ -686,7 +681,7 @@
                 }
             })
             .collect();
-        let result_count = self
+        let result_count = guard
             .count
             .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
             .unwrap();
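Two invariants around the count are worth spelling out. `has_capacity` (above) treats an empty buffer as having room for any batch, even one larger than `max_capacity`, so an oversized batch never has to be split across requests: with `max_capacity = 10`, an empty buffer accepts a 50-point batch, but once 3 points are buffered an 8-point batch is refused. And `take` zeroes the count through the same guard that holds the map lock, using `fetch_update` to reset it and recover the previous value in one atomic step. A standalone sketch of both behaviors, with illustrative types and `std` primitives rather than the crate's API:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

struct Buffer {
    points: Mutex<HashMap<String, Vec<f64>>>,
    count: AtomicUsize,
    max_capacity: usize,
}

impl Buffer {
    /// An empty buffer accepts any batch, even an oversized one, so
    /// large batches never need to be split across requests.
    fn has_capacity(&self, new_points_count: usize) -> bool {
        let count = self.count.load(Ordering::Acquire);
        count == 0 || count + new_points_count <= self.max_capacity
    }

    fn insert(&self, key: String, points: Vec<f64>) {
        let mut map = self.points.lock().unwrap();
        self.count.fetch_add(points.len(), Ordering::Release);
        map.entry(key).or_default().extend(points);
    }

    /// Drain the map and zero the count while the lock is held, so no
    /// writer can observe a non-empty map paired with a zeroed count.
    fn take(&self) -> (usize, Vec<(String, Vec<f64>)>) {
        let mut map = self.points.lock().unwrap();
        let drained: Vec<_> = map.drain().collect();
        // fetch_update returns the pre-update value: the number of
        // points being handed back to the caller.
        let taken = self
            .count
            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
            .unwrap();
        (taken, drained)
    }
}

fn main() {
    let buf = Buffer {
        points: Mutex::new(HashMap::new()),
        count: AtomicUsize::new(0),
        max_capacity: 10,
    };
    assert!(buf.has_capacity(50)); // empty: oversized batch is fine
    buf.insert("ch".to_string(), vec![0.0; 3]);
    assert!(!buf.has_capacity(8)); // 3 + 8 > 10
    let (taken, series) = buf.take();
    assert_eq!((taken, series.len()), (3, 1));
}
```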
@@ -698,29 +693,57 @@
     }
 
     fn count(&self) -> usize {
-        self.count.load(Ordering::Acquire)
+        self.inner.count()
     }
 
     fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
-        let mut points_lock = self.points.lock();
+        let mut guard = self.inner.lock();
         // concurrency bug without this - the buffer could have been emptied since we
         // checked the count, so this will wait forever & block any new points from entering
-        if !points_lock.is_empty() {
-            self.condvar.wait(&mut points_lock);
+        if !guard.sb.is_empty() {
+            self.condvar.wait(&mut guard.sb);
         } else {
             debug!("buffer emptied since last check, skipping condvar wait");
         }
-        on_notify(SeriesBufferGuard {
-            sb: points_lock,
-            count: &self.count,
-        });
+        on_notify(guard);
     }
 
     fn notify(&self) -> bool {
         self.condvar.notify_one()
     }
 }
+
+mod inner {
+    use parking_lot::Mutex;
+
+    use super::*;
+
+    pub(super) struct SeriesBufferInner {
+        points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
+        count: AtomicUsize,
+    }
+
+    impl SeriesBufferInner {
+        pub(super) fn new() -> Self {
+            Self {
+                points: Mutex::new(HashMap::new()),
+                count: AtomicUsize::new(0),
+            }
+        }
+
+        pub(super) fn count(&self) -> usize {
+            self.count.load(Ordering::Acquire)
+        }
+
+        pub(super) fn lock(&self) -> SeriesBufferGuard<'_> {
+            SeriesBufferGuard {
+                sb: self.points.lock(),
+                count: &self.count,
+            }
+        }
+    }
+}
 
 fn batch_processor(
     running: Arc<AtomicBool>,
     points_buffer: Arc<SeriesBuffer>,
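The comment in `on_notify` describes a classic lost-wakeup hazard: by the time the flusher acquires the lock, another thread may already have drained the buffer, and a notification that has already fired will never fire again, so an unconditional `wait` would block forever while holding the buffer's mutex and stalling every writer. The fix is to re-check the state after acquiring the lock and skip the wait if the buffer is already empty. A minimal sketch of that re-check pattern with `parking_lot`, using illustrative types rather than the crate's API, exercising the already-empty path (which needs no second thread):

```rust
use std::collections::HashMap;

use parking_lot::{Condvar, Mutex};

struct Shared {
    points: Mutex<HashMap<String, Vec<f64>>>,
    condvar: Condvar,
}

/// Wait for a flush notification, then drain the buffer.
fn drain_on_notify(shared: &Shared) -> usize {
    let mut map = shared.points.lock();
    // Re-check *after* taking the lock: if the buffer was emptied
    // between the caller's count check and this lock acquisition, the
    // matching notification has already fired, and waiting here would
    // block forever.
    if !map.is_empty() {
        shared.condvar.wait(&mut map);
    }
    map.drain().map(|(_, points)| points.len()).sum()
}

fn main() {
    let shared = Shared {
        points: Mutex::new(HashMap::new()),
        condvar: Condvar::new(),
    };
    // Empty buffer: the guard clause skips the wait and we return
    // immediately instead of deadlocking.
    assert_eq!(drain_on_notify(&shared), 0);
}
```

In the PR itself, the post-wait state is then handed to the `on_notify` callback as a `SeriesBufferGuard`, so the callback runs while the lock is still held.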