redis 延时队列

延时队列,顾名思义,是带有延时功能的消息队列:消息投递后不会立即被消费,而是到达指定时间后才交给消费者处理。下面列举几个典型使用场景:

  1. 定时发公告
  2. 用户下单30分钟后未付款自动关闭订单
  3. 用户下单后延时短信提醒
  4. 延时关闭空闲客户端连接
redis 延时队列

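使用 Redis 提供的有序集合(zset)存放消息:以消息内容作为 member,以到期时间戳(毫秒)作为 score。定时任务每秒按 score 取出已到期的消息,将其从 zset 中移除后放入对应的 list 队列(key 为 topic + "queue"),消费者再从该 list 中阻塞弹出消息并处理;消费失败的消息会在 1 分钟后重新投递。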

redis 延时队列

@Slf4j

@Service

public class RedisDelayQueue {

@Resource

private StringRedisTemplate stringRedisTemplate;

private ScheduledExecutorService timer = Executors.newScheduledThreadPool(16);

private List<string> consumeTopics = new ArrayList<>(64);/<string>

@PostConstruct

public void init() {

Set<string> topicList = stringRedisTemplate.opsForSet().members("topicList");/<string>

if (topicList != null) {

topicList.forEach(this::registTopic);

}

}

private void registTopic(String topic) {

log.info("注册监听topic消息:{}", topic);

timer.scheduleAtFixedRate(() -> {

Set<string> msgs = stringRedisTemplate.opsForZSet().rangeByScore(topic, 0, System.currentTimeMillis(), 0, 1000);/<string>

if (msgs != null && msgs.size() > 0) {

Long remove = stringRedisTemplate.opsForZSet().remove(topic, msgs.toArray());

//删除结果大于0代表 抢到了

if( remove != null && remove> 0 ){

stringRedisTemplate.opsForList().leftPushAll(topic + "queue", msgs);

}

}

}, 1, 1, TimeUnit.SECONDS);

}

public void produce(String topic, String msg, Date date) {

log.info("topic:{} 生产消息:{},于{}消费", topic, msg, date);

Long addSuccess = stringRedisTemplate.opsForSet().add("topicList", topic);

if (addSuccess != null && addSuccess > 0) {

registTopic(topic);

}

stringRedisTemplate.opsForZSet().add(topic, msg, date.getTime());

}

public synchronized void consumer(String topic, Function<string> consumer) {/<string>

if (consumeTopics.contains(topic)) {

throw new RuntimeException("请勿重复监听消费" + topic);

}

consumeTopics.add(topic);

int consumerPoolSize = 10;

ExecutorService consumerPool = Executors.newFixedThreadPool(consumerPoolSize);

for (int i = 0; i < consumerPoolSize; i++) {

consumerPool.submit(() -> {

do {

log.info("循环取消息:{}", topic);

String msg;

try {

msg = stringRedisTemplate.opsForList().rightPop(topic + "queue", 1000, TimeUnit.MINUTES);

} catch (QueryTimeoutException e) {

log.debug("监听超时,重试中!");

continue;

}

log.info("{}监听到消息:{}", topic, msg);

if (msg != null) {

Boolean consumerSuccess;

try {

consumerSuccess = consumer.apply(msg);

} catch (Exception e) {

log.warn("消费失败!", e);

consumerSuccess = false;

}

//消费失败,1分钟后再重试

if (consumerSuccess == null || !consumerSuccess) {

log.info("消费失败,重新放回队列。msg:{},topic:{}", msg, topic);

produce(topic, msg, new Date(System.currentTimeMillis() + 60000));

}

}

} while (true);

});

}

}

}

单元测试

/**
 * Manual integration tests for {@link RedisDelayQueue}, run inside a Spring Boot test context.
 * NOTE(review): presumably requires a reachable Redis instance — confirm the test configuration.
 */
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisDelayQueueTest {

    @Resource
    private RedisDelayQueue redisDelayQueue;

    /** Produces 30 messages spread over three topics, each due one second after the previous one. */
    @Test
    public void produce() {
        int index = 0;
        while (index < 30) {
            String topic = "topic" + index % 3;
            String payload = "hello message" + index;
            Date dueAt = new Date(System.currentTimeMillis() + index * 1000);
            redisDelayQueue.produce(topic, payload, dueAt);
            index++;
        }
    }

    /** Registers a logging consumer on each of the three topics and keeps the test alive for 10 minutes. */
    @Test
    public void consumer() throws InterruptedException {
        String[] topics = {"topic0", "topic1", "topic2"};
        for (String topic : topics) {
            redisDelayQueue.consumer(topic, msg -> {
                log.info("topic【{}】收到消息:{}", topic, msg);
                return true;
            });
        }
        // Consumers run on background threads; block here so they have time to receive messages.
        TimeUnit.MINUTES.sleep(10);
    }
}


分享到:


相關文章: