CQRS v Symfony 8
Co je CQRS?
CQRS vychází z jednoho přímočarého pozorování: model, který slouží k zápisu dat, nemusí být tentýž model, který slouží k jejich čtení. CQRS (Command Query Responsibility Segregation) tento princip povyšuje na architektonický vzor – formálně popsaný Gregem Youngem [1] jako rozšíření Command-Query Separation (CQS) od Bertranda Meyera [2].
V tradičních aplikacích používáme jednu entitu (např. Doctrine ORM entity)
pro obojí - vytváříme objednávku i zobrazujeme seznam objednávek přes tentýž objekt Order.
CQRS tuto zodpovědnost explicitně rozděluje do dvou oddělených modelů, z nichž každý může být
optimalizován pro svůj specifický úkol.
Základní principy CQRS:
- Commands - Příkazy, které mění stav systému. Ve striktním pojetí CQS nevracejí žádná data; v praxi CQRS mohou vracet identifikátor vytvořeného záznamu.
- Queries - Dotazy, které vrací data, ale nemění stav systému.
- Oddělené modely - Write model (bohatý doménový model s business logikou) a Read model (jednoduchá denormalizovaná datová struktura optimalizovaná pro dotazy).
- Oddělené databáze - V pokročilých implementacích mohou být použity oddělené databáze pro čtení a zápis, čímž lze nezávisle škálovat zátěž.
CQRS je často používán v kombinaci s Event Sourcing, což je vzor, který ukládá změny stavu jako sekvenci událostí místo ukládání aktuálního stavu. Tyto dva vzory jsou však nezávislé - lze plnohodnotně implementovat CQRS s klasickou Doctrine ORM persistencí na write straně a denormalizovanými tabulkami na read straně, aniž by se sahalo po Event Sourcingu.
CQS vs. CQRS - kde je hranice?
Bertrand Meyer formuloval princip Command-Query Separation (CQS) jako pravidlo na úrovni metod: každá metoda by měla buď měnit stav (command), nebo vracet hodnotu (query), ale nikdy obojí. CQS je návrhové pravidlo pro rozhraní tříd.
Greg Young posunul tuto myšlenku na architektonickou úroveň: CQRS není pravidlo pro jednotlivé metody, ale rozhodnutí o struktuře celé aplikace. Místo jednoho doménového modelu vznikají dva oddělené modely - každý s vlastní sadou tříd, vlastním úložištěm a vlastním optimalizačním profilem.
CQS vs. CQRS - přehled
| Aspekt | CQS | CQRS |
|---|---|---|
| Úroveň | Metoda / třída | Architektura celé aplikace |
| Pravidlo | Metoda buď mění stav, nebo vrací data | Oddělený write model a read model |
| Počet modelů | Jeden sdílený model | Dva (nebo více) oddělených modelů |
| Databáze | Sdílená | Může být oddělená (write DB + read DB) |
| Složitost | Nízká - jde o konvenci | Střední až vysoká - jde o architekturu |
| Příklad | getBalance() nemodifikuje účet |
RegisterUserHandler a GetUserProfileHandler pracují s různými datovými strukturami |
V praxi se CQS přirozeně stává výchozím bodem pro CQRS. Pokud dodržujete CQS na úrovni metod, zjistíte, že metody měnící stav (command methods) mají výrazně odlišné požadavky na data než metody, které stav čtou (query methods). CQRS toto pozorování formalizuje rozdělením do dvou explicitních modelů.
Úrovně adopce CQRS
CQRS není binární volba. Existuje spektrum úrovní adopce, od nejjednodušší po nejpokročilejší:
- Oddělené handlery - Command handlers a query handlers jako samostatné třídy, ale sdílí tutéž databázi a ORM entity. Nejjednodušší forma CQRS, vhodná pro většinu aplikací.
- Oddělené modely - Write model používá doménové entity (Doctrine ORM), read model používá vlastní DTO/ViewModely plněné přímým SQL nebo Doctrine DBAL. Sdílená databáze, ale oddělené PHP třídy.
- Oddělená úložiště - Write databáze (PostgreSQL) a read databáze (Elasticsearch, Redis, denormalizované tabulky). Změny se propagují asynchronně přes události.
- CQRS + Event Sourcing - Write side ukládá události do Event Store, read side buduje projekce z event streamu. Nejvyšší složitost, ale i nejvyšší flexibilita.
Doporučený přístup: začněte na úrovni 1 nebo 2. Na úroveň 3 a 4 přejděte teprve tehdy, když to vyžadují konkrétní škálovací nebo business požadavky. Pokud máte existující CRUD aplikaci, postup migrace popisuje kapitola Migrace z CRUD.
Důležité: CQRS se v rámci DDD typicky aplikuje per Bounded Context, nikoli globálně na celou aplikaci. Core doména s komplexní logikou může těžit z plného CQRS (úroveň 3–4), zatímco podpůrné kontexty (notifikace, administrace) si vystačí s jednoduchým CRUD - viz Bounded Contexts.
Výhody CQRS
CQRS přináší řadu architektonických výhod, které se projevují zejména u aplikací s netriviální doménovou logikou a odlišnými požadavky na čtení a zápis:
- Oddělení zodpovědností - Write model se soustředí výhradně na business logiku, validaci invariantů a konzistenci dat. Read model se soustředí na rychlou prezentaci dat uživateli. Každý model obsahuje přesně to, co potřebuje - nic navíc.
- Nezávislá optimalizace - Write model může používat normalizované relační schéma a Doctrine ORM entity s bohatou doménovou logikou. Read model může být denormalizovaná tabulka, Elasticsearch index, nebo Redis cache - cokoli, co nejlépe vyhovuje konkrétním dotazům.
- Škálovatelnost - Ve většině aplikací výrazně převažuje čtení nad zápisem (poměr 10:1 až 100:1). CQRS umožňuje nezávisle škálovat read stranu (repliky, cache, CDN) bez dopadu na write stranu.
- Testovatelnost - Command handlers se testují jako čistě doménová logika (given state → when command → then events/state). Query handlers se testují na správnost vrácených dat. Žádné propletení obou zodpovědností v jedné testovací sadě. Viz kapitola Testování DDD kódu.
- Flexibilita evoluce - Read model lze kdykoli přebudovat (rebuild projekcí), přidat nový read model pro nový use case, nebo změnit strukturu dotazu - bez jakéhokoli dopadu na write model a business logiku.
Výzvy a omezení CQRS
CQRS není stříbrná kulka. Jako každý architektonický vzor přináší kompromisy, které je nutné pečlivě zvážit ještě před adopcí:
- Zvýšená složitost kódu - Místo jednoho modelu existují dva (nebo více). Každý command a query vyžaduje vlastní třídu, handler, a často i vlastní datovou strukturu. Pro jednoduchou CRUD operaci to může znamenat 4–6 tříd místo jedné.
- Eventual consistency - Při asynchronní propagaci změn z write strany na read stranu existuje časové okno, kdy read model neodráží poslední zápis. Uživatel může po odeslání formuláře vidět „starou" verzi dat. Řešení tohoto problému vyžaduje promyšlený přístup v UI - viz sekce Eventual Consistency.
- Synchronizace modelů - Při oddělených úložištích je nutné zajistit, že read model bude vždy aktualizován po každé změně write modelu. Selhání propagace (výpadek fronty, chyba projektoru) vede k divergenci modelů.
- Učební křivka - CQRS vyžaduje změnu myšlení oproti tradičnímu přístupu, kde jeden model pokrývá všechny operace. Vývojáři musejí porozumět konceptům jako message bus, eventual consistency, idempotence handlerů a read model projekce.
Kdy nepoužívat CQRS
CQRS nemusí být vhodný pro všechny projekty. Nepoužívejte CQRS, pokud:
- Vyvíjíte jednoduchou aplikaci s minimální doménovou logikou - klasický CRUD s Doctrine ORM bude jednodušší a rychlejší na vývoj.
- Požadavky na čtení a zápis jsou téměř identické - CQRS přináší hodnotu teprve tehdy, když se datové struktury pro zápis a čtení výrazně liší.
- Nemáte potřebu škálovat operace čtení a zápisu nezávisle - pokud celá aplikace běží na jednom serveru a zvládá zátěž, oddělená infrastruktura je zbytečná režie.
- Váš tým nemá zkušenosti s asynchronním zpracováním - eventual consistency problémy mohou být frustrující bez předchozí zkušenosti s distribuovanými systémy.
Dobrým kompromisem je začít s CQRS na úrovni 1 (oddělené handlery, sdílená databáze) a rozšiřovat postupně. Viz také Anti-vzory - Over-engineering u jednoduchých aplikací.
Symfony Messenger jako základ CQRS
Symfony Messenger je komponenta, která poskytuje infrastrukturu pro odesílání a zpracování zpráv. Pro CQRS je podstatná schopnost definovat více message busů - jeden pro příkazy (command bus) a jeden pro dotazy (query bus). Každý bus může mít vlastní sadu middleware, vlastní transport a vlastní strategii zpracování.
Konfigurace Symfony Messenger pro CQRS
# config/packages/messenger.yaml
framework:
messenger:
# Výchozí bus - použitý, když není specifikovaný jiný
default_bus: command.bus
# Konfigurace transportů
transports:
async:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
sync: 'sync://'
# Konfigurace busů
buses:
command.bus:
middleware:
- validation
- doctrine_transaction
query.bus:
middleware:
- validation
# Směrování zpráv - mapuje konkrétní třídy nebo rozhraní na transport
routing:
# Příkazy vhodné pro asynchronní zpracování:
# operace, kde uživatel nemusí čekat na výsledek
App\Notification\Application\Command\SendWelcomeEmail: async
App\Reporting\Application\Command\GenerateMonthlyReport: async
# Dotazy jsou zpracovány synchronně (výchozí, není třeba uvádět)
# App\UserManagement\Profile\Query\GetUserProfile: sync
V této konfiguraci jsou definovány dva transporty: async pro asynchronní zpracování a sync pro synchronní zpracování.
Jsou také definovány dva message busy: command.bus (command bus) pro příkazy s doctrine_transaction
middleware (automatická transakce kolem handleru) a query.bus (query bus) pro dotazy pouze s validací.
Proč dva oddělené busy?
Oddělení command busu a query busu není jen formální gesto. Má konkrétní praktické důsledky:
- Různý middleware - Command bus potřebuje
doctrine_transaction(každý command je atomická operace). Query bus transakci nepotřebuje - jen čte data. - Různé transporty - Commands mohou být směrovány na async transport (frontu). Queries jsou vždy synchronní - uživatel čeká na odpověď.
- Type safety - Pokud controller injektuje
$commandBus, je jasné, že dispatchuje příkaz. Pokud injektuje$queryBus, je jasné, že čte data. To zlepšuje čitelnost kódu a brání náhodným záměnám.
Jak vybrat příkazy pro asynchronní zpracování
Asynchronně zpracovávejte operace, kde uživatel nemusí čekat na výsledek - odesílání e-mailů, generování reportů, aktualizace read modelů, notifikace.
Synchronně zpracovávejte operace vyžadující okamžitou zpětnou vazbu - registrace uživatele, vytvoření objednávky, přihlášení. Uživatel čeká na odpověď (úspěch / chyba validace) a potřebuje ji okamžitě.
Implementace Commands
Commands v CQRS jsou příkazy, které mění stav systému. V Symfony 8 se implementují jako jednoduché PHP třídy - immutabilní datové objekty (DTO), které nesou veškerá data potřebná pro vykonání operace. Command sám o sobě neobsahuje žádnou business logiku; je to pouhý přepravní kontejner dat.
Dobře navržený command má několik vlastností:
- Je immutabilní (
readonlyproperties) - po vytvoření se nemění. - Obsahuje validační atributy - díky middleware
validationna command busu se command validuje ještě před předáním handleru. - Pojmenování vyjadřuje záměr -
RegisterUser,PlaceOrder,CancelSubscription. NeSaveUserneboUpdateOrder. - Neobsahuje reference na doménové objekty - pracuje s primitivními typy (string, int, float), které jsou serializovatelné pro asynchronní transport.
PHP: Implementace příkazu v Symfony 8
<?php
declare(strict_types=1);
namespace App\UserManagement\Registration\Command;
use Symfony\Component\Validator\Constraints as Assert;
/**
* Příkaz pro registraci nového uživatele.
* Immutabilní DTO - neslouží k business logice, pouze přenáší data.
*/
final class RegisterUser
{
public function __construct(
#[Assert\NotBlank]
#[Assert\Length(min: 2, max: 255)]
public readonly string $name,
#[Assert\NotBlank]
#[Assert\Email]
public readonly string $email,
#[Assert\NotBlank]
#[Assert\Length(min: 8)]
public readonly string $password
) {
}
}
V tomto příkladu je RegisterUser příkaz, který obsahuje data potřebná pro registraci uživatele.
Příkaz používá PHP atributy pro validaci dat - ta proběhne automaticky díky validation middleware
na command busu, ještě než se command dostane k handleru.
Mají commands vracet hodnotu?
Ve striktním CQS pojetí commands nevracejí žádná data. V praxi CQRS však existují legitimní scénáře, kdy je užitečné vrátit alespoň identifikátor nově vytvořeného záznamu. Dva běžné přístupy:
- ID generovat na klientovi - Command obsahuje
$userIdjako UUID vygenerované před dispatchem. Handler toto ID použije. Klient zná ID okamžitě, command nemusí nic vracet. Toto je preferovaný přístup. - ID vracet z handleru - Handler vrátí ID přes
HandledStamp. Jednodušší na implementaci, ale porušuje striktní CQS a komplikuje asynchronní zpracování.
Implementace Queries
Queries v CQRS jsou dotazy, které vrací data bez změny stavu systému. Podobně jako commands
se implementují jako immutabilní DTO třídy, ale na rozdíl od commands vždy vracejí
hodnotu - handler vrací data přes HandledStamp.
PHP: Implementace dotazu v Symfony 8
<?php
declare(strict_types=1);
namespace App\UserManagement\Profile\Query;
use Symfony\Component\Validator\Constraints as Assert;
final class GetUserProfile
{
public function __construct(
#[Assert\NotBlank]
#[Assert\Uuid]
public readonly string $userId
) {
}
}
V tomto příkladu je GetUserProfile dotaz, který obsahuje ID uživatele, jehož profil chceme získat.
Dotaz používá atributy pro validaci dat - nevalidní UUID bude odmítnuto ještě před zpracováním.
Queries s filtrováním a stránkováním
Reálné aplikace potřebují dotazy složitější než pouhé „dej mi záznam podle ID". Queries mohou obsahovat filtrovací kritéria, řazení a stránkování:
<?php
declare(strict_types=1);
namespace App\Ordering\Application\Query;
use Symfony\Component\Validator\Constraints as Assert;
final class ListOrders
{
public function __construct(
#[Assert\NotBlank]
#[Assert\Uuid]
public readonly string $customerId,
public readonly ?string $status = null,
#[Assert\Range(min: 1, max: 100)]
public readonly int $limit = 20,
#[Assert\PositiveOrZero]
public readonly int $offset = 0,
public readonly string $sortBy = 'createdAt',
public readonly string $sortDirection = 'DESC',
) {
}
}
Implementace Handlers
Handlers v CQRS jsou objekty, které zpracovávají příkazy a dotazy. V Symfony 8 se implementují
jako PHP třídy s atributem AsMessageHandler a metodou __invoke().
Symfony Messenger automaticky spojí handler s jeho command/query podle type-hintu parametru.
Command handler a query handler mají odlišnou zodpovědnost:
- Command handler - Načte agregát z repozitáře, zavolá na něm business metodu (která validuje invarianty) a uloží změny. Může emitovat doménové události. Pracuje s doménovým modelem (entity, value objects, repozitáře).
- Query handler - Čte data z optimalizovaného zdroje (denormalizovaná tabulka, Elasticsearch, cache) a vrací je jako ViewModel. Nepracuje s doménovým modelem - obchází ho záměrně, protože doménový model není optimalizovaný pro čtení.
PHP: Command handler - RegisterUserHandler
<?php
declare(strict_types=1);
namespace App\UserManagement\Registration\Command;
use App\UserManagement\Domain\Model\User;
use App\UserManagement\Domain\Repository\UserRepository;
use App\UserManagement\Domain\Service\PasswordHasher;
use App\UserManagement\Domain\ValueObject\Email;
use App\UserManagement\Domain\ValueObject\UserId;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
final class RegisterUserHandler
{
public function __construct(
private UserRepository $userRepository,
private PasswordHasher $passwordHasher
) {
}
public function __invoke(RegisterUser $command): void
{
$email = new Email($command->email);
if ($this->userRepository->findByEmail($email)) {
throw new \DomainException('User with this email already exists');
}
$hashedPassword = $this->passwordHasher->hashPassword(
$command->password
);
$user = User::register(
new UserId(),
$command->name,
$email,
$hashedPassword
);
$this->userRepository->save($user);
}
}
Pozn.: Tato varianta používá PasswordHasher jako závislost handleru.
Alternativní přístup s HashedPassword hodnotovým objektem je ukázán v kapitole
Implementace v Symfony.
PHP: Query handler - GetUserProfileHandler
<?php
declare(strict_types=1);
namespace App\UserManagement\Profile\Query;
use App\UserManagement\Profile\ReadModel\UserProfileReadRepository;
use App\UserManagement\Profile\ViewModel\UserProfileViewModel;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
final class GetUserProfileHandler
{
public function __construct(
private UserProfileReadRepository $readRepository
) {
}
public function __invoke(GetUserProfile $query): ?UserProfileViewModel
{
return $this->readRepository->findById($query->userId);
}
}
Všimněte si podstatného rozdílu: command handler pracuje s doménovým modelem (UserRepository,
User entita, value objects), zatímco query handler pracuje s read repozitářem
(UserProfileReadRepository), který vrací přímo ViewModel - jednoduchou datovou strukturu
optimalizovanou pro prezentaci. Query handler neprochází přes doménový model.
ViewModely a Read Modely
ViewModel (nebo Read Model) je datová struktura navržená výhradně pro potřeby konkrétního dotazu
nebo obrazovky. Na rozdíl od doménové entity neobsahuje žádnou business logiku - je to čistě
prezentační objekt. Zatímco doménová entita User chrání invarianty a zapouzdřuje
chování, ViewModel UserProfileViewModel obsahuje přesně ta data, která potřebuje
šablona nebo API endpoint.
PHP: UserProfileViewModel
<?php
declare(strict_types=1);
namespace App\UserManagement\Profile\ViewModel;
/**
* Read model pro zobrazení uživatelského profilu.
* Obsahuje pouze data potřebná pro prezentaci - žádná business logika.
*/
final readonly class UserProfileViewModel
{
public function __construct(
public string $userId,
public string $name,
public string $email,
public \DateTimeImmutable $registeredAt,
public int $totalOrders,
public string $membershipTier,
) {
}
}
ViewModel často obsahuje data z více agregátů - v příkladu výše kombinuje údaje o uživateli s počtem objednávek a členskou úrovní. Zápis přes doménový model by vyžadoval načtení uživatele, jeho objednávek a propočet úrovně - pomalé a porušující hranice agregátů. Read model tato data drží připravená v denormalizované podobě.
PHP: Read repozitář s přímým SQL (Doctrine DBAL)
<?php
declare(strict_types=1);
namespace App\UserManagement\Infrastructure\ReadModel;
use App\UserManagement\Profile\ReadModel\UserProfileReadRepository;
use App\UserManagement\Profile\ViewModel\UserProfileViewModel;
use Doctrine\DBAL\Connection;
final class DbalUserProfileReadRepository implements UserProfileReadRepository
{
public function __construct(
private readonly Connection $connection,
) {}
public function findById(string $userId): ?UserProfileViewModel
{
$row = $this->connection->fetchAssociative(
'SELECT u.id, u.name, u.email, u.registered_at,
COUNT(o.id) AS total_orders,
COALESCE(m.tier, :defaultTier) AS membership_tier
FROM users u
LEFT JOIN orders o ON o.customer_id = u.id
LEFT JOIN memberships m ON m.user_id = u.id
WHERE u.id = :userId
GROUP BY u.id',
['userId' => $userId, 'defaultTier' => 'standard'],
);
if (!$row) {
return null;
}
return new UserProfileViewModel(
userId: $row['id'],
name: $row['name'],
email: $row['email'],
registeredAt: new \DateTimeImmutable($row['registered_at']),
totalOrders: (int) $row['total_orders'],
membershipTier: $row['membership_tier'],
);
}
}
Proč ne Doctrine ORM pro read stranu?
Doctrine ORM je navržen pro práci s doménovým modelem - mapuje entity, řeší vztahy, lazy loading, identity map a unit of work. Pro read stranu CQRS je to zbytečná režie. Read model potřebuje pouze rychle načíst data a namapovat je na ViewModel. Doctrine DBAL (přímý SQL přes Connection) nebo nativní PDO je pro tento účel rychlejší, jednodušší a bez rizika N+1 problémů. O výkonnostních dopadech pojednává kapitola Výkonnostní aspekty.
Implementace Command a Query Buses
Command a Query Buses v CQRS jsou objekty, které směrují příkazy a dotazy na příslušné handlery.
V Symfony 8 se pro injektování správného busu používá named autowiring - názvy parametrů
v konstruktoru musejí odpovídat konfiguraci v messenger.yaml:
PHP: Použití command busu v controlleru
<?php
declare(strict_types=1);
namespace App\UserManagement\Registration\Controller;
use App\UserManagement\Registration\Command\RegisterUser;
use App\UserManagement\Registration\Form\RegistrationFormType;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Attribute\Route;
final class RegistrationController extends AbstractController
{
public function __construct(
private MessageBusInterface $commandBus
) {
}
#[Route('/register', name: 'app_register')]
public function register(Request $request): Response
{
$form = $this->createForm(RegistrationFormType::class);
$form->handleRequest($request);
if ($form->isSubmitted() && $form->isValid()) {
$data = $form->getData();
$command = new RegisterUser(
$data['name'],
$data['email'],
$data['password']
);
try {
$this->commandBus->dispatch($command);
$this->addFlash('success', 'Váš účet byl vytvořen. Nyní se můžete přihlásit.');
return $this->redirectToRoute('app_login');
} catch (\DomainException $e) {
$this->addFlash('error', $e->getMessage());
}
}
return $this->render('@UserManagement/Registration/View/registration.html.twig', [
'form' => $form->createView(),
]);
}
}
PHP: Použití query busu v controlleru
<?php
declare(strict_types=1);
namespace App\UserManagement\Profile\Controller;
use App\UserManagement\Profile\Query\GetUserProfile;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Routing\Attribute\Route;
use Symfony\Component\Security\Core\User\UserInterface;
final class ProfileController extends AbstractController
{
public function __construct(
private MessageBusInterface $queryBus
) {
}
#[Route('/profile', name: 'app_profile')]
public function profile(UserInterface $user): Response
{
$query = new GetUserProfile($user->getUserIdentifier());
$envelope = $this->queryBus->dispatch($query);
$profile = $envelope->last(HandledStamp::class)->getResult();
if (!$profile) {
throw $this->createNotFoundException('User not found');
}
return $this->render('@UserManagement/Profile/View/profile.html.twig', [
'profile' => $profile,
]);
}
}
V těchto příkladech je commandBus a queryBus injektováno pomocí named autowiring -
Symfony automaticky přiřadí bus podle názvu parametru v konstruktoru (musí odpovídat klíči v konfiguraci
buses v messenger.yaml, kde command.bus se namapuje na $commandBus).
Dosud jsme se soustředili na základní infrastrukturu CQRS - příkazy, dotazy, handlery a busy. V následujících sekcích se zaměříme na pokročilejší aspekty: jak optimalizovat read stranu pro konkrétní dotazy, jak pracovat s eventual consistency a jak řešit provozní problémy v asynchronním prostředí.
Optimalizace Read Modelů
Jednou z hlavních výhod CQRS je možnost nezávisle optimalizovat read stranu pro konkrétní dotazy. Zatímco write model je normalizovaný (aby chránil konzistenci dat), read model může být maximálně denormalizovaný - přesně ve tvaru, který potřebuje UI.
Strategie optimalizace read modelů
Přehled strategií
| Strategie | Popis | Vhodné pro | Složitost |
|---|---|---|---|
| Přímý SQL (DBAL) | Query handler čte z téže DB přes Doctrine DBAL, obchází ORM | Většinu aplikací na úrovni 1–2 | Nízká |
| Denormalizované tabulky | Separátní tabulky s předpočítanými daty, aktualizované přes eventy | Složité dashboard dotazy, reporting | Střední |
| Materialized views (DB) | Databázové materialized views refreshované periodicky nebo triggerem | Agregační dotazy nad velkými daty | Střední |
| Elasticsearch / Meilisearch | Fulltextový engine jako read store, plněný asynchronně z eventů | Fulltextové vyhledávání, faceted search | Vysoká |
| Redis cache | Hotová data serializovaná do Redis, invalidace přes eventy | Vysoká čtecí zátěž, nízká latence | Střední |
Denormalizované tabulky jako read model
Nejrozšířenější strategií v praxi je vytvoření denormalizované tabulky, která obsahuje přesně ta data, jež potřebuje konkrétní obrazovka nebo API endpoint. Tabulka se aktualizuje asynchronně přes doménové události.
PHP: Projektor aktualizující denormalizovanou tabulku
<?php
declare(strict_types=1);
namespace App\Ordering\Infrastructure\Projection;
use App\Ordering\Domain\Event\OrderPlaced;
use App\Ordering\Domain\Event\OrderShipped;
use App\Ordering\Domain\Event\OrderCancelled;
use Doctrine\DBAL\Connection;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
/**
* Asynchronní projektor: naslouchá doménovým událostem a aktualizuje
* denormalizovanou tabulku order_dashboard, optimalizovanou pro
* obrazovku "Přehled objednávek".
*/
#[AsMessageHandler]
final class OrderDashboardProjector
{
public function __construct(
private readonly Connection $connection,
) {}
public function __invoke(OrderPlaced|OrderShipped|OrderCancelled $event): void
{
match (true) {
$event instanceof OrderPlaced => $this->onOrderPlaced($event),
$event instanceof OrderShipped => $this->onOrderShipped($event),
$event instanceof OrderCancelled => $this->onOrderCancelled($event),
};
}
private function onOrderPlaced(OrderPlaced $event): void
{
$this->connection->executeStatement(
'INSERT INTO order_dashboard
(order_id, customer_name, total_amount, status, placed_at, updated_at)
VALUES (:orderId, :customerName, :totalAmount, :status, :placedAt, :updatedAt)
ON DUPLICATE KEY UPDATE
status = VALUES(status), updated_at = VALUES(updated_at)',
[
'orderId' => $event->orderId(),
'customerName' => $event->customerName(),
'totalAmount' => $event->totalAmount(),
'status' => 'placed',
'placedAt' => $event->occurredOn()->format('Y-m-d H:i:s'),
'updatedAt' => $event->occurredOn()->format('Y-m-d H:i:s'),
],
);
}
private function onOrderShipped(OrderShipped $event): void
{
$this->connection->executeStatement(
'UPDATE order_dashboard
SET status = :status,
tracking_number = :trackingNumber,
updated_at = :updatedAt
WHERE order_id = :orderId',
[
'orderId' => $event->orderId(),
'status' => 'shipped',
'trackingNumber' => $event->trackingNumber(),
'updatedAt' => $event->occurredOn()->format('Y-m-d H:i:s'),
],
);
}
private function onOrderCancelled(OrderCancelled $event): void
{
$this->connection->executeStatement(
'UPDATE order_dashboard
SET status = :status, updated_at = :updatedAt
WHERE order_id = :orderId',
[
'orderId' => $event->orderId(),
'status' => 'cancelled',
'updatedAt' => $event->occurredOn()->format('Y-m-d H:i:s'),
],
);
}
}
Idempotence projektorů
Při asynchronním zpracování může být událost doručena více než jednou
(at-least-once delivery). Projektor proto musí být idempotentní - opakované
zpracování téže události nesmí vést k nesprávným datům. V příkladu výše je idempotence
zajištěna konstrukcí ON DUPLICATE KEY UPDATE, která při opakovaném insertu
provede pouze update. Alternativní přístupy:
- Position tracking - projektor si ukládá pozici posledního zpracovaného eventu (event ID nebo sequence number) a ignoruje události se stejnou nebo nižší pozicí.
- Upsert/Merge -
INSERT ... ON CONFLICT DO UPDATE(PostgreSQL) neboREPLACE INTO(MySQL). Jednoduchý, ale méně flexibilní.
Podrobněji o idempotenci projektorů a dalších praktických problémech pojednává kapitola Event Sourcing - Praktické problémy projekcí.
Rebuild projekcí
Klíčovou výhodou CQRS s asynchronními projekcemi je možnost kompletního rebuildu read modelu. Pokud se změní struktura denormalizované tabulky (nový sloupec, jiný formát dat), stačí:
- Vytvořit novou verzi projekční tabulky.
- Přehrát všechny relevantní události přes projektor.
- Přepnout read dotazy na novou tabulku.
- Smazat starou tabulku.
Tento přístup je realizovatelný pouze tehdy, jsou-li zdrojové události stále dostupné (v Event Store nebo v message logu). Bez Event Sourcingu je rebuild projekcí možný, ale musíte mít alternativní zdroj dat (např. change data capture z write databáze).
Eventual Consistency v praxi
Eventual consistency je nejčastějším zdrojem nejistoty při adopci CQRS. Při asynchronní propagaci změn z write strany na read stranu existuje časové okno (typicky milisekundy až jednotky sekund), kdy read model ještě neodráží poslední zápis. Uživatel odešle formulář, dostane potvrzení o úspěchu, ale seznam na další stránce ještě nezobrazuje nový záznam.
Tento problém není bug - je to vlastnost distribuované architektury. Následující diagram zachycuje celý datový tok - od zápisu přes asynchronní propagaci až po čtení - a zvýrazňuje okno, ve kterém k eventual consistency dochází:
Existuje několik osvědčených vzorů, jak eventual consistency v UI řešit:
Strategie řešení v UI
Přehled strategií pro práci s eventual consistency
| Strategie | Princip | Implementace |
|---|---|---|
| Optimistická aktualizace UI | UI okamžitě zobrazí nový stav, aniž čeká na read model | Frontend (JavaScript) přidá záznam do seznamu lokálně po úspěšném POST |
| Post-Redirect-Get s flash | Po command se provede redirect a zobrazí se potvrzující zpráva | Standardní Symfony flash messages - uživatel vidí potvrzení a read model má čas se aktualizovat |
| Polling / Long polling | Frontend periodicky dotazuje read model, dokud nezobrazí aktuální stav | AJAX request každých N milisekund s timeoutem |
| Write-through cache | Command handler po úspěšném zápisu synchronně aktualizuje i read model / cache | Porušuje čisté oddělení, ale eliminuje lag pro kritické operace |
| Synchronní projekce pro kritické cesty | Některé projekce se aktualizují synchronně (ve stejné transakci), ostatní asynchronně | Hybrid: synchronní projekce pro okamžitou konzistenci, asynchronní pro reporting |
PHP: Optimistická aktualizace - controller vrací data z command handleru
<?php
declare(strict_types=1);
namespace App\Ordering\Application\Controller;
use App\Ordering\Application\Command\PlaceOrder;
use App\Ordering\Application\Form\PlaceOrderFormType;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Attribute\Route;
use Ramsey\Uuid\Uuid;
final class PlaceOrderController extends AbstractController
{
public function __construct(
private MessageBusInterface $commandBus,
) {}
#[Route('/orders', name: 'place_order', methods: ['POST'])]
public function __invoke(Request $request): Response
{
$form = $this->createForm(PlaceOrderFormType::class);
$form->handleRequest($request);
if (!$form->isSubmitted() || !$form->isValid()) {
return $this->redirectToRoute('cart');
}
// ID generováno na straně klienta - command nemusí vracet hodnotu
$orderId = Uuid::uuid4()->toString();
$data = $form->getData();
$command = new PlaceOrder(
orderId: $orderId,
customerId: $this->getUser()->getUserIdentifier(),
items: $data['items'],
);
$this->commandBus->dispatch($command);
// Redirect na detail objednávky - read model se může
// ještě aktualizovat, ale uživatel vidí potvrzení
$this->addFlash('success', 'Objednávka byla úspěšně vytvořena.');
return $this->redirectToRoute('order_detail', ['id' => $orderId]);
}
}
Kdy eventual consistency NENÍ přijatelná
Existují scénáře, kde i krátkodobá nekonzistence dat je nepřijatelná:
- Finanční zůstatky - Uživatel nesmí vidět neaktuální stav účtu a provést operaci na základě zastaralých dat.
- Unikátní omezení - Kontrola duplicitního e-mailu při registraci musí být konzistentní v okamžiku zápisu, ne „až se read model aktualizuje".
- Limity a kvóty - Pokud uživatel nesmí překročit limit 10 objednávek denně, kontrola musí být přesná v okamžiku commandu.
Pro tyto scénáře zajistěte konzistenci na write straně (v command handleru přes doménový model a databázová omezení), ne přes read model. Read model eventual consistency má vliv pouze na zobrazení dat, nikoli na business rozhodnutí.
Asynchronní zpracování
Jednou z výhod CQRS je možnost asynchronního zpracování příkazů. V Symfony 8 se asynchronní zpracování konfiguruje přes transporty v Messenger komponentě. Příkaz označený pro asynchronní transport je při dispatchi serializován a zařazen do fronty; Messenger worker jej později vyzvedne a předá handleru.
Konfigurace asynchronního zpracování v Symfony 8
# config/packages/messenger.yaml
framework:
messenger:
# Konfigurace transportů
transports:
async:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
queue_name: commands
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 2
max_delay: 60000
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
queue_name: high_priority
# Směrování zpráv - mapuje konkrétní třídy nebo rozhraní na transport
routing:
# Příkazy vhodné pro asynchronní zpracování:
# odesílání notifikací, generování reportů, aktualizace read modelů
App\Notification\Application\Command\SendWelcomeEmail: async
App\Reporting\Application\Command\GenerateMonthlyReport: async
# Vysoká priorita - aktualizace read modelů pro kritické obrazovky
App\Ordering\Domain\Event\OrderPlaced: async_priority_high
V této konfiguraci jsou příkazy pro odesílání e-mailů a generování reportů směrovány na asynchronní
transport s retry strategií (3 pokusy s exponenciálním backoffem). Pro kritické události je definován
samostatný transport async_priority_high s vlastní frontou - Messenger worker
pro tuto frontu může běžet s vyšší prioritou nebo na dedikovaném serveru.
Spuštění Messenger workerů
# Konzumace zpráv z obou front - high_priority má přednost
$ php bin/console messenger:consume async_priority_high async
# V produkci: Supervisor nebo systemd pro automatický restart
# /etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /var/www/app/bin/console messenger:consume async_priority_high async --time-limit=3600 --memory-limit=128M
numprocs=2
autostart=true
autorestart=true
startsecs=0
redirect_stderr=true
stdout_logfile=/var/log/messenger-worker.log
Produkční provoz workerů
Messenger workery jsou dlouhodobě běžící procesy. V produkci je nutné zajistit:
- Automatický restart - Worker může spadnout (memory leak, neočekávaná výjimka). Supervisor nebo systemd zajistí automatický restart.
- Time limit a memory limit -
--time-limit=3600ukončí worker po hodině,--memory-limit=128Mpo dosažení limitu paměti. Supervisor pak worker restartuje s čistým stavem. - Graceful shutdown - Při deployi je třeba workerům poslat signál
SIGTERM. Worker dokončí aktuálně zpracovávanou zprávu a poté se ukončí. Příkazmessenger:stop-workerstoho docílí přes cache signal.
Zpracování chyb a Dead Letter Queue
V asynchronním prostředí je zpracování chyb podstatně odlišné od synchronního zpracování. Při synchronním dispatchi výjimka probublá přímo do controlleru a uživatel vidí chybovou hlášku. Při asynchronním dispatchi je zpráva ve frontě - pokud handler selže, uživatel o tom neví a zpráva musí být zpracována znovu.
Retry strategie
Symfony Messenger podporuje automatické opakování selhavších zpráv. Konfigurace
retry_strategy na transportu definuje, kolikrát a s jakým zpožděním
se handler znovu zavolá:
max_retries: 3- Maximální počet opakování.delay: 1000- Zpoždění prvního opakování (v ms).multiplier: 2- Exponenciální backoff: 1s → 2s → 4s.max_delay: 60000- Maximální zpoždění (60 sekund).
Failed transport (Dead Letter Queue)
Po vyčerpání všech pokusů o retry je zpráva přesunuta na failed transport (dead letter queue). Zprávy na failed transportu čekají na manuální zpracování - vývojář je může prozkoumat, opravit příčinu chyby a znovu odeslat.
Konfigurace failed transportu a diagnostické příkazy
# config/packages/messenger.yaml
framework:
messenger:
failure_transport: failed
transports:
failed:
dsn: 'doctrine://default?queue_name=failed'
# Zobrazení selhavších zpráv
$ php bin/console messenger:failed:show
# Detail konkrétní selhavší zprávy (včetně výjimky)
$ php bin/console messenger:failed:show 42
# Opakované zpracování selhavší zprávy
$ php bin/console messenger:failed:retry 42
# Opakování všech selhavších zpráv
$ php bin/console messenger:failed:retry
# Trvalé odstranění selhavší zprávy (po analýze)
$ php bin/console messenger:failed:remove 42
Monitoring selhavších zpráv
Dead letter queue není „koš" - je to fronta vyžadující pozornost.
V produkčním systému musíte monitorovat počet zpráv na failed transportu
a nastavit alerting (např. přes Prometheus metriky nebo jednoduchý cron job
kontrolující messenger:failed:show --format=json). Neošetřené selhávající
zprávy mohou znamenat, že read model diverguje od write modelu, události se ztrácejí,
nebo notifikace nejsou doručovány.
Middleware v CQRS
Symfony Messenger middleware je řetěz (chain) komponent, které zpracovávají zprávu před a po jejím předání handleru. Middleware umožňuje přidat cross-cutting concerns (validace, logování, transakce, autorizace) bez změny samotných handlerů.
Vestavěné middleware validation a doctrine_transaction jsme
již viděli v konfiguraci. Pro pokročilejší scénáře si můžete vytvořit vlastní middleware:
PHP: Logovací middleware pro command bus
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Middleware;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
final class CommandLoggingMiddleware implements MiddlewareInterface
{
public function __construct(
private readonly LoggerInterface $logger,
) {}
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$message = $envelope->getMessage();
$commandName = (new \ReflectionClass($message))->getShortName();
$this->logger->info('Dispatching command: {command}', [
'command' => $commandName,
// Pozor: v produkci filtrujte citlivá pole (hesla, tokeny)
// pomocí vlastního serializéru nebo allowlistu properties
'payload' => get_object_vars($message),
]);
$startTime = microtime(true);
try {
$envelope = $stack->next()->handle($envelope, $stack);
$this->logger->info('Command handled: {command} ({duration}ms)', [
'command' => $commandName,
'duration' => round((microtime(true) - $startTime) * 1000, 2),
]);
return $envelope;
} catch (\Throwable $e) {
$this->logger->error('Command failed: {command} - {error}', [
'command' => $commandName,
'error' => $e->getMessage(),
'duration' => round((microtime(true) - $startTime) * 1000, 2),
]);
throw $e;
}
}
}
Registrace vlastního middleware
# config/packages/messenger.yaml
framework:
messenger:
buses:
command.bus:
middleware:
- App\Infrastructure\Messenger\Middleware\CommandLoggingMiddleware
- validation
- doctrine_transaction
Pořadí middleware je důležité: v příkladu výše se logování provede jako první (zachytí
i validační chyby), následuje validace (odmítne nevalidní command ještě před zahájením
transakce) a nakonec doctrine_transaction (obalí handler do DB transakce).
Testování CQRS
CQRS přirozeně vede k snadno testovatelné architektuře. Command handlers, query handlers a projektory jsou izolované komponenty s jasně definovanými vstupy a výstupy. Testovací strategie se liší podle testovaného komponentu:
Testování command handlerů
Command handler se testuje jako unit test s mocknutým repozitářem. Ověřujete, že handler správně validuje invarianty, volá doménový model a ukládá změny:
PHP: Test command handleru
<?php
declare(strict_types=1);
namespace Tests\UserManagement\Registration\Command;
use App\UserManagement\Domain\Model\User;
use App\UserManagement\Domain\Repository\UserRepository;
use App\UserManagement\Domain\Service\PasswordHasher;
use App\UserManagement\Registration\Command\RegisterUser;
use App\UserManagement\Registration\Command\RegisterUserHandler;
use PHPUnit\Framework\TestCase;
final class RegisterUserHandlerTest extends TestCase
{
public function testRegistersNewUser(): void
{
$repository = $this->createMock(UserRepository::class);
$repository->expects($this->once())
->method('findByEmail')
->willReturn(null); // žádný existující uživatel
$repository->expects($this->once())
->method('save')
->with($this->isInstanceOf(User::class));
$hasher = $this->createMock(PasswordHasher::class);
$hasher->method('hashPassword')->willReturn('hashed_password');
$handler = new RegisterUserHandler($repository, $hasher);
$handler(new RegisterUser(
name: 'Jan Novák',
email: 'jan@example.com',
password: 'securepassword123',
));
}
public function testRejectsDuplicateEmail(): void
{
$repository = $this->createMock(UserRepository::class);
$repository->method('findByEmail')
->willReturn($this->createMock(User::class)); // uživatel existuje
$hasher = $this->createMock(PasswordHasher::class);
$handler = new RegisterUserHandler($repository, $hasher);
$this->expectException(\DomainException::class);
$this->expectExceptionMessage('User with this email already exists');
$handler(new RegisterUser(
name: 'Jan Novák',
email: 'jan@example.com',
password: 'securepassword123',
));
}
}
Testování query handlerů
Query handler se testuje na správnost mapování dat z read repozitáře na ViewModel. Pro integrační testy s reálnou databází můžete ověřit i správnost SQL dotazů:
PHP: Test query handleru
<?php
declare(strict_types=1);
namespace Tests\UserManagement\Profile\Query;
use App\UserManagement\Profile\Query\GetUserProfile;
use App\UserManagement\Profile\Query\GetUserProfileHandler;
use App\UserManagement\Profile\ReadModel\UserProfileReadRepository;
use App\UserManagement\Profile\ViewModel\UserProfileViewModel;
use PHPUnit\Framework\TestCase;
final class GetUserProfileHandlerTest extends TestCase
{
public function testReturnsProfileForExistingUser(): void
{
$expectedProfile = new UserProfileViewModel(
userId: '550e8400-e29b-41d4-a716-446655440000',
name: 'Jan Novák',
email: 'jan@example.com',
registeredAt: new \DateTimeImmutable('2025-01-15'),
totalOrders: 5,
membershipTier: 'gold',
);
$readRepository = $this->createMock(UserProfileReadRepository::class);
$readRepository->method('findById')
->with('550e8400-e29b-41d4-a716-446655440000')
->willReturn($expectedProfile);
$handler = new GetUserProfileHandler($readRepository);
$result = $handler(new GetUserProfile('550e8400-e29b-41d4-a716-446655440000'));
$this->assertSame($expectedProfile, $result);
}
public function testReturnsNullForNonExistingUser(): void
{
$readRepository = $this->createMock(UserProfileReadRepository::class);
$readRepository->method('findById')->willReturn(null);
$handler = new GetUserProfileHandler($readRepository);
$result = $handler(new GetUserProfile('non-existing-id'));
$this->assertNull($result);
}
}
Testování projektorů
Projektory se nejlépe testují jako integrační testy s reálnou databází. Ověřujete, že po zpracování sekvence událostí read model obsahuje očekávaná data:
PHP: Integrační test projektoru
<?php
declare(strict_types=1);
namespace Tests\Ordering\Infrastructure\Projection;
use App\Ordering\Domain\Event\OrderPlaced;
use App\Ordering\Domain\Event\OrderShipped;
use App\Ordering\Infrastructure\Projection\OrderDashboardProjector;
use Doctrine\DBAL\Connection;
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
final class OrderDashboardProjectorTest extends KernelTestCase
{
private Connection $connection;
private OrderDashboardProjector $projector;
protected function setUp(): void
{
$this->connection = self::getContainer()->get(Connection::class);
$this->projector = new OrderDashboardProjector($this->connection);
// Vyčistit testovací tabulku
$this->connection->executeStatement('DELETE FROM order_dashboard');
}
public function testProjectsOrderLifecycle(): void
{
// Given: objednávka byla vytvořena
($this->projector)(new OrderPlaced(
orderId: 'order-1',
customerName: 'Jan Novák',
totalAmount: 1500,
));
// When: objednávka byla odeslána
($this->projector)(new OrderShipped(
orderId: 'order-1',
trackingNumber: 'CZ123456789',
));
// Then: read model obsahuje aktuální stav
$row = $this->connection->fetchAssociative(
'SELECT * FROM order_dashboard WHERE order_id = :id',
['id' => 'order-1'],
);
$this->assertSame('shipped', $row['status']);
$this->assertSame('CZ123456789', $row['tracking_number']);
$this->assertSame(1500, (int) $row['total_amount']);
}
public function testIdempotentProjection(): void
{
$event = new OrderPlaced(
orderId: 'order-2',
customerName: 'Eva Černá',
totalAmount: 800,
);
// Zpracovat stejnou událost dvakrát (at-least-once delivery)
($this->projector)($event);
($this->projector)($event);
// Read model obsahuje záznam pouze jednou
$count = $this->connection->fetchOne(
'SELECT COUNT(*) FROM order_dashboard WHERE order_id = :id',
['id' => 'order-2'],
);
$this->assertSame(1, (int) $count);
}
}
Kompletnější přehled testovacích strategií pro DDD kód - včetně testování agregátů, value objects a doménových služeb - najdete v kapitole Testování DDD kódu.
Saga / Process Manager
Při použití CQRS s více Bounded Contexts vzniká potřeba koordinovat dlouhotrvající procesy napříč kontexty. Vzor Saga (neboli Process Manager) naslouchá doménovým událostem a na základě nich odesílá příkazy, čímž propojuje command a event stranu CQRS do ucelených business procesů.
Podrobný výklad ság - včetně implementace v Symfony Messenger, kompenzačních strategií a testování - najdete v kapitole Ságy a Process Managery.