前言
本篇文章首要记载项目中遇到的 xxl-job 的实战,希望能经过这篇文章告知读者们什么是 xxl-job 以及怎样运用 xxl-job 并分享一个实战事例。
那么下面先阐明什么是 xxl-job 以及为什么要运用它。
xxl-job 是什么?
XXL-JOB 是一个分布式使命调度渠道,其中心规划方针是开发敏捷、学习简略、轻量级、易扩展。
规划思维 是将调度行为笼统构成 调度中心 渠道,渠道自身不承担事务逻辑,而是负责发起 调度恳求 后,由 履行器 接纳调度恳求并履行 使命,这儿的 使命 笼统为 涣散的 JobHandler。经过这种办法即可完结 调度 与 使命 彼此解耦,从而进步体系整体的稳定性和拓展性。
为了更好理解,这儿放一张官网的架构图:
使命调度是什么?
在开发项目时大家是否也遇到过类似的场景问题:
- 体系需求守时在每天0点进行数据备份。
- 体系需求在活动开端前几小时预热履行一些前置事务。
- 体系需求守时对 MQ 消息表的发送装填,对发送失利的 MQ 消息进行补偿从头发送。
这些场景问题都可以经过 使命调度 来处理,使命调度指的是体系在约定的指守时刻主动去履行指定的使命的进程。
单体体系 中有许多完结 使命调度 的办法,如多线程办法、Timer 类、Spring Tasks 等等。这儿比较常用的是 Spring Tasks(经过 @EnableScheduling
+ @Scheduled
的注解可以自界说守时使命,有爱好的可以去了解一下)
为什么需求分布式使命调度渠道?
分布式下,每个服务都可以建立为集群,这样的好处是可以将使命切片分给每一个服务从而完结并行履行,进步使命调度的处理功率。那么为什么 分布式体系 不能运用 单体体系 的使命调度完结办法呢。
在集群服务下,假如还是运用每台机器按照单体体系的使命调度完结办法完结的话,会呈现下面这四个问题:
- 怎样做到对使命的操控(怎么避免使命重复履行)。
- 假如某台机器宕机了,会不会存在使命丢掉。
- 假如要添加服务实例,怎样做到弹性扩容。
- 怎么做到对使命调度的履行状况一致监测。
经过上面的问题可以了解到分布式体系下需求一个满意高可用、容错办理、负载均衡等功用的使命调度渠道来完结使命调度。分布式体系下,也有许多可以完结使命调度的第三方的分布式使命调度体系,如 xxl-job、Quartz、elastic-job 等等常用的分布式使命调度体系。
本篇文章将介绍 xxl-job 分布式使命调度体系。
怎么运用 xxl-job
作为开源软件的 xxl-job,可以在 github 或 gitee 上检查和下载 xxl-job 的源码。
下面将介绍我运用 xxl-job 的流程(假如有操作不妥的,可以检查官方的中文文档 分布式使命调度渠道XXL-JOB (xuxueli.com))
dokcer 下安装 xxl-job
-
docker 下拉取 xxl-job 的镜像(这儿运用 2.3.1 版别)
docker pull xuxueli/xxl-job-admin:2.3.1
-
创立映射容器的文件目录
mkdir -p -m 777 /mydata/xxl-job/data/applogs
-
在
/mydata/xxl-job
的目录下创立application.properties
文件因为
application.properties
的代码过长,这儿就不展现了,需求的可以去 gitee 上获取,详细途径如图:这儿需求留意数据库位置的填写:
假如还需求更改端口的可以更改这儿:
这儿还需求留意告警邮箱和拜访口令(后续Spring Boot装备用到):
-
将
tables_xxl-job.sql
文件导入上面过程3指定的数据库(自己填写的那个数据库)同样因为文件代码过长,这儿展现 gitee 上获取的途径图:
-
履行 docker 指令
留意这儿的
-p 8088:8088
是因为我更改了前面application.porperties
文件的端口号为 8088,所以这儿我履行的 docker 指令为-p 8088:8088
,假如没有更改的这儿一定要改为-p 8080:8080
。docker run -p 8088:8088 \ -d --name=xxl-job-admin --restart=always \ -v /mydata/xxl-job/application.properties:/application.properties \ -v /mydata/xxl-job/data/applogs:/data/applogs \ -e PARAMS='--spring.config.location=/application.properties' xuxueli/xxl-job-admin:2.3.1
履行后经过
docker ps
检查是否成功运转,假如失利可以经过docker logs xxl-job-admin
检查详细错误日志。 -
经过
http://192.168.101.25:8088/xxl-job-admin/
拜访(这儿ip和端口是自己的)账号:admin 密码:123456
到这儿就算是完结了 xxl-job 在 docker 的建立。
Spring Boot 项目集成 xxl-job
xxl-job 由 调度中心 和 履行器 组成,上面现已完结了在 docker 上布置调度中心了,接下来介绍怎样装备布置履行器项目。
-
在 Spring Boot 项目中导入 maven 依赖
<dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.3.1</version> </dependency>
这儿需求留意版别号与 xxl-job 版别需求共同,这儿我装备的都是 2.3.1 版别。
-
在 Spring Boot 项目中装备
application.yml
文件xxl: job: admin: addresses: http://192.168.101.25:8088/xxl-job-admin executor: appname: media-process-service address: ip: port: 9999 logpath: /data/applogs/xxl-job/jobhandler logretentiondays: 30 accessToken: default_token
- 这儿的
xxl.job.admin.addresses
用于指定调度中心的地址。 - 这儿的
xxl.job.accessToken
用于指定拜访口令(也便是前面建立 xxl-job 中过程3指定的)。 - 这儿的
xxl.job.executor.appname
用于指定履行器的称号(需求与后续装备履行器的称号共同)。 - 这儿的
xxl.job.executor.port
用于指定履行器的端口(履行器实际上是一个内嵌的 Server,默认端口为9999,装备多个同一服务实例时需求指定不同的履行器端口,否则会端口冲突)。 - 其他特点只需求照着装备即可(想要了解特点的详细意义可以检查中文文档中的2.4装备布置履行器项目章节)。
- 这儿的
-
编写装备类
/** * XXL-JOB装备类 * * @author 单程车票 */ @Slf4j @Configuration public class XxlJobConfig { @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor() { log.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }
-
调度中心中新增履行器
履行器的装备特点:
- AppName: 每个履行器集群的仅有标示 AppName,履行器会周期性以 AppName 为对象进行主动注册。可经过该装备主动发现注册成功的履行器,供使命调度时运用。
- 称号: 履行器的称号(可以运用中文更好地表现该履行器是用来干嘛的)。
- 注册办法:调度中心获取履行器地址的办法(一般为了方便可以选用主动注册即可)。
- 主动注册:履行器主动进行履行器注册,调度中心经过底层注册表可以动态发现履行器机器地址。
- 手动录入:人工手动录入履行器的地址信息,多地址逗号分隔,供调度中心运用。
- 机器地址:”注册办法”为”手动录入”时有效,支撑人工维护履行器的地址信息。
-
装备自界说使命
装备自界说使命有许多种形式,如 Bean形式(依据办法)、Bean形式(依据类)、GLUE形式等等。这儿介绍经过 Bean形式(依据办法) 是怎么自界说使命的(关于其余的形式可以参阅官方文档)。
Bean形式(依据办法)也便是每个使命对应一个办法,经过添加
@XxLJob(value="自界说JobHandler称号", init = "JobHandler初始化办法", destroy = "JobHandler毁掉办法")
注解即可完结界说。/** * 使命处理类 * * @author 单程车票 */ @Component public class TestJob { /** * 测试使命 */ @XxlJob("testHandler") public void testHandler() { XxlJobHelper.handleSuccess("本次测试使命调度成功"); } }
- 经过注解也可以指定 初始化办法和毁掉办法,假如不填写可以直接写一个 自界说的JobHandler称号 用于后面在调度中心中装备使命时对应使命的 JobHandler 特点值。
- 可以经过
XxlJobHelper.log
来打印日志,经过调度中心可以检查履行日志的状况。 - 可以经过
XxlJobHelper.handleFail
或XxlJobHelper.handleSuccess
手动设置使命调度的成果(不设置时默认成果为成功状况,除非使命履行时呈现反常)。
-
调度中心中新增使命
这儿首要留意 Cron 表达式的时刻装备以及 JobHandler 的值需求与自界说使命办法的注解上的
value
特点值共同即可。关于高档装备这儿放一张中文文档的详细阐明(也可以直接去看文档):
需求建立集群或过期策略等高档玩法时可以进行装备。
到这儿就完结了 SpringBoot 集成 xxl-job 完结分布式使命调度的全进程了,接下来会经过一个实战事例来详细看看 xxl-job 的用途。
xxl-job 实战
下面经过一个最近自己在跟着做的学习项目中运用到 xxl-job 的场景事例来详细了解一下怎么运用 xxl-job 来完结使命调度。
实战背景
当时项目需求对上传到分布式文件体系 minio 中的视频文件进行一致格局的视频转码操作,因为自身视频转码操作会带了很大的时刻耗费以及 CPU 的开支,所以考虑集群服务下运用 xxl-job 的办法以使命调度的办法守时处理视频转码操作。
这样可以带来两个好处:① 以使命调度的办法,可以使得视频转码操作不会堵塞主线程,避免影响首要事务的吞吐量; ② 以集群服务分片接纳使命的办法,可以将使命均分给每个机器使得使命调度可以并行履行,进步总使命处理时刻以及下降单台机器 CPU 的开支;
xxl-job 履行流程图
怎样将使命均分给每台服务器?
因为使命履行时刻过长,需求建立集群服务来做到并行使命调度,从而减小 CPU 的开支,那么怎样均分使命呢?
运用 xxl-job 在集群布置时,装备路由策略中选择 分片播送 的办法,可以使一次使命调度会播送触发集群中所有的履行器履行一次使命,而且可以向体系传递分片参数。
运用这一特性可以依据 当时履行器的分片序号和分片总数 来获取对应的使命记载。
先来看看 Bean 形式下怎样获取分片序号和分片总数:
// 分片序号(当时履行器序号)
int shardIndex = XxlJobHelper.getShardIndex();
// 分片总数(履行器总数)
int shardTotal = XxlJobHelper.getShardTotal();
有了这两个特点,当履行器扫描数据库获取记载时,可以依据 取模 的办法获取归于当时履行器的使命,可以这样编写 sql 获取使命记载:
select * from media_process m
where m.id % #{shareTotal} = #{shareIndex}
and (m.status = '1' or m.status = '3')
and m.fail_count < 3
limit #{count}
扫描使命表,依据使命 id 对分片总数 取模 来完结对所有分片的均分使命,经过判断是否是当时分片序号,而且当时使命状况为 1
(未处理)或 3
(处理失利)而且当时使命失利次数小于3次时可以取得当时使命。每次扫描只取出 count
个使命数(批量处理)。
因而经过 xxl-job 的分片播送 + 取模 的办法即可完结对集群服务均分使命的操作。
怎样确保使命不会被重复消费?
因为视频转码自身处理时刻就会比较长,所以更不答应服务重复履行,尽管上面经过分片播送+取模的办法进步了使命不会被重复履行的机率,可是依旧存在如下状况:
如下图,有三台集群机器和六个使命,刚开端分配好了每台机器两个使命,履行器0正准备履行使命3时,刚好履行器2宕机了,此时履行器1刚好履行一次使命,因为分片总数减小,导致履行器1从头分配到需求履行的使命正好也是使命3,那么此时就会呈现履行器0和履行器1都在履行使命3的状况。
那么这种状况就需求完结幂等性了,幂等性有很多种完结办法,有爱好了解的可以参阅:开发中常遇到的接口幂等性问题及完结 – ()
这儿运用达观锁的办法完结幂等性,详细 sql 如下:
update media_process m
set m.status = '2'
where (m.status = '1' or m.status = '3')
and m.fail_count < 3
and m.id = #{id}
这儿只需求依靠使命的状况即可完结(未处理1
;处理中2
;处理失利3
;处理成功4
),可以看到这儿类似于 CAS 的办法经过比较和设置的办法只有在状况为未处理或处理失利时才干设置为处理中。这样在并发场景下,即便多个履行器一起处理该使命,也只有一个使命可以设置成功进入处理使命阶段。
为了真实达到幂等性,还需求设置一下 xxl-job 的调度过期策略和堵塞处理策略来确保真实的幂等性。分别设置为 疏忽(调度过期后,疏忽过期的使命,从当时时刻开端从头计算下次触发时刻) 和 丢掉后续调度(调度恳求进入单机履行器后,发现履行器存在运转的调度使命,本次恳求将会被丢掉并标记为失利)。
编写完结该功用所需的所有使命
- 分片视频转码处理
代码(这儿的代码只展现部分中心过程代码):
/**
* 视频转码处理使命
*/
@XxlJob("videoTranscodingHandler")
public void videoTranscodingHandler() throws InterruptedException {
// 1. 分片获取当时履行器需求履行的所有使命
List<MediaProcess> mediaProcessList = mediaProcessService.getMediaProcessList(shardIndex, shardTotal, count);
// 经过JUC东西类堵塞直到所有使命履行完
CountDownLatch countDownLatch = new CountDownLatch(mediaProcessList.size());
// 遍历所有使命
mediaProcessList.forEach(mediaProcess -> {
// 以多线程的办法履行所有使命
executor.execute(() -> {
try {
// 2. 测验抢占使命(经过达观锁完结)
boolean res = mediaProcessService.startTask(id);
if (!res) {
XxlJobHelper.log("使命抢占失利,使命id{}", id);
return;
}
// 3. 从minio中下载视频到本地
File file = mediaFileService.downloadFileFromMinIO(bucket, objectName);
// 下载失利
if (file == null) {
XxlJobHelper.log("下载视频犯错,使命id:{},bucket:{},objectName:{}", id, bucket, objectName);
// 呈现反常重置使命状况为处理失利等候下一次处理
mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.FAIL.getValue(), fileId, null, "下载视频到本地失利");
return;
}
// 4. 视频转码
String result = videoUtil.generateMp4();
if (!result.equals("success")) {
XxlJobHelper.log("视频转码失利,原因:{},bucket:{},objectName:{},", result, bucket, objectName);
// 呈现反常重置使命状况为处理失利等候下一次处理
mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.FAIL.getValue(), fileId, null, "视频转码失利");
return;
}
// 5. 上传转码后的文件
boolean b1 = mediaFileService.addMediaFilesToMinIO(new_File.getAbsolutePath(), "video/mp4", bucket, objectNameMp4);
if (!b1) {
XxlJobHelper.log("上传 mp4 到 minio 失利,使命id:{}", id);
// 呈现反常重置使命状况为处理失利等候下一次处理
mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.FAIL.getValue(), fileId, null, "上传 mp4 文件到 minio 失利");
return;
}
// 6. 更新使命状况为成功
mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.SUCCESS.getValue(), fileId, url, "创立临时文件反常");
} finally {
countDownLatch.countDown();
}
});
});
// 堵塞直到所有办法履行完结(30min后不再等候)
countDownLatch.await(30, TimeUnit.MINUTES);
}
中心使命 – 分片获取使命后履行视频转码使命,过程如下:
- 经过 分片播送拿到的参数以取模的办法 获取当时履行器所属的使命记载调集
- 遍历调集,以 多线程的办法 并发地履行使命
- 每次履行使命前需求先经过 数据库达观锁的办法 抢占当时使命,抢占到才干履行
- 履行使命进程分为 分布式文件体系下载需求转码的视频文件 -> 视频转码 -> 上传转码后的视频 -> 更新使命状况(处理成功)
- 运用JUC东西类
CountDownLatch
完结所有使命履行完后才退出办法 - 中间运用 xxl-job 的日志记载错误信息和履行成果
- 清理使命表中转码成功的使命的记载并将其刺进使命前史表
因为使命表处理完使命后仅仅更新使命状况,这样跟着使命增多会导致检索起来时刻耗费过大,所以运用使命调度的办法定时扫描使命表,将使命状况为处理成功的使命删去并从头刺进使命前史表中留存(因为代码过于简略,这儿就不做展现了)。
首要完结两个功用:① 清理使命表中已成功处理的使命; ② 将处理成功的使命记载刺进前史表中;
- 视频补偿机制
因为运用达观锁会将使命状况更新为处理中,假如此时履行使命的履行器(服务)宕机了,会导致该使命记载一直存在,因为达观锁的原因其他履行器也无法获取,这个时分同样需求运用使命调度的办法,定时扫描使命表,判断使命是否处于处理中状况而且使命创立时刻远大于30分钟,则阐明使命超时了,则是运用使命调度的办法从头更新使命的状况为未处理,等候下一次视频转码使命的调度处理。此外视频补偿机制使命调度还需求检查是否存在使命最大次数现已大于3次的,假如存在则交付给人工处理(因为代码过于简略,这儿就不做展现了)。
首要完结两个功用:① 处理使命超时状况下的使命,做出补偿; ② 处理失利次数大于3次的使命,做出补偿;
测试并检查日志
准备好的使命表记载:
发动三台媒资服务器,并敞开使命:
可以单独检查每个使命的日志:
经过日志中的履行日志检查详细日志信息:
可以看到直接为了测试改错的途径导致下载视频犯错:
检查数据库表的改变:
到这儿可以看到中心的视频转码使命履行成功,而且逻辑正确,可以起到分布式使命调度的效果。
这便是本次 xxl-job 实战的全部内容了,写这篇文章首要是为了记载一下项目中是怎么运用 xxl-job 的,而且供给一种分片播送均分使命的思路以及幂等性问题怎么处理,详细运用 xxl-job 还需依据自己项目的需求。遇到问题可以参阅官网:分布式使命调度渠道XXL-JOB (xuxueli.com)