-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreams.go
More file actions
86 lines (67 loc) · 2.55 KB
/
streams.go
File metadata and controls
86 lines (67 loc) · 2.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// SPDX-License-Identifier: AGPL-3.0-or-later
package runtime
import (
"context"
"errors"
"github.com/pilot-protocol/common/coreapi"
"github.com/pilot-protocol/common/daemonapi"
)
// daemonStreams is the in-process implementation of coreapi.Streams.
// Lives here (plugins/runtime) so daemon doesn't carry the L10 import.
type daemonStreams struct{ d daemonapi.Daemon }
func (s daemonStreams) Listen(port uint16) (coreapi.Listener, error) {
ln, err := s.d.Ports().Bind(port)
if err != nil {
return nil, err
}
return &daemonListener{inner: ln, d: s.d}, nil
}
func (s daemonStreams) Dial(ctx context.Context, dst coreapi.Addr, port uint16) (coreapi.Stream, error) {
conn, err := s.d.DialConnectionContext(ctx, dst, port)
if err != nil {
return nil, err
}
return newStreamAdapter(s.d, conn), nil
}
func (s daemonStreams) SendDatagram(ctx context.Context, dst coreapi.Addr, port uint16, data []byte) error {
return s.d.SendDatagram(dst, port, data)
}
type daemonListener struct {
inner daemonapi.Listener
d daemonapi.Daemon
}
func (l *daemonListener) Accept() (coreapi.Stream, error) {
conn, ok := l.inner.Accept()
if !ok {
return nil, errors.New("listener closed")
}
return newStreamAdapter(l.d, conn), nil
}
func (l *daemonListener) Close() error {
l.d.Ports().Unbind(l.inner.Port())
return nil
}
func (l *daemonListener) Addr() coreapi.Addr { return l.d.Addr() }
func (l *daemonListener) Port() uint16 { return l.inner.Port() }
// streamAdapter wraps daemonapi.Connection so it satisfies coreapi.Stream.
// Daemon exposes a connAdapter via Daemon.NewConnAdapter for this.
type streamAdapter struct {
d daemonapi.Daemon
conn daemonapi.Connection
rw daemonapi.ConnReadWriter
}
func newStreamAdapter(d daemonapi.Daemon, conn daemonapi.Connection) *streamAdapter {
return &streamAdapter{d: d, conn: conn, rw: d.NewConnReadWriter(conn)}
}
func (s *streamAdapter) Read(p []byte) (int, error) { return s.rw.Read(p) }
func (s *streamAdapter) Write(p []byte) (int, error) { return s.rw.Write(p) }
func (s *streamAdapter) Close() error { return s.rw.Close() }
func (s *streamAdapter) LocalAddr() coreapi.Addr { return s.conn.Info().LocalAddr }
func (s *streamAdapter) LocalPort() uint16 { return s.conn.Info().LocalPort }
func (s *streamAdapter) RemoteAddr() coreapi.Addr { return s.conn.Info().RemoteAddr }
func (s *streamAdapter) RemotePort() uint16 { return s.conn.Info().RemotePort }
var (
_ coreapi.Streams = daemonStreams{}
_ coreapi.Listener = (*daemonListener)(nil)
_ coreapi.Stream = (*streamAdapter)(nil)
)