Using CQRS with the Symfony Messenger

Article published: January 10, 2021

Foreword

I'm a big Fan of the Domain Driven Design Community and I try to break with traditional CRUD Apps more often. On this journey I learned how to split my actual business logic from the framework and infrastructure. For these purposes exist multiple patterns like the Hexagonal Architecture Pattern, Inversion of Control Principle and Command Query Responsibility Segregation. This article will focus on CQRS and its implementation, it will not cover the theory behind it. So it's recommended to have a basic understanding of this patterns.

Symfony Messenger

The Symfony Messenger Component is a standalone message bus. It's not Symfony exclusive and can be used in any other PHP environment. I'm using it because of the great integration with the symfony ecosystem and the middleware handling.

Classes which are dispatched by the Messenger are called messages. Services that process a message are called handlers. I will stick to this convention with a bit more context like EventHandler, CommandHandler or QueryHandler.

Getting Started

Requirements

For the example code I'm using

  • PHP 8
  • Symfony 5.2
  • Symfony CLI v4.21

Installation and basic configuration

First of all I'm creating a new Symfony (Flex) Project with the Symfony CLI and installing the messenger component

symfony new symfony-cqrs
composer require messenger

This installs the symfony messenger component and it's dependencies. In an Symfony Flex environment it also creates a basic configuration file under ./config/packages/messenger.yaml

framework:
    messenger:
        # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
        # failure_transport: failed
        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            # async: '%env(MESSENGER_TRANSPORT_DSN)%'
            # failed: 'doctrine://default?queue_name=failed'
            # sync: 'sync://'
        routing:
            # Route your messages to the transports
            # 'App\Message\YourMessage': async

This basic configuration creates a single message bus with the default middleware. To separat commands, queries and events and to clear things up we will modify this configuration.

framework:
    messenger:
        default_bus: messenger.bus.commands
        buses:
            messenger.bus.commands: ~
            messenger.bus.queries: ~
            messenger.bus.events:
                default_middleware: allow_no_handlers

What has changed? In this example project we will stick with basic synchronous execution, so I removed the transport and routing part. After the cleanup I created three different messenger bus configurations. At this point the configuration is very basic and provides 3 different buses for the different kinds of messages. An exception is the EventBus, the allow_no_handlers default middleware allows us to dispatch events in our business code without any handler which will handle it. This is useful if you're using an EventStore which dispatch automatically all new persisted events without the knowledge if those events are needed by any EventHandler.

Business Code Structure

To separate our business code from the framework we need a clear structure that helps to understand where each code belongs to. Each feature of the framework we are using on top of this application is located in the src folder and grouped by context. An example would be security. In the most cases I need a custom UserProvider which is coupled to the Symfony Framework, so its located under ./src/Security/UserProvider.php.

The business code is organized under a business folder who also sits in the src folder. In this example I will name it MyBusiness. It organizes the rest of our application and splits it into Bounded Contexts and Subdomains. Both are concepts of Domain Driven Design (in short DDD) and describe areas of the business. One technical subdomain will be Messenger to provide an internal API for our CQRS structure. This Subdomain provides the Domain code which we will use in the rest of our application and the Infrastructure code to integrate the Symfony Messenger.

src/
  MyBusiness/
    Messenger/
      Domain/
      Infrastructure/

Implementing the technical Subdomain "Messenger"

In the first place we need an API for the various kinds of messages inside our application.

// src/MyBusiness/Messenger/Domain/Command.php

<?php

declare(strict_types=1);

namespace App\MyBusiness\Messaging\Domain;

interface Command
{
}
// src/MyBusiness/Messenger/Domain/Query.php

<?php

declare(strict_types=1);

namespace App\MyBusiness\Messaging\Domain;

interface Query
{
}
// src/MyBusiness/Messenger/Domain/Event.php

<?php

declare(strict_types=1);

namespace App\MyBusiness\Messaging\Domain;

interface Event
{
}

These are simple Interfaces without that don't require any Method. The Messenger can dispatch any kind of class. If you're starting to dispatch async Commands you have to be sure that your Command is serializable. With a clear separation of messages we continue with the related message buses.

// src/MyBusiness/Messenger/Domain/CommandBus.php

<?php

namespace App\MyBusiness\Messaging\Domain;

interface CommandBus
{
    public function dispatch(Command $command): void;
}
// src/MyBusiness/Messenger/Domain/QueryBus.php

<?php

namespace App\MyBusiness\Messaging\Domain;

interface QueryBus
{
    public function dispatch(Query $query): mixed;
}
// src/MyBusiness/Messenger/Domain/EventBus.php

<?php

namespace App\MyBusiness\Messaging\Domain;

interface EventBus
{
    public function dispatch(Event $event): void;
}

We are using Interfaces at this point to be as flexibel as possible. The Domain part should not have any dependency to your framework or infrastructure like databases. If you provide required services as interfaces you're using the inversion of control principle and this makes it much easier to test. You will be able to replace external services in your tests with simple implementation or stubs, without the need for complicated mocks.

You will also recognize that only the QueryBus provides a return value because it's the only message type to execute read actions. CommandBus and EventBus return void. This enforce that you get nothing back from a command or event and it makes it impossible to break with this rule.

Now we have to integrate the symfony messenger. This will happen in the Infrastructure part of your technical subdomain.

// src/MyBusiness/Messenger/Infrastructure/CommandBus.php

<?php

declare(strict_types=1);

namespace App\MyBusiness\Messenger\Infrastructure;

use App\MyBusiness\Messenger\Domain\Command;
use App\MyBusiness\Messenger\Domain\CommandBus as DomainCommandBus;
use Symfony\Component\Messenger\MessageBusInterface;

class CommandBus implements DomainCommandBus
{
    public function __construct(private MessageBusInterface $commandMessageBus) {}

    public function dispatch(Command $command): void
    {
        $this->commandMessageBus->dispatch($command);
    }
}
// src/MyBusiness/Messenger/Infrastructure/QueryBus.php

<?php

declare(strict_types=1);

namespace App\MyBusiness\Messenger\Infrastructure;

use App\MyBusiness\Messenger\Domain\Query;
use App\MyBusiness\Messenger\Domain\QueryBus as DomainQueryBus;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;

class QueryBus implements DomainQueryBus
{
    public function __construct(private MessageBusInterface $queryMessageBus) {}

    public function dispatch(Query $query): mixed
    {
        /** @var HandledStamp|null $stamp */
        $stamp = $this->queryMessageBus
            ->dispatch($query)
            ->last(HandledStamp::class);

        if (null === $stamp) {
            return null;
        }

        return $stamp->getResult();
    }
}
// src/MyBusiness/Messenger/Infrastructure/CommandBus.php

<?php

declare(strict_types=1);

namespace App\MyBusiness\Messenger\Infrastructure;

use App\MyBusiness\Messenger\Domain\Event;
use App\MyBusiness\Messenger\Domain\EventBus as DomainEventBus;
use Symfony\Component\Messenger\MessageBusInterface;

class EventBus implements DomainEventBus
{
    public function __construct(private MessageBusInterface $eventMessageBus) {}

    public function dispatch(Event $event): void
    {
        $this->eventMessageBus->dispatch($event);
    }
}

If you are surprised about the constructor syntax and the mixed return type. Both are new features in PHP8.

With these services we combine our interfaces with the real implementation. You can see that I'm not a fan of suffixes like CommandBusInterface. This is the reason why I have to use an Alias for my domain interfaces. This is a personal thing and you can handle it your way.

After the implementation we have to configure it as symfony services.

// config/services.yaml

parameters:

services:
    _defaults:
        autowire: true
        autoconfigure: true

    App\:
        resource: '../src/'
        exclude:
            - '../src/DependencyInjection/'
            - '../src/Entity/'
            - '../src/Kernel.php'
            - '../src/Tests/'
            - '../src/MyBusiness/'

    App\Controller\:
        resource: '../src/Controller/'
        tags: ['controller.service_arguments']

    App\MyBusiness\Messenger\Infrastructure\CommandBus:
        arguments: ['@messenger.bus.commands']

    App\MyBusiness\Messenger\Infrastructure\QueryBus:
        arguments: ['@messenger.bus.queries']

    App\MyBusiness\Messenger\Infrastructure\EventBus:
        arguments: ['@messenger.bus.events']

