前言

  • 之前的文章我们可以构建了Eureka服务治理体系中的三个核心角色:服务注册中心,服务提供者已经服务消费者。
  • 当是在实际的项目中往往复杂得多,如果仅仅依靠之前构建的服务治理,大多数情况下午是无法直接满足业务需求的,我们还需要根据实际情况来做一些配置,调整和扩展。

基础架构

  • Eureka服务治理基础架构有三个核心要素

  • 服务注册中心

  • 服务提供者

  • 服务消费者:很多时候,客户端既是提供者也是消费者

服务提供者

服务注册

  • 服务提供者在启动的时候会通过发送 REST 请求的方式将自己注册到 Eureka Server 上。同时带上自身服务的一些元数据信息。
  • Eureka Server 接收到这个 REST 请求后,将元数据信息存储在一个双层结构的 Map 中
    • 第一层的 key 是服务名(applicationName), 第二层的 key 是具体服务的实例名(instance-id)
  • 在服务注册时, 需要确认一下eureka.client.register-with-eureka的参数,默认值为 True,如果设置为 false进步会启动注册操作。

服务同步

  • 如上面的架构图所示,二个服务提供者分别注册到了二个不同的服务注册中心上,也就是说,它门的信息分别被二个服务注册中心所维护。
  • 此时服务注册中心之间因为相互注册为服务,当服务提供者发送注册请求到一个服务注册中心时,会将该请求转发给集群中相连的其他注册中心,从而实现注册中心之间的服务同步
  • 通过服务同步,二个服务提供者的服务信息就能够通过这二个服务注册中心的任意一台获取到。

服务续约

  • 在注册完服务后,服务提供者会维护一个心跳用来持续告诉 Eureka Server, 自己还能提供服务,以防止Eureka Server将该服务从服务列表中剔除,这个操作称为服务续约(renew)

  • 服务续约有二个重要属性,我们可以根据需要进行调整

    1
    2
    3
    4
    # 用于定义服务续约的调用间隔时间,默认30秒
    eureka.instance.lease-renewal-interval-in-seconds=30
    # 用于定义服务失效的时间,默认90秒
    eureka.instance.lease-expiration-duration-in-seconds=90

服务消费者

获取服务

  • 到这里,服务注册中心已经注册了服务,并且该服务有二个示例。

  • 启动消费者的时候,它会发送一个 REST 请求给服务注册中心,获取上面注册的服务请求,为了性能考虑,Eureka Server会维护一份只读的服务清单返回给客户端,同时该缓存清单30秒更新一次。

  • 获取服务是服务消费者的基础,所以必须确保 eureka.client.fetch-registry 参数没有被修改为false, 该值默认为 true.

  • 如果希望修改缓存清单的更新时间,可以通过下面参数进行修改,单位为秒,默认30秒

    1
    eureka.client.registry-fetch-interval-seconds=30

服务调用

  • 服务消费者在获取服务清单后,通过服务名可以获得具体提供服务的实例名和该示例的元数据信息。
  • 通过服务实例的详细信息,所以客户端可以根据自己需要决定调用那个实例,Ribbon中会默认采用轮询方式调用,从而实现在客户端的负载均衡。
  • 对于访问实例的选择,Eureka中有 Region 和 Zone的概念
    • 一个Region 中可以包含多个 Zone,每个服务客户端需要被注册到一个 Zone中,所以每个客户端对应一个Region和一个Zone。
    • 在服务调用的时候,优先访问处于同一个 zone中的服务提供方。如果访问不到,就访问其他Zone。

服务下线

  • 系统运行过程中必然会面临和关闭或重启服务的某个实例的情况,在服务关闭期间,不希望客户调用关闭的实例。
  • 所以在客户端的服务实例下线(关闭)的时候,会触发一个服务下线的REST请求给Eureka Server,告诉服务注册中心我要下线了,
  • 服务端在接收到请求后,将该服务状态置为下线(DOWN), 并把下线事件传播出去。

服务注册中心

失效剔除

  • 有些时候,服务不一定会正常下线,可能由于内存溢出,网络故障很等问题导致服务不可用。而服务中心没有接收到”服务下线”的请求。

  • 为了从服务列表中将这些无法提供服务的实例剔除,Eureka Server在启动的时候会创建一个定时任务,默认一段时间(默认60秒)将当前清单中超时(默认90秒)没有续约的服务剔除出去。

    1
    2
    # 如过90秒类没有收到微服务心跳,就剔除该服务,单位毫秒
    eureka.server.eviction-interval-timer-in-ms=9000

自动保护

  • 当我们在本地调试基于 Eureka的程序时,基本都会在信息面板上出现红色的警告信息.

  • 实际上,该警告就是触发了 Eureka的自我保护机制。之前提到服务提供者注册到Eureka Server之后会维护一个心跳连接。告诉Eureka Server 自己还活着

  • Eureka Server在运行期间会统计心跳失败的比例子在15分钟内是否低于85%,如果出现低于的情况(单机调试的时候很容易满足,实际上的生产环境通常是由于网路不稳定导致),Eureka Server会将当前的实例注册信息保护起来,让这些实例不会过期,尽可能的保护这些注册信息。

  • 但是在这段时间内,实例若出现问题,那么客户端很容易拿到实际已经不存在的服务实例,进而出现调用失败的情况,客户端必须要有容错机制,比如可以使用请求重试,断路器等机制。

  • 本地调试很容易触发注册中心的保护机制,会使得维护的实例不那么准确,在本地开发的时候,可以使用如下配置来关闭保护机制,确保注册中心可以将不可用的实例正确剔除

    1
    2
    # 关闭自我保护机制
    eureka.server.enable-self-preservation=false

源码分析

  • 上面对Eureka中各个核心元素的通信行为做了详细的介绍。为了更深入理解它的运作和配置,结合源码查看具体是如何实现的。
  • 三个主要核心元素中,Eureka客户端在整个运行机制中是大部分的发起者,注册中心主要是处理请求的接收者,从Eureka客户端作为切入点观察如何完成通信行为。
  • 将SpringBoot注册到Eureka Server或者获取服务列表做了下面的事情
    1. 主应用类中配置@EnableDiscoveryClient注解
    2. 在配置文件中配置eureka.client.service-url.defaultZone指定服务注册中心的位置。

@EnableDiscoveryClient注解

  • 查看源码和注释知道该注解用来开启DiscoveryClient的实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Import({EnableDiscoveryClientImportSelector.class})
    public @interface EnableDiscoveryClient {
    boolean autoRegister() default true;
    }

  • 搜索DiscoveryClient,发现有一个接口和类,得到如下所示的关系

    • 左边的org.springframework.cloud.client.discovery.DiscoveryClient是Spring Cloud的接口,用来定义发现服务的常用抽象方法,通过该接口可以有效屏蔽服务治理的实现细节,所以使用Spring Cloud构建的微服务应用可以方便切换不同的服务治理框架,而不用改动程序代码,只需要添加一些针对服务治理框架的配置即可。
    • org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient是对该接口的实现,从命名来判断,它实现的对Eureka发现服务的封装,EurekaDiscoveryClient 依赖 Netflix Eureka的com.netflix.discovery.EurekaClient接口,该EurekaClient接口又继承了LookupService 接口,它们都是Netflix开源包中的内容,主要定义针对Eureka的发现服务的抽象方法,而真正实现发现服务则是Nerfix包中的DiscoveryClient类。

Eureka Server的URL获取

  • eureka.client.service-url.defaultZone指定了server的url列表,通过查找这个属性可以找到相关的加载属性,我们可以在com.netflix.discovery.endpoint.EndpointUtils类中找到下面的这个函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    public static List<String> getServiceUrlsFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
    List<String> orderedUrls = new ArrayList();
    String region = getRegion(clientConfig);
    String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
    if (availZones == null || availZones.length == 0) {
    availZones = new String[]{"default"};
    }

    logger.debug("The availability zone for the given region {} are {}", region, availZones);
    int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
    List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]);
    if (serviceUrls != null) {
    orderedUrls.addAll(serviceUrls);
    }

    int currentOffset = myZoneOffset == availZones.length - 1 ? 0 : myZoneOffset + 1;

    while(currentOffset != myZoneOffset) {
    serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[currentOffset]);
    if (serviceUrls != null) {
    orderedUrls.addAll(serviceUrls);
    }

    if (currentOffset == availZones.length - 1) {
    currentOffset = 0;
    } else {
    ++currentOffset;
    }
    }

    if (orderedUrls.size() < 1) {
    throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
    } else {
    return orderedUrls;
    }
    }
    • 从上面的函数可以发现,客户端依次加载了二个内容,第一个是Region,第二个是Zone,从加载逻辑上可以判断他们的关系

    • 通过 getRegion函数,从配置中读取了Region返回,所以一个微服务应用只属于一个region,如果不特别配置,默认为default,我们可以通过eureka.client.region来配置。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public static String getRegion(EurekaClientConfig clientConfig) {
    String region = clientConfig.getRegion();
    if (region == null) {
    region = "default";
    }

    region = region.trim().toLowerCase();
    return region;
    }
    • 通过getAvailabilityZones 函数,可以知道当没有特别为Region配置Zone的时候,默认采用defaultZone,这也是我们之前配置参数eureka.client.service-url.defaultZone的由来,若要为应用指定Zone,可以通过eureka.client.availability-zones属性来设置。从return的结果可以知道Zone可以设置多个,并且通过逗号分隔

      1
      2
      3
      4
      5
      6
      7
      8
      public String[] getAvailabilityZones(String region) {
      String value = (String)this.availabilityZones.get(region);
      if (value == null) {
      value = "defaultZone";
      }

      return value.split(",");
      }
    • 由此我们可以判断Region与Zone是一对多的关系, 在获取了region和zone的信息后才开始加载Eureka Server的具体地址,它根据传入的参数,按照一定算法确定加载位于哪一个Zone配置的serviceUrls

      1
      2
      int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
      List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]);
    • 查看具体的实现类EurekaClientConfigBean, 具体实现了如何解析参数的过程

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      public List<String> getEurekaServerServiceUrls(String myZone) {
      String serviceUrls = (String)this.serviceUrl.get(myZone);
      if (serviceUrls == null || serviceUrls.isEmpty()) {
      serviceUrls = (String)this.serviceUrl.get("defaultZone");
      }

      if (!StringUtils.isEmpty(serviceUrls)) {
      String[] serviceUrlsSplit = StringUtils.commaDelimitedListToStringArray(serviceUrls);
      List<String> eurekaServiceUrls = new ArrayList(serviceUrlsSplit.length);
      String[] var5 = serviceUrlsSplit;
      int var6 = serviceUrlsSplit.length;

      for(int var7 = 0; var7 < var6; ++var7) {
      String eurekaServiceUrl = var5[var7];
      if (!this.endsWithSlash(eurekaServiceUrl)) {
      eurekaServiceUrl = eurekaServiceUrl + "/";
      }

      eurekaServiceUrls.add(eurekaServiceUrl.trim());
      }

      return eurekaServiceUrls;
      } else {
      return new ArrayList();
      }
      }
    • 当使用Ribbon来实现服务调用的时候,对于Zone的设置可以在负载均衡时实现区域亲和特性

      • Ribbon的默认策略会优先访问同一个客户端处于一个Zone中的服务端实例,只有当同一个Zone中没有可用服务端实例才会访问其他Zone中的实例。
    • 通过Zone属性的定义,配合实际部署的物理机构,可以有效地设计出对区域性故障的容错集群。

DiscoveryClient类

  • 查看com.netflix.discovery.DiscoveryClient 类,该类头部的注释的大致内容如下
    • 该类用于帮助于Eureka Server互相协助
    • Eureka Client 负责下面的任务
      • 向 Eureka Server注册服务实例
      • 向 Eureka Server服务租约
      • 当服务关闭期间,向Eureka Server取消租约
      • 查询Eureka Server中的服务实例列表

服务注册

  • 在理解了上面多个注册中心信息的加载后,再看看是如何实现服务注册的行为

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private void initScheduledTasks() {
    // ...
    if (this.clientConfig.shouldRegisterWithEureka()) {
    // ...
    this.instanceInfoReplicator = new InstanceInfoReplicator(this,
    this.instanceInfo,
    this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);
    // ...
    this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
    logger.info("Not registering with Eureka server per configuration");
    }
    }

  • 上面的if (this.clientConfig.shouldRegisterWithEureka())是一个服务注册相关的判断语句,在分支内,创建了一个InstanceInfoReplicator类的实例,它会执行一个定时任务,该定时任务具体实现看run方法,具体如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public void run() {
    try {
    discoveryClient.refreshInstanceInfo();

    Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
    if (dirtyTimestamp != null) {
    // 真正触发调用注册的就是这里
    discoveryClient.register();
    instanceInfo.unsetIsDirty(dirtyTimestamp);
    }
    } catch (Throwable t) {
    logger.warn("There was a problem with the instance info replicator", t);
    } finally {
    Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
    scheduledPeriodicRef.set(next);
    }
    }
  • 查看register的实现内容如下

    • instanceInfo对象就是注册时候,客户端给服务端的服务的元数据信息。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
    // 注册操作就是通过REST请求进行的
    httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
    logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
    throw e;
    }
    if (logger.isInfoEnabled()) {
    logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }

服务获取与续约

  • 继续看initScheduledTasks

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    private void initScheduledTasks() {
    // 服务获取
    if (clientConfig.shouldFetchRegistry()) {
    // registry cache refresh timer
    int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
    int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
    cacheRefreshTask = new TimedSupervisorTask(
    "cacheRefresh",
    scheduler,
    cacheRefreshExecutor,
    registryFetchIntervalSeconds,
    TimeUnit.SECONDS,
    expBackOffBound,
    new CacheRefreshThread()
    );
    scheduler.schedule(
    cacheRefreshTask,
    registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    // 服务续约和注册任务
    if (clientConfig.shouldRegisterWithEureka()) {
    int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
    int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
    logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

    // Heartbeat timer
    heartbeatTask = new TimedSupervisorTask(
    "heartbeat",
    scheduler,
    heartbeatExecutor,
    renewalIntervalInSecs,
    TimeUnit.SECONDS,
    expBackOffBound,
    new HeartbeatThread()
    );
    scheduler.schedule(
    heartbeatTask,
    renewalIntervalInSecs, TimeUnit.SECONDS);

    // InstanceInfo replicator
    // ....
    } else {
    logger.info("Not registering with Eureka server per configuration");
    }
    }
  • 从源码中可以看到“服务获取”相对于 服务续约和注册任务更为独立,续约与服务注册在同一个if逻辑中,不难理解,服务注册到Eureka Server后,需要一个心跳去续约防止别剔除,所以肯定成对出现。这里renewalIntervalInSecs,expBackOffBound对应的续约的参数

    1
    2
    3
    4
    5
    6
    eureka:
    instance:
    # eureka客户端发送心跳的间隔时间设置为1s,单位秒,默认30s
    lease-renewal-interval-in-seconds: 1
    # eureka客户端接收到心跳后等待时间上限是2s,默认90秒,超时3次就会剔除该服务
    lease-expiration-duration-in-seconds: 2
  • 服务获取的逻辑在单独的if判断中,判断依据就是eureka.client.fetch-registry的参数结果,默认为true。大部分情况我们不需要关心,为了定期的更新客户端的服务清单,保证访问确实健康的服务实例,“服务获取”请求不会只限于服务启动,而是一个定时任务,getRegistryFetchIntervalSeconds()对应的参数就是对应eureka.client.registry-fetch-interval-seconds配置参数,默认是30秒。

  • 查看具体的服务获取和服务续约的具体实现,续约比较简单直接以REST请求的方式进行续约

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    private class HeartbeatThread implements Runnable {

    public void run() {
    if (renew()) {
    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
    }
    }
    }
    //------------------

    boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
    httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
    logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
    if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
    REREGISTER_COUNTER.increment();
    logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
    long timestamp = instanceInfo.setIsDirtyWithTime();
    boolean success = register();
    if (success) {
    instanceInfo.unsetIsDirty(timestamp);
    }
    return success;
    }
    return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
    logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
    return false;
    }
    }
  • 服务获取则复杂一些,会根据时候第一次获取发起不同的REST请求和响应的处理,具体的实现逻辑类似。

服务注册中心处理

  • 通过上面的分析,可以看到所有的交互都是通过REST请求来发送的,注册重谢需要对这些请求进行处理,Eureka Server对于各类REST请求的定义都在 com.netflix.eureka.resources包下

  • ApplicationResource中查看服务注册请求

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    if (this.isBlank(info.getId())) {
    return Response.status(400).entity("Missing instanceId").build();
    } else if (this.isBlank(info.getHostName())) {
    return Response.status(400).entity("Missing hostname").build();
    } else if (this.isBlank(info.getIPAddr())) {
    return Response.status(400).entity("Missing ip address").build();
    } else if (this.isBlank(info.getAppName())) {
    return Response.status(400).entity("Missing appName").build();
    } else if (!this.appName.equals(info.getAppName())) {
    return Response.status(400).entity("Mismatched appName, expecting " + this.appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
    return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
    return Response.status(400).entity("Missing dataCenterInfo Name").build();
    } else {
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
    String dataCenterInfoId = ((UniqueIdentifier)dataCenterInfo).getId();
    if (this.isBlank(dataCenterInfoId)) {
    boolean experimental = "true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
    if (experimental) {
    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
    return Response.status(400).entity(entity).build();
    }

    if (dataCenterInfo instanceof AmazonInfo) {
    AmazonInfo amazonInfo = (AmazonInfo)dataCenterInfo;
    String effectiveId = amazonInfo.get(MetaDataKey.instanceId);
    if (effectiveId == null) {
    amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());
    }
    } else {
    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
    }
    }
    }
    // 经过对注册信息进行校验后,调用注册函数进行服务注册
    this.registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();
    }
    }

  • 经过对注册信息进行校验后,调用org.springframework.cloud.netflix.eureka.server.InstanceRegistry对象中的register(InstanceInfo, boolean)注册函数进行服务注册

1
2
3
4
5
6
7
8
9
public void register(final InstanceInfo info, final boolean isReplication) {
this.handleRegistration(info, this.resolveInstanceLeaseDuration(info), isReplication);
super.register(info, isReplication);
}

private void handleRegistration(InstanceInfo info, int leaseDuration, boolean isReplication) {
this.log("register " + info.getAppName() + ", vip " + info.getVIPAddress() + ", leaseDuration " + leaseDuration + ", isReplication " + isReplication);
this.publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication));
}
  • 在注册函数中,先调用publishEvent函数,将该新服务注册的事件传播出去,然后调用com.netflix.eureka.registry.AbstractInstanceRegistry 父类中的注册实现(PeerAwareInstanceRegistryImpl),将InstaceInfo中的元数据信息存储在一个ConcurrentHashMap对象中,正如之前所说的,注册中心存储了二层Map结构,第一层key为服务名(InstanceInfo中的appName属性),第二层key为实例名(instanceInfo中的instanceid属性)

    1
    2
    3
    4
    5
    gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap);

    // ....

    ((Map)gMap).put(registrant.getId(), lease);

配置详解

  • 从使用的角度对Eureka中的常用内容进行详细的介绍,帮助我们根据自身环境和业务特点进行个性化的配置调整。
  • 在Eureka的服务治理体系中,主要分为服务端(服务注册中心)和客户端(提供接口的微服务应用)二个不同的角色。
  • 当构建了高可用的注册中心后,该集群中所有的微服务应用和后续的一些基础类应用(如配置中心,网关等)都可以视作该体系下的一个微服务(Eureka 客户端)
  • 服务注册中心也是一样,只是高可用环境下的服务注册中心除了作为客户端外,还为集群中的其他客户端提供了服务注册的特殊功能,所以Eureka客户端的配置对象存在于所有Eureka服务治理体系下的应用实例中。
  • 在使用Spring Cloud Eureka 的过程中,我们所做的配置内容几乎都是对Eureka客户端配置进行操作
  • Eureka客户端的配置主要分为以下二个方面
    • 服务注册相关配置信息:包括服务注册中心的地址,服务获取的间隔时间,可用区域等
    • 服务实例相关配置信息:包括服务实例的名称,ip地址,端口号,健康检查路径等
  • Eureka的服务端更多的类似一个现成的产品,大多数情况下不需要修改他的配置信息

服务注册类配置

  • 关于服务注册类的配置信息,可以通过查看org.springframework.cloud.netflix.eureka.EurekaClientConfigBean的源码获取配置内容,这些信息都以eureka.client为前缀

指定注册中心

  • 将一个应用纳入Eureka的服务治理体系中,除了引入 Eureka依赖外,就是在配置文件中指定注册中心,主要通过eureka.client.service-url参数实现,他的定义如下,配置存储在HashMap中,并设置了一组默认值,key为defaultZone,value为http://localhost:8761/eureka/

    1
    2
    3
    4
    private Map<String, String> serviceUrl = new HashMap();

    this.serviceUrl.put("defaultZone", "http://localhost:8761/eureka/");

  • 当构建了高可用的的服务注册中心集群时,我们可以我参数的value值配置多个注册中心地址,使用逗号分隔

  • 为了服务注册中心的安全考虑,很多时候都会为服务中心加入安全校验,这个时候配置serviceUrl时,需要在value值的url中加入相应的安全校验信息,比如

    1
    http://<username>:<password>@localhost:8761/eyreka

其他配置

  • 下面整理了EurekaClientConfigBean中定义的一些配置

    参数名 说明 默认值
    enabled 启用Eureka客户端 true
    registryFetchIntervalSeconds 从Eureka服务端获取注册信息的间隔时间,单位为秒 30
    instanceInfoReplicationIntervalSeconds 更新实例信息的变化到Eureka服务端的间隔时间,单位为秒 30
    initialInstancelnfoReplicationIntervalSeconds 初始化实例信息到Eureka 服务端的间隔时间,单位为秒 40
    eurekaServiceUrlPllIntervalSeconds 轮询Eureka服务端地址更改的间隔时间,单位为秒。当我们与Spring Cloud Config配合,动态刷新Eureka的serviceURL地址时需要关注该参数 300
    curckaServerReadTimeoutSeconds 读取Eureka Server信息的超时时间,单位为秒 8
    eurckaServerConnctTimeoutSeconds 连接Eureka Server的超时时间,单位为秒 5
    eurekaServerTotalConnections 从Eureka客户端到所有Eureka服务端的连接总数 200
    eurekaServerTotalConnectionsPerHost 从Eureka客户端到每个Eureka服务端主机的连接总数 50
    eurekaConnectionldleTimeoutSeconds Eureka服务端连接的空闲关闭时间,单位为秒 30
    heartbeatExecutorThreadPoolSize 心跳连接池的初始化线程数 2
    heartbeatExecutorExponentialBackOfBound 心跳超时重试延迟时间的最大乘数值 10
    cacheRefreshExecutorThreadPoolSize 缓存刷新线程池的初始化线程数 2
    cacheRefreshExecutorExponentialBackOfBound 缓存刷新重试延迟时间的最大乘数值 10
    useDnsForFetchingServiceUrls 使用DNS来获取Eureka服务端的serviceUrl false
    registerWithEureka 是否将自身的实例信息注册到Eureka服务端 true
    preferSameZoneEureka 是否偏好使用处于相同zone的Eureka服务端 true
    filterOnlyUpInstances 获取实例时时候过滤,仅保留UP状态的实例 true
    fetchRegistry 是否从Eureka服务端获取注册信息 true

服务实例类配置

  • 关于服务实例类的配置,可以通过查看org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean的源码获取详细内容,这些配置都以eureka.instance为前缀

