Skip to content

Commit 829b85c

Browse files
author
Steve Powell
committed
AMQCommand is now thread-safe
1 parent eb1a3bd commit 829b85c

File tree

2 files changed

+98
-126
lines changed

2 files changed

+98
-126
lines changed

src/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* method, header and body from a series of frames, unless these are
2929
* supplied at construction time.
3030
* <p/><b>Concurrency</b><br/>
31-
* This class is <i>not</i> thread-safe.
31+
* This class is thread-safe.
3232
*/
3333
public class AMQCommand implements Command {
3434

@@ -38,13 +38,12 @@ public class AMQCommand implements Command {
3838
* <li>4 bytes of frame payload length</li>
3939
* <li>1 byte of payload trailer FRAME_END byte</li></ul>
4040
* See {@link #checkEmptyContentBodyFrameSize}, an assertion
41-
* called at startup.
41+
* checked at startup.
4242
*/
4343
private static final int EMPTY_CONTENT_BODY_FRAME_SIZE = 8;
4444

45-
/** The current assembler for this command */
46-
private CommandAssembler assembler;
47-
45+
/** The assembler for this command - synchronised on - contains all the state */
46+
private final CommandAssembler assembler;
4847

4948
/** Construct a command ready to fill in by reading frames */
5049
public AMQCommand() {
@@ -71,7 +70,6 @@ public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHea
7170

7271
/** Public API - {@inheritDoc} */
7372
public Method getMethod() {
74-
if (this.assembler == null) return null;
7573
return this.assembler.getMethod();
7674
}
7775

@@ -85,9 +83,6 @@ public byte[] getContentBody() {
8583
return this.assembler.getContentBody();
8684
}
8785

88-
public boolean isComplete() {
89-
return assembler.isComplete();
90-
}
9186
public boolean handleFrame(Frame f) throws IOException {
9287
return this.assembler.handleFrame(f);
9388
}
@@ -102,23 +97,28 @@ public void transmit(AMQChannel channel) throws IOException {
10297
int channelNumber = channel.getChannelNumber();
10398
AMQConnection connection = channel.getConnection();
10499

105-
Method m = getMethod();
106-
connection.writeFrame(m.toFrame(channelNumber));
107-
108-
if (m.hasContent()) {
109-
byte[] body = getContentBody();
100+
synchronized (assembler) {
101+
Method m = this.assembler.getMethod();
102+
connection.writeFrame(m.toFrame(channelNumber));
103+
if (m.hasContent()) {
104+
byte[] body = this.assembler.getContentBody();
110105

111-
connection.writeFrame(getContentHeader().toFrame(channelNumber, body.length));
106+
connection.writeFrame(this.assembler.getContentHeader()
107+
.toFrame(channelNumber, body.length));
112108

113-
int frameMax = connection.getFrameMax();
114-
int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax - EMPTY_CONTENT_BODY_FRAME_SIZE;
109+
int frameMax = connection.getFrameMax();
110+
int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax
111+
- EMPTY_CONTENT_BODY_FRAME_SIZE;
115112

116-
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
117-
int remaining = body.length - offset;
113+
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
114+
int remaining = body.length - offset;
118115

119-
int fragmentLength = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
120-
Frame frame = Frame.fromBodyFragment(channelNumber, body, offset, fragmentLength);
121-
connection.writeFrame(frame);
116+
int fragmentLength = (remaining < bodyPayloadMax) ? remaining
117+
: bodyPayloadMax;
118+
Frame frame = Frame.fromBodyFragment(channelNumber, body,
119+
offset, fragmentLength);
120+
connection.writeFrame(frame);
121+
}
122122
}
123123
}
124124
}
@@ -128,15 +128,29 @@ public void transmit(AMQChannel channel) throws IOException {
128128
}
129129

130130
public String toString(boolean suppressBody){
131-
byte[] body = getContentBody();
132-
String contentStr;
131+
synchronized (assembler) {
132+
return new StringBuilder()
133+
.append('{')
134+
.append(this.assembler.getMethod())
135+
.append(", ")
136+
.append(this.assembler.getContentHeader())
137+
.append(", ")
138+
.append(contentBodyStringBuilder(
139+
this.assembler.getContentBody(), suppressBody))
140+
.append('}').toString();
141+
}
142+
}
143+
144+
private static StringBuilder contentBodyStringBuilder(byte[] body, boolean suppressBody) {
133145
try {
134-
contentStr = suppressBody ? (body.length + " bytes of payload") :
135-
("\"" + new String(body) + "\"");
146+
if (suppressBody) {
147+
return new StringBuilder().append(body.length).append(" bytes of payload");
148+
} else {
149+
return new StringBuilder().append('\"').append(body).append('\"');
150+
}
136151
} catch (Exception e) {
137-
contentStr = "|" + body.length + "|";
152+
return new StringBuilder().append('|').append(body.length).append('|');
138153
}
139-
return "{" + getMethod() + "," + getContentHeader() + "," + contentStr + "}";
140154
}
141155

142156
/** Called to check internal code assumptions. */

src/com/rabbitmq/client/impl/CommandAssembler.java

Lines changed: 56 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -26,105 +26,88 @@
2626
/**
2727
* Class responsible for piecing together a command from a series of {@link Frame}s.
2828
* <p/><b>Concurrency</b><br/>
29-
* This class is <i>not</i> thread-safe.
29+
* This class is thread-safe, since all methods are synchronised. Callers should not
30+
* synchronise on objects of this class unless they are sole owners.
3031
* @see AMQCommand
3132
*/
3233
final class CommandAssembler {
3334
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
3435

3536
/** Current state, used to decide how to handle each incoming frame. */
36-
private int state;
37-
38-
private static final int STATE_EXPECTING_METHOD = 0;
39-
private static final int STATE_EXPECTING_CONTENT_HEADER = 1;
40-
private static final int STATE_EXPECTING_CONTENT_BODY = 2;
41-
private static final int STATE_COMPLETE = 3;
37+
private enum CAState {
38+
EXPECTING_METHOD, EXPECTING_CONTENT_HEADER, EXPECTING_CONTENT_BODY, COMPLETE;
39+
}
40+
private CAState state;
4241

4342
/** The method for this command */
44-
private Method _method;
43+
private Method method;
4544

4645
/** The content header for this command */
47-
private AMQContentHeader _contentHeader;
46+
private AMQContentHeader contentHeader;
4847

49-
/** The first (and usually only) fragment of the content body */
50-
private byte[] _body0;
51-
/** The remaining fragments of this command's content body - a list of byte[] */
52-
private List<byte[]> _bodyN;
48+
/** The fragments of this command's content body - a list of byte[] */
49+
private final List<byte[]> bodyN;
5350
/** sum of the lengths of all fragments */
5451
private int bodyLength;
5552

56-
/**
57-
* How many more bytes of content body are expected to arrive
58-
* from the broker.
59-
*/
53+
/** No bytes of content body not yet accumulated */
6054
private long remainingBodyBytes;
6155

62-
public CommandAssembler() {
63-
this(null, null, null);
64-
}
65-
6656
public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body) {
67-
this._method = method;
68-
this._contentHeader = contentHeader;
69-
setContentBody(body);
70-
resetState();
71-
}
72-
73-
private void resetState() {
57+
this.method = method;
58+
this.contentHeader = contentHeader;
59+
this.bodyN = new ArrayList<byte[]>(2);
60+
this.bodyLength = 0;
7461
this.remainingBodyBytes = 0;
75-
if (this._method == null) {
76-
this.state = STATE_EXPECTING_METHOD;
77-
} else if (this._contentHeader == null) {
78-
this.state = this._method.hasContent() ? STATE_EXPECTING_CONTENT_HEADER : STATE_COMPLETE;
62+
appendBodyFragment(body);
63+
if (method == null) {
64+
this.state = CAState.EXPECTING_METHOD;
65+
} else if (contentHeader == null) {
66+
this.state = method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
7967
} else {
80-
this.remainingBodyBytes = this._contentHeader.getBodySize() - contentBodySize();
68+
this.remainingBodyBytes = contentHeader.getBodySize() - this.bodyLength;
69+
updateContentBodyState();
8170
}
8271
}
8372

84-
private int contentBodySize() {
85-
return this.bodyLength;
86-
}
87-
88-
public Method getMethod() {
89-
return _method;
73+
public synchronized Method getMethod() {
74+
return this.method;
9075
}
9176

92-
public AMQContentHeader getContentHeader() {
93-
return _contentHeader;
77+
public synchronized AMQContentHeader getContentHeader() {
78+
return this.contentHeader;
9479
}
9580

9681
/** @return true if the command is complete */
97-
public boolean isComplete() {
98-
return (this.state == STATE_COMPLETE);
82+
public synchronized boolean isComplete() {
83+
return (this.state == CAState.COMPLETE);
9984
}
10085

10186
/** Decides whether more body frames are expected */
10287
private void updateContentBodyState() {
103-
this.state = (this.remainingBodyBytes > 0) ? STATE_EXPECTING_CONTENT_BODY : STATE_COMPLETE;
88+
this.state = (this.remainingBodyBytes > 0) ? CAState.EXPECTING_CONTENT_BODY : CAState.COMPLETE;
10489
}
10590

106-
private boolean consumeMethodFrame(Frame f) throws IOException {
91+
private void consumeMethodFrame(Frame f) throws IOException {
10792
if (f.type == AMQP.FRAME_METHOD) {
108-
this._method = AMQImpl.readMethodFrom(f.getInputStream());
109-
state = this._method.hasContent() ? STATE_EXPECTING_CONTENT_HEADER : STATE_COMPLETE;
110-
return isComplete();
93+
this.method = AMQImpl.readMethodFrom(f.getInputStream());
94+
this.state = this.method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
11195
} else {
11296
throw new UnexpectedFrameError(f, AMQP.FRAME_METHOD);
11397
}
11498
}
11599

116-
private boolean consumeHeaderFrame(Frame f) throws IOException {
100+
private void consumeHeaderFrame(Frame f) throws IOException {
117101
if (f.type == AMQP.FRAME_HEADER) {
118-
this._contentHeader = AMQImpl.readContentHeaderFrom(f.getInputStream());
119-
this.remainingBodyBytes = this._contentHeader.getBodySize();
102+
this.contentHeader = AMQImpl.readContentHeaderFrom(f.getInputStream());
103+
this.remainingBodyBytes = this.contentHeader.getBodySize();
120104
updateContentBodyState();
121-
return isComplete();
122105
} else {
123106
throw new UnexpectedFrameError(f, AMQP.FRAME_HEADER);
124107
}
125108
}
126109

127-
private boolean consumeBodyFrame(Frame f) {
110+
private void consumeBodyFrame(Frame f) {
128111
if (f.type == AMQP.FRAME_BODY) {
129112
byte[] fragment = f.getPayload();
130113
this.remainingBodyBytes -= fragment.length;
@@ -133,81 +116,56 @@ private boolean consumeBodyFrame(Frame f) {
133116
throw new UnsupportedOperationException("%%%%%% FIXME unimplemented");
134117
}
135118
appendBodyFragment(fragment);
136-
return isComplete();
137119
} else {
138120
throw new UnexpectedFrameError(f, AMQP.FRAME_BODY);
139121
}
140122
}
141123

142124
/** Stitches together a fragmented content body into a single byte array */
143-
private void coalesceContentBody() {
144-
List<byte[]> oldFragments = _bodyN;
145-
byte[] firstFragment = _body0 == null ? EMPTY_BYTE_ARRAY : _body0;
125+
private byte[] coalesceContentBody() {
126+
if (this.bodyLength == 0) return EMPTY_BYTE_ARRAY;
127+
if (this.bodyN.size() == 1) return this.bodyN.get(0);
146128

147129
byte[] body = new byte[bodyLength];
148-
System.arraycopy(firstFragment, 0, body, 0, firstFragment.length);
149-
int offset = firstFragment.length;
150-
for (byte[] fragment : oldFragments) {
130+
int offset = 0;
131+
for (byte[] fragment : this.bodyN) {
151132
System.arraycopy(fragment, 0, body, offset, fragment.length);
152133
offset += fragment.length;
153134
}
154-
155-
setContentBody(body);
156-
}
157-
158-
public byte[] getContentBody() {
159-
if (_bodyN != null) {
160-
coalesceContentBody();
161-
}
162-
return _body0 == null ? EMPTY_BYTE_ARRAY : _body0;
135+
this.bodyN.clear();
136+
this.bodyN.add(body);
137+
return body;
163138
}
164139

165-
/**
166-
* Set the Command's content body.
167-
* @param body the body data
168-
*/
169-
private void setContentBody(byte[] body) {
170-
_body0 = body;
171-
_bodyN = null;
172-
bodyLength = byteArrayLength(_body0);
140+
public synchronized byte[] getContentBody() {
141+
return coalesceContentBody();
173142
}
174143

175144
private static int byteArrayLength(byte[] ba) {
176145
return (ba == null) ? 0 : ba.length;
177146
}
178147

179148
private void appendBodyFragment(byte[] fragment) {
180-
if (_body0 == null) {
181-
_body0 = fragment;
182-
bodyLength = byteArrayLength(fragment);
183-
} else {
184-
if (_bodyN == null) {
185-
_bodyN = new ArrayList<byte[]>();
186-
}
187-
_bodyN.add(fragment);
188-
bodyLength += byteArrayLength(fragment);
189-
}
149+
if (fragment == null || fragment.length == 0) return;
150+
bodyN.add(fragment);
151+
bodyLength += byteArrayLength(fragment);
190152
}
191153

192154
/**
193155
* @param f frame to be incorporated
194-
* @return true if command is complete
195-
* @throws IOException if error reading in frame
156+
* @return true if command becomes complete
157+
* @throws IOException if error reading frame
196158
*/
197-
public boolean handleFrame(Frame f) throws IOException
159+
public synchronized boolean handleFrame(Frame f) throws IOException
198160
{
199161
switch (this.state) {
200-
case STATE_EXPECTING_METHOD:
201-
return consumeMethodFrame(f);
202-
203-
case STATE_EXPECTING_CONTENT_HEADER:
204-
return consumeHeaderFrame(f);
205-
206-
case STATE_EXPECTING_CONTENT_BODY:
207-
return consumeBodyFrame(f);
162+
case EXPECTING_METHOD: consumeMethodFrame(f); break;
163+
case EXPECTING_CONTENT_HEADER: consumeHeaderFrame(f); break;
164+
case EXPECTING_CONTENT_BODY: consumeBodyFrame(f); break;
208165

209166
default:
210167
throw new AssertionError("Bad Command State " + this.state);
211168
}
169+
return isComplete();
212170
}
213171
}

0 commit comments

Comments
 (0)