-
Notifications
You must be signed in to change notification settings - Fork 662
Expand file tree
/
Copy pathIoSessionResponder.java
More file actions
139 lines (128 loc) · 5.11 KB
/
IoSessionResponder.java
File metadata and controls
139 lines (128 loc) · 5.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/*******************************************************************************
* Copyright (c) quickfixengine.org All rights reserved.
*
* This file is part of the QuickFIX FIX Engine
*
* This file may be distributed under the terms of the quickfixengine.org
* license as defined by quickfixengine.org and appearing in the file
* LICENSE included in the packaging of this file.
*
* This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
* THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE.
*
* See http://www.quickfixengine.org/LICENSE for licensing information.
*
* Contact ask@quickfixengine.org if any conditions of this licensing
* are not clear to you.
******************************************************************************/
package quickfix.mina;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.write.WriteRequest;
import org.apache.mina.core.write.WriteRequestQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.Responder;
import quickfix.Session;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
/**
* The class that partially integrates the QuickFIX/J Session to
* the MINA networking code.
*/
public class IoSessionResponder implements Responder {
private final Logger log = LoggerFactory.getLogger(getClass());
private final IoSession ioSession;
private final boolean synchronousWrites;
private final long synchronousWriteTimeout;
private final int maxScheduledWriteRequests;
public IoSessionResponder(IoSession session, boolean synchronousWrites, long synchronousWriteTimeout, int maxScheduledWriteRequests) {
ioSession = session;
this.synchronousWrites = synchronousWrites;
this.synchronousWriteTimeout = synchronousWriteTimeout;
this.maxScheduledWriteRequests = maxScheduledWriteRequests;
}
@Override
public boolean send(String data) {
// Check for and disconnect slow consumers.
if (maxScheduledWriteRequests > 0 && ioSession.getScheduledWriteMessages() >= maxScheduledWriteRequests) {
Session qfjSession = (Session) ioSession.getAttribute(SessionConnector.QF_SESSION);
try {
qfjSession.disconnect("Slow consumer", true);
} catch (IOException e) {
}
return false;
}
// The data is written asynchronously in a MINA thread
WriteFuture future = ioSession.write(data);
if (synchronousWrites) {
try {
if (!future.awaitUninterruptibly(synchronousWriteTimeout)) {
log.error("Synchronous write timed out after {}ms", synchronousWriteTimeout);
return false;
}
} catch (RuntimeException e) {
log.error("Synchronous write failed: {}", e.getMessage());
return false;
}
}
return true;
}
@Override
public int prioritySend(List<String> data){
final WriteRequestQueue writeRequestQueue = ioSession.getWriteRequestQueue();
final List<WriteRequest> pendingWrites = new ArrayList<>(writeRequestQueue.size());
int successfulMessageCount = 0;
try {
ioSession.suspendWrite();
// drain existing pending writes, to be rescheduled in the end
// a work around as WriteRequestQueue is currently not a Deque
WriteRequest pending;
while ((pending = writeRequestQueue.poll(ioSession)) != null) {
pendingWrites.add(pending);
}
for (String d : data) {
if (this.send(d)) {
successfulMessageCount++;
} else {
break;
}
}
} finally {
// reschedule de-prioritized over existing priority send to the end of the queue
try {
for (WriteRequest pendingWrite : pendingWrites) {
writeRequestQueue.offer(ioSession, pendingWrite);
}
} catch (Exception e) {
log.error("Failed to reschedule pending writes: {}", e.getMessage());
}
ioSession.resumeWrite();
}
return successfulMessageCount;
}
@Override
public void disconnect() {
// We cannot call join() on the CloseFuture returned
// by the following call. We are using a minimal
// threading model and calling join will prevent the
// close event from being processed by this thread (if
// this thread is the MINA IO processor thread.
ioSession.closeOnFlush();
ioSession.setAttribute(SessionConnector.QFJ_RESET_IO_CONNECTOR, Boolean.TRUE);
}
@Override
public String getRemoteAddress() {
final SocketAddress remoteAddress = ioSession.getRemoteAddress();
if (remoteAddress != null) {
return remoteAddress.toString();
}
return null;
}
IoSession getIoSession() {
return ioSession;
}
}