Event Sourcing

Ke kapitole patří živá ukázka kódu: Zobrazit na GitHubu →

Co je Event Sourcing?

Tradiční CRUD persistence má slepou skvrnu: při každé změně přepíše předchozí stav a veškerá historie se nenávratně ztrácí. Event Sourcing (ES) tento problém řeší tím, že stav systému neukládá jako aktuální snímek dat, nýbrž jako sekvenci neměnných událostí, jež k danému stavu vedly [1]. Každá změna stavu domény je zaznamenána jako samostatná, pojmenovaná událost se svými daty; aktuální stav agregátu pak vzniká přehráním (replay) těchto událostí od počátku.

Klíčový princip lze vyjádřit větou: "current state is derived from the history of events". Namísto jediného řádku v databázové tabulce, který je přepisován při každé změně, existuje append-only log všech událostí, které kdy na agregátu nastaly.

Porovnání s tradiční CRUD persistencí

V klasickém CRUD přístupu se v tabulce vždy uchovává pouze aktuální stav entity. Jakmile se hodnota změní, předchozí hodnota je ztracena. Event Sourcing naproti tomu ukládá každou změnu jako novou položku do event logu; žádná informace se nikdy nepřepisuje ani nemaže.

CRUD vs. Event Sourcing - přehled

Vlastnost CRUD (tradiční) Event Sourcing
Co se ukládá Aktuální stav entity Sekvence událostí (změn)
Auditní log Vyžaduje extra implementaci Zabudován ve struktuře
Obnova stavu Přímé čtení z tabulky Replay event streamu
Temporální dotazy Obtížné / nemožné Přirozené (replay do libovolného bodu)
Složitost implementace Nízká až střední Vysoká
Výkon čtení Vysoký (přímý select) Vyžaduje projekce / snapshot

Klíčové pojmy Event Sourcingu:

  • Event (Událost) - Neměnný záznam o tom, co se v doméně přihodilo, vyjádřený v minulém čase (např. OrderPlaced, PaymentReceived). Obsahuje všechna data potřebná k rekonstrukci změny stavu.
  • Event Store - Specializované append-only úložiště pro události. Události se do něj pouze přidávají; nikdy se neupravují ani nemažou. Každá událost patří do event streamu konkrétního agregátu.
  • Aggregate (Agregát) - V kontextu ES je agregát rekonstruován přehráním všech událostí ze svého event streamu. Každá mutace stavu agregátu produkuje novou událost místo přímé modifikace atributů.
  • Projection (Projekce) - Read model sestavený z událostí. Projekce transformují event stream do podoby vhodné pro konkrétní dotazy (query) - například denormalizovaná tabulka pro přehled objednávek.
  • Snapshot - Periodicky ukládaný snímek aktuálního stavu agregátu, který slouží jako zkratka při replay. Umožňuje přehrát pouze události novější než poslední snapshot místo celého event streamu od počátku.

Vztah k CQRS

Event Sourcing a CQRS jsou dva samostatné vzory, které se však přirozeně doplňují [2]. Nejsou totéž - lze aplikovat CQRS bez Event Sourcingu a naopak ES bez CQRS, ale jejich kombinace je velmi mocná a v praxi DDD aplikací běžná.

Důvodem přirozené synergie je, že Event Sourcing produkuje události jako prvotřídní artefakt persistence, a CQRS potřebuje způsob, jak aktualizovat read modely při každé změně write strany. Události jsou ideálním mechanismem pro tuto propagaci: write side uloží událost do Event Store, read side ji přečte a aktualizuje projekci.

Datový tok v architektuře ES + CQRS:

  1. Uživatel odešle Command (např. PlaceOrderCommand).
  2. Command Handler načte agregát přehráním jeho event streamu z Event Store.
  3. Agregát validuje command a produkuje jednu nebo více Domain Events.
  4. Nové události jsou uloženy do Event Store (append).
  5. Event Bus (Symfony Messenger) distribuuje události odběratelům.
  6. Projectors přijmou události a aktualizují Read Models.
  7. Uživatel následně dotazuje read model přes Query - čte z optimalizované projekce.

Zásadní rozdíl mezi ES a CQRS

CQRS řeší jak oddělit zápis od čtení - je to organizační vzor zodpovědností. Event Sourcing řeší jak persistovat stav - je to vzor persistence. Při jejich kombinaci ES přirozeně zásobuje CQRS read side daty: každá událost o změně je současně vstupem pro aktualizaci projekcí.

Doménové události jako základ Event Sourcingu

V Event Sourcingu jsou doménové události (Domain Events) primárním datovým artefaktem. Na rozdíl od doménových událostí používaných pouze k notifikaci (side effects) jsou v ES události zdrojem pravdy o stavu systému. Musejí proto splňovat přísné požadavky:

  • Immutabilita - Po vytvoření nelze událost měnit. Veškeré její properties jsou read-only, nastavené v konstruktoru.
  • Serializovatelnost - Událost musí být serializovatelná do trvalého formátu (JSON, MessagePack…) a deserializovatelná zpět, ideálně bez ztráty informace.
  • Verzování - Schéma události se v čase může vyvíjet. Je nutné udržovat kompatibilitu starých událostí nebo implementovat upcasting (transformaci starých verzí na aktuální).
  • Pojmenování v minulém čase - Události vyjadřují fakta, která již nastala: UserRegistered, OrderPlaced, PaymentFailed.
  • Dostatečná granularita dat - Událost musí obsahovat veškerá data potřebná k tomu, aby z ní bylo možné rekonstruovat stav, aniž by byl nutný přístup k externím zdrojům.

PHP: Interface DomainEvent a konkrétní třída UserRegistered

<?php

declare(strict_types=1);

namespace App\Shared\Domain\Event;

use DateTimeImmutable;

/**
 * Společný interface pro všechny doménové události.
 * Všechny implementace musí být immutabilní value objekty.
 */
interface DomainEvent
{
    /** Unikátní identifikátor události (UUID v4). */
    public function eventId(): string;

    /** Čas vzniku události - vždy UTC. */
    public function occurredOn(): DateTimeImmutable;

    /**
     * Název události sloužící k jejímu uložení a vyhledání v Event Store.
     * Konvence: FQCN nebo krátký slug ve tvaru "user.registered".
     */
    public function eventType(): string;

    /**
     * Verze schématu payloadu - klíčová pro upcasting starých událostí.
     * Nové verze události inkrementují toto číslo.
     */
    public function schemaVersion(): int;

    /**
     * Serializace do pole pro uložení do Event Store.
     * Musí obsahovat všechna data potřebná k rekonstrukci stavu.
     *
     * @return array<string, mixed>
     */
    public function toPayload(): array;
}
<?php

declare(strict_types=1);

namespace App\Identity\Domain\Event;

use App\Shared\Domain\Event\DomainEvent;
use DateTimeImmutable;
use Ramsey\Uuid\Uuid;

/**
 * Událost emitovaná po úspěšné registraci uživatele.
 * Immutabilní - všechny properties jsou readonly.
 */
final class UserRegistered implements DomainEvent
{
    private readonly string $eventId;
    private readonly DateTimeImmutable $occurredOn;

    public function __construct(
        private readonly string $userId,
        private readonly string $email,
        private readonly string $fullName,
    ) {
        $this->eventId    = Uuid::uuid4()->toString();
        $this->occurredOn = new DateTimeImmutable('now', new \DateTimeZone('UTC'));
    }

    public function eventId(): string
    {
        return $this->eventId;
    }

    public function occurredOn(): DateTimeImmutable
    {
        return $this->occurredOn;
    }

    public function eventType(): string
    {
        return 'identity.user_registered';
    }

    public function schemaVersion(): int
    {
        return 1; // při změně schématu inkrementujeme a vytvoříme upcaster
    }

    /** @return array<string, mixed> */
    public function toPayload(): array
    {
        return [
            'userId'         => $this->userId,
            'email'          => $this->email,
            'fullName'       => $this->fullName,
        ];
    }

    // --- Gettery (pro použití v aplikační vrstvě) ---

    public function userId(): string  { return $this->userId; }
    public function email(): string   { return $this->email; }
    public function fullName(): string { return $this->fullName; }
}

Konvence pojmenování událostí by měla být konzistentní napříč celým projektem. Doporučený formát pro eventType() je <bounded_context>.<past_tense_verb_noun>, například ordering.order_placed nebo payment.payment_received. Tato konvence usnadňuje routing událostí v Symfony Messenger a jejich filtrování v Event Store.

Implementace Event Store

Event Store je srdcem Event Sourcingu. Jedná se o append-only databázové úložiště, do nějž se ukládají všechny doménové události. Každý záznam reprezentuje jednu událost s jejím kontextem (ke kterému agregátu patří, v jaké verzi streamu, kdy nastala atd.). Záznamy se nikdy nepřepisují ani nemažou.

Struktura tabulky Event Store

SQL: Migrace tabulky event_store (MySQL/MariaDB)

CREATE TABLE event_store (
    id            BIGINT UNSIGNED  NOT NULL AUTO_INCREMENT,
    event_id      CHAR(36)         NOT NULL COMMENT 'UUID v4 události - globálně unikátní',
    aggregate_id  CHAR(36)         NOT NULL COMMENT 'UUID agregátu (vlastníka streamu)',
    aggregate_type VARCHAR(255)    NOT NULL COMMENT 'FQCN nebo slug agregátu, napr. ordering.order',
    event_type    VARCHAR(255)     NOT NULL COMMENT 'Typ události, napr. ordering.order_placed',
    payload       JSON             NOT NULL COMMENT 'Serializovaná data události',
    metadata      JSON             NOT NULL DEFAULT ('{}') COMMENT 'Korelační ID, causation ID, user ID…',
    schema_version SMALLINT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'Verze schématu payloadu - pro upcasting',
    version       INT UNSIGNED     NOT NULL COMMENT 'Pořadové číslo ve streamu agregátu (od 1)',
    occurred_on   DATETIME(6)      NOT NULL COMMENT 'UTC čas vzniku události',

    PRIMARY KEY (id),
    UNIQUE KEY uq_event_id (event_id),
    -- Optimistic locking: dvojice (aggregate_id, version) musí být unikátní
    UNIQUE KEY uq_aggregate_version (aggregate_id, version),
    KEY idx_aggregate_id (aggregate_id),
    KEY idx_aggregate_type (aggregate_type),
    KEY idx_event_type (event_type),
    KEY idx_occurred_on (occurred_on)
) ENGINE=InnoDB
  DEFAULT CHARSET=utf8mb4
  COLLATE=utf8mb4_unicode_ci
  COMMENT='Append-only store všech doménových událostí';

Sloupec version má hlavní roli při implementaci optimistic locking. Před zápisem nové události command handler přečte poslední verzi streamu agregátu. Při insertu databáze vyvolá výjimku z porušení unikátního indexu uq_aggregate_version, pokud mezitím jiný proces zapsal událost se stejnou verzí. Tímto způsobem bez pesimistického zamykání řádků předcházíme ztrátě souběžných zápisů.

PHP: Interface EventStore a Doctrine implementace

<?php

declare(strict_types=1);

namespace App\Infrastructure\EventSourcing;

use App\Shared\Domain\Event\DomainEvent;

interface EventStore
{
    /**
     * Uloží nové události do event streamu agregátu.
     *
     * @param DomainEvent[] $events
     * @param int           $expectedVersion Verze posledního uloženého eventu - slouží
     *                                        pro optimistic locking. Použijte 0 pro nový agregát.
     *
     * @throws ConcurrencyException Pokud $expectedVersion neodpovídá skutečné verzi streamu.
     */
    public function append(
        string $aggregateId,
        string $aggregateType,
        array $events,
        int $expectedVersion,
    ): void;

    /**
     * Načte celý event stream agregátu (nebo od dané verze pro snapshot support).
     *
     * @return EventEnvelope[]
     */
    public function loadStream(
        string $aggregateId,
        int $fromVersion = 1,
    ): array;

    /**
     * Načte všechny události z celého Event Store (pro rebuild projekcí).
     * Vrací generátor pro paměťově efektivní iteraci nad miliony záznamů.
     *
     * @return \Generator<EventEnvelope>
     */
    public function loadAll(int $batchSize = 500): \Generator;
}
<?php

declare(strict_types=1);

namespace App\Infrastructure\EventSourcing;

use App\Shared\Domain\Event\DomainEvent;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;

final class DoctrineEventStore implements EventStore
{
    public function __construct(
        private readonly Connection $connection,
        private readonly EventSerializer $serializer,
    ) {}

    /**
     * @param DomainEvent[] $events
     */
    public function append(
        string $aggregateId,
        string $aggregateType,
        array $events,
        int $expectedVersion,
    ): void {
        $version = $expectedVersion;

        $this->connection->beginTransaction();

        try {
            foreach ($events as $event) {
                $version++;

                $this->connection->insert('event_store', [
                    'event_id'       => $event->eventId(),
                    'aggregate_id'   => $aggregateId,
                    'aggregate_type' => $aggregateType,
                    'event_type'     => $event->eventType(),
                    'payload'        => json_encode($event->toPayload(), JSON_THROW_ON_ERROR),
                    'metadata'       => '{}',
                    'schema_version' => $event->schemaVersion(),
                    'version'        => $version,
                    'occurred_on'    => $event->occurredOn()->format('Y-m-d H:i:s.u'),
                ]);
            }

            $this->connection->commit();
        } catch (UniqueConstraintViolationException $e) {
            $this->connection->rollBack();
            throw new ConcurrencyException(
                "Concurrency conflict for aggregate {$aggregateId} at version {$version}.",
                previous: $e,
            );
        } catch (\Throwable $e) {
            $this->connection->rollBack();
            throw $e;
        }
    }

    /**
     * @return EventEnvelope[]
     */
    public function loadStream(string $aggregateId, int $fromVersion = 1): array
    {
        $rows = $this->connection->fetchAllAssociative(
            'SELECT event_type, payload, schema_version, version, occurred_on
               FROM event_store
              WHERE aggregate_id = :aggregateId
                AND version >= :fromVersion
           ORDER BY version ASC',
            ['aggregateId' => $aggregateId, 'fromVersion' => $fromVersion],
        );

        return array_map(
            fn(array $row) => $this->serializer->deserialize($row),
            $rows,
        );
    }

    /**
     * Iteruje přes celý Event Store v dávkách - paměťově efektivní pro rebuild projekcí.
     *
     * @return \Generator<EventEnvelope>
     */
    public function loadAll(int $batchSize = 500): \Generator
    {
        $lastId = 0;

        do {
            $rows = $this->connection->fetchAllAssociative(
                'SELECT id, event_type, payload, schema_version, version, occurred_on
                   FROM event_store
                  WHERE id > :lastId
               ORDER BY id ASC
                  LIMIT :limit',
                ['lastId' => $lastId, 'limit' => $batchSize],
            );

            foreach ($rows as $row) {
                $lastId = (int) $row['id'];
                yield $this->serializer->deserialize($row);
            }
        } while (count($rows) === $batchSize);
    }
}

Agregát s Event Sourcingem

V klasickém DDD agregát mění svůj stav přímou modifikací vlastních atributů. V Event Sourcingu je tento mechanismus zcela jiný: každá změna stavu musí projít přes doménovou událost. Metody agregátu nevytváří efekt přímo - nahrávají událost, a teprve aplikace této události mění interní stav.

Výsledkem je, že agregát obsahuje dvě sady metod:

  • Mutační metody (public business API) - validují invarianty, rozhodují, která událost nastane, a volají interní metodu pro nahrání události (typicky recordEvent()).
  • apply*() metody (private/protected) - přijmou konkrétní typ události a aplikují změnu na interní stav. Tyto metody jsou volány jak při nahrávání nové události, tak při replay z Event Store.

Tato struktura má přímý dopad na testování: event-sourcované agregáty se testují vzorem given/when/then - given: historické události, when: volání metody, then: nově emitované události. Podrobně viz kapitola Testování DDD kódu.

PHP: Base class EventSourcedAggregate

<?php

declare(strict_types=1);

namespace App\Shared\Domain;

use App\Shared\Domain\Event\DomainEvent;

abstract class EventSourcedAggregate
{
    /** @var DomainEvent[] Události nahrané v aktuální transakci - čekají na uložení. */
    private array $recordedEvents = [];

    private int $version = 0;

    /**
     * Nahrajeme novou událost: aplikujeme ji na stav, zapamatujeme ji pro persistenci
     * a inkrementujeme verzi streamu - nezbytné pro optimistic locking.
     */
    protected function recordEvent(DomainEvent $event): void
    {
        $this->applyEvent($event);
        $this->recordedEvents[] = $event;
        $this->version++;
    }

    /**
     * Přehrajeme historické události z Event Store (bez přidávání do $recordedEvents).
     *
     * @param DomainEvent[] $events
     */
    public static function reconstituteFromEvents(array $events): static
    {
        $aggregate = new static();

        foreach ($events as $event) {
            $aggregate->applyEvent($event);
            $aggregate->version++;
        }

        return $aggregate;
    }

    /**
     * Přehraje dodatečné události na existující instanci (pro snapshot support).
     *
     * @param DomainEvent[] $events
     */
    public function replayEvents(array $events): void
    {
        foreach ($events as $event) {
            $this->applyEvent($event);
            $this->version++;
        }
    }

    /**
     * Dynamické dispatchování na apply*() metody podle třídy události.
     * Konvence: apply + ShortClassName, napr. applyOrderCreated().
     * apply*() metody v podtřídách MUSÍ být protected (ne private),
     * jinak je PHP nemůže volat z kontextu této nadtřídy.
     */
    private function applyEvent(DomainEvent $event): void
    {
        $method = 'apply' . (new \ReflectionClass($event))->getShortName();

        if (!method_exists($this, $method)) {
            throw new \LogicException(
                sprintf('Aggregate %s must implement %s().', static::class, $method)
            );
        }

        $this->$method($event);
    }

    /** @return DomainEvent[] */
    public function releaseDomainEvents(): array
    {
        $events = $this->recordedEvents;
        $this->recordedEvents = [];
        return $events;
    }

    public function version(): int
    {
        return $this->version;
    }
}

PHP: Order agregát s Event Sourcingem

<?php

declare(strict_types=1);

namespace App\Ordering\Domain;

use App\Ordering\Domain\Event\OrderConfirmed;
use App\Ordering\Domain\Event\OrderCreated;
use App\Ordering\Domain\Event\OrderItemAdded;
use App\Ordering\Domain\Event\OrderShipped;
use App\Shared\Domain\EventSourcedAggregate;

final class Order extends EventSourcedAggregate
{
    private string $orderId;
    private string $customerId;
    private OrderStatus $status;

    /** @var OrderItem[] */
    private array $items = [];

    private ?string $trackingNumber = null;

    // Statická továrna - vytvoří objednávku ve stavu Draft
    public static function create(string $orderId, string $customerId): self
    {
        $order = new self();
        $order->recordEvent(new OrderCreated($orderId, $customerId));

        return $order;
    }

    public function addItem(OrderItem $item): void
    {
        if ($this->status !== OrderStatus::Draft) {
            throw new \DomainException('Items can only be added to draft orders.');
        }

        $this->recordEvent(new OrderItemAdded($this->orderId, $item));
    }

    public function confirm(): void
    {
        if ($this->status !== OrderStatus::Draft) {
            throw new \DomainException('Only draft orders can be confirmed.');
        }
        if (empty($this->items)) {
            throw new \DomainException('Cannot confirm an empty order.');
        }

        $this->recordEvent(new OrderConfirmed($this->orderId));
    }

    public function ship(string $trackingNumber): void
    {
        if ($this->status !== OrderStatus::Confirmed) {
            throw new \DomainException('Only confirmed orders can be shipped.');
        }

        $this->recordEvent(new OrderShipped($this->orderId, $trackingNumber));
    }

    // --- apply* metody - MUSÍ být protected (ne private), aby je base class mohla volat dynamicky ---
    // --- Obsahují POUZE změnu interního stavu, žádnou business logiku ---

    protected function applyOrderCreated(OrderCreated $event): void
    {
        $this->orderId    = $event->orderId();
        $this->customerId = $event->customerId();
        $this->items      = [];
        $this->status     = OrderStatus::Draft;
    }

    protected function applyOrderItemAdded(OrderItemAdded $event): void
    {
        $this->items[] = $event->item();
    }

    protected function applyOrderConfirmed(OrderConfirmed $event): void
    {
        $this->status = OrderStatus::Confirmed;
    }

    protected function applyOrderShipped(OrderShipped $event): void
    {
        $this->status         = OrderStatus::Shipped;
        $this->trackingNumber = $event->trackingNumber();
    }

    // Gettery pro aplikační vrstvu
    public function orderId(): string         { return $this->orderId; }
    public function status(): OrderStatus     { return $this->status; }
    public function trackingNumber(): ?string { return $this->trackingNumber; }
}

Načítání agregátu z event streamu (replay)

Repozitář pro event-sourcovaný agregát neprovádí SELECT do tabulky entit. Místo toho načte event stream z Event Store a předá jej statické tovární metodě reconstituteFromEvents(). Výsledný agregát má přesně takový stav, jaký odpovídá historii jeho událostí.

PHP: EventSourced repozitář pro Order agregát

<?php

declare(strict_types=1);

namespace App\Infrastructure\Ordering;

use App\Ordering\Domain\Order;
use App\Infrastructure\EventSourcing\EventStore;
use App\Infrastructure\EventSourcing\EventSerializer;

final class EventSourcedOrderRepository
{
    private const AGGREGATE_TYPE = 'ordering.order';

    public function __construct(
        private readonly EventStore $eventStore,
        private readonly EventSerializer $serializer,
    ) {}

    public function load(string $orderId): Order
    {
        $envelopes = $this->eventStore->loadStream($orderId);

        if (empty($envelopes)) {
            throw new \DomainException("Order {$orderId} not found.");
        }

        $events = array_map(
            fn($envelope) => $this->serializer->toEvent($envelope),
            $envelopes,
        );

        return Order::reconstituteFromEvents($events);
    }

    public function save(Order $order): void
    {
        $newEvents = $order->releaseDomainEvents();

        if (empty($newEvents)) {
            return;
        }

        // expectedVersion = aktuální verze PŘED novými událostmi
        $expectedVersion = $order->version() - count($newEvents);

        $this->eventStore->append(
            $order->orderId(),
            self::AGGREGATE_TYPE,
            $newEvents,
            $expectedVersion,
        );
    }
}

Projekce (Projections)

Projekce jsou read modely sestavené z event streamu. Protože Event Store je append-only a neumožňuje ad-hoc dotazy (např. "všechny objednávky zákazníka X s celkovou hodnotou nad 1000 Kč"), je nutné ze stream událostí vybudovat optimalizované denormalizované datové struktury určené pro čtení.

Synchronní vs. asynchronní projekce

  • Synchronní projekce - Projekce se aktualizuje přímo v téže transakci jako zápis události. Garantuje konzistenci dat v okamžiku odpovědi na command, ale zvyšuje latenci zápisu a zavádí těsnou vazbu mezi write a read stranou.
  • Asynchronní projekce - Události jsou po uložení do Event Store zařazeny do fronty (Symfony Messenger + transport jako RabbitMQ nebo Redis). Projector je konzument, který zpracovává zprávy nezávisle. Read model je v krátkém časovém okně nekonzistentní (eventual consistency), ale write side je rychlejší a decoupled.

PHP: OrderSummaryProjection a asynchronní Projector

<?php

declare(strict_types=1);

namespace App\Infrastructure\Ordering\Projection;

use App\Ordering\Domain\Event\OrderConfirmed;
use App\Ordering\Domain\Event\OrderCreated;
use App\Ordering\Domain\Event\OrderItemAdded;
use App\Ordering\Domain\Event\OrderShipped;
use Doctrine\DBAL\Connection;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

/**
 * Projector budující tabulku order_summary z doménových událostí.
 *
 * Každá metoda handle*() odpovídá jednomu typu události a je registrována
 * jako samostatný Messenger handler atributem #[AsMessageHandler] na úrovni metody.
 */
final class OrderSummaryProjector
{
    public function __construct(
        private readonly Connection $connection,
    ) {}

    #[AsMessageHandler]
    public function __invoke(OrderCreated $event): void
    {
        $this->connection->insert('order_summary', [
            'order_id'      => $event->orderId(),
            'customer_id'   => $event->customerId(),
            'status'        => 'draft',
            'item_count'    => 0,
            'total_amount'  => 0,
            'placed_at'     => $event->occurredOn()->format('Y-m-d H:i:s'),
            'shipped_at'    => null,
            'tracking_no'   => null,
        ]);
    }

    #[AsMessageHandler]
    public function handleOrderItemAdded(OrderItemAdded $event): void
    {
        $this->connection->executeStatement(
            'UPDATE order_summary
                SET item_count   = item_count + 1,
                    total_amount = total_amount + :price
              WHERE order_id = :orderId',
            ['price' => $event->item()->unitPrice(), 'orderId' => $event->orderId()],
        );
    }

    #[AsMessageHandler]
    public function handleOrderConfirmed(OrderConfirmed $event): void
    {
        $this->connection->executeStatement(
            'UPDATE order_summary SET status = :status WHERE order_id = :orderId',
            ['status' => 'confirmed', 'orderId' => $event->orderId()],
        );
    }

    #[AsMessageHandler]
    public function handleOrderShipped(OrderShipped $event): void
    {
        $this->connection->executeStatement(
            'UPDATE order_summary
                SET status      = :status,
                    shipped_at  = :shippedAt,
                    tracking_no = :trackingNo
              WHERE order_id = :orderId',
            [
                'status'     => 'shipped',
                'shippedAt'  => $event->occurredOn()->format('Y-m-d H:i:s'),
                'trackingNo' => $event->trackingNumber(),
                'orderId'    => $event->orderId(),
            ],
        );
    }
}

Aby Symfony Messenger doručoval události projektorům asynchronně, je nutné nakonfigurovat transport a routing v config/packages/messenger.yaml:

YAML: Konfigurace Symfony Messenger pro asynchronní projekce

framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    auto_setup: true

        routing:
            # Všechny doménové události routujeme na async transport
            'App\Ordering\Domain\Event\OrderCreated':    async
            'App\Ordering\Domain\Event\OrderItemAdded': async
            'App\Ordering\Domain\Event\OrderConfirmed': async
            'App\Ordering\Domain\Event\OrderShipped':   async

Projekce lze také přebudovat (rebuild) přehráním celého Event Store od začátku - tato schopnost je jednou z největších výhod Event Sourcingu. Při změně business požadavků stačí vytvořit novou projekci a přehrát historii; u CRUD systémů jsou historická data nenávratně ztracena.

Praktické problémy projekcí

Sekce výše ukázala, jak projekci vybudovat. V praxi se ale objevují problémy, které z jednoduchých ukázek nejsou patrné. Tato sekce pokrývá nejčastější z nich - idempotenci, chybové stavy, rebuild a eventual consistency z pohledu uživatelského rozhraní.

Idempotence projektorů

Asynchronní transport (RabbitMQ, Redis Streams, Amazon SQS) garantuje doručení zprávy alespoň jednou (at-least-once delivery). Zpráva se proto může doručit opakovaně - po timeoutu, restartu workeru nebo síťovém výpadku. Pokud projektor není idempotentní, opakované zpracování způsobí poškozená data: duplicitní řádky, zdvojené částky, nekonzistentní počty.

Idempotenci lze zajistit dvěma způsoby: upsert (INSERT … ON DUPLICATE KEY UPDATE) místo prostého INSERT, nebo tracking tabulka již zpracovaných událostí.

PHP: Idempotentní projektor s tracking tabulkou

<?php

declare(strict_types=1);

namespace App\Infrastructure\Ordering\Projection;

use App\Ordering\Domain\Event\OrderCreated;
use Doctrine\DBAL\Connection;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

/**
 * Idempotentní projektor: před zpracováním ověří, zda událost
 * již nebyla zpracována, pomocí tabulky projection_checkpoint.
 *
 * Pozn.: Atribut #[AsMessageHandler] na třídě registruje __invoke() jako handler.
 * Pro projektory zpracovávající více typů událostí použijte atribut
 * na jednotlivých metodách - viz OrderSummaryProjector výše.
 */
#[AsMessageHandler]
final class IdempotentOrderProjector
{
    public function __construct(
        private readonly Connection $connection,
    ) {}

    public function __invoke(OrderCreated $event): void
    {
        // Atomická kontrola + záznam: INSERT IGNORE vrátí 0 affected rows
        // pokud eventId již existuje → událost byla již zpracována.
        $affected = $this->connection->executeStatement(
            'INSERT IGNORE INTO projection_checkpoint (event_id, projection_name, processed_at)
             VALUES (:eventId, :projection, NOW(6))',
            ['eventId' => $event->eventId(), 'projection' => 'order_summary'],
        );

        if ($affected === 0) {
            return; // Duplicitní doručení - přeskočíme
        }

        // Vlastní projekční logika
        $this->connection->insert('order_summary', [
            'order_id'     => $event->orderId(),
            'customer_id'  => $event->customerId(),
            'status'       => 'draft',
            'item_count'   => 0,
            'total_amount' => 0,
            'placed_at'    => $event->occurredOn()->format('Y-m-d H:i:s'),
        ]);
    }
}

SQL: Tabulka projection_checkpoint pro tracking zpracovaných událostí

CREATE TABLE projection_checkpoint (
    event_id        CHAR(36)     NOT NULL COMMENT 'UUID události - odkaz na event_store.event_id',
    projection_name VARCHAR(100) NOT NULL COMMENT 'Název projekce, napr. order_summary',
    processed_at    DATETIME(6)  NOT NULL COMMENT 'Čas zpracování',

    PRIMARY KEY (event_id, projection_name)
) ENGINE=InnoDB
  DEFAULT CHARSET=utf8mb4
  COLLATE=utf8mb4_unicode_ci
  COMMENT='Tracking tabulka pro idempotentní projektory - zabraňuje duplicitnímu zpracování';

Alternativa: upsert bez tracking tabulky

Pro projekce, kde je výsledkem jediný řádek na agregát (typicky summary tabulky), je jednodušší použít INSERT … ON DUPLICATE KEY UPDATE. Tracking tabulka se vyplatí, když jedna událost aktualizuje více tabulek nebo řádků a potřebujete garantovat, že se celá operace provede právě jednou.

Chybové stavy a retry strategie

Projektor může selhat z mnoha důvodů: dočasná nedostupnost databáze, neplatný payload u staré události bez upcasteru, nebo bug v projekční logice. Symfony Messenger nabízí dvě hlavní mechaniky pro řešení:

  • Retry transport - zpráva se po selhání automaticky vrátí do fronty s exponenciálním backoffem (výchozí: 3 pokusy s násobičem 2).
  • Failed transport (dead letter queue) - po vyčerpání retry pokusů se zpráva přesune do samostatné fronty, kde čeká na manuální zásah. Nedojde ke ztrátě události ani k zablokování zbytku fronty.

YAML: Kompletní konfigurace Messenger s retry a dead letter queue

Následující konfigurace rozšiřuje základní nastavení z předchozí sekce o retry strategii a failed transport:

framework:
    messenger:
        # Failed transport - sem padají zprávy po vyčerpání retry pokusů
        failure_transport: failed

        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    max_retries: 3
                    delay: 1000        # 1 sekunda
                    multiplier: 2      # exponenciální backoff: 1s, 2s, 4s
                    max_delay: 60000   # max 60 sekund mezi pokusy

            failed:
                dsn: 'doctrine://default?queue_name=failed'

        routing:
            'App\Ordering\Domain\Event\OrderCreated':    async
            'App\Ordering\Domain\Event\OrderItemAdded': async
            'App\Ordering\Domain\Event\OrderConfirmed': async
            'App\Ordering\Domain\Event\OrderShipped':   async

Pro diagnostiku a opětovné zpracování selhavších zpráv slouží příkazy Symfony Messenger:

  • bin/console messenger:failed:show - zobrazí zprávy v dead letter queue
  • bin/console messenger:failed:retry - pokusí se zprávy znovu zpracovat
  • bin/console messenger:failed:remove {id} - odstraní neplatnou zprávu

Rebuild projekcí

Schopnost přebudovat projekci od začátku je jednou z největších výhod Event Sourcingu, ale v praxi jde o netriviální operaci. Nestačí jen přehrát události - je nutné zajistit, že se rebuild neprovádí souběžně s normálním provozem projektoru, že se stará data korektně odstraní a že po rebuildu projekce odpovídá aktuálnímu stavu Event Store.

PHP: Symfony konzolový příkaz pro rebuild projekce

<?php

declare(strict_types=1);

namespace App\Infrastructure\EventSourcing\Console;

use App\Infrastructure\EventSourcing\EventStore;
use App\Infrastructure\EventSourcing\EventSerializer;
use Doctrine\DBAL\Connection;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;

#[AsCommand(
    name: 'app:projection:rebuild',
    description: 'Přebuduje projekci přehráním všech událostí z Event Store.',
)]
final class RebuildProjectionCommand extends Command
{
    /** @var array<string, array{projector: callable, table: string}> Registr projektorů dle názvu */
    private array $projectors;

    /**
     * @param iterable<string, callable> $projectors Symfony tagged_iterator
     * @param array<string, string>      $projectionTables Mapa: název projekce → název tabulky
     */
    public function __construct(
        private readonly Connection $connection,
        private readonly EventStore $eventStore,
        private readonly EventSerializer $serializer,
        iterable $projectors,
        array $projectionTables,
    ) {
        parent::__construct();
        foreach ($projectors as $name => $projector) {
            $this->projectors[$name] = [
                'projector' => $projector,
                'table'     => $projectionTables[$name] ?? throw new \InvalidArgumentException(
                    "Projekce '{$name}' nemá definovanou tabulku v \$projectionTables.",
                ),
            ];
        }
    }

    protected function configure(): void
    {
        $this->addArgument('projection', InputArgument::REQUIRED, 'Název projekce k přebudování');
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);
        $name = $input->getArgument('projection');

        if (!isset($this->projectors[$name])) {
            $io->error("Projekce '{$name}' neexistuje. Dostupné: " . implode(', ', array_keys($this->projectors)));
            return Command::FAILURE;
        }

        $config = $this->projectors[$name];
        $table  = $config['table'];

        $io->warning("Rebuild smaže data projekce '{$name}' (tabulka '{$table}') a přehraje celý Event Store.");

        // 1. Smazat stávající data projekce - název tabulky pochází z whitelistu,
        //    nikoli z uživatelského vstupu, takže nehrozí SQL injection.
        $this->connection->executeStatement("TRUNCATE TABLE {$table}");

        // 2. Vymazat checkpoint záznamy pro tuto projekci
        $this->connection->executeStatement(
            'DELETE FROM projection_checkpoint WHERE projection_name = :name',
            ['name' => $name],
        );

        // 3. Přehrát všechny události z Event Store
        $projector = $config['projector'];
        $count = 0;
        $batchSize = 500;

        foreach ($this->eventStore->loadAll($batchSize) as $envelope) {
            $event = $this->serializer->toEvent($envelope);
            $projector($event);
            $count++;

            if ($count % $batchSize === 0) {
                $io->text("Zpracováno {$count} událostí…");
            }
        }

        $io->success("Projekce '{$name}' přebudována. Celkem {$count} událostí.");
        return Command::SUCCESS;
    }
}

Eventual consistency a uživatelské rozhraní

Při asynchronních projekcích existuje časové okno (typicky milisekundy až jednotky sekund), kdy uživatel provede akci (např. potvrdí objednávku), ale read model ještě neobsahuje aktualizovaná data. Uživatel tak po kliknutí na "Potvrdit" může vidět stále "Draft" stav.

Tento problém není bug - je to vlastnost eventual consistency. Existuje několik osvědčených přístupů, jak s ním v UI pracovat:

  • Optimistická aktualizace UI - Frontend po úspěšné odpovědi na command okamžitě zobrazí očekávaný stav (např. "Potvrzeno"), aniž čeká na aktualizaci projekce. Nejjednodušší a nejčastější řešení.
  • Potvrzovací stránka - Po provedení akce přesměrovat uživatele na stránku, která nezávisí na projekci (např. "Objednávka č. X byla potvrzena"), místo okamžitého návratu na výpis.
  • Polling / SSE - Frontend periodicky dotazuje API nebo naslouchá Server-Sent Events, dokud projekce nedorazí do požadovaného stavu.

Synchronní projekce jako pragmatický kompromis

Pokud vaše aplikace nemá vysokou zátěž na write straně a latence zápisu je přijatelná, je zcela legitimní začít se synchronními projekcemi a na asynchronní přejít až ve chvíli, kdy se synchronní aktualizace stane úzkým hrdlem. Vyhýbáte se tak problémům s eventual consistency v raných fázích projektu.

Snapshotting

Se stárnutím systému rostou event streamy agregátů. Agregát s tisíci událostmi vyžaduje načtení a přehrání tisíce řádků z databáze při každém command handleru - to je výkonnostní problém, který se v reálných systémech projevuje velmi brzy.

Řešením je snapshotting: v pravidelných intervalech (každých N událostí, nebo časově) se aktuální stav agregátu serializuje a uloží jako snapshot. Při příštím načtení repozitář nejprve vyhledá poslední snapshot a z Event Store načte pouze události novější než tento snapshot.

Kdy vytvářet snapshots

  • Poté, co event stream agregátu překročí určitý počet událostí (typicky 50–200).
  • Periodicky (např. jednou denně) pro agregáty s vysokou frekvencí událostí.
  • Na vyžádání - jako optimalizační krok po migraci nebo importu dat.

PHP: Snapshot třída a repozitář se snapshot podporou

<?php

declare(strict_types=1);

namespace App\Infrastructure\EventSourcing;

use DateTimeImmutable;

/**
 * Snapshot uchovává serializovaný stav agregátu v konkrétní verzi event streamu.
 */
final class Snapshot
{
    public function __construct(
        public readonly string $aggregateId,
        public readonly string $aggregateType,
        public readonly int $version,
        public readonly array $state,       // serializovaný stav agregátu
        public readonly DateTimeImmutable $takenAt,
    ) {}
}
<?php

declare(strict_types=1);

namespace App\Infrastructure\Ordering;

use App\Ordering\Domain\Order;
use App\Infrastructure\EventSourcing\EventStore;
use App\Infrastructure\EventSourcing\Snapshot;
use App\Infrastructure\EventSourcing\SnapshotStore;
use App\Infrastructure\EventSourcing\EventSerializer;

final class SnapshottingOrderRepository
{
    private const AGGREGATE_TYPE    = 'ordering.order';
    private const SNAPSHOT_INTERVAL = 50; // snapshot každých 50 událostí

    public function __construct(
        private readonly EventStore    $eventStore,
        private readonly SnapshotStore $snapshotStore,
        private readonly EventSerializer $serializer,
    ) {}

    public function load(string $orderId): Order
    {
        // 1. Pokusíme se načíst snapshot
        $snapshot = $this->snapshotStore->findLatest($orderId, self::AGGREGATE_TYPE);

        if ($snapshot !== null) {
            // 2a. Máme snapshot - přehrajeme pouze události novější než snapshot
            $fromVersion = $snapshot->version + 1;
            $aggregate   = Order::reconstituteFromSnapshot($snapshot->state);
        } else {
            // 2b. Nemáme snapshot - přehrajeme celý event stream od začátku
            $fromVersion = 1;
            $aggregate   = null;
        }

        $envelopes = $this->eventStore->loadStream($orderId, $fromVersion);

        if (empty($envelopes) && $aggregate === null) {
            throw new \DomainException("Order {$orderId} not found.");
        }

        if (!empty($envelopes)) {
            $events = array_map(
                fn($e) => $this->serializer->toEvent($e),
                $envelopes,
            );

            if ($aggregate !== null) {
                $aggregate->replayEvents($events);
            } else {
                $aggregate = Order::reconstituteFromEvents($events);
            }
        }

        return $aggregate;
    }

    public function save(Order $order): void
    {
        $newEvents = $order->releaseDomainEvents();

        if (empty($newEvents)) {
            return;
        }

        $expectedVersion = $order->version() - count($newEvents);

        $this->eventStore->append(
            $order->orderId(),
            self::AGGREGATE_TYPE,
            $newEvents,
            $expectedVersion,
        );

        // Automatické snapshotování
        if ($order->version() % self::SNAPSHOT_INTERVAL === 0) {
            $this->snapshotStore->save(new Snapshot(
                aggregateId:   $order->orderId(),
                aggregateType: self::AGGREGATE_TYPE,
                version:       $order->version(),
                state:         $order->toSnapshot(),
                takenAt:       new \DateTimeImmutable('now', new \DateTimeZone('UTC')),
            ));
        }
    }
}

Aby byl snapshotting funkční, musí agregát implementovat metody toSnapshot(): array (serializace aktuálního stavu) a statickou reconstituteFromSnapshot(array $state): static (deserializace). Na rozdíl od reconstituteFromEvents() tato metoda nevytváří apply*() volání - přímo nastaví properties z uloženého snímku. Je proto nezbytné zajistit, aby se formát snapshotu vyvíjel v souladu se změnami doménového modelu.

Verzování událostí (Event Versioning)

V Event Sourcingu jsou události permanentní - jednou uložené do Event Store se nikdy nemažou ani nepřepisují. Zároveň se ale doménový model v čase vyvíjí: přibývají nové atributy, mění se struktura dat, původní pole se rozdělují nebo slučují. Vzniká tak praktický problém: jak přečíst starou událost novým kódem?

Řešením je event versioning - strategie, která zajistí zpětnou čitelnost starých událostí i po změně jejich schématu. Nejrozšířenějším vzorem je upcasting: při deserializaci se starší verze payloadu transformuje na aktuální formát, takže doménový model vždy pracuje pouze s nejnovější verzí události.

Proč je verzování nezbytné

  • Append-only princip - Události v Event Store nelze měnit. Pokud změníte schéma události, stará data zůstávají v původním formátu navždy.
  • Replay a projekce - Při přebudování projekcí nebo replay agregátu se přehrávají všechny historické události, včetně těch z prvních verzí systému.
  • Dlouhověkost systému - Event-sourcovaný systém může běžet roky. Za tu dobu se business požadavky změní mnohokrát a schémata událostí se musejí vyvíjet spolu s nimi.

Vzor Upcaster

Upcaster je objekt zodpovědný za transformaci payloadu události z jedné verze do následující. Upcasters se řetězí: pokud existuje událost ve verzi 1 a aktuální verze je 3, proběhne transformace v1 → v2 → v3. Upcasting se provádí při čtení (deserializaci), nikoli při zápisu - původní data v Event Store zůstávají nedotčena.

PHP: Interface EventUpcaster

<?php

declare(strict_types=1);

namespace App\Infrastructure\EventSourcing\Versioning;

/**
 * Upcaster transformuje payload události ze starší verze na novější.
 * Každý upcaster je zodpovědný za přesně jeden přechod verze (např. v1 → v2).
 */
interface EventUpcaster
{
    /**
     * Typ události, na který se upcaster vztahuje (např. "identity.user_registered").
     */
    public function eventType(): string;

    /**
     * Zdrojová verze payloadu, kterou tento upcaster transformuje.
     */
    public function fromVersion(): int;

    /**
     * Cílová verze payloadu po transformaci.
     */
    public function toVersion(): int;

    /**
     * Transformuje payload ze zdrojové verze na cílovou.
     *
     * @param array<string, mixed> $payload Data události ve zdrojové verzi.
     * @return array<string, mixed> Data události v cílové verzi.
     */
    public function upcast(array $payload): array;
}

Konkrétní příklad: rozdělení pole fullName

Představme si reálnou situaci: při spuštění systému událost UserRegistered obsahovala pole fullName (celé jméno jako jeden řetězec). Později business požadoval rozlišit křestní jméno a příjmení - vznikla verze 2 se dvěma poli firstName a lastName. V Event Store ale stále existují tisíce událostí v1 s polem fullName.

PHP: Upcaster pro UserRegistered v1 → v2

<?php

declare(strict_types=1);

namespace App\Infrastructure\Identity\Versioning;

use App\Infrastructure\EventSourcing\Versioning\EventUpcaster;

/**
 * Transformuje UserRegistered v1 (fullName) na v2 (firstName + lastName).
 *
 * Strategie rozdělení: první slovo je firstName, zbytek lastName.
 * Pokud jméno obsahuje pouze jedno slovo, lastName se nastaví na prázdný řetězec.
 */
final readonly class UserRegisteredV1ToV2Upcaster implements EventUpcaster
{
    public function eventType(): string
    {
        return 'identity.user_registered';
    }

    public function fromVersion(): int
    {
        return 1;
    }

    public function toVersion(): int
    {
        return 2;
    }

    /**
     * @param array<string, mixed> $payload
     * @return array<string, mixed>
     */
    public function upcast(array $payload): array
    {
        $fullName = $payload['fullName'] ?? '';
        $parts    = explode(' ', trim($fullName), 2);

        $payload['firstName'] = $parts[0];
        $payload['lastName']  = $parts[1] ?? '';

        // Odstraníme původní pole - v2 schéma jej již nepoužívá
        unset($payload['fullName']);

        return $payload;
    }
}

PHP: UpcasterChain - řetězení upcasterů při deserializaci

<?php

declare(strict_types=1);

namespace App\Infrastructure\EventSourcing\Versioning;

/**
 * Řetězí upcasters a transformuje payload z libovolné historické verze
 * na aktuální verzi. Upcasters se aplikují postupně: v1 → v2 → v3 → …
 */
final readonly class UpcasterChain
{
    /** @var array<string, array<int, EventUpcaster>> Klíč = eventType, vnitřní klíč = fromVersion */
    private array $upcasters;

    /**
     * @param EventUpcaster[] $upcasters
     */
    public function __construct(array $upcasters)
    {
        $map = [];

        foreach ($upcasters as $upcaster) {
            $map[$upcaster->eventType()][$upcaster->fromVersion()] = $upcaster;
        }

        $this->upcasters = $map;
    }

    /**
     * Aplikuje všechny relevantní upcasters na payload.
     *
     * @param string              $eventType      Typ události (např. "identity.user_registered").
     * @param int                 $schemaVersion  Verze payloadu uloženého v Event Store.
     * @param array<string, mixed> $payload        Původní payload z Event Store.
     * @return array<string, mixed> Transformovaný payload v aktuální verzi.
     */
    public function upcast(string $eventType, int $schemaVersion, array $payload): array
    {
        if (!isset($this->upcasters[$eventType])) {
            return $payload;
        }

        $version = $schemaVersion;

        while (isset($this->upcasters[$eventType][$version])) {
            $upcaster = $this->upcasters[$eventType][$version];
            $payload  = $upcaster->upcast($payload);
            $version  = $upcaster->toVersion();
        }

        return $payload;
    }
}

V praxi se UpcasterChain integruje do EventSerializer: při deserializaci se z uloženého záznamu přečte event_type a schema_version, payload projde řetězem upcasterů a teprve výsledná transformovaná data se předají konstruktoru aktuální třídy události.

Weak vs. strong schema strategie

Při verzování událostí existují dva základní přístupy, jak přistupovat ke schématu payloadu:

  • Weak schema (slabé schéma) - Payload je uložen jako volný JSON bez formální definice. Upcasters transformují data ad-hoc. Výhodou je flexibilita a rychlost vývoje; nevýhodou je, že chyby v transformaci se projeví až za běhu a je obtížné ověřit konzistenci napříč verzemi.
  • Strong schema (silné schéma) - Každá verze události má explicitně definované schéma (např. pomocí JSON Schema nebo PHP třídy s validací). Upcaster pak transformuje mezi dvěma dobře definovanými strukturami. Výhodou je vyšší bezpečnost a možnost automatického testování kompatibility; nevýhodou je vyšší režie při každé změně schématu.

Pro většinu projektů je rozumným kompromisem kombinace obou přístupů: silné schéma pro kritické události v Core Doméně (finanční transakce, stavy objednávek) a slabé schéma pro méně kritické události v podpůrných kontextech (notifikace, logy aktivit).

Kdy použít Event Sourcing

Event Sourcing je mocný nástroj, ale s výraznou přidanou složitostí. Před jeho adopcí je nutné pečlivě zvážit, zda přínosy pro daný kontext převažují nad náklady na implementaci a provoz.

Vhodné use cases

  • Auditní log jako business požadavek - Finanční systémy, zdravotnické záznamy, nebo jakákoli doména, kde je zákonná povinnost uchovávat kompletní historii změn. ES auditní log poskytuje přirozeně a bez nutnosti extra implementace.
  • Komplexní doménová logika s bohatými stavovými přechody - Agregáty procházejí mnoha stavy, každý přechod má svou sémantiku a musí být rekonstruovatelný. Typicky: objednávkové systémy, workflow enginy, bankovní transakce.
  • Temporální dotazy - Potřeba "přehrát" stav systému k libovolnému bodu v minulosti (debugging, analýza, "what-if" scénáře). U ES stačí replay eventů do daného timestampu.
  • Event-driven integrace - Systém produkuje události, které konzumují jiné bounded contexts nebo externí systémy. ES zajišťuje, že žádná událost nebude ztracena - Event Store je zdrojem pravdy pro integraci.
  • CQRS s vysokou čtecí zátěží - ES umožňuje vybudovat libovolný počet optimalizovaných read modelů z jednoho event streamu, aniž by bylo nutné měnit write model.

Nevhodné use cases

  • Jednoduché CRUD aplikace - Pokud doménová logika spočívá v základních operacích Create/Read/Update/Delete bez složitých stavových přechodů, ES přináší jen zbytečnou složitost.
  • Systémy orientované převážně na reporting - Pokud je primárním požadavkem rychlé čtení a agregace dat (BI, analytics), jsou vhodnější klasická DW řešení nebo OLAP databáze.
  • Prototypy a MVP - Rychlá validace business nápadu nepotřebuje složitou infrastrukturu. ES lze přidat do zralého systému incrementálně, pokud se ukáže potřeba - viz Migrace z CRUD.
  • Týmy bez zkušeností s ES - Implementace Event Sourcingu bez předchozí zkušenosti přináší vysoké riziko chyb v kritické infrastruktuře (Event Store, serializace, versioning). Doporučuje se začít s menším bounded contextem jako experimentem.