AMQP Transport for WSO2 ESB based on RabbitMQ Java Client

Introduction
WSO2 ESB is a high-performance, lightweight, open source Enterprise Service Bus. It has built-in support for integrating technologies that use different transport protocols. Some of the well-known transports that WSO2 ESB supports are HTTP, HTTPS, POP, IMAP, SMTP, JMS, FIX, TCP, UDP, FTPS, SFTP, CIFS, MLLP, and SMS.

AMQP is an application-layer messaging protocol for message-oriented architectures. Like HTTP, FTP, and SMTP, it lets systems interoperate with each other. It addresses the issues faced by systems whose interoperability depends on a well-defined API (e.g. JMS). For example, if your system wants to talk to another system over JMS, you have to implement the JMS API. AMQP, by contrast, is a wire-level protocol that describes the format of the data sent across the network. So irrespective of the implementation language, any system or tool that can create, send, consume, and read messages that adhere to the AMQP data format can interoperate with the others.
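To make "wire-level" concrete, here is a minimal sketch that hand-builds two byte sequences defined by AMQP 0-9-1 (the protocol version RabbitMQ speaks): the protocol header a client sends when it connects, and an empty heartbeat frame. The class and method names are my own, chosen for illustration; in practice the RabbitMQ Java Client produces these bytes for you.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: shows that AMQP is defined in terms of bytes on the
// wire, not in terms of a language-specific API.
final class AmqpWireSketch {

    // Protocol header sent first by a client: "AMQP" followed by 0, 0, 9, 1.
    static byte[] protocolHeader() {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.put("AMQP".getBytes(StandardCharsets.US_ASCII));
        buf.put((byte) 0).put((byte) 0).put((byte) 9).put((byte) 1);
        return buf.array();
    }

    // Heartbeat frame: type 8, channel 0, payload size 0, frame-end 0xCE.
    static byte[] heartbeatFrame() {
        ByteBuffer buf = ByteBuffer.allocate(8); // big-endian by default
        buf.put((byte) 8);        // frame type (heartbeat)
        buf.putShort((short) 0);  // channel number
        buf.putInt(0);            // payload size in bytes
        buf.put((byte) 0xCE);     // frame-end marker
        return buf.array();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(protocolHeader()));
        System.out.println(Arrays.toString(heartbeatFrame()));
    }
}
```

Any client, in any language, that emits these same bytes is speaking the same protocol, which is the point of the comparison with an API such as JMS.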

RabbitMQ Java Client is one such tool, which allows you to send and receive AMQP messages. RabbitMQ itself is an AMQP broker implementation. In this post I will explain the new AMQP transport for WSO2 ESB, which is implemented using the RabbitMQ Java Client.

Applies To

WSO2 ESB 4.5.1
RabbitMQ AMQP Java Client 3.0.3

The Scenario of Interest

[Figure: rabbitmq]

The above scenario is considered here. It demonstrates the ability of WSO2 ESB to consume AMQP messages from, and publish them to, an AMQP broker. The “Sender” here can be anything that is capable of publishing messages to an AMQP queue. Similarly, the “Receiver” can be anything that can consume messages from an AMQP queue. In this post I will be using the RabbitMQ Java Client library to send and receive AMQP messages in both the Sender and Receiver implementations.

A proxy service in the ESB will be listening to Q1. When a message is available in Q1, the ESB will consume it. If the proxy is defined so that messages are sent to an AMQP destination, say Q2, then the consumed messages will be sent to Q2. The proxy service configuration for this scenario is explained later in this post.

Installing RabbitMQ AMQP transport feature into WSO2 ESB

The RabbitMQ AMQP transport is developed as a separate module of the transports project. It is also available as an installable p2 feature, which can be installed via the WSO2 Feature Manager in WSO2 ESB.

Following are the steps to install this feature.

1. Start the ESB server.
2. Download and unzip the p2-repo.zip file to some location. Copy the path (say /home/esb/p2-repo).
3. Go to Configure > Features from the management console view and add a new local repository by giving the path copied above.
4. Select the added repository and tick “Show only the latest versions” and click on Find features. You will see the “Axis2 Transport RabbitMQ AMQP” feature listed. Select that and install it.
5. After successful installation, shut down the server.

Alternatively, you can just copy both of the JARs found in the plugins directory to the {ESB_HOME}/repository/components/dropins/ directory.

The next step is to configure the transport (listener and sender) in axis2.xml.

Configuring RabbitMQ AMQP Transport in WSO2 ESB

1. Add the following configuration items to axis2.xml found in {ESB_HOME}/repository/conf/axis2/axis2.xml

(i) Under transport listeners section add the following RabbitMQ transport listener.

<transportReceiver name="rabbitmq" class="org.apache.axis2.transport.rabbitmq.RabbitMQListener">
   <parameter name="AMQPConnectionFactory" locked="false">
      <parameter name="rabbitmq.server.host.name" locked="false">192.168.0.3</parameter>
      <parameter name="rabbitmq.server.port" locked="false">5672</parameter>
      <parameter name="rabbitmq.server.user.name" locked="false">user</parameter>
      <parameter name="rabbitmq.server.password" locked="false">abc123</parameter>
   </parameter>
</transportReceiver>

The parameters, which are used to create the connection to the AMQP broker, are self-explanatory. You can define any number of connection factories under the <transportReceiver/> definition.

(ii) Under transport senders section add the following RabbitMQ transport sender.

<transportSender name="rabbitmq"
class="org.apache.axis2.transport.rabbitmq.RabbitMQSender"/>

This is the transport sender which is used for sending AMQP messages out to a queue.

2. Start the ESB server.

Creating a proxy service which works with RabbitMQ AMQP transport

A sample proxy service which consumes AMQP messages from, and sends them to, a RabbitMQ AMQP broker:

<proxy xmlns="http://ws.apache.org/ns/synapse" name="AMQPProxy"
transports="rabbitmq" statistics="disable" trace="disable"
startOnLoad="true">
   <target>
      <inSequence>
         <log level="full"/>
         <property name="OUT_ONLY" value="true"/>
         <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
      </inSequence>
      <endpoint>
         <address
         uri="rabbitmq:/AMQPProxy?rabbitmq.server.host.name=192.168.0.3&amp;rabbitmq.server.port=5672&amp;rabbitmq.server.user.name=user&amp;rabbitmq.server.password=abc123&amp;rabbitmq.queue.name=queue2&amp;rabbitmq.exchange.name=exchange2"/>
      </endpoint>
   </target>
   <parameter name="rabbitmq.queue.name">queue1</parameter>
   <parameter name="rabbitmq.exchange.name">exchange1</parameter>
   <parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter>
   <description></description>
</proxy>

1. Transport configuration (transports="rabbitmq")
The proxy is exposed over the rabbitmq transport as an OUT_ONLY proxy service, meaning that it does not expect a response from the endpoint.

2. RabbitMQ AMQP Connection factory configuration (AMQPConnectionFactory)

Here we specify, as a parameter, the name of the connection factory that is used to listen to the queue and consume messages. This connection factory is defined in the AMQP transport listener configuration of axis2.xml. The proxy reads messages from the specified queue and sends them to the queue defined in the endpoint address.

3. RabbitMQ Queue name to listen for messages (rabbitmq.queue.name)

This is the name of the queue that the listener configured for AMQPConnectionFactory will be listening on. If no name is specified, the name of the proxy service is used as the queue name.

4. RabbitMQ Endpoint configuration

<address
uri="rabbitmq:/SampleProxy?rabbitmq.server.host.name=192.168.0.3&amp;rabbitmq.server.port=5672&amp;rabbitmq.server.user.name=guest&amp;rabbitmq.server.password=&amp;rabbitmq.queue.name=queue2&amp;rabbitmq.exchange.name=exchange2"/>

The endpoint address must follow the format shown above. All the parameters that the proxy service needs in order to send messages to the endpoint are given as query parameters of the address URI. If the exchange name is not specified, an empty value is used as the exchange name by default.

5. RabbitMQ AMQP Transport properties

  • rabbitmq.server.host.name – Host name of the machine the RabbitMQ server is running on.
  • rabbitmq.server.port – Port on which the server is listening.
  • rabbitmq.server.user.name – User name used to connect to the RabbitMQ server.
  • rabbitmq.server.password – Password of the account used to connect to the RabbitMQ server.
  • rabbitmq.server.virtual.host – Virtual host of the RabbitMQ server, if any.
  • rabbitmq.queue.name – Name of the queue to send messages to or consume messages from.
  • rabbitmq.exchange.name – Name of the exchange the queue is bound to.
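
Since the endpoint address is just these properties joined as query parameters, it can be assembled programmatically. The following is a minimal sketch: RabbitMqEndpointUri is a hypothetical helper of my own, not part of WSO2 ESB or the RabbitMQ Java Client, and it assumes the property values contain no characters that need URL encoding. The second method escapes '&' so the result can be pasted into the uri attribute of an <address/> element in the proxy XML.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a WSO2 or RabbitMQ API): assembles a
// "rabbitmq:/" endpoint address from the transport properties listed above.
final class RabbitMqEndpointUri {

    // Joins the properties as query parameters, in insertion order.
    static String build(String serviceName, Map<String, String> props) {
        StringBuilder uri = new StringBuilder("rabbitmq:/").append(serviceName);
        char separator = '?';
        for (Map.Entry<String, String> e : props.entrySet()) {
            uri.append(separator).append(e.getKey()).append('=').append(e.getValue());
            separator = '&';
        }
        return uri.toString();
    }

    // Escapes '&' only, so the address can be used inside an XML attribute.
    static String forXmlAttribute(String uri) {
        return uri.replace("&", "&amp;");
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<String, String>();
        props.put("rabbitmq.server.host.name", "192.168.0.3");
        props.put("rabbitmq.server.port", "5672");
        props.put("rabbitmq.queue.name", "queue2");
        props.put("rabbitmq.exchange.name", "exchange2");
        System.out.println(forXmlAttribute(build("AMQPProxy", props)));
    }
}
```

A LinkedHashMap is used so that the parameters appear in the order they were added, which keeps the generated address easy to compare with a hand-written one.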

There can be situations where you only want to send AMQP messages out, or only want to receive AMQP messages. The above proxy can be changed to suit either scenario. You can also change it to work with a different transport. For example, a proxy can listen for messages over AMQP and send them out over HTTP or JMS.

Sample Proxy Service in operation

Note: This feature has so far been tested with SOAP messages, with content type “text/xml”, sent to and consumed from AMQP broker queues.

1. When the proxy service is created by following the above steps and deployed in the ESB, it listens on the queue given by rabbitmq.queue.name, using the connection factory named in rabbitmq.connection.factory, which is defined under the RabbitMQ AMQP transport listener. When a message is available in the queue, the listener consumes it.

2. The consumed message is then sent to the endpoint queue specified in the endpoint configuration of the proxy.

A sample Java client to send a SOAP XML message to an AMQP queue

In a previous post of mine, I explained how to use the RabbitMQ Java client to send and receive messages. Here I’m using the same approach to send messages to and receive messages from a RabbitMQ queue.

Note: Give the correct queue name for publishing messages. This is the queue on which the AMQP transport listener is listening.

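import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;

ConnectionFactory factory = new ConnectionFactory();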
factory.setHost(host);
factory.setUsername(userName);
factory.setPassword(password);
factory.setPort(port);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueBind(queueName, exchangeName, routingKey);

// The message to be sent
String message = "<soapenv:Envelope " +
                 "xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">\n" +
                 "<soapenv:Header/>\n" +
                 "<soapenv:Body>\n" +
                 "  <p:greet xmlns:p=\"http://greet.service.kishanthan.org\">\n" + 
                 "     <in>" + name + "</in>\n" +
                 "  </p:greet>\n" +
                 "</soapenv:Body>\n" +
                 "</soapenv:Envelope>";

// Populate the AMQP message properties
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.messageId(messageID);
builder.contentType("text/xml");
builder.replyTo(replyToAddress);
builder.correlationId(correlationId);
builder.contentEncoding(contentEncoding);

// Custom user properties
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("SOAP_ACTION", "greet");
builder.headers(headers);

// Publish the message to the exchange; the routing key must match the binding key used in queueBind above
channel.basicPublish(exchangeName, routingKey, builder.build(), message.getBytes());
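
The message above is built by concatenating the name straight into the XML, so a value containing '&' or '<' would produce an envelope that is not well-formed. As a minimal sketch of one way to guard against that, the hypothetical GreetEnvelope helper below (my own naming, not part of any library) escapes the name before inserting it. The envelope layout and namespaces are the ones used in the sender code above.

```java
// Hypothetical helper: builds the greet request shown above, escaping the
// caller-supplied name so that the envelope stays well-formed XML.
final class GreetEnvelope {

    // Escapes the characters that are not allowed in XML text content.
    // '&' is replaced first so that later entities are not double-escaped.
    static String escapeXml(String text) {
        return text.replace("&", "&amp;")
                   .replace("<", "&lt;")
                   .replace(">", "&gt;");
    }

    static String build(String name) {
        return "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">\n"
             + "<soapenv:Header/>\n"
             + "<soapenv:Body>\n"
             + "  <p:greet xmlns:p=\"http://greet.service.kishanthan.org\">\n"
             + "     <in>" + escapeXml(name) + "</in>\n"
             + "  </p:greet>\n"
             + "</soapenv:Body>\n"
             + "</soapenv:Envelope>";
    }

    public static void main(String[] args) {
        System.out.println(build("Tom & Jerry"));
    }
}
```

The returned string can be passed to basicPublish in place of the hand-concatenated message.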

A sample Java client to consume the message from an AMQP queue

Note: Give the correct queue name for consuming messages. Here the queue name is the one configured in the endpoint configuration of the proxy service.

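import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

ConnectionFactory factory = new ConnectionFactory();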
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(password);
factory.setPort(port);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueBind(queueName, exchangeName, routingKey);

// Create the consumer
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);

// Start consuming messages
while (true) {
   // Blocks until the next message arrives
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   String message = new String(delivery.getBody());
   System.out.println(message);
}

The above two Java clients can be used to test the scenario where you publish a message to a RabbitMQ AMQP queue, which is consumed by the ESB and published to another queue, which is in turn consumed by a Java client. When both clients are running, the messages sent by the Sender client arrive at the Receiver side. If this works as expected, then you have configured the RabbitMQ AMQP transport correctly in WSO2 ESB.

Conclusion

This new RabbitMQ AMQP transport implementation makes it possible to call an AMQP broker, such as RabbitMQ, directly, without going through a different transport mechanism such as JMS. The underlying protocol used in this transport is AMQP.

RabbitMQ Java Client is available under Mozilla Public Licence 1.1. WSO2 ESB is available under Apache Licence v2.


About kishanthan

I’m currently working as a Software Engineer at WSO2, an open source software company. I hold an Engineering degree, majoring in Computer Science & Engineering field, from University of Moratuwa, Sri Lanka.

15 Responses to AMQP Transport for WSO2 ESB based on RabbitMQ Java Client

  1. TSD says:

    I am new to wso2 and would like to use this to create an input for wso2-cep. Do I need to have ESB set up to make this work or am I completely off base here. The qpid solutions I have seen do not allow me to configure the AMQP exchange and params.

    • kishanthan says:

      There are multiple ways which you can connect to WSO2 CEP using this RabbitMQ AMQP transport.
      1. Install this RabbitMQ AMQP transport feature on WSO2 CEP and write a broker [1] which receives the events for AMQP messages, via a service.
      2. Install the CEP features on ESB, which is already configured with RabbitMQ AMQP transport and use the BAM or Event mediator in the proxy service to send events to CEP, when the proxy service receives an AMQP message.

      I have a plan to do the first approach and will be writing a post on explaining it in detail.

      Thanks,
      Kishanthan.
      [1] http://docs.wso2.org/wiki/display/CEP210/Writing+Custom+Broker

  2. Frank Boller says:

    p2-repo.zip contains a feature.xml tagged with a 3.0.3 version.
    Is there a file drop for a version which can be used with version 4.1.0?

    • kishanthan says:

      Hi Frank,
      What is meant by the 4.1.0 version here? Are you talking about the RabbitMQ Java Client version or the WSO2 Carbon version?

      Currently I have used the 3.0.3 version of RabbitMQ Java Client, which was the latest stable release, in this transport implementation. It is the same version that is used in exporting the packages from the RabbitMQ OSGI bundle and also as the version of the bundle. The bundles which imports these packages, will be using this version in their import headers. This will not be a problem.

      Thanks,
      Kishanthan.

  3. Pingback: Transaction support with RabbitMQ | Kishanthan's Blog

  4. ploef says:

    Hi Frank,
    I’m currently investigating the combination of WSO2 esb and RabbitMq and, although I’m getting closer (:-) ), something still goes wrong.
    The feature is installed, and the receiver and sender are configured. I see queue1 and exchange1 being created automatically in RabbitMQ. But every time I test the configuration by sending a message to queue1 from the RabbitMQ management site, I get the following error:

    --------
    [2013-06-07 16:37:00,230] INFO - ProxyService Successfully created the Axis2 service for Proxy service : AMQPProxy
    [2013-06-07 16:37:33,600] ERROR - NativeWorkerPool Uncaught exception java.lang.NullPointerException
    at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.getConsumerDelivery(ServiceTaskManager.java:290)
    at org.apache.axis2.transport.rabbitmq.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:179)
    at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
    --------
    Do you have any idea what is going on here?

    Thanks,
    Peter

    • kishanthan says:

      Hi Peter,

      This could be due to some required properties not being set on the client side. For example, the following properties are expected on a RabbitMQ message.

      AMQP.BasicProperties.Builder builder = new
      AMQP.BasicProperties().builder();
      builder.messageId(messageID);
      builder.contentType("text/xml");
      builder.replyTo(replyToAddress);
      builder.correlationId(correlationId);
      builder.contentEncoding(contentEncoding);

      Can you check again whether you are sending the above with the request?

      Thanks,
      Kishanthan.

  5. JJ says:

    Hello Kishanthan,
    I am trying to configure wso2 esb with rabbitmq and all the steps above. But i am getting an error when I am configuring the endpoint in the proxy service.

  6. Giles says:

    The endpoint config doesn’t seem to work for me in WSO2 :(

    It seems that for an address endpoint WSO2 expects a normal URL, not a URI that starts with “rabbitmq:/” (if I create the endpoint explicitly then when I hit “test” it responds with “Malformed host address uri”).

    this is with WSO2 4.7.

    • kishanthan says:

      Hi Giles,
      If you want to send message to an endpoint over rabbitmq transport, then you should give the endpoint url with the convention “rabbitmq:/”.

      For address endpoints, currently rabbitmq style url’s are not yet supported. You have to define your endpoint inline in your proxy configuration.

      Thanks,
      Kishanthan

  7. Pingback: Receiving messages from and sending messages to RabbitMq from WSO2 esb (v. 4.6.0) | Technology & Programming

  8. JAMSHEER T says:

    I am trying to configure WSO2 ESB with RabbitMQ and have read this article. Whenever I send a message from the sender, it reaches the queue named Q1 as well as the ESB console, but it doesn’t reach the queue named Q2. Any solution?

  9. JAMSHEER T says:

    k thanks kishanthan
