引进网关的必要性
在微服务架构下,咱们一般会把相对独立的事务或功用划分为不同的服务,不同服务之间相互阻隔,独立布置。这些微服务由服务注册中心会集办理。关于来自外部的用户恳求来说,在处理恳求的中心事务逻辑之前,一般还需求对恳求做一些预处理,如权限校验、监控计算、流量约束等。假如外部恳求直接抵达微服务,那么一切的服务都需求各自负责处理这些功用,会形成这部分功用逻辑在各个服务重复呈现,增加了未来对这些基础功用的维护晋级的本钱。
所以在微服务环境下,咱们还需求一个网关组件来作为恳求进口。一些基础的恳求预处理的逻辑能够统一完成在网关这一层,这样事务服务只需求专心于处理事务逻辑即可。别的,引进网关作为统一恳求进口之后,还能够运用网关来完成一些其他的功用,比方服务维护、灰度发布等。
1. Spring Cloud Gateway 简介
Spring Cloud Gateway 是 Spring Cloud 微服务生态下的网关组件。Spring Cloud Gateway 是依据 Spring 5 和 Spring Boot 2 建立的,本质上是一个 Spring Boot 运用。在详细介绍其根本原理之前,先看一下通常而言,能够由微服务网关供给的功用。
在 Spring Cloud Gateway 发布之前,Spring Cloud 运用的是由 Netflix 开源的 Zuul 1 作为网关组件。Zuul 1 是依据传统的 Servlet API 开发的,运用了堵塞的网络模型,每个恳求需求分配专门的线程处理,所以资源开支比较大,在高并发的情况下需求创立大量的线程来处理恳求,线程数目会成为体系的瓶颈。作为取代 Spring Cloud Zuul 的组件,Spring Cloud Gateway 网络层运用了依据非堵塞的 Netty,解决了线程数瓶颈从而进步了体系功能。
微服务网关的功用
- 恳求路由:依据恳求自身的特点把恳求转发到不同的微服务是微服务网关的根本功用之一。事务运维人员需求针对不同的服务装备路由,使网关能够依据恳求的 header、途径、参数、协议等特点将其转发到对应的服务。
- 服务发现:网关是微服务环境的恳求进口。支持服务发现能使网关在转发恳求到方针服务时充分运用服务注册中心动态办理服务实例的优势,在装备路由转发的方针地址时也会更加便利。
- 修改恳求呼应:网关在收到外部恳求,将其转发到方针服务之前,能够依据需求对恳求进行修改,比假如更改恳求 header、参数等。类似地,也能够在获取到事务服务呼应之后,回来给用户前对呼应进行修改。
- 权限校验:某些事务场景在处理用户恳求时需求先对用户进行权限校验,这部分逻辑也能够由网关来负责。恳求在抵达网关时,由网关依据恳求要拜访的事务接口先对用户鉴权,只要校验经过的恳求才会转发到对应的服务,而校验不经过的恳求会被网关直接回绝。这样做能够把回绝无效恳求这一步提前到网关这一层,减少无效的流量进入到事务服务。
- 限流熔断:网关能够经过增加限流、熔断等机制来对事务服务起维护效果,进步体系全体的可用性。依据事务服务的吞吐量,网关能够约束转发到该服务的恳求数量,超出约束的恳求直接回绝或降级,这样能够避免因为过多的恳求导致事务服务负载过高的情况。当事务服务反常时,还能够经过熔断的办法抵达快速失利的效果。
- 恳求重试:关于一些幂等的恳求,当网关转发方针服务失利时,能够在网关层做自动重试。关于一些多实例布置服务,重试时还能够考虑把恳求转发到不同的实例,以进步恳求成功的概率。
- 呼应缓存:当用户恳求获取的是一些静态的或更新不频繁的数据时,一段时间内多次恳求获取到的数据很可能是一样的。关于这种情况能够将呼应缓存起来。这样用户恳求能够直接在网关层得到呼应数据,无需再去拜访事务服务,减轻事务服务的担负。
- 呼应聚合:某些情况下用户恳求要获取的呼应内容可能会来自于多个事务服务。网关作为事务服务的调用方,能够把多个服务的呼应整合起来,再一并回来给用户。
- 监控计算:因为网关是恳求进口,所以在网关这一层能够便利地对外部的拜访恳求做监控和计算,一起还能够对事务服务的呼应做监控,便利发现反常情况。
- 灰度流量:网关能够用来做服务流量的灰度切换。比方某个事务服务上线了新版本,那能够在网关这一层依照灰度策略,把一部分恳求流量切换到新版本服务上,以达到验证新版本事务服务的功用和功能的效果。
- 反常呼应处理:关于事务服务回来的反常呼应,能够在网关层在回来给用户之前做转换处理。这样能够把一些事务侧回来的反常细节躲藏,转换成用户友好的过错提示回来。
2. Spring Cloud Gateway 根本原理
Spring Cloud Gateway 运用了 Spring WebFlux 非堵塞网络结构,网络层默许运用了高功能非堵塞的 Netty Server,解决了 Spring Cloud Zuul 因为堵塞的线程模型带来的功能下降的问题。
Spring WebFlux 的内容介绍,参考作者文章: Spring WebFlux 和 Spring MVC 比照分析
Gateway 自身是一个 Spring Boot 运用,它处理恳求是逻辑是依据装备的路由对恳求进行预处理和转发。Gateway 有几个比较中心的概念:
- Route:一个 Route 由路由 ID,转发 URI,多个 Predicates 以及多个 Filters 构成。Gateway 上能够装备多个 Routes。处理恳求时会按优先级排序,找到第一个满意一切 Predicates 的 Route;
- Predicate:表示路由的匹配条件,能够用来匹配恳求的各种特点,如恳求途径、办法、header 等。一个 Route 能够包含多个子 Predicates,多个子 Predicates 终究会集并成一个;
- Filter:过滤器包含了处理恳求和呼应的逻辑,能够分为 pre 和 post 两个阶段。多个 Filter 在 pre 阶段会按优先级高到低次序履行,post 阶段则是反向履行。Gateway 包含两类 Filter。
- 大局 Filter:每种大局 Filter 大局只会有一个实例,会对一切的 Route 都生效。
- 路由 Filter:路由 Filter 是针对 Route 进行装备的,不同的 Route 能够运用不同的参数,因此会创立不同的实例。
Gateway 在发动时会创立 Netty Server,由它接纳来自 Client 的恳求。收到恳求后依据路由的匹配条件找到第一个满意条件的路由,然后恳求在被该路由装备的过滤器处理后由 Netty Client 转到方针服务。服务回来呼应后会再次被过滤器处理,终究回来给 Client。
Gateway 路由装备
Spring Cloud Gateway 自身供给了很多 Predicate 和 Filter 的完成,一些根本的功用能够经过这些现成的 Predicate 和 Filter 装备完成。这些 Gateway 自身供给的 Predicate 和 Filter 在官方文档上有详细的介绍,这里给一个大致的例子:
spring:
cloud:
gateway:
routes:
- id: test_route
uri: lb://service-A
predicates:
- Path=/hello
filters:
- SetRequestHeader=X-Request-Red, Blue
路由是 Gateway 的中心构件,不同的路由依据匹配条件能够处理不同类型的恳求,并转发到对应的方针服务。一个路由由以下几个特点组成:
- Id: 路由 ID;
- Uri: 转发恳求的方针地址;
- Order: 次序(优先级);
- Predicate: 匹配条件。多个 Predicates 会集并成一个聚合的条件;
- Filters: 路由过滤器。这些过滤器终究会和大局过滤器一起排序处理匹配成功的恳求;
- Metadata: 额定的元数据。
Gateway 恳求路由原理
Gateway 运用了 Spring WebFlux 结构,该结构处理恳求的进口在类 DispatcherHandler 。它会依据供给的 HandlerMapping 来获取处理恳求的 Handler 办法。Gateway 运用对 HandlerMapping 的完成是 RoutePredicateHandlerMapping 。
- 进来的恳求由 DispatcherHandler 处理。
- DispatcherHandler 依据 RoutePredicateHandlerMapping 获取 Handler 办法。
- RoutePredicateHandlerMapping 依靠 RouteLocator 获取一切路由装备并依据匹配条件打到恳求匹配的路由。
- RoutePredicateHandlerMapping 把恳求交给 FilteringWebHandler 处理。
- FilteringWebHandler 从恳求匹配的路由获取对应的路由 Filter,并和大局 Filter 合并结构 GatewayFilterChain ,恳求终究由 GatewayFilterChain 里的 Filter 按次序处理。
Spring Cloud Gateway 上下文(ServerWebExchange)
SpringCloud Gateway 的上下文是 ServerWebExchange,恳求的信息都存储在 ServerWebExchange 中,在网关上的后续操作都是依据上下文操作的,在 http 恳求抵达网关之后,网关进口是ReactorHttpHandlerAdapter#apply 办法,去获取恳求的 request 和 response,构建当次恳求的上下文供后续 filter 运用:
public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {
@Override
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
try {
// 获取恳求的Request,构建ReactorServerHttpRequest
ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
// 构建ServerHttpResponse
ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
// 交给HttpWebHandlerAdapter构建上下文ServerWebExchange
return this.httpHandler.handle(request, response)
.doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
.doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
}
catch (URISyntaxException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
}
reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
return Mono.empty();
}
}
}
构建完 request 和 response 后,交给 HttpWebHandlerAdapter 构建上下文 ServerWebExchange:
public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer != null) {
request = this.forwardedHeaderTransformer.apply(request);
}
// 构建恳求的上下文
ServerWebExchange exchange = createExchange(request, response);
LogFormatUtils.traceDebug(logger, traceOn ->
exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
return getDelegate().handle(exchange)
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.then(Mono.defer(response::setComplete));
}
}
Spring Cloud Gateway 读取路由(RouteDefinition)
咱们在装备文件中装备的一个路由规矩,对应到 Java 类便是 GatewayProperties,Spring Boot 会将装备文件映射为 Java 类,例如上文的装备:
spring:
cloud:
gateway:
routes:
- id: test_route
uri: lb://service-A
predicates:
- Path=/hello
filters:
- SetRequestHeader=X-Request-Red, Blue
路由信息映射到 GatewayProperties 后怎么获取其间的 RouteDefinition?
答案是经过 RouteDefinitionLocator。
public class PropertiesRouteDefinitionLocator implements RouteDefinitionLocator {
private final GatewayProperties properties;
// 结构函数设置properties
public PropertiesRouteDefinitionLocator(GatewayProperties properties) {
this.properties = properties;
}
// 从properties中读取RouteDefinition
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
return Flux.fromIterable(this.properties.getRoutes());
}
}
当然咱们获取路由信息的地方不止 properties 一种,还能够从 内存,缓存,乃至注册中心等。CompositeRouteDefinitionLocator 运用派遣器模式允许咱们组合读取路由信息。
public class CompositeRouteDefinitionLocator implements RouteDefinitionLocator {
private final Flux<RouteDefinitionLocator> delegates;
// 将 RouteDefinitionLocator 组合
public CompositeRouteDefinitionLocator(Flux<RouteDefinitionLocator> delegates) {
this.delegates = delegates;
}
// 委托给 RouteDefinitionRepository 履行读取
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
return this.delegates.flatMap(RouteDefinitionLocator::getRouteDefinitions);
}
}
Spring Cloud Gateway 的 GlobalFilter
GlobalFilter 是一切被 Gateway 阻拦的 http 恳求都要做的处理;GatewayFilter 是依据路由装备匹配predicate 的 http 恳求才会做的处理。
大局阻拦器,是一切被阻拦到的 http 恳求都要去做的处理;例如拿到一个 http 恳求后,咱们的意图是转发到下流服务,恳求结果并回来,那么一切被阻拦到的 http 恳求都需求做下列几件事:
- 依照 predicate 把契合规矩的 url 转换为真实要去恳求的 url;
- 调用真实的下流服务(依据 netty 完成的 http 调用,详细代码在 NettyRoutingFilter 类中);
- 得到 response,回来给调用方。
public interface GlobalFilter {
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}
接口中只要一个 filter 办法,完成类完成该接口后在 filter 中去做详细阻拦逻辑,这些 Filter 都完成了 GlobalFilter 接口:
AdaptCachedBodyGlobalFilter:优先级最高的 Filter,恳求到 gateway 后,将上下文ServerWebExchange 中已有的缓存删除 恳求信息,将此次的恳求信息缓存到上下文中。
ForwardPathFilter:假如该恳求还未被路由或 URI对象的特点不是 forward,则将该恳求对应装备的 Route 信息中 uri 的 path 设置到上下文 ServerWebExchange 中。
RouteToRequestUrlFilter:将此次恳求的 uri 和装备的 Route 规矩做 merged 处理,拿到真实代理的下流服务的地址,将得到的 url 放到上下文中,key 为
GATEWAY_REQUEST_URL_ATTR
。LoadBalancerClientFilter:网关供给了负载均衡的 Filter,详细负载规矩能够自己完成。
NoLoadBalancerClientFilter:没有负载均衡的阻拦器。
NettyRoutingFilter:网关的 http 是依据 netty 完成的,若此次恳求 scheme 是 http 或 https 则运用依据 netty 的 httpClient 履行调用,将回来结果写入上下文中。
NettyWriteResponseFilter:依据 Web Flux,若上下文中存在
CLIENT_RESPONSE_CONN_ATTR
,将呼应数据回来。WebClientHttpRoutingFilter:效果同 NettyRoutingFilter,办法同 LoadBalancerClientFilter。
WebsocketRoutingFilter:路由 WebSocket 恳求,校验逻辑在WebsocketRoutingFilter#changeSchemeIfIsWebSocketUpgrade 中。
WebClientWriteResponseFilter:效果同 NettyWriteResponseFilter。
ForwardRoutingFilter:设置此次恳求已被路由。
GatewayMetricsFilter:计算网关的功能指标。
Spring Cloud Gateway 的 GatewayFilter
GatewayFilter 是面向开发人员的,因需适配,当咱们需求给契合 predicate 的 url 做一些处理时经过装备就可增加,例如,咱们想给 path 匹配上 /test/**
的 url 增加 header,经过下列装备就可增加,这类装备是依据事务需求进行的特别装备。
public interface GatewayFilter extends ShortcutConfigurable {
/**
* Name key.
*/
String NAME_KEY = "name";
/**
* Value key.
*/
String VALUE_KEY = "value";
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}
接口界说中多了 NAME_KEY
和 VALUE_KEY
,原因是 GatewayFilter 是面向开发人员的,例如咱们需求装备给 path契合 /test/** 的恳求增加 header 时,header 是 key-value 形式,这时候就用到了:
public class AddRequestHeaderGatewayFilterFactory extends AbstractNameValueGatewayFilterFactory {
@Override
public GatewayFilter apply(NameValueConfig config) {
return (exchange, chain) -> {
// 即将增加的key-value增加到上下文的header中
ServerHttpRequest request = exchange.getRequest().mutate()
.header(config.getName(), config.getValue()).build();
return chain.filter(exchange.mutate().request(request).build());
};
}
}
GlobalFilter 和 GatewayFilter 整合运用
每个 Filter 中都有一个 Order 特点,在履行时是在 FilteringWebHandler#handle办法 中对 GlobalFilter 和 GatewayFilter 进行的整合和排序,详细履行在 FilteringWebHandler#filter办法:
/**
* 整合Filter
*/
public Mono<Void> handle(ServerWebExchange exchange) {
// 依据Route信息取出装备的GatewayFilter调集
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
List<GatewayFilter> gatewayFilters = route.getFilters();
// 取出globalFilters
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
// 将GatewayFilter增加到combined
combined.addAll(gatewayFilters);
// combined依据Order排优先级
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
/**
* 履行Filter
*/
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
this.index + 1);
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}
GlobalFilter 和 GatewayFilter 自界说 Filter
- 自界说 GlobalFilter:GlobalFilter 详细的完成办法是完成接口,每个 filter 都完成了 GlobalFilter 接口:
public class GlobalTestFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
if("契合事务逻辑,处理完事务逻辑,持续履行下一个filter"){
return chain.filter(exchange);
}
//不契合事务逻辑,直接回来
return "依照不契合事务逻辑处理";
}
}
- 自界说 GatewayFilter:GatewayFilter 详细的完成办法是工厂,每个工厂都继承了 AbstractGatewayFilterFactory:
public class TestGatewayFilterFactory extends AbstractGatewayFilterFactory<TestGatewayFilterFactory.Config> {
public TestGatewayFilterFactory() {
super(TestGatewayFilterFactory.Config.class);
}
@Override
public GatewayFilter apply(Config config) {
return (exchange, chain) -> {
if("契合条件,处理事务逻辑,持续履行下一个Filter"){
return chain.filter(exchange);
}
// 不契合条件,直接回来
return "false";
};
}
public static class Config {
private String businessAttributes;
public String getBusinessAttributes() {
return businessAttributes;
}
public void setBusinessAttributes(String businessAttributes) {
this.businessAttributes = businessAttributes;
}
}
}
3. Spring Cloud Gateway 工作进程
网关发动阶段
- Yaml 文件和 GatewayProperties 文件映射,映射处理源码在 JavaBeanBinder.BeanProperty#getValue –> CollectionBinder#merge —> Binder#bindBean;
- 加载 Locator Bean,为后续读取 RouteDefinition 做准备【GatewayAutoConfiguration】;
public class GatewayAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public PropertiesRouteDefinitionLocator propertiesRouteDefinitionLocator(
GatewayProperties properties) {
return new PropertiesRouteDefinitionLocator(properties);
}
@Bean
@ConditionalOnMissingBean(RouteDefinitionRepository.class)
public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
return new InMemoryRouteDefinitionRepository();
}
@Bean
@Primary
public RouteDefinitionLocator routeDefinitionLocator(
List<RouteDefinitionLocator> routeDefinitionLocators) {
return new CompositeRouteDefinitionLocator(
Flux.fromIterable(routeDefinitionLocators));
}
@Bean
@Primary
public RouteLocator cachedCompositeRouteLocator(List<RouteLocator> routeLocators) {
return new CachingRouteLocator(
new CompositeRouteLocator(Flux.fromIterable(routeLocators)));
}
}
- 初始化GlobalFilters【FilteringWebHandler】;
public class GatewayAutoConfiguration {
@Bean
public FilteringWebHandler filteringWebHandler(List<GlobalFilter> globalFilters) {
return new FilteringWebHandler(globalFilters);
}
}
public class FilteringWebHandler implements WebHandler {
private final List<GatewayFilter> globalFilters;
// 结构函数中设置globalFiltersglobalFilters
public FilteringWebHandler(List<GlobalFilter> globalFilters) {
this.globalFilters = loadFilters(globalFilters);
}
// 设置globalFilters
private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
return filters.stream().map(filter -> {
GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
if (filter instanceof Ordered) {
int order = ((Ordered) filter).getOrder();
return new OrderedGatewayFilter(gatewayFilter, order);
}
return gatewayFilter;
}).collect(Collectors.toList());
}
}
- 初始化 predicates,gatewayFilters,getRoutes【GatewayAutoConfiguration –> RouteDefinitionRouteLocator】;
public class RouteDefinitionRouteLocator
implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {
// 结构函数中初始化
public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,
List<RoutePredicateFactory> predicates,
List<GatewayFilterFactory> gatewayFilterFactories,
GatewayProperties gatewayProperties, ConversionService conversionService) {
this.routeDefinitionLocator = routeDefinitionLocator;
this.conversionService = conversionService;
initFactories(predicates);
gatewayFilterFactories.forEach(
factory -> this.gatewayFilterFactories.put(factory.name(), factory));
this.gatewayProperties = gatewayProperties;
}
// 设置predicate工厂
private void initFactories(List<RoutePredicateFactory> predicates) {
predicates.forEach(factory -> {
String key = factory.name();
if (this.predicates.containsKey(key)) {
this.logger.warn("A RoutePredicateFactory named " + key
+ " already exists, class: " + this.predicates.get(key)
+ ". It will be overwritten.");
}
this.predicates.put(key, factory);
if (logger.isInfoEnabled()) {
logger.info("Loaded RoutePredicateFactory [" + key + "]");
}
});
}
public Flux<Route> getRoutes() {
// 从RouteDefinitions转换为Route,转换进程在convertToRoute办法中完成
return this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute)
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("RouteDefinition matched: " + route.getId());
}
return route;
});
}
// RouteDefinition到Route的转换
private Route convertToRoute(RouteDefinition routeDefinition) {
// 从routeDefinition获取predicate
AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
// 从routeDefinition获取gatewayFilters
List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);
// 结构Route
return Route.async(routeDefinition).asyncPredicate(predicate)
.replaceFilters(gatewayFilters).build();
}
// 获取GatewayFilters
private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) {
List<GatewayFilter> filters = new ArrayList<>();
// 假如默许filter不为空,则去加载
if (!this.gatewayProperties.getDefaultFilters().isEmpty()) {
filters.addAll(loadGatewayFilters(DEFAULT_FILTERS,
this.gatewayProperties.getDefaultFilters()));
}
// 假如Filter不为空,则
if (!routeDefinition.getFilters().isEmpty()) {
filters.addAll(loadGatewayFilters(routeDefinition.getId(),
routeDefinition.getFilters()));
}
AnnotationAwareOrderComparator.sort(filters);
return filters;
}
@SuppressWarnings("unchecked")
private List<GatewayFilter> loadGatewayFilters(String id,
List<FilterDefinition> filterDefinitions) {
List<GatewayFilter> filters = filterDefinitions.stream().map(definition -> {
// 从gatewayFilterFactories中依据key获取factory
GatewayFilterFactory factory = this.gatewayFilterFactories
.get(definition.getName());
if (factory == null) {
throw new IllegalArgumentException(
"Unable to find GatewayFilterFactory with name "
+ definition.getName());
}
// 获取definition设置的Filter值
Map<String, String> args = definition.getArgs();
if (logger.isDebugEnabled()) {
logger.debug("RouteDefinition " + id + " applying filter " + args + " to "
+ definition.getName());
}
Map<String, Object> properties = factory.shortcutType().normalize(args,
factory, this.parser, this.beanFactory);
// 每一个工厂中都有一个静态内部类Config,意图是存储咱们设置的Filter值
Object configuration = factory.newConfig();
// 将后几个参数的信息绑定到configuration
ConfigurationUtils.bind(configuration, properties,
factory.shortcutFieldPrefix(), definition.getName(), validator,
conversionService);
// 取得GatewayFilter
GatewayFilter gatewayFilter = factory.apply(configuration);
if (this.publisher != null) {
this.publisher.publishEvent(new FilterArgsEvent(this, id, properties));
}
return gatewayFilter;
}).collect(Collectors.toList());
ArrayList<GatewayFilter> ordered = new ArrayList<>(filters.size());
for (int i = 0; i < filters.size(); i++) {
GatewayFilter gatewayFilter = filters.get(i);
if (gatewayFilter instanceof Ordered) {
ordered.add(gatewayFilter);
}
else {
ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
}
}
return ordered;
}
}
恳求处理阶段
- ReactorHttpHandlerAdapter#apply 办法是恳求到网关履行的进口;
public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
try {
// 获取恳求的request和response
ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
// 给到HttpWebHandlerAdapter履行构建
return this.httpHandler.handle(request, response)
.doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
.doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
}
catch (URISyntaxException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
}
reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
return Mono.empty();
}
}
}
- HttpWebHandlerAdapter#handle 构建网关上下文 ServerWebExchange;
public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer != null) {
request = this.forwardedHeaderTransformer.apply(request);
}
// 依据恳求的request、response构建网关上下文
ServerWebExchange exchange = createExchange(request, response);
LogFormatUtils.traceDebug(logger, traceOn ->
exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
return getDelegate().handle(exchange)
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.then(Mono.defer(response::setComplete));
}
}
- DispatcherHandler 用于 Http 恳求处理器/控制器的中央分发处理器,把恳求分发给已经注册的处理程序处理,DispatcherHandler 遍历 Mapping 获取对应的 handler,网关一共有 6 个 handlerMapping【此处会找到 RoutePredicateHandlerMapping,经过 RoutePredicateHandlerMapping 获取 FilteringWebHandler,经过 FilteringWebHandler 获取】;
public class DispatcherHandler implements WebHandler, ApplicationContextAware {
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
// 遍历mapping获取handler
return Flux.fromIterable(this.handlerMappings)
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
.switchIfEmpty(createNotFoundError())
.flatMap(handler -> invokeHandler(exchange, handler))
.flatMap(result -> handleResult(exchange, result));
}
}
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
private final FilteringWebHandler webHandler;
private final RouteLocator routeLocator;
private final Integer managementPort;
private final ManagementPortType managementPortType;
// 网关发动时进行了初始化
public RoutePredicateHandlerMapping(FilteringWebHandler webHandler,
RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties,
Environment environment) {
this.webHandler = webHandler;
this.routeLocator = routeLocator;
this.managementPort = getPortProperty(environment, "management.server.");
this.managementPortType = getManagementPortType(environment);
setOrder(1);
setCorsConfigurations(globalCorsProperties.getCorsConfigurations());
}
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
// 回来FilteringWebHandler
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for ["
+ getExchangeDesc(exchange) + "]");
}
})));
}
}
- RoutePredicateHandlerMapping#lookupRoute 匹配路由,依据 routeLocator 获取咱们在装备我文件中装备的 Route,和当时恳求的路由做匹配;
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
// routeLocator获取咱们在装备我文件中装备的Route
return this.routeLocator.getRoutes()
.concatMap(route -> Mono.just(route).filterWhen(r -> {
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
// 当时恳求的路由做匹配
return r.getPredicate().apply(exchange);
})
.doOnError(e -> logger.error(
"Error applying predicate for route: " + route.getId(),
e))
.onErrorResume(e -> Mono.empty()))
.next()
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
validateRoute(route, exchange);
return route;
});
}
}
- FilteringWebHandler 创立过滤器链,履行过滤器;
public class FilteringWebHandler implements WebHandler {
// 创立过滤器链
public Mono<Void> handle(ServerWebExchange exchange) {
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
List<GatewayFilter> gatewayFilters = route.getFilters();
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
private static class DefaultGatewayFilterChain implements GatewayFilterChain {
// 调用过滤器
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
this.index + 1);
// 履行调用
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}
}
}