RabbitMQ est construit sur AMQP 0.9. Il implémente la notion de queue conformément à cette spécification.
Une queue sert à stocker les messages en vu d’être délivrés à un consommateur.
Caractéristiques
Une queue a les caractéristiques suivantes :
-
name, unique dans le virtual host,
-
durable, elle survit au redémarrage du serveur,
-
exclusive, elle est attachée à une seule connexion dont la fermeture la supprime,
-
auto-delete, elle est supprimée quand le dernier consommateur se désabonne.
Les autres paramètres doivent être fixé grâce aux x-arguments.
-
x-message-ttl, durée de vie des messages s’il n’est pas redéfini au niveau du message,
-
x-max-length et x-max-length-bytes, taille limite de la queue, en nombre de messages et en taille,
-
x-max-priority, priorité maximale des messages.
Définition
"queues": [
{
"name": "q.activity",
"vhost": "/jtips",
"type": "stream",
"arguments": {
"x-queue-type": "stream",
"x-message-ttl": 60000,
"x-max-length": 1000
},
"durable": true,
"auto_delete": false,
"exclusive": false,
}
]
Client Java
Déclaration d’une queue
Habituellement, on déclare une queue avec toutes ses caractéristiques.
channel.queueDeclare(
"q.activity",
true, // durable
false, // not exclusive,
false, // not auto-delete
Map.of() // no argument
);
Dans certaines conditions, on déclare une queue sans paramètre. Dans ce cas, elle est nommée par le broker, et on récupère son nom.
String queueName = channel.queueDeclare().getQueue();
Utilisation d’une queue
En AMQP 0.9, utiliser une queue signifie lire ses messages.
Ça peut se faire de façon synchrone par basic.get
.
Channel channel = connectionFactory
.createConnection()
.createChannel();
channel.basicGet(queueName, true); // auto-ack mode
On préfère généralement le faire de façon asynchrone, sous forme d’abonnement, par basic.consume
.
Cette façon de faire permet en plus de passer des paramètres.
Channel channel = connectionFactory
.createConnection()
.createChannel(false);
channel.basicConsume(
queueName, true, // auto-ack mode
new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) {
...
}
});
Spring AMQP
Déclaration d’une queue
@Bean
public class MessageService {
private final AmqpAdmin admin;
public void createQueue(String name) {
// Declare queue:
// - creation if needed,
// - nothing if already exists,
// - error 406 if not compliant with existing
admin.declareQueue(
QueueBuilder.durable(name)
.ttl(60_000)
.build());
}
}
Utilisation d’une queue
public class MessageService {
private RabbitTemplate rabbitTemplate;
public void send(String name, String key, String message)
rabbitTemplate.setExchange(name);
rabbitTemplate.setRoutingKey(key);
rabbitTemplate.convertAndSend(message);
}
// ou
@Bean
public Consumer<String> consumer(RabbitTemplate rabbitTemplate) {
return message
-> log.info("Received '{}' on queue {}", message, name));
}
}
Références