netlogix/jobqueue-polling

Composer install command:

composer require netlogix/jobqueue-polling

Package description

Shared poll scheduler for ReactPHP based job workers

README

This package provides a small, dependency-free poll scheduler for ReactPHP-based job workers.

There are two main goals:

  1. Poll for new work on a baseline interval without burning CPU when idle.
  2. Pick up freed capacity immediately after a job finishes, without waiting for the next interval — while never starting more work in parallel than allowed.

It is the shared polling core used by both Netlogix.JobQueue.FastRabbit (one job at a time per worker) and Netlogix.JobQueue.Scheduled (N jobs in parallel).

Installation

composer require netlogix/jobqueue-polling

The problem it solves

A naive worker either polls aggressively (high CPU even when there is nothing to do) or only on a slow interval (low throughput, because a freed slot waits up to one interval before the next job is picked up).

PollScheduler combines a periodic baseline poll with an on-demand "immediate" poll that callers trigger when a job completes. Immediate polls are coalesced to a single pending future tick, and every poll is gated by a capacity check, so a burst of completions racing the periodic timer can never start more work than capacity allows.

The correctness invariant: the capacity check and the slot occupation inside $tryToPickUpWork run synchronously within a single, uninterrupted loop frame. ReactPHP is single-threaded and nothing awaits between the check and the occupation, so no other callback can run in between. Therefore any number of stacked or racing polls collapses to "fill up to capacity, then bail".
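The coalescing and capacity gate described above can be sketched in a few lines. This is a minimal illustration, not the package's source: TickQueue is a hypothetical stand-in for the event loop's future-tick queue, CoalescingPoller and its $polls counter are made-up names, and the pool is an ArrayObject so that closures share it by handle.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for an event loop: just a queue of "future tick" callbacks.
final class TickQueue
{
    /** @var list<Closure> */
    private array $ticks = [];

    public function futureTick(Closure $callback): void
    {
        $this->ticks[] = $callback;
    }

    // Run queued callbacks in order until none are left.
    public function run(): void
    {
        while ($this->ticks !== []) {
            $callback = array_shift($this->ticks);
            $callback();
        }
    }
}

// Hypothetical sketch of "coalesce immediate polls, gate every poll by capacity".
final class CoalescingPoller
{
    private bool $immediatePending = false;
    public int $polls = 0; // counts executed polls, for demonstration only

    public function __construct(
        private TickQueue $queue,
        private Closure $tryToPickUpWork,
        private Closure $hasCapacity,
    ) {
    }

    public function requestImmediatePoll(): void
    {
        if ($this->immediatePending) {
            return; // one tick is already pending: coalesce this request into it
        }
        $this->immediatePending = true;
        $this->queue->futureTick(function (): void {
            $this->immediatePending = false;
            $this->poll();
        });
    }

    public function poll(): void
    {
        $this->polls++;
        if (!($this->hasCapacity)()) {
            return; // early-out: nothing can be started right now
        }
        ($this->tryToPickUpWork)(); // check and occupation happen synchronously
    }
}

$queue = new TickQueue();
$pool = new ArrayObject();
$parallel = 2;

$poller = new CoalescingPoller(
    $queue,
    function () use ($pool, $parallel): void {
        while (count($pool) < $parallel) {
            $pool->append('job-' . (count($pool) + 1));
        }
    },
    fn (): bool => count($pool) < $parallel,
);

// Five completions in the same frame request five immediate polls...
for ($i = 0; $i < 5; $i++) {
    $poller->requestImmediatePoll();
}
$poller->poll(); // ...while the periodic baseline poll fires as well.
$queue->run();

printf("polls=%d, jobs=%d\n", $poller->polls, count($pool)); // polls=2, jobs=2
```

Five requests produce a single queued tick, and the second poll bails at the capacity check, so the pool never exceeds $parallel.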

Usage

PollScheduler does not rely on dependency injection: instantiate it with new or the static create() factory. This also works from a standalone worker binary that has no Flow object management.

use Netlogix\JobQueue\Polling\PollScheduler;

$scheduler = PollScheduler::create(
    loop: $loop,                              // React\EventLoop\LoopInterface
    tryToPickUpWork: fn () => $this->pickUp($pool),   // picks up as much work as capacity allows
    hasCapacity: fn () => count($pool) < $parallel,   // cheap, side-effect free
    interval: 0.1,                            // baseline poll interval in seconds
);

$scheduler->start();                          // register the periodic baseline poll

// ... when a job finishes (e.g. in the process EXIT/SUCCESS/ERROR handler):
$scheduler->requestImmediatePoll();           // pick up the next job without waiting for the interval

// ... during a drain/shutdown phase:
$scheduler->stop();                           // cancel the baseline poll (does not stop the loop)

$tryToPickUpWork is responsible for respecting capacity itself. The two canonical shapes:

  • Single concurrency (FastRabbit): hasCapacity: fn () => count($pool) === 0, and tryToPickUpWork reserves exactly one job when free.
  • N concurrency (Scheduled): hasCapacity: fn () => count($pool) < $parallel, and tryToPickUpWork drains in a while (count($pool) < $parallel) loop.

The scheduler's own capacity pre-check is an early-out; the callback's internal limit is the authoritative bound.
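The two callback shapes can be written out roughly as follows. This is a sketch under assumptions: the SplQueue named $waiting is a hypothetical in-memory job source standing in for a real queue, and the pools are ArrayObject instances so the closures see the same pool. Neither is part of the package.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory job source standing in for a real queue backend.
$waiting = new SplQueue();
foreach (['a', 'b', 'c', 'd', 'e'] as $job) {
    $waiting->enqueue($job);
}

// Shape 1: single concurrency. At most one job is held at any time.
$singlePool = new ArrayObject();
$singleHasCapacity = fn (): bool => count($singlePool) === 0;
$singlePickUp = function () use ($singlePool, $waiting): void {
    // The callback re-checks capacity itself; it is the authoritative bound.
    if (count($singlePool) === 0 && !$waiting->isEmpty()) {
        $singlePool->append($waiting->dequeue());
    }
};

// Shape 2: N concurrency. Fill free slots until the pool holds $parallel jobs.
$parallel = 3;
$pool = new ArrayObject();
$hasCapacity = fn (): bool => count($pool) < $parallel;
$pickUp = function () use ($pool, $waiting, $parallel): void {
    while (count($pool) < $parallel && !$waiting->isEmpty()) {
        $pool->append($waiting->dequeue());
    }
};

// Calling each callback twice shows that repeated polls cannot overfill a pool.
$singlePickUp();
$singlePickUp();
$pickUp();
$pickUp();

printf(
    "single=%d, parallel=%d, waiting=%d\n",
    count($singlePool),
    count($pool),
    count($waiting)
); // single=1, parallel=3, waiting=1
```

Because each callback enforces its own limit, it stays safe even if it is invoked without the scheduler's pre-check.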

API

  • PollScheduler::create(LoopInterface $loop, Closure $tryToPickUpWork, Closure $hasCapacity, float $interval): static — static factory; the constructor takes the same arguments.
  • start(): self — register the periodic baseline poll.
  • requestImmediatePoll(): void — request an out-of-band poll as soon as possible; coalesced to at most one pending future tick.
  • stop(): void — cancel the baseline poll and stop picking up further work (drain phase); does not stop the event loop.

GitHub information

  • Stars: 0
  • Watchers: 0
  • Forks: 0
  • Language: PHP

Other information

  • License: MIT
  • Last updated: 2026-06-18
