Redisson 延迟队列
底层实现
底层用 3 个队列实现
- 消息延迟队列,Zset 类型,按照过期时间顺序存放消息列表
- 消息顺序队列,List 类型,按照添加顺序存放消息列表
- 消息目标队列,List 类型,存放到期的消息,供消费者获取
消息来的时候先插入【消息延迟队列】和【消息顺序队列】
最后消费者在【消息目标队列】进行消费的,消费者只需要阻塞等待【消息目标队列】即可
消息移动
在初始化延迟队列的时候,会定时从【消息延迟队列】查询到最新到期时间,定时把【消息延迟队列】的消息移动到【消息目标队列】
如果【消息延迟队列】是空的,就不再定时差,而是等待发布订阅消息提醒,再定时把【消息延迟队列】的消息移动到【消息目标队列】
发送和获取延迟消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public void produce() { String queuename = "delay-queue"; RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename); RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue); delayedQueue.offer("测试延迟消息", 5, TimeUnit.SECONDS); }
public void consume() throws InterruptedException { String queuename = "delay-queue"; RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename); RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue); String msg = blockingQueue.take(); }
|
初始化延迟队列
主要是调用了 QueueTransferTask
的 start()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public void start() { RTopic schedulerTopic = getTopic(); statusListenerId = schedulerTopic.addListener(new BaseStatusListener() { @Override public void onSubscribe(String channel) { pushTask(); } }); messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() { @Override public void onMessage(CharSequence channel, Long startTime) { scheduleTask(startTime); } }); }
|
队列有新的消费者订阅时,调用 pushTask()
方法,把消息发送出去
指定主题有新消息时,调用 scheduleTask(startTime)
方法,会定时调用 pushTask()
方法,把消息发送出去
为什么在消费端也要初始化延迟队列?
接收消息的时候明明用不到 delayedQueue
为什么还要加?
- 初始化延迟队列的作用是会定时把【消息延迟队列】的到期数据移动到【消息目标队列】
- 如果发送方发送了延迟消息,但在到期之前下线了,接收方就接收不到了
- 所以是为了避免一部分数据丢失问题