Queues dans RabbitMQ

RabbitMQ est construit sur AMQP 0.9. Il implémente la notion de queue conformément à cette spécification.

Une queue sert à stocker les messages en vu d’être délivrés à un consommateur.

Caractéristiques

Une queue a les caractéristiques suivantes :

  • name, unique dans le virtual host,

  • durable, elle survit au redémarrage du serveur,

  • exclusive, elle est attachée à une seule connexion dont la fermeture la supprime,

  • auto-delete, elle est supprimée quand le dernier consommateur se désabonne.

Les autres paramètres doivent être fixé grâce aux x-arguments.

  • x-queue-type: classique (vide), quorum ou stream,

  • x-message-ttl, durée de vie des messages s’il n’est pas redéfini au niveau du message,

  • x-max-length et x-max-length-bytes, taille limite de la queue, en nombre de messages et en taille,

  • x-max-priority, priorité maximale des messages.

Définition

  "queues": [
    {
      "name": "q.activity",
      "vhost": "/jtips",
      "type": "stream",
      "arguments": {
        "x-queue-type": "stream",
        "x-message-ttl": 60000,
        "x-max-length": 1000
      },
      "durable": true,
      "auto_delete": false,
      "exclusive": false,
    }
  ]

Client Java

Déclaration d’une queue

Habituellement, on déclare une queue avec toutes ses caractéristiques.

channel.queueDeclare(
    "q.activity",
    true,         // durable
    false,        // not exclusive,
    false,        // not auto-delete
    Map.of()      // no argument
);

Dans certaines conditions, on déclare une queue sans paramètre. Dans ce cas, elle est nommée par le broker, et on récupère son nom.

String queueName = channel.queueDeclare().getQueue();

Utilisation d’une queue

En AMQP 0.9, utiliser une queue signifie lire ses messages.

Ça peut se faire de façon synchrone par basic.get.

Channel channel = connectionFactory
                      .createConnection()
                      .createChannel();
channel.basicGet(queueName, true); // auto-ack mode

On préfère généralement le faire de façon asynchrone, sous forme d’abonnement, par basic.consume. Cette façon de faire permet en plus de passer des paramètres.

Channel channel = connectionFactory
                      .createConnection()
                      .createChannel(false);
channel.basicConsume(
    queueName, true, // auto-ack mode
    new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(
              String consumerTag,
              Envelope envelope,
              AMQP.BasicProperties properties,
              byte[] body) {
            ...
        }
    });

Spring AMQP

Déclaration d’une queue

@Bean
public class MessageService {
  private final AmqpAdmin admin;

  public void createQueue(String name) {
    // Declare queue:
    // - creation if needed,
    // - nothing if already exists,
    // - error 406 if not compliant with existing
    admin.declareQueue(
        QueueBuilder.durable(name)
                    .ttl(60_000)
                    .build());
  }
}

Utilisation d’une queue

public class MessageService {
  private RabbitTemplate rabbitTemplate;

  public void send(String name, String key, String message)
    rabbitTemplate.setExchange(name);
    rabbitTemplate.setRoutingKey(key);
    rabbitTemplate.convertAndSend(message);
  }

  // ou

  @Bean
  public Consumer<String> consumer(RabbitTemplate rabbitTemplate) {
    return message
        -> log.info("Received '{}' on queue {}", message, name));
  }
}