Friday, October 17, 2014

Spring JMS Caching

Note: Spring 2.5.6 is used in this analysis.

Use Spring JmsTemplate to Send Message

When Spring's JmsTemplate is used to send a message, we can use the method

public void send(MessageCreator messageCreator){}. 
It will call
public Object execute(SessionCallback action, boolean startConnection){}.
The variable startConnection is false in this case. In our test case, that method will call
conToClose = createConnection();
This method is
protected Connection createConnection() throws JMSException {
  return getConnectionFactory().createConnection();
}
After sending the message, it will finally call
ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);

For a simple ConnectionFactory (for example, com.tibco.tibjms.TibjmsConnectionFactory), each call to send(...) creates a new connection instance, a new session instance, and a new producer to send the message. The finally{} block then closes the connection and the session. The next call to send(...) creates a new connection and session again.

If the Spring SingleConnectionFactory is used instead, the connection is reused, but the session and the producer are not. You can construct a SingleConnectionFactory to wrap a simple connection factory:

public SingleConnectionFactory(ConnectionFactory targetConnectionFactory);

The class CachingConnectionFactory extends the Spring SingleConnectionFactory. When it is used, the connection, the session, and the producer will all be reused. You can construct a CachingConnectionFactory to wrap a simple connection factory:

public CachingConnectionFactory(ConnectionFactory targetConnectionFactory);

For the CachingConnectionFactory, when the

conToClose = createConnection();
is executed, it will return a proxy of the connection. The proxy's handler is of the type SingleConnectionFactory$SharedConnectionInvocationHandler. The trick is in the "close" method of the connection. When the "finally" block closes the connection, the proxy just does nothing! The connection and the session will be reused the next time the send(...) method is called.
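To make the "close does nothing" trick concrete, here is a minimal sketch in plain Java. It is not Spring's code: DemoConnection, RealConnection, and CloseSuppressingProxyDemo are hypothetical stand-ins, and the only idea carried over is a dynamic proxy that swallows close() and passes every other call to the real object.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for javax.jms.Connection, reduced to three methods.
interface DemoConnection {
    String send(String text);
    void close();
    boolean isClosed();
}

// A "real" connection whose close() actually closes it.
class RealConnection implements DemoConnection {
    private boolean closed = false;

    public String send(String text) {
        if (closed) {
            throw new IllegalStateException("connection is closed");
        }
        return "sent:" + text;
    }

    public void close() {
        closed = true;
    }

    public boolean isClosed() {
        return closed;
    }
}

class CloseSuppressingProxyDemo {
    // Wraps the target in a dynamic proxy whose close() is a no-op.
    static DemoConnection shared(final DemoConnection target) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("close")) {
                return null; // swallow close() so the target stays open for reuse
            }
            return method.invoke(target, args); // delegate everything else
        };
        return (DemoConnection) Proxy.newProxyInstance(
                DemoConnection.class.getClassLoader(),
                new Class<?>[] { DemoConnection.class },
                handler);
    }

    public static void main(String[] args) {
        DemoConnection real = new RealConnection();
        DemoConnection proxy = shared(real);
        proxy.close();                          // ignored by the proxy
        System.out.println(proxy.send("msg1")); // prints "sent:msg1"
        System.out.println(real.isClosed());    // prints "false"
    }
}
```

The caller still writes the usual close() in its finally block, so the same calling code works with both a plain factory and a sharing one.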

Recovery

Note that when a JmsTemplate bean is created, no matter what connection factory is used, the connection is not established. The JMS connection is created the first time a message needs to be sent. Because of this, the application can start successfully even if the JMS connection does not work at the startup time.

When a message needs to be sent, JmsTemplate will create a connection. If an exception occurs, the send fails and the exception is thrown to the caller. There is no retry.

We can create a mechanism to have the retry ability. Instead of calling the send(messageCreator) method of JmsTemplate directly, we can wrap it as follows:

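 void mySend(MessageCreator messageCreator) throws JmsException {
   int tries = 0; // number of failed attempts so far
   while (true) {
      try {
         this.jmsTemplate.send(messageCreator);
         break;
      } catch (JmsException e) {
         // handleException is expected to rethrow once the retry limit is reached
         handleException(e, ++tries);
      }
   }
 }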
In this code, if anything goes wrong, handleException can handle it any way you want. For example, it can sleep for a while and let the loop execute again. You can make the number of retries configurable. After the maximum number of retries has been attempted unsuccessfully, handleException throws the exception, which ends the while-loop.
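One possible shape for handleException is sketched below in plain Java. The class RetryingSender, its fields maxRetries and sleepMillis, and the use of Runnable for the send action are all assumptions made for illustration. RuntimeException stands in for Spring's JmsException (which is itself unchecked), so the sketch runs without Spring on the classpath.

```java
// Hypothetical retry helper; the names and limits are illustrative assumptions.
class RetryingSender {
    private final int maxRetries;   // retries allowed after the first attempt
    private final long sleepMillis; // pause between attempts

    RetryingSender(int maxRetries, long sleepMillis) {
        this.maxRetries = maxRetries;
        this.sleepMillis = sleepMillis;
    }

    // Runs the send action, retrying on RuntimeException.
    // Returns the number of attempts that were made.
    int sendWithRetry(Runnable sendAction) {
        int tries = 0;
        while (true) {
            try {
                tries++;
                sendAction.run();
                return tries;
            } catch (RuntimeException e) {
                handleException(e, tries);
            }
        }
    }

    // Rethrows once the retry budget is used up; otherwise sleeps before the next attempt.
    private void handleException(RuntimeException e, int tries) {
        if (tries > maxRetries) {
            throw e;
        }
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw e;
        }
    }
}
```

With maxRetries set to 2, a send that always fails is attempted three times in total before the last exception reaches the caller.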

Note that every time the send(messageCreator) method of JmsTemplate is called, it obtains a connection, a session, and a producer to send the message. The connection comes from a connection factory. So with a caching factory such as CachingConnectionFactory or SingleConnectionFactory, the same connection may be returned, and that connection may be bad. In handleException, you can therefore call the onException() method of SingleConnectionFactory, which resets the connection. The next time it is asked, the connection factory will create a brand-new connection.

Test Results

The following is the actual test data. I ran the test in a debugger. Tibco JMS is used on the backend. The basic code is the following:

public void process(String[] args) throws Exception {
  IJmsSender jmsSender = (IJmsSender) getBean("jmsSender");
  test(jmsSender);
}

private void test(IJmsSender jmsSender) {
  try {
    String msg = "msg1";
    jmsSender.sendMessage(msg);
    // second time
    msg = "msg2";
    jmsSender.sendMessage(msg);
    // third time
    msg = "msg3";
    jmsSender.sendMessage(msg);
  } catch (Exception e) {
    e.printStackTrace();
  }
}
The jmsSender is just a POJO. It uses the following method to send a message:
public void sendMessage(final String message) {
  jmsTemplate.send(new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
      BytesMessage bytesMsg = session.createBytesMessage();
      bytesMsg.writeBytes(message.getBytes());
      return bytesMsg;
    }
  });
}

To inspect the connection, session, and producer objects, I found that each of them has an objectDelegate attribute. That attribute is an object holding many fields. The important ones are the id values: _connid, _sessionid, and _prodid. In the test, the message is sent 3 times.

If TibjmsConnectionFactory is used, a new _connid, _sessionid, and _prodid are generated every time.

If SingleConnectionFactory is used, the results for the 3 sends are below:

                        
SingleConnectionFactory

connection objectDelegate   session objectDelegate   producer objectDelegate
(TibjmsConnection)          (TibjmsxSessionImpl)     (TibjmsTopicPublisher)
_connid                     _sessionid               _prodid
51292                       103950                   45146
51292                       103977                   45155
51292                       103978                   45156

If CachingConnectionFactory is used, the results for the 3 sends are below:

                        
CachingConnectionFactory

connection objectDelegate   session objectDelegate   producer objectDelegate
(TibjmsConnection)          (TibjmsxSessionImpl)     (TibjmsTopicPublisher)
_connid                     _sessionid               _prodid
51306                       103981                   45157
51306                       103981                   45157
51306                       103981                   45157

CachingConnectionFactory's cacheProducers Property

CachingConnectionFactory has a property cacheProducers. It is not inherited from SingleConnectionFactory. Its default value is true. If the value of this property is set to false, the producer will not be reused. So in the test case for CachingConnectionFactory above, if cacheProducers is false, the _connid and _sessionid will still be the same across the three sends, but the _prodid will be different each time!

Note that according to the javadoc of the class, the cache is to cache JMS MessageProducers per JMS session instance (more specifically: one MessageProducer per Destination and Session).
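The effect of the flag can be modeled in a few lines of plain Java. This is only a toy: ProducerCacheDemo and producerFor are hypothetical names, integer ids stand in for real MessageProducer objects, and a single instance plays the role of one session. It is meant to mirror the "one producer per Destination and Session" rule, not to reproduce Spring's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of one session that optionally caches one producer per destination.
// All names are hypothetical; this is not Spring's CachingConnectionFactory code.
class ProducerCacheDemo {
    private final boolean cacheProducers;
    private final Map<String, Integer> cachedByDestination = new HashMap<>();
    private int nextProducerId = 1;

    ProducerCacheDemo(boolean cacheProducers) {
        this.cacheProducers = cacheProducers;
    }

    // Returns the id of the producer used for the given destination.
    int producerFor(String destination) {
        if (!cacheProducers) {
            return nextProducerId++; // a fresh producer for every send
        }
        Integer cached = cachedByDestination.get(destination);
        if (cached == null) {
            cached = nextProducerId++; // first send to this destination
            cachedByDestination.put(destination, cached);
        }
        return cached;
    }

    public static void main(String[] args) {
        ProducerCacheDemo cached = new ProducerCacheDemo(true);
        ProducerCacheDemo uncached = new ProducerCacheDemo(false);
        for (int i = 0; i < 3; i++) {
            System.out.println("cached=" + cached.producerFor("topic.a")
                    + " uncached=" + uncached.producerFor("topic.a"));
        }
    }
}
```

In this model the cached instance keeps returning id 1 for "topic.a", while the uncached instance returns 1, 2, 3. That matches the pattern seen in the _prodid values of the test.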

CachingConnectionFactory's cacheConsumers Property

CachingConnectionFactory has a property cacheConsumers. It is not inherited from SingleConnectionFactory. Its default value is true. When JmsTemplate sends messages, this value does not affect the caching of the connection, the session, and the producer. So if the value is set to false, the 3 sends in the above test (assuming cacheProducers keeps its default value of true) will use the same _connid, _sessionid, and _prodid.

Note that according to the javadoc of the class, the cache is to cache JMS MessageConsumers per JMS Session instance (more specifically: one MessageConsumer per Destination, selector String and Session). Durable subscribers will only be cached until logical closing of the Session handle.

SingleConnectionFactory's reconnectOnException Property

The class SingleConnectionFactory has a property reconnectOnException. The basic work flow is the following.

1. An application will first call the createConnection method of the Factory to get a JMS connection. The method in SingleConnectionFactory is

public Connection createConnection() throws JMSException {
  synchronized (this.connectionMonitor) {
    if (this.connection == null) {
      initConnection();
    }
    return this.connection;
  }
}
Since the connection is initially null, it will call initConnection() to create a connection.

2. The initConnection() method is

public void initConnection() throws JMSException {
  if (getTargetConnectionFactory() == null) {
    throw new IllegalStateException(
        "'targetConnectionFactory' is required for lazily initializing a Connection");
  }
  synchronized (this.connectionMonitor) {
    if (this.target != null) {
      closeConnection(this.target);
    }
    this.target = doCreateConnection();
    prepareConnection(this.target);
    if (logger.isInfoEnabled()) {
      logger.info("Established shared JMS Connection: " + this.target);
    }
    this.connection = getSharedConnectionProxy(this.target);
  }
}
Inside this method, it calls prepareConnection. This method will check if the attribute reconnectOnException is true. If yes, it will add this SingleConnectionFactory instance as an internal listener to the listener chain. Note that the SingleConnectionFactory implements the ExceptionListener interface. So it is also an exception listener.

3. The user of the JMS connection will use the connection. Since the SingleConnectionFactory implements the connection proxy, it will not close the actual connection when the close method is called. So next time the user calls the createConnection method, the connection is not null and the same connection will be returned.

4. Now if, for some reason, the SingleConnectionFactory is invoked as an exception listener, it will execute the method

/**
 * Exception listener callback that renews the underlying single Connection.
 */
public void onException(JMSException ex) {
  resetConnection();
}

/**
 * Reset the underlying shared Connection, to be reinitialized on next access.
 */
public void resetConnection() {
  synchronized (this.connectionMonitor) {
    if (this.target != null) {
      closeConnection(this.target);
    }
    this.target = null;
    this.connection = null;
  }
}
So after this, the connection will be closed and set to null.

5. The next time the createConnection method is called, since the connection is null, a new connection will be created.
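The five steps above can be condensed into a small plain-Java model. It is a sketch under simplifying assumptions: SharedConnectionModel is a hypothetical class, an integer id stands in for the real JMS connection, and closing the old connection is left out.

```java
// Toy model of the lazy-init / reset cycle described in steps 1 to 5.
// Names are hypothetical; integer ids stand in for real JMS connections.
class SharedConnectionModel {
    private final Object monitor = new Object();
    private Integer connectionId; // null means "no shared connection yet"
    private int created = 0;      // how many connections have been created so far

    // Steps 1 to 3: create the connection lazily, then keep returning the same one.
    int createConnection() {
        synchronized (monitor) {
            if (connectionId == null) {
                connectionId = ++created;
            }
            return connectionId;
        }
    }

    // Step 4: a reported problem drops the shared connection.
    void onException(Exception ex) {
        synchronized (monitor) {
            connectionId = null;
        }
    }

    public static void main(String[] args) {
        SharedConnectionModel factory = new SharedConnectionModel();
        System.out.println(factory.createConnection()); // prints 1
        System.out.println(factory.createConnection()); // prints 1 (reused)
        factory.onException(new Exception("connection lost"));
        System.out.println(factory.createConnection()); // prints 2 (step 5: new connection)
    }
}
```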

Now a big question. When will the SingleConnectionFactory be invoked as an Exception listener? If you look at the code of JmsTemplate, its send() method can throw JmsException. The user will handle the JmsException. Nowhere is the Exception listener invoked in JmsTemplate.

After reading more code, I found that the ExceptionListener is invoked as part of the standard JMS contract. The JMS provider should invoke the ExceptionListener when it detects a serious problem. All the programmer needs to do is register the ExceptionListener on the connection using the standard JMS API.

In the method prepareConnection(Connection con) of SingleConnectionFactory, the last line is

   con.setExceptionListener(listenerToUse);
The setExceptionListener method is an API method of javax.jms.Connection. Its javadoc is the following:
   
       Sets an exception listener for this connection.

       If a JMS provider detects a serious problem with a connection, it informs the
       connection's ExceptionListener, if one has been registered. It does this by
       calling the listener's onException method, passing it a JMSException object
       describing the problem.

       An exception listener allows a client to be notified of a problem
       asynchronously. Some connections only consume messages, so they would have no
       other way to learn their connection has failed.

       A connection serializes execution of its ExceptionListener.

       A JMS provider should attempt to resolve connection problems itself before it
       notifies the client of them.

       Parameters:
           listener - the exception listener
       Throws:
           JMSException - if the JMS provider fails to set the exception listener for
           this connection.

       void setExceptionListener(ExceptionListener listener) throws JMSException;
But what does a JMS provider consider "a serious problem"? That is still not clear. A JMSException thrown in the user code does not seem to be related to this issue. When a JMSException is thrown at runtime, JmsTemplate will just convert it to a JmsException and throw it back to the user for handling. The method is below.
    public Object execute(SessionCallback action, boolean startConnection) throws JmsException {
      try {
        ..........
      } catch (JMSException ex) {
        throw convertJmsAccessException(ex);
      } finally {
        ..........
      }
    }
I did a test in which the MessageCreator throws a JMSException when the second message is sent. I caught the exception outside the JmsTemplate class and just ignored it. The third message still used the same _connid, _sessionid, and _prodid.

By the way, note that JmsException is different from JMSException. The former is a Spring class and is unchecked (a RuntimeException). The latter is the checked exception of the standard JMS API (javax.jms.JMSException).
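The translation from one exception family to the other follows a common pattern, sketched below in plain Java. ProviderException and TemplateException are hypothetical stand-ins for javax.jms.JMSException and Spring's JmsException, and execute here is a simplified illustration rather than JmsTemplate's actual method.

```java
// Hypothetical stand-in for the checked javax.jms.JMSException.
class ProviderException extends Exception {
    ProviderException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for Spring's unchecked JmsException.
class TemplateException extends RuntimeException {
    TemplateException(String message, Throwable cause) {
        super(message, cause);
    }
}

class ExceptionTranslationDemo {
    // A callback that may fail with the checked provider exception.
    interface Action {
        String run() throws ProviderException;
    }

    // Runs the action and converts the checked exception into the unchecked one,
    // keeping the original as the cause.
    static String execute(Action action) {
        try {
            return action.run();
        } catch (ProviderException ex) {
            throw new TemplateException("translated: " + ex.getMessage(), ex);
        }
    }
}
```

Because the translated exception is unchecked, callers are not forced to catch it, which is why a wrapper such as mySend has to catch JmsException explicitly if it wants to retry.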
