componenta/cqrs

Composer install command:

composer require componenta/cqrs

Package description

CQRS (Command Query Responsibility Segregation) library for PHP 8.4+

README

componenta/cqrs executes commands and queries in a CQRS application. It separates state-changing use cases from read use cases: a command goes to the command bus, a query goes to the query bus, and a handler does the actual work.

Use componenta/cqrs when handlers are registered explicitly through maps. Use componenta/cqrs-app when an application needs to discover #[AsCommandHandler], #[AsQueryHandler], #[AsCommandListener], compile handler maps into cache, or register the console worker command.

Installation

composer require componenta/cqrs

Dependencies

| Dependency | Purpose |
| --- | --- |
| PHP >=8.4 | Modern language features and strict types. |
| componenta/arrayable | Shared array conversion contract. |
| componenta/config | CQRS config keys and environment access. |
| componenta/di | Handler/listener invocation through CallableInvokerInterface. |
| componenta/iterator | Iterator helpers for locators. |
| componenta/policy | Authorization in command and query middleware. |
| componenta/reflection | Property access for lock keys and metadata. |
| cycle/database | Transactions and DatabaseTransport. |
| psr/container | Service lookup. |
| psr/log | Worker and transport logging. |
| symfony/lock | Resource locks for #[Lock]. |
| ramsey/uuid | Operation and envelope identifiers. |

To customize command serialization, implement CommandSerializerInterface. No serializer package is required.

Core Concepts

If you have not used CQRS in Componenta before, start here:

| Concept | Meaning |
| --- | --- |
| Command | Object with data for an action that changes state: create a post, publish a page, send an email. |
| Query | Object with data for a read operation: get a post, list comments, load site settings. |
| Handler | Class or callable that handles one command or one query. |
| Bus | Service that receives commands or queries. CommandBusInterface returns OperationInterface; QueryBusInterface returns the handler result. |
| Middleware | Pipeline step around the handler. It can check permissions, open a transaction, queue a command, retry a transient failure, or emit events. |
| Handler map | Array that connects a command/query class to its handler. In production this is the fastest explicit registration path. |
| Attribute | PHP attribute such as #[Async], #[Retry], #[Lock], #[AsCommandHandler]. The runtime package uses execution attributes; cqrs-app can discover handlers by attributes. |
| Actor | User or system subject on whose behalf a command or query runs. Policy middleware uses it for authorization. |
| #[Allow] | Attribute from componenta/policy that explicitly allows a command or query. This is the recommended way to mark public flows. |
| ATTR_SKIP_POLICY | Technical per-call flag that tells policy middleware not to run authorization for this command or query. Use it for worker re-processing and rare integration cases, not as the main way to make a flow public. |

Minimal setup without discovery: define handler maps in config, register Componenta\CQRS\ConfigProvider, fetch CommandBusInterface or QueryBusInterface from the container, then call dispatch() or handle().

Application Checklist

  1. Install componenta/cqrs.
  2. Register Componenta\CQRS\ConfigProvider in the container configuration.
  3. Create command and query classes.
  4. Create handlers for those commands and queries.
  5. Register handler maps with ConfigKey::COMMAND_HANDLER_MAP and ConfigKey::QUERY_HANDLER_MAP, or install componenta/cqrs-app and use discovery attributes.
  6. Configure command and query middleware with ConfigKey::COMMAND_MIDDLEWARES and ConfigKey::QUERY_MIDDLEWARES.
  7. If authorization is enabled, register Componenta\Policy\ConfigProvider and configure componenta/policy: pass an actor for protected flows, use #[Allow] for public flows, and keep missing-policy behavior at MissingPolicyBehavior::DENY unless the application intentionally chooses another rule.
  8. If async commands are needed, configure a transport, command serializer, and worker.

Quick Start

The smallest setup without middleware, authorization, or async execution consists of a command, a handler, a handler map, and a command-bus call.

final readonly class CalculateTotalCommand
{
    public function __construct(
        public int $price,
        public int $quantity,
    ) {}
}

final readonly class CalculateTotalHandler
{
    public function __invoke(CalculateTotalCommand $command): int
    {
        return $command->price * $command->quantity;
    }
}

Register the CQRS provider and the handler map:

use Componenta\CQRS\ConfigKey;

return [
    new Componenta\CQRS\ConfigProvider(),

    ConfigKey::COMMAND_HANDLER_MAP => [
        CalculateTotalCommand::class => [CalculateTotalHandler::class, '__invoke'],
    ],
];

After the container is built, fetch the command bus and dispatch the command:

use Componenta\CQRS\Command\CommandBusInterface;

/** @var CommandBusInterface $commands */
$commands = $container->get(CommandBusInterface::class);

$operation = $commands->dispatch(new CalculateTotalCommand(price: 50, quantity: 3));

$operation->result?->value; // 150

When a command runs synchronously, the handler result is available in OperationInterface::$result. For an async command, result is null because the worker will run the handler later.

Configuration

Register the runtime provider:

return [
    new Componenta\CQRS\ConfigProvider(),
];

The provider registers:

| Service | Purpose |
| --- | --- |
| CommandBusInterface | Dispatches commands through command middleware. |
| QueryBusInterface | Handles queries through query middleware. |
| CommandHandlerLocatorInterface | Reads ConfigKey::COMMAND_HANDLER_MAP. |
| CommandListenersLocatorInterface | Reads ConfigKey::COMMAND_LISTENER_MAP. |
| QueryHandlerLocatorInterface | Reads ConfigKey::QUERY_HANDLER_MAP. |
| CommandAttributeProviderInterface | Reads #[Async], #[Retry], and #[Lock] from cache or reflection. |

Important keys:

| Key | Value |
| --- | --- |
| ConfigKey::COMMAND_MIDDLEWARES | Command middleware list in execution order. |
| ConfigKey::QUERY_MIDDLEWARES | Query middleware list in execution order. |
| ConfigKey::COMMAND_HANDLER_MAP | Command::class => callable, or a [class-string, method-string] pair. |
| ConfigKey::COMMAND_LISTENER_MAP | Command event listener map. |
| ConfigKey::QUERY_HANDLER_MAP | Query::class => callable, or a [class-string, method-string] pair. |
| ConfigKey::COMMAND_ATTRIBUTE_MAP | Compiled metadata for #[Async], #[Retry], and #[Lock]. |
| ConfigKey::COMPILED_MAPS | Enables compiled maps. Can be overridden with CQRS_COMPILED_MAPS. |

Example without discovery:

use Componenta\CQRS\Command\Middleware\EventMiddleware;
use Componenta\CQRS\Command\Middleware\PolicyMiddleware;
use Componenta\CQRS\ConfigKey;

return [
    ConfigKey::COMMAND_HANDLER_MAP => [
        PublishPostCommand::class => [PublishPostHandler::class, '__invoke'],
    ],
    ConfigKey::QUERY_HANDLER_MAP => [
        GetPostQuery::class => [GetPostHandler::class, '__invoke'],
    ],
    ConfigKey::COMMAND_MIDDLEWARES => [
        PolicyMiddleware::class,
        EventMiddleware::class,
    ],
];

Commands

A command is an object with data for a state-changing use case. When you pass it to CommandBusInterface::dispatch(), the bus returns OperationInterface with the operation id, execution attributes, and handler result when the command runs synchronously.

use Componenta\CQRS\Command\CommandBusInterface;

final readonly class PublishPostController
{
    public function __construct(
        private CommandBusInterface $commands,
    ) {}

    public function __invoke(PublishPostCommand $command): mixed
    {
        return $this->commands->dispatch($command)->result?->value;
    }
}

CommandBusInterface::dispatch(object $command, array $attributes = []) accepts a command and per-call attributes. Middleware reads those attributes, for example to receive the actor for policy checks or to force synchronous execution in a worker. For public flows, normally put #[Allow] on the command or query; keep ATTR_SKIP_POLICY for technical re-processing where authorization already happened earlier.

Operation

OperationInterface stores command execution state. It is immutable: withAttribute(), withAttributes(), and withResult() return a new instance.

| Property | Type | Meaning |
| --- | --- | --- |
| id | UuidInterface | Operation id. Operation::create() generates UUID v7. |
| command | object | Command being executed. |
| startedAt | DateTimeImmutable | UTC creation time. |
| attributes | array<string, mixed> | Per-dispatch context. |
| result | ?OperationResult | Handler result. Null for queued async commands. |

OperationResult stores the result of a completed synchronous command:

| Property | Type | Meaning |
| --- | --- | --- |
| value | mixed | Value returned by the command handler. |
| processedAt | DateTimeImmutable | UTC completion time. Created automatically when not passed explicitly. |

withResult() is single-assignment and fails if the operation already has a result.
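
The immutability and single-assignment rules above can be illustrated with a minimal, self-contained sketch. SketchOperation and its hasResult flag are hypothetical stand-ins written for this example; they are not the package's Operation class, which may be implemented differently.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for an operation object; not the package's real class.
final class SketchOperation
{
    /** @param array<string, mixed> $attributes */
    public function __construct(
        public readonly object $command,
        public readonly array $attributes = [],
        public readonly mixed $result = null,
        public readonly bool $hasResult = false,
    ) {}

    public function withAttribute(string $name, mixed $value): self
    {
        // Copy the attributes and return a new instance; $this stays unchanged.
        $attributes = $this->attributes;
        $attributes[$name] = $value;

        return new self($this->command, $attributes, $this->result, $this->hasResult);
    }

    public function withResult(mixed $value): self
    {
        // Single assignment: a second result is rejected.
        if ($this->hasResult) {
            throw new LogicException('Operation already has a result.');
        }

        return new self($this->command, $this->attributes, $value, true);
    }
}

$operation = new SketchOperation(new stdClass());
$withActor = $operation->withAttribute('actor', 'user-42');
$done = $withActor->withResult(150);
```

Here $operation still has no attributes after withAttribute() is called, and calling $done->withResult() again throws.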

Command Handlers

Handlers are resolved through CommandHandlerLocatorInterface. A map entry can be a callable or a [class-string, method-string] pair. Class/method pairs are resolved lazily through the container, so the locator needs a container for compiled maps.

use Componenta\CQRS\Command\Locator\CommandHandlerLocator;
use Componenta\CQRS\Command\Locator\CommandHandlerLocatorInterface;
use Psr\Container\ContainerInterface;

return [
    CommandHandlerLocatorInterface::class => static fn (ContainerInterface $container) =>
        new CommandHandlerLocator(
            map: [
                PublishPostCommand::class => [PublishPostHandler::class, '__invoke'],
            ],
            container: $container,
        ),
];
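
To show what lazy resolution of a class/method pair can look like, here is a rough sketch that does not use the package. SketchHandlerLocator and the $getService closure are assumptions made for illustration; the real CommandHandlerLocator takes a PSR-11 container and may behave differently.

```php
<?php

declare(strict_types=1);

// Hypothetical locator: resolves [class-string, method] pairs only on lookup.
final class SketchHandlerLocator
{
    /** @param array<string, callable|array{0: string, 1: string}> $map */
    public function __construct(
        private array $map,
        private Closure $getService, // stands in for ContainerInterface::get()
    ) {}

    public function locate(object $command): callable
    {
        $entry = $this->map[$command::class]
            ?? throw new RuntimeException('No handler for ' . $command::class);

        if (is_array($entry) && is_string($entry[0])) {
            // The handler object is created here, not when the map is defined.
            $entry = [($this->getService)($entry[0]), $entry[1]];
        }

        return $entry;
    }
}

final class SketchCalculateTotal
{
    public function __construct(
        public readonly int $price,
        public readonly int $quantity,
    ) {}
}

final class SketchCalculateTotalHandler
{
    public function __invoke(SketchCalculateTotal $command): int
    {
        return $command->price * $command->quantity;
    }
}

$created = 0;
$locator = new SketchHandlerLocator(
    [SketchCalculateTotal::class => [SketchCalculateTotalHandler::class, '__invoke']],
    static function (string $id) use (&$created): object {
        $created++;

        return new $id();
    },
);

$createdBeforeLocate = $created; // still 0: nothing was instantiated yet
$command = new SketchCalculateTotal(price: 50, quantity: 3);
$handler = $locator->locate($command);
$total = $handler($command); // 150
```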

#[AsCommandHandler(?string $command = null)] is discovery metadata for the application package. If the command is omitted, cqrs-app infers it from the handler parameter.

Command Middleware

Command middleware implements:

public function execute(OperationInterface $operation, OperationHandlerInterface $handler): OperationInterface;

Common middleware:

| Middleware | Responsibility |
| --- | --- |
| PolicyMiddleware | Checks authorization before command execution. |
| TransportMiddleware | Queues commands marked with #[Async]. |
| RetryMiddleware | Retries commands marked with #[Retry] when the exception implements RetryableExceptionInterface. |
| ResourceLockMiddleware | Acquires locks for commands marked with #[Lock]. |
| SequentialMiddleware | Runs nested dispatches after the root command. |
| TransactionMiddleware | Wraps execution in a cycle/database transaction. |
| EventMiddleware | Emits command lifecycle events. |
| HandleCommandHandler | Terminal handler that locates and invokes the command handler. Do not add it to ConfigKey::COMMAND_MIDDLEWARES; the bus factory wires it separately. |

Order is behavior. Keep authorization, transport, retry, locking, transactions, and events in middleware instead of hiding them in handlers.
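
Why order matters can be seen in a small pipeline sketch. It uses plain closures rather than the package's MiddlewareInterface and OperationHandlerInterface, so buildPipeline() and the closure signature are assumptions made for this example only.

```php
<?php

declare(strict_types=1);

/**
 * Folds a middleware list around a terminal handler.
 * Each middleware here is a closure: fn (object $command, callable $next): mixed.
 */
function buildPipeline(array $middlewares, callable $terminal): callable
{
    $next = $terminal;

    // Wrap from the last entry inward so the first entry runs first.
    foreach (array_reverse($middlewares) as $middleware) {
        $next = static fn (object $command): mixed => $middleware($command, $next);
    }

    return $next;
}

$log = [];

// Helper that creates a middleware recording when it runs.
$trace = static function (string $name) use (&$log): callable {
    return static function (object $command, callable $next) use ($name, &$log): mixed {
        $log[] = "$name:before";
        $result = $next($command);
        $log[] = "$name:after";

        return $result;
    };
};

$pipeline = buildPipeline(
    [$trace('policy'), $trace('transaction')],
    static function (object $command) use (&$log): string {
        $log[] = 'handler';

        return 'done';
    },
);

$result = $pipeline(new stdClass());
```

With this list, the policy step runs before the transaction opens and finishes after it closes. Swapping the two entries would open the transaction before authorization is checked.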

Command Attributes

| Attribute | Target | Constructor | Behavior |
| --- | --- | --- | --- |
| #[AsCommandHandler] | class or method | ?string $command = null | Discovery metadata for command handlers. |
| #[AsCommandListener] | class | string $command, int $priority = 0, array $eventTypes = [] | Discovery metadata for command event listeners. |
| #[Async] | command class | string $transport = 'default', int $delay = 0 | Serializes and queues the command. |
| #[Retry] | command class | int $attempts = 3, int $delayMs = 100, float $multiplier = 1.0, int $maxDelayMs = 10000 | Retries transient failures. |
| #[Lock] | command class | string $key, float $ttl = 300.0, bool $blocking = true | Acquires a computed resource lock. |
| #[Can] | command class | string\|array $permissions, PermissionMode $mode = PermissionMode::ALL | |

use Componenta\CQRS\Command\Attribute\Async;
use Componenta\CQRS\Command\Attribute\Lock;
use Componenta\CQRS\Command\Attribute\Retry;

#[Async(transport: 'emails', delay: 60)]
#[Retry(attempts: 3, delayMs: 100, multiplier: 2.0)]
#[Lock('email:{userId}', ttl: 30.0)]
final readonly class SendWelcomeEmailCommand
{
    public function __construct(
        public int $userId,
    ) {}
}
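
The #[Retry] parameters suggest a backoff schedule. The sketch below shows one plausible reading, assuming that attempts counts the first try and that each pause is delayMs multiplied by the multiplier raised to the retry index, capped at maxDelayMs. The source does not state the exact formula, and retryDelaysMs() is a hypothetical helper, not part of the package.

```php
<?php

declare(strict_types=1);

/**
 * Assumed schedule: pause n = min(maxDelayMs, delayMs * multiplier^n).
 *
 * @return list<int> pauses in milliseconds between consecutive attempts
 */
function retryDelaysMs(int $attempts, int $delayMs, float $multiplier, int $maxDelayMs): array
{
    $delays = [];

    // N attempts leave N - 1 pauses between them.
    for ($retry = 0; $retry < $attempts - 1; $retry++) {
        $delays[] = (int) min($maxDelayMs, $delayMs * $multiplier ** $retry);
    }

    return $delays;
}

// Same values as the attribute in the example above, plus the default cap.
$delays = retryDelaysMs(attempts: 3, delayMs: 100, multiplier: 2.0, maxDelayMs: 10000);

// A steeper schedule that hits the cap.
$capped = retryDelaysMs(attempts: 5, delayMs: 1000, multiplier: 10.0, maxDelayMs: 10000);
```

Under these assumptions $delays is [100, 200] and $capped is [1000, 10000, 10000, 10000].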

#[Lock] replaces {property} placeholders with command property values through reflection.
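
Placeholder replacement of this kind can be sketched with plain PHP reflection. resolveLockKey() is a hypothetical helper written for this example; the package does this through componenta/reflection and reports failures with its own LockKeyResolutionException.

```php
<?php

declare(strict_types=1);

// Hypothetical helper: fills {property} placeholders from command properties.
function resolveLockKey(string $template, object $command): string
{
    $key = preg_replace_callback(
        '/\{(\w+)\}/',
        static function (array $match) use ($command): string {
            // ReflectionProperty throws ReflectionException for unknown properties.
            $property = new ReflectionProperty($command, $match[1]);

            return (string) $property->getValue($command);
        },
        $template,
    );

    return $key ?? throw new RuntimeException('Lock key could not be built.');
}

final class SketchSendWelcomeEmail
{
    public function __construct(
        public readonly int $userId,
    ) {}
}

$key = resolveLockKey('email:{userId}', new SketchSendWelcomeEmail(userId: 7)); // "email:7"
```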

Policy Checks

To use PolicyMiddleware, the application must register Componenta\Policy\ConfigProvider. It provides PolicyEnforcer, policy providers, and ActorProviderInterface.

PolicyMiddleware resolves the actor in this order:

  1. PolicyMiddleware::ATTR_ACTOR operation attribute.
  2. ActorAwareInterface implemented by the command.
  3. ActorProviderInterface injected into middleware.

It adds PolicyMiddleware::ATTR_COMMAND and PolicyMiddleware::ATTR_OPERATION to policy context.
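
The lookup order above amounts to a simple fallback chain. In the sketch below, SketchActorAware, its actor() method, the 'actor' attribute key, and resolveActor() are all hypothetical names; the real ActorAwareInterface and ActorProviderInterface signatures are not shown in this document.

```php
<?php

declare(strict_types=1);

// Hypothetical counterpart of an actor-aware command or query.
interface SketchActorAware
{
    public function actor(): ?object;
}

final class SketchActor
{
    public function __construct(public readonly string $name) {}
}

final class SketchOwnedCommand implements SketchActorAware
{
    public function __construct(private ?SketchActor $actor = null) {}

    public function actor(): ?object
    {
        return $this->actor;
    }
}

function resolveActor(array $attributes, object $message, ?Closure $provider = null): ?object
{
    // 1. An explicit per-call attribute wins.
    if (($attributes['actor'] ?? null) !== null) {
        return $attributes['actor'];
    }

    // 2. The command or query may carry its own actor.
    if ($message instanceof SketchActorAware && $message->actor() !== null) {
        return $message->actor();
    }

    // 3. Fall back to the injected provider, if any.
    return $provider !== null ? $provider() : null;
}

$provider = static fn (): SketchActor => new SketchActor('provider');

$fromCall = resolveActor(
    ['actor' => new SketchActor('call')],
    new SketchOwnedCommand(new SketchActor('command')),
    $provider,
);
$fromCommand = resolveActor([], new SketchOwnedCommand(new SketchActor('command')), $provider);
$fromProvider = resolveActor([], new SketchOwnedCommand(), $provider);
$none = resolveActor([], new stdClass());
```

When every source yields nothing, as in $none, the real middleware reports a missing actor instead of returning null.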

For public flows, prefer an explicit #[Allow] policy on the command or query:

use Componenta\Policy\Policies\Allow;

#[Allow]
final readonly class GetPublicPostsQuery
{
    public function __construct(
        public int $page = 1,
    ) {}
}

That keeps the code self-documenting: the flow is public because it has an explicit access policy, not because a controller bypassed authorization.

PolicyMiddleware::ATTR_SKIP_POLICY => true skips authorization for one call. Pass it in command attributes or query context only when authorization already happened earlier or cannot be evaluated in the current process. The main example is a worker: the original HTTP request passed authorization and queued the command, then the worker later executes it without an HTTP session or current user.

Do not use this flag as a shortcut around access errors. For protected flows, pass the actor through PolicyMiddleware::ATTR_ACTOR, implement ActorAwareInterface, or configure ActorProviderInterface.

Missing-policy behavior is configured in componenta/policy through MissingPolicyBehavior:

| Mode | Behavior |
| --- | --- |
| MissingPolicyBehavior::DENY | If a command or query has no policy, access is denied. This is the safe default for applications. |
| MissingPolicyBehavior::ALLOW | If no policy exists, access is allowed. Use only when this is an intentional application rule. |

For one call, behavior can be overridden through PolicyEnforcer::ATTR_MISSING_POLICY_BEHAVIOR in policy context, but public commands and queries should normally use #[Allow] instead of setting ALLOW from the controller.

If a per-call override is really needed, pass it through policy context:

use Componenta\CQRS\Command\Middleware\PolicyMiddleware as CommandPolicyMiddleware;
use Componenta\CQRS\Query\Middleware\PolicyMiddleware as QueryPolicyMiddleware;
use Componenta\Policy\MissingPolicyBehavior;
use Componenta\Policy\PolicyEnforcer;

$commands->dispatch($command, [
    CommandPolicyMiddleware::ATTR_CONTEXT => [
        PolicyEnforcer::ATTR_MISSING_POLICY_BEHAVIOR => MissingPolicyBehavior::ALLOW,
    ],
]);

$queries->handle($query, [
    QueryPolicyMiddleware::ATTR_POLICY_CONTEXT => [
        PolicyEnforcer::ATTR_MISSING_POLICY_BEHAVIOR => MissingPolicyBehavior::ALLOW,
    ],
]);

This override affects only the current call. Use #[Allow] for permanent public flows.

Async Execution

TransportMiddleware reads #[Async] through CommandAttributeProviderInterface.

| Case | Behavior |
| --- | --- |
| Command without #[Async] | Runs synchronously and gets ExecutionMode::SYNC. |
| Command with #[Async] | Is serialized into an Envelope, sent to transport, and gets ExecutionMode::ASYNC. The handler is not called. |
| Worker message | Is re-dispatched with ExecutionMode::SYNC for immediate execution. |

use Componenta\CQRS\Command\Transport\DatabaseTransport;
use Componenta\CQRS\Command\Transport\TransportRegistry;

$registry = new TransportRegistry();
$registry->register('emails', new DatabaseTransport($database, 'emails'));

ConfigProvider registers the TransportMiddleware factory, but it does not create the transport registry or command serializer for the application. If you add TransportMiddleware to ConfigKey::COMMAND_MIDDLEWARES, the container must return services for TransportRegistryInterface::class and CommandSerializerInterface::class.

use Componenta\CQRS\Command\Transport\CommandSerializerInterface;
use Componenta\CQRS\Command\Transport\DatabaseTransport;
use Componenta\CQRS\Command\Transport\JsonCommandSerializer;
use Componenta\CQRS\Command\Transport\TransportRegistry;
use Componenta\CQRS\Command\Transport\TransportRegistryInterface;

$registry = new TransportRegistry();
$registry->register('emails', new DatabaseTransport($database, 'emails'));

$serializer = new JsonCommandSerializer();

In your container, $registry must be available as TransportRegistryInterface::class, and $serializer must be available as CommandSerializerInterface::class.

Transport contracts:

| Contract | Methods |
| --- | --- |
| TransportInterface | send(), get(), ack(), reject() |
| TransportRegistryInterface | get(), has() |
| TransportRegistry | register(), get(), has() |
| CommandSerializerInterface | serialize(), deserialize() |

DatabaseTransport uses command_transport and command_transport_failed. Successful messages are removed through ack(), failed messages are moved through reject().
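
The ack/reject lifecycle can be pictured with an in-memory stand-in. SketchInMemoryTransport and processOne() below are hypothetical: the method names mirror the contract table, but the parameter and return types are assumptions, and the arrays only imitate the two tables that DatabaseTransport uses.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory queue; signatures are assumed, not the package's.
final class SketchInMemoryTransport
{
    /** @var array<int, string> */
    private array $pending = [];
    /** @var array<int, string> */
    private array $inFlight = [];
    /** @var array<int, array{payload: string, reason: string}> */
    public array $failed = [];
    private int $nextId = 1;

    public function send(string $payload): int
    {
        $id = $this->nextId++;
        $this->pending[$id] = $payload;

        return $id;
    }

    /** @return array{id: int, payload: string}|null */
    public function get(): ?array
    {
        $id = array_key_first($this->pending);
        if ($id === null) {
            return null;
        }

        // Move the oldest message from pending to in-flight.
        $payload = $this->pending[$id];
        unset($this->pending[$id]);
        $this->inFlight[$id] = $payload;

        return ['id' => $id, 'payload' => $payload];
    }

    public function ack(int $id): void
    {
        // Success: the message is removed.
        unset($this->inFlight[$id]);
    }

    public function reject(int $id, string $reason): void
    {
        // Failure: the message is moved to the failed store.
        $this->failed[$id] = ['payload' => $this->inFlight[$id], 'reason' => $reason];
        unset($this->inFlight[$id]);
    }
}

// Processes at most one message; returns false when the queue is empty.
function processOne(SketchInMemoryTransport $transport, callable $handle): bool
{
    $message = $transport->get();
    if ($message === null) {
        return false;
    }

    try {
        $handle($message['payload']);
        $transport->ack($message['id']);
    } catch (Throwable $e) {
        $transport->reject($message['id'], $e->getMessage());
    }

    return true;
}

$transport = new SketchInMemoryTransport();
$transport->send('ok');
$transport->send('boom');

$handle = static function (string $payload): void {
    if ($payload === 'boom') {
        throw new RuntimeException('cannot handle boom');
    }
};

$results = [
    processOne($transport, $handle),
    processOne($transport, $handle),
    processOne($transport, $handle),
];
```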

JsonCommandSerializer serializes initialized public properties to JSON and restores commands through constructors. Replace the CommandSerializerInterface service when a command needs another payload format.
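
The idea of serializing public properties and restoring through the constructor can be sketched in a few lines. serializeCommand(), deserializeCommand(), and the class/payload envelope layout are assumptions for illustration; the real JsonCommandSerializer format is not documented here.

```php
<?php

declare(strict_types=1);

function serializeCommand(object $command): string
{
    // Called from outside the class, get_object_vars() returns only
    // public properties that have been initialized.
    return json_encode(
        ['class' => $command::class, 'payload' => get_object_vars($command)],
        JSON_THROW_ON_ERROR,
    );
}

function deserializeCommand(string $json): object
{
    $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);

    // String keys are spread as named arguments, so payload keys must
    // match the constructor parameter names.
    return new ($data['class'])(...$data['payload']);
}

final class SketchSendEmail
{
    public function __construct(
        public readonly int $userId,
        public readonly string $template,
    ) {}
}

$json = serializeCommand(new SketchSendEmail(userId: 7, template: 'welcome'));
$restored = deserializeCommand($json);
```

A production serializer would also need to restrict which class names it is willing to instantiate from a payload; this sketch skips that check.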

Worker

$worker = new CommandWorker($bus, $serializer, $transport, $logger);

$processed = $worker->processOne();
$worker->run(sleep: 1);
$worker->stop();

The cqrs:worker console command is registered by componenta/cqrs-app. The runtime package contains the queue processor but has no Symfony Console dependency.

Command Events

| Event | When |
| --- | --- |
| CommandProcessEvent | Before command execution. |
| CommandProcessedEvent | After successful execution. |
| CommandFailedEvent | After failure, before the exception is rethrown. |

Listeners are resolved through CommandListenersLocatorInterface. EventMiddleware suppresses listener failures by default. Use suppressExceptions: false for fail-fast development behavior.
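
The difference between suppressing and propagating listener failures can be shown with a small sketch. notifyListeners() is a hypothetical function; only the suppressExceptions name is borrowed from the text above, and the real EventMiddleware may, for example, log suppressed failures.

```php
<?php

declare(strict_types=1);

/**
 * Calls every listener; returns the failures that were suppressed.
 *
 * @param list<callable> $listeners
 * @return list<Throwable>
 */
function notifyListeners(array $listeners, object $event, bool $suppressExceptions = true): array
{
    $failures = [];

    foreach ($listeners as $listener) {
        try {
            $listener($event);
        } catch (Throwable $e) {
            if (!$suppressExceptions) {
                throw $e; // fail fast: remaining listeners do not run
            }
            $failures[] = $e; // suppressed: keep going with the next listener
        }
    }

    return $failures;
}

$calls = [];
$listeners = [
    static function (object $event) use (&$calls): void {
        $calls[] = 'audit';
    },
    static function (object $event): void {
        throw new RuntimeException('mailer down');
    },
    static function (object $event) use (&$calls): void {
        $calls[] = 'metrics';
    },
];

$suppressed = notifyListeners($listeners, new stdClass());
```

With suppression on, the third listener still runs after the second one fails. With suppressExceptions: false the RuntimeException would surface immediately.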

Queries

Queries describe read use cases and return the handler result directly.

use Componenta\CQRS\Query\QueryBusInterface;

$post = $queries->handle(new GetPostQuery($id));

QueryBusInterface::handle(object $query, ContextInterface|array $context = []) converts arrays to immutable Context.

Query Context

Use query context for per-call flags: actor overrides, sparse fieldsets, and technical read options. For public queries, put #[Allow] on the query class. Keep PolicyMiddleware::ATTR_SKIP_POLICY for technical cases where authorization already happened earlier or cannot be evaluated in the current process.

use Componenta\CQRS\Query\Context\Context;
use Componenta\CQRS\Query\Middleware\PolicyMiddleware;

$context = new Context([
    PolicyMiddleware::ATTR_ACTOR => $actor,
]);

$context = $context->withAttribute('preview', ['title', 'summary']);

Query Handlers

Handlers are resolved through QueryHandlerLocatorInterface.

use Componenta\CQRS\ConfigKey;

return [
    ConfigKey::QUERY_HANDLER_MAP => [
        GetPostQuery::class => [GetPostHandler::class, '__invoke'],
    ],
];

#[AsQueryHandler(?string $query = null)] is discovery metadata used by cqrs-app.

Query policy middleware uses the same actor lookup order: ATTR_ACTOR in the call context, then ActorAwareInterface on the query, then ActorProviderInterface. It adds Componenta\CQRS\Query\Middleware\PolicyMiddleware::ATTR_QUERY to policy context.

Command Metadata

CommandAttributeProviderInterface exposes #[Async], #[Retry], and #[Lock] metadata.

| Provider | Use case |
| --- | --- |
| ReflectionCommandAttributeProvider | Compiled map disabled or empty. |
| CompiledCommandAttributeProvider | ConfigKey::COMMAND_ATTRIBUTE_MAP exists and compiled maps are enabled. |

CommandAttributeMapCompiler from cqrs-app can build the array for ConfigKey::COMMAND_ATTRIBUTE_MAP, but it is not a class-finder listener compiler by itself. The cache builder must pass command classes to it and write the result into config.
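
The two metadata paths can be compared in a short sketch that uses only PHP's native attribute reflection. SketchAsync, readAsync(), and the shape of the compiled map are assumptions; the package's own providers and map format may differ.

```php
<?php

declare(strict_types=1);

// Hypothetical attribute with the same constructor shape as #[Async].
#[Attribute(Attribute::TARGET_CLASS)]
final class SketchAsync
{
    public function __construct(
        public readonly string $transport = 'default',
        public readonly int $delay = 0,
    ) {}
}

#[SketchAsync(transport: 'emails', delay: 60)]
final class SketchSendWelcomeEmailCommand {}

/**
 * @param array<string, array{transport?: string, delay?: int}> $compiledMap
 */
function readAsync(string $commandClass, array $compiledMap = []): ?SketchAsync
{
    // Compiled path: metadata was precomputed as plain arrays, no reflection.
    if (isset($compiledMap[$commandClass])) {
        return new SketchAsync(...$compiledMap[$commandClass]);
    }

    // Reflection path: read the attribute from the class at runtime.
    $attributes = (new ReflectionClass($commandClass))->getAttributes(SketchAsync::class);

    return $attributes === [] ? null : $attributes[0]->newInstance();
}

$viaReflection = readAsync(SketchSendWelcomeEmailCommand::class);
$viaCompiledMap = readAsync(stdClass::class, [
    stdClass::class => ['transport' => 'reports', 'delay' => 5],
]);
$notAsync = readAsync(stdClass::class);
```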

Extension Points

Public interfaces let an application replace individual CQRS parts without changing commands, queries, or handlers:

| Replace | Contract | When to use |
| --- | --- | --- |
| Command name | NamedCommandInterface or CommandNameResolverInterface | The handler-map key must differ from the command class FQCN. |
| Query name | NamedQueryInterface or QueryNameResolverInterface | The handler-map key must differ from the query class FQCN. |
| Policy action id | ActionIdResolverInterface | Authorization must check an action id other than the command/query FQCN. |
| Command chain step | Componenta\CQRS\Command\Middleware\MiddlewareInterface | A custom step must run around command execution. |
| Query chain step | Componenta\CQRS\Query\Middleware\MiddlewareInterface | A custom step must run around query execution. |
| Queue transport | TransportInterface | Queued commands must be stored outside DatabaseTransport. |
| Command serialization | CommandSerializerInterface | Public-property JSON is not the right command format. |
| Command event listeners | CommandListenerInterface | A side effect should react to command events without changing the command handler. |
| Command metadata | CommandAttributeProviderInterface | #[Async], #[Retry], and #[Lock] must come from a source other than reflection or the standard compiled map. |

When the handler key must be a stable string name instead of the class FQCN, implement NamedCommandInterface or NamedQueryInterface:

use Componenta\CQRS\Command\NamedCommandInterface;

final class RebuildSearchIndexCommand implements NamedCommandInterface
{
    public string $commandName {
        get => 'search.rebuild';
    }
}

The handler map then uses that name:

use Componenta\CQRS\ConfigKey;

return [
    ConfigKey::COMMAND_HANDLER_MAP => [
        'search.rebuild' => [RebuildSearchIndexHandler::class, '__invoke'],
    ],
];

Queries use the same idea through NamedQueryInterface and the queryName property:

use Componenta\CQRS\Query\NamedQueryInterface;

final class GetPublicFeedQuery implements NamedQueryInterface
{
    public string $queryName {
        get => 'feed.public';
    }
}
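
How a name resolver might pick the handler-map key is sketched below. SketchNamed and handlerKey() are hypothetical; the sketch uses a name() method instead of the commandName and queryName property hooks so that it also runs on PHP versions before 8.4.

```php
<?php

declare(strict_types=1);

// Hypothetical counterpart of NamedCommandInterface / NamedQueryInterface.
interface SketchNamed
{
    public function name(): string;
}

final class SketchRebuildSearchIndex implements SketchNamed
{
    public function name(): string
    {
        return 'search.rebuild';
    }
}

final class SketchPublishPost {}

// Named messages use their stable name; everything else uses the class FQCN.
function handlerKey(object $message): string
{
    return $message instanceof SketchNamed ? $message->name() : $message::class;
}

$map = [
    'search.rebuild' => static fn (object $command): string => 'index rebuilt',
    SketchPublishPost::class => static fn (object $command): string => 'post published',
];

$named = $map[handlerKey(new SketchRebuildSearchIndex())](new SketchRebuildSearchIndex());
$byClass = $map[handlerKey(new SketchPublishPost())](new SketchPublishPost());
```

A stable string key keeps the handler map, and any queued payloads that refer to it, valid when the class is later renamed or moved.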

If authorization must use another action id, replace ActionIdResolverInterface in the container:

use Componenta\CQRS\Resolver\ActionIdResolverInterface;

final readonly class AppActionIdResolver implements ActionIdResolverInterface
{
    public function resolve(object $subject): string
    {
        return match ($subject::class) {
            PublishPostCommand::class => 'posts.publish',
            GetPublicFeedQuery::class => 'feed.read',
            default => $subject::class,
        };
    }
}

OperationFactoryInterface exists in the package, but the current CommandBus creates operations directly through Operation::create(). Do not treat it as a bus customization point until the bus factory wires it.

Failures

| Failure | Exception |
| --- | --- |
| Missing command handler | Componenta\CQRS\Command\Exception\HandlerNotFoundException |
| Invalid command handler | InvalidHandlerException |
| Missing query handler | Componenta\CQRS\Query\Exception\HandlerNotFoundException |
| Missing actor for policy | AuthenticationRequiredException |
| Lock key cannot be built | LockKeyResolutionException |
| Lock cannot be acquired | LockAcquisitionException |
| Missing transport | TransportNotFoundException |
| Serialization or delivery failure | TransportException |

Other information

  • License: MIT
  • Last updated: 2026-06-14
