Novedades en Symfony 8.1: Mejoras en Messenger

Novedades en Symfony 8.1: Mejoras en Messenger

  • Ruben
Novedades en Symfony 8.1: Mejoras en Messenger

El componente Symfony Messenger continúa evolucionando en Symfony 8.1 con mejoras en la ejecución de workers, comportamiento de transportes, serialización y manejo de fallos.

Recuperación de Mensajes por Lotes
En versiones anteriores de Symfony, los workers recuperaban un mensaje a la vez del transporte, requiriendo un viaje de ida y vuelta por mensaje. Symfony 8.1 añade la opción `--fetch-size` al comando `messenger:consume` para que los workers puedan solicitar múltiples mensajes en una sola llamada:

php bin/console messenger:consume async --fetch-size=8

Los transportes usan esta sugerencia cuando su protocolo subyacente lo soporta (Amazon SQS hasta `10` mensajes por llamada `ReceiveMessage`, Redis `XREADGROUP COUNT`, Doctrine `LIMIT`, AMQP `basic_get` repetido)

Intervalo de Restablecimiento de Servicios Configurable
Por defecto, `messenger:consume` restablece los servicios después de cada mensaje para evitar fugas de estado entre handlers. La opción `--no-reset` deshabilita los restablecimientos por completo, lo que mejora el rendimiento pero puede causar problemas de memoria o estado en workers de larga duración.

Symfony 8.1 introduce un punto intermedio permitiendo un valor entero para `--no-reset`. Los servicios se restablecen cada `N` mensajes en lugar de después de cada mensaje:

# por defecto: restablece servicios después de cada mensaje
php bin/console messenger:consume async
# restablece servicios cada 100 mensajes (nuevo en 8.1)
php bin/console messenger:consume async --no-reset=100

Nombre de Tipo Serializado Personalizado para Mensajes
Cuando las aplicaciones intercambian mensajes a través de un broker, deben acordar cómo se codifican los mensajes. El transporte `Serializer` de Symfony actualmente almacena el nombre de clase completamente cualificado de PHP en un encabezado `type`, pero ese valor a menudo no es útil para consumidores que no usan Symfony, o incluso para aplicaciones Symfony que usan espacios de nombres diferentes.
Symfony 8.1 añade un argumento `serializedTypeName` al atributo `#[AsMessage]`. Su valor se usa como el encabezado `type` en lugar del FQCN:

namespace App\Crawler\Message;
use Symfony\Component\Messenger\Attribute\AsMessage;
#[AsMessage(serializedTypeName: 'crawler.vectorization_finished')]
final readonly class VectorizationFinished
{
    public function __construct(
        public string $crawlId,
    ) {
    }
}

Prioridad por Mensaje en AMQP
RabbitMQ soporta prioridades de mensaje a través de la propiedad `priority`, permitiendo que mensajes de mayor prioridad adelanten a los de menor prioridad dentro de la misma cola. Symfony 8.1 expone esto a través de un nuevo `AmqpPriorityStamp` para el transporte AMQP, similar al `BeanstalkdPriorityStamp` existente:

use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpPriorityStamp;
$bus->dispatch($message, [new AmqpPriorityStamp(5)]);

El soporte de prioridad por mensaje está intencionalmente limitado a AMQP porque transportes como Redis, Doctrine o SQS no soportan prioridades nativas. Para enrutamiento de prioridad entre transportes, configura colas separadas y consúmelas en orden de prioridad:

$ php bin/console messenger:consume high_priority low_priority

Tiempo de Inactividad para Handlers por Lotes
El `BatchHandlerTrait` permite que un solo handler procese mensajes en lotes una vez que se alcanza el tamaño de lote configurado. Sin embargo, en escenarios de bajo rendimiento, los lotes pueden tardar mucho en llenarse, dejando mensajes almacenados en búfer en memoria.
Symfony 8.1 añade un método opcional `getIdleTimeout()` al trait. Cuando el worker permanece inactivo durante el tiempo configurado, el lote actual se vacía incluso si no está completo:

use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
class IndexProductsHandler implements BatchHandlerInterface
{
    use BatchHandlerTrait;
    // vacía lotes parciales después de 5 segundos de inactividad
    private function getIdleTimeout(): ?float
    {
        return 5.0;
    }
    private function shouldFlush(): bool
    {
        return 100 <= \count($this->jobs);
    }
    // ...
}

PostgreSQL LISTEN/NOTIFY No Bloqueante
El transporte Doctrine utiliza la funcionalidad LISTEN/NOTIFY de PostgreSQL para despertar a los workers tan pronto como llegan nuevos mensajes, en lugar de depender de polling.
Antes de Symfony 8.1, la llamada `LISTEN` bloqueante ocurría dentro del propio transporte. Al consumir de varias colas PostgreSQL con prioridades, el worker podía bloquearse en la primera cola y nunca revisar las demás.
Symfony 8.1 mueve la espera bloqueante de `LISTEN/NOTIFY` a un subscriber de worker dedicado que se activa en el evento *idle*. El bucle del worker ahora revisa todos los transportes en orden de prioridad en cada iteración y solo se bloquea en `NOTIFY` después de que todas las colas se encuentran vacías.
Este cambio es completamente transparente. Las aplicaciones existentes que usan el transporte Doctrine con PostgreSQL se benefician automáticamente.

