Spring Cloud教程 第六弹 Hystrix底层原理核心篇 您所在的位置:网站首页 spring底层是怎么实现的 Spring Cloud教程 第六弹 Hystrix底层原理核心篇

Spring Cloud教程 第六弹 Hystrix底层原理核心篇

2024-05-24 13:36| 来源: 网络整理| 查看: 265

更多Spring与微服务相关的教程请戳这里 Spring与微服务教程合集

1、Hystrix资源隔离技术

hystrix github 官方文档:Home · Netflix/Hystrix Wiki · GitHub

hystrix可以完成隔离、限流、熔断、降级这些常用保护功能。

hystrix的隔离分为线程池隔离和信号量隔离

1.1、信号量隔离 信号量隔离就是hystrix的限流功能。虽然名字叫隔离,实际上它是通过信号量来实现的,而信号量说白了就是个计数器,当计数器计算达到设定的阈值,直接就做异常处理。而hystrix信号量隔离限制的是tomcat等Web容器线程数,一段时间仅仅能支持这么多,多余的请求再来请求线程资源,就被拒绝了。由于实际是通过隔离了部分容器线程资源,也算是一种隔离方式信号量隔离是同步的,所以不支持计算超时时间(需要自己手动实现)信号量隔离只是起了个限制作用,它的保护能力有限,如果下游服务有问题,长时间不返回结果,那也只能等着。因为本身信号量隔离对单个请求是起不到任何作用的,它只能限制请求过多问题(请求过多则拒绝,不让整个服务挂)

为了解决这个问题,hystrix又产生了线程池隔离。这种隔离方式是通过引入额外线程(这里的额外是相对于Tomcat的线程来说的,引入额外线程会造成额外开销)的方式。

1.2、线程池隔离

Hystrix采用额外的线程来对原来的web容器线程做管理控制,如果一个线程超时未返回,则熔断。既然引入额外的线程就涉及到线程池管理、线程的上下文切换这些额外的开销,所以相比信号量隔离,线程池隔离成本更高。

Hystrix可以为每一个依赖建立一个线程池,使之和其他依赖的使用资源隔离,同时限制他们的并发访问和阻塞扩张。每个依赖可以根据权重分配资源(这里主要是线程),每一部分的依赖出现了问题,也不会影响其他依赖的使用资源。

1

Hystrix使用Command模式对依赖调用进行封装,每一个Command就是一个上面所说的依赖,Command可以指定CommandKeyCommand Group可以把一组Command归为一组,默认情况下,Hystrix会为每一个Command Group建立一个线程池每一个线程池都有一个Key,这个线程池就是线程隔离的关键,所有的监控、缓存、调用等等都来自于这个线程池,默认的Thread Pool key就是command group名称默认情况下,每一个Command Group会自动创建一个线程池,尽管如此,还是会有这样的情况:当有一些依赖在一个Command Group中,但是又有隔离的必要的时候(比如一个依赖的超时会用满所有的线程池线程,这样就会影响到其他的依赖),可以为组里的Command指定Thread Pool Key

1.3、Command模式

Command模式:Hystrix中大量使用rxjava来实现Command模式。所有自定义的Command,不管继承于HystrixObservableCommand还是HystrixCommand,最终都继承于AbstractCommand。

关于Command的初始化过程,后面会通过源码讲到。

1.4、如何选择隔离策略

信号量隔离:

对于那些本来延迟就比较小的请求来说,线程池带来的开销是非常高的,因此用信号量隔离更好适用于不是对外部依赖的访问,因为对外部依赖的访问时长难以控制针对超大并发量的场景下,此时用线程池的话,可能撑不住那么高的并发,如果硬撑住,可能要耗费大量的线程资源,那么就用信号量,来进行限流保护

2、Hystrix 工作原理 2.1、流程

2.1.1、构造HystrixCommand或HystrixObservableCommand对象

返回单个响应,则:

HystrixCommand command = new HystrixCommand(arg1, arg2);

返回一个发出响应的可观测对象:

HystrixObservableCommand command = new HystrixObservableCommand(arg1, arg2);

2.1.2、执行Command

有四种方式:

execute():阻塞,返回单个响应(或者在错误的情况下抛出一个异常)queue():返回一个包含单个响应的Futureobserve():订阅可返回响应的可观察对象,并且返回一个Observable(复制源Observable)toObservable():返回一个可观察对象,当你订阅它时,它将执行Hystrix命令并发出它的响应

注意:

