netlogix/jobqueue-polling
Composer install command:
composer require netlogix/jobqueue-polling
Package description
Shared poll scheduler for ReactPHP based job workers
README
This package provides a small, dependency-free poll scheduler for ReactPHP based job workers.
There are two main goals:
- Poll for new work on a baseline interval without burning CPU when idle.
- Pick up freed capacity immediately after a job finishes, without waiting for the next interval — while never starting more work in parallel than allowed.
It is the shared polling core used by both Netlogix.JobQueue.FastRabbit (one job at
a time per worker) and Netlogix.JobQueue.Scheduled (N jobs in parallel).
Installation
composer require netlogix/jobqueue-polling
The problem it solves
A naive worker either polls aggressively (high CPU even when there is nothing to do) or only on a slow interval (low throughput, because a freed slot waits up to one interval before the next job is picked up).
PollScheduler combines a periodic baseline poll with an on-demand "immediate" poll
that callers trigger when a job completes. Immediate polls are coalesced to a single
pending future tick, and every poll is gated by a capacity check, so a burst of
completions racing the periodic timer can never start more work than capacity allows.
The correctness invariant: the capacity check and the slot occupation done inside
$tryToPickUpWork run synchronously within a single, uninterrupted loop frame
(ReactPHP is single threaded, no await in between). Therefore any number of stacked
or racing polls collapse to "fill up to capacity, then bail".
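The coalescing and the capacity gate described above can be sketched without ReactPHP. This is a minimal illustration under stated assumptions, not the package's actual source: ToyLoop is a hypothetical stand-in for the event loop's future-tick queue, CoalescingPoller is a hypothetical class name, and the pool and the queue of waiting jobs are plain in-memory objects.

```php
<?php

// Hypothetical stand-in for the event loop's future-tick queue (not React\EventLoop).
final class ToyLoop
{
    /** @var list<Closure> */
    private array $ticks = [];

    public function futureTick(Closure $callback): void
    {
        $this->ticks[] = $callback;
    }

    public function pendingTicks(): int
    {
        return count($this->ticks);
    }

    // Run queued callbacks until the queue is empty.
    public function drain(): void
    {
        while ($this->ticks !== []) {
            $callback = array_shift($this->ticks);
            $callback();
        }
    }
}

// Hypothetical sketch of the coalescing logic; the real PollScheduler may differ.
final class CoalescingPoller
{
    private bool $immediatePending = false;

    public function __construct(
        private ToyLoop $loop,
        private Closure $tryToPickUpWork,
        private Closure $hasCapacity,
    ) {
    }

    public function requestImmediatePoll(): void
    {
        if ($this->immediatePending) {
            return; // a tick is already queued, so this request is absorbed
        }
        $this->immediatePending = true;
        $this->loop->futureTick(function (): void {
            $this->immediatePending = false;
            $this->poll();
        });
    }

    public function poll(): void
    {
        if (!($this->hasCapacity)()) {
            return; // early-out while every slot is busy
        }
        ($this->tryToPickUpWork)();
    }
}

$loop = new ToyLoop();
$pool = new ArrayObject(); // running jobs
$queue = new SplQueue();   // waiting jobs
foreach (['a', 'b', 'c', 'd', 'e'] as $job) {
    $queue->enqueue($job);
}
$parallel = 2;

$poller = new CoalescingPoller(
    $loop,
    function () use ($pool, $queue, $parallel): void {
        // Check and occupation happen in one uninterrupted call.
        while (count($pool) < $parallel && !$queue->isEmpty()) {
            $pool->append($queue->dequeue());
        }
    },
    fn (): bool => count($pool) < $parallel,
);

// A burst of five completion notifications within the same loop frame.
for ($i = 0; $i < 5; $i++) {
    $poller->requestImmediatePoll();
}
$ticksQueued = $loop->pendingTicks();

$poller->poll(); // the periodic poll racing the queued immediate poll
$loop->drain();  // the immediate poll now finds no capacity and bails
```

In this sketch the five requests leave a single queued tick, and the pool ends at two jobs even though two polls ran, because the second poll hits the capacity check and returns.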
Usage
PollScheduler needs no dependency injection: instantiate it with new or the static create()
factory. This also works from a standalone worker binary that has no Flow object management.
    use Netlogix\JobQueue\Polling\PollScheduler;

    $scheduler = PollScheduler::create(
        loop: $loop, // React\EventLoop\LoopInterface
        tryToPickUpWork: fn () => $this->pickUp($pool), // picks up as much work as capacity allows
        hasCapacity: fn () => count($pool) < $parallel, // cheap, side-effect free
        interval: 0.1, // baseline poll interval in seconds
    );
    $scheduler->start(); // register the periodic baseline poll

    // ... when a job finishes (e.g. in the process EXIT/SUCCESS/ERROR handler):
    $scheduler->requestImmediatePoll(); // pick up the next job without waiting for the interval

    // ... during a drain/shutdown phase:
    $scheduler->stop(); // cancel the baseline poll (does not stop the loop)
$tryToPickUpWork is responsible for respecting capacity itself. The two canonical
shapes:
- Single concurrency (FastRabbit): hasCapacity: fn () => count($pool) === 0, and tryToPickUpWork reserves exactly one job when free.
- N concurrency (Scheduled): hasCapacity: fn () => count($pool) < $parallel, and tryToPickUpWork drains in a while (count($pool) < $parallel) loop.
The scheduler's own capacity pre-check is an early-out; the callback's internal limit is the authoritative bound.
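The two shapes can be written out as a pair of callbacks each. This is a sketch under assumptions: the waitingJobs() helper, the in-memory ArrayObject pools, and the SplQueue of waiting jobs are hypothetical stand-ins for whatever the real workers use to track processes and reserve jobs.

```php
<?php

// Hypothetical helper: builds an in-memory queue of waiting jobs.
function waitingJobs(string ...$jobs): SplQueue
{
    $waiting = new SplQueue();
    foreach ($jobs as $job) {
        $waiting->enqueue($job);
    }
    return $waiting;
}

// Shape 1: single concurrency, at most one running job.
$singlePool = new ArrayObject();
$singleWaiting = waitingJobs('job-1', 'job-2', 'job-3');
$singleHasCapacity = fn (): bool => count($singlePool) === 0;
$singlePickUp = function () use ($singlePool, $singleWaiting): void {
    // Reserve exactly one job, and only when the pool is empty.
    if (count($singlePool) === 0 && !$singleWaiting->isEmpty()) {
        $singlePool->append($singleWaiting->dequeue());
    }
};

// Shape 2: N concurrency, fill every free slot in one call.
$parallel = 2;
$parallelPool = new ArrayObject();
$parallelWaiting = waitingJobs('job-1', 'job-2', 'job-3');
$parallelHasCapacity = fn (): bool => count($parallelPool) < $parallel;
$parallelPickUp = function () use ($parallelPool, $parallelWaiting, $parallel): void {
    // The limit is re-checked before every reservation.
    while (count($parallelPool) < $parallel && !$parallelWaiting->isEmpty()) {
        $parallelPool->append($parallelWaiting->dequeue());
    }
};

// Two calls in a row model stacked polls: the second call changes nothing.
$singlePickUp();
$singlePickUp();
$parallelPickUp();
$parallelPickUp();
```

The pools are objects rather than arrays on purpose: a PHP arrow function captures variables by value, so a plain array would be frozen at its size when the closure was created, while an object handle keeps reflecting the live pool.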
API
- PollScheduler::create(LoopInterface $loop, Closure $tryToPickUpWork, Closure $hasCapacity, float $interval): static / equivalent constructor.
- start(): self. Registers the periodic baseline poll.
- requestImmediatePoll(): void. Requests an out-of-band poll as soon as possible; coalesced to at most one pending future tick.
- stop(): void. Cancels the baseline poll and stops picking up further work (drain phase); does not stop the event loop.
Other information
- License: MIT
- Last updated: 2026-06-18