Hystrix 熔斷器源碼淺析

回顧

為了防止服務之間的調用異常造成的連鎖反應,在SpringCloud中提供了Hystrix組件來實現服務調用異常的處理,或對高併發情況下的服務降級處理 。簡單回顧一下Hystrix的使用:

1.要使用 Hystrix熔斷機制處理引入它本身的依賴之外,我們需要在主程序配置類上貼 @EnableHystrix 標籤 開啟Hystrix功能,如下

@EnableHystrix
@EnableEurekaClient
@SpringBootApplication
...
public class ConsumerApplication {

2.開啟Hystrix熔斷機制後,對方法進行熔斷處理

@Service
public class HelloService {
@Autowired
private RestTemplate restTemplate;
//該註解對該方法創建了熔斷器的功能,並指定了fallbackMethod熔斷方法
@HystrixCommand(fallbackMethod = "hiError")
public String hiService(String name){
//調用接口進行消費
String result = restTemplate.getForObject("http://PRODUCER/hello?name="+name,String.class);
return result;
}
public String hiError(String name) {
return "hi,"+name+"error!";
}
}

當hiService方法第調用異常,會觸發 fallbackMethod執行的hiError方法做成一些補救處理。

那麼我們就沿著我們的使用方式來跟蹤一下 Hystrix的 工作原理。

首先我們看一下標籤:@EnableHystrix ,他的作用從名字就能看出就是開啟Hystrix ,我們看一下它的源碼

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableCircuitBreaker
public @interface EnableHystrix {
}

它上面有一個註解:@ EnableCircuitBreaker ,翻譯單詞意思就是啟用熔斷器(斷路器),那麼@ EnableHystrix標籤的本質其實是@ EnableCircuitBreaker ,我們看一下他的源碼

/**
* Annotation to enable a CircuitBreaker implementation.
* http://martinfowler.com/bliki/CircuitBreaker.html
* @author Spencer Gibb
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {
}

@EnableCircuitBreaker標籤引入了一個@Import(EnableCircuitBreakerImportSelector.class) 類,翻譯類的名字就是 , 開啟熔斷器的導入選擇器 ,導入什麼東西呢?看源碼

/**
* Import a single circuit breaker implementation Configuration
* @author Spencer Gibb
*/
@Order(Ordered.LOWEST_PRECEDENCE - 100)

public class EnableCircuitBreakerImportSelector extends
SpringFactoryImportSelector<enablecircuitbreaker> {
@Override
protected boolean isEnabled() {
return getEnvironment().getProperty(
"spring.cloud.circuit.breaker.enabled", Boolean.class, Boolean.TRUE);
}
}
/<enablecircuitbreaker>

翻譯類上的註釋 “Import a single circuit breaker implementation Configuration”,其實EnableCircuitBreakerImportSelector的作用就是去導入熔斷器的配置 。其實Spring中也有類似於JAVA SPI 的加載機制, 即會自動加載 jar包 spring-cloud-netflix-core 中的META-INF/spring.factories 中的Hystrix相關的自動配置類

注:SPI : 通過將服務的接口與實現分離以實現解耦,提高程序拓展性的機制,達到插拔式的效果 。


Hystrix 熔斷器源碼淺析

HystrixCircuitBreakerConfiguration 就是針對於 Hystrix熔斷器的配置

/**
* @author Spencer Gibb
* @author Christian Dupuis
* @author Venil Noronha
*/
@Configuration
public class HystrixCircuitBreakerConfiguration {
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
return new HystrixCommandAspect();
}
@Bean
public HystrixShutdownHook hystrixShutdownHook() {
return new HystrixShutdownHook();
}
@Bean
public HasFeatures hystrixFeature() {
return HasFeatures.namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
}
......

在該配置類中創建了 HystrixCommandAspect


/**
* AspectJ aspect to process methods which annotated with {@link HystrixCommand} annotation.
*/
@Aspect
public class HystrixCommandAspect {
private static final Map<hystrixpointcuttype> META_HOLDER_FACTORY_MAP;
static {
META_HOLDER_FACTORY_MAP = ImmutableMap.<hystrixpointcuttype>builder()
.put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
.put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
.build();
}
//定義切點,切到 @HystrixCommand標籤所在的方法
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {

}
//針對切點:@hystrixCommand切點的處理
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
//獲取到目標方法
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
//判斷方法上不能同時存在@HystrixCommand標籤和HystrixCollapser標籤
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
//把方法封裝成 HystrixInvokable
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
// 通過CommandExecutor來執行方法
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause() != null ? e.getCause() : e;
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
/<hystrixpointcuttype>/<hystrixpointcuttype>

HystrixCommandAspect 其實就是對 貼了@HystrixCommand標籤的方法使用 Aop機制實現處理 。代碼中通過把目標方法封裝成 HystrixInvokable對象,通過CommandExecutor工具來執行目標方法。

HystrixInvokable是用來幹嘛的?看源碼知道,其實他是一個空行法的接口,他的目的只是用來標記可被執行,那麼他是如何創建的我們看代碼HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);的create方法

 public HystrixInvokable create(MetaHolder metaHolder) {
HystrixInvokable executable;
...省略代碼...
executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
}
return executable;
}

其實是new了一個 GenericCommand 對象,很明顯他們是實現關係,我們看一下關係圖


Hystrix 熔斷器源碼淺析


跟蹤 GenericCommand 的源碼

@ThreadSafe
public class GenericCommand extends AbstractHystrixCommand<object> {
private static final Logger LOGGER = LoggerFactory.getLogger(GenericCommand.class);
public GenericCommand(HystrixCommandBuilder builder) {
super(builder);
}
protected Object run() throws Exception {
LOGGER.debug("execute command: {}", this.getCommandKey().name());
return this.process(new AbstractHystrixCommand<object>.Action() {
Object execute() {
return GenericCommand.this.getCommandAction().execute(GenericCommand.this.getExecutionType());
}
});
}
protected Object getFallback() {
final CommandAction commandAction = this.getFallbackAction();
if (commandAction != null) {
try {
return this.process(new AbstractHystrixCommand<object>.Action() {
Object execute() {
MetaHolder metaHolder = commandAction.getMetaHolder();
Object[] args = CommonUtils.createArgsForFallback(metaHolder, GenericCommand.this.getExecutionException());
return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
}
});
} catch (Throwable var3) {
LOGGER.error(FallbackErrorMessageBuilder.create().append(commandAction, var3).build());
throw new FallbackInvocationException(ExceptionUtils.unwrapCause(var3));
}
} else {
return super.getFallback();
}
}
}
/<object>/<object>/<object>

它本身對目標方法的正常執行和對 fallback方法的 執行做了實現 。

GenericCommand.this.getCommandAction().execute(...)獲取到目標方法並執行,底層會交給 MethodExecutionAction 使用反射去執行方法,

回到 HystrixCommandAspect的methodsAnnotatedWithHystrixCommand方法中,我們看下 CommandExecutor.execute是如何執行的

public class CommandExecutor {
public CommandExecutor() {
}
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
Validate.notNull(invokable);
Validate.notNull(metaHolder);
switch(executionType) {
//異步
case SYNCHRONOUS:
return castToExecutable(invokable, executionType).execute();
//同步
case ASYNCHRONOUS:
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand() && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
case OBSERVABLE:
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
if (invokable instanceof HystrixExecutable) {
return (HystrixExecutable)invokable;
} else {
throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
}
}

這裡有兩種執行方式 SYNCHRONOUS 異步 ,ASYNCHRONOUS同步 ,我們先看異步: castToExecutable(invokable, executionType).execute(); 這裡代碼把HystrixInvokable對象轉成 HystrixExecutable並調用execute方法執行 ,跟蹤execute方法進入HystrixCommand.execute方法中

 public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}

}
--------------
public Future queue() {
/*
* The Future returned by Observable.toBlocking().toFuture() does not implement the
* interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
* thus, to comply with the contract of Future, we must wrap around it.
*/
final Future delegate = toObservable().toBlocking().toFuture();

final Future f = new Future() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (delegate.isCancelled()) {
return false;
}
if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
/*
* The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command
* (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are
* issued by different threads, it's unclear about what value would be used by the time mayInterruptOnCancel is checked.
* The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
* than that interruption request cannot be taken back.
*/
interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
}
final boolean res = delegate.cancel(interruptOnFutureCancel.get());
if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
final Thread t = executionThread.get();
if (t != null && !t.equals(Thread.currentThread())) {
t.interrupt();
}
}
return res;
}
....省略...

在 HystrixCommand.execute方法中 其實是Future 來異步執行,調用過程中會觸發 GenericCommand來完成調用,執行完成後調用 Future.get()方法拿到執行結果 。


分享到:


相關文章: