Wednesday, October 29, 2014

To Take Foreign Language Input in JSF Applicaiton

The environment I use is the following: Tomcat 6, MySQL 5.5, JSF2 Facelets. The application needs to be able to allow the user to type in foreign characters such as Chinese characters and save them into the MySql database. To accomplish this, the following is the list of changes that will make it work. Some of them are actually not needed.
  1. The column that can take the input is "comment" in the table xyz. Its type is TEXT. Do the following to modify the character set of the column:
    alter table xyz modify comment text character set utf8;
    
  2. I use the file META-INF/context.xml for the datasource:
    <Context>
        <Manager pathname="" />
     <Resource name="jdbc/inventory" auth="Container" type="javax.sql.DataSource"
      url="jdbc:mysql://localhost:3306/yourDBName" 
      ...
         />
    </Context>
    
    Change the url to:
      url="jdbc:mysql://localhost:3306/yourDBName?useUnicode=true&amp;characterEncoding=UTF-8" 
    
  3. In the file C:\apache-tomcat-6.0.35\conf\server.xml, add a line URIEncoding="UTF-8" in the Connector configuration:
    <Connector port="8080" protocol="HTTP/1.1" 
                   connectionTimeout="20000" 
                   redirectPort="8443"
                   URIEncoding="UTF-8" />
    
    But this may not be needed.
  4. All my xhtml pages use a template. In the template file, there is the line
    <h:head>
     <meta http-equiv="Content-Type" content="text/html; charset=iso-8859-1" />
            ...
    </h:head>
    
    Change the charset in the line to charset=utf-8. But this is actually not needed.
  5. I use log4j.xml for the log file. Add a line for Encoding as below. This way the log file will not just show ? for the Chinese characters.
    <appender name="rollingFileSize" class="org.apache.log4j.RollingFileAppender">
      <param name="File" value="${logfile}" />
      <param name="MaxFileSize" value="20MB" />
      <param name="MaxBackupIndex" value="10" />
      <param name="Encoding" value="UTF-8" />
      ...
     </appender>
    
  6. In the faces-config.xml file, there is usually the following:
    <application>
        ...
        <locale-config>
         <default-locale>en</default-locale>
         <supported-locale>es</supported-locale>
        </locale-config>
     </application>
    
    You can add the following to support Chinese.
     <supported-locale>zh_CN</supported-locale>
    
    But it is not needed for the purpose to accept Chinese character input.

Friday, October 17, 2014

Spring JMS Caching

Note: Spring 2.5.6 is used in this analysis.

Use Spring JmsTemplate to Send Message

when Spring JmsTemplate is used to send message, we can use the method

public void send(MessageCreator messageCreator){}. 
It will call
public Object execute(SessionCallback action, boolean startConnection){}.
The variable startConnection is false in this case. In our test case, that method will call
conToClose = createConnection();
This method is
protected Connection createConnection() throws JMSException {
  return getConnectionFactory().createConnection();
 }
After sending the message, it will finally call
ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);

For a simple ConnectionFactory (for example:com.tibco.tibjms.TibjmsConnectionFactory)), when the send(...) method is called, it will create a new connection instance and a new session instance and a new producer to send the message. And then in the finally{} block, the connection and the session will be closed. And next time the send(...) is called, a new connection and session will be created.

But if the Spring SingleConnectionFactory is used, the connection will be reused. But the session and the producer are not reused. You can construct a SingleConnectionFactory to wrap a simple connection factory:

public SingleConnectionFactory(ConnectionFactory targetConnectionFactory);

The class CachingConnectionFactory extends the Spring SingleConnectionFactory. When it is used,the connection, session, and the producer will all be reused. You can construct a CachingConnectionFactory to wrap a simple factory factory:

public CachingConnectionFactory(ConnectionFactory targetConnectionFactory);

For the CachingConnectionFactory, when the

conToClose = createConnection();
is executed, it will return a proxy of the connecion. The proxy is of the type SingledConnectionFactory@SharedConnectionInvocationHandler. The trick is in the "close" method of the connection. When the "finally" block closes the connection,the proxy just does nothing! The connection and the session will be reused next time the send(...) method is called.

Recovery

Note that when a JmsTemplate bean is created, no matter what connection factory is used, the connection is not established. The JMS connection is created the first time a message needs to be sent. Because of this, the application can start successfully even if the JMS connection does not work at the startup time.

When a message needs to be sent, JmsTemplate will create a connection. If there is an exception occuring, the process will be stopped.There is no retry.

We can create a mechanism to have the retry ability. Instead of calling the send(messageCreator) method of JmsTemplate directly, we can wrap it as follows:

 void mySend(MessageCreator messageCreator) throws JmsException {  
   while (true) {
      try {
         this.jmsTemplate.send(messageCreator);
         break;
      } catch (JmsException e) {
         handleException(e, tries);
      }
   }
 }
In this code, if there is anything wrong, handleException can handle it any way you want. For example, it can sleep a while and let the loop execute again. You can configure the number of the retries. Then after the maximum retries have been attempted unsuccessfully, handleException will throw the exception that will end the while-loop.

Note that everytime the send(messageCreator) method of JmsTemplate is called, it will create a connection, session,and producer to send the message. The connection is created from a connection factory. So if it is a cached factory such as the CachingConnectionFactyory or SingleConnectionFactory, the same connection may be returned. But the connection may be bad. So in handleException, you can call the onException() method of SingleConnectionFactory, which will reset the connection. And the next time the connection factory will create a brandnew connection when asked.

Test Results

The following is the actual test data. I used the debug in running the test. Tibco JMS is used on the backend. The basic code is the following:

        public void process(String[] args) throws Exception {  
  IJmsSender jmsSender = (IJmsSender) getBean("jmsSender");
  test(jmsSender);
 }

 private void test(IJmsSender jmsSender) {
  try {
   String msg = "msg1";
   jmsSender.sendMessage(msg);
   // second time
   msg = "msg2";
   jmsSender.sendMessage(msg);
   // third time
   msg = "msg3";
   jmsSender.sendMessage(msg);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
The jmsSender is just a POJO. It uses the following method to send message:
       public void sendMessage(final String message) {
  jmsTemplate.send(new MessageCreator() {
   @Override
   public Message createMessage(Session session) throws JMSException {
    BytesMessage bytesMsg = session.createBytesMessage();
    bytesMsg.writeBytes(message.getBytes());
    return bytesMsg;
   }

  });
 } 

To check the connection, session and producer object details, I found these objects have the objectDelegate attribute. That attribute is an object and it contains many data. The important one is the id value: _connid, _session_id, and _prodid. In the test, the message is sent 3 times.

If TibjmsConnectionFactory is used, every time a new _connid, _sessionid, and _prodid will be generated.

If SingleConnectionFactory is used, the results are below for the 3 sending:

                        
                        SingleConnectionFactory
connection objectDelegate          session objectDelegate              producer objectDelegate
(TibjmsConnection)                  (TibjmsxSessionImpl)                (TibjmsTopicPublisher)
_connid                              _sessionid                         _prodid
51292                              103950                               45146
51292                              103977                               45155
51292                              103978                               45156

If CachingConnectionFactory is used, the results are below for the 3 sending:

                        
                        CachingConnectionFactory
connection objectDelegate          session objectDelegate              producer objectDelegate
(TibjmsConnection)                  (TibjmsxSessionImpl)                (TibjmsTopicPublisher)
_connid                              _sessionid                         _prodid
51306                              103981                               45157
51306                              103981                               45157
51306                              103981                               45157

CachingConnectionFactory's cacheProducers Property

CachingConnectionFactory has a property cacheProducers. It is not inherited from SingleConnectionFactory. Its default value is true. If the value of this property is set to false, the producer will not be reused. So in the test case for CachingConnectionfactory above, if cacheProducers is false, the _connid and _sessionid will still be the same in the three message sendings. But the _prodid will be all different!

Note that according to the javadoc of the class, the cache is to cache JMS MessageProducers per JMS session instance (more specifically: one MessageProducer per Destination and Session).

CachingConnectionFactory's cacheConsumers Property

CachingConnectionFactory has a property cacheConsumers. It is not inherited from SingleConnectionFactory. Its default value is true. For JmsTemplate to send messages, this value does not affect the caching of the connection, session and the producer. So if the value is set to false, the 3 sendings in the above test ( suppose cacheProducers is the default true ) will use the same _connid, _sessionid, and _prodid.

Note that according to the javadoc of the class, the cache is to cache JMS MessageConsumers per JMS Session instance ( more specifically: one MessageConsumer per Destination, selector String and Session). The Durable subscribers will only be cached until logical closing of the Session handle.

SingleConnectionFactory's reconnectOnException Property

The class SingleConnectionFactory has a property reconnectOnException. The basic work flow is the following.

1. An application will first call the createConnection method of the Factory to get a JMS connection. The method in SingleConnectionFactory is

public Connection createConnection() throws JMSException {
  synchronized (this.connectionMonitor) {
   if (this.connection == null) {
    initConnection();
   }
   return this.connection;
  }
 }
Since the connection is initially null, it will call initConnection() to create a connection.

2. The initConnection() method is

public void initConnection() throws JMSException {
  if (getTargetConnectionFactory() == null) {
   throw new IllegalStateException(
     "'targetConnectionFactory' is required for lazily initializing a Connection");
  }
  synchronized (this.connectionMonitor) {
   if (this.target != null) {
    closeConnection(this.target);
   }
   this.target = doCreateConnection();
   prepareConnection(this.target);
   if (logger.isInfoEnabled()) {
    logger.info("Established shared JMS Connection: " + this.target);
   }
   this.connection = getSharedConnectionProxy(this.target);
  }
 }
Inside this method, it calls prepareConnection. This method will check if the attribute reconnectOnException is true. If yes, it will add this SingleConnectionFactory instance as an internal listener to the listener chain. Note that the SingleConnectionFactory implements the ExceptionListener interface. So it is also an exception listener.

3. The user of the JMS connection will use the connection. Since the SingleConnectionFactory implements the connection proxy, it will not close the actual connection when the close method is called. So next time the user calls the createConnection method, the connection is not null and the same connection will be returned.

4. Now if for some reason, the SingleConnectionFactory is invoked as an Exception listener, it will execute the mehtod

 /**
  * Exception listener callback that renews the underlying single Connection.
  */
 public void onException(JMSException ex) {
  resetConnection();
 }
/**
  * Reset the underlying shared Connection, to be reinitialized on next access.
  */
 public void resetConnection() {
  synchronized (this.connectionMonitor) {
   if (this.target != null) {
    closeConnection(this.target);
   }
   this.target = null;
   this.connection = null;
  }
 }
So after this, the connection will be closed and set to null.

5. The next time the createConnection method is called, since the connection is null, a new connection will be created.

Now a big question. When will the SingleConnectionFactory be invoked as an Exception listener? If you look at the code of JmsTemplate, its send() method can throw JmsException. The user will handle the JmsException. Nowhere is the Exception listener invoked in JmsTemplate.

After reading more code, I found that the ExceptionListener is invoked by the standard JMS contract. The JMS provider should invoke the ExceptionListener when it detects a serious problem. All the programmers need to do is to register the ExceptionListener on the connection using the Standard JMS API.

In the method prepareConnection(Connection con) of SingleConnectionFactory, the last line is

   con.setExceptionListener(listenerToUse);
The setExceptionListner method is an API method of javax.jms.Connection. Its javadoc is the following:
   
       Sets an exception listener for this connection.
       If a JMS provider detects a serious problem with a connection, it informs the 
connection's ExceptionListener, if one has been registered. It does this by calling 
the listener's onException method, passing it a JMSException object describing the 
problem.
       An exception listener allows a client to be notified of a problem 
asynchronously. Some connections only consume messages, so they would have no other 
way to learn their connection has failed.
   
       A connection serializes execution of its ExceptionListener.
   
       A JMS provider should attempt to resolve connection problems itself before it 
notifies the client of them.
   
       Parameters:
           listener - the exception listener 
       Throws:
           JMSException - if the JMS provider fails to set the exception listener for 
this connection.
       void 
    setExceptionListener(ExceptionListener listener) throws JMSException;
But what is considered "a serious problem" by a JMS provider? It is still not clear. A JMSException thrown in the user code does not seem to be related to this issue. When a JMSException is thrown during the runtime, JmsTemplate will just convert it to a JmsException and throw it back to the user for handling. The method is below.
    public Object execute(SessionCallback action, boolean startConnection) throws JmsException{
     try{
      ..........
     }catch(JMSException es){
       throw convertJmsAccessException(ex);
     }finally{
       ..........
     }
  }
I did a test to let the MessageCreator throw a JMSException when the second message is sent. And I catch the exception outside the jmsTemplate class and just ignore it. The third message still uses the same _connid, _sessonid, and _prodid.

By the way, note that JmsException is different from JMSException. The former is a Spring class. The later is a standard java class.

Friday, October 3, 2014

On the "nillable" Attribute in XML Schema

In the XSD, you can define an element that is nillable:
  <xs:element name="abc" nillable="true" minOccurs="1" type="xs:string"/>
What does this "nillable" exactly mean?

If the element has to be there and you want it to be able to have null value, then you need to define nillable="true". And you can do the following in the XML file:

  <abc xsi:nil="true" />
The JAXB unmarshaller will create the object with abc=null.

Note that if you do <abc/> or <abc></abc>, then the element "abc" will have the value "". It is not null.

If the element does not need to be there, i.e, if minOccurs="0", then you do not need to have nillable="true". You can simply skip this element in the XML file. And the JAXB unmarshaller wil create an object with abc=null.

For element of type int, JAXB will create the Integer object for the element. So the effect will be the same as String when an int is nillable.

For JAXB marshaller, if the object has abc=null, then if in the schema minOccurs="0" for the element, no matter whether abc is nillable or not, the generated XML file will not have this element. But If minOccurs="1", then if nillable="false", the generated XML file will not have this element; if nillable="true", the generated XML file will be <abc xsi:nil="true" />.

Saturday, September 13, 2014

Fifth year of blog

Yesterday is Sept. 12. It's been 5 years since this blog was created. This past summer was not hot in Pittsburgh. I think there were only several days with the temperature over 90F. Today's temperature is 44 - 65, cloudy. There was some rain yesterday.

I do not have a good harvest in the garden. The tomatoes did not grow well. Probably it is because that I have planted the tomatoes in the same area in the last several years.

Tuesday, June 3, 2014

JAXB Marshalling and Unmarshalling

Suppose your XSD file is your.xsd in a directory schemas.
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns="http://..."
 targetNamespace="http://..."
 elementFormDefault="qualified" attributeFormDefault="unqualified"
 version="2.0">

 <xs:element name="foo" type="Xyz" />
 <xs:complexType name="Xyz">
  <xs:sequence>
   ...
  </xs:sequence>
 </xs:complexType>
</xs:schema>
In maven, you can use the following to generate the java classes from the xsd file in the build process.
<plugin>
 <groupId>org.jvnet.jaxb2.maven2</groupId>
 <artifactId>maven-jaxb2-plugin</artifactId>
 <version>0.7.3</version>
 <executions>
  <execution>
   <phase>generate-sources</phase>
   <goals>
    <goal>generate</goal>
   </goals>
  </execution>
 </executions>
 <configuration>
  <generatePackage>your.package</generatePackage>
  <schemaDirectory>path.of.folder.of.xsd</schemaDirectory>
 </configuration>
</plugin>

To marshal a java object to XML, and validate

Note that depending on whether an element is defined as a root element or not, the marshal/unmarshal code can be different. The following code is for the XSD defined above.

 public String marshall(Xyz fooObj) throws JAXBException,
   SAXException {
  JAXBElement xmlMsg = new your.dir.ObjectFactory()
    .createXyz(fooObj);
  JAXBContext jaxbContext = JAXBContext.newInstance(Xyz.class);
  Marshaller marshaller = jaxbContext.createMarshaller();

  // valiate that the data in the object conforms to the schema
  SchemaFactory schemaFactory = SchemaFactory
    .newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
  URL xsdUrl = getClass().getClassLoader().getResource(
    "schemas/your.xsd"); // schemas/your.xsd needs to be in the classpath
  Schema schema = schemaFactory.newSchema(xsdUrl);
  marshaller.setSchema(schema); // this enables the validation

  StringWriter sw = new StringWriter();
  marshaller.marshal(xmlMsg, sw);
  return sw.toString();
}

To unmarshal from XML to a java object

JAXBContext jaxbContext = JAXBContext
  .newInstance(your.dir.ObjectFactory.class);

Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
String msg = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?><foo xmlns=...>...</foo>";
  
StringReader reader = new StringReader(msg);

// validate if the message conforms to the schema
SchemaFactory schemaFactory = SchemaFactory
    .newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
URL xsdUrl = getClass().getClassLoader().getResource(
    "schemas/your.xsd"); // schemas/your.xsd needs to be in the classpath
Schema schema = schemaFactory.newSchema(xsdUrl);
jaxbUnmarshaller.setSchema(schema);  // this enables the validation

JAXBElement elem = (JAXBElement) jaxbUnmarshaller.unmarshal(reader);

Xyz fooObj = (Xyz) elem.getValue();

Thursday, May 1, 2014

Spring DefaultMessageListenerContainer and JmsTemplate

This analysis uses Spring 2.5.6.

DefaultMessageListenerContainer

An example of using the DefaultMessageListenerContainer class is as follows:

       <bean id="jmsContainer"
  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="jmsDestConnectionFactory" />
  <property name="destination" ref="myDestination" />
  <property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE" />
  <property name="messageListener" ref="messageListener" />
  <property name="sessionTransacted" value="true" />
  <property name="concurrentConsumers" value="1"></property>
 </bean>>
There is also the property transactionManager. For simplification, we just use DMLC to denote the class DefaultMessageListenerContainer.

The logic on how transaction mangement is used is as follows.

  1. If the property "transactionManager" is set, then that transaction manager will be used. The property "sessionTransacted" will be ignored.
  2. Else if "sessionTransacted" is set to "true", local transaction will be used. And the property "sessionAcknowledgeModeName" will be ignored. Programs need to use session.commit or session.rollback to manage the transaction.
  3. Else there is no transaction. The property "sessionAcknowledgeModeName" comes into effect.
The following is from Spring javadoc site "http://docs.spring.io/spring/docs/2.5.6/api/org/springframework/jms/listener/DefaultMessageListenerContainer.html".
======= javadoc of DefaultMessageListenerContainer.html ========
It is strongly recommended to either set "sessionTransacted" to "true" or specify an external "transactionManager". See the AbstractMessageListenerContainer javadoc for details on acknowledge modes and native transaction options, as well as the AbstractPollingMessageListenerContainer javadoc for details on configuring an external transaction manager.
====== javadoc of AbstractPollingMessageListenerContainer =====
setTransactionManager
public void setTransactionManager(PlatformTransactionManager transactionManager)
Specify the Spring PlatformTransactionManager to use for transactional wrapping of message reception plus listener execution.
Default is none, not performing any transactional wrapping. If specified, this will usually be a Spring JtaTransactionManager or one of its subclasses, in combination with a JTA-aware ConnectionFactory that this message listener container obtains its Connections from.
Note: Consider the use of local JMS transactions instead. Simply switch the "sessionTransacted" flag to "true" in order to use a locally transacted JMS Session for the entire receive processing, including any Session operations performed by a SessionAwareMessageListener (e.g. sending a response message). Alternatively, a JmsTransactionManager may be used for fully synchronized Spring transactions based on local JMS transactions. Check AbstractMessageListenerContainer's javadoc for a discussion of transaction choices and message redelivery scenarios.
=====================================================

The Spring class hierarchy is the following:

JmsAccessor
    ^
JmsDestinationAccessor
    ^
AbstractJmsListeningContainer
    ^
AbstractMessageListenerContainer
    ^
AbstractPollingMessageListenerContainer
    ^
DefaultMessageListenerContainer

The top class JmsAccessor implements the Spring InitializingBean interface:
public interface InitializingBean {
 void afterPropertiesSet() throws Exception;
}
The Initialization of class

The case that a transaction manager is not configured

The class AbstractJmsListeningContainer has the following:
AbstractJmsListeningContainer:
public void afterPropertiesSet() {
  super.afterPropertiesSet();
  validateConfiguration();
  initialize();
 }
 

So the method initialize() of the class will be invoked when the bean is created. It turns out that the sub classes below AbstractJmsListeningContainer override this method and at the same time call the same method in the super class. For DefaultMessageListenerContainer, it is the following:

DefaultMessageListenerContainer:
public void initialize() {
  // Adapt default cache level.
  if (this.cacheLevel == CACHE_AUTO) {
   this.cacheLevel = (getTransactionManager() != null ? CACHE_NONE : CACHE_CONSUMER);
  }

  // Prepare taskExecutor and maxMessagesPerTask.
  synchronized (this.lifecycleMonitor) {
   if (this.taskExecutor == null) {
    this.taskExecutor = createDefaultTaskExecutor();
   }
   else if (this.taskExecutor instanceof SchedulingTaskExecutor &&
     ((SchedulingTaskExecutor) this.taskExecutor).prefersShortLivedTasks() &&
     this.maxMessagesPerTask == Integer.MIN_VALUE) {
    // TaskExecutor indicated a preference for short-lived tasks. According to
    // setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case
    // unless the user specified a custom value.
    this.maxMessagesPerTask = 10;
   }
  }

  // Proceed with actual listener initialization.
  super.initialize();
 }
The default value of cacheLevel is CACHE_AUTO. Since we are not setting any transaction manager, the cache level will be set to CACHE_CONSUMER. Then it creates the taskExecutor, an instance of SimpleAsyncTaskExecutor. Then it calls the super.initialize():
AbstractPollingMessageListenerContainer:
public void initialize() {
  // Set sessionTransacted=true in case of a non-JTA transaction manager.
  if (!this.sessionTransactedCalled &&
    this.transactionManager instanceof ResourceTransactionManager &&
    !TransactionSynchronizationUtils.sameResourceFactory(
      (ResourceTransactionManager) this.transactionManager, getConnectionFactory())) {
   super.setSessionTransacted(true);
  }

  // Use bean name as default transaction name.
  if (this.transactionDefinition.getName() == null) {
   this.transactionDefinition.setName(getBeanName());
  }

  // Proceed with superclass initialization.
  super.initialize();
 }
Since no transaction manager is set, the first "if" is skipped. The second "if" sets the name. And then it executes the super.initialize():
AbstractJmsListeningContainer:
public void initialize() throws JmsException {
  try {
   synchronized (this.lifecycleMonitor) {
    this.active = true;
    this.lifecycleMonitor.notifyAll();
   }
   if (this.autoStartup) {
    doStart();
   }
   doInitialize();
  }
  catch (JMSException ex) {
   synchronized (this.sharedConnectionMonitor) {
    ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup);
    this.sharedConnection = null;
   }
   throw convertJmsAccessException(ex);
  }
 }
 
The autoStartup is true by default. So it will call doStart() and doInitialize(). In summary, the doStart() method will create the JMS connection. And the doInitialize() method will create and start a thread for each consumer. The thread will do a while-loop. In the loop,it will use the connection created in the doStart() method to create the session and consumer to consume the messages. If an exception is thrown in the loop, it will be caught. The thread will end. And a new thread will start.
-----------------------------------------
doStart()
-----------------------------------------

The method doStart() is this:

AbstractJmsListeningContainer:
protected void doStart() throws JMSException {
  // Lazily establish a shared Connection, if necessary.
  if (sharedConnectionEnabled()) {
   establishSharedConnection();
  }

  // Reschedule paused tasks, if any.
  synchronized (this.lifecycleMonitor) {
   this.running = true;
   this.lifecycleMonitor.notifyAll();
   resumePausedTasks();
  }

  // Start the shared Connection, if any.
  if (sharedConnectionEnabled()) {
   startSharedConnection();
  }
 }
The method sharedConnectionEnabled() will return true since the cache level has been set to CACHE_CONSUMER:
DefaultMessageListenerContainer:
protected final boolean sharedConnectionEnabled() {
  return (getCacheLevel() >= CACHE_CONNECTION);
 }

So next call the following:

DefaultMessageListenerContainer:
protected void establishSharedConnection() {
  try {
   super.establishSharedConnection();
  }
  catch (Exception ex) {
   logger.debug("Could not establish shared JMS Connection - " +
     "leaving it up to asynchronous invokers to establish a Connection as soon as possible", ex);
  }
 }
And then call the following:
AbstractJmsListeningContainer:
protected void establishSharedConnection() throws JMSException {
  synchronized (this.sharedConnectionMonitor) {
   if (this.sharedConnection == null) {
    this.sharedConnection = createSharedConnection();
    logger.debug("Established shared JMS Connection");
   }
  }
 }

protected Connection createSharedConnection() throws JMSException {
  Connection con = createConnection();
  try {
   prepareSharedConnection(con);
   return con;
  }
  catch (JMSException ex) {
   JmsUtils.closeConnection(con);
   throw ex;
  }
 }

protected Connection createConnection() throws JMSException {
  return getConnectionFactory().createConnection();
 }

protected void prepareSharedConnection(Connection connection) throws JMSException {
  String clientId = getClientId();
  if (clientId != null) {
   connection.setClientID(clientId);
  }
 } 
Notes: 
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
(1) Now the JMS connection has been created! 
(2) If the connection creation throws an exception, no retry will be made. The bean creation will fail.
(3) What if the conneciton is good but it fails after the bean is created successfully? Later on we will
    see in the class AsyncMessageListenerInvoker that Spring will try to reestablish the connection. The 
    method for this is recoverAfterListenerSetupfailure().
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Next call in doStart() is the following:
resumePausedTasks();

Need to check this.  Then the following call is 

protected void startSharedConnection() throws JMSException {
  synchronized (this.sharedConnectionMonitor) {
   this.sharedConnectionStarted = true;
   if (this.sharedConnection != null) {
    try {
     this.sharedConnection.start();
    }
    catch (javax.jms.IllegalStateException ex) {
     logger.debug("Ignoring Connection start exception - assuming already started: " + ex);
    }
   }
  }
 }

--------------------------------
doInitialize()
--------------------------------
DefaultMessageListenerContainer:
protected void doInitialize() throws JMSException {
  synchronized (this.lifecycleMonitor) {
   for (int i = 0; i < this.concurrentConsumers; i++) {
    scheduleNewInvoker();
   }
  }
 }

By default, concurrentConsumers = 1.

AbstractJmsListeningContainer:
private void scheduleNewInvoker() {
  AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
  if (rescheduleTaskIfNecessary(invoker)) {
   // This should always be true, since we're only calling this when active.
   this.scheduledInvokers.add(invoker);
  }
 }

protected final boolean rescheduleTaskIfNecessary(Object task) {
  if (this.running) {
   try {
    doRescheduleTask(task);
   }
   catch (RuntimeException ex) {
    logRejectedTask(task, ex);
    this.pausedTasks.add(task);
   }
   return true;
  }
  else if (this.active) {
   this.pausedTasks.add(task);
   return true;
  }
  else {
   return false;
  }
 } 
 
The variable running has already been set to true in the doStart() method. So doRescheduleTask will execute.
DefaultMessageListenerContainer:
protected void doRescheduleTask(Object task) {
  this.taskExecutor.execute((Runnable) task);
 }
 
The taskExecutor has already been created in the beginning part of initialize() of DefaultMessageListenerContainer.It is an instance of SimpleAsyncTaskExecutor. The execute method is as follows:
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncTaskExecutor, Serializable 

SimpleAsyncTaskExecutor:
public void execute(Runnable task) {
  execute(task, TIMEOUT_INDEFINITE);
 }

 /**
  * Executes the given task, within a concurrency throttle
  * if configured (through the superclass's settings).
  * 

Executes urgent tasks (with 'immediate' timeout) directly, * bypassing the concurrency throttle (if active). All other * tasks are subject to throttling. * @see #TIMEOUT_IMMEDIATE * @see #doExecute(Runnable) */ public void execute(Runnable task, long startTimeout) { Assert.notNull(task, "Runnable must not be null"); if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) { this.concurrencyThrottle.beforeAccess(); doExecute(new ConcurrencyThrottlingRunnable(task)); } else { doExecute(task); } }

After checking several classes, it seems that isThrottleActive() is false. So it will call
protected void doExecute(Runnable task) {
  createThread(task).start();
 }
 
 CustomizableThreadCreator:
public Thread createThread(Runnable runnable) {
  Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
  thread.setPriority(getThreadPriority());
  thread.setDaemon(isDaemon());
  return thread;
 } 

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 
So at this point, a thread is created and started. 
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Debugging shows that getThreadGroup() returns null, and nextThreadName() returns "jmsContainer-1". And the thread dump of the weblogic server does show this thread:
 "jmsContainer-1" waiting for lock java.lang.Object@3a421d TIMED_WAITING
          
             java.lang.Object.wait(Native Method)
          
             com.tibco.tibjms.TibjmsxSessionImp._getSyncMessage(TibjmsxSessionImp.java:2141)
          
             com.tibco.tibjms.TibjmsxSessionImp._receive(TibjmsxSessionImp.java:1977)
          
             com.tibco.tibjms.TibjmsMessageConsumer._receive(TibjmsMessageConsumer.java:240)
          
             com.tibco.tibjms.TibjmsMessageConsumer.receive(TibjmsMessageConsumer.java:440)
          
             com.xyz.mi.decorator.jms.XyzJmsMessageConsumer.receive(XyzJmsMessageConsumer.java:79)
          
             org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:405)
          
             org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:308)
          
             org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:261)
          
             org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
          
             org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:974)
          
             org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:876)
          
             java.lang.Thread.run(Thread.java:619)

Note here that the "task" object is the following invoker:
AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
 
Now we get to the actual task class that will do the essential work. It is an inner class of DefaultMessageListenerContainer.

There are lots of code there. But it is good to copy the whole class below.

 //-------------------------------------------------------------------------
 // Inner classes used as internal adapters
 //-------------------------------------------------------------------------

 /**
  * Runnable that performs looped MessageConsumer.receive() calls.
  */
 private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {

  private Session session;

  private MessageConsumer consumer;

  private Object lastRecoveryMarker;

  private boolean lastMessageSucceeded;

  private int idleTaskExecutionCount = 0;

  private volatile boolean idle = true;

  public void run() {
   synchronized (lifecycleMonitor) {
    activeInvokerCount++;
    lifecycleMonitor.notifyAll();
   }
   boolean messageReceived = false;
   try {
    if (maxMessagesPerTask < 0) {
     messageReceived = executeOngoingLoop();
    }
    else {
     int messageCount = 0;
     while (isRunning() && messageCount < maxMessagesPerTask) {
      messageReceived = (invokeListener() || messageReceived);
      messageCount++;
     }
    }
   }
   catch (Throwable ex) {
    clearResources();
    if (!this.lastMessageSucceeded) {
     // We failed more than once in a row - sleep for recovery interval
     // even before first recovery attempt.
     sleepInbetweenRecoveryAttempts();
    }
    this.lastMessageSucceeded = false;
    boolean alreadyRecovered = false;
    synchronized (recoveryMonitor) {
     if (this.lastRecoveryMarker == currentRecoveryMarker) {
      handleListenerSetupFailure(ex, false);
      recoverAfterListenerSetupFailure();
      currentRecoveryMarker = new Object();
     }
     else {
      alreadyRecovered = true;
     }
    }
    if (alreadyRecovered) {
     handleListenerSetupFailure(ex, true);
    }
   }
   synchronized (lifecycleMonitor) {
    decreaseActiveInvokerCount();
    lifecycleMonitor.notifyAll();
   }
   if (!messageReceived) {
    this.idleTaskExecutionCount++;
   }
   else {
    this.idleTaskExecutionCount = 0;
   }
   synchronized (lifecycleMonitor) {
    if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
     // We're shutting down completely.
     scheduledInvokers.remove(this);
     if (logger.isDebugEnabled()) {
      logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
     }
     lifecycleMonitor.notifyAll();
     clearResources();
    }
    else if (isRunning()) {
     int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
     if (nonPausedConsumers < 1) {
      logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +
        "Check your thread pool configuration! Manual recovery necessary through a start() call.");
     }
     else if (nonPausedConsumers < getConcurrentConsumers()) {
      logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
        "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
        "to be triggered by remaining consumers.");
     }
    }
   }
  }

  private boolean executeOngoingLoop() throws JMSException {
   boolean messageReceived = false;
   boolean active = true;
   while (active) {
    synchronized (lifecycleMonitor) {
     boolean interrupted = false;
     boolean wasWaiting = false;
     while ((active = isActive()) && !isRunning()) {
      if (interrupted) {
       throw new IllegalStateException("Thread was interrupted while waiting for " +
         "a restart of the listener container, but container is still stopped");
      }
      if (!wasWaiting) {
       decreaseActiveInvokerCount();
      }
      wasWaiting = true;
      try {
       lifecycleMonitor.wait();
      }
      catch (InterruptedException ex) {
       // Re-interrupt current thread, to allow other threads to react.
       Thread.currentThread().interrupt();
       interrupted = true;
      }
     }
     if (wasWaiting) {
      activeInvokerCount++;
     }
    }
    if (active) {
     messageReceived = (invokeListener() || messageReceived);
    }
   }
   return messageReceived;
  }

  private boolean invokeListener() throws JMSException {
   initResourcesIfNecessary();
   boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
   this.lastMessageSucceeded = true;
   return messageReceived;
  }

  private void decreaseActiveInvokerCount() {
   activeInvokerCount--;
   if (stopCallback != null && activeInvokerCount == 0) {
    stopCallback.run();
    stopCallback = null;
   }
  }

  private void initResourcesIfNecessary() throws JMSException {
   if (getCacheLevel() <= CACHE_CONNECTION) {
    updateRecoveryMarker();
   }
   else {
    if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
     updateRecoveryMarker();
     this.session = createSession(getSharedConnection());
    }
    if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
     this.consumer = createListenerConsumer(this.session);
    }
   }
  }

  private void updateRecoveryMarker() {
   synchronized (recoveryMonitor) {
    this.lastRecoveryMarker = currentRecoveryMarker;
   }
  }

  private void clearResources() {
   if (sharedConnectionEnabled()) {
    synchronized (sharedConnectionMonitor) {
     JmsUtils.closeMessageConsumer(this.consumer);
     JmsUtils.closeSession(this.session);
    }
   }
   else {
    JmsUtils.closeMessageConsumer(this.consumer);
    JmsUtils.closeSession(this.session);
   }
   this.consumer = null;
   this.session = null;
  }

  public boolean isLongLived() {
   return (maxMessagesPerTask < 0);
  }

  public void setIdle(boolean idle) {
   this.idle = idle;
  }

  public boolean isIdle() {
   return this.idle;
  }
 }
So now let's check out this class AsyncMessageListenerInvoker.

In its run() method, maxMessagesPerTask is initialized to a negative value. So if the value is not changed,this line will execute:

messageReceived = executeOngoingLoop();
In the above method, there is the following line:
messageReceived = (invokeListener() || messageReceived);
The method called is the following:
DefaultMessageListenerContainer$AsyncMessageListenerInvoker:
private boolean invokeListener() throws JMSException {
   initResourcesIfNecessary();
   boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
   this.lastMessageSucceeded = true;
   return messageReceived;
  }

private void initResourcesIfNecessary() throws JMSException {
   if (getCacheLevel() <= CACHE_CONNECTION) {
    updateRecoveryMarker();
   }
   else {
    if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
     updateRecoveryMarker();
     this.session = createSession(getSharedConnection());
    }
    if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
     this.consumer = createListenerConsumer(this.session);
    }
   }
  }
  
So by default, since the cache level has been set to CACHE_CONSUMER , a session will be created using the shared connection. And the consumer will then be created from this session.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Now the session and the consumer have been created.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  
AbstractPollingMessageListenerContainer:  
protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer)
   throws JMSException {

  if (this.transactionManager != null) {
   // Execute receive within transaction.
   TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
   boolean messageReceived = true;
   try {
    messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
   }
   catch (JMSException ex) {
    rollbackOnException(status, ex);
    throw ex;
   }
   catch (RuntimeException ex) {
    rollbackOnException(status, ex);
    throw ex;
   }
   catch (Error err) {
    rollbackOnException(status, err);
    throw err;
   }
   this.transactionManager.commit(status);
   return messageReceived;
  }

  else {
   // Execute receive outside of transaction.
   return doReceiveAndExecute(invoker, session, consumer, null);
  }
 }
 
If the transactionManager is not set, then the method will just execute
 doReceiveAndExecute(invoker, session, consumer, null);
 
Its code is this:
 
AbstractPollingMessageListenerContainer: 
 protected boolean doReceiveAndExecute(
    Object invoker, Session session, MessageConsumer consumer, TransactionStatus status)
    throws JMSException {
 
   Connection conToClose = null;
   Session sessionToClose = null;
   MessageConsumer consumerToClose = null;
   try {
    Session sessionToUse = session;
    boolean transactional = false;
    if (sessionToUse == null) {
     sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
       getConnectionFactory(), this.transactionalResourceFactory, true);
     transactional = (sessionToUse != null);
    }
    if (sessionToUse == null) {
     Connection conToUse = null;
     if (sharedConnectionEnabled()) {
      conToUse = getSharedConnection();
     }
     else {
      conToUse = createConnection();
      conToClose = conToUse;
      conToUse.start();
     }
     sessionToUse = createSession(conToUse);
     sessionToClose = sessionToUse;
    }
    MessageConsumer consumerToUse = consumer;
    if (consumerToUse == null) {
     consumerToUse = createListenerConsumer(sessionToUse);
     consumerToClose = consumerToUse;
    }
    Message message = receiveMessage(consumerToUse);
    if (message != null) {
     if (logger.isDebugEnabled()) {
      logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +
        consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" +
        sessionToUse + "]");
     }
     messageReceived(invoker, sessionToUse);
     boolean exposeResource = (!transactional && isExposeListenerSession() &&
       !TransactionSynchronizationManager.hasResource(getConnectionFactory()));
     if (exposeResource) {
      TransactionSynchronizationManager.bindResource(
        getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
     }
     try {
      doExecuteListener(sessionToUse, message);
     }
     catch (Throwable ex) {
      if (status != null) {
       if (logger.isDebugEnabled()) {
        logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
       }
       status.setRollbackOnly();
      }
      handleListenerException(ex);
      // Rethrow JMSException to indicate an infrastructure problem
      // that may have to trigger recovery...
      if (ex instanceof JMSException) {
       throw (JMSException) ex;
      }
     }
     finally {
      if (exposeResource) {
       TransactionSynchronizationManager.unbindResource(getConnectionFactory());
      }
     }
     return true;
    }
    else {
     if (logger.isTraceEnabled()) {
      logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") +
        "session [" + sessionToUse + "] did not receive a message");
     }
     noMessageReceived(invoker, sessionToUse);
     return false;
    }
   }
   finally {
    JmsUtils.closeMessageConsumer(consumerToClose);
    JmsUtils.closeSession(sessionToClose);
    ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
   }
 }
Notes:
  1. The doReceiveAndExecute(...) method lays out the overal process of processing the message. It is actually a little strange. First it uses the consumer to receive the message in the receiverMessage(...) call. Then it passes this received message to messageReceived(invoker, sessionToUse), which will eventually pass the message to the listeners that were registered by the setListener(...) method of the class AbstractMessageListenerContainer.
  2. The code uses the consumer and the session passed in to the method. So the consumer and the session are reused. Only when they are null, will the method create a new consumer and a new session. And in the "finally" block, it only closes the consumer and the session that are created in this method. Notice that in the "finally" block, the variable names are "consumerToClose" and "sessionToClose". The words are different from "consumerToUse" or "sessionToUse".
AbstractPollingMessageListenerContainer:
protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
  return (this.receiveTimeout < 0 ? consumer.receive() : consumer.receive(this.receiveTimeout));
 } 
By default, the receiveTimeout in AbstractPollingMessageListenerContainer is 1000:
public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
So basically the code is to try to get a message every 1000 milliseconds. !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! This 1000 milliseconds give the CPU the breath room in the while-loop of the method executeOngoingLoop(). !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
DefaultMessageListenerContainer: 
protected void messageReceived(Object invoker, Session session) {
  ((AsyncMessageListenerInvoker) invoker).setIdle(false);
  scheduleNewInvokerIfAppropriate();
 } 

protected void scheduleNewInvokerIfAppropriate() {
  if (isRunning()) {
   resumePausedTasks();
   synchronized (this.lifecycleMonitor) {
    if (this.scheduledInvokers.size() < this.maxConcurrentConsumers && getIdleInvokerCount() == 0) {
     scheduleNewInvoker();
     if (logger.isDebugEnabled()) {
      logger.debug("Raised scheduled invoker count: " + this.scheduledInvokers.size());
     }
    }
   }
  }
 }
If a message is received, the current invoker is set to non-idle. In the method scheduleNewInvokerIfAppropriate(), if the total number of invokers are less than the configured max count and there is no idle invoker, then create a new invoker.

Note that if a message is not received, the method noMessageReceived(invoker, sessionToUse) will be called. This method will just set the current invoker to be idle.

AbstractMessageListenerContainer:
protected void doExecuteListener(Session session, Message message) throws JMSException {
  if (!isAcceptMessagesWhileStopping() && !isRunning()) {
   if (logger.isWarnEnabled()) {
    logger.warn("Rejecting received message because of the listener container " +
      "having been stopped in the meantime: " + message);
   }
   rollbackIfNecessary(session);
   throw new MessageRejectedWhileStoppingException();
  }
  try {
   invokeListener(session, message);
  }
  catch (JMSException ex) {
   rollbackOnExceptionIfNecessary(session, ex);
   throw ex;
  }
  catch (RuntimeException ex) {
   rollbackOnExceptionIfNecessary(session, ex);
   throw ex;
  }
  catch (Error err) {
   rollbackOnExceptionIfNecessary(session, err);
   throw err;
  }
  commitIfNecessary(session, message);
 } 
 
AbstractMessageListenerContainer:
protected void invokeListener(Session session, Message message) throws JMSException {
  Object listener = getMessageListener();
  if (listener instanceof SessionAwareMessageListener) {
   doInvokeListener((SessionAwareMessageListener) listener, session, message);
  }
  else if (listener instanceof MessageListener) {
   doInvokeListener((MessageListener) listener, message);
  }
  else if (listener != null) {
   throw new IllegalArgumentException(
     "Only MessageListener and SessionAwareMessageListener supported: " + listener);
  }
  else {
   throw new IllegalStateException("No message listener specified - see property 'messageListener'");
  }
 } 
 
Note here the listeners are invoked!
AbstractMessageListenerContainer:
protected void doInvokeListener(MessageListener listener, Message message) throws JMSException {
  listener.onMessage(message);
 }
Notes:
  1. Here Spring just passes the message to the onMessage(...) method of the listener. The Spring listener here is just a POJO. It knows nothing about JMS. If you look at the listener bean, you can see that it just implements the onMessage method. So the following observation:
    !!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    Even though you specify a JMS listener in DefaultMessageListenerContainer, it is not 
    doing the asynchronous messaging!  It is not a real asynchronous mechanism. It just 
    uses the synchronous call receive(1000) repeatedly to simulate asynchronicity.
    !!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    
  2. !!!!!!!!!!!!!!!!!!!!!!!!!!!!!
       The method doExecuteListener(...) calls the following method:
       protected void commitIfNecessary(Session session, Message message) throws 
    JMSException {
         // Commit session or acknowledge message.
         if (session.getTransacted()) {
          // Commit necessary - but avoid commit call within a JTA transaction.
          if (isSessionLocallyTransacted(session)) {
           // Transacted session created by this container -> commit.
           JmsUtils.commitIfNecessary(session);
          }
         }
         else if (isClientAcknowledge(session)) {
          message.acknowledge();
         }
     }
        This method takes care of the transaction in case of a locally transactional 
    session and the message acknowledgement in case of a non-transactional session. This
     method does not have any effect if the session is in a global transaction.
      
       The doExecuteListener(...) method also calls rollbackIfNecessary(session) if needed.
       !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    
  3. The API method onMessage(Message message) of MessageListener does not throw any Exception. So how can you make the transaction rollback? I think you can throw a RuntimeException inside the onMessage() method. This exception will be caught and the method rollbackOnExceptionIfNecessary(session, ex) is called. This method will rollback the transaction by calling session.rollback().

JmsTemplate

The class JmsTemplate extends JmsDestinationAccessor, which extends JmsAccessor. So you can also set the sessionTransacted and sessionAcknowledgeMode properties. By default, the values for these two properties are "false" and "AUTO_ACKNOWLEDGE". But in the code of JmsTemplate, these two property values are not used after the message is sent. The main method is

public Object execute(SessionCallback action, boolean startConnection) throws 
JmsException{
   ...
   try{
     ...
   }
   finally{
      JmsUtils.closeSession(sessionToClose);
      ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(),
          startConnection);
   }
}
So it just tries to close the session and release the connection. There is no transaction commit or message acknowledgement. Why? What if sessionAcknowledgeMode is "CLIENT_ACKNOWLEDGE" or sessionTransacted is "true"?

Spring JMS pooling functionality

For database, the datasource object can maintain several connection objects in a pool so that a client can get an available connection from the pool, which can greatly enhance the performance. For JMS, how is the similar function done? From all the code above, I think the Spring JMS does not give several JMS connections. In the DefaultMessageListenerContainer, when shared, only one JMS connection is used. However, you can set multiple consumers that will use the same connection. So there is no connection pool in Spring JMS here. The connection pooling comes from the ConnectionFactory. The JmsAccessor class is a parent class of both the Spring JmsTemplate and DefaultMessageListenerContainer class. The JmsAccessor class has the ConnectionFactory instance variable. The javadoc of JmsTemplate has the following note: NOTE: The ConnectionFactory used with this template should return pooled Connections ( or a single shared Connection) as well as pooled Sessions and MessageProducers.

Spring JMS Consumer Caching

If the transaction manager is not set, DefaultMessageListenerContainer will change the default cache level from CACHE_AUTO to CACHE_CONSUMER. DMLC creates the consumers. And these consumers will be reused again and again in the loop. Of course, the connection and the session are also reused. So even if the ConnectionFactory does not returned a pooled connection, the same connection will be used.

Spring JMS Recovery functionality

Sending messages using JmsTemplate

There are basically the following steps when sending a message:

  1. create JMS connection
  2. create session
  3. create message producer
  4. send message
In the above steps, the “create” action does not necessarily create a new object. The underlying JMS factory or class may just return a pooled object. And when these objects are closed, they are not necessarily closed. The underlying implementation may just return the object to the pool.

Every time a message needs to be sent, the JmsTemplate class will call the “Send(…)” method. The method will go through the steps (1), (2), (3) and (4). If anything goes wrong in the above steps, it will throw an exception. There is no retry logic !

Receiving messages using DefaultMessageListenerContainer

Now let us check the recovery functionality of this Spring framework. What will the framework do if there is any connection failure, session failure, or other failure? Notice that a connection failure will cause the failure of session.

First of all, when the application is started, the DefaultMessageListenerContainer bean will be created. The creation process will create the JMS connection. If the connection cannot be established, the bean creation will fail. There is no retry for this. The application won't start successfully.

So the connection has to be good when the application starts. But after the application starts successfully, the DefaultMessageListenerContainer bean does have the recovery mechanism in case of connection failure or other exceptions in the middle. It will retry to create a new connection and new session and consumer to consume the message. The interval between recovery attempts defaults to 5000 ms. The following are more details on this.

The run() method of AsyncMessageListenerInvoker has the following structure:

