- content
前言
总结java线程基础知识
目录
- ThreadPoolExecutor
- Executors
ThreadPoolExecutor
构造函数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
1.几个核心参数概念:
- corePoolSize 池大小,如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务,当线程池中任务数量到达该值时,新的任务会放到队列中
- maximumPoolSize 最大线程数,超过这个数量就拒绝服务
- keepAliveTime 表示线程没有任务执行时最多保持多久会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0
- unit 参数keepAliveTime的时间单位,有7种取值
- workQueue一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
- ArrayBlockingQueue(必须指定队列大小)、PriorityBlockingQueue 使用较少
- LinkedBlockingQueue: 基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE
- SynchronousQueue: 这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务
- threadFactory 线程工厂,主要用来创建线程,不指定则默认使用DefaultThreadFactory(创建线程规则:非守护线程,Thread.NORM_PRIORITY)
- handler表示当拒绝处理任务时的策略,有以下四种取值:
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
- ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
2.注意事项
- ThreadPoolExecutor将根据 corePoolSize和 maximumPoolSize设置的边界自动调整池大小 当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。
- 如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,超过数量的任务将被拒绝。
- 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。
- 如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。
- 在大多数情况下,核心和最大池大小仅基于构造函数来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。
- 并不是先加入任务就一定会先执行,假设队列大小为 4,corePoolSize为2,maximumPoolSize为6,那么当加入15个任务时, 执行的顺序类似这样:首先执行任务 1、2,然后任务3~6被放入队列。这时候队列满了,任务7、8、9、10 会被马上执行,而任务 11~15 则会抛出异常。最终顺序是:1、2、7、8、9、10、3、4、5、6。 当然这个过程是针对指定大小的
ArrayBlockingQueue<Runnable>来说,如果是默认的LinkedBlockingQueue,因为该队列无大小限制,maximumPoolSize会无效,队列大小不受控制,会有资源耗尽的风险。 - 最多能执行多少个任务??? maximumPoolSize+队列长度,超过这个数字之后就拒绝接受任务
//简单示例
public static void main(String[] args) {
ThreadPoolExecutor executor=new ThreadPoolExecutor(
2,
6,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(4));
for(int i=0;i<15;i++){
MyRunnable myRunnable = new MyRunnable(i);
executor.execute(myRunnable);
System.out.println("线程池中线程数目:"+executor.getPoolSize()+
",队列中等待执行的任务数目:"+executor.getQueue().size()+
",已执行完别的任务数目:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
class MyRunnable implements Runnable {
private int taskNum;
public MyRunnable(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在执行task "+taskNum);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"执行完毕");
}
}
以上代码,最大线程数为6,队列长度为4,可执行6+4=10个任务,如果执行15个任务,最后5个会被拒绝
ThreadFactory
使用ThreadFactory自定义工厂能替代默认的DefaultThreadFactory,DefaultThreadFactory默认创建出来的线程都是用户线程而非守护线程,且线程的优先级都是Thread.NORM_PRIORITY。在自定义工厂里面,我们能创建定制化的Thread,并且计数,或则限制创建Thread的数量,给每个Thread设置对应的好听的名字,或者其他的很多很多事情。
示例:
import java.util.concurrent.ThreadFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
public class MyThreadFactory implements ThreadFactory {
private int counter;
private String name;
private List<String> stats;
public MyThreadFactory(String name) {
counter = 0;
this.name = name;
stats = new ArrayList<>();
}
@Override
public Thread newThread(Runnable run) {
Thread t = new Thread(run, name + "-Thread-" + counter);
counter++;
stats.add(String.format("Created thread %d with name %s on%s\n" ,t.getId() ,t.getName() ,new Date()));
return t;
}
public String getStas() {
StringBuffer buffer = new StringBuffer();
Iterator<String> it = stats.iterator();
while(it.hasNext()) {
buffer.append(it.next());
// buffer.append("\n");
}
return buffer.toString();
}
public static void main(String[] args) {
MyThreadFactory factory = new MyThreadFactory("MyThreadFactory");
Task task = new Task();
Thread thread = null;
for(int i = 0; i < 10; i++) {
thread = factory.newThread(task);
thread.start();
}
System.out.printf("Factory stats:\n");
System.out.printf("%s\n",factory.getStas());
}
}
class Task implements Runnable{
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
线程状态定义
private static final int RUNNING = -1 << COUNT_BITS;
//如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕
private static final int SHUTDOWN = 0 << COUNT_BITS;
//如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
//当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态
private static final int TERMINATED = 3 << COUNT_BITS;
Executors创建4种线程池
在java doc中,并不提倡我们直接使用ThreadPoolExecutor来创建线程池,而是使用Executors类中提供的几个静态方法来创建线程池(底层其实还是调用ThreadPoolExecutor来创建线程池),实际项目中手动创建ThreadPoolExecutor,指定参数会更好,理由是写代码的人可以更清楚线程池的运行规则,避免资源耗尽的风险
Executors各个方法的弊端:
1)
newFixedThreadPool和newSingleThreadExecutor主要的问题是队列长度不受控制,可能会有OOM风险 2)newCachedThreadPool和newScheduledThreadPool主要问题是maximumPoolSize=Integer.MAX_VALUE,可能会有OOM风险
//1. 创建一个可缓存线程池,应用中存在的线程数可以无限大
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
System.out.println("****************************newCachedThreadPool*******************************");
for(int i=0;i<4;i++)
{
final int index=i;
newCachedThreadPool.execute(new FourThreadPoolsRunnableImpl(index));
}
//2. 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待,corePoolSize和maximumPoolSize值相等
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
System.out.println("****************************newFixedThreadPool*******************************");
for(int i=0;i<4;i++)
{
final int index=i;
newFixedThreadPool.execute(new FourThreadPoolsRunnableImpl(index));
}
//3. 创建一个定长线程池,支持定时及周期性任务执行
ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(2);
System.out.println("****************************newFixedThreadPool*******************************");
for(int i=0;i<4;i++)
{
final int index=i;
//延迟三秒执行
newScheduledThreadPool.schedule(new FourThreadPoolsRunnableImpl(index),3, TimeUnit.SECONDS);
}
// dubbo的延迟暴露服务特性就是通过这种方式来实现的
//**************************************************************************************************************
private static final ScheduledExecutorService delayExportExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true));
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
//**************************************************************************************************************
//4. 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行,corePoolSize和maximumPoolSize都被设置为1
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
System.out.println("****************************newFixedThreadPool*******************************");
for(int i=0;i<4;i++)
{
final int index=i;
newSingleThreadExecutor.execute(new FourThreadPoolsRunnableImpl(index));
}
/*newCachedThreadPool.shutdown();
newFixedThreadPool.shutdown();
newScheduledThreadPool.shutdown();
newSingleThreadExecutor.shutdown();*/
//执行的任务类
public class FourThreadPoolsRunnableImpl implements Runnable{
private Integer index;
public FourThreadPoolsRunnableImpl(Integer index)
{
this.index=index;
}
@Override
public void run() {
try {
System.out.println("开始处理线程,index="+index);
Thread.sleep(3000);
System.out.println("执行完毕,index="+index+" 线程标识:"+this.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
实际项目使用
//定义线程安全线程池
private static AtomicReference<ExecutorService> serviceRef = new AtomicReference();
//第一步,初始化线程池
public static void init(String maxPoolSizeStr) {
if (serviceRef.get() == null) {
synchronized(ThreadPoolExecutorDemo.class) {
if (serviceRef.get() == null) {
int maxPoolSize = 50;
if (maxPoolSizeStr != null) {
maxPoolSize = Integer.parseInt(maxPoolSizeStr);
}
//设定固定大小的线程池, 默认线程池大小50
serviceRef.set(new ThreadPoolExecutor(maxPoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue()));
}
}
}
}
//第二步,使用线程池执行callable任务, 返回future结果集
public static <T> Map<String, Future<T>> execute(){
Map<String, Future<T>> resultFutures = new HashMap();
Map<String, Callable<T>> tasks = createTasks();
Iterator i$ = tasks.keySet().iterator();
while(i$.hasNext()) {
String key = (String)i$.next();
resultFutures.put(key, ((ExecutorService)serviceRef.get()).submit((Callable)tasks.get(key)));
}
return resultFutures;
}
private static <T> Map<String,Callable<T>> createTasks() {
//示例创建多个待执行的callable任务,供线程池执行
Map<String,Callable<T>> taskMap = new HashMap<>();
taskMap.put("task1", new MyCallable(1));
taskMap.put("task2",new MyCallable(2));
return taskMap;
}
//第三步, 提供使用完成之后停止线程池的方法
public static void shutdown() {
if (serviceRef.get() != null) {
synchronized(ThreadPoolExecutorDemo.class) {
if (serviceRef.get() != null) {
((ExecutorService)serviceRef.get()).shutdown();
serviceRef.set(null);
}
}
}
}