spring cloud gateway+spring cloud loadbalancer 组合指定版本权重分流
上次写了在 gateway 上使用 Ribbon 的实现方式。考虑到 Ribbon 是阻塞式实现,不适合在 gateway 上使用,所以切换到了 Spring Cloud LoadBalancer,并且配合使用 CircuitBreaker 断路器。
0. 参考
文章 1 Spring Tips: Spring Cloud Loadbalancer
文章 2 Spring Cloud LoadBalancer 官方文档
文章 3 Spring Cloud Commons 之 loadbalancer 源码笔记
版本:
spring-cloud.version Hoxton.SR7
spring-boot-starter-parent 2.3.2.RELEASE
1. 思路
仍旧是基于 Weight Route Predicate Factory 的改造。想法是通过在config上配置版本号和权重比例,达到指定版本分流的目的。
例如,配置文件像下面这样配置
# Two routes to the same service share weight group "group1";
# each VersionWeight predicate carries the group, the weight and the target version.
- id: temp_old
  uri: lb://TEMPLATE
  predicates:
    - Path=/temp/**
    - VersionWeight=group1, 99, v1 # weight ratio and version number
  filters:
    - StripPrefix=1
- id: temp_new
  uri: lb://TEMPLATE
  predicates:
    - Path=/temp/**
    - VersionWeight=group1, 1, v2
  filters:
    - StripPrefix=1
99%的流量将进入老版本v1,只有1%的流量进入到新版v2。
gateway 用的是 WebFlux,而我的服务是普通的 servlet 应用,所以服务端的负载均衡策略仍然沿用 Ribbon 的做法。
1.1 调用链路
请求链路
客户端
-> Weight Route Predicate(权重断言)
-> ReactiveLoadBalancerClientFilter (如果在uri上配置了“lb”则会进入响应式负载均衡)
-> choose(ServerWebExchange exchange) (该方法从 LoadBalancerClientFactory 中获取 ReactorLoadBalancer<ServiceInstance>,也就是负载均衡策略 bean)
->loadBalancer.choose(createRequest()) (负载均衡算法 选择出服务实例)
->请求到具体服务
完成服务实例选择
1.2 自定义负载均衡策略
官方文档中提到自定义负载均衡策略可以使用自定义的负载均衡配置文件
/**
 * Load-balancer configuration that exposes a {@link RandomLoadBalancer} bean.
 */
public class CustomLoadBalancerConfiguration {

    /**
     * Creates the load balancer for the client whose name is stored under
     * {@code LoadBalancerClientFactory.PROPERTY_NAME} in the environment.
     *
     * @param environment               environment the client name is read from
     * @param loadBalancerClientFactory factory supplying the lazy
     *                                  {@code ServiceInstanceListSupplier} provider
     * @return a {@code RandomLoadBalancer} for that client name
     */
    @Bean
    ReactorLoadBalancer<ServiceInstance> randomLoadBalancer(
            Environment environment,
            LoadBalancerClientFactory loadBalancerClientFactory) {
        final String clientName =
                environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RandomLoadBalancer(
                loadBalancerClientFactory.getLazyProvider(clientName, ServiceInstanceListSupplier.class),
                clientName);
    }
}
定义好了之后通过@LoadBalancerClient来指定每一个Service(代码里的'stores' 为serviceId)对应的自定义配置
/**
 * Declares, via {@code @LoadBalancerClient}, that the client named "stores"
 * uses {@code CustomLoadBalancerConfiguration}. The class body is intentionally empty.
 */
