iamfarhad/laravel-rabbitmq 问题修复 & 功能扩展

解决BUG、新增功能、兼容多环境部署,快速响应你的开发需求

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

iamfarhad/laravel-rabbitmq

最新稳定版本:1.3.0

Composer 安装命令:

composer require iamfarhad/laravel-rabbitmq

包简介

A robust RabbitMQ driver for Laravel Queue with advanced message queuing, reliable delivery, and high-performance async processing capabilities

README 文档

README

Latest Stable Version Total Downloads License Tests

A production-focused RabbitMQ queue driver for Laravel with native Laravel Queue integration, connection/channel pooling, configurable RabbitMQ topology, Horizon hooks, Octane-safe pool reset support, and an optional high-performance basic_consume worker mode.

Features

  • Laravel Queue API compatible RabbitMQ driver.
  • Native ext-amqp implementation for high-performance AMQP operations.
  • Connection and channel pooling with health checks and retry backoff.
  • Backward-compatible single-host config and production multi-host config.
  • Connection timeout support: read, write, and connect timeout.
  • Configurable exchange, exchange type, and routing-key publishing for normal Laravel jobs.
  • Lazy queues, priority queues, quorum queues, delayed messages, dead-letter routing, and failed-message rerouting.
  • Publisher confirms, transactions, RPC helpers, exchange helpers, and queue management helpers.
  • Optional Laravel Horizon integration via RABBITMQ_WORKER=horizon.
  • Optional Laravel Octane pool reset via RABBITMQ_OCTANE_RESET_ON_REQUEST=true.
  • Optional basic_consume worker mode via RABBITMQ_CONSUME_MODE=consume or --consume-mode=consume.
  • Artisan admin commands for declaring exchanges, declaring queues, purging queues, deleting queues, and viewing pool stats.

Requirements

  • PHP 8.2 or higher.
  • Laravel 11.x, 12.x, or 13.x.
  • RabbitMQ 3.8 or higher.
  • ext-amqp PHP extension.
  • ext-pcntl is optional and only required when running rabbitmq:consume --num-processes with a value greater than 1.

Installation

composer require iamfarhad/laravel-rabbitmq

Install the AMQP extension when it is not already available:

pecl install amqp

For Debian/Ubuntu images, install the native dependency first:

sudo apt-get update
sudo apt-get install -y librabbitmq-dev libssh-dev
sudo pecl install amqp

Publish the config:

php artisan vendor:publish \
  --provider="iamfarhad\LaravelRabbitMQ\LaravelRabbitQueueServiceProvider" \
  --tag="config"

Set Laravel to use RabbitMQ:

QUEUE_CONNECTION=rabbitmq

Quick start

QUEUE_CONNECTION=rabbitmq
RABBITMQ_HOST=127.0.0.1
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/
RABBITMQ_QUEUE=default

Start RabbitMQ locally:

docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management

Dispatch Laravel jobs normally:

dispatch(new App\Jobs\ProcessPodcast($podcast));
dispatch(new App\Jobs\ProcessPodcast($podcast))->onQueue('podcasts');
dispatch(new App\Jobs\ProcessPodcast($podcast))->delay(now()->addMinutes(10));

Run a worker:

php artisan rabbitmq:consume --queue=default --num-processes=1

Laravel's default worker also works:

php artisan queue:work rabbitmq --queue=default

Configuration

The package merges config/rabbitmq.php into queue.connections.rabbitmq, so you can configure the package from the published config file or directly in config/queue.php.

Single-host configuration

'rabbitmq' => [
    'driver' => 'rabbitmq',
    'queue' => env('RABBITMQ_QUEUE', 'default'),
    'hosts' => [
        'host' => env('RABBITMQ_HOST', '127.0.0.1'),
        'port' => env('RABBITMQ_PORT', 5672),
        'user' => env('RABBITMQ_USER', 'guest'),
        'password' => env('RABBITMQ_PASSWORD', 'guest'),
        'vhost' => env('RABBITMQ_VHOST', '/'),
        'heartbeat' => env('RABBITMQ_HEARTBEAT_CONNECTION', 0),
        'read_timeout' => env('RABBITMQ_READ_TIMEOUT', 0),
        'write_timeout' => env('RABBITMQ_WRITE_TIMEOUT', 0),
        'connect_timeout' => env('RABBITMQ_CONNECT_TIMEOUT', 0),
    ],
]

Multi-host configuration

Use a list of hosts for basic high-availability. The connector randomly selects a host for each new connection attempt.

'hosts' => [
    [
        'host' => 'rabbitmq-1',
        'port' => 5672,
        'user' => 'laravel',
        'password' => 'secret',
        'vhost' => '/',
    ],
    [
        'host' => 'rabbitmq-2',
        'port' => 5672,
        'user' => 'laravel',
        'password' => 'secret',
        'vhost' => '/',
    ],
],

Pool configuration

RABBITMQ_MAX_CONNECTIONS=10
RABBITMQ_MIN_CONNECTIONS=2
RABBITMQ_MAX_CHANNELS_PER_CONNECTION=100
RABBITMQ_MAX_RETRIES=3
RABBITMQ_RETRY_DELAY=1000
RABBITMQ_HEALTH_CHECK_ENABLED=true
RABBITMQ_HEALTH_CHECK_INTERVAL=30
Setting Default Description
max_connections 10 Maximum open AMQP connections.
min_connections 2 Minimum warm connections to keep available.
max_channels_per_connection 100 Channel cap per AMQP connection.
max_retries 3 Connection retry attempts.
retry_delay 1000 Initial retry delay in milliseconds.
health_check_enabled true Enable pool health checks.
health_check_interval 30 Health check interval in seconds.

Publishing topology

By default the package preserves RabbitMQ's default exchange behavior and routes jobs directly to the queue name. For production routing, configure an exchange and routing-key pattern:

RABBITMQ_EXCHANGE=jobs
RABBITMQ_EXCHANGE_TYPE=topic
RABBITMQ_EXCHANGE_ROUTING_KEY=jobs.%s

%s is replaced with the Laravel queue name. For example, queue emails publishes with routing key jobs.emails.

Supported exchange types:

  • direct
  • fanout
  • topic
  • headers

Queue options

Lazy queues

RABBITMQ_QUEUE_LAZY=true

Or per queue:

'queues' => [
    'bulk' => [
        'name' => 'bulk',
        'lazy' => true,
    ],
],

Priority queues

RABBITMQ_PRIORITIZE_DELAYED=true
RABBITMQ_QUEUE_MAX_PRIORITY=10

Per queue:

'queues' => [
    'critical' => [
        'name' => 'critical',
        'priority' => 10,
    ],
],

Quorum queues

RABBITMQ_QUEUE_QUORUM=true

Per queue:

'queues' => [
    'orders' => [
        'name' => 'orders',
        'quorum' => true,
    ],
],

Priority queues and quorum queues are mutually exclusive in RabbitMQ. When quorum mode is enabled, the package does not set x-max-priority.

Failed-message rerouting

RABBITMQ_REROUTE_FAILED=true
RABBITMQ_FAILED_EXCHANGE=failed.jobs
RABBITMQ_FAILED_ROUTING_KEY=%s.failed

When enabled, declared queues receive dead-letter arguments pointing failed messages to the configured failed exchange and routing key.

Worker modes

Poll mode

poll is the default and uses basic_get. It is the safest mode and matches Laravel's worker lifecycle expectations.

php artisan rabbitmq:consume --queue=default --consume-mode=poll

Consume mode

consume uses RabbitMQ's basic_consume push-style delivery. It avoids polling overhead and is better for hot queues, but it should be used with one queue per worker process.

php artisan rabbitmq:consume --queue=default --consume-mode=consume

Or configure it globally:

RABBITMQ_CONSUME_MODE=consume

Parallel workers

php artisan rabbitmq:consume --queue=default --num-processes=4

ext-pcntl is required only for --num-processes greater than 1.

Laravel Horizon

Install Horizon in your Laravel application, then enable the Horizon queue integration:

RABBITMQ_WORKER=horizon

When Horizon is installed and RABBITMQ_WORKER=horizon is set, the package uses an optional Horizon-aware queue class and dispatches Horizon events for:

  • pending jobs
  • pushed jobs
  • reserved jobs
  • deleted jobs
  • failed jobs

The package does not require Horizon. All Horizon references are guarded and inactive when Horizon is not installed.

Laravel Octane

Long-running Octane workers can reuse AMQP pools across requests. That is usually best for performance. If your application needs a fresh pool after every Octane request, enable:

RABBITMQ_OCTANE_RESET_ON_REQUEST=true

This hook is optional and only active when Laravel Octane is installed.

Admin commands

# Pool stats
php artisan rabbitmq:pool-stats
php artisan rabbitmq:pool-stats --json
php artisan rabbitmq:pool-stats --watch --interval=5

# Exchanges
php artisan rabbitmq:exchange-declare jobs --type=topic

# Queues
php artisan rabbitmq:queue-declare orders --durable=1
php artisan rabbitmq:queue-declare bulk --lazy=1
php artisan rabbitmq:queue-declare quorum-orders --quorum=1
php artisan rabbitmq:queue-declare critical --priority=10
php artisan rabbitmq:queue-purge orders --force
php artisan rabbitmq:queue-delete orders --force

Advanced helpers

Publish to an exchange

$queue = Queue::connection('rabbitmq');

$queue->publishToExchange(
    'notifications',
    json_encode(['message' => 'Welcome']),
    'user.created.email',
    ['source' => 'users']
);

Delayed messages

$queue->publishDelayed('emails', $payload, delay: 3600);

For the RabbitMQ delayed-message plugin:

RABBITMQ_DELAYED_PLUGIN_ENABLED=true
RABBITMQ_DELAYED_EXCHANGE=delayed

Publisher confirms

RABBITMQ_PUBLISHER_CONFIRMS_ENABLED=true
RABBITMQ_PUBLISHER_CONFIRMS_TIMEOUT=5

RPC

RABBITMQ_RPC_ENABLED=true
RABBITMQ_RPC_TIMEOUT=30
$response = Queue::connection('rabbitmq')->rpcCall(
    'rpc-queue',
    json_encode(['action' => 'calculate'])
);

Transactions

RABBITMQ_TRANSACTIONS_ENABLED=true
Queue::connection('rabbitmq')->transaction(function ($queue) {
    $queue->push(new App\Jobs\ProcessOrder(1));
    $queue->push(new App\Jobs\ProcessOrder(2));
});

Environment variables

QUEUE_CONNECTION=rabbitmq
RABBITMQ_QUEUE=default
RABBITMQ_HOST=127.0.0.1
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/
RABBITMQ_HEARTBEAT_CONNECTION=0
RABBITMQ_READ_TIMEOUT=0
RABBITMQ_WRITE_TIMEOUT=0
RABBITMQ_CONNECT_TIMEOUT=0
RABBITMQ_SECURE=false

RABBITMQ_MAX_CONNECTIONS=10
RABBITMQ_MIN_CONNECTIONS=2
RABBITMQ_MAX_CHANNELS_PER_CONNECTION=100
RABBITMQ_MAX_RETRIES=3
RABBITMQ_RETRY_DELAY=1000
RABBITMQ_HEALTH_CHECK_ENABLED=true
RABBITMQ_HEALTH_CHECK_INTERVAL=30

RABBITMQ_EXCHANGE=
RABBITMQ_EXCHANGE_TYPE=direct
RABBITMQ_EXCHANGE_ROUTING_KEY=%s
RABBITMQ_PRIORITIZE_DELAYED=false
RABBITMQ_QUEUE_MAX_PRIORITY=10
RABBITMQ_QUEUE_LAZY=false
RABBITMQ_QUEUE_QUORUM=false
RABBITMQ_REROUTE_FAILED=false
RABBITMQ_FAILED_EXCHANGE=
RABBITMQ_FAILED_ROUTING_KEY=%s.failed

RABBITMQ_WORKER=default
RABBITMQ_CONSUME_MODE=poll
RABBITMQ_OCTANE_RESET_ON_REQUEST=false

RABBITMQ_PUBLISHER_CONFIRMS_ENABLED=false
RABBITMQ_PUBLISHER_CONFIRMS_TIMEOUT=5
RABBITMQ_RPC_ENABLED=false
RABBITMQ_RPC_TIMEOUT=30
RABBITMQ_TRANSACTIONS_ENABLED=false
RABBITMQ_DELAYED_PLUGIN_ENABLED=false
RABBITMQ_DELAYED_EXCHANGE=delayed

Production deployment

Supervisor example

[program:laravel-rabbitmq]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/html/artisan rabbitmq:consume --queue=default --consume-mode=consume --memory=256 --tries=3 --timeout=60 --num-processes=1
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=www-data
numprocs=4
redirect_stderr=true
stdout_logfile=/var/log/supervisor/laravel-rabbitmq.log
stopwaitsecs=3600

For --consume-mode=consume, prefer one queue per worker process. Scale with Supervisor numprocs, containers, or process managers rather than a comma-separated queue list.

Docker notes

Install AMQP in your PHP image:

RUN apt-get update && apt-get install -y \
    librabbitmq-dev \
    libssh-dev \
    && pecl install amqp \
    && docker-php-ext-enable amqp

Install pcntl only if you use multi-process consumers inside the same PHP process:

RUN docker-php-ext-install pcntl

Testing and quality

composer format-test
composer analyse
composer test

The GitHub Actions test workflow runs Pint, PHPStan, and PHPUnit across the supported Laravel/PHP matrix with a RabbitMQ service container.

Troubleshooting

Class AMQPConnection not found

Install and enable ext-amqp:

pecl install amqp
php -m | grep amqp

Parallel worker error about pcntl

Install ext-pcntl, or run a single process:

php artisan rabbitmq:consume --queue=default --num-processes=1

Queue size or queue existence checks return zero after missing queue errors

RabbitMQ closes a channel after a passive queue declaration miss. The package releases that cached channel and uses a fresh channel on the next operation.

Horizon events do not appear

Confirm Horizon is installed and set:

RABBITMQ_WORKER=horizon

Then restart your workers.

License

The MIT License (MIT). See LICENSE for more information.

统计信息

  • 总下载量: 17.23k
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 32
  • 点击次数: 4
  • 依赖项目数: 0
  • 推荐数: 0

GitHub 信息

  • Stars: 32
  • Watchers: 2
  • Forks: 3
  • 开发语言: PHP

其他信息

  • 授权协议: MIT
  • 更新时间: 2022-12-30

承接程序开发

PHP开发

VUE

Vue开发

前端开发

小程序开发

公众号开发

系统定制

数据库设计

云部署

网站建设

安全加固