Thursday, May 1, 2014

Spring DefaultMessageListenerContainer and JmsTemplate

This analysis uses Spring 2.5.6.

DefaultMessageListenerContainer

An example of using the DefaultMessageListenerContainer class is as follows:

       <bean id="jmsContainer"
  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="jmsDestConnectionFactory" />
  <property name="destination" ref="myDestination" />
  <property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE" />
  <property name="messageListener" ref="messageListener" />
  <property name="sessionTransacted" value="true" />
  <property name="concurrentConsumers" value="1"></property>
 </bean>>
There is also the property transactionManager. For simplification, we just use DMLC to denote the class DefaultMessageListenerContainer.

The logic on how transaction mangement is used is as follows.

  1. If the property "transactionManager" is set, then that transaction manager will be used. The property "sessionTransacted" will be ignored.
  2. Else if "sessionTransacted" is set to "true", local transaction will be used. And the property "sessionAcknowledgeModeName" will be ignored. Programs need to use session.commit or session.rollback to manage the transaction.
  3. Else there is no transaction. The property "sessionAcknowledgeModeName" comes into effect.
The following is from Spring javadoc site "http://docs.spring.io/spring/docs/2.5.6/api/org/springframework/jms/listener/DefaultMessageListenerContainer.html".
======= javadoc of DefaultMessageListenerContainer.html ========
It is strongly recommended to either set "sessionTransacted" to "true" or specify an external "transactionManager". See the AbstractMessageListenerContainer javadoc for details on acknowledge modes and native transaction options, as well as the AbstractPollingMessageListenerContainer javadoc for details on configuring an external transaction manager.
====== javadoc of AbstractPollingMessageListenerContainer =====
setTransactionManager
public void setTransactionManager(PlatformTransactionManager transactionManager)
Specify the Spring PlatformTransactionManager to use for transactional wrapping of message reception plus listener execution.
Default is none, not performing any transactional wrapping. If specified, this will usually be a Spring JtaTransactionManager or one of its subclasses, in combination with a JTA-aware ConnectionFactory that this message listener container obtains its Connections from.
Note: Consider the use of local JMS transactions instead. Simply switch the "sessionTransacted" flag to "true" in order to use a locally transacted JMS Session for the entire receive processing, including any Session operations performed by a SessionAwareMessageListener (e.g. sending a response message). Alternatively, a JmsTransactionManager may be used for fully synchronized Spring transactions based on local JMS transactions. Check AbstractMessageListenerContainer's javadoc for a discussion of transaction choices and message redelivery scenarios.
=====================================================

The Spring class hierarchy is the following:

JmsAccessor
    ^
JmsDestinationAccessor
    ^
AbstractJmsListeningContainer
    ^
AbstractMessageListenerContainer
    ^
AbstractPollingMessageListenerContainer
    ^
DefaultMessageListenerContainer

The top class JmsAccessor implements the Spring InitializingBean interface:
public interface InitializingBean {
 void afterPropertiesSet() throws Exception;
}
The Initialization of class

The case that a transaction manager is not configured

The class AbstractJmsListeningContainer has the following:
AbstractJmsListeningContainer:
public void afterPropertiesSet() {
  super.afterPropertiesSet();
  validateConfiguration();
  initialize();
 }
 

So the method initialize() of the class will be invoked when the bean is created. It turns out that the sub classes below AbstractJmsListeningContainer override this method and at the same time call the same method in the super class. For DefaultMessageListenerContainer, it is the following:

DefaultMessageListenerContainer:
public void initialize() {
  // Adapt default cache level.
  if (this.cacheLevel == CACHE_AUTO) {
   this.cacheLevel = (getTransactionManager() != null ? CACHE_NONE : CACHE_CONSUMER);
  }

  // Prepare taskExecutor and maxMessagesPerTask.
  synchronized (this.lifecycleMonitor) {
   if (this.taskExecutor == null) {
    this.taskExecutor = createDefaultTaskExecutor();
   }
   else if (this.taskExecutor instanceof SchedulingTaskExecutor &&
     ((SchedulingTaskExecutor) this.taskExecutor).prefersShortLivedTasks() &&
     this.maxMessagesPerTask == Integer.MIN_VALUE) {
    // TaskExecutor indicated a preference for short-lived tasks. According to
    // setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case
    // unless the user specified a custom value.
    this.maxMessagesPerTask = 10;
   }
  }

  // Proceed with actual listener initialization.
  super.initialize();
 }
The default value of cacheLevel is CACHE_AUTO. Since we are not setting any transaction manager, the cache level will be set to CACHE_CONSUMER. Then it creates the taskExecutor, an instance of SimpleAsyncTaskExecutor. Then it calls the super.initialize():
AbstractPollingMessageListenerContainer:
public void initialize() {
  // Set sessionTransacted=true in case of a non-JTA transaction manager.
  if (!this.sessionTransactedCalled &&
    this.transactionManager instanceof ResourceTransactionManager &&
    !TransactionSynchronizationUtils.sameResourceFactory(
      (ResourceTransactionManager) this.transactionManager, getConnectionFactory())) {
   super.setSessionTransacted(true);
  }

  // Use bean name as default transaction name.
  if (this.transactionDefinition.getName() == null) {
   this.transactionDefinition.setName(getBeanName());
  }

  // Proceed with superclass initialization.
  super.initialize();
 }
