backend, Feb 9, 2023, 2 min read

Migrating a Messenger queue from Doctrine to a different transport

When using Messenger in a Symfony application, Doctrine is often the obvious choice of transport, as everything is already configured and ready to use. It works well for small queues and simple setups, but a growing application often has to switch to a dedicated service such as RabbitMQ.

But when this change happens, the current queue used on the production server may not be empty.

Custom error code: Queue not empty

Indeed, when you want to deploy your brand new shiny Messenger configuration using something other than Doctrine, your current queue may be far from empty. It can even contain millions of messages, patiently waiting to be consumed.

It is up to you (and your business requirements) to decide what to do with them. You could simply ignore them and purge everything (did someone say flamethrower?). You could keep some debug consumers on Doctrine until the old queue is finally drained, one day or another. Or you could find a way to migrate the messages to the new transport. That is the path we chose here.

Digging into Messenger

Our idea was quite simple: check how Messenger fetches messages from Doctrine, and mimic this behavior to republish them through the new configuration. That is what we ended up doing, and it turned out to be a very simple process. Messenger takes messages from Doctrine and uses its serializer (`Symfony\Component\Messenger\Transport\Serialization\SerializerInterface`) to decode them. It all happens inside `Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineReceiver`.
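To make the decoding step concrete, here is a minimal sketch of the round trip, assuming the default `PhpSerializer` is the serializer in use and that `symfony/messenger` is installed through Composer. The `SendReport` class and its value are hypothetical and exist only for this illustration.

```php
<?php

declare(strict_types=1);

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;

// Assumption: dependencies were installed with Composer in this directory.
require __DIR__.'/vendor/autoload.php';

// Hypothetical message class, used only for this illustration.
final class SendReport
{
    public function __construct(public readonly int $reportId)
    {
    }
}

$serializer = new PhpSerializer();

// Sending side: the envelope (message plus stamps) is encoded into one string,
// which the Doctrine transport stores in the `body` column.
$encoded = $serializer->encode(new Envelope(new SendReport(42), [new DelayStamp(1000)]));

// Receiving side: the envelope is rebuilt from the stored body alone.
$decoded = $serializer->decode(['body' => $encoded['body']]);

$message = $decoded->getMessage();          // the SendReport instance
$delay = $decoded->last(DelayStamp::class); // the stamp travelled inside the body
```

With this serializer, the stamps are part of the encoded body, which is why a migration can rebuild a complete envelope from the `body` column without reading the headers.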

Copying the behavior

We decided this was simple enough to live in a single debug command, as we would only need it once in our project (hopefully?).
This is the command we came up with:

<?php

declare(strict_types=1);

namespace App\Command\Debug;

use App\Command\AbstractCommand;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

final class MigrateDoctrineMessagesCommand extends AbstractCommand
{
   protected static $defaultName = 'app:debug:migrate-doctrine-messages';

   public function __construct(
       private readonly EntityManagerInterface $entityManager,
       private readonly SerializerInterface $serializer,
       private readonly MessageBusInterface $messageBus,
       ?string $name = null
   ) {
       parent::__construct($name);
   }

   public function execute(InputInterface $input, OutputInterface $output): int
   {
       $connection = $this->entityManager->getConnection();

       // Load every pending row from the table used by the Doctrine transport.
       $messages = $connection->fetchAllAssociative('SELECT * FROM messenger_messages');

       foreach ($messages as $message) {
           // Only the body is passed: it already contains the message and its stamps.
           $envelope = $this->serializer->decode([
               'body' => $message['body'],
           ]);

           // Dispatch through the bus so the message reaches the new transport.
           $this->messageBus->dispatch($envelope);

           // Remove the row only once the dispatch has succeeded.
           $connection->executeStatement(
               'DELETE FROM messenger_messages WHERE id = :id',
               ['id' => (int) $message['id']]
           );
       }

       return Command::SUCCESS;
   }
}

As you can see, the process is very simple. We manually fetch the messages from the database through the Doctrine connection.
Then we decode each message body (we do not need the headers here, since everything is already in the body) and dispatch the result: the decode function gives us an Envelope that is ready to be sent.

Simple, isn’t it?

The last step is simply to remove that message from the Messenger queue in Doctrine and jump to the next message.

Obviously, this command could be improved: it could process messages in batches, run some checks, and wrap the work in try/catch blocks to make sure it runs smoothly. But our main goal was to migrate the queue, and we wanted the process to fail hard if anything went wrong.
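For readers who do want the batched, fault-tolerant variant, here is a minimal sketch of what the loop could look like. It is not what we ran. `migrateInBatches` and both callables are hypothetical names: `$fetchBatch` stands for a query returning rows whose id is greater than the given one, ordered by id and limited to the batch size, and `$migrateRow` stands for the decode, dispatch and delete steps of the command above.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of Symfony or Doctrine.
 *
 * @param callable $fetchBatch fn(int $afterId, int $limit): array of rows with id > $afterId, ordered by id
 * @param callable $migrateRow fn(array $row): void, throws when the row cannot be migrated
 *
 * @return array{migrated: int, failed: int[]}
 */
function migrateInBatches(callable $fetchBatch, callable $migrateRow, int $batchSize = 500): array
{
    $lastId = 0;
    $migrated = 0;
    $failedIds = [];

    do {
        // Paginate on the id so a row that failed is never fetched twice.
        $rows = $fetchBatch($lastId, $batchSize);

        foreach ($rows as $row) {
            $lastId = (int) $row['id'];

            try {
                $migrateRow($row);
                ++$migrated;
            } catch (\Throwable $e) {
                // Remember the failure and keep going instead of aborting the run.
                $failedIds[] = $lastId;
            }
        }
    } while (count($rows) === $batchSize); // a short batch means the table is exhausted

    return ['migrated' => $migrated, 'failed' => $failedIds];
}
```

The trade-off is the one described above: this version keeps going and reports the failed ids at the end, whereas our command stops at the first error. Note that if a dispatch succeeds but the following delete fails, rerunning the migration would publish that message a second time.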

Conclusion

And voilà! Our messages are now published to our new queue with the same stamps as before, ready to be happily consumed.





