Skip to content

Commit 90f0f02

Browse files
authored
Merge pull request #56 from keymantics/features/v1.3.0-async-send
Async send
2 parents 8bbc235 + 7d967ba commit 90f0f02

File tree

3 files changed

+99
-84
lines changed

3 files changed

+99
-84
lines changed

fluent/fluent.go

Lines changed: 77 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"math"
88
"net"
9+
"os"
910
"reflect"
1011
"strconv"
1112
"sync"
@@ -21,8 +22,9 @@ const (
2122
defaultPort = 24224
2223
defaultTimeout = 3 * time.Second
2324
defaultWriteTimeout = time.Duration(0) // Write() will not time out
24-
defaultBufferLimit = 8 * 1024 * 1024
25+
defaultBufferLimit = 8 * 1024
2526
defaultRetryWait = 500
27+
defaultMaxRetryWait = 60000
2628
defaultMaxRetry = 13
2729
defaultReconnectWaitIncreRate = 1.5
2830
// Default sub-second precision value to false since it is only compatible
@@ -40,9 +42,12 @@ type Config struct {
4042
BufferLimit int `json:"buffer_limit"`
4143
RetryWait int `json:"retry_wait"`
4244
MaxRetry int `json:"max_retry"`
45+
MaxRetryWait int `json:"max_retry_wait"`
4346
TagPrefix string `json:"tag_prefix"`
44-
AsyncConnect bool `json:"async_connect"`
45-
MarshalAsJSON bool `json:"marshal_as_json"`
47+
Async bool `json:"async"`
48+
// Deprecated: Use Async instead
49+
AsyncConnect bool `json:"async_connect"`
50+
MarshalAsJSON bool `json:"marshal_as_json"`
4651

4752
// Sub-second precision timestamps are only possible for those using fluentd
4853
// v0.14+ and serializing their messages with msgpack.
@@ -52,12 +57,11 @@ type Config struct {
5257
type Fluent struct {
5358
Config
5459

55-
mubuff sync.Mutex
56-
pending []byte
60+
pending chan []byte
61+
wg sync.WaitGroup
5762

58-
muconn sync.Mutex
59-
conn net.Conn
60-
reconnecting bool
63+
muconn sync.Mutex
64+
conn net.Conn
6165
}
6266

6367
// New creates a new Logger.
@@ -89,11 +93,22 @@ func New(config Config) (f *Fluent, err error) {
8993
if config.MaxRetry == 0 {
9094
config.MaxRetry = defaultMaxRetry
9195
}
96+
if config.MaxRetryWait == 0 {
97+
config.MaxRetryWait = defaultMaxRetryWait
98+
}
9299
if config.AsyncConnect {
93-
f = &Fluent{Config: config, reconnecting: true}
94-
go f.reconnect()
100+
fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead")
101+
config.Async = config.Async || config.AsyncConnect
102+
}
103+
if config.Async {
104+
f = &Fluent{
105+
Config: config,
106+
pending: make(chan []byte, config.BufferLimit),
107+
}
108+
f.wg.Add(1)
109+
go f.run()
95110
} else {
96-
f = &Fluent{Config: config, reconnecting: false}
111+
f = &Fluent{Config: config}
97112
err = f.connect()
98113
}
99114
return
@@ -187,14 +202,11 @@ func (f *Fluent) PostRawData(data []byte) {
187202
}
188203

189204
func (f *Fluent) postRawData(data []byte) error {
190-
if err := f.appendBuffer(data); err != nil {
191-
return err
192-
}
193-
if err := f.send(); err != nil {
194-
f.close()
195-
return err
205+
if f.Config.Async {
206+
return f.appendBuffer(data)
196207
}
197-
return nil
208+
// Synchronous write
209+
return f.write(data)
198210
}
199211

200212
// For sending forward protocol adopted JSON
@@ -227,23 +239,23 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data
227239
return
228240
}
229241

230-
// Close closes the connection.
242+
// Close closes the connection, waiting for pending logs to be sent
231243
func (f *Fluent) Close() (err error) {
232-
if len(f.pending) > 0 {
233-
err = f.send()
244+
if f.Config.Async {
245+
close(f.pending)
246+
f.wg.Wait()
234247
}
235248
f.close()
236249
return
237250
}
238251

239252
// appendBuffer appends data to buffer with lock.
240253
func (f *Fluent) appendBuffer(data []byte) error {
241-
f.mubuff.Lock()
242-
defer f.mubuff.Unlock()
243-
if len(f.pending)+len(data) > f.Config.BufferLimit {
244-
return errors.New(fmt.Sprintf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit))
254+
select {
255+
case f.pending <- data:
256+
default:
257+
return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)
245258
}
246-
f.pending = append(f.pending, data...)
247259
return nil
248260
}
249261

@@ -259,8 +271,6 @@ func (f *Fluent) close() {
259271

260272
// connect establishes a new connection using the specified transport.
261273
func (f *Fluent) connect() (err error) {
262-
f.muconn.Lock()
263-
defer f.muconn.Unlock()
264274

265275
switch f.Config.FluentNetwork {
266276
case "tcp":
@@ -270,63 +280,63 @@ func (f *Fluent) connect() (err error) {
270280
default:
271281
err = net.UnknownNetworkError(f.Config.FluentNetwork)
272282
}
283+
return err
284+
}
273285

274-
if err == nil {
275-
f.reconnecting = false
286+
func (f *Fluent) run() {
287+
for {
288+
select {
289+
case entry, ok := <-f.pending:
290+
if !ok {
291+
f.wg.Done()
292+
return
293+
}
294+
err := f.write(entry)
295+
if err != nil {
296+
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
297+
}
298+
}
276299
}
277-
return
278300
}
279301

280302
func e(x, y float64) int {
281303
return int(math.Pow(x, y))
282304
}
283305

284-
func (f *Fluent) reconnect() {
285-
for i := 0; ; i++ {
286-
err := f.connect()
287-
if err == nil {
288-
f.send()
289-
return
290-
}
291-
if i == f.Config.MaxRetry {
292-
// TODO: What we can do when connection failed MaxRetry times?
293-
panic("fluent#reconnect: failed to reconnect!")
294-
}
295-
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
296-
time.Sleep(time.Duration(waitTime) * time.Millisecond)
297-
}
298-
}
299-
300-
func (f *Fluent) send() error {
301-
f.muconn.Lock()
302-
defer f.muconn.Unlock()
303-
304-
if f.conn == nil {
305-
if f.reconnecting == false {
306-
f.reconnecting = true
307-
go f.reconnect()
306+
func (f *Fluent) write(data []byte) error {
307+
308+
for i := 0; i < f.Config.MaxRetry; i++ {
309+
310+
// Connect if needed
311+
f.muconn.Lock()
312+
if f.conn == nil {
313+
err := f.connect()
314+
if err != nil {
315+
f.muconn.Unlock()
316+
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
317+
if waitTime > f.Config.MaxRetryWait {
318+
waitTime = f.Config.MaxRetryWait
319+
}
320+
time.Sleep(time.Duration(waitTime) * time.Millisecond)
321+
continue
322+
}
308323
}
309-
return errors.New("fluent#send: can't send logs, client is reconnecting")
310-
}
311-
312-
f.mubuff.Lock()
313-
defer f.mubuff.Unlock()
324+
f.muconn.Unlock()
314325

315-
var err error
316-
if len(f.pending) > 0 {
326+
// We're connected, write data
317327
t := f.Config.WriteTimeout
318328
if time.Duration(0) < t {
319329
f.conn.SetWriteDeadline(time.Now().Add(t))
320330
} else {
321331
f.conn.SetWriteDeadline(time.Time{})
322332
}
323-
_, err = f.conn.Write(f.pending)
333+
_, err := f.conn.Write(data)
324334
if err != nil {
325-
f.conn.Close()
326-
f.conn = nil
335+
f.close()
327336
} else {
328-
f.pending = f.pending[:0]
337+
return err
329338
}
330339
}
331-
return err
340+
341+
return fmt.Errorf("fluent#write: failed to reconnect, max retry: %v", f.Config.MaxRetry)
332342
}

fluent/fluent_test.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -140,16 +140,16 @@ func Test_New_itShouldUseConfigValuesFromMashalAsJSONArgument(t *testing.T) {
140140
}
141141

142142
func Test_send_WritePendingToConn(t *testing.T) {
143-
f := &Fluent{Config: Config{}, reconnecting: false}
143+
f, _ := New(Config{Async: true})
144144

145145
conn := &Conn{}
146146
f.conn = conn
147147

148148
msg := "This is test writing."
149149
bmsg := []byte(msg)
150-
f.pending = append(f.pending, bmsg...)
150+
f.pending <- bmsg
151151

152-
err := f.send()
152+
err := f.write(bmsg)
153153
if err != nil {
154154
t.Error(err)
155155
}
@@ -159,10 +159,12 @@ func Test_send_WritePendingToConn(t *testing.T) {
159159
if string(rcv) != msg {
160160
t.Errorf("got %s, except %s", string(rcv), msg)
161161
}
162+
163+
f.Close()
162164
}
163165

164166
func Test_MarshalAsMsgpack(t *testing.T) {
165-
f := &Fluent{Config: Config{}, reconnecting: false}
167+
f := &Fluent{Config: Config{}}
166168

167169
conn := &Conn{}
168170
f.conn = conn
@@ -193,7 +195,6 @@ func Test_SubSecondPrecision(t *testing.T) {
193195
Config: Config{
194196
SubSecondPrecision: true,
195197
},
196-
reconnecting: false,
197198
}
198199
fluent.conn = &Conn{}
199200

@@ -209,13 +210,17 @@ func Test_SubSecondPrecision(t *testing.T) {
209210
t.Error(err)
210211
}
211212

212-
expected := "\x94\xA3tag\xC7\x08\x00K\x92\u001Ee\x00\x00\x01\x00\x81\xA3foo\xA3bar\xC0"
213+
// 8 bytes timestamp can be represented using ext 8 or fixext 8
214+
expected1 := "\x94\xA3tag\xC7\x08\x00K\x92\u001Ee\x00\x00\x01\x00\x81\xA3foo\xA3bar\xC0"
215+
expected2 := "\x94\xa3tag\xD7\x00K\x92\x1Ee\x00\x00\x01\x00\x81\xA3foo\xA3bar\xc0"
213216
actual := string(encodedData)
214-
assert.Equal(t, expected, actual)
217+
if actual != expected1 && actual != expected2 {
218+
t.Errorf("got %x,\n except %x\n or %x", actual, expected1, expected2)
219+
}
215220
}
216221

217222
func Test_MarshalAsJSON(t *testing.T) {
218-
f := &Fluent{Config: Config{MarshalAsJSON: true}, reconnecting: false}
223+
f := &Fluent{Config: Config{MarshalAsJSON: true}}
219224

220225
conn := &Conn{}
221226
f.conn = conn
@@ -250,11 +255,11 @@ func TestJsonConfig(t *testing.T) {
250255
FluentSocketPath: "/var/tmp/fluent.sock",
251256
Timeout: 3000,
252257
WriteTimeout: 6000,
253-
BufferLimit: 200,
258+
BufferLimit: 10,
254259
RetryWait: 5,
255260
MaxRetry: 3,
256261
TagPrefix: "fluent",
257-
AsyncConnect: false,
262+
Async: false,
258263
MarshalAsJSON: true,
259264
}
260265

@@ -276,8 +281,8 @@ func TestAsyncConnect(t *testing.T) {
276281
ch := make(chan result, 1)
277282
go func() {
278283
config := Config{
279-
FluentPort: 8888,
280-
AsyncConnect: true,
284+
FluentPort: 8888,
285+
Async: true,
281286
}
282287
f, err := New(config)
283288
ch <- result{f: f, err: err}
@@ -291,14 +296,14 @@ func TestAsyncConnect(t *testing.T) {
291296
}
292297
res.f.Close()
293298
case <-time.After(time.Millisecond * 500):
294-
t.Error("AsyncConnect must not block")
299+
t.Error("Async must not block")
295300
}
296301
}
297302

298303
func Test_PostWithTimeNotTimeOut(t *testing.T) {
299304
f, err := New(Config{
300305
FluentPort: 6666,
301-
AsyncConnect: false,
306+
Async: false,
302307
MarshalAsJSON: true, // easy to check equality
303308
})
304309
if err != nil {
@@ -343,7 +348,7 @@ func Test_PostWithTimeNotTimeOut(t *testing.T) {
343348
func Test_PostMsgpMarshaler(t *testing.T) {
344349
f, err := New(Config{
345350
FluentPort: 6666,
346-
AsyncConnect: false,
351+
Async: false,
347352
MarshalAsJSON: true, // easy to check equality
348353
})
349354
if err != nil {

fluent/testdata/config.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
"fluent_socket_path":"/var/tmp/fluent.sock",
66
"timeout":3000,
77
"write_timeout":6000,
8-
"buffer_limit":200,
8+
"buffer_limit":10,
99
"retry_wait":5,
1010
"max_retry":3,
1111
"tag_prefix":"fluent",
12-
"async_connect": false,
12+
"async": false,
1313
"marshal_as_json": true
1414
}

0 commit comments

Comments
 (0)