Delayed Tasks: Publishing Articles at a Precise Time
1) Scheduled article publishing
2) Overview of delayed tasks
2.1) What is a delayed task
- Scheduled task: runs on a fixed schedule, with an explicitly defined trigger time
- Delayed task: has no fixed start time; it is usually triggered by an event, and some time after that event another action fires. The task may run immediately or after a delay.

Typical use cases:
Scenario 1: if the user has not paid within 30 minutes of placing an order, the system cancels the order automatically; if payment succeeds in the meantime, the cancellation task itself is cancelled.
Scenario 2: an API call fails because of a network problem; retry after 1 minute, then after 2 minutes on the next failure, and so on until a retry threshold is reached.
2.2) Technology comparison
2.2.1) DelayQueue
DelayQueue, which ships with the JDK, is a blocking queue that supports delayed retrieval of its elements. Internally it stores elements in a PriorityQueue, and every element must implement the Delayed interface. When an element is created you specify how long it must wait; an element can only be taken from the queue once its delay has expired.

DelayQueue is a sorted queue. Its special requirement is that every element must implement the Delayed interface, which means implementing the compareTo and getDelay methods.
getDelay: returns the element's remaining delay; the element can only leave the queue once the remaining delay reaches 0.
compareTo: used for ordering, i.e. it determines the order in which elements leave the queue.
Implementation:
1. In the test package jdk, create the delayed-task element class DelayedTask and implement compareTo and getDelay.
2. In the main method, create a DelayQueue and add three delayed tasks to it.
3. Poll tasks from the delay queue in a loop.
```java
import java.util.Calendar;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayedTask implements Delayed {

    // execution time, stored as a Unix timestamp in seconds
    private int executeTime = 0;

    public DelayedTask(int delay) {
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND, delay);
        this.executeTime = (int) (calendar.getTimeInMillis() / 1000);
    }

    /**
     * Remaining delay; this simplified version works in seconds and ignores the unit.
     */
    @Override
    public long getDelay(TimeUnit unit) {
        Calendar calendar = Calendar.getInstance();
        return executeTime - (calendar.getTimeInMillis() / 1000);
    }

    /**
     * Ordering of the elements inside the queue.
     */
    @Override
    public int compareTo(Delayed o) {
        long val = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        return val == 0 ? 0 : (val < 0 ? -1 : 1);
    }

    public static void main(String[] args) {
        DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
        queue.add(new DelayedTask(5));
        queue.add(new DelayedTask(10));
        queue.add(new DelayedTask(15));
        System.out.println(System.currentTimeMillis() / 1000 + " start consume");
        while (queue.size() != 0) {
            DelayedTask delayedTask = queue.poll();
            if (delayedTask != null) {
                System.out.println(System.currentTimeMillis() / 1000 + " consume task");
            }
            // poll() returns null while no task is due, so sleep briefly before retrying
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
```
A question to think about after finishing the DelayQueue implementation:
with a thread pool or a plain DelayQueue, tasks live only in memory. If the program crashes, any unprocessed tasks are lost, so the impact of losing messages has to be considered. To guarantee that no data is lost, the tasks need to be persisted (to disk).
2.2.2) Implementing delayed tasks with RabbitMQ
RabbitMQ can also implement delayed tasks, typically by combining message TTL with a dead-letter exchange, or by using the delayed-message plugin.
2.2.3) Implementing delayed tasks with Redis
A zset is deduplicated and ordered by score, which suits delayed tasks well: use each task's execution timestamp as its score, and the members stay sorted by when they should run.
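For illustration, here is a minimal sketch of that idea with spring-data-redis (the key name "demo_future" and the task JSON are assumptions made for this example, not the project's final design):

```java
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.Set;

public class ZsetDelayDemo {

    private final StringRedisTemplate redisTemplate;

    public ZsetDelayDemo(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void produce(String taskJson, long executeTimeMillis) {
        // score = execution timestamp, so members stay ordered by due time
        redisTemplate.opsForZSet().add("demo_future", taskJson, executeTimeMillis);
    }

    public Set<String> pollDue() {
        // everything whose score is <= "now" is due for execution
        return redisTemplate.opsForZSet()
                .rangeByScore("demo_future", 0, System.currentTimeMillis());
    }
}
```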

3) Implementing delayed tasks with Redis
Implementation idea
In outline (this is the design the rest of this chapter implements): every task is first persisted to the database; tasks that are due immediately are pushed into a Redis list (topic_ key) for consumption, tasks due within the next few minutes are stored in a Redis zset (future_ key) with the execution time as the score, a scheduled job periodically moves due tasks from the zset into the list, and another job reloads upcoming tasks from the database into the cache.
Questions to think about
1. Why are tasks stored in the database?
The delayed-task service is a generic service that any feature needing a delay can call, so data durability has to be considered; storing tasks in the database is a data-safety measure.
2. Why does Redis use two data types, a list and a zset?
Efficiency, i.e. the time complexity of the operations: tasks that are due right now only need O(1) push/pop, which a list provides, while future tasks must stay ordered by execution time, which a zset provides.
3. When adding data to the zset, why is there no need to preload everything?
The task module is a generic module that any part of the project needing a delay queue can call, so data volume has to be considered. If the volume is very large, only the data that must execute in the next few minutes is put into the cache, which avoids blocking.
4) Implementing the delayed-task service
4.1) Setting up the heima-leadnews-schedule module
leadnews-schedule is a generic service; a dedicated module is created to manage delayed tasks of any type.
①: Import the heima-leadnews-schedule module from the course materials into heima-leadnews-service, as shown below:

②: Add bootstrap.yml
```yaml
server:
  port: 51701
spring:
  application:
    name: leadnews-schedule
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.200.130:8848
      config:
        server-addr: 192.168.200.130:8848
        file-extension: yml
```
③: Add the corresponding configuration in Nacos, including the database and mybatis-plus settings:
```yaml
spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    username: root
    password: root

mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml
  type-aliases-package: com.heima.model.schedule.pojos
```
4.2) Database preparation
Import the leadnews_schedule database from the course materials.
taskinfo: the task table

Entity class:
```java
package com.heima.model.schedule.pojos;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.io.Serializable;
import java.util.Date;

@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {

    private static final long serialVersionUID = 1L;

    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    @TableField("execute_time")
    private Date executeTime;

    @TableField("parameters")
    private byte[] parameters;

    @TableField("priority")
    private Integer priority;

    @TableField("task_type")
    private Integer taskType;
}
```
taskinfo_logs: the task log table

Entity class:
```java
package com.heima.model.schedule.pojos;

import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;

import java.io.Serializable;
import java.util.Date;

@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {

    private static final long serialVersionUID = 1L;

    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    @TableField("execute_time")
    private Date executeTime;

    @TableField("parameters")
    private byte[] parameters;

    @TableField("priority")
    private Integer priority;

    @TableField("task_type")
    private Integer taskType;

    @Version
    private Integer version;

    @TableField("status")
    private Integer status;
}
```
Optimistic-lock support:
```java
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor() {
    MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
    interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
    return interceptor;
}
```
4.3) Installing Redis
① Pull the image (`docker pull redis`)
② Create the container
```shell
docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"
```
③ Connection test
Open Redis Desktop Manager from the course materials, enter the host, port and password, and test the connection.

If the connection succeeds, the setup is done.
4.4) Integrating Redis into the project
① Add the Redis-related dependencies to the project (already done):
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
```
② Integrate Redis in heima-leadnews-schedule by adding the following Nacos configuration so the service can connect to Redis:
```yaml
spring:
  redis:
    host: 192.168.200.130
    password: leadnews
    port: 6379
```
③ Copy the CacheService class from the course materials into the heima-leadnews-common module and register it for auto-configuration.

④: Test
```java
package com.heima.schedule.test;

import com.heima.common.redis.CacheService;
import com.heima.schedule.ScheduleApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Set;

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {

    @Autowired
    private CacheService cacheService;

    @Test
    public void testList() {
        String list_001 = cacheService.lRightPop("list_001");
        System.out.println(list_001);
    }

    @Test
    public void testZset() {
        Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);
        System.out.println(zset_key_001);
    }
}
```
4.5) Adding a task
①: Copy the mapper files generated by mybatis-plus.
②: Create the Task class, used to receive the parameters when adding a task.
```java
package com.heima.model.schedule.dtos;

import lombok.Data;

import java.io.Serializable;

@Data
public class Task implements Serializable {

    // task id
    private Long taskId;

    // task type
    private Integer taskType;

    // priority
    private Integer priority;

    // execution time, in milliseconds
    private long executeTime;

    // task parameters
    private byte[] parameters;
}
```
③: Create TaskService
```java
package com.heima.schedule.service;

import com.heima.model.schedule.dtos.Task;

public interface TaskService {

    /**
     * Add a delayed task
     */
    public long addTask(Task task);
}
```
Implementation:
```java
package com.heima.schedule.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.common.constants.ScheduleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import com.heima.schedule.mapper.TaskinfoLogsMapper;
import com.heima.schedule.mapper.TaskinfoMapper;
import com.heima.schedule.service.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Calendar;
import java.util.Date;

@Service
@Transactional
@Slf4j
public class TaskServiceImpl implements TaskService {

    @Override
    public long addTask(Task task) {
        // 1. persist the task to the database
        boolean success = addTaskToDb(task);

        // 2. if that worked, also put the task into redis
        if (success) {
            addTaskToCache(task);
        }
        return task.getTaskId();
    }

    @Autowired
    private CacheService cacheService;

    /**
     * Put the task into redis: tasks that are due go to a list, near-future tasks go to a zset.
     */
    private void addTaskToCache(Task task) {
        String key = task.getTaskType() + "_" + task.getPriority();

        // only cache tasks that must run within the next 5 minutes
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE, 5);
        long nextScheduleTime = calendar.getTimeInMillis();

        if (task.getExecuteTime() <= System.currentTimeMillis()) {
            // due now: push into the consumption list
            cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
        } else if (task.getExecuteTime() <= nextScheduleTime) {
            // due within 5 minutes: store in the zset, scored by execution time
            cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
        }
    }

    @Autowired
    private TaskinfoMapper taskinfoMapper;

    @Autowired
    private TaskinfoLogsMapper taskinfoLogsMapper;

    /**
     * Save the task and its log record to the database.
     */
    private boolean addTaskToDb(Task task) {
        boolean flag = false;
        try {
            // save the task
            Taskinfo taskinfo = new Taskinfo();
            BeanUtils.copyProperties(task, taskinfo);
            taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
            taskinfoMapper.insert(taskinfo);

            // give the generated id back to the caller
            task.setTaskId(taskinfo.getTaskId());

            // save the task log
            TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
            BeanUtils.copyProperties(taskinfo, taskinfoLogs);
            taskinfoLogs.setVersion(1);
            taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
            taskinfoLogsMapper.insert(taskinfoLogs);

            flag = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }
}
```
The ScheduleConstants constants class:
```java
package com.heima.common.constants;

public class ScheduleConstants {

    // task status: initialised / scheduled
    public static final int SCHEDULED = 0;

    // task status: executed
    public static final int EXECUTED = 1;

    // task status: cancelled
    public static final int CANCELLED = 2;

    // key prefix for tasks due in the future (zset)
    public static String FUTURE = "future_";

    // key prefix for tasks due now (list)
    public static String TOPIC = "topic_";
}
```
④: Test
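A minimal test sketch for addTask (the concrete task type, priority and execution time below are illustrative assumptions, not values defined by the project):

```java
package com.heima.schedule.test;

import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.ScheduleApplication;
import com.heima.schedule.service.TaskService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Date;

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class TaskServiceTest {

    @Autowired
    private TaskService taskService;

    @Test
    public void testAddTask() {
        Task task = new Task();
        task.setTaskType(100);                      // illustrative task type
        task.setPriority(50);                       // illustrative priority
        task.setParameters("hello world".getBytes());
        task.setExecuteTime(new Date().getTime());  // execute immediately
        long taskId = taskService.addTask(task);
        System.out.println("created task " + taskId);
    }
}
```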
4.6) Cancelling a task
Add a method to TaskService:
```java
/**
 * Cancel a task
 */
public boolean cancelTask(long taskId);
```
Implementation:
```java
@Override
public boolean cancelTask(long taskId) {
    boolean flag = false;

    // delete the task from the database and mark its log record as cancelled
    Task task = updateDb(taskId, ScheduleConstants.CANCELLED);

    // remove the task from redis
    if (task != null) {
        removeTaskFromCache(task);
        flag = true;
    }
    return flag;
}

/**
 * Remove the task from redis (the list for due tasks, the zset for future tasks).
 */
private void removeTaskFromCache(Task task) {
    String key = task.getTaskType() + "_" + task.getPriority();

    if (task.getExecuteTime() <= System.currentTimeMillis()) {
        cacheService.lRemove(ScheduleConstants.TOPIC + key, 0, JSON.toJSONString(task));
    } else {
        cacheService.zRemove(ScheduleConstants.FUTURE + key, JSON.toJSONString(task));
    }
}

/**
 * Delete the task record and update the status of its log record.
 */
private Task updateDb(long taskId, int status) {
    Task task = null;
    try {
        // delete the task
        taskinfoMapper.deleteById(taskId);

        // update the task log
        TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
        taskinfoLogs.setStatus(status);
        taskinfoLogsMapper.updateById(taskinfoLogs);

        task = new Task();
        BeanUtils.copyProperties(taskinfoLogs, task);
        task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
    } catch (Exception e) {
        log.error("task cancel exception taskId={}", taskId);
    }
    return task;
}
```
Test
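A minimal test sketch for cancelling (this method can be added to the TaskServiceTest sketched in 4.5; the task values are the same illustrative assumptions used there):

```java
@Test
public void testCancelTask() {
    // first create a task, then cancel it with the returned id
    Task task = new Task();
    task.setTaskType(100);                      // illustrative task type
    task.setPriority(50);                       // illustrative priority
    task.setParameters("hello world".getBytes());
    task.setExecuteTime(new Date().getTime());
    long taskId = taskService.addTask(task);

    boolean cancelled = taskService.cancelTask(taskId);
    System.out.println("cancelled: " + cancelled);
}
```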
4.7) Consuming a task
Add a method to TaskService:
```java
/**
 * Pull a task of the given type and priority that is due for execution
 */
public Task poll(int type, int priority);
```
Implementation:
```java
@Override
public Task poll(int type, int priority) {
    Task task = null;
    try {
        String key = type + "_" + priority;

        // pop one task from the consumption list
        String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
        if (StringUtils.isNotBlank(task_json)) {
            task = JSON.parseObject(task_json, Task.class);

            // update the database: delete the task and mark its log as executed
            updateDb(task.getTaskId(), ScheduleConstants.EXECUTED);
        }
    } catch (Exception e) {
        e.printStackTrace();
        log.error("poll task exception");
    }
    return task;
}
```
4.8) Periodically refreshing future data
4.8.1) Matching Redis keys
Option 1: keys pattern matching
The pattern matching of the keys command is convenient and powerful, but it must be used with great care in production. Running keys pattern matching can drive Redis CPU usage very high, which is why many companies disable the keys command on their production Redis. Redis is single-threaded, so a long keys scan blocks all other commands.

Option 2: scan
The SCAN command is a cursor-based iterator: each call returns a new cursor, which the caller passes as the cursor argument to the next call in order to continue the iteration.

Code example:
```java
@Test
public void testKeys() {
    Set<String> keys = cacheService.keys("future_*");
    System.out.println(keys);

    Set<String> scan = cacheService.scan("future_*");
    System.out.println(scan);
}
```
4.8.2) Redis pipelining
Interaction model of a normal Redis client and server

Pipeline request model

Official benchmark comparison

Comparison test:
```java
@Test
public void testPiple1() {
    long start = System.currentTimeMillis();
    for (int i = 0; i < 10000; i++) {
        Task task = new Task();
        task.setTaskType(1001);
        task.setPriority(1);
        task.setExecuteTime(new Date().getTime());
        cacheService.lLeftPush("1001_1", JSON.toJSONString(task));
    }
    System.out.println("耗时" + (System.currentTimeMillis() - start));
}

@Test
public void testPiple2() {
    long start = System.currentTimeMillis();
    List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {
        @Nullable
        @Override
        public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
            for (int i = 0; i < 10000; i++) {
                Task task = new Task();
                task.setTaskType(1001);
                task.setPriority(1);
                task.setExecuteTime(new Date().getTime());
                redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());
            }
            return null;
        }
    });
    System.out.println("使用管道技术执行10000次操作共耗时:" + (System.currentTimeMillis() - start) + "毫秒");
}
```
4.8.3) Completing the periodic refresh of future data
Add the following method to the task service:
```java
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
    System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");

    // find all future_* keys
    Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
    for (String futureKey : futureKeys) {

        // future_100_50 -> topic_100_50
        String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];

        // tasks whose execution time has arrived
        Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());

        // move them from the zset to the consumption list in one pipelined call
        if (!tasks.isEmpty()) {
            cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
            System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
        }
    }
}
```
Add the @EnableScheduling annotation to the application bootstrap class to enable task scheduling.
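For reference, a minimal sketch of what the bootstrap class looks like with scheduling enabled (the @MapperScan package is an assumption based on the mapper package used in this module):

```java
package com.heima.schedule;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")   // assumed mapper package for this module
@EnableScheduling                          // enables @Scheduled methods such as refresh()
public class ScheduleApplication {

    public static void main(String[] args) {
        SpringApplication.run(ScheduleApplication.class, args);
    }
}
```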
4.9) Using a distributed lock to prevent competing execution in a cluster
4.9.1) Problem description
If two instances of the heima-leadnews-schedule service are started, each instance will run the refresh scheduled task.

4.9.2) Distributed locks
A distributed lock coordinates ordered access to a shared resource across a distributed system, using mutual exclusion to keep data consistent.
Solutions: common options include a database-based lock, a Redis-based lock (setnx) and a ZooKeeper-based lock; this project uses Redis.

4.9.3) Redis distributed lock
setnx (SET if Not eXists) sets the key to the given value only when the key does not yet exist.

The locking idea: if the key does not exist, set it to a value; if the key already exists, SETNX does nothing (a minimal sketch follows the list below).
- Client A asks the server to set the key; if the set succeeds, A has acquired the lock.
- Client B also asks the server to set the key; if the set fails, B has failed to acquire the lock.
- Client A finishes its work and deletes the key, releasing the lock.
- Client B retries after waiting a while; this time the set succeeds.
- Client B finishes its work and deletes the key.
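A minimal sketch of this idea using the high-level spring-data-redis API (the key suffix, token and timeout handling are illustrative; the project's actual tryLock in 4.9.4 below uses the low-level connection API):

```java
import org.springframework.data.redis.core.StringRedisTemplate;

import java.time.Duration;
import java.util.UUID;

public class SimpleRedisLock {

    private final StringRedisTemplate redisTemplate;

    public SimpleRedisLock(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * SET key token NX PX expire: returns the token on success, null if someone else holds the lock.
     */
    public String tryLock(String name, long expireMillis) {
        String key = name + "_lock";          // illustrative key suffix
        String token = UUID.randomUUID().toString();
        Boolean ok = redisTemplate.opsForValue()
                .setIfAbsent(key, token, Duration.ofMillis(expireMillis));
        return Boolean.TRUE.equals(ok) ? token : null;
    }
}
```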
4.9.4) Add a tryLock method to the CacheService utility class
```java
/**
 * Try to acquire a distributed lock; returns the token on success, null otherwise.
 */
public String tryLock(String name, long expire) {
    name = name + "_lock";
    String token = UUID.randomUUID().toString();
    RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
    RedisConnection conn = factory.getConnection();
    try {
        // SET name token NX PX expire
        Boolean result = conn.set(
                name.getBytes(),
                token.getBytes(),
                Expiration.from(expire, TimeUnit.MILLISECONDS),
                RedisStringCommands.SetOption.SET_IF_ABSENT
        );
        if (result != null && result) {
            return token;
        }
    } finally {
        RedisConnectionUtils.releaseConnection(conn, factory, false);
    }
    return null;
}
```
Modify the future-data refresh method as follows:
```java
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
    // only the instance that acquires the lock refreshes the data
    String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
    if (StringUtils.isNotBlank(token)) {
        log.info("未来数据定时刷新---定时任务");

        Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
        for (String futureKey : futureKeys) {

            String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];

            // tasks whose execution time has arrived
            Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());

            if (!tasks.isEmpty()) {
                cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
                log.info("成功的将" + futureKey + "刷新到了" + topicKey);
            }
        }
    }
}
```
4.10) Synchronizing the database to Redis

```java
/**
 * Reload upcoming tasks from the database into the cache;
 * runs once at startup (@PostConstruct) and then every 5 minutes.
 */
@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {
    clearCache();
    log.info("数据库数据同步到缓存");
    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.MINUTE, 5);

    // all tasks that must execute within the next 5 minutes
    List<Taskinfo> allTasks = taskinfoMapper.selectList(
            Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));
    if (allTasks != null && allTasks.size() > 0) {
        for (Taskinfo taskinfo : allTasks) {
            Task task = new Task();
            BeanUtils.copyProperties(taskinfo, task);
            task.setExecuteTime(taskinfo.getExecuteTime().getTime());
            addTaskToCache(task);
        }
    }
}

/**
 * Clear the cached task data before reloading it.
 */
private void clearCache() {
    Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
    Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");
    cacheService.delete(futurekeys);
    cacheService.delete(topickeys);
}
```
5) Using the delay queue to publish articles at a precise time
5.1) Expose an external interface from the delay-queue service
Provide a remote Feign interface by creating the following class in heima-leadnews-feign-api:
```java
package com.heima.apis.schedule;

import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.schedule.dtos.Task;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

@FeignClient("leadnews-schedule")
public interface IScheduleClient {

    /**
     * Add a delayed task
     */
    @PostMapping("/api/v1/task/add")
    public ResponseResult addTask(@RequestBody Task task);

    /**
     * Cancel a task
     */
    @GetMapping("/api/v1/task/cancel/{taskId}")
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId);

    /**
     * Pull a task that is due
     */
    @GetMapping("/api/v1/task/poll/{type}/{priority}")
    public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority);
}
```
Provide the corresponding implementation in the heima-leadnews-schedule microservice:
```java
package com.heima.schedule.feign;

import com.heima.apis.schedule.IScheduleClient;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
public class ScheduleClient implements IScheduleClient {

    @Autowired
    private TaskService taskService;

    @PostMapping("/api/v1/task/add")
    @Override
    public ResponseResult addTask(@RequestBody Task task) {
        return ResponseResult.okResult(taskService.addTask(task));
    }

    @GetMapping("/api/v1/task/cancel/{taskId}")
    @Override
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
        return ResponseResult.okResult(taskService.cancelTask(taskId));
    }

    @GetMapping("/api/v1/task/poll/{type}/{priority}")
    @Override
    public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {
        return ResponseResult.okResult(taskService.poll(type, priority));
    }
}
```
5.2) Integrating the delay-queue interface into article publishing
Create WmNewsTaskService:
```java
package com.heima.wemedia.service;

import com.heima.model.wemedia.pojos.WmNews;

import java.util.Date;

public interface WmNewsTaskService {

    /**
     * Add an article-review task to the delayed-task service
     */
    public void addNewsToTask(Integer id, Date publishTime);
}
```
Implementation (note: @Async requires asynchronous execution to be enabled, for example with @EnableAsync on the application bootstrap class):
```java
package com.heima.wemedia.service.impl;

import com.heima.apis.schedule.IScheduleClient;
import com.heima.model.common.enums.TaskTypeEnum;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.wemedia.pojos.WmNews;
import com.heima.utils.common.ProtostuffUtil;
import com.heima.wemedia.service.WmNewsTaskService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class WmNewsTaskServiceImpl implements WmNewsTaskService {

    @Autowired
    private IScheduleClient scheduleClient;

    /**
     * Add an article-review task to the delayed-task service
     *
     * @param id          the article id
     * @param publishTime the publish time, used as the task's execution time
     */
    @Override
    @Async
    public void addNewsToTask(Integer id, Date publishTime) {

        log.info("添加任务到延迟服务中----begin");

        Task task = new Task();
        task.setExecuteTime(publishTime.getTime());
        task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
        task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());

        // serialize the article id with protostuff and carry it as the task parameters
        WmNews wmNews = new WmNews();
        wmNews.setId(id);
        task.setParameters(ProtostuffUtil.serialize(wmNews));

        scheduleClient.addTask(task);

        log.info("添加任务到延迟服务中----end");
    }
}
```
Enum class:
```java
package com.heima.model.common.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public enum TaskTypeEnum {

    NEWS_SCAN_TIME(1001, 1, "文章定时审核"),
    REMOTEERROR(1002, 2, "第三方接口调用失败,重试");

    private final int taskType;
    private final int priority;
    private final String desc;
}
```
Comparison of serialization options:
- JdkSerialize: Java's built-in serialization can serialize and deserialize any object that implements the Serializable interface; ObjectOutputStream's writeObject() method turns an object into a byte array.
- Protostuff: an open-source library built on Google's Protocol Buffers format; it produces a more compact binary representation and performs noticeably better, generating the (de)serialization schema for POJO classes through its tooling.
Copy the two utility classes from the course materials into heima-leadnews-utils.
Protostuff requires the following dependencies:
```xml
<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.6.0</version>
</dependency>

<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.6.0</version>
</dependency>
```
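A small usage sketch of the copied ProtostuffUtil (the serialize/deserialize method names are assumed from the way the class is used later in this chapter):

```java
import com.heima.model.wemedia.pojos.WmNews;
import com.heima.utils.common.ProtostuffUtil;

public class ProtostuffDemo {

    public static void main(String[] args) {
        WmNews wmNews = new WmNews();
        wmNews.setId(1);

        // serialize the POJO into a compact byte array
        byte[] bytes = ProtostuffUtil.serialize(wmNews);

        // deserialize the byte array back into a WmNews instance
        WmNews restored = ProtostuffUtil.deserialize(bytes, WmNews.class);
        System.out.println(restored.getId());
    }
}
```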
Modify the article-publishing code:
replace the previous asynchronous review call with a call to the delayed-task service.
```java
@Autowired
private WmNewsTaskService wmNewsTaskService;

@Override
public ResponseResult submitNews(WmNewsDto dto) {

    // 0. parameter check
    if (dto == null || dto.getContent() == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    // 1. save or update the article
    WmNews wmNews = new WmNews();
    BeanUtils.copyProperties(dto, wmNews);
    // cover images arrive as a list but are stored as a comma-separated string
    if (dto.getImages() != null && dto.getImages().size() > 0) {
        String imageStr = StringUtils.join(dto.getImages(), ",");
        wmNews.setImages(imageStr);
    }
    // if the cover type is "automatic", clear the type for now
    if (dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)) {
        wmNews.setType(null);
    }

    saveOrUpdateWmNews(wmNews);

    // 2. if the article is only a draft, end the flow here
    if (dto.getStatus().equals(WmNews.Status.NORMAL.getCode())) {
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }

    // 3. the article was submitted: save the relation between content and material
    List<String> materials = ectractUrlInfo(dto.getContent());
    saveRelativeInfoForContent(materials, wmNews.getId());

    // 4. save the relation between cover images and material, and set the cover
    saveRelativeInfoForCover(dto, wmNews, materials);

    // 5. add a review task to the delayed-task service
    wmNewsTaskService.addNewsToTask(wmNews.getId(), wmNews.getPublishTime());

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
```
5.3) Consuming tasks to review articles
Add a method to WmNewsTaskService:
```java
/**
 * Consume data from the delay queue
 */
public void scanNewsByTask();
```
Implementation:
```java
@Autowired
private WmNewsAutoScanServiceImpl wmNewsAutoScanService;

/**
 * Poll the delay queue every second and review any article that is due
 */
@Scheduled(fixedRate = 1000)
@Override
@SneakyThrows
public void scanNewsByTask() {

    log.info("文章审核---消费任务执行---begin---");

    ResponseResult responseResult = scheduleClient.poll(
            TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(),
            TaskTypeEnum.NEWS_SCAN_TIME.getPriority());

    if (responseResult.getCode().equals(200) && responseResult.getData() != null) {
        // the feign call returns the task as a generic object, so convert it back to a Task
        String json_str = JSON.toJSONString(responseResult.getData());
        Task task = JSON.parseObject(json_str, Task.class);

        // deserialize the parameters back into a WmNews object and review the article
        byte[] parameters = task.getParameters();
        WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
        System.out.println(wmNews.getId() + "-----------");
        wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
    }

    log.info("文章审核---消费任务执行---end---");
}
```
Add the @EnableScheduling annotation to the WemediaApplication bootstrap class of the wemedia service to enable task scheduling.
6) Homework