Skip to content

Commit 3c0ef01

Browse files
committed
Reworked asynchronous mode: posting data to Fluentd no longer blocks in asynchronous mode
- The AsyncConnect configuration option is now named Async.
- Errors during reconnect no longer cause a panic.
- Synchronous mode is now always synchronous (reconnection also happens synchronously).

Signed-off-by: alexlry <alexandre@keymantics.com>
1 parent 70eca0b commit 3c0ef01

File tree

3 files changed

+87
-85
lines changed

3 files changed

+87
-85
lines changed

fluent/fluent.go

Lines changed: 71 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"math"
88
"net"
9+
"os"
910
"reflect"
1011
"strconv"
1112
"sync"
@@ -21,7 +22,7 @@ const (
2122
defaultPort = 24224
2223
defaultTimeout = 3 * time.Second
2324
defaultWriteTimeout = time.Duration(0) // Write() will not time out
24-
defaultBufferLimit = 8 * 1024 * 1024
25+
defaultBufferLimit = 8 * 1024
2526
defaultRetryWait = 500
2627
defaultMaxRetryWait = 60000
2728
defaultMaxRetry = 13
@@ -43,8 +44,10 @@ type Config struct {
4344
MaxRetry int `json:"max_retry"`
4445
MaxRetryWait int `json:"max_retry_wait"`
4546
TagPrefix string `json:"tag_prefix"`
46-
AsyncConnect bool `json:"async_connect"`
47-
MarshalAsJSON bool `json:"marshal_as_json"`
47+
Async bool `json:"async"`
48+
// Deprecated: Use Async instead
49+
AsyncConnect bool `json:"async_connect"`
50+
MarshalAsJSON bool `json:"marshal_as_json"`
4851

4952
// Sub-second precision timestamps are only possible for those using fluentd
5053
// v0.14+ and serializing their messages with msgpack.
@@ -54,12 +57,11 @@ type Config struct {
5457
type Fluent struct {
5558
Config
5659

57-
mubuff sync.Mutex
58-
pending []byte
60+
pending chan []byte
61+
wg sync.WaitGroup
5962

60-
muconn sync.Mutex
61-
conn net.Conn
62-
reconnecting bool
63+
muconn sync.Mutex
64+
conn net.Conn
6365
}
6466

6567
// New creates a new Logger.
@@ -95,10 +97,17 @@ func New(config Config) (f *Fluent, err error) {
9597
config.MaxRetryWait = defaultMaxRetryWait
9698
}
9799
if config.AsyncConnect {
98-
f = &Fluent{Config: config, reconnecting: true}
99-
go f.reconnect()
100+
config.Async = config.Async || config.AsyncConnect
101+
}
102+
if config.Async {
103+
f = &Fluent{
104+
Config: config,
105+
pending: make(chan []byte, config.BufferLimit),
106+
}
107+
f.wg.Add(1)
108+
go f.run()
100109
} else {
101-
f = &Fluent{Config: config, reconnecting: false}
110+
f = &Fluent{Config: config}
102111
err = f.connect()
103112
}
104113
return
@@ -192,14 +201,11 @@ func (f *Fluent) PostRawData(data []byte) {
192201
}
193202

194203
func (f *Fluent) postRawData(data []byte) error {
195-
if err := f.appendBuffer(data); err != nil {
196-
return err
204+
if f.Config.Async {
205+
return f.appendBuffer(data)
197206
}
198-
if err := f.send(); err != nil {
199-
f.close()
200-
return err
201-
}
202-
return nil
207+
// Synchronous write
208+
return f.write(data)
203209
}
204210

205211
// For sending forward protocol adopted JSON
@@ -232,23 +238,23 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data
232238
return
233239
}
234240

235-
// Close closes the connection.
241+
// Close closes the connection, waiting for pending logs to be sent
236242
func (f *Fluent) Close() (err error) {
237-
if len(f.pending) > 0 {
238-
err = f.send()
243+
if f.Config.Async {
244+
close(f.pending)
245+
f.wg.Wait()
239246
}
240247
f.close()
241248
return
242249
}
243250

244251
// appendBuffer appends data to buffer with lock.
245252
func (f *Fluent) appendBuffer(data []byte) error {
246-
f.mubuff.Lock()
247-
defer f.mubuff.Unlock()
248-
if len(f.pending)+len(data) > f.Config.BufferLimit {
249-
return errors.New(fmt.Sprintf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit))
253+
select {
254+
case f.pending <- data:
255+
default:
256+
return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)
250257
}
251-
f.pending = append(f.pending, data...)
252258
return nil
253259
}
254260

@@ -264,8 +270,6 @@ func (f *Fluent) close() {
264270

265271
// connect establishes a new connection using the specified transport.
266272
func (f *Fluent) connect() (err error) {
267-
f.muconn.Lock()
268-
defer f.muconn.Unlock()
269273

270274
switch f.Config.FluentNetwork {
271275
case "tcp":
@@ -275,66 +279,63 @@ func (f *Fluent) connect() (err error) {
275279
default:
276280
err = net.UnknownNetworkError(f.Config.FluentNetwork)
277281
}
282+
return err
283+
}
278284

279-
if err == nil {
280-
f.reconnecting = false
285+
func (f *Fluent) run() {
286+
for {
287+
select {
288+
case entry, ok := <-f.pending:
289+
if !ok {
290+
f.wg.Done()
291+
return
292+
}
293+
err := f.write(entry)
294+
if err != nil {
295+
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
296+
}
297+
}
281298
}
282-
return
283299
}
284300

285301
func e(x, y float64) int {
286302
return int(math.Pow(x, y))
287303
}
288304

289-
func (f *Fluent) reconnect() {
290-
for i := 0; ; i++ {
291-
err := f.connect()
292-
if err == nil {
293-
f.send()
294-
return
295-
}
296-
if i == f.Config.MaxRetry {
297-
// TODO: What we can do when connection failed MaxRetry times?
298-
panic("fluent#reconnect: failed to reconnect!")
299-
}
300-
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
301-
if waitTime > f.Config.MaxRetryWait {
302-
waitTime = f.Config.MaxRetryWait
303-
}
304-
time.Sleep(time.Duration(waitTime) * time.Millisecond)
305-
}
306-
}
307-
308-
func (f *Fluent) send() error {
309-
f.muconn.Lock()
310-
defer f.muconn.Unlock()
311-
312-
if f.conn == nil {
313-
if f.reconnecting == false {
314-
f.reconnecting = true
315-
go f.reconnect()
305+
func (f *Fluent) write(data []byte) error {
306+
307+
for i := 0; i < f.Config.MaxRetry; i++ {
308+
309+
// Connect if needed
310+
f.muconn.Lock()
311+
if f.conn == nil {
312+
err := f.connect()
313+
if err != nil {
314+
f.muconn.Unlock()
315+
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
316+
if waitTime > f.Config.MaxRetryWait {
317+
waitTime = f.Config.MaxRetryWait
318+
}
319+
time.Sleep(time.Duration(waitTime) * time.Millisecond)
320+
continue
321+
}
316322
}
317-
return errors.New("fluent#send: can't send logs, client is reconnecting")
318-
}
319-
320-
f.mubuff.Lock()
321-
defer f.mubuff.Unlock()
323+
f.muconn.Unlock()
322324

323-
var err error
324-
if len(f.pending) > 0 {
325+
// We're connected, write data
325326
t := f.Config.WriteTimeout
326327
if time.Duration(0) < t {
327328
f.conn.SetWriteDeadline(time.Now().Add(t))
328329
} else {
329330
f.conn.SetWriteDeadline(time.Time{})
330331
}
331-
_, err = f.conn.Write(f.pending)
332+
_, err := f.conn.Write(data)
332333
if err != nil {
333-
f.conn.Close()
334-
f.conn = nil
334+
f.close()
335335
} else {
336-
f.pending = f.pending[:0]
336+
return err
337337
}
338338
}
339-
return err
339+
340+
return errors.New("fluent#write: failed to reconnect")
340341
}

fluent/fluent_test.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -140,16 +140,16 @@ func Test_New_itShouldUseConfigValuesFromMashalAsJSONArgument(t *testing.T) {
140140
}
141141

142142
func Test_send_WritePendingToConn(t *testing.T) {
143-
f := &Fluent{Config: Config{}, reconnecting: false}
143+
f, _ := New(Config{Async: true})
144144

145145
conn := &Conn{}
146146
f.conn = conn
147147

148148
msg := "This is test writing."
149149
bmsg := []byte(msg)
150-
f.pending = append(f.pending, bmsg...)
150+
f.pending <- bmsg
151151

152-
err := f.send()
152+
err := f.write(bmsg)
153153
if err != nil {
154154
t.Error(err)
155155
}
@@ -159,10 +159,12 @@ func Test_send_WritePendingToConn(t *testing.T) {
159159
if string(rcv) != msg {
160160
t.Errorf("got %s, except %s", string(rcv), msg)
161161
}
162+
163+
f.Close()
162164
}
163165

164166
func Test_MarshalAsMsgpack(t *testing.T) {
165-
f := &Fluent{Config: Config{}, reconnecting: false}
167+
f := &Fluent{Config: Config{}}
166168

167169
conn := &Conn{}
168170
f.conn = conn
@@ -193,7 +195,6 @@ func Test_SubSecondPrecision(t *testing.T) {
193195
Config: Config{
194196
SubSecondPrecision: true,
195197
},
196-
reconnecting: false,
197198
}
198199
fluent.conn = &Conn{}
199200

@@ -215,7 +216,7 @@ func Test_SubSecondPrecision(t *testing.T) {
215216
}
216217

217218
func Test_MarshalAsJSON(t *testing.T) {
218-
f := &Fluent{Config: Config{MarshalAsJSON: true}, reconnecting: false}
219+
f := &Fluent{Config: Config{MarshalAsJSON: true}}
219220

220221
conn := &Conn{}
221222
f.conn = conn
@@ -250,11 +251,11 @@ func TestJsonConfig(t *testing.T) {
250251
FluentSocketPath: "/var/tmp/fluent.sock",
251252
Timeout: 3000,
252253
WriteTimeout: 6000,
253-
BufferLimit: 200,
254+
BufferLimit: 10,
254255
RetryWait: 5,
255256
MaxRetry: 3,
256257
TagPrefix: "fluent",
257-
AsyncConnect: false,
258+
Async: false,
258259
MarshalAsJSON: true,
259260
}
260261

@@ -276,8 +277,8 @@ func TestAsyncConnect(t *testing.T) {
276277
ch := make(chan result, 1)
277278
go func() {
278279
config := Config{
279-
FluentPort: 8888,
280-
AsyncConnect: true,
280+
FluentPort: 8888,
281+
Async: true,
281282
}
282283
f, err := New(config)
283284
ch <- result{f: f, err: err}
@@ -291,14 +292,14 @@ func TestAsyncConnect(t *testing.T) {
291292
}
292293
res.f.Close()
293294
case <-time.After(time.Millisecond * 500):
294-
t.Error("AsyncConnect must not block")
295+
t.Error("Async must not block")
295296
}
296297
}
297298

298299
func Test_PostWithTimeNotTimeOut(t *testing.T) {
299300
f, err := New(Config{
300301
FluentPort: 6666,
301-
AsyncConnect: false,
302+
Async: false,
302303
MarshalAsJSON: true, // easy to check equality
303304
})
304305
if err != nil {
@@ -343,7 +344,7 @@ func Test_PostWithTimeNotTimeOut(t *testing.T) {
343344
func Test_PostMsgpMarshaler(t *testing.T) {
344345
f, err := New(Config{
345346
FluentPort: 6666,
346-
AsyncConnect: false,
347+
Async: false,
347348
MarshalAsJSON: true, // easy to check equality
348349
})
349350
if err != nil {

fluent/testdata/config.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
"fluent_socket_path":"/var/tmp/fluent.sock",
66
"timeout":3000,
77
"write_timeout":6000,
8-
"buffer_limit":200,
8+
"buffer_limit":10,
99
"retry_wait":5,
1010
"max_retry":3,
1111
"tag_prefix":"fluent",
12-
"async_connect": false,
12+
"async": false,
1313
"marshal_as_json": true
1414
}

0 commit comments

Comments
 (0)