多线程应用提高(II) 线程池

  created  by  鱼鱼 {{tag}}
创建于 2018年10月16日 19:20:16 最后修改于 2020年02月25日 14:41:35

    项目中,当发生并行操作时,一般都会用到线程池处理多线程任务,线程池的规则类似于数据库连接池,在此不予赘述。

jdk自带线程池,此处主要讲述Spring框架自带的线程池ThreadPoolTaskExecutor。

Runnable、Callable和Future

    通过实现RunnableCallable接口实现一个线程任务,从而能放入Executor进行线程管理。其中,Callable可以理解为带有返回值的Runnable,并且Callable需要实现的方法不是run()而是call(),该方法返回一个泛型对象。当我们把一个需要返回值的线程任务放进线程池后,线程池会返回一个Future对象,借助该对象,我们可以调用get()方法获取线程的状态,调用get()会阻塞当前线程直到返回结果。

    创建一个Callable任务如下:

class Task implements Callable<Integer>{
    @Override 
    public Integer call() throws Exception {
        System.out.println("start task");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++){
            sum += i;
        }
        return sum;
    }
}


    想要执行需要转化为Future的子类FutureTask类,注意捕获异常:

Future future =new FutureTask(new Callable<String>(){
    @Override
    public String call(){
        System.out.println("x");
        return "done";
    }
});
try {
    ((FutureTask) future).run();        //x
    System.out.print(future.get());    //done
    future.isDone();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}


线程池创建

    通过ThreadPoolExecutor创建原生的线程池,我们使用Spring框架的线程池:

ThreadPoolTaskExecutor taskExecutor=new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);        //核心线程数
taskExecutor.setMaxPoolSize(30);         //最大线程数
taskExecutor.setQueueCapacity(100);      //任务队列长度
taskExecutor.initialize();


    ThreadPoolTaskExecutor底层其实还是由ThreadPoolExecutor实现的。

    当然除了静态工具类的创建方式还可直接Autowired注入,配置xml:

    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 核心线程数 -->
        <property name="corePoolSize" value="10" />
        <!-- 线程池维护线程的最大数量 -->
        <property name="maxPoolSize" value=30" />
        <!-- 允许的空闲时间, 默认60秒 -->
        <property name="keepAliveSeconds" value="60" />
        <!-- 任务队列 -->
        <property name="queueCapacity" value="100" />
        <!-- 线程超过空闲时间限制,均会退出直到线程数量为0 -->
        <property name="allowCoreThreadTimeOut" value="true"/>
        <!-- 对拒绝task的处理策略 -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy" />
        </property>
    </bean>


 以下是一个线程池工具类的实例(基于ThreadPoolTaskExecutor实现):

public class ThreadPoolUtil {
    private ThreadPoolUtil(){

    }
    private static Logger log = LoggerFactory.getLogger(ThreadPoolUtil.class);

    private static ThreadPoolTaskExecutor taskExecutor=new ThreadPoolTaskExecutor();
    static{
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(100);
        //任务队列最大长度
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setKeepAliveSeconds(2000);
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
    }

    public static void addTask(Runnable task) {
        taskExecutor.execute(task);
    }
        //Callable任务
    public static Future addReturnedTask(Callable task){
        return taskExecutor.submit(task);
    }

    public static void shutdown() {
        try{
            taskExecutor.shutdown();
        }catch(Exception ex) {
            log.error("关闭ThreadPoolTaskUtil失败", ex);
        }
    }

}


运行机制

工作流程

    线程池调用execute(Runnable task)或是submit(Runnable task)、submit(Callable task)添加任务到线程池中(submit方法执行会返回Future对象,从而可以判断线程执行与否),当当前线程数小于核心线程数时,可以直接创建新线程,当超过核心线程数时,将线程任务加入workQueue中,当workQueue队列排满时,开始创建新线程直到线程池总量执行至超过最大线程数。

    当线程数超过maxPoolSize时,如果仍旧有任务添加进去,会抛出RejectedExecutionException异常(根据拒绝策略会有不同的选择),必须集中处理这种情况。

    当有空闲线程且线程数超过核心线程时,系统会等待时间,超过keepAliveTime便会销毁闲置的线程。

相关参数

    接下来解释一些参数:

corePoolSize:核心线程数,线程数小于该值时,会新建线程执行任务,并保存在缓存队列中

maximumPoolSize:最大线程数,最多同时开启的线程数

keepAliveTime:空闲时间,当超过核心线程数的线程无任务被搁置时,超过这个时间线程就会被回收

workQueue:工作队列,当线程数=核心线程时,会将新的任务放进去,直到队列慢创建新的线程

allowCoreThreadTimeOut:若为true,则线程只要搁置超出空闲时间都会被回收,不管是否大于核心线程

rejectedExecutionHandler:拒绝处理器,用于指定当任务队列满时会如何处理新任务

拒绝策略

    也就是RejectedExecutionHandler,共有四种策略:

  1. ThreadPoolExecutor.DiscardPolicy()    

         任务队列满时,有新的任务时会直接被丢弃

  2. ThreadPoolExecutor.AbortPolicy()

        任务队列满时,有新的任务时将会抛出RejectedExecutionException异常

  3. ThreadPoolExecutor.DiscardOldestPolicy()

        任务队列满时,有新的任务时会丢弃队伍队头任务,然后重新尝试执行

  4. ThreadPoolExecutor.CallerRunsPolicy()

        任务队列满时,会在execute方法的调用线程中运行被拒绝的任务


评论区
评论
{{comment.creator}}
{{comment.createTime}} {{comment.index}}楼
评论

