博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
转 Hystrix超时实现机制
阅读量:4945 次
发布时间:2019-06-11

本文共 7214 字,大约阅读时间需要 24 分钟。

HystrixCommand在执行的过程中如何探测超时,本篇主要对此进行介绍说明。

1.主入口:executeCommandAndObserve

#com.netflix.hystrix.AbstractCommand#executeCommandAndObserveprivate Observable
executeCommandAndObserve(final AbstractCommand
_cmd) { ···省略部分代码··· Observable
execution; //判断是否开启超时监测 if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator
(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }

executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd));

可以简单的认为lift 里面的对前面的Observable包含,类似装饰者,后面的parent就是指上层的Observable。其中 HystrixObservableTimeoutOperator 就是关键的部分。

2.关键点: HystrixObservableTimeoutOperator

先看下HystrixObservableTimeoutOperator.call(),TimerListener的实现

TimerListener listener = new TimerListener() {                @Override                public void tick() { if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) { // 标记事件,可以认为是开的hook,这里暂忽略 originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey); //取消原Obserable的订阅 s.unsubscribe(); final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() { @Override public void run() { child.onError(new HystrixTimeoutException()); } }); timeoutRunnable.run(); } } //获取配置的超时时间配置 @Override public int getIntervalTimeInMilliseconds() { return originalCommand.properties.executionTimeoutInMilliseconds().get(); } };

这段代码的意思就是,给当前command的超时状态置为超时,如果设置成功就抛出HystrixTimeoutException异常,紧接着被command的 doOnErron接收走 fallback逻辑

fallbackprivate Observable
executeCommandAndObserve(final AbstractCommand
_cmd) { final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); ................................. final Func1
> handleFallback = new Func1
>() { @Override public Observable
call(Throwable t) { circuitBreaker.markNonSuccess(); Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { //此处catch到超时异常 return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { /* * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */ if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); } return handleFailureViaFallback(e); } } }; ................................. return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }

同时s.unsubscribe()通知正在执行的线程,终止任务。如何终止呢?

executeCommandWithSpecifiedIsolation.subscribeOn()

subscribeOne的参数就是HystrixContextScheduler, Rxjava里 scheduler具体干活的是 worker,我们先看下Hystrix自定义scheduler的结构示意图

 

那么我们直奔主题,直接看
ThreadPoolWorker
//ThreadPoolWorker.schedule@Overridepublic Subscription schedule(final Action0 action) { if (subscription.isUnsubscribed()) { return Subscriptions.unsubscribed(); } ScheduledAction sa = new ScheduledAction(action); subscription.add(sa); sa.addParent(subscription); ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(); FutureTask
f = (FutureTask
) executor.submit(sa); sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor)); return sa; }

1.开始的时候判断observable是否被订阅

2.被订阅后,将任务 submit到线程池
3.FutureCompleterWithConfigurableInterrupt scheduler在执行的时候,增加了observable的中断探测

private static class FutureCompleterWithConfigurableInterrupt implements Subscription { private final FutureTask
f; private final Func0
shouldInterruptThread; private final ThreadPoolExecutor executor; private FutureCompleterWithConfigurableInterrupt(FutureTask
f, Func0
shouldInterruptThread, ThreadPoolExecutor executor) { this.f = f; this.shouldInterruptThread = shouldInterruptThread; this.executor = executor; } @Override public void unsubscribe() { executor.remove(f); if (shouldInterruptThread.call()) { f.cancel(true); } else { f.cancel(false); } } .....省略代码....... }

当observable 取消订阅时,就会把当前任务移除,并中断任务

到这里只是讲说了超时后的处理,如何认定执行超时呢?

3.匠心之巧

这里有个很巧妙的设计,再探HystrixObservableTimeoutOperator

final Reference
tl = HystrixTimer.getInstance().addTimerListener(listener);#com.netflix.hystrix.util.HystrixTimer#addTimerListenerpublic Reference
addTimerListener(final TimerListener listener) { startThreadIfNeeded(); // add the listener Runnable r = new Runnable() { @Override public void run() { try { listener.tick(); } catch (Exception e) { logger.error("Failed while ticking TimerListener", e); } } }; ScheduledFuture
f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS); return new TimerReference(listener, f); }

利用了ScheduledThreadPoolExecutor,延迟执行,延迟时间就是我们设定的超时时间,我们再看下

#HystrixObservableTimeoutOperatorSubscriber
parent = new Subscriber
() { @Override public void onCompleted() { if (isNotTimedOut()) { // stop timer and pass notification through tl.clear(); child.onCompleted(); } } @Override public void onError(Throwable e) { if (isNotTimedOut()) { // stop timer and pass notification through tl.clear(); child.onError(e); } } ..... ..... ..... ..... ..... ..... ..... ..... ..... private boolean isNotTimedOut() { // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED || originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED); } };

这里parent就是指上层的obserable,这里可以抽象的认为是我们的HystrixCommand执行线程, 当command执行线程执行完成的时候或异常的时候,会执行 tl.clear(), 也就是Future.cancel()会中断 TimerListener 的ScheduledFuture 线程,迫使超时机制失效。

// tl.clear()private static class TimerReference extends SoftReference
{ private final ScheduledFuture
f; .... .... .... .... .... @Override public void clear() { super.clear(); // stop this ScheduledFuture from any further executions f.cancel(false); } }

4.回归文字

HystrixCommand里有个 TimedOutStatus 超时状态

 

现在可以认为有两个线程,一个是hystrixCommand任务执行线程,一个是等着给hystrixCommand判定超时的线程,现在两个线程看谁能先把hystrixCommand的状态置换,只要任何一个线程对hystrixCommand打上标就意味着超时判定结束。

 

系列文章推荐

作者:青芒v5
链接:https://www.jianshu.com/p/60074fe1bd86
来源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

转载于:https://www.cnblogs.com/duanxz/p/10950012.html

你可能感兴趣的文章
E20170828-mk
查看>>
E20170905-mk
查看>>
E20180527-hm
查看>>
【Knockout】三、data-bind声明式绑定
查看>>
虚拟机共享文件夹不显示
查看>>
解决移动端click点击问题
查看>>
Java之排序总结
查看>>
分页组件
查看>>
#100天计划# 2013年9月30日
查看>>
图论:Floyd-多源最短路、无向图最小环
查看>>
浏览指南
查看>>
asp.net中MVC多语言包的使用
查看>>
session
查看>>
第三周作业1
查看>>
hdu 1021 Fibonacci Again 递推数列模周期
查看>>
【前端_js】bootstrap——表格插件的使用
查看>>
关于tensorflow中维度的问题
查看>>
图解排序算法(五)之快速排序——三数取中法
查看>>
Mysql_Explain_语法详细解析
查看>>
六合一串口模块使用说明
查看>>