Aller au contenu principal

Traiter le dernier message avec Symfony Messenger

Photo d'Emmanuel BALLERY, fondateur de x10
Emmanuel BALLERY
CTO freelance & Architecte logiciel
calendar_today 10/01/2026
schedule 10 min lecture
Une file d'attente de messages simplifiée par un filtre intelligent

Dans les architectures orientées messages avec Symfony Messenger, il est fréquent de rencontrer des situations où une file d'attente se remplit de milliers de messages redondants. Ce phénomène, bien connu des ingénieurs systèmes, peut rapidement transformer une application réactive en un goulot d'étranglement coûteux.

C'est particulièrement vrai dans les systèmes réactifs où chaque modification de donnée déclenche un événement (Event-Driven Architecture). Si un utilisateur ou un processus batch modifie 100 fois le même produit en quelques secondes, votre worker va-t-il traiter 100 fois la demande de réindexation Elasticsearch ?

Idéalement, non. On voudrait ne traiter que la dernière version de la demande et ignorer les précédentes qui sont devenues obsolètes avant même d'être traitées. Dans cet article, nous allons explorer comment mettre en place une stratégie de dédoublonnement intelligente basée sur Redis et les Middlewares Symfony.

Le problème : L'engorgement par redondance

Imaginons un cas d'usage classique : un catalogue e-commerce synchronisé avec un moteur de recherche comme Algolia ou Elasticsearch. Chaque mise à jour de prix, de stock ou de description pousse un message IndexProductMessage dans une file d'attente RabbitMQ ou Redis.

Lors d'une mise à jour massive via un import CSV ou une synchronisation ERP, un même produit peut être modifié trois ou quatre fois en l'espace de 500 millisecondes.

Si votre worker traite les deux premiers messages IndexElasticMessage, il consomme inutilement du temps CPU, de la bande passante réseau et de l'I/O sur votre moteur de recherche. Pire encore, si le traitement d'un message prend 200ms, vous venez de perdre 400ms avant de traiter l'information la plus fraîche. Sur 10 000 produits, cela représente des heures de retard accumulées.

Idempotence vs Dédoublonnement

Il est crucial de distinguer ces deux concepts. L'idempotence est une garantie de sécurité : elle assure que traiter deux fois le même message n'aura pas d'effet secondaire indésirable (ex: ne pas débiter deux fois un client).

Le dédoublonnement de flux (ou stream deduplication) est une optimisation de performance : on ne cherche pas seulement à éviter les erreurs, mais à supprimer purement et simplement les étapes de calcul inutiles. L'idempotence s'assure que le résultat est correct ; le dédoublonnement s'assure que le résultat est obtenu le plus vite possible.

La stratégie : La notion de "Famille" de messages

Pour dédoublonner efficacement, nous devons pouvoir identifier les messages qui "disputent" la même ressource. C'est ce que nous appellerons la famille.

Une famille est la combinaison d'un type d'action et d'un identifiant de ressource. Par exemple : product-indexing-123 ou user-avatar-generation-456.

L'idée est d'utiliser Redis pour stocker une empreinte (un UUID) du dernier message envoyé pour chaque famille. Tout message arrivant dans le worker dont l'UUID ne correspond plus à l'empreinte stockée dans Redis est considéré comme "dépassé".

Implémentation technique

1. Définir le contrat de message

Tous les messages ne sont pas dédoublonnables. Il faut donc une interface ou une classe abstraite pour identifier ceux qui le sont. Nous aurons besoin d'un UUID unique par instance de message et d'un nom de famille.

<?php declare(strict_types=1);

namespace App\Bridge\Symfony\Messenger\Message;

use Symfony\Component\Uid\Uuid;

abstract readonly class AbstractMessage
{
    public Uuid $messageUuid;

    public function __construct(
        public ?string $family = null,
    ) {
        $this->messageUuid = Uuid::v4();
    }
}

2. Le Middleware de dédoublonnement

