Java架构师之路-如何去实现一个分布式定时任务?

尹家俊


虽说有现成的框架可以实现,不过还是一步一步地说一下思路。


需求

为方便大家的理解,先给大家讲一个真实的需求,这是我在第二家公司的一个项目,定时任务每天凌晨执行,需求很简单:把原始的业务数据,加工处理成待发送的短信。

原始数据:姓名-小明,所在地-北京,电话-13800000000,账单最后还款日期-2018年4月30日。

加工后的数据是:亲爱的小明,您的账单最后还款日期为2018年4月30日,请提前缴费。然后把需要把这条短信发送到13800000000这个手机号上。


定时任务

定时任务框架里面,最有名的就是quartz了,相信大部分Java程序员都用过。

我们项目最开始也用的是quartz,只有一个服务器跑定时任务。但是待处理的数据越来越多,定时服务执行的时间也越来越长,终于有一天,定时任务从晚上跑到了第二天白天也没有跑完,耽误了短信的发送。

改造后的定时任务

有人就有疑问了,能不能直接把定时服务部署多套不就行了。但是部署多套quartz的话,就会出现问题:待处理的任务有可能会被重复执行。

应对这种问题,我们当时有两种处理方案:

方案一:定时服务只部署一套,但是定时任务的工作只是提取待处理的任务。

实际的业务处理服务集群化部署,然后由定式服务提取数据后,发送给业务处理服务器进行实际的处理。

方案二:这个是我当时自己想出的一个奇葩的方法,不过这个方案想明白了,对分布式定式服务的理解很有帮助!

  • 定时任务程序部署多套,并且多套环境都是独立的IP。每套程序定时将IP写入到数据中(一分钟对表update一次,并更新时间戳)。
  • 多套服务选举出一台主服务器。
  • 主服务器把所有的待处理任务,尽可能平均分配给每一台服务器。(IP和待处理任务对应上,也就是每一条待处理任务只能让分配的IP处理)
  • 处理任务的时候,只处理自己IP对应的任务。
  • 一台服务器挂了,主服务器负责把它的IP从数据库中抹掉(三分钟没有对表进行更新的IP,删除掉),并重新分配这个IP对应的待处理任务。
  • 主服务器挂了,重新选举出主服务器。

分布式定时任务框架

我只用过Elastic-job,所以只给大家介绍一下这个框架。

任务分片:把一个任务拆分成几个独立的任务,然后由分布式服务器分别执行一个或者多个子任务。比如还是上面那个需求,那么可以按照【所在地】拆分任务,北京的待处理数据是一个子任务,天津的待处理数据是第二个子任务。

Elastic-Job并不直接提供数据处理的功能,实际的数据处理还是需要自己写,Elastic-Job会将分片任务分配到各个运行中的作业服务器。

其实发现了没有,Elastic-Job做的工作,就是我那个主服务器做的任务分配的工作,把所在地=北京的,分配给服务器1处理,把所在地=天津的,分配给服务器2处理;甚至包括监控每台作业服务器是否存活,挂掉一台重新分配待处理任务,也都是Elastic-Job来做的。

我将持续分享Java开发、架构设计、程序员职业发展等方面的见解,希望能得到你的关注。


会点代码的大叔


一、在JAVA开发领域,目前可以通过以下几种方式进行定时任务

1、单机部署模式

Timer:jdk中自带的一个定时调度类,可以简单的实现按某一频度进行任务执行。提供的功能比较单一,无法实现复杂的调度任务。

ScheduledExecutorService:也是jdk自带的一个基于线程池设计的定时任务类。其每个调度任务都会分配到线程池中的一个线程执行,所以其任务是并发执行的,互不影响。

Spring Task:Spring提供的一个任务调度工具,支持注解和配置文件形式,支持Cron表达式,使用简单但功能强大。

Quartz:一款功能强大的任务调度器,可以实现较为复杂的调度功能,如每月一号执行、每天凌晨执行、每周五执行等等,还支持分布式调度,就是配置稍显复杂。

