草庐IT

Spring Cloud Gateway 没有链路信息,我 TM 人傻了(中)

干货满满张哈希 2023-03-28 原文
本系列是 我TM人傻了 系列第五期[捂脸],往期精彩回顾:

本篇文章涉及底层设计以及原理,以及问题定位和可能的问题点,非常深入,篇幅较长,所以拆分成上中下三篇:

  • :问题简单描述以及 Spring Cloud Gateway 基本结构和流程以及底层原理
  • :Spring Cloud Sleuth 如何在 Spring Cloud Gateway 加入的链路追踪以及为何会出现这个问题
  • :现有 Spring Cloud Sleuth 的非侵入设计带来的性能问题,其他可能的问题点,以及如何解决

Spring Cloud Sleuth 是如何增加链路信息

通过之前的源码分析,我们知道,在最开始的 TraceWebFilter,我们将 Mono 封装成了一个 MonoWebFilterTrace,它的核心源码是:

@Override public void subscribe(CoreSubscriber<? super Void> subscriber) { Context context = contextWithoutInitialSpan(subscriber.currentContext()); Span span = findOrCreateSpan(context); //将 Span 放入执行上下文中,对于日志其实就是将链路信息放入 org.slf4j.MDC //日志的 MDC 一般都是 ThreadLocal 的 Map,对于 Log4j2 的实现类就是 org.apache.logging.log4j.ThreadContext,其核心 contextMap 就是一个基于 ThreadLocal 实现的 Map //简单理解就是将链路信息放入一个 ThreadLocal 的 Map 中,每个线程访问自己的 Map 获取链路信息 try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) { //将实际的 subscribe 用 Span 所在的 Context 包裹住,结束时关闭 Span this.source.subscribe(new WebFilterTraceSubscriber(subscriber, context, span, this)); } //在 scope.close() 之后,会将链路信息从 ThreadLocal 的 Map 中剔除 } @Override public Object scanUnsafe(Attr key) { if (key == Attr.RUN_STYLE) { //执行的方式必须是不能切换线程,也就是同步的 //因为,日志的链路信息是放在 ThreadLocal 对象中,切换线程,链路信息就没了 return Attr.RunStyle.SYNC; } return super.scanUnsafe(key); } WebFilterTraceSubscriber 干了些什么呢?出现异常,以及 http 请求结束的时候,我们可能想将响应信息,异常信息记录进入 Span 中,就是通过这个类封装实现的。

经过 MonoWebFilterTrace 的封装,由于 Spring-WebFlux 处理请求,其实就是封装成我们上面得出的 Mono 之后进行 subscribe 处理的请求,所以这样,整个内部 Mono 的 publish 链路以及 subscribe 链路,就被 WebFilterTraceSubscriber 中的 scope 包裹起来了。只要我们自己不在 GatewayFilter 中转换成某些强制异步的 Mono 或者 Flux 导致切换线程,链路信息是不会丢失的。

我们应用中丢失链路信息的地方

通过查看日志我们发现,启用 RequestBody 缓存的地方,都有链路缺失。这个 RequestBody 缓存我们使用的是 Spring Cloud Gateway 中的 AdaptCachedBodyGlobalFilter,其核心源码是:

private static <T> Mono<T> cacheRequestBody(ServerWebExchange exchange, boolean cacheDecoratedRequest, Function<ServerHttpRequest, Mono<T>> function) { ServerHttpResponse response = exchange.getResponse(); NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory(); return //读取 Body,由于 TCP 拆包,所以需要他们拼接到一起 DataBufferUtils.join(exchange.getRequest().getBody()) //如果没有 Body,则直接返回空 DataBuffer .defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator()))) //decorate方法中将 dataBuffer 放入 exchange 的 Attributes 列表,只是为了防止重复进入这个 `AdaptCachedBodyGlobalFilter` 的情况导致重复缓存请求 Body //之后,使用新的 body 以及原始请求封装成新的请求,继续 GatewayFilters 链路 .map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest)) .switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function); } 为何会使用这个 AdaptCachedBodyGlobalFilter 呢?获取请求 Body 是通过 exchange.getRequest().getBody() 获取的,其结果是一个 Flux<DataBuffer>.请求的 Body 是一次性的,如果你需要请求重试的话,在第一次调用失败的之后,第二次重试的时候,Body 就读取不到了,因为 Flux 已经结束。所以,对于需要重复调用,例如重试,一对多路由转发的情况,需要将请求 Body 缓存起来,就是经过这个 GatewayFilter。但是经过这个 GatewayFilter 之后,链路信息就没了,可以通过以下这个简单项目进行复现(项目地址):

引入依赖:

<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.6</version> </parent> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-sleuth</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <!--log4j2异步日志需要的依赖,所有项目都必须用log4j2和异步日志配置--> <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>${disruptor.version}</version> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>2020.0.3</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> 对所有路径开启 AdaptCachedBodyGlobalFilter:

@Configuration(proxyBeanMethods = false) public class ApiGatewayConfiguration { @Autowired private AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter; @Autowired private GatewayProperties gatewayProperties; @PostConstruct public void init() { gatewayProperties.getRoutes().forEach(routeDefinition -> { //对 spring cloud gateway 路由配置中的每个路由都启用 AdaptCachedBodyGlobalFilter EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(new Object(), routeDefinition.getId()); adaptCachedBodyGlobalFilter.onApplicationEvent(enableBodyCachingEvent); }); } } 配置(我们只有一个路由,将请求转发到 httpbin.org 这个 http 请求测试网站):

server: port: 8181 spring: application: name: apiGateway cloud: gateway: httpclient: connect-timeout: 500 response-timeout: 60000 routes: - id: first_route uri: http://httpbin.org predicates: - Path=/httpbin/** filters: - StripPrefix=1 添加两个全局 Filter,一个在 AdaptCachedBodyGlobalFilter 之前,一个在 AdaptCachedBodyGlobalFilter 之后。这两个 Filter 非常简单,只是打一行日志。

@Log4j2 @Component public class PreLogFilter implements GlobalFilter, Ordered { public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() - 1; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("before AdaptCachedBodyGlobalFilter"); return chain.filter(exchange); } @Override public int getOrder() { return ORDER; } } @Log4j2 @Component public class PostLogFilter implements GlobalFilter, Ordered { public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() + 1; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("after AdaptCachedBodyGlobalFilter"); return chain.filter(exchange); } @Override public int getOrder() { return ORDER; } } 最后指定 Log4j2 的输出格式中包含链路信息,就像系列文章开头中指定的那样。

启动这个应用,之后访问 http://127.0.0.1:8181/httpbin/anything,查看日志,发现 PostLogFilter 中的日志,没有链路信息了:

2021-09-08 06:32:35.457 INFO [service-apiGateway,51063d6f1fe264d0,51063d6f1fe264d0] [30600] [reactor-http-nio-2][?:]: before AdaptCachedBodyGlobalFilter 2021-09-08 06:32:35.474 INFO [service-apiGateway,,] [30600] [reactor-http-nio-2][?:]: after AdaptCachedBodyGlobalFilter

为何链路信息会丢失

我们来看经过 AdaptCachedBodyGlobalFilter 之后,我们前面拼的 Mono 链路会变成什么样:

return Mono.defer(() -> new MonoWebFilterTrace(source, RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由 .flatMap((Function<Route, Mono<?>>) r -> { exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到 return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler }).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由 Mono.empty() .then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志 if (logger.isTraceEnabled()) { logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]"); } }))) .switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404 .then( Mono.defer(() -> { //省略在 AdaptCachedBodyGlobalFilter 前面的链路嵌套 //读取 Body,由于 TCP 拆包,所以需要他们拼接到一起 DataBufferUtils.join(exchange.getRequest().getBody()) //如果没有 Body,则直接返回空 DataBuffer .defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator()))) //decorate方法中将 dataBuffer 放入 exchange 的 Attributes 列表,只是为了防止重复进入这个 `AdaptCachedBodyGlobalFilter` 的情况导致重复缓存请求 Body //之后,使用新的 body 以及原始请求封装成新的请求,继续 GatewayFilters 链路 .map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest)) .switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function); }) .then(Mono.empty())) ), //调用对应的 Handler TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> { //MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略 }); ); 其中 DataBufferUtils.join(exchange.getRequest().getBody()) 其实是一个 FluxReceive,这里我们可以理解为:提交一个尝试读取请求 Body 的任务,将之后的 GatewayFilter 的链路处理加到在读取完 Body 之后的回调当中,提交这个任务后,立刻返回。这么看可能比较复杂,我们用一个类似的例子类比下:

//首先我们创建一个新的 Span Span span = tracer.newTrace(); //声明一个类似于 TraceWebFilter 中封装的 MonoWebFilterTrace 的 MonoOperator class MonoWebFilterTrace<T> extends MonoOperator<T, T> { protected MonoWebFilterTrace(Mono<? extends T> source) { super(source); } @Override public void subscribe(CoreSubscriber<? super T> actual) { //将 subscribe 用 span 包裹 try (Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span)) { source.subscribe(actual); //在将要关闭 spanInScope 的时候(即从 ThreadLocal 的 Map 中移除链路信息),打印日志 log.info("stopped"); } } } Mono.defer(() -> new MonoWebFilterTrace( Mono.fromRunnable(() -> { log.info("first"); }) //模拟 FluxReceive .then(Mono.delay(Duration.ofSeconds(1)) .doOnSuccess(longSignal -> log.info(longSignal)))) ).subscribe(aLong -> log.info(aLong)); Mono.delay 和 FluxReceive 表现类似,都是异步切换线程池执行。执行上面的代码,我们可以从日志上面就能看出来:

2021-09-08 07:12:45.236 INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: first 2021-09-08 07:12:45.240 INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: stopped 2021-09-08 07:12:46.241 INFO [service-apiGateway,,] [31868] [parallel-1][?:]: doOnEach_onNext(0) 2021-09-08 07:12:46.242 INFO [service-apiGateway,,] [31868] [parallel-1][?:]: onComplete() 2021-09-08 07:12:46.242 INFO [service-apiGateway,,] [31868] [parallel-1][?:]: 0 在 Spring Cloud Gateway 中,Request Body 的 FluxReceive 使用的线程池和调用 GatewayFilter 的是同一个线程池,所以可能线程还是同一个,但是由于 Span 已经结束,从 ThreadLocal 的 Map 中已经移除了链路信息,所以日志中还是没有链路信息。

有关Spring Cloud Gateway 没有链路信息,我 TM 人傻了(中)的更多相关文章

  1. ruby-on-rails - Rails 常用字符串(用于通知和错误信息等) - 2

    大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby - 难道Lua没有和Ruby的method_missing相媲美的东西吗? - 2

    我好像记得Lua有类似Ruby的method_missing的东西。还是我记错了? 最佳答案 表的metatable的__index和__newindex可以用于与Ruby的method_missing相同的效果。 关于ruby-难道Lua没有和Ruby的method_missing相媲美的东西吗?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/7732154/

  4. ruby-on-rails - rails 目前在重启后没有安装 - 2

    我有一个奇怪的问题:我在rvm上安装了ruby​​onrails。一切正常,我可以创建项目。但是在我输入“railsnew”时重新启动后,我有“程序'rails'当前未安装。”。SystemUbuntu12.04ruby-v"1.9.3p194"gemlistactionmailer(3.2.5)actionpack(3.2.5)activemodel(3.2.5)activerecord(3.2.5)activeresource(3.2.5)activesupport(3.2.5)arel(3.0.2)builder(3.0.0)bundler(1.1.4)coffee-rails(

  5. ruby - 在没有 sass 引擎的情况下使用 sass 颜色函数 - 2

    我想在一个没有Sass引擎的类中使用Sass颜色函数。我已经在项目中使用了sassgem,所以我认为搭载会像以下一样简单:classRectangleincludeSass::Script::FunctionsdefcolorSass::Script::Color.new([0x82,0x39,0x06])enddefrender#hamlengineexecutedwithcontextofself#sothatwithintemlateicouldcall#%stop{offset:'0%',stop:{color:lighten(color)}}endend更新:参见上面的#re

  6. 没有类的 Ruby 方法? - 2

    大家好!我想知道Ruby中未使用语法ClassName.method_name调用的方法是如何工作的。我头脑中的一些是puts、print、gets、chomp。可以在不使用点运算符的情况下调用这些方法。为什么是这样?他们来自哪里?我怎样才能看到这些方法的完整列表? 最佳答案 Kernel中的所有方法都可用于Object类的所有对象或从Object派生的任何类。您可以使用Kernel.instance_methods列出它们。 关于没有类的Ruby方法?,我们在StackOverflow

  7. ruby-on-rails - Rails 3,嵌套资源,没有路由匹配 [PUT] - 2

    我真的为这个而疯狂。我一直在搜索答案并尝试我找到的所有内容,包括相关问题和stackoverflow上的答案,但仍然无法正常工作。我正在使用嵌套资源,但无法使表单正常工作。我总是遇到错误,例如没有路线匹配[PUT]"/galleries/1/photos"表格在这里:/galleries/1/photos/1/edit路线.rbresources:galleriesdoresources:photosendresources:galleriesresources:photos照片Controller.rbdefnew@gallery=Gallery.find(params[:galle

  8. ruby-on-rails - 有没有办法为 CarrierWave/Fog 设置上传进度指示器? - 2

    我在Rails应用程序中使用CarrierWave/Fog将视频上传到AmazonS3。有没有办法判断上传的进度,让我可以显示上传进度如何? 最佳答案 CarrierWave和Fog本身没有这种功能;你需要一个前端uploader来显示进度。当我不得不解决这个问题时,我使用了jQueryfileupload因为我的堆栈中已经有jQuery。甚至还有apostonCarrierWaveintegration因此您只需按照那里的说明操作即可获得适用于您的应用的进度条。 关于ruby-on-r

  9. ruby - 没有类方法获取 Ruby 类名 - 2

    如何在Ruby中获取BasicObject实例的类名?例如,假设我有这个:classMyObjectSystem我怎样才能使这段代码成功?编辑:我发现Object的实例方法class被定义为returnrb_class_real(CLASS_OF(obj));。有什么方法可以从Ruby中使用它? 最佳答案 我花了一些时间研究irb并想出了这个:classBasicObjectdefclassklass=class这将为任何从BasicObject继承的对象提供一个#class您可以调用的方法。编辑评论中要求的进一步解释:假设你有对象

  10. 【鸿蒙应用开发系列】- 获取系统设备信息以及版本API兼容调用方式 - 2

    在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList​()Obt

随机推荐