Points count in SeriesBuffer can change independent of actual stored points #139

@KevinWMatthews

Is it possible to document the preconditions for accessing SeriesBuffer safely across threads? It looks to me as though the stored data and the description of the stored data can be updated independently across threads, leaving the risk of them getting out of sync.

In more detail, the number of data points in a SeriesBuffer can be changed independently of the actual points:

struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    count: AtomicUsize,
    // ..
}

even across threads. Put another way, a race on individual struct members is not possible, but a race between them is. With just the right (well, wrong) implementation and execution conditions, the count can be left inaccurate.

It appears that the SeriesBufferGuard is designed to ameliorate this:

struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}

as it provides a means to update count while the mutex is locked. This seems reasonable to me (and a clever idea!), provided that count is only updated through the guard. This condition is met in SeriesBufferGuard::extend():

impl SeriesBufferGuard<'_> {
    fn extend(/* .. */) {
        // ..
        self.count.fetch_add(new_point_count, Ordering::Release);
    }
}

but it is not explicitly met in SeriesBuffer::take():

impl SeriesBuffer {
    fn take(&self) -> (/* .. */) {
        let mut points = self.lock(); // <-- lock taken
        let result = points // <-- guard used
            .sb
            .drain()
            .map(/* .. */)
            .collect();

        let result_count = self // <-- guard not explicitly held
            .count
            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
            .unwrap();
        (result_count, result)
    }
}
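To make the pattern concrete, here is a minimal, self-contained sketch of the guard-based update and the current take(). It is an illustration under assumptions, not the crate's actual code: ChannelDescriptor and PointsType are replaced with String and Vec<f64>, and lock(), point_count(), the extend() signature and demo_guard() are hypothetical stand-ins for whatever the real types provide.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Mutex, MutexGuard};

// Assumed stand-ins: the issue does not show the real definitions.
type ChannelDescriptor = String;
type PointsType = Vec<f64>;

#[derive(Default)]
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    count: AtomicUsize,
}

struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}

impl SeriesBuffer {
    // Hypothetical helper: locks the map and hands out a guard that also sees `count`.
    fn lock(&self) -> SeriesBufferGuard<'_> {
        SeriesBufferGuard {
            sb: self.points.lock().unwrap(),
            count: &self.count,
        }
    }

    fn point_count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    // Mirrors the take() quoted above: `points` stays alive until the function returns,
    // so the count is reset while the mutex is still locked.
    fn take(&self) -> (usize, Vec<(ChannelDescriptor, PointsType)>) {
        let mut points = self.lock();
        let result = points.sb.drain().collect();
        let result_count = self
            .count
            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
            .unwrap();
        (result_count, result)
    }
}

impl SeriesBufferGuard<'_> {
    // The count changes only while this guard (and therefore the mutex) is held.
    fn extend(&mut self, channel: ChannelDescriptor, new_points: PointsType) {
        let new_point_count = new_points.len();
        self.sb.entry(channel).or_default().extend(new_points);
        self.count.fetch_add(new_point_count, Ordering::Release);
    }
}

// Returns (count after extend, points actually stored, count reported by take, count after take).
fn demo_guard() -> (usize, usize, usize, usize) {
    let buffer = SeriesBuffer::default();
    {
        let mut guard = buffer.lock();
        guard.extend("a".to_string(), vec![1.0, 2.0]);
        guard.extend("b".to_string(), vec![3.0]);
    }
    let counted = buffer.point_count();
    let stored: usize = buffer.points.lock().unwrap().values().map(Vec::len).sum();
    let (taken, _series) = buffer.take();
    (counted, stored, taken, buffer.point_count())
}
```

In this sketch the count and the stored points agree at every step, but only because both update paths happen to run while the mutex is locked.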

I think that this implementation works in practice due to the drop scope of the guard points:

  • the guard is held until the end of the function
  • the guard prevents SeriesBufferGuard::extend() from being called (because another SeriesBufferGuard cannot exist)
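The drop-scope rule that the two points above rely on can be observed with a plain std::sync::Mutex. This is a small sketch, independent of SeriesBuffer; the function names are made up for illustration. A guard bound to a variable lives until the end of its scope, while a guard that only exists as a temporary is dropped at the end of the statement that created it.

```rust
use std::sync::Mutex;

// A named guard keeps the mutex locked until the end of the enclosing scope.
fn named_guard_holds_lock() -> bool {
    let m = Mutex::new(vec![1, 2, 3]);
    let _guard = m.lock().unwrap();
    // The lock is still held here, so a second attempt to lock fails.
    let still_locked = m.try_lock().is_err();
    still_locked
}

// A temporary guard is dropped at the end of the statement that created it.
fn temporary_guard_releases_lock() -> bool {
    let m = Mutex::new(vec![1, 2, 3]);
    let drained: Vec<i32> = m.lock().unwrap().drain(..).collect();
    // The guard is already gone, so the mutex can be locked again.
    let free_again = m.try_lock().is_ok();
    free_again && drained.len() == 3
}
```

The first function corresponds to a take() that binds the guard to a variable; the second corresponds to calling lock() inline in a method chain.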

This is implicit behavior, and leaves a foot-gun for future developers if the drop scope is changed:

impl SeriesBuffer {
    fn take(&self) -> (usize, Vec<Series>) {
        let result = self.lock() // <-- lock taken, guard used and immediately dropped
            .sb
            .drain()
            .map(/* snip */)
            .collect();

        let result_count = self // <-- guard no longer exists, SeriesBufferGuard::extend() can be called
            .count
            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
            .unwrap();
        (result_count, result)
    }
}
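The interleaving that this variant allows can be replayed deterministically on a single thread. The sketch below is a simplified stand-in, not the real implementation: channels are Strings, points are f64 values, and extend(), stored_points() and replay_race() are hypothetical helpers. The writer call in the middle plays the role of another thread that acquires the lock between the drain and the count reset.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

// Simplified stand-in for the buffer described in this issue.
#[derive(Default)]
struct SeriesBuffer {
    points: Mutex<HashMap<String, Vec<f64>>>,
    count: AtomicUsize,
}

impl SeriesBuffer {
    // Writer path: data and count are both updated while the lock is held.
    fn extend(&self, channel: &str, new_points: &[f64]) {
        let mut map = self.points.lock().unwrap();
        map.entry(channel.to_string())
            .or_default()
            .extend_from_slice(new_points);
        self.count.fetch_add(new_points.len(), Ordering::Release);
    }

    fn stored_points(&self) -> usize {
        self.points.lock().unwrap().values().map(Vec::len).sum()
    }
}

// Returns (count reported by the racy take, points actually drained,
//          count left in the buffer, points left in the buffer).
fn replay_race() -> (usize, usize, usize, usize) {
    let buffer = SeriesBuffer::default();
    buffer.extend("a", &[1.0, 2.0, 3.0]);

    // Step 1 of the racy take(): the temporary guard is dropped when this statement ends.
    let drained: Vec<(String, Vec<f64>)> = buffer.points.lock().unwrap().drain().collect();
    let drained_points: usize = drained.iter().map(|(_, p)| p.len()).sum();

    // A writer on another thread could acquire the lock at exactly this point.
    buffer.extend("b", &[4.0, 5.0]);

    // Step 2 of the racy take(): the count is reset without the lock being held.
    let reported = buffer
        .count
        .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
        .unwrap();

    (
        reported,
        drained_points,
        buffer.count.load(Ordering::Acquire),
        buffer.stored_points(),
    )
}
```

In this replay the caller is told it received 5 points but only gets 3, and the buffer is left reporting 0 points while still holding 2.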

As this precondition isn't enforced by the type system or hinted at in documentation, I could see a future developer not noticing this pitfall and making this or a similar mistake.

Are any of these solutions feasible?

  1. move count and points into a new data type so they can be locked together (not implemented; not performant)
  2. always update count through the SeriesBufferGuard (see fix: update point count while MutexGuard is held #140)
  3. move count and points into a new data type that enforces thread-safe read and write (see refactor: add SeriesBufferInner type #144)
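As a rough illustration of the direction of options 2 and 3, the sketch below keeps both fields private to a module so that the count can only change through a guard that holds the lock. It is an assumption-laden example and does not claim to match what #140 or #144 actually implement: the module name, the String and Vec<f64> stand-ins, and the lock(), point_count() and guard-level take() methods are all hypothetical.

```rust
mod buffer {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Mutex, MutexGuard};

    // Assumed stand-ins for the real types.
    pub type ChannelDescriptor = String;
    pub type PointsType = Vec<f64>;

    // Fields are private: code outside this module cannot touch `count` directly.
    #[derive(Default)]
    pub struct SeriesBuffer {
        points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
        count: AtomicUsize,
    }

    pub struct SeriesBufferGuard<'sb> {
        sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
        count: &'sb AtomicUsize,
    }

    impl SeriesBuffer {
        pub fn lock(&self) -> SeriesBufferGuard<'_> {
            SeriesBufferGuard {
                sb: self.points.lock().unwrap(),
                count: &self.count,
            }
        }

        pub fn point_count(&self) -> usize {
            self.count.load(Ordering::Acquire)
        }

        // Delegates to the guard, so the drain and the reset share one critical section.
        pub fn take(&self) -> (usize, Vec<(ChannelDescriptor, PointsType)>) {
            self.lock().take()
        }
    }

    impl SeriesBufferGuard<'_> {
        pub fn extend(&mut self, channel: ChannelDescriptor, new_points: PointsType) {
            let new_point_count = new_points.len();
            self.sb.entry(channel).or_default().extend(new_points);
            self.count.fetch_add(new_point_count, Ordering::Release);
        }

        // Drains the points and resets the count while the lock is provably held.
        pub fn take(&mut self) -> (usize, Vec<(ChannelDescriptor, PointsType)>) {
            let result: Vec<_> = self.sb.drain().collect();
            let result_count = self.count.swap(0, Ordering::AcqRel);
            (result_count, result)
        }
    }
}

use buffer::SeriesBuffer;

// Returns (count reported by take, points actually drained, count left afterwards).
fn demo_take() -> (usize, usize, usize) {
    let sb = SeriesBuffer::default();
    sb.lock().extend("a".to_string(), vec![1.0, 2.0]);
    sb.lock().extend("b".to_string(), vec![3.0]);
    let (count, series) = sb.take();
    let drained: usize = series.iter().map(|(_, p)| p.len()).sum();
    (count, drained, sb.point_count())
}
```

With this shape, shortening the guard's lifetime inside SeriesBuffer::take() can no longer separate the drain from the count reset, because both happen inside a method that borrows the guard.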