Fallos de Decodificación Enrutados a través del Manejo de Fallos
Contribuido por Nicolas Grekas en [#62888](https://github.com/symfony/symfony/pull/62888)
Anteriormente, cuando Messenger no podía decodificar un mensaje, por ejemplo porque su clase ya no existía después de una refactorización, el transporte lanzaba una `MessageDecodingFailedException` y descartaba silenciosamente el mensaje de la cola.
Symfony 8.1 ahora enruta los fallos de decodificación a través del pipeline normal de manejo de fallos. El mensaje permanece reconocible, se envuelve en un envelope `MessageDecodingFailedException` y pasa por los transportes de reintento y fallo configurados como cualquier otro mensaje fallido.
Un nuevo `DecodeFailedMessageMiddleware` reintenta la decodificación en cada intento de reintento. Si se despliega una corrección más tarde, como restaurar una clase faltante o actualizar un serializador, el mensaje puede decodificarse exitosamente en un intento posterior y continuar a través del bus con todos los stamps originales preservados.

Receptor Redis Listable
El `ListableReceiverInterface` permite que herramientas de monitorización y bundles como zenstruck/messenger-monitor-bundle inspeccionen mensajes pendientes sin consumirlos. El transporte Doctrine ya implementaba esta interfaz; Symfony 8.1 añade el mismo soporte al transporte Redis.
`RedisReceiver` ahora expone los métodos `all()` y `find()` respaldados por comandos Redis Stream `XRANGE`:

// listar todos los mensajes pendientes en el transporte
$envelopes = $receiver->all();
// encontrar un mensaje específico por ID de stream de Redis
$envelope = $receiver->find('1700000000000-0');

Forzar Cluster Redis vía DSN
En versiones anteriores de Symfony, configurar un transporte Redis Cluster requería listar cada nodo del cluster en el DSN. Este enfoque es frágil cuando los nodos del cluster cambian e imposible cuando solo se expone un endpoint balanceado.
Symfony 8.1 añade una opción `redis_cluster=true` al DSN del transporte Redis. Cuando está habilitada, el cliente Redis se conecta en modo cluster a través de un único endpoint y confía en el descubrimiento del cluster para los nodos restantes:


# antes: enumerar cada nodo del cluster
MESSENGER_TRANSPORT_DSN=redis://redis-0:6379,redis://redis-1:6379
# después: un solo endpoint con redis_cluster=true
MESSENGER_TRANSPORT_DSN=redis://redis-cluster:6379?redis_cluster=true

Colas de Quórum Retrasadas para AMQP
El transporte AMQP utiliza colas de *retraso* dedicadas para diferir la entrega de mensajes. Con las colas de quórum de RabbitMQ, estas colas de retraso requieren un argumento `x-expires` explícito para que el broker eventualmente las elimine, pero el valor debe sobrevivir a todos los mensajes retrasados aún almacenados en la cola.
Symfony 8.1 reescribe la estrategia de colas de retraso para colas de quórum para usar una cola por día, con una expiración establecida en `1 día + retraso + 10 segundos`. Los mensajes programados para el mismo día calendario ahora comparten una cola, y se garantiza que esa cola sobreviva el tiempo suficiente para que el mensaje más lento sea entregado.
Esto soluciona el problema existente desde hace tiempo donde las colas de quórum retrasadas podían expirar antes de que sus mensajes vencieran.

Deshabilitar Enlace de Cola AMQP por Defecto
Al declarar un transporte AMQP sin listar colas explícitamente, `messenger:setup-transports` crea una cola `messages` por defecto y la enlaza al exchange configurado. Esto es útil para consumidores pero innecesario para transportes de solo escritura que solo publican en un exchange.
Symfony 8.1 permite deshabilitar la creación de cola por defecto estableciendo `queues` a `[]` o `false`:
framework:
    messenger:
        transports:
            # transporte de solo escritura: no se declara ni enlaza ninguna cola
            outgoing_events:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: 'EVENTS'
                        type: fanout
                    queues: false

Liberar Bloqueo de Desduplicación en Fallo Definitivo
Symfony 7.3 introdujo el middleware de desduplicación, que adquiere un bloqueo claveado por un `DeduplicateStamp` para que los envíos duplicados del mismo mensaje se omitan. El bloqueo se libera después de que el mensaje se maneja exitosamente, o se mantiene hasta la expiración (300 segundos por defecto) mientras el mensaje sigue fallando.
En Symfony 8.1, cuando un mensaje falla definitivamente, es decir, la estrategia de reintento se rinde y el mensaje se mueve al transporte de fallos, el bloqueo de desduplicación se libera inmediatamente. Los nuevos envíos que usan la misma clave pueden entonces entrar a la cola de inmediato en lugar de ser omitidos hasta que el TTL expire.

New in Symfony 8.1: Messenger Improvements

Artículo original: https://symfony.com/blog/new-in-symfony-8-1-messenger-improvements
  • Desarrollo
  • Symfony
  • PHP
Deja un comentario: