大家好,这篇文章跟大家探讨下日常使用线程池的各种姿势,重点介绍怎么在 Spring 环境中正确使用线程池。
首先问大家一个问题,你日常开发中是怎样使用线程池的?
我想大致可以分为以下四种情况:
1.方法级,随用随建,用完关闭
2.类级共享,定义个 static final 修饰的 ThreadPoolExecutor,该类及子类(看修饰符)所有对象、方法共享
3.业务共享,按业务类型定义多个 ThreadPoolExecutor,相同业务类型共享同一线程池对象
4.全局共享,服务所有地方共享同一全局线程池
一般来说,优先使用方式3,其次方式2,不要使用方式1跟4,原因如下
1.线程池出现的目的就是为了统一管理线程资源,减少频繁创建销毁线程带来的开销,使用池化技术复用线程执行任务,提升系统性能,在高并发、异步化的场景下,方法级使用根本达不到此目的,反而会使性能变低。
2.全局共享一个线程池,任务执行参差不齐,相互影响,高耗时任务会占满线程池资源,导致低耗时任务没机会执行;同时如果任务之间存在父子关系,可能会导致死锁的发生,进而引发OOM。
3.按业务类型进行线程池隔离,各任务执行互不影响,粒度也比类级共享大点,不会创建大量线程池,降低系统调度压力,像 Hystrix 线程池隔离就可以理解成这种模式。
综上,建议大家都采用方式3,按业务功能分类定义线程池。
Spring 作为一个 Bean 容器,我们通常会将业务中用到的 ThreadPoolExecutor 注册到 Spring 容器中,同时 Spring 在容器刷新的时候会注入相应的 ThreadPoolExecutor 对象 到我们的业务 Bean 中,然后就可以直接使用了,比如定义如下(ThreadPoolBuilder是封装的一个建造者模式实现):
@Configuration
public class ThreadPoolConfiguration {
@Bean
public ThreadPoolExecutor jobExecutor() {
return ThreadPoolBuilder.newBuilder()
.corePoolSize(10)
.maximumPoolSize(15)
.keepAliveTime(15000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(LINKED_BLOCKING_QUEUE.getName(), 3000)
.build();
}
@Bean
public ThreadPoolExecutor remotingExecutor() {
return ThreadPoolBuilder.newBuilder()
.corePoolSize(10)
.maximumPoolSize(15)
.keepAliveTime(15000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(SYNCHRONOUS_QUEUE.getName(), null)
.build();
}
@Bean
public ThreadPoolExecutor consumeExecutor() {
return ThreadPoolBuilder.newBuilder()
.corePoolSize(10)
.maximumPoolSize(15)
.keepAliveTime(15000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(LINKED_BLOCKING_QUEUE.getName(), 5000)
.build();
}
}
以上按使用场景定义了三个线程池实例,一个用来执行耗时的定时任务、一个用来执行远程RPC调用、一个用来执行 Mq 消费。
这样使用 ThreadPoolExecutor 有个问题,Spring 容器关闭的时候可能任务队列里的任务还没处理完,有丢失任务的风险。
我们知道 Spring 中的 Bean 是有生命周期的,如果 Bean 实现了 Spring 相应的生命周期接口(InitializingBean、DisposableBean接口),在 Bean 初始化、容器关闭的时候会调用相应的方法来做相应处理。
所以建议最好不要直接使用 ThreadPoolExecutor 在 Spring 环境中,可以使用 Spring 提供的 ThreadPoolTaskExecutor,或者 DynamicTp 框架提供的 DtpExecutor 线程池实现。
这里分享一个源码阅读技巧,就是开源项目和Spring整合时,很多同学不知从何入手阅读源码。
我们知道Spring提供了很多的扩展点,第三方框架整合Spring其实大多也都是基于这些扩展接口来做的,所以我们可以从这些扩展接口入手,断点调试,一步步深入框架内核。
这些扩展包括但不限于以下接口:
BeanFactoryPostProcessor:在Bean实例化之前对BeanDefinition进行修改
BeanPostProcessor:在Bean初始化前后对Bean进行一些修改包装增强,比如返回代理对象
Aware:一个标记接口,实现该接口及子接口的类会收到Spring的通知回调,赋予某种Spring框架的能力,比如ApplicationContextAware、EnvironmentAware等
ApplicationContextInitializer:在上下文准备阶段,容器刷新之前做一些初始化工作,比如我们常用的配置中心client基本都是继承该初始化器,在容器刷新前将配置从远程拉到本地,然后封装成PropertySource放到Environment中供使用
ApplicationListener:Spring事件机制,监听特定的应用事件(ApplicationEvent),观察者模式的一种实现
FactoryBean:用来自定义Bean的创建逻辑(Mybatis、Feign等等)
ImportBeanDefinitionRegistrar:定义@EnableXXX注解,在注解上Import了一个 ImportBeanDefinitionRegistrar,实现注册BeanDefinition到容器中
ApplicationRunner/CommandLineRunner:容器启动后回调,执行一些初始化工作
上述列出了几个比较常用的接口,但是Spring扩展远不于此,还有很多扩展接口大家可以自己去了解。
DynamicTp 框架内部定义了 DtpExecutor 线程池类,其继承关系如下:

EagerDtpExecutor:参考 Tomcat 线程池设计,调整了下线程池的执行流程,优先创建线程执行任务而不是放入队列中,主要用于IO密集型场景,继承 DtpExecutor
DtpExecutor:重写了 ThreadPoolExecutor 的 execute 方法、beforeExecute 方法、afterExecute 方法,主要做任务包装、执行超时、等待超时记录等,继承 DtpLifecycleSupport
DtpLifecycleSupport:实现了 Spring 中的 InitializingBean, DisposableBean 接口,在 Bean 初始化、Spring 容器销毁时执行相应的逻辑,destroy 方法逻辑如下:
@Override
public void destroy() {
internalShutdown();
}
public void internalShutdown() {
if (log.isInfoEnabled()) {
log.info("Shutting down ExecutorService, poolName: {}", threadPoolName);
}
if (this.waitForTasksToCompleteOnShutdown) {
// 如果需要等待任务执行完毕,则调用shutdown()会执行先前已提交的任务,拒绝新任务提交,线程池状态变成 SHUTDOWN
this.shutdown();
} else {
// 如果不需要等待任务执行完毕,则直接调用shutdownNow()方法,尝试中断正在执行的任务,返回所有未执行的任务,线程池状态变成 STOP, 然后调用 Future 的 cancel 方法取消
for (Runnable remainingTask : this.shutdownNow()) {
cancelRemainingTask(remainingTask);
}
}
awaitTerminationIfNecessary();
}
protected void cancelRemainingTask(Runnable task) {
if (task instanceof Future) {
((Future<?>) task).cancel(true);
}
}
private void awaitTerminationIfNecessary() {
if (this.awaitTerminationSeconds <= 0) {
return;
}
try {
// 配合 shutdown 使用,阻塞当前线程,等待已提交的任务执行完毕或者超时
if (!awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS) && log.isWarnEnabled()) {
log.warn("Timed out while waiting for executor {} to terminate", threadPoolName);
}
} catch (InterruptedException ex) {
if (log.isWarnEnabled()) {
log.warn("Interrupted while waiting for executor {} to terminate", threadPoolName);
}
Thread.currentThread().interrupt();
}
}
DynamicTp 框架在整合 Spring 的时候,也是用到了上述说的扩展接口。
扩展1
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DtpBeanDefinitionRegistrar.class)
public @interface EnableDynamicTp {
}
使用过 DynamicTp 的小伙伴应该知道需要在启动类加 @EnableDynamicTp 注解,该注解其实就用到了 ImportBeanDefinitionRegistrar 扩展,主要代码如下:
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
DtpProperties dtpProperties = new DtpProperties();
// 将环境变量中的线程池相关配置绑定到 DtpProperties 对象上
PropertiesBinder.bindDtpProperties(environment, dtpProperties);
val executors = dtpProperties.getExecutors();
if (CollUtil.isEmpty(executors)) {
log.warn("DynamicTp registrar, no executors are configured.");
return;
}
executors.forEach(x -> {
// 判断线程池类型(common or eager)
Class<?> executorTypeClass = ExecutorType.getClass(x.getExecutorType());
String beanName = x.getThreadPoolName();
// 线程池对象属性
Map<String, Object> properties = buildProperties(x);
// 构造器参数
Object[] args = buildArgs(executorTypeClass, x);
BeanUtil.registerIfAbsent(registry, beanName, executorTypeClass, properties, args);
});
}
代码解读:
1.我们知道 ImportBeanDefinitionRegistrar 的实现是在 Spring 容器刷新的时候执行的,在此之前在上下文准备阶段已经从配置中心拉取到线程池配置放到环境变量里了,所以第一步我们将环境变量里的线程池相关配置绑定到 DtpProperties 对象上。
2.然后构造 BeanDefinitionBuilder 对象,设置构造函数参数、设置属性值,注册到 BeanDefinition 到 Spring 容器中
public static void doRegister(BeanDefinitionRegistry registry,
String beanName,
Class<?> clazz,
Map<String, Object> properties,
Object... constructorArgs) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz);
// 设置构造器参数,老八股文了
for (Object constructorArg : constructorArgs) {
builder.addConstructorArgValue(constructorArg);
}
// 设置属性及值的KV对,后续在Bean populateBean 的时候会通过反射set方法赋值
if (CollUtil.isNotEmpty(properties)) {
properties.forEach(builder::addPropertyValue);
}
registry.registerBeanDefinition(beanName, builder.getBeanDefinition());
}
3.Spring 容器刷新时会根据注册的 BeanDefinition 创建配置的线程池对象,初始化赋值,并注入到引用的 Bean 中。这样就不用在手动用 @Bean 声明线程池对象了,只需要在配置中心配置即可
扩展2
DtpPostProcessor 继承 BeanPostProcessor,在 Bean 初始化前后对 ThreadPoolExecutor 及其子类进行一些处理,主要用来获取线程池对象注册到 DynamicTp 框架内部定义的容器中(就个 Map)
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (!(bean instanceof ThreadPoolExecutor)) {
return bean;
}
if (bean instanceof DtpExecutor) {
DtpExecutor dtpExecutor = (DtpExecutor) bean;
if (bean instanceof EagerDtpExecutor) {
((TaskQueue) dtpExecutor.getQueue()).setExecutor((EagerDtpExecutor) dtpExecutor);
}
registerDtp(dtpExecutor);
return dtpExecutor;
}
ApplicationContext applicationContext = ApplicationContextHolder.getInstance();
DynamicTp dynamicTp;
try {
dynamicTp = applicationContext.findAnnotationOnBean(beanName, DynamicTp.class);
if (dynamicTp == null) {
return bean;
}
} catch (NoSuchBeanDefinitionException e) {
log.error("There is no bean with the given name {}", beanName, e);
return bean;
}
String poolName = StringUtils.isNotBlank(dynamicTp.value()) ? dynamicTp.value() : beanName;
registerCommon(poolName, (ThreadPoolExecutor) bean);
return bean;
}
扩展3
ApplicationListener 主要用来解耦逻辑,发布监听事件,core 模块跟 adapter 模块通信主要就用该扩展,以及框架会监听 Spring 容器启动的各阶段事件,做相应的逻辑处理
public abstract class AbstractDtpHandleListener implements GenericApplicationListener {
@Override
public boolean supportsEventType(ResolvableType resolvableType) {
Class<?> type = resolvableType.getRawClass();
if (type != null) {
return RefreshEvent.class.isAssignableFrom(type) ||
CollectEvent.class.isAssignableFrom(type) ||
AlarmCheckEvent.class.isAssignableFrom(type);
}
return false;
}
@Override
public void onApplicationEvent(@NonNull ApplicationEvent event) {
try {
if (event instanceof RefreshEvent) {
doRefresh(((RefreshEvent) event).getDtpProperties());
} else if (event instanceof CollectEvent) {
doCollect(((CollectEvent) event).getDtpProperties());
} else if (event instanceof AlarmCheckEvent) {
doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties());
}
} catch (Exception e) {
log.error("DynamicTp adapter, event handle failed.", e);
}
}
}
扩展4
ApplicationRunner,等 Spring 容器启动后,会调用该钩子函数,执行一些初始化操作,DtpMonitor、DtpRegistry 等都用到了该扩展
所以 DynamicTp 的正确使用姿势,线程池只需在配置中心声明,然后服务启动时框架会基于 Spring 的这些扩展自动创建线程池对象注入到所需的 Bean 中,代码中不需要显示声明

DynamicTp 是一个基于配置中心实现的轻量级动态线程池管理工具,主要功能可以总结为 动态调参、通知报警、运行监控、三方包线程池管理等几大类。

经过几个版本迭代,目前最新版本v1.0.7具有以下特性
特性 ✅
代码零侵入:所有配置都放在配置中心,对业务代码零侵入
轻量简单:基于 springboot 实现,引入 starter,接入只需简单4步就可完成,顺利3分钟搞定
高可扩展:框架核心功能都提供 SPI 接口供用户自定义个性化实现(配置中心、配置文件解析、通知告警、监控数据采集、任务包装等等)
线上大规模应用:参考美团线程池实践,美团内部已经有该理论成熟的应用经验
多平台通知报警:提供多种报警维度(配置变更通知、活性报警、容量阈值报警、拒绝触发报警、任务执行或等待超时报警),已支持企业微信、钉钉、飞书报警,同时提供 SPI 接口可自定义扩展实现
监控:定时采集线程池指标数据,支持通过 MicroMeter、JsonLog 日志输出、Endpoint 三种方式,可通过 SPI 接口自定义扩展实现
任务增强:提供任务包装功能,实现TaskWrapper接口即可,如 TtlTaskWrapper 可以支持线程池上下文信息传递,以及给任务设置标识id,方便问题追踪
兼容性:JUC 普通线程池也可以被框架监控,@Bean 定义时加 @DynamicTp 注解即可
可靠性:框架提供的线程池实现 Spring 生命周期方法,可以在 Spring 容器关闭前尽可能多的处理队列中的任务
多模式:参考Tomcat线程池提供了 IO 密集型场景使用的 EagerDtpExecutor 线程池
支持多配置中心:基于主流配置中心实现线程池参数动态调整,实时生效,已支持 Nacos、Apollo、Zookeeper、Consul,同时也提供 SPI 接口可自定义扩展实现
中间件线程池管理:集成管理常用第三方组件的线程池,已集成Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix等组件的线程池管理(调参、监控报警)
目前累计 1.5k star,感谢你的star,欢迎pr,业务之余一起给开源贡献一份力量
我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("
我正在尝试使用ruby编写一个双线程客户端,一个线程从套接字读取数据并将其打印出来,另一个线程读取本地数据并将其发送到远程服务器。我发现的问题是Ruby似乎无法捕获线程内的错误,这是一个示例:#!/usr/bin/rubyThread.new{loop{$stdout.puts"hi"abc.putsefsleep1}}loop{sleep1}显然,如果我在线程外键入abc.putsef,代码将永远不会运行,因为Ruby将报告“undefinedvariableabc”。但是,如果它在一个线程内,则没有错误报告。我的问题是,如何让Ruby捕获这样的错误?或者至少,报告线程中的错误?
我是ruby的新手,我认为重新构建一个我用C#编写的简单聊天程序是个好主意。我正在使用Ruby2.0.0MRI(Matz的Ruby实现)。问题是我想在服务器运行时为简单的服务器命令提供I/O。这是从示例中获取的服务器。我添加了使用gets()获取输入的命令方法。我希望此方法在后台作为线程运行,但该线程正在阻塞另一个线程。require'socket'#Getsocketsfromstdlibserver=TCPServer.open(2000)#Sockettolistenonport2000defcommandsx=1whilex==1exitProgram=gets.chomp
我有一个使用PDFKit呈现网页的pdf版本的Rails应用程序。我使用Thin作为开发服务器。问题是当我处于开发模式时。当我使用“bundleexecrailss”启动我的服务器并尝试呈现任何PDF时,整个过程会陷入僵局,因为当您呈现PDF时,会向服务器请求一些额外的资源,如图像和css,看起来只有一个线程.如何配置Rails开发服务器以运行多个工作线程?非常感谢。 最佳答案 我找到的最简单的解决方案是unicorn.geminstallunicorn创建一个unicorn.conf:worker_processes3然后使用它:
所以,Ruby1.9.1现在是declaredstable.Rails应该与它一起工作,并且正在慢慢地将gem移植到它。它具有native线程和全局解释器锁(GIL)。自从GIL到位后,原生线程是否比1.9.1中的绿色线程有任何优势? 最佳答案 1.9中的线程是原生的,但它们被“放慢了速度”,一次只允许一个线程运行。这是因为如果线程真的并行运行,它会混淆现有代码。优点:IO现在在线程中是异步的。如果一个线程阻塞在IO上,那么另一个线程将继续执行直到IO完成。C扩展可以使用真正的线程。缺点:任何非线程安全的C扩展都可能存在使用Thre
我在一个ruby文件中有一个函数可以像这样写入一个文件File.open("myfile",'a'){|f|f.puts("#{sometext}")}这个函数在不同的线程中被调用,使得像上面这样的文件写入不是线程安全的。有谁知道如何以最简单的方式使这个文件写入线程安全?更多信息:如果重要的话,我正在使用rspec框架。 最佳答案 您可以通过File#flock给锁File.open("myfile",'a'){|f|f.flock(File::LOCK_EX)f.puts("#{sometext}")}
我编写了几个类来控制我想如何处理多个网站,两者都使用类似的方法(即登录、刷新)。每个类都打开自己的WATIR浏览器实例。classSite1definitialize@ie=Watir::Browser.newenddeflogin@ie.goto"www.blah.com"endend无线程的main中的代码示例如下require'watir'require_relative'site1'agents=[]agents这工作正常,但在当前代理完成登录之前不会移动到下一个代理。我想合并多线程来处理这个问题,但似乎无法让它工作。require'watir'require_relative
代码:threads=[]Thread.abort_on_exception=truebegin#throwexceptionsinthreadssowecanseethemthreadseputs"EXCEPTION:#{e.inspect}"puts"MESSAGE:#{e.message}"end崩溃:.rvm/gems/ruby-2.1.3@req/gems/activesupport-4.1.5/lib/active_support/dependencies.rb:478:inload_missing_constant':自动加载常量MyClass时检测到循环依赖稍加研究后,
任何人都可以推荐任何详细介绍Ruby多线程/多处理的复杂性的好的多线程/处理书籍/网站吗?我尝试使用ruby线程,基本上在1.9vm上的无死锁代码中它在jruby中遇到了死锁。是的,我意识到差异很大(jruby没有GIL),但我想知道是否有用于ruby中多线程编程的策略或类集,我只需要继续阅读。旁注:从java到ruby必须定义是否需要重新输入锁,这有点奇怪。 最佳答案 如果你使用Ruby1.9,你可以试试Fiber,它是Ruby中线程的一大改进http://ruby-doc.org/core-1.9/classes/F
我想从不同线程调用一个公共(public)枚举器。当我执行以下操作时,enum=(0..1000).to_enumt1=Thread.newdopenum.nextsleep(1)endt2=Thread.newdopenum.nextsleep(1)endt1.joint2.join它引发了一个错误:Fibercalledacrossthreads.当enum在从t1调用一次后从t2调用时。为什么Ruby设计为不允许跨线程调用枚举器(或纤程),以及是否有其他方法可以提供类似的功能?我猜测枚举器/纤程上的操作的原子性在这里是相关的,但我不完全确定。如果这是问题所在,那么在使用时独占锁定