Spring 多线程
Spring 的多线程开发的时候也会涉及到的,定时器、请求、异步任务等,不过很多都是Spring封装好的,拿来就用,最近项目里面用到了,就在 Spring 4.3.x 的版本上整理了相关的类。Spring 涉及到的类在好几个模块下:
1.基础类和接口定义在 spring-core 模块中,主要有:TaskExecutor、SyncTaskExecutor、AsyncTaskExecutor、SimpleAsyncTaskExecutor、TaskExecutorAdapter等。可以拿来即用,但是比较简单,一般无法满足我们的需求。其他模块相关的,都是继承和实现这个模块中的类。
2.定时器相关的类在 spring-context-support 中,有 scheduling.quartz, scheduling.commonj 2个包,另写一份文档。
3.spring-context 模块中的 scheduling 包下,有基础类的更完善的实现,比如:ThreadPoolExecutorFactoryBean、ThreadPoolTaskExecutor、ScheduledExecutorService等,这些类几乎可以满足我们的需求了。
基础的TaskExecutor
TaskExecutor 是最基础的任务执行接口,实现可以使用不同的执行策略:同步、异步、线程池等。跟jdk1.5的java.util.concurrent.Executor是相等的,Spring 3.0后是继承的 Executor。
同步的SyncTaskExecutor
主要用在测试场景,在许多场景中,推荐使用异步的 asynchronous
public class SyncTaskExecutor implements TaskExecutor, Serializable {
}
异步的AsyncTaskExecutor
实现 这个接口意味着 execute(Runnable) 方法,不会在调用线程中执行 Runnable,而是异步的在其他线程执行
public interface AsyncTaskExecutor extends TaskExecutor {
/** Constant that indicates immediate execution */
// 立即执行
long TIMEOUT_IMMEDIATE = 0;
/** Constant that indicates no time limit */
// 执行时间不受限制
long TIMEOUT_INDEFINITE = Long.MAX_VALUE;
}
SimpleAsyncTaskExecutor
实现了 TaskExecutor ,为每一个 task 生成一个新的线程,异步执行。支持通过bean的配置 “concurrencyLimit” 实现线程数的配置,默认线程数是无限的。使用于大量的短任务。
@SuppressWarnings("serial")
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
implements AsyncListenableTaskExecutor, Serializable {
/**
* execute 方法最终会调用该方法,默认如果没有实现 *threadFactory,则创建一个新的线程,并开始运行
**/
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
}
**TaskExecutorAdapter **
适配器使用jdk的 Executor 暴露了 Spring 的 TaskExecutor,以此实现一个 TaskExecutor 运行.
public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
private final Executor concurrentExecutor;
private TaskDecorator taskDecorator;
}
ThreadPoolExecutorFactoryBean
实现了 FactoryBean 接口的类,通过 getObject() 方法可以获取到 ExecutorService 的实例,ExecutorService 的实现也是用的ThreadPoolExecutor。当然,这里面的 corePoolSize、maxPoolSize默认是1、 Integer.MAX_VALUE,不适合开发具体的场景的话,可以再修改参数。
public class ThreadPoolExecutorFactoryBean extends ExecutorConfigurationSupport
implements FactoryBean<ExecutorService>, InitializingBean, DisposableBean {
private ExecutorService exposedExecutor;
}
ThreadPoolTaskExecutor
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
private ThreadPoolExecutor threadPoolExecutor;
// 具体的ThreadPoolExecutor初始化
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
super.execute(taskDecorator.decorate(command));
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}
}
ThreadPoolTaskScheduler
带调度功能的线程池配置,线程池实现是由 ScheduledExecutorService 的实现类 ScheduledThreadPoolExecutor,也有一些调度的方法。
public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {
private volatile ScheduledExecutorService scheduledExecutor;
// 具体的ScheduledThreadPoolExecutor实例生成
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
this.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);
if (this.removeOnCancelPolicy) {
if (setRemoveOnCancelPolicyAvailable && this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
}
else {
logger.info("Could not apply remove-on-cancel policy - not a Java 7+ ScheduledThreadPoolExecutor");
}
}
return this.scheduledExecutor;
}
protected ScheduledExecutorService createExecutor(
int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);
}
}