Le Middleware est l'endroit parfait pour cette logique car il intercepte le message au moment de l'envoi (pour enregistrer l'intention) et au moment de la consommation (pour valider si l'intention est toujours d'actualité).

<?php declare(strict_types=1);

namespace App\Bridge\Symfony\Messenger\Middleware;

use App\Bridge\Symfony\Messenger\Message\AbstractMessage;
use Psr\Log\LoggerInterface;
use Redis;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;

final readonly class DeduplicateFamilyMiddleware implements MiddlewareInterface
{
    private const string PREFIX = 'deduplicate-family';

    public function __construct(
        private Redis $redis,
        private LoggerInterface $logger,
    ) {
    }

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $message = $envelope->getMessage();

        if (!$message instanceof AbstractMessage || null === $message->family) {
            return $stack->next()->handle($envelope, $stack);
        }

        $key = sprintf('%s-%s', self::PREFIX, $message->family);

        // CAS 1 : On vient de recevoir le message du transport (Worker)
        if (null !== $envelope->last(ReceivedStamp::class)) {
            $lastUuid = $this->redis->get($key);

            if ($lastUuid !== $message->messageUuid->toRfc4122()) {
                $this->logger->info('Deduplication: Ignoring obsolete message', [
                    'family' => $message->family,
                    'msg_uuid' => $message->messageUuid->toRfc4122(),
                    'last_valid_uuid' => $lastUuid
                ]);

                // On arrête la propagation : le handler ne sera jamais appelé
                return $envelope;
            }
        }
        // CAS 2 : On est en train de dispatcher le message (Producteur)
        else {
            $this->redis->set($key, $message->messageUuid->toRfc4122(), [
                'EX' => 3600, // TTL de 1h pour éviter les fuites de mémoire
            ]);

            $this->logger->debug('Deduplication: Tracking last message for family', [
                'family' => $message->family,
                'uuid' => $message->messageUuid->toRfc4122()
            ]);
        }

        return $stack->next()->handle($envelope, $stack);
    }
}

3. Exemple concret : Indexation d'un produit

Pour mettre cela en pratique, voici comment vous pourriez définir votre message d'indexation. Notez comment nous construisons le nom de la famille de manière unique pour chaque produit.

<?php declare(strict_types=1);

namespace App\Module\Product\Messenger;

use App\Bridge\Symfony\Messenger\Message\AbstractMessage;

final readonly class IndexProductMessage extends AbstractMessage
{
    public function __construct(
        public int $productId,
    ) {
        // La famille permet de grouper tous les messages concernant ce produit précis
        parent::__construct(family: "product-indexing-{$this->productId}");
    }
}

Mise en place d'un test unitaire

Il est vital de tester votre middleware pour s'assurer qu'il ne bloque pas tous les messages par erreur. Voici un exemple de test utilisant PHPUnit et un mock de Redis.

<?php declare(strict_types=1);

namespace App\Tests\Bridge\Symfony\Messenger\Middleware;

use App\Bridge\Symfony\Messenger\Middleware\DeduplicateFamilyMiddleware;
use App\Module\Product\Messenger\IndexProductMessage;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Redis;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;

final class DeduplicateFamilyMiddlewareTest extends TestCase
{
    public function testItIgnoresObsoleteMessages(): void
    {
        $redis = $this->createMock(Redis::class);
        $logger = $this->createMock(LoggerInterface::class);
        $stack = $this->createMock(StackInterface::class);

        $message = new IndexProductMessage(123);
        $envelope = new Envelope($message, [new ReceivedStamp('transport')]);

        // On simule que Redis contient un UUID différent (un message plus récent a été envoyé)
        $redis->method('get')->willReturn('another-uuid');

        // On s'attend à ce que le logger soit appelé
        $logger->expects($this->once())->method('info');

        // On s'attend à ce que $stack->next() NE soit PAS appelé
        $stack->expects($this->never())->method('handle');

        $middleware = new DeduplicateFamilyMiddleware($redis, $logger);
        $result = $middleware->handle($envelope, $stack);

        $this->assertSame($envelope, $result);
    }
}

Considérations sur la performance de Redis

L'utilisation de Redis introduit un léger overhead (un appel réseau supplémentaire par message). Cependant, cet overhead est négligeable comparé au coût d'un traitement métier complet. Dans un réseau local ou sur un cluster Kubernetes, un appel Redis prend généralement moins d'une milliseconde.

Si votre volume de messages est colossal (plusieurs dizaines de milliers par seconde), veillez à utiliser un pool de connexions Redis persistant pour éviter la surcharge liée à l'ouverture de sockets TCP à chaque itération du middleware.

Aller plus loin : Gestion des séquences

Dans certains cas complexes, l'ordre d'arrivée des messages peut être différent de l'ordre d'émission (si vous utilisez plusieurs partitions RabbitMQ ou des délais variables). Pour gérer cela, vous pouvez remplacer l'UUID par un timestamp ou un numéro de séquence croissant.

Le middleware ne traiterait alors le message que si son numéro de séquence est strictement supérieur au dernier numéro traité. Cela garantit non seulement que vous traitez le dernier message émis, mais que vous ne traitez jamais un message "plus vieux" qui arriverait tardivement suite à un problème réseau.

La concurrence (Race Conditions)

