使用SpringBoot的线程池处理异步任务 您所在的位置:网站首页 线程池队列有序 使用SpringBoot的线程池处理异步任务

使用SpringBoot的线程池处理异步任务

2023-08-30 21:22| 来源: 网络整理| 查看: 265

这是我参与更文挑战的第3天,活动详情查看: 更文挑战

介绍

最近在做项目时了解了最好不要直接使用 new Thread(...).start() ,用线程池来隐式的维护所有线程,具体为什么可以看这篇文章。

其实 SpringBoot 已经为我们创建并配置好了这个东西,这里就来学习一下如何来使用 SpringBoot 为我们设置的线程池。

如有错误欢迎联系我指正!

使用 创建配置类

首先我们需要创建一个配置类来让 SpringBoot 加载,并且在里面设置一些自己需要的参数。

@Configuration @EnableAsync public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.keep_alive_seconds}") private int keepAliveSeconds; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { logger.info("开启SpringBoot的线程池!"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(corePoolSize); // 设置最大线程数 executor.setMaxPoolSize(maxPoolSize); // 设置缓冲队列大小 executor.setQueueCapacity(queueCapacity); // 设置线程的最大空闲时间 executor.setKeepAliveSeconds(keepAliveSeconds); // 设置线程名字的前缀 executor.setThreadNamePrefix(namePrefix); // 设置拒绝策略:当线程池达到最大线程数时,如何处理新任务 // CALLER_RUNS:在添加到线程池失败时会由主线程自己来执行这个任务 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 线程池初始化 executor.initialize(); return executor; } }

首先,

@Configuration 的作用是表明这是一个配置类。 @EnableAsync 的作用是启用 SpringBoot 的异步执行

其次,关于线程池的设置有

corePoolSize: 核心线程数,当向线程池提交一个任务时池里的线程数小于核心线程数,那么它会创建一个线程来执行这个任务,一直直到池内的线程数等于核心线程数 maxPoolSize: 最大线程数,线程池中允许的最大线程数量。关于这两个数量的区别我会在下面解释 queueCapacity: 缓冲队列大小,用来保存阻塞的任务队列(注意这里的队列放的是任务而不是线程) keepAliveSeconds: 允许线程存活时间(空闲状态下),单位为秒,默认60s namePrefix: 线程名前缀 RejectedExecutionHandler: 拒绝策略,当线程池达到最大线程数时,如何处理新任务。线程池为我们提供的策略有 AbortPolicy:默认策略。直接抛出 RejectedExecutionException DiscardPolicy:直接丢弃掉被拒绝的任务,且不会抛出任何异常 DiscardOldestPolicy:丢弃掉队列中的队头元素(也就是最早在队列里的任务),然后重新执行 提交该任务 的操作 CallerRunsPolicy:由主线程自己来执行这个任务,该机制将减慢新任务的提交

关于 corePoolSize 与 maxPoolSize 的区别也是困惑了我很久,官方文档上的解释说的很清楚。我的理解如下:

这个线程池其实是有点“弹性的”。当向线程池提交任务时:

若 当前运行的线程数 < corePoolSize

则 即使其它的工作线程处于空闲状态,线程池也会创建一个新线程来执行任务

若 corePoolSize < 当前运行的线程数 < maxPoolSize

若 队列已满

则 创建新线程来执行任务

若 队列未满

则 加入队列中

若 当前运行的线程数 > maxPoolSize

若 队列已满

则 拒绝任务

若 队列未满

则 加入队列中

所以当想要创建固定大小的线程池时,将 corePoolSize 和 maxPoolSize 设置成一样就行了。

最后,别忘了给方法加上 @Bean 注解,否则 SpringBoot 不会加载。

这里因为我加了 @Value 注解,可以在 application.properties 中配置相关数据,如

# 配置核心线程数 async.executor.thread.core_pool_size = 5 # 配置最大线程数 async.executor.thread.max_pool_size = 5 # 配置队列大小 async.executor.thread.queue_capacity = 999 # 配置线程池中的线程的名称前缀 async.executor.thread.name.prefix = test-async- # 配置线程最大空闲时间 async.executor.thread.keep_alive_seconds = 30 在具体的方法中使用

配置完上面那些使用起来就轻松了,只需在业务方法前加上 @Async 注解,它就会异步执行了。

如在 Service 中添加如下方法。

@Async("asyncServiceExecutor") // 注:@Async所修饰的函数不能定义为static类型,这样异步调用不会生效 public void asyncTest() throws InterruptedException { logger.info("任务开始!"); System.out.println("异步执行某耗时的事..."); System.out.println("如休眠5秒"); Thread.sleep(5000); logger.info("任务结束!"); }

然后在 Controller 里调用一下这个方法,在网页上连续发送请求做一个测试。 我这里连续发起了5次请求,可以看到这5个任务确实是成功地异步执行了。

res1.png

我设置的线程池大小为 5,所以当超过 5 个任务被提交时,会放入阻塞队列中。

res2.png

到这里,基本的异步执行任务就实现了。

自定义

虽然它提供给我们的线程池已经很强大了,但是有时候我们还需要一些额外信息,比如说我们想知道这个线程池已经执行了多少任务了、当前有多少线程在运行、阻塞队列里还有多少任务等等。那么这个时候我们就可以自定义我们的线程池。