第一种方式和第二种方式只适用于HystrixCommand,不适用于HystrixObservableCommand

execute()实际上是调用queue().get()。而queue()实际上是调用toObservable(). toblocking (). tofuture()。也就是说,最终每个HystrixCommand都有一个可观察的实现支持,即使是那些打算返回单个简单值的命令

K value = command.execute(); Future fValue = command.queue(); Observable ohValue = command.observe(); //热observable Observable ocValue = command.toObservable(); //冷observable

2.1.3、是否缓存了响应

如果这个命令启用了请求缓存,并且缓存中有对请求的响应,那么这个缓存的响应将立即以可观察对象的形式返回

请求缓存位于线程执行之前,即construct()或run()方法调用的前面

如果Hystrix没有实现请求缓存,则我们需要在construct()或run()方法里面手动实现请求缓存,即线程执行之后

2.1.4、断路器是否打开

当你执行命令时,Hystrix检查断路器,看看电路是否打开。如果电路打开(或触发),那么Hystrix将不会执行命令,但会将流路由到(8)获得回退。

如果电路关闭,则流继续流到(5),以检查是否有可用的容量来运行命令。

2.1.5、线程池、队列、信号量容量是否饱满

如果使用线程池隔离,则检查线程池、队列容量是否饱满

如果使用信号量隔离,则检查信号量是否饱满

2.1.6、执行请求 HystrixCommand.run() — 返回单个响应或者抛出异常,然后发出一个onCompleted通知HystrixObservableCommand.construct() —返回一个可发出响应的可观测对象或者发出一个onError的事件

2.1.7、计算电路是否健康

Hystrix向断路器报告成功、失败、拒绝和超时,断路器维护一组滚动计数器来计算统计数据。

它使用这些统计数据来确定电路何时应该“跳闸”,在这一点上它会短路任何后续请求,直到恢复期结束,在恢复期结束后,它会在第一次检查后再次关闭电路。

2.1.8、获取回退方法 在HystrixCommand的情况下,为了提供回退逻辑,你需要实现HystrixCommand. getfallback(),它返回一个回退值在HystrixObservableCommand的情况下,要提供回退逻辑,你需要实现HystrixObservableCommand. resumewithfallback(),它返回一个可以发出回退值的可观察对象如果没有为Hystrix命令实现回退方法,或者回退本身抛出异常,那么Hystrix仍然会返回一个可观察对象,但它不发出任何东西,并立即以onError通知结束

如果没有实现回退,则不同的执行方法会有不同的现象:

execute():抛出异常queue():会成功返回Future,但是如果调用了它的get()方法,Future将抛出异常observe() :返回一个可观察对象,当你订阅它时,它将通过调用订阅者的onError方法立即终止toObservable() :返回一个可观察对象,当你订阅它时,它将通过调用订阅者的onError方法来终止

2.1.9、返回成功的响应

3、Hystrix API com.netflix.hystrix hystrix-core 1.5.18

因为HystrixCommand是一个抽象类,所以我们需要继承它并重写run方法

import com.netflix.hystrix.HystrixCommand; import com.netflix.hystrix.HystrixCommandGroupKey; public class CommandHelloWorld extends HystrixCommand { private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("HelloGroup")); this.name = name; } @Override protected String run() { return "Hello " + name + "!"; } public static void main(String[] args) { String s = new CommandHelloWorld("bobo").execute(); System.out.println(s); } }

更多API使用方法请戳这里:How To Use · Netflix/Hystrix Wiki · GitHub

4、源码解读

关于HystrixCommand,包括HystrixObservableCommand,都是继承AbstractCommand的,而AbstractCommand有一个最核心的构造方法,该构造方法包含了该command的所有配置

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) { this.commandGroup = initGroupKey(group); this.commandKey = initCommandKey(key, getClass()); this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults); this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get()); this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties); this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics); this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults); //Strategies from plugins this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier(); this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties); this.executionHook = initExecutionHook(executionHook); this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy); this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy); /* fallback semaphore override if applicable */ this.fallbackSemaphoreOverride = fallbackSemaphore; /* execution semaphore override if applicable */ this.executionSemaphoreOverride = executionSemaphore; }

而前面说到,Thread Pool,Command Group,Command Key都在AbstractCommand这里实现

而正好上面的代码中,其中有一行是这样的:

this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

这一行代码表示加载线程池,点进去看看

再点进去看看

点进去HystrixThreadPoolDefault的构造方法,看看做了什么

再点进去看看,就能看到最终创建线程池的代码了



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有