Skip to content

Commit 96bbaa1

Browse files
committed
Acknowledgment mechanism added
Signed-off-by: Gergely Szabo <gergely.szabo@origoss.com>
1 parent 90f0f02 commit 96bbaa1

File tree

8 files changed

+699
-151
lines changed

8 files changed

+699
-151
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ f := fluent.New(fluent.Config{FluentPort: 80, FluentHost: "example.com"})
6464
Sets the timeout for Write call of logger.Post.
6565
Since the default is zero value, Write will not time out.
6666

67+
### RequestAck
68+
69+
Sets whether to request acknowledgment from Fluentd to increase the reliability
70+
of the connection. The default is false.
71+
6772
## Tests
6873
```
6974
go test

fluent/fluent.go

Lines changed: 75 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ import (
1212
"sync"
1313
"time"
1414

15+
"bytes"
16+
"encoding/base64"
17+
"encoding/binary"
1518
"github.com/tinylib/msgp/msgp"
19+
"math/rand"
1620
)
1721

1822
const (
@@ -52,12 +56,22 @@ type Config struct {
5256
// Sub-second precision timestamps are only possible for those using fluentd
5357
// v0.14+ and serializing their messages with msgpack.
5458
SubSecondPrecision bool `json:"sub_second_precision"`
59+
60+
// RequestAck sends the chunk option with a unique ID. The server will
61+
// respond with an acknowledgement. This option improves the reliability
62+
// of the message transmission.
63+
RequestAck bool `json:"request_ack"`
64+
}
65+
66+
type msgToSend struct {
67+
data []byte
68+
ack string
5569
}
5670

5771
type Fluent struct {
5872
Config
5973

60-
pending chan []byte
74+
pending chan *msgToSend
6175
wg sync.WaitGroup
6276

6377
muconn sync.Mutex
@@ -103,7 +117,7 @@ func New(config Config) (f *Fluent, err error) {
103117
if config.Async {
104118
f = &Fluent{
105119
Config: config,
106-
pending: make(chan []byte, config.BufferLimit),
120+
pending: make(chan *msgToSend, config.BufferLimit),
107121
}
108122
f.wg.Add(1)
109123
go f.run()
@@ -188,7 +202,7 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
188202
}
189203

190204
func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error {
191-
var data []byte
205+
var data *msgToSend
192206
var err error
193207
if data, err = f.EncodeData(tag, tm, message); err != nil {
194208
return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err)
@@ -197,11 +211,11 @@ func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}
197211
}
198212

199213
// Deprecated: Use EncodeAndPostData instead
200-
func (f *Fluent) PostRawData(data []byte) {
214+
func (f *Fluent) PostRawData(data *msgToSend) {
201215
f.postRawData(data)
202216
}
203217

204-
func (f *Fluent) postRawData(data []byte) error {
218+
func (f *Fluent) postRawData(data *msgToSend) error {
205219
if f.Config.Async {
206220
return f.appendBuffer(data)
207221
}
@@ -219,22 +233,51 @@ type MessageChunk struct {
219233
// So, it should write JSON marshaler by hand.
220234
func (chunk *MessageChunk) MarshalJSON() ([]byte, error) {
221235
data, err := json.Marshal(chunk.message.Record)
222-
return []byte(fmt.Sprintf("[\"%s\",%d,%s,null]", chunk.message.Tag,
223-
chunk.message.Time, data)), err
236+
if err != nil {
237+
return nil, err
238+
}
239+
option, err := json.Marshal(chunk.message.Option)
240+
if err != nil {
241+
return nil, err
242+
}
243+
return []byte(fmt.Sprintf("[\"%s\",%d,%s,%s]", chunk.message.Tag,
244+
chunk.message.Time, data, option)), err
224245
}
225246

226-
func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data []byte, err error) {
247+
// getUniqueID returns a base64 encoded unique ID that can be used for chunk/ack
248+
// mechanism, see
249+
// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#option
250+
func getUniqueID(timeUnix int64) string {
251+
buf := bytes.NewBuffer(nil)
252+
enc := base64.NewEncoder(base64.StdEncoding, buf)
253+
if err := binary.Write(enc, binary.LittleEndian, timeUnix); err != nil {
254+
panic(err)
255+
}
256+
if err := binary.Write(enc, binary.LittleEndian, rand.Uint64()); err != nil {
257+
panic(err)
258+
}
259+
enc.Close()
260+
return buf.String()
261+
}
262+
263+
func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data *msgToSend, err error) {
264+
option := make(map[string]string)
265+
data = &msgToSend{}
227266
timeUnix := tm.Unix()
267+
if f.Config.RequestAck {
268+
data.ack = getUniqueID(timeUnix)
269+
option["chunk"] = data.ack
270+
}
228271
if f.Config.MarshalAsJSON {
229-
msg := Message{Tag: tag, Time: timeUnix, Record: message}
272+
msg := Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
230273
chunk := &MessageChunk{message: msg}
231-
data, err = json.Marshal(chunk)
274+
data.data, err = json.Marshal(chunk)
232275
} else if f.Config.SubSecondPrecision {
233-
msg := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message}
234-
data, err = msg.MarshalMsg(nil)
276+
msg := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message, Option: option}
277+
data.data, err = msg.MarshalMsg(nil)
235278
} else {
236-
msg := &Message{Tag: tag, Time: timeUnix, Record: message}
237-
data, err = msg.MarshalMsg(nil)
279+
msg := &Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
280+
data.data, err = msg.MarshalMsg(nil)
238281
}
239282
return
240283
}
@@ -250,7 +293,7 @@ func (f *Fluent) Close() (err error) {
250293
}
251294

252295
// appendBuffer appends data to buffer with lock.
253-
func (f *Fluent) appendBuffer(data []byte) error {
296+
func (f *Fluent) appendBuffer(data *msgToSend) error {
254297
select {
255298
case f.pending <- data:
256299
default:
@@ -303,7 +346,7 @@ func e(x, y float64) int {
303346
return int(math.Pow(x, y))
304347
}
305348

306-
func (f *Fluent) write(data []byte) error {
349+
func (f *Fluent) write(data *msgToSend) error {
307350

308351
for i := 0; i < f.Config.MaxRetry; i++ {
309352

@@ -330,10 +373,25 @@ func (f *Fluent) write(data []byte) error {
330373
} else {
331374
f.conn.SetWriteDeadline(time.Time{})
332375
}
333-
_, err := f.conn.Write(data)
376+
_, err := f.conn.Write(data.data)
334377
if err != nil {
335378
f.close()
336379
} else {
380+
// Acknowledgment check
381+
if data.ack != "" {
382+
ack := &AckResp{}
383+
if f.Config.MarshalAsJSON {
384+
dec := json.NewDecoder(f.conn)
385+
err = dec.Decode(ack)
386+
} else {
387+
r := msgp.NewReader(f.conn)
388+
err = ack.DecodeMsg(r)
389+
}
390+
if err != nil || ack.Ack != data.ack {
391+
f.close()
392+
continue
393+
}
394+
}
337395
return err
338396
}
339397
}

fluent/fluent_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func Test_send_WritePendingToConn(t *testing.T) {
146146
f.conn = conn
147147

148148
msg := "This is test writing."
149-
bmsg := []byte(msg)
149+
bmsg := &msgToSend{data: []byte(msg)}
150150
f.pending <- bmsg
151151

152152
err := f.write(bmsg)
@@ -155,7 +155,7 @@ func Test_send_WritePendingToConn(t *testing.T) {
155155
}
156156

157157
rcv := make([]byte, len(conn.buf))
158-
_, err = conn.Read(rcv)
158+
_, _ = conn.Read(rcv)
159159
if string(rcv) != msg {
160160
t.Errorf("got %s, except %s", string(rcv), msg)
161161
}
@@ -179,13 +179,13 @@ func Test_MarshalAsMsgpack(t *testing.T) {
179179
if err != nil {
180180
t.Error(err)
181181
}
182-
actual := string(result)
182+
actual := string(result.data)
183183

184184
// map entries are disordered in golang
185-
expected1 := "\x94\xA3tag\xD2K\x92\u001Ee\x82\xA3foo\xA3bar\xA4hoge\xA4hoge\xC0"
186-
expected2 := "\x94\xA3tag\xD2K\x92\u001Ee\x82\xA4hoge\xA4hoge\xA3foo\xA3bar\xC0"
185+
expected1 := "\x94\xA3tag\xD2K\x92\u001Ee\x82\xA3foo\xA3bar\xA4hoge\xA4hoge\x80"
186+
expected2 := "\x94\xA3tag\xD2K\x92\u001Ee\x82\xA4hoge\xA4hoge\xA3foo\xA3bar\x80"
187187
if actual != expected1 && actual != expected2 {
188-
t.Errorf("got %x,\n except %x\n or %x", actual, expected1, expected2)
188+
t.Errorf("got %+v,\n except %+v\n or %+v", actual, expected1, expected2)
189189
}
190190
}
191191

@@ -211,11 +211,11 @@ func Test_SubSecondPrecision(t *testing.T) {
211211
}
212212

213213
// 8 bytes timestamp can be represented using ext 8 or fixext 8
214-
expected1 := "\x94\xA3tag\xC7\x08\x00K\x92\u001Ee\x00\x00\x01\x00\x81\xA3foo\xA3bar\xC0"
215-
expected2 := "\x94\xa3tag\xD7\x00K\x92\x1Ee\x00\x00\x01\x00\x81\xA3foo\xA3bar\xc0"
216-
actual := string(encodedData)
214+
expected1 := "\x94\xA3tag\xC7\x08\x00K\x92\u001Ee\x00\x00\x01\x00\x81\xA3foo\xA3bar\x80"
215+
expected2 := "\x94\xa3tag\xD7\x00K\x92\x1Ee\x00\x00\x01\x00\x81\xA3foo\xA3bar\x80"
216+
actual := string(encodedData.data)
217217
if actual != expected1 && actual != expected2 {
218-
t.Errorf("got %x,\n except %x\n or %x", actual, expected1, expected2)
218+
t.Errorf("got %+v,\n except %+v\n or %+v", actual, expected1, expected2)
219219
}
220220
}
221221

@@ -235,8 +235,8 @@ func Test_MarshalAsJSON(t *testing.T) {
235235
t.Error(err)
236236
}
237237
// json.Encode marshals map keys in the order, so this expectation is safe
238-
expected := `["tag",1267867237,{"foo":"bar","hoge":"hoge"},null]`
239-
actual := string(result)
238+
expected := `["tag",1267867237,{"foo":"bar","hoge":"hoge"},{}]`
239+
actual := string(result.data)
240240
if actual != expected {
241241
t.Errorf("got %s, except %s", actual, expected)
242242
}
@@ -317,11 +317,11 @@ func Test_PostWithTimeNotTimeOut(t *testing.T) {
317317
}{
318318
{
319319
map[string]string{"foo": "bar"},
320-
"[\"tag_name\",1482493046,{\"foo\":\"bar\"},null]",
320+
"[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]",
321321
},
322322
{
323323
map[string]string{"fuga": "bar", "hoge": "fuga"},
324-
"[\"tag_name\",1482493046,{\"fuga\":\"bar\",\"hoge\":\"fuga\"},null]",
324+
"[\"tag_name\",1482493046,{\"fuga\":\"bar\",\"hoge\":\"fuga\"},{}]",
325325
},
326326
}
327327
for _, tt := range testData {
@@ -334,7 +334,7 @@ func Test_PostWithTimeNotTimeOut(t *testing.T) {
334334
}
335335

336336
rcv := make([]byte, len(conn.buf))
337-
_, err = conn.Read(rcv)
337+
_, _ = conn.Read(rcv)
338338
if string(rcv) != tt.out {
339339
t.Errorf("got %s, except %s", string(rcv), tt.out)
340340
}
@@ -362,7 +362,7 @@ func Test_PostMsgpMarshaler(t *testing.T) {
362362
}{
363363
{
364364
&TestMessage{Foo: "bar"},
365-
"[\"tag_name\",1482493046,{\"foo\":\"bar\"},null]",
365+
"[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]",
366366
},
367367
}
368368
for _, tt := range testData {
@@ -375,7 +375,7 @@ func Test_PostMsgpMarshaler(t *testing.T) {
375375
}
376376

377377
rcv := make([]byte, len(conn.buf))
378-
_, err = conn.Read(rcv)
378+
_, _ = conn.Read(rcv)
379379
if string(rcv) != tt.out {
380380
t.Errorf("got %s, except %s", string(rcv), tt.out)
381381
}

fluent/proto.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,29 @@ type Entry struct {
1616

1717
//msgp:tuple Forward
1818
type Forward struct {
19-
Tag string `msg:"tag"`
20-
Entries []Entry `msg:"entries"`
21-
Option interface{} `msg:"option"`
19+
Tag string `msg:"tag"`
20+
Entries []Entry `msg:"entries"`
21+
Option map[string]string
2222
}
2323

2424
//msgp:tuple Message
2525
type Message struct {
2626
Tag string `msg:"tag"`
2727
Time int64 `msg:"time"`
2828
Record interface{} `msg:"record"`
29-
Option interface{} `msg:"option"`
29+
Option map[string]string
3030
}
3131

3232
//msgp:tuple MessageExt
3333
type MessageExt struct {
3434
Tag string `msg:"tag"`
3535
Time EventTime `msg:"time,extension"`
3636
Record interface{} `msg:"record"`
37-
Option interface{} `msg:"option"`
37+
Option map[string]string
38+
}
39+
40+
type AckResp struct {
41+
Ack string `json:"ack" msg:"ack"`
3842
}
3943

4044
// EventTime is an extension to the serialized time value. It builds in support

0 commit comments

Comments
 (0)