元数据

  • 在 org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean 的配置信息中,有一大部分是内容是对服务实例元数据
    的配置

  • 元数据是Eureka客户端在向服务注册中心发送注册请求时,用来描述自身服务信息的对象,包含了一些标准化的元数据,比如服务名称,实例名称,实例IP,实例端口等用于服务治理的重要信息;以及一些用于负载均衡策略或是其他特殊用途的自定义元数据信息。

  • 在使用Spring Cloud Eureka的时候,所有的配置小心都通过 EurekaInstanceConfigBean 进行加载,但在真正进行服务注册的时候,还是会包装层 com.netflix.appinfo.InstanceInfo 对象发送给 Eureka服务端, 这二个类非常相似,通过InstanceInfo 可以了解原生Eureka对元数据的定义,metadata是自定义的元数据信息,其他的成员是标准化的元数据信息,Spring Cloud的EurekaInstanceConfigBean 对原生元数据做了配置优化处理

    1
    2
    @XStreamAlias("metadata")
    private volatile Map<String, String> metadata = new ConcurrentHashMap<String, String>();
  • 我们可以通过eureka.instance.<properties>=<value>的格式对标准化数据进行配置

    • properties是EurekaInstanceConfigBean 对象中的成员变量名

    • 自定义元数据可以通过enreka.instance.metadataMap.<key>=<value>的格式进行配置

      1
      enreka.instance.etadataMap.zone=shanghai

实例名配置

  • 实例名instanceId,它是区分同一服务中不同实例的唯一标识
  • Netflix Eureka的原生实现中,采用主机名作为默认值,这样使得在同一主机上无法启动多个相同的服务实例
  • 在Spring Cloud Eureka的配置中,针对对一主机多启动的实例的情况,采用如下的默认规则${spring.cloud.client.ip-address}:${spring.application.name}:${spring.application.instance_id}:${server.port}
  • 我们可以通过eureka.instance.instance-id参数来进行配置,
  • 在本地进行负载均衡调试的时候,需要启动多个实例,可以在命令行中指定不同的service,port来启动,但是还是略显麻烦,我们可以直接通过设置server.port=0或者使用随机数server.port=${random.int[10000,19999]}来启动一个随机端口

端点配置

  • 在InstanceInfo中,我们可以看到一些url的配置,比如 homePageUrl,statusPageUr,healthCheckUrl,分别代表主页,状态页,健康检查的url,其中状态页和健康检查页在Spring Cloud Eureka中默认使用spring-boot-actuator模块中的infohealth端点

  • 开启了healthcheck功能后,为了保证服务的正常运转,必须保证Eureka客户端的/health端点在发送元数据的时候,是一个能够被注册中心访问到的地址,否则注册中心不会根据应用的健康检查来更改状态

  • info端点如果不正确的话,会导致在Eureka面板中单机服务实例时,无法访问到服务实例提供的信息接口。

  • 大多数情况下不用修改,在一些特殊情况下,如为应用设置了context-path。我们就需要为配置加速前缀

    1
    2
    3
    4
    5
    6
    7
    8
    management:
    server:
    servlet:
    context-path: /resume
    eureka:
    instance:
    status-page-url-path: ${management.server.servlet.context-path}/info
    health-check-url-path: ${management.server.servlet.context-path}/health
  • 有事后为了安全考虑也会修改原始端点的路径,同上修改路径即可,当客户端以https的方式来暴露端点时,相对路径的方式就无法满足要求,所以Spring cloud提供了绝对如今的配置参数

    1
    2
    3
    4
    eureka:
    instance:
    status-page-url: https://${eureka.instance.hostname}/info
    health-check-url: https://${eureka.instance.hostname}/health

健康检查

  • 默认情况下,Eureka中的各个服务实例的健康检查并不是通过spring-boot-actuator模块的、health端点来实现的,而是依靠客户端的心跳方式保持服务实例的存活

  • 在Eureka的服务续约和剔除机制下,客户端的健康状态从注册到注册中心开始都会处于UP状态,除非心跳终止一段时间后,注册中心将其剔除。

  • 默认的心跳方式可以有效检查客户端进程是否工作。但确无法保证应用能够正常提供服务,大多数微服务应用都会有一些外部资源依赖,如数据库,缓存,消息带你等,应用与外部资源无法联通时实际上已经不能提供正常的对外服务,服务调用并不能获得预期的结果。

  • spirng cloud Eureka中可以把健康检查交给 Spring-boot-actuator的health端点,实现更加全面的健康状态维护。

  • 配置

    1. 在依赖中添加依赖

      1
      2
      3
      4
      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
      </dependency>
    2. 配置文件中添加参数

      1
      2
      3
      4
      eureka:
      client:
      healthcheck:
      enabled: true
    3. 保证服务注册中心可以正确访问到健康检查端点

其他配置

  • 除了前3个配置参数在需要的时候做调整,其他配置大多数情况下不需要配置,使用默认值即可。

课下问题:

  1. eureka无用。其实只是2.0不更新了。1.0还在更新,也在大量的用。

    1
    https://github.com/Netflix/eureka/wiki

    即使它以后都不用了 eureka 的思路也是值得学习的。毕竟服务注册中心,就这些东西。

  2. lombok使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    <!-- lombok -->
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.8</version>
    </dependency>

    ide安装插件

    getter/setter
  3. 域名问题

    1
    域名在 物理机的host文件配置,只是为了好区分,不是必须的。只要能访问到机器就行,用localhost,ip均可。
  4. 多节点注意事项

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    问题:eureka server间 设置peer。A->B,B->C,C->A,结果注册信息并不同步。
    看例子:
    依次启动7901,7902,7903。
    启动成功,注册api-driver ->7901
    发现只有7901和7902有 api-driver而 7903没有。

    简单说:api-driver向 7901注册,7902将api-driver同步到7902,但是不会同步到7903。后面源码会讲到。
    多节点建议:设置A->B,A->C其他类似。尽量不要跨 eureka节点。一对多,面面对到。

    讲解下图。前置概念peer。清除流程。

    功能点:
    peer启动:
    1、拉取它的peer的注册表。
    2、把自己注册到peer上。
    3、完成2之后,2中的peer会把它同步到,2中peer的peer。

    《eureka集群复制流程图》,为什么有时候3个实例,后来都变成2个实例了。

  5. yml配置文件分段。

  6. 可以独立使用,利用它的各种端点做开发,甚至可以自己做个服务注册中心。

Eureka 原理

  1. 本质:存储了每个客户端的注册信息。EurekaClient从EurekaServer同步获取服务注册列表。通过一定的规则选择一个服务进行调用。

  2. Eureka架构图

    《 Eureka架构图》

  3. 详解

  • 服务提供者:是一个eureka client,向Eureka Server注册和更新自己的信息,同时能从Eureka Server注册表中获取到其他服务的信息。
  • 服务注册中心:提供服务注册和发现的功能。每个Eureka Cient向Eureka Server注册自己的信息,也可以通过Eureka Server获取到其他服务的信息达到发现和调用其他服务的目的。
  • 服务消费者:是一个eureka client,通过Eureka Server获取注册到其上其他服务的信息,从而根据信息找到所需的服务发起远程调用。
  • 同步复制:Eureka Server之间注册表信息的同步复制,使Eureka Server集群中不同注册表中服务实例信息保持一致。
  • 远程调用:服务客户端之间的远程调用。
  • 注册:Client端向Server端注册自身的元数据以供服务发现。
  • 续约:通过发送心跳到Server以维持和更新注册表中服务实例元数据的有效性。当在一定时长内,Server没有收到Client的心跳信息,将默认服务下线,会把服务实例的信息从注册表中删除。
  • 下线:Client在关闭时主动向Server注销服务实例元数据,这时Client的服务实例数据将从Server的注册表中删除。
  • 获取注册表:Client向Server请求注册表信息,用于服务发现,从而发起服务间远程调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
让我们自己做:如何做。?

客户端:
拉取注册表
从注册表选一个
调用

服务端:
写个web server。
功能:
1、定义注册表:
Map<name,Map<id,InstanceInfo>>。
2、别人可以向我注册自己的信息。
3、别人可以从我这里拉取他人的注册信息。
4、我和我的同类可以共享注册表。

eureka是用:jersey实现,也是个mvc框架。
我们可以自己写个spring boot web实现。