多线程应用提高(II) 线程池

多线程应用提高(II) 线程池

    项目中,当发生并行操作时,一般都会用到线程池处理多线程任务,线程池的规则类似于数据库连接池,在此不予赘述。

jdk自带线程池,此处主要讲述Spring框架自带的线程池ThreadPoolTaskExecutor。

Runnable、Callable和Future

    通过实现RunnableCallable接口实现一个线程任务,从而能放入Executor进行线程管理。其中,Callable可以理解为带有返回值的Runnable,并且Callable需要实现的方法不是run()而是call(),该方法返回一个泛型对象。当我们把一个需要返回值的线程任务放进线程池后,线程池会返回一个Future对象,借助该对象,我们可以调用get()方法获取线程的状态,调用get()会阻塞当前线程直到返回结果。

    创建一个Callable任务如下:

class Task implements Callable<Integer>{
    @Override 
    public Integer call() throws Exception {
        System.out.println("start task");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++){
            sum += i;
        }
        return sum;
    }
}


    想要执行需要转化为Future的子类FutureTask类,注意捕获异常:

Future future =new FutureTask(new Callable<String>(){
    @Override
    public String call(){
        System.out.println("x");
        return "done";
    }
});
try {
    ((FutureTask) future).run();        //x
    System.out.print(future.get());    //done
    future.isDone();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}


线程池创建

    通过ThreadPoolExecutor创建原生的线程池,我们使用Spring框架的线程池:

ThreadPoolTaskExecutor taskExecutor=new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);        //核心线程数
taskExecutor.setMaxPoolSize(30);         //最大线程数
taskExecutor.setQueueCapacity(100);      //任务队列长度
taskExecutor.initialize();


    ThreadPoolTaskExecutor底层其实还是由ThreadPoolExecutor实现的。

    当然除了静态工具类的创建方式还可直接Autowired注入,配置xml:

    <bean id="tmptaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 核心线程数 -->
        <property name="corePoolSize" value="10" />
        <!-- 线程池维护线程的最大数量 -->
        <property name="maxPoolSize" value=30" />
        <!-- 允许的空闲时间, 默认60秒 -->
        <property name="keepAliveSeconds" value="60" />
        <!-- 任务队列 -->
        <property name="queueCapacity" value="100" />
        <!-- 线程超过空闲时间限制,均会退出直到线程数量为0 -->
        <property name="allowCoreThreadTimeOut" value="true"/>
        <!-- 对拒绝task的处理策略 -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy" />
        </property>
    </bean>


 以下是一个线程池工具类的实例(基于ThreadPoolTaskExecutor实现):

public class ThreadPoolUtil {
    private ThreadPoolUtil(){

    }
    private static Logger log = LoggerFactory.getLogger(ThreadPoolUtil.class);

    private static ThreadPoolTaskExecutor taskExecutor=new ThreadPoolTaskExecutor();
    static{
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(100);
        //任务队列最大长度
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setKeepAliveSeconds(2000);
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
    }

    public static void addTask(Runnable task) {
        taskExecutor.execute(task);
    }
        //Callable任务
    public static Future addReturnedTask(Callable task){
        return taskExecutor.submit(task);
    }

    public static void shutdown() {
        try{
            taskExecutor.shutdown();
        }catch(Exception ex) {
            log.error("关闭ThreadPoolTaskUtil失败", ex);
        }
    }

}


运行机制

工作流程

    线程池调用execute(Runnable task)或是submit(Runnable task)、submit(Callable task)添加任务到线程池中(submit方法执行会返回Future对象,从而可以判断线程执行与否),当当前线程数小于核心线程数时,可以直接创建新线程,当超过核心线程数时,将线程任务加入workQueue中,当workQueue队列排满时,开始创建新线程直到线程池总量执行至超过最大线程数。

    当线程数超过maxPoolSize时,如果仍旧有任务添加进去,会抛出RejectedExecutionException异常(根据拒绝策略会有不同的选择),必须集中处理这种情况。

    当有空闲线程且线程数超过核心线程时,系统会等待时间,超过keepAliveTime便会销毁闲置的线程。

相关参数

    接下来解释一些参数:

corePoolSize:核心线程数,线程数小于该值时,会新建线程执行任务,并保存在缓存队列中

maximumPoolSize:最大线程数,最多同时开启的线程数

keepAliveTime:空闲时间,当超过核心线程数的线程无任务被搁置时,超过这个时间线程就会被回收

workQueue:工作队列,当线程数=核心线程时,会将新的任务放进去,直到队列慢创建新的线程

allowCoreThreadTimeOut:若为true,则线程只要搁置超出空闲时间都会被回收,不管是否大于核心线程

rejectedExecutionHandler:拒绝处理器,用于指定当任务队列满时会如何处理新任务

拒绝策略

    也就是RejectedExecutionHandler,共有四种策略:

  1. ThreadPoolExecutor.DiscardPolicy()    

         任务队列满时,有新的任务时会直接被丢弃

  2. ThreadPoolExecutor.AbortPolicy()

        任务队列满时,有新的任务时将会抛出RejectedExecutionException异常

  3. ThreadPoolExecutor.DiscardOldestPolicy()

        任务队列满时,有新的任务时会丢弃队伍队头任务,然后重新尝试执行

  4. ThreadPoolExecutor.CallerRunsPolicy()

        任务队列满时,会在execute方法的调用线程中运行被拒绝的任务



多线程应用提高(II) 线程池2020-02-25鱼鱼

{{commentTitle}}

评论   ctrl+Enter 发送评论