Since no transaction manager is set, the first "if" is skipped. The second "if" sets the name. And then it executes the super.initialize():
AbstractJmsListeningContainer:
public void initialize() throws JmsException {
  try {
   synchronized (this.lifecycleMonitor) {
    this.active = true;
    this.lifecycleMonitor.notifyAll();
   }
   if (this.autoStartup) {
    doStart();
   }
   doInitialize();
  }
  catch (JMSException ex) {
   synchronized (this.sharedConnectionMonitor) {
    ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup);
    this.sharedConnection = null;
   }
   throw convertJmsAccessException(ex);
  }
 }
 
The autoStartup is true by default. So it will call doStart() and doInitialize(). In summary, the doStart() method will create the JMS connection. And the doInitialize() method will create and start a thread for each consumer. The thread will do a while-loop. In the loop,it will use the connection created in the doStart() method to create the session and consumer to consume the messages. If an exception is thrown in the loop, it will be caught. The thread will end. And a new thread will start.
-----------------------------------------
doStart()
-----------------------------------------

The method doStart() is this:

AbstractJmsListeningContainer:
protected void doStart() throws JMSException {
  // Lazily establish a shared Connection, if necessary.
  if (sharedConnectionEnabled()) {
   establishSharedConnection();
  }

  // Reschedule paused tasks, if any.
  synchronized (this.lifecycleMonitor) {
   this.running = true;
   this.lifecycleMonitor.notifyAll();
   resumePausedTasks();
  }

  // Start the shared Connection, if any.
  if (sharedConnectionEnabled()) {
   startSharedConnection();
  }
 }
The method sharedConnectionEnabled() will return true since the cache level has been set to CACHE_CONSUMER:
DefaultMessageListenerContainer:
protected final boolean sharedConnectionEnabled() {
  return (getCacheLevel() >= CACHE_CONNECTION);
 }

So next call the following:

DefaultMessageListenerContainer:
protected void establishSharedConnection() {
  try {
   super.establishSharedConnection();
  }
  catch (Exception ex) {
   logger.debug("Could not establish shared JMS Connection - " +
     "leaving it up to asynchronous invokers to establish a Connection as soon as possible", ex);
  }
 }
And then call the following:
AbstractJmsListeningContainer:
protected void establishSharedConnection() throws JMSException {
  synchronized (this.sharedConnectionMonitor) {
   if (this.sharedConnection == null) {
    this.sharedConnection = createSharedConnection();
    logger.debug("Established shared JMS Connection");
   }
  }
 }

protected Connection createSharedConnection() throws JMSException {
  Connection con = createConnection();
  try {
   prepareSharedConnection(con);
   return con;
  }
  catch (JMSException ex) {
   JmsUtils.closeConnection(con);
   throw ex;
  }
 }

protected Connection createConnection() throws JMSException {
  return getConnectionFactory().createConnection();
 }

protected void prepareSharedConnection(Connection connection) throws JMSException {
  String clientId = getClientId();
  if (clientId != null) {
   connection.setClientID(clientId);
  }
 } 
Notes: 
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
(1) Now the JMS connection has been created! 
(2) If the connection creation throws an exception, no retry will be made. The bean creation will fail.
(3) What if the conneciton is good but it fails after the bean is created successfully? Later on we will
    see in the class AsyncMessageListenerInvoker that Spring will try to reestablish the connection. The 
    method for this is recoverAfterListenerSetupfailure().
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Next call in doStart() is the following:
resumePausedTasks();

Need to check this.  Then the following call is 

protected void startSharedConnection() throws JMSException {
  synchronized (this.sharedConnectionMonitor) {
   this.sharedConnectionStarted = true;
   if (this.sharedConnection != null) {
    try {
     this.sharedConnection.start();
    }
    catch (javax.jms.IllegalStateException ex) {
     logger.debug("Ignoring Connection start exception - assuming already started: " + ex);
    }
   }
  }
 }

--------------------------------
doInitialize()
--------------------------------
DefaultMessageListenerContainer:
protected void doInitialize() throws JMSException {
  synchronized (this.lifecycleMonitor) {
   for (int i = 0; i < this.concurrentConsumers; i++) {
    scheduleNewInvoker();
   }
  }
 }

By default, concurrentConsumers = 1.

AbstractJmsListeningContainer:
private void scheduleNewInvoker() {
  AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
  if (rescheduleTaskIfNecessary(invoker)) {
   // This should always be true, since we're only calling this when active.
   this.scheduledInvokers.add(invoker);
  }
 }

protected final boolean rescheduleTaskIfNecessary(Object task) {
  if (this.running) {
   try {
    doRescheduleTask(task);
   }
   catch (RuntimeException ex) {
    logRejectedTask(task, ex);
    this.pausedTasks.add(task);
   }
   return true;
  }
  else if (this.active) {
   this.pausedTasks.add(task);
   return true;
  }
  else {
   return false;
  }
 } 
 
The variable running has already been set to true in the doStart() method. So doRescheduleTask will execute.
DefaultMessageListenerContainer:
protected void doRescheduleTask(Object task) {
  this.taskExecutor.execute((Runnable) task);
 }
 
The taskExecutor has already been created in the beginning part of initialize() of DefaultMessageListenerContainer.It is an instance of SimpleAsyncTaskExecutor. The execute method is as follows:
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncTaskExecutor, Serializable 

