SpringBoot线程池

为什么要使用线程池?

并发编程实际为将一些操作采用多线程的方式进行异步处理,以此提升效率。

但是程序的线程资源并不是无限的,创建和销毁都消耗了性能资源。

使用线程池的目的是为了控制线程数量,防止高压的情况下无限创建线程导致OOM。

核心线程的存在可以让线程可以重复使用,一定程度上减少了线程创建和销毁导致的消耗。

SpringBoot 对JAVA中的 ThreadPoolExecutor 进行了封装。

相关注解

@EnableAsync 类上添加,启动类或配置类,开启异步支持

@Async(value = "Bean 名称") 方法上添加,将这个方法执行异步操作

可以创建多个线程池,创建多个原因是方便不同的业务使用。

如果所有业务共用一个线程池,可能会出现一个业务占满了所有线程,导致其他业务全部在队列等待中,无法访问的情况。

ThreadPoolTaskExecutor 配置类

在一个配置类中,可以创建多个线程池,方法名为@Async(value = "Bean 名称") 中的Bean名称。

@Configuration
public class ExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor smsThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        threadPoolTaskExecutor.setCorePoolSize(16);
        //配置最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(32);
        //配置队列大小
        threadPoolTaskExecutor.setQueueCapacity(1024);
        //是否允许核心线程超时
        threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
        //非核心线程池空闲超时销毁时间(单位秒)
        threadPoolTaskExecutor.setKeepAliveSeconds(15);
        //配置线程池中的线程的名称前缀 (指定一下线程名的前缀)
        threadPoolTaskExecutor.setThreadNamePrefix("async-sms-");
        //配置线程池拒绝策略
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        threadPoolTaskExecutor.initialize();
        //返回线程池对象
        return threadPoolTaskExecutor;
    }

	@Bean
    public ThreadPoolTaskExecutor smsThreadPoolTaskExecutor() {
		......
	}
}

方法使用线程池

使用@Async(value = "Bean 名称") 为需要异步的方法选择线程池

@Async(value = "smsThreadPoolTaskExecutor")
public void doSMS() {
   ......
}

获取异步方法的返回值

主线程调用了3个异步方法,是可以获取到3个异步方法的返回值,再结束主线程的。

先要通过实现Future来创建一个返回类

public class ResultFuture<V> implements Future<V> {

    //结果
    private V result;

    public ResultFuture(V result) {
        this.result = result;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return false;
    }

    @Override
    public V get() throws InterruptedException, ExecutionException {
        return this.result;
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return this.result;
    }
}

修改异步方法

// 修改返回类
@Async(value = "smsThreadPoolTaskExecutor")
public Future<String> doSMS() {
	// new 自定义的返回类,
	return new ResultFuture<>("ok");
}

public void testAsync() {
    Future<String> r = this.doSMS();
    try {
		// 通过get方法获取返回值
        String s = r.get();
    } catch (Exception e) {
        e.printStackTrace();
    }
}