Eureka Client源码

  1. 功能复习

    1
    2
    3
    4
    5
    6
    https://github.com/Netflix/eureka/wiki/Eureka-REST-operations
    注意地址中的v2 是没有的。

    查询所有实例信息:http://localhost:7900/eureka/apps

    注册服务:http://localhost:7900/eureka/apps/{applicationName}

    《Eureka Client工作流程图》

  2. 源码解读

    下面的讲解按照顺序进行。

    • spring boot项目引入eureka-client依赖,并注入spring 容器。

      在spring-boot项目中pom文件里面添加的依赖中的bean。是如何注册到spring-boot项目的spring容器中的呢?spring.factories文件是帮助spring-boot项目包以外的bean(即在pom文件中添加依赖中的bean)注册到spring-boot项目的spring容器的。

      由于@ComponentScan注解只能扫描spring-boot项目包内的bean并注册到spring容器中,因此需要@EnableAutoConfiguration(在SpringBootApplication下),注解来注册项目包外的bean。而spring.factories文件,则是用来记录项目包外需要注册的bean类名。

      点进去@SpringBootApplication注解,发现@EnableAutoConfiguration。点@EnableAutoConfiguration进去。

      1
      2
      @Import(AutoConfigurationImportSelector.class)
      public @interface EnableAutoConfiguration {

      点AutoConfigurationImportSelector进去

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      发现下面代码
      @Override
      public String[] selectImports(AnnotationMetadata annotationMetadata) {
      if (!isEnabled(annotationMetadata)) {
      return NO_IMPORTS;
      }
      AutoConfigurationMetadata autoConfigurationMetadata = AutoConfigurationMetadataLoader
      .loadMetadata(this.beanClassLoader);
      AutoConfigurationEntry autoConfigurationEntry = getAutoConfigurationEntry(autoConfigurationMetadata,
      annotationMetadata);
      return StringUtils.toStringArray(autoConfigurationEntry.getConfigurations());
      }

      此方法时,向spring ioc容器注入bean。selectImports,返回bean全名。import将bean全名注入。而注入的bean都是些什么呢?

      点:getAutoConfigurationEntry进去,有一句

      1
      List<String> configurations = getCandidateConfigurations(annotationMetadata, attributes);

      点getCandidateConfigurations进去:

      1
      2
      3
      List<String> configurations = SpringFactoriesLoader.loadFactoryNames(getSpringFactoriesLoaderFactoryClass(),
      getBeanClassLoader());

      点SpringFactoriesLoader进去:

      1
      public static final String FACTORIES_RESOURCE_LOCATION = "META-INF/spring.factories";
    • 找eureka client 配置相关类

      1
      2
      3
      4
      5
      6
      7
      在api-listen-order(其他eureka client项目均可)项目中,找到
      spring-cloud-netflix-eureka-client-2.1.2.RELEASE下META-INF下spring.factories。此文件中,有如下配置信息:


      EurekaClientAutoConfiguration(Eureka client自动配置类,负责Eureka client中关键beans的配置和初始化),
      RibbonEurekaAutoConfiguration(Ribbon负载均衡相关配置)
      EurekaDiscoveryClientConfiguration(配置自动注册和应用的健康检查器)。
    • EurekaDiscoveryClientConfiguration介绍

      1
      找到此类:org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration中的注解@ConditionalOnClass(EurekaClientConfig.class),
    • EurekaClientConfig介绍

      1
      2
      3
      4
      点击进去查看EurekaClientConfig是个接口,查看其实现类EurekaClientConfigBean。此类里封装了Eureka Client和Eureka Server交互所需要的配置信息。看此类代码:

      public static final String PREFIX = "eureka.client";
      表示在配置文件中用eureka.client.属性名配置。
    • Eureka 实例相关配置

      1
      2
      3
      4
      5
      6
      7
      从org.springframework.cloud.client.discovery.DiscoveryClient顶级接口入手,前面介绍过spring common。看其在Eureka中的实现类org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient。有一个属性:
      private final EurekaClient eurekaClient,查看其实现类:com.netflix.discovery.DiscoveryClient。
      有一个属性:
      private final ApplicationInfoManager applicationInfoManager(应用信息管理器,点进去此类,发现此类总有两个属性:
      private InstanceInfo instanceInfo;
      private EurekaInstanceConfig config;
      服务实例的信息类InstanceInfo和服务实例配置信息类EurekaInstanceConfig)。
    • InstanceInfo介绍

      1
      2
      打开InstanceInfo里面有instanceId等服务实例信息。
      InstanceInfo封装了将被发送到Eureka Server进行注册的服务实例元数据。它在Eureka Server列表中代表一个服务实例,其他服务可以通过instanceInfo了解到该服务的实例相关信息,包括地址等,从而发起请求。
    • EurekaInstanceConfig介绍

      1
      2
      3
      EurekaInstanceConfig是个接口,找到它的实现类org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean。
      此类封装了EurekaClient自身服务实例的配置信息,主要用于构建InstanceInfo。看到此类有一段代码:@ConfigurationProperties("eureka.instance"),
      在配置文件中用eureka.instance.属性配置。EurekaInstanceConfigBean提供了默认值。
    • 通过EurekaInstanceConfig构建instanceInfo

      1
      2
      3
      4
      在ApplicationInfoManager中有一个方法
      public void initComponent(EurekaInstanceConfig config)中有一句:
      this.instanceInfo = new EurekaConfigBasedInstanceInfoProvider(config).get();
      通过EurekaInstanceConfig构造instanceInfo。
    • 顶级接口DiscoveryClient介绍

      1
      2
      3
      4
      5
      6
      7
      8
      介绍一下spring-cloud-commons-2.2.1.realease包下,org.springframework.cloud.client.discovery.DiscoveryClient接口。定义用来服务发现的客户端接口,是客户端进行服务发现的核心接口,是spring cloud用来进行服务发现的顶级接口,在common中可以看到其地位。在Netflix Eureka和Consul中都有具体的实现类。
      org.springframework.cloud.client.discovery.DiscoveryClient的类注释:
      Represents read operations commonly available to discovery services such as Netflix Eureka or consul.io。
      代表通用于服务发现的读操作,例如在 eureka或consul中。

      String description();//获取实现类的描述。
      List<String> getServices();//获取所有服务实例id。
      List<ServiceInstance> getInstances(String serviceId);//通过服务id查询服务实例信息列表。
    • Eureka 的实现

      1
      2
      3
      4
      接下来我们找Eureka的实现类。org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient。
      查看方法。
      public List<ServiceInstance> getInstances(String serviceId),
      组合了com.netflix.discovery.EurekaClient来实现。
    • EurekaClient的实现

      1
      2
      3
      4
      5
      6
      EurekaClient有一个注解@ImplementedBy(DiscoveryClient.class),此类的默认实现类:com.netflix.discovery.DiscoveryClient。提供了:
      服务注册到server方法register().
      续约boolean renew().
      下线public synchronized void shutdown().
      查询服务列表 功能。
      想想前面的图中client的功能。提供了于Eureka Server交互的关键逻辑。
    • com.netflix.discovery.DiscoveryClient

      1
      com.netflix.discovery.DiscoveryClient实现了EurekaClient(继承了LookupService)
    • com.netflix.discovery.shared.LookupService

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      LookupService作用:发现活跃的服务实例。
      根据服务实例注册的appName来获取封装有相同appName的服务实例信息容器:
      Application getApplication(String appName)。
      获取所有的服务实例信息:
      Applications getApplications();
      根据实例id,获取服务实例信息:
      List<InstanceInfo> getInstancesById(String id);

      上面提到一个Application,它持有服务实例信息列表。它是同一个服务的集群信息。比如api-passenger的所有服务信息,这些服务都在api-passenger服务名下面。

      而instanceInfo代表一个服务实例的信息。为了保证原子性,比如对某个instanceInfo的操作,使用了大量同步的代码。比如下面代码:
      public void addInstance(InstanceInfo i) {
      instancesMap.put(i.getId(), i);
      synchronized (instances) {
      instances.remove(i);
      instances.add(i);
      isDirty = true;
      }
      }

      Applications是注册表中,所有服务实例信息的集合。
    • 健康检测器和事件监听器

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      EurekaClient在LookupService上做了扩充。提供了更丰富的获取服务实例的方法。按住不表。我们看一下另外两个方法:

      public void registerHealthCheck(HealthCheckHandler healthCheckHandler),向client注册 健康检查处理器,client存在一个定时任务通过HealthCheckHandler检查当前client状态,当client状态发生变化时,将会触发新的注册事件,去更新eureka server的注册表中的服务实例信息。
      通过HealthCheckHandler 实现应用状态检测。HealthCheckHandler的实现类org.springframework.cloud.netflix.eureka.EurekaHealthCheckHandler,看其构造函数:
      public EurekaHealthCheckHandler(HealthAggregator healthAggregator) {
      Assert.notNull(healthAggregator, "HealthAggregator must not be null");
      this.healthIndicator = new CompositeHealthIndicator(healthAggregator);
      }
      private final CompositeHealthIndicator healthIndicator;此类事属于org.springframework.boot.actuate.health包下,可以得出,是通过actuator来实现对应用的检测的。

      public void registerEventListener(EurekaEventListener eventListener)注册事件监听器,当实例信息有变时,触发对应的处理事件。
    • 找到com.netflix.discovery.DiscoveryClient

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      在api-listen-order项目中,找到spring-cloud-netflix-eureka-client-2.1.2.RELEASE下META-INF下spring.factories。此文件中org.springframework.cloud.bootstrap.BootstrapConfiguration=\
      org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration,此类有个注解:
      @Import({ EurekaDiscoveryClientConfiguration.class, // this emulates
      // @EnableDiscoveryClient, the import
      // selector doesn't run before the
      // bootstrap phase
      EurekaClientAutoConfiguration.class })
      注解中有个类: EurekaClientAutoConfiguration,此类中有如下代码:
      CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
      config, this.optionalArgs, this.context);
      (debug可以调试到)
      通过CloudEurekaClient找到:public class CloudEurekaClient extends DiscoveryClient。

    • com.netflix.discovery.DiscoveryClient构造函数-不注册不拉取

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      DiscoveryClient的构造函数:
      DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer)
      此方法中依次执行了 从eureka server中拉取注册表,服务注册,初始化发送心跳,缓存刷新(定时拉取注册表信息),按需注册定时任务等,贯穿了Eureka Client启动阶段的各项工作。

      构造函数353行:
      if (config.shouldFetchRegistry()) {
      this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
      } else {
      this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
      }
      shouldFetchRegistry,点其实现类EurekaClientConfigBean,找到它其实对应于:eureka.client.fetch-register,true:表示client从server拉取注册表信息。

      下面:
      if (config.shouldRegisterWithEureka()) {
      this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
      } else {
      this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
      }
      shouldRegisterWithEureka,点其实现类EurekaClientConfigBean,找到它其实对应于:
      eureka.client.register-with-eureka:true:表示client将注册到server。

      if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
      如果以上两个都为false,则直接返回,构造方法执行结束,既不服务注册,也不服务发现。

    • com.netflix.discovery.DiscoveryClient构造函数-两个定时任务

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      顺着上面代码往下看:
      scheduler = Executors.newScheduledThreadPool(2,
      new ThreadFactoryBuilder()
      .setNameFormat("DiscoveryClient-%d")
      .setDaemon(true)
      .build());
      定义了一个基于线程池的定时器线程池,大小为2。
      往下:
      heartbeatExecutor:用于发送心跳,
      cacheRefreshExecutor:用于刷新缓存。
    • com.netflix.discovery.DiscoveryClient构造函数-client和server交互的Jersey客户端

      1
      2
      接着构建eurekaTransport = new EurekaTransport();它是eureka Client和eureka server进行http交互jersey客户端。点开EurekaTransport,看到许多httpclient相关的属性。

    • com.netflix.discovery.DiscoveryClient构造函数-拉取注册信息

      1
      2
      3
      4
      if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
      fetchRegistryFromBackup();
      }
      如果判断的前部分为true,执行后半部分fetchRegistry。此时会从eureka server拉取注册表中的信息,将注册表缓存到本地,可以就近获取其他服务信息,减少于server的交互。
    • com.netflix.discovery.DiscoveryClient构造函数-服务注册

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
      try {
      if (!register() ) {
      throw new IllegalStateException("Registration error at startup. Invalid server response.");
      }
      } catch (Throwable th) {
      logger.error("Registration error at startup: {}", th.getMessage());
      throw new IllegalStateException(th);
      }
      }注册失败抛异常。
    • com.netflix.discovery.DiscoveryClient构造函数-启动定时任务

      1
      在构造方法的最后initScheduledTasks();此方法中,启动3个定时任务。方法内有statusChangeListener,按需注册是一个事件StatusChangeEvent,状态改变,则向server注册。
    • com.netflix.discovery.DiscoveryClient构造函数-总结

      1
      2
      3
      4
      5
      6
      总结DiscoveryClient构造关键过程:
      初始化一堆信息。
      从拉取注册表信息。
      向server注册自己。
      初始化3个任务。
      详细后面继续讲。源码就是这样,得层层拨开。
    • 拉取注册表信息详解

      1
      2
      3
      4
      5
      6
      7
      8
      9
      上面的fetchRegistry(false),点进去,看注释:
      // If the delta is disabled or if it is the first time, get all applications。
      如果增量式拉取被禁止或第一次拉取注册表,则进行全量拉取:getAndStoreFullRegistry()。
      否则进行增量拉取注册表信息getAndUpdateDelta(applications)。
      一般情况,在Eureka client第一次启动,会进行全量拉取。之后的拉取都尽量尝试只进行增量拉取。

      拉取服务注册表:
      全量拉取:getAndStoreFullRegistry();
      增量拉取:getAndUpdateDelta(applications);
    • 全量拉取

      1
      2
      3
      4
      5
      6
      7
      进入getAndStoreFullRegistry() 方法,有一方法:eurekaTransport.queryClient.getApplications。
      通过debug发现 实现类是AbstractJerseyEurekaHttpClient,点开,debug出
      webResource地址为:http://root:root@eureka-7900:7900/eureka/apps/,此端点用于获取server中所有的注册表信息。
      getAndStoreFullRegistry()可能被多个线程同时调用,导致新拉取的注册表被旧的覆盖(如果新拉取的动作设置apps阻塞的情况下)。
      此时用了AutomicLong来进行版本管理,如果更新时版本不一致,不保存apps。
      通过这个判断fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1),如果版本一致,并设置新版本(+1),
      接着执行localRegionApps.set(this.filterAndShuffle(apps));过滤并洗牌apps。点开this.filterAndShuffle(apps)实现,继续点apps.shuffleAndIndexInstances,继续点shuffleInstances,继续点application.shuffleAndStoreInstances,继续点_shuffleAndStoreInstances,发现if (filterUpInstances && InstanceStatus.UP != instanceInfo.getStatus())。只保留状态为UP的服务。
    • 增量拉取

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      回到刚才的fetchRegistry方法中,getAndUpdateDelta,增量拉取。通过getDelta方法,看到实际拉取的地址是:apps/delta,如果获取到的delta为空,则全量拉取。
      通常来讲是3分钟之内注册表的信息变化(在server端判断),获取到delta后,会更新本地注册表。
      增量式拉取是为了维护client和server端 注册表的一致性,防止本地数据过久,而失效,采用增量式拉取的方式,减少了client和server的通信量。
      client有一个注册表缓存刷新定时器,专门负责维护两者之间的信息同步,但是当增量出现意外时,定时器将执行,全量拉取以更新本地缓存信息。更新本地注册表方法updateDelta,有一个细节。
      if (ActionType.ADDED.equals(instance.getActionType())) ,public enum ActionType {
      ADDED, // Added in the discovery server
      MODIFIED, // Changed in the discovery server
      DELETED
      // Deleted from the discovery server
      },
      在InstanceInfo instance中有一个instance.getActionType(),ADDED和MODIFIED状态的将更新本地注册表applications.addApplication,DELETED将从本地剔除掉existingApp.removeInstance(instance)。
    • 服务注册

      1
      2
      3
      好了拉取完eureka server中的注册表了,接着进行服务注册。回到DiscoveryClient构造函数。
      拉取fetchRegistry完后进行register注册。由于构造函数开始时已经将服务实例元数据封装好了instanceInfo,所以此处之间向server发送instanceInfo,
      通过方法httpResponse = eurekaTransport.registrationClient.register(instanceInfo);看到String urlPath = "apps/" + info.getAppName();又是一个server端点,退上去f7,httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();204状态码,则注册成功。
    • 初始化3个定时任务

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      接着
      // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
      initScheduledTasks();看注释初始化3个定时任务。
      题外话:
      client会定时向server发送心跳,维持自己服务租约的有效性,用心跳定时任务实现;
      而server中会有不同的服务实例注册进来,一进一出,就需要数据的同步。所以client需要定时从server拉取注册表信息,用缓存定时任务实现;
      client如果有变化,也会及时更新server中自己的信息,用按需注册定时任务实现。

      就是这三个定时任务。

      进 initScheduledTasks()方法中,clientConfig.shouldFetchRegistry(),
      从server拉取注册表信息。
      int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds()拉取的时间间隔,eureka.client.registry-fetch-interval-seconds进行设置。

      int renewalIntervalInSecs = nstanceInfo.getLeaseInfo().getRenewalIntervalInSecs();心跳定时器,默认30秒。

      心跳定时任务和缓存刷新定时任务是有scheduler 的 schedule提交的,鼠标放到scheduler上,看到一句话 A scheduler to be used for the following 3 tasks:- updating service urls- scheduling a TimedSuperVisorTask。
      知道循环逻辑是由TimedSuperVisorTask实现的。
      new TimedSupervisorTask(
      "heartbeat",
      scheduler,
      heartbeatExecutor,
      renewalIntervalInSecs,
      TimeUnit.SECONDS,
      expBackOffBound,
      new HeartbeatThread()看到HeartbeatThread线程。
      点进去public void run() {
      if (renew()) {
      lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
      }
      }
      里面是renew()方法。

      scheduler.schedule(
      new TimedSupervisorTask(
      "cacheRefresh",
      scheduler,
      cacheRefreshExecutor,
      registryFetchIntervalSeconds,
      TimeUnit.SECONDS,
      expBackOffBound,
      new CacheRefreshThread()
      ),
      看到CacheRefreshThread,进去,发现 class CacheRefreshThread implements Runnable {
      public void run() {
      refreshRegistry();
      }
      }是用的refreshRegistry,进去发现fetchRegistry。回到原来讲过的地方。

      boolean renew() {
      EurekaHttpResponse<InstanceInfo> httpResponse;
      try {
      httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
      logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
      if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
      REREGISTER_COUNTER.increment();
      logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
      long timestamp = instanceInfo.setIsDirtyWithTime();
      boolean success = register();
      if (success) {
      instanceInfo.unsetIsDirty(timestamp);
      }
      return success;
      }
      return httpResponse.getStatusCode() == Status.OK.getStatusCode();
      } catch (Throwable e) {
      logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
      return false;
      }
      }看到如果遇到404,server没有此实例,则重新发起注册。如果续约成功返回 200.
      点sendHeartBeat进去String urlPath = "apps/" + appName + '/' + id;

      还有一个定时任务,按需注册。当instanceinfo和status发生变化时,需要向server同步,去更新自己在server中的实例信息。保证server注册表中服务实例信息的有效和可用。
      // InstanceInfo replicator
      instanceInfoReplicator = new InstanceInfoReplicator(
      this,
      instanceInfo,
      clientConfig.getInstanceInfoReplicationIntervalSeconds(),
      2); // burstSize

      statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
      @Override
      public String getId() {
      return "statusChangeListener";
      }

      @Override
      public void notify(StatusChangeEvent statusChangeEvent) {
      if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
      InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
      // log at warn level if DOWN was involved
      logger.warn("Saw local status change event {}", statusChangeEvent);
      } else {
      logger.info("Saw local status change event {}", statusChangeEvent);
      }
      instanceInfoReplicator.onDemandUpdate();
      }
      };
      if (clientConfig.shouldOnDemandUpdateStatusChange()) {
      applicationInfoManager.registerStatusChangeListener(statusChangeListener);
      }
      instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

      此定时任务有2个部分,
      1:定时刷新服务实例信息和检查应用状态的变化,在服务实例信息发生改变的情况下向server重新发起注册。InstanceInfoReplicator点进去。看到一个方法
      public void run() {
      try {
      discoveryClient.refreshInstanceInfo();//刷新instanceinfo。
      //如果实例信息有变,返回数据更新时间。
      Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
      if (dirtyTimestamp != null) {
      discoveryClient.register();//注册服务实例。
      instanceInfo.unsetIsDirty(dirtyTimestamp);
      }
      } catch (Throwable t) {
      logger.warn("There was a problem with the instance info replicator", t);
      } finally {
      //延时执行下一个检查任务。用于再次调用run方法,继续检查服务实例信息和状态的变化。
      Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
      scheduledPeriodicRef.set(next);
      }
      }

      refreshInstanceInfo点进去,看方法注释:如果有变化,在下次心跳时,同步向server。

      2.注册状态改变监听器,在应用状态发生变化时,刷新服务实例信息,在服务实例信息发生改变时向server注册。 看这段
      statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
      @Override
      public String getId() {
      return "statusChangeListener";
      }
      @Override
      public void notify(StatusChangeEvent statusChangeEvent) {
      if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
      InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
      // log at warn level if DOWN was involved
      logger.warn("Saw local status change event {}", statusChangeEvent);
      } else {
      logger.info("Saw local status change event {}", statusChangeEvent);
      }
      instanceInfoReplicator.onDemandUpdate();
      }
      };如果状态发生改变,调用onDemandUpdate(),点onDemandUpdate进去,看到InstanceInfoReplicator.this.run();

      总结:两部分,一部分自己去检查,一部分等待状态监听事件。

      初始化定时任务完成,最后一步启动步骤完成。接下来就是正常服务于业务。然后消亡。
  • 服务下线

    1
    2
    3
    4
    5
    6
    服务下线:在应用关闭时,client会向server注销自己,在Discoveryclient销毁前,会执行下面清理方法。
    @PreDestroy
    @Override
    public synchronized void shutdown() ,看此方法上有一个注解,表示:在销毁前执行此方法。unregisterStatusChangeListener注销监听器。cancelScheduledTasks取消定时任务。unregister服务下线。eurekaTransport.shutdown关闭jersy客户端 等。

    unregister点进去。cancel点进去。AbstractJerseyEurekaHttpClient。String urlPath = "apps/" + appName + '/' + id;看到url和http请求delete方法。
    • client源码总结

      1
      2
      3
      4
      总结:源码其实两部分内容:
      1、client自身的操作。
      2、server的配合。(https://github.com/Netflix/eureka/wiki/Eureka-REST-operations)。
      一切尽在:《Eureka Client工作流程图》

Eureka Server源码

  1. Eureka Server功能复习

    接受服务注册
    接受服务心跳
    服务剔除
    服务下线
    集群同步
    获取注册表中服务实例信息

    需要注意的是,Eureka Server同时也是一个Eureka Client,在不禁止Eureka Server的客户端行为时,它会向它配置文件中的其他Eureka Server进行拉取注册表、服务注册和发送心跳等操作。

  2. 源码解读

    • 启动server注册相关bean

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      注册外部的配置类
      spring-cloud-netflix-eureka-server-2.1.2.REALEASE.jar

      META-INF/spring.factories

      org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
      org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
      启动时会自动加载:EurekaServerAutoConfiguration
      功能:向spring的bean工厂添加eureka-server相关功能的bean。

      但是EurekaServerAutoConfiguration的生效时有条件的。
      EurekaServerAutoConfiguration上有一个注解:@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class),意思是:只有在Spring容器里有Marker这个类的实例时,才会加载EurekaServerAutoConfiguration,这个就是控制是否开启Eureka Server的关键。
    • 开启eureka server

      1
      2
       开关:
      而在@EnableEurekaServer中,@Import(EurekaServerMarkerConfiguration.class),意思是:动态注入此bean到spring 容器。引入了EurekaServerMarkerConfiguration.class。所以开启了Server服务。所以注册了前面说的:EurekaServerAutoConfiguration
    • 开启注册

      1
      2
      3
      4
      5
      在EurekaServerMarkerConfiguration上有@Import(EurekaServerInitializerConfiguration.class),导入了EurekaServerInitializerConfiguration,
      EurekaServerInitializerConfiguration
      implements ServletContextAware, SmartLifecycle,SmartLifecycle的作用是:初始化完之后,
      执行public void start()方法。