SimpleAsyncTaskExecutor:
public void execute(Runnable task) {
  execute(task, TIMEOUT_INDEFINITE);
 }

 /**
  * Executes the given task, within a concurrency throttle
  * if configured (through the superclass's settings).
  * 

Executes urgent tasks (with 'immediate' timeout) directly, * bypassing the concurrency throttle (if active). All other * tasks are subject to throttling. * @see #TIMEOUT_IMMEDIATE * @see #doExecute(Runnable) */ public void execute(Runnable task, long startTimeout) { Assert.notNull(task, "Runnable must not be null"); if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) { this.concurrencyThrottle.beforeAccess(); doExecute(new ConcurrencyThrottlingRunnable(task)); } else { doExecute(task); } }

After checking several classes, it seems that isThrottleActive() is false. So it will call
protected void doExecute(Runnable task) {
  createThread(task).start();
 }
 
 CustomizableThreadCreator:
public Thread createThread(Runnable runnable) {
  Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
  thread.setPriority(getThreadPriority());
  thread.setDaemon(isDaemon());
  return thread;
 } 

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 
So at this point, a thread is created and started. 
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Debugging shows that getThreadGroup() returns null, and nextThreadName() returns "jmsContainer-1". And the thread dump of the weblogic server does show this thread:
 "jmsContainer-1" waiting for lock java.lang.Object@3a421d TIMED_WAITING
          
             java.lang.Object.wait(Native Method)
          
             com.tibco.tibjms.TibjmsxSessionImp._getSyncMessage(TibjmsxSessionImp.java:2141)
          
             com.tibco.tibjms.TibjmsxSessionImp._receive(TibjmsxSessionImp.java:1977)
          
             com.tibco.tibjms.TibjmsMessageConsumer._receive(TibjmsMessageConsumer.java:240)
          
             com.tibco.tibjms.TibjmsMessageConsumer.receive(TibjmsMessageConsumer.java:440)
          
             com.xyz.mi.decorator.jms.XyzJmsMessageConsumer.receive(XyzJmsMessageConsumer.java:79)
          
             org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:405)
          
             org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:308)
          
             org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:261)
          
             org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
          
             org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:974)
          
             org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:876)
          
             java.lang.Thread.run(Thread.java:619)

Note here that the "task" object is the following invoker:
AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
 
Now we get to the actual task class that will do the essential work. It is an inner class of DefaultMessageListenerContainer.

There are lots of code there. But it is good to copy the whole class below.

 //-------------------------------------------------------------------------
 // Inner classes used as internal adapters
 //-------------------------------------------------------------------------

 /**
  * Runnable that performs looped MessageConsumer.receive() calls.
  */
 private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {

  private Session session;

  private MessageConsumer consumer;

  private Object lastRecoveryMarker;

  private boolean lastMessageSucceeded;

  private int idleTaskExecutionCount = 0;

  private volatile boolean idle = true;

  public void run() {
   synchronized (lifecycleMonitor) {
    activeInvokerCount++;
    lifecycleMonitor.notifyAll();
   }
   boolean messageReceived = false;
   try {
    if (maxMessagesPerTask < 0) {
     messageReceived = executeOngoingLoop();
    }
    else {
     int messageCount = 0;
     while (isRunning() && messageCount < maxMessagesPerTask) {
      messageReceived = (invokeListener() || messageReceived);
      messageCount++;
     }
    }
   }
   catch (Throwable ex) {
    clearResources();
    if (!this.lastMessageSucceeded) {
     // We failed more than once in a row - sleep for recovery interval
     // even before first recovery attempt.
     sleepInbetweenRecoveryAttempts();
    }
    this.lastMessageSucceeded = false;
    boolean alreadyRecovered = false;
    synchronized (recoveryMonitor) {
     if (this.lastRecoveryMarker == currentRecoveryMarker) {
      handleListenerSetupFailure(ex, false);
      recoverAfterListenerSetupFailure();
      currentRecoveryMarker = new Object();
     }
     else {
      alreadyRecovered = true;
     }
    }
    if (alreadyRecovered) {
     handleListenerSetupFailure(ex, true);
    }
   }
   synchronized (lifecycleMonitor) {
    decreaseActiveInvokerCount();
    lifecycleMonitor.notifyAll();
   }
   if (!messageReceived) {
    this.idleTaskExecutionCount++;
   }
   else {
    this.idleTaskExecutionCount = 0;
   }
   synchronized (lifecycleMonitor) {
    if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
     // We're shutting down completely.
     scheduledInvokers.remove(this);
     if (logger.isDebugEnabled()) {
      logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
     }
     lifecycleMonitor.notifyAll();
     clearResources();
    }
    else if (isRunning()) {
     int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
     if (nonPausedConsumers < 1) {
      logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +
        "Check your thread pool configuration! Manual recovery necessary through a start() call.");
     }
     else if (nonPausedConsumers < getConcurrentConsumers()) {
      logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
        "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
        "to be triggered by remaining consumers.");
     }
    }
   }
  }

  private boolean executeOngoingLoop() throws JMSException {
   boolean messageReceived = false;
   boolean active = true;
   while (active) {
    synchronized (lifecycleMonitor) {
     boolean interrupted = false;
     boolean wasWaiting = false;
     while ((active = isActive()) && !isRunning()) {
      if (interrupted) {
       throw new IllegalStateException("Thread was interrupted while waiting for " +
         "a restart of the listener container, but container is still stopped");
      }
      if (!wasWaiting) {
       decreaseActiveInvokerCount();
      }
      wasWaiting = true;
      try {
       lifecycleMonitor.wait();
      }
      catch (InterruptedException ex) {
       // Re-interrupt current thread, to allow other threads to react.
       Thread.currentThread().interrupt();
       interrupted = true;
      }
     }
     if (wasWaiting) {
      activeInvokerCount++;
     }
    }
    if (active) {
     messageReceived = (invokeListener() || messageReceived);
    }
   }
   return messageReceived;
  }

  private boolean invokeListener() throws JMSException {
   initResourcesIfNecessary();
   boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
   this.lastMessageSucceeded = true;
   return messageReceived;
  }

  private void decreaseActiveInvokerCount() {
   activeInvokerCount--;
   if (stopCallback != null && activeInvokerCount == 0) {
    stopCallback.run();
    stopCallback = null;
   }
  }

  private void initResourcesIfNecessary() throws JMSException {
   if (getCacheLevel() <= CACHE_CONNECTION) {
    updateRecoveryMarker();
   }
   else {
    if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
     updateRecoveryMarker();
     this.session = createSession(getSharedConnection());
    }
    if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
     this.consumer = createListenerConsumer(this.session);
    }
   }
  }

  private void updateRecoveryMarker() {
   synchronized (recoveryMonitor) {
    this.lastRecoveryMarker = currentRecoveryMarker;
   }
  }

  private void clearResources() {
   if (sharedConnectionEnabled()) {
    synchronized (sharedConnectionMonitor) {
     JmsUtils.closeMessageConsumer(this.consumer);
     JmsUtils.closeSession(this.session);
    }
   }
   else {
    JmsUtils.closeMessageConsumer(this.consumer);
    JmsUtils.closeSession(this.session);
   }
   this.consumer = null;
   this.session = null;
  }

  public boolean isLongLived() {
   return (maxMessagesPerTask < 0);
  }

  public void setIdle(boolean idle) {
   this.idle = idle;
  }

  public boolean isIdle() {
   return this.idle;
  }
 }
