Java线程池
1、自定义线程池
如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?在Java中可以通过线程池来达到这样的效果。 下面先自定义一个线程池,关键图如下:
①编写ThreadPool和BlockingQueue类代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class ThreadPool {
//任务队列
private BlockingQueue<Runnable> taskQueue;
//线程集合
private HashSet<Worker> workers = new HashSet<>();
//核心线程数
private int coreSize;
//获取任务的超时时间
private long timeout;
//时间单位
private TimeUnit timeUnit;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity) {
this.taskQueue = new BlockingQueue<>(queueCapacity);
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
}
//执行任务
public void execute(Runnable task) {
//当任务数没有超过核心线程数时,直接交给worker执行
//如果任务数超过核心线程数,则加入任务队列暂存
synchronized (workers) {
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("新增worker:{},{}", worker, task);
workers.add(worker);
worker.start();
} else {
taskQueue.put(task);
}
}
}
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
public void run() {
//执行任务
//当task不为空则执行任务
//当task执行完毕则接着从任务队列获取任务并执行
while (task != null || (task = taskQueue.take()) != null) {
try {
log.debug("正在执行:{}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.debug("worker被移除:{}", this);
workers.remove(this);
}
}
}
}
class BlockingQueue<T> {
//任务队列
private Deque<T> queue = new ArrayDeque<>();
//锁
private ReentrantLock lock = new ReentrantLock();
//生产者条件变量
private Condition fullWaitSet = lock.newCondition();
//消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
//容量
private int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
//阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//阻塞添加
public void put(T element) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
log.debug("等待加入任务队列:{}",element);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列:{}",element);
queue.addLast(element);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
//获取大小
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}②接着在主函数里创建5个任务到线程池执行:
1
2
3
4
5
6
7
8
9
10
11
12
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(() -> {
log.debug("第{}个任务执行完毕", j+1);
});
}
}
}执行结果发现5个任务都被线程池中的线程执行完毕了,接着程序就进入了干等的状态:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
1515:39:54.115 org.example.ThreadPool [main] - 新增worker:Thread[Thread-0,5,main],org.example.TestPool$$Lambda$1/706277948@4563e9ab
15:39:54.115 org.example.ThreadPool [main] - 新增worker:Thread[Thread-1,5,main],org.example.TestPool$$Lambda$1/706277948@66d33a
15:39:54.115 org.example.BlockingQueue [main] - 加入任务队列:org.example.TestPool$$Lambda$1/706277948@7e0babb1
15:39:54.115 org.example.ThreadPool [Thread-0] - 正在执行:org.example.TestPool$$Lambda$1/706277948@4563e9ab
15:39:54.115 org.example.BlockingQueue [main] - 加入任务队列:org.example.TestPool$$Lambda$1/706277948@6debcae2
15:39:54.115 org.example.TestPool [Thread-0] - 第1个任务执行完毕
15:39:54.115 org.example.BlockingQueue [main] - 加入任务队列:org.example.TestPool$$Lambda$1/706277948@5ba23b66
15:39:54.115 org.example.ThreadPool [Thread-1] - 正在执行:org.example.TestPool$$Lambda$1/706277948@66d33a
15:39:54.115 org.example.ThreadPool [Thread-0] - 正在执行:org.example.TestPool$$Lambda$1/706277948@7e0babb1
15:39:54.115 org.example.TestPool [Thread-1] - 第2个任务执行完毕
15:39:54.115 org.example.TestPool [Thread-0] - 第3个任务执行完毕
15:39:54.115 org.example.ThreadPool [Thread-1] - 正在执行:org.example.TestPool$$Lambda$1/706277948@6debcae2
15:39:54.115 org.example.ThreadPool [Thread-0] - 正在执行:org.example.TestPool$$Lambda$1/706277948@5ba23b66
15:39:54.115 org.example.TestPool [Thread-1] - 第4个任务执行完毕
15:39:54.115 org.example.TestPool [Thread-0] - 第5个任务执行完毕③为了不让无任务时程序进入死等,将take()方法换成带有超时时间的poll()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24//带超时的阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
//将timeout统一转换为纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
//返回的是剩余等待时间
if (nanos <= 0) {
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}更改Worker类相应的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
public void run() {
//执行任务
//当task不为空则执行任务
//当task执行完毕则接着从任务队列获取任务并执行
while (task != null || (task = taskQueue.poll(timeout,timeUnit)) != null) {
try {
log.debug("正在执行:{}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.debug("worker被移除:{}", this);
workers.remove(this);
}
}
}执行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1915:41:32.536 org.example.ThreadPool [main] - 新增worker:Thread[Thread-0,5,main],org.example.TestPool$$Lambda$1/706277948@4563e9ab
15:41:32.536 org.example.ThreadPool [main] - 新增worker:Thread[Thread-1,5,main],org.example.TestPool$$Lambda$1/706277948@66d33a
15:41:32.536 org.example.BlockingQueue [main] - 加入任务队列:org.example.TestPool$$Lambda$1/706277948@7e0babb1
15:41:32.536 org.example.ThreadPool [Thread-0] - 正在执行:org.example.TestPool$$Lambda$1/706277948@4563e9ab
15:41:32.536 org.example.TestPool [Thread-0] - 第1个任务执行完毕
15:41:32.536 org.example.ThreadPool [Thread-1] - 正在执行:org.example.TestPool$$Lambda$1/706277948@66d33a
15:41:32.536 org.example.BlockingQueue [main] - 加入任务队列:org.example.TestPool$$Lambda$1/706277948@6debcae2
15:41:32.536 org.example.TestPool [Thread-1] - 第2个任务执行完毕
15:41:32.536 org.example.ThreadPool [Thread-1] - 正在执行:org.example.TestPool$$Lambda$1/706277948@7e0babb1
15:41:32.536 org.example.TestPool [Thread-1] - 第3个任务执行完毕
15:41:32.536 org.example.ThreadPool [Thread-1] - 正在执行:org.example.TestPool$$Lambda$1/706277948@6debcae2
15:41:32.536 org.example.TestPool [Thread-1] - 第4个任务执行完毕
15:41:32.536 org.example.BlockingQueue [main] - 加入任务队列:org.example.TestPool$$Lambda$1/706277948@5ba23b66
15:41:32.536 org.example.ThreadPool [Thread-0] - 正在执行:org.example.TestPool$$Lambda$1/706277948@5ba23b66
15:41:32.536 org.example.TestPool [Thread-0] - 第5个任务执行完毕
15:41:33.542 org.example.ThreadPool [Thread-1] - worker被移除:Thread[Thread-1,5,main]
15:41:33.542 org.example.ThreadPool [Thread-0] - worker被移除:Thread[Thread-0,5,main]
Process finished with exit code 0④当任务数很多且执行任务都需要花费大量时间时,程序就会在向阻塞队列中加任务处干等,这时可以将put()换成带超时时间的阻塞添加方法offer():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public boolean offer(T element,long timeout,TimeUnit timeUnit){
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
try {
log.debug("等待加入任务队列:{}", element);
if(nanos<=0){
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列:{}", element);
queue.addLast(element);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}⑤而我们希望当线程太忙而无法执行新提交的任务时具体的执行策略由调用者决定,可以运用到策略模式,即将具体的操作抽象成接口,这里在线程池类中新建一个拒绝策略接口,修改后全部代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
//死等策略
queue.put(task);
//带超时等待
queue.offer(task, 500, TimeUnit.MILLISECONDS);
//让调用者放弃任务执行
log.debug("放弃任务:{}", task);
//让调用者自己执行任务
task.run();
//让调用者抛出异常
throw new RuntimeException("任务执行失败:" + task);
});
for (int i = 0; i < 3; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("第{}个任务执行完毕", j + 1);
});
}
}
}
class ThreadPool {
//任务队列
private BlockingQueue<Runnable> taskQueue;
//线程集合
private HashSet<Worker> workers = new HashSet<>();
//核心线程数
private int coreSize;
//获取任务的超时时间
private long timeout;
//时间单位
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
this.taskQueue = new BlockingQueue<>(queueCapacity);
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.rejectPolicy = rejectPolicy;
}
//执行任务
public void execute(Runnable task) {
//当任务数没有超过核心线程数时,直接交给worker执行
//如果任务数超过核心线程数,则加入任务队列暂存
synchronized (workers) {
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("新增worker:{},{}", worker, task);
workers.add(worker);
worker.start();
} else {
taskQueue.tryPut(rejectPolicy, task);
}
}
}
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
public void run() {
//执行任务
//当task不为空则执行任务
//当task执行完毕则接着从任务队列获取任务并执行
while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
log.debug("正在执行:{}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.debug("worker被移除:{}", this);
workers.remove(this);
}
}
}
}
class BlockingQueue<T> {
//任务队列
private Deque<T> queue = new ArrayDeque<>();
//锁
private ReentrantLock lock = new ReentrantLock();
//生产者条件变量
private Condition fullWaitSet = lock.newCondition();
//消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
//容量
private int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
//阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//带超时的阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
//将timeout统一转换为纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
//返回的是剩余等待时间
if (nanos <= 0) {
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//阻塞添加
public void put(T element) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
log.debug("等待加入任务队列:{}", element);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列:{}", element);
queue.addLast(element);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
public boolean offer(T element, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
try {
log.debug("等待加入任务队列:{}", element);
if (nanos <= 0) {
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列:{}", element);
queue.addLast(element);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
//获取大小
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
//判断队列是否已满
if (queue.size() == capacity) {
rejectPolicy.reject(this, task);
} else {
log.debug("加入任务队列:{}", task);
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
}
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}
2、ThreadPoolExecutor
- java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。 与此类有关的继承树如下图:
2.1 线程池状态
ThreadPoolExecutor使用int的高3位来表示线程池状态,低29位表示线程数量。
- 从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING。
- 这些信息存储在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,这样就可以用一次cas原子操作进行赋值。
2.2 工作方式
ThreadPoolExecutor类的其中一个构造方法如下:
1
2
3
4
5
6
7public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)- corePoolSize 核心线程数目 (最多保留的线程数)
- maximumPoolSize 最大线程数目
- keepAliveTime 生存时间 - 针对救急线程
- unit 时间单位 - 针对救急线程
- workQueue 阻塞队列
- threadFactory 线程工厂 - 可以为线程创建时起个好名字
- handler 拒绝策略
根据这个构造方法,JDK Executors类中提供了众多工厂方法来创建各种用途的线程池。
工作方式(假设corePoolSize=2,maximumPoolSize=3):
①线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
②当线程数达到corePoolSize并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue队列排队,直到有空闲的线程。
③如果队列选择了有界队列,那么任务超过了队列大小时,会创建(maximumPoolSize-corePoolSize)数目的线程来救急。如果救急线程执行完毕会被销毁(或者可以在构造方法处设置生存时间),而核心线程不会。
④如果线程到达maximumPoolSize仍然有新任务这时会执行拒绝策略。拒绝策略jdk提供了4种实现,其它著名框架也提供了实现。
- AbortPolicy让调用者抛出RejectedExecutionException异常,这是默认策略。
- CallerRunsPolicy让调用者运行任务。
- DiscardPolicy放弃本次任务。
- DiscardOldestPolicy放弃队列中最早的任务,本任务取而代之。
- Dubbo的实现,在抛出RejectedExecutionException异常之前会记录日志,并dump线程栈信息,方便定位问题。
- Netty的实现,是创建一个新线程来执行任务。
- ActiveMQ的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略。
- PinPoint的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略。
⑤当高峰过去后,超过corePoolSize的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime和unit来控制。
2.3 固定大小线程池
通过调用Executors.newFixedThreadPool()来创建固定大小线程池。
1
2
3
4
5public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}特点
- 核心线程数==最大线程数(没有救急线程被创建),因此也无需超时时间。
- 阻塞队列是无界的,可以放任意数量的任务。
- 适用于任务量已知,相对耗时的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class TestExecutors {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
private AtomicInteger t = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "mypool_t" + t.getAndIncrement());
}
});
pool.execute(() -> {
log.debug("1");
});
pool.execute(() -> {
log.debug("2");
});
pool.execute(() -> {
log.debug("3");
});
}
}执行结果:
1
2
3
421:26:10.886 org.example.TestExecutors [mypool_t2] - 2
21:26:10.886 org.example.TestExecutors [mypool_t1] - 1
21:26:10.888 org.example.TestExecutors [mypool_t2] - 3
…………
2.4 带缓冲线程池
通过调用Executors.newCachedThreadPool()来创建带缓冲线程池。
1
2
3
4
5public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}特点:
- 核心线程数是0, 最大线程数是Integer.MAX_VALUE,救急线程的空闲生存时间是60s,意味着
- 全部都是救急线程(60s后可以回收)。
- 救急线程可以无限创建。
- 队列采用了SynchronousQueue,实现特点是它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)。
- 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲1分钟后释放线程。
- 适合任务数比较密集,但每个任务执行时间较短的情况。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class TestExecutors {
public static void main(String[] args) {
SynchronousQueue<Integer> integers = new SynchronousQueue<>();
new Thread(() -> {
try {
log.debug("putting {} ", 1);
integers.put(1);
log.debug("{} putted...", 1);
log.debug("putting...{} ", 2);
integers.put(2);
log.debug("{} putted...", 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1").start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
try {
log.debug("taking {}", 1);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
try {
log.debug("taking {}", 2);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t3").start();
}
}执行结果:
1
2
3
4
5
621:36:02.002 org.example.TestExecutors [t1] - putting 1
21:36:03.017 org.example.TestExecutors [t2] - taking 1
21:36:03.017 org.example.TestExecutors [t1] - 1 putted...
21:36:03.017 org.example.TestExecutors [t1] - putting...2
21:36:04.028 org.example.TestExecutors [t3] - taking 2
21:36:04.028 org.example.TestExecutors [t1] - 2 putted...- 核心线程数是0, 最大线程数是Integer.MAX_VALUE,救急线程的空闲生存时间是60s,意味着
2.5 单线程线程池
通过调用Executors.newSingleThreadExecutor()来创建单线程线程池。
1
2
3
4
5
6public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}使用场景:
- 希望多个任务排队执行。线程数固定为1,任务数多于1时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
区别:
自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TestExecutors {
public static void main(String[] args) {
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(() -> {
log.debug("1");
int i = 1 / 0;
});
pool.execute(() -> {
log.debug("2");
});
pool.execute(() -> {
log.debug("3");
});
}
}运行结果:
1
2
3
4
5
6
7
821:42:25.934 org.example.TestExecutors [pool-1-thread-1] - 1
21:42:25.942 org.example.TestExecutors [pool-1-thread-2] - 2
21:42:25.942 org.example.TestExecutors [pool-1-thread-2] - 3
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at org.example.TestExecutors.lambda$main$0(TestExecutors.java:22)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)Executors.newSingleThreadExecutor()线程个数始终为1,不能修改。
- FinalizableDelegatedExecutorService应用的是装饰器模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor中特有的方法。
Executors.newFixedThreadPool(1)初始时为1,以后还可以修改。
- 对外暴露的是ThreadPoolExecutor对象,可以强转后调用setCorePoolSize等方法进行修改。
2.6 任务调度线程池
在『任务调度线程池』功能加入之前,可以使用java.util.Timer来实现定时功能,Timer的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class TestTimer {
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
public void run() {
log.debug("task 1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
TimerTask task2 = new TimerTask() {
public void run() {
log.debug("task 2");
}
};
log.debug("start...");
//使用 timer 添加两个任务,希望它们都在 1s 后执行。
//但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行。
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);
}
}执行结果:
1
2
310:59:26.108 org.example.TestTimer [main] - start...
10:59:27.109 org.example.TestTimer [Timer-0] - task 1
10:59:29.109 org.example.TestTimer [Timer-0] - task 2使用ScheduledExecutorService改写:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TestScheduled {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
pool.schedule(() -> {
log.debug("task1");
int i = 1 / 0;
}, 1, TimeUnit.SECONDS);
pool.schedule(() -> {
log.debug("task2");
}, 1, TimeUnit.SECONDS);
}
}执行结果如下,可见第一个线程出错并不会影响第二个线程的执行,但是如果线程池中只有一个线程的话仍然会串行执行:
1
211:03:36.422 org.example.TestScheduled [pool-1-thread-1] - task1
11:03:36.422 org.example.TestScheduled [pool-1-thread-2] - task2
测试以固定间隔执行任务(使用scheduleAtFixedRate)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15@Slf4j
public class TestScheduled {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> {
log.debug("running...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, 1, TimeUnit.SECONDS);
}
}执行结果如下,一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被增大到了2s。
1
2
3
4
5
6
711:07:38.528 org.example.TestScheduled [main] - start...
11:07:39.573 org.example.TestScheduled [pool-1-thread-1] - running...
11:07:41.583 org.example.TestScheduled [pool-1-thread-1] - running...
11:07:43.594 org.example.TestScheduled [pool-1-thread-1] - running...
11:07:45.603 org.example.TestScheduled [pool-1-thread-1] - running...
Process finished with exit code -1测试每个任务间隔多少时间执行(使用scheduleWithFixedDelay)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class TestScheduled {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleWithFixedDelay(()-> {
log.debug("running...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, 1, TimeUnit.SECONDS);
}
}执行结果如下,一开始,延时1s,scheduleWithFixedDelay的间隔是距离上一个任务结束的延时。
1
2
3
4
5
6
711:10:45.078 org.example.TestScheduled [main] - start...
11:10:46.124 org.example.TestScheduled [pool-1-thread-1] - running...
11:10:49.128 org.example.TestScheduled [pool-1-thread-1] - running...
11:10:52.146 org.example.TestScheduled [pool-1-thread-1] - running...
11:10:55.163 org.example.TestScheduled [pool-1-thread-1] - running...
Process finished with exit code -1
2.7 提交任务
1 | // 执行任务 |
测试submit方法。
1
2
3
4
5
6
7
8
9
10
11
12
public class TestExecutors {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(1);
Future<String> future = pool.submit(() -> {
log.debug("running");
Thread.sleep(1000);
return "ok";
});
log.debug("{}", future.get());
}
}执行结果:
1
222:19:05.037 org.example.TestExecutors [pool-1-thread-1] - running
22:19:06.044 org.example.TestExecutors [main] - ok测试invokeAll方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TestExecutors {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(1);
List<Future<String>> futures = pool.invokeAll(Arrays.asList(
() -> {
log.debug("begin");
Thread.sleep(1000);
return "1";
},
() -> {
log.debug("begin");
Thread.sleep(500);
return "2";
},
() -> {
log.debug("begin");
Thread.sleep(2000);
return "3";
}
));
futures.forEach( f -> {
try {
log.debug("{}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}执行结果:
1
2
3
4
5
622:21:55.613 org.example.TestExecutors [pool-1-thread-1] - begin
22:21:56.615 org.example.TestExecutors [pool-1-thread-1] - begin
22:21:57.118 org.example.TestExecutors [pool-1-thread-1] - begin
22:21:59.133 org.example.TestExecutors [main] - 1
22:21:59.133 org.example.TestExecutors [main] - 2
22:21:59.133 org.example.TestExecutors [main] - 3测试invokeAny方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TestExecutors {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(3);
String result = pool.invokeAny(Arrays.asList(
() -> {
log.debug("begin 1");
Thread.sleep(1000);
log.debug("end 1");
return "1";
},
() -> {
log.debug("begin 2");
Thread.sleep(500);
log.debug("end 2");
return "2";
},
() -> {
log.debug("begin 3");
Thread.sleep(2000);
log.debug("end 3");
return "3";
}
));
log.debug("{}", result);
}
}执行结果:
1
2
3
4
522:29:15.324 org.example.TestExecutors [pool-1-thread-2] - begin 2
22:29:15.324 org.example.TestExecutors [pool-1-thread-3] - begin 3
22:29:15.324 org.example.TestExecutors [pool-1-thread-1] - begin 1
22:29:15.835 org.example.TestExecutors [pool-1-thread-2] - end 2
22:29:15.835 org.example.TestExecutors [main] - 2
2.8 关闭线程池
调用shutdown方法。
- 线程池状态变为SHUTDOWN。
- 不会接收新任务。
- 已提交任务会执行完。
- 此方法不会阻塞调用线程的执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(SHUTDOWN);
// 仅会打断空闲线程
interruptIdleWorkers();
onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
tryTerminate();
}测试代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class TestShutdown {
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<Integer> result1 = pool.submit(() -> {
log.debug("task 1 running...");
Thread.sleep(1000);
log.debug("task 1 finish...");
return 1;
});
Future<Integer> result2 = pool.submit(() -> {
log.debug("task 2 running...");
Thread.sleep(1000);
log.debug("task 2 finish...");
return 2;
});
Future<Integer> result3 = pool.submit(() -> {
log.debug("task 3 running...");
Thread.sleep(1000);
log.debug("task 3 finish...");
return 3;
});
log.debug("shutdown");
pool.shutdown();
pool.awaitTermination(3, TimeUnit.SECONDS);
log.debug("other....");
}
}测试结果:
1
2
3
4
5
6
7
8
9
1010:43:53.061 org.example.TestShutdown [main] - shutdown
10:43:53.061 org.example.TestShutdown [pool-1-thread-1] - task 1 running...
10:43:53.061 org.example.TestShutdown [pool-1-thread-2] - task 2 running...
10:43:54.062 org.example.TestShutdown [pool-1-thread-2] - task 2 finish...
10:43:54.062 org.example.TestShutdown [pool-1-thread-1] - task 1 finish...
10:43:54.062 org.example.TestShutdown [pool-1-thread-2] - task 3 running...
10:43:55.076 org.example.TestShutdown [pool-1-thread-2] - task 3 finish...
10:43:55.076 org.example.TestShutdown [main] - other....
Process finished with exit code 0
调用shutdownNow方法。
- 线程池状态变为STOP。
- 不会接收新任务。
- 会将队列中的任务返回。
- 并用interrupt的方式中断正在执行的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(STOP);
// 打断所有线程
interruptWorkers();
// 获取队列中剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终结
tryTerminate();
return tasks;
}测试代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TestShutdown {
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<Integer> result1 = pool.submit(() -> {
log.debug("task 1 running...");
Thread.sleep(1000);
log.debug("task 1 finish...");
return 1;
});
Future<Integer> result2 = pool.submit(() -> {
log.debug("task 2 running...");
Thread.sleep(1000);
log.debug("task 2 finish...");
return 2;
});
Future<Integer> result3 = pool.submit(() -> {
log.debug("task 3 running...");
Thread.sleep(1000);
log.debug("task 3 finish...");
return 3;
});
log.debug("shutdown");
List<Runnable> runnables = pool.shutdownNow();
log.debug("other.... {}" , runnables);
}
}测试结果:
1
2
3
4
5
610:47:26.753 org.example.TestShutdown [main] - shutdown
10:47:26.753 org.example.TestShutdown [pool-1-thread-2] - task 2 running...
10:47:26.753 org.example.TestShutdown [pool-1-thread-1] - task 1 running...
10:47:26.754 org.example.TestShutdown [main] - other.... [java.util.concurrent.FutureTask@19bb089b]
Process finished with exit code 0
其它方法
1
2
3
4
5
6// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
2.9 Tomcat线程池
tomcat其中一个重要组成部分是Connector,如下图:
- LimitLatch用来限流,可以控制最大连接个数,类似 J.U.C 中的Semaphore。
- Acceptor只负责【接收新的socket连接】。
- Poller只负责监听socket channel是否有【可读的I/O事件】。
- 一旦可读,封装一个任务对象(socketProcessor),提交给Executor线程池处理。
- Executor线程池中的工作线程最终负责【处理请求】。
Tomcat线程池扩展了ThreadPoolExecutor,行为稍有不同。
- 如果总线程数达到maximumPoolSize:
- 这时不会立刻抛RejectedExecutionException异常。
- 而是再次尝试将任务放入队列,如果还失败,才抛出RejectedExecutionException异常。
- 如果总线程数达到maximumPoolSize:
Connector配置:
Executor线程配置:
2.10 Fork/Join线程池
Fork/Join是JDK 1.7加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的cpu密集型运算。
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解。
Fork/Join在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。
Fork/Join默认会创建与cpu核心数大小相同的线程池。
提交给Fork/Join线程池的任务需要继承 RecursiveTask(有返回值)或RecursiveAction(没有返回值),例如下面定义了一个对1~n之间的整数求和的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class TestForkJoin {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new MyTask(5)));
}
}
//计算1~n之间整数的和
class MyTask extends RecursiveTask<Integer> {
private int n;
public MyTask(int n) {
this.n = n;
}
public String toString() {
return "{" + n + '}';
}
protected Integer compute() {
// 如果 n 已经为 1,可以求得结果了
if (n == 1) {
log.debug("join() {}", n);
return n;
}
// 将任务进行拆分(fork)
MyTask t1 = new MyTask(n - 1);
t1.fork();//让一个线程去执行此任务
log.debug("fork() {} + {}", n, t1);
// 合并(join)结果
int result = n + t1.join();
log.debug("join() {} + {} = {}", n, t1, result);
return result;
}
}执行结果:
1
2
3
4
5
6
7
8
9
10
11
1221:41:44.699 org.example.MyTask [ForkJoinPool-1-worker-0] - fork() 2 + {1}
21:41:44.699 org.example.MyTask [ForkJoinPool-1-worker-2] - fork() 4 + {3}
21:41:44.699 org.example.MyTask [ForkJoinPool-1-worker-3] - fork() 3 + {2}
21:41:44.699 org.example.MyTask [ForkJoinPool-1-worker-1] - fork() 5 + {4}
21:41:44.699 org.example.MyTask [ForkJoinPool-1-worker-0] - join() 1
21:41:44.699 org.example.MyTask [ForkJoinPool-1-worker-0] - join() 2 + {1} = 3
21:41:44.699 org.example.MyTask [ForkJoinPool-1-worker-3] - join() 3 + {2} = 6
21:41:44.699 org.example.MyTask [ForkJoinPool-1-worker-2] - join() 4 + {3} = 10
21:41:44.699 org.example.MyTask [ForkJoinPool-1-worker-1] - join() 5 + {4} = 15
15
Process finished with exit code 0用图表示如下:
任务拆分优化代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class TestForkJoin {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new MyTask(1, 5)));
}
}
class MyTask extends RecursiveTask<Integer> {
int begin;
int end;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
public String toString() {
return "{" + begin + "," + end + '}';
}
protected Integer compute() {
// 5, 5
if (begin == end) {
log.debug("join() {}", begin);
return begin;
}
// 4, 5
if (end - begin == 1) {
log.debug("join() {} + {} = {}", begin, end, end + begin);
return end + begin;
}
// 1 5
int mid = (end + begin) / 2; // 3
MyTask t1 = new MyTask(begin, mid); // 1,3
t1.fork();
MyTask t2 = new MyTask(mid + 1, end); // 4,5
t2.fork();
log.debug("fork() {} + {} = ?", t1, t2);
int result = t1.join() + t2.join();
log.debug("join() {} + {} = {}", t1, t2, result);
return result;
}
}执行结果:
1
2
3
4
5
6
7
8
9
1021:43:15.579 org.example.MyTask [ForkJoinPool-1-worker-3] - join() 4 + 5 = 9
21:43:15.579 org.example.MyTask [ForkJoinPool-1-worker-0] - join() 1 + 2 = 3
21:43:15.579 org.example.MyTask [ForkJoinPool-1-worker-1] - fork() {1,3} + {4,5} = ?
21:43:15.579 org.example.MyTask [ForkJoinPool-1-worker-2] - fork() {1,2} + {3,3} = ?
21:43:15.587 org.example.MyTask [ForkJoinPool-1-worker-3] - join() 3
21:43:15.587 org.example.MyTask [ForkJoinPool-1-worker-2] - join() {1,2} + {3,3} = 6
21:43:15.587 org.example.MyTask [ForkJoinPool-1-worker-1] - join() {1,3} + {4,5} = 15
15
Process finished with exit code 0用图表示如下: