Accès à RabbitMQ depuis Spring

Template

Comme dans d’autres parties de Spring, le template est une couche d’abstraction entre les beans et le client RabbitMQ. Il permet d’envoyer ou de recevoir des messages.

La conception du template est plutôt souple, dans le sens où une instance de template peut servir pour un ou plusieurs exchange(s) et une ou plusieurs clés de routage pour l’envoi. Il peut aussi revevoir sur une ou plusieurs queues.

Par ailleurs, il peut travailler avec des instances de Message (de Spring) ou des types personnalisés pour lesquels il gère la transformation.

  • RabbitTemplate

    • ConnectionFactory connectionFactory

    • String defaultReceiveQueue

    • String exchange

    • String routingKey

    • // Send to default exchange and routing key

    • send(Message message)

    • convertAndSend(Object object)

    • sendAndReceive(Message message): Message

    • convertSendAndReceive(Object message): Object

    • <T> convertSendAndReceiveAsType(Object message, ParameterizedTypeReference<T> responseType): T

    • // Send to specific routing key

    • send(String routingKey, Message message)

    • …​

    • // Send to specific exchange and routing key

    • send(String exchange, String routingKey, Message message)

    • …​

    • // Receive from default queue

    • receive(): Message

    • receiveAndConvert(): Object

    • <T> receiveAndConvert(String queueName, ParameterizedTypeReference<T> type): T

    • <R, S> receiveAndReply(ReceiveAndReplyCallback<R, S> callback): boolean

    • // Receive from specific queue

    • receive(String queueName): Message

Toutes les méthodes de réception peuvent avoir un timeout en paramètre.

Les templates ne sont généralement pas de beans.

Listener Container

Un listener est un objet qui se met à l’écoute sur une queue pour en consommer les messages. Dans Spring, les listeners sont gérés par des conteneurs, eux-même créés par des fabriques.

RabbitListenerContainerFactory<?> -> AbstractMessageListenerContainer -> MessageListener

Le conteneur peut être créé manuellement. Pour ça notre code doit appeler la méthode de création sur la fabrique, ajouter le listener ainsi que la queue, ou les queues, à écouter. Enfin, notre code doit appeler la méthode de démarrage du conteneur. Cette façon de faire permet aussi de configurer le conteneur indépendamment de la fabrique.

Le conteneur peut aussi être créé automatiquement. C’est le fonctionnement avec l’annotation @RabbitListener.

Spring container for RabbitMQ

Spring propose deux sortes de conteneur: simple ou direct.

Direct container

Le conteneur direct est le plus simple à comprendre et aussi le plus récent.

Un conteneur direct peut écouter plusieurs queues. Pour chaque queue, il crée un ou plusieurs consommateurs, selon la configuration.

Le conteneur établit la connexion à RabbitMQ et chaque consommateur ouvre un canal.

Spring direct container for RabbitMQ

Simple container

Le conteneur simple a un fonctionnement un peu plus complexe et est la première implémentation de Spring.

Un conteneur simple peut aussi écouter plusieurs queues. La principale différence, c’est que plusieurs consommateurs écoutent sur l’ensemble des queues.

Le conteneur établit la connexion à RabbitMQ et chaque consommateur ouvre un canal.

Spring simple container for RabbitMQ

Programmation

Quand il s’agit de créer un conteneur, cela peut être fait par code, en invoquant la méthode createListenerContainer() sur la fabrique.

Après la création, nous devons ajouter des queues d’attente et un écouteur de message. Nous pouvons également ajouter des paramètres personnalisés. À la fin, le conteneur doit être explicitement démarré.

// Create
listenerContainer = listenerContainerFactory.createListenerContainer();
// Add queue(s)
listenerContainer.addQueueNames(UPLOAD_QUEUE_NAME);
// Set the message listener
listenerContainer.setMessageListener(this::onMessage);
// Custom settings (direct)
listenerContainer.setConsumersPerQueue(50);
// Start
listenerContainer.start();

Déclaration

Nous pouvons simplement déclarer les méthodes en tant qu’écouteurs de messages avec l’annotation @RabbitListener. Le conteneur est alors créé par Spring Framework.

@RabbitListener(queues = "ClientQueue")
public ClientResponse onClientMessage(ClientRequest request) {
  ...
}

Comparaison

La documentation n’est pas très bavarde sur les points forts et faibles des deux types de conteneurs. La seul chose qui y est bien expliquée, c’est que les conteneurs directs supportent mieux l’ajout et le retrait de queues.

Ajout et retrait de queues

L’ajout des queues à écouter se fait normalement avant le démarrage du conteneur. Il reste possible d’ajouter ou de retirer des queues à un conteneur en cours de fonctionnement.

Pour un conteneur direct, chaque consommateur écoute une queue. L’opération revient à ajouter ou supprimer un consommateur, ce qui se fait réellement dynamiquement sans effet de bord sur les autres consommateurs.

Pour un conteneur simple, l’opération nécessite le redémarrage du conteneur et de tous ses consommateurs.

Beaucoup de queues

Si un même conteneur doit être à l’écoute d’un nombre important de queues, les conteneurs peuvent être limités par le nombre de canaux.

En effet, le conteneur ouvre une connexion et chaque consommateur utilise son propre canal dans cette connexion. Le nombre de consommateurs par conteneur est donc limité par le nombre maximum de canaux par connexion (channel_max). Ce sujet a déjà été traité dans la page sur la configuration RabbitMQ.

Pour un conteneur direct, le nombre de queues qu’il peut écouté est donc channel_max / consumersPerQueue. Pour écouter sur un grand nombre de queues, il faut donc augmenter channel_max et diminuer consumersPerQueue.

Cette dernière modification peut se faire dans les propriétés Spring pour la fabrique ou directement sur le conteneur. On peut aussi garder la valeur par défaut qui est 1.

listenerContainer.setConsumersPerQueue(2);

Pour un conteneur simple, chaque consommateur écoute en même temps sur toutes les queues. La limite n’est donc jamais atteinte, sauf à paramétrer concurrentConsumers et maxConcurrentConsumers avec des valeurs énormes, ce qui est de toute façon contreproductif.

Gros débit de messages

La capacité de traité des messages est forcément limité. Comment est-ce que le conteneur va réagir à un excès de messages?

Avec un conteneur simple, la capacité de traitement des messages dépend du nombre de consommateurs. Si le débit en entrée est supérieur à la capacité des consommateurs, les messages sont mis en buffer.

Avec un conteneur direct, la capacité de traitement dépend aussi du nombre de consommateurs. Mais la réaction à un débit entrant trop important dépend du nombre de queues. Avec un nombre de queues faible, les messages sont aussi mis en buffer, alors qu’avec beaucoup de queues, on a une exception.

ERROR c.r.c.impl.ForgivingExceptionHandler.log(119) - An unexpected connection driver error occurred
  java.util.concurrent.RejectedExecutionException: Task com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable@5f340329
      rejected from org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor$1@1de16a73
      [Running, pool size = 20, active threads = 20, queued tasks = 100, completed tasks = 31]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor$1.execute(ThreadPoolTaskExecutor.java:269)
    at com.rabbitmq.client.impl.ConsumerWorkService.addWork(ConsumerWorkService.java:88)
    at com.rabbitmq.client.impl.ConsumerDispatcher.execute(ConsumerDispatcher.java:214)
Il faut que j’approfondisse le fonctionnement du buffer. A priori, il doit être au niveau du consommateur.