Skip to content

Commit 62732b6

Browse files
authored
Merge pull request #63 from origoss/ack
Acknowledgment mechanism added
2 parents cfc3931 + 8d1dd5c commit 62732b6

File tree

8 files changed

+715
-159
lines changed

8 files changed

+715
-159
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ f := fluent.New(fluent.Config{FluentPort: 80, FluentHost: "example.com"})
6464
Sets the timeout for Write call of logger.Post.
6565
Since the default is zero value, Write will not time out.
6666

67+
### RequestAck
68+
69+
Sets whether to request acknowledgment from Fluentd to increase the reliability
70+
of the connection. The default is false.
71+
6772
## Tests
6873
```
6974
go test

fluent/fluent.go

Lines changed: 91 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ import (
1212
"sync"
1313
"time"
1414

15+
"bytes"
16+
"encoding/base64"
17+
"encoding/binary"
1518
"github.com/tinylib/msgp/msgp"
19+
"math/rand"
1620
)
1721

1822
const (
@@ -52,12 +56,22 @@ type Config struct {
5256
// Sub-second precision timestamps are only possible for those using fluentd
5357
// v0.14+ and serializing their messages with msgpack.
5458
SubSecondPrecision bool `json:"sub_second_precision"`
59+
60+
// RequestAck sends the chunk option with a unique ID. The server will
61+
// respond with an acknowledgement. This option improves the reliability
62+
// of the message transmission.
63+
RequestAck bool `json:"request_ack"`
64+
}
65+
66+
type msgToSend struct {
67+
data []byte
68+
ack string
5569
}
5670

5771
type Fluent struct {
5872
Config
5973

60-
pending chan []byte
74+
pending chan *msgToSend
6175
wg sync.WaitGroup
6276

6377
muconn sync.Mutex
@@ -103,7 +117,7 @@ func New(config Config) (f *Fluent, err error) {
103117
if config.Async {
104118
f = &Fluent{
105119
Config: config,
106-
pending: make(chan []byte, config.BufferLimit),
120+
pending: make(chan *msgToSend, config.BufferLimit),
107121
}
108122
f.wg.Add(1)
109123
go f.run()
@@ -188,25 +202,25 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
188202
}
189203

190204
func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error {
191-
var data []byte
205+
var msg *msgToSend
192206
var err error
193-
if data, err = f.EncodeData(tag, tm, message); err != nil {
207+
if msg, err = f.EncodeData(tag, tm, message); err != nil {
194208
return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err)
195209
}
196-
return f.postRawData(data)
210+
return f.postRawData(msg)
197211
}
198212

199213
// Deprecated: Use EncodeAndPostData instead
200-
func (f *Fluent) PostRawData(data []byte) {
201-
f.postRawData(data)
214+
func (f *Fluent) PostRawData(msg *msgToSend) {
215+
f.postRawData(msg)
202216
}
203217

204-
func (f *Fluent) postRawData(data []byte) error {
218+
func (f *Fluent) postRawData(msg *msgToSend) error {
205219
if f.Config.Async {
206-
return f.appendBuffer(data)
220+
return f.appendBuffer(msg)
207221
}
208222
// Synchronous write
209-
return f.write(data)
223+
return f.write(msg)
210224
}
211225

212226
// For sending forward protocol adopted JSON
@@ -219,22 +233,59 @@ type MessageChunk struct {
219233
// So, it should write JSON marshaler by hand.
220234
func (chunk *MessageChunk) MarshalJSON() ([]byte, error) {
221235
data, err := json.Marshal(chunk.message.Record)
222-
return []byte(fmt.Sprintf("[\"%s\",%d,%s,null]", chunk.message.Tag,
223-
chunk.message.Time, data)), err
236+
if err != nil {
237+
return nil, err
238+
}
239+
option, err := json.Marshal(chunk.message.Option)
240+
if err != nil {
241+
return nil, err
242+
}
243+
return []byte(fmt.Sprintf("[\"%s\",%d,%s,%s]", chunk.message.Tag,
244+
chunk.message.Time, data, option)), err
224245
}
225246

226-
func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data []byte, err error) {
247+
// getUniqueID returns a base64 encoded unique ID that can be used for chunk/ack
248+
// mechanism, see
249+
// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#option
250+
func getUniqueID(timeUnix int64) (string, error) {
251+
buf := bytes.NewBuffer(nil)
252+
enc := base64.NewEncoder(base64.StdEncoding, buf)
253+
if err := binary.Write(enc, binary.LittleEndian, timeUnix); err != nil {
254+
enc.Close()
255+
return "", err
256+
}
257+
if err := binary.Write(enc, binary.LittleEndian, rand.Uint64()); err != nil {
258+
enc.Close()
259+
return "", err
260+
}
261+
// encoder needs to be closed before buf.String(), defer does not work
262+
// here
263+
enc.Close()
264+
return buf.String(), nil
265+
}
266+
267+
func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg *msgToSend, err error) {
268+
option := make(map[string]string)
269+
msg = &msgToSend{}
227270
timeUnix := tm.Unix()
271+
if f.Config.RequestAck {
272+
var err error
273+
msg.ack, err = getUniqueID(timeUnix)
274+
if err != nil {
275+
return nil, err
276+
}
277+
option["chunk"] = msg.ack
278+
}
228279
if f.Config.MarshalAsJSON {
229-
msg := Message{Tag: tag, Time: timeUnix, Record: message}
230-
chunk := &MessageChunk{message: msg}
231-
data, err = json.Marshal(chunk)
280+
m := Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
281+
chunk := &MessageChunk{message: m}
282+
msg.data, err = json.Marshal(chunk)
232283
} else if f.Config.SubSecondPrecision {
233-
msg := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message}
234-
data, err = msg.MarshalMsg(nil)
284+
m := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message, Option: option}
285+
msg.data, err = m.MarshalMsg(nil)
235286
} else {
236-
msg := &Message{Tag: tag, Time: timeUnix, Record: message}
237-
data, err = msg.MarshalMsg(nil)
287+
m := &Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
288+
msg.data, err = m.MarshalMsg(nil)
238289
}
239290
return
240291
}
@@ -250,9 +301,9 @@ func (f *Fluent) Close() (err error) {
250301
}
251302

252303
// appendBuffer appends data to buffer with lock.
253-
func (f *Fluent) appendBuffer(data []byte) error {
304+
func (f *Fluent) appendBuffer(msg *msgToSend) error {
254305
select {
255-
case f.pending <- data:
306+
case f.pending <- msg:
256307
default:
257308
return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)
258309
}
@@ -303,7 +354,7 @@ func e(x, y float64) int {
303354
return int(math.Pow(x, y))
304355
}
305356

306-
func (f *Fluent) write(data []byte) error {
357+
func (f *Fluent) write(msg *msgToSend) error {
307358

308359
for i := 0; i < f.Config.MaxRetry; i++ {
309360

@@ -323,17 +374,32 @@ func (f *Fluent) write(data []byte) error {
323374
}
324375
f.muconn.Unlock()
325376

326-
// We're connected, write data
377+
// We're connected, write msg
327378
t := f.Config.WriteTimeout
328379
if time.Duration(0) < t {
329380
f.conn.SetWriteDeadline(time.Now().Add(t))
330381
} else {
331382
f.conn.SetWriteDeadline(time.Time{})
332383
}
333-
_, err := f.conn.Write(data)
384+
_, err := f.conn.Write(msg.data)
334385
if err != nil {
335386
f.close()
336387
} else {
388+
// Acknowledgment check
389+
if msg.ack != "" {
390+
resp := &AckResp{}
391+
if f.Config.MarshalAsJSON {
392+
dec := json.NewDecoder(f.conn)
393+
err = dec.Decode(resp)
394+
} else {
395+
r := msgp.NewReader(f.conn)
396+
err = resp.DecodeMsg(r)
397+
}
398+
if err != nil || resp.Ack != msg.ack {
399+
f.close()
400+
continue
401+
}
402+
}
337403
return err
338404
}
339405
}

fluent/fluent_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func Test_send_WritePendingToConn(t *testing.T) {
146146
f.conn = conn
147147

148148
msg := "This is test writing."
149-
bmsg := []byte(msg)
149+
bmsg := &msgToSend{data: []byte(msg)}
150150
f.pending <- bmsg
151151

152152
err := f.write(bmsg)
@@ -155,7 +155,7 @@ func Test_send_WritePendingToConn(t *testing.T) {
155155
}
156156

157157
rcv := make([]byte, len(conn.buf))
158-
_, err = conn.Read(rcv)
158+
_, _ = conn.Read(rcv)
159159
if string(rcv) != msg {
160160
t.Errorf("got %s, except %s", string(rcv), msg)
161161
}
@@ -179,13 +179,13 @@ func Test_MarshalAsMsgpack(t *testing.T) {
179179
if err != nil {
180180
t.Error(err)
181181
}
182-
actual := string(result)
182+
actual := string(result.data)
183183

184184
// map entries are disordered in golang
185-
expected1 := "\x94\xA3tag\xD2K\x92\u001Ee\x82\xA3foo\xA3bar\xA4hoge\xA4hoge\xC0"
186-
expected2 := "\x94\xA3tag\xD2K\x92\u001Ee\x82\xA4hoge\xA4hoge\xA3foo\xA3bar\xC0"
185+
expected1 := "\x94\xA3tag\xD2K\x92\u001Ee\x82\xA3foo\xA3bar\xA4hoge\xA4hoge\x80"
186+
expected2 := "\x94\xA3tag\xD2K\x92\u001Ee\x82\xA4hoge\xA4hoge\xA3foo\xA3bar\x80"
187187
if actual != expected1 && actual != expected2 {
188-
t.Errorf("got %x,\n except %x\n or %x", actual, expected1, expected2)
188+
t.Errorf("got %+v,\n except %+v\n or %+v", actual, expected1, expected2)
189189
}
190190
}
191191

@@ -211,11 +211,11 @@ func Test_SubSecondPrecision(t *testing.T) {
211211
}
212212

213213
// 8 bytes timestamp can be represented using ext 8 or fixext 8
214-
expected1 := "\x94\xA3tag\xC7\x08\x00K\x92\u001Ee\x00\x00\x01\x00\x81\xA3foo\xA3bar\xC0"
215-
expected2 := "\x94\xa3tag\xD7\x00K\x92\x1Ee\x00\x00\x01\x00\x81\xA3foo\xA3bar\xc0"
216-
actual := string(encodedData)
214+
expected1 := "\x94\xA3tag\xC7\x08\x00K\x92\u001Ee\x00\x00\x01\x00\x81\xA3foo\xA3bar\x80"
215+
expected2 := "\x94\xa3tag\xD7\x00K\x92\x1Ee\x00\x00\x01\x00\x81\xA3foo\xA3bar\x80"
216+
actual := string(encodedData.data)
217217
if actual != expected1 && actual != expected2 {
218-
t.Errorf("got %x,\n except %x\n or %x", actual, expected1, expected2)
218+
t.Errorf("got %+v,\n except %+v\n or %+v", actual, expected1, expected2)
219219
}
220220
}
221221

@@ -235,8 +235,8 @@ func Test_MarshalAsJSON(t *testing.T) {
235235
t.Error(err)
236236
}
237237
// json.Encode marshals map keys in the order, so this expectation is safe
238-
expected := `["tag",1267867237,{"foo":"bar","hoge":"hoge"},null]`
239-
actual := string(result)
238+
expected := `["tag",1267867237,{"foo":"bar","hoge":"hoge"},{}]`
239+
actual := string(result.data)
240240
if actual != expected {
241241
t.Errorf("got %s, except %s", actual, expected)
242242
}
@@ -317,11 +317,11 @@ func Test_PostWithTimeNotTimeOut(t *testing.T) {
317317
}{
318318
{
319319
map[string]string{"foo": "bar"},
320-
"[\"tag_name\",1482493046,{\"foo\":\"bar\"},null]",
320+
"[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]",
321321
},
322322
{
323323
map[string]string{"fuga": "bar", "hoge": "fuga"},
324-
"[\"tag_name\",1482493046,{\"fuga\":\"bar\",\"hoge\":\"fuga\"},null]",
324+
"[\"tag_name\",1482493046,{\"fuga\":\"bar\",\"hoge\":\"fuga\"},{}]",
325325
},
326326
}
327327
for _, tt := range testData {
@@ -334,7 +334,7 @@ func Test_PostWithTimeNotTimeOut(t *testing.T) {
334334
}
335335

336336
rcv := make([]byte, len(conn.buf))
337-
_, err = conn.Read(rcv)
337+
_, _ = conn.Read(rcv)
338338
if string(rcv) != tt.out {
339339
t.Errorf("got %s, except %s", string(rcv), tt.out)
340340
}
@@ -362,7 +362,7 @@ func Test_PostMsgpMarshaler(t *testing.T) {
362362
}{
363363
{
364364
&TestMessage{Foo: "bar"},
365-
"[\"tag_name\",1482493046,{\"foo\":\"bar\"},null]",
365+
"[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]",
366366
},
367367
}
368368
for _, tt := range testData {
@@ -375,7 +375,7 @@ func Test_PostMsgpMarshaler(t *testing.T) {
375375
}
376376

377377
rcv := make([]byte, len(conn.buf))
378-
_, err = conn.Read(rcv)
378+
_, _ = conn.Read(rcv)
379379
if string(rcv) != tt.out {
380380
t.Errorf("got %s, except %s", string(rcv), tt.out)
381381
}

fluent/proto.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,29 @@ type Entry struct {
1616

1717
//msgp:tuple Forward
1818
type Forward struct {
19-
Tag string `msg:"tag"`
20-
Entries []Entry `msg:"entries"`
21-
Option interface{} `msg:"option"`
19+
Tag string `msg:"tag"`
20+
Entries []Entry `msg:"entries"`
21+
Option map[string]string
2222
}
2323

2424
//msgp:tuple Message
2525
type Message struct {
2626
Tag string `msg:"tag"`
2727
Time int64 `msg:"time"`
2828
Record interface{} `msg:"record"`
29-
Option interface{} `msg:"option"`
29+
Option map[string]string
3030
}
3131

3232
//msgp:tuple MessageExt
3333
type MessageExt struct {
3434
Tag string `msg:"tag"`
3535
Time EventTime `msg:"time,extension"`
3636
Record interface{} `msg:"record"`
37-
Option interface{} `msg:"option"`
37+
Option map[string]string
38+
}
39+
40+
type AckResp struct {
41+
Ack string `json:"ack" msg:"ack"`
3842
}
3943

4044
// EventTime is an extension to the serialized time value. It builds in support

0 commit comments

Comments
 (0)