上一篇《2. xxl-job源码分析-核心XxlJobExecutor》中我们介绍了`XxlJobExecutor`。其中有一个很重要的点当时没有深入分析,就是executor-server(rpc provider)部分:当时只介绍了`embedServer = new EmbedServer()`,就没有继续深入了。但我个人认为这里面的内容还是很多的,所以本篇详细讲解一下executor-server的内容。
我们首先从`embedServer.start`方法开始。
EmbedServer的start方法
`EmbedServer`到底是如何启动的呢?或者说,这个`start`方法到底做了什么?我们带着问题往下看。
/**
 * Starts the embedded executor HTTP server on a background daemon thread.
 *
 * <p>The server is a Netty HTTP server. Every accepted connection gets an idle-state check,
 * HTTP codec and aggregation, and an {@code EmbedHttpServerHandler} that hands requests to a
 * bounded business thread pool. Once the port is bound, the executor registers itself via
 * {@code startRegistry(appname, address)}. The thread then blocks until the server channel
 * closes or the thread is interrupted. Afterwards the Netty event loops and the business pool
 * are all shut down.
 *
 * @param address     address passed on to {@code startRegistry}
 * @param port        local port to bind
 * @param appname     application name passed on to {@code startRegistry}
 * @param accessToken token that incoming requests must carry; null or blank disables the check
 */
public void start(final String address, final int port, final String appname, final String accessToken) {
    // Executor-side implementation of the remote operations (beat, idleBeat, run, kill, log).
    executorBiz = new ExecutorBizImpl();

    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            // Netty event loops: the boss group accepts connections, the worker group serves them.
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();

            // Business pool: request processing runs here instead of on Netty's I/O threads.
            // NOTE(review): with corePoolSize 0 and a bounded queue, ThreadPoolExecutor only starts
            // threads beyond the first once the 2000-slot queue is full. In practice requests are
            // therefore handled by a single thread. Left unchanged; confirm before resizing, since
            // callers may rely on that accidental serialization.
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        // Emit an idle event after 90 s without reads or writes;
                                        // EmbedHttpServerHandler closes the channel on that event.
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
                                        // HTTP request decoding / response encoding.
                                        .addLast(new HttpServerCodec())
                                        // Aggregate into a FullHttpRequest (content up to 5 MiB).
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
                                        // Business handling, dispatched to bizThreadPool.
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // Bind synchronously so registration only happens once the port is actually open.
                ChannelFuture future = bootstrap.bind(port).sync();
                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                startRegistry(appname, address);

                // Block until the server channel is closed.
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Interrupted while blocked in sync(): treated as a normal stop. The flag is not
                // restored because this thread terminates right after the cleanup below.
                logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
            } finally {
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                    // Release the business pool as well; it was previously left running after the
                    // server stopped. shutdown() still lets already-accepted requests complete.
                    bizThreadPool.shutdown();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
    // Daemon thread, so the embedded server never keeps the JVM alive on its own.
    thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}
看到启动的源码,你是否会茅塞顿开:原来就是在一个后台(守护)线程中创建了一个Netty服务端并监听端口,由Netty负责HTTP协议的编码和解码,而我们只需要关注业务逻辑,也就是`EmbedHttpServerHandler`的处理逻辑。
/**
 * Netty handler for the executor's embedded HTTP endpoint.
 *
 * <p>Each aggregated HTTP request is copied out of the Netty buffer on the I/O thread. It is
 * then processed on {@code bizThreadPool}, so business logic never blocks the event loop.
 *
 * <p>Processing rules:
 * <ul>
 *   <li>Only POST requests are accepted.</li>
 *   <li>When an access token is configured, the {@code XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN}
 *       request header must equal it.</li>
 *   <li>The URI selects the {@code ExecutorBiz} operation.</li>
 *   <li>The resulting object is serialized to JSON and written back.</li>
 * </ul>
 */
