Skip to content

Commit 7936827

Browse files
authored
Merge pull request #51 from fujiwara/msgp-marshaler
Enable to post msgp.Marshaler directly
2 parents d1deca7 + 6bdaf30 commit 7936827

File tree

6 files changed

+321
-1
lines changed

6 files changed

+321
-1
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func main() {
5151
}
5252
```
5353

54-
`data` must be a value like `map[string]literal`, `map[string]interface{}` or `struct`. Logger refers tags `msg` or `codec` of each fields of structs.
54+
`data` must be a value like `map[string]literal`, `map[string]interface{}`, `struct` or [`msgp.Marshaler`](http://godoc.org/github.com/tinylib/msgp/msgp#Marshaler). Logger refers tags `msg` or `codec` of each fields of structs.
5555

5656
## Setting config values
5757

fluent/fluent.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"strconv"
1111
"sync"
1212
"time"
13+
14+
"github.com/tinylib/msgp/msgp"
1315
)
1416

1517
const (
@@ -135,6 +137,10 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
135137
tag = f.TagPrefix + "." + tag
136138
}
137139

140+
if m, ok := message.(msgp.Marshaler); ok {
141+
return f.EncodeAndPostData(tag, tm, m)
142+
}
143+
138144
msg := reflect.ValueOf(message)
139145
msgtype := msg.Type()
140146

fluent/fluent_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,47 @@ func Test_PostWithTimeNotTimeOut(t *testing.T) {
340340
}
341341
}
342342

343+
func Test_PostMsgpMarshaler(t *testing.T) {
344+
f, err := New(Config{
345+
FluentPort: 6666,
346+
AsyncConnect: false,
347+
MarshalAsJSON: true, // easy to check equality
348+
})
349+
if err != nil {
350+
t.Error(err)
351+
return
352+
}
353+
354+
var testData = []struct {
355+
in *TestMessage
356+
out string
357+
}{
358+
{
359+
&TestMessage{Foo: "bar"},
360+
"[\"tag_name\",1482493046,{\"foo\":\"bar\"},null]",
361+
},
362+
}
363+
for _, tt := range testData {
364+
conn := &Conn{}
365+
f.conn = conn
366+
367+
err = f.PostWithTime("tag_name", time.Unix(1482493046, 0), tt.in)
368+
if err != nil {
369+
t.Errorf("in=%s, err=%s", tt.in, err)
370+
}
371+
372+
rcv := make([]byte, len(conn.buf))
373+
_, err = conn.Read(rcv)
374+
if string(rcv) != tt.out {
375+
t.Errorf("got %s, except %s", string(rcv), tt.out)
376+
}
377+
378+
if !conn.writeDeadline.IsZero() {
379+
t.Errorf("got %s, except 0", conn.writeDeadline)
380+
}
381+
}
382+
}
383+
343384
func Benchmark_PostWithShortMessage(b *testing.B) {
344385
b.StopTimer()
345386
f, err := New(Config{})
@@ -466,6 +507,22 @@ func Benchmark_PostWithMapString(b *testing.B) {
466507
}
467508
}
468509

510+
func Benchmark_PostWithMsgpMarshaler(b *testing.B) {
511+
b.StopTimer()
512+
f, err := New(Config{})
513+
if err != nil {
514+
panic(err)
515+
}
516+
517+
b.StartTimer()
518+
data := &TestMessage{Foo: "bar"}
519+
for i := 0; i < b.N; i++ {
520+
if err := f.Post("tag", data); err != nil {
521+
panic(err)
522+
}
523+
}
524+
}
525+
469526
func Benchmark_PostWithMapSlice(b *testing.B) {
470527
b.StopTimer()
471528
f, err := New(Config{})

fluent/test_message.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package fluent
2+
3+
//go:generate msgp
4+
type TestMessage struct {
5+
Foo string `msg:"foo" json:"foo,omitempty"`
6+
Hoge string `msg:"hoge" json:"hoge,omitempty"`
7+
}

fluent/test_message_gen.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package fluent
2+
3+
// NOTE: THIS FILE WAS PRODUCED BY THE
4+
// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp)
5+
// DO NOT EDIT
6+
7+
import (
8+
"github.com/tinylib/msgp/msgp"
9+
)
10+
11+
// DecodeMsg implements msgp.Decodable
12+
func (z *TestMessage) DecodeMsg(dc *msgp.Reader) (err error) {
13+
var field []byte
14+
_ = field
15+
var zxvk uint32
16+
zxvk, err = dc.ReadMapHeader()
17+
if err != nil {
18+
return
19+
}
20+
for zxvk > 0 {
21+
zxvk--
22+
field, err = dc.ReadMapKeyPtr()
23+
if err != nil {
24+
return
25+
}
26+
switch msgp.UnsafeString(field) {
27+
case "foo":
28+
z.Foo, err = dc.ReadString()
29+
if err != nil {
30+
return
31+
}
32+
case "hoge":
33+
z.Hoge, err = dc.ReadString()
34+
if err != nil {
35+
return
36+
}
37+
default:
38+
err = dc.Skip()
39+
if err != nil {
40+
return
41+
}
42+
}
43+
}
44+
return
45+
}
46+
47+
// EncodeMsg implements msgp.Encodable
48+
func (z TestMessage) EncodeMsg(en *msgp.Writer) (err error) {
49+
// map header, size 2
50+
// write "foo"
51+
err = en.Append(0x82, 0xa3, 0x66, 0x6f, 0x6f)
52+
if err != nil {
53+
return err
54+
}
55+
err = en.WriteString(z.Foo)
56+
if err != nil {
57+
return
58+
}
59+
// write "hoge"
60+
err = en.Append(0xa4, 0x68, 0x6f, 0x67, 0x65)
61+
if err != nil {
62+
return err
63+
}
64+
err = en.WriteString(z.Hoge)
65+
if err != nil {
66+
return
67+
}
68+
return
69+
}
70+
71+
// MarshalMsg implements msgp.Marshaler
72+
func (z TestMessage) MarshalMsg(b []byte) (o []byte, err error) {
73+
o = msgp.Require(b, z.Msgsize())
74+
// map header, size 2
75+
// string "foo"
76+
o = append(o, 0x82, 0xa3, 0x66, 0x6f, 0x6f)
77+
o = msgp.AppendString(o, z.Foo)
78+
// string "hoge"
79+
o = append(o, 0xa4, 0x68, 0x6f, 0x67, 0x65)
80+
o = msgp.AppendString(o, z.Hoge)
81+
return
82+
}
83+
84+
// UnmarshalMsg implements msgp.Unmarshaler
85+
func (z *TestMessage) UnmarshalMsg(bts []byte) (o []byte, err error) {
86+
var field []byte
87+
_ = field
88+
var zbzg uint32
89+
zbzg, bts, err = msgp.ReadMapHeaderBytes(bts)
90+
if err != nil {
91+
return
92+
}
93+
for zbzg > 0 {
94+
zbzg--
95+
field, bts, err = msgp.ReadMapKeyZC(bts)
96+
if err != nil {
97+
return
98+
}
99+
switch msgp.UnsafeString(field) {
100+
case "foo":
101+
z.Foo, bts, err = msgp.ReadStringBytes(bts)
102+
if err != nil {
103+
return
104+
}
105+
case "hoge":
106+
z.Hoge, bts, err = msgp.ReadStringBytes(bts)
107+
if err != nil {
108+
return
109+
}
110+
default:
111+
bts, err = msgp.Skip(bts)
112+
if err != nil {
113+
return
114+
}
115+
}
116+
}
117+
o = bts
118+
return
119+
}
120+
121+
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
122+
func (z TestMessage) Msgsize() (s int) {
123+
s = 1 + 4 + msgp.StringPrefixSize + len(z.Foo) + 5 + msgp.StringPrefixSize + len(z.Hoge)
124+
return
125+
}

fluent/test_message_gen_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package fluent
2+
3+
// NOTE: THIS FILE WAS PRODUCED BY THE
4+
// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp)
5+
// DO NOT EDIT
6+
7+
import (
8+
"bytes"
9+
"testing"
10+
11+
"github.com/tinylib/msgp/msgp"
12+
)
13+
14+
func TestMarshalUnmarshalTestMessage(t *testing.T) {
15+
v := TestMessage{}
16+
bts, err := v.MarshalMsg(nil)
17+
if err != nil {
18+
t.Fatal(err)
19+
}
20+
left, err := v.UnmarshalMsg(bts)
21+
if err != nil {
22+
t.Fatal(err)
23+
}
24+
if len(left) > 0 {
25+
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
26+
}
27+
28+
left, err = msgp.Skip(bts)
29+
if err != nil {
30+
t.Fatal(err)
31+
}
32+
if len(left) > 0 {
33+
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
34+
}
35+
}
36+
37+
func BenchmarkMarshalMsgTestMessage(b *testing.B) {
38+
v := TestMessage{}
39+
b.ReportAllocs()
40+
b.ResetTimer()
41+
for i := 0; i < b.N; i++ {
42+
v.MarshalMsg(nil)
43+
}
44+
}
45+
46+
func BenchmarkAppendMsgTestMessage(b *testing.B) {
47+
v := TestMessage{}
48+
bts := make([]byte, 0, v.Msgsize())
49+
bts, _ = v.MarshalMsg(bts[0:0])
50+
b.SetBytes(int64(len(bts)))
51+
b.ReportAllocs()
52+
b.ResetTimer()
53+
for i := 0; i < b.N; i++ {
54+
bts, _ = v.MarshalMsg(bts[0:0])
55+
}
56+
}
57+
58+
func BenchmarkUnmarshalTestMessage(b *testing.B) {
59+
v := TestMessage{}
60+
bts, _ := v.MarshalMsg(nil)
61+
b.ReportAllocs()
62+
b.SetBytes(int64(len(bts)))
63+
b.ResetTimer()
64+
for i := 0; i < b.N; i++ {
65+
_, err := v.UnmarshalMsg(bts)
66+
if err != nil {
67+
b.Fatal(err)
68+
}
69+
}
70+
}
71+
72+
func TestEncodeDecodeTestMessage(t *testing.T) {
73+
v := TestMessage{}
74+
var buf bytes.Buffer
75+
msgp.Encode(&buf, &v)
76+
77+
m := v.Msgsize()
78+
if buf.Len() > m {
79+
t.Logf("WARNING: Msgsize() for %v is inaccurate", v)
80+
}
81+
82+
vn := TestMessage{}
83+
err := msgp.Decode(&buf, &vn)
84+
if err != nil {
85+
t.Error(err)
86+
}
87+
88+
buf.Reset()
89+
msgp.Encode(&buf, &v)
90+
err = msgp.NewReader(&buf).Skip()
91+
if err != nil {
92+
t.Error(err)
93+
}
94+
}
95+
96+
func BenchmarkEncodeTestMessage(b *testing.B) {
97+
v := TestMessage{}
98+
var buf bytes.Buffer
99+
msgp.Encode(&buf, &v)
100+
b.SetBytes(int64(buf.Len()))
101+
en := msgp.NewWriter(msgp.Nowhere)
102+
b.ReportAllocs()
103+
b.ResetTimer()
104+
for i := 0; i < b.N; i++ {
105+
v.EncodeMsg(en)
106+
}
107+
en.Flush()
108+
}
109+
110+
func BenchmarkDecodeTestMessage(b *testing.B) {
111+
v := TestMessage{}
112+
var buf bytes.Buffer
113+
msgp.Encode(&buf, &v)
114+
b.SetBytes(int64(buf.Len()))
115+
rd := msgp.NewEndlessReader(buf.Bytes(), b)
116+
dc := msgp.NewReader(rd)
117+
b.ReportAllocs()
118+
b.ResetTimer()
119+
for i := 0; i < b.N; i++ {
120+
err := v.DecodeMsg(dc)
121+
if err != nil {
122+
b.Fatal(err)
123+
}
124+
}
125+
}

0 commit comments

Comments
 (0)