Redis的延迟队列可以用于以下场景:
需求说明:
当用户申请售后,商家未在n小时内处理,系统自动进行退款。
商家拒绝后,用户可申请客服介入,客服x天内超时未处理,系统自动退款。
用户收到货物,x天自动确认收货
等等需要延时操作的流程……
1、秒杀业务
在秒杀业务中,常常需要对用户提交的订单进行实时处理,在高并发场景下,消息队列通常是必不可少的。而Redis的延迟队列可以很好地应对这种场景,将用户订单压入队列中,计算出订单的处理时间,并在指定的时间点推送到下单消息队列中,等待下一个处理流程。
2、任务调度
在许多场景下,需要定期执行一些任务,如对数据的扫描、发送邮件等。将这些任务放入Redis延迟队列中,根据任务执行时间的计算,等待相应时机推送到任务队列中,提高任务的执行效率与稳定性。
3、缓存更新
在应用缓存中,经常需要定期更新缓存。将缓存更新任务加入Redis延迟队列中,根据更新周期计算出下次更新时间,并在相应时机进行缓存更新,保证缓存数据的实时性。
四、Redis延迟队列最佳实践
在使用Redis延迟队列时,应注意以下几点:
1、数据结构的选择
在选择数据结构时,要根据具体场景进行选择。例如,若需要有序并快速查找,则使用Sorted Set更为合适;若仅需要简单的先进先出队列,则使用List即可。
2、消息处理的可靠性
在消息处理过程中,可能会遇到消息重复、消息消失等问题,因此应考虑如何保证消息的完整性与可靠性。可以采用ACK机制、简单重试机制、消息去重等策略来保证消息的可靠性。
3、定时器的精度
由于Redis的定时器粒度是毫秒级的,因此在时间计算时应注意舍入误差、时区处理等问题,避免计算出的时间点与实际时间不符合。
4、扫描策略的合理选择
在扫描延迟队列时,需要注意扫描的频率对Redis的负载影响,应根据实际情况选择合理的扫描策略。
=============================================================
TP6 中使用 think-queue 可以实现普通队列和延迟队列。
think-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:
消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等队列的多队列, 内存限制 ,启动,停止,守护等消息队列可降级为同步执行
消息队列实现过程
1、通过生产者推送消息到消息队列服务中
2、消息队列服务将收到的消息存入redis队列中(zset)
3、消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条
4、处理获取下来的消息调用业务类进行处理相关业务
5、业务处理后,需要从队列中删除消息
安装队列依赖
composer require topthink/think-queue
在项目下新建一个Job目录存放处理消息
use think\facade\Queue;public function job(Request $request){$params = $request->get();$jobHandlerClassName = 'app\job\Task'; $jobQueueName = 'task';$orderData = ['order_sn'=>$params['id']];//Queue::later();//立即执行$isPushed = Queue::later(10, $jobHandlerClassName, $orderData, $jobQueueName); //这儿的10是指10秒后执行队列任务if($isPushed !== false){echo '队列添加成功';}else{echo '插入失败了';}}
编写对应的消费者类 app\job/task.php
<?phpnamespace app\job;use think\queue\Job;class Task
{public function fire(Job $job, $data){$rt = $this->doJob($data);if($rt){$job->delete();return true;}// 重试三次失败 todo...if($job->attempts() == 3){$job->delete();return false;}//执行失败10S后重试$job->release(10);}public function doJob($data){echo date('Y-m-d H:i:s')."\n";return false;}}
php think queue:listen
–queue helloJobQueue \ //监听的队列的名称
–delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
–memory 128 \ //该进程允许使用的内存上限,以 M 为单位
–sleep 3 \ //如果队列中无任务,则多长时间后重新检查
–tries 0 \ //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
–timeout 60 // work 进程允许执行的最长时间,以秒为单位