Dans les architectures orientées messages avec Symfony Messenger, il est fréquent de rencontrer des situations où une file d'attente se remplit de milliers de messages redondants.
C'est particulièrement vrai dans les systèmes réactifs où chaque modification de donnée déclenche un événement. Si un utilisateur ou un processus batch modifie 100 fois le même produit en quelques secondes, votre worker va-t-il traiter 100 fois la demande de réindexation Elasticsearch ?
Idéalement, non. On voudrait ne traiter que la dernière version de la demande et ignorer les précédentes qui sont devenues obsolètes avant même d'être traitées.
Le problème : l'engorgement inutile
Prenons un exemple concret. Vous avez une file d'attente qui reçoit des messages pour ajouter des produits et pour indexer ces produits dans un moteur de recherche.
- AddProductMessage (Produit A)
- AddProductMessage (Produit B)
- IndexElasticMessage (Produit A) - Obsolète
- AddProductMessage (Produit A - mise à jour prix)
- IndexElasticMessage (Produit A) - Obsolète
- AddProductMessage (Produit A - mise à jour stock)
- IndexElasticMessage (Produit A) <-- On ne veut garder que celui-ci
Si votre worker traite les deux premiers messages
IndexElasticMessage
, il
perd du temps et des ressources pour rien, car le troisième message va de toute façon
écraser le travail précédent. Sur des volumes importants, cela peut paralyser votre
infrastructure.
La solution : Dédoublonner avec Redis
L'idée est simple : utiliser un cache rapide (comme Redis) pour stocker l'identifiant du dernier message émis pour une "famille" donnée (par exemple, l'indexation du produit A).
Le processus se déroule en deux temps :
- À l'envoi (Dispatch) : On note dans Redis que pour la famille "product-A", le dernier message valide est le message #3.
- À la réception (Consume) : Quand le worker attrape le message #1, il vérifie dans Redis. Il voit que le dernier message valide est le #3. Comme #1 != #3, il ignore le message #1.
Implémentation avec un Middleware
Voici une implémentation propre utilisant un Middleware Symfony Messenger. Contrairement à un EventListener, le Middleware s'insère directement dans le flux de traitement du message.
Cette classe suppose que vos messages hérite d'une classe abstraite
AbstractMessage
ou implémentent une interface ayant une propriété
family
(pour grouper les messages)
et un
messageUuid
unique.
<?php declare(strict_types=1);
namespace App\Bridge\Symfony\Messenger\Middleware;
use App\Bridge\Symfony\Messenger\Message\AbstractMessage;
use Psr\Log\LoggerInterface;
use Redis;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
final readonly class DeduplicateFamilyMiddleware implements MiddlewareInterface
{
private const string PREFIX = 'deduplicate-family';
public function __construct(
private Redis $redis,
private LoggerInterface $logger,
) {
}
public function handle(
Envelope $envelope,
StackInterface $stack,
): Envelope {
$message = $envelope->getMessage();
if ($message instanceof AbstractMessage && null !== $message->family) {
if (null !== $envelope->last(ReceivedStamp::class)) {
// When deduplicating messages per family name, ensure
// the one received has the same UUID as the one in Redis.
// If not, it means the message is a duplicate and should
// be ignored until the last one is received.
$lastUuid = $this->redis->get(self::PREFIX . "-{$message->family}");
if ($lastUuid !== $message->messageUuid->toRfc4122()) {
$this->logger->debug('ignore message as this is not the last one in the family', ['family' => $message->family]);
// Stop propagation to ignore the message (it will be acknowledged)
return $envelope;
}
} else {
// Store the last message UUID sent in a family to
// be able to ignore all the previous ones.
$this->logger->debug('ignore previous family messages', ['family' => $message->family]);
$this->redis->set(self::PREFIX . "-{$message->family}", $message->messageUuid->toRfc4122(), [
// Expires in 1h if something wrong happens.
'EX' => 3600,
]);
}
}
return $stack->next()->handle($envelope, $stack);
}
}
Points clés de cette implémentation
-
MiddlewareInterface : Le middleware intercepte le message
à deux moments clés : lors du dispatch (pas de
ReceivedStamp) et lors de la consommation (présence deReceivedStamp). - Redis : Utilisé pour sa rapidité et sa capacité à gérer des clés avec expiration (TTL) pour éviter de polluer la mémoire indéfiniment.
-
Arrêt de la propagation : Si le message est obsolète, on retourne
simplement l'enveloppe sans appeler
$stack->next()->handle(). Cela arrête la chaîne de middlewares et empêche le Handler d'être exécuté. Le message est considéré comme traité avec succès (ack).
Conclusion
Cette technique est extrêmement efficace pour désengorger les workers lors de pics d'activité redondants. Elle est simple à mettre en place et ne nécessite pas de changer la logique de vos Handlers.
N'oubliez pas d'enregistrer votre middleware dans la configuration de Messenger (
config/packages/messenger.yaml
) pour qu'il soit actif !
framework:
messenger:
buses:
messenger.bus.default:
middleware:
- App\Bridge\Symfony\Messenger\Middleware\DeduplicateFamilyMiddleware