延迟队列--Redis实现

2025-02-04

定时任务:有固定周期的,有明确的触发时间

延迟任务:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事

件,任务可以立即执行,也可以延迟

  • DelayQueue,基于JVM

  • RabbitMQ:TTL+死信队列(常用)

  • Redis:zset特性(常用,本项目实现)

延时场景

场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消

场景二:接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止

准备工作

redis:实现

zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳(毫秒值)作为score进行排序

image-fxup.png

例如:

生产者添加到4个任务到延迟队列中,时间毫秒值分别为97、98、99、100。当前时间的毫秒值为90

消费者端进行监听,如果当前时间的毫秒值匹配到了延迟队列中的毫秒值就立即消费

实现思路

使用redis的list存储当前消费队列,使用redis的zset未来数据队列

注:redis的list是基于双向链表实现的,查找和插入的性能都较高。在数据量大时,性能要比zset高的多

image-efra.png

  1. 先将任务添加到数据库中

  2. 任务的执行时间<=当前时间(需立即2执行),则存放到list中,如果大于当前时间,则存放在zset中。

  3. 需要从数据库(DB)中定时同步数据至zset(为了性能考虑,将未来5分钟内要执行的任务同步至zset中),且需要每分钟从zset中同步数据至list中

过程中使用数据库做持久化

延迟任务是一个通用的服务,任何有延迟需求的任务都可以调用该服务,内存数据库的存储是有限的,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。

使用redis中的两种数据类型,list和zset

  • 原因一:Iist存储立即执行的任务,zset存储未来的数据

  • 原因二:任务量过大以后,Zset的性能会下降

时间复杂度:执行时间(次数)随着数据规模增长的变化趋势

操作redis中的ist命令LPUSH:时间复杂度:O(1)

操作redis中的zset命令zadd:时间复杂度:O(M*log(n))

说明

O(1)

常量级复杂度,执行次数与数据规模没有关系

O(M*log(n)

对数级复杂度,执行次数与数据规模是对数关系

在添加zset数据的时候,为什么需要预加载?

如果任务数据特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可,是一种优化的形式

数据库准备-数据库自身解决并发两种策略

  • 悲观锁(Pessimistic Lock)

每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁

  • 乐观锁(Optimistic Lock)

每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新

这个数据,可以使用版本号等机制,如修改之前的版本号为1,修改后,在提交修改时会判断版本号是否仍为1,不为1的话就提交失败。

悲观锁的效率较低,而乐观锁的效率较高

mybatis-plus集成乐观锁的使用

/**
 * mybatis-plus乐观锁支持
 * @return
 */
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){
    MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
    interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
    return interceptor;
}

Redis集成

maven依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

功能实现

添加任务

任务类

@Data
public class Task implements Serializable {

    /**
     * 任务id
     */
    private Long taskId;
    /**
     * 类型
     */
    private Integer taskType;

    /**
     * 优先级
     */
    private Integer priority;

    /**
     * 执行id
     */
    private long executeTime;

    /**
     * task参数
     */
    private byte[] parameters;
    
}

任务日志类

@Data
@TableName("taskinfo_logs")
public class  TaskinfoLogs implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 任务id
     */
    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    /**
     * 执行时间
     */
    @TableField("execute_time")
    private Date executeTime;

    /**
     * 参数
     */
    @TableField("parameters")
    private byte[] parameters;

    /**
     * 优先级
     */
    @TableField("priority")
    private Integer priority;

    /**
     * 任务类型
     */
    @TableField("task_type")
    private Integer taskType;

    /**
     * 版本号,用乐观锁
     */
    @Version
    private Integer version;

    /**
     * 状态 0=int 1=EXECUTED 2=CANCELLED
     */
    @TableField("status")
    private Integer status;
}

PREV
SpringBoot应用集成测试
NEXT
OCR图片文字识别