domraider/bunny
Composer 安装命令:
composer require domraider/bunny
包简介
Performant pure-PHP AMQP (RabbitMQ) sync/async (ReactPHP) library
关键字:
README 文档
README
Fork to handle automatic reconnect :
require "vendor/autoload.php"; $loop = \React\EventLoop\Factory::create(); $mq = new \Bunny\Async\Client($loop, [ 'host' => '127.0.0.1', 'port' => 5672, 'user' => 'guest', 'password' => 'guest', ]); // Connect is refactored to be retryable $mq->connect() ->retryWhen(function ($errors) { // Retry to connect on error return $errors->delay(2000) ->doOnNext(function () { echo "Disconnected, retry\n"; }); }) ->flatMap(function () use ($mq) { // Open only 1 channel return $mq->channel(); }) ->subscribeCallback(function (\Bunny\Channel $channel) use ($mq, $loop) { echo "connected\n"; // Publish every 2s \Rx\Observable::interval(2000) // <- here we drop during disconnect // <- You have to add a buffer ->flatMap(function () use ($mq, $channel) { $data = md5(microtime(true)); echo "Produce {$data}\n"; return \Rx\React\Promise::toObservable($channel->publish($data, [], 'amq.direct', 'test')) ->map(function () use ($data) { return $data; }); }) ->subscribeCallback( function ($data) { echo " {$data} : OK\n"; }, function () { echo "error during produce\n"; }, null, new \Rx\Scheduler\EventLoopScheduler($loop) ); }, function (\Exception $e) { echo "disconnected : {$e->getMessage()}\n"; }, null, new \Rx\Scheduler\EventLoopScheduler($loop)); $loop->run();
Performant pure-PHP AMQP (RabbitMQ) sync/async (ReactPHP) library
统计信息
- 总下载量: 7.6k
- 月度下载量: 0
- 日度下载量: 0
- 收藏数: 2
- 点击次数: 0
- 依赖项目数: 1
- 推荐数: 0
其他信息
- 授权协议: MIT
- 更新时间: 2016-11-08