客户端负载均衡
概述
- Spring Cloud Ribbon是-个基于HTTP和TCP的客户端负载均衡工具, 它基于Netflix Ribbon实现。
- 通过Spring Cloud的封装,可以让我们轻松地将面向服务的REST模板请求自动转换成客户端负载均衡的服务调用。
- Spring Cloud Ribbon虽然只是一个工具类框架,它不像服务注册中心、配置中心、API网关那样需要独立部署,但是它几乎存在于每一个Spring Cloud构建的微服务和基础设施中。
- 因为微服务间的调用,API网关的请求转发等内容,实际上都是通过Ribbon来实现的,包括后续我们将要介绍的Feign,它也是基于Ribbon实现的工具。
- 对Spring Cloud Ribbon的理解和使用,对于我们使用Spring Cloud来构建微服务非常重要。
负载均衡
- 负载均衡在系统架构中是一个非常重要,并且是不得不去实施的内容。
- 因为负载均衡是对系统的高可用、网络压力的缓解和处理能力扩容的重要手段之一。
服务端负载均衡
我们通常所说的负载均衡都指的是服务端负载均衡,其中分为硬件负载均衡和软件负载均衡。
- 硬件负载均衡主要通过在服务器节点之间安装专门用于负载均衡的设备,比如F5等
- 而软件负载均衡则是通过在服务器上安装一些具有均衡负载功能或模块的软件来完成请求分发工作,比如Nginx等。
不论采用硬件负载均衡还是软件负载均衡,只要是服务端负载均衡都能以类似下图的架构方式构建起来:
硬件负载均衡和服务端负载均衡的软件模块都会维护一个下挂可用的服务端清单,通过心跳检测来剔除故障的服务端节点,保证清单中都是可以正常访问的服务端节点。
客户端发送请求到负载均衡设备时,该设备按照某种算法(如线性轮询,权重负载,流量负载),从维护的可用服务端清单中取出一台服务端的地址,然后进行转发。
客户端负载均衡
客户端的负载均衡和服务端负载均衡最大的不同点在于上面提到的服务清单所存储的位置。
在客户端负载均衡中,所有的客户端节点都维护着自己要访问的服务端清单,这些服务清单来自五服务注册中心(如Eureka等等)。
同服务端负载均衡类型,客户端负载均衡也需要通过心跳维护服务清单的健康性(与服务注册中心配合完成)
Spring Cloud中的服务治理框架中,默认会创建针对各个服务治理框架的Ribbon自动化整合配置
- Eureka:RibbonEureKaAutoConfiguration
- COnsul: RibbonCOnsulAutoConfiguration
Ribbon本地负载均衡,在调用微服务接口时候,会在注册中心上获取注册信息服务列表之后缓存到JVM本地,从而在本地实现RPC远程服务调用技术
通过Spring Cloud Ribbon 的封装,在微服务框架中使用客户端负载均衡非常简单,只需要如下二步:
- 服务提供者只需要启动多个服务实例并注册到一个注册中心或者多个相关联的服务注册中心
- 服务消费者直接通过调用被@LoadBalanced注解修饰过的RestTemplate来实现面向服务的接口调用
RestTemplate
- 简化了发起 HTTP 请求以及处理响应的过程,并且支持 REST 。
1 |
|
内部方法
- 从图中可以看到 RestTemplate 类中的方法主要是来自接口 RestOperations, 我们可以提取出主要的几种方法是:
- GET
- POST
- PUT
- DELETE
- HEAD
- OPTIONS
- EXCHANGE
- EXECUTE
Get 方法
- 在 RestTemplate 中,发送一个 GET 请求,我们可以通过如下两种方式:
getForEntity
getForEntity函数,返回值是一个
ResponseEntity<T>
,ResponseEntity<T>
是 Spring 对 HTTP 请求响应的封装,包括了几个重要的元素,如请求状态码的枚举对象HttpStatus,响应码、contentType、contentLength、响应消息体等。比如下面一个例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public String getHello() {
ResponseEntity<String> responseEntity = restTemplate.getForEntity("http://HELLO-SERVICE/hello?name={1}", String.class,"World");
String body = responseEntity.getBody();
HttpStatus statusCode = responseEntity.getStatusCode();
int statusCodeValue = responseEntity.getStatusCodeValue();
HttpHeaders headers = responseEntity.getHeaders();
StringBuffer result = new StringBuffer();
result.append("responseEntity.getBody():").append(body).append("<hr>")
.append("responseEntity.getStatusCode():").append(statusCode).append("<hr>")
.append("responseEntity.getStatusCodeValue():").append(statusCodeValue).append("<hr>")
.append("responseEntity.getHeaders():").append(headers).append("<hr>");
return result.toString();
}关于这段代码,说如下几点:
getForEntity 的第一个参数为我要调用的服务的地址,这里我调用了服务提供者提供的 /hello 接口,最后的“world”参数会替换url中的{1}占位符
返回的ResponseEntity对象中的Body内容类型会根据第二个参数转为String类型
注意这里是通过服务名调用而不是服务地址,如果写成服务地址就没法实现客户端负载均衡了。
如果希望返回的Body是User对象,可以这样实现
1
2
3ResponseEntity<String> responseEntity = restTemplate.getForEntity("http://HELLO-SERVICE/hello?name={1}", User.class,"World");
User body = responseEntity.getBody();
getForEntity方法重载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15<T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Object... uriVariables)
throws RestClientException;
- url: 请求地址
- responseType: 请求响应体的包装类型
- uriVariables: url中的参数绑定
- 注意:uriVariables是一个数组,它的顺序会对应url占位符定义顺序
<T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Map<String, ?> uriVariables)
throws RestClientException;
- uriVariables: url中的参数类型使用Map
- 使用该方法需要在url占位符中指定map中参数的key值
<T> ResponseEntity<T> getForEntity(URI url, Class<T> responseType) throws RestClientException;
- 使用URI对象来替代之前的 url 和 uriVariables参数
- URI是JDK中Java.net包下的类,表示一个统一资源标识符(Uniform Resource Identifier)引用示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14ResponseEntity<String> responseEntity =
restTemplate.getForEntity("http://HELLO-SERVICE/sayhello?name={1}", String.class, "张三");
Map<String, String> map = new HashMap<>();
map.put("name", "李四");
ResponseEntity<String> responseEntity =
restTemplate.getForEntity("http://HELLO-SERVICE/sayhello?name={name}", String.class, map);
// 通过Spring中提供的UriComponents来构建Uri即可
URI uri = UriComponentsBuilder.fromUriString("http://HELLO-SERVICE/sayhello?name={name}")
.build()
.expand("world")
.encode().toUri();
ResponseEntity<String> responseEntity = restTemplate.getForEntity(uri, String.class);
getForObject
getForObject 函数实际上是对 getForEntity 函数的进一步封装,通过HttpMessageConverterExtractor对Http的请求响应体的body内容进行对象转换,实现请求直接返回包装好的消息体内容。
1
2
3
4
5String book = restTemplate.getForObject("http://HELLO-SERVICE/getbook1", String.class);
return book;
Book book = restTemplate.getForObject("http://HELLO-SERVICE/getbook1", Book.class);
return book;重载实现,参数同getForEntity
1
2
3
4
5
6
7
8
<T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException;
<T> T getForObject(String url, Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException;
<T> T getForObject(URI url, Class<T> responseType) throws RestClientException;
POST 方法
- 在 RestTemplate 中,POST 请求可以通过如下三个方法来发起:
postForEntity
该方法和get请求中的getForEntity方法类似,如下例子:
- 方法的第一参数表示要调用的服务的地址
- 方法的第二个参数表示上传的参数
- 方法的第三个参数表示返回的消息体的数据类型
1
2
3
4Book book = new Book();
book.setName("红楼梦");
ResponseEntity<Book> responseEntity = restTemplate.postForEntity("http://HELLO-SERVICE/getbook2", book, Book.class);
return responseEntity.getBody();方法的重载
1 | <T> ResponseEntity<T> postForEntity(String url, Object request, Class<T> responseType, |
- 这些函数的参数用法大部分月getForEntity一样,这里需要注意的是新增加了request参数,这个参数可以是普通对象,也可以是一个HttpEntity 对象
- 如果是非HttpEntity对象的时候,RestTemplate会将请求对象转换为一个HttpEntity对象阿莱处理,Object就是request的类型,request的内容被当做完整的body来处理
- 如果是一个HttpEntity对象爱那个,会被当做完整的Http请求对打处理,request不仅包含body内容,还有Header内容
postForObject
返回的消息体,可以直接使用postForObject。用法和getForObject一致。
也有三个重载方法, 传入参数与postForEntity一致
1
2
3
4
5
6
7
8
9
10
11
12
<T> T postForObject(String url, Object request, Class<T> responseType,
Object... uriVariables) throws RestClientException;
<T> T postForObject(String url, Object request, Class<T> responseType,
Map<String, ?> uriVariables) throws RestClientException;
<T> T postForObject(URI url, Object request, Class<T> responseType)throws RestClientException;
postForLocation
postForLocation 也是提交新资源,提交成功之后,返回新资源的 URI,该 URI 表示新资源的位置。
也实现了三种不同的重载方法, 参数和前面两种的参数基本一致,只不过该方法的返回值为 URI
1
2
3
4
5
6
7
8
9
10
11
URI postForLocation(String url, Object request, Object... uriVariables)throws RestClientException;
URI postForLocation(String url, Object request, Map<String, ?> uriVariables)
throws RestClientException;
URI postForLocation(URI url, Object request)throws RestClientException;
PUT 方法
在 RestTemplate 中,PUT 请求可以通过 put 方法调用
1
2
3
4Book book = new Book();
book.setName("红楼梦");
// book对象是我要提交的参数,最后的99用来替换前面的占位符{1}
restTemplate.put("http://HELLO-SERVICE/getbook3/{1}", book, 99);三种不同的重载方法
1
2
3
4
5void put(String url, Object request, Object... uriVariables)throws RestClientException;
void put(String url, Object request, Map<String, ?> uriVariables)throws RestClientException;
void put(URI url, Object request)throws RestClientException;put 方法的参数和前面介绍的 postForEntity 方法的参数基本一致,只是 put 方法没有返回值而已。
DELETE 方法
delete 请求我们可以通过 delete 方法调用来实现,如下例子:
1
restTemplate.delete("http://HELLO-SERVICE/getbook4/{1}", 100);
三种不同的重载方法
1
2
3
4
5void delete(String url, Object... uriVariables) throws RestClientException;
void delete(String url, Map<String, ?> uriVariables) throws RestClientException;
void delete(URI url) throws RestClientException;在REST请求时通常将DELETE请求的唯一标识拼接在URL中,所以不需要requet的body信息。
HEADER 方法
返回资源的所有 HTTP headers。
1
2
3
4
5HttpHeaders headForHeaders(String url, Object... uriVariables) throws RestClientException;
HttpHeaders headForHeaders(String url, Map<String, ?> uriVariables) throws RestClientException;
HttpHeaders headForHeaders(URI url) throws RestClientException;
OPTIONS
问可以执行哪些方法。
1
2
3
4
5Set<HttpMethod> optionsForAllow(String url, Object... uriVariables) throws RestClientException;
Set<HttpMethod> optionsForAllow(String url, Map<String, ?> uriVariables) throws RestClientException;
Set<HttpMethod> optionsForAllow(URI url) throws RestClientException;
EXCHANGE
与其它接口的不同:
- 允许调用者指定HTTP请求的方法(GET,POST,PUT等)
- 可以在请求中增加body以及头信息,其内容通过参数 HttpEntity<?>requestEntity 描述
- exchange支持‘含参数的类型’(即泛型类)作为返回类型,该特性通过 ParameterizedTypeReferenceresponseType 描述
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28<T> ResponseEntity<T> exchange(String url, HttpMethod method, HttpEntity<?> requestEntity,
Class<T> responseType, Object... uriVariables) throws RestClientException;
<T> ResponseEntity<T> exchange(String url, HttpMethod method, HttpEntity<?> requestEntity,
Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException;
<T> ResponseEntity<T> exchange(URI url, HttpMethod method, HttpEntity<?> requestEntity,
Class<T> responseType) throws RestClientException;
<T> ResponseEntity<T> exchange(String url,HttpMethod method, HttpEntity<?> requestEntity,
ParameterizedTypeReference<T> responseType, Object... uriVariables) throws RestClientException;
<T> ResponseEntity<T> exchange(String url, HttpMethod method, HttpEntity<?> requestEntity,
ParameterizedTypeReference<T> responseType, Map<String, ?> uriVariables) throws RestClientException;
<T> ResponseEntity<T> exchange(URI url, HttpMethod method, HttpEntity<?> requestEntity,
ParameterizedTypeReference<T> responseType) throws RestClientException;
<T> ResponseEntity<T> exchange(RequestEntity<?> requestEntity, Class<T> responseType)
throws RestClientException;
<T> ResponseEntity<T> exchange(RequestEntity<?> requestEntity, ParameterizedTypeReference<T> responseType)
throws RestClientException;
EXECUTE
上面所有的方法内部返回值都调用了同一个方法 —— execute 方法。
1
2
3
4
5
6
7
8public <T> T postForObject(URI url, Object request, Class<T> responseType)
throws RestClientException {
RequestCallback requestCallback = httpEntityCallback(request, responseType);
HttpMessageConverterExtractor<T> responseExtractor =
new HttpMessageConverterExtractor<>(responseType, getMessageConverters());
return execute(url, HttpMethod.POST, requestCallback, responseExtractor);
}可以看到,Excute方法只是将 String 格式的 URI 转成了 java.net.URI,之后调用了doExecute方法。整个调用过程关键起作用的是 doExecute 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
<T> T execute(String url, HttpMethod method, RequestCallback requestCallback,
ResponseExtractor<T> responseExtractor, Object... uriVariables)
throws RestClientException;
<T> T execute(String url, HttpMethod method, RequestCallback requestCallback,
ResponseExtractor<T> responseExtractor, Map<String, ?> uriVariables)
throws RestClientException;
<T> T execute(URI url, HttpMethod method, RequestCallback requestCallback,
ResponseExtractor<T> responseExtractor)throws RestClientException;
doExecute 方法
代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected <T> T doExecute(URI url, HttpMethod method, RequestCallback requestCallback,
ResponseExtractor<T> responseExtractor)throws RestClientException {
Assert.notNull(url, "URI is required");
Assert.notNull(method, "HttpMethod is required");
ClientHttpResponse response = null;
try {
ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
response = request.execute();
handleResponse(url, method, response);
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
}
catch (IOException ex) {
String resource = url.toString();
String query = url.getRawQuery();
resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
throw new ResourceAccessException("I/O error on " + method.name() +
" request for \"" + resource + "\": " + ex.getMessage(), ex);
}
finally {
if (response != null) {
response.close();
}
}
}RestTemplate 类中可以看到他们两的实现类。: RequestCallback 和 ResponseExtractor
RequestCallback :用于操作请求头和body,在请求发出前执行。
该接口有两个实现类:
1
AcceptHeaderRequestCallback :只处理请求头,用于getXXX()方法
1
HttpEntityRequestCallback: 继承自AcceptHeaderRequestCallback,可以处理请求头和Body,用于put,post和exchage方法
ResponseExtractor:解析HTTP响应的数据,而且不需要担心异常和资源的关闭
- ResponseEntityResponseExtractor 的作用是:使用 HttpMessageConverterExtractor 提取 body(委托模式),然后将 body 和响应头、状态封装成 ResponseEntity 对象。
源码分析
- Ribbon是如何通过Spring 提供的RestTemplate实现客户端负载均衡的?
@LoadBalanced
根据上面的操作,我们只在消费者端就添加了@LoadBalanced注解
1
2
3
4
5
6
7
8
9
10
11
12
13/**
* Annotation to mark a RestTemplate or WebClient bean to be configured to use a
* LoadBalancerClient.
* @author Spencer Gibb
*/
public LoadBalanced {
}通过注释可以知道,该注解用来给RestTemplate做标记。已使用负载均衡客户端(LoadBalancerClient)来配置它
LoadBalancerClient
搜索LoadBalancerClient可以发现,这是Spring Cloud中定义的一个接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60public interface ServiceInstanceChooser {
/**
* Chooses a ServiceInstance from the LoadBalancer for the specified service.
* @param serviceId The service ID to look up the LoadBalancer.
* @return A ServiceInstance that matches the serviceId.
*/
ServiceInstance choose(String serviceId);
/**
* Chooses a ServiceInstance from the LoadBalancer for the specified service and
* LoadBalancer request.
* @param serviceId The service ID to look up the LoadBalancer.
* @param request The request to pass on to the LoadBalancer
* @param <T> The type of the request context.
* @return A ServiceInstance that matches the serviceId.
*/
<T> ServiceInstance choose(String serviceId, Request<T> request);
}
public interface LoadBalancerClient extends ServiceInstanceChooser {
/**
* Executes request using a ServiceInstance from the LoadBalancer for the specified
* service.
* @param serviceId The service ID to look up the LoadBalancer.
* @param request Allows implementations to execute pre and post actions, such as
* incrementing metrics.
* @param <T> type of the response
* @throws IOException in case of IO issues.
* @return The result of the LoadBalancerRequest callback on the selected
* ServiceInstance.
*/
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
/**
* Executes request using a ServiceInstance from the LoadBalancer for the specified
* service.
* @param serviceId The service ID to look up the LoadBalancer.
* @param serviceInstance The service to execute the request to.
* @param request Allows implementations to execute pre and post actions, such as
* incrementing metrics.
* @param <T> type of the response
* @throws IOException in case of IO issues.
* @return The result of the LoadBalancerRequest callback on the selected
* ServiceInstance.
*/
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
/**
* Creates a proper URI with a real host and port for systems to utilize. Some systems
* use a URI with the logical service name as the host, such as
* http://myservice/path/to/service. This will replace the service name with the
* host:port from the ServiceInstance.
* @param instance service instance to reconstruct the URI
* @param original A URI with the host as a logical service name.
* @return A reconstructed URI.
*/
URI reconstructURI(ServiceInstance instance, URI original);
}从接口中可以通过定义的抽象方法了解客户端负载均和器中具备的能力
- ServiceInstance choose : 从负载均衡器中挑选一个对应服务的实例
- T execute:使用从负载均衡器中挑选一个对应服务的实例来执行请求内容
- URI reconstructURI:为系统构建一个合适的host:port形式的URI
- 分布式中,我们使用逻辑上的服务名称作为host构建URI进行请求,比如用 http://myservice/path/to/service
- 在该操作中,前者的ServiceInstance是带有host的URI,返回的确实根据服务实例拼接的具体请求的地址
在LoadBalancerClient接口的所属包org.springframework.cloud.client.loadbalancer下内容整理后得到如下图所示的关系
LoadBalancerAutoConfiguration
LoadBalancerAutoConfiguration为实现客户端负载均衡器的自动化配置,查看源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.client.loadbalancer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.web.client.RestTemplate;
/**
* Auto-configuration for blocking client-side load balancing.
*
* @author Spencer Gibb
* @author Dave Syer
* @author Will Tran
* @author Gang Li
* @author Olga Maciaszek-Sharma
*/
public class LoadBalancerAutoConfiguration {
private List<RestTemplate> restTemplates = Collections.emptyList();
private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
static class LoadBalancerInterceptorConfig {
public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
}通过类上的注解可以知道,Ribbon实现负载均衡自动化配置需要满足的下面二个条件
- @ConditionalOnClass(RestTemplate.class): RestTemplate类必须存在于当前的工程环境中
- @ConditionalOnBean(LoadBalancerClient.class):Spring的Bean工厂中必须要有LoadBalancerClient的实现Bean
该启动配置类中,主要做了下面三件事情
- 创建了一个 LoadBalancerInterceptor 的Bean,用于实现对客户端发起请求时的拦截,实现客户端的负载均衡
- 创建了一个 RestTemplateCustomizer 的Bean, 用于给RestTemplate增加LoadBalancerInterceptor 拦截器
- 维护了一个@LoadBalanced注解修饰的RestTemplate对象列表,并进行了初始化。通过调用RestTemplateCustomizer的实例来给需要客户端负载均衡RestTemplate增加LoadBalancerInterceptor 拦截器
LoadBalancerInterceptor
查看LoadBalancerInterceptor 拦截器是如何将一个普通RestTemplate变成负载均衡的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
}LoadBalancerInterceptor拦截器中注入了LoadBalancerClient的实现,当一个被@LoadBalanced注解修饰的RestTemplate对象向外发起http请求的时候,会被intercept函数所拦截,由于我们在使用RestTemplate时采用服务名作为host, 通过getHost()拿到服务名后调用excute函数,就会根据服务名来获取实例并发起实际的请求了
LoadBalancerClient实现类
拦截器会调用LoadBalancerClient的execute方法,通过分析找到默认实现类BlockingLoadBalancerClient
主要有几个处理:获取负载均衡器, 获取服务, 执行服务调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
String hint = getHint(serviceId);
LoadBalancerRequestAdapter<T, DefaultRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(request,
new DefaultRequestContext(request, hint));
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
ServiceInstance serviceInstance = choose(serviceId, lbRequest);
if (serviceInstance == null) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse())));
throw new IllegalStateException("No instances available for " + serviceId);
}
return execute(serviceId, serviceInstance, lbRequest);
}
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
throws IOException {
DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
Request lbRequest = request instanceof Request ? (Request) request : new DefaultRequest<>();
supportedLifecycleProcessors
.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, new DefaultResponse(serviceInstance)));
try {
T response = request.apply(serviceInstance);
Object clientResponse = getClientResponse(response);
supportedLifecycleProcessors
.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS,
lbRequest, defaultResponse, clientResponse)));
return response;
}
catch (IOException iOException) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.FAILED, iOException, lbRequest, defaultResponse)));
throw iOException;
}
catch (Exception exception) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, defaultResponse)));
ReflectionUtils.rethrowRuntimeException(exception);
}
return null;
}execute的实现中,第一步就是获取具体的服务实例
1
2
3
4
5
6
7
8
9
10
11
12
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
if (loadBalancer == null) {
return null;
}
Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
if (loadBalancerResponse == null) {
return null;
}
return loadBalancerResponse.getServer();
}这里的choose不是使用LoadBalancerClient接口中的choose方法,而是ReactiveLoadBalancer接口的choose函数,这里进行了负载均衡算法获取服务实例。
ReactiveLoadBalancer接口
接口信息如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48public interface ReactiveLoadBalancer<T> {
/**
* Default implementation of a request.
*/
Request<DefaultRequestContext> REQUEST = new DefaultRequest<>();
/**
* Choose the next server based on the load balancing algorithm.
* @param request - incoming request
* @return publisher for the response
*/
Publisher<Response<T>> choose(Request request);
default Publisher<Response<T>> choose() { // conflicting name
return choose(REQUEST);
}
interface Factory<T> {
ReactiveLoadBalancer<T> getInstance(String serviceId);
/**
* Allows accessing beans registered within client-specific LoadBalancer contexts.
* @param name Name of the beans to be returned
* @param type The class of the beans to be returned
* @param <X> The type of the beans to be returned
* @return a {@link Map} of beans
* @see <code>@LoadBalancerClient</code>
*/
<X> Map<String, X> getInstances(String name, Class<X> type);
/**
* Allows accessing a bean registered within client-specific LoadBalancer
* contexts.
* @param name Name of the bean to be returned
* @param clazz The class of the bean to be returned
* @param generics The classes of generic types of the bean to be returned
* @param <X> The type of the bean to be returned
* @return a {@link Map} of beans
* @see <code>@LoadBalancerClient</code>
*/
<X> X getInstance(String name, Class<?> clazz, Class<?>... generics);
}
}- choose方法;根据负载均衡算法选择服务器
实现类有下面三个
ReactorLoadBalancer
- ReactorServiceInstanceLoadBalancer
- RandomLoadBalancer
- RoundRobinLoadBalancer
- ReactorServiceInstanceLoadBalancer