项目中,当发生并行操作时,一般都会用到线程池处理多线程任务,线程池的规则类似于数据库连接池,在此不予赘述。
jdk自带线程池,此处主要讲述Spring框架自带的线程池ThreadPoolTaskExecutor。
Runnable、Callable和Future
通过实现Runnable和Callable接口实现一个线程任务,从而能放入Executor进行线程管理。其中,Callable可以理解为带有返回值的Runnable,并且Callable需要实现的方法不是run()而是call(),该方法返回一个泛型对象。当我们把一个需要返回值的线程任务放进线程池后,线程池会返回一个Future对象,借助该对象,我们可以调用get()方法获取线程的状态,调用get()会阻塞当前线程直到返回结果。
创建一个Callable任务如下:
class Task implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("start task"); Thread.sleep(3000); int sum = 0; for(int i=0;i<100;i++){ sum += i; } return sum; } }
想要执行需要转化为Future的子类FutureTask类,注意捕获异常:
Future future =new FutureTask(new Callable<String>(){ @Override public String call(){ System.out.println("x"); return "done"; } }); try { ((FutureTask) future).run(); //x System.out.print(future.get()); //done future.isDone(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
线程池创建
通过ThreadPoolExecutor创建原生的线程池,我们使用Spring框架的线程池:
ThreadPoolTaskExecutor taskExecutor=new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(10); //核心线程数 taskExecutor.setMaxPoolSize(30); //最大线程数 taskExecutor.setQueueCapacity(100); //任务队列长度 taskExecutor.initialize();
ThreadPoolTaskExecutor底层其实还是由ThreadPoolExecutor实现的。
当然除了静态工具类的创建方式还可直接Autowired注入,配置xml:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 核心线程数 --> <property name="corePoolSize" value="10" /> <!-- 线程池维护线程的最大数量 --> <property name="maxPoolSize" value=30" /> <!-- 允许的空闲时间, 默认60秒 --> <property name="keepAliveSeconds" value="60" /> <!-- 任务队列 --> <property name="queueCapacity" value="100" /> <!-- 线程超过空闲时间限制,均会退出直到线程数量为0 --> <property name="allowCoreThreadTimeOut" value="true"/> <!-- 对拒绝task的处理策略 --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy" /> </property> </bean>
以下是一个线程池工具类的实例(基于ThreadPoolTaskExecutor实现):
public class ThreadPoolUtil { private ThreadPoolUtil(){ } private static Logger log = LoggerFactory.getLogger(ThreadPoolUtil.class); private static ThreadPoolTaskExecutor taskExecutor=new ThreadPoolTaskExecutor(); static{ taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(100); //任务队列最大长度 taskExecutor.setQueueCapacity(200); taskExecutor.setAllowCoreThreadTimeOut(true); taskExecutor.setKeepAliveSeconds(2000); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.initialize(); } public static void addTask(Runnable task) { taskExecutor.execute(task); } //Callable任务 public static Future addReturnedTask(Callable task){ return taskExecutor.submit(task); } public static void shutdown() { try{ taskExecutor.shutdown(); }catch(Exception ex) { log.error("关闭ThreadPoolTaskUtil失败", ex); } } }
运行机制
工作流程
线程池调用execute(Runnable task)或是submit(Runnable task)、submit(Callable task)添加任务到线程池中(submit方法执行会返回Future对象,从而可以判断线程执行与否),当当前线程数小于核心线程数时,可以直接创建新线程,当超过核心线程数时,将线程任务加入workQueue中,当workQueue队列排满时,开始创建新线程直到线程池总量执行至超过最大线程数。
当线程数超过maxPoolSize时,如果仍旧有任务添加进去,会抛出RejectedExecutionException异常(根据拒绝策略会有不同的选择),必须集中处理这种情况。
当有空闲线程且线程数超过核心线程时,系统会等待时间,超过keepAliveTime便会销毁闲置的线程。
相关参数
接下来解释一些参数:
corePoolSize:核心线程数,线程数小于该值时,会新建线程执行任务,并保存在缓存队列中
maximumPoolSize:最大线程数,最多同时开启的线程数
keepAliveTime:空闲时间,当超过核心线程数的线程无任务被搁置时,超过这个时间线程就会被回收
workQueue:工作队列,当线程数=核心线程时,会将新的任务放进去,直到队列慢创建新的线程
allowCoreThreadTimeOut:若为true,则线程只要搁置超出空闲时间都会被回收,不管是否大于核心线程
rejectedExecutionHandler:拒绝处理器,用于指定当任务队列满时会如何处理新任务
拒绝策略
也就是RejectedExecutionHandler,共有四种策略:
ThreadPoolExecutor.DiscardPolicy()
任务队列满时,有新的任务时会直接被丢弃
ThreadPoolExecutor.AbortPolicy()
任务队列满时,有新的任务时将会抛出RejectedExecutionException异常
ThreadPoolExecutor.DiscardOldestPolicy()
任务队列满时,有新的任务时会丢弃队伍队头任务,然后重新尝试执行
ThreadPoolExecutor.CallerRunsPolicy()
任务队列满时,会在execute方法的调用线程中运行被拒绝的任务