guidepilot/php-underground-radio

Composer install command:

composer require guidepilot/php-underground-radio

Package description

A PHP library that allows messages to be sent via various services that implement both a message queue architecture and a publish/subscribe service

README

Disclaimer: This project is at a very early stage of development and should not be used in production. The current code is little more than a first draft. Nevertheless, it may already be useful to someone. Pull requests are always welcome.

Introduction

The UndergroundRadio library allows messages to be sent via various services that implement both a message queue architecture and a publish/subscribe service.

Although the architecture provides an abstract interface for integrating any suitable service, only a Redis-based implementation exists at this time. It focuses on modern Redis features such as Streams and Pub/Sub.

This library is inspired by the Enqueue library and the queue-interop protocol, but differs from them in some details.

Installation

This package requires PHP 8.1 or greater!

Install with Composer:

$ composer require guidepilot/php-underground-radio

Usage examples for the pub/sub pattern

This pattern is also known as broadcast or fan-out architecture.
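To make the fan-out idea concrete, here is a minimal in-memory sketch. It does not use UndergroundRadio or Redis; the class InMemoryChannel and its methods are hypothetical names chosen only for illustration. It shows two properties of publish/subscribe messaging as Redis Pub/Sub implements it: every current subscriber receives its own copy of a message, and a message published while nobody is subscribed is not stored.

```php
<?php
// Hypothetical in-memory stand-in for a pub/sub channel; NOT part of UndergroundRadio.
final class InMemoryChannel
{
    /** @var array<int, callable> handlers of all current subscribers */
    private array $subscribers = [];

    public function subscribe(callable $handler): void
    {
        $this->subscribers[] = $handler;
    }

    // Fan-out: call every subscriber with the same message body.
    // Returns the number of subscribers that were notified.
    public function publish(string $body): int
    {
        foreach ($this->subscribers as $handler) {
            $handler($body);
        }

        return count($this->subscribers);
    }
}

$channel = new InMemoryChannel();
$receivedByA = [];
$receivedByB = [];

// Nobody is subscribed yet, so this message reaches no one and is not kept.
$deliveredEarly = $channel->publish('too early');

$channel->subscribe(function (string $body) use (&$receivedByA): void {
    $receivedByA[] = $body;
});
$channel->subscribe(function (string $body) use (&$receivedByB): void {
    $receivedByB[] = $body;
});

// Both subscribers receive their own copy of this message.
$deliveredLater = $channel->publish('Hello world!');
```

In the library examples that follow, the Channel, Producer, and Subscriber classes play these roles, with Redis doing the actual delivery.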

Simple message producer via channel

use GuidePilot\UndergroundRadio\Broadcast\Channel;
use GuidePilot\UndergroundRadio\JsonSerializer;
use GuidePilot\UndergroundRadio\Message;
use GuidePilot\UndergroundRadio\Producer;
use GuidePilot\UndergroundRadio\RedisConfig;
use GuidePilot\UndergroundRadio\RedisRadioContext;

// Connect to Redis on localhost and serialize messages as JSON.
$redisConfig = new RedisConfig('localhost');
$serializer = new JsonSerializer();
$context = new RedisRadioContext($redisConfig, $serializer);

$producer = new Producer($context);
$channel = new Channel('fooChannel');

// Build a message with a unique ID, one header, and a text body.
$message = new Message(uniqid());
$message->addHeader('cli-test', '1');
$message->setBody('Hello world!');

// Send the message to the channel.
$producer->send($message, $channel);

Simple message subscriber

use GuidePilot\UndergroundRadio\Broadcast\Channel;
use GuidePilot\UndergroundRadio\Broadcast\Interfaces\SubscriptionMessageProcessor;
use GuidePilot\UndergroundRadio\Broadcast\Subscriber;
use GuidePilot\UndergroundRadio\JsonSerializer;
use GuidePilot\UndergroundRadio\RedisConfig;
use GuidePilot\UndergroundRadio\RedisRadioContext;


$redisConfig = new RedisConfig('localhost');
$serializer = new JsonSerializer();
$context = new RedisRadioContext($redisConfig, $serializer);

$subscriber = new Subscriber($context);
$channel = new Channel('fooChannel');

$subscriber->subscribe($channel, new class implements SubscriptionMessageProcessor {

    public function processMessage(\GuidePilot\UndergroundRadio\Interfaces\Message $message, \GuidePilot\UndergroundRadio\Broadcast\Interfaces\Channel $channel) {
        echo "--- New message from {$channel->getDestinationIdentifier()} ---".PHP_EOL;
        print_r($message);
        echo PHP_EOL;
    }
});

Usage examples for the message queue pattern

Simple message producer via queue

use GuidePilot\UndergroundRadio\JsonSerializer;
use GuidePilot\UndergroundRadio\Message;
use GuidePilot\UndergroundRadio\Producer;
use GuidePilot\UndergroundRadio\Queue\CappedQueue;
use GuidePilot\UndergroundRadio\RedisConfig;
use GuidePilot\UndergroundRadio\RedisRadioContext;

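// Connect to Redis on localhost and serialize messages as JSON.
$redisConfig = new RedisConfig('localhost');
$serializer = new JsonSerializer();
$context = new RedisRadioContext($redisConfig, $serializer);

$producer = new Producer($context);
// CappedQueue takes the queue name and a cap (42 here).
$queue = new CappedQueue('fooQueue', 42);

// Build a message with a unique ID, one header, and a text body.
$message = new Message(uniqid());
$message->addHeader('cli-test', '1');
$message->setBody('Hello queue world! (capped)');

// Send the message to the queue.
$producer->send($message, $queue);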

Simple queue consumer

use GuidePilot\UndergroundRadio\Interfaces\Message;
use GuidePilot\UndergroundRadio\JsonSerializer;
use GuidePilot\UndergroundRadio\Queue\Interfaces\ProcessorResult;
use GuidePilot\UndergroundRadio\Queue\Interfaces\QueueMessageProcessor;
use GuidePilot\UndergroundRadio\Queue\Queue;
use GuidePilot\UndergroundRadio\Queue\QueueConsumer;
use GuidePilot\UndergroundRadio\Queue\QueueConsumerGroup;
use GuidePilot\UndergroundRadio\RedisConfig;
use GuidePilot\UndergroundRadio\RedisRadioContext;

$redisConfig = new RedisConfig('localhost');
// Use the same serializer as the queue producer example (JsonSerializer) so that queued messages can be decoded.
$serializer = new JsonSerializer();
$context = new RedisRadioContext($redisConfig, $serializer);

$group = new QueueConsumerGroup('worker');
$consumer = new QueueConsumer($context, 'worker-0', $group);
$queue = new Queue('fooQueue');

$consumer->consume($queue, new class implements QueueMessageProcessor {

    public function processMessage(Message $message, \GuidePilot\UndergroundRadio\Queue\Interfaces\Queue $queue): ProcessorResult {
        echo "--- New message from {$queue->getDestinationIdentifier()} ---".PHP_EOL;
        print_r($message);
        echo PHP_EOL;

        return ProcessorResult::Acknowledge;
    }

    public function handleMaxRequeueCountReached(Message $message, \GuidePilot\UndergroundRadio\Queue\Interfaces\Queue $queue) {
        echo "!!! Message {$message->getMessageId()} reached max requeue count !!!".PHP_EOL;
    }

});
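The consumer above joins a group named worker under the name worker-0. Assuming the group behaves like a Redis Streams consumer group (the README names Streams but does not describe the mapping), each message is handed to only one consumer in the group and stays pending until it is acknowledged. The following in-memory sketch illustrates that behaviour under this assumption; InMemoryGroupQueue and its methods are hypothetical and not part of UndergroundRadio.

```php
<?php
// Hypothetical in-memory stand-in for a queue with one consumer group; NOT part of UndergroundRadio.
final class InMemoryGroupQueue
{
    /** @var array<int, string> id => body, messages not yet handed to any consumer */
    private array $waiting = [];

    /** @var array<int, string> id => consumer name, delivered but not yet acknowledged */
    private array $pending = [];

    private int $nextId = 1;

    public function add(string $body): int
    {
        $id = $this->nextId++;
        $this->waiting[$id] = $body;

        return $id;
    }

    // Hands the oldest waiting message to exactly one consumer, or returns null if none is waiting.
    public function read(string $consumer): ?array
    {
        if ($this->waiting === []) {
            return null;
        }

        $id = array_key_first($this->waiting);
        $body = $this->waiting[$id];
        unset($this->waiting[$id]);
        $this->pending[$id] = $consumer;

        return ['id' => $id, 'body' => $body];
    }

    // Removes a delivered message from the pending list.
    public function acknowledge(int $id): void
    {
        unset($this->pending[$id]);
    }

    public function pendingCount(): int
    {
        return count($this->pending);
    }
}

$queue = new InMemoryGroupQueue();
$queue->add('job 1');
$queue->add('job 2');

$first = $queue->read('worker-0');  // worker-0 receives job 1
$second = $queue->read('worker-1'); // worker-1 receives job 2, not a copy of job 1
$third = $queue->read('worker-0');  // nothing is left to hand out

$queue->acknowledge($first['id']);  // job 2 stays pending until worker-1 acknowledges it
$pendingAfterAck = $queue->pendingCount();
```

This is the main contrast with the pub/sub examples: a channel copies each message to every subscriber, while a queue shares the messages among the consumers of a group.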

License

This library is released under the MIT License.

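Statistics

  • Total downloads: 6
  • Monthly downloads: 0
  • Daily downloads: 0
  • Favorites: 1
  • Views: 2
  • Dependent projects: 0
  • Recommendations: 0

GitHub information

  • Stars: 1
  • Watchers: 1
  • Forks: 0
  • Language: PHP

Other information

  • License: MIT
  • Last updated: 2022-10-18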
