Skip to content

Commit d1deca7

Browse files
authored
Merge pull request #49 from jwolski2/eventtime
Define EventTime for marshalling nanosecond-precision timestamps over-the-wire
2 parents b8d749d + 4fa1023 commit d1deca7

File tree

5 files changed

+458
-118
lines changed

5 files changed

+458
-118
lines changed

fluent/fluent.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ const (
2323
defaultRetryWait = 500
2424
defaultMaxRetry = 13
2525
defaultReconnectWaitIncreRate = 1.5
26+
// Default sub-second precision value to false since it is only compatible
27+
// with fluentd versions v0.14 and above.
28+
defaultSubSecondPrecision = false
2629
)
2730

2831
type Config struct {
@@ -38,6 +41,10 @@ type Config struct {
3841
TagPrefix string `json:"tag_prefix"`
3942
AsyncConnect bool `json:"async_connect"`
4043
MarshalAsJSON bool `json:"marshal_as_json"`
44+
45+
// Sub-second precision timestamps are only possible for those using fluentd
46+
// v0.14+ and serializing their messages with msgpack.
47+
SubSecondPrecision bool `json:"sub_second_precision"`
4148
}
4249

4350
type Fluent struct {
@@ -207,6 +214,9 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data
207214
msg := Message{Tag: tag, Time: timeUnix, Record: message}
208215
chunk := &MessageChunk{message: msg}
209216
data, err = json.Marshal(chunk)
217+
} else if f.Config.SubSecondPrecision {
218+
msg := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message}
219+
data, err = msg.MarshalMsg(nil)
210220
} else {
211221
msg := &Message{Tag: tag, Time: timeUnix, Record: message}
212222
data, err = msg.MarshalMsg(nil)

fluent/fluent_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,33 @@ func Test_MarshalAsMsgpack(t *testing.T) {
187187
}
188188
}
189189

190+
func Test_SubSecondPrecision(t *testing.T) {
191+
// Setup the test subject
192+
fluent := &Fluent{
193+
Config: Config{
194+
SubSecondPrecision: true,
195+
},
196+
reconnecting: false,
197+
}
198+
fluent.conn = &Conn{}
199+
200+
// Exercise the test subject
201+
timestamp := time.Unix(1267867237, 256)
202+
encodedData, err := fluent.EncodeData("tag", timestamp, map[string]string{
203+
"foo": "bar",
204+
})
205+
206+
// Assert no encoding errors and that the timestamp has been encoded into
207+
// the message as expected.
208+
if err != nil {
209+
t.Error(err)
210+
}
211+
212+
expected := "\x94\xA3tag\xC7\x08\x00K\x92\u001Ee\x00\x00\x01\x00\x81\xA3foo\xA3bar\xC0"
213+
actual := string(encodedData)
214+
assert.Equal(t, expected, actual)
215+
}
216+
190217
func Test_MarshalAsJSON(t *testing.T) {
191218
f := &Fluent{Config: Config{MarshalAsJSON: true}, reconnecting: false}
192219

fluent/proto.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
package fluent
44

5+
import (
6+
"time"
7+
8+
"github.com/tinylib/msgp/msgp"
9+
)
10+
511
//msgp:tuple Entry
612
type Entry struct {
713
Time int64 `msg:"time"`
@@ -22,3 +28,69 @@ type Message struct {
2228
Record interface{} `msg:"record"`
2329
Option interface{} `msg:"option"`
2430
}
31+
32+
//msgp:tuple MessageExt
33+
type MessageExt struct {
34+
Tag string `msg:"tag"`
35+
Time EventTime `msg:"time,extension"`
36+
Record interface{} `msg:"record"`
37+
Option interface{} `msg:"option"`
38+
}
39+
40+
// EventTime is an extension to the serialized time value. It builds in support
41+
// for sub-second (nanosecond) precision in serialized timestamps.
42+
//
43+
// You can find the full specification for the msgpack message payload here:
44+
// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.
45+
//
46+
// You can find more information on msgpack extension types here:
47+
// https://github.com/tinylib/msgp/wiki/Using-Extensions.
48+
type EventTime time.Time
49+
50+
const (
51+
extensionType = 0
52+
length = 8
53+
)
54+
55+
func init() {
56+
msgp.RegisterExtension(extensionType, func() msgp.Extension { return new(EventTime) })
57+
}
58+
59+
func (t *EventTime) ExtensionType() int8 { return extensionType }
60+
61+
func (t *EventTime) Len() int { return length }
62+
63+
func (t *EventTime) MarshalBinaryTo(b []byte) error {
64+
// Unwrap to Golang time
65+
goTime := time.Time(*t)
66+
67+
// There's no support for timezones in fluentd's protocol for EventTime.
68+
// Convert to UTC.
69+
utc := goTime.UTC()
70+
71+
// Warning! Converting seconds to an int32 is a lossy operation. This code
72+
// will hit the "Year 2038" problem.
73+
sec := int32(utc.Unix())
74+
nsec := utc.Nanosecond()
75+
76+
// Fill the buffer with 4 bytes for the second component of the timestamp.
77+
b[0] = byte(sec >> 24)
78+
b[1] = byte(sec >> 16)
79+
b[2] = byte(sec >> 8)
80+
b[3] = byte(sec)
81+
82+
// Fill the buffer with 4 bytes for the nanosecond component of the
83+
// timestamp.
84+
b[4] = byte(nsec >> 24)
85+
b[5] = byte(nsec >> 16)
86+
b[6] = byte(nsec >> 8)
87+
b[7] = byte(nsec)
88+
89+
return nil
90+
}
91+
92+
// UnmarshalBinary is not implemented since decoding messages is not supported
93+
// by this library.
94+
func (t *EventTime) UnmarshalBinary(b []byte) error {
95+
return nil
96+
}

0 commit comments

Comments
 (0)