定时任务:有固定周期的,有明确的触发时间
延迟任务:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事
件,任务可以立即执行,也可以延迟
DelayQueue,基于JVM
RabbitMQ:TTL+死信队列(常用)
Redis:zset特性(常用,本项目实现)
延时场景
场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消
场景二:接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止
准备工作
redis:实现
zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳(毫秒值)作为score进行排序
例如:
生产者添加到4个任务到延迟队列中,时间毫秒值分别为97、98、99、100。当前时间的毫秒值为90
消费者端进行监听,如果当前时间的毫秒值匹配到了延迟队列中的毫秒值就立即消费
实现思路
使用redis的list存储当前消费队列,使用redis的zset未来数据队列
注:redis的list是基于双向链表实现的,查找和插入的性能都较高。在数据量大时,性能要比zset高的多
先将任务添加到数据库中
任务的执行时间<=当前时间(需立即2执行),则存放到list中,如果大于当前时间,则存放在zset中。
需要从数据库(DB)中定时同步数据至zset(为了性能考虑,将未来5分钟内要执行的任务同步至zset中),且需要每分钟从zset中同步数据至list中
过程中使用数据库做持久化
延迟任务是一个通用的服务,任何有延迟需求的任务都可以调用该服务,内存数据库的存储是有限的,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。
使用redis中的两种数据类型,list和zset
原因一:Iist存储立即执行的任务,zset存储未来的数据
原因二:任务量过大以后,Zset的性能会下降
时间复杂度:执行时间(次数)随着数据规模增长的变化趋势
操作redis中的ist命令LPUSH:时间复杂度:O(1)
操作redis中的zset命令zadd:时间复杂度:O(M*log(n))
在添加zset数据的时候,为什么需要预加载?
如果任务数据特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可,是一种优化的形式
数据库准备-数据库自身解决并发两种策略
悲观锁(Pessimistic Lock)
每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁
乐观锁(Optimistic Lock)
每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新
这个数据,可以使用版本号等机制,如修改之前的版本号为1,修改后,在提交修改时会判断版本号是否仍为1,不为1的话就提交失败。
悲观锁的效率较低,而乐观锁的效率较高
mybatis-plus集成乐观锁的使用
/**
* mybatis-plus乐观锁支持
* @return
*/
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
return interceptor;
}
Redis集成
maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
功能实现
添加任务
任务类
@Data
public class Task implements Serializable {
/**
* 任务id
*/
private Long taskId;
/**
* 类型
*/
private Integer taskType;
/**
* 优先级
*/
private Integer priority;
/**
* 执行id
*/
private long executeTime;
/**
* task参数
*/
private byte[] parameters;
}
任务日志类
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 任务id
*/
@TableId(type = IdType.ID_WORKER)
private Long taskId;
/**
* 执行时间
*/
@TableField("execute_time")
private Date executeTime;
/**
* 参数
*/
@TableField("parameters")
private byte[] parameters;
/**
* 优先级
*/
@TableField("priority")
private Integer priority;
/**
* 任务类型
*/
@TableField("task_type")
private Integer taskType;
/**
* 版本号,用乐观锁
*/
@Version
private Integer version;
/**
* 状态 0=int 1=EXECUTED 2=CANCELLED
*/
@TableField("status")
private Integer status;
}