Transaction support with RabbitMQ

In a previous post, I described the new AMQP transport developed for WSO2 ESB, which is based on the RabbitMQ Java Client. In this post I will explain how we use the transaction support in the RabbitMQ Java Client in our consumer implementation, which is also used in the new AMQP transport for WSO2 ESB.

Say that an unexpected error occurs while processing a message consumed from a RabbitMQ AMQP queue. The message has then been taken from the queue but was never used for its intended purpose. To avoid this kind of situation, the RabbitMQ Java Client lets us turn off auto-acknowledgement, acknowledge messages explicitly, and group those acknowledgements in transactions.

The following is one way to achieve this:

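    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;

    // HOST, QUEUE_NAME, EXCHANGE_NAME and ROUTING_KEY are constants defined elsewhere
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(HOST);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // autoAck = false: the broker keeps each message until it is acknowledged
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // Switch the channel to transactional mode; once per channel is enough
    channel.txSelect();

    while (true) {
        // Blocks until the next message arrives
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        if (delivery.getProperties().getContentType() != null) {
            // Condition met: acknowledge this message and commit
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            channel.txCommit();
        } else {
            // Condition not met: roll back without acknowledging
            channel.txRollback();
        }
    }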

Here I explicitly set auto-acknowledgement to false when consuming, so that the broker keeps the message until it is acknowledged, and then start processing the message. During processing I check a condition (in this example, whether the message has a content type). If the condition is not satisfied, I roll back the transaction; if it is satisfied, I send an acknowledgement and commit the transaction.
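To try the commit-or-rollback decision without a running broker, the logic can be pulled into a small method and exercised against an in-memory fake. This is only a sketch: `TxChannel`, `RecordingChannel` and `TxDemo` are hypothetical names introduced for illustration. `TxChannel` mirrors the names of the three `Channel` methods used in this post, but simplifies them (the real methods declare `IOException`, and `txCommit`/`txRollback` return a result object).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the three Channel methods used in this post. */
interface TxChannel {
    void basicAck(long deliveryTag, boolean multiple);
    void txCommit();
    void txRollback();
}

/** In-memory fake that records each call, so no broker is needed. */
class RecordingChannel implements TxChannel {
    final List<String> log = new ArrayList<>();

    @Override public void basicAck(long deliveryTag, boolean multiple) { log.add("ack " + deliveryTag); }
    @Override public void txCommit() { log.add("commit"); }
    @Override public void txRollback() { log.add("rollback"); }
}

class TxDemo {
    /**
     * Acknowledges and commits when a content type is present,
     * otherwise rolls back. Returns true if the transaction was committed.
     */
    static boolean handle(TxChannel channel, long deliveryTag, String contentType) {
        if (contentType != null) {
            channel.basicAck(deliveryTag, false);
            channel.txCommit();
            return true;
        }
        channel.txRollback();
        return false;
    }

    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();
        handle(channel, 1L, "text/plain"); // condition satisfied
        handle(channel, 2L, null);         // condition not satisfied
        System.out.println(channel.log);   // prints [ack 1, commit, rollback]
    }
}
```

In real code, a thin adapter around `com.rabbitmq.client.Channel` could implement the same interface, which keeps the decision logic testable in isolation.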

This is a basic way to handle transactions with the RabbitMQ Java Client. We can extend it to support more complex transaction processing as well.
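As one possible extension, several deliveries could be acknowledged inside a single transaction, so that either the whole batch is committed or none of it is. The sketch below is an assumption about how that might look, not part of the WSO2 transport: `BatchTxChannel`, `BatchRecordingChannel` and `BatchTxDemo` are hypothetical names, and the interface is again a simplified stand-in for the corresponding `Channel` methods.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the Channel methods used here. */
interface BatchTxChannel {
    void basicAck(long deliveryTag, boolean multiple);
    void txCommit();
    void txRollback();
}

/** In-memory fake that records each call, so no broker is needed. */
class BatchRecordingChannel implements BatchTxChannel {
    final List<String> log = new ArrayList<>();

    @Override public void basicAck(long deliveryTag, boolean multiple) { log.add("ack " + deliveryTag); }
    @Override public void txCommit() { log.add("commit"); }
    @Override public void txRollback() { log.add("rollback"); }
}

class BatchTxDemo {
    /**
     * Treats a batch of deliveries as one unit of work. If every message has
     * a content type, each one is acknowledged and the transaction is
     * committed once. If any message fails the check, nothing is
     * acknowledged and the transaction is rolled back.
     * Returns true if the batch was committed.
     */
    static boolean handleBatch(BatchTxChannel channel, long[] deliveryTags, String[] contentTypes) {
        if (deliveryTags.length != contentTypes.length) {
            throw new IllegalArgumentException("tags and content types must have the same length");
        }
        for (String contentType : contentTypes) {
            if (contentType == null) {
                channel.txRollback();
                return false;
            }
        }
        for (long tag : deliveryTags) {
            channel.basicAck(tag, false); // acknowledge each message individually
        }
        channel.txCommit(); // one commit covers the whole batch
        return true;
    }

    public static void main(String[] args) {
        BatchRecordingChannel channel = new BatchRecordingChannel();
        handleBatch(channel, new long[] {1, 2, 3}, new String[] {"a", "b", "c"});
        handleBatch(channel, new long[] {4, 5}, new String[] {"a", null});
        System.out.println(channel.log); // prints [ack 1, ack 2, ack 3, commit, rollback]
    }
}
```

The sketch acknowledges each tag individually rather than using `multiple = true`, so that a commit never acknowledges messages outside the batch.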

About kishanthan

I’m currently working as a Software Engineer at WSO2, an open source software company. I hold an Engineering degree, majoring in Computer Science & Engineering, from the University of Moratuwa, Sri Lanka.