在public void start()中,启动一个线程。看注释:log.info(“Started Eureka Server”);发布事件:publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig())),
告诉client,可以来注册了。

1
2
3
4
5
上面提到的 log.info("Started Eureka Server") 的上面一行。eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
点contextInitialized进去,看到initEurekaServerContext,初始化eureka 上下文,点initEurekaServerContext进去,看到
// Copy registry from neighboring eureka node
int registryCount = this.registry.syncUp();从相邻的eureka 节点复制注册表,
下一行openForTraffic(主要是和client 交换信息,traffic),查看实现,PeerAwareInstanceRegistryImpl,开启任务postInit,进去之后发现剔除功能(剔除 没有续约的服务)。postInit,点进去,发现new EvictionTask(),点进去,看到run方法中,evict(compensationTimeMs),点进去就到了,具体剔除逻辑,下面剔除的时候讲。
1
2
- PeerAwareInstanceRegistry接口

在EurekaServerAutoConfiguration中 有 public EurekaServerContext eurekaServerContext,中有DefaultEurekaServerContext,点进去找到
@PostConstruct
@Override
public void initialize() {
logger.info(“Initializing …”);
peerEurekaNodes.start();
try {
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info(“Initialized”);
},其中peerEurekaNodes.start();启动一个只拥有一个线程的线程池,第一次进去会更新一下集群其他节点信息。registry.init(peerEurekaNodes);鼠标放在registry上,发现是PeerAwareInstanceRegistryImpl , 的 注册信息管理类里面的init方法。PeerAwareInstanceRegistry是个接口,实现类是:PeerAwareInstanceRegistryImpl。PeerAwareInstanceRegistry接口,实现了com.netflix.eureka.registry.InstanceRegistry。

1
2
- 服务实例注册表

Server是围绕注册表管理的。有两个InstanceRegistry。
com.netflix.eureka.registry.InstanceRegistry是euraka server中注册表管理的核心接口。职责是在内存中管理注册到Eureka Server中的服务实例信息。实现类有PeerAwareInstanceRegistryImpl。

org.springframework.cloud.netflix.eureka.server.InstanceRegistry对PeerAwareInstanceRegistryImpl进行了继承和扩展,使其适配Spring cloud的使用环境,主要的实现由PeerAwareInstanceRegistryImpl提供。

com.netflix.eureka.registry.InstanceRegistry extends LeaseManager, LookupService 。LeaseManager是对注册到server中的服务实例租约进行管理。LookupService是提供服务实例的检索查询功能。

LeaseManager接口的作用是对注册到Eureka Server中的服务实例租约进行管理,方法有:服务注册,下线,续约,剔除。此接口管理的类目前是InstanceInfo。InstanceInfo代表服务实例信息。

PeerAwareInstanceRegistryImpl 增加了对peer节点的同步复制操作。使得eureka server集群中注册表信息保持一致。

1
2
3
4
- 接受服务注册

> 《eureka服务端注册》

我们学过Eureka Client在发起服务注册时会将自身的服务实例元数据封装在InstanceInfo中,然后将InstanceInfo发送到Eureka Server。Eureka Server在接收到Eureka Client发送的InstanceInfo后将会尝试将其放到本地注册表中以供其他Eureka Client进行服务发现。
我们学过:通过 eureka/apps/{服务名}注册

在EurekaServerAutoConfiguration中定义了 public FilterRegistrationBean jerseyFilterRegistration ,表名了 表明eureka-server使用了Jersey实现 对外的 restFull接口。注册一个 Jersey 的 filter ,配置好相应的Filter 和 url映射。


1

public javax.ws.rs.core.Application jerseyApplication(方法:中。
provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
添加一些过滤器,类似于过滤请求地址,Path类似于@RequestMapping,Provider类似于@Controller

1
2
------

在com.netflix.eureka.resources包下,是Eureka Server对于Eureka client的REST请求的定义。看ApplicationResource类(这是一类请求,应用类的请求),类似于应用@Controller注解:@Produces({“application/xml”, “application/json”}),接受xml和json。见名识意 public Response addInstance。添加实例instanceinfo。 方法中,有一句:
registry.register(info, “true”.equals(isReplication));鼠标放在registry上PeerAwareInstanceRegistry接口,点击void register方法。发现 是PeerAwareInstanceRegistryImpl 的方法:public void register(final InstanceInfo info, final boolean isReplication) ,中有一句:super.register(info, leaseDuration, isReplication);
进入下面正题:
com.netflix.eureka.registry.AbstractInstanceRegistry
register方法

在register中,服务实例的InstanceInfo保存在Lease中,Lease在AbstractInstanceRegistry中统一通过ConcurrentHashMap保存在内存中。在服务注册过程中,会先获取一个读锁,防止其他线程对registry注册表进行数据操作,避免数据的不一致。然后从resgitry查询对应的InstanceInfo租约是否已经存在注册表中,根据appName划分服务集群,使用InstanceId唯一标记服务实例。如果租约存在,比较两个租约中的InstanceInfo的最后更新时间lastDirtyTimestamp,保留时间戳大的服务实例信息InstanceInfo。如果租约不存在,意味这是一次全新的服务注册,将会进行自我保护的统计,创建新的租约保存InstanceInfo。接着将租约放到resgitry注册表中。
之后将进行一系列缓存操作并根据覆盖状态规则设置服务实例的状态,缓存操作包括将InstanceInfo加入用于统计Eureka Client增量式获取注册表信息的recentlyChangedQueue和失效responseCache中对应的缓存。最后设置服务实例租约的上线时间用于计算租约的有效时间,释放读锁并完成服务注册。

1
2
3
4
5
6


- 接受心跳 续租,renew

> 《Eureka服务端接收心跳》

在Eureka Client完成服务注册之后,它需要定时向Eureka Server发送心跳请求(默认30秒一次),维持自己在Eureka Server中租约的有效性。

看另一类请求com.netflix.eureka.resources.InstanceResource。下public Response renewLease(方法。看到一行boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
点击renew的实现。
进入下面正题:

1

Eureka Server处理心跳请求的核心逻辑位于AbstractInstanceRegistry#renew方法中。renew方法是对Eureka Client位于注册表中的租约的续租操作,不像register方法需要服务实例信息,仅根据服务实例的服务名和服务实例id即可更新对应租约的有效时间。
com.netflix.eureka.registry.AbstractInstanceRegistry
renew
//根据appName获取服务集群的租约集合
Map<String, Lease> gMap = registry.get(appName);
//查看服务实例状态
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
//统计每分钟续租次数
renewsLastMin.increment();
//更新租约
leaseToRenew.renew();

此方法中不关注InstanceInfo,仅关注于租约本身以及租约的服务实例状态。如果根据服务实例的appName和instanceInfoId查询出服务实例的租约,并且根据#getOverriddenInstanceStatus方法得到的instanceStatus不为InstanceStatus.UNKNOWN,那么更新租约中的有效时间,即更新租约Lease中的lastUpdateTimestamp,达到续约的目的;如果租约不存在,那么返回续租失败的结果。

1
2
- 服务剔除

如果Eureka Client在注册后,既没有续约,也没有下线(服务崩溃或者网络异常等原因),那么服务的状态就处于不可知的状态,不能保证能够从该服务实例中获取到回馈,所以需要服务剔除此方法定时清理这些不稳定的服务,该方法会批量将注册表中所有过期租约剔除。

剔除是定时任务,默认60秒执行一次。延时60秒,间隔60秒
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());

从上面eureka server启动来看,剔除的任务,是线程启动的,执行的是下面的方法。
com.netflix.eureka.registry.AbstractInstanceRegistry
evict

判断是否开启自我保护
if (!isLeaseExpirationEnabled()) {
如果开启自我保护,不剔除。点进去isLeaseExpirationEnabled,查看实现类,有一个isSelfPreservationModeEnabled,点进去 @Override
public boolean isSelfPreservationModeEnabled() {
return serverConfig.shouldEnableSelfPreservation();
},发现EurekaServerConfig,的方法shouldEnableSelfPreservation,看其实现中有EurekaServerConfigBean,发现属性:enableSelfPreservation。

紧接着一个大的for循环,便利注册表register,依次判断租约是否过期。一次性获取所有的过期租约。

//获取注册表租约总数
int registrySize = (int) getLocalRegistrySize();
计算注册表租约的阈值 (总数乘以 续租百分比),得出要续租的数量
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());

总数减去要续租的数量,就是理论要剔除的数量
int evictionLimit = registrySize - registrySizeThreshold;

//求 上面理论剔除数量,和过期租约总数的最小值。就是最终要提出的数量。
int toEvict = Math.min(expiredLeases.size(), evictionLimit);

然后剔除。用internalCancel(appName, id, false);执行 服务下线将服务从注册表清除掉。

剔除的限制:1.自我保护期间不清除。2.分批次清除。

1

3.服务是逐个随机剔除,剔除均匀分布在所有应用中,防止在同一时间内同一服务集群中的服务全部过期被剔除,造成在大量剔除服务时,并在进行自我保护时,促使程序崩溃。

1
2
3
4
5
6
7
EurekaServerInitializerConfiguration的 eurekaServerBootstrap.contextInitialized(方法,中initEurekaServerContext();点进去this.registry.openForTraffic(this.applicationInfoManager, registryCount);点进去,super.postInit();点进去evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
发现 定时任务。


剔除服务是个定时任务,用EvictionTask执行,默认60秒执行一次,延时60秒执行。定时剔除过期服务。

服务剔除将会遍历registry注册表,找出其中所有的过期租约,然后根据配置文件中续租百分比阀值和当前注册表的租约总数量计算出最大允许的剔除租约的数量(当前注册表中租约总数量减去当前注册表租约阀值),分批次剔除过期的服务实例租约。对过期的服务实例租约调用AbstractInstanceRegistry#internalCancel服务下线的方法将其从注册表中清除掉。

1
2
3
4
5
6
7
8
9

​ 自我保护机制主要在Eureka Client和Eureka Server之间存在网络分区的情况下发挥保护作用,在服务器端和客户端都有对应实现。假设在某种特定的情况下(如网络故障),Eureka Client和Eureka Server无法进行通信,此时Eureka Client无法向Eureka Server发起注册和续约请求,Eureka Server中就可能因注册表中的服务实例租约出现大量过期而面临被剔除的危险,然而此时的Eureka Client可能是处于健康状态的(可接受服务访问),如果直接将注册表中大量过期的服务实例租约剔除显然是不合理的。
​ 针对这种情况,Eureka设计了“自我保护机制”。在Eureka Server处,如果出现大量的服务实例过期被剔除的现象,那么该Server节点将进入自我保护模式,保护注册表中的信息不再被剔除,在通信稳定后再退出该模式;在Eureka Client处,如果向Eureka Server注册失败,将快速超时并尝试与其他的Eureka Server进行通信。“自我保护机制”的设计大大提高了Eureka的可用性。
​ ```

- 服务下线

> 《Eureka服务下线》

Eureka Client在应用销毁时,会向Eureka Server发送服务下线请求,清除注册表中关于本应用的租约,避免无效的服务调用。在服务剔除的过程中,也是通过服务下线的逻辑完成对单个服务实例过期租约的清除工作。

在InstanceResource中, public Response cancelLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication)
一行代码:boolean isSuccess = registry.cancel(app.getName(), id,
“true”.equals(isReplication));点进去cancel,发现:internalCancel(appName, id, isReplication); 查看实现:

先获取读锁,防止被其他线程修改
read.lock();
根据appName获取服务实力集群。
Map<String, Lease> gMap = registry.get(appName);
在内存中取消实例 id的服务
if (gMap != null) {
leaseToCancel = gMap.remove(id);
}

1

添加到最近下线服务的统计队列
synchronized (recentCanceledQueue) {
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + “(“ + id + “)”));
}

往下判断leaseToCancel是否为空,租约不存在,返回false,
如果存在,
设置租约下线时间。 leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
获取持有租约的服务信息,标记服务实例为instanceInfo.setActionType(ActionType.DELETED);
添加到租约变更记录队列
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));用于eureka client的增量拉取注册表信息。
释放锁。

首先通过registry根据服务名和服务实例id查询关于服务实例的租约Lease是否存在,统计最近请求下线的服务实例用于Eureka Server主页展示。如果租约不存在,返回下线失败;如果租约存在,从registry注册表中移除,设置租约的下线时间,同时在最近租约变更记录队列中添加新的下线记录,以用于Eureka Client的增量式获取注册表信息。

1
2
- 集群同步

如果Eureka Server是通过集群的方式进行部署,那么为了维护整个集群中Eureka Server注册表数据的一致性,势必需要一个机制同步Eureka Server集群中的注册表数据。

Eureka Server集群同步包含两个部分,
一部分是Eureka Server在启动过程中从它的peer节点中拉取注册表信息,并将这些服务实例的信息注册到本地注册表中;
另一部分是Eureka Server每次对本地注册表进行操作时,同时会将操作同步到它的peer节点中,达到集群注册表数据统一的目的。

1.启动拉取别的peer
在Eureka Server启动类中:EurekaServerInitializerConfiguration位于EurekaServerAutoConfiguration 的import注解中。一行:eurekaServerBootstrap.contextInitialized(
进去:initEurekaServerContext();,点进去,一行:int registryCount = this.registry.syncUp();
看注释:拉取注册表从邻近节点。点击syncUp()的实现方法进去:
看循环:意思是,如果是i第一次进来,为0,不够等待的代码,直接执行下面的拉取服务实例。
将自己作为一个eureka client,拉取注册表。并通过register(instance, instance.getLeaseInfo().getDurationInSecs(), true)注册到自身的注册表中。

Eureka Server也是一个Eureka Client,在启动的时候也会进行DiscoveryClient的初始化,会从其对应的Eureka Server中拉取全量的注册表信息。在Eureka Server集群部署的情况下,Eureka Server从它的peer节点中拉取到注册表信息后,将遍历这个Applications,将所有的服务实例通过AbstractRegistry#register方法注册到自身注册表中。

int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);

当执行完上面的syncUp逻辑后,在下面的openForTraffic,开启此server接受别的client注册,拉取注册表等操作。而在它首次拉取其他peer节点时,是不允许client的通信请求的。

在openForTraffic中,初始化期望client发送过来的服务数量,即上面获取到的服务数量this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold点进去,是计算自我保护的统计参数:
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
_ (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
_ serverConfig.getRenewalPercentThreshold());
服务数(每个服务每分钟续约次数)阈值
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
如果count=0,没有拉取到注册表信息,将此值设为true,表示其他peer来取空的实例信息,意味着,将不允许client从此server获取注册表信息。如果count>0,将此值设置为false,允许client来获取注册表。

后面将服务置为上线,并开启剔除的定时任务。

当Server的状态不为UP时,将拒绝所有的请求。在Client请求获取注册表信息时,Server会判断此时是否允许获取注册表中的信息。上述做法是为了避免Eureka Server在#syncUp方法中没有获取到任何服务实例信息时(Eureka Server集群部署的情况下),Eureka Server注册表中的信息影响到Eureka Client缓存的注册表中的信息。因为是全量同步,如果server什么也没同步过来,会导致client清空注册表。导致服务调用出问题。

2.Server之间注册表信息的同步复制
为了保证Eureka Server集群运行时注册表信息的一致性,每个Eureka Server在对本地注册表进行管理操作时,会将相应的操作同步到所有peer节点中。

在外部调用server的restful方法时,在com.netflix.eureka.resources包下的ApplicationResource资源中,查看每个服务的操作。比如服务注册public Response addInstance(,此方法中有
registry.register(info, “true”.equals(isReplication));点进去实现类:replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);这是一种情况。

1

在PeerAwareInstanceRegistryImpl类中,看其他操作,cancel,renew等中都有replicateToPeers,
此方法中有个peerEurekaNodes,代表一个可同步数据的eureka Server的集合,如果注册表有变化,向此中的peer节点同步。

replicateToPeers方法,它将遍历Eureka Server中peer节点,向每个peer节点发送同步请求。
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
此replicateInstanceActionsToPeers方法中,类PeerEurekaNode的实例node的各种方法,cancel,register,等,用了batchingDispatcher.process(,作用是将同一时间段内,相同服务实例的相同操作将使用相同的任务编号,在进行同步复制的时候,将根据任务编号合并操作,减少同步操作的数量和网络消耗,但是同时也造成了同步复制的延时性,不满足CAP中的C(强一致性)。
所以Eureka,只满足AP。

通过Eureka Server在启动过程中初始化本地注册表信息和Eureka Server集群间的同步复制操作,最终达到了集群中Eureka Server注册表信息一致的目的。

1

  • 获取注册表中服务实例信息

    1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Eureka Server中获取注册表的服务实例信息主要通过两个方法实现:AbstractInstanceRegistry#getApplicationsFromMultipleRegions从多地区获取全量注册表数据,AbstractInstanceRegistry#getApplicationDeltasFromMultipleRegions从多地区获取增量式注册表数据。

1、全量:
上面讲到从节点复制注册信息的时候,用方法public int syncUp() ,一行Applications apps = eurekaClient.getApplications();点进去实现类,有一行getApplicationsFromAllRemoteRegions(); 下面getApplicationsFromMultipleRegions,作用从多个地区中获取全量注册表信息,并封装成Applications返回,它首先会将本地注册表registry中的所有服务实例信息提取出来封装到Applications中,再根据是否需要拉取Region的注册信息,将远程拉取过来的Application放到上面的Applications中。最后得到一个全量的Applications。
2、在前面提到接受服务注册,接受心跳等方法中,都有recentlyChangedQueue.add(new RecentlyChangedItem(lease));作用是将新变动的服务放到最近变化的服务实例信息队列中,用于记录增量是注册表信息。getApplicationDeltasFromMultipleRegions,实现了从远处eureka server中获取增量式注册表信息的能力。

在EurekaServer对外restful中,在com.netflix.eureka.resources下,
@GET
public Response getApplication(@PathParam("version") String version,
@HeaderParam("Accept") final String acceptHeader,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {

其中有一句:String payLoad = responseCache.get(cacheKey);在responseCache初始化的时候,它的构造方法ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {中,Value value = generatePayload(key);点进去有一句:registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));从远程获取delta增量注册信息。但是这个只是向client提供,不向server提供,因为server可以通过每次变更自动同步到peer。

获取增量式注册表信息将会从recentlyChangedQueue中获取最近变化的服务实例信息。recentlyChangedQueue中统计了近3分钟内进行注册、修改和剔除的服务实例信息,在服务注册AbstractInstanceRegistry#registry、接受心跳请求AbstractInstanceRegistry#renew和服务下线AbstractInstanceRegistry#internalCancel等方法中均可见到recentlyChangedQueue对这些服务实例进行登记,用于记录增量式注册表信息。#getApplicationsFromMultipleRegions方法同样提供了从远程Region的Eureka Server获取增量式注册表信息的能力。

RestTemplate 的使用

  • 什么是 RestTemplate?

RestTemplate 是 Spring 框架提供的基于 REST 的服务组件,底层是对 HTTP 请求及响应进行了封装,提供了很多访问 RETS 服务的方法,可以简化代码开发。

  • 如何使用 RestTemplate?

1、创建 Maven 工程,pom.xml。

2、创建实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.southwind.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Student {
private long id;
private String name;
private int age;
}

3、Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.southwind.controller;

import com.southwind.entity.Student;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;

import java.util.Collection;

@RestController
@RequestMapping("/rest")
public class RestHandler {
@Autowired
private RestTemplate restTemplate;

@GetMapping("/findAll")
public Collection<Student> findAll(){
return restTemplate.getForEntity("http://localhost:8010/student/findAll",Collection.class).getBody();
}

@GetMapping("/findAll2")
public Collection<Student> findAll2(){
return restTemplate.getForObject("http://localhost:8010/student/findAll",Collection.class);
}

@GetMapping("/findById/{id}")
public Student findById(@PathVariable("id") long id){
return restTemplate.getForEntity("http://localhost:8010/student/findById/{id}",Student.class,id).getBody();
}

@GetMapping("/findById2/{id}")
public Student findById2(@PathVariable("id") long id){
return restTemplate.getForObject("http://localhost:8010/student/findById/{id}",Student.class,id);
}

@PostMapping("/save")
public void save(@RequestBody Student student){
restTemplate.postForEntity("http://localhost:8010/student/save",student,null).getBody();
}

@PostMapping("/save2")
public void save2(@RequestBody Student student){
restTemplate.postForObject("http://localhost:8010/student/save",student,null);
}

@PutMapping("/update")
public void update(@RequestBody Student student){
restTemplate.put("http://localhost:8010/student/update",student);
}

@DeleteMapping("/deleteById/{id}")
public void deleteById(@PathVariable("id") long id){
restTemplate.delete("http://localhost:8010/student/deleteById/{id}",id);
}
}

4、启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
public class RestTemplateApplication {
public static void main(String[] args) {
SpringApplication.run(RestTemplateApplication.class,args);
}

@Bean
public RestTemplate restTemplate(){
return new RestTemplate();
}
}

服务消费者 consumer

  • 创建 Maven 工程,pom.xml
1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
</dependencies>
  • 创建配置文件 application.yml
1
2
3
4
5
6
7
8
9
10
11
server:
port: 8020
spring:
application:
name: consumer
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/
instance:
prefer-ip-address: true
  • 创建启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class,args);
}

@Bean
public RestTemplate restTemplate(){
return new RestTemplate();
}
}
  • Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.southwind.controller;

import com.southwind.entity.Student;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;

import java.util.Collection;

@RestController
@RequestMapping("/consumer")
public class ConsumerHandler {
@Autowired
private RestTemplate restTemplate;

@GetMapping("/findAll")
public Collection<Student> findAll(){
return restTemplate.getForEntity("http://localhost:8010/student/findAll",Collection.class).getBody();
}

@GetMapping("/findAll2")
public Collection<Student> findAll2(){
return restTemplate.getForObject("http://localhost:8010/student/findAll",Collection.class);
}

@GetMapping("/findById/{id}")
public Student findById(@PathVariable("id") long id){
return restTemplate.getForEntity("http://localhost:8010/student/findById/{id}",Student.class,id).getBody();
}

@GetMapping("/findById2/{id}")
public Student findById2(@PathVariable("id") long id){
return restTemplate.getForObject("http://localhost:8010/student/findById/{id}",Student.class,id);
}

@PostMapping("/save")
public void save(@RequestBody Student student){
restTemplate.postForEntity("http://localhost:8010/student/save",student,null).getBody();
}

@PostMapping("/save2")
public void save2(@RequestBody Student student){
restTemplate.postForObject("http://localhost:8010/student/save",student,null);
}

@PutMapping("/update")
public void update(@RequestBody Student student){
restTemplate.put("http://localhost:8010/student/update",student);
}

@DeleteMapping("/deleteById/{id}")
public void deleteById(@PathVariable("id") long id){
restTemplate.delete("http://localhost:8010/student/deleteById/{id}",id);
}
}

服务网关

Spring Cloud 集成了 Zuul 组件,实现服务网关。

  • 什么是 Zuul?

Zuul 是 Netflix 提供的一个开源的 API 网关服务器,是客户端和网站后端所有请求的中间层,对外开放一个 API,将所有请求导入统一的入口,屏蔽了服务端的具体实现逻辑,Zuul 可以实现反向代理的功能,在网关内部实现动态路由、身份认证、IP 过滤、数据监控等。

  • 创建 Maven 工程,pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-zuul</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
</dependencies>
  • 创建配置文件 application.yml
1
2
3
4
5
6
7
8
9
10
11
12
server:
port: 8030
spring:
application:
name: gateway
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/
zuul:
routes:
provider: /p/**

属性说明:

zuul.routes.provider:给服务提供者 provider 设置映射

  • 创建启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.netflix.zuul.EnableZuulProxy;

@EnableZuulProxy
@EnableAutoConfiguration
public class ZuulApplication {
public static void main(String[] args) {
SpringApplication.run(ZuulApplication.class,args);
}
}

注解说明:

@EnableZuulProxy:包含了 @EnableZuulServer,设置该类是网关的启动类。

@EnableAutoConfiguration:可以帮助 Spring Boot 应用将所有符合条件的 @Configuration 配置加载到当前 Spring Boot 创建并使用的 IoC 容器中。

  • Zuul 自带了负载均衡功能,修改 provider 的代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.southwind.controller;

import com.southwind.entity.Student;
import com.southwind.repository.StudentRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;

import java.util.Collection;

@RestController
@RequestMapping("/student")
public class StudentHandler {
@Autowired
private StudentRepository studentRepository;

@Value("${server.port}")
private String port;

@GetMapping("/findAll")
public Collection<Student> findAll(){
return studentRepository.findAll();
}

@GetMapping("/findById/{id}")
public Student findById(@PathVariable("id") long id){
return studentRepository.findById(id);
}

@PostMapping("/save")
public void save(@RequestBody Student student){
studentRepository.saveOrUpdate(student);
}

@PutMapping("/update")
public void update(@RequestBody Student student){
studentRepository.saveOrUpdate(student);
}

@DeleteMapping("/deleteById/{id}")
public void deleteById(@PathVariable("id") long id){
studentRepository.deleteById(id);
}

@GetMapping("/index")
public String index(){
return "当前端口:"+this.port;
}
}

Ribbon 负载均衡

  • 什么是 Ribbon?

Spring Cloud Ribbon 是一个负载均衡解决方案,Ribbon 是 Netflix 发布的负载均衡器,Spring Cloud Ribbon 是基于 Netflix Ribbon 实现的,是一个用于对 HTTP 请求进行控制的负载均衡客户端。

在注册中心对 Ribbon 进行注册之后,Ribbon 就可以基于某种负载均衡算法,如轮询、随机、加权轮询、加权随机等自动帮助服务消费者调用接口,开发者也可以根据具体需求自定义 Ribbon 负载均衡算法。实际开发中,Spring Cloud Ribbon 需要结合 Spring Cloud Eureka 来使用,Eureka Server 提供所有可以调用的服务提供者列表,Ribbon 基于特定的负载均衡算法从这些服务提供者中选择要调用的具体实例。

  • 创建 Module,pom.xml
1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
</dependencies>
  • 创建配置文件 application.yml
1
2
3
4
5
6
7
8
9
10
11
server:
port: 8040
spring:
application:
name: ribbon
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/
instance:
prefer-ip-address: true
  • 创建启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
public class RibbonApplication {
public static void main(String[] args) {
SpringApplication.run(RibbonApplication.class,args);
}

@Bean
@LoadBalanced
public RestTemplate restTemplate(){
return new RestTemplate();
}
}

@LoadBalanced:声明一个基于 Ribbon 的负载均衡。

  • Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.southwind.controller;

import com.southwind.entity.Student;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import java.util.Collection;

@RestController
@RequestMapping("/ribbon")
public class RibbonHandler {
@Autowired
private RestTemplate restTemplate;

@GetMapping("/findAll")
public Collection<Student> findAll(){
return restTemplate.getForObject("http://provider/student/findAll",Collection.class);
}

@GetMapping("/index")
public String index(){
return restTemplate.getForObject("http://provider/student/index",String.class);
}
}

Feign

  • 什么是 Feign?

与 Ribbon 一样,Feign 也是由 Netflix 提供的,Feign 是一个声明式、模版化的 Web Service 客户端,它简化了开发者编写 Web 服务客户端的操作,开发者可以通过简单的接口和注解来调用 HTTP API,Spring Cloud Feign,它整合了 Ribbon 和 Hystrix,具有可插拔、基于注解、负载均衡、服务熔断等一系列便捷功能。

相比较于 Ribbon + RestTemplate 的方式,Feign 大大简化了代码的开发,Feign 支持多种注解,包括 Feign 注解、JAX-RS 注解、Spring MVC 注解等,Spring Cloud 对 Feing 进行了优化,整合了 Ribbon 和 Eureka,从而让 Feign 的使用更加方便。

  • Ribbon 和 Feign 的区别

Ribbon 是一个通用的 HTTP 客户端工具,Feign 是基于 Ribbon 实现的。

  • Feign 的tedian

1、Feign 是一个声明式的 Web Service 客户端。

2、支持 Feign 注解、Spring MVC 注解、JAX-RS 注解。

3、Feign 基于 Ribbon 实现,使用起来更加简单。

4、Feign 集成了 Hystrix,具备服务熔断的功能。

  • 创建 Module,pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
</dependencies>
  • 创建配置文件 application.yml
1
2
3
4
5
6
7
8
9
10
11
server:
port: 8050
spring:
application:
name: feign
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/
instance:
prefer-ip-address: true
  • 创建启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;

@SpringBootApplication
@EnableFeignClients
public class FeignApplication {
public static void main(String[] args) {
SpringApplication.run(FeignApplication.class,args);
}
}
  • 创建声明式接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.southwind.feign;

import com.southwind.entity.Student;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

import java.util.Collection;

@FeignClient(value = "provider")
public interface FeignProviderClient {
@GetMapping("/student/findAll")
public Collection<Student> findAll();

@GetMapping("/student/index")
public String index();
}
  • Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.southwind.controller;

import com.southwind.entity.Student;
import com.southwind.feign.FeignProviderClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Collection;

@RestController
@RequestMapping("/feign")
public class FeignHandler {

@Autowired
private FeignProviderClient feignProviderClient;

@GetMapping("/findAll")
public Collection<Student> findAll(){
return feignProviderClient.findAll();
}

@GetMapping("/index")
public String index(){
return feignProviderClient.index();
}
}
  • 服务熔断,application.yml 添加熔断机制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
server:
port: 8050
spring:
application:
name: feign
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/
instance:
prefer-ip-address: true
feign:
hystrix:
enabled: true

feign.hystrix.enabled:是否开启熔断器。

  • 创建 FeignProviderClient 接口的实现类 FeignError,定义容错处理逻辑,通过 @Component 注解将 FeignError 实例注入 IoC 容器中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.southwind.feign.impl;

import com.southwind.entity.Student;
import com.southwind.feign.FeignProviderClient;
import org.springframework.stereotype.Component;

import java.util.Collection;

@Component
public class FeignError implements FeignProviderClient {
@Override
public Collection<Student> findAll() {
return null;
}

@Override
public String index() {
return "服务器维护中......";
}
}
  • 在 FeignProviderClient 定义处通过 @FeignClient 的 fallback 属性设置映射。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.southwind.feign;

import com.southwind.entity.Student;
import com.southwind.feign.impl.FeignError;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

import java.util.Collection;

@FeignClient(value = "provider",fallback = FeignError.class)
public interface FeignProviderClient {
@GetMapping("/student/findAll")
public Collection<Student> findAll();

@GetMapping("/student/index")
public String index();
}

Hystrix 容错机制

在不改变各个微服务调用关系的前提下,针对错误情况进行预先处理。

  • 设计原则

    1、服务隔离机制

    2、服务降级机制

    3、熔断机制

    4、提供实时的监控和报警功能

    5、提供实时的配置修改功能

Hystrix 数据监控需要结合 Spring Boot Actuator 来使用,Actuator 提供了对服务的健康监控、数据统计,可以通过 hystrix.stream 节点获取监控的请求数据,提供了可视化的监控界面。

  • 创建 Maven,pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>2.0.7.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
</dependencies>
  • 创建配置文件 application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
server:
port: 8060
spring:
application:
name: hystrix
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/
instance:
prefer-ip-address: true
feign:
hystrix:
enabled: true
management:
endpoints:
web:
exposure:
include: 'hystrix.stream'
  • 创建启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.netflix.hystrix.dashboard.EnableHystrixDashboard;
import org.springframework.cloud.openfeign.EnableFeignClients;

@SpringBootApplication
@EnableFeignClients
@EnableCircuitBreaker
@EnableHystrixDashboard
public class HystrixApplication {
public static void main(String[] args) {
SpringApplication.run(HystrixApplication.class,args);
}
}

注解说明:

@EnableCircuitBreaker:声明启用数据监控

@EnableHystrixDashboard:声明启用可视化数据监控

  • Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.southwind.controller;

import com.southwind.entity.Student;
import com.southwind.feign.FeignProviderClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Collection;

@RestController
@RequestMapping("/hystrix")
public class HystrixHandler {
@Autowired
private FeignProviderClient feignProviderClient;

@GetMapping("/findAll")
public Collection<Student> findAll(){
return feignProviderClient.findAll();
}

@GetMapping("/index")
public String index(){
return feignProviderClient.index();
}
}
  • 启动成功之后,访问 http://localhost:8060/actuator/hystrix.stream 可以监控到请求数据,
  • 访问 http://localhost:8060/hystrix,可以看到可视化的监控界面,输入要监控的地址节点即可看到该节点的可视化数据监控。

Spring Cloud 配置中心

Spring Cloud Config,通过服务端可以为多个客户端提供配置服务。Spring Cloud Config 可以将配置文件存储在本地,也可以将配置文件存储在远程 Git 仓库,达到不重启微服务来配置客户端的效果,创建 Config Server,通过它管理所有的配置文件。

本地文件系统

  • 创建 Maven 工程,pom.xml
1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
</dependencies>
  • 创建 application.yml
1
2
3
4
5
6
7
8
9
10
11
12
server:
port: 8762
spring:
application:
name: nativeconfigserver
profiles:
active: native
cloud:
config:
server:
native:
search-locations: classpath:/shared

注解说明

profiles.active:配置文件的获取方式

cloud.config.server.native.search-locations:本地配置文件存放的路径

  • resources 路径下创建 shared 文件夹,并在此路径下创建 configclient-dev.yml。
1
2
3
server:
port: 8070
foo: foo version 1
  • 创建启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.config.server.EnableConfigServer;

@SpringBootApplication
@EnableConfigServer
public class NativeConfigServerApplication {
public static void main(String[] args) {
SpringApplication.run(NativeConfigServerApplication.class,args);
}
}

注解说明

@EnableConfigServer:声明配置中心。

创建客户端读取本地配置中心的配置文件

  • 创建 Maven 工程,pom.xml
1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
</dependencies>
  • 创建 bootstrap.yml,配置读取本地配置中心的相关信息。
1
2
3
4
5
6
7
8
9
spring:
application:
name: configclient
profiles:
active: dev
cloud:
config:
uri: http://localhost:8762
fail-fast: true

注解说明

cloud.config.uri:本地 Config Server 的访问路径

cloud.config.fail-fase:设置客户端优先判断 Config Server 获取是否正常。

通过spring.application.name 结合spring.profiles.active拼接目标配置文件名,configclient-dev.yml,去 Config Server 中查找该文件。

  • 创建启动类
1
2
3
4
5
6
7
8
9
10
11
package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class NativeConfigClientApplication {
public static void main(String[] args) {
SpringApplication.run(NativeConfigClientApplication.class,args);
}
}
  • Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.southwind.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/native")
public class NativeConfigHandler {

@Value("${server.port}")
private String port;

@Value("${foo}")
private String foo;

@GetMapping("/index")
public String index(){
return this.port+"-"+this.foo;
}
}

Spring Cloud Config 远程配置

  • 创建配置文件,上传至 GitHub
1
2
3
4
5
6
7
8
9
server:
port: 8070
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
spring:
application:
name: configclient
  • 创建 Config Server,新建 Maven 工程,pom.xml
1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
</dependencies>
  • 创建配置文件 application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
server:
port: 8888
spring:
application:
name: configserver
cloud:
config:
server:
git:
uri: https://github.com/southwind9801/aispringcloud.git
searchPaths: config
username: root
password: root
label: master
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
  • 创建启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.config.server.EnableConfigServer;

@SpringBootApplication
@EnableConfigServer
public class ConfigServerApplication {
public static void main(String[] args) {
SpringApplication.run(ConfigServerApplication.class,args);
}
}

创建 Config Client

  • 创建 Maven 工程,pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
</dependencies>
  • 创建 bootstrap.yml
1
2
3
4
5
6
7
8
9
10
11
12
spring:
cloud:
config:
name: configclient
label: master
discovery:
enabled: true
service-id: configserver
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/

注解说明

spring.cloud.config.name:当前服务注册在 Eureka Server 上的名称,与远程仓库的配置文件名对应。

spring.cloud.config.label:Git Repository 的分支。

spring.cloud.config.discovery.enabled:是否开启 Config 服务发现支持。

spring.cloud.config.discovery.service-id:配置中心在 Eureka Server 上注册的名称。

  • 创建启动类
1
2
3
4
5
6
7
8
9
10
11
package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConfigClientApplication {
public static void main(String[] args) {
SpringApplication.run(ConfigClientApplication.class,args);
}
}
  • Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.southwind.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/hello")
public class HelloHandler {

@Value("${server.port}")
private String port;

@GetMapping("/index")
public String index(){
return this.port;
}
}

服务跟踪

Spring Cloud Zipkin

Zipkin 是一个可以采集并且跟踪分布式系统中请求数据的组件,让开发者可以更加直观的监控到请求在各个微服务所耗费的时间等,Zipkin:Zipkin Server、Zipkin Client。

创建 Zipkin Server

  • 创建 Maven 工程,pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-server</artifactId>
<version>2.9.4</version>
</dependency>
<dependency>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-autoconfigure-ui</artifactId>
<version>2.9.4</version>
</dependency>
</dependencies>
  • 创建配置文件 application.yml
1
2
server:
port: 9090
  • 创建启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import zipkin.server.internal.EnableZipkinServer;

@SpringBootApplication
@EnableZipkinServer
public class ZipkinApplication {
public static void main(String[] args) {
SpringApplication.run(ZipkinApplication.class,args);
}
}

注解说明

@EnableZipkinServer:声明启动 Zipkin Server

创建 Zipkin Client

  • 创建 Maven 工程,pom.xml
1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
</dependencies>
  • 创建配置文件 application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
server:
port: 8090
spring:
application:
name: zipkinclient
sleuth:
web:
client:
enabled: true
sampler:
probability: 1.0
zipkin:
base-url: http://localhost:9090/
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/

属性说明

spring.sleuth.web.client.enabled:设置开启请求跟踪

spring.sleuth.sampler.probability:设置采样比例,默认是 1.0

srping.zipkin.base-url:Zipkin Server 地址

  • 创建启动类
1
2
3
4
5
6
7
8
9
10
11
package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ZipkinClientApplication {
public static void main(String[] args) {
SpringApplication.run(ZipkinClientApplication.class,args);
}
}
  • Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.southwind.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/zipkin")
public class ZipkinHandler {

@Value("${server.port}")
private String port;

@GetMapping("/index")
public String index(){
return this.port;
}
}