Spring Integration supporte l’échange de messages asynchrone sans avoir besoin d’ajouter de librairie externe. Il supporte aussi bien le mode point-to-point que le mode publish-subscribe, avec plusieurs variantes.
Principaux concepts
Channel
Un channel est le canal de communication entre clients.
Il y a principalement trois interfaces dans Spring Framework (module spring-messaging
, pas besoin de Spring Integration).
MessageChannel
sert à envoyer des messages.
-
-
send(Message message): boolean
-
send(Message message, long timeout): boolean
-
MessageChannel channel = ...;
channel.send(new GenericMessage<>(content));
PollableChannel
hérite de MessageChannel
.
En plus de l’envoi de message, elle sert à réceptionner des messages de façon bloquante.
-
PollableChannel
→ MessageChannel-
receive(): Message<?>
-
receive(long timeout): Message<?>
-
PollableChannel channel = ...;
Message message = channel.receive();
SubscribableChannel
hérite de MessageChannel
.
En plus de l’envoi de message, elle sert à s’abonner à la réception des futurs messages.
-
SubscribableChannel
→ MessageChannel-
subscribe(MessageHandler handler): boolean
-
unsubscribe(MessageHandler handler): boolean
-
SubscribableChannel channel = ...;
MessageHandler handler = message -> ...
channel.subscribe(handler);
Message
L’interface définissant un message est dans Spring Framework. Elle est assez simple, avec des headers et un contenu de type libre.
-
-
getHeaders(): MessageHeaders
-
getPayload(): T
-
L’implémentation classique de Message<T>
est GenericMessage<T>
, immuable avec des headers et un contenu générique.
-
GenericMessage<T>
→ Message<T>-
GenericMessage(T payload)
-
GenericMessage(T payload, Map<String,Object> headers)**
-
getHeaders(): MessageHeaders
-
getPayload(): T
-
ErrorMessage
est une sous-classe de c
, et contient un Throwable en contenu.
Ce type de message peut aussi contenir le message qui serait à l’origine de l’erreur.
-
GenericMessage<T>
→ GenericMessage<Throwable>-
ErrorMessage(…)
-
getHeaders(): MessageHeaders
-
getPayload(): Throwable
-
getOriginalMessage(): Message<?>
-
Cette interface et ces deux classes sont dans Spring Framework.
Spring Integration propose aussi MutableMessage<T>
qui, contrairement à GenericMessage<T>
autorise la modification des headers.
Cette classe doit être utilisée avec précaution car rien ne garantie qu’elle soit thread safe.
Il propose aussi AdviceMessage<T>
a un contenu générique, et le message d’entrée, et est utilisé avec de l’AOP.
Modèles de communication
Spring Framework n’a qu’une seule implémentation de canal, ExecutorSubscribableChannel
, en mode publish-subscribe et qui implémente SubscribableChannel
.
Spring Integration est beaucoup plus riche.
-
PublishSubscribeChannel
en mode publish-subscribe, implémenteSubscribableChannel
-
QueueChannel
en mode point-to-point et FIFO, implémentePollableChannel
-
PriorityChannel
en mode point-to-point avec un ordre de priorité des messages, hérite de QueueChannel, implémentePollableChannel
-
RendezvousChannel
en mode point-to-point en bloquant le producteur, utilisable pour faire du request-reply, hérite deQueueChannel
-
DirectChannel
en mode point-to-point, mais en implémentantSubscribableChannel
, -
ExecutorChannel
en mode point-to-point, ressemble auDirectChannel
mais avec unsend(…)
non bloquant
Publish-subscribe
Dans ce mode, chaque message envoyé dans le canal peut être réceptionné par plusieurs consommateurs.
┌────────────┐ ┌───────────┐ ┌─────────────────────────┐ ┌──>| Consumer#1 | | Publisher ├──────>| PublishSubscribeChannel ├───┼──>| Consumer#2 | └───────────┘ └─────────────────────────┘ └──>| Consumer#3 | └────────────┘
-
PublishSubscribeChannel
→ SubscribableChannel-
PublishSubscribeChannel()
-
PublishSubscribeChannel(boolean requireSubscribers)
-
PublishSubscribeChannel(Executor executor)
-
PublishSubscribeChannel(Executor executor, boolean requireSubscribers)
-
setApplySequence(boolean applySequence)
-
setErrorHandler(ErrorHandler errorHandler)
-
setIgnoreFailures(boolean ignoreFailures)
-
setMinSubscribers(int minSubscribers)
-
send(Message message): boolean
-
send(Message message, long timeout): boolean
-
subscribe(MessageHandler handler): boolean
-
unsubscribe(MessageHandler handler): boolean
-
Avec le constructeur par défaut, la consommation des messages se fait dans le même thread que leur envoi. Pour découpler la consommation de l’envoi, il faut associer un executor.
Le paramètre de constructeur requireSubscribers
permet de vérifier si le channel a des consommateurs.
Dans ce cas, un message envoyé à un canal sans consommateur va déclencher une exception.
MessageChannel channel = new PublishSubscribeChannel(true);
channel.send(new GenericMessage<>(content));
org.springframework.integration.MessageDispatchingException:
Dispatcher has no subscribers, failedMessage=GenericMessage
[payload=Hello,
headers={id=c77a385e-a3fc-2874-3434-23c24c4892a2,
timestamp=1648380606878}]
ExecutorSubscribableChannel
est aussi utilisable, un peu plus simple.
Il peut aussi fonctionner de façon synchrone ou avec un executor.
Ça permet d’avoir un event bus dans Spring Framework, sans Spring Integration.
Point-to-point
Dans ce mode, chaque message envoyé dans le canal ne sera réceptionné que par un consommateur, même si plusieurs sont connectés. De plus, le message reste dans le canal tant qu’il n’est pas consommé.
┌────────────┐ ┌───────────┐ ┌──────────────┐ | Consumer#1 | | Publisher ├──────>| QueueChannel ├───┐ | Consumer#2 | └───────────┘ └──────────────┘ └──>| Consumer#3 | └────────────┘
QueueChannel
fonctionne avec une file de messages.
Lorsqu’un producteur envoie un message, celui-ci est stocké dans la file jusqu’à ce qu’un consomateur vienne le récupérer.
Par défaut, cette file n’est pas limité en taille.
Les messages sont mis à disposition des consommateurs dans leur ordre d’arrivée : c’est du FIFO.
-
QueueChannel
→ PollableChannel-
QueueChannel()
-
QueueChannel(int capacity)
-
QueueChannel(Queue<Message<?>> queue)
-
send(Message<?> message): boolean
-
send(Message<?> message, long timeout): boolean
-
purge(MessageSelector selector)
-
receive(): Message<?>
-
receive(long timeout): Message<?>
-
// Publisher
MessageChannel channel = new QueueChannel();
channel.send(new GenericMessage<>(content));
// Consumer
PollableChannel channel = ...; // find the channel
Message<?> message = channel.receive();
PriorityChannel
fonctionne de façon similaire, avec un notion de priorité qui détermine l’ordre de mise à disposition.
Par défaut, les messages sont triés par le header priority.
De façon optionnelle, on peut utiliser un comparateur de messages.
-
PriorityChannel
→ QueueChannel-
PriorityChannel()
-
PriorityChannel(int capacity)
-
PriorityChannel(Comparator<Message<?>> comparator)
-
PriorityChannel(int capacity, Comparator<Message<?>> comparator)
-
// Publisher
MessageChannel channel = new QueueChannel();
channel.send(
new GenericMessage<>(
content,
// Priority header
Map.of(IntegrationMessageHeaderAccessor.PRIORITY, 1)
)
);
Point-to-point avec abonnement
Dans le chapitre précédent, les consommateurs doivent prélever les messages un à un.
Avec DirectChannel
, les consommateurs s’abonnent à l’arrivée de nouveaux messages.
L’API ressemble à celle du mode pub/sub, mais chaque message n’est délivré qu’à un seul consommateur.
-
DirectChannel
→ SubscribableChannel-
DirectChannel()
-
DirectChannel(LoadBalancingStrategy loadBalancingStrategy)
-
send(Message message): boolean
-
send(Message message, long timeout): boolean
-
subscribe(MessageHandler handler): boolean
-
unsubscribe(MessageHandler handler): boolean
-
// Consumer
SubscribableChannel channel = new DirectChannel();
channel.subscribe(message -> ...);
// Publisher
MessageChannel channel = ...; // find the channel
channel.send(new GenericMessage<>(content));
Si plusieurs consommateurs souscrivent aux messages d’un canal, chaque message n’est consommé que par un seul d’entre eux.
Par défaut, la répartission des messages entre les consommateurs se fait en round robin.
Une répartission différente peut être choisie en passant une LoadBalancingStrategy
à la création du canal.
Par contre, le choix est limité puisque seul le round robin est disponible dans Spring Integration. Pour une autre stratégie, il faut implémenter soi-même l’interface.
-
-
getHandlerIterator(Message<?> message, Collection<MessageHandler> handlers): Iterator<MessageHandler>
-
ExecutorChannel
est une variation sur le même thème où l’envoi des messages se fait de façon non bloquante, via les threads d’un executor.
-
ExecutorChannel
→ SubscribableChannel-
ExecutorChannel(Executor executor)
-
ExecutorChannel(Executor executor, LoadBalancingStrategy loadBalancingStrategy)
-
Requête / réponse
Le mode requête / réponse est un dérivé du point à point. Le message initial est envoyé puis consommé en point à point. Une fois le message envoyé, le producteur se met en attente d’un message de réponse sur une file temporaire. Et quand le consommateur reçoit le message, il le traite et envoie un message de réponse dans cette file temporaire.
┌────────────┐ ┌───────────┐ ┌──────────────┐ | Consumer#1 | | Publisher ├──────>| QueueChannel ├───┐ | Consumer#2 | └───────────┘<┐ └──────────────┘ └──>| Consumer#3 | | └──────┬─────┘ | ┌───────────────────┐ | └────┤ RendezvousChannel |<─────────┘ └───────────────────┘
RendezvousChannel
est approprié pour servir de file temporaire.
Il est bloquant des deux cotés, y compris pour le producteur qui est mis en attente d’un consommateur.
// Publisher
PollableChannel replyChannel = new RendezvousChannel();
Message<?> message =
new GenericMessage<>(
content,
Map.of(MessageHeaders.REPLY_CHANNEL, replyChannel)
);
channel.send(message);
Message<?> resultMessage = replyChannel.receive();
On peut simplifier le code avec GenericMessagingTemplate
.
GenericMessagingTemplate n’utilise pas de RendezvousChannel , mais un canal privé qui ne peut contenir qu’un seul message.
|
Annotations
Spring Integration propose un stéréotype de composant avec @MessageEndpoint.
@MessageEndpoint
public class EventEndpoint {
// ...
}
@ServiceActivator
Cette annotation doit être apposée sur une méthode d’un bean, de préférence annotée avec @MessageEndpoint
.
Elle permet de s’abonner aux messages d’un canal, qu’il soit subscribable ou pollable.
@MessageEndpoint
public class EventEndpoint {
@ServiceActivator(inputChannel = "channel/subscribable")
public void onMessage(String message) {
// ...
}
}
Le paramètre d’entrée de la méthode peut être un Message
ou directement le type du contenu, comme dans l’exemple ci-dessus.
Dans le cas du contenu, on peut aussi ajouter des paramètres d’en-tête, annotés avec @Header
, ou un paramètre pour l’ensemble des valeurs d’en-tête, de type Map<String, Object>
et annoté avec @Headers
.
@MessageEndpoint
public class EventEndpoint {
@ServiceActivator(inputChannel = "channel/subscribable")
public void onMessage(String message, @Header("token") String token) {
// ...
}
}
@Poller
Si le canal est pollable, il faut associé au poller. Il peut être défini sous forme d’un bean global.
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(1, TimeUnit.SECONDS));
return pollerMetadata;
}
Dans l’exemple présenté ici, la récupération des messages se fait toutes les secondes. Les triggers sont ceux de Spring Task Executor.
Pour qu’une méthode utilise un poller alternatif, il faut utiliser l’annotation @Poller
en passant le nom du bean.
@MessageEndpoint
public class EventEndpoint {
@ServiceActivator(
inputChannel = "channel/subscribable",
poller = @Poller("sw.slowPoller"))
public void onMessage(String message, @Header("token") String token) {
// ...
}
}
La même annotation peut être utilisée pour utiliser un poller interne.
@MessageEndpoint
public class EventEndpoint {
@ServiceActivator(
inputChannel = "channel/pollable",
poller = @Poller(fixedDelay = "1000"))
public void onMessage(String message, @Header("token") String token) {
// ...
}
}
@ServiceActivator
avec retour
La méthode peut aussi retourner un message, ou un objet quelconque qui servira de contenu à un message.
Ce message sera envoyé dans le canal spécifié comme outputChannel
.
@ServiceActivator(
inputChannel = "channel/pollable",
outputChannel = "channel/subscribable")
public String onMessage(String message) {
systemLogger.log(INFO, "Message received on channel/pollable: " + message);
return "Reply-" + message;
}
Si aucun outputChannel
n’est pas spécifié, le retour est envoyé dans le replyChannel
du message entrant.
Et s’il n’y en a pas non plus, une exception est levée.
Si le message a un replyChannel et qu’un outputChannel est spécifié, alors c’est outputChannel qui est utilisé.
|
@Publisher
On peut aussi envoyer des messages via des méthodes annotées. Pour que ça fonctionne, il faut activer spécifiquement la fonctionnalité.
@Configuration
@EnableIntegration
@EnablePublisher
public class IntegrationConfiguration {
// ...
}
Après ça, on peut annoter des méthodes de bean avec @Publisher
.
L’appel de ces méthodes déclenche l’envoi d’un message avec le résultat de la méthode en payload.
Le nom passé en paramètre est le nom de bean du canal.
@Publisher("channel/publish")
public String publish(String message) {
// ...
return result;
}
Comme pour les méthodes annotées avec @ServiceActivator
, il est possible d’ajouter des paramètres d’en-tête annotés avec @Header
.
@Publisher("channel/publish")
public String publish(String message, @Header("token") String token) {
// ...
return result;
}
Concepts avancés
ChannelInterceptor
Comme le nom l’indique bien, un ChannelInterceptor
est rattaché à un Channel
.
Il s’insère au niveaux des interactions entre le message et le canal, avant/après l’envoi et avant/après la réception.
-
-
preSend(Message<?> message, MessageChannel channel): Message<?>
-
postSend(Message<?> message, MessageChannel channel, boolean sent)
-
afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex)
-
preReceive(MessageChannel channel): boolean
-
postReceive(Message<?> message, MessageChannel channel): Message<?>
-
afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex)
-
Toutes les méthodes de l’interface sont default
.
On n’implémente que celles qu’on veut effectivement redéfinir.
@Component
public class SecurityInterceptor implements ChannelInterceptor {
private final SecurityService securityService;
public SecurityInterceptor(SecurityService securityService) {
this.securityService = securityService;
}
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
String token = message.getHeaders().get("token", String.class);
securityService.validateToken(token);
return ChannelInterceptor.super.preSend(message, channel);
}
}
L’intercepteur peut être associé à un ou plusieurs canaux. De même un canal peut avoir plusieurs intercepteurs.
@Bean
public PollableChannel pollableChannel(SecurityInterceptor securityInterceptor) {
QueueChannel channel = new QueueChannel();
channel.addInterceptor(securityInterceptor);
return channel;
}
Un intercepteur peut fonctionner de façon non intrusive, pour du logging par exemple. Il peut empêcher l’envoi du message, dans une logique de sécurité. Il peut aussi transformer un message avant l’envoi ou après la réception.
EIP patterns
Spring Integration implémente plein de patterns d’intégration. Ça fait plusieurs sujet à approfondir sur ce thème.