Aller au contenu principal

Ne traiter que le dernier message avec Symfony Messenger

Photo d'Emmanuel BALLERY, fondateur de x10
Emmanuel BALLERY
CTO freelance & Architecte logiciel
calendar_today 10/01/2026
schedule 10 min lecture
Une file d'attente de messages simplifiée par un filtre intelligent

Dans les architectures orientées messages avec Symfony Messenger, il est fréquent de rencontrer des situations où une file d'attente se remplit de milliers de messages redondants.

C'est particulièrement vrai dans les systèmes réactifs où chaque modification de donnée déclenche un événement. Si un utilisateur ou un processus batch modifie 100 fois le même produit en quelques secondes, votre worker va-t-il traiter 100 fois la demande de réindexation Elasticsearch ?

Idéalement, non. On voudrait ne traiter que la dernière version de la demande et ignorer les précédentes qui sont devenues obsolètes avant même d'être traitées.

Le problème : l'engorgement inutile

Prenons un exemple concret. Vous avez une file d'attente qui reçoit des messages pour ajouter des produits et pour indexer ces produits dans un moteur de recherche.

Si votre worker traite les deux premiers messages IndexElasticMessage , il perd du temps et des ressources pour rien, car le troisième message va de toute façon écraser le travail précédent. Sur des volumes importants, cela peut paralyser votre infrastructure.

La solution : Dédoublonner avec Redis

L'idée est simple : utiliser un cache rapide (comme Redis) pour stocker l'identifiant du dernier message émis pour une "famille" donnée (par exemple, l'indexation du produit A).

Le processus se déroule en deux temps :

  1. À l'envoi (Dispatch) : On note dans Redis que pour la famille "product-A", le dernier message valide est le message #3.
  2. À la réception (Consume) : Quand le worker attrape le message #1, il vérifie dans Redis. Il voit que le dernier message valide est le #3. Comme #1 != #3, il ignore le message #1.

Implémentation avec un Middleware

Voici une implémentation propre utilisant un Middleware Symfony Messenger. Contrairement à un EventListener, le Middleware s'insère directement dans le flux de traitement du message.

Cette classe suppose que vos messages hérite d'une classe abstraite AbstractMessage ou implémentent une interface ayant une propriété family (pour grouper les messages) et un messageUuid unique.

<?php declare(strict_types=1);

namespace App\Bridge\Symfony\Messenger\Middleware;

use App\Bridge\Symfony\Messenger\Message\AbstractMessage;
use Psr\Log\LoggerInterface;
use Redis;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;

final readonly class DeduplicateFamilyMiddleware implements MiddlewareInterface
{
    private const string PREFIX = 'deduplicate-family';

    public function __construct(
        private Redis $redis,
        private LoggerInterface $logger,
    ) {
    }

    public function handle(
        Envelope $envelope,
        StackInterface $stack,
    ): Envelope {
        $message = $envelope->getMessage();

        if ($message instanceof AbstractMessage && null !== $message->family) {
            if (null !== $envelope->last(ReceivedStamp::class)) {
                // When deduplicating messages per family name, ensure
                // the one received has the same UUID as the one in Redis.
                // If not, it means the message is a duplicate and should
                // be ignored until the last one is received.
                $lastUuid = $this->redis->get(self::PREFIX . "-{$message->family}");
                if ($lastUuid !== $message->messageUuid->toRfc4122()) {
                    $this->logger->debug('ignore message as this is not the last one in the family', ['family' => $message->family]);

                    // Stop propagation to ignore the message (it will be acknowledged)
                    return $envelope;
                }
            } else {
                // Store the last message UUID sent in a family to
                // be able to ignore all the previous ones.
                $this->logger->debug('ignore previous family messages', ['family' => $message->family]);
                $this->redis->set(self::PREFIX . "-{$message->family}", $message->messageUuid->toRfc4122(), [
                    // Expires in 1h if something wrong happens.
                    'EX' => 3600,
                ]);
            }
        }

        return $stack->next()->handle($envelope, $stack);
    }
}

Points clés de cette implémentation

Conclusion

Cette technique est extrêmement efficace pour désengorger les workers lors de pics d'activité redondants. Elle est simple à mettre en place et ne nécessite pas de changer la logique de vos Handlers.

N'oubliez pas d'enregistrer votre middleware dans la configuration de Messenger ( config/packages/messenger.yaml ) pour qu'il soit actif !

framework:
  messenger:
    buses:
      messenger.bus.default:
        middleware:
          - App\Bridge\Symfony\Messenger\Middleware\DeduplicateFamilyMiddleware
Photo d'Emmanuel BALLERY, fondateur de x10

À propos de l'auteur

Emmanuel BALLERY est le fondateur de x10. Expert en architecture logicielle et passionné par la qualité du code (Software Craftsmanship), il aide les entreprises à transformer leur dette technique en actifs durables.

Voir plus arrow_forward