Using CQRS with the Symfony Messenger
Foreword
I'm a big Fan of the Domain Driven Design Community and I try to break with traditional CRUD Apps more often. On this journey I learned how to split my actual business logic from the framework and infrastructure. For these purposes exist multiple patterns like the Hexagonal Architecture Pattern, Inversion of Control Principle and Command Query Responsibility Segregation. This article will focus on CQRS and its implementation, it will not cover the theory behind it. So it's recommended to have a basic understanding of this patterns.
Symfony Messenger
The Symfony Messenger Component is a standalone message bus. It's not Symfony exclusive and can be used in any other PHP environment. I'm using it because of the great integration with the symfony ecosystem and the middleware handling.
Classes which are dispatched by the Messenger are called messages. Services that process a message are called handlers. I will stick to this convention with a bit more context like EventHandler, CommandHandler or QueryHandler.
Getting Started
Requirements
For the example code I'm using
- PHP 8
- Symfony 5.2
- Symfony CLI v4.21
Installation and basic configuration
First of all I'm creating a new Symfony (Flex) Project with the Symfony CLI and installing the messenger component
symfony new symfony-cqrs
composer require messenger
This installs the symfony messenger component and it's dependencies. In an Symfony Flex environment it also creates a basic configuration file under ./config/packages/messenger.yaml
framework:
messenger:
# Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
# failure_transport: failed
transports:
# https://symfony.com/doc/current/messenger.html#transport-configuration
# async: '%env(MESSENGER_TRANSPORT_DSN)%'
# failed: 'doctrine://default?queue_name=failed'
# sync: 'sync://'
routing:
# Route your messages to the transports
# 'App\Message\YourMessage': async
This basic configuration creates a single message bus with the default middleware. To separat commands, queries and events and to clear things up we will modify this configuration.
framework:
messenger:
default_bus: messenger.bus.commands
buses:
messenger.bus.commands: ~
messenger.bus.queries: ~
messenger.bus.events:
default_middleware: allow_no_handlers
What has changed? In this example project we will stick with basic synchronous execution, so I removed the transport and routing part. After the cleanup I created three different messenger bus configurations. At this point the configuration is very basic and provides 3 different buses for the different kinds of messages. An exception is the EventBus, the allow_no_handlers
default middleware allows us to dispatch events in our business code without any handler which will handle it. This is useful if you're using an EventStore which dispatch automatically all new persisted events without the knowledge if those events are needed by any EventHandler.
Business Code Structure
To separate our business code from the framework we need a clear structure that helps to understand where each code belongs to. Each feature of the framework we are using on top of this application is located in the src folder and grouped by context. An example would be security. In the most cases I need a custom UserProvider which is coupled to the Symfony Framework, so its located under ./src/Security/UserProvider.php
.
The business code is organized under a business folder who also sits in the src
folder. In this example I will name it MyBusiness. It organizes the rest of our application and splits it into Bounded Contexts and Subdomains. Both are concepts of Domain Driven Design (in short DDD) and describe areas of the business. One technical subdomain will be Messenger to provide an internal API for our CQRS structure. This Subdomain provides the Domain code which we will use in the rest of our application and the Infrastructure code to integrate the Symfony Messenger.
src/
MyBusiness/
Messenger/
Domain/
Infrastructure/
Implementing the technical Subdomain "Messenger"
In the first place we need an API for the various kinds of messages inside our application.
// src/MyBusiness/Messenger/Domain/Command.php
<?php
declare(strict_types=1);
namespace App\MyBusiness\Messaging\Domain;
interface Command
{
}
// src/MyBusiness/Messenger/Domain/Query.php
<?php
declare(strict_types=1);
namespace App\MyBusiness\Messaging\Domain;
interface Query
{
}
// src/MyBusiness/Messenger/Domain/Event.php
<?php
declare(strict_types=1);
namespace App\MyBusiness\Messaging\Domain;
interface Event
{
}
These are simple Interfaces without that don't require any Method. The Messenger can dispatch any kind of class. If you're starting to dispatch async Commands you have to be sure that your Command is serializable. With a clear separation of messages we continue with the related message buses.
// src/MyBusiness/Messenger/Domain/CommandBus.php
<?php
namespace App\MyBusiness\Messaging\Domain;
interface CommandBus
{
public function dispatch(Command $command): void;
}
// src/MyBusiness/Messenger/Domain/QueryBus.php
<?php
namespace App\MyBusiness\Messaging\Domain;
interface QueryBus
{
public function dispatch(Query $query): mixed;
}
// src/MyBusiness/Messenger/Domain/EventBus.php
<?php
namespace App\MyBusiness\Messaging\Domain;
interface EventBus
{
public function dispatch(Event $event): void;
}
We are using Interfaces at this point to be as flexibel as possible. The Domain part should not have any dependency to your framework or infrastructure like databases. If you provide required services as interfaces you're using the inversion of control principle and this makes it much easier to test. You will be able to replace external services in your tests with simple implementation or stubs, without the need for complicated mocks.
You will also recognize that only the QueryBus provides a return value because it's the only message type to execute read actions. CommandBus and EventBus return void. This enforce that you get nothing back from a command or event and it makes it impossible to break with this rule.
Now we have to integrate the symfony messenger. This will happen in the Infrastructure part of your technical subdomain.
// src/MyBusiness/Messenger/Infrastructure/CommandBus.php
<?php
declare(strict_types=1);
namespace App\MyBusiness\Messenger\Infrastructure;
use App\MyBusiness\Messenger\Domain\Command;
use App\MyBusiness\Messenger\Domain\CommandBus as DomainCommandBus;
use Symfony\Component\Messenger\MessageBusInterface;
class CommandBus implements DomainCommandBus
{
public function __construct(private MessageBusInterface $commandMessageBus) {}
public function dispatch(Command $command): void
{
$this->commandMessageBus->dispatch($command);
}
}
// src/MyBusiness/Messenger/Infrastructure/QueryBus.php
<?php
declare(strict_types=1);
namespace App\MyBusiness\Messenger\Infrastructure;
use App\MyBusiness\Messenger\Domain\Query;
use App\MyBusiness\Messenger\Domain\QueryBus as DomainQueryBus;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
class QueryBus implements DomainQueryBus
{
public function __construct(private MessageBusInterface $queryMessageBus) {}
public function dispatch(Query $query): mixed
{
/** @var HandledStamp|null $stamp */
$stamp = $this->queryMessageBus
->dispatch($query)
->last(HandledStamp::class);
if (null === $stamp) {
return null;
}
return $stamp->getResult();
}
}
// src/MyBusiness/Messenger/Infrastructure/CommandBus.php
<?php
declare(strict_types=1);
namespace App\MyBusiness\Messenger\Infrastructure;
use App\MyBusiness\Messenger\Domain\Event;
use App\MyBusiness\Messenger\Domain\EventBus as DomainEventBus;
use Symfony\Component\Messenger\MessageBusInterface;
class EventBus implements DomainEventBus
{
public function __construct(private MessageBusInterface $eventMessageBus) {}
public function dispatch(Event $event): void
{
$this->eventMessageBus->dispatch($event);
}
}
If you are surprised about the constructor syntax and the mixed return type. Both are new features in PHP8.
With these services we combine our interfaces with the real implementation. You can see that I'm not a fan of suffixes like CommandBusInterface. This is the reason why I have to use an Alias for my domain interfaces. This is a personal thing and you can handle it your way.
After the implementation we have to configure it as symfony services.
// config/services.yaml
parameters:
services:
_defaults:
autowire: true
autoconfigure: true
App\:
resource: '../src/'
exclude:
- '../src/DependencyInjection/'
- '../src/Entity/'
- '../src/Kernel.php'
- '../src/Tests/'
- '../src/MyBusiness/'
App\Controller\:
resource: '../src/Controller/'
tags: ['controller.service_arguments']
App\MyBusiness\Messenger\Infrastructure\CommandBus:
arguments: ['@messenger.bus.commands']
App\MyBusiness\Messenger\Infrastructure\QueryBus:
arguments: ['@messenger.bus.queries']
App\MyBusiness\Messenger\Infrastructure\EventBus:
arguments: ['@messenger.bus.events']
We exclude the MyBusiness
namespace from the auto configuration of the src folder. So it does not convert every class we define in the domain logic as service. Next we configure the implementation of each message bus with the related symfony messenger we defined previously in the messenger.yaml
file.
Now that the Services are in place we can create some example logic to work with. We create a new subdomain called Example. In DDD message handlers are also called application services because they orchestrate your domain logic by injecting domain services and call them as expected for a given command or query. To address this in the structure I create a Application folder under the Example subdomain for our Handlers. Events are part of the domain logic and in the most cases are located in the Domain folder. The location of EventHandler depends on the use case and logic. In the example the handler is also located in the Domain.
src/
MyBusiness/
Messenger/
Domain
Infrastructure/
Example/
Application/
Command/
ExecuteCommand.php
CommandHandler/
ExecuteCommandHandler.php
Query/
ExecuteQuery.php
QueryHandler/
ExecuteQueryHandler.php
Domain/
Event/
CommandWasExecuted.php
EventHandler/
CommandWasExecutedHandler.php
All types of messages have no logic and only hold information if necessary to execute the given task. The naming should explain what they do. The naming of an Event should explain what happened. A basic Command and CommandHandler look like this:
// src/MyBusiness/Example/Application/Command/ExecuteCommand.php
<?php
declare(strict_types=1);
namespace App\MyBusiness\Example\Application\Command;
use App\MyBusiness\Messenger\Domain\Command;
class ExecuteCommand implements Command
{
}
// src/MyBusiness/Example/Application/CommandHandler/ExecuteCommandHandler.php
<?php
namespace App\MyBusiness\Example\Application\CommandHandler;
use App\MyBusiness\Example\Application\Command\ExecuteCommand;
use App\MyBusiness\Example\Domain\Event\CommandWasExecuted;
use App\MyBusiness\Messenger\Domain\EventBus;
class ExecuteCommandHandler
{
public function __construct(private EventBus $eventBus) {}
public function __invoke(ExecuteCommand $command): void
{
$this->eventBus->dispatch(new CommandWasExecuted());
}
}
You see that we don't process any information, so the Command is an empty class and only the name explains what it does. The CommandHandler expects its related Command as argument in its __invoke
method. This is also required if you don't use the Command in the CommandHandler because it's the way Symfony finds out which Command belongs to which CommandHandler. The same is true for Queries and Events and their related handlers.
You also see that we don't use any service from the infrastructure as dependencies in our application code. We are using the Interface from the domain.
The last things to make this work are configuration steps to be able to use our interfaces and to relate our message handlers to the related message buses.
// config/services.yaml
parameters:
services:
_defaults:
autowire: true
autoconfigure: true
App\:
resource: '../src/'
exclude:
- '../src/DependencyInjection/'
- '../src/Entity/'
- '../src/Kernel.php'
- '../src/Tests/'
- '../src/MyBusiness/'
App\Controller\:
resource: '../src/Controller/'
tags: ['controller.service_arguments']
App\MyBusiness\Messenger\Infrastructure\CommandBus:
arguments: ['@messenger.bus.commands']
App\MyBusiness\Messenger\Infrastructure\QueryBus:
arguments: ['@messenger.bus.queries']
App\MyBusiness\Messenger\Infrastructure\EventBus:
arguments: ['@messenger.bus.events']
App\MyBusiness\Messenger\Domain\EventBus:
alias: 'App\MyBusiness\Messenger\Infrastructure\EventBus'
App\MyBusiness\Messenger\Domain\CommandBus:
alias: 'App\MyBusiness\Messenger\Infrastructure\CommandBus'
App\MyBusiness\Messenger\Domain\QueryBus:
alias: 'App\MyBusiness\Messenger\Infrastructure\QueryBus'
App\MyBusiness\Example\Application\CommandHandler\:
resource: '../src/MyBusiness/Example/Application/CommandHandler'
tags: [{ name: 'messenger.message_handler', bus: 'messenger.bus.commands' }]
App\MyBusiness\Example\Application\QueryHandler\:
resource: '../src/MyBusiness/Example/Application/QueryHandler'
tags: [{ name: 'messenger.message_handler', bus: 'messenger.bus.queries' }]
App\MyBusiness\Example\Domain\EventHandler\:
resource: '../src/MyBusiness/Example/Domain/EventHandler'
tags: [{ name: 'messenger.message_handler', bus: 'messenger.bus.events' }]
To work with our interfaces instead of the implementation we define each of them as alias. Then we configure our MessageHandler.
Important is the tags
section. The name
tells Symfony that all classes in this folder are MessageHandler, the bus tells Symfony which message bus it has to use. So if you're using the wrong Handler for a message nothing will happen.
You can use the following CLI command to check that the configuration works:
bin/console debug:messenger
Messenger
=========
messenger.bus.commands
----------------------
The following messages can be dispatched:
----------------------------------------------------------------------------------------
App\MyBusiness\Example\Application\Command\ExecuteCommand
handled by App\MyBusiness\Example\Application\CommandHandler\ExecuteCommandHandler
----------------------------------------------------------------------------------------
messenger.bus.queries
---------------------
The following messages can be dispatched:
------------------------------------------------------------------------------------
App\MyBusiness\Example\Application\Query\ExecuteQuery
handled by App\MyBusiness\Example\Application\QueryHandler\ExecuteQueryHandler
------------------------------------------------------------------------------------
messenger.bus.events
--------------------
The following messages can be dispatched:
-------------------------------------------------------------------------------------
App\MyBusiness\Example\Domain\Event\CommandWasExecuted
handled by App\MyBusiness\Example\Domain\EventHandler\CommandWasExecutedHandler
-------------------------------------------------------------------------------------
That's it! Isn't it? No, there is one thing left you should be aware of and my way of handling it.
Exceptions
When you throw a custom exception within your handler and try to catch this exact exception around your dispatch, this will not gonna work. When an exception occurs in an MessageHandler Symfony will wrap this Exception in an HandlerFailedException. To get what we expect in our application logic and to remove a Symfony Messenger dependency I created a small messenger middleware which unwraps this exceptions.
Because this middleware belongs to the Framework it's located in the src/Messenger
directory.
// src/Messenger/ExceptionMiddleware.php
<?php
declare(strict_types=1);
namespace App\Messenger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
class ExceptionMiddleware implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
try {
$envelope = $stack->next()->handle($envelope, $stack);
} catch (HandlerFailedException $exception) {
throw $exception->getPrevious() ?? $exception;
}
return $envelope;
}
}
This middleware simply unwraps the HandlerFailedException exception and throws the underlying Exception if one exists. To use this middleware we have to add it in the messenger.yaml
configuration.
// config/packages/messenger.yaml
framework:
messenger:
default_bus: messenger.bus.commands
buses:
messenger.bus.commands:
middleware:
- 'App\Messenger\ExceptionMiddleware'
messenger.bus.queries:
middleware:
- 'App\Messenger\ExceptionMiddleware'
messenger.bus.events:
default_middleware: allow_no_handlers
middleware:
- 'App\Messenger\ExceptionMiddleware'
Conclusion
That's it! Now we can start with the real business logic and focus on our app. You will use your commands and queries to interact with your business logic on Endpoints like HTTP Actions or CLI Commands. Inside of our business domain we are totally independent from Symfony and the Symfony Messenger. We can swap it out or replace it in our UnitTests by calling the Handler directly or using a stub implementation. We used the Inversion Of Control Principle to make that happen. We also created a Hexagonal Architecture by splitting up our Project in a Framework-, Infrastructure-, Application- and Domain Layer.
I hope this Article helps you to find your way of implementing CQRS in Symfony or adopt some principles for your next project. Feedback is welcome.
All Example Code shown in this Article is available on Github: https://github.com/fjogeleit/symfony-cqrs