Tuesday, December 22, 2009

Mule JMS message routing using an external ActiveMQ instance

I have a scenario where I'd like Mule to monitor an incoming queue, filter the messages, and route each one to the appropriate outgoing queue, using a separate ActiveMQ instance instead of the optional embedded one. While perusing Google results I didn't find a source that explicitly showed how to accomplish this, so I pieced the following together from indirect examples and other documentation.
First, the connection factory Spring bean:
 <spring:bean name="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
   <spring:property name="brokerURL" value="tcp://${esb.jms.endpoint}"/>
 </spring:bean>

Since I have Maven filtering my resources, the ${esb.jms.endpoint} placeholder is replaced with the appropriate environment-specific property. In my case, in an active development environment running ActiveMQ 5.3.0, the filtered broker URL is "tcp://localhost:61616".
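
To make the substitution concrete, here is a rough Ruby sketch of what the filtering step amounts to. It is only a stand-in for Maven's resource filtering, not how Maven implements it; the filter_resource helper and the dev_properties hash are hypothetical names used for illustration.

```ruby
# Hypothetical stand-in for Maven resource filtering: replace ${key}
# placeholders in a resource with values from a properties hash.
def filter_resource(text, properties)
  text.gsub(/\$\{([^}]+)\}/) do
    key = Regexp.last_match(1)
    # Assumption: placeholders with no matching property are left as-is.
    properties.fetch(key) { "${#{key}}" }
  end
end

# Assumed development value for the property used in the bean definition.
dev_properties = { 'esb.jms.endpoint' => 'localhost:61616' }
template = '<spring:property name="brokerURL" value="tcp://${esb.jms.endpoint}"/>'

puts filter_resource(template, dev_properties)
```

Swapping in a different properties hash per environment is the whole point: the Mule config itself never changes.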

Next is the connector definition:
  <jms:connector name="JMSConnector"  
          specification="1.1"  
          persistentDelivery="true"  
          connectionFactory-ref="activeMQConnectionFactory" />  

And finally, the endpoint:
 <jms:endpoint name="asynchIn" queue="asynch.in"/>  

The service definition for this simple case is:

 <service name="Asynchronous processing">  
    <inbound>  
     <inbound-endpoint ref="asynchIn" synchronous="false"/>  
     <wire-tap-router>  
      <stdio:outbound-endpoint system="OUT" name="debugTrace" connector-ref="SysOut"/>  
     </wire-tap-router>  
    </inbound>  
    <outbound>  
     <filtering-router>  
      <jms:outbound-endpoint queue="test.out" />  
      <message-property-filter pattern="JMSType=test"/>  
     </filtering-router>  
     <filtering-router>  
      <jms:outbound-endpoint queue="test2.out" />  
      <message-property-filter pattern="JMSType=test2"/>  
     </filtering-router>  
    </outbound>  
   </service>  

Notice that the inbound definition contains a wire-tap-router. This makes it much easier (IMO) to trace the message flow during development while defining the routing rules and generally tweaking things: Mule sends each message to sysout and also applies the filter routing.

The filters largely speak for themselves: in the cases above, each one matches on the type of the message (its JMSType property).
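
As a mental model, the routing decision can be sketched in a few lines of Ruby. This is not Mule's implementation: property_filter, ROUTERS and route are hypothetical names, the sketch only handles exact "name=value" patterns, and it assumes the routers are tried in order with the first match winning and unmatched messages going nowhere.

```ruby
# Build a predicate from a "name=value" pattern such as "JMSType=test".
# Only exact equality is handled in this sketch.
def property_filter(pattern)
  name, expected = pattern.split('=', 2)
  ->(properties) { properties[name] == expected }
end

# Ordered [filter, outgoing queue] pairs mirroring the two
# filtering-router entries in the service definition.
ROUTERS = [
  [property_filter('JMSType=test'),  'test.out'],
  [property_filter('JMSType=test2'), 'test2.out']
].freeze

# Return the queue of the first router whose filter accepts the
# message properties, or nil when no filter matches.
def route(properties)
  _filter, queue = ROUTERS.find { |filter, _queue| filter.call(properties) }
  queue
end

puts route({ 'JMSType' => 'test' }).inspect
puts route({ 'JMSType' => 'other' }).inspect
```

Adding another destination is then just one more filter/queue pair, which is what an extra filtering-router block does in the XML.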

To test the setup with a vanilla ActiveMQ install (with STOMP enabled and the stomp gem installed), this quick Ruby script works quite handily:
 require 'stomp'  
  Stomp::Client.open("stomp://localhost:61612").send("/queue/asynch.in","\n\n\n!!!!!!!!!!!!!!\ntest message\n!!!!!!!!!!!!!!!",{:persistent => true, :type => 'test'})  

Mule's wire-tap-router should dump the message:
 system out:ActiveMQBytesMessage {commandId = 3, responseRequired = false, messageId = ID:vsbeta-45609-1261505977249-4:104:-1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:vsbeta-45609-1261505977249-4:104:-1:1, destination = queue://asynch.in, transactionId = null, expiration = 0, timestamp = 1261518816945, arrival = 0, brokerInTime = 1261518816946, brokerOutTime = 1261518816946, correlationId = null, replyTo = null, persistent = true, type = test, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@7c66f0, marshalledProperties = org.apache.activemq.util.ByteSequence@4a4890, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {content-type=text/plain; charset=UTF-8}, readOnlyProperties = true, readOnlyBody = true, droppable = false} ActiveMQBytesMessage{ bytesOut = null, dataOut = null, dataIn = null }
 INFO 2009-12-22 14:53:37,033 [JMSConnector.dispatcher.1] org.mule.transport.jms.JmsMessageDispatcher: Connected: endpoint.outbound.jms://test.out  

ActiveMQ's admin screen should show pending messages in the test.out or test2.out queue, depending on the message type. The messages can then be consumed via STOMP:
 require 'stomp'  
 client = Stomp::Client.open("stomp://localhost:61612")  
 client.subscribe("/queue/test.out"){|message| puts "consuming #{message.body} with properties #{message.headers.inspect}"}  

producing output:
 consuming   
 !!!!!!!!!!!!!!  
 test message  
 !!!!!!!!!!!!!!! with properties {"MULE_ORIGINATING_ENDPOINT"=>"asynchIn", "content_type"=>"text/plain; charset=UTF-8", "MULE_CORRELATION_ID"=>"0f2295b0-ef45-11de-856a-538c667e24a7", "expires"=>"0", "timestamp"=>"1261519075346", "destination"=>"/queue/test.out", "message-id"=>"ID:Rohirrim.local-51739-1261518810672-0:0:7:1:1", "priority"=>"4", "MULE_SESSION"=>"SUQ9MGYyMjk1YjEtZWY0NS0xMWRlLTg1NmEtNTM4YzY2N2UyNGE3", "content-length"=>"46", "MULE_MESSAGE_ID"=>"ID:vsbeta-45609-1261505977249-4:117:-1:1:1", "correlation-id"=>"0f2295b0-ef45-11de-856a-538c667e24a7", "MULE_ENCODING"=>"UTF-8", "MULE_ENDPOINT"=>"jms://test.out"}=> nil  
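
While tracing messages it can help to peek inside the MULE_SESSION header, which in the output above looks like a Base64 string. A small sketch for decoding it follows; decode_mule_session is a hypothetical helper, and treating the value as plain Base64 text is an assumption based only on this one sample.

```ruby
# MULE_SESSION value copied from the consumer output above.
SAMPLE_MULE_SESSION = 'SUQ9MGYyMjk1YjEtZWY0NS0xMWRlLTg1NmEtNTM4YzY2N2UyNGE3'

# Decode a header value as Base64 using String#unpack1 from Ruby core.
# Assumption: the header is plain Base64-encoded text.
def decode_mule_session(value)
  value.unpack1('m')
end

puts decode_mule_session(SAMPLE_MULE_SESSION)
```

For this sample the decoded text begins with "ID=", so the header seems to carry a session identifier rather than any message payload.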

This approach enables a single queue to collect asynchronous message requests and leverage Mule's filtering-routers to decouple the producer and consumer. The requests go on the ESB, Mule defines where they should go, and the service components act and process the request independent of the requester.

2 comments:

Unknown said...

Simplify your life, use <jms:activemq-connector brokerURL="..."/> ;)

RR said...

@aperepel thanks for the tip!