2、分布式集群模式(不多介绍,简单提一下)

问题:

I、如何解决定时任务的多次执行?

II、如何解决任务的单点问题,实现任务的故障转移?

问题I的简单思考:

1、固定执行定时任务的机器(可以有效避免多次执行的情况 ,缺点就是单点故障问题)。

2、借助Redis的过期机制和分布式锁。

3、借助mysql的锁机制等。

成熟的解决方案:

1、Quartz:可以去看看这篇文章[Quartz分布式]( https://www.cnblogs.com/jiafuwei/p/6145280.html)。

2、elastic-job:(https://github.com/elasticjob/elastic-job-lite)当当开发的弹性分布式任务调度系统,采用zookeeper实现分布式协调,实现任务高可用以及分片。

3、xxl-job:(https://github.com/xuxueli/xxl-job)是大众点评员发布的分布式任务调度平台,是一个轻量级分布式任务调度框架。

4、saturn:(https://github.com/vipshop/Saturn) 是唯品会提供一个分布式、容错和高可用的作业调度服务框架。

二、SpringTask实现定时任务(这里是基于springboot)

1、简单的定时任务实现

使用方式:

使用@EnableScheduling注解开启对定时任务的支持。

使用@Scheduled 注解即可,基于corn、fixedRate、fixedDelay等一些定时策略来实现定时任务。

使用缺点:

1、多个定时任务使用的是同一个调度线程,所以任务是阻塞执行的,执行效率不高。

2、其次如果出现任务阻塞,导致一些场景的定时计算没有实际意义,比如每天12点的一个计算任务被阻塞到1点去执行,会导致结果并非我们想要的。

使用优点:

1、配置简单

2、适用于单个后台线程执行周期任务,并且保证顺序一致执行的场景

源码分析:

//默认使用的调度器

if(this.taskScheduler == null) {

this.localExecutor = Executors.newSingleThreadScheduledExecutor();

this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);

}

//可以看到SingleThreadScheduledExecutor指定的核心线程为1,说白了就是单线程执行

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {

return new DelegatedScheduledExecutorService

(new ScheduledThreadPoolExecutor(1));

}

//利用了DelayedWorkQueue延时队列作为任务的存放队列,这样便可以实现任务延迟执行或者定时执行

public ScheduledThreadPoolExecutor(int corePoolSize) {

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

new DelayedWorkQueue());

}

  

2、实现并发的定时任务

使用方式:

方式一:由1中我们知道之所以定时任务是阻塞执行,是配置的线程池决定的,那就好办了,换一个不就行了!直接上代码:

@Configuration

public class ScheduledConfig implements SchedulingConfigurer {

@Autowired

private TaskScheduler myThreadPoolTaskScheduler;

@Override

public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {

//简单粗暴的方式直接指定

//scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));

//也可以自定义的线程池,方便线程的使用与维护,这里不多说了

scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);

}

}

@Bean(name = "myThreadPoolTaskScheduler")

public TaskScheduler getMyThreadPoolTaskScheduler() {

ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();

taskScheduler.setPoolSize(10);

taskScheduler.setThreadNamePrefix("Haina-Scheduled-");

taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

//调度器shutdown被调用时等待当前被调度的任务完成

taskScheduler.setWaitForTasksToCompleteOnShutdown(true);

//等待时长

taskScheduler.setAwaitTerminationSeconds(60);

return taskScheduler;

}

方式二:方式一的本质改变了任务调度器默认使用的线程池,接下来这种是不改变调度器的默认线程池,而是把当前任务交给一个异步线程池去执行

废话太多,直接上代码:

@Scheduled(fixedRate = 1000*10,initialDelay = 1000*20)

@Async("myThreadPoolTaskExecutor")

//@Async

public void scheduledTest02(){

System.out.println(Thread.currentThread().getName()+"--->xxxxx--->"+Thread.currentThread().getId());

}

//自定义线程池

@Bean(name = "myThreadPoolTaskExecutor")

public TaskExecutor getMyThreadPoolTaskExecutor() {

ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

taskExecutor.setCorePoolSize(20);

taskExecutor.setMaxPoolSize(200);

taskExecutor.setQueueCapacity(25);

taskExecutor.setKeepAliveSeconds(200);

taskExecutor.setThreadNamePrefix("Haina-ThreadPool-");

// 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者

taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

//调度器shutdown被调用时等待当前被调度的任务完成

taskExecutor.setWaitForTasksToCompleteOnShutdown(true);

//等待时长

taskExecutor.setAwaitTerminationSeconds(60);

taskExecutor.initialize();

return taskExecutor;

}

首先使用@EnableAsync 启用异步任务

然后在定时任务的方法加上@Async即可,默认使用的线程池为SimpleAsyncTaskExecutor(该线程池默认来一个任务创建一个线程,就会不断创建大量线程,极有可能压爆服务器内存。当然它有自己的限流机制,这里就不多说了,有兴趣的自己翻翻源码~)

项目中为了更好的控制线程的使用,我们可以自定义我们自己的线程池,使用方式@Async("myThreadPool")

线程池的使用心得(后续有专门文章来探讨)

java中提供了ThreadPoolExecutor和ScheduledThreadPoolExecutor,对应与spring中的ThreadPoolTaskExecutor和ThreadPoolTaskScheduler,但是在原有的基础上增加了新的特性,在spring环境下更容易使用和控制。

使用自定义的线程池能够避免一些默认线程池造成的内存溢出、阻塞等等问题,更贴合自己的服务特性

使用自定义的线程池便于对项目中线程的管理、维护以及监控。

即便在非spring环境下也不要使用java默认提供的那几种线程池,坑很多,阿里代码规约不说了吗,得相信大厂!!!

三、动态定时任务的实现

问题:

使用@Scheduled注解来完成设置定时任务,但是有时候我们往往需要对周期性的时间的设置会做一些改变,或者要动态的启停一个定时任务,那么这个时候使用此注解就不太方便了,原因在于这个注解中配置的cron表达式必须是常量,那么当我们修改定时参数的时候,就需要停止服务,重新部署。

解决办法:

方式一:实现SchedulingConfigurer接口,重写configureTasks方法,重新制定Trigger,核心方法就是addTriggerTask(Runnable task, Trigger trigger) ,不过需要注意的是,此种方式修改了配置值后,需要在下一次调度结束后,才会更新调度器,并不会在修改配置值时实时更新,实时更新需要在修改配置值时额外增加相关逻辑处理。

@Configuration

public class ScheduledConfig implements SchedulingConfigurer {

@Autowired

private TaskScheduler myThreadPoolTaskScheduler;

@Override

public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {

//scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));

scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);

//可以实现动态调整定时任务的执行频率

scheduledTaskRegistrar.addTriggerTask(

//1.添加任务内容(Runnable)

() -> System.out.println("cccccccccccccccc--->" + Thread.currentThread().getId()),

//2.设置执行周期(Trigger)

triggerContext -> {

//2.1 从数据库动态获取执行周期

String cron = "0/2 * * * * ? ";

//2.2 合法性校验.

// if (StringUtils.isEmpty(cron)) {

// // Omitted Code ..

// }

//2.3 返回执行周期(Date)

return new CronTrigger(cron).nextExecutionTime(triggerContext);

}

);

}

}

