延迟队列
延迟队列定义
队列:
它是一种先进先出的数据结构。普通队列中的元素是有序的,先进入队列中的元素会被优先取出进行消费
延迟队列
普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。从某种意义上来讲,延迟队列的结构并不像一个队列,而更像是一种以时间为权重的有序堆结构。
实现方案
Redis
我们知道Redis有一个有序集合的数据结构ZSet,ZSet中每个元素都有一个对应Score,ZSet中所有元素是按照其Score进行排序的。
那么我们可以通过以下这几个操作使用Redis的ZSet来实现一个延迟队列:
入队操作:ZADD KEY timestamp task, 我们将需要处理的任务,按其需要延迟处理时间作为Score加入到ZSet中。Redis的ZAdd的时间复杂度是O(logN),N是ZSet中元素个数,因此我们能相对比较高效的进行入队操作。
起一个进程定时(比如每隔一秒)通过ZREANGEBYSCORE方法查询ZSet中Score最小的元素,具体操作为:ZRANGEBYSCORE KEY -inf +inf limit 0 1 WITHSCORES。查询结果有两种情况:
a. 查询出的分数小于等于当前时间戳,说明到这个任务需要执行的时间了,则去异步处理该任务;
b. 查询出的分数大于当前时间戳,由于刚刚的查询操作取出来的是分数最小的元素,所以说明ZSet中所有的任务都还没有到需要执行的时间,则休眠一秒后继续查询;
同样的,ZRANGEBYSCORE操作的时间复杂度为O(logN + M),其中N为ZSet中元素个数,M为查询的元素个数,因此我们定时查询操作也是比较高效的。
RabbitMQ
RabbitMQ
RabbitMQ本身并不直接提供对延迟队列的支持,我们依靠RabbitMQ的TTL以及死信队列功能,来实现延迟队列的效果。那就让我们首先来了解一下,RabbitMQ的死信队列以及TTL功能。
死信队列
死信队列实际上是一种RabbitMQ的消息处理机制,当RabbmitMQ在生产和消费消息的时候,消息遇到如下的情况,就会变成“死信”:
- 消息被拒绝basic.reject/ basic.nack 并且不再重新投递 requeue=false
- 消息超时未消费,也就是TTL过期了
- 消息队列到达最大长度
消息一旦变成一条死信,便会被重新投递到死信交换机(Dead-Letter-Exchange),然后死信交换机根据绑定规则转发到对应的死信队列上,监听该队列就可以让消息被重新消费。
消息生存时间TTL
TTL(Time-To-Live)是RabbitMQ的一种高级特性,表示了一条消息的最大生存时间,单位为毫秒。如果一条消息在TTL设置的时间内没有被消费,那么它就会变成一条死信,进入我们上面所说的死信队列。
有两种不同的方式可以设置消息的TTL属性,一种方式是直接在创建队列的时候设置整个队列的TTL过期时间,所有进入队列的消息,都被设置成了统一的过期时间,一旦消息过期,马上就会被丢弃,进入死信队列,参考代码如下:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
在延迟队列的延迟时间为固定值的时候,比较适合使用这种方式。
另一种方式是针对单条消息设置,参考代码如下,该消息被设置了6秒的过期时间:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg content".getBytes());
如果需要不同的消息设置不同的延迟时间,上面针对队列的TTL设置便无法满足我们的需求,需要使用这种针对单个消息的TTL设置。
不过需要注意的是,使用这种方式设置的TTL,消息可能不会按时死亡,因为RabbitMQ只会检查第一个消息是否过期。比如这种情况,第一个消息设置了20s的TTL,第二个消息设置了10s的TTL,那么RabbitMQ会等到第一个消息过期之后,才会让第二个消息过期。