Java精選面試題(微信小程序):5000+道面試題和選擇題,包含Java基礎、并發、JVM、線程、MQ系列、Redis、Spring系列、Elasticsearch、Docker、K8s、Flink、Spark、架構設計、大廠真題等,在線隨時刷題!
1、通信底層介紹
xxl-job 使用 netty http 的方式進行通信,雖然也支持 Mina,jetty,netty tcp 等方式,但是代碼里面固定寫死的是 netty http。
2、通信整體流程
我以調度器通知執行器執行任務為例,繪制的活動圖:
![]()
3、驚艷的設計
看完了整個處理流程代碼,設計上可以說獨具匠心,將 netty,多線程的知識運用得行云流水。
我現在就將這些設計上出彩的點總結如下:
使用動態代理模式,隱藏通信細節
xxl-job 定義了兩個接口 ExecutorBiz,AdminBiz,ExecutorBiz 接口中封裝了向心跳,暫停,觸發執行等操作,AdminBiz 封裝了回調,注冊,取消注冊操作,接口的實現類中,并沒有通信相關的處理。
XxlRpcReferenceBean 類的 getObject() 方法會生成一個代理類,這個代理類會進行遠程通信。
全異步處理
執行器收到消息進行反序列化,并沒有同步執行任務代碼,而是將任務信息存儲在 LinkedBlockingQueue 中,異步線程從這個隊列中獲取任務信息,然后執行。
而任務的處理結果,也不是說處理完之后,同步返回的,也是放到回調線程的阻塞隊列中,異步的將處理結果返回回去。
這樣處理的好處就是減少了 netty 工作線程的處理時間,提升了吞吐量。
對異步處理的包裝
對異步處理進行了包裝,代碼看起來是同步調用的。
我們看下調度器,XxlJobTrigger 類觸發任務執行的代碼:
public static ReturnT runExecutor(TriggerParam triggerParam, String address){
ReturnT runResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
//這里面做了很多異步處理,最終同步得到處理結果
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT (ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("
address:").append(address);
runResultSB.append("
code:").append(runResult.getCode());
runResultSB.append("
msg:").append(runResult.getMsg());runResult.setMsg(runResultSB.toString());
return runResult;
}
ExecutorBiz.run 方法我們說過了,是走的動態代理,和執行器進行通信,執行器執行結果也是異步處理完,才返回的,而這里看到的 run 方法是同步等待處理結果返回。
我們看下xxl-job是如何同步獲取處理結果的:調度器向執行器發出消息后,該線程阻塞。等到執行器處理完畢后,將處理結果返回,喚醒被阻塞的線程,調用處拿到返回值。
動態代理代碼如下:
//代理類中的觸發調用
if (CallType.SYNC == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// do invoke
client.asyncSend(finalAddress, xxlRpcRequest);
// future get
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
thrownew XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
} finally{
// future-response remove
futureResponse.removeInvokerFuture();
}
}
XxlRpcFutureResponse 類中實現了線程的等待,和線程喚醒的處理:
//返回結果,喚醒線程
public void setResponse(XxlRpcResponse response) {
this.response = response;
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (!done) {
synchronized (lock) {
try {
if (timeout < 0) {
//線程阻塞
lock.wait();
} else {
long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);
lock.wait(timeoutMillis);
}
} catch (InterruptedException e) {
throw e;
}
}
}if (!done) {
thrownew XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() +", request:" + request.toString());
}
return response;
}
有的同學可能會問了,調度器接收到返回結果,怎么確定喚醒哪個線程呢?
每一次遠程調用,都會生成 uuid 的請求 id,這個 id 是在整個調用過程中一直傳遞的,就像一把鑰匙,在你回家的的時候,拿著它就帶開門。
這里拿著請求 id 這把鑰匙,就能找到對應的 XxlRpcFutureResponse,然后調用 setResponse 方法,設置返回值,喚醒線程。
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){
// 通過requestId找到XxlRpcFutureResponse,
final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
if (futureResponse == null) {
return;
}
if (futureResponse.getInvokeCallback()!=null) {
// callback type
try {
executeResponseCallback(new Runnable() {
@Override
public void run() {
if (xxlRpcResponse.getErrorMsg() != null) {
futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
} else {
futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
}
}
});
}catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else {
// 里面調用lock的notify方法
futureResponse.setResponse(xxlRpcResponse);
}
// do remove
futureResponsePool.remove(requestId);}
來源:https://blog.csdn.net/weixin_45334346?type=blog
公眾號“Java精選”所發表內容注明來源的,版權歸原出處所有(無法查證版權的或者未注明出處的均來自網絡,系轉載,轉載的目的在于傳遞更多信息,版權屬于原作者。如有侵權,請聯系,筆者會第一時間刪除處理!
最近有很多人問,有沒有讀者或者摸魚交流群!加入方式很簡單,公眾號Java精選,回復“加群”,即可入群!
文章有幫助的話,點在看,轉發吧!
特別聲明:以上內容(如有圖片或視頻亦包括在內)為自媒體平臺“網易號”用戶上傳并發布,本平臺僅提供信息存儲服務。
Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.