Skip to content

Commit 9c8571d

Browse files
committed
container/ringbuf: improve, add tests
1 parent 6a835a8 commit 9c8571d

4 files changed

Lines changed: 500 additions & 76 deletions

File tree

container/ringbuf/buffer.go

Lines changed: 193 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,47 @@
11
// Copyright 2025 the toolbox authors.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
// Package ringbuf implements a dynamically growing ring buffer.
45
package ringbuf
56

67
import (
8+
"errors"
9+
"io"
710
"iter"
811
"slices"
912
)
1013

1114
// TODO:
12-
// - Add Reset, Read, ReadFrom, WriteTo, and all the other crazy methods of bytes.Buffer.
13-
// - Add a method for changing maxLen.
14-
// - Add a method for getting the n-th latest byte?
15+
//
16+
// - Add a Truncate() method.
17+
// Of course if cap hasn't been reached, truncation is super easy.
18+
// Otherwise, if the buffer ain't that full,
19+
// we can shift things around in the buffer itself.
20+
// If it's really full, put the extra shit
21+
// in a lil buffer you got from a pool.
22+
//
23+
// - Add the ability to lower the cap of the buffer.
24+
// of course if the new cap(r.buf) is less than len(r.buf),
25+
// we need to jettison SOME data.
26+
// Do we jettison the newest data? The oldest data?
27+
//
28+
// - Add Seek().
29+
30+
var ErrNegativeOffset = errors.New("negative offset")
1531

1632
type Buffer[T any] struct {
1733
buf []T
1834
maxLen int
19-
writePos int
20-
byte [1]T
35+
writePos int // writePos is an actual position in buf.
36+
readPos int // readPos is relative to writePos. It wraps around buf.
37+
byteBuf [1]T
2138
}
2239

40+
// NewBuffer returns a new ring buffer.
41+
//
42+
// Incoming data grows the buffer from initialCap
43+
// (which is zero if not provided)
44+
// to maxLen, then overwrites old data.
2345
func NewBuffer[T any](maxLen int, initialCap ...int) *Buffer[T] {
2446
r := &Buffer[T]{
2547
maxLen: maxLen,
@@ -30,98 +52,204 @@ func NewBuffer[T any](maxLen int, initialCap ...int) *Buffer[T] {
3052
return r
3153
}
3254

33-
func (r *Buffer[T]) Write(b []T) (int, error) {
55+
func (b *Buffer[T]) Write(src []T) (n int, err error) {
3456

35-
if len(b) == 0 {
36-
return 0, nil
57+
n = len(src)
58+
59+
if len(src) == 0 || b.maxLen == 0 {
60+
return n, nil
3761
}
3862

39-
if len(b) >= r.maxLen {
40-
r.writePos = 0
41-
r.buf = slices.Grow(r.buf, r.maxLen-len(r.buf))[:r.maxLen]
42-
copy(r.buf, b[len(b)-r.maxLen:])
43-
return len(b), nil
63+
if len(src) >= b.maxLen {
64+
b.writePos = 0
65+
b.buf = slices.Grow(b.buf, b.rem())[:b.maxLen]
66+
copy(b.buf, src[len(src)-b.maxLen:])
67+
return n, nil
4468
}
4569

46-
rem := r.maxLen - len(r.buf)
47-
if n := min(len(b), rem); n > 0 {
48-
r.buf = append(r.buf, b[:n]...)
49-
b = b[n:]
70+
if m := min(len(src), b.rem()); m > 0 {
71+
b.buf = append(b.buf, src[:m]...)
72+
src = src[m:]
5073
}
5174

52-
for len(b) > 0 {
53-
n := copy(r.buf[r.writePos:], b)
54-
b = b[n:]
55-
r.writePos += n
56-
if r.writePos == r.maxLen {
57-
r.writePos = 0
75+
for len(src) > 0 {
76+
m := copy(b.buf[b.writePos:], src)
77+
src = src[m:]
78+
b.writePos += m
79+
if b.writePos == b.maxLen {
80+
b.writePos = 0
5881
}
5982
}
6083

61-
return len(b), nil
84+
return n, nil
6285
}
6386

64-
func (r *Buffer[T]) WriteByte(v T) error {
65-
r.byte[0] = v
66-
r.Write(r.byte[:])
87+
func (b *Buffer[T]) WriteByte(v T) error {
88+
b.byteBuf[0] = v
89+
b.Write(b.byteBuf[:])
6790
return nil
6891
}
6992

70-
// Bytes returns a copy of the contents of the buffer.
71-
// If you want to access buffer data without copying/allocation,
72-
// consider using [Buffer.BytesSeq], [Buffer.CopyTo] or [Buffer.AppendTo].
73-
//
74-
//x:x
75-
func (r *Buffer[T]) Bytes() []T {
76-
return r.AppendTo(nil)
93+
func (b *Buffer[T]) Read(dest []T) (int, error) {
94+
n, err := b.ReadAt(dest, int64(b.readPos))
95+
b.readPos += n
96+
return n, err
7797
}
7898

79-
// BytesSeq returns an iterator over the segments of the buffer.
80-
// The iterator iterates at least once.
81-
func (r *Buffer[T]) BytesSeq() iter.Seq[[]T] {
82-
return func(yield func([]T) bool) {
83-
if !yield(r.buf[r.writePos:]) {
84-
return
85-
}
86-
if r.writePos > 0 {
87-
yield(r.buf[:r.writePos])
88-
}
99+
func (b *Buffer[T]) ReadByte() (T, error) {
100+
_, err := b.Read(b.byteBuf[:])
101+
return b.byteBuf[0], err
102+
}
103+
104+
func (b *Buffer[T]) ReadAt(dest []T, offset int64) (n int, err error) {
105+
if offset < 0 {
106+
return 0, ErrNegativeOffset
89107
}
108+
if offset >= int64(len(b.buf)) {
109+
return 0, io.EOF
110+
}
111+
if len(dest) == 0 {
112+
return 0, nil
113+
}
114+
for b := range b.seq(int(offset)) {
115+
m := copy(dest, b)
116+
dest = dest[m:]
117+
n += m
118+
}
119+
return n, nil
120+
}
121+
122+
// Bytes returns a copy of the unread elements of the buffer.
123+
//
124+
// If you want to access buffer data without copying/allocation,
125+
// consider using [Buffer.BytesSeq].
126+
func (r *Buffer[T]) Bytes() []T {
127+
b := make([]T, r.Len())
128+
r.ReadAt(b, int64(r.readPos))
129+
return b
90130
}
91131

92-
func (r *Buffer[T]) CopyTo(s []T) int {
93-
n := copy(s, r.buf[r.writePos:])
94-
n += copy(s[n:], r.buf[:r.writePos])
95-
return n
132+
// BytesSeq returns an iterator over the unread elements of the buffer.
133+
// It's similar to [Buffer.Bytes]
134+
//
135+
// The returned slices alias the buffer content
136+
// at least until the next buffer modification.
137+
func (b *Buffer[T]) BytesSeq() iter.Seq[[]T] {
138+
return b.seq(b.readPos)
96139
}
97140

98-
func (r *Buffer[T]) AppendTo(s []T) []T {
99-
s = slices.Grow(s, r.Len())
100-
r.CopyTo(s)
141+
// Next returns a copy of the first n unread elements of the buffer,
142+
// advancing the buffer as if the data had been returned by [Buffer.Read].
143+
//
144+
// If you want to access this data without allocations,
145+
// consider using [Buffer.NextSeq].
146+
func (b *Buffer[T]) Next(n int) []T {
147+
s := make([]T, min(n, b.Len()))
148+
b.Read(s)
101149
return s
102150
}
103151

104-
func (r *Buffer[T]) Len() int {
105-
if r.writePos == 0 {
106-
return len(r.buf)
152+
// NextSeq returns an iterator over the next n elements of the buffer,
153+
// advancing the buffer as if the data had been returned by [Buffer.Read].
154+
//
155+
// The returned slices alias the buffer content
156+
// at least until the next buffer modification.
157+
func (b *Buffer[T]) NextSeq(n int) iter.Seq[[]T] {
158+
if n < 0 {
159+
panic("ringbuf.Buffer.NextSeq: n < 0")
160+
}
161+
return func(yield func([]T) bool) {
162+
for s := range b.seq(b.readPos) {
163+
if n == 0 {
164+
break
165+
}
166+
s = s[:min(len(s), n)]
167+
if !yield(s) {
168+
break
169+
}
170+
n -= len(s)
171+
}
172+
b.readPos += n
107173
}
108-
return r.maxLen
109174
}
110175

111-
func (r *Buffer[T]) MaxLen() int {
112-
return r.maxLen
176+
// seq returns an iterator over the segments of the buffer.
177+
// It does not iterate if the buffer is empty.
178+
func (b *Buffer[T]) seq(offset int) iter.Seq[[]T] {
179+
180+
if offset < 0 {
181+
panic("ringbuf.Buffer.seq: offset < 0")
182+
}
183+
184+
return func(yield func([]T) bool) {
185+
186+
if len(b.buf) == 0 {
187+
return
188+
}
189+
190+
s := b.buf[b.writePos:]
191+
if offset < len(s) && !yield(s[offset:]) {
192+
return
193+
}
194+
195+
offset = max(offset-len(s), 0)
196+
s = b.buf[offset:b.writePos]
197+
if len(s) > 0 && !yield(s[offset:]) {
198+
return
199+
}
200+
}
113201
}
114202

115-
// // Truncate grows or shrinks the maximum length of the buffer to n.
203+
// // Truncate discards all but the first unread bytes from the buffer
204+
// // but continues to use the same allocated storage.
205+
// //
206+
// // It panics if n is negative or greater than r.Len().
116207
// func (r *Buffer[T]) Truncate(n int) {
208+
// if n < 0 {
209+
// panic("ringbuf.Buffer.Truncate: n < 0")
210+
// }
211+
// if n > r.Len() {
212+
// panic("ringbuf.Buffer.Truncate: n > r.Len()")
213+
// }
214+
// if n == r.Len() {
215+
// return
216+
// }
117217
// if r.writePos == 0 {
118-
// r.maxLen = n
119-
// if n < len(r.buf) {
120-
// r.buf = r.buf[len(r.buf)-n:]
121-
// }
218+
// r.buf = r.buf[:r.readPos+n]
219+
// return
220+
// }
221+
// if end := r.writePos + r.readPos + n; end < len(r.buf) && /* overflow check: */ end > 0 {
222+
// copy(r.buf, r.buf[r.writePos:n])
223+
// r.buf = r.buf[:n-r.writePos]
122224
// return
123225
// }
124-
// r2 := NewBuffer[T](r.Len(), n)
125-
// r2.Write(r.Bytes())
126-
// *r = *r2
127226
// }
227+
228+
func (b *Buffer[T]) Reset() {
229+
b.buf = b.buf[:0]
230+
b.writePos = 0
231+
b.readPos = 0
232+
}
233+
234+
// Len returns the number of unread elements of the buffer;
235+
// r.Len() == len(r.Bytes())
236+
func (b *Buffer[T]) Len() int {
237+
return len(b.buf) - b.readPos
238+
}
239+
240+
func (b *Buffer[T]) MaxLen() int {
241+
return b.maxLen
242+
}
243+
244+
func (b *Buffer[T]) Cap() int {
245+
return cap(b.buf)
246+
}
247+
248+
func (b *Buffer[T]) Grow(n int) {
249+
b.buf = slices.Grow(b.buf, min(n, b.rem()))
250+
}
251+
252+
// rem returns the remaining length until maxLen is reached.
253+
func (b *Buffer[T]) rem() int {
254+
return b.maxLen - len(b.buf)
255+
}

0 commit comments

Comments
 (0)