定制 sshkolyk/laravel-queue-kafka 二次开发

按需修改功能、优化性能、对接业务系统,提供一站式技术支持

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

sshkolyk/laravel-queue-kafka

最新稳定版本:v2.0.0

Composer 安装命令:

composer require sshkolyk/laravel-queue-kafka

包简介

Kafka driver for Laravel Queue

README 文档

README

Latest Stable Version Software License

Installation

  1. Install librdkafka c library

    $ cd /tmp
    $ mkdir librdkafka
    $ cd librdkafka
    $ git clone https://github.com/edenhill/librdkafka.git .
    $ ./configure
    $ make
    $ make install
  2. Install the php-rdkafka PECL extension

    $ pecl install rdkafka
  3. a. Add the following to your php.ini file to enable the php-rdkafka extension extension=rdkafka.so

    b. Check if rdkafka is installed
    Note: If you want to run this on php-fpm restart your php-fpm first.

        php -i | grep rdkafka

    Your output should look something like this

    rdkafka
    rdkafka support => enabled
    librdkafka version (runtime) => 2.12.1
    librdkafka version (build) => 2.12.1.255
    
  4. Install this package via composer using:

        composer require sshkolyk/laravel-queue-kafka
  5. If you are using Lumen, put this in bootstrap/app.php:

        $app->register(Rapide\LaravelQueueKafka\LumenQueueKafkaServiceProvider::class);
  6. You can also publish queue-kafka.php config:

        php artisan vendor:publish --tag queue-kafka-config
  7. Add these properties to .env with proper values:

     QUEUE_DRIVER=kafka
    
  8. If you want to run a worker for a specific consumer group

        export KAFKA_CONSUMER_GROUP_ID="group2" && php artisan queue:work --sleep=3
  9. For run parallel in N partitions invoke N workers with:

        KAFKA_CONSUMER_PARTITION=0 php artisan queue:work
        KAFKA_CONSUMER_PARTITION=1 php artisan queue:work
        ...
  10. --tries not working with this driver. Make sure you catch all exceptions and enqueue again in your job if needed in your job
    Queue::later() also not working

Usage

Once you completed the configuration you can use Laravel Queue API. If you used other queue drivers you do not need to change anything else. If you do not know how to use Queue API, please refer to the official Laravel documentation: http://laravel.com/docs/queues

Supported environment variables

KAFKA_QUEUE - default queue(topic) name

KAFKA_CONSUMER_GROUP_ID - kafka consumer group, default = 'laravel_queue'

KAFKA_CONSUMER_PARTITION - kafka partition for consume, default = 0

KAFKA_PRODUCER_PARTITIONER - Producer partitioner algorithm, default = 'murmur2_random'

Can be:
  1. random - random distribution, consistent - CRC32 hash of key (Empty and NULL keys are mapped to single partition),
  2. consistent_random - CRC32 hash of key (Empty and NULL keys are randomly partitioned),
  3. murmur2 - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition),
  4. murmur2_random - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.),
  5. fnv1a - FNV-1a hash of key (NULL keys are mapped to single partition),
  6. fnv1a_random - FNV-1a hash of key (NULL keys are randomly partitioned).

KAFKA_BROKERS - Comma-separated list of Kafka broker addresses the client will initially connect to, default = localhost:9092

KAFKA_ERROR_SLEEP - Determine the number of seconds to sleep if there's an error communicating with kafka or false|null, default = 5

KAFKA_SASL_ENABLE - Enable SASL authentication. if false other SASL config does not matter, default = false

KAFKA_SASL_SECURITY_PROTOCOL - One of SSL, PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, default = SASL_SSL

KAFKA_SASL_MECHANISM - One of PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, default = SCRAM-SHA-512

KAFKA_SSL_CA_LOCATION - File or directory path to CA certificate(s) for verifying the broker's key, default empty

KAFKA_SASL_PLAIN_USERNAME - no default

KAFKA_SASL_PLAIN_PASSWORD - no default

KAFKA_AUTO_COMMIT - The property enable.auto.commit is set to true by default, and Kafka commits the current offset back to the Kafka broker at a specified interval, default = true. In most cases you don't need to touch this

KAFKA_AUTO_RESET - What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  1. earliest: automatically reset the offset to the earliest offset
  2. latest: automatically reset the offset to the latest offset
  3. none: throw exception to the consumer if no previous offset is found for the consumer's group
  4. anything else: throw exception to the consumer, default = 'earliest'. If you change this first producer messages may be ignored by consumer

KAFKA_TIMEOUT_MS - Timeout in ms for most operations, default = 1000

Testing

Run the tests with:

vendor/bin/phpunit

Acknowledgement

This library is based on laravel-queue-kafka by rapideinternet.

Contribution

You can contribute to this package by discovering bugs and opening issues. Please, add to which version of package you create pull request or issue.

Supported versions of Laravel

Tested on: [12.0]

统计信息

  • 总下载量: 2
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 0
  • 点击次数: 4
  • 依赖项目数: 0
  • 推荐数: 0

GitHub 信息

  • Stars: 0
  • Watchers: 0
  • Forks: 57
  • 开发语言: PHP

其他信息

  • 授权协议: MIT
  • 更新时间: 2026-03-08

承接程序开发

PHP开发

VUE

Vue开发

前端开发

小程序开发

公众号开发

系统定制

数据库设计

云部署

网站建设

安全加固