前言

工作中运用 OpenFeign 进行跨服务调用,最近发现线上经常会遇到恳求失利。

java.net.ConnectException: Connection refused: connect

经过排查咱们发现不是接口超时,而是有时分会恳求到现已下线的服务导致报错。这多发生在服务提供者体系布置的时分,因为体系布置的时分会调用 Spring 容器 的 shutdown() 办法, Eureka Server 那里能够及时的除掉下线服务,可是咱们上一篇文章中现已知道 readOnlyCacheMapreadWriteCacheMap 同步距离是 30SClient 端拉取实例信息的距离也是 30S,这就导致 Eureka Client 端存储的实例信息数据在一个临界时刻范围内都是脏数据。

调整 Eureka 参数

已然因为 Eureka 本身的设计导致会存在服务实例信息推迟更新,那么咱们尝试去修正几个参数来下降推迟

  • Client 端设置服务拉取距离3S, eureka.client.registry-fetch-interval-seconds = 3
  • Server 端设置读写缓存同步距离 3S,eureka.server.response-cache-update-interval-ms=3000

这样设置之后经过一段时刻的调查发现情况有所改善,但还是存在这个问题,并且并没有改善多少。

LoadBalancer 如何获取实例信息

EurekaOpenFeign 的文章中都有提到,OpenFeign 进行长途调用的时分会经过负载均衡器选取一个实例建议 Http 恳求。咱们 SpringCloud 版本是 2020,现已移除了 ribbon,运用的是 LoadBalancer

经过 debug OpenFeign 调用的源码发现它是从 DiscoveryClientServiceInstanceListSupplier的构造办法获取实例信息集合 List<ServiceInstance> 的,内部调用到 CachingServiceInstanceListSupplier 构造办法,要点看 CacheFlux.lookup()

public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
   super(delegate);
   this.serviceInstances = CacheFlux.lookup(key -> {
      // TODO: configurable cache name
      Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
      if (cache == null) {
         if (log.isErrorEnabled()) {
            log.error("Unable to find cache: " + SERVICE_INSTANCE_CACHE_NAME);
         }
         return Mono.empty();
      }
      List<ServiceInstance> list = cache.get(key, List.class);
      if (list == null || list.isEmpty()) {
         return Mono.empty();
      }
      return Flux.just(list).materialize().collectList();
   }, delegate.getServiceId()).onCacheMissResume(delegate.get().take(1))
         .andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize().doOnNext(instances -> {
            Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
            if (cache == null) {
               if (log.isErrorEnabled()) {
                  log.error("Unable to find cache for writing: " + SERVICE_INSTANCE_CACHE_NAME);
               }
            }
            else {
               cache.put(key, instances);
            }
         }).then());
}

这儿先去查缓存,缓存有就直接返回,缓存没有就去 CompositeDiscoveryClient.getInstances() 查询。查询完毕之后会回调到 CacheFlux.lookup(param,param2) 第二个参数的代码块,将成果放进缓存。

@Override
public List<ServiceInstance> getInstances(String serviceId) {
   if (this.discoveryClients != null) {
      for (DiscoveryClient discoveryClient : this.discoveryClients) {
         List<ServiceInstance> instances = discoveryClient.getInstances(serviceId);
         if (instances != null && !instances.isEmpty()) {
            return instances;
         }
      }
   }
   return Collections.emptyList();
}

要点看这个办法,因为咱们运用的是 Eureka 作为注册中心。所以这儿会调用 EurekaDiscoveryClient
getInstances(), 最终咱们发现底层其实便是从 DiscoveryClient.localRegionApps 获取的服务实例信息。

现在咱们清楚了,OpenFeign 调用时,负载均衡战略还不是从 DiscoveryClient.localRegionApps 直接拿的实例信息,是自己缓存了一份。这样一来,不只要计算 Eureka 本身的推迟,还要算上缓存时刻。

SpringCloud 中有很多内存缓存的完成,这儿咱们选择的是 Caffine

<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>caffeine</artifactId>
    <version>3.0.5</version>
</dependency>

引进依赖即可自动装备,从 LoadBalancerCacheProperties 中咱们能够发现默许的缓存时刻是 35S,所以要处理咱们的问题还需要下降缓存时刻,也能够直接不运用内存缓存,每次都从 EurekaClient 拉取过来的实例信息读取即可。