public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);

    private final ExecutorBiz executorBiz;
    private final String accessToken;
    private final ThreadPoolExecutor bizThreadPool;

    /**
     * @param executorBiz   performs the requested operation (beat, idleBeat, run, kill, log)
     * @param accessToken   expected access token; null or blank disables the check
     * @param bizThreadPool pool that runs request processing off the Netty I/O thread
     */
    public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
        this.executorBiz = executorBiz;
        this.accessToken = accessToken;
        this.bizThreadPool = bizThreadPool;
    }

    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        // Copy everything needed out of the request now. SimpleChannelInboundHandler releases
        // msg when this method returns, which is before the pooled task runs.
        final String requestData = msg.content().toString(CharsetUtil.UTF_8);
        final String uri = msg.uri();
        final HttpMethod httpMethod = msg.method();
        final boolean keepAlive = HttpUtil.isKeepAlive(msg);
        final String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

        // Process asynchronously so the I/O thread is not blocked. If the pool rejects the task,
        // the exception propagates out of this method and ends up in exceptionCaught, which
        // closes the channel.
        bizThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
                String responseJson = GsonTool.toJson(responseObj);
                writeResponse(ctx, keepAlive, responseJson);
            }
        });
    }

    /**
     * Validates the request and dispatches it to {@link #executorBiz} by URI.
     *
     * @return the operation's result, or a FAIL {@code ReturnT} describing why the request was
     *         rejected or failed; exceptions from parsing/dispatch are caught and reported this way
     */
    private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
        // Basic request validation: method, URI, access token.
        if (HttpMethod.POST != httpMethod) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
        }
        if (uri == null || uri.trim().length() == 0) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
        }
        if (accessToken != null
                && accessToken.trim().length() > 0
                && !accessToken.equals(accessTokenReq)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }

        // Route by URI; the actual work is delegated to executorBiz.
        try {
            switch (uri) {
                case "/beat":
                    return executorBiz.beat();
                case "/idleBeat":
                    IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                    return executorBiz.idleBeat(idleBeatParam);
                case "/run":
                    TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                    return executorBiz.run(triggerParam);
                case "/kill":
                    KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                    return executorBiz.kill(killParam);
                case "/log":
                    LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                    return executorBiz.log(logParam);
                default:
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
        }
    }

    /**
     * Writes {@code responseJson} back as an HTTP 200 response.
     *
     * <p>When the client did not ask for keep-alive, the connection is closed once the write
     * completes. Otherwise it would stay open until the idle handler closes it.
     */
    private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));
        // NOTE(review): the body is JSON but is labelled text/html; kept for client compatibility.
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        if (keepAlive) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            ctx.writeAndFlush(response);
        } else {
            // Honour "Connection: close" (or HTTP/1.0 without keep-alive).
            ctx.writeAndFlush(response).addListener(io.netty.channel.ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause);
        ctx.close();
    }

    /** Closes the channel when the pipeline's IdleStateHandler reports it as idle. */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            ctx.channel().close(); // beat 3N, close if idle
            logger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel.");
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
`EmbedHttpServerHandler`的逻辑也不复杂,主要定义了整体的处理流程:在IO线程中读取请求,交给业务线程池异步处理(校验请求方法、URI和accessToken,再按URI分发),最后写回响应。真正的报文处理逻辑还是委托给`executorBiz`完成。
作为执行器(客户端),最重要的就是执行调度中心发过来的指令,所以这里重点分析一下`executorBiz.run(triggerParam)`。
ExecutorBizImpl的run方法
`executorBiz`即`ExecutorBizImpl`类的一个实例,我们直接查看它的`run`方法的实现逻辑:
/**
 * Accepts a trigger from the admin and queues it on the job's JobThread.
 *
 * <p>The method proceeds in three steps:
 * <ol>
 *   <li>Resolve the handler for the trigger's glue type. If the handler differs from the one
 *       bound to the existing JobThread, that thread is discarded.</li>
 *   <li>Apply the block strategy against the existing thread.</li>
 *   <li>Register a new JobThread if needed, then push the trigger onto its queue.</li>
 * </ol>
 *
 * @param triggerParam trigger request sent by the admin
 * @return the result of {@code pushTriggerQueue}, or a FAIL result when the handler is missing,
 *         the glue type is invalid, or the DISCARD_LATER strategy drops the trigger
 */