We exclude the MyBusiness namespace from the auto configuration of the src folder. So it does not convert every class we define in the domain logic as service. Next we configure the implementation of each message bus with the related symfony messenger we defined previously in the messenger.yaml file.

Now that the Services are in place we can create some example logic to work with. We create a new subdomain called Example. In DDD message handlers are also called application services because they orchestrate your domain logic by injecting domain services and call them as expected for a given command or query. To address this in the structure I create a Application folder under the Example subdomain for our Handlers. Events are part of the domain logic and in the most cases are located in the Domain folder. The location of EventHandler depends on the use case and logic. In the example the handler is also located in the Domain.

src/
  MyBusiness/
    Messenger/
      Domain
      Infrastructure/
    Example/
      Application/
        Command/
          ExecuteCommand.php
        CommandHandler/
          ExecuteCommandHandler.php
        Query/
          ExecuteQuery.php
        QueryHandler/
          ExecuteQueryHandler.php
      Domain/
        Event/
          CommandWasExecuted.php
        EventHandler/
          CommandWasExecutedHandler.php

All types of messages have no logic and only hold information if necessary to execute the given task. The naming should explain what they do. The naming of an Event should explain what happened. A basic Command and CommandHandler look like this:

// src/MyBusiness/Example/Application/Command/ExecuteCommand.php

<?php

declare(strict_types=1);

namespace App\MyBusiness\Example\Application\Command;

use App\MyBusiness\Messenger\Domain\Command;

class ExecuteCommand implements Command
{
}
// src/MyBusiness/Example/Application/CommandHandler/ExecuteCommandHandler.php

<?php

namespace App\MyBusiness\Example\Application\CommandHandler;

use App\MyBusiness\Example\Application\Command\ExecuteCommand;
use App\MyBusiness\Example\Domain\Event\CommandWasExecuted;
use App\MyBusiness\Messenger\Domain\EventBus;

class ExecuteCommandHandler
{
    public function __construct(private EventBus $eventBus) {}
   
    public function __invoke(ExecuteCommand $command): void
    {
        $this->eventBus->dispatch(new CommandWasExecuted());
    }
}

You see that we don't process any information, so the Command is an empty class and only the name explains what it does. The CommandHandler expects its related Command as argument in its __invoke method. This is also required if you don't use the Command in the CommandHandler because it's the way Symfony finds out which Command belongs to which CommandHandler. The same is true for Queries and Events and their related handlers.

You also see that we don't use any service from the infrastructure as dependencies in our application code. We are using the Interface from the domain.

The last things to make this work are configuration steps to be able to use our interfaces and to relate our message handlers to the related message buses.

// config/services.yaml

parameters:

services:
    _defaults:
        autowire: true
        autoconfigure: true
    App\:
        resource: '../src/'
        exclude:
            - '../src/DependencyInjection/'
            - '../src/Entity/'
            - '../src/Kernel.php'
            - '../src/Tests/'
            - '../src/MyBusiness/'

    App\Controller\:
        resource: '../src/Controller/'
        tags: ['controller.service_arguments']

    App\MyBusiness\Messenger\Infrastructure\CommandBus:
        arguments: ['@messenger.bus.commands']

    App\MyBusiness\Messenger\Infrastructure\QueryBus:
        arguments: ['@messenger.bus.queries']

    App\MyBusiness\Messenger\Infrastructure\EventBus:
        arguments: ['@messenger.bus.events']

    App\MyBusiness\Messenger\Domain\EventBus:
        alias: 'App\MyBusiness\Messenger\Infrastructure\EventBus'

    App\MyBusiness\Messenger\Domain\CommandBus:
        alias: 'App\MyBusiness\Messenger\Infrastructure\CommandBus'

    App\MyBusiness\Messenger\Domain\QueryBus:
        alias: 'App\MyBusiness\Messenger\Infrastructure\QueryBus'

    App\MyBusiness\Example\Application\CommandHandler\:
        resource: '../src/MyBusiness/Example/Application/CommandHandler'
        tags: [{ name: 'messenger.message_handler', bus: 'messenger.bus.commands' }]

    App\MyBusiness\Example\Application\QueryHandler\:
        resource: '../src/MyBusiness/Example/Application/QueryHandler'
        tags: [{ name: 'messenger.message_handler', bus: 'messenger.bus.queries' }]

    App\MyBusiness\Example\Domain\EventHandler\:
        resource: '../src/MyBusiness/Example/Domain/EventHandler'
        tags: [{ name: 'messenger.message_handler', bus: 'messenger.bus.events' }]

