kynx/swoole-processor 问题修复 & 功能扩展

解决BUG、新增功能、兼容多环境部署,快速响应你的开发需求

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

kynx/swoole-processor

最新稳定版本:0.4.0

Composer 安装命令:

composer require kynx/swoole-processor

包简介

Execute tasks in multi-process environment

README 文档

README

Continuous Integration

Run CLI batch jobs in coroutines across multiple processes.

Based on Swoole, the processor is ideal for running a large number of IO-intensive operations.

Install

composer require kynx/swoole-processor

Use

Create a JobProvider class that returns Job objects containing the payload you want to process:

use Kynx\Swoole\Processor\Job\Job;
use Kynx\Swoole\Processor\Job\JobProviderInterface;

class JobProvider implements JobProviderInterface
{
    public function getIterator(): Generator
    {
        foreach (range(0, 10) as $payload) {
            // NOTE: your payload must be serializable!!
            yield new Job($payload);
        }
    }
}

Create a Worker class to handle the jobs:

use Kynx\Swoole\Processor\Job\WorkerInterface;

class Worker implement WorkerInterface
{
    public function init(int $workerId): void
    {
        // perform any initialisation needed
    }

    public function run(Job $job): Job
    {
        // do work on payload
        echo "Got payload: " . $job->getPayload() . "\n";
        
        // return job with result of process
        return $job->withResult("Processed: " . $job->getPayload());
    }
}

If required, create a CompletionHandler to do any post-processing:

use Kynx\Swoole\Processor\Job\CompletionHandlerInterface;

class CompletionHandler implements CompletionHandlerInterface
{
    public function complete(Job $job): void
    {
        // mark job as complete
        echo "Completed: " . $job->getResult() . "\n";
    }
}

Create a script to run the jobs in parallel:

use Kynx\Swoole\Processor\Config;
use Kynx\Swoole\Processor\Processor;
use Throwable;

$processor = new Processor(
    new Config(),
    new JobProvider(),
    new Worker(),
    new CompletionHandler()
);

// this will block until all jobs are processed
$result = $processor->run();
return $result === true ? 0 : 1;

If you don't need to handle completion, omit the fourth parameter.

How it works

The JobProvider runs in its own process, and the resulting jobs are fired at a Swoole server listening on a local socket. Both it and the the CompletionHandler are blocking. If the JobProvider returns a large number of jobs or performs time-consuming operations, return a Generator so jobs can be started as soon as possible. The CompletionHandler should be fast.

The server spawns a number of processes to handle the jobs - by default one less than the number of CPU cores detected. The process runs your Worker inside a coroutine, ensuring IO operations do not block. Because of this, ensure all IO uses a Connection Pool, covered in more detail below.

You can control the number of simultaneous jobs the processor will spawn by setting the concurrency parameter on the the Config you pass to the constructor. It defaults to 10, with a maximum of workers x maxCoroutines.

Caveats

  • Job payloads and results must be serializable and, but default, together should be less than 2M when serialized
  • A Worker should be stateless. If you need share data between workers, use a Table.
  • Uncaught exceptions will crash the process, causing it it re-spawn. For this reason your Worker::run() is called inside a try ... catch block. However the exception is discarded: if you care about where your program is going wrong, catch all exceptions inside your Worker and handle them yourself.

统计信息

  • 总下载量: 37
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 0
  • 点击次数: 1
  • 依赖项目数: 0
  • 推荐数: 0

GitHub 信息

  • Stars: 0
  • Watchers: 1
  • Forks: 0
  • 开发语言: PHP

其他信息

  • 授权协议: MIT
  • 更新时间: 2025-01-20

承接程序开发

PHP开发

VUE

Vue开发

前端开发

小程序开发

公众号开发

系统定制

数据库设计

云部署

网站建设

安全加固