hystrix学习总结

hystrix是什么?

官网对hystrix的定义如下:

Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.

个人理解如下:

  1. 首先 Hystrix 只是一个库,意味着非常容易使用,通过maven或gradle引入即可。
  2. Hystrix 的目的是在访问远程系统,服务或使用第三方库的时候由于网络故障,延迟或其他错误可能导致的系统级联故障的点进行隔离。从而使得整个分布式系统具有弹性和容错能力。

hystrix基本用法

HystrixCommand

使用Hystrix最简单直接的方式就是继承HystrixCommand对象并复写run方法。

例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class HelloWorldCommand extends HystrixCommand<String> {

private final String name;

public HelloWorldCommand(String name) {
// 通过工厂方法HystrixCommandGroupKey.Factory.asKey
// 指定CommandGroup的名称
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}

@Override
protected String run() {
return "Hello " + name + "!";
}
// 注意只有非 HystrixBadRequestException 才会触发fallback
@Override
public String getFallback() {
return "fallback: " + name;
}
}

使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

String result = new HelloWorldCommand("jiexiu").execute();

Future<String> result = new HelloWorldCommand("jiexiu").queue();

//从名字也能看出来,需要在返回值上注册一个观察者来获取结果
Observable<String> observable = new HelloWorldCommand("jiexiu").observe();
// 或
Observable<String> stringObservable1 = helloWorldCommand.toObservable();
// 注册观察者 获取结果
// 先执行onNext再执行onCompleted
observable.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String v) {
System.out.println("onNext: " + v);
}
});
truetrue
// 只有成功才会被回调
observable.subscribe(new Action1<String>() {
true// 相当于上面的onNext()
true// @Override
truepublic void call(String v) {
truetrueSystem.out.println("call: " + v);
true}
});

execute方法用来同步获取执行结果(内部还是通过线程池异步执行)内部逻辑如下:

1
2
3
4
5
6
7
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}

queue 通过返回一个Future来异步获取结果。

observe和toObservable 通过返回一个Observable, 需要用户注册一个回调来接收调用结果。

observetoObservable的区别是:observe 在注册回调前就已经执行服务调用,toObservable是在注册回调后才开始执行。

HystrixObservableCommand

HystrixObservableCommand 和 HystrixCommand 功能是相同的。不过HystrixObservableCommand的定位是用在全异步的环境下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class HelloHystrixObservableCommand extends HystrixObservableCommand<String> {

private String name;

public HelloHystrixObservableCommand(String name) {
super(HystrixCommandGroupKey.Factory.asKey("HelloHystrixObservableCommand"));
this.name = name;
}

@Override
protected Observable<String> construct() {
// return Observable.just("1", "2", "3");
// return Observable.just("Hello " + name);
// return Observable.from(new String[]{"1", "2"});

// OnSubscribe 是一个Callback, 当 Observable被注册的时候会被执行
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//remote http call
try {// 写业务逻辑,注意try-catch
if (!subscriber.isUnsubscribed()) {
RestTemplate restTemplate = new RestTemplate();
String result = restTemplate.getForObject("http://www.jiexiu.com", String.class);
//
subscriber.onNext(result);
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
/**
* fallback
*/
@Override
public Observable<String> resumeWithFallback() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> observer) {
try {
if (!observer.isUnsubscribed()) {
observer.onNext("fallback-jiexiu");
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
public static void main(String[] args) throws IOException {
HelloHystrixObservableCommand observableCommand =
new HelloHystrixObservableCommand("jiexiu");
Observable<String> observable = observableCommand.construct();
observable.subscribe(System.out::println);

System.in.read();
}
}

fallback

Hystrix中实现优雅降级有2中方式:

  1. 通过在HystrixCommand中添加getFallback方法
  2. 通过在HystrixObservableCommand中添加resumeWithFallback方法

fallback 触发时机

  1. run方法或construct方法执行异常(非HystrixBadRequestException异常)。
  2. run方法或construct方法超时
  3. 线程池满了或信号量为空
  4. 断路器处于打开状态

注意: run方法抛出的所有异常中除了HystrixBadRequestException异常外都会被记录为失败,触发fallback,更新断路器统计信息。

Hystrix 隔离机制

Hystrix有2中实现请求故障隔离的机制。一种是利用线程池,一种是使用信号量,默认是使用线程池。

线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public HelloWorldCommand(String name) {
// super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("helloWorld"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-ThreadPool"))
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
.withCoreSize(5) //默认是10
.withMaximumSize(10) //默认是10
.withKeepAliveTimeMinutes(5) //默认是1,单位为分钟
.withMaxQueueSize(100) // 默认是-1, 意味着底层使用 SynchronousQueue, 该属性不能动态修改
.withQueueSizeRejectionThreshold(80)
)
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(3000) //默认是1s
.withCircuitBreakerEnabled(true) //默认是true

)
);
this.name = name;
}

线程池机制很容易理解,不同的HystrixCommandGroup底层使用不同的线程池,彼此不干扰。
可以通过andThreadPoolKey配置了线程池的名称。如果没有通过andThreadPoolKey来设置线程池的名称,默认使用withGroupKey设置的名称。

信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public HelloWorldCommand(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("helloWorld"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-Semaphore-ThreadPool"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(3000) //默认是1s
.withCircuitBreakerEnabled(true) //默认是true
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
.withExecutionIsolationSemaphoreMaxConcurrentRequests(3)
//默认值10
.withFallbackIsolationSemaphoreMaxConcurrentRequests(1)
//默认值10
)
);
this.name = name;
}

熔断器 CircuitBreaker

通常有2

一些细节

HystrixCommandGroupKey 需要指定,用来将不同的HystrixCommand组织起来

HystrixCommandKey 的默认值是 Command的类名称(不带包名),也可以单独指定。

HystrixThreadPoolKey 的默认值是 HystrixCommandGroupKey。但是建议手动指定HystrixCommand执行的HystrixThreadPool的名称。因为同属一个Group下的Command可能需要在不同的HystrixThreadPool中隔离执行。

在fallback的处理中,如果需要调用一个远程服务获取值(e.g. 查询缓存) 那么最好使用单独的线程池来执行,否则可能由于主Command的执行线程池已经满了导致fallback不能正常工作。

文章目录
  1. 1. hystrix是什么?
  2. 2. hystrix基本用法
    1. 2.1. HystrixCommand
    2. 2.2. HystrixObservableCommand
  3. 3. fallback
    1. 3.1. fallback 触发时机
  4. 4. Hystrix 隔离机制
    1. 4.1. 线程池
    2. 4.2. 信号量
  5. 5. 熔断器 CircuitBreaker
  6. 6. 一些细节
|