@Configuration
@LoadBalancerClient(value = "stores", configuration = CustomLoadBalancerConfiguration.class)
public class MyConfiguration {
}
2. 改造点
写一个 grayContext,基于 ThreadLocal 处理 version 的上下文传递。自定义一个 VersionLoadBalancer,实现 ReactorServiceInstanceLoadBalancer 接口及其 choose 方法(可以参考 RoundRobinLoadBalancer 的实现)。
public class VersionLoadBalancer implements ReactorServiceInstanceLoadBalancer{ ... //省略掉其他的代码 @Override public Mono<Response<ServiceInstance>> choose(Request request) { ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider .getIfAvailable(NoopServiceInstanceListSupplier::new); return supplier.get().next().map(this::getInstanceResponse); } private Response<ServiceInstance> getInstanceResponse( List<ServiceInstance> instances) { if (instances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } // GrayUtil 自定义工具类从当前线程中取出传递的version String version = GrayUtil.getCurrentConextStrValue("version"); if(StringUtil.isNotEmpty(version)){ //如果没有配置元数据version的情况,就随机取一个 参考RoundRobinLoadBalancer if(!instances.stream().allMatch((item->item.getMetadata() !=null && item.getMetadata().get("version") != null))){ int pos = Math.abs(this.position.incrementAndGet()); ServiceInstance instance = instances.get(pos % instances.size()); return new DefaultResponse(instance); } //但凡有配置version的情况,选择version相同的服务 Optional<ServiceInstance> optional = instances.stream().filter(item -> { if(item.getMetadata() != null && item.getMetadata().get("version") != null){ return version.equals(item.getMetadata().get("version")); } return false; }).findFirst(); if(optional.isPresent()){ return new DefaultResponse(optional.get()); } } log.warn(String.format("No servers available for service %s, version: %s", this.serviceId, version)); return new EmptyResponse(); } }
自定义LoadBalancerConfiguration
public class VersionLoadBalancerConfiguration { @Bean ReactorLoadBalancer<ServiceInstance> versionLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) { String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME); return new VersionLoadBalancer(loadBalancerClientFactory .getLazyProvider(name, ServiceInstanceListSupplier.class), name); } @Bean public ServiceInstanceListSupplier versionClientServiceInstanceListSupplier( ConfigurableApplicationContext context) { ServiceInstanceListSupplier serviceInstanceListSupplier = ServiceInstanceListSupplier.builder() .withDiscoveryClient() .withZonePreference() //.withCaching() //测试环境先不缓存 .build(context); return serviceInstanceListSupplier; } }
自定义@EnableGrayLoadBalancerClient注解,参照@LoadBalancerClient 实现自动给每个服务绑定自己的负载规则。
/**
 * Enables the gray (version-weighted) load-balancer client setup by importing
 * {@code GrayLoadBalancerClientConfigurationRegistrar}.
 */
@Configuration(proxyBeanMethods = false)
@Import(GrayLoadBalancerClientConfigurationRegistrar.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EnableGrayLoadBalancerClient {

    /**
     * The load-balancer configuration class.
     *
     * @return the configuration class; the imported registrar reads this attribute
     *         under the name {@code "configuration"}
     */
    Class<?> configuration() ;
}
public class GrayLoadBalancerClientConfigurationRegistrar implements ImportBeanDefinitionRegistrar , ResourceLoaderAware { private ApplicationContext context; private Binder binder; @Override public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry, BeanNameGenerator importBeanNameGenerator) { Map<String, Object> attrs = metadata .getAnnotationAttributes(EnableGrayLoadBalancerClient.class.getName()); List<String> appNames = getServiceIdfromRoute(); appNames.forEach(name->{ if (attrs != null && attrs.containsKey("configuration")) { registerClientConfiguration(registry,name, attrs.get("configuration")); } }); } private static void registerClientConfiguration(BeanDefinitionRegistry registry, Object name, Object configuration) { BeanDefinitionBuilder builder = BeanDefinitionBuilder .genericBeanDefinition(LoadBalancerClientSpecification.class); builder.addConstructorArgValue(name); builder.addConstructorArgValue(configuration); registry.registerBeanDefinition(name + ".VersionLoadBalancerClientSpecification", builder.getBeanDefinition()); } @Override public void setResourceLoader(ResourceLoader resourceLoader) { this.context = (ApplicationContext) resourceLoader; this.binder = Binder.get(this.context.getEnvironment()); } //使用了 VersionWeight 断言的才被加入到负载中,并不是全部加入,这里可以根据具体情况判定。 private List<String> getServiceIdfromRoute(){ Map<String, Object> routes = binder.bind("spring.cloud.gateway.routes" , Bindable.mapOf(String.class, Object.class)).get(); return routes.values().stream().filter(item->{ LinkedHashMap<String, List> map = (LinkedHashMap<String, List>) item; LinkedHashMap<String, Object> predicates =(LinkedHashMap<String, Object>) map.get("predicates"); return predicates.values().stream().anyMatch(lm ->{ if(lm instanceof LinkedHashMap){ return ((LinkedHashMap)lm).values().stream().anyMatch(s->{ if(s instanceof String){ return ((String)s).contains("VersionWeight"); } return false; }); }else if(lm instanceof String){ return 
((String)lm).contains("VersionWeight"); } return false; }); }).map(item->((LinkedHashMap)item).get("uri")).map(item->{ String strItem = (String)item; return strItem.substring(5); }).distinct().collect(Collectors.toList()); }
@LoadBalancerClient 是通过 LoadBalancerClientSpecification 来实现子容器隔离的,和 Feign 的实现方式一致。这些 bean 会加入到 LoadBalancerClientFactory 中,而子容器是在调用 clientFactory.getInstance(name) 时才会实例化。getInstance 这个方法是在 ReactiveLoadBalancerClientFilter 中被调用的,也就是说,只有在第一次被调用时才会实例化子容器并加载 LoadBalancer。
监听路由刷新事件(RefreshScopeRefreshedEvent)。考虑到我们的灰度发布是在路由中配置的,当路由改变时,比方说增加了新服务的路由,就需要为新增的服务重新注册负载规则。
public class RefreshGrayRouteListener implements ApplicationListener<ApplicationEvent>, BeanFactoryAware, BeanDefinitionRegistryPostProcessor { private BeanFactory beanFactory; private BeanDefinitionRegistry beanDefinitionRegistry; //注册 LoadBalancerClientSpecification private static String registerClientConfiguration(BeanDefinitionRegistry registry, Object name) { String beanName = name + ".VersionLoadBalancerClientSpecification"; BeanDefinitionBuilder builder = BeanDefinitionBuilder .genericBeanDefinition(LoadBalancerClientSpecification.class); builder.addConstructorArgValue(name); builder.addConstructorArgValue(new Class[]{VersionLoadBalancerConfiguration.class}); registry.registerBeanDefinition(beanName, builder.getBeanDefinition()); return beanName; } @Override public void onApplicationEvent(ApplicationEvent event) { if (event instanceof RefreshScopeRefreshedEvent) { refreshLoadBalancerBean(); } } @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; } private void refreshLoadBalancerBean() { LoadBalancerClientFactory clientFactory = beanFactory.getBean(LoadBalancerClientFactory.class); //TODO: 实际是无法删除的 clientFactory.destroy(); //找出配置的负载均衡的服务 GatewayProperties properties = beanFactory.getBean(GatewayProperties.class); List<String> ServiceIds = properties.getRoutes().stream().filter(routeDefinition -> { return routeDefinition.getPredicates().stream().anyMatch(predicateDefinition -> "VersionWeight".equals(predicateDefinition.getName())); }).map(routeDefinition -> routeDefinition.getUri().getHost()).distinct().collect(Collectors.toList()); //卸载存在的loadBalancerClientSpecification Map<String, LoadBalancerClientSpecification> loadBalancerClientSpecificationMap = ((DefaultListableBeanFactory) beanFactory).getBeansOfType(LoadBalancerClientSpecification.class); loadBalancerClientSpecificationMap.forEach((k, v) -> { if (!k.startsWith("default")) { ((DefaultListableBeanFactory) 
beanFactory).removeBeanDefinition(k); } }); //重新注册 LoadBalancerClientSpecification ServiceIds.forEach(name -> registerClientConfiguration(beanDefinitionRegistry,name)); loadBalancerClientSpecificationMap = ((DefaultListableBeanFactory) beanFactory).getBeansOfType(LoadBalancerClientSpecification.class); List<LoadBalancerClientSpecification> list = new ArrayList<>(loadBalancerClientSpecificationMap.size()); loadBalancerClientSpecificationMap.forEach((k, v) -> { if (!k.startsWith("default")) { list.add(v); } }); clientFactory.setConfigurations(list); ServiceIds.forEach(name->clientFactory.getInstance(name)); } @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException { this.beanDefinitionRegistry = beanDefinitionRegistry; } @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException { } }
自定义 GrayFilter,继承 OncePerRequestFilter,从请求 header 中获取 version 并放入到 ThreadLocal 中。
/**
 * Copies the {@code version} request header into the gray context for the duration of
 * the request, then resets it.
 *
 * <p>The value is reset in a {@code finally} block because the context is bound to the
 * current thread: without the reset, a later request handled by the same thread and
 * carrying no {@code version} header would still see the previous request's version.
 *
 * @param request     current request; its {@code version} header is read
 * @param response    current response, passed through unchanged
 * @param filterChain remaining filter chain
 */
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response,
        FilterChain filterChain) throws ServletException, IOException {
    String version = request.getHeader("version");
    boolean versionStored = StringUtil.isNotEmpty(version);
    if (versionStored) {
        GrayUtil.setCurrentConextStrValue("version", version);
    }
    try {
        filterChain.doFilter(request, response);
    } finally {
        if (versionStored) {
            // Reset to the empty string, which this filter itself treats as "no version"
            // through StringUtil.isNotEmpty.
            // NOTE(review): if GrayUtil offers a dedicated remove/clear method, prefer it.
            GrayUtil.setCurrentConextStrValue("version", "");
        }
    }
}
7.
3.待解决问题
调用 /refresh 的时候,需要刷新两次才能初始化新加入服务的负载规则:第一次虽然刷新成功,但是没有初始化 bean。当某个服务不需要做负载均衡,也就是去掉 VersionWeight 断言时,还是会进入 VersionWeight 的负载方法(我也不知道怎么办比较好,索性不去掉了,将权重均分 - -)。 kasaya
SLAYERS スレイヤーズ
0 条评论
kasaya
SLAYERS スレイヤーズ
宣传栏
目录
上次写了在 gateway 上使用 Ribbon 的实现方式。考虑到 Ribbon 是阻塞式实现,不适合在 gateway 上使用,所以切换到了 Spring Cloud LoadBalancer,并且配合使用 CircuitBreaker 断路器。
0. 参考
文章 1 Spring Tips: Spring Cloud Loadbalancer
文章 2 Spring Cloud LoadBalancer 官方文档
文章 3 Spring Cloud Commons 之 loadbalancer 源码笔记
版本:
spring-cloud.version Hoxton.SR7
spring-boot-starter-parent 2.3.2.RELEASE
1. 思路
仍旧是基于 Weight Route Predicate Factory 的改造。想法是通过在config上配置版本号和权重比例,达到指定版本分流的目的。
例如,配置文件像下面这样配置
# Two routes to the same service share weight group "group1";
# each VersionWeight predicate carries the group, the weight and the target version.
- id: temp_old
  uri: lb://TEMPLATE
  predicates:
    - Path=/temp/**
    - VersionWeight=group1, 99, v1 # weight ratio and version number
  filters:
    - StripPrefix=1
- id: temp_new
  uri: lb://TEMPLATE
  predicates:
    - Path=/temp/**
    - VersionWeight=group1, 1, v2
  filters:
    - StripPrefix=1
99%的流量将进入老版本v1,只有1%的流量进入到新版v2。
gateway 用的是 WebFlux,而我的服务是普通的 servlet 应用,所以服务端的负载均衡策略仍然沿用 Ribbon 的做法。
1.1 调用链路
请求链路
客户端
-> Weight Route Predicate(权重断言)
-> ReactiveLoadBalancerClientFilter (如果在uri上配置了“lb”则会进入响应式负载均衡)
-> choose(ServerWebExchange exchange) (该方法从 LoadBalancerClientFactory 中获取 ReactorLoadBalancer<ServiceInstance>,也就是负载均衡策略 bean)
->loadBalancer.choose(createRequest()) (负载均衡算法 选择出服务实例)
->请求到具体服务
完成服务实例选择
1.2 自定义负载均衡策略
官方文档中提到自定义负载均衡策略可以使用自定义的负载均衡配置文件
/**
 * Load-balancer configuration that exposes a {@link RandomLoadBalancer} bean.
 */
