springboot+quartz+jpa实现动态定时任务。
配置maven
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version> </dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<!--druid-spring-boot-starter-->
<version>1.1.10</version>
</dependency>
<!--quartz依赖-->
<dependency> <groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.3</version>
</dependency> <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency> <dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置application.properties
quartz.enable=true
server.port=8080
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/super
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.tomcat.initial-size=20
spring.datasource.tomcat.max-active=100
spring.datasource.tomcat.max-idle=100
spring.datasource.tomcat.min-idle=20
spring.datasource.tomcat.max-wait=10000
spring.datasource.tomcat.test-while-idle=true
spring.datasource.tomcat.test-on-borrow=false
spring.datasource.tomcat.test-on-return=false
#ID设置为自动获取 每一个必须不同 (所有调度器实例中是唯一的)
org.quartz.scheduler.instanceId=AUTO
#指定调度程序的主线程是否应该是守护线程
org.quartz.scheduler.makeSchedulerThreadDaemon=true
#ThreadPool实现的类名
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
#ThreadPool配置线程守护进程
org.quartz.threadPool.makeThreadsDaemons=true
#线程数量
org.quartz.threadPool.threadCount=20
#线程优先级
org.quartz.threadPool.threadPriority=5
#数据保存方式为持久化
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
#StdJDBCDelegate说明支持集群
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties = true
#quartz内部表的前缀
org.quartz.jobStore.tablePrefix=QRTZ_
#是否加入集群
org.quartz.jobStore.isClustered=true
#容许的最大作业延长时间
org.quartz.jobStore.misfireThreshold=25000
spring.jpa.show-sql=true
配置config全局配置类
package com.example.springbootquartz.config;
import org.quartz.spi.JobFactory;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import javax.sql.DataSource;
import java.io.IOException;
import java.util.Properties;
@Configuration
public class ConfigureQuartz {
//配置JobFactory
@Bean
public JobFactory jobFactory(ApplicationContext applicationContext) {
AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
/**
* SchedulerFactoryBean这个类的真正作用提供了对org.quartz.Scheduler的创建与配置,并且会管理它的生命周期与Spring同步。
* org.quartz.Scheduler: 调度器。所有的调度都是由它控制。
* @param dataSource 为SchedulerFactory配置数据源
* @param jobFactory 为SchedulerFactory配置JobFactory
*/
@Bean
public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource, JobFactory jobFactory) throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
//可选,QuartzScheduler启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录
factory.setOverwriteExistingJobs(true);
factory.setAutoStartup(true); //设置自行启动
factory.setDataSource(dataSource);
factory.setJobFactory(jobFactory);
factory.setQuartzProperties(quartzProperties());
return factory;
}
//从quartz.properties文件中读取Quartz配置属性
@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource("/application.properties"));
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
//配置JobFactory,为quartz作业添加自动连接支持
public final class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements
ApplicationContextAware {
private transient AutowireCapableBeanFactory beanFactory;
@Override
public void setApplicationContext(final ApplicationContext context) {
beanFactory = context.getAutowireCapableBeanFactory();
}
@Override
protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
final Object job = super.createJobInstance(bundle);
beanFactory.autowireBean(job);
return job;
}
}
}
自定义Stringutils类
*
package com.example.springbootquartz.util;
import java.util.List;
import java.util.Map;
public enum StringUtils {
getStringUtil;
//是否为空
public boolean isEmpty(String str) {
return (str == null) || (str.length() == 0) || (str.equals(""));
}
//去空格
public String trim(String str) {
return str == null ? null : str.trim();
}
//获取Map参数值
public String getMapString(Map<String, String> map) {
String result = "";
for (Map.Entry entry : map.entrySet()) {
result += entry.getValue() + " ";
}
return result;
}
//获取List参数值
public String getListString(List<String> list) {
String result = "";
for (String s : list) {
result += s + " ";
}
return result;
}
}
配置实体类
package com.example.springbootquartz.entity;
import javax.persistence.*;
import java.io.Serializable;
@Entity
@Table(name = "JOB_ENTITY")
public class JobEntity implements Serializable {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private String name; //job名称
private String group; //job组名
private String cron; //执行的cron
private String parameter; //job的参数
private String description; //job描述信息
@Column(name = "vm_param")
private String vmParam; //vm参数
@Column(name = "jar_path")
private String jarPath; //job的jar路径,在这里我选择的是定时执行一些可执行的jar包
private String status; //job的执行状态,这里我设置为OPEN/CLOSE且只有该值为OPEN才会执行该Job
public JobEntity() {
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public String getCron() {
return cron;
}
public void setCron(String cron) {
this.cron = cron;
}
public String getParameter() {
return parameter;
}
public void setParameter(String parameter) {
this.parameter = parameter;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getVmParam() {
return vmParam;
}
public void setVmParam(String vmParam) {
this.vmParam = vmParam;
}
public String getJarPath() {
return jarPath;
}
public void setJarPath(String jarPath) {
this.jarPath = jarPath;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
@Override
public String toString() {
return "JobEntity{" +
"id=" + id +
", name='" + name + '\'' +
", group='" + group + '\'' +
", cron='" + cron + '\'' +
", parameter='" + parameter + '\'' +
", description='" + description + '\'' +
", vmParam='" + vmParam + '\'' +
", jarPath='" + jarPath + '\'' +
", status='" + status + '\'' +
'}';
}
//新增Builder模式,可选,选择设置任意属性初始化对象
public JobEntity(Builder builder) {
id = builder.id;
name = builder.name;
group = builder.group;
cron = builder.cron;
parameter = builder.parameter;
description = builder.description;
vmParam = builder.vmParam;
jarPath = builder.jarPath;
status = builder.status;
}
public static class Builder {
private Integer id;
private String name = ""; //job名称
private String group = ""; //job组名
private String cron = ""; //执行的cron
private String parameter = ""; //job的参数
private String description = ""; //job描述信息
private String vmParam = ""; //vm参数
private String jarPath = ""; //job的jar路径
private String status = ""; //job的执行状态,只有该值为OPEN才会执行该Job
public Builder withId(Integer i) {
id = i;
return this;
}
public Builder withName(String n) {
name = n;
return this;
}
public Builder withGroup(String g) {
group = g;
return this;
}
public Builder withCron(String c) {
cron = c;
return this;
}
public Builder withParameter(String p) {
parameter = p;
return this;
}
public Builder withDescription(String d) {
description = d;
return this;
}
public Builder withVMParameter(String vm) {
vmParam = vm;
return this;
}
public Builder withJarPath(String jar) {
jarPath = jar;
return this;
}
public Builder withStatus(String s) {
status = s;
return this;
}
public JobEntity newJobEntity() {
return new JobEntity(this);
}
}
}
配置数据访问层
package com.example.springbootquartz.dao;
import com.example.springbootquartz.entity.JobEntity;
import org.springframework.data.repository.CrudRepository;
public interface JobEntityRepository extends CrudRepository<JobEntity,Long> {
JobEntity getById(Integer id);
}
配置service类
package com.example.springbootquartz.service;
import com.example.springbootquartz.dao.JobEntityRepository;
import com.example.springbootquartz.entity.JobEntity;
import com.example.springbootquartz.job.DynamicJob;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@Service
public class DynamicJobService {
@Autowired
private JobEntityRepository repository;
//通过Id获取Job
public JobEntity getJobEntityById(Integer id) {
return repository.getById(id);
}
//从数据库中加载获取到所有Job
public List<JobEntity> loadJobs() {
List<JobEntity> list = new ArrayList<>();
repository.findAll().forEach(list::add);
return list;
}
//获取JobDataMap.(Job参数对象)
public JobDataMap getJobDataMap(JobEntity job) {
JobDataMap map = new JobDataMap();
map.put("name", job.getName());
map.put("group", job.getGroup());
map.put("cronExpression", job.getCron());
map.put("parameter", job.getParameter());
map.put("JobDescription", job.getDescription());
map.put("vmParam", job.getVmParam());
map.put("jarPath", job.getJarPath());
map.put("status", job.getStatus());
return map;
}
//获取JobDetail,JobDetail是任务的定义,而Job是任务的执行逻辑,JobDetail里会引用一个Job Class来定义
public JobDetail geJobDetail(JobKey jobKey, String description, JobDataMap map) {
return JobBuilder.newJob(DynamicJob.class)
.withIdentity(jobKey)
.withDescription(description)
.setJobData(map)
.storeDurably()
.build();
}
//获取Trigger (Job的触发器,执行规则)
public Trigger getTrigger(JobEntity job) {
return TriggerBuilder.newTrigger()
.withIdentity(job.getName(), job.getGroup())
.withSchedule(CronScheduleBuilder.cronSchedule(job.getCron()))
.build();
}
//获取JobKey,包含Name和Group
public JobKey getJobKey(JobEntity job) {
return JobKey.jobKey(job.getName(), job.getGroup());
}
}
配置controller类
package com.example.springbootquartz.controller;
import com.example.springbootquartz.entity.JobEntity; import com.example.springbootquartz.service.DynamicJobService; import org.quartz.*; import org.quartz.impl.matchers.GroupMatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct; import java.util.Set;
@RestController public class JobController {
private static final Logger logger = LoggerFactory.getLogger(JobController.class);
@Autowired
private SchedulerFactoryBean schedulerFactoryBean;
@Autowired
private DynamicJobService jobService;
//初始化启动所有的Job(服务运行时启动全部任务)
@PostConstruct
public void initialize() {
try {
reStartAllJobs();
logger.info("INIT SUCCESS");
} catch (SchedulerException e) {
logger.info("INIT EXCEPTION : " + e.getMessage());
e.printStackTrace();
}
}
//根据ID重启某个Job
@RequestMapping("/refresh/{id}")
public String refresh(@PathVariable Integer id) throws SchedulerException {
String result;
JobEntity entity = jobService.getJobEntityById(id);
if (entity == null) {
return "error: id is not exist ";
}
synchronized (logger) {
JobKey jobKey = jobService.getJobKey(entity);
Scheduler scheduler = schedulerFactoryBean.getScheduler();
scheduler.pauseJob(jobKey);
scheduler.unscheduleJob(TriggerKey.triggerKey(jobKey.getName(), jobKey.getGroup()));
scheduler.deleteJob(jobKey);
JobDataMap map = jobService.getJobDataMap(entity);
JobDetail jobDetail = jobService.geJobDetail(jobKey, entity.getDescription(), map);
if (entity.getStatus().equals("OPEN")) {
scheduler.scheduleJob(jobDetail, jobService.getTrigger(entity));
result = "Refresh Job : " + entity.getName() + "\t jarPath: " + entity.getJarPath() + " success !";
} else {
result = "Refresh Job : " + entity.getName() + "\t jarPath: " + entity.getJarPath() + " failed ! , " +
"Because the Job status is " + entity.getStatus();
}
}
return result;
}
//重启数据库中所有的Job
@RequestMapping("/refresh/all")
public String refreshAll() {
String result;
try {
reStartAllJobs();
result = "SUCCESS";
} catch (SchedulerException e) {
result = "EXCEPTION : " + e.getMessage();
}
return "refresh all jobs : " + result;
}
/**
* 重新启动所有的job
*/
private void reStartAllJobs() throws SchedulerException {
synchronized (logger) { //只允许一个线程进入操作
Scheduler scheduler = schedulerFactoryBean.getScheduler();
Set<JobKey> set = scheduler.getJobKeys(GroupMatcher.anyGroup());
scheduler.pauseJobs(GroupMatcher.anyGroup()); //暂停所有JOB
for (JobKey jobKey : set) { //删除从数据库中注册的所有JOB
scheduler.unscheduleJob(TriggerKey.triggerKey(jobKey.getName(), jobKey.getGroup()));
scheduler.deleteJob(jobKey);
}
for (JobEntity job : jobService.loadJobs()) { //从数据库中注册的所有JOB
logger.info("Job register name : {} , group : {} , cron : {}", job.getName(), job.getGroup(), job.getCron());
JobDataMap map = jobService.getJobDataMap(job);
JobKey jobKey = jobService.getJobKey(job);
JobDetail jobDetail = jobService.geJobDetail(jobKey, job.getDescription(), map);
if (job.getStatus().equals("OPEN")) {
scheduler.scheduleJob(jobDetail, jobService.getTrigger(job));
} else {
logger.info("Job jump name : {} , Because {} status is {}", job.getName(), job.getName(), job.getStatus());
}
}
}
}
}
最前面应该还有配置个job类
package com.example.springbootquartz.job;
import com.example.springbootquartz.util.StringUtils;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
@DisallowConcurrentExecution
@Component
public class DynamicJob implements Job {
private Logger logger = LoggerFactory.getLogger(DynamicJob.class);
/**
* 核心方法,Quartz Job真正的执行逻辑.
*
* @param executorContext executorContext JobExecutionContext中封装有Quartz运行所需要的所有信息
* @throws JobExecutionException execute()方法只允许抛出JobExecutionException异常
*/
@Override
public void execute(JobExecutionContext executorContext) throws JobExecutionException {
//JobDetail中的JobDataMap是共用的,从getMergedJobDataMap获取的JobDataMap是全新的对象
JobDataMap map = executorContext.getMergedJobDataMap();
String jarPath = map.getString("jarPath");
String parameter = map.getString("parameter");
String vmParam = map.getString("vmParam");
logger.info("Running Job name : {} ", map.getString("name"));
logger.info("Running Job description : " + map.getString("JobDescription"));
logger.info("Running Job group: {} ", map.getString("group"));
logger.info("Running Job cron : " + map.getString("cronExpression"));
logger.info("Running Job jar path : {} ", jarPath);
logger.info("Running Job parameter : {} ", parameter);
logger.info("Running Job vmParam : {} ", vmParam);
long startTime = System.currentTimeMillis();
if (!StringUtils.getStringUtil.isEmpty(jarPath)) {
File jar = new File(jarPath);
if (jar.exists()) {
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.directory(jar.getParentFile());
List<String> commands = new ArrayList<>();
commands.add("java");
if (!StringUtils.getStringUtil.isEmpty(vmParam)) {
commands.add(vmParam);
}
commands.add("-jar");
commands.add(jarPath);
if (!StringUtils.getStringUtil.isEmpty(parameter)) {
commands.add(parameter);
}
processBuilder.command(commands);
logger.info("Running Job details as follows >>>>>>>>>>>>>>>>>>>>: ");
logger.info("Running Job commands : {} ", StringUtils.getStringUtil.getListString(commands));
try {
Process process = processBuilder.start();
logProcess(process.getInputStream(), process.getErrorStream());
} catch (IOException e) {
throw new JobExecutionException(e);
}
} else {
throw new JobExecutionException("Job Jar not found >> " + jarPath);
}
}
long endTime = System.currentTimeMillis();
logger.info(">>>>>>>>>>>>> Running Job has been completed , cost time : " + (endTime - startTime) + "ms\n");
}
//打印Job执行内容的日志
private void logProcess(InputStream inputStream, InputStream errorStream) throws IOException {
String inputLine;
String errorLine;
BufferedReader inputReader = new BufferedReader(new InputStreamReader(inputStream));
BufferedReader errorReader = new BufferedReader(new InputStreamReader(errorStream));
while ((inputLine = inputReader.readLine()) != null) {
logger.info(inputLine);
}
while ((errorLine = errorReader.readLine()) != null){
logger.error(errorLine);
}
}
}
数据库设计
作者简介
是一个逗比,比较喜欢交朋友,也是一个IT爱好者,希望有志者多加评论,不喜勿喷,最后谢谢大家能观看我的文章