componenta/cqrs
CQRS (Command Query Responsibility Segregation) library for PHP 8.4+
componenta/cqrs executes commands and queries in a CQRS application. It separates state-changing use cases from read use cases: a command goes to the command bus, a query goes to the query bus, and a handler does the actual work.
Use componenta/cqrs when handlers are registered explicitly through maps. Use componenta/cqrs-app when an application needs to discover #[AsCommandHandler], #[AsQueryHandler], #[AsCommandListener], compile handler maps into cache, or register the console worker command.
Installation
composer require componenta/cqrs
Dependencies
| Dependency | Purpose |
|---|---|
| PHP >=8.4 | Modern language features and strict types. |
| componenta/arrayable | Shared array conversion contract. |
| componenta/config | CQRS config keys and environment access. |
| componenta/di | Handler/listener invocation through CallableInvokerInterface. |
| componenta/iterator | Iterator helpers for locators. |
| componenta/policy | Authorization in command and query middleware. |
| componenta/reflection | Property access for lock keys and metadata. |
| cycle/database | Transactions and DatabaseTransport. |
| psr/container | Service lookup. |
| psr/log | Worker and transport logging. |
| symfony/lock | Resource locks for #[Lock]. |
| ramsey/uuid | Operation and envelope identifiers. |
To customize command serialization, implement CommandSerializerInterface. No serializer package is required.
Core Concepts
If you have not used CQRS in Componenta before, start here:
| Concept | Meaning |
|---|---|
| Command | Object with data for an action that changes state: create a post, publish a page, send an email. |
| Query | Object with data for a read operation: get a post, list comments, load site settings. |
| Handler | Class or callable that handles one command or one query. |
| Bus | Service that receives commands or queries. CommandBusInterface returns OperationInterface; QueryBusInterface returns the handler result. |
| Middleware | Pipeline step around the handler. It can check permissions, open a transaction, queue a command, retry a transient failure, or emit events. |
| Handler map | Array that connects a command/query class to its handler. In production this is the fastest explicit registration path. |
| Attribute | PHP attribute such as #[Async], #[Retry], #[Lock], #[AsCommandHandler]. The runtime package uses execution attributes; cqrs-app can discover handlers by attributes. |
| Actor | User or system subject on whose behalf a command or query runs. Policy middleware uses it for authorization. |
| #[Allow] | Attribute from componenta/policy that explicitly allows a command or query. This is the recommended way to mark public flows. |
| ATTR_SKIP_POLICY | Technical per-call flag that tells policy middleware not to run authorization for this command or query. Use it for worker re-processing and rare integration cases, not as the main way to make a flow public. |
Minimal setup without discovery: define handler maps in config, register Componenta\CQRS\ConfigProvider, fetch CommandBusInterface or QueryBusInterface from the container, then call dispatch() or handle().
Application Checklist
- Install componenta/cqrs.
- Register Componenta\CQRS\ConfigProvider in the container configuration.
- Create command and query classes.
- Create handlers for those commands and queries.
- Register handler maps with ConfigKey::COMMAND_HANDLER_MAP and ConfigKey::QUERY_HANDLER_MAP, or install componenta/cqrs-app and use discovery attributes.
- Configure command and query middleware with ConfigKey::COMMAND_MIDDLEWARES and ConfigKey::QUERY_MIDDLEWARES.
- If authorization is enabled, register Componenta\Policy\ConfigProvider and configure componenta/policy: pass an actor for protected flows, use #[Allow] for public flows, and keep missing-policy behavior at MissingPolicyBehavior::DENY unless the application intentionally chooses another rule.
- If async commands are needed, configure a transport, command serializer, and worker.
Quick Start
The smallest setup without middleware, authorization, or async execution consists of a command, a handler, a handler map, and a command-bus call.

    final readonly class CalculateTotalCommand
    {
        public function __construct(
            public int $price,
            public int $quantity,
        ) {}
    }

    final readonly class CalculateTotalHandler
    {
        public function __invoke(CalculateTotalCommand $command): int
        {
            return $command->price * $command->quantity;
        }
    }

Register the CQRS provider and the handler map:

    use Componenta\CQRS\ConfigKey;

    return [
        new Componenta\CQRS\ConfigProvider(),
        ConfigKey::COMMAND_HANDLER_MAP => [
            CalculateTotalCommand::class => [CalculateTotalHandler::class, '__invoke'],
        ],
    ];

After the container is built, fetch the command bus and dispatch the command:

    use Componenta\CQRS\Command\CommandBusInterface;

    /** @var CommandBusInterface $commands */
    $commands = $container->get(CommandBusInterface::class);

    $operation = $commands->dispatch(new CalculateTotalCommand(price: 50, quantity: 3));
    $operation->result?->value; // 150

When a command runs synchronously, the handler result is available in OperationInterface::$result. For an async command, result is null because the worker will run the handler later.
Configuration
Register the runtime provider:

    return [
        new Componenta\CQRS\ConfigProvider(),
    ];

The provider registers:
| Service | Purpose |
|---|---|
| CommandBusInterface | Dispatches commands through command middleware. |
| QueryBusInterface | Handles queries through query middleware. |
| CommandHandlerLocatorInterface | Reads ConfigKey::COMMAND_HANDLER_MAP. |
| CommandListenersLocatorInterface | Reads ConfigKey::COMMAND_LISTENER_MAP. |
| QueryHandlerLocatorInterface | Reads ConfigKey::QUERY_HANDLER_MAP. |
| CommandAttributeProviderInterface | Reads #[Async], #[Retry], and #[Lock] from cache or reflection. |
Important keys:
| Key | Value |
|---|---|
| ConfigKey::COMMAND_MIDDLEWARES | Command middleware list in execution order. |
| ConfigKey::QUERY_MIDDLEWARES | Query middleware list in execution order. |
| ConfigKey::COMMAND_HANDLER_MAP | `Command::class => callable…` |
| ConfigKey::COMMAND_LISTENER_MAP | Command event listener map. |
| ConfigKey::QUERY_HANDLER_MAP | `Query::class => callable…` |
| ConfigKey::COMMAND_ATTRIBUTE_MAP | Compiled metadata for #[Async], #[Retry], and #[Lock]. |
| ConfigKey::COMPILED_MAPS | Enables compiled maps. Can be overridden with CQRS_COMPILED_MAPS. |
Example without discovery:

    use Componenta\CQRS\Command\Middleware\EventMiddleware;
    use Componenta\CQRS\Command\Middleware\PolicyMiddleware;
    use Componenta\CQRS\ConfigKey;

    return [
        ConfigKey::COMMAND_HANDLER_MAP => [
            PublishPostCommand::class => [PublishPostHandler::class, '__invoke'],
        ],
        ConfigKey::QUERY_HANDLER_MAP => [
            GetPostQuery::class => [GetPostHandler::class, '__invoke'],
        ],
        ConfigKey::COMMAND_MIDDLEWARES => [
            PolicyMiddleware::class,
            EventMiddleware::class,
        ],
    ];

Commands
A command is an object with data for a state-changing use case. When you pass it to CommandBusInterface::dispatch(), the bus returns OperationInterface with the operation id, execution attributes, and handler result when the command runs synchronously.

    use Componenta\CQRS\Command\CommandBusInterface;

    final readonly class PublishPostController
    {
        public function __construct(
            private CommandBusInterface $commands,
        ) {}

        public function __invoke(PublishPostCommand $command): mixed
        {
            return $this->commands->dispatch($command)->result?->value;
        }
    }

CommandBusInterface::dispatch(object $command, array $attributes = []) accepts a command and per-call attributes. Middleware reads those attributes, for example to receive the actor for policy checks or to force synchronous execution in a worker. For public flows, normally put #[Allow] on the command or query; keep ATTR_SKIP_POLICY for technical re-processing where authorization already happened earlier.
Operation
OperationInterface stores command execution state. It is immutable: withAttribute(), withAttributes(), and withResult() return a new instance.
| Property | Type | Meaning |
|---|---|---|
| id | UuidInterface | Operation id. Operation::create() generates UUID v7. |
| command | object | Command being executed. |
| startedAt | DateTimeImmutable | UTC creation time. |
| attributes | array<string, mixed> | Per-dispatch context. |
| result | ?OperationResult | Handler result. Null for queued async commands. |
OperationResult stores the result of a completed synchronous command:
| Property | Type | Meaning |
|---|---|---|
| value | mixed | Value returned by the command handler. |
| processedAt | DateTimeImmutable | UTC completion time. Created automatically when not passed explicitly. |
withResult() is single-assignment and fails if the operation already has a result.
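The immutability and single-assignment rules can be illustrated with a stand-alone sketch. SketchOperation below is a hypothetical stand-in, not the library's Operation class, and the exception type is an assumption because the README does not say which exception withResult() throws.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for an immutable operation; not the library's Operation class.
final class SketchOperation
{
    /** @param array<string, mixed> $attributes */
    public function __construct(
        public readonly object $command,
        public readonly array $attributes = [],
        public readonly mixed $result = null,
        public readonly bool $hasResult = false,
    ) {}

    // Returns a copy with one extra attribute; the original stays unchanged.
    public function withAttribute(string $name, mixed $value): self
    {
        return new self(
            $this->command,
            [...$this->attributes, $name => $value],
            $this->result,
            $this->hasResult,
        );
    }

    // Single-assignment: a second call fails instead of overwriting the result.
    public function withResult(mixed $value): self
    {
        if ($this->hasResult) {
            throw new LogicException('Operation already has a result.'); // assumed exception type
        }

        return new self($this->command, $this->attributes, $value, true);
    }
}

$first = new SketchOperation(new stdClass());
$second = $first->withAttribute('actor', 'system'); // $first is not modified
$done = $second->withResult(150);
```

Because every with*() call returns a new object, middleware can pass an operation down the chain without worrying that a later step mutates the instance it already inspected.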
Command Handlers
Handlers are resolved through CommandHandlerLocatorInterface. A map entry can be a callable or a [class-string, method-string] pair. Class/method pairs are resolved lazily through the container, so the locator needs a container for compiled maps.

    use Componenta\CQRS\Command\Locator\CommandHandlerLocator;
    use Componenta\CQRS\Command\Locator\CommandHandlerLocatorInterface;
    use Psr\Container\ContainerInterface;

    return [
        CommandHandlerLocatorInterface::class => static fn (ContainerInterface $container) => new CommandHandlerLocator(
            map: [
                PublishPostCommand::class => [PublishPostHandler::class, '__invoke'],
            ],
            container: $container,
        ),
    ];

#[AsCommandHandler(?string $command = null)] is discovery metadata for the application package. If the command is omitted, cqrs-app infers it from the handler parameter.
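Lazy resolution of class/method pairs can be pictured roughly as follows. This is a minimal sketch, not the library's locator: resolveHandler(), PingCommand, PingHandler, and the closure that stands in for a PSR-11 container are all hypothetical names introduced for illustration.

```php
<?php
declare(strict_types=1);

// Hypothetical command and handler used only for this sketch.
final class PingCommand
{
    public function __construct(public readonly string $message) {}
}

final class PingHandler
{
    public function __invoke(PingCommand $command): string
    {
        return 'pong: ' . $command->message;
    }
}

/**
 * Resolves a map entry to a callable. Plain callables are returned as-is;
 * [class-string, method] pairs are instantiated through $container only
 * at the moment a command of that class is resolved.
 *
 * @param array<string, mixed> $map command class => callable or [class, method]
 * @param callable(string): object $container stand-in for a PSR-11 get()
 */
function resolveHandler(array $map, object $command, callable $container): callable
{
    $entry = $map[$command::class]
        ?? throw new RuntimeException('No handler for ' . $command::class);

    if (is_array($entry) && is_string($entry[0])) {
        return [$container($entry[0]), $entry[1]];
    }

    return $entry;
}

$created = 0;
$container = static function (string $class) use (&$created): object {
    $created++; // counts how many handler objects were built
    return new $class();
};

$map = [PingCommand::class => [PingHandler::class, '__invoke']];

// Nothing has been instantiated yet; the handler object is created here.
$handler = resolveHandler($map, new PingCommand('hi'), $container);
$result = $handler(new PingCommand('hi'));
```

The map itself holds only strings, which is what makes it cacheable as a compiled array; handler objects come into existence only for commands that are actually dispatched.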
Command Middleware
Command middleware implements:

    public function execute(
        OperationInterface $operation,
        OperationHandlerInterface $handler,
    ): OperationInterface;

Common middleware:
| Middleware | Responsibility |
|---|---|
| PolicyMiddleware | Checks authorization before command execution. |
| TransportMiddleware | Queues commands marked with #[Async]. |
| RetryMiddleware | Retries commands marked with #[Retry] when the exception implements RetryableExceptionInterface. |
| ResourceLockMiddleware | Acquires locks for commands marked with #[Lock]. |
| SequentialMiddleware | Runs nested dispatches after the root command. |
| TransactionMiddleware | Wraps execution in a cycle/database transaction. |
| EventMiddleware | Emits command lifecycle events. |
| HandleCommandHandler | Terminal handler that locates and invokes the command handler. Do not add it to ConfigKey::COMMAND_MIDDLEWARES; the bus factory wires it separately. |
Order is behavior. Keep authorization, transport, retry, locking, transactions, and events in middleware instead of hiding them in handlers.
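Why order is behavior can be seen in a small stand-alone pipeline. This sketch uses plain closures instead of the library's middleware classes; buildPipeline() and the step names are hypothetical and only show how list order maps to nesting.

```php
<?php
declare(strict_types=1);

/**
 * Builds a pipeline in which the first middleware in the list is the outermost one.
 *
 * @param list<callable(object, callable): mixed> $middlewares
 * @param callable(object): mixed $terminal handler invoked after all middleware
 */
function buildPipeline(array $middlewares, callable $terminal): callable
{
    $next = $terminal;

    // Wrap from the last middleware to the first so execution follows list order.
    foreach (array_reverse($middlewares) as $middleware) {
        $next = static fn (object $command): mixed => $middleware($command, $next);
    }

    return $next;
}

$log = [];

// Creates a middleware that records when it is entered and left.
$step = static function (string $name) use (&$log): callable {
    return static function (object $command, callable $next) use ($name, &$log): mixed {
        $log[] = "enter {$name}";
        $result = $next($command);
        $log[] = "leave {$name}";

        return $result;
    };
};

$pipeline = buildPipeline(
    [$step('policy'), $step('transaction')],
    static function (object $command) use (&$log): string {
        $log[] = 'handler';

        return 'done';
    },
);

$result = $pipeline(new stdClass());
// $log is now: enter policy, enter transaction, handler, leave transaction, leave policy
```

Swapping the two entries would open the transaction before the policy check runs, which is exactly the kind of behavioral change the middleware list controls.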
Command Attributes
| Attribute | Target | Constructor | Behavior |
|---|---|---|---|
| #[AsCommandHandler] | class or method | ?string $command = null | Discovery metadata for command handlers. |
| #[AsCommandListener] | class | string $command, int $priority = 0, array $eventTypes = [] | Discovery metadata for command event listeners. |
| #[Async] | command class | string $transport = 'default', int $delay = 0 | Serializes and queues the command. |
| #[Retry] | command class | int $attempts = 3, int $delayMs = 100, float $multiplier = 1.0, int $maxDelayMs = 10000 | Retries transient failures. |
| #[Lock] | command class | string $key, float $ttl = 300.0, bool $blocking = true | Acquires a computed resource lock. |
| #[Can] | command class | `string\|array $permissions, PermissionMode $mode = PermissionMode::ALL` | |

    use Componenta\CQRS\Command\Attribute\Async;
    use Componenta\CQRS\Command\Attribute\Lock;
    use Componenta\CQRS\Command\Attribute\Retry;

    #[Async(transport: 'emails', delay: 60)]
    #[Retry(attempts: 3, delayMs: 100, multiplier: 2.0)]
    #[Lock('email:{userId}', ttl: 30.0)]
    final readonly class SendWelcomeEmailCommand
    {
        public function __construct(
            public int $userId,
        ) {}
    }

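The README does not spell out how the #[Retry] parameters combine into wait times. The sketch below shows one common interpretation, an exponential schedule capped at maxDelayMs, where attempts counts the first try as well. Both the formula and the retryDelays() helper are assumptions for illustration; the library's RetryMiddleware may behave differently.

```php
<?php
declare(strict_types=1);

/**
 * Computes the wait before each retry under an ASSUMED schedule:
 * delay = min(delayMs * multiplier^retryIndex, maxDelayMs).
 * Assumes $attempts includes the first try, so there are $attempts - 1 retries.
 *
 * @return list<int> delays in milliseconds, one per retry
 */
function retryDelays(int $attempts, int $delayMs, float $multiplier, int $maxDelayMs): array
{
    $delays = [];

    for ($retry = 0; $retry < $attempts - 1; $retry++) {
        $delay = (int) round($delayMs * $multiplier ** $retry);
        $delays[] = min($delay, $maxDelayMs);
    }

    return $delays;
}

// Parameters taken from the SendWelcomeEmailCommand example, with the default cap.
$delays = retryDelays(attempts: 3, delayMs: 100, multiplier: 2.0, maxDelayMs: 10000);
```

Under this reading the example command would wait 100 ms and then 200 ms between its three attempts, and the default multiplier of 1.0 would give a constant delay.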
#[Lock] replaces {property} placeholders with command property values through reflection.
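Placeholder substitution of this kind can be sketched in a few lines. The function below is a hypothetical illustration, not the library's resolver: it reads public properties directly rather than through componenta/reflection, and it throws InvalidArgumentException where the package reports LockKeyResolutionException.

```php
<?php
declare(strict_types=1);

// Hypothetical command used only for this sketch.
final class SketchEmailCommand
{
    public function __construct(public readonly int $userId) {}
}

/**
 * Replaces each {property} placeholder with the value of the matching
 * public property. Unknown placeholders raise an exception.
 */
function resolveLockKey(string $template, object $command): string
{
    $key = preg_replace_callback(
        '/\{(\w+)\}/',
        static function (array $match) use ($command): string {
            $property = $match[1];

            if (!property_exists($command, $property)) {
                throw new InvalidArgumentException("Unknown lock key property: {$property}");
            }

            return (string) $command->{$property};
        },
        $template,
    );

    return $key ?? throw new RuntimeException('Lock key template could not be processed.');
}

$key = resolveLockKey('email:{userId}', new SketchEmailCommand(42));
```

With a key like this, two commands for the same user contend for the same lock, while commands for different users run independently.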
Policy Checks
To use PolicyMiddleware, the application must register Componenta\Policy\ConfigProvider. It provides PolicyEnforcer, policy providers, and ActorProviderInterface.
PolicyMiddleware resolves the actor in this order:
1. PolicyMiddleware::ATTR_ACTOR operation attribute.
2. ActorAwareInterface implemented by the command.
3. ActorProviderInterface injected into middleware.
It adds PolicyMiddleware::ATTR_COMMAND and PolicyMiddleware::ATTR_OPERATION to policy context.
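The lookup order for the actor can be expressed as a short fallback chain. This sketch uses stand-ins throughout: SketchActorAware, its actor() method, the SKETCH_ATTR_ACTOR key, and the string actors are hypothetical, since the README does not show the real interface signatures.

```php
<?php
declare(strict_types=1);

// Stand-in contract for this sketch; the real ActorAwareInterface may look different.
interface SketchActorAware
{
    public function actor(): ?string;
}

// Hypothetical key; the real one is PolicyMiddleware::ATTR_ACTOR.
const SKETCH_ATTR_ACTOR = 'actor';

final class SketchAwareCommand implements SketchActorAware
{
    public function actor(): ?string
    {
        return 'command-actor';
    }
}

/**
 * Resolves the actor in the documented order: per-call attribute,
 * then the command itself, then a fallback provider.
 *
 * @param array<string, mixed> $attributes
 * @param callable(): ?string $provider
 */
function resolveActor(array $attributes, object $command, callable $provider): ?string
{
    if (isset($attributes[SKETCH_ATTR_ACTOR])) {
        return $attributes[SKETCH_ATTR_ACTOR];
    }

    if ($command instanceof SketchActorAware && $command->actor() !== null) {
        return $command->actor();
    }

    return $provider();
}

$provider = static fn (): ?string => 'session-actor';

$explicit = resolveActor([SKETCH_ATTR_ACTOR => 'explicit-actor'], new SketchAwareCommand(), $provider);
$fromCommand = resolveActor([], new SketchAwareCommand(), $provider);
$fromProvider = resolveActor([], new stdClass(), $provider);
```

The per-call attribute wins over everything else, which is what lets a caller run a command on behalf of a specific actor regardless of the current session.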
For public flows, prefer an explicit #[Allow] policy on the command or query:

    use Componenta\Policy\Policies\Allow;

    #[Allow]
    final readonly class GetPublicPostsQuery
    {
        public function __construct(
            public int $page = 1,
        ) {}
    }

That keeps the code self-documenting: the flow is public because it has an explicit access policy, not because a controller bypassed authorization.
PolicyMiddleware::ATTR_SKIP_POLICY => true skips authorization for one call. Pass it in command attributes or query context only when authorization already happened earlier or cannot be evaluated in the current process. The main example is a worker: the original HTTP request passed authorization and queued the command, then the worker later executes it without an HTTP session or current user.
Do not use this flag as a shortcut around access errors. For protected flows, pass the actor through PolicyMiddleware::ATTR_ACTOR, implement ActorAwareInterface, or configure ActorProviderInterface.
Missing-policy behavior is configured in componenta/policy through MissingPolicyBehavior:
| Mode | Behavior |
|---|---|
| MissingPolicyBehavior::DENY | If a command or query has no policy, access is denied. This is the safe default for applications. |
| MissingPolicyBehavior::ALLOW | If no policy exists, access is allowed. Use only when this is an intentional application rule. |
For one call, behavior can be overridden through PolicyEnforcer::ATTR_MISSING_POLICY_BEHAVIOR in policy context, but public commands and queries should normally use #[Allow] instead of setting ALLOW from the controller.
If a per-call override is really needed, pass it through policy context:

    use Componenta\CQRS\Command\Middleware\PolicyMiddleware as CommandPolicyMiddleware;
    use Componenta\CQRS\Query\Middleware\PolicyMiddleware as QueryPolicyMiddleware;
    use Componenta\Policy\MissingPolicyBehavior;
    use Componenta\Policy\PolicyEnforcer;

    $commands->dispatch($command, [
        CommandPolicyMiddleware::ATTR_CONTEXT => [
            PolicyEnforcer::ATTR_MISSING_POLICY_BEHAVIOR => MissingPolicyBehavior::ALLOW,
        ],
    ]);

    $queries->handle($query, [
        QueryPolicyMiddleware::ATTR_POLICY_CONTEXT => [
            PolicyEnforcer::ATTR_MISSING_POLICY_BEHAVIOR => MissingPolicyBehavior::ALLOW,
        ],
    ]);

This override affects only the current call. Use #[Allow] for permanent public flows.
Async Execution
TransportMiddleware reads #[Async] through CommandAttributeProviderInterface.
| Case | Behavior |
|---|---|
| Command without #[Async] | Runs synchronously and gets ExecutionMode::SYNC. |
| Command with #[Async] | Is serialized into an Envelope, sent to transport, and gets ExecutionMode::ASYNC. The handler is not called. |
| Worker message | Is re-dispatched with ExecutionMode::SYNC for immediate execution. |

    use Componenta\CQRS\Command\Transport\DatabaseTransport;
    use Componenta\CQRS\Command\Transport\TransportRegistry;

    $registry = new TransportRegistry();
    $registry->register('emails', new DatabaseTransport($database, 'emails'));

ConfigProvider registers the TransportMiddleware factory, but it does not create the transport registry or command serializer for the application. If you add TransportMiddleware to ConfigKey::COMMAND_MIDDLEWARES, the container must return services for TransportRegistryInterface::class and CommandSerializerInterface::class.

    use Componenta\CQRS\Command\Transport\CommandSerializerInterface;
    use Componenta\CQRS\Command\Transport\DatabaseTransport;
    use Componenta\CQRS\Command\Transport\JsonCommandSerializer;
    use Componenta\CQRS\Command\Transport\TransportRegistry;
    use Componenta\CQRS\Command\Transport\TransportRegistryInterface;

    $registry = new TransportRegistry();
    $registry->register('emails', new DatabaseTransport($database, 'emails'));

    $serializer = new JsonCommandSerializer();

In your container, $registry must be available as TransportRegistryInterface::class, and $serializer must be available as CommandSerializerInterface::class.
Transport contracts:
| Contract | Methods |
|---|---|
| TransportInterface | send(), get(), ack(), reject() |
| TransportRegistryInterface | get(), has() |
| TransportRegistry | register(), get(), has() |
| CommandSerializerInterface | serialize(), deserialize() |
DatabaseTransport uses command_transport and command_transport_failed. Successful messages are removed through ack(), failed messages are moved through reject().
JsonCommandSerializer serializes initialized public properties to JSON and restores commands through constructors. Replace the CommandSerializerInterface service when a command needs another payload format.
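The idea of serializing public properties and restoring through the constructor can be sketched without the library. The two functions and the JSON layout below are hypothetical and are not the format JsonCommandSerializer actually writes; they only show the round trip.

```php
<?php
declare(strict_types=1);

// Hypothetical command used only for this sketch.
final class SketchWelcomeCommand
{
    public function __construct(
        public readonly int $userId,
        public readonly string $locale = 'en',
    ) {}
}

// Encodes the command class and its initialized public properties as JSON.
function sketchSerialize(object $command): string
{
    return json_encode(
        ['class' => $command::class, 'payload' => get_object_vars($command)],
        JSON_THROW_ON_ERROR,
    );
}

// Rebuilds the command by passing the payload to the constructor as named arguments.
// A real implementation should validate the class name before instantiating it.
function sketchDeserialize(string $json): object
{
    $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);

    return new ($data['class'])(...$data['payload']);
}

$json = sketchSerialize(new SketchWelcomeCommand(userId: 7, locale: 'de'));
$restored = sketchDeserialize($json);
```

This approach works when constructor parameter names match property names, as they do with promoted properties. Commands that hold objects or need a different wire format are the cases where a custom CommandSerializerInterface implementation becomes necessary.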
Worker

    $worker = new CommandWorker($bus, $serializer, $transport, $logger);

    $processed = $worker->processOne();
    $worker->run(sleep: 1);
    $worker->stop();

The cqrs:worker console command is registered by componenta/cqrs-app. The runtime package contains the queue processor but has no Symfony Console dependency.
Command Events
| Event | When |
|---|---|
| CommandProcessEvent | Before command execution. |
| CommandProcessedEvent | After successful execution. |
| CommandFailedEvent | After failure, before the exception is rethrown. |
Listeners are resolved through CommandListenersLocatorInterface. EventMiddleware suppresses listener failures by default. Use suppressExceptions: false for fail-fast development behavior.
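The difference between suppressed and fail-fast listener failures can be shown with a stand-alone sketch. notifyListeners() is a hypothetical helper, not part of EventMiddleware; only the suppressExceptions flag name comes from the README.

```php
<?php
declare(strict_types=1);

/**
 * Calls every listener for an event. With $suppressExceptions = true a failing
 * listener is recorded and skipped; with false the first failure is rethrown.
 *
 * @param list<callable(object): void> $listeners
 * @return list<Throwable> failures that were suppressed
 */
function notifyListeners(array $listeners, object $event, bool $suppressExceptions = true): array
{
    $suppressed = [];

    foreach ($listeners as $listener) {
        try {
            $listener($event);
        } catch (Throwable $failure) {
            if (!$suppressExceptions) {
                throw $failure;
            }

            $suppressed[] = $failure; // a real middleware would typically log this
        }
    }

    return $suppressed;
}

$calls = [];
$listeners = [
    static function (object $event): void {
        throw new RuntimeException('mailer down');
    },
    static function (object $event) use (&$calls): void {
        $calls[] = 'audit';
    },
];

// Default mode: the failing listener does not stop the audit listener.
$suppressed = notifyListeners($listeners, new stdClass());
```

Suppression keeps a broken side effect from failing the command itself, while fail-fast mode surfaces the same error immediately during development.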
Queries
Queries describe read use cases and return the handler result directly.

    use Componenta\CQRS\Query\QueryBusInterface;

    /** @var QueryBusInterface $queries */
    $post = $queries->handle(new GetPostQuery($id));

QueryBusInterface::handle(object $query, ContextInterface|array $context = []) converts arrays to immutable Context.
Query Context
Use query context for per-call flags: actor overrides, sparse fieldsets, and technical read options. For public queries, put #[Allow] on the query class. Keep PolicyMiddleware::ATTR_SKIP_POLICY for technical cases where authorization already happened earlier or cannot be evaluated in the current process.

    use Componenta\CQRS\Query\Context\Context;
    use Componenta\CQRS\Query\Middleware\PolicyMiddleware;

    $context = new Context([
        PolicyMiddleware::ATTR_ACTOR => $actor,
    ]);

    $context = $context->withAttribute('preview', ['title', 'summary']);

Query Handlers
Handlers are resolved through QueryHandlerLocatorInterface.

    use Componenta\CQRS\ConfigKey;

    return [
        ConfigKey::QUERY_HANDLER_MAP => [
            GetPostQuery::class => [GetPostHandler::class, '__invoke'],
        ],
    ];

#[AsQueryHandler(?string $query = null)] is discovery metadata used by cqrs-app.
Query policy middleware uses the same actor lookup order: ATTR_ACTOR in the call context, then ActorAwareInterface on the query, then ActorProviderInterface. It adds Componenta\CQRS\Query\Middleware\PolicyMiddleware::ATTR_QUERY to policy context.
Command Metadata
CommandAttributeProviderInterface exposes #[Async], #[Retry], and #[Lock] metadata.
| Provider | Use case |
|---|---|
| ReflectionCommandAttributeProvider | Compiled map disabled or empty. |
| CompiledCommandAttributeProvider | ConfigKey::COMMAND_ATTRIBUTE_MAP exists and compiled maps are enabled. |
CommandAttributeMapCompiler from cqrs-app can build the array for ConfigKey::COMMAND_ATTRIBUTE_MAP, but it is not a class-finder listener compiler by itself. The cache builder must pass command classes to it and write the result into config.
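The choice between a compiled map and reflection can be illustrated with PHP's native attribute API. Everything named Sketch* below, along with findAsync(), is a hypothetical stand-in. The attribute only mirrors the documented #[Async] constructor, and the real providers are separate classes rather than one function.

```php
<?php
declare(strict_types=1);

// Stand-in attribute mirroring the documented #[Async] constructor; not the library class.
#[Attribute(Attribute::TARGET_CLASS)]
final class SketchAsync
{
    public function __construct(
        public readonly string $transport = 'default',
        public readonly int $delay = 0,
    ) {}
}

#[SketchAsync(transport: 'emails', delay: 60)]
final class SketchQueuedCommand {}

final class SketchPlainCommand {}

/**
 * Reads the attribute from a compiled map when one is given,
 * otherwise falls back to reflection.
 *
 * @param array<string, SketchAsync> $compiledMap command class => attribute instance
 */
function findAsync(string $commandClass, array $compiledMap = []): ?SketchAsync
{
    if ($compiledMap !== []) {
        return $compiledMap[$commandClass] ?? null;
    }

    $attributes = (new ReflectionClass($commandClass))->getAttributes(SketchAsync::class);

    return $attributes === [] ? null : $attributes[0]->newInstance();
}

$async = findAsync(SketchQueuedCommand::class); // resolved through reflection
$none = findAsync(SketchPlainCommand::class);   // no attribute on this class

// With a compiled map, no reflection happens at all.
$compiled = findAsync(SketchPlainCommand::class, [
    SketchPlainCommand::class => new SketchAsync('reports'),
]);
```

A compiled map trades a build step for skipping reflection on every dispatch, which is why it has to be produced ahead of time and written into config.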
Extension Points
Public interfaces let an application replace individual CQRS parts without changing commands, queries, or handlers:
| Replace | Contract | When to use |
|---|---|---|
| Command name | NamedCommandInterface or CommandNameResolverInterface | The handler-map key must differ from the command class FQCN. |
| Query name | NamedQueryInterface or QueryNameResolverInterface | The handler-map key must differ from the query class FQCN. |
| Policy action id | ActionIdResolverInterface | Authorization must check an action id other than the command/query FQCN. |
| Command chain step | Componenta\CQRS\Command\Middleware\MiddlewareInterface | A custom step must run around command execution. |
| Query chain step | Componenta\CQRS\Query\Middleware\MiddlewareInterface | A custom step must run around query execution. |
| Queue transport | TransportInterface | Queued commands must be stored outside DatabaseTransport. |
| Command serialization | CommandSerializerInterface | Public-property JSON is not the right command format. |
| Command event listeners | CommandListenerInterface | A side effect should react to command events without changing the command handler. |
| Command metadata | CommandAttributeProviderInterface | #[Async], #[Retry], and #[Lock] must come from a source other than reflection or the standard compiled map. |
When the handler key must be a stable string name instead of the class FQCN, implement NamedCommandInterface or NamedQueryInterface:

    use Componenta\CQRS\Command\NamedCommandInterface;

    final class RebuildSearchIndexCommand implements NamedCommandInterface
    {
        public string $commandName {
            get => 'search.rebuild';
        }
    }

The handler map then uses that name:

    use Componenta\CQRS\ConfigKey;

    return [
        ConfigKey::COMMAND_HANDLER_MAP => [
            'search.rebuild' => [RebuildSearchIndexHandler::class, '__invoke'],
        ],
    ];

Queries use the same idea through NamedQueryInterface and the queryName property:

    use Componenta\CQRS\Query\NamedQueryInterface;

    final class GetPublicFeedQuery implements NamedQueryInterface
    {
        public string $queryName {
            get => 'feed.public';
        }
    }

If authorization must use another action id, replace ActionIdResolverInterface in the container:

    use Componenta\CQRS\Resolver\ActionIdResolverInterface;

    final readonly class AppActionIdResolver implements ActionIdResolverInterface
    {
        public function resolve(object $subject): string
        {
            return match ($subject::class) {
                PublishPostCommand::class => 'posts.publish',
                GetPublicFeedQuery::class => 'feed.read',
                default => $subject::class,
            };
        }
    }

OperationFactoryInterface exists in the package, but the current CommandBus creates operations directly through Operation::create(). Do not treat it as a bus customization point until the bus factory wires it.
Failures
| Failure | Exception |
|---|---|
| Missing command handler | Componenta\CQRS\Command\Exception\HandlerNotFoundException |
| Invalid command handler | InvalidHandlerException |
| Missing query handler | Componenta\CQRS\Query\Exception\HandlerNotFoundException |
| Missing actor for policy | AuthenticationRequiredException |
| Lock key cannot be built | LockKeyResolutionException |
| Lock cannot be acquired | LockAcquisitionException |
| Missing transport | TransportNotFoundException |
| Serialization or delivery failure | TransportException |
Other information
- License: MIT
- Updated: 2026-06-14