-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathlistener.go
More file actions
104 lines (91 loc) · 1.93 KB
/
listener.go
File metadata and controls
104 lines (91 loc) · 1.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package mongoproxy
import (
"net"
"sync"
"time"
"github.com/golang/glog"
)
// Listener listens on a TCP address and serves every accepted connection on
// its own goroutine (see Start and readLoop).
type Listener struct {
	// Addr is the address to listen on e.g. "0.0.0.0:6666"
	Addr string
	// Timeout is passed to the dispatcher and to the dispatch-response wait
	// for each message in readLoop, and to TimeoutIn when Stop waits for
	// outstanding work.
	Timeout time.Duration
	// listener is the TCP listener created by Start and closed by Stop.
	listener net.Listener
	// dispatcher receives one dispatch message per header read from a
	// connection.
	// NOTE(review): not assigned anywhere in this file's visible code —
	// presumably set by a constructor elsewhere; confirm.
	dispatcher *Dispatcher
	// wg is incremented by Start before each Accept and decremented either on
	// an Accept error or when the connection's readLoop returns. Stop waits on it.
	wg sync.WaitGroup
	// isRunning is set to true by Start and to false by Stop.
	isRunning atomicBool
	// metrics holds the acceptCounter and connectionDrop meters marked by
	// Start and readLoop respectively.
	// NOTE(review): not assigned in the visible code — confirm where it is initialised.
	metrics *listenerMetrics
}
// atomicBool is a boolean flag whose reads and writes are serialised by a
// read/write mutex. The zero value is ready to use and reads as false.
type atomicBool struct {
	value bool
	mutex sync.RWMutex
}

// Get returns the flag's current value, read while holding the read lock.
func (ab *atomicBool) Get() bool {
	ab.mutex.RLock()
	current := ab.value
	ab.mutex.RUnlock()
	return current
}

// Set replaces the flag's value with b while holding the write lock.
func (ab *atomicBool) Set(b bool) {
	ab.mutex.Lock()
	ab.value = b
	ab.mutex.Unlock()
}
// Stop shuts the listener down. It clears the running flag, closes the
// underlying net.Listener (which unblocks a pending Accept in Start), and then
// waits for the WaitGroup that tracks the accept loop and the per-connection
// read loops.
//
// The wait is handed to TimeoutIn together with l.Timeout.
// NOTE(review): TimeoutIn is defined elsewhere; presumably it bounds the wait
// by l.Timeout — confirm. Its result is discarded here, as in the original.
//
// Stop may be called before Start has created the listener; in that case
// there is nothing to close and only the wait is performed.
func (l *Listener) Stop() {
	l.isRunning.Set(false)

	// l.listener is only assigned by Start; guard against a nil dereference
	// when Stop is called first or after Start failed to bind.
	if l.listener != nil {
		if err := l.listener.Close(); err != nil {
			// Errorf with an explicit separator: glog.Error formats like
			// fmt.Sprint, which would run the message and the error together.
			glog.Errorf("error closing listener: %v", err)
		}
	}

	block := func() (interface{}, error) {
		l.wg.Wait()
		return nil, nil
	}
	TimeoutIn(block, l.Timeout)
}
// Start binds a TCP listener to l.Addr and accepts connections until Stop is
// called. Each accepted connection is counted on the accept meter and served
// by its own readLoop goroutine.
//
// Start blocks for the lifetime of the listener and is not thread safe: do
// not call it concurrently with itself. It returns the error from net.Listen
// when the address cannot be bound, and nil once the listener has been
// stopped.
func (l *Listener) Start() error {
	var err error
	l.listener, err = net.Listen("tcp", l.Addr)
	if err != nil {
		return err
	}
	l.isRunning.Set(true)

	// Loop only while running. The previous unconditional `for {}` kept
	// spinning (burning a CPU core) after Stop cleared the flag, and Start
	// never returned.
	for l.isRunning.Get() {
		// Counted before Accept so that Stop's wait also covers an accept
		// that is still in flight.
		l.wg.Add(1)
		conn, err := l.listener.Accept()
		if err != nil {
			l.wg.Done()
			if !l.isRunning.Get() {
				// Stop closed the listener underneath us: this is the normal
				// shutdown path, not an accept failure worth logging.
				break
			}
			glog.Error("error accepting connection ", err)
			continue
		}
		l.metrics.acceptCounter.Mark(1)
		// readLoop releases the WaitGroup slot taken above when it returns.
		go l.readLoop(conn)
	}
	return nil
}
// readLoop serves a single accepted connection. It wraps conn in a mongo
// connection and then repeatedly reads a message header, hands the resulting
// dispatch message to the dispatcher, and waits for the dispatch response,
// passing l.Timeout to both the dispatch and the wait.
//
// The loop ends on the first error from any of those steps. On exit the
// connection is closed, the connection-drop meter is marked, and the
// WaitGroup slot taken by Start is released, in that order.
func (l *Listener) readLoop(conn net.Conn) {
	defer l.wg.Done()
	defer l.metrics.connectionDrop.Mark(1)

	m := NewMongoConn(conn)
	// A single deferred Close covers every exit path (including a panic in
	// the loop) instead of repeating m.Close() before each return. Being
	// deferred last, it still runs before the meter and WaitGroup updates.
	defer m.Close()

	// keep reading till the connection is alive.
	for {
		h, err := m.ReadHeader()
		if err != nil {
			// Errorf with an explicit separator: glog.Error formats like
			// fmt.Sprint, which would run the message and the error together.
			glog.Errorf("failed to read header from incoming conn: %v", err)
			return
		}

		msg := newDispatchMessage(m, h)
		if err := l.dispatcher.Dispatch(msg, l.Timeout); err != nil {
			glog.Errorf("dispatch timed out: %v", err)
			return
		}

		if err := msg.Wait(l.Timeout); err != nil {
			// Include the error, which the previous log line dropped.
			glog.Errorf("timed out waiting for dispatch response: %v", err)
			return
		}
	}
}