方式二:使用threadPoolTaskScheduler类可实现动态添加删除功能,当然也可实现执行频率的调整

首先,我们要认识下这个调度类,它其实是对java中ScheduledThreadPoolExecutor的一个封装改进后的产物,主要改进有以下几点:

1、提供默认配置,因为是ScheduledThreadPoolExecutor,所以只有poolSize这一个默认参数。

2、支持自定义任务,通过传入Trigger参数。

3、对任务出错处理进行优化,如果是重复性的任务,不抛出异常,通过日志记录下来,不影响下次运行,如果是只执行一次的任务,将异常往上抛。

顺便说下ThreadPoolTaskExecutor相对于ThreadPoolExecutor的改进点:

1、提供默认配置,原生的ThreadPoolExecutor的除了ThreadFactory和RejectedExecutionHandler其他没有默认配置

2、实现AsyncListenableTaskExecutor接口,支持对FutureTask添加success和fail的回调,任务成功或失败的时候回执行对应回调方法。

3、因为是spring的工具类,所以抛出的RejectedExecutionException也会被转换为spring框架的TaskRejectedException异常(这个无所谓)

4、提供默认ThreadFactory实现,直接通过参数重载配置

扯了这么多,还是直接上代码:

@Component

public class DynamicTimedTask {

private static final Logger logger = LoggerFactory.getLogger(DynamicTimedTask.class);

//利用创建好的调度类统一管理

//@Autowired

//@Qualifier("myThreadPoolTaskScheduler")

//private ThreadPoolTaskScheduler myThreadPoolTaskScheduler;

//接受任务的返回结果

private ScheduledFuture> future;

@Autowired

private ThreadPoolTaskScheduler threadPoolTaskScheduler;

//实例化一个线程池任务调度类,可以使用自定义的ThreadPoolTaskScheduler

@Bean

public ThreadPoolTaskScheduler threadPoolTaskScheduler() {

ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();

return new ThreadPoolTaskScheduler();

}

/**

* 启动定时任务

* @return

*/

public boolean startCron() {

boolean flag = false;

//从数据库动态获取执行周期

String cron = "0/2 * * * * ? ";

future = threadPoolTaskScheduler.schedule(new CheckModelFile(),cron);

if (future!=null){

flag = true;

logger.info("定时check训练模型文件,任务启动成功!!!");

}else {

logger.info("定时check训练模型文件,任务启动失败!!!");

}

return flag;

}

/**

* 停止定时任务

* @return

*/

public boolean stopCron() {

boolean flag = false;

if (future != null) {

boolean cancel = future.cancel(true);

if (cancel){

flag = true;

logger.info("定时check训练模型文件,任务停止成功!!!");

}else {

logger.info("定时check训练模型文件,任务停止失败!!!");

}

}else {

flag = true;

logger.info("定时check训练模型文件,任务已经停止!!!");

}

return flag;

}

class CheckModelFile implements Runnable{

@Override

public void run() {

//编写你自己的业务逻辑

System.out.print("模型文件检查完毕!!!")

}

}

}