So now let's check out this class AsyncMessageListenerInvoker.

In its run() method, maxMessagesPerTask is initialized to a negative value. So if the value is not changed,this line will execute:

messageReceived = executeOngoingLoop();
In the above method, there is the following line:
messageReceived = (invokeListener() || messageReceived);
The method called is the following:
DefaultMessageListenerContainer$AsyncMessageListenerInvoker:
private boolean invokeListener() throws JMSException {
   initResourcesIfNecessary();
   boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
   this.lastMessageSucceeded = true;
   return messageReceived;
  }

private void initResourcesIfNecessary() throws JMSException {
   if (getCacheLevel() <= CACHE_CONNECTION) {
    updateRecoveryMarker();
   }
   else {
    if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
     updateRecoveryMarker();
     this.session = createSession(getSharedConnection());
    }
    if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
     this.consumer = createListenerConsumer(this.session);
    }
   }
  }
  
So by default, since the cache level has been set to CACHE_CONSUMER , a session will be created using the shared connection. And the consumer will then be created from this session.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Now the session and the consumer have been created.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  
AbstractPollingMessageListenerContainer:  
protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer)
   throws JMSException {

  if (this.transactionManager != null) {
   // Execute receive within transaction.
   TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
   boolean messageReceived = true;
   try {
    messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
   }
   catch (JMSException ex) {
    rollbackOnException(status, ex);
    throw ex;
   }
   catch (RuntimeException ex) {
    rollbackOnException(status, ex);
    throw ex;
   }
   catch (Error err) {
    rollbackOnException(status, err);
    throw err;
   }
   this.transactionManager.commit(status);
   return messageReceived;
  }

  else {
   // Execute receive outside of transaction.
   return doReceiveAndExecute(invoker, session, consumer, null);
  }
 }
 
If the transactionManager is not set, then the method will just execute
 doReceiveAndExecute(invoker, session, consumer, null);
 
Its code is this:
 
AbstractPollingMessageListenerContainer: 
 protected boolean doReceiveAndExecute(
    Object invoker, Session session, MessageConsumer consumer, TransactionStatus status)
    throws JMSException {
 
   Connection conToClose = null;
   Session sessionToClose = null;
   MessageConsumer consumerToClose = null;
   try {
    Session sessionToUse = session;
    boolean transactional = false;
    if (sessionToUse == null) {
     sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
       getConnectionFactory(), this.transactionalResourceFactory, true);
     transactional = (sessionToUse != null);
    }
    if (sessionToUse == null) {
     Connection conToUse = null;
     if (sharedConnectionEnabled()) {
      conToUse = getSharedConnection();
     }
     else {
      conToUse = createConnection();
      conToClose = conToUse;
      conToUse.start();
     }
     sessionToUse = createSession(conToUse);
     sessionToClose = sessionToUse;
    }
    MessageConsumer consumerToUse = consumer;
    if (consumerToUse == null) {
     consumerToUse = createListenerConsumer(sessionToUse);
     consumerToClose = consumerToUse;
    }
    Message message = receiveMessage(consumerToUse);
    if (message != null) {
     if (logger.isDebugEnabled()) {
      logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +
        consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" +
        sessionToUse + "]");
     }
     messageReceived(invoker, sessionToUse);
     boolean exposeResource = (!transactional && isExposeListenerSession() &&
       !TransactionSynchronizationManager.hasResource(getConnectionFactory()));
     if (exposeResource) {
      TransactionSynchronizationManager.bindResource(
        getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
     }
     try {
      doExecuteListener(sessionToUse, message);
     }
     catch (Throwable ex) {
      if (status != null) {
       if (logger.isDebugEnabled()) {
        logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
       }
       status.setRollbackOnly();
      }
      handleListenerException(ex);
      // Rethrow JMSException to indicate an infrastructure problem
      // that may have to trigger recovery...
      if (ex instanceof JMSException) {
       throw (JMSException) ex;
      }
     }
     finally {
      if (exposeResource) {
       TransactionSynchronizationManager.unbindResource(getConnectionFactory());
      }
     }
     return true;
    }
    else {
     if (logger.isTraceEnabled()) {
      logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") +
        "session [" + sessionToUse + "] did not receive a message");
     }
     noMessageReceived(invoker, sessionToUse);
     return false;
    }
   }
   finally {
    JmsUtils.closeMessageConsumer(consumerToClose);
    JmsUtils.closeSession(sessionToClose);
    ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
   }
 }
Notes:
  1. The doReceiveAndExecute(...) method lays out the overal process of processing the message. It is actually a little strange. First it uses the consumer to receive the message in the receiverMessage(...) call. Then it passes this received message to messageReceived(invoker, sessionToUse), which will eventually pass the message to the listeners that were registered by the setListener(...) method of the class AbstractMessageListenerContainer.
  2. The code uses the consumer and the session passed in to the method. So the consumer and the session are reused. Only when they are null, will the method create a new consumer and a new session. And in the "finally" block, it only closes the consumer and the session that are created in this method. Notice that in the "finally" block, the variable names are "consumerToClose" and "sessionToClose". The words are different from "consumerToUse" or "sessionToUse".
AbstractPollingMessageListenerContainer:
protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
  return (this.receiveTimeout < 0 ? consumer.receive() : consumer.receive(this.receiveTimeout));
 } 
By default, the receiveTimeout in AbstractPollingMessageListenerContainer is 1000:
public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
So basically the code is to try to get a message every 1000 milliseconds. !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! This 1000 milliseconds give the CPU the breath room in the while-loop of the method executeOngoingLoop(). !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
DefaultMessageListenerContainer: 
protected void messageReceived(Object invoker, Session session) {
  ((AsyncMessageListenerInvoker) invoker).setIdle(false);
  scheduleNewInvokerIfAppropriate();
 } 

protected void scheduleNewInvokerIfAppropriate() {
  if (isRunning()) {
   resumePausedTasks();
   synchronized (this.lifecycleMonitor) {
    if (this.scheduledInvokers.size() < this.maxConcurrentConsumers && getIdleInvokerCount() == 0) {
     scheduleNewInvoker();
     if (logger.isDebugEnabled()) {
      logger.debug("Raised scheduled invoker count: " + this.scheduledInvokers.size());
     }
    }
   }
  }
 }
If a message is received, the current invoker is set to non-idle. In the method scheduleNewInvokerIfAppropriate(), if the total number of invokers are less than the configured max count and there is no idle invoker, then create a new invoker.

Note that if a message is not received, the method noMessageReceived(invoker, sessionToUse) will be called. This method will just set the current invoker to be idle.

AbstractMessageListenerContainer:
protected void doExecuteListener(Session session, Message message) throws JMSException {
  if (!isAcceptMessagesWhileStopping() && !isRunning()) {
   if (logger.isWarnEnabled()) {
    logger.warn("Rejecting received message because of the listener container " +
      "having been stopped in the meantime: " + message);
   }
   rollbackIfNecessary(session);
   throw new MessageRejectedWhileStoppingException();
  }
  try {
   invokeListener(session, message);
  }
  catch (JMSException ex) {
   rollbackOnExceptionIfNecessary(session, ex);
   throw ex;
  }
  catch (RuntimeException ex) {
   rollbackOnExceptionIfNecessary(session, ex);
   throw ex;
  }
  catch (Error err) {
   rollbackOnExceptionIfNecessary(session, err);
   throw err;
  }
  commitIfNecessary(session, message);
 } 
 
AbstractMessageListenerContainer:
protected void invokeListener(Session session, Message message) throws JMSException {
  Object listener = getMessageListener();
  if (listener instanceof SessionAwareMessageListener) {
   doInvokeListener((SessionAwareMessageListener) listener, session, message);
  }
  else if (listener instanceof MessageListener) {
   doInvokeListener((MessageListener) listener, message);
  }
  else if (listener != null) {
   throw new IllegalArgumentException(
     "Only MessageListener and SessionAwareMessageListener supported: " + listener);
  }
  else {
   throw new IllegalStateException("No message listener specified - see property 'messageListener'");
  }
 } 
 
Note here the listeners are invoked!
AbstractMessageListenerContainer:
protected void doInvokeListener(MessageListener listener, Message message) throws JMSException {
  listener.onMessage(message);
 }
Notes:
  1. Here Spring just passes the message to the onMessage(...) method of the listener. The Spring listener here is just a POJO. It knows nothing about JMS. If you look at the listener bean, you can see that it just implements the onMessage method. So the following observation:
    !!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    Even though you specify a JMS listener in DefaultMessageListenerContainer, it is not 
    doing the asynchronous messaging!  It is not a real asynchronous mechanism. It just 
    uses the synchronous call receive(1000) repeatedly to simulate asynchronicity.
    !!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    
  2. !!!!!!!!!!!!!!!!!!!!!!!!!!!!!
       The method doExecuteListener(...) calls the following method:
       protected void commitIfNecessary(Session session, Message message) throws 
    JMSException {
         // Commit session or acknowledge message.
         if (session.getTransacted()) {
          // Commit necessary - but avoid commit call within a JTA transaction.
          if (isSessionLocallyTransacted(session)) {
           // Transacted session created by this container -> commit.
           JmsUtils.commitIfNecessary(session);
          }
         }
         else if (isClientAcknowledge(session)) {
          message.acknowledge();
         }
     }
        This method takes care of the transaction in case of a locally transactional 
    session and the message acknowledgement in case of a non-transactional session. This
     method does not have any effect if the session is in a global transaction.
      
       The doExecuteListener(...) method also calls rollbackIfNecessary(session) if needed.
       !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    
  3. The API method onMessage(Message message) of MessageListener does not throw any Exception. So how can you make the transaction rollback? I think you can throw a RuntimeException inside the onMessage() method. This exception will be caught and the method rollbackOnExceptionIfNecessary(session, ex) is called. This method will rollback the transaction by calling session.rollback().

JmsTemplate

The class JmsTemplate extends JmsDestinationAccessor, which extends JmsAccessor. So you can also set the sessionTransacted and sessionAcknowledgeMode properties. By default, the values for these two properties are "false" and "AUTO_ACKNOWLEDGE". But in the code of JmsTemplate, these two property values are not used after the message is sent. The main method is

public Object execute(SessionCallback action, boolean startConnection) throws 
JmsException{
   ...
   try{
     ...
   }
   finally{
      JmsUtils.closeSession(sessionToClose);
      ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(),
          startConnection);
   }
}
So it just tries to close the session and release the connection. There is no transaction commit or message acknowledgement. Why? What if sessionAcknowledgeMode is "CLIENT_ACKNOWLEDGE" or sessionTransacted is "true"?

Spring JMS pooling functionality

For database, the datasource object can maintain several connection objects in a pool so that a client can get an available connection from the pool, which can greatly enhance the performance. For JMS, how is the similar function done? From all the code above, I think the Spring JMS does not give several JMS connections. In the DefaultMessageListenerContainer, when shared, only one JMS connection is used. However, you can set multiple consumers that will use the same connection. So there is no connection pool in Spring JMS here. The connection pooling comes from the ConnectionFactory. The JmsAccessor class is a parent class of both the Spring JmsTemplate and DefaultMessageListenerContainer class. The JmsAccessor class has the ConnectionFactory instance variable. The javadoc of JmsTemplate has the following note: NOTE: The ConnectionFactory used with this template should return pooled Connections ( or a single shared Connection) as well as pooled Sessions and MessageProducers.

Spring JMS Consumer Caching

If the transaction manager is not set, DefaultMessageListenerContainer will change the default cache level from CACHE_AUTO to CACHE_CONSUMER. DMLC creates the consumers. And these consumers will be reused again and again in the loop. Of course, the connection and the session are also reused. So even if the ConnectionFactory does not returned a pooled connection, the same connection will be used.

Spring JMS Recovery functionality

Sending messages using JmsTemplate

There are basically the following steps when sending a message:

  1. create JMS connection
  2. create session
  3. create message producer
  4. send message
In the above steps, the “create” action does not necessarily create a new object. The underlying JMS factory or class may just return a pooled object. And when these objects are closed, they are not necessarily closed. The underlying implementation may just return the object to the pool.

Every time a message needs to be sent, the JmsTemplate class will call the “Send(…)” method. The method will go through the steps (1), (2), (3) and (4). If anything goes wrong in the above steps, it will throw an exception. There is no retry logic !

Receiving messages using DefaultMessageListenerContainer

Now let us check the recovery functionality of this Spring framework. What will the framework do if there is any connection failure, session failure, or other failure? Notice that a connection failure will cause the failure of session.

First of all, when the application is started, the DefaultMessageListenerContainer bean will be created. The creation process will create the JMS connection. If the connection cannot be established, the bean creation will fail. There is no retry for this. The application won't start successfully.

So the connection has to be good when the application starts. But after the application starts successfully, the DefaultMessageListenerContainer bean does have the recovery mechanism in case of connection failure or other exceptions in the middle. It will retry to create a new connection and new session and consumer to consume the message. The interval between recovery attempts defaults to 5000 ms. The following are more details on this.

The run() method of AsyncMessageListenerInvoker has the following structure:

public void run(){
   ...
   try{
      ...
      messageReceived = executeOngoingLoop();
      ...
   }catch(Throwable ex){
     ...
     if ( ...){
       ...
       handleListenerSetupFailuer(ex, false);
       recoverAfterListenerSetupFailure();
       ...
     }
   }
   ...
}
Inside the method executeOngoingLoop(), there is a call to the method doReceiveAndExeute() whose structure is the follows:
protected boolean doReceiveAndExecute(...){
    ...
    try{
       ...
       Message message = receiveMessage(consumerToUse);
       if ( message != null){
          ...
          try{
            doExecuteListener(sessionToUse, message);
          }catch(Throwable ex){
             ...
             handleListenerException(ex);
             // Rethrow JMSException to indicate an infrastructure problem
             // that may have to trigger recovery...
             if ( ex instanceof JMSException ){
                 throw (JMSException) ex;
             }
          }
       }
       else{
         ...
       }
     finally{
       ...
     }

So if there is any exception in executeOngoingLoop(), be it from connection or session or consumer or data access or anything else, the exception will be caught and processed. It won't be rethrown to the outside because the including method run() does not throw any exception!

Now pay close attention to the method doReceiveAndExecute(...). It will also catch and handle any Throwable. If the exception is not a JMSException, it will be swallowed! The exception will not be rethrown !!! Only when it is a JMSException, will the exception be rethrown and propagated to be lastly thrown by the method executeOngoingLoop().

The handleListenerException method is the following. It is defined in the upper class AbstractMessageListenerContainer.

protected void handleListenerException(Throwable ex) {
  if (ex instanceof MessageRejectedWhileStoppingException) {
   // Internal exception - has been handled before.
   return;
  }
  if (ex instanceof JMSException) {
   invokeExceptionListener((JMSException) ex);
  }
  if (isActive()) {
   // Regular case: failed while active.
   // Log at error level.
   logger.warn("Execution of JMS message listener failed", ex);
  }
  else {
   // Rare case: listener thread failed after container shutdown.
   // Log at debug level, to avoid spamming the shutdown log.
   logger.debug("Listener exception after container shutdown", ex);
  }
 }
So if the exception is a JMSException, the exception listener will be invoked. From debugging, it seems that the exception list is empty if you do not set any exception listeners. If the exception is not a JMSException, the code basically does nothing. You may create a subclass of DMLC to override this method. But it may not be very helpful because the method does not have the information about the message. It only has the exception object passed in the argument.

This way of handling the exception has another important effect. The method doExecuteListener(...) has the following structure:

protected void doExecuteListener(Session session, Message message) throws JMSException {
   ...
   try {
      invokeListener(session, message);
   }catch (JMSException ex) {
      rollbackOnExceptionIfNecessary(session, ex);
      throw ex;
   }catch (RuntimeException ex) {
      rollbackOnExceptionIfNecessary(session, ex);
      throw ex;
   }catch (Error err) {
       rollbackOnExceptionIfNecessary(session, err);
       throw err;
   }
   commitIfNecessary(session, message);
}

So if an exception is thrown, the method commitIfNecessary(...) won't be called. So the transaction is not committed if sessionTransacted=true. And the message is not acknowledged if there is no transaction and the acknowledge mode is CLIENT_ACKNOWLEDGE. Now if the exception is not a JMSException, the code will go back to the while(active) loop inside the executeOngoingLoop() method. The same message that caused the exception will be received. And most likely it will cause the same exception to be thrown and an infinite loop!

In the "catch" block of the run() method, first clearResources() will close the session and the consumer. If executeOngoingLoop() throws exception the first time it executes, the next call will be sleepInbetweenRecoveryAttempts(). By default this is to wait for 5 seconds:

DefaultMessageListenerContainer:
public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

protected void sleepInbetweenRecoveryAttempts() {
  if (this.recoveryInterval > 0) {
   try {
    Thread.sleep(this.recoveryInterval);
   }
   catch (InterruptedException interEx) {
    // Re-interrupt current thread, to allow other threads to react.
    Thread.currentThread().interrupt();
   }
  }
 }
 
Next the following code is called:
synchronized (recoveryMonitor) {
    if (this.lastRecoveryMarker == currentRecoveryMarker) {
        handleListenerSetupFailure(ex, false);
 recoverAfterListenerSetupFailure();
 currentRecoveryMarker = new Object();
    }
    else {
 alreadyRecovered = true;
    }
}
In the method initResourcesIfNecessary, the method updateRecoveryMarker set the marker:
private void updateRecoveryMarker() {
   synchronized (recoveryMonitor) {
   this.lastRecoveryMarker = currentRecoveryMarker;
   }
}
So the if-condition is true.
 
protected void recoverAfterListenerSetupFailure() {
   refreshConnectionUntilSuccessful();
   refreshDestination();
}
It turns out that the method refreshConnectionUntilSuccessful is quite interesting. Just as its name implies, it will try to get a good connection until successful.
protected void refreshConnectionUntilSuccessful() {
  while (isRunning()) {
   try {
    if (sharedConnectionEnabled()) {
     refreshSharedConnection();
    }
    else {
     Connection con = createConnection();
     JmsUtils.closeConnection(con);
    }
    logger.info("Successfully refreshed JMS Connection");
    break;
   }
   catch (Exception ex) {
    StringBuffer msg = new StringBuffer();
    msg.append("Could not refresh JMS Connection for destination '");
    msg.append(getDestinationDescription()).append("' - retrying in ");
    msg.append(this.recoveryInterval).append(" ms. Cause: ");
    msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage());
    if (logger.isDebugEnabled()) {
     logger.info(msg, ex);
    }
    else if (logger.isInfoEnabled()) {
     logger.info(msg);
    }
   }
   sleepInbetweenRecoveryAttempts();
  }
 }

protected final void refreshSharedConnection() throws JMSException {
  synchronized (this.sharedConnectionMonitor) {
    ConnectionFactoryUtils.releaseConnection(
    this.sharedConnection, getConnectionFactory(), this.sharedConnectionStarted);
    this.sharedConnection = null;
    this.sharedConnection = createSharedConnection();
    if (this.sharedConnectionStarted) {
       this.sharedConnection.start();
    }
  }
}
Note that inside the method refreshConnectionUntilSuccessful is a loop that will keep going until the connection is refreshed successfully! Between two attemps, it sleeps for a while. Again the default sleep time is 5000ms. Also note that in our case, sharedConnectionStarted is true. So the method this.sharedConnection.start() will be called.

Now a big question is this. If we use the Spring SingleConnectionFactory, the connection is cached. The call to close the connection does not actually close the connection. It basically does nothing. So the createSharedConnection or the createConnection method will return the cached connection which can be bad. This will lead to an useless infinite loop. The cached connection can be reset when SingleConnectionFactory is called as an exception listener on the connection. But when and how will that happen? In the configuration for SingleConnectionFactory, we set reconnectOnException to true. This registers the SingleConnectionFactory as one exception listener to the connection created. But it is not clear if the JMS provider will invoke the listeners in case of a JMSException on the connection. If the connection reset won't be called, we should just use the simple non-caching ConnectionFactory. This way a new connection will be created. Note that for DefaultMessageListenerContainer, it does not seem to be necessary to use SingleConnectionFactory. DMLC will create a connection and the consumers at the beginning. After that, the same consumers will be used repeatedly to receive the messages.

At the end, there will be a call to rescheduleTaskIfNecessary(this).

DefaultMessageListenerContainer:
protected void doRescheduleTask(Object task) {
 this.taskExecutor.execute((Runnable) task);
} 
This will schedule a new task and now the new task will run! Meanwhile, the run() method of the current task ( thread ) will finish. So the current thread will finish. In the new task, the code will create the session and the consumer to consume the messages.

Now let's look at the case that a transaction manager is configured.

cacheLevel will be CACHE_NONE.

sharedConnectionEnabled() returns false.
An important line for transaction management is the following in the method receiveAndExecute(...) of the class AbstractPollingMessageListenerContainer:
this.transactionManager.commit(status);
===============================
Questions
===============================
1. When will the JMS session and the connection be closed? 
2. Same as in the Question 1, when is the JMS connection in the message sender(JmsTemplate) closed?
Answer to Queation 1:

First of all, the connection start() method is needed. It is actually called when the weblogic server is started.

The connection is stopped when I shut down the weblogic server. The sequence of the call is as follows:

(1) org.springframework.web.context.ContextLoader calls 
    public void closeWebApplicationContext(ServletContext servletContext) {...}
    which will call:
    protected void doClose() {
      ...
       Map lifecycleBeans = getLifecycleBeans();
       for (Iterator it = new LinkedHashSet(lifecycleBeans.keySet()).iterator(); it.hasNext();) {
        String beanName = (String) it.next();
        doStop(lifecycleBeans, beanName);
       }
       // Destroy all cached singletons in the context's BeanFactory.
       destroyBeans();
       // Close the state of this context itself.
       closeBeanFactory();
       onClose();
       synchronized (this.activeMonitor) {
        this.active = false;
       }
      }
 }
