问题

微信公众平台后台有一个功能即定时群发消息,如明晚的20:00群发一条图文消息。那么这种延时触发的逻辑如何实现呢?

方案一

每隔一定的时间扫描所有超时的事件

这是最容易想到的一种方案。此方案最关键的两点是轮训的频率以及如何高效地获取超时任务。

  • 如果可以允许一秒左右的误差,每隔一秒轮训一次即可。
  • 采用红黑树或者最小堆存储触发任务,按照触发时间戳排序。如此,每次扫描能够很快地获取超时的任务。

此种方案的缺点在于即使频率到达一秒,也可能会有一秒的误差。此外,轮训的方式在很多情况下并没有可触发的任务,会浪费资源。插入和删除操作的平均时间复杂度为O(logn)。

实践中,一个很简单的方案就是使用Redis的SortedSet存储触发任务,这样只需要使用zrangeByScore获取超时的任务,再使用zremrangeByScore即可删除已经触发的任务。不过此种方案,由于zrange和zrem是两条命令,在多线程消费时需要控制好并发问题,否则会造成重复消费。此外,此方案缺少ACK机制,会有任务丢失的可能。

如果不关注消费者的高可用(一个队列的消费线程挂了会有其他线程接管),那么最简单的实现就是使用单线程消费,通过多Redis队列分片来提升消费速度。而如果关注消费者的高可用,可以选择Redisson中的RDelayedQueue以及Jesque,它们通过使用Redis Lua实现了并发控制,支持多线程消费。

方案二

阻塞线程等待时间超时

此方案思路来自于Nginx中定时器的实现(和Java中的DelayQueue原理类似)。任务的存储和上面的方案类似,采用最小堆或者红黑树即可。然后选择最近要被触发的任务的时间距离作为阻塞调用epoll_wait的超时(也可以使用其他可以设置超时的阻塞调用)。阻塞超时后,依次获取最小触发时间戳的任务,超时则执行。

此种方案的最大优点在于不会有空的任务检查周期,插入和删除操作的平均时间复杂度和方案一一样是O(logn)。

实践中,给DelayQueue实现持久化机制即可。

方案三

采用环形队列

此方案详细可以见58沈剑的文章《1分钟实现“延迟消息”功能》。大体的思路如下:

采用环形队列,3600个slot,每隔1秒扫描一个slot,检查当前slot里面的所有任务,检查其cycleNum是否为0, 为0则触发,否则cycleNum-1。添加定时事件时,根据扫描指针的当前slot的index和事件触发的时间,计算cycleNum和要放入的slot。

此种方案的本质是栅格化与预计算,插入和删除操作的平均时间复杂度为O(1)。相比起前两种方案,大大提升了每次获取可触发任务的效率。但同样存在每次查询任务有可能做无用功的问题。此外,需要特别处理添加任务和扫描任务的临界点的问题,否则也可能会有时间上的误差。

Netty中的HashedWheelTimer对于此种方案做了实现。PS: 多谢@imangry提示

此外,需要提到的是Kafka使用的DelayQueue+分层时间轮的方案。这种方案使用DelayQueue避免了空轮训的问题,同时分层的方式能够减少每一个slot存放任务过多的问题。详情可见:Kafka解惑之时间轮(TimingWheel)

方案四

延时消息队列

目前,RabbitMQ、RocketMQ都支持延时消息队列。其中,RabbitMQ的实现思路是基于TTL的,详细可见:http://www.cnblogs.com/haoxinyue/p/6613706.html。而RocketMQ的延迟时间是预定义的,不够灵活。

此种方案,最大的优势是使用简单,且支持ACK机制。但如果要取消定时任务,则需要在业务层实现。

结论

  1. 如果应用中已经有RabbitMQ或者追求任务消费的可靠性,推荐使用RabbitMQ。
  2. 简单使用,数据量不大,对高可用、任务消费可靠性要求不高的情况下,可以选择单线程轮询数据库或者Redis方案。
  3. 数据量较大、关注消费速度和高可用、对任务消费的可靠性要求不高,推荐使用Jesque/RDelayedQueue。