Set - 3

Question 6 :

When should I use server session pools and connection consumers?

Answer :

WebLogic JMS implements an optional JMS facility for defining a server-managed pool of server sessions. This facility enables an application to process messages concurrently. A ConnectionConsumer object uses a server session to process received messages. If message traffic is heavy, the connection consumer can load each server session with multiple messages to minimize thread context switching. Multiple connection consumers can share server sessions in a server session pool.
To learn how to use the connection consumers within an application, see the section Processing Messages Concurrently in Programming WebLogic JMS, or the javax.jms.ConnectionConsumer javadoc.
Note: Server session pools can also be implemented using Message Driven Beans. Using MDBs is preferable to using server session pools - see the answer to the question, "How do server session pools and Message Driven Beans compare?" For information on using message driven beans to implement server session pools, see Programming WebLogic Enterprise JavaBeans.


Question 7 :

How do I issue the close() method within an onMessage() method call and what are the semantics of the close() method?

Answer :

If you wish to issue the close() method within an onMessage() method call, the system administrator must select the Allow Close In OnMessage check box when configuring the connection factory. For more information, see JMS Connection Factories in the Administration Console Online Help. If this check box is not selected and you issue the close() method within an onMessage() method call, the call will hang.

The close() method performs the following steps to execute an orderly shutdown:
* Terminates the receipt of all pending messages. Applications may return a message or null if a message was not available at the time of the close.
* Waits until all message listeners that are currently processing messages have completed (except for the message listener from which the close() method is being called).
* Rolls back in-process transactions on its transacted sessions (unless such transactions are part of an external JTA user transaction).
* Does not force an acknowledge of client-acknowledged sessions. By not forcing an acknowledge, no messages are lost for queues and durable subscriptions that require reliable processing.

When you close a connection, all associated objects are also closed. You can continue to use the message objects created or received via the connection, except the received message's acknowledge() method. Closing a closed connection has no effect.
Note: Attempting to acknowledge a received message from a closed connection's session throws an IllegalStateException.
When you close a session, all associated producers and consumers are also closed.
For more information about the impact of the close() method for each object, see the appropriate javax.jms javadoc.


Question 8 :

How do I publish an XML message?

Answer :

Follow these steps:
1. Generate XML from the DOM document tree.
2. Serialize the generated DOM document to a StringWriter.
3. Call toString on the StringWriter and pass it into message.setText.
4. Publish the message.


Question 9 :

Is it possible to send or receive a message from within a message listener?

Answer :

Yes. You can send to or receive from any queue or topic from within in a message listener.
If it's not an MDB, you can use the same Connection or Session that the onMessage() is part of to do this. When you create your message listener, you pass in a session in your constructor. Then you have access to the session in your onMessage method and you would be able to make synchronous, not asynchronous, calls from within the onMessage method. Do not use another Session that is servicing another onMessage() because that would multi-thread that Session and Sessions don't support multi-threading.
When things are done non-transactionally, there can be duplicates or lost messages (assuming your onMessage() code is attempting to forward messages):
1. If you call acknowledge after the publish() and the acknowledge fails for whatever reason (network/server failure), then you will see the message again and will end up publishing twice (possible duplicate semantics). You can try to keep track of sequence numbers to detect duplicates but this is not easy.
2. If you call acknowledge before the publish(), you get at-most-once semantics. If the publish() fails, you don't know if the failure occurred before or after the message reached the server.
If you want exactly once, transactional semantics using onMessage, you must use transactional MDBs. The onMessage() for a transactional MDB starts the transaction, includes the WebLogic Server JMS message received within that transaction and the publish() would also be in the same transaction. The following code sends a response to each message that it receives. It creates the connection, etc. in the ejbCreate method so that it doesn't need to create it every time onMessage is called. The QueueSender is anonymous (null Queue) since we don't know to whom we will have to reply. The ejbRemove method cleans up by closing the connection. This same approach can be used to create a receiver, subscriber or publisher.
import javax.ejb.CreateException;
import javax.ejb.EJBContext;
import javax.naming.*;
import javax.naming.directory.*;
import java.util.Hashtable;
import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.jms.*;

public class MDB
implements MessageDrivenBean, MessageListener {
public static final String WLSqcf =
"javax.jms.QueueConnectionFactory";
public static final String WLSqname =
"jms.queue.TestQueue1";
public static final String WLSurl =
"t3://localhost:7001";
public static final String WLSJNDIfactory =
"weblogic.jndi.WLInitialContextFactory";
private MessageDrivenContext context;
private QueueSession session;
private QueueConnection connection = null;
private QueueConnectionFactory factory;
private InitialContext ctx;
private QueueSender QueueSender;

// Required - public constructor with no argument
public MDB() {}

// Required - ejbActivate
public void ejbActivate() {}
// Required - ejbRemove
public void ejbRemove() {
context = null;
if (connection != null) {
try {
connection.close();
} catch(Exception e) {}
connection = null;
}
}

// Required - ejbPassivate
public void ejbPassivate() {}

public void setMessageDrivenContext(
MessageDrivenContext mycontext) {
context = mycontext;
}

// Required - ejbCreate() with no arguments
public void ejbCreate () throws CreateException {
try {
// Get the initial context
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, WLSJNDIfactory);
env.put(Context.PROVIDER_URL, WLSurl);
env.put(Context.REFERRAL, "throw");
ctx = new InitialContext(env);

factory = (QueueConnectionFactory)ctx.lookup(WLSqcf);

// Create a QueueConnection, QueueSession, QueueSender
connection = factory.createQueueConnection();
session = connection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
queueSender = session.createSender(null);
connection.start();
} catch (Exception e) {
throw(new CreateException(e.toString()));
}
}

// Implementation of MessageListener
// Throws no exceptions
public void onMessage(Message msg) {
try {
System.out.println("MDB: " +
((TextMessage)msg).getText());
msg.clearBody();
((TextMessage)msg).setText("reply message");
queueSender.send((Queue)msg.getJMSReplyTo(), msg);
}
catch(Exception e) { // Catch any exception

e.printStackTrace();
}
}
}

This approach creates a connection per EJB/MDB instance, so you might want to create a producer pool that is shared by the EJB instances. This is done by writing a class that populates a static pool with producers (see the next question for a sample producer pool). The onMessage call grabs a producer when needed. Since Sessions must be single threaded, make sure there is only one producer per session within the producer pool.


Question 10 :

How do I create a producer pool?

Answer :

The following is some pseudo-code
for a producer class.

class ProducerPool {
static Hashmap pSets = new Hashtable();
static Hashmap inUse = new Hashtable();

QueueSender get(String contextURL,
String connectionFactoryName,
String destinationName) {
String lookup = contextURL+";
"+connectionFactName+";"+destName;
synchronized(pSets) {
producer set = pSets.get(lookup);
if (set != null && set not empty)
qs = set.removeFirst();
}
if (producer == null) {
create ctx
get connect factory
create connection
create session
look up destination
qs = create queue sender
}
synchronized(inUse) {
inUse.put(qs, lookup);
}
return qs;
}

void put(QueueSender qs) {
String lookup;
synchronized(inUse) {
lookup = inUse.remove(p);
}
synchronzied(pSets) {
producer set = pSets.get(lookup);
if (set == null) {
producer set = new producer set
pSets.put(lookup, producer set);
}
producer set.add(qs);
}
}
}

Note: Static classes may be garbage collected if there are no references to them, so make sure the application server has a permanent pointer to them in some manner. One way is to reference it permanently from within a servlet or EJB when they are initialized at startup.
Here is an example of using the producer pool within the onMessage method.
onMessage() {
QueueSender qs = ProducerPool.get(...);
qs.send(...);
ProducerPool.put(qs);
}

You can pre-populate this pool by calling it from a startup class or a load-on-start servlet class.