awdn/vigilant-queue 问题修复 & 功能扩展

解决BUG、新增功能、兼容多环境部署,快速响应你的开发需求

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

awdn/vigilant-queue

Composer 安装命令:

composer require awdn/vigilant-queue

包简介

React-PHP based priority queue key-value storage for expiration time based object eviction.

README 文档

README

React-PHP based priority queue key-value storage for expiration time based object eviction.

Objective

The objective is to create a prioritized list ordered by an expiration time of the item. When querying the list if there is an expired item by a given threshold, the list should return the item with the lowest expiration time below the threshold and then remove the item from the list. Each item has a unique key (which is allowed to be only once within the list), as well as a data attribute and an expiration time. If an item should be added to the list and the key already exists, the values for the expiration time and data of the existing item have to be update within the list. The list has to be reordered based on the new priority of the item as well.

Use Case

Imagine a distributed web environment where actions in your application have to be taken based on frequently occurring events for the same entities. If these actions are very cost intensive it probably makes sense to execute them only once by buffering the events for a while and fetch the latest scheduled action from the queue as soon as the expiration time has been reached.

First Approach

This approach is based on a doubly linked list. Inserts have to be done ordered based on the expiration time, so that the item with the lowest expiration time is at the beginning of the list. From there it can be fetched with a pop() operation. When updating an existing item it has to be moved within the list, if the expiration time has changed. Advantage: Very intuitive. No redundant data. Everything can be implement within a list. Disadvantage: Needs to traverse the list very often when inserting or updating item. Cost intensive in terms of CPU time.

Second Approach

A PriorityQueue (implemented as min-heap), where each item is not necessarily unique, supported by two HashMaps holding the data and the most recent priority for a given key. When querying the queue for an item to be evicted the top element of the queue will be checked, if its expiration time is equal or lower than a given threshold (current time). To validate if this is the most recent version of the item, the expiration time of the top item from the queue will be compared with the latest known priority for the given key from the HashMap. If this is the the case, the item will be returned from the evict() function, otherwise it will return null. In both cases the item will be removed from the queue and from both HashMaps. Advantage: No need for cost intensive traversals of a list. Disadvantage: Redundant data within the PriorityQueue.

Pseudo code visualising the second approach

class PriorityHashQueue
    PriorityQueue <Key, Prio> q
    ArrayObject<Key, Data> d
    ArrayObject<Key, Prio> p
    
    push(Key, Data, Prio):
            d.set(Key, Data)
            p.set(Key, Prio)
            q.insert(Key, Prio)
    
    
    evict(Threshold):
        if (q.valid())
            item = q.top()
            if (item.prio < Threshold)
            q.next()
            if (p.get(item.key) == item.prio)
                data = d.get(item.key)
                d.unset(item.key)
                p.unset(item.key)
                return data
        return null

Run the example

Change directory into the examples sub folder, open three terminals and run the following commands.

Starting the process

Starting the queue server

$ php deferred-queue-daemon.php --debug 1 --evictionTickrate 1000

Starting a consumer process which just collects the evicted messages

$ php console-consumer.php --debug 1

Starting a producer simulation which generates some data:

$ php console-producer.php --debug 1 --simulate 1 --keyPrefix mk --keyDistribution 1 --numMessages 1 --expMinMs 3000000 --expMaxMs 3000000 --sleepUs 0

Output

Producer

Producing a new message which is then send to the queue:

2016-04-20 21:33:23 - Using tcp://127.0.0.1:4444 for inter process communication.
2016-04-20 21:33:24 - Generating packets...
2016-04-20 21:33:24 - mk_1:3000000:string|8a0b582099de9d149939c871449ebca96bac73ee

Queue Server

See here how a message comes in which is evicted three seconds later:

2016-04-20 21:33:22 - Running Awdn\VigilantQueue\DeferredQueueServer
2016-04-20 21:33:22 - The eviction tick rate is set to 1000/second.
2016-04-20 21:33:22 - Binding inbound ZMQ to 'tcp://127.0.0.1:4444'.
2016-04-20 21:33:22 - Binding outbound ZMQ to 'tcp://127.0.0.1:5444'.
2016-04-20 21:33:23 - [WARN] MemoryUsage:    2 MB.
2016-04-20 21:33:23 - [STATS] Added objects: 0, evictions: 0 (0 Obj/Sec, 0 Evi/Sec).
2016-04-20 21:33:24 - [WARN] MemoryUsage:    2 MB.
2016-04-20 21:33:24 - [STATS] Added objects: 0, evictions: 0 (0 Obj/Sec, 0 Evi/Sec).
2016-04-20 21:33:24 - [OnMessage] Data for key 'mk_1' [type 'string', exp 3000000 ms]: '8a0b582099de9d149939c871449ebca96bac73ee'
2016-04-20 21:33:25 - [WARN] MemoryUsage:    2 MB.
2016-04-20 21:33:25 - [STATS] Added objects: 1, evictions: 0 (1 Obj/Sec, 0 Evi/Sec).
2016-04-20 21:33:26 - [WARN] MemoryUsage:    2 MB.
2016-04-20 21:33:26 - [STATS] Added objects: 1, evictions: 0 (0 Obj/Sec, 0 Evi/Sec).
2016-04-20 21:33:27 - [WARN] MemoryUsage:    2 MB.
2016-04-20 21:33:27 - [STATS] Added objects: 1, evictions: 0 (0 Obj/Sec, 0 Evi/Sec).
2016-04-20 21:33:27 - [Eviction] Timeout detected for 'mk_1' at 1461231207.852
2016-04-20 21:33:28 - [WARN] MemoryUsage:    2 MB.
2016-04-20 21:33:28 - [STATS] Added objects: 1, evictions: 1 (0 Obj/Sec, 1 Evi/Sec).

Consumer

Consume the evicted message:

2016-04-20 21:33:24 - Connect to zmq at 'tcp://127.0.0.1:5444' (incoming evicted jobs from queue).
2016-04-20 21:33:27 - Received message: 8a0b582099de9d149939c871449ebca96bac73ee

Benchmark

The benchmark shows how the server receives 1.000.000 messages, puts them on the queue and evicts based on the defined expiration timeout.

统计信息

  • 总下载量: 20
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 0
  • 点击次数: 2
  • 依赖项目数: 0
  • 推荐数: 0

GitHub 信息

  • Stars: 0
  • Watchers: 2
  • Forks: 0
  • 开发语言: PHP

其他信息

  • 授权协议: MIT
  • 更新时间: 2016-04-20

承接程序开发

PHP开发

VUE

Vue开发

前端开发

小程序开发

公众号开发

系统定制

数据库设计

云部署

网站建设

安全加固