glider88/amp-redis-streams

Latest stable version: 1.0.1

Composer install command:

composer require glider88/amp-redis-streams

Package description

Redis Streams with Amphp

README

Installation:

composer require glider88/amp-redis-streams

Start docker:

bin/re # first run
bin/up # subsequent runs

Tests:

bin/unit

Run example:

Producer:

bin/php examples/producer.php

Consumer:

bin/php examples/consumer.php

Settings:

    // PSR-3 logger passed to the wrappers below (NullLogger discards all messages)
    $logger = new NullLogger();

    // create the amphp redis client
    $ampRedis = createRedisClient('redis://redis:6379');
    
    // second amphp redis client: the stream uses the blocking 'XREADGROUP GROUP ... BLOCK ...' call,
    // so a dedicated connection keeps other commands from waiting behind it (reduces latency)
    $ampRedisBlocked = createRedisClient('redis://redis:6379');
    
    // wrapper around amphp redis that implements the Redis commands it lacks
    $advRedis = new AdvancedAmpRedis($ampRedis, $logger);
    $advRedisBlocked = new AdvancedAmpRedis($ampRedisBlocked, $logger);

    $redis = new Redis(
        stream: 's',                    // stream name
        group: 'g',                     // consumer group name
        redis: $advRedis,
        redisBlocked: $advRedisBlocked,
        logger: new NullLogger(),
        maxStreamLength: 1000,          // approximate maximum stream length
        maxDlqStreamLength: 1000,       // approximate maximum length of the dead-letter stream
        readRetrySetCount: 100,         // how many entries to fetch at a time from the Redis sorted set
                                        // (a sorted set named s:g:retry implements the retry logic)
        readAutoClaimCount: 100,        // how many entries to fetch per autoclaim call
        blockRead: new Sec(1),          // how long a blocking read waits for the first data from the stream
        deduplicationTtl: new Sec(3),   // TTL of the deduplication key, set via `SET $streamMessageId . '-' . $this->group`
        autoClaimMinIdle: new Sec(1),   // messages idle in the PEL (pending entries list) longer than this are taken over by autoclaim
        consumer: 'c',                  // optional consumer name; if omitted it is generated as 'c-'.gethostname().'-'.getmypid()
    );
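
The comments above mention three naming patterns: the deduplication key, the retry sorted set, and the generated consumer name. The following is a minimal sketch of those patterns in plain PHP. The helper functions `dedupKey`, `retrySetName` and `defaultConsumerName` are hypothetical and not part of the library, and reading `s:g:retry` as `<stream>:<group>:retry` is an assumption based on the example values `'s'` and `'g'`.

```php
<?php
declare(strict_types=1);

// Hypothetical helpers: they only mirror the naming patterns described in the
// settings comments and are not taken from glider88/amp-redis-streams.

// Deduplication key: "<stream message id>-<group>", stored with SET and a TTL.
function dedupKey(string $streamMessageId, string $group): string
{
    return $streamMessageId . '-' . $group;
}

// Retry sorted set name, assuming the pattern "<stream>:<group>:retry".
function retrySetName(string $stream, string $group): string
{
    return $stream . ':' . $group . ':retry';
}

// Consumer name generated when none is passed explicitly.
function defaultConsumerName(): string
{
    return 'c-' . gethostname() . '-' . getmypid();
}

echo dedupKey('1700000000000-0', 'g'), PHP_EOL; // 1700000000000-0-g
echo retrySetName('s', 'g'), PHP_EOL;           // s:g:retry
echo defaultConsumerName(), PHP_EOL;            // varies by host and process
```

Because the generated consumer name includes the host name and process ID, each worker process gets its own name without extra configuration.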

    $stream = new Stream(
        stream: 's',                               // stream name
        group: 'g',                                // consumer group name
        redis: $redis,
        maxRetries: 3,                             // after this many retries the message is sent to the dead-letter stream (named s:dlq)
        logger: new NullLogger(),
        retry: new MultiplyRetry(                  // retry delay grows with each attempt: 0, 1, 2, 3... seconds
            firstOffsetDelay: new Milli(0),
            baseDelay: new Sec(1),
        ),
        scaling: new PiecewiseLinearScaling([      // worker count by stream length: 16 workers below 500 entries, 32 from 500 on
            16 => 0,
            32 => 500,
        ]),
        retryInterval: new Milli(100),  // how often the retry logic runs
        claimInterval: new Milli(100),  // how often the autoclaim logic runs
        timeoutJob: new Sec(100),       // job timeout; the job is then cancelled via TimeoutCancellation
    );
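
To make the retry and scaling settings above more concrete, here is a rough sketch of how their example values could translate into numbers. It is not the library's implementation: `retryDelayMs` and `workersFor` are hypothetical functions, the delay formula (offset plus base delay times a zero-based attempt number) is an assumption that reproduces the "0 1 2 3... seconds" comment, and the scaling is modelled as simple steps as the comment describes, even though the class name suggests that interpolation between points may be involved.

```php
<?php
declare(strict_types=1);

// Hypothetical illustration only; not code from glider88/amp-redis-streams.

// Delay before a retry, in milliseconds. Assumes attempts are numbered from 0
// and that the delay grows by one base delay per attempt.
function retryDelayMs(int $attempt, int $firstOffsetMs, int $baseDelayMs): int
{
    return $firstOffsetMs + $baseDelayMs * $attempt;
}

// Worker count for a given stream length. $thresholds maps
// "worker count => minimum stream length", as in the scaling setting.
function workersFor(int $streamLength, array $thresholds): int
{
    asort($thresholds); // ascending by minimum length, keys (worker counts) preserved
    $workers = 0;
    foreach ($thresholds as $count => $minLength) {
        if ($streamLength >= $minLength) {
            $workers = $count; // keep the last threshold that was reached
        }
    }
    return $workers;
}

$delays = array_map(
    fn (int $attempt): int => retryDelayMs($attempt, 0, 1000),
    [0, 1, 2, 3]
);
echo implode(' ', $delays), PHP_EOL; // 0 1000 2000 3000

$thresholds = [16 => 0, 32 => 500];
echo workersFor(120, $thresholds), PHP_EOL; // 16
echo workersFor(500, $thresholds), PHP_EOL; // 32
```

With `maxRetries: 3`, a message that keeps failing under this reading would wait roughly 0, 1 and 2 seconds between attempts before it ends up in the dead-letter stream; the exact attempt numbering is an assumption.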

Statistics

  • Total downloads: 0
  • Monthly downloads: 0
  • Daily downloads: 0
  • Favorites: 0
  • Views: 2
  • Dependents: 0
  • Suggesters: 0

GitHub information

  • Stars: 0
  • Watchers: 0
  • Forks: 0
  • Language: PHP

Other information

  • License: MIT
  • Updated: 2025-11-26