To work with our interfaces instead of the implementation we define each of them as alias. Then we configure our MessageHandler.

Important is the tags section. The name tells Symfony that all classes in this folder are MessageHandler, the bus tells Symfony which message bus it has to use. So if you're using the wrong Handler for a message nothing will happen.

You can use the following CLI command to check that the configuration works:

bin/console debug:messenger

Messenger
=========
messenger.bus.commands
----------------------
 The following messages can be dispatched:
 ---------------------------------------------------------------------------------------- 
  App\MyBusiness\Example\Application\Command\ExecuteCommand                               
      handled by App\MyBusiness\Example\Application\CommandHandler\ExecuteCommandHandler  
                                                                                          
 ---------------------------------------------------------------------------------------- 
messenger.bus.queries
---------------------
 The following messages can be dispatched:
 ------------------------------------------------------------------------------------ 
  App\MyBusiness\Example\Application\Query\ExecuteQuery                               
      handled by App\MyBusiness\Example\Application\QueryHandler\ExecuteQueryHandler  
                                                                                      
 ------------------------------------------------------------------------------------ 
messenger.bus.events
--------------------
 The following messages can be dispatched:
 ------------------------------------------------------------------------------------- 
  App\MyBusiness\Example\Domain\Event\CommandWasExecuted                               
      handled by App\MyBusiness\Example\Domain\EventHandler\CommandWasExecutedHandler  
                                                                                       
 -------------------------------------------------------------------------------------

That's it! Isn't it? No, there is one thing left you should be aware of and my way of handling it.

Exceptions

When you throw a custom exception within your handler and try to catch this exact exception around your dispatch, this will not gonna work. When an exception occurs in an MessageHandler Symfony will wrap this Exception in an HandlerFailedException. To get what we expect in our application logic and to remove a Symfony Messenger dependency I created a small messenger middleware which unwraps this exceptions.

Because this middleware belongs to the Framework it's located in the src/Messenger directory.

// src/Messenger/ExceptionMiddleware.php

<?php

declare(strict_types=1);

namespace App\Messenger;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

class ExceptionMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        try {
            $envelope = $stack->next()->handle($envelope, $stack);
        } catch (HandlerFailedException $exception) {
            throw $exception->getPrevious() ?? $exception;
        }

        return $envelope;
    }
}

This middleware simply unwraps the HandlerFailedException exception and throws the underlying Exception if one exists. To use this middleware we have to add it in the messenger.yaml configuration.

// config/packages/messenger.yaml

framework:
    messenger:
        default_bus: messenger.bus.commands
        buses:
            messenger.bus.commands:
                middleware:
                    - 'App\Messenger\ExceptionMiddleware'
            messenger.bus.queries:
                middleware:
                    - 'App\Messenger\ExceptionMiddleware'
            messenger.bus.events:
                default_middleware: allow_no_handlers
                middleware:
                    - 'App\Messenger\ExceptionMiddleware'

Conclusion

That's it! Now we can start with the real business logic and focus on our app. You will use your commands and queries to interact with your business logic on Endpoints like HTTP Actions or CLI Commands. Inside of our business domain we are totally independent from Symfony and the Symfony Messenger. We can swap it out or replace it in our UnitTests by calling the Handler directly or using a stub implementation. We used the Inversion Of Control Principle to make that happen. We also created a Hexagonal Architecture by splitting up our Project in a Framework-, Infrastructure-, Application- and Domain Layer.

I hope this Article helps you to find your way of implementing CQRS in Symfony or adopt some principles for your next project. Feedback is welcome.

All Example Code shown in this Article is available on Github: https://github.com/fjogeleit/symfony-cqrs