small/swoole-rx-events 问题修复 & 功能扩展

解决BUG、新增功能、兼容多环境部署,快速响应你的开发需求

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

small/swoole-rx-events

最新稳定版本:0.2.0

Composer 安装命令:

composer require small/swoole-rx-events

包简介

This project provides implementations of reactive events for Swoole.

README 文档

README

>         

Reactive event bus for PHP powered by RxPHP and Swoole.
It lets you publish/subscribe domain and infrastructure events, compose pipelines with Rx operators, and run time-based operators on Swoole’s event loop.

  • EventBus — simple Rx‐backed bus with on(), onMany(), payloads(), once(), request()
  • SwooleSchedulerAsyncSchedulerInterface using Swoole\Timer (works with RxPHP time operators)
  • Event modelBasicEvent (name, payload, meta, rid) and EventInterface (correlation id)

Requirements

  • PHP 8.3+
  • ext-swoole 4.8+ / 5.x
  • reactivex/rxphp (2.x)

Installation

composer require small/swoole-rx-events

Quick start

use Small\SwooleRxEvents\EventBus;
use Small\SwooleRxEvents\SwooleScheduler;
use Small\SwooleRxEvents\Event\BasicEvent;

// Use the Swoole async scheduler
$bus = new EventBus(new SwooleScheduler());

// Subscribe by name
$bus->on('order.created')->subscribe(function ($e) {
    echo "order rid={$e->getRid()} payload=", json_encode($e->getPayload()), PHP_EOL;
});

// Emit an event
$bus->emitName('order.created', ['id' => 123]);

// If you’re in a plain CLI script, keep the loop alive briefly:
\Swoole\Timer::after(20, fn () => \Swoole\Event::exit());
\Swoole\Event::wait();

Concepts

Event

All event must implement EventInterface

namespace Small\SwooleRxEvents\Contract;

interface EventInterface
{

    public function getName(): string;
    public function getRid(): string;
    public function setRid(string $rid): self;

}

BasicEvent carries:

  • name (string)
  • payload (array)
  • meta (array, e.g. tracing, user)
  • rid (string, auto‐generated correlation id)

Bus

  • stream() — all events
  • on($name) / onMany([...]) — filtered streams
  • payloads($name) — payload‐only stream
  • once($name, ?map, ?timeoutMs) — resolve first matching event (optionally mapped)
  • request($requestName, $responseName, $payload = [], $meta = [], ?$timeoutMs)
    Emits a request with a new rid, waits for the first response with the same rid.

Timeouts require an async scheduler. This library provides SwooleScheduler which implements AsyncSchedulerInterface.

API Examples

1) Listen & emit

$bus->on('user.created')->subscribe(fn($e) => audit($e->getMeta(), $e->getPayload()));
$bus->emitName('user.created', ['id' => 42], ['by' => 'admin']);

2) Request/Response with correlation id

// Responder: copies rid from incoming 'REQ' and emits 'RESP'
$bus->on('REQ')->subscribe(function ($e) use ($bus) {
    $bus->emit(
        (new BasicEvent('RESP', ['ok' => true], $e->getMeta()))
            ->setRid($e->getRid())   // correlate
    );
});

// Caller: request() subscribes FIRST, then emits; no race conditions
$bus->request('REQ', 'RESP', ['foo' => 'bar'], ['trace' => 'abc'], 100)
    ->subscribe(
        fn($resp) => var_dump($resp->getPayload()),          // ['ok' => true]
        fn($err)  => error_log($err->getMessage())
    );

3) once() with mapping & timeout

$bus->once('health.ok', fn($e) => $e->getMeta()['node'] ?? 'unknown', 50)
    ->subscribe(
        fn($node) => echo "node=$node\n",
        fn($err)  => echo "timeout\n"
    );
$bus->emitName('health.ok', [], ['node' => 'api-1']);

4) Backpressure / batching (Rx composition)

$bus->on('order.created')
    ->bufferWithTimeOrCount(500, 100, $bus->scheduler()) // every 0.5s or 100 items
    ->filter(fn($batch) => !empty($batch))
    ->subscribe(fn(array $batch) => persist_batch($batch));

Swoole integration tips

  • HTTP server: in on('request'), emit an event with meta containing a respond callable or the Response object. Downstream subscribers can produce a ResponseEvent.
  • Coroutines per subscriber: use Swoole coroutines in your subscribers if you do IO; Rx operators will orchestrate sequencing.
  • Event loop in CLI: outside a Swoole Server, start/stop the reactor with Swoole\Event::wait() / Event::exit() for timers to fire.

License

GPL-3.0-only — see LICENSE.

统计信息

  • 总下载量: 11
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 0
  • 点击次数: 3
  • 依赖项目数: 1
  • 推荐数: 0

GitHub 信息

  • Stars: 0
  • Watchers: 0
  • Forks: 0
  • 开发语言: PHP

其他信息

  • 授权协议: GPL-3.0-only
  • 更新时间: 2025-09-19

承接程序开发

PHP开发

VUE

Vue开发

前端开发

小程序开发

公众号开发

系统定制

数据库设计

云部署

网站建设

安全加固