springboot+quartz+jpa实现动态定时任务

it2022-05-05  210

springboot+quartz+jpa实现动态定时任务。

配置maven

<dependencies>
    <!-- Spring Data JPA; declared once, version comes from the project's dependency management -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Test support only: keep it off the runtime classpath -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.44</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <!--druid-spring-boot-starter-->
        <version>1.1.10</version>
    </dependency>
    <!-- Quartz dependencies -->
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.2.3</version>
    </dependency>
    <!-- Provides SchedulerFactoryBean and SpringBeanJobFactory -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
    </dependency>
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz-jobs</artifactId>
        <version>2.2.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

配置application.properties

# Application settings. ConfigureQuartz also loads this same file as the Quartz properties,
# so the org.quartz.* keys below are passed to the SchedulerFactoryBean.
quartz.enable=true
server.port=8080

# MySQL data source
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/super
spring.datasource.username=root
spring.datasource.password=root

# Connection pool settings
spring.datasource.tomcat.initial-size=20
spring.datasource.tomcat.max-active=100
spring.datasource.tomcat.max-idle=100
spring.datasource.tomcat.min-idle=20
spring.datasource.tomcat.max-wait=10000
spring.datasource.tomcat.test-while-idle=true
spring.datasource.tomcat.test-on-borrow=false
spring.datasource.tomcat.test-on-return=false

# Instance ID is generated automatically; it must be unique across all scheduler instances
org.quartz.scheduler.instanceId=AUTO
# Whether the scheduler's main thread should be a daemon thread
org.quartz.scheduler.makeSchedulerThreadDaemon=true
# Class name of the ThreadPool implementation
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
# Run the thread-pool threads as daemon threads
org.quartz.threadPool.makeThreadsDaemons=true
# Number of threads
org.quartz.threadPool.threadCount=20
# Thread priority
org.quartz.threadPool.threadPriority=5
# Persist job data through the JDBC job store
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
# Driver delegate class used by the JDBC job store
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties = true
# Prefix of Quartz's internal tables
org.quartz.jobStore.tablePrefix=QRTZ_
# Whether to join a cluster
org.quartz.jobStore.isClustered=true
# Maximum tolerated job delay (misfire threshold)
org.quartz.jobStore.misfireThreshold=25000

spring.jpa.show-sql=true

配置config全局配置类

package com.example.springbootquartz.config; import org.quartz.spi.JobFactory; import org.quartz.spi.TriggerFiredBundle; import org.springframework.beans.factory.config.AutowireCapableBeanFactory; import org.springframework.beans.factory.config.PropertiesFactoryBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.scheduling.quartz.SpringBeanJobFactory; import javax.sql.DataSource; import java.io.IOException; import java.util.Properties; @Configuration public class ConfigureQuartz { //配置JobFactory @Bean public JobFactory jobFactory(ApplicationContext applicationContext) { AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory(); jobFactory.setApplicationContext(applicationContext); return jobFactory; } /** * SchedulerFactoryBean这个类的真正作用提供了对org.quartz.Scheduler的创建与配置,并且会管理它的生命周期与Spring同步。 * org.quartz.Scheduler: 调度器。所有的调度都是由它控制。 * @param dataSource 为SchedulerFactory配置数据源 * @param jobFactory 为SchedulerFactory配置JobFactory */ @Bean public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource, JobFactory jobFactory) throws IOException { SchedulerFactoryBean factory = new SchedulerFactoryBean(); //可选,QuartzScheduler启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录 factory.setOverwriteExistingJobs(true); factory.setAutoStartup(true); //设置自行启动 factory.setDataSource(dataSource); factory.setJobFactory(jobFactory); factory.setQuartzProperties(quartzProperties()); return factory; } //从quartz.properties文件中读取Quartz配置属性 @Bean public Properties quartzProperties() throws IOException { PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean(); propertiesFactoryBean.setLocation(new 
ClassPathResource("/application.properties")); propertiesFactoryBean.afterPropertiesSet(); return propertiesFactoryBean.getObject(); } //配置JobFactory,为quartz作业添加自动连接支持 public final class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware { private transient AutowireCapableBeanFactory beanFactory; @Override public void setApplicationContext(final ApplicationContext context) { beanFactory = context.getAutowireCapableBeanFactory(); } @Override protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception { final Object job = super.createJobInstance(bundle); beanFactory.autowireBean(job); return job; } } }

自定义StringUtils工具类

* package com.example.springbootquartz.util; import java.util.List; import java.util.Map; public enum StringUtils { getStringUtil; //是否为空 public boolean isEmpty(String str) { return (str == null) || (str.length() == 0) || (str.equals("")); } //去空格 public String trim(String str) { return str == null ? null : str.trim(); } //获取Map参数值 public String getMapString(Map<String, String> map) { String result = ""; for (Map.Entry entry : map.entrySet()) { result += entry.getValue() + " "; } return result; } //获取List参数值 public String getListString(List<String> list) { String result = ""; for (String s : list) { result += s + " "; } return result; } }

配置实体类

package com.example.springbootquartz.entity;

import javax.persistence.*;
import java.io.Serializable;

/**
 * JPA entity mapped to the {@code JOB_ENTITY} table; one row describes one schedulable job.
 */
@Entity
@Table(name = "JOB_ENTITY")
public class JobEntity implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    /** Job name. */
    private String name;

    /** Job group name. */
    private String group;

    /** Cron expression the job runs on. */
    private String cron;

    /** Job parameter. */
    private String parameter;

    /** Job description. */
    private String description;

    /** VM parameter. */
    @Column(name = "vm_param")
    private String vmParam;

    /** Path of the job's jar; the jobs here are executable jars run on a schedule. */
    @Column(name = "jar_path")
    private String jarPath;

    /** Execution status (OPEN/CLOSE); the job is only run when the value is OPEN. */
    private String status;

    /** No-argument constructor. */
    public JobEntity() {
    }

    /**
     * Optional builder-based constructor: copies every property from the builder.
     *
     * @param builder the builder holding the property values
     */
    public JobEntity(Builder builder) {
        this.id = builder.id;
        this.name = builder.name;
        this.group = builder.group;
        this.cron = builder.cron;
        this.parameter = builder.parameter;
        this.description = builder.description;
        this.vmParam = builder.vmParam;
        this.jarPath = builder.jarPath;
        this.status = builder.status;
    }

    public Integer getId() {
        return this.id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGroup() {
        return this.group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public String getCron() {
        return this.cron;
    }

    public void setCron(String cron) {
        this.cron = cron;
    }

    public String getParameter() {
        return this.parameter;
    }

    public void setParameter(String parameter) {
        this.parameter = parameter;
    }

    public String getDescription() {
        return this.description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public String getVmParam() {
        return this.vmParam;
    }

    public void setVmParam(String vmParam) {
        this.vmParam = vmParam;
    }

    public String getJarPath() {
        return this.jarPath;
    }

    public void setJarPath(String jarPath) {
        this.jarPath = jarPath;
    }

    public String getStatus() {
        return this.status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("JobEntity{");
        text.append("id=").append(id);
        text.append(", name='").append(name).append('\'');
        text.append(", group='").append(group).append('\'');
        text.append(", cron='").append(cron).append('\'');
        text.append(", parameter='").append(parameter).append('\'');
        text.append(", description='").append(description).append('\'');
        text.append(", vmParam='").append(vmParam).append('\'');
        text.append(", jarPath='").append(jarPath).append('\'');
        text.append(", status='").append(status).append('\'');
        text.append('}');
        return text.toString();
    }

    /**
     * Builder that lets callers set any subset of properties before creating a
     * {@link JobEntity}. String properties start as the empty string; the id starts unset.
     */
    public static class Builder {

        private Integer id;
        private String name = "";
        private String group = "";
        private String cron = "";
        private String parameter = "";
        private String description = "";
        private String vmParam = "";
        private String jarPath = "";
        private String status = "";

        public Builder withId(Integer idValue) {
            this.id = idValue;
            return this;
        }

        public Builder withName(String nameValue) {
            this.name = nameValue;
            return this;
        }

        public Builder withGroup(String groupValue) {
            this.group = groupValue;
            return this;
        }

        public Builder withCron(String cronValue) {
            this.cron = cronValue;
            return this;
        }

        public Builder withParameter(String parameterValue) {
            this.parameter = parameterValue;
            return this;
        }

        public Builder withDescription(String descriptionValue) {
            this.description = descriptionValue;
            return this;
        }

        public Builder withVMParameter(String vmParamValue) {
            this.vmParam = vmParamValue;
            return this;
        }

        public Builder withJarPath(String jarPathValue) {
            this.jarPath = jarPathValue;
            return this;
        }

        public Builder withStatus(String statusValue) {
            this.status = statusValue;
            return this;
        }

        /** Creates the entity from the values collected so far. */
        public JobEntity newJobEntity() {
            return new JobEntity(this);
        }
    }
}

配置数据访问层

package com.example.springbootquartz.dao;

import com.example.springbootquartz.entity.JobEntity;
import org.springframework.data.repository.CrudRepository;

/**
 * Spring Data repository for {@link JobEntity}.
 *
 * <p>The ID type parameter is {@code Integer} so that it matches the entity's
 * {@code @Id} field; the inherited by-id operations then accept the same key type
 * as {@link #getById(Integer)}.
 */
public interface JobEntityRepository extends CrudRepository<JobEntity, Integer> {

    /**
     * Looks up a job by its primary key.
     *
     * @param id the job id
     * @return the job with that id
     */
    JobEntity getById(Integer id);
}

配置service类

package com.example.springbootquartz.service; import com.example.springbootquartz.dao.JobEntityRepository; import com.example.springbootquartz.entity.JobEntity; import com.example.springbootquartz.job.DynamicJob; import org.quartz.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; @Service public class DynamicJobService { @Autowired private JobEntityRepository repository; //通过Id获取Job public JobEntity getJobEntityById(Integer id) { return repository.getById(id); } //从数据库中加载获取到所有Job public List<JobEntity> loadJobs() { List<JobEntity> list = new ArrayList<>(); repository.findAll().forEach(list::add); return list; } //获取JobDataMap.(Job参数对象) public JobDataMap getJobDataMap(JobEntity job) { JobDataMap map = new JobDataMap(); map.put("name", job.getName()); map.put("group", job.getGroup()); map.put("cronExpression", job.getCron()); map.put("parameter", job.getParameter()); map.put("JobDescription", job.getDescription()); map.put("vmParam", job.getVmParam()); map.put("jarPath", job.getJarPath()); map.put("status", job.getStatus()); return map; } //获取JobDetail,JobDetail是任务的定义,而Job是任务的执行逻辑,JobDetail里会引用一个Job Class来定义 public JobDetail geJobDetail(JobKey jobKey, String description, JobDataMap map) { return JobBuilder.newJob(DynamicJob.class) .withIdentity(jobKey) .withDescription(description) .setJobData(map) .storeDurably() .build(); } //获取Trigger (Job的触发器,执行规则) public Trigger getTrigger(JobEntity job) { return TriggerBuilder.newTrigger() .withIdentity(job.getName(), job.getGroup()) .withSchedule(CronScheduleBuilder.cronSchedule(job.getCron())) .build(); } //获取JobKey,包含Name和Group public JobKey getJobKey(JobEntity job) { return JobKey.jobKey(job.getName(), job.getGroup()); } }

配置controller类

package com.example.springbootquartz.controller;

import com.example.springbootquartz.entity.JobEntity; import com.example.springbootquartz.service.DynamicJobService; import org.quartz.*; import org.quartz.impl.matchers.GroupMatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct; import java.util.Set;

@RestController public class JobController {

private static final Logger logger = LoggerFactory.getLogger(JobController.class); @Autowired private SchedulerFactoryBean schedulerFactoryBean; @Autowired private DynamicJobService jobService; //初始化启动所有的Job(服务运行时启动全部任务) @PostConstruct public void initialize() { try { reStartAllJobs(); logger.info("INIT SUCCESS"); } catch (SchedulerException e) { logger.info("INIT EXCEPTION : " + e.getMessage()); e.printStackTrace(); } } //根据ID重启某个Job @RequestMapping("/refresh/{id}") public String refresh(@PathVariable Integer id) throws SchedulerException { String result; JobEntity entity = jobService.getJobEntityById(id); if (entity == null) { return "error: id is not exist "; } synchronized (logger) { JobKey jobKey = jobService.getJobKey(entity); Scheduler scheduler = schedulerFactoryBean.getScheduler(); scheduler.pauseJob(jobKey); scheduler.unscheduleJob(TriggerKey.triggerKey(jobKey.getName(), jobKey.getGroup())); scheduler.deleteJob(jobKey); JobDataMap map = jobService.getJobDataMap(entity); JobDetail jobDetail = jobService.geJobDetail(jobKey, entity.getDescription(), map); if (entity.getStatus().equals("OPEN")) { scheduler.scheduleJob(jobDetail, jobService.getTrigger(entity)); result = "Refresh Job : " + entity.getName() + "\t jarPath: " + entity.getJarPath() + " success !"; } else { result = "Refresh Job : " + entity.getName() + "\t jarPath: " + entity.getJarPath() + " failed ! 
, " + "Because the Job status is " + entity.getStatus(); } } return result; } //重启数据库中所有的Job @RequestMapping("/refresh/all") public String refreshAll() { String result; try { reStartAllJobs(); result = "SUCCESS"; } catch (SchedulerException e) { result = "EXCEPTION : " + e.getMessage(); } return "refresh all jobs : " + result; } /** * 重新启动所有的job */ private void reStartAllJobs() throws SchedulerException { synchronized (logger) { //只允许一个线程进入操作 Scheduler scheduler = schedulerFactoryBean.getScheduler(); Set<JobKey> set = scheduler.getJobKeys(GroupMatcher.anyGroup()); scheduler.pauseJobs(GroupMatcher.anyGroup()); //暂停所有JOB for (JobKey jobKey : set) { //删除从数据库中注册的所有JOB scheduler.unscheduleJob(TriggerKey.triggerKey(jobKey.getName(), jobKey.getGroup())); scheduler.deleteJob(jobKey); } for (JobEntity job : jobService.loadJobs()) { //从数据库中注册的所有JOB logger.info("Job register name : {} , group : {} , cron : {}", job.getName(), job.getGroup(), job.getCron()); JobDataMap map = jobService.getJobDataMap(job); JobKey jobKey = jobService.getJobKey(job); JobDetail jobDetail = jobService.geJobDetail(jobKey, job.getDescription(), map); if (job.getStatus().equals("OPEN")) { scheduler.scheduleJob(jobDetail, jobService.getTrigger(job)); } else { logger.info("Job jump name : {} , Because {} status is {}", job.getName(), job.getName(), job.getStatus()); } } } }

}

注意:在编写上面的service类和controller类之前,还需要先配置下面这个Job类(DynamicJob),因为service类会引用它。

package com.example.springbootquartz.job; import com.example.springbootquartz.util.StringUtils; import org.quartz.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.io.*; import java.util.ArrayList; import java.util.List; @DisallowConcurrentExecution @Component public class DynamicJob implements Job { private Logger logger = LoggerFactory.getLogger(DynamicJob.class); /** * 核心方法,Quartz Job真正的执行逻辑. * * @param executorContext executorContext JobExecutionContext中封装有Quartz运行所需要的所有信息 * @throws JobExecutionException execute()方法只允许抛出JobExecutionException异常 */ @Override public void execute(JobExecutionContext executorContext) throws JobExecutionException { //JobDetail中的JobDataMap是共用的,从getMergedJobDataMap获取的JobDataMap是全新的对象 JobDataMap map = executorContext.getMergedJobDataMap(); String jarPath = map.getString("jarPath"); String parameter = map.getString("parameter"); String vmParam = map.getString("vmParam"); logger.info("Running Job name : {} ", map.getString("name")); logger.info("Running Job description : " + map.getString("JobDescription")); logger.info("Running Job group: {} ", map.getString("group")); logger.info("Running Job cron : " + map.getString("cronExpression")); logger.info("Running Job jar path : {} ", jarPath); logger.info("Running Job parameter : {} ", parameter); logger.info("Running Job vmParam : {} ", vmParam); long startTime = System.currentTimeMillis(); if (!StringUtils.getStringUtil.isEmpty(jarPath)) { File jar = new File(jarPath); if (jar.exists()) { ProcessBuilder processBuilder = new ProcessBuilder(); processBuilder.directory(jar.getParentFile()); List<String> commands = new ArrayList<>(); commands.add("java"); if (!StringUtils.getStringUtil.isEmpty(vmParam)) { commands.add(vmParam); } commands.add("-jar"); commands.add(jarPath); if (!StringUtils.getStringUtil.isEmpty(parameter)) { commands.add(parameter); } processBuilder.command(commands); logger.info("Running Job details as 
follows >>>>>>>>>>>>>>>>>>>>: "); logger.info("Running Job commands : {} ", StringUtils.getStringUtil.getListString(commands)); try { Process process = processBuilder.start(); logProcess(process.getInputStream(), process.getErrorStream()); } catch (IOException e) { throw new JobExecutionException(e); } } else { throw new JobExecutionException("Job Jar not found >> " + jarPath); } } long endTime = System.currentTimeMillis(); logger.info(">>>>>>>>>>>>> Running Job has been completed , cost time : " + (endTime - startTime) + "ms\n"); } //打印Job执行内容的日志 private void logProcess(InputStream inputStream, InputStream errorStream) throws IOException { String inputLine; String errorLine; BufferedReader inputReader = new BufferedReader(new InputStreamReader(inputStream)); BufferedReader errorReader = new BufferedReader(new InputStreamReader(errorStream)); while ((inputLine = inputReader.readLine()) != null) { logger.info(inputLine); } while ((errorLine = errorReader.readLine()) != null){ logger.error(errorLine); } } }

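数据库设计

(原文此处的表结构内容缺失,以下根据上文代码整理。)业务表 JOB_ENTITY 对应实体类 JobEntity,需要包含以下列:id(自增主键)、name、group、cron、parameter、description、vm_param、jar_path、status。其中 group 是 MySQL 的保留字,建表时需要用反引号括起来。此外,由于配置中使用了 JobStoreTX 进行持久化,还需要在同一个数据库中创建 Quartz 自带的、以 QRTZ_ 为前缀的表,可以使用 Quartz 发行包中提供的 MySQL 建表脚本。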

作者简介

作者是一个逗比,比较喜欢交朋友,也是一个IT爱好者。希望有志者多加评论,不喜勿喷。最后,谢谢大家阅读我的文章。


最新回复(0)