经过上面的剖析咱们能够发现运用 OpenFeign 内部调用是无法彻底治愈这个问题的,因为 Eureka 的推迟是无法彻底治愈的,只能说在维持机器功能等各方面的前提下尽可能的缩短数据同步定时使命的时刻距离。所以咱们能够换个角度,让调用失利的恳求进行重试。

LoadBalancer 的两种负载均衡战略

经过源码调试,发现它有两种负载均衡战略 RoundRobinLoadBalancer、RandomLoadBalancer,轮询和随机,默许的战略是轮询

LoadBalancerClientConfiguration

@Bean
@ConditionalOnMissingBean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
      LoadBalancerClientFactory loadBalancerClientFactory) {
   String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
   return new RoundRobinLoadBalancer(
         loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}

这两种战略都比较简单,没什么好说的。

轮询战略存在的问题

咱们能够调查下轮询战略的完成,它有一个原子类型的成员变量,用来记载下一次恳求要落到哪一个实例

final AtomicInteger position;

中心逻辑

private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
   if (instances.isEmpty()) {
      if (log.isWarnEnabled()) {
         log.warn("No servers available for service: " + serviceId);
      }
      return new EmptyResponse();
   }
   // TODO: enforce order?
   int pos = Math.abs(this.position.incrementAndGet());
   ServiceInstance instance = instances.get(pos % instances.size());
   return new DefaultResponse(instance);
}

能够看到完成逻辑很简单,用 position 自增,然后实例数量进行求余,到达轮询的作用。乍一看如同没问题,可是它存在这样一种情况。现在咱们有两个实例 192.168.1.121、192.168.1.122,这时分两个恳求 A、B 过来,A 恳求了 121 的,B 恳求了 122 的,然后 A 恳求失利了触发重试,因为轮询机制 A 重试的实例又回到了 121 ,这样就有问题了,因为还是失利,咱们要让重试的恳求一定能重试到其他的服务实例。

运用 TraceId 完成自定义负载均衡战略

因为重试的时分是在 OpenFeign 内部从头建议了一次 HTTP 恳求,所以 traceId 并没有变,咱们能够先从 MDC 上下文获取 traceId,再从缓存中获取 traceId 对应的值,假如没有就随机生成一个数字然后和 RoundRobinLoadBalancer 相同自增求余,假如缓存中现已有了就直接自增求余,这样就一定能重试到不同的实例。

这儿咱们缓存组件还是运用 Caffeine

private final LoadingCache<String, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
      .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
   if (serviceInstances.isEmpty()) {
      log.warn("No servers available for service: " + serviceId);
      return new EmptyResponse();
   }
   String traceId = MDC.get("traceId");
   if (traceId == null) {
      traceId = UUID.randomUUID().toString();
   }
   AtomicInteger seed = positionCache.get(traceId);
   int s = seed.getAndIncrement();
   int pos = s % serviceInstances.size();
   return new DefaultResponse(serviceInstances.stream()
         .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
         .collect(Collectors.toList()).get(pos));
}

这个办法是从哈希哥那里学到的,他的主页 /user/501033… 。

完了之后声明咱们自己的负载均衡器的 Bean

public class FeignLoadBalancerConfiguration {
    @Bean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSuppliers, Environment environment) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RoundRobinRetryDifferentInstanceLoadBalancer(serviceInstanceListSuppliers,name);
    }
}

之后在主启动类上运用 @LoadBalancerClient 指定咱们自定义的负载均衡器

@LoadBalancerClient(name = "feign-test-product", configuration = FeignLoadBalancerConfiguration.class)

设置 LoadBalancer Zone

还记得之前 Eureka 咱们为了处理本机调用的时分会经过负载均衡调用到开发环境的机器设置了 zoneSpringCloud LoadBalancer 也提供了这个装备,并且从源码中咱们能够发现,最终会以 LoadBalancer 设置的为准,假如没有为它设置,那么会运用 Eureka 中的 zone 装备,假如设置了就会掩盖 Eurekazone 设置

EurekaLoadBalancerClientConfiguration.postprocess()

@PostConstruct
public void postprocess() {
   if (!StringUtils.isEmpty(zoneConfig.getZone())) {
      return;
   }
   String zone = getZoneFromEureka();
   if (!StringUtils.isEmpty(zone)) {
      if (LOG.isDebugEnabled()) {
         LOG.debug("Setting the value of '" + LOADBALANCER_ZONE + "' to " + zone);
      }
      zoneConfig.setZone(zone);
   }
}

结语

假如这篇文章对你有帮助,记得点赞加重视!你的支持便是我继续创造的动力!