发布于 

延迟队列

延迟队列定义

  • 队列:

    它是一种先进先出的数据结构。普通队列中的元素是有序的,先进入队列中的元素会被优先取出进行消费

  • 延迟队列

    普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。从某种意义上来讲,延迟队列的结构并不像一个队列,而更像是一种以时间为权重的有序堆结构。

实现方案

Redis

我们知道Redis有一个有序集合的数据结构ZSet,ZSet中每个元素都有一个对应Score,ZSet中所有元素是按照其Score进行排序的。

那么我们可以通过以下这几个操作使用Redis的ZSet来实现一个延迟队列:

入队操作:ZADD KEY timestamp task, 我们将需要处理的任务,按其需要延迟处理时间作为Score加入到ZSet中。Redis的ZAdd的时间复杂度是O(logN),N是ZSet中元素个数,因此我们能相对比较高效的进行入队操作。

起一个进程定时(比如每隔一秒)通过ZREANGEBYSCORE方法查询ZSet中Score最小的元素,具体操作为:ZRANGEBYSCORE KEY -inf +inf limit 0 1 WITHSCORES。查询结果有两种情况:

  • a. 查询出的分数小于等于当前时间戳,说明到这个任务需要执行的时间了,则去异步处理该任务;

  • b. 查询出的分数大于当前时间戳,由于刚刚的查询操作取出来的是分数最小的元素,所以说明ZSet中所有的任务都还没有到需要执行的时间,则休眠一秒后继续查询;

同样的,ZRANGEBYSCORE操作的时间复杂度为O(logN + M),其中N为ZSet中元素个数,M为查询的元素个数,因此我们定时查询操作也是比较高效的。

RabbitMQ

RabbitMQ
RabbitMQ本身并不直接提供对延迟队列的支持,我们依靠RabbitMQ的TTL以及死信队列功能,来实现延迟队列的效果。那就让我们首先来了解一下,RabbitMQ的死信队列以及TTL功能。

死信队列

死信队列实际上是一种RabbitMQ的消息处理机制,当RabbmitMQ在生产和消费消息的时候,消息遇到如下的情况,就会变成“死信”:

  • 消息被拒绝basic.reject/ basic.nack 并且不再重新投递 requeue=false
  • 消息超时未消费,也就是TTL过期了
  • 消息队列到达最大长度

消息一旦变成一条死信,便会被重新投递到死信交换机(Dead-Letter-Exchange),然后死信交换机根据绑定规则转发到对应的死信队列上,监听该队列就可以让消息被重新消费。


本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。

本站由 @shyiuanchen 创建,使用 Stellar 作为主题。