RabbitMQ CLI, Dead Letter Exchange and Quorum Queue

Today I've taken a closer look at this code snippet in one of our services:

@Queue("${app.messaging.queueName}")
public void receive(byte[] data,
                    @Nullable @Header("x-redelivered-count") Integer count,
                    RabbitAcknowledgement rabbitAcknowledgement) {
    LOGGER.info("Received message from queue");

    try {
        someService.handleMessage(new String(data, UTF_8));
        rabbitAcknowledgement.ack();
    } catch (Exception ex) {
        LOGGER.warn("Couldn't send queued message to service", ex);
        if (count != null && count >= MAX_REDELIVER_COUNT) {
            LOGGER.error("Max redeliver count limit of {} reached. Message will be discarded.", MAX_REDELIVER_COUNT);
        } else {
            LOGGER.info("Requeue message because of exception from service.");
            count = (count == null ? 1 : ++count);
            rabbitClient.send(count, data);
        }
        rabbitAcknowledgement.ack();
    }
}

So this piece makes sure if a message cannot be handled, it should be retried by re-queuing the same message, but only for a maximum of MAX_REDELIVER_COUNT times.

I thought RabbitMQ should be able to handle this logic for us. And indeed, it offers a retry mechanism. It just needs to be enabled/used.

In this post, I walk you through the settings and CLI commands for a retry queue. Also, we will have a dead letter queue, which catches all messages failing for more than MAX_REDELIVER_COUNT times.

1. Simple topic exchange with classic queue

docker run -it --name rabbitmq --rm -p 5672:5672 -p 15672:15672 rabbitmq:3.8-management
docker exec -it rabbitmq rabbitmqadmin declare exchange name=my-exchange type=topic durable=true
docker exec -it rabbitmq rabbitmqadmin declare queue name=my-queue durable=true
docker exec -it rabbitmq rabbitmqadmin declare binding source="my-exchange" destination_type="queue" destination="my-queue" routing_key="*"

docker exec -it rabbitmq rabbitmqadmin publish exchange=my-exchange routing_key=my-routing-key properties="{\"delivery_mode\":2}" payload='test'
docker exec -it rabbitmq rabbitmqadmin get queue=my-queue ackmode=ack_requeue_true --depth=4

2. Add a dead letter queue

Use the same my-exchange as above.

docker exec -it rabbitmq rabbitmqadmin declare exchange name=dead-letter-exchange type=topic durable=true
docker exec -it rabbitmq rabbitmqadmin declare queue name=dead-letter-queue durable=true
docker exec -it rabbitmq rabbitmqadmin declare binding source="dead-letter-exchange" destination_type="queue" destination="dead-letter-queue" routing_key="*"

docker exec -it rabbitmq rabbitmqadmin declare queue name=my-dlx-queue durable=true arguments="{\"x-dead-letter-exchange\":\"dead-letter-exchange\"}"
docker exec -it rabbitmq rabbitmqadmin declare binding source="my-exchange" destination_type="queue" destination="my-dlx-queue" routing_key="*"

docker exec -it rabbitmq rabbitmqadmin publish exchange=my-exchange routing_key=my-routing-key properties="{\"delivery_mode\":2}" payload='test'
docker exec -it rabbitmq rabbitmqadmin get queue=my-dlx-queue ackmode=reject_requeue_false --depth=4

Note: We set the ackmode to reject_requeue_false to reject the message. The rejected message will be placed in the dead-letter-queue.

3. Use a quorum queue for automatic retries

RabbitMQ offers a special queue type which has a retry feature: Quorum queues. The feature is named Poison Message Handling and can be enabled by setting x-delivery-limit on the queue.

Quorum queues typically require more resources (disk and RAM) than classic mirrored queues.

docker exec -it rabbitmq rabbitmqadmin declare queue name=my-quorum-queue queue_type=quorum durable=true arguments="{\"x-dead-letter-exchange\":\"dead-letter-exchange\",\"x-dead-letter-routing-key\":\"bar\",\"x-delivery-limit\":3}"
docker exec -it rabbitmq rabbitmqadmin declare binding source="my-exchange" destination_type="queue" destination="my-quorum-queue" routing_key="*"

docker exec -it rabbitmq rabbitmqadmin publish exchange=my-exchange routing_key=my-routing-key properties="{\"delivery_mode\":2}" payload='test'
docker exec -it rabbitmq rabbitmqadmin get queue=my-quorum-queue ackmode=ack_requeue_true --depth=4

Note: We set x-dead-letter-routing-key this time. This is optional, just to show it is possible. And of course we set the queue_type. This is a CLI shortcut for having an argument x-queue-type set to quorum.

If we run rabbitmqadmin get, we see it has a new header field x-delivery-count set to 0.

+----------------+-------------+---------------+---------+---------------+------------------+--------------------------+-------------------------------------+-------------+
|  routing_key   |  exchange   | message_count | payload | payload_bytes | payload_encoding | properties.delivery_mode | properties.headers.x-delivery-count | redelivered |
+----------------+-------------+---------------+---------+---------------+------------------+--------------------------+-------------------------------------+-------------+
| my-routing-key | my-exchange | 0             | test    | 4             | string           | 2                        | 0                                   | False       |
+----------------+-------------+---------------+---------+---------------+------------------+--------------------------+-------------------------------------+-------------+

We can run the command for three more times, always receiving the same message. On the fifth time we don't receive any more messages.

If we look into the dead-letter-queue, we find it has moved here, which is exactly what we tried to accomplish: Retry for three times, then move into DLX.

4. Simplified consumer

Now back to our Java code, the consumer now looks much simpler:

@Queue(value = "${app.messaging.queueName}", reQueue = true)
public void receive(byte[] data,
                    @Nullable @Header("x-delivery-count") Integer deliveryCount,
                    Envelope envelope) {
    LOGGER.info("Received message from queue. Routing key: {}; Redelivered: {}, Delivery count: {}",
            envelope.getRoutingKey(), envelope.isRedeliver(), deliveryCount);
    someService.handleMessage(new String(data, UTF_8));
}

Differences:

5. Possible values for ackmode

See here and here.

ackmode Description
ack_requeue_true Nack message requeue true
ack_requeue_false Automatic ack
reject_requeue_true Reject requeue true
reject_requeue_false Reject requeue false

6. Documentation?

I find the documentation about rabbitmqadmin very poor. I had to look up the possible values for ackmode in the sources, the option to display message headers (--depth) is just not very intuitive, and the delivery_mode 2 for persistent messages is..also not very intuitive. The last one might be the fault of AMQP, but nevertheless the documentation about it is bad (just mentioned in the tutorial).

Also, how does ack_requeue_true map to Nack message requeue true? Well.

7. Delay?

I wish there would also be an option to have a delay between retries. If a service is not available, it might be because of a short downtime and it might be up again in five minutes.

I know about the rabbitmq-delayed-message-exchange. However, if we set x-delay on the initial message, the first delivery won't be instantly. I don't know about an easy way to have a message handled instantly for the first time, but having an (in best case increasing) delay for subsequent retries.

If this is needed, I might start with the Java snippet from above and manually requeue the message with a programmatically set x-delay header. Another option I haven't tried is using a dead-letter-queue combined with a time-to-live (TTL), but to me that seems like abusing the dead-letter concept a bit and you end up with a lot of queues and exchanges. Simplicity? Gone.

It's a shame this feature is not supported out-of-the-box, because I think it's a pretty common use case.