Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void setUp() throws Exception {
final Message toSend = session.createMessage();
toSend.setStringProperty("foo", "bar");
final MessageProducer producer = session.createProducer(queue);
producer.send(queue, toSend);
producer.send(toSend);
} finally {
conn.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,10 @@ public void start() throws JMSException {
*/
@Override
public void stop() throws JMSException {
for (final ActiveMQSession session : sessions) {
session.checkNotInCompletionListenerCallback("stop");
session.checkNotInMessageListenerCallback("stop");
}
doStop(true);
}

Expand Down Expand Up @@ -677,6 +681,10 @@ void doStop(boolean checkClosed) throws JMSException {
*/
@Override
public void close() throws JMSException {
for (final ActiveMQSession session : sessions) {
session.checkNotInCompletionListenerCallback("close");
session.checkNotInMessageListenerCallback("close");
}
try {
// If we were running, lets stop first.
if (!closed.get() && !transportFailed.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1454,7 +1454,12 @@ public void dispatch(MessageDispatch md) {
try {
boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired();
if (!expired) {
listener.onMessage(message);
session.messageListenerThread.set(Thread.currentThread());
try {
listener.onMessage(message);
} finally {
session.messageListenerThread.set(null);
}
}
afterMessageIsConsumed(md, expired);
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public Destination getDestination() throws JMSException {
*/
@Override
public void close() throws JMSException {
session.checkNotInCompletionListenerCallback("close");
if (!closed) {
dispose();
this.session.asyncSendPacket(info.createRemoveCommand());
Expand Down Expand Up @@ -197,6 +198,35 @@ protected void checkClosed() throws IllegalStateException {
}
}

@Override
public void send(Message message) throws JMSException {
checkClosed();
if (info.getDestination() == null) {
throw new UnsupportedOperationException("A destination must be specified.");
}
super.send(message);
}

@Override
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
checkClosed();
if (info.getDestination() == null) {
throw new UnsupportedOperationException("A destination must be specified.");
}
validateDeliveryMode(deliveryMode);
validatePriority(priority);
super.send(message, deliveryMode, priority, timeToLive);
}

@Override
public void send(Destination destination, Message message) throws JMSException {
checkClosed();
if (info.getDestination() != null) {
throw new UnsupportedOperationException("This producer was created with a specific destination. Use send(Message) variants.");
}
super.send(destination, message);
}

/**
* Sends a message to a destination for an unidentified message producer,
* specifying delivery mode, priority and time to live.
Expand All @@ -221,42 +251,133 @@ protected void checkClosed() throws IllegalStateException {
*/
@Override
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
this.send(destination, message, deliveryMode, priority, timeToLive, (AsyncCallback)null);
validateDeliveryMode(deliveryMode);
validatePriority(priority);
this.send(destination, message, deliveryMode, priority, timeToLive, (AsyncCallback) null);
}

/**
*
* @param message the message to send
* @param CompletionListener to callback
* @param completionListener to callback
* @throws JMSException if the JMS provider fails to send the message due to
* some internal error.
* @throws UnsupportedOperationException if an invalid destination is
* specified.
* @throws InvalidDestinationException if a client uses this method with an
* invalid destination.
* @throws UnsupportedOperationException if called on an anonymous producer (no fixed destination)
* @see jakarta.jms.Session#createProducer
* @since 2.0
*/
@Override
public void send(Message message, CompletionListener completionListener) throws JMSException {
throw new UnsupportedOperationException("send(Message, CompletionListener) is not supported");
checkClosed();
if (completionListener == null) {
throw new IllegalArgumentException("CompletionListener must not be null");
}
if (info.getDestination() == null) {
throw new UnsupportedOperationException("A destination must be specified.");
}
this.doSendWithCompletionListener(info.getDestination(), message, this.defaultDeliveryMode,
this.defaultPriority, this.defaultTimeToLive,
getDisableMessageID(), getDisableMessageTimestamp(), completionListener);
}

@Override
public void send(Message message, int deliveryMode, int priority, long timeToLive,
CompletionListener completionListener) throws JMSException {
throw new UnsupportedOperationException("send(Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported");
checkClosed();
if (completionListener == null) {
throw new IllegalArgumentException("CompletionListener must not be null");
}
if (info.getDestination() == null) {
throw new UnsupportedOperationException("A destination must be specified.");
}
validateDeliveryMode(deliveryMode);
validatePriority(priority);
this.doSendWithCompletionListener(info.getDestination(), message, deliveryMode, priority, timeToLive,
getDisableMessageID(), getDisableMessageTimestamp(), completionListener);
}

@Override
public void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException {
throw new UnsupportedOperationException("send(Destination, Message, CompletionListener) is not supported");
checkClosed();
if (info.getDestination() != null) {
throw new UnsupportedOperationException("This producer was created with a specific destination. Use send(Message, CompletionListener) variants.");
}
if (completionListener == null) {
throw new IllegalArgumentException("CompletionListener must not be null");
}
if (destination == null) {
throw new InvalidDestinationException("Don't understand null destinations");
}
this.doSendWithCompletionListener(ActiveMQDestination.transform(destination), message,
this.defaultDeliveryMode, this.defaultPriority, this.defaultTimeToLive,
getDisableMessageID(), getDisableMessageTimestamp(), completionListener);
}

@Override
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
CompletionListener completionListener) throws JMSException {
throw new UnsupportedOperationException("send(Destination, Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported");
checkClosed();
if (info.getDestination() != null) {
throw new UnsupportedOperationException("This producer was created with a specific destination. Use send(Message, CompletionListener) variants.");
}
if (completionListener == null) {
throw new IllegalArgumentException("CompletionListener must not be null");
}
if (destination == null) {
throw new InvalidDestinationException("Don't understand null destinations");
}
validateDeliveryMode(deliveryMode);
validatePriority(priority);
this.doSendWithCompletionListener(ActiveMQDestination.transform(destination), message,
deliveryMode, priority, timeToLive,
getDisableMessageID(), getDisableMessageTimestamp(), completionListener);
}

public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
boolean disableMessageID, boolean disableMessageTimestamp,
CompletionListener completionListener) throws JMSException {
checkClosed();
if (info.getDestination() != null) {
throw new UnsupportedOperationException("This producer was created with a specific destination. Use send(Message, CompletionListener) variants.");
}
if (completionListener == null) {
throw new IllegalArgumentException("CompletionListener must not be null");
}
if (destination == null) {
throw new InvalidDestinationException("Don't understand null destinations");
}
validateDeliveryMode(deliveryMode);
validatePriority(priority);
this.doSendWithCompletionListener(ActiveMQDestination.transform(destination), message,
deliveryMode, priority, timeToLive, disableMessageID, disableMessageTimestamp, completionListener);
}

private void doSendWithCompletionListener(final ActiveMQDestination dest, Message message,
final int deliveryMode, final int priority, final long timeToLive,
final boolean disableMessageID, final boolean disableMessageTimestamp,
final CompletionListener completionListener) throws JMSException {
if (dest == null) {
throw new JMSException("No destination specified");
}

if (transformer != null) {
final Message transformedMessage = transformer.producerTransform(session, this, message);
if (transformedMessage != null) {
message = transformedMessage;
}
}

if (producerWindow != null) {
try {
producerWindow.waitForSpace();
} catch (InterruptedException e) {
throw new JMSException("Send aborted due to thread interrupt.");
}
}

this.session.send(this, dest, message, deliveryMode, priority, timeToLive,
disableMessageID, disableMessageTimestamp, producerWindow, sendTimeout, completionListener);
stats.onMessage();
}

public void send(Message message, AsyncCallback onComplete) throws JMSException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,18 @@ public void send(Destination destination, Message message) throws JMSException {

protected abstract void checkClosed() throws IllegalStateException;

protected static void validateDeliveryMode(final int deliveryMode) throws JMSException {
if (deliveryMode != DeliveryMode.PERSISTENT && deliveryMode != DeliveryMode.NON_PERSISTENT) {
throw new JMSException("Invalid delivery mode: " + deliveryMode);
}
}

protected static void validatePriority(final int priority) throws JMSException {
if (priority < 0 || priority > 9) {
throw new JMSException("Invalid priority: " + priority + " (must be 0-9)");
}
}

/**
* @return the sendTimeout
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public class ActiveMQProducer implements JMSProducer {
// Properties applied to all messages on a per-JMS producer instance basis
private Map<String, Object> messageProperties = null;

private CompletionListener completionListener = null;

ActiveMQProducer(ActiveMQContext activemqContext, ActiveMQMessageProducer activemqMessageProducer) {
this.activemqContext = activemqContext;
this.activemqMessageProducer = activemqMessageProducer;
Expand Down Expand Up @@ -90,7 +92,12 @@ public JMSProducer send(Destination destination, Message message) {
}
}

activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), null);
if (completionListener != null) {
activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(),
getDisableMessageID(), getDisableMessageTimestamp(), completionListener);
} else {
activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), (AsyncCallback) null);
}
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
Expand Down Expand Up @@ -256,12 +263,13 @@ public long getDeliveryDelay() {

@Override
public JMSProducer setAsync(CompletionListener completionListener) {
throw new UnsupportedOperationException("setAsync(CompletionListener) is not supported");
this.completionListener = completionListener;
return this;
}

@Override
public CompletionListener getAsync() {
throw new UnsupportedOperationException("getAsync() is not supported");
return this.completionListener;
}

@Override
Expand Down
Loading