从源码看hystrix的工作原理

Hystrix是Netflix开源的一个限流熔断的项目、主要有以下功能:

隔离(线程池隔离和信号量隔离):限制调用分布式服务的资源使用,某一个调用的服务出现问题不会影响其他服务调用。

优雅的降级机制:超时降级、资源不足时(线程或信号量)降级,降级后可以配合降级接口返回托底数据。

熔断:当失败率达到阀值自动触发降级(如因网络故障/超时造成的失败率高),熔断器触发的快速失败会进行快速恢复。

缓存:提供了请求缓存、请求合并实现。支持实时监控、报警、控制(修改配置)


从源码看hystrix的工作原理

Hystrix主要有4种调用方式:

toObservable() 方法 :未做订阅,只是返回一个Observable 。

observe() 方法 :调用 #toObservable() 方法,并向 Observable 注册 rx.subjects.ReplaySubject 发起订阅。

queue() 方法 :调用 #toObservable() 方法的基础上,调用:Observable#toBlocking() 和 BlockingObservable#toFuture() 返回 Future 对象

execute() 方法 :调用 #queue() 方法的基础上,调用 Future#get() 方法,同步返回 #run() 的执行结果。

主要的执行逻辑:

1.每次调用创建一个新的HystrixCommand,把依赖调用封装在run()方法中.

2.执行execute()/queue做同步或异步调用.

3.判断熔断器(circuit-breaker)是否打开,如果打开跳到步骤8,进行降级策略,如果关闭进入步骤.

4.判断线程池/队列/信号量是否跑满,如果跑满进入降级步骤8,否则继续后续步骤.

5.调用HystrixCommand的run方法.运行依赖逻辑

依赖逻辑调用超时,进入步骤8.

6.判断逻辑是否调用成功。返回成功调用结果;调用出错,进入步骤8.

7.计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态.

8.getFallback()降级逻辑。以下四种情况将触发getFallback调用:

run()方法抛出非HystrixBadRequestException异常。

run()方法调用超时

熔断器开启拦截调用

线程池/队列/信号量是否跑满

没有实现getFallback的Command将直接抛出异常,fallback降级逻辑调用成功直接返回,降级逻辑调用失败抛出异常.

9.返回执行成功结果

下面结合源码看一下:

在execute中调用queue的get方法,同步获取返回结果

<code>public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}/<code>

在queue中是调用toObservable()方法

<code>public Future queue() { 

final Future delegate = toObservable().toBlocking().toFuture();
//省略一些无关的代码
};
/<code>

在toObservale中先判断是否在缓存中有,如果有的话则从缓存中取,没有的话则进入applyHystrixSemantics

<code>public Observable toObservable() {
//省略....
return Observable.defer(new Func0<observable>>() {
@Override
public Observable call() {
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
//TODO make a new error type for this
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}

commandStartTimestamp = System.currentTimeMillis();

if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}

final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
//先从缓存中获取如果有的话直接返回
/* try from cache first */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache fromCache = (HystrixCommandResponseFromCache) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}

Observable hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);

Observable afterCache;

// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
//里面订阅了,所以开始执行hystrixObservable
HystrixCachedObservable toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache fromCache = (HystrixCommandResponseFromCache) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}

return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
});
}
/<observable>
/<code>

在applyHystrixSemantics中先判断是否开启熔断器,如果开启直接走失败逻辑,接着尝试获取信号量实例,如果获取不到,则也失败(hystrix的隔离模式有信号量和线程池2种,这里如果没开启信号量模式,这里的TryableSemaphore tryAcquire默认返回的都是true),接着在return 中进入executeCommandAndObserve

<code>private Observable applyHystrixSemantics(final AbstractCommand _cmd) {
executionHook.onStart(_cmd);

/* determine if we're allowed to execute */
//判断是否开启熔断器
if (circuitBreaker.attemptExecution()) {
//获取信号量实例
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
//singleSemaphoreRelease可能在doOnTerminate、doOnUnsubscribe中重复调用
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};

final Action1<throwable> markExceptionThrown = new Action1<throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
//尝试获取信号量
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
//拒绝逻辑
return handleSemaphoreRejectionViaFallback();
}
} else {
//直接走失败的逻辑

return handleShortCircuitViaFallback();
}
}/<throwable>/<throwable>
/<code>

在executeCommandAndObserve中进行一些超时,及失败的逻辑处理之后,进入executeCommandWithSpecifiedIsolation

<code>private Observable executeCommandAndObserve(final AbstractCommand _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
//省略...
//失败处理逻辑
final Func1<throwable>> handleFallback = new Func1<throwable>>() {
@Override
public Observable call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}

return handleFailureViaFallback(e);
}
}
};
//判断是否开启超时设置
if (properties.executionTimeoutEnabled().get()) {
//list添加超时操作
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator(_cmd));

} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
/<throwable>/<throwable>
/<code>

在executeCommandWithSpecifiedIsolation,先判断是否进行线程隔离,及一些状态变化之后,进入getUserExecutionObservable

<code>private Observable executeCommandWithSpecifiedIsolation(final AbstractCommand _cmd) {
//线程隔离
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
//当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。Observable是返回的
return Observable.defer(new Func0<observable>>() {
@Override
public Observable call() {
executionResult = executionResult.setExecutionOccurred();
//状态校验
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
//统计标记命令
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// the command timed out in the wrapping thread so we will return immediately
// and not increment any of the counters below or other such logic
return Observable.error(new RuntimeException("timed out before executing run()"));
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
//we have not been unsubscribed, so should proceed
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
/**
* If any of these hooks throw an exception, then it appears as if the actual execution threw an error

*/
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//command has already been unsubscribed, so return immediately
return Observable.empty();
}
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
//if it was never started and received terminal, then no need to clean up (I don't think this is possible)
}
//if it was unsubscribed, then other cleanup handled it
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
//if it was never started and was cancelled, then no need to clean up
}
//if it was terminal, then other cleanup handled it
}
}).subscribeOn(threadPool.getScheduler(new Func0<boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
} else {
return Observable.defer(new Func0<observable>>() {
@Override
public Observable call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));

}

metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// semaphore isolated
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
} catch (Throwable ex) {
//If the above hooks throw, then use that as the result of the run method
return Observable.error(ex);
}
}
});
}
}
/<observable>/<boolean>
/<observable>
/<code>

在getUserExecutionObservable和getExecutionObservable中,主要是封装用户定义的run方法

<code>private Observable getUserExecutionObservable(final AbstractCommand _cmd) {
Observable userObservable;

try {
//获取用户定义逻辑的observable
userObservable = getExecutionObservable();
} catch (Throwable ex) {
userObservable = Observable.error(ex);
}

return userObservable
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd));
}

final protected Observable getExecutionObservable() {
return Observable.defer(new Func0<observable>>() {
@Override
public Observable
call() {
try {
//包装定义的run方法
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
// Save thread on which we get subscribed so that we can interrupt it later if needed
executionThread.set(Thread.currentThread());
}
});
}
/<observable>
/<code>

JAVA进阶架构程序员福利:我这里还总结整理了比较全面的JAVA相关的面试资料,都已经整理成了

PDF版,这些都可以分享给大家,关注私信我:【806】,免费领取!


分享到:


相關文章: