Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions log/logstream/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ func (l *Logstream) Log(fields log.Fields) {
fmt.Fprintf(l.writer, l.format, vals...)
}

// Write writes p to the internal buffer.
func (l *Logstream) Write(p []byte) (int, error) {
l.mux.Lock()
defer l.mux.Unlock()
return l.writer.Write(p)
}

// Run is usually used as a deamon. All the buffered data is flushed periodically
// until it is stopped.
func (l *Logstream) Run() {
Expand Down
24 changes: 16 additions & 8 deletions log/logstream/logstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,18 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/yieldr/go-log/log"
)

func TestNewLogstream(t *testing.T) {
l := New(nil, time.Second, log.BasicFormat, log.BasicFields)
assert.NotNil(t, l)
if l == nil {
t.Fatal("NewLogstream failed")
}
}

func TestLogStreamLog(t *testing.T) {
stream := new(StreamMock)
stream.On("Put", mock.Anything).Return(new(StreamResponseMock), nil)

l := &Logstream{
format: log.BasicFormat,
Expand All @@ -30,13 +29,16 @@ func TestLogStreamLog(t *testing.T) {
"message": func() interface{} { return "foo" },
}
l.Log(fields)
l.Log(fields)
l.Flush()

assert.Equal(t, "now [INFO] foo\n", string(l.writer.buffer[0]))
if stream.buf.String() != "now [INFO] foo\nnow [INFO] foo\n" {
t.Error("Logstream log buffer not matched")
}
}

func TestLogStreamRun(t *testing.T) {
stream := new(StreamMock)
stream.On("Put", mock.Anything).Return(new(StreamResponseMock), nil)

l := &Logstream{
interval: time.Second * 3,
Expand All @@ -60,8 +62,14 @@ func TestLogStreamRun(t *testing.T) {

// data is flushed every 5s
time.Sleep(time.Second * 5)
assert.Nil(t, l.writer.buffer)
assert.Equal(t, "now [INFO] foo\n", stream.buf.String())

if 0 != l.writer.buf.getSize() {
t.Error("writer buffer size should be 0.")
}

if "now [INFO] foo\n" != stream.buf.String() {
t.Error("writer buffer content not match")
}

// stop
l.Stop()
Expand Down
86 changes: 68 additions & 18 deletions log/logstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package logstream

import (
"bytes"

"github.com/stretchr/testify/mock"
"errors"
)

// StreamRecord represents a record to be sent to Stream.
Expand All @@ -27,20 +26,20 @@ type StreamResponse interface {
type StreamWriter struct {
stream Stream

buffer []StreamRecord
bufferSize int

maxBufferItems int
maxBufferSize int

buf *recordBuffer
}

// NewStreamWriter creates a new stream writer.
func NewStreamWriter(s Stream) *StreamWriter {
return &StreamWriter{
stream: s,
bufferSize: 0,
maxBufferItems: 500,
maxBufferSize: 1024 * 1024, //1MB

buf: newRecordBuffer(500),
}
}

Expand All @@ -50,46 +49,98 @@ func NewStreamWriter(s Stream) *StreamWriter {
func (s *StreamWriter) Write(p []byte) (n int, err error) {
n = len(p)

if n > 0 {
s.buffer = append(s.buffer, StreamRecord(p))
s.bufferSize += n
if s.buf.getItems() >= s.maxBufferItems || s.buf.getSize() >= s.maxBufferSize {
if err = s.Flush(); err != nil {
return 0, err
}
}

if s.bufferSize > s.maxBufferSize || len(s.buffer) > s.maxBufferItems {
err = s.Flush()
// Do not just retain or modify p, copy it!
// See:http://golang.org/pkg/io/#Writer
data := make([]byte, n)
copy(data, p)

if err = s.buf.append(StreamRecord(data)); err != nil {
return 0, err
}

return
}

// Flush buffered data into the stream.
func (s *StreamWriter) Flush() error {
_, err := s.stream.Put(s.buffer)
_, err := s.stream.Put(s.buf.getRecords())
s.Reset()
return err
}

// Reset the internal fields in s.
func (s *StreamWriter) Reset() {
s.buffer = nil
s.bufferSize = 0
s.buf.reset()
}

// Close the stream in s.
func (s *StreamWriter) Close() error {
return s.stream.Close()
}

// recordBuffer uses a pre-allocated, fixed-size slice to buffer StreamRecord.
type recordBuffer struct {
records []StreamRecord
pos int
size int
}

// newWriterBuffer returns a new initialized recordBuffer.
func newRecordBuffer(maxRecords int) *recordBuffer {
return &recordBuffer{
records: make([]StreamRecord, maxRecords),
pos: 0,
size: 0,
}
}

// reset resets the current position and size of records.
func (r *recordBuffer) reset() {
r.pos = 0
r.size = 0
}

// append appends r into the r.records.
func (r *recordBuffer) append(s StreamRecord) error {
if r.pos >= len(r.records) {
return errors.New("reach the end of buffer.")
}

r.records[r.pos] = s
r.pos++
r.size += len(s)
return nil
}

// getSize returns the byte size of r.
func (r *recordBuffer) getSize() int {
return r.size
}

// getItems returns the number of records in r.
func (r *recordBuffer) getItems() int {
return r.pos
}

// getRecords returns a new slice of all the stored records in r.
func (r *recordBuffer) getRecords() []StreamRecord {
return r.records[0:r.pos]
}

// StreamResponseMock is a mock for StreamResponse.
type StreamResponseMock struct {
StreamResponse
mock.Mock
}

// StreamMock is a mock for Stream.
type StreamMock struct {
Stream
mock.Mock
buf bytes.Buffer
}

Expand All @@ -100,6 +151,5 @@ func (s *StreamMock) Put(records []StreamRecord) (StreamResponse, error) {
s.buf.Write(r)
}

args := s.Called(records)
return args.Get(0).(StreamResponse), args.Error(1)
return new(StreamResponseMock), nil
}
26 changes: 19 additions & 7 deletions log/logstream/kinesis.go → log/logstream/stream/kinesis.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,35 @@
package logstream
package stream

import (
"errors"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"

"github.com/yieldr/go-log/log/logstream"
)

// Kinesis implements Stream interface and wraps a kinesis client.
// TODO: need aws.Config.
type Kinesis struct {
streamName string
stream kinesis.Kinesis
name string
stream *kinesis.Kinesis
}

// New created a new Kinesis stream with given name and config.
func New(name string, c aws.Config) logstream.Stream {
return &Kinesis{
name: name,
stream: kinesis.New(&c),
}
}

// Put records into a remote kinesis stream.
func (k *Kinesis) Put(records []StreamRecord) (StreamResponse, error) {
func (k *Kinesis) Put(records []logstream.StreamRecord) (logstream.StreamResponse, error) {
if len(records) == 0 {
return nil, errors.New("empty records for kinesis.")
}

entries := make([]*kinesis.PutRecordsRequestEntry, len(records))
for i, record := range records {
Expand All @@ -28,14 +41,13 @@ func (k *Kinesis) Put(records []StreamRecord) (StreamResponse, error) {

params := &kinesis.PutRecordsInput{
Records: entries,
StreamName: aws.String(k.streamName),
StreamName: aws.String(k.name),
}

return k.stream.PutRecords(params)
}

// Close.
// TODO: do we close the connection to kinesis?
func (k *Kinesis) Close() error {
return nil
}
Expand Down
Loading