Ságy a Process Managery
V předchozí kapitole jsme se zabývali Event Sourcingem - vzorem persistence, který ukládá stav jako sekvenci neměnných událostí. Ságy na tento koncept přirozeně navazují: zatímco Event Sourcing řeší persistenci uvnitř jednoho agregátu, ságy koordinují procesy napříč více agregáty a Bounded Contexts, které spolu komunikují právě prostřednictvím doménových událostí.
Proč potřebujeme ságy?
Jako ilustrativní příklad slouží typický e-shop: zákazník odešle objednávku a systém musí provést čtyři kroky napříč odlišnými Bounded Contexts:
- Ordering - vytvoření objednávky (agregát
Order), - Payment - stržení platby zákazníkovi (agregát
Payment), - Warehouse - rezervace zboží na skladě (agregát
StockReservation), - Shipping - vytvoření zásilky (agregát
Shipment).
Každý z těchto kontextů má vlastní agregát, vlastní databázi (nebo alespoň vlastní tabulky se striktně oddělenou odpovědností) a vlastní invarianty, které musí chránit. Agregáty v různých Bounded Contexts nelze měnit v jedné databázové transakci - to by porušilo autonomii kontextů, jež je základním pilířem DDD. Jeden kontext nesmí sahat do databáze jiného kontextu; komunikace probíhá výhradně prostřednictvím zpráv (událostí a příkazů).
Proč nemůžeme jednoduše zabalit všechny čtyři kroky do jediné databázové transakce? Jednotlivé kontexty mohou běžet na různých serverech, používat různé databázové systémy (PostgreSQL pro objednávky, Redis pro skladové rezervace, externí platební bránu pro platby) a komunikovat asynchronně přes frontu zpráv. Koncept atomické transakce se zde rozpadá.
Proč ne Two-Phase Commit (2PC)?
Distribuované databáze nabízejí protokol Two-Phase Commit (2PC), který koordinuje commit napříč více databázemi. V první fázi (prepare) se všichni účastníci ptají, zda mohou commitnout; ve druhé fázi (commit) koordinátor rozhodne o globálním commitu nebo rollbacku. Tento přístup je však pro DDD systémy nevhodný z několika důvodů:
- Výkonnostní overhead - všichni účastníci drží zámky po celou dobu obou fází, což dramaticky snižuje propustnost systému.
- Tight coupling - všechny kontexty musí být dostupné současně; výpadek jediného účastníka zablokuje celou transakci.
- Single point of failure - koordinátor 2PC je kritické místo; jeho selhání mezi fázemi zanechá účastníky v nejistém stavu.
- Nekompatibilita s autonomií Bounded Contexts - 2PC vyžaduje, aby všechny kontexty sdílely transakční protokol, čímž porušuje princip nezávislého nasazení a vývoje jednotlivých kontextů.
Pro ilustraci uvažujme o scénáři selhání: systém úspěšně strhne platbu zákazníkovi (krok 2), ale při rezervaci skladu zjistí, že zboží není dostupné (krok 3 selže). Zákazník přišel o peníze, zboží nemá a systém je v nekonzistentním stavu. Bez mechanismu, který by tento stav detekoval a napravil, zůstane zákazník bez peněz i bez zboží - což je v produkčním systému nepřijatelné.
Řešení tohoto problému navrhli již v roce 1987 Hector Garcia-Molina a Kenneth Salem ve svém přelomovém článku Sagas. Místo jedné velké distribuované transakce navrhli rozdělit proces na sérii lokálních transakcí, z nichž každá má definovanou kompenzační akci. Pokud některý krok selže, systém provede kompenzační akce pro všechny předchozí úspěšné kroky - v opačném pořadí. Tento vzor se v DDD komunitě ustálil pod názvy Saga a Process Manager.
Citace: Garcia-Molina, H. & Salem, K., Sagas, ACM SIGMOD (1987); Vernon, V., Implementing Domain-Driven Design (2013), kap. 8.
V následujících sekcích si ukážeme dva základní přístupy ke koordinaci ság - choreografii a orchestraci - a jejich praktickou implementaci v Symfony 8 s využitím Symfony Messenger.
Kompenzační transakce
Kompenzační transakce je sémantické vrácení efektu předchozího kroku. Na rozdíl od technického rollbacku databázové transakce (který „vymaže" změny, jako by se nikdy nestaly) je kompenzace plnohodnotná business operace, která má vlastní vedlejší efekty - notifikace, auditní záznamy, aktualizace stavů. Kompenzace nevrací systém do původního stavu bit po bitu; místo toho přivede systém do stavu, který je z business pohledu ekvivalentní stavu před provedením kompenzovaného kroku.
Pro náš e-shop scénář vypadá mapa akcí a jejich kompenzací následovně:
| Akce | Kompenzace | Poznámka |
|---|---|---|
ChargeCustomer |
RefundCustomer |
Zahrnuje notifikaci zákazníka |
ReserveStock |
ReleaseStock |
Uvolnění rezervace, nikoliv smazání |
CreateShipment |
CancelShipment |
Pouze do okamžiku odeslání |
Klíčový princip: kompenzace není přesný inverzní příkaz. Zatímco
ChargeCustomer strhne peníze, kompenzační RefundCustomer nejenže
vrátí peníze, ale navíc odešle zákazníkovi notifikaci o vrácení platby, zapíše záznam
do auditního logu a může aktualizovat interní metriky. Každá kompenzace je samostatný
příkaz s vlastní logikou, validací a vedlejšími efekty.
PHP: Rozhraní CompensatableCommand
<?php
declare(strict_types=1);
namespace App\SharedKernel\Application\Command;
/**
* Command, který lze kompenzovat - definuje svůj "undo" příkaz.
*/
interface CompensatableCommand
{
/**
* Vrátí příkaz, který sémanticky vrátí efekt tohoto příkazu.
*/
public function compensation(): object;
}
PHP: ChargeCustomer s kompenzací
<?php
declare(strict_types=1);
namespace App\Payment\Application\Command;
use App\SharedKernel\Application\Command\CompensatableCommand;
final readonly class ChargeCustomer implements CompensatableCommand
{
public function __construct(
public string $orderId,
public string $customerId,
public int $amountCents,
) {}
public function compensation(): RefundCustomer
{
return new RefundCustomer(
orderId: $this->orderId,
customerId: $this->customerId,
amountCents: $this->amountCents,
reason: 'Saga compensation',
);
}
}
Kompenzace musí být idempotentní
V distribuovaném systému se může stát, že kompenzační příkaz bude doručen více než
jednou - například kvůli retry mechanismu Symfony Messenger, výpadku workeru nebo
duplikaci zprávy ve frontě. Proto musí být každá kompenzace idempotentní:
opakované provedení téhož kompenzačního příkazu nesmí mít žádný další efekt.
Typicky se toho dosahuje kontrolou aktuálního stavu před provedením akce
(např. RefundCustomer nejprve ověří, zda platba již nebyla vrácena).
Choreografie
Choreografie je přístup ke koordinaci ságy, při němž neexistuje centrální koordinátor. Každý Bounded Context reaguje na události publikované jinými kontexty a na jejich základě provádí svůj krok procesu. Žádná služba neví o celém toku - každá zná pouze svou část a ví, na které události má reagovat.
V našem e-shop scénáři probíhá choreografická sága následovně: kontext Ordering
publikuje událost OrderPlaced. Kontext Payment na ni reaguje, strhne
platbu a publikuje PaymentSucceeded. Kontext Warehouse naslouchá
události PaymentSucceeded, rezervuje zboží a publikuje
StockReserved. Kontext Shipping reaguje na StockReserved
a vytvoří zásilku, čímž publikuje ShipmentCreated. Celý tok vzniká
emergentně z reakcí jednotlivých kontextů na události ostatních - bez centrálního
řízení.
PHP: Choreografie - tři nezávislé handlery
Handler 1 - InitiatePaymentOnOrderPlaced:
<?php
declare(strict_types=1);
namespace App\Payment\Application\Handler;
use App\Ordering\Domain\Event\OrderPlaced;
use App\Payment\Application\Command\ChargeCustomer;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
#[AsMessageHandler]
final readonly class InitiatePaymentOnOrderPlaced
{
public function __construct(
private MessageBusInterface $commandBus,
) {}
public function __invoke(OrderPlaced $event): void
{
$this->commandBus->dispatch(new ChargeCustomer(
orderId: $event->orderId,
customerId: $event->customerId,
amountCents: $event->totalAmountCents,
));
}
}
Handler 2 - ReserveStockOnPaymentSucceeded:
<?php
declare(strict_types=1);
namespace App\Warehouse\Application\Handler;
use App\Payment\Domain\Event\PaymentSucceeded;
use App\Warehouse\Application\Command\ReserveStock;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
#[AsMessageHandler]
final readonly class ReserveStockOnPaymentSucceeded
{
public function __construct(
private MessageBusInterface $commandBus,
) {}
public function __invoke(PaymentSucceeded $event): void
{
$this->commandBus->dispatch(new ReserveStock(
orderId: $event->orderId,
));
}
}
Handler 3 - CreateShipmentOnStockReserved:
<?php
declare(strict_types=1);
namespace App\Shipping\Application\Handler;
use App\Warehouse\Domain\Event\StockReserved;
use App\Shipping\Application\Command\CreateShipment;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
#[AsMessageHandler]
final readonly class CreateShipmentOnStockReserved
{
public function __construct(
private MessageBusInterface $commandBus,
) {}
public function __invoke(StockReserved $event): void
{
$this->commandBus->dispatch(new CreateShipment(
orderId: $event->orderId,
));
}
}
YAML: Konfigurace Messenger pro choreografii
# config/packages/messenger.yaml
framework:
messenger:
transports:
async:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
routing:
'App\Ordering\Domain\Event\OrderPlaced': async
'App\Payment\Domain\Event\PaymentSucceeded': async
'App\Warehouse\Domain\Event\StockReserved': async
Hlavní výhodou choreografie je volné provázání (loose coupling) mezi
kontexty. Žádný kontext nemusí znát ostatní kontexty - reaguje pouze na události, které
mu přicházejí. Přidání nového kontextu (například Loyalty, který přidělí body za
objednávku) je triviální: stačí vytvořit nový handler naslouchající
OrderPlaced. Pro jednoduché procesy se dvěma až třemi kroky je choreografie
elegantní a snadno pochopitelné řešení.
Limity choreografie
Choreografie funguje dobře pro jednoduché procesy, ale s rostoucí komplexitou naráží na reálné limity. V praxi se tyto problémy projeví obvykle ve chvíli, kdy proces zahrnuje pět a více kontextů nebo vyžaduje podmíněné větvení toku.
1. Neviditelný tok procesu
Při choreografii neexistuje žádné jedno místo, kde by byl celý business proces popsán. Tok procesu je „rozprostřen" do desítek handlerů v různých kontextech. S pěti a více kontexty se stává prakticky nemožné vizualizovat kompletní tok - nikdo nemá přehled o tom, které kroky po sobě následují, kde se proces větví a jaké jsou alternativní cesty při selhání. Vzniká fenomén, který se někdy označuje jako „distribuované špagety" (distributed spaghetti) - analogie ke špagetovému kódu, ale rozloženému do celého systému.
2. Porušení Open-Closed Principle
Přidání nového kroku do procesu často vyžaduje úpravu existujícího kontextu. Například
pokud chceme mezi platbu a sklad vložit krok „ověření proti podvodům" (Fraud Detection),
musíme změnit handler ve Warehouse, aby místo události PaymentSucceeded
naslouchal na FraudCheckPassed. Tím porušujeme
Open-Closed Principle - stávající kód kontextu Warehouse je nutné
upravit, aby fungoval s novým krokem. Při orchestraci by stačilo přidat krok do
centrálního Process Manageru bez zásahu do existujících kontextů.
3. Obtížná diagnostika selhání
Když se proces „zasekne", kde hledat příčinu? Každý kontext zná pouze svůj krok -
neví, jaký je celkový stav procesu. Operátor musí ručně procházet logy všech
kontextů, korelovat události podle orderId a rekonstruovat, kde přesně
proces selhal. Neexistuje centrální dashboard, který by zobrazil:
„Objednávka #42 - platba OK, sklad SELHÁNÍ, zásilka NESPUŠTĚNA."
V produkčním prostředí s tisíci objednávkami denně je tento přístup neúnosný.
4. Chybějící timeout management
Kdo detekuje, že proces „visí"? Pokud kontext Payment strhne platbu, ale Warehouse
nikdy nezareaguje (handler spadl, zpráva se ztratila ve frontě), kdo zjistí, že
proces stojí? Každý kontext zná pouze svůj krok a nemá přehled o časových limitech
celého procesu. V choreografii neexistuje přirozené místo pro definici globálního
timeoutu - nikdo nehlídá, že celý proces od OrderPlaced po
ShipmentCreated musí trvat maximálně 30 minut.
Všechny tyto problémy poukazují na jednu věc: u komplexních procesů potřebujeme centrální místo, které zná celý tok, řídí kroky, detekuje selhání a spouští kompenzace. Tímto centrálním místem je orchestrátor - Process Manager.
Choreografie má své místo
Choreografie je stále legitimním řešením pro jednoduché procesy se dvěma až třemi kroky, kde je tok lineární a selhání řeší jednoduchá kompenzace. Nepřehánějte engineering - pokud váš proces zahrnuje pouze „vytvoření objednávky → stržení platby → potvrzení", choreografie je jednodušší a přímočařejší než plnohodnotný Process Manager. Orchestraci zavádějte až ve chvíli, kdy narazíte na výše popsané problémy.
Orchestrace - Process Manager
Orchestrace je přístup, při kterém existuje centrální koordinátor -
tzv. Process Manager - který řídí celý business proces jako stavový
automat s definovanými stavy a přechody. V našem e-shop scénáři je tímto koordinátorem
třída OrderProcessManager, která přijímá události ze všech kontextů
(Payment, Warehouse, Shipping) a na jejich základě rozhoduje, jaký příkaz vydat jako
další krok. Na rozdíl od choreografie, kde je tok „rozprostřen" do desítek handlerů,
orchestrátor koncentruje celou logiku procesu do jediné třídy - jednoho místa, kde je
viditelný kompletní tok od OrderPlaced po ConfirmOrder.
Následující diagram zobrazuje stavový automat procesu objednávky. Zelené šipky značí úspěšné přechody, červené selhání a oranžová cesta vede přes kompenzaci:
PHP: Enum OrderSagaStatus
<?php
declare(strict_types=1);
namespace App\Ordering\Application\Saga;
enum OrderSagaStatus: string
{
case AwaitingPayment = 'awaiting_payment';
case AwaitingStockReservation = 'awaiting_stock_reservation';
case AwaitingShipment = 'awaiting_shipment';
case Completed = 'completed';
case Compensating = 'compensating';
case Failed = 'failed';
}
PHP: OrderProcessManager - jádro orchestrace
<?php
declare(strict_types=1);
namespace App\Ordering\Application\Saga;
use App\Ordering\Domain\Event\OrderPlaced;
use App\Payment\Domain\Event\PaymentSucceeded;
use App\Payment\Domain\Event\PaymentFailed;
use App\Warehouse\Domain\Event\StockReserved;
use App\Warehouse\Domain\Event\StockReservationFailed;
use App\Shipping\Domain\Event\ShipmentCreated;
use App\Payment\Application\Command\ChargeCustomer;
use App\Payment\Application\Command\RefundCustomer;
use App\Warehouse\Application\Command\ReserveStock;
use App\Shipping\Application\Command\CreateShipment;
use App\Ordering\Application\Command\ConfirmOrder;
use App\Ordering\Application\Command\CancelOrder;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
/**
* Process Manager koordinující objednávkový proces napříč kontexty:
* Ordering → Payment → Warehouse → Shipping → Ordering (potvrzení)
*/
#[AsMessageHandler]
final class OrderProcessManager
{
public function __construct(
private readonly MessageBusInterface $commandBus,
private readonly SagaStateRepositoryInterface $sagaStateRepository,
) {}
public function __invoke(
OrderPlaced|PaymentSucceeded|PaymentFailed|StockReserved|StockReservationFailed|ShipmentCreated $event,
): void {
match (true) {
$event instanceof OrderPlaced => $this->onOrderPlaced($event),
$event instanceof PaymentSucceeded => $this->onPaymentSucceeded($event),
$event instanceof PaymentFailed => $this->onPaymentFailed($event),
$event instanceof StockReserved => $this->onStockReserved($event),
$event instanceof StockReservationFailed => $this->onStockReservationFailed($event),
$event instanceof ShipmentCreated => $this->onShipmentCreated($event),
};
}
private function onOrderPlaced(OrderPlaced $event): void
{
$state = SagaState::start(
sagaType: 'order_process',
correlationId: $event->orderId,
status: OrderSagaStatus::AwaitingPayment,
context: [
'customerId' => $event->customerId,
'amountCents' => $event->totalAmountCents,
'completedSteps' => [],
],
);
$this->sagaStateRepository->save($state);
$this->commandBus->dispatch(new ChargeCustomer(
orderId: $event->orderId,
customerId: $event->customerId,
amountCents: $event->totalAmountCents,
));
}
private function onPaymentSucceeded(PaymentSucceeded $event): void
{
$state = $this->sagaStateRepository->findByCorrelationId($event->orderId);
$state->transitionTo(OrderSagaStatus::AwaitingStockReservation);
$state->updateContext('completedSteps', [
...$state->context()['completedSteps'],
'payment_charged',
]);
$this->sagaStateRepository->save($state);
$this->commandBus->dispatch(new ReserveStock(orderId: $event->orderId));
}
private function onPaymentFailed(PaymentFailed $event): void
{
$state = $this->sagaStateRepository->findByCorrelationId($event->orderId);
$state->transitionTo(OrderSagaStatus::Failed);
$this->sagaStateRepository->save($state);
$this->commandBus->dispatch(new CancelOrder(
orderId: $event->orderId,
reason: 'Platba selhala: ' . $event->failureReason,
));
}
private function onStockReserved(StockReserved $event): void
{
$state = $this->sagaStateRepository->findByCorrelationId($event->orderId);
$state->transitionTo(OrderSagaStatus::AwaitingShipment);
$state->updateContext('completedSteps', [
...$state->context()['completedSteps'],
'stock_reserved',
]);
$this->sagaStateRepository->save($state);
$this->commandBus->dispatch(new CreateShipment(orderId: $event->orderId));
}
private function onStockReservationFailed(StockReservationFailed $event): void
{
$state = $this->sagaStateRepository->findByCorrelationId($event->orderId);
$state->transitionTo(OrderSagaStatus::Compensating);
$this->sagaStateRepository->save($state);
// Kompenzace: vrátit platbu
$this->commandBus->dispatch(new RefundCustomer(
orderId: $event->orderId,
customerId: $state->context()['customerId'],
amountCents: $state->context()['amountCents'],
reason: 'Zboží není skladem',
));
$this->commandBus->dispatch(new CancelOrder(
orderId: $event->orderId,
reason: 'Zboží není skladem',
));
$state->transitionTo(OrderSagaStatus::Failed);
$this->sagaStateRepository->save($state);
}
private function onShipmentCreated(ShipmentCreated $event): void
{
$state = $this->sagaStateRepository->findByCorrelationId($event->orderId);
$state->transitionTo(OrderSagaStatus::Completed);
$state->updateContext('completedSteps', [
...$state->context()['completedSteps'],
'shipment_created',
]);
$this->sagaStateRepository->save($state);
$this->commandBus->dispatch(new ConfirmOrder(orderId: $event->orderId));
}
}
Orchestrace přináší oproti choreografii několik praktických výhod: celý business proces je popsán na jediném místě, takže vývojář okamžitě vidí kompletní tok od objednávky po potvrzení. Debugování je výrazně jednodušší - stačí zkontrolovat stav ságy v databázi a okamžitě je jasné, ve kterém kroku proces stojí. Rozšíření procesu o nový krok (například přidání Fraud Detection mezi platbu a sklad) znamená přidání jedné metody do Process Manageru a jednoho nového stavu do enumu - bez zásahu do existujících kontextů.
Každá metoda = jeden krok stavového automatu
Každá privátní metoda v OrderProcessManager reprezentuje jeden krok
stavového automatu. Přidání nového kroku do procesu znamená přidání jedné metody
a jedné události - stávající metody ani stávající kontexty se nemění. Tím je splněn
Open-Closed Principle: Process Manager je otevřený pro rozšíření
(nové kroky), ale uzavřený pro modifikaci (existující kroky zůstávají beze změny).
Perzistence stavu ságy
Process Manager potřebuje perzistentní úložiště stavu, aby přežil
restart workeru, nové nasazení aplikace i horizontální škálování na více instancí.
Bez perzistence by pád workeru mezi kroky OrderPlaced a
PaymentSucceeded znamenal ztrátu informace o tom, kde se proces nachází -
sága by zůstala navždy „viset" bez možnosti dokončení nebo kompenzace. Stav ságy proto
ukládáme do databáze jako Doctrine entitu.
PHP: SagaState - Doctrine entita
<?php
declare(strict_types=1);
namespace App\Ordering\Application\Saga;
use Doctrine\ORM\Mapping as ORM;
#[ORM\Entity]
#[ORM\Table(name: 'saga_state')]
#[ORM\Index(fields: ['correlationId'], name: 'idx_saga_correlation')]
#[ORM\Index(fields: ['status'], name: 'idx_saga_status')]
class SagaState
{
#[ORM\Id]
#[ORM\GeneratedValue]
#[ORM\Column]
private ?int $id = null;
#[ORM\Column(length: 64)]
private string $sagaType;
#[ORM\Column(length: 128)]
private string $correlationId;
#[ORM\Column(length: 32)]
private string $status;
/** @var array<string, mixed> */
#[ORM\Column(type: 'json')]
private array $context = [];
#[ORM\Column]
private \DateTimeImmutable $startedAt;
#[ORM\Column]
private \DateTimeImmutable $updatedAt;
#[ORM\Column(nullable: true)]
private ?\DateTimeImmutable $completedAt = null;
private function __construct() {}
public static function start(
string $sagaType,
string $correlationId,
OrderSagaStatus $status,
array $context = [],
): self {
$state = new self();
$state->sagaType = $sagaType;
$state->correlationId = $correlationId;
$state->status = $status->value;
$state->context = $context;
$state->startedAt = new \DateTimeImmutable();
$state->updatedAt = new \DateTimeImmutable();
return $state;
}
public function transitionTo(OrderSagaStatus $newStatus): void
{
$this->status = $newStatus->value;
$this->updatedAt = new \DateTimeImmutable();
if ($newStatus === OrderSagaStatus::Completed || $newStatus === OrderSagaStatus::Failed) {
$this->completedAt = new \DateTimeImmutable();
}
}
public function status(): OrderSagaStatus
{
return OrderSagaStatus::from($this->status);
}
/** @return array<string, mixed> */
public function context(): array
{
return $this->context;
}
public function updateContext(string $key, mixed $value): void
{
$this->context[$key] = $value;
$this->updatedAt = new \DateTimeImmutable();
}
public function correlationId(): string
{
return $this->correlationId;
}
public function isCompleted(): bool
{
return $this->completedAt !== null;
}
public function startedAt(): \DateTimeImmutable
{
return $this->startedAt;
}
public function updatedAt(): \DateTimeImmutable
{
return $this->updatedAt;
}
}
PHP: Rozhraní SagaStateRepositoryInterface
<?php
declare(strict_types=1);
namespace App\Ordering\Application\Saga;
/**
* Rozhraní repozitáře stavu ságy - umožňuje snadnou záměnu
* implementace (Doctrine v produkci, in-memory v testech).
*/
interface SagaStateRepositoryInterface
{
public function save(SagaState $state): void;
public function findByCorrelationId(string $correlationId): SagaState;
/** @return list<SagaState> */
public function findStale(\DateTimeImmutable $olderThan): array;
}
PHP: Doctrine implementace SagaStateRepository
<?php
declare(strict_types=1);
namespace App\Ordering\Application\Saga;
use Doctrine\ORM\EntityManagerInterface;
final readonly class SagaStateRepository implements SagaStateRepositoryInterface
{
public function __construct(
private EntityManagerInterface $em,
) {}
public function save(SagaState $state): void
{
$this->em->persist($state);
$this->em->flush();
}
public function findByCorrelationId(string $correlationId): SagaState
{
$state = $this->em->getRepository(SagaState::class)
->findOneBy(['correlationId' => $correlationId]);
if ($state === null) {
throw new \RuntimeException(
sprintf('Saga state not found for correlation ID "%s"', $correlationId),
);
}
return $state;
}
/** @return list<SagaState> */
public function findStale(\DateTimeImmutable $olderThan): array
{
return $this->em->createQueryBuilder()
->select('s')
->from(SagaState::class, 's')
->where('s.completedAt IS NULL')
->andWhere('s.updatedAt < :threshold')
->setParameter('threshold', $olderThan)
->getQuery()
->getResult();
}
}
Díky perzistenci stavu je obnova po selhání přímočará: představte si, že worker spadne
mezi zpracováním OrderPlaced a příchodem PaymentSucceeded.
Po restartu workeru Messenger znovu doručí nedokončenou událost a Process Manager načte
stav ságy z databáze - okamžitě ví, že sága čeká na platbu
(AwaitingPayment), a pokračuje od správného kroku. Metoda
findStale() v repository navíc umožňuje periodicky detekovat „zaseklé" ságy,
které se déle než stanovený práh neposunuly kupředu, a spustit pro ně kompenzaci nebo
eskalaci.
Optimistické zamykání v produkci
V produkčním prostředí s více workery zvažte přidání sloupce pro optimistické
zamykání (#[ORM\Version]). Bez něj by dva workery zpracovávající
události pro stejnou objednávku mohly současně načíst stejný stav ságy a přepsat
si navzájem změny. Optimistický zámek zajistí, že druhý worker dostane výjimku
OptimisticLockException a Messenger zprávu automaticky zopakuje.
Implementace v Symfony Messenger
Předchozí sekce ukázaly Process Manager (orchestrátor) a perzistenci stavu ságy. Nyní propojíme obě části s Symfony Messenger - asynchronním message busem, který zajistí spolehlivé doručování událostí a příkazů mezi kontexty. Základní konfigurace Messenger busů je popsána v kapitole CQRS - Symfony Messenger; zde se zaměříme na specifika pro ságy: oddělené transporty pro události a příkazy a retry strategie kritické pro spolehlivost dlouhotrvajících procesů.
YAML: Kompletní konfigurace Messenger
# config/packages/messenger.yaml
framework:
messenger:
default_bus: command.bus
buses:
command.bus:
middleware:
- doctrine_transaction
event.bus:
default_middleware:
enabled: true
allow_no_handlers: true
transports:
async_events:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 2
async_commands:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 2
routing:
'App\Ordering\Domain\Event\OrderPlaced': async_events
'App\Payment\Domain\Event\PaymentSucceeded': async_events
'App\Payment\Domain\Event\PaymentFailed': async_events
'App\Warehouse\Domain\Event\StockReserved': async_events
'App\Warehouse\Domain\Event\StockReservationFailed': async_events
'App\Shipping\Domain\Event\ShipmentCreated': async_events
'App\Payment\Application\Command\ChargeCustomer': async_commands
'App\Payment\Application\Command\RefundCustomer': async_commands
'App\Warehouse\Application\Command\ReserveStock': async_commands
'App\Warehouse\Application\Command\ReleaseStock': async_commands
'App\Shipping\Application\Command\CreateShipment': async_commands
PHP: Doménová událost OrderPlaced
<?php
declare(strict_types=1);
namespace App\Ordering\Domain\Event;
final readonly class OrderPlaced
{
public function __construct(
public string $orderId,
public string $customerId,
public int $totalAmountCents,
public \DateTimeImmutable $occurredAt = new \DateTimeImmutable(),
) {}
}
Celý tok funguje následovně: agregát Order v kontextu Ordering publikuje
událost OrderPlaced na event bus. Messenger ji podle konfigurace routingu
odešle do transportu async_events. Worker naslouchající na tomto transportu
zprávu vyzvedne a předá ji OrderProcessManager, který ji zpracuje metodou
onOrderPlaced(). Ta uloží stav ságy a dispatchne příkaz
ChargeCustomer na command bus. Messenger tento příkaz routuje do transportu
async_commands, kde ho vyzvedne handler v kontextu Payment. Po úspěšném
zpracování Payment publikuje PaymentSucceeded - a cyklus se opakuje
pro další krok procesu.
Spouštění workerů
V produkci spouštějte oddělené workery pro každý transport:
php bin/console messenger:consume async_events async_commands --time-limit=3600.
Parametr --time-limit zajistí, že worker se po hodině automaticky restartuje
(a uvolní paměť). Pro vysokou dostupnost spouštějte více instancí workeru -
Messenger zajistí, že každou zprávu zpracuje právě jeden worker.
Pozor na ztrátu zpráv: Outbox pattern
Výše uvedená konfigurace předpokládá, že doménová událost se spolehlivě dostane do message brokeru. V praxi to však není samozřejmé: agregát uloží změny do databáze (Doctrine flush), ale dispatch události do fronty může selhat - síťový výpadek, pád workeru mezi flush a dispatch, restart aplikace. Výsledkem je „ztracená" událost a sága, která se nikdy nespustí.
Řešením je Outbox pattern: událost se zapíše do speciální tabulky
outbox v téže databázové transakci jako doménová změna. Samostatný
proces (relay/poller) pak události z outbox tabulky přenáší do message brokeru a po
úspěšném odeslání je označí jako zpracované. Tím je zaručeno, že žádná událost se
neztratí - a to i při selhání mezi kroky. Podrobněji viz kapitola
Event Sourcing, kde je Outbox pattern
popsán v kontextu event store.
Podrobnější informace o asynchronním zpracování zpráv, konfiguraci transportů a retry strategiích najdete v kapitole CQRS - asynchronní zpracování.
Timeouty a deadliny
Co se stane, když událost PaymentSucceeded nikdy nedorazí? Síťový výpadek,
nedostupnost platební brány, ztráta zprávy ve frontě - v distribuovaném systému musíte
vždy počítat s tím, že odpověď nepřijde. Bez explicitního timeout mechanismu sága zůstane
navždy ve stavu AwaitingPayment a objednávka se nikdy nedokončí ani nezruší.
Proto potřebujeme timeout check - odložený příkaz, který po uplynutí
stanovené doby zkontroluje, zda se sága posunula dál, a pokud ne, spustí kompenzaci.
PHP: CheckSagaTimeout command
<?php
declare(strict_types=1);
namespace App\Ordering\Application\Command;
final readonly class CheckSagaTimeout
{
public function __construct(
public string $orderId,
public string $expectedStatus,
) {}
}
PHP: CheckSagaTimeoutHandler
<?php
declare(strict_types=1);
namespace App\Ordering\Application\Handler;
use App\Ordering\Application\Command\CheckSagaTimeout;
use App\Ordering\Application\Command\CancelOrder;
use App\Ordering\Application\Saga\OrderSagaStatus;
use App\Ordering\Application\Saga\SagaStateRepository;
use App\Payment\Application\Command\RefundCustomer;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
#[AsMessageHandler]
final readonly class CheckSagaTimeoutHandler
{
public function __construct(
private SagaStateRepositoryInterface $sagaStateRepository,
private MessageBusInterface $commandBus,
) {}
public function __invoke(CheckSagaTimeout $command): void
{
$state = $this->sagaStateRepository->findByCorrelationId($command->orderId);
// Saga se od posledního kroku posunula - timeout neplatí
if ($state->status()->value !== $command->expectedStatus) {
return;
}
// Zapamatovat původní stav před přechodem
$originalStatus = OrderSagaStatus::from($command->expectedStatus);
$state->transitionTo(OrderSagaStatus::Compensating);
$this->sagaStateRepository->save($state);
match ($originalStatus) {
OrderSagaStatus::AwaitingPayment => $this->commandBus->dispatch(
new CancelOrder(orderId: $command->orderId, reason: 'Payment timeout'),
),
OrderSagaStatus::AwaitingStockReservation => $this->compensatePayment($state),
default => null,
};
}
private function compensatePayment(/* SagaState */ $state): void
{
$this->commandBus->dispatch(new RefundCustomer(
orderId: $state->correlationId(),
customerId: $state->context()['customerId'],
amountCents: $state->context()['amountCents'],
reason: 'Timeout: stock reservation not received',
));
$this->commandBus->dispatch(new CancelOrder(
orderId: $state->correlationId(),
reason: 'Timeout waiting for stock reservation',
));
}
}
Timeout check naplánujeme přímo v Process Manageru při zpracování události
OrderPlaced. Messenger nabízí DelayStamp, který zprávu
podrží v transportu po zadanou dobu a teprve poté ji doručí workeru:
PHP: Naplánování timeout checku v OrderProcessManager
use Symfony\Component\Messenger\Stamp\DelayStamp;
private function onOrderPlaced(OrderPlaced $event): void
{
// ... (vytvoření SagaState a dispatch ChargeCustomer - viz sekce 5)
// Naplánovat timeout check za 5 minut
$this->commandBus->dispatch(
new CheckSagaTimeout(
orderId: $event->orderId,
expectedStatus: OrderSagaStatus::AwaitingPayment->value,
),
[new DelayStamp(5 * 60 * 1000)], // 5 minut v milisekundách
);
}
Konfigurovatelné timeouty
Každý krok ságy může vyžadovat jiný timeout. Platební brána typicky potřebuje
5 minut (zákazník zadává údaje karty), rezervace skladu by měla
proběhnout do 30 sekund (interní synchronní operace) a potvrzení
zásilky může trvat i 24 hodin (závisí na externím dopravci).
Timeouty proto definujte konfigurovatelně - ideálně jako parametry v
services.yaml, aby je bylo možné upravit bez změny kódu.
Požadavky na transport
DelayStamp vyžaduje asynchronní transport, který podporuje odložené
doručování zpráv. Doctrine transport tuto funkcionalitu podporuje
nativně (používá sloupec available_at). Pokud používáte
RabbitMQ, potřebujete plugin
rabbitmq-delayed-message-exchange. Synchronní transport
(sync://) DelayStamp ignoruje a zprávu doručí okamžitě.
Kompenzační strategie v praxi
Když krok ságy selže, máme dvě základní strategie, jak situaci řešit. Volba závisí na povaze chyby - je přechodná (síťový výpadek, dočasná nedostupnost služby), nebo trvalá (nedostatek prostředků na účtu, zboží vyprodáno)?
Forward recovery (retry)
Při přechodných chybách je nejjednodušší strategií opakování - pokus
o provedení stejného kroku znovu. Symfony Messenger nabízí vestavěnou retry strategii
s exponenciálním backoffem, kterou jsme konfigurovali v
sekci 7. Worker automaticky opakuje selhané
zprávy podle nastavení max_retries, delay a
multiplier. Tento přístup je vhodný, když věříme, že problém je dočasný
a opakování může uspět.
Backward recovery (kompenzace)
Při trvalých chybách (business failures) musíme spustit kompenzaci -
vrátit systém do konzistentního stavu provedením kompenzačních akcí v
opačném pořadí dokončených kroků. Klíčový princip:
kompenzace je sémantická, nikoli technická. Neděláme
DELETE FROM payments - místo toho dispatchujeme nový doménový příkaz
RefundCustomer, který vytvoří novou transakci (refund). Každá kompenzační
akce je plnohodnotná doménová operace s vlastními pravidly a událostmi.
PHP: Kompenzační logika v opačném pořadí kroků
/**
* Kompenzace: spouštěna při selhání libovolného kroku.
* Provádí kompenzační akce v opačném pořadí dokončených kroků.
*/
private function compensate(SagaState $state): void
{
$completedSteps = $state->context()['completedSteps'] ?? [];
foreach (array_reverse($completedSteps) as $step) {
match ($step) {
'shipment_created' => $this->commandBus->dispatch(
new \App\Shipping\Application\Command\CancelShipment(
orderId: $state->correlationId(),
),
),
'stock_reserved' => $this->commandBus->dispatch(
new ReleaseStock(orderId: $state->correlationId()),
),
'payment_charged' => $this->commandBus->dispatch(
new RefundCustomer(
orderId: $state->correlationId(),
customerId: $state->context()['customerId'],
amountCents: $state->context()['amountCents'],
reason: 'Order saga compensation',
),
),
default => null,
};
}
$state->transitionTo(OrderSagaStatus::Failed);
$this->sagaStateRepository->save($state);
}
Idempotence kompenzačních handlerů
Každý kompenzační handler musí být idempotentní. Zpráva může být
doručena vícekrát (at-least-once delivery), a proto handler musí bezpečně zvládnout
opakované volání. Například RefundCustomerHandler by měl před vytvořením
refundu ověřit, zda refund pro danou objednávku již neexistuje.
Podrobnější informace o Dead Letter Queue, retry strategiích a zpracování chyb v Messenger najdete v kapitole CQRS - zpracování chyb.
Paralelní kroky
Dosud jsme uvažovali sériové provádění kroků - jeden po druhém. V praxi však některé kroky na sobě nezávisí a mohou běžet současně. Například po úspěšné platbě chceme zároveň rezervovat zboží na skladě a vygenerovat fakturu. Obě operace jsou nezávislé - výsledek jedné neovlivňuje druhou. Paralelním zpracováním zkrátíme celkovou dobu trvání ságy.
Princip je jednoduchý: sága dispatchuje oba příkazy současně a přejde do stavu
AwaitingStockAndInvoice. V kontextu si uchovává dva příznaky
(stockReserved a invoiceCreated). Teprve když oba dorazí
jako splněné, sága pokračuje dalším krokem - vytvořením zásilky. Tomuto vzoru se říká
synchronizační bariéra (synchronization barrier).
PHP: Paralelní zpracování kroků se synchronizační bariérou
private function onPaymentSucceeded(PaymentSucceeded $event): void
{
$state = $this->sagaStateRepository->findByCorrelationId($event->orderId);
$state->transitionTo(OrderSagaStatus::AwaitingStockAndInvoice);
$state->updateContext('stockReserved', false);
$state->updateContext('invoiceCreated', false);
$this->sagaStateRepository->save($state);
$this->commandBus->dispatch(new ReserveStock(orderId: $event->orderId));
$this->commandBus->dispatch(new CreateInvoice(orderId: $event->orderId));
}
private function onStockReserved(StockReserved $event): void
{
$state = $this->sagaStateRepository->findByCorrelationId($event->orderId);
$state->updateContext('stockReserved', true);
$state->updateContext('completedSteps', [
...$state->context()['completedSteps'] ?? [],
'stock_reserved',
]);
$this->sagaStateRepository->save($state);
$this->proceedIfParallelStepsCompleted($state);
}
private function onInvoiceCreated(InvoiceCreated $event): void
{
$state = $this->sagaStateRepository->findByCorrelationId($event->orderId);
$state->updateContext('invoiceCreated', true);
$state->updateContext('completedSteps', [
...$state->context()['completedSteps'] ?? [],
'invoice_created',
]);
$this->sagaStateRepository->save($state);
$this->proceedIfParallelStepsCompleted($state);
}
private function proceedIfParallelStepsCompleted(SagaState $state): void
{
if ($state->context()['stockReserved'] && $state->context()['invoiceCreated']) {
$state->transitionTo(OrderSagaStatus::AwaitingShipment);
$this->sagaStateRepository->save($state);
$this->commandBus->dispatch(new CreateShipment(
orderId: $state->correlationId(),
));
}
}
Kompenzace paralelních kroků
Paralelní kroky zvyšují složitost kompenzace. Pokud rezervace skladu uspěje, ale
generování faktury selže, musíte sklad uvolnit - přestože samotná rezervace proběhla
správně. Kompenzační logika z předchozí sekce
toto řeší automaticky díky poli completedSteps: kompenzuje se pouze to,
co skutečně proběhlo.
Optimistické zamykání
Při paralelních krocích mohou dvě události (StockReserved a
InvoiceCreated) dorazit téměř současně a oba handlery se pokusí
aktualizovat stejný SagaState záznam. Bez ochrany hrozí ztráta dat
(lost update). Řešením je optimistické zamykání - entita
SagaState obsahuje sloupec version (viz
sekce 6) a při uložení Doctrine ověří, že verze
nebyla mezitím změněna. Pokud ano, vyhodí
OptimisticLockException a Messenger zprávu automaticky zopakuje.
Monitoring a observabilita
Produkční ságy potřebují observabilitu - musíte vědět, které ságy právě běží, které se zasekly a které selhaly. Bez monitoringu je ladění distribuovaných procesů noční můrou: zprávy se ztratí ve frontě, stav ságy zamrzne a nikdo si toho nevšimne, dokud si zákazník nestěžuje. Pro řešení těchto problémů existují dva hlavní nástroje: korelační ID pro trasování a detekce zaseklých ság.
Korelační ID
Každá zpráva v jedné sáze nese stejné korelační ID - typicky
orderId. Díky němu můžete v logu vyfiltrovat všechny zprávy patřící
ke konkrétní objednávce a sledovat celý průběh procesu od začátku do konce.
Více o korelačních identifikátorech najdete v
glosáři.
SagaLoggingMiddleware
Middleware pro Symfony Messenger, který loguje každou zprávu procházející ságou.
Zaregistrujte ho v messenger.yaml a všechny zprávy se automaticky
zaznamenají s korelačním ID:
<?php
declare(strict_types=1);
namespace App\SharedKernel\Infrastructure\Middleware;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
final readonly class SagaLoggingMiddleware implements MiddlewareInterface
{
public function __construct(
private LoggerInterface $logger,
) {}
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$message = $envelope->getMessage();
$messageName = (new \ReflectionClass($message))->getShortName();
$this->logger->info('Saga: zpracovávám zprávu', [
'message' => $messageName,
'correlationId' => $message->orderId ?? 'unknown',
]);
$envelope = $stack->next()->handle($envelope, $stack);
$handledStamp = $envelope->last(HandledStamp::class);
$this->logger->info('Saga: zpráva zpracována', [
'message' => $messageName,
'handler' => $handledStamp?->getHandlerName(),
]);
return $envelope;
}
}
Detekce zaseklých ság
I při nejlepším návrhu se stane, že zpráva se ztratí, worker spadne nebo externí služba přestane odpovídat. Proto potřebujete cron/scheduled command, který pravidelně kontroluje, zda některá sága nezůstala příliš dlouho v mezistavech:
Symfony Console příkaz pro detekci zaseklých ság
<?php
declare(strict_types=1);
namespace App\Ordering\Infrastructure\Command;
use App\Ordering\Application\Saga\SagaStateRepository;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
#[AsCommand(name: 'app:saga:check-stale', description: 'Najde ságy zaseklé déle než 30 minut')]
final class CheckStaleSagasCommand extends Command
{
public function __construct(
private readonly SagaStateRepositoryInterface $sagaStateRepository,
) {
parent::__construct();
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$threshold = new \DateTimeImmutable('-30 minutes');
$staleSagas = $this->sagaStateRepository->findStale($threshold);
if (count($staleSagas) === 0) {
$io->success('Žádné zaseklé ságy.');
return Command::SUCCESS;
}
$io->warning(sprintf('Nalezeno %d zaseklých ság:', count($staleSagas)));
foreach ($staleSagas as $saga) {
$io->writeln(sprintf(
' [%s] %s - stav: %s, poslední aktivita: %s',
$saga->correlationId(),
'order_process',
$saga->status()->value,
$saga->updatedAt()->format('Y-m-d H:i:s'),
));
}
return Command::FAILURE;
}
}
Integrace s alertingem
V produkčním prostředí napojte detekci zaseklých ság na váš alertingový systém -
Prometheus pro metriky (počet aktivních ság, průměrná doba dokončení),
Grafana pro dashboardy a PagerDuty nebo podobný
nástroj pro eskalaci kritických situací. Příkaz app:saga:check-stale
může běžet jako Kubernetes CronJob nebo Symfony Scheduler task.
Podrobnosti o implementaci middleware v Symfony Messenger najdete v kapitole CQRS - sekce middleware.
Testování ság
Ságy koordinují složité vícekrokové procesy napříč Bounded Contexts - testování je proto kriticky důležité. Pokud sága obsahuje chybu v přechodové logice nebo kompenzacích, důsledky se projeví až v produkci a mohou být velmi nákladné (stržená platba bez doručeného zboží, duplikované zásilky apod.). Testujeme na třech úrovních.
Unit testy stavového automatu
Nejdůležitější úroveň: testujeme samotný Process Manager izolovaně od infrastruktury. Místo skutečného message busu použijeme spy implementaci, která zaznamenává dispatchované příkazy, a místo databáze in-memory repozitář:
PHPUnit test ságy
<?php
declare(strict_types=1);
namespace App\Tests\Ordering\Application\Saga;
use App\Ordering\Application\Saga\OrderProcessManager;
use App\Ordering\Application\Saga\OrderSagaStatus;
use App\Ordering\Application\Saga\SagaState;
use App\Ordering\Application\Saga\SagaStateRepository;
use App\Ordering\Domain\Event\OrderPlaced;
use App\Payment\Application\Command\ChargeCustomer;
use App\Payment\Domain\Event\PaymentFailed;
use App\Payment\Domain\Event\PaymentSucceeded;
use App\Warehouse\Application\Command\ReserveStock;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
final class OrderProcessManagerTest extends TestCase
{
private MessageBusInterface $commandBus;
private SagaStateRepository $repository;
private OrderProcessManager $saga;
/** @var list<object> */
private array $dispatchedCommands = [];
protected function setUp(): void
{
$this->dispatchedCommands = [];
$this->commandBus = new class($this->dispatchedCommands) implements MessageBusInterface {
/** @param list<object> $commands */
public function __construct(private array &$commands) {}
public function dispatch(object $message, array $stamps = []): Envelope
{
$this->commands[] = $message;
return new Envelope($message);
}
};
$this->repository = new InMemorySagaStateRepository();
$this->saga = new OrderProcessManager($this->commandBus, $this->repository);
}
public function testOrderPlacedInitiatesPayment(): void
{
($this->saga)(new OrderPlaced(
orderId: 'order-1',
customerId: 'cust-1',
totalAmountCents: 10000,
));
self::assertCount(1, $this->dispatchedCommands);
self::assertInstanceOf(ChargeCustomer::class, $this->dispatchedCommands[0]);
self::assertSame('order-1', $this->dispatchedCommands[0]->orderId);
$state = $this->repository->findByCorrelationId('order-1');
self::assertSame(OrderSagaStatus::AwaitingPayment, $state->status());
}
public function testPaymentSucceededReservesStock(): void
{
($this->saga)(new OrderPlaced('order-1', 'cust-1', 10000));
$this->dispatchedCommands = [];
($this->saga)(new PaymentSucceeded(orderId: 'order-1'));
self::assertCount(1, $this->dispatchedCommands);
self::assertInstanceOf(ReserveStock::class, $this->dispatchedCommands[0]);
$state = $this->repository->findByCorrelationId('order-1');
self::assertSame(OrderSagaStatus::AwaitingStockReservation, $state->status());
}
public function testPaymentFailedCancelsOrder(): void
{
($this->saga)(new OrderPlaced('order-1', 'cust-1', 10000));
$this->dispatchedCommands = [];
($this->saga)(new PaymentFailed(
orderId: 'order-1',
failureReason: 'Insufficient funds',
));
$state = $this->repository->findByCorrelationId('order-1');
self::assertSame(OrderSagaStatus::Failed, $state->status());
}
}
InMemorySagaStateRepository
Testovací in-memory implementace repozitáře, kterou používáme místo Doctrine:
<?php
declare(strict_types=1);
namespace App\Tests\Ordering\Application\Saga;
use App\Ordering\Application\Saga\SagaState;
use App\Ordering\Application\Saga\SagaStateRepositoryInterface;
final class InMemorySagaStateRepository implements SagaStateRepositoryInterface
{
/** @var array<string, SagaState> */
private array $states = [];
public function save(SagaState $state): void
{
$this->states[$state->correlationId()] = $state;
}
public function findByCorrelationId(string $correlationId): SagaState
{
return $this->states[$correlationId]
?? throw new \RuntimeException(
sprintf('Saga state not found for "%s"', $correlationId),
);
}
}
Další vzory pro testování doménové logiky, agregátů a event handlerů najdete v kapitole Testování DDD aplikací.