Java每日一讲线程池的总结

  • 2021-12-10
  • Admin

目录

?博主介绍

? 个人主页:苏州凯捷智能科技有限公司

? 个人社区:CSDN全国各地程序猿

?作者介绍:苏州凯捷智能科技有限公司创始人,主要目前与华为合作5G工业机器人领域开发,2D、3D视觉项目开发,政府项目投标开发,

?如果文章对你有帮助,欢迎关注、点赞、收藏(一键三连)

?️ 承接软件APP、小程序、网站等开发重点行业应用开发(SaaS、PaaS、CRM、HCM、银行核心系统、监管报送平台、系统搭建、人工智能助理)、大数据平台开发、商业智能、App开发、ERP、云平台、智能终端、产品化解决方案。测试软件产品测试、应用软件测试、测试平台及产品、测试解决方案。运维数据库维护(SQL Server 、Oracle、MySQL)、 操作系统维护(Windows、Linux、Unix等常用系统)、 服务器硬件设备维护、网络设备维护、 运维管理平台等。运营服务IT咨询 、IT服务、业务流程外包(BPO)、云/基础设施的管理、线上营销、数据采集与标注、内容管理和营销、设计服务、本地化、智能客服、大数据分析等。

? 有任何问题欢迎私信,看到会及时回复

? 微信号:stbsl6,微信公众号:苏州程序大白

? 想加入技术交流群的可以加我好友,群里会分享学习资料

前言

线程池: 简单理解,它就是一个管理线程的池子。

线程池的优点:

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  • 提高响应速度:当任务到达时,任务可以不需要的等到线程创建就能立即执行。

提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池的总结

工作流程

图解线程池流程先来一张通俗易懂的:
在这里插入图片描述
文字理解:工厂中有固定的一批工人,称为正式工人,工厂接收的订单由这些工人去完成。当订单增加,正式工人已经忙不过来了,工厂会将生产原料暂时堆积在仓库中,等有空闲的工人时再处理(因为工人空闲了也不会主动处理仓库中的生产任务,所以需要调度员实时调度)。仓库堆积满了后,订单还在增加怎么办?工厂只能临时扩招一批工人来应对生产高峰,而这批工人高峰结束后是要清退的,所以称为临时工。当时临时工也以招满后(受限于工位限制,临时工数量有上限),后面的订单只能忍痛拒绝了。

将其翻译成线程池理解:

在这里插入图片描述

关键参数

线程池关键参数的理解(在图中可以看到):

  • corePoolSize(必需):核心线程数。即池中一直保持存活的线程数,即使这些线程处于空闲。但是将allowCoreThreadTimeOut参数设置为true后,核心线程处于空闲一段时间以上,也会被回收。

  • maximumPoolSize(必需):池中允许的最大线程数。当核心线程全部繁忙且任务队列打满之后,线程池会临时追加线程,直到总线程数达到maximumPoolSize这个上限。

  • keepAliveTime(必需):线程空闲超时时间。当非核心线程处于空闲状态的时间超过这个时间后,该线程将被回收。将allowCoreThreadTimeOut参数设置为true后,核心线程也会被回收。

  • unit(必需):keepAliveTime参数的时间单位。有:TimeUnit.DAYS(天)、TimeUnit.HOURS(小时)、TimeUnit.MINUTES(分钟)、TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)、TimeUnit.MICROSECONDS(微秒)、TimeUnit.NANOSECONDS(纳秒)。

  • workQueue(必需):任务队列,采用阻塞队列实现。当核心线程全部繁忙时,后续由execute方法提交的Runnable将存放在任务队列中,等待被线程处理。

  • threadFactory(可选):线程工厂。指定线程池创建线程的方式。

  • handler(可选):拒绝策略。当线程池中线程数达到maximumPoolSize且workQueue打满时,后续提交的任务将被拒绝,handler可以指定用什么方式拒绝任务。

图解参数之间的关系:
在这里插入图片描述

工作原理

看完工作流程后再来看下线程池工作原理(图解):
在这里插入图片描述

1、AbortPolicy(抛出一个异常,默认的)。

2、DiscardPolicy(直接丢弃任务)。

3、DiscardOldestPolicy(丢弃队列里最老的任务,将当前这个任务继续提交给线程池)。

4、CallerRunsPolicy(交给线程池调用所在的线程进行处理)。

任务队列

ArrayBlockingQueue

  • ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量。

LinkedBlockingQueue

  • LinkedBlockingQueue(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列。

DelayQueue

  • DelayQueue(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。

PriorityBlockingQueue

  • PriorityBlockingQueue(优先级队列)是具有优先级的无界阻塞队列;

SynchronousQueue

  • SynchronousQueue(同步队列)一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列。

创建方式

创建线程池的方式:

1、通过ThreadPoolExecutor构造函数实现(推荐)。

2、通过 Executor 框架的工具类 Executors 来实现,总共有三种类型(方法内部也是调用了ThreadPoolExecutor的构造方法)。

ThreadPoolExecutor

《阿里巴巴 Java 开发手册》“并发处理”章节中,明确指出线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。(具体原因可参照线程池的优点)
并且强制要求,不允许使用 Executors 去快捷创建线程池,而是通过 ThreadPoolExecutor 构造函数的方式,除了避免 OOM 的原因外,这样的处理方式可以让我们更加明确线程池的运行规则,规避资源耗尽的风险。

也就是说使用有界队列,控制线程创建数量。因为实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。我们应该显示地给我们的线程池命名,有助于后期快速定位问题。

Executors 返回线程池对象的弊端:
FixedThreadPool 和 SingleThreadExecutor :允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM(java.lang.OutOfMemoryError:内存用完了)。
CachedThreadPool 和ScheduledThreadPool:允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

  • 1
  • 2
  • 3
  • 4

示例:一个简单的for遍历线程

import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class App 
{
    //corePoolSize: 定义核心线程数为 3
    private static final Integer CORE_POOL_SIZE = 3;
    //maximumPoolSize:定义最大线程数为 9
    private static final Integer MAX_POOL_SIZE = 9;
    //workQueue:定义任务队列为 ArrayBlockingQueue,并且容量为 90
    private static final Integer QUEUE_CAPACITY = 90;
    //keepAliveTime: 定义等待时间为 1
    private static final Long KEEP_ALIVE_TIME = 1L;


    public static void main(String[] args) {

        //通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,//unit: 等待时间的单位为 TimeUnit.SECONDS
                new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY), //workQueue:定义任务队列为 ArrayBlockingQueue
                new ThreadPoolExecutor.CallerRunsPolicy());//handler:定义拒绝策略为 CallerRunsPolicy


        // 模拟向线程池提交(6个线程)任务
        for (int i = 0; i < 6; i++) {

            //执行线程;这里直接使用的Runnable匿名对象;也可以自行创建Runnable或Callable的实现类
            executor.execute(new Runnable() {
                                 @Override
                                 public void run() {
                                     Long start = System.currentTimeMillis(); //获取线程开始执行时间
                                     //这里随便写了一个遍历功能的线程
                                     for (int numValue = 0; numValue <= 2; numValue++) {

                                         System.out.println(Thread.currentThread().getName() + ":" + numValue + " " + new Date(System.currentTimeMillis()));
                                         try {
                                             Thread.sleep(500);
                                         } catch (InterruptedException e) {
                                             e.printStackTrace();
                                         }
                                     }
                                     System.out.println(Thread.currentThread().getName() + ":当前线程任务结束,总共花费时间:" + (System.currentTimeMillis() - start) / 1000 + "秒");
                                 }
                             }

            );
        }

        //终止线程池
        executor.shutdown(); //设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程
        // executor.shutdownNow(); //(慎用)设置线程池的状态为STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,该方法要慎用,容易造成不可控的后果
        while (!executor.isTerminated()) {
            //使用isTerminated()函数判断线程池中的所有线程是否执行完毕时候,不能直接使用该函数,必须在shutdown()方法关闭线程池之后才能使用,
            //否则isTerminated()永远为false,线程将一直阻塞在该判断的地方,导致程序最终崩溃。
        }
        System.out.println("所有线程都已结束");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

运行结果:

pool-1-thread-2:0 Thu Sep 30 10:42:01 CST 2021
pool-1-thread-1:0 Thu Sep 30 10:42:01 CST 2021
pool-1-thread-3:0 Thu Sep 30 10:42:01 CST 2021
pool-1-thread-2:1 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-1:1 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-3:1 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-2:2 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-1:2 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-3:2 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-3:当前线程任务结束,总共花费时间:1秒
pool-1-thread-3:0 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-1:当前线程任务结束,总共花费时间:1秒
pool-1-thread-1:0 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-2:当前线程任务结束,总共花费时间:1秒
pool-1-thread-2:0 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-3:1 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-2:1 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-1:1 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-3:2 Thu Sep 30 10:42:04 CST 2021
pool-1-thread-1:2 Thu Sep 30 10:42:04 CST 2021
pool-1-thread-2:2 Thu Sep 30 10:42:04 CST 2021
pool-1-thread-2:当前线程任务结束,总共花费时间:1秒
pool-1-thread-1:当前线程任务结束,总共花费时间:1秒
pool-1-thread-3:当前线程任务结束,总共花费时间:1秒
所有线程都已结束
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

运行结果分析:

该示例中总共向线程池提交了6个线程任务,但是该线程池的核心线程只有3个,所以根据示例中定义的拒绝策略和任务队列,剩下的3个线程在任务队列中,根据打印结果可以看到,当前面的3个线程分别执行完毕时,空出的核心线程会被剩下的3个线程任务拿到,然后继续执行剩下的线程任务。

Executors

FixedThreadPool

创建可重用固定线程数的线程池:

  • FixedThreadPool只有核心线程,且数量固定,没有非核心线程。

  • keepAliveTime设置为0L,代表多余的线程会被立即终止;即没有非空闲时间;因为不会产生多余的线程,所以keepAliveTime是无效的参数。

  • 任务队列为无界的阻塞队列LinkedBlockingQueue(容量默认为Integer.MAX_VALUE)。

在这里插入图片描述
1、提交任务。

2、如果线程数少于核心线程,创建核心线程执行任务。

3、如果线程数等于核心线程,把任务添加到LinkedBlockingQueue阻塞队列。

4、如果线程执行完任务,去阻塞队列取任务,继续执行。

ExecutorService executor = Executors.newFixedThreadPool(10);
                 for (int i = 0; i < Integer.MAX_VALUE; i++) {
                     executor.execute(()->{
                         try {
                             Thread.sleep(1000);
                         } catch (InterruptedException e) {
                             //do nothing
                         }
         		});

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

使用场景:

适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。

缺点:

1、当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize。

2、由于使用无界队列时 maximumPoolSize 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 FixedThreadPool的源码可以看出创建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 被设置为同一个值。

3、由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数。

4、运行中的 FixedThreadPool(未执行 shutdown()或 shutdownNow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。

CachedThreadPool

一个根据需要创建线程的线程池。

核心线程数为0

最大线程数为Integer.MAX_VALUE

阻塞队列是SynchronousQueue

非核心线程空闲存活时间为60秒

当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                   60L, TimeUnit.SECONDS,
                                   new SynchronousQueue<Runnable>(),
                                   threadFactory);
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在这里插入图片描述
1、提交任务。

2、因为没有核心线程,所以任务直接加到SynchronousQueue队列。

3、判断是否有空闲线程,如果有,就去取出任务执行。

4、如果没有空闲线程,就新建一个线程执行。

5、执行完任务的线程,还可以存活60秒,如果在这期间,接到任务,可以继续活下去;否则被销毁。

使用场景:

因为maximumPoolSize是无界的,所以提交任务的速度 > 线程池中线程处理任务的速度就要不断创建新线程;每次提交任务,都会立即有线程去处理,因此CachedThreadPool适用于处理大量、耗时少的任务。

缺点:

CachedThreadPool允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

SingleThreadExecutor

只有一个线程的线程池。

  • 核心线程数为1。

  • 最大线程数也为1。

  • 阻塞队列是LinkedBlockingQueue。

  • keepAliveTime为0。

ExecutorService executor = Executors.newSingleThreadExecutor();
              for (int i = 0; i < 5; i++) {
                  executor.execute(() -> {
                      System.out.println(Thread.currentThread().getName()+"正在执行");
                  });
   		   }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述
1、提交任务。

2、线程池是否有一条线程在,如果没有,新建线程执行任务。

3、如果有,讲任务加到阻塞队列。

4、当前的唯一线程,从队列取任务,执行完一个,再继续取,如此循环。

使用场景:

适用于串行执行任务的场景,一个任务一个任务地执行。

缺点:

SingleThreadExecutor 使用无界队列 LinkedBlockingQueue 作为线程池的任务队列(队列的容量为 Intger.MAX_VALUE)。

SingleThreadExecutor 使用无界队列作为线程池的任务队列会对线程池带来的影响与 FixedThreadPool 相同。说简单点就是可能会导致 OOM。

常用方法

提交

**execute()**方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;

**submit()**方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

AbstractExecutorService接口中的一个 submit 方法为例:

public Future<?> submit(Runnable task) {
        if (task == null) thrownew NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

上面方法调用的 newTaskFor()方法返回了一个 FutureTask 对象。

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
}

  • 1
  • 2
  • 3
  • 4

对比execute()方法:

public void execute(Runnable command) {
      ...
}
  • 1
  • 2
  • 3

关闭

  • shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。

  • shutdownNow() :关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List;该方法要慎用,容易造成不可控的后果。

判断

  • isShutDown 当调用 shutdown()方法后,返回为 true,否则为false。

  • isTerminated 当调用shutdown() 方法后,并且所有提交的任务完成后返回为 true,否则为false。

状态切换

RUNNING

  • 该状态的线程池会接收新任务,并处理阻塞队列中的任务。

  • 调用线程池的shutdown()方法,可以切换到SHUTDOWN状态。

  • 调用线程池的shutdownNow()方法,可以切换到STOP状态。

SHUTDOWN

  • 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务。

  • 队列为空,并且线程池中执行的任务也为空,进入TIDYING状态。

STOP

  • 该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务。

线程池中执行的任务为空,进入TIDYING状态。

TIDYING

该状态表明所有的任务已经运行终止,记录的任务数量为0。

terminated()执行完毕,进入TERMINATED状态。

TERMINATED

  • 该状态表示线程池彻底终止。

配置线程数

CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

?️ 承接软件APP、小程序、网站等开发重点行业应用开发(SaaS、PaaS、CRM、HCM、银行核心系统、监管报送平台、系统搭建、人工智能助理)、大数据平台开发、商业智能、App开发、ERP、云平台、智能终端、产品化解决方案。测试软件产品测试、应用软件测试、测试平台及产品、测试解决方案。运维数据库维护(SQL Server 、Oracle、MySQL)、 操作系统维护(Windows、Linux、Unix等常用系统)、 服务器硬件设备维护、网络设备维护、 运维管理平台等。运营服务IT咨询 、IT服务、业务流程外包(BPO)、云/基础设施的管理、线上营销、数据采集与标注、内容管理和营销、设计服务、本地化、智能客服、大数据分析等。

? 有任何问题欢迎私信,看到会及时回复

? 微信号:stbsl6,微信公众号:苏州程序大白

❤️关注苏州程序大白公众号❤️

? ??

原文:https://blog.csdn.net/weixin_39934361/article/details/121849198

联系站长

QQ:769220720