public class CustomLoadBalancerConfiguration {

    /**
     * Creates the load balancer for the client whose name is stored under
     * {@code LoadBalancerClientFactory.PROPERTY_NAME} in the environment.
     *
     * @param environment               environment the client name is read from
     * @param loadBalancerClientFactory factory supplying the lazy
     *                                  {@code ServiceInstanceListSupplier} provider
     * @return a {@code RandomLoadBalancer} for that client name
     */
    @Bean
    ReactorLoadBalancer<ServiceInstance> randomLoadBalancer(
            Environment environment,
            LoadBalancerClientFactory loadBalancerClientFactory) {
        final String clientName =
                environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RandomLoadBalancer(
                loadBalancerClientFactory.getLazyProvider(clientName, ServiceInstanceListSupplier.class),
                clientName);
    }
}
定义好了之后通过@LoadBalancerClient来指定每一个Service(代码里的'stores' 为serviceId)对应的自定义配置
/**
 * Declares, via {@code @LoadBalancerClient}, that the client named "stores"
 * uses {@code CustomLoadBalancerConfiguration}. The class body is intentionally empty.
 */
@Configuration
@LoadBalancerClient(value = "stores", configuration = CustomLoadBalancerConfiguration.class)
public class MyConfiguration {
}
2. 改造点
写一个 grayContext,基于 ThreadLocal 处理 version 的上下文传递。自定义一个 VersionLoadBalancer,实现 ReactorServiceInstanceLoadBalancer 接口及其 choose 方法(可以参考 RoundRobinLoadBalancer 的实现)。
public class VersionLoadBalancer implements ReactorServiceInstanceLoadBalancer{ ... //省略掉其他的代码 @Override public Mono<Response<ServiceInstance>> choose(Request request) { ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider .getIfAvailable(NoopServiceInstanceListSupplier::new); return supplier.get().next().map(this::getInstanceResponse); } private Response<ServiceInstance> getInstanceResponse( List<ServiceInstance> instances) { if (instances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } // GrayUtil 自定义工具类从当前线程中取出传递的version String version = GrayUtil.getCurrentConextStrValue("version"); if(StringUtil.isNotEmpty(version)){ //如果没有配置元数据version的情况,就随机取一个 参考RoundRobinLoadBalancer if(!instances.stream().allMatch((item->item.getMetadata() !=null && item.getMetadata().get("version") != null))){ int pos = Math.abs(this.position.incrementAndGet()); ServiceInstance instance = instances.get(pos % instances.size()); return new DefaultResponse(instance); } //但凡有配置version的情况,选择version相同的服务 Optional<ServiceInstance> optional = instances.stream().filter(item -> { if(item.getMetadata() != null && item.getMetadata().get("version") != null){ return version.equals(item.getMetadata().get("version")); } return false; }).findFirst(); if(optional.isPresent()){ return new DefaultResponse(optional.get()); } } log.warn(String.format("No servers available for service %s, version: %s", this.serviceId, version)); return new EmptyResponse(); } }
自定义LoadBalancerConfiguration
public class VersionLoadBalancerConfiguration { @Bean ReactorLoadBalancer<ServiceInstance> versionLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) { String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME); return new VersionLoadBalancer(loadBalancerClientFactory .getLazyProvider(name, ServiceInstanceListSupplier.class), name); } @Bean public ServiceInstanceListSupplier versionClientServiceInstanceListSupplier( ConfigurableApplicationContext context) { ServiceInstanceListSupplier serviceInstanceListSupplier = ServiceInstanceListSupplier.builder() .withDiscoveryClient() .withZonePreference() //.withCaching() //测试环境先不缓存 .build(context); return serviceInstanceListSupplier; } }
自定义@EnableGrayLoadBalancerClient注解,参照@LoadBalancerClient 实现自动给每个服务绑定自己的负载规则。
/**
 * Enables the gray (version-weighted) load-balancer client setup by importing
 * {@code GrayLoadBalancerClientConfigurationRegistrar}.
 */
@Configuration(proxyBeanMethods = false)
@Import(GrayLoadBalancerClientConfigurationRegistrar.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EnableGrayLoadBalancerClient {

    /**
     * The load-balancer configuration class.
     *
     * @return the configuration class; the imported registrar reads this attribute
     *         under the name {@code "configuration"}
     */
    Class<?> configuration() ;
}
public class GrayLoadBalancerClientConfigurationRegistrar implements ImportBeanDefinitionRegistrar , ResourceLoaderAware { private ApplicationContext context; private Binder binder; @Override public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry, BeanNameGenerator importBeanNameGenerator) { Map<String, Object> attrs = metadata .getAnnotationAttributes(EnableGrayLoadBalancerClient.class.getName()); List<String> appNames = getServiceIdfromRoute(); appNames.forEach(name->{ if (attrs != null && attrs.containsKey("configuration")) { registerClientConfiguration(registry,name, attrs.get("configuration")); } }); } private static void registerClientConfiguration(BeanDefinitionRegistry registry, Object name, Object configuration) { BeanDefinitionBuilder builder = BeanDefinitionBuilder .genericBeanDefinition(LoadBalancerClientSpecification.class); builder.addConstructorArgValue(name); builder.addConstructorArgValue(configuration); registry.registerBeanDefinition(name + ".VersionLoadBalancerClientSpecification", builder.getBeanDefinition()); } @Override public void setResourceLoader(ResourceLoader resourceLoader) { this.context = (ApplicationContext) resourceLoader; this.binder = Binder.get(this.context.getEnvironment()); } //使用了 VersionWeight 断言的才被加入到负载中,并不是全部加入,这里可以根据具体情况判定。 private List<String> getServiceIdfromRoute(){ Map<String, Object> routes = binder.bind("spring.cloud.gateway.routes" , Bindable.mapOf(String.class, Object.class)).get(); return routes.values().stream().filter(item->{ LinkedHashMap<String, List> map = (LinkedHashMap<String, List>) item; LinkedHashMap<String, Object> predicates =(LinkedHashMap<String, Object>) map.get("predicates"); return predicates.values().stream().anyMatch(lm ->{ if(lm instanceof LinkedHashMap){ return ((LinkedHashMap)lm).values().stream().anyMatch(s->{ if(s instanceof String){ return ((String)s).contains("VersionWeight"); } return false; }); }else if(lm instanceof String){ return 
((String)lm).contains("VersionWeight"); } return false; }); }).map(item->((LinkedHashMap)item).get("uri")).map(item->{ String strItem = (String)item; return strItem.substring(5); }).distinct().collect(Collectors.toList()); }
@LoadBalancerClient 是通过 LoadBalancerClientSpecification 来实现子容器隔离的,和 Feign 的实现方式一致。这些 bean 会加入到 LoadBalancerClientFactory 中,而子容器是在调用 clientFactory.getInstance(name) 时才会实例化。getInstance 这个方法是在 ReactiveLoadBalancerClientFilter 中被调用的,也就是说,只有在第一次被调用时才会实例化子容器并加载 LoadBalancer。
监听路由刷新事件(RefreshScopeRefreshedEvent)。考虑到我们的灰度发布是在路由中配置的,当路由改变时,比方说增加了新服务的路由,就需要为新增的服务重新注册负载规则。
public class RefreshGrayRouteListener implements ApplicationListener<ApplicationEvent>, BeanFactoryAware, BeanDefinitionRegistryPostProcessor { private BeanFactory beanFactory; private BeanDefinitionRegistry beanDefinitionRegistry; //注册 LoadBalancerClientSpecification private static String registerClientConfiguration(BeanDefinitionRegistry registry, Object name) { String beanName = name + ".VersionLoadBalancerClientSpecification"; BeanDefinitionBuilder builder = BeanDefinitionBuilder .genericBeanDefinition(LoadBalancerClientSpecification.class); builder.addConstructorArgValue(name); builder.addConstructorArgValue(new Class[]{VersionLoadBalancerConfiguration.class}); registry.registerBeanDefinition(beanName, builder.getBeanDefinition()); return beanName; } @Override public void onApplicationEvent(ApplicationEvent event) { if (event instanceof RefreshScopeRefreshedEvent) { refreshLoadBalancerBean(); } } @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; } private void refreshLoadBalancerBean() { LoadBalancerClientFactory clientFactory = beanFactory.getBean(LoadBalancerClientFactory.class); //TODO: 实际是无法删除的 clientFactory.destroy(); //找出配置的负载均衡的服务 GatewayProperties properties = beanFactory.getBean(GatewayProperties.class); List<String> ServiceIds = properties.getRoutes().stream().filter(routeDefinition -> { return routeDefinition.getPredicates().stream().anyMatch(predicateDefinition -> "VersionWeight".equals(predicateDefinition.getName())); }).map(routeDefinition -> routeDefinition.getUri().getHost()).distinct().collect(Collectors.toList()); //卸载存在的loadBalancerClientSpecification Map<String, LoadBalancerClientSpecification> loadBalancerClientSpecificationMap = ((DefaultListableBeanFactory) beanFactory).getBeansOfType(LoadBalancerClientSpecification.class); loadBalancerClientSpecificationMap.forEach((k, v) -> { if (!k.startsWith("default")) { ((DefaultListableBeanFactory) 
beanFactory).removeBeanDefinition(k); } }); //重新注册 LoadBalancerClientSpecification ServiceIds.forEach(name -> registerClientConfiguration(beanDefinitionRegistry,name)); loadBalancerClientSpecificationMap = ((DefaultListableBeanFactory) beanFactory).getBeansOfType(LoadBalancerClientSpecification.class); List<LoadBalancerClientSpecification> list = new ArrayList<>(loadBalancerClientSpecificationMap.size()); loadBalancerClientSpecificationMap.forEach((k, v) -> { if (!k.startsWith("default")) { list.add(v); } }); clientFactory.setConfigurations(list); ServiceIds.forEach(name->clientFactory.getInstance(name)); } @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException { this.beanDefinitionRegistry = beanDefinitionRegistry; } @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException { } }
自定义 GrayFilter,继承 OncePerRequestFilter,从请求 header 中获取 version 并放入到 ThreadLocal 中。
/**
 * Copies the {@code version} request header into the gray context for the duration of
 * the request, then resets it.
 *
 * <p>The value is reset in a {@code finally} block because the context is bound to the
 * current thread: without the reset, a later request handled by the same thread and
 * carrying no {@code version} header would still see the previous request's version.
 *
 * @param request     current request; its {@code version} header is read
 * @param response    current response, passed through unchanged
 * @param filterChain remaining filter chain
 */
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response,
        FilterChain filterChain) throws ServletException, IOException {
    String version = request.getHeader("version");
    boolean versionStored = StringUtil.isNotEmpty(version);
    if (versionStored) {
        GrayUtil.setCurrentConextStrValue("version", version);
    }
    try {
        filterChain.doFilter(request, response);
    } finally {
        if (versionStored) {
            // Reset to the empty string, which this filter itself treats as "no version"
            // through StringUtil.isNotEmpty.
            // NOTE(review): if GrayUtil offers a dedicated remove/clear method, prefer it.
            GrayUtil.setCurrentConextStrValue("version", "");
        }
    }
}
7.