public void run(){
   ...
   try{
      ...
      messageReceived = executeOngoingLoop();
      ...
   }catch(Throwable ex){
     ...
     if ( ...){
       ...
       handleListenerSetupFailuer(ex, false);
       recoverAfterListenerSetupFailure();
       ...
     }
   }
   ...
}
Inside the method executeOngoingLoop(), there is a call to the method doReceiveAndExeute() whose structure is the follows:
protected boolean doReceiveAndExecute(...){
    ...
    try{
       ...
       Message message = receiveMessage(consumerToUse);
       if ( message != null){
          ...
          try{
            doExecuteListener(sessionToUse, message);
          }catch(Throwable ex){
             ...
             handleListenerException(ex);
             // Rethrow JMSException to indicate an infrastructure problem
             // that may have to trigger recovery...
             if ( ex instanceof JMSException ){
                 throw (JMSException) ex;
             }
          }
       }
       else{
         ...
       }
     finally{
       ...
     }

So if there is any exception in executeOngoingLoop(), be it from connection or session or consumer or data access or anything else, the exception will be caught and processed. It won't be rethrown to the outside because the including method run() does not throw any exception!

Now pay close attention to the method doReceiveAndExecute(...). It will also catch and handle any Throwable. If the exception is not a JMSException, it will be swallowed! The exception will not be rethrown !!! Only when it is a JMSException, will the exception be rethrown and propagated to be lastly thrown by the method executeOngoingLoop().

The handleListenerException method is the following. It is defined in the upper class AbstractMessageListenerContainer.

protected void handleListenerException(Throwable ex) {
  if (ex instanceof MessageRejectedWhileStoppingException) {
   // Internal exception - has been handled before.
   return;
  }
  if (ex instanceof JMSException) {
   invokeExceptionListener((JMSException) ex);
  }
  if (isActive()) {
   // Regular case: failed while active.
   // Log at error level.
   logger.warn("Execution of JMS message listener failed", ex);
  }
  else {
   // Rare case: listener thread failed after container shutdown.
   // Log at debug level, to avoid spamming the shutdown log.
   logger.debug("Listener exception after container shutdown", ex);
  }
 }
So if the exception is a JMSException, the exception listener will be invoked. From debugging, it seems that the exception list is empty if you do not set any exception listeners. If the exception is not a JMSException, the code basically does nothing. You may create a subclass of DMLC to override this method. But it may not be very helpful because the method does not have the information about the message. It only has the exception object passed in the argument.

This way of handling the exception has another important effect. The method doExecuteListener(...) has the following structure:

protected void doExecuteListener(Session session, Message message) throws JMSException {
   ...
   try {
      invokeListener(session, message);
   }catch (JMSException ex) {
      rollbackOnExceptionIfNecessary(session, ex);
      throw ex;
   }catch (RuntimeException ex) {
      rollbackOnExceptionIfNecessary(session, ex);
      throw ex;
   }catch (Error err) {
       rollbackOnExceptionIfNecessary(session, err);
       throw err;
   }
   commitIfNecessary(session, message);
}

So if an exception is thrown, the method commitIfNecessary(...) won't be called. So the transaction is not committed if sessionTransacted=true. And the message is not acknowledged if there is no transaction and the acknowledge mode is CLIENT_ACKNOWLEDGE. Now if the exception is not a JMSException, the code will go back to the while(active) loop inside the executeOngoingLoop() method. The same message that caused the exception will be received. And most likely it will cause the same exception to be thrown and an infinite loop!

In the "catch" block of the run() method, first clearResources() will close the session and the consumer. If executeOngoingLoop() throws exception the first time it executes, the next call will be sleepInbetweenRecoveryAttempts(). By default this is to wait for 5 seconds:

DefaultMessageListenerContainer:
public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

protected void sleepInbetweenRecoveryAttempts() {
  if (this.recoveryInterval > 0) {
   try {
    Thread.sleep(this.recoveryInterval);
   }
   catch (InterruptedException interEx) {
    // Re-interrupt current thread, to allow other threads to react.
    Thread.currentThread().interrupt();
   }
  }
 }
 
Next the following code is called:
synchronized (recoveryMonitor) {
    if (this.lastRecoveryMarker == currentRecoveryMarker) {
        handleListenerSetupFailure(ex, false);
 recoverAfterListenerSetupFailure();
 currentRecoveryMarker = new Object();
    }
    else {
 alreadyRecovered = true;
    }
}
In the method initResourcesIfNecessary, the method updateRecoveryMarker set the marker:
private void updateRecoveryMarker() {
   synchronized (recoveryMonitor) {
   this.lastRecoveryMarker = currentRecoveryMarker;
   }
}
So the if-condition is true.
 
protected void recoverAfterListenerSetupFailure() {
   refreshConnectionUntilSuccessful();
   refreshDestination();
}
It turns out that the method refreshConnectionUntilSuccessful is quite interesting. Just as its name implies, it will try to get a good connection until successful.
protected void refreshConnectionUntilSuccessful() {
  while (isRunning()) {
   try {
    if (sharedConnectionEnabled()) {
     refreshSharedConnection();
    }
    else {
     Connection con = createConnection();
     JmsUtils.closeConnection(con);
    }
    logger.info("Successfully refreshed JMS Connection");
    break;
   }
   catch (Exception ex) {
    StringBuffer msg = new StringBuffer();
    msg.append("Could not refresh JMS Connection for destination '");
    msg.append(getDestinationDescription()).append("' - retrying in ");
    msg.append(this.recoveryInterval).append(" ms. Cause: ");
    msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage());
    if (logger.isDebugEnabled()) {
     logger.info(msg, ex);
    }
    else if (logger.isInfoEnabled()) {
     logger.info(msg);
    }
   }
   sleepInbetweenRecoveryAttempts();
  }
 }

protected final void refreshSharedConnection() throws JMSException {
  synchronized (this.sharedConnectionMonitor) {
    ConnectionFactoryUtils.releaseConnection(
    this.sharedConnection, getConnectionFactory(), this.sharedConnectionStarted);
    this.sharedConnection = null;
    this.sharedConnection = createSharedConnection();
    if (this.sharedConnectionStarted) {
       this.sharedConnection.start();
    }
  }
}
Note that inside the method refreshConnectionUntilSuccessful is a loop that will keep going until the connection is refreshed successfully! Between two attemps, it sleeps for a while. Again the default sleep time is 5000ms. Also note that in our case, sharedConnectionStarted is true. So the method this.sharedConnection.start() will be called.

Now a big question is this. If we use the Spring SingleConnectionFactory, the connection is cached. The call to close the connection does not actually close the connection. It basically does nothing. So the createSharedConnection or the createConnection method will return the cached connection which can be bad. This will lead to an useless infinite loop. The cached connection can be reset when SingleConnectionFactory is called as an exception listener on the connection. But when and how will that happen? In the configuration for SingleConnectionFactory, we set reconnectOnException to true. This registers the SingleConnectionFactory as one exception listener to the connection created. But it is not clear if the JMS provider will invoke the listeners in case of a JMSException on the connection. If the connection reset won't be called, we should just use the simple non-caching ConnectionFactory. This way a new connection will be created. Note that for DefaultMessageListenerContainer, it does not seem to be necessary to use SingleConnectionFactory. DMLC will create a connection and the consumers at the beginning. After that, the same consumers will be used repeatedly to receive the messages.

At the end, there will be a call to rescheduleTaskIfNecessary(this).

DefaultMessageListenerContainer:
protected void doRescheduleTask(Object task) {
 this.taskExecutor.execute((Runnable) task);
} 
This will schedule a new task and now the new task will run! Meanwhile, the run() method of the current task ( thread ) will finish. So the current thread will finish. In the new task, the code will create the session and the consumer to consume the messages.

Now let's look at the case that a transaction manager is configured.

cacheLevel will be CACHE_NONE.

sharedConnectionEnabled() returns false.
An important line for transaction management is the following in the method receiveAndExecute(...) of the class AbstractPollingMessageListenerContainer:
this.transactionManager.commit(status);
===============================
Questions
===============================
1. When will the JMS session and the connection be closed? 
2. Same as in the Question 1, when is the JMS connection in the message sender(JmsTemplate) closed?
Answer to Queation 1:

First of all, the connection start() method is needed. It is actually called when the weblogic server is started.

The connection is stopped when I shut down the weblogic server. The sequence of the call is as follows:

(1) org.springframework.web.context.ContextLoader calls 
    public void closeWebApplicationContext(ServletContext servletContext) {...}
    which will call:
    protected void doClose() {
      ...
       Map lifecycleBeans = getLifecycleBeans();
       for (Iterator it = new LinkedHashSet(lifecycleBeans.keySet()).iterator(); it.hasNext();) {
        String beanName = (String) it.next();
        doStop(lifecycleBeans, beanName);
       }
       // Destroy all cached singletons in the context's BeanFactory.
       destroyBeans();
       // Close the state of this context itself.
       closeBeanFactory();
       onClose();
       synchronized (this.activeMonitor) {
        this.active = false;
       }
      }
 }
Note that in the above code, there are the call of "doStop(...)" and "destoryBeans()". The "doStop()" method will stop the connection. And the "destoryBeans()" will close the connection.
(2) XmlWebApplicationContext calls 
      private void doStop(Map lifecycleBeans, String beanName) {
        Lifecycle bean = (Lifecycle) lifecycleBeans.get(beanName);
        if (bean != null) {
         String[] dependentBeans = getBeanFactory().getDependentBeans(beanName);
         for (int i = 0; i < dependentBeans.length; i++) {
          doStop(lifecycleBeans, dependentBeans[i]);
         }
         if (bean.isRunning()) {
          bean.stop();
         }
         lifecycleBeans.remove(beanName);
        }
 }
    The line bean.stop() in the above code will do the work.
  (3)The bean here is DefaultMessageListenerContainer. It calls stop() which calls doStop().
  (4)protected void doStop() throws JMSException {
  synchronized (this.lifecycleMonitor) {
   this.running = false;
   this.lifecycleMonitor.notifyAll();
  }

  if (sharedConnectionEnabled()) {
   stopSharedConnection();
  }
 }
  (5)protected void stopSharedConnection() throws JMSException {
  synchronized (this.sharedConnectionMonitor) {
   this.sharedConnectionStarted = false;
   if (this.sharedConnection != null) {
    try {
     this.sharedConnection.stop();
    }
    catch (javax.jms.IllegalStateException ex) {
     logger.debug("Ignoring Connection stop exception - assuming already stopped: " + ex);
    }
   }
  }
 }

References

  1. http://forum.spring.io/forum/other-spring-related/remoting/24208-what-s-the-best-practice-for-using-jms-in-spring

Thursday, April 24, 2014

JMS Connection, Session, Destination, Producer and Consumer

In JMS, basically it is that the sender(producer) sends the messages to the receiver(consumer). But the sender and the receiver are completely decoupled. They do not need to know each other. The only common thing between them is the destination.

When you create a sender, you use the following steps:

  1. Get the connectionFactory. You can use IntialContext to get one.
  2. Create connection: connection = connectionFactory.createConnection()
  3. Create session: session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
  4. Get/Create the destination: destination = context.lookup(a_destination_property); or destinaiton = session.createTopic(topicName);
    -------------------------------------------------------------------------
  5. Create producer: producer = session.createProducer(destination);
  6. Create and send message: message = session.createTextMessage("hello world"); producer.send(message);

