Skip to content

Commit 1fd2979

Browse files
author
Simon MacMullen
committed
stable to default
2 parents 7c273eb + 8236442 commit 1fd2979

File tree

11 files changed

+286
-11
lines changed

11 files changed

+286
-11
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is GoPivotal, Inc.
14+
// Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
15+
//
16+
17+
18+
package com.rabbitmq.client;
19+
20+
import java.io.IOException;
21+
22+
/**
23+
* Implement this interface in order to be notified of connection block and
24+
* unblock events.
25+
*/
26+
public interface BlockedListener {
27+
void handleBlocked(String reason) throws IOException;
28+
void handleUnblocked() throws IOException;
29+
}

src/com/rabbitmq/client/Connection.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,4 +214,24 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
214214
* operations, use -1 for infinity
215215
*/
216216
void abort(int closeCode, String closeMessage, int timeout);
217+
218+
/**
219+
* Add a {@link BlockedListener}.
220+
* @param listener the listener to add
221+
*/
222+
void addBlockedListener(BlockedListener listener);
223+
224+
/**
225+
* Remove a {@link BlockedListener}.
226+
* @param listener the listener to remove
227+
* @return <code><b>true</b></code> if the listener was found and removed,
228+
* <code><b>false</b></code> otherwise
229+
*/
230+
boolean removeBlockedListener(BlockedListener listener);
231+
232+
/**
233+
* Remove all {@link BlockedListener}s.
234+
*/
235+
void clearBlockedListeners();
236+
217237
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121
import java.net.InetAddress;
2222
import java.net.SocketException;
2323
import java.net.SocketTimeoutException;
24+
import java.util.Collection;
2425
import java.util.Collections;
2526
import java.util.HashMap;
2627
import java.util.Map;
28+
import java.util.concurrent.CopyOnWriteArrayList;
2729
import java.util.concurrent.ExecutorService;
2830
import java.util.concurrent.TimeoutException;
2931

3032
import com.rabbitmq.client.AMQP;
33+
import com.rabbitmq.client.BlockedListener;
3134
import com.rabbitmq.client.Method;
3235
import com.rabbitmq.client.AlreadyClosedException;
3336
import com.rabbitmq.client.Channel;
@@ -81,6 +84,7 @@ public static final Map<String, Object> defaultClientProperties() {
8184
capabilities.put("exchange_exchange_bindings", true);
8285
capabilities.put("basic.nack", true);
8386
capabilities.put("consumer_cancel_notify", true);
87+
capabilities.put("connection.blocked", true);
8488

8589
props.put("capabilities", capabilities);
8690

@@ -130,6 +134,7 @@ public static final Map<String, Object> defaultClientProperties() {
130134
private final int requestedFrameMax;
131135
private final String username;
132136
private final String password;
137+
private final Collection<BlockedListener> blockedListeners = new CopyOnWriteArrayList<BlockedListener>();
133138

134139
/* State modified after start - all volatile */
135140

@@ -597,6 +602,25 @@ public boolean processControlCommand(Command c) throws IOException
597602
if (method instanceof AMQP.Connection.Close) {
598603
handleConnectionClose(c);
599604
return true;
605+
} else if (method instanceof AMQP.Connection.Blocked) {
606+
AMQP.Connection.Blocked blocked = (AMQP.Connection.Blocked) method;
607+
try {
608+
for (BlockedListener l : this.blockedListeners) {
609+
l.handleBlocked(blocked.getReason());
610+
}
611+
} catch (Throwable ex) {
612+
getExceptionHandler().handleBlockedListenerException(this, ex);
613+
}
614+
return true;
615+
} else if (method instanceof AMQP.Connection.Unblocked) {
616+
try {
617+
for (BlockedListener l : this.blockedListeners) {
618+
l.handleUnblocked();
619+
}
620+
} catch (Throwable ex) {
621+
getExceptionHandler().handleBlockedListenerException(this, ex);
622+
}
623+
return true;
600624
} else {
601625
return false;
602626
}
@@ -823,4 +847,16 @@ public AMQCommand transformReply(AMQCommand command) {
823847
private String getHostAddress() {
824848
return getAddress() == null ? null : getAddress().getHostAddress();
825849
}
850+
851+
public void addBlockedListener(BlockedListener listener) {
852+
blockedListeners.add(listener);
853+
}
854+
855+
public boolean removeBlockedListener(BlockedListener listener) {
856+
return blockedListeners.remove(listener);
857+
}
858+
859+
public void clearBlockedListeners() {
860+
blockedListeners.clear();
861+
}
826862
}

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ public void handleConfirmListenerException(Channel channel, Throwable exception)
4949
handleChannelKiller(channel, exception, "ConfirmListener.handle{N,A}ck");
5050
}
5151

52+
public void handleBlockedListenerException(Connection connection, Throwable exception) {
53+
handleConnectionKiller(connection, exception, "BlockedListener");
54+
}
55+
5256
public void handleConsumerException(Channel channel, Throwable exception,
5357
Consumer consumer, String consumerTag,
5458
String methodName)
@@ -76,4 +80,22 @@ protected void handleChannelKiller(Channel channel, Throwable exception, String
7680
channel.getConnection().abort(AMQP.INTERNAL_ERROR, "Internal error closing channel for " + what);
7781
}
7882
}
83+
84+
protected void handleConnectionKiller(Connection connection, Throwable exception, String what) {
85+
// TODO: log the exception
86+
System.err.println("DefaultExceptionHandler: " + what + " threw an exception for connection "
87+
+ connection + ":");
88+
exception.printStackTrace();
89+
try {
90+
connection.close(AMQP.REPLY_SUCCESS, "Closed due to exception from " + what);
91+
} catch (AlreadyClosedException ace) {
92+
// noop
93+
} catch (IOException ioe) {
94+
// TODO: log the failure
95+
System.err.println("Failure during close of connection " + connection + " after " + exception
96+
+ ":");
97+
ioe.printStackTrace();
98+
connection.abort(AMQP.INTERNAL_ERROR, "Internal error closing connection for " + what);
99+
}
100+
}
79101
}

src/com/rabbitmq/client/impl/ExceptionHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,16 @@ public interface ExceptionHandler {
6363
*/
6464
void handleConfirmListenerException(Channel channel, Throwable exception);
6565

66+
/**
67+
* Perform any required exception processing for the situation
68+
* when the driver thread for the connection has called a
69+
* BlockedListener's method, and that method has
70+
* thrown an exception.
71+
* @param connection the Connection that held the BlockedListener
72+
* @param exception the exception thrown by the BlockedListener
73+
*/
74+
void handleBlockedListenerException(Connection connection, Throwable exception);
75+
6676
/**
6777
* Perform any required exception processing for the situation
6878
* when the driver thread for the connection has called a method

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,10 @@ public void handleConfirmListenerException(Channel ch, Throwable ex) {
242242
fail("handleConfirmListenerException: " + ex);
243243
}
244244

245+
public void handleBlockedListenerException(Connection conn, Throwable ex) {
246+
fail("handleBlockedListenerException: " + ex);
247+
}
248+
245249
public void handleConsumerException(Channel ch,
246250
Throwable ex,
247251
Consumer c,

test/src/com/rabbitmq/client/test/BrokerTestCase.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,4 +244,17 @@ protected void deleteExchange(String x) throws IOException {
244244
protected void deleteQueue(String q) throws IOException {
245245
channel.queueDelete(q);
246246
}
247+
248+
protected void clearAllResourceAlarms() throws IOException, InterruptedException {
249+
clearResourceAlarm("memory");
250+
clearResourceAlarm("disk");
251+
}
252+
253+
protected void setResourceAlarm(String source) throws IOException, InterruptedException {
254+
Host.executeCommand("cd ../rabbitmq-test; make set-resource-alarm SOURCE=" + source);
255+
}
256+
257+
protected void clearResourceAlarm(String source) throws IOException, InterruptedException {
258+
Host.executeCommand("cd ../rabbitmq-test; make clear-resource-alarm SOURCE=" + source);
259+
}
247260
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.AlreadyClosedException;
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.test.BrokerTestCase;
6+
7+
import java.io.IOException;
8+
9+
public class BasicGet extends BrokerTestCase {
10+
public void testBasicGetWithEnqueuedMessages() throws IOException, InterruptedException {
11+
assertTrue(channel.isOpen());
12+
String q = channel.queueDeclare().getQueue();
13+
14+
basicPublishPersistent("msg".getBytes("UTF-8"), q);
15+
Thread.sleep(250);
16+
17+
assertNotNull(channel.basicGet(q, true));
18+
channel.queuePurge(q);
19+
assertNull(channel.basicGet(q, true));
20+
channel.queueDelete(q);
21+
}
22+
23+
public void testBasicGetWithEmptyQueue() throws IOException, InterruptedException {
24+
assertTrue(channel.isOpen());
25+
String q = channel.queueDeclare().getQueue();
26+
27+
assertNull(channel.basicGet(q, true));
28+
channel.queueDelete(q);
29+
}
30+
31+
public void testBasicGetWithClosedChannel() throws IOException, InterruptedException {
32+
assertTrue(channel.isOpen());
33+
String q = channel.queueDeclare().getQueue();
34+
35+
channel.close();
36+
assertFalse(channel.isOpen());
37+
try {
38+
channel.basicGet(q, true);
39+
fail("expected basic.get on a closed channel to fail");
40+
} catch (AlreadyClosedException e) {
41+
// passed
42+
} finally {
43+
Channel tch = connection.createChannel();
44+
tch.queueDelete(q);
45+
tch.close();
46+
}
47+
48+
}
49+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is GoPivotal, Inc.
14+
// Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
15+
//
16+
17+
18+
package com.rabbitmq.client.test.server;
19+
20+
import com.rabbitmq.client.BlockedListener;
21+
import com.rabbitmq.client.Channel;
22+
import com.rabbitmq.client.Connection;
23+
import com.rabbitmq.client.ConnectionFactory;
24+
import com.rabbitmq.client.MessageProperties;
25+
import com.rabbitmq.client.test.BrokerTestCase;
26+
import com.rabbitmq.tools.Host;
27+
28+
import java.io.IOException;
29+
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.TimeUnit;
31+
32+
public class BlockedConnection extends BrokerTestCase {
33+
protected void releaseResources() throws IOException {
34+
try {
35+
unblock();
36+
} catch (InterruptedException e) {
37+
e.printStackTrace();
38+
}
39+
}
40+
public void testBlock() throws Exception {
41+
final CountDownLatch latch = new CountDownLatch(1);
42+
43+
Connection connection = connection(latch);
44+
block();
45+
publish(connection);
46+
47+
assertTrue(latch.await(10, TimeUnit.SECONDS));
48+
}
49+
50+
public void testInitialBlock() throws Exception {
51+
final CountDownLatch latch = new CountDownLatch(1);
52+
53+
block();
54+
Connection connection = connection(latch);
55+
publish(connection);
56+
57+
assertTrue(latch.await(10, TimeUnit.SECONDS));
58+
}
59+
60+
private void block() throws IOException, InterruptedException {
61+
Host.rabbitmqctl("set_vm_memory_high_watermark 0.000000001");
62+
setResourceAlarm("disk");
63+
}
64+
65+
private void unblock() throws IOException, InterruptedException {
66+
Host.rabbitmqctl("set_vm_memory_high_watermark 0.4");
67+
clearResourceAlarm("disk");
68+
}
69+
70+
private Connection connection(final CountDownLatch latch) throws IOException {
71+
ConnectionFactory factory = new ConnectionFactory();
72+
Connection connection = factory.newConnection();
73+
connection.addBlockedListener(new BlockedListener() {
74+
public void handleBlocked(String reason) throws IOException {
75+
try {
76+
unblock();
77+
} catch (InterruptedException e) {
78+
e.printStackTrace();
79+
}
80+
}
81+
82+
public void handleUnblocked() throws IOException {
83+
latch.countDown();
84+
}
85+
});
86+
return connection;
87+
}
88+
89+
private void publish(Connection connection) throws IOException {
90+
Channel ch = connection.createChannel();
91+
ch.basicPublish("", "", MessageProperties.BASIC, "".getBytes());
92+
}
93+
}

test/src/com/rabbitmq/client/test/server/MemoryAlarms.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,13 @@ protected void createResources() throws IOException {
6262

6363
@Override
6464
protected void releaseResources() throws IOException {
65-
channel.queueDelete(Q);
66-
}
67-
68-
protected void setResourceAlarm(String source) throws IOException, InterruptedException {
69-
Host.executeCommand("cd ../rabbitmq-test; make set-resource-alarm SOURCE=" + source);
70-
}
71-
72-
protected void clearResourceAlarm(String source) throws IOException, InterruptedException {
73-
Host.executeCommand("cd ../rabbitmq-test; make clear-resource-alarm SOURCE=" + source);
65+
try {
66+
clearAllResourceAlarms();
67+
} catch (InterruptedException e) {
68+
e.printStackTrace();
69+
} finally {
70+
channel.queueDelete(Q);
71+
}
7472
}
7573

7674
public void testFlowControl() throws IOException, InterruptedException {
@@ -87,7 +85,7 @@ public void testFlowControl() throws IOException, InterruptedException {
8785
assertNull(c.nextDelivery(3100));
8886
// once the alarm has cleared the publishes should go through
8987
clearResourceAlarm("memory");
90-
assertNotNull(c.nextDelivery());
88+
assertNotNull(c.nextDelivery(3100));
9189
// everything should be back to normal
9290
channel.basicCancel(consumerTag);
9391
basicPublishVolatile(Q);
@@ -108,7 +106,7 @@ public void testOverlappingAlarmsFlowControl() throws IOException, InterruptedEx
108106
clearResourceAlarm("memory");
109107
assertNull(c.nextDelivery(100));
110108
clearResourceAlarm("disk");
111-
assertNotNull(c.nextDelivery());
109+
assertNotNull(c.nextDelivery(3100));
112110

113111
channel.basicCancel(consumerTag);
114112
basicPublishVolatile(Q);

0 commit comments

Comments
 (0)