-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtcp_connection_impl.h
More file actions
244 lines (221 loc) · 7.21 KB
/
tcp_connection_impl.h
File metadata and controls
244 lines (221 loc) · 7.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
#include <list>
#include "timing_wheel.h"
#ifndef _WIN32
#include <unistd.h>
#endif
#include "../tcp_connection.h"
namespace netpoll {
class Channel;
class Socket;
class TcpConnectionImpl : public TcpConnection,
public noncopyable,
public std::enable_shared_from_this<TcpConnectionImpl>
{
public:
friend class TcpServer;
friend class TcpClient;
TcpConnectionImpl(EventLoop *loop, int socketfd,
const InetAddress &localAddr, const InetAddress &peerAddr);
static TcpConnectionPtr New(EventLoop *loop, int socketfd,
const InetAddress &localAddr,
const InetAddress &peerAddr);
class KickoffEntry
{
public:
explicit KickoffEntry(const std::weak_ptr<TcpConnection> &conn)
: m_conn(conn)
{
}
void reset() { m_conn.reset(); }
~KickoffEntry()
{
auto conn = m_conn.lock();
if (conn) { conn->forceClose(); }
}
private:
std::weak_ptr<TcpConnection> m_conn;
};
~TcpConnectionImpl() override = default;
void send(StringView const &msg) override;
void send(const MessageBuffer &buffer) override;
void send(MessageBuffer &&buffer) override;
void send(const std::shared_ptr<MessageBuffer> &msgPtr) override;
void sendFile(StringView const &fileName, size_t offset = 0,
size_t length = 0) override;
void sendStream(
std::function<std::size_t(char *, std::size_t)> callback) override;
const InetAddress &localAddr() const override { return m_localAddr; }
const InetAddress &peerAddr() const override { return m_peerAddr; }
bool connected() const override { return m_status == ConnStatus::Connected; }
bool disconnected() const override
{
return m_status == ConnStatus::Disconnected;
}
MessageBuffer *getRecvBuffer() override { return &m_readBuffer; }
// set callbacks
void setHighWaterMarkCallback(const HighWaterMarkCallback &cb,
size_t markLen) override
{
m_highWaterMarkCallback = cb;
m_highWaterMarkLen = markLen;
}
void keepAlive() override
{
m_idleTimeout = 0;
auto entry = m_kickoffEntry.lock();
if (entry) { entry->reset(); }
}
bool isKeepAlive() override { return m_idleTimeout == 0; }
void setTcpNoDelay(bool on) override;
void shutdown() override;
void forceClose() override;
EventLoop *getLoop() override { return m_loop; }
size_t bytesSent() const override { return m_bytesSent; }
size_t bytesReceived() const override { return m_bytesReceived; }
private:
/// Internal use only.
std::weak_ptr<KickoffEntry> m_kickoffEntry;
std::weak_ptr<TimingWheel> m_timingWheelWeakPtr;
size_t m_idleTimeout{0};
Timestamp m_lastTimingWheelUpdateTime;
void enableKickingOff(size_t timeout, const TimingWheel::Ptr &timingWheel)
{
assert(timingWheel);
assert(timeout > 0);
assert(timingWheel->getLoop() == m_loop);
auto entry = std::make_shared<KickoffEntry>(shared_from_this());
m_timingWheelWeakPtr = timingWheel;
m_kickoffEntry = entry;
timingWheel->insertEntry(timeout, entry);
}
void extendLife();
#ifndef _WIN32
void sendFile(int sfd, size_t offset = 0, size_t length = 0);
#else
void sendFile(const wchar_t *fileName, size_t offset, size_t length);
void sendFile(FILE *fp, size_t offset = 0, size_t length = 0);
#endif
void setRecvMsgCallback(const RecvMessageCallback &cb)
{
m_recvMsgCallback = cb;
}
void setRecvMsgCallback(RecvMessageCallback &&cb)
{
m_recvMsgCallback = std::move(cb);
}
void setConnectionCallback(const ConnectionCallback &cb)
{
m_connectionCallback = cb;
}
void setConnectionCallback(ConnectionCallback &&cb)
{
m_connectionCallback = std::move(cb);
}
void setWriteCompleteCallback(const WriteCompleteCallback &cb)
{
m_writeCompleteCallback = cb;
}
void setWriteCompleteCallback(WriteCompleteCallback &&cb)
{
m_writeCompleteCallback = std::move(cb);
}
void setCloseCallback(const CloseCallback &cb) { m_closeCallback = cb; }
void setCloseCallback(CloseCallback &&cb)
{
m_closeCallback = std::move(cb);
}
void connectDestroyed();
virtual void connectEstablished();
protected:
struct BufferNode
{
// sendFile() specific
#ifndef _WIN32
int m_sendFd{-1};
off_t m_offset{0};
#else
FILE *m_sendFp{nullptr};
long long m_offset{0};
#endif
ssize_t fileBytesToSend_{0};
// sendStream() specific
std::function<std::size_t(char *, std::size_t)> streamCallback_;
#ifndef NDEBUG // defined by CMake for release build
std::size_t nDataWritten_{0};
#endif
// generic
std::shared_ptr<MessageBuffer> msgBuffer_;
bool isFile() const
{
if (streamCallback_) return true;
#ifndef _WIN32
if (m_sendFd >= 0) return true;
#else
if (m_sendFp) return true;
#endif
return false;
}
~BufferNode()
{
#ifndef _WIN32
if (m_sendFd >= 0) ::close(m_sendFd);
#else
if (m_sendFp) ::fclose(m_sendFp);
#endif
if (streamCallback_)
streamCallback_(nullptr, 0); // cleanup callback internals
}
};
using BufferNodePtr = std::shared_ptr<BufferNode>;
enum class ConnStatus { Disconnected, Connecting, Connected, Disconnecting };
void sendFileInLoop(const BufferNodePtr &file);
#ifndef _WIN32
void sendInLoop(const void *buffer, size_t length);
ssize_t writeInLoop(const void *buffer, size_t length);
#else
void sendInLoop(const char *buffer, size_t length);
ssize_t writeInLoop(const char *buffer, size_t length);
#endif
void handleRead();
void handleWrite();
void sendNext();
void handleClose();
void handleError();
EventLoop *m_loop;
std::unique_ptr<Channel> m_ioChannelPtr;
std::unique_ptr<Socket> m_socketPtr;
MessageBuffer m_readBuffer;
std::list<BufferNodePtr> m_writeBufferList;
InetAddress m_localAddr, m_peerAddr;
ConnStatus m_status{ConnStatus::Connecting};
// callbacks
RecvMessageCallback m_recvMsgCallback;
ConnectionCallback m_connectionCallback;
CloseCallback m_closeCallback;
WriteCompleteCallback m_writeCompleteCallback;
HighWaterMarkCallback m_highWaterMarkCallback;
size_t m_highWaterMarkLen{};
std::string m_name;
uint64_t m_sendNum{0};
std::mutex m_sendNumMutex;
uint64_t getSendNumByGuard()
{
std::lock_guard<std::mutex> lockGuard(m_sendNumMutex);
return m_sendNum;
}
uint64_t plusSendNumByGuard()
{
std::lock_guard<std::mutex> lockGuard(m_sendNumMutex);
return ++m_sendNum;
}
uint64_t minusSendNumByGuard()
{
std::lock_guard<std::mutex> lockGuard(m_sendNumMutex);
return --m_sendNum;
}
size_t m_bytesSent{0};
size_t m_bytesReceived{0};
std::unique_ptr<std::vector<char>> m_fileBufferPtr;
};
using TcpConnectionImplPtr = std::shared_ptr<TcpConnectionImpl>;
} // namespace netpoll