Note that in the above code, there are the call of "doStop(...)" and "destoryBeans()". The "doStop()" method will stop the connection. And the "destoryBeans()" will close the connection.
(2) XmlWebApplicationContext calls 
      private void doStop(Map lifecycleBeans, String beanName) {
        Lifecycle bean = (Lifecycle) lifecycleBeans.get(beanName);
        if (bean != null) {
         String[] dependentBeans = getBeanFactory().getDependentBeans(beanName);
         for (int i = 0; i < dependentBeans.length; i++) {
          doStop(lifecycleBeans, dependentBeans[i]);
         }
         if (bean.isRunning()) {
          bean.stop();
         }
         lifecycleBeans.remove(beanName);
        }
 }
    The line bean.stop() in the above code will do the work.
  (3)The bean here is DefaultMessageListenerContainer. It calls stop() which calls doStop().
  (4)protected void doStop() throws JMSException {
  synchronized (this.lifecycleMonitor) {
   this.running = false;
   this.lifecycleMonitor.notifyAll();
  }

  if (sharedConnectionEnabled()) {
   stopSharedConnection();
  }
 }
  (5)protected void stopSharedConnection() throws JMSException {
  synchronized (this.sharedConnectionMonitor) {
   this.sharedConnectionStarted = false;
   if (this.sharedConnection != null) {
    try {
     this.sharedConnection.stop();
    }
    catch (javax.jms.IllegalStateException ex) {
     logger.debug("Ignoring Connection stop exception - assuming already stopped: " + ex);
    }
   }
  }
 }

References

  1. http://forum.spring.io/forum/other-spring-related/remoting/24208-what-s-the-best-practice-for-using-jms-in-spring