自定义很简单,自己写一个类继承 Spring 提供的 ThreadPoolTaskExecutor,在此之上做修改就好了。如

public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { private static final Logger logger = LoggerFactory.getLogger(VisibleThreadPoolTaskExecutor.class); public void info() { ThreadPoolExecutor executor = getThreadPoolExecutor(); if (executor == null) return; String info = "线程池" + this.getThreadNamePrefix() + "中,总任务数为 " + executor.getTaskCount() + " ,已处理完的任务数为 " + executor.getCompletedTaskCount() + " ,目前正在处理的任务数为 " + executor.getActiveCount() + " ,缓冲队列中任务数为 " + executor.getQueue().size(); logger.info(info); } @Override public void execute(Runnable task) { info(); super.execute(task); } @Override public void execute(Runnable task, long startTimeout) { info(); super.execute(task, startTimeout); } @Override public Future submit(Runnable task) { info(); return super.submit(task); } @Override public Future submit(Callable task) { info(); return super.submit(task); } @Override public ListenableFuture submitListenable(Runnable task) { info(); return super.submitListenable(task); } @Override public ListenableFuture submitListenable(Callable task) { info(); return super.submitListenable(task); } }

然后在我们的配置类 ExecutorConfig 中将

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 改为 ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();, 也就是使用我们自己定义的线程池,然后会在相应的任务执行(execute())、任务提交(submit())时打印我们需要的信息了。

打印结果,在此之前已处理了5个任务:

res3.png

查询线程池信息

上面自定义线程池后想查询信息只能在线程池中的方法查询,那如果我想在任意地方查询线程池的信息呢?那也是可以的,而且非常简单。我这里写一个接口来查询线程池的任务信息以做示例。

首先修改一下线程池里的 Info() 方法,让它返回我们需要的信息。

public String info() { ThreadPoolExecutor executor = getThreadPoolExecutor(); if (executor == null) return "线程池不存在"; String info = "线程池" + this.getThreadNamePrefix() + "中,总任务数为 " + executor.getTaskCount() + " ,已处理完的任务数为 " + executor.getCompletedTaskCount() + " ,目前正在处理的任务数为 " + executor.getActiveCount() + " ,缓冲队列中任务数为 " + executor.getQueue().size(); logger.info(info); return info; }

然后修改一下配置类 ExecutorConfig 里注册线程池的方法,让它注册的是我们自定义的线程池类型。

@Bean(name = "asyncServiceExecutor") public VisibleThreadPoolTaskExecutor asyncServiceExecutor() { logger.info("开启SpringBoot的线程池!"); // 修改这里,要返回我们自己定义的类 VisibleThreadPoolTaskExecutor VisibleThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor(); // ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(corePoolSize); // 设置最大线程数 executor.setMaxPoolSize(maxPoolSize); // 设置缓冲队列大小 executor.setQueueCapacity(queueCapacity); // 设置线程的最大空闲时间 executor.setKeepAliveSeconds(keepAliveSeconds); // 设置线程名字的前缀 executor.setThreadNamePrefix(namePrefix); // 设置拒绝策略:当线程池达到最大线程数时,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 线程池初始化 executor.initialize(); return executor; }

再在我们需要信息的地方自动注入这个线程池,然后调用一下 info() 方法就能得到信息了,我这里以在 Service 层中获取信息为例。

@Service public class DemoService { private static final Logger logger = LoggerFactory.getLogger(DemoService.class); // 别忘了这里要用 SpringBoot 的自动注入 @Autowired private VisibleThreadPoolTaskExecutor executor; // @SneakyThrows 这个注解是Lombok带的,我为了代码简洁使用的。你也可以使用 try catch 的方法。 @SneakyThrows @Async("asyncServiceExecutor") public void asyncTest() { logger.info("任务开始!"); System.out.println("异步执行某耗时的事..."); System.out.println("如休眠5秒"); Thread.sleep(5000); logger.info("任务结束!"); // 你甚至可以在任务结束时再打印一下线程池信息 executor.info(); } public String getExecutorInfo() { return executor.info(); } }

最后在 Controller 层中调用一下,就大功告成了!

@RestController public class DemoController { @Autowired private DemoService demoService; @GetMapping("/async") public void async() { demoService.asyncTest(); } @GetMapping("/info") public String info() { return demoService.getExecutorInfo(); } }

来看一下测试的结果吧,我这里调用 /async 一口气开启了 15 个任务,然后在不同时间使用 /info 来看看信息。

刚开始时的结果:

res4.png

一口气提交了15个任务后的中间结果:

res5.png

所有任务都执行完了的最终结果:

res6.png

总结

本篇到这里就结束了,篇幅略长。总结一下,要想在SpringBoot中使用它提供的线程池其实很简单,只要两步:

注册线程池(使用 @Bean 来注册),设置一些自己想要的参数; 在想要异步调用的方法上加上 @Async 注解。

当然你也可以不使用 @Async 注解,直接在想开线程的地方自动注入你注册的线程池,然后像普通线程池一样使用就行了。

其实关于这一方面的知识也讲得并不够详尽,比如线程池里还有哪些方法、SpringBoot是如何为我们弄得这么方便的等等,还需要多多补充知识。

参考 stackoverflow.com/questions/1… docs.oracle.com/javase/8/do… blog.csdn.net/m0_37701381…


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有