四、总结

到此基于springtask下的定时任务的简单使用算是差不多了,其中不免有些错误的地方,或者理解有偏颇的地方欢迎大家提出来!

基于分布式集群下的定时任务使用,后续有时间再继续!!!


IT实战联盟


需要一个全局协调器,记master。接下来就是要干活的应用服务器,记slave。定时器而且是分布式的,就要求每个服务器在指定时间都执行相同任务,而且实现对任务的管理如暂停,启动,停止等!甚至可以创建指定的定时器。还要保证任务执行的原子性!定时任务由master统一分发给每个slave!之后每个slave将处理结果返回给master。由master统一保存或是回滚!

实现的技术比较多:支持分布式的如zookeeper,系统信息交互的如netty,支持定时器的如quartz。


tryetry


需要一个任务分发集群和一个任务执行集群,任务分发集群三台机器即可,任务执行集群依赖你的业务量。任务分发集群执行分发任务,按照一定的策略将任务分发给执行集群各个机器。需要解决任务重复分发问题,以及执行任务的server和分发任务的server挂掉的问题,总之就是两个集群的高可用。

解决重复分发问题,很简单,采用主从模式,这涉及选主操作,分发集群只有一台主server工作,其他server在主server挂掉后选主继续分发,选主使用zk非常方便。

执行集群中server挂掉后,其上任务需要重新分发给其他server,这个问题使用zk也是很方便可以解决

手机打字 细节就不写了


溅溅123321


分布式任务本身涉及到分布式构架,基于不同的任务处理量和反应时间可以考虑 hadoop spark flink 等。 定时最好用serverless的scheduler 触发。


分享到:


相關文章: