RabbitMQ: An Introduction to Message Queuing, Protocols, and Policies

Krzysztof Słomka
8 min readMar 15, 2023

--

RabbitMQ is a highly popular open-source message-broker software that implements the Advanced Message Queuing Protocol (AMQP). It allows applications to communicate with each other through a highly flexible, robust, and scalable messaging system. In this blog post, we will explore the core concepts and protocols used in RabbitMQ, as well as discuss its policies and see some working examples.

“In distributed systems, message queues are the glue that holds everything together.” — Martin Fowler

Queue

A queue is a fundamental concept in RabbitMQ, where messages are sent and stored until they are consumed. Queues are named and can be bound to exchanges, which are responsible for routing messages to the appropriate queues. They can be durable, meaning they survive a broker restart, or transient, meaning they are deleted when the broker is restarted.

Queue Properties

Queues in RabbitMQ have a few properties that determine their behavior:
Name: A unique identifier for the queue within the RabbitMQ server
Durability: Determines if the queue survives a broker restart (durable or transient)
Auto-delete: Determines if the queue is deleted when no longer used (e.g., no consumers and all messages have been consumed)
Exclusive: Determines if the queue is private to the connection that created it

Queue Features

RabbitMQ provides several features related to queues that can help you manage and optimize message processing.

  • Queue Length Limit
    You can set a maximum queue length to limit the number of messages stored in a queue. When the maximum length is reached, new messages will either be discarded or replace older messages, depending on the overflow behavior.
  • Dead-Letter Queues
    Dead-letter queues are used to handle messages that cannot be processed or delivered for various reasons (e.g., TTL expiration, rejected by a consumer). By setting up a dead-letter exchange and binding it to a dead-letter queue, you can ensure that undeliverable messages are not lost and can be handled separately.
  • Message Priorities
    RabbitMQ supports message priorities, allowing you to assign a priority level (0–255) to messages. Queues can be configured to support a specific number of priority levels, and the broker will attempt to deliver higher-priority messages before lower-priority ones.
  • Queue Mirroring
    Queue mirroring is a feature that provides high availability by replicating a queue across multiple nodes in a RabbitMQ cluster. If the primary node fails, another node in the cluster can take over as the new primary, ensuring message delivery continuity.

Exchange

An exchange is responsible for receiving messages from producers and routing them to queues based on rules called bindings. There are four types of exchanges in RabbitMQ:

  1. Direct Exchange
    A direct exchange routes messages based on the routing key. When a producer sends a message to a direct exchange, the exchange compares the routing key with the binding keys of the queues bound to it. If the routing key matches a binding key, the message is routed to the corresponding queue. Direct exchanges are suitable for scenarios where you want to route messages to specific queues based on a strict criterion.
  2. Fanout Exchange
    A fanout exchange routes messages to all queues bound to it, regardless of the routing key. This type of exchange is useful for broadcasting messages to multiple queues simultaneously. Fanout exchanges are ideal for scenarios where you want to distribute messages to multiple consumers or workers for parallel processing.
  3. Topic Exchange
    A topic exchange routes messages based on wildcard patterns in the routing key. The routing key consists of words separated by dots (e.g., “user.created”, “user.updated”). The binding key can include wildcards:
    — “
    *” matches exactly one word in the routing key
    — “
    #” matches zero or more words in the routing key
    Topic exchanges are suitable for complex routing scenarios where you want to route messages based on multiple criteria or hierarchical structures.
  4. Headers Exchange
    A headers exchange routes messages based on message headers instead of the routing key. When binding a queue to a headers exchange, you can provide a set of key-value pairs as binding arguments. The exchange routes messages to the queue if their headers match the binding arguments. Headers exchanges can be useful when you want to route messages based on multiple properties without adhering to a specific routing key structure.

Binding

A binding is a relationship between an exchange and a queue that defines how messages should be routed. It can include a routing key (or a pattern for topic exchanges) to selectively route messages from the exchange to the bound queue.

Creating and Using Exchanges

When creating an exchange, you need to specify its name, type, and optional properties. For example, to create a durable topic exchange named “my.topic”:

rabbitmqctl declare exchange name=my.topic type=topic durable=true

Producers send messages to an exchange by specifying its name. For instance, using the amqplib library in Node.js:

channel.publish('my.topic', 'user.created', Buffer.from('Hello, RabbitMQ!'));

To route messages from an exchange to a queue, you need to create a binding between them. The binding specifies the criteria (binding key) for routing messages. For example:

rabbitmqctl bind_queue source=my.topic destination=my.queue routing_key="user.*"

In this example, the binding routes messages with a routing key starting with “user.” from the “my.topic” exchange to the “my.queue” queue.

Protocols

RabbitMQ supports multiple messaging protocols.

  1. AMQP 0.9.1 — The primary protocol supported by RabbitMQ, offers a wide range of features, including message persistence, priority queues, and publisher confirms.
  2. AMQP 1.0 A newer version of AMQP, which is not directly compatible with AMQP 0.9.1. RabbitMQ supports this protocol through a plugin.
  3. MQTT A lightweight messaging protocol designed for low-bandwidth, high-latency, or unreliable networks. RabbitMQ provides support for MQTT through a plugin.
  4. STOMP — A simple text-oriented messaging protocol that is supported by RabbitMQ via a plugin.

Policies

Policies in RabbitMQ are used to manage settings and behaviors across multiple queues or exchanges. They can be applied to a set of queues or exchanges using pattern matching on their names. Policies can be used to configure features such as high availability, message TTL, and dead-lettering.

1. High Availability

RabbitMQ provides high availability through the use of mirrored queues. When a queue is mirrored, its messages and metadata are replicated across multiple nodes in a cluster. If the primary node fails, another node in the cluster can take over as the new primary.

To configure high availability using policies, you can apply the following policy:

rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all","ha-sync-mode":"automatic"}'

This policy will apply to all queues with names starting with “ha.”. The ha-mode is set to "all", meaning that all nodes in the cluster will mirror the queues. The ha-sync-mode is set to "automatic", which means that the nodes will synchronize automatically.

Now, any queue with a name starting with “ha.” will be highly available across all nodes in the cluster.

You can find a working RabbitMQ HA example here: https://github.com/kisztof/rabbitmq-ha-demo

2. Message Time-To-Live (TTL)

Message Time-To-Live (TTL) is a feature that allows you to set an expiration time on messages. If a message is not consumed within the specified time, it will be removed from the queue.

To set a message TTL using policies, apply the following policy:

rabbitmqctl set_policy ttl "^(ttl-queue)" '{"message-ttl":60000}'

This policy will apply to all queues with names starting with “ttl-queue”. The message-ttl is set to 60000 milliseconds (1 minute). Messages in these queues will expire if they are not consumed within 1 minute.

3. Dead-Letter Exchanges

Dead-letter exchanges are a mechanism for handling undeliverable messages. When a message cannot be delivered, it is sent to a dead-letter exchange, which routes it to an associated dead-letter queue.

To configure a dead-letter exchange using policies, apply the following policy:

rabbitmqctl set_policy dlx "^dlx\." '{"dead-letter-exchange":"my-dlx"}'

This policy will apply to all queues with names starting with “dlx.”. The dead-letter-exchange is set to "my-dlx", which should be an existing exchange.

Now, when a message cannot be delivered in any queue with a name starting with “dlx.”, it will be sent to the “my-dlx” exchange.

4. Max Queue Length

You can set a maximum queue length to limit the number of messages stored in a queue. When the maximum length is reached, new messages will either be discarded or will replace older messages, depending on the overflow behavior.

To set a maximum queue length using policies, apply the following policy:

rabbitmqctl set_policy max-length "^max-length\." '{"max-length":1000, "overflow": "reject-publish"}'

This policy will apply to all queues with names starting with “max-length.”. The max-length is set to 1000 messages. The overflow is set to "reject-publish", meaning that new messages will be discarded when the queue reaches its maximum length.

You can also set the overflow to "drop-head" if you want the oldest messages to be removed from the queue when it reaches their maximum length.

Working Example

Setting Up RabbitMQ with Docker

First, ensure that you have Docker installed. Then, run the following command to start a RabbitMQ instance.

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

This will start a RabbitMQ instance with the management plugin enabled, allowing you to access the management console at http://127.0.0.1:15672.

Producer & Consumer example

// src/producer.ts

import * as amqp from 'amqplib';

const sendMessage = async () => {
const connection = await amqp.connect('amqp://127.0.0.1');
const channel = await connection.createChannel();
const queue = 'my-queue';
const message = 'Hello, RabbitMQ!';

await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from(message));
console.log(`Sent: ${message}`);

setTimeout(() => {
connection.close();
}, 500);
};

sendMessage().catch(console.error);
// src/consumer.ts

import * as amqp from 'amqplib';

const receiveMessage = async () => {
const connection = await amqp.connect('amqp://127.0.0.1');
const channel = await connection.createChannel();
const queue = 'my-queue';

await channel.assertQueue(queue, { durable: true });
console.log('Waiting for messages...');

channel.consume(
queue,
(msg) => {
if (msg) {
const content = msg.content.toString();
console.log(`Received: ${content}`);
channel.ack(msg);
}
},
{ noAck: false }
);
};

receiveMessage().catch(console.error);
 "dependencies": {
"amqplib": "^0.10.3",
"tslib": "^2.5.0"
},
"devDependencies": {
"@types/amqplib": "^0.10.1",
"ts-node": "^10.9.1",
"typescript": "^4.9.5"
}

Now, you can start the producer and consumer in separate terminals by running the following commands.

npx ts-node src/producer.ts
npx ts-node src/consumer.ts

You should see the producer sending a message to the queue, and the consumer receiving and processing the message.

❯ npx ts-node src/producer.ts
Sent: Hello, RabbitMQ!
❯ npx ts-node src/consumer.ts
Waiting for messages...
Received: Hello, RabbitMQ!

Conclusion

In this post, we explored RabbitMQ, an open-source message broker, and its core concepts such as queues, exchanges, and policies. We discussed the four types of exchanges (Direct, Fanout, Topic, and Headers) and their routing criteria, as well as the use of policies for bulk configuration of RabbitMQ features. By providing examples of setting up a RabbitMQ with Docker and implementing producers and consumers using TypeScript, I demonstrated how to apply these concepts, enabling the design of robust and efficient messaging systems for distributed applications.

RabbitMQ is a powerful and flexible messaging platform that can be used to build distributed systems and microservices architectures. By understanding its core concepts and features, developers can leverage RabbitMQ to create scalable, fault-tolerant, and high-performance applications.

--

--

Krzysztof Słomka

My name is Krzysztof, I'm a software architect and developer, with experience of leading teams and delivering large scalable projects for over 13 years...