public ReturnT<String> run(TriggerParam triggerParam) {
    JobThread targetThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler targetHandler = null;
    if (targetThread != null) {
        targetHandler = targetThread.getHandler();
    }
    String replaceReason = null;

    GlueTypeEnum glueType = GlueTypeEnum.match(triggerParam.getGlueType());
    if (glueType == GlueTypeEnum.BEAN) {
        // Spring-bean handler, looked up by the configured handler name.
        IJobHandler beanHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // The bound handler changed: drop the old thread so a fresh one is registered below.
        boolean handlerChanged = targetThread != null && targetHandler != beanHandler;
        if (handlerChanged) {
            replaceReason = "change jobhandler or glue type, and terminate the old job thread.";
            targetThread = null;
            targetHandler = null;
        }

        if (targetHandler == null) {
            if (beanHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
            targetHandler = beanHandler;
        }
    } else if (glueType == GlueTypeEnum.GLUE_GROOVY) {
        // Groovy handler validation/assignment is elided in this excerpt (analogous to BEAN).
    } else if (glueType != null && glueType.isScript()) {
        // Script handler validation/assignment is elided in this excerpt (analogous to BEAN).
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // Block strategy only matters when a thread for this job is still alive.
    if (targetThread != null) {
        ExecutorBlockStrategyEnum strategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (strategy == ExecutorBlockStrategyEnum.DISCARD_LATER) {
            // Drop the new trigger while the thread is busy.
            if (targetThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (strategy == ExecutorBlockStrategyEnum.COVER_EARLY && targetThread.isRunningOrHasQueue()) {
            // Replace the busy thread with a new one.
            replaceReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
            targetThread = null;
        }
        // Any other strategy: just queue the trigger behind the current one.
    }

    if (targetThread == null) {
        targetThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), targetHandler, replaceReason);
    }

    // Asynchronous: the JobThread picks the trigger up from its queue.
    return targetThread.pushTriggerQueue(triggerParam);
}
上面代码的前半部分都是校验逻辑(确定要使用的JobHandler和JobThread,并处理阻塞策略),最后执行了`jobThread.pushTriggerQueue(triggerParam)`方法,将参数放入阻塞队列中,等待`JobThread`异步执行。
JobThread
`JobThread`继承了`Thread`类,内部维护着一个无界的阻塞队列`LinkedBlockingQueue<TriggerParam> triggerQueue`,其重点就是线程的`run`方法。
/**
 * Main loop of the job thread.
 *
 * <p>The loop works as follows:
 * <ul>
 *   <li>Calls {@code handler.init()} once, then polls {@code triggerQueue} with a 3 s timeout, so
 *       the {@code toStop} flag is re-checked regularly.</li>
 *   <li>Each dequeued trigger runs with its own {@code XxlJobContext}. If the trigger has an
 *       executor timeout, the handler runs on a helper thread, which is interrupted once the
 *       timeout elapses.</li>
 *   <li>The outcome of every dequeued trigger is pushed to {@code TriggerCallbackThread}.</li>
 *   <li>After more than 30 consecutive idle iterations with an empty queue, the thread asks
 *       {@code XxlJobExecutor} to remove it.</li>
 * </ul>
 *
 * <p>Once stopped, every trigger still queued is reported back as killed, and
 * {@code handler.destroy()} is called.
 */
@Override
public void run() {
    // Lifecycle hook (per the original notes, the init method configured on @XxlJob).
    // A failure is logged but does not stop the thread from serving triggers.
    try {
        handler.init();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    // toStop is set from another thread when the job is killed or replaced.
    while (!toStop) {
        running = false;
        idleTimes++;

        TriggerParam triggerParam = null;
        try {
            // poll(timeout) instead of take(): take() could block forever and never observe toStop.
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerParam != null) {
                running = true;
                idleTimes = 0;
                triggerLogIdSet.remove(triggerParam.getLogId());

                // log filename, like "logPath/yyyy-MM-dd/9999.log"
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                XxlJobContext xxlJobContext = new XxlJobContext(
                        triggerParam.getJobId(),
                        triggerParam.getExecutorParams(),
                        logFileName,
                        triggerParam.getBroadcastIndex(),
                        triggerParam.getBroadcastTotal());

                // Expose the per-trigger context to the handler and XxlJobHelper on this thread.
                XxlJobContext.setXxlJobContext(xxlJobContext);

                XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());

                if (triggerParam.getExecutorTimeout() > 0) {
                    // Run the handler on a helper thread so this thread can stop waiting after the timeout.
                    Thread futureThread = null;
                    try {
                        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                            @Override
                            public Boolean call() throws Exception {
                                // The helper thread needs the same context as this thread.
                                XxlJobContext.setXxlJobContext(xxlJobContext);
                                handler.execute();
                                return true;
                            }
                        });
                        futureThread = new Thread(futureTask);
                        futureThread.start();

                        // Only used to wait with a timeout; the result value itself is irrelevant.
                        // An ExecutionException from the handler propagates to the outer catch.
                        futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                    } catch (TimeoutException e) {
                        XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                        XxlJobHelper.log(e);
                        XxlJobHelper.handleTimeout("job execute timeout ");
                    } finally {
                        // Best effort: a handler that ignores interruption keeps running.
                        if (futureThread != null) {
                            futureThread.interrupt();
                        }
                    }
                } else {
                    handler.execute();
                }

                // Validate the recorded outcome; a non-positive handle code is reported as "lost".
                if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                    XxlJobHelper.handleFail("job handle result lost.");
                } else {
                    // Cap the message at 50000 chars before it is sent back in the callback.
                    String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                    tempHandleMsg = (tempHandleMsg != null && tempHandleMsg.length() > 50000)
                            ? tempHandleMsg.substring(0, 50000).concat("...")
                            : tempHandleMsg;
                    XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                }

                XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                        + XxlJobContext.getXxlJobContext().getHandleCode()
                        + ", handleMsg = "
                        + XxlJobContext.getXxlJobContext().getHandleMsg()
                );
            } else {
                // Idle for too long: ask to be removed, unless a trigger was queued concurrently.
                if (idleTimes > 30) {
                    if (triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
                        XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                    }
                }
            }
        } catch (Throwable e) {
            if (toStop) {
                XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
            }

            // Record the failure (with full stack trace) as the trigger's result.
            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            String errorMsg = stringWriter.toString();
            XxlJobHelper.handleFail(errorMsg);
            XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
        } finally {
            // Every dequeued trigger is reported back, whether it finished, failed or was killed.
            if (triggerParam != null) {
                if (!toStop) {
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.getXxlJobContext().getHandleCode(),
                            XxlJobContext.getXxlJobContext().getHandleMsg())
                    );
                } else {
                    // Stopped while this trigger was running.
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.HANDLE_CODE_FAIL,
                            stopReason + " [job running, killed]")
                    );
                }
            }
        }
    }

    // Stopped: report every trigger still waiting in the queue as killed, so none is lost silently.
    while (triggerQueue != null && triggerQueue.size() > 0) {
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam != null) {
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    triggerParam.getLogId(),
                    triggerParam.getLogDateTime(),
                    XxlJobContext.HANDLE_CODE_FAIL,
                    stopReason + " [job not executed, in the job queue, killed.]")
            );
        }
    }

    // Lifecycle hook (per the original notes, the destroy method configured on @XxlJob).
    try {
        handler.destroy();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    // The message says hashCode, so log the hash code rather than the Thread's toString().
    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread().hashCode());
}
通过上面的代码分析,我觉得有几个写得很好的点值得参考学习:

- `toStop`变量。这个变量是`volatile boolean`类型的,保证了可见性。这本身并没有什么特别的:一个由其他线程修改、又在`while`中作为循环条件判断的变量,都需要使用`volatile`修饰。值得注意的是,当`toStop`为`true`时(也就是线程可能被终止了),这个方法并没有直接返回。它会先为正在执行的触发请求回调一次,再把队列中尚未执行的触发请求逐一取出并回调,告知调度中心这些任务被终止了。我认为这一点很好,毕竟遇到了意外情况,还是有地方可以查到的。
- 从`triggerQueue`中获取元素时,使用的是`poll(3L, TimeUnit.SECONDS)`,而不是直接使用`take()`。如果使用`take()`,在没有元素时会一直阻塞;即使`toStop`已经改变了也无法检测到,因为线程被阻塞,无法继续往后执行。这里留了一个口子:如果3秒内没有`poll`到元素,就会返回`null`,保证最多每3秒进行一次`toStop`检测。
- 线程中还使用了`idleTimes`空闲次数的检测。如果连续空闲超过30次(每次最多等待3秒),就移除并销毁这个线程,避免无用的线程一直在等待。在之前分析的`ExecutorBizImpl`的`run`方法中也可以看到,`jobThread`为`null`时会通过`registJobThread`重新注册并创建新的线程;如果不及时清理空闲的线程,线程就会越来越多。
总结
从`EmbedServer`的`start`方法,到最终执行业务逻辑的`JobThread`中`IJobHandler`的`execute`方法,我们逐层深入,从网络模型讲到线程模型,包括线程队列的阻塞、超时和终止。这里面的很多内容都值得我们深入思考和学习。如果我们自己也写一个网络程序,需要考虑:应该如何处理网络请求,如何采用异步处理来增大吞吐量、提高程序的运行效率,以及如何做好线程和异常的善后工作。相信读者在阅读完源码后,一定会有自己的思考。