First, the connection factory Spring bean:
<spring:bean name="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<spring:property name="brokerURL" value="tcp://${esb.jms.endpoint}"/>
</spring:bean>
Since I have Maven filtering my resources, the actual tcp URI will be replaced with the appropriate environmental property--in my case, being in an active development environment, and using activeMQ 5.3.0, the filtered value would be "tcp://localhost:61616".
Next is the connector definition:
<jms:connector name="JMSConnector"
specification="1.1"
persistentDelivery="true"
connectionFactory-ref="activeMQConnectionFactory" >
And finally, the endpoint:
<jms:endpoint name="asynchIn" queue="asynch.in"/>
The service definition for this simple case is:
<service name="Asynchronous processing">
<inbound>
<inbound-endpoint ref="asynchIn" synchronous="false"/>
<wire-tap-router>
<stdio:outbound-endpoint system="OUT" name="debugTrace" connector-ref="SysOut"/>
</wire-tap-router>
</inbound>
<outbound>
<filtering-router>
<jms:outbound-endpoint queue="test.out" />
<message-property-filter pattern="JMSType=test"/>
</filtering-router>
<filtering-router>
<jms:outbound-endpoint queue="test2.out" />
<message-property-filter pattern="JMSType=test2"/>
</filtering-router>
</outbound>
</service>
Notice that the inbound definition contains a wire-tap-router reference, this makes it much easier (IMO) to trace the message flow during development while defining the routing rules and generally tweaking things. Mule will send the message to sysout and also apply filter routing.
The filters generally speak for themselves, in the cases above the filters are based on the type of message.
To test the setup with a vanilla ActiveMQ install (stomp enabled and the stomp gem installed), this quick Ruby script works quite handily:
require 'stomp'
Stomp::Client.open("stomp://localhost:61612").send("/queue/asynch.in","\n\n\n!!!!!!!!!!!!!!\ntest message\n!!!!!!!!!!!!!!!",{:persistent => true, :type => 'test'})
Mule's wire-tap-router should dump the message:
system out:ActiveMQBytesMessage {commandId = 3, responseRequired = false, messageId = ID:vsbeta-45609-1261505977249-4:104:-1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:vsbeta-45609-1261505977249-4:104:-1:1, destination = queue://asynch.in, transactionId = null, expiration = 0, timestamp = 1261518816945, arrival = 0, brokerInTime = 1261518816946, brokerOutTime = 1261518816946, correlationId = null, replyTo = null, persistent = true, type = test, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@7c66f0, marshalledProperties = org.apache.activemq.util.ByteSequence@4a4890, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {content-type=text/plain; charset=UTF-8}, readOnlyProperties = true, readOnlyBody = true, droppable = false} ActiveMQBytesMessage{ bytesOut = null, dataOut = null, dataIn = null }INFO 2009-12-22 14:53:37,033 [JMSConnector.dispatcher.1] org.mule.transport.jms.JmsMessageDispatcher: Connected: endpoint.outbound.jms://test.out
ActiveMQ's admin screen should show pending messages inside of the test.out or test2.out queues. Messages could be consumed via Stomp:
require 'stomp'
client = Stomp::Client.open("stomp://localhost:61612")
client.subscribe("/queue/test.out"){|message| puts "consuming #{message.body} with properties #{message.headers.inspect}"}
producing output:
consuming
!!!!!!!!!!!!!!
test message
!!!!!!!!!!!!!!! with properties {"MULE_ORIGINATING_ENDPOINT"=>"asynchIn", "content_type"=>"text/plain; charset=UTF-8", "MULE_CORRELATION_ID"=>"0f2295b0-ef45-11de-856a-538c667e24a7", "expires"=>"0", "timestamp"=>"1261519075346", "destination"=>"/queue/test.out", "message-id"=>"ID:Rohirrim.local-51739-1261518810672-0:0:7:1:1", "priority"=>"4", "MULE_SESSION"=>"SUQ9MGYyMjk1YjEtZWY0NS0xMWRlLTg1NmEtNTM4YzY2N2UyNGE3", "content-length"=>"46", "MULE_MESSAGE_ID"=>"ID:vsbeta-4
5609-1261505977249-4:117:-1:1:1", "correlation-id"=>"0f2295b0-ef45-11de-856a-538c667e24a7", "MULE_ENCODING"=>"UTF-8", "MULE_ENDPOINT"=>"jms://test.out"}=> nil
This approach enables a single queue to collect asynchronous message requests and leverage Mule's filtering-routers to decouple the producer and consumer. The requests go on the ESB, Mule defines where they should go, and the service components act and process the request independent of the requester.