Que se passe-t-il si deux instances du même message sont envoyées simultanément ? Redis étant single-threaded et les opérations SET étant atomiques, le dernier arrivé gagnera toujours. Cependant, si vous avez besoin d'une précision chirurgicale, vous pourriez utiliser un script Lua pour vous assurer que l'UUID ne change que si une condition temporelle est respectée.

Le TTL (Time To Live)

Il est indispensable de définir une expiration sur vos clés Redis. Si un message est perdu ou si le worker plante avant de traiter le dernier message, la clé pourrait rester indéfiniment dans Redis. Une valeur de 1 heure (3600 secondes) est généralement suffisante pour couvrir le temps d'attente dans n'importe quelle file raisonnablement configurée.

Retries et échecs de traitement

Si un message échoue et que Symfony Messenger tente de le rejouer (RetryStrategy), notre middleware doit-il l'ignorer ?

Dans notre implémentation actuelle, oui. Si un message plus récent a été envoyé entre-temps, le message en échec est devenu obsolète. C'est un avantage majeur : vous ne perdez pas de temps à "réparer" un état ancien si un état plus récent attend déjà dans la file.

Alternative : Le Debouncing avec DelayStamp

Une autre approche pour limiter la charge est de retarder l'envoi du message. C'est ce qu'on appelle le debouncing. Avec Symfony Messenger, on peut ajouter un DelayStamp lors du dispatching.

// Retarder l'exécution de 5 secondes
$bus->dispatch(new IndexProductMessage(123), [
    new DelayStamp(5000)
]);

Cependant, le délai pur n'empêche pas l'empilement. Si vous envoyez 100 messages en une seconde avec un délai de 5 secondes, vous aurez toujours 100 messages qui arriveront 5 secondes plus tard dans votre worker. Votre worker devra quand même les traiter tous un par un.

L'avantage de notre solution avec Redis est qu'elle est filtrante : si 100 messages arrivent, le middleware en éliminera 99 instantanément (quelques millisecondes d'appel Redis) au lieu de laisser votre Handler métier (probablement lent) s'exécuter 100 fois. Le debouncing est utile pour étaler la charge, mais le dédoublonnement par famille est utile pour la supprimer.

Gestion de la mémoire Redis

L'un des risques majeurs de cette technique est la consommation de mémoire dans votre instance Redis (Memory Leak). Chaque nouvelle famille de message crée une clé. Si vous avez des millions de produits et de nombreuses actions différentes, vous pourriez facilement stocker des gigaoctets de données inutiles.

C'est pourquoi l'utilisation de l'option EX (Expire) dans la commande SET est obligatoire. Dans notre middleware, nous avons mis 3600 secondes. Voici pourquoi ce choix :

Si vous utilisez Redis spécifiquement pour ce middleware, vous pouvez configurer votre instance avec une politique d'éviction de type volatile-lru. Cela dira à Redis de supprimer prioritairement les clés ayant une date d'expiration (nos familles) en utilisant l'algorithme "Least Recently Used" si la mémoire vient à manquer.

Ignorer des messages silencieusement peut rendre le debugging difficile. Il est conseillé :

  1. D'utiliser des logs avec un niveau info ou notice pour tracer les messages ignorés.
  2. D'incrémenter un compteur dans StatsD ou Prometheus pour visualiser le taux de dédoublonnement en temps réel sur vos dashboards Grafana.
  3. De créer un Stamp personnalisé DeduplicatedStamp pour garder une trace de l'historique du message au sein de l'enveloppe.

Conclusion

Le dédoublonnement par famille est une arme redoutable pour maintenir la réactivité de vos systèmes asynchrones. En déléguant la gestion de l'état de la file à Redis, vous libérez vos workers Symfony Messenger des tâches redondantes et assurez que vos données sont toujours traitées dans leur version la plus à jour.

Cette implémentation est facile à généraliser. Vous pouvez l'activer uniquement sur certains transports ou pour certains bus de messages spécifiques en modifiant votre messenger.yaml.

framework:
  messenger:
    buses:
      messenger.bus.default:
        middleware:
          - App\Bridge\Symfony\Messenger\Middleware\DeduplicateFamilyMiddleware

Note : Assurez-vous d'avoir l'extension PHP Redis (ou Predis) correctement configurée dans vos services Symfony pour que le middleware puisse injecter l'objet \Redis.

Photo d'Emmanuel BALLERY, fondateur de x10

À propos de l'auteur

Emmanuel BALLERY est le fondateur de x10. Expert en architecture logicielle et passionné par la qualité du code (Software Craftsmanship), il aide les entreprises à transformer leur dette technique en actifs durables.

Voir plus arrow_forward