Redisson 延迟队列

Redisson 延迟队列

底层实现

底层用 3 个队列实现

  1. 消息延迟队列,Zset 类型,按照过期时间顺序存放消息列表
  2. 消息顺序队列,List 类型,按照添加顺序存放消息列表
  3. 消息目标队列,List 类型,存放到期的消息,供消费者获取

消息来的时候先插入【消息延迟队列】和【消息顺序队列】

最后消费者在【消息目标队列】进行消费的,消费者只需要阻塞等待【消息目标队列】即可

消息移动

在初始化延迟队列的时候,会定时从【消息延迟队列】查询到最新到期时间,定时把【消息延迟队列】的消息移动到【消息目标队列】

如果【消息延迟队列】是空的,就不再定时差,而是等待发布订阅消息提醒,再定时把【消息延迟队列】的消息移动到【消息目标队列】

发送和获取延迟消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 生产者
public void produce() {
String queuename = "delay-queue";
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
delayedQueue.offer("测试延迟消息", 5, TimeUnit.SECONDS);
}

// 消费者
public void consume() throws InterruptedException {
String queuename = "delay-queue";
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
String msg = blockingQueue.take();
//收到消息进行处理...
}

初始化延迟队列

主要是调用了 QueueTransferTaskstart() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    public void start() {
       RTopic schedulerTopic = getTopic();
       statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
           @Override
           public void onSubscribe(String channel) {
               pushTask();
          }
      });
       
       messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
           @Override
           public void onMessage(CharSequence channel, Long startTime) {
               scheduleTask(startTime);
          }
      });
  }

队列有新的消费者订阅时,调用 pushTask() 方法,把消息发送出去

指定主题有新消息时,调用 scheduleTask(startTime) 方法,会定时调用 pushTask() 方法,把消息发送出去

为什么在消费端也要初始化延迟队列?

接收消息的时候明明用不到 delayedQueue 为什么还要加?

  • 初始化延迟队列的作用是会定时把【消息延迟队列】的到期数据移动到【消息目标队列】
  • 如果发送方发送了延迟消息,但在到期之前下线了,接收方就接收不到了
  • 所以是为了避免一部分数据丢失问题

Redisson 延迟队列
http://showyoubug.cn/2024/07/30/Redisson 延迟队列/
作者
Dong Su
发布于
2024年7月30日
许可协议