Is a JMS MessageConsumer truly asynchronous/non-blocking?

Yes.

The code tree is as follows:

Receive call on the ActiveMQMessageConsumer class, which implements the javax.jms.MessageConsumer interface…

  public javax.jms.Message receive()
    throws JMSException
  {
    checkClosed();
    checkMessageListener();
    
    sendPullCommand(0L);
    MessageDispatch md = dequeue(-1L);
    if (md == null) {
      return null;
    }
    beforeMessageIsConsumed(md);
    afterMessageIsConsumed(md, false);
    
    return createActiveMQMessage(md);
  }

The sendPullCommand() method, in the same class, is shown below…

protected void sendPullCommand(long timeout)
    throws JMSException
  {
    clearDeliveredList();
    if ((this.info.getCurrentPrefetchSize() == 0) && (this.unconsumedMessages.isEmpty()))
    {
      MessagePull messagePull = new MessagePull();
      messagePull.configure(this.info);
      messagePull.setTimeout(timeout);
      this.session.asyncSendPacket(messagePull);
    }
  }

The MessagePull{} class is configured and sent to the broker with the asyncSendPacket() method call.

For empirical evidence, an actual strace of a consumer registered with ActiveMQ shows what is below.

We identified the socket associated with the file descriptor 430 using strace against the Consumer. What is below is the strace against ActiveMQ itself. We start our consumer and create a Queue connection to the broker at 9:17:05. At 9:17:17, we send a message from our producer to the queue. There is no other traffic between the consumer and ActiveMQ until ActiveMQ sends it the message at 9:17:17. It doesn’t even contain a poll call every so often to see if there is work to do. As a result, it is truly asynchronous.

15032 09:17:05.722665 fcntl(430, F_GETFL 
15032 09:17:05.722741 fcntl(430, F_SETFL, O_RDWR 
30525 09:17:05.724339 setsockopt(430, SOL_SOCKET, SO_RCVBUF, [65536], 4) = 0
30525 09:17:05.724409 setsockopt(430, SOL_SOCKET, SO_SNDBUF, [65536], 4) = 0
30525 09:17:05.725299 sendto(430, "ActiveMQ MaxFrameSize CacheSize CacheEnabled SizePrefixDisabled MaxInactivityDurationInitalDelay TcpNoDelayEnabled MaxInactivityDuration TightEncodingEnabled StackTraceEnabled", 244, 0, NULL, 0 
30657 09:17:05.731351 setsockopt(430, SOL_TCP, TCP_NODELAY, [1], 4 
30658 09:17:05.738382 sendto(430, "ID:cmhlcarchapp01.foo.com-18813-1481762261285-0:1 (tcp://cmhlcarchapp01.foo.com:61616 amq2", 121, 0, NULL, 0) = 121
30658 09:17:05.807018 sendto(430, "", 16, 0, NULL, 0) = 16
30657 09:17:05.807576 sendto(430, "", 15, 0, NULL, 0) = 15
30657 09:17:05.807658 recvfrom(430, "ID:cmhlcarchapp01.foo.com-15477-1481897825526-1:1 ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic", 8192, 0, NULL, NULL) = 153
30657 09:17:05.817916 sendto(430, "", 15, 0, NULL, 0) = 15
30657 09:17:05.852143 recvfrom(430, "} ID:cmhlcarchapp01.foo.com-15477-1481897825526-1:1 requestQ ID:cmhlcarchapp01.expressco.com-15477-1481897825526-0:1", 8192, 0, NULL, NULL) = 157
30657 09:17:05.865215 sendto(430, "", 15, 0, NULL, 0 
30657 09:17:05.874460 sendto(430, "", 15, 0, NULL, 0 
30658 09:17:17.179415 sendto(430, "ID:cmhlcarchapp01.foo.com-15477-1481897825526-1:1 requestQ {ID:cmhlcarchapp01.foo.com-63166-1481897836921-1:1 replyQ Y Hello world. Y Y {", 247, 0, NULL, 0) = 247
30657 09:17:17.202377 sendto(430, "", 15, 0, NULL, 0) = 15
30657 09:17:17.202469 recvfrom(430, "ID:cmhlcarchapp01.foo.com-63166-1481897836921-1:1:1:1:1 Y Hello world.", 8192, 0, NULL, NULL) = 133
30657 09:17:17.206053 sendto(430, "", 15, 0, NULL, 0 

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.