定制 micromus/kafka-bus 二次开发

按需修改功能、优化性能、对接业务系统,提供一站式技术支持

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

micromus/kafka-bus

最新稳定版本:v1.0.0-RC4

Composer 安装命令:

composer require micromus/kafka-bus

包简介

This is my package kafka-bus

README 文档

README

Latest Version on Packagist GitHub Tests Action Status GitHub Code Style GitHub PHPStan Total Downloads

This is where your description should go. Limit it to a paragraph or two. Consider adding a small example.

Installation

You can install the package via composer:

composer require micromus/kafka-bus

Requirements

  • PHP ^8.2
  • ext-rdkafka and a running Kafka cluster
  • Optional for consumers: ext-pcntl (to handle stop signals gracefully)

Usage (via Composer)

Quick start: Bus with producer and consumer

Below is a minimal example of wiring the bus, registering a topic, adding a producer route, and running a listener that handles messages from the same topic.

<?php

use Micromus\KafkaBus\Bus;
use Micromus\KafkaBus\Connections\Config\KafkaConnectionConfig;
use Micromus\KafkaBus\Connections\Registry\ConnectionRegistry;
use Micromus\KafkaBus\Connections\Registry\DriverRegistry;
use Micromus\KafkaBus\Consumers\ConsumerStreamFactory;
use Micromus\KafkaBus\Consumers\Handlers\MessageHandler;
use Micromus\KafkaBus\Consumers\Handlers\MessageHandlerFactory;
use Micromus\KafkaBus\Consumers\Router\ConsumerRoutes;
use Micromus\KafkaBus\Consumers\Router\Route as ConsumerRoute;
use Micromus\KafkaBus\Producers\Messages\ProducerMessage;
use Micromus\KafkaBus\Producers\ProducerStreamFactory;
use Micromus\KafkaBus\Topics\Topic;
use Micromus\KafkaBus\Topics\TopicRegistry;

require __DIR__ . '/vendor/autoload.php';

// Define topics
$topicRegistry = (new TopicRegistry())
    ->add(new Topic('production.fact.products.1', 'products'));

// Create consumer worker (listener) that handles messages from the topic
$consumeOptions = [
    'group.id' => 'products-microservice',
    'auto.offset.reset' => 'earliest',
];

// Simple synchronous handler example
class PrintHandler implements MessageHandler {
    public function handle(object $message): void {
        // $message is your domain message from pipeline
        fwrite(STDOUT, "Handled: " . json_encode($message) . PHP_EOL);
    }
}

$consumerRoutes = ConsumerRoutesBuilder::make($topicRegistry)
    ->add(new RouteInfo('products', new PrintHandler()))
    ->build();

$publisherRoutes = PublisherRoutesBuilder::make($topicRegistry)
    ->add(ProducerMessageFaker::class, 'products')
    ->build();

$workerRegistry = Bus\Listeners\Workers\MemoryWorkerRegistry::make()
    ->add(
        new Bus\Listeners\Workers\Worker(
            'default-listener',
            $consumerRoutes,
            new Bus\Listeners\Workers\Options(additionalOptions: $consumeOptions)
        )
    );

$bus = new Bus(
    new Bus\ThreadRegistry(
        ConnectionRegistry::default(),
        new Bus\ThreadFactory(
            new Bus\Listeners\ListenerFactory(workerRegistry: $workerRegistry),
            new Bus\Publishers\PublisherFactory(routes: $publisherRoutes),
        )
    ),
    ConnectionRegistry::DEFAULT_CONNECTION_NAME
);

// Produce a message
$bus->publish(new ProducerMessage(payload: 'test-message', headers: ['foo' => 'bar']));

// Consume in the same process (or run it separately)
pcntl_async_signals(true);
$listener = $bus->listener('default-listener');
pcntl_signal(SIGINT, fn () => $listener->forceStop());

$listener->listen();

Producing only

If you only need to produce messages, configure the bus and call publish with ProducerMessage instances. You do not need to start a listener in that case.

Consuming only

If you only need to consume, configure the worker(s) and call listener('name')->listen(). Your MessageHandler implementation will be invoked for each message received.

use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;

class MessageHandler
{
    public function __invoke(ConsumerMessageInterface $message)
    {
        // $message->payload()
    }
}

Only payload

class MessageHandler
{
    public function __invoke(string $message)
    {
        // $message == kafka message->payload 
    }
}

Payload as Array

class MessageHandler
{
    public function __invoke(array $payload)
    {
        // $payload == json_decode(message->payload, true)
    }
}

Original Message

use RdKafka\Message;

class MessageHandler
{
    public function __invoke(Message $message)
    {
        // $message->key
    }
}

Customer factory

use Micromus\KafkaBus\Consumers\Attributes\MessageFactory;use Micromus\KafkaBus\Consumers\Messages\JsonMessageFactory;use RdKafka\Message;

class MessageHandler
{
    #[MessageFactory(new JsonMessageFactory())]
    public function __invoke(array $payload)
    {
        // $payload == json_decode(message->payload, true)
    }
}

More examples

  • Producer only: see examples/producer.php
  • Consumer only: see examples/consumer.php
  • Full setup with routing: see examples/bus.php

Architecture

The architecture of the package is described in STRUCTURE.md (in Russian).

Testing

composer test

Changelog

Please see CHANGELOG for more information on what has changed recently.

Contributing

Please see CONTRIBUTING for details.

Security Vulnerabilities

Please review our security policy on how to report security vulnerabilities.

Credits

License

The MIT License (MIT). Please see License File for more information.

统计信息

  • 总下载量: 689
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 0
  • 点击次数: 1
  • 依赖项目数: 5
  • 推荐数: 0

GitHub 信息

  • Stars: 0
  • Watchers: 1
  • Forks: 0
  • 开发语言: PHP

其他信息

  • 授权协议: MIT
  • 更新时间: 2024-10-01

承接程序开发

PHP开发

VUE

Vue开发

前端开发

小程序开发

公众号开发

系统定制

数据库设计

云部署

网站建设

安全加固