When you create a consumer, you use the following steps:

  1. Get the connectionFactory. You can use IntialContext to get one.
  2. Create connection: connection = connectionFactory.createConnection()
  3. Create session: session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
  4. Get/Create the destination: destination = context.lookup(a_destination_property); or destinaiton = session.createTopic(topicName);
    --------------------------------------------------------------------------
  5. Create consumer: consumer = session.createConsumer(destination);
  6. Receive message: message = consumer.receive(1000) or use message listeners:
    consumer.setMessageListener(new javax.jms.MessageListener()
            {
              public void onMessage(Message m)
              {
                try{
                   m.acknowledge();
                }catch(JMSException e){}
              }
             }

The first 4 steps of the sender and the consumer are the same!

When you create a listener, you use the same 5 steps as in creating a consumer. But in the next step, you just do the following:

6. Set listener: consumer.setMessageListener(new javax.jms.MessageListener(){ public void on Message(Message m){...} });

So an important question is: what is a destination? The sender and the receiver may be two processes running on the local machine. But where is this destination. Is this a process phisically running on the remote JMS server? How to manage this destination? For example, is the message retention policy performed on this destination instance?

When a connection is created, there is communication with the remote JMS provider. This can be seen from the following stacktrace when the connection fails to be created:

Caused by: javax.jms.JMSException: Failed to connect to the server at tcp://xxxxxx.com:48200
 at com.tibco.tibjms.TibjmsxLinkTcp._createSocket(TibjmsxLinkTcp.java:831)
 at com.tibco.tibjms.TibjmsxLinkTcp.connect(TibjmsxLinkTcp.java:922)
 at com.tibco.tibjms.TibjmsConnection._create(TibjmsConnection.java:1302)
 at com.tibco.tibjms.TibjmsConnection.(TibjmsConnection.java:4182)
 at com.tibco.tibjms.TibjmsxCFImpl._createImpl(TibjmsxCFImpl.java:209)
 at com.tibco.tibjms.TibjmsxCFImpl._createConnection(TibjmsxCFImpl.java:253)
 at com.tibco.tibjms.TibjmsConnectionFactory.createConnection(TibjmsConnectionFactory.java:58)
From this code, it can be seen that it is trying to create a socket.

Note: Now I got more information on this. A destination ( topic or queue ) is usually a permanent existence on the JMS provider. It is independent from any JMS connection and session. You can use JNDI to lookup for a destination. And vice versa, a JMS connection or session is independent from destination. You can just use a connection factory to create a JMS connection and session without knowing any destination. So how is a destination related to a session? They are related to each other when you create the JMS producer and consumer. At that time, the session is bound to the destination. That said, there are two other scenarios where you can actually create a destination using a session. You can create a dynamic destination or a temporary destination. This kind of destination seems not to be permanent and will be gone when the session is gone.

When a destination is created, it does not actually has any communication to the remote JMS provider. It can just be an invalid destination. For example, in the method session.createTopic("myTopicName"), the value of "myTopicName" can be any string. It will not throw exception. The actual communication to the remote JMS provider happens when a producer is created using the session and the destination. In the case of Tibco JMS, there is the following code in creating a producer:

 TibjmsMessageProducer _createProducer(TibjmsDestination tibjmsdestination)
        throws JMSException
 {
        ...
       TibjmsMessage tibjmsmessage1 = _connection._link.sendRequestMsg(tibjmsmessage, _connection._requestTimeout);
        ...
 }
In the above code, the _link object is of the abstract class TibjmsxLink whose only subclass is TibjmsxLinkTcp. And internally this TibjmsxLinkTcp uses the class TibjmsxStream to communicate to the remote site. An exception will be thrown if the destination is not valid when a producer is being created.

Question 1: When a JMS session is created, we can pass the Acknowledge mode. It is easy to understand the mode for a consumer because in the consumer code we can add code to acknowledge the message. But what is the use of the mode for a producer? Does the JMS provider acknowledge the producer?

Question 2: What is exactly a session? Note that it is not even required to have a destination when you create a session. You only need a connection to create a session. If we create a session and then create both the producer and the consumer using this same session, anything interesting will happen?

One observation. The sessionTransacted and sessionAcknowledgeMode need to be specified when you create a session. The JMS API does not let you change these two properties after a session is created. There is no "set" method for these properties.

Note on message ID:

The JMS provider will set the unique ID for each message. In the following test, I just let the producer send the same message object twice. The printed message still shows that they have different ID values:

code:

producer.send(message);
System.out.println("JMSmessageID: " + message.getJMSMessageID());
producer.send(message);
System.out.println("JMSmessageID: " + message.getJMSMessageID());

output:    
JMSmessageID: ID:EMS01-PMITST02.7C0051A50F852287AD:2
JMSmessageID: ID:EMS01-PMITST02.7C0051A50F852287AD:3

Thursday, March 27, 2014

Maven respository configuration in settings.xml

The following is an analysis on the maven configuration file settings.xml. In this example, it is assumed that you are working on a computer that is in your company office. So it uses a company proxy to go to the internet. Assume the company name is MyCompany. The maven version in this analysis is 2.2.1.
  1. Maven has its own settings.xml. This is a global file in C:\apache-maven-2.2.1\conf. But the file content is basically all commented out. So it is basically just a placeholder there.
  2. Maven then uses the settings.xml in the user directory C:\Users\userid\.m2
  3. Inside MyCompany network, the http traffic usually uses a proxy.
  4. For IE browser, you can configure the proxy by following the menu: tools --> internet options --> connections - LAN settings. Then you can check "Use Automatic configuration script" and set the URL to the script in the "Address" input field. For example, you can specify the following in the Address: http://autoproxy.MyCompany.com/.
    If the MyCompany proxy is not set, it seems that no traffic can go outside. Even the URL www.google.com will time out. And the central maven repository http://central.maven.org/maven2/ will time out too of course.
  5. For maven, the proxy is configured using the<proxy> tag inside the settings.xml file. For example:
     <proxies>
        <proxy>
           <active/>
           <port>80</port>
           <host>internet.myCompany.com</host>
           <id>my proxy</id>
           <nonProxyHosts>*.myCompany.com</nonProxyHosts>
        </proxy>
     </proxies>
     
  6. Maven uses the Central Repository http://central.maven.org/maven2/ as the default repository. Even if you do not specify any repository in the settings.xml file, maven will use this Central repository.
Because of the license issues, it can be expected that MyCompany needs to have a repository that hold all the permitted java libraries. It will be wrong for the application to be able to download libraries from outside repositories freely, even if it is the central maven repository. So how to achieve this? The trick is in the "mirror" configuration in the settings.xml. There are documents here: https://maven.apache.org/guides/mini/guide-mirror-settings.html. The following are the important things.
  1. By default, Maven will download from the central repository. To override this, you need to specify a mirror.
  2. You can force Maven to use a single repository by having it mirror all repository requests. The repository must contain all of the desired artifacts, or be able to proxy the requests to other repositories. This setting is most useful when using an internal company repository with the Maven Repository Manager to proxy external requests. To achieve this, set mirrorOf to *.
  3. The official Maven 2 repository is at http://repo.maven.apache.org/maven2 hosted in the US.
  4. The ID of the main Maven Central US repository included by default is central.
At MyCompany, the production settings.xml has the following:
   <mirrors>
     <mirror>
          <id>maven.MyCompany.com</id>
          <name>Maven Repository Manager running on maven.MyCompany.com</name>
          <url>http://maven.MyCompany.com/m2</url>
          <mirrorOf>central</mirrorOf>
     </mirror>
   </mirrors>
    
    <repositories>
           <repository>
             <id>mvnown</id>
             <name>Maven Repository Manager running on maven.MyCompany.com</name>
             <url>http://maven.MyCompany.com/m2</url>
           </repository>
     </repositories>
  
From the above, we can see that the maven "central" repository is overridden by the MyCompany repository. The only repository used is http://maven.MyCompany.com/m2. In one project, we got the following error messages when building the application:
Downloading: http://maven.MyCompany.com/m2/org/jboss/jboss-parent/6/jboss-parent-6.pom
[INFO] Unable to find resource 'org.jboss:jboss-parent:pom:6' in repository mvnown (http://maven.MyCompany.com/m2)
Downloading: http://maven.MyCompany.com/m2/org/jboss/jboss-parent/6/jboss-parent-6.pom
[INFO] Unable to find resource 'org.jboss:jboss-parent:pom:6' in repository central (http://repo1.maven.org/maven2)
You can see that maven tried the two repositories: Firstly repository mvnown (http://maven.MyCompany.com/m2). Secondly central (http://repo1.maven.org/maven2).

The first repository is specified in the<repository> setting. The second repository is the default. This error message shows that even if you do not specify the central repository in the settings.xml, maven will still try to download libraries from that repository. In our case, it is mirrored. So what maven got is actually still mvnown (http://maven.MyCompany.com/m2)! And of course, nothing new is found there!

Now if I remove the mirror for the central, what will happen? Can maven find that jboss-parente-6.pom file? You will expect that the answer should be a "Yes". And that is right! It will be found and the build will be a success!