线程安全

线程安全性

“共享” 意味着变量可以由多个线程同时访问,而“可变”则意味着变量的值在其生命周期内可以发生变化

Java的主要同步机制是关键字synchronized,它提供了一种独占的加锁方式,但同步这个术语还包括volatile类型的变量、显示锁以及原子变量

如果当多个线程访问同一个可变的状态变量时没有采用适合的同步,那么程序可能出现错误,有如下三种方式可以修复此问题

  • 不在线程之间共享变量。
  • 将状态变量修改为不可变。
  • 在访问状态变量时使用同步

什么是线程安全性

什么是线程安全性

  1. 可以在多个线程中调用,并且在线程之间不会出现错误的交互
  2. 可以同时被多个线程调用,而调用者无需执行额外的操作

线程安全性的定义中,最核心的概念就是正确性,正确性的含义是:某个类的行为与其规范完全一致,在良好的规范中通常定义各种不变性条件来约束对象的状态,以及定义各种后验条件来描述对象操作的结果

当多个对象访问某个类时,不管运行环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要任何同步或协同,这个类都能表现出正确的行为,那么这个类就是线程安全的

线程安全的类中封装了必要的同步机制,因此客户端无需采用进一步的同步措施

==无状态对象是指==: 一个对象不即不包括任何域,也不包括任何对其他类中域的引用,无状态类一定是线程安全的

原子性

当我们在无状态对象中增加一个状态或发布可变对象时,如果状态未使用额外的同步机制,那么这个对象将是线程不安全的

竟态条件

==概念==: 当某个计算的正确性取决于多个线程的交替执行时序时,那么就会发生竟态条件,最常见的竟态条件是:先检查后执行的操作,即通过一个可能执行的观察结果来取决于下一步的动作
先检查后执行的一种常见情况就是:延迟初始化

==数据竞争==: 如果在访问共享的非final类型的域时没有采用同步来进行协同,那么就会出现数据竞争

复合操作

要避免竟态条件问题,就必须在某个线程修改变量时,通过某种方式防止其他线程使用这个变量,从而确保其他线程只能在修改操作完成之前或之后读取和修改状态,而不是在修改状态的过程中

假设有两个操作A和B,如果从执行A的线程来看,当另一个线程执行B时,要么将B执行完毕,要么完全不执行B,那么A和B对彼此来说是原子的,原子操作是指,对于访问同一个状态的所有操作(包括操作本身)来说,这个操作是一个原子方式执行的操作

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* @author dreamyao
* @title
* @date 2018/3/7 下午5:53
* @since 1.0.0
*/
@ThreadSafe
public class CountingFactorizer implements Servlet {
private final AtomicLong count = new AtomicLong(0);
public long getCount(){
return count.get();
}
@Override
public void service(ServletRequest servletRequest, ServletResponse servletResponse){
BigInteger i= extracFromRequest(servletRequest);
BigInteger[] factors=factor(i);
count.incrementAndGet();
encodeIntoResponse(resp,factors);
}
}

在实际情况中,应尽可能地使用现有的线程安全对象(例如:AtomicLong)来管理类的状态,与非线程安全的对象相比,判断线程安全对象的可能状态及其状态转换情况要更为容易,从而也更容易维护和验证线程安全性

加锁机制

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @author dreamyao
* @title
* @date 2018/3/7 下午5:53
* @since 1.0.0
*/
@NotThreadSafe
public class UnsafeCachingFactorizer implements Servlet {
private final AtomicReference<BigInteger> lastNumber = new AtomicReference<>();
private final AtomicReference<BigInteger[]> lastfactors = new AtomicReference<>();
@Override
public void service(ServletRequest servletRequest, ServletResponse servletResponse){
BigInteger i= extracFromRequest(servletRequest);
if(i.equals(lastNumber.get())) {
encodeIntoResponse(resp,lastFactors.get());
} else {
BigInteger[] factors=factor(i);
lastNumber.set();
lastFactors.set(factor);
encodeIntoResponse(resp,factors);
}
}
}

该Servlet在没有足够的原子性保证的情况下对其最近计算结果进行缓存(不要这么做)

线程安全的定义要求

  1. 多个线程之间的操作无论采用何种执行时序或交替方式,都要保证不变性条件不被破坏
  2. 只有确保了这个不变条件不被破坏,上面的Servlet才正确的
  3. 当在不变条件中涉及到多个变量时,各个变量之间并不是彼此独立的,而是某个变量的值会对其他变量的值产生约束,因此当更新某个变量时,需要在同一个原子操作中对其他变量同时进行更新
  4. 在使用原子引用的情况下,尽管对set方法的每次调用都是原子的,但仍然无法同时更新lastNumber和lastFactors,如果只修改了其中一个变量,那么在这两次修改操作之间,其他线程将发现不变形条件被破坏了
  5. ==要保持状态的一致性,就需要在单个原子操作中更新所有相关的状态变量==

内置锁

Java提供了一种内置的锁机制来支持原子性(synchronize),同步代码块将介绍加锁机制以及其他同步机制的另一个重要方面:可见性,同步代码块包括以下两部分:

  • 一个作为锁的对象引用
  • 一个作为由这个锁保护的对象

重入

当某个线程请求一个由其他线程持有的锁时,发出请求的线程就会阻塞,然而由于内置锁时可重入的,因此如果某个线程试图获得一个由他自己持有的锁,那么这个请求就会成功

“重入”意味着获取锁的操作粒度是“线程”而不是调用,重入避免了死锁的情况

用锁来保护状态

==注意==:访问共享状态的复合操作,例如命中计数器的递增操作(读取-修改-写入)或者延迟初始化(先检查后执行)都必须是原子操作以避免产生静态条件,仅仅将复合操作封装到一个同步代码块中是不够的

  1. 当类的不变性条件设计多个状态变量时,那么还有另外一个需求,在不变性条件中的每个变量都必须由同一个锁保护,因此可以在单个原子操作中访问或更新这些变量,从而确保不变条件不被破坏
  2. 多个原子操作复合为一个复合操作后复合操作就不一定是原子操作
  3. 当执行时间较长的计算或者可能无法快速完成的操作(例如:网络I/O或者控制台I/O)一定不要持有锁
  4. 加锁的含义不仅仅局限于互斥行为,还包括内存可见性,为了确保所有线程都能看到共享变量的最新值,所有执行读操作或者写操作的线程都必须在同一个锁上同步

对于可能被多个线程同时访问的可变状态了变量,在访问它时都需要持有同一个锁,在这种情况下,我们称状态变量是由这个锁保护的

每个共享的和可变的变量都应该只由一个锁来保护,从而使维护人员知道是哪一个锁

对于每个包含多个变量的不变性条件,其中涉及的所有变量都需要由同一个锁来保护

活跃性与性能

通过缩小同步代码块的作用范围,我们很容易做到既保护并发性,同时又维护线程安全性,要确保同步代码块不要过小,并且不要将本应是原子的操作拆分到多个同步代码块中去,应该尽量不影响共享状态且执行时间较长的操作从同步代码块中分离出去,从而在这些操作的执行过程中其他线程可以访问共享状态

对在单个变量上实现原子操作来说,原子变量是很有用的(推荐使用原子变量,例如AtomicLong等),但由于我们已经使用了同步代码块来构造原子操作,而使用两种不同的同步机制不仅会带来混乱,也不会在性能或安全上带来任何好处,因此在单个变量上实现原子操作和原子变量这两种同步机制任选其中一个就可以了

在获得锁与释放锁等操作都需要一定的性能开销,因此如果将同步代码块分解的过细,那么通常并不好,尽管这样做不会破坏原子性

通常,在简单性与性能之间存在着互相制约因素,当实现某个同步策略时,一定不要盲目地为了性能而牺牲简单性(这可能会破坏安全性)

对象的共享

Volatile变量

  1. Volatile变量的常用使用场景
1
2
3
4
5
volatile boolean asleep;
...
while (!asleep){
countSomeSheep();
}
  1. volatile 变量通常用做某个操作完成、发生中断或者状态的标志
  2. volatile 的语义不足以确保递增操作(count++)的原子性,除非你能确定只有一个线程对变量执行写操作(原子变量提供了:读-改-写的原子操作)并且常常用做一种更好的volatile变量
  3. 加锁机制即可以确保可见性又可以确保原子性,而volatile变量只能确保可见性

当且仅当满足一下所有条件时,才应该使用volatile变量

  • 对变量的写入操作不依赖变量的当前值,或者你能确保只有单个线程更新变量的值。
  • 该变量不会与其他状态变量一起纳入不变性条件中。
  • 在访问变量时不需要加锁

发布与逸出

发布

==概念==:发布一个对象是指使对象能够在当前作用域之外的代码中使用
例如:

  1. 将一个指向该对象的引用保存到其他代码可以访问的地方
  2. 在某个非私有的方法中返回改引用
  3. 将引用传递到其他类的方法中
    在许多情况中,我们要确保对象及其内部状态不被发布,但如果在发布时要确保线程安全性,则可能需要同步
    发布内部状态可能会破坏封装性(内部状态是指对象的域或者域中其他对象的域)并使得程序难以维持不变性条件,如果在对象构造完成之前就发布,就会破坏线程安全性

逸出

当某个不应该被发布的对象被发布时,就被称为逸出

示例:
发布对象最简单的方法就是将对象的引用保存到一个公有的静态变量中,以便任何类和线程都能看见

1
2
3
4
5
public static Set<Secret> knownSecrets;
public void initialize() {
knownSecrets=new HashSet<Secret>();
}

对象逸出的几种情况:

第一种: 当发布某个对象时,可能会间接的发布其他对象,如:将Secret对象放入Set集合中时,那么同样就发布了这个对象,因为其他任何代码都可以遍历这个集合并获得Secret对象的引用

如果从非私有方法中返回一个引用,那么同样会发布返回的对象,例如:

==示例==:

1
2
3
4
5
6
7
8
class UnsafeSates {
private String[] states = new String[] { "AK","AL"};
public String[] getStates() {
return states;
}
}

以上示例使内部的可变状态逸出(不要这么做)

第二种: 当发布一个对象时,在该对象的非私有域中引用的所有对象同样会被发布,一般来说如果一个已经发布的对象能够通过非私有的变量引用和方法调用到达其他的对象,那么这些对象也都会被发布

最后一种发布对象或其内部状态的机制就是发布一个内部类的实列,如:

==示例==:

1
2
3
4
5
6
7
8
9
10
11
public class ThisEscape {
public ThisEscape(EventSource source) {
source.registerListener(
new EventListener() {
public void onEvent(Event e) {
doSomething(e);
}
}
)
}
}

安全的对象构造过程

在ThisEcape中给出了逸出的一个特殊例子,即this引用在构造函数中逸出,不要在构造函数中使用this引用逸出

如果想在构造函数中注册一个事件监听器或启动线程,那么可以使用一个私有的构造函数和一个公共的工厂方法,从而避免不正确的构造过程,如:

==示例==:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* @author dreamyao
* @title
* @date 2018/3/7 上午9:49
* @since 1.0.0
*/
public class SafeListener {
private final EventListener listener;
private SafeListener(){
listener=new EventListener() {
@Override
public int hashCode() {
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
@Override
protected Object clone() throws CloneNotSupportedException {
return super.clone();
}
@Override
public String toString() {
return super.toString();
}
@Override
protected void finalize() throws Throwable {
super.finalize();
}
};
}
public static SafeListener newSafeListener(EventSource source) {
SafeListener safe = new SafeListener();
source.registerListener(safe.listener);
return safe;
}
}

具体来说,只有当构造函数返回时,this引用才应该从线程中逸出,构造函数可以将this引用保存到某个地方,只要其他线程不会在构造函数完成之前使用它,SafeLinstener对象中就使用了这种技术

线程封闭

当某个对象封闭到一个线程中时,这种用法将自动实现线程安全性,即使被封闭的对象本身不是线程安全的

==例如==:
线程封闭技术最常见的应用是JDBC的Connection对象

Ad-hoc线程封闭

在volatile变量上存在一种特殊的线程封闭,只有能确保只有单个线程对共享的volatile变量执行写操作,那么就可以安全的在共享的volatile变量上实现读取-修改-写入操作

由于Ad-hoc线程封闭技术的脆弱性,因此在程序中尽量少用他,在可能的情况下尽量使用更强的线程封闭技术(例如:栈封闭或者ThreadLoad类)

栈封闭

对于基本类型的局部变量,无论如何都不会破坏线程封闭性,由于任何方法都无法获得对基本类型的引用,因此Java语言的这种语义就确保了基本类型的局部变量始终封闭在线程内

在维持对象引用的栈封闭时,一定要多做些工作确保被引用的对象不会逸出,然而如果发布了对象的引用,那么封闭性将被破坏,并导致对象逸出

ThreadLocal类

维持线程封闭性的一种更规范的方法是使用ThreadLocal,这个类能使线程的某个值与保存值的对象关联起来

ThreadLocal对象通常用于防止对可变的单实例变量或全局变量进行共享,ThreadLocal变量类似于全局变量,它能降低代码的可重用性,并在类之间引入隐含的耦合性,因此在使用时一定要格外小心

不变性

满足同步需求的另一种方法是使用不可变对象,不可变对象一定是线程安全的

不可变对象很简单,它们只有一种状态,并且该状态有构造函数来控制,如果将一个可变对象传递给不可信的代码,或者将该对象发布到不可性代码可以访问它的地方,那么就很危险—不可信代码会改变它们的状态

即使对象的所有域都是final类型,这个对象也任然是可变的,因为final类型的域中可以保存对可变对象的引用

当满足以下条件时对象才是不可变的

  • 对象创建以后其状态就不能修改
  • 对象的所有域都是final类型
  • 对象是正确创建(在对象的创建期间,this引用没有逸出)

安全发布的常用模式

可变对象必须通过安全的方式来发布,这通常意味着在发布和使用该对象的线程时都必须使用同步

要安全的发布一个对象,对象的引用以及对象的状态都必须同时对其他线程可见,一个正确构造的对象可以通过以下方式来安全的发布

  • 在静态初始化函数中初始化一个对象引用
  • 将对象的引用保存到volatile类型的域或者AtomicReference对象中
  • 将对象的引用保存到某个正确构造对象的final类型域中
  • 将对象的引用保存到一个由锁保护的域中

事实不可变对象

如果对象从技术上看是可变的,但其状态在发布后不会再改变,那么把这种对象称为事实不可变对象,在没有任何额外同步的情况下,任何线程都可以安全的使用被安全发布的事实不可以对象

==例如==:

  • Spring初始化单例Bean时初始化的值如果在后续没有被修改那么也是事实不可变的
  • 非final私有域在定义时被赋值,后续为对值进行修改时也是事实不可变的

可变对象

对象的发布需求取决于他的可变性

  • 不可变对象可以通过任意机制来发布
  • 事实不可变对象必须通过安全的方式来发布
  • 可变对象必须通过安全方式来发布,并且必须是线程安全的或者由某个锁保护起来

在并发程序中使用共享对象时可以使用一些使用的策略,包括

  • ==线程封闭== 线程封闭的对象只能由一个线程拥有,对象被封闭在该线程中,并且只能由这个线程修改
  • ==只读共享== 在没有额外同步情况下,共享的只读对象可以由多个线程并发访问,但任何线程都不能修改它,共享的只读对象包括不可变对象和事实不可变对象
  • ==线程安全共享== 线程安全的对象在其内部实现同步,因此多个线程可以通过对象的公有接口进行访问而不需要进一步的同步
  • ==保护对象== 被保护的对象只能通过持有特定的锁来访问,保护的对象包括封装在其他线程安全的对象中的对象,以及已发布的并且有某个由某个特定锁保护的对象

对象的组合

设计线程安全的类

在设计线程安全类的过程中,需要包括以下三个基本条件

  • 找出构成对象状态的所有变量
  • 找出约束状态变量的不可变条件
  • 建立对象状态的并发访问管理策略

要分析对象状态首先从域开始

  1. 如果对象中所有域都是基本类型的变量,那么这些域就构成了对象的全部状态,对于包含n个基本类型域的对象,其状态就是这些域构成的n元组
  2. 如果对象的域中引用了其他对象,那么该对象的状态就将包含被引用对象的所有域

同步策略

同步策略规定了如何将不可变性,线程封闭与加锁机制等结合起来以维护线程的安全性,并且还规定了哪些变量由哪些锁来保护

收集同步需求

final类型的域使用的越多,就越能简化对象可能状态的分析过程(在极端的情况中,不可变对象只有唯一的状态)

当下一个对象需要依赖当前状态时,这个操作就一定是一个复合操作(例如:递增、递减操作、集合的有就删除,没有就添加等操作)

由于不可变条件以及后验条件在状态及状态转换上施加了各种约束,因此就需要额外的同步与封装,如果在某个操作中存在无效的状态转换,那么该操作必须是原子的

不能首先更新一个变量,然后释放锁并再次获得锁,然后在更新其他的变量,因为释放锁后,可能会使对象处于无效状态,如果在一个不可变性条件中包含多个变量,那么在执行任何访问相关变量的操作时,都必须持有保护这些变量的锁

依赖状态的操作

想要实现某个等待先验条件为真才执行的操作,一种更简单的方法是通过现有库中的类(例如:阻塞队列或者信号量)来实现依赖状态的行为

状态的所有权

所有权与封装性总是互相关联的:

  1. 对象封装它拥有的状态,反之也成立,对它封装的状态拥有所有权
  2. 如果发布某个可变对象的引用,那么就不在拥有独占控制权,最多是“共享控制权”
  3. 对应从构造函数或者从方法中传递进来的对象,类通常并不拥有这些对象,除非这些方法是专门设计为转移传递进来的对象的所有权(例如:同步容器封装器的工厂方法)
  4. 容器类通常表现出一种“所有权分离”的形式,其中容器类拥有其自身的状态,而客户端代码则拥有容器中各个对象的状态(也就是拥有容器中各个对象的域的所有权)

为了防止多个线程在并发访问同一个对象时产生的互相干扰,这些对象应该要么是线程安全的,要么是事实不可变的对象,或者由锁来保护的对象

实列封闭

如果某个对象不是线程安全的,那么可以通过多种技术使其在多个线程中安全地使用,你可以确保对象只由单个线程访问(线程封闭)或者通过一个锁来保护该对象的所有访问

通过将封闭机制与适合的加锁策略结合起来,可以确保以线程安全的方式来使用非线程安全的对象。

将数据封装在对象内部,可以将数据访问限制在对象的方法上,从而更容易确保线程在访问数据时总能持有正确的锁

被封闭的对象一定不能超出他们既定的作用域

  • 对象可以封闭在类的一个实列(例如:作为类的一个私有成员)中
  • 封闭在某个作用域内(作为一个局部变量(又称为栈封闭))
  • 封闭在线程内(例如:在某个线程中将对象从一个方法传递到另一个方法,而不是在对该线程之间共享该对象)

实列封闭是构建线程安全类的一个最简单的方式,它还使得在锁的策略的选择上拥有了更多的灵活性

封闭机制更易于构造线程安全的类,因为当封闭类的状态时,在分析类的线程安全性时就无需检查整个程序

Java监视器模式

Java监视器模式仅仅是一种编写代码的约定,对于任何一种锁对象,只要自始至终都使用该锁对象,都可以用来保护对象的状态

线程委托机制

大多数对象都是组合对象,当从头开始构建一个类,或者将多个非线程安全的类组合为一个类时,Java监视器模式用于实现线程安全的类是非常有用的

在某些情况下,通过多个线程安全类组合而成的类是线程安全的,而在某些情况下并不是线程安全的(即如果出现复合操作多个线程安全类或者多个原子操作时,复合操作必须是原子的,即通过加锁来确保线程安全)

==例子==:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* @author dreamyao
* @version 1.0.0
* @description
* @date 2017/11/29 下午11:01
*/
@Immutable
public class Point {
public final int x, y;
public Point(int x, int y) {
this.x = x;
this.y = y;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* @author dreamyao
* @title
* @date 2018/3/2 下午2:15
* @since 1.0.0
*/
@ThreadSafe
public class DelegatingVehicleTracker {
private final ConcurrentHashMap<String,Point> locations;
private final Map<String,Point> unmodifiableMap;
public DelegatingVehicleTracker(Map<String, Point> points) {
locations = new ConcurrentHashMap<>(points);
unmodifiableMap = Collections.unmodifiableMap(points);
}
public Map<String,Point> getLocations(){
return unmodifiableMap;
}
/**
* 此处发布了Point对象,但是Point对象为不可变对象,所以此处发布时线程安全的
* @param id
* @return
*/
public Point getLocation(String id) {
return locations.get(id);
}
public void setLocation(String id, int x, int y) {
if (locations.replace(id, new Point(x, y)) == null) {
throw new IllegalArgumentException("invalid vehicle name: " + id);
}
}
}

独立的状态变量

我们可以将线程安全性委托给多个对象状态变量,只要这些变量是彼此独立的,即不在同一个方法中不是原子的操作多个状态变量

==例子==:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* @author dreamyao
* @title
* @date 2018/3/4 下午8:56
* @since 1.0.0
*/
public class VisualComponent {
private final List<KeyListener> keyListeners = new CopyOnWriteArrayList<>();
private final List<MouseListener> mouseListeners = new CopyOnWriteArrayList<>();
public void addKeyListener(KeyListener listener) {
keyListeners.add(listener);
}
public void addMouseListener(MouseListener listener) {
mouseListeners.add(listener);
}
public void removeKeyListener(KeyListener listener) {
keyListeners.remove(listener);
}
public void removeMouseListener(MouseListener listener) {
mouseListeners.remove(listener);
}
}

每个链表都是线程安全的,此外由于各个状态操作之间不存在耦合关系(分开独立在操作)因此VisualComponent可以将它的线程安全性委托给keyListeners、mouseListeners两个对象,故VisualComponent是线程安全的

当委托失效时

==例子==:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* @author dreamyao
* @title
* @date 2018/3/4 下午9:03
* @since 1.0.0
*/
public class NumberRange {
/**
* 不变性条件:lower <= upper
*/
private final AtomicInteger lower = new AtomicInteger(0);
private final AtomicInteger upper = new AtomicInteger(0);
/**
* 虽然lower和upper都是原子操作,但是在serLower中为复合操作,但setLower不是原子操作(没进行加锁)所有setLower不是线程安全的
* @param i
*/
public void setLower(int i) {
// 注意--不安全的“先检查后执行”
if (i > upper.get()) {
throw new IllegalArgumentException("can't set lower to " + i + " > upper");
}
lower.set(i);
}
/**
* 虽然lower和upper都是原子操作,但是在setUpper中为复合操作,但setUpper不是原子操作(没进行加锁)所有setUpper不是线程安全的
* @param i
*/
public void setUpper(int i) {
// 注意--不安全的“先检查后执行”
if (i < lower.get()) {
throw new IllegalArgumentException("can't set upper to " + i + " > lower");
}
upper.set(i);
}
}

NumberRange可以通过加锁机制来维护不变性条件以确保线程安全,例如:使用一个锁来保护lower和upper,此外还必须避免发布lower和upper,从而防止客户端代码破坏其不变形条件

如果某个对象含有复合操作,例如:NumberRange ,那么仅靠委托不足以实现线程安全性,在这种情况下,这个类必须提供自己的加锁机制以保证这些复合操作都是原子操作,除非整个复合操作都可以委托给状态变量。

如果一个类由多个独立且线程安全的状态变量组成,并且在所有的操作中都不包含无效状态转换,那么可以将线程安全性委托给底层的状态变量

==构造线程安全类时采用的一些技术,例如:将线程安全性委托给现有的线程安全类,委托是创建线程安全类的一个最有效的策略,只需让现有的线程安全类管理所有的状态即可==

发布底层的状态变量

发布可变的变量将对下一步的开发和派生子类带来限制,但不会破坏类的线程安全性

如果一个状态变量是线程安全的,并且没有任何不变形条件约束他的值,在变量的操作上也不存在任何不允许的状态转换,那么就可以安全地发布这个变量

==示例==

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* @author dreamyao
* @title
* @date 2018/3/6 下午9:28
* @since 1.0.0
*/
@ThreadSafe
public class SafePoint {
@GuardedBy("this")
private int x, y;
private SafePoint(int[] a) {
this(a[0], a[1]);
}
public SafePoint(SafePoint p) {
this(p.get());
}
public SafePoint(int x, int y) {
this.x = x;
this.y = y;
}
public synchronized int[] get(){
return new int[]{x, y};
}
public synchronized void set(int x, int y) {
this.x = x;
this.y = y;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* @author dreamyao
* @title
* @date 2018/3/6 下午9:32
* @since 1.0.0
*/
public class PublishingVehiceTracher {
private final Map<String,SafePoint> locations;
private final Map<String,SafePoint> unmodifiableMap;
public PublishingVehiceTracher(Map<String, SafePoint> locations) {
this.locations = new ConcurrentHashMap<>(locations);
this.unmodifiableMap = Collections.unmodifiableMap(this.locations);
}
public Map<String,SafePoint> getLocations(){
return unmodifiableMap;
}
/**
* 此处发布了Point对象,但是Point对象为不可变对象,所以此处发布时线程安全的
* @param id
* @return
*/
public SafePoint getLocation(String id) {
return locations.get(id);
}
public void setLocation(String id, int x, int y) {
if (locations.replace(id, new SafePoint(x, y)) == null) {
throw new IllegalArgumentException("invalid vehicle name: " + id);
}
}
}

SafePoint虽然为可变对象,但是是线程安全的对象,故getLocations()或getLocation()在发布SafePoint对象时是线程安全的

getLocation() 方法返回底层Map对象的一个不可变副本,调用者不能增加或删除车辆,但却可以通过修改返回Map中的SafePoint值来改变车辆的位置
如果需要对车辆位置的变化进行判断或者当位置变化时执行一些操作,那么PublishingVehiceTracher中采用的方法并不合适

在现有的线程安全类中添加功能

Java类库包含了很多有用的基础模块类,我们应该优先选择重用这些现有的类而不是创建新的类,有时候,某个现有的线程安全类能支持我们需要的所有操作,但是更多的时候,现有的类只能支持大部分操作,此时就需要在不破坏线程安全的情况下添加一个新的操作

若没有则添加的概念很简单,但是由于这个类必须是线程安全的,因此就隐含地增加了另一个需求,即若没有则添加,这个操作必须是原子操作

==示例==

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @author dreamyao
* @title
* @date 2018/3/6 下午10:01
* @since 1.0.0
*/
@ThreadSafe
public class BetterVector<E> extends Vector<E> {
public synchronized boolean putIfAbsent(E x) {
boolean absent = !contains(x);
if (absent) {
add(x);
}
return absent;
}
}

在现有的线程安全类中添加功能有如下策略:

第一种策略: 要添加一个新的原子操作,最安全的方法是修改原始的类,但这通常无法做到因为可能无法访问或修改类的源代码

第二种策略: 另一种方法是扩展这个类,扩展方法比直接将代码添加到类中更加脆弱,因为现在的同步策略实现被分布到多个单独维护的源码文件中,如果底层的类改变了同步策略并选择了不同的锁来保护它的状态变量,那么子类会被破坏,因为在同步策略改变后它无法再使用正确的锁来控制对基类状态的并发访问(在Vector的规范中定义了它的同步策略,因此BetterVector不存在这个问题)

客户端加锁机制

第三种策略: 第三种策略是扩展类的功能,但并不是扩展类本身,而是将扩展代码放入一个“辅助类”中
如下示例实现了:若没有则添加操作的辅助类

非线程安全的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* @author dreamyao
* @title
* @date 2018/3/6 下午10:14
* @since 1.0.0
*/
@NotThreadSafe
public class ListHelper<E> {
public List<E> list = Collections.synchronizedList(new ArrayList<>());
public synchronized boolean putIfAbsent(E x) {
boolean absent = !list.contains(x);
if (absent) {
list.add(x);
}
return absent;
}
}

以上示例问题在于:无论List使用哪个锁来保护它的状态,可以确定的是,这个锁并不是ListHelper上的锁,ListHelper只是带来了同步的假象,尽管所有的链表被声明为synchronized,但却使用了不同的锁,这意味着putIfAbsent相当于List的其他操作来说并不是原子的,因此就无法确定当putIfAbsent执行时另一个线程不会修改链表

线程安全的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* @author dreamyao
* @title
* @date 2018/3/6 下午10:14
* @since 1.0.0
*/
@ThreadSafe
public class ListHelper<E> {
public List<E> list = Collections.synchronizedList(new ArrayList<>());
public synchronized boolean putIfAbsent(E x) {
synchronized (list) {
boolean absent = !list.contains(x);
if (absent) {
list.add(x);
}
return absent;
}
}
}

以上示例:List在实现客户端加锁或外部加锁时使用同一个锁,客户端加锁是指,对于使用某个对象X的客户端代码,使用X本身用于保护其状态的锁来保护这段客户端代码,要使用客户端加锁,就必须知道对象X使用的是一个锁

客户端加锁机制与扩展机制有许多共同点,二者都是派生类的行为与基类的实列耦合在一起,正如扩展会破坏实现的封装性,客户端加锁同样会破坏同步策略的封装性

组合

第四种策略: 当为现有的类添加原子操作时有更好的方式:组合

==示例==:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* @author dreamyao
* @title
* @date 2018/3/6 下午10:32
* @since 1.0.0
*/
public class ImprovedList<T> implements List<T> {
private final List<T> list;
public ImprovedList(List<T> list) {
this.list = list;
}
public synchronized boolean punIfAbsent(T x) {
boolean contains = list.contains(x);
if (contains) {
list.add(x);
}
return !contains;
}
public synchronized void clear(){
list.clear();
}
// 按照类似的方式委托List的其他方法
}

ImprovedList通过自身的内置锁增加了一层额外的加锁,它并不关心底层的List是否为线程安全的,即使List不是线程安全的或者修改了他的加锁实现,ImprovedList也会提供一致的加锁机制来实现线程安全,虽然额外的同步层可能导致轻微的性能损失,但是性能损失是很小的因为底层List上的同步不存在竞争,所有速度会很快。

将同步策略文档化

在设计同步策略时需要考虑多个方面

  • 将那些变量声明为volatile类型
  • 将那些变量用锁来保护
  • 将那些锁用来保护那些变量
  • 那些变量必须是不可变的或者被封闭在线程中
  • 那些操作必须是原子的

构建基础模块

同步容器类

同步容器类包括:Vector、Hashtable等等,这些同步的封装器类时由Collections.synchronizedXxx等工厂方法创建的,这些类实现线程安全的方式是:将他们的状态封装起来,并对没一个公有的方法都进行同步,使得没一次只有一个线程能访问容器的状态

同步容器类的问题

同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作,容器上的复合操作包括:

  • 迭代(反复访问元素,直到遍历完容器中的所有元素)
  • 跳转(根据指定顺序找到当前元素的下一个元素)
  • 条件运算(“若没有则添加”检查在Map中是否存在键值K,如果没有,就加入二元组K,V)

同步容器类中,这些复合操作在没有客户端加锁的情况下仍然是线程安全的,但当其他线程并发的修改容器时,他们可能会表现出意料之外的行为

示例:

1
2
3
4
5
6
7
8
9
public static Object getLast(Vector list) {
int lastIndex=list.size() - 1;
retrun list.get(lastIndex);
}
public static void deleteLast(Vector list) {
int lastIndex=list.size() - 1;
return list.remove(lastIndex);
}

以上示例Vector上可能导致混乱结果的复合操作,当多好线程交替调用getLast(),deleteLast()方法时,getLast将抛出异常,因为在调用size与调用getLast()这两个操作之间,Vector变小了,因此在调用size时得到的索引值将不在有效

由于同步容器类要遵守同步策略,即支持客户端加锁,因此可能会创建一些新的操作,只要我们知道应该使用哪一个锁,那么这些新操作就与容器的其他操作一样都是原子操作

1
2
3
4
5
6
7
8
9
10
11
12
13
public static Object getLast(Vector list) {
synchronize(list) {
int lastIndex=list.size() - 1;
retrun list.get(lastIndex);
}
}
public static void deleteLast(Vector list) {
synchronize(list) {
int lastIndex=list.size() - 1;
return list.remove(lastIndex);
}
}

使用客户端加锁的Vector上的复合操作

迭代器与ConcurrentModificationException

现代的同步容器类也并没有消除复合操作中的问题
即使不存在死锁或饥饿等风险,长时间地对容器加锁也会降低程序的可伸缩性,那么将极大的降低吞吐量和CPU的利用率

如果不希望在迭代期间对容器加锁,那么有一种替代方法就是克隆容器,并在副本上进行迭代操作,由于副本被封闭在线程内,因此其他线程不会在迭代期间对其进行修改,这样就避免了抛出ConcurrentModificationException(在克隆过程中任然需要对容器加锁)在克隆容器时存在显著的性能开销,这种方式的好坏取决于多个因素,包括容器的大小,在每个元素上执行的工作,迭代操作相对于容器其他操作的调用频率,以及在响应时间和吞吐量等方面的需求

隐藏迭代器

虽然加锁可以防止迭代器抛出ConcurrentModificationException,但你必须记住在所有对共享容器进行迭代的地方都需要加锁,实际情况要更加复杂,因为某些情况下,迭代器会隐藏起来

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* @author dreamyao
* @title
* @date 2018/3/7 下午8:44
* @since 1.0.0
*/
public class HiddenIterator {
@GuardedBy("this")
private final Set<Integer> set = new HashSet<>();
public synchronized void add(Integer i) {
set.add(i);
}
public synchronized void remove(Integer i) {
set.remove(i);
}
public void addTenThings(){
Random random = new Random(47);
for (int i = 0; i < 10; i++) {
add(random.nextInt());
}
// 隐藏在字符串连接中的迭代操作
System.out.println("DEBUG: added ten elements to " + set);
}
}

隐藏在字符串连接中的迭代操作(不用这么)
addThenThings方法会抛出ConcurrentModificationException,因为在生成调试信息的过程中,toSting方法对容器进行了迭代操作,当然,真正的问题在于HiddenIterator不是线程安全的,在使用println中的set之前必须首先要获得HiddenIterator的锁,但在调试代码和日志代码中通常或忽视这个问题。

如果HiddenIterator用synchronizeSet来包装HashSet,并且对同步代码进行封装,那么就不会发生这种错误。

正如封装对象的状态有助于维持不变性条件一样,封装对象的同步机制同样有助于确保实施同步策略

容器的hashCode和equals等方法也会间接的执行迭代操作,当容器作为另一个容器的元素或键值时,就会出现这种情况,同样containsAll、removeAll、retainAll等方法,以及把容器作为参数的构造函数,都会对容器进行迭代,所有这些间接的迭代操作都可能抛出ConcurrentModificationException

并发容器

Java5.0提供了多种并发容器来改进同步容器的性能,同步容器将所有对容器状态的访问都串行化,以实现它们的线程安全性,这种方法的代价是严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重减低

并发容器是针对多个线程并发访问设计的,在Java5.0中增加了ConcurrentHashMap,用来代替同步且基于散列的Map,以及CopyOnWriteArrayList,用于在遍历操作为主要操作情况下代替同步的List,在新的ConcurrentMap接口中增加了对一些常用复合操作的支持,例如:“若没有则添加”、替换以及有条件删除等

==通过并发容器来代替同步容器,可以极大地提高伸缩性并降低分析==

Java5.0增加了两种新的容器,Queue和BlockingQueue,Queue用来临时保存一组等待处理的元素

Java6.0也引入了ConcurrentSkipListMap和ConcurrentSkipListSet,分别作为同步的SortedMap和SortedSet的并发替代品(例如:用synchronizeMap包装的TreeMap或TreeSet)

ConcurrentHasMap

ConcurrentHasMap与HashMap一样也是基于散列的Map,但它使用了一种完全不一样的加锁方式,ConcurrentHasMap使用了一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁

  • ConcurrentHasMap与其他并发容器一起增强了同步容器类,它们提供的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中对容器加锁
  • ConcurrentHasMap返回的迭代器具有若一致性,而非及时失败,弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会变量已有的元素,并可以(但不保证)在迭代器被构造后将修改操作反应给容器

只有当应用程序需要加锁 Map以进行独占访问或需要需要依赖同步Map带来的一些作用时,才应该放弃使用ConcurrentHasMap。

额外的原子Map操作

由于ConcurrentHasMap不能被加锁来执行独占访问,因此我们无法使用客户端加锁来创建新的原子操作

CopyOnWriteArrayList

CopyOnWriteArrayList用于替代同步List,在某些情况下他提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制(类似地CopyOnWriteArraySet的作用是替代同步的Set)

写入时复制(Copy-On-Write)容器的线程安全在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不需要进一步的同步

写入是复制容器返回的迭代器不会抛出ConcurrentModificationException,并且返回的元素与迭代器创建时的元素完全一致,而不必考虑之后修改操作所带来的影响

虽然,==每当修改容器时都会复制底层数组,这需要一定的开销,特别是当容器的规模比较大时,仅当迭代操作远远多于修改操作时,才应该使用“写入时复制”容器==

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put(存入元素)和take(消费元素),以及支持定时的offer和poll方法,队列可以是有界的和无界的,无界队列永远都不会充满,因此无界队列上的put方法永远不会阻塞

阻塞队列支持生产者-消费者设计模式,该模式将“找出需要完成的工作”与“执行工作”这两个过程分离开来,并把工作放入一个待完成列表中以便在随后处理,而不是找出后马上处理

在基于阻塞队列构建的生产者-消费者设计中,生产者不需要知道消费者的标识与数量,或者他们是不是唯一的生产者,而只需要把数据放入即可,同样消费者也不需要知道生产者是谁,或者工作来自何处,BlockingQueue简化了生产者-消费者设计的实现过程,它支持任意数量的生产者和消费者

一种最常见的生产者-消费者设计模式就是线程池与工作队列的组合,在Executor任务执行框架中就体现了这种模式

生产者和消费者的角色是相对的,某种环境中消费者在另一种不通的环境中可能会成为生产者

在构建搞可靠的应用程序时,有界队列是一种强大的资源管理器,它们能抑制并防止生产者过多的工作项,使应用程序在负荷过载的情况下变得更加健壮

示例:桌面搜索

有一种类型的程序适合被分解为生产者和消费者,例如:代理程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* @author dreamyao
* @title
* @date 2018/3/8 下午2:28
* @since 1.0.0
*/
public class FileCrawler implements Runnable {
private final BlockingDeque<File> fileQueue;
private final FileFilter fileFilter;
private final File root;
public FileCrawler(){
fileQueue = new LinkedBlockingDeque<>(10);
fileFilter = new AgeFileFilter(10);
root = new File("");
}
private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles(fileFilter);
if (entries != null) {
for (File file : entries) {
if (file.isDirectory()) {
crawl(file);
} else if (!alredyIndxed(file)) {
fileQueue.put(file);
}
}
}
}
@Override
public void run() {
try {
crawl(root);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @author dreamyao
* @title
* @date 2018/3/8 下午3:36
* @since 1.0.0
*/
public class Indexer implements Runnable {
private final BlockingDeque<File> queue;
public Indexer(BlockingDeque<File> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
indexFile(queue.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

以上示例为桌面搜索应用程序中的生产者和消费者任务

生产者-消费者设计模式,分离了生产者和消费者,比将所有代码放到一起可读性和重用写更高,并且带来了许多性能优势,生产者-消费者模式可以并发执行,如果一个是I/O密集型,另一个是CPU密集型,那么并发执行的吞吐率要高于串行执行的吞吐率

串行线程封闭

  1. 对于可变对象,生产者-消费者这种设计与阻塞队列一起,促进了串行线程的封闭,从而将对象所有权从生产者交付给消费者。线程封闭对象只能由单个线程拥有,但可以通过安全地发布该对象来转移所有权。在转移所有权后,也只有另一个线程能够获得这个对象的访问权限,并且发布对象不会在访问它。这种安全的发布确保了对象状态对应新的所有者来说是可见的,并且由于最初的所有者不会在访问它,因此对象将被封闭在新的线程中。新的所有者可以对该对象做任意的修改,因为它具有独占的访问权
  2. 对象利用了串行线程封闭,将对象“借给”一个请求线程,只要对象池包含足够的内部同步来安全的发布池中的对象,并且只要客户端代码本身不会发布池中的对象,或者在将对象返回给对象池后就不在使用它,那么久可以安全的在线程之间传递所有权
  3. 我们也可以使用其他发布机制来传递可变对象的所有权,但必须确保只有一个线程能接受被转移的对象,阻塞队列简化了这项工作,除此之外,还可以通过ConcurrentMap的原子方法remove或者AtomicReference的原子方法compareAndSet来完成这项工作

双端队列与工作窃取

Deque和BlockingDeque分别对Queue和BlockingQueue进行了扩展,Deque是一个双端队列,实现了在队列头和尾的高效插入和移除

双端队列适用于工作窃取设计模式,即如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者的双端队列末尾窃取工作,窃取工作的模式比传统的生产者-消费者模式具有更高的可伸缩性

工作窃取非常适合用于即是消费者也是生产者问题

阻塞方法与中断方法

线程可能会阻塞或暂停执行,原因有多种

  • 等待I/O操作结束
  • 等待获得一个锁
  • 等待从Thread.sleep方法中醒来
  • 等待另一个线程的计算结果
    当线程阻塞时,它通常会被挂起并处于阻塞状态(BLOCKED、WAITING、TIMED_WAITING)

BlockingQueue的put和take等方法会抛出受检异常InterrutedException,这与类库中的其他方法的做法相同,列如Thread.sleep,当某个方法抛出InterrutedException时表示该方法是一个阻塞方法,如果这个方法被中断,那么他将努力提前结束阻塞状态

Thread提供了interrupt方法用于中断线程或者查询线程是否已经被中断

中断是一种协作机制,一个线程不能强制其他线程停止正在执行的操作而去执行其他操作

==传递InterrutedException== 避开这个异常通常是最明智的策略,只需要把InterrutedException传递给方法调用者,传递InterrutedException的方法是,根本不捕获这个异常或者捕获了在执行简单的清理工作后再次抛出

==恢复中断== 有时候不能抛出InterrutedException,例如当代码在Runnable的一部分时,在这些情况下,必须捕获InterrutedException,并通过调用当前线程的interrupt方法恢复中断状态,这样调用栈中更高层的代码将看到引发了一个中断

示例

1
2
3
4
5
6
7
8
9
10
11
12
public class TaskRunnable implements Runnable {
BlockingQueue<Task> queue;
....
public void run() {
try {
processTask(queue,take());
} catch(InterrutedException e) {
// 恢复被中断的状态
Thread.currentThread().interrup();
}
}
}

同步工具类

在容器类中,阻塞队列是一直独特的类,他们不仅仅能作为保存对象的容器,还是协同调生产者-消费者等待线程之间的控制流

同步工具类可以是任何一个对象,只有他根据其自身的状态协调线程的控制流

阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)

所有的同步工具类都包含一些特殊的结构化属性,他们封装了一些状态,这状态将决定执行同步工具类的线程是继续执行还是等待,此外还提供了一些方法对状态进行操作,以及另一些方法用于高效地等待同步工具类进入预期状态

闭锁

闭锁是一种同步工具类,可以延迟线程的进度直到到达终止状态

闭锁可以用来确保某些活动直到其他活动都完成后才继续执行

  1. 确保某个计算在其需要的所有资源都被初始化之后才继续执行
  2. 确保某个服务在其他依赖的所有服务已经启动之后才执行
  3. 等待直到某个操作的所有参与者都就绪在继续执行(例如:性能测试中的集合点)

CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况中使用,闭锁状态包括一个计数器,该计数器初始化一个正数,表示需要等待的事件数量,countDown方法递减计数器,表示一个事件已经就绪了,如果计数器的值为非零那么await会一直阻塞计数器直到计数器为零为止,或者等待中的线程中断或者等待超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* @author dreamyao
* @title
* @date 2018/3/9 下午1:30
* @since 1.0.0
*/
public class TestHarness {
public long timeTasks(int nxThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(nxThreads);
final CountDownLatch endGate = new CountDownLatch(nxThreads);
for (int i = 0; i < nxThreads; i++) {
Thread t = new Thread(() -> {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}
}

在计时测试中使用CountDownLatch来启动和停止线程

FutureTask

FutureTask也可以实现闭锁(FutureTask实现了Future语义)表示一种抽象的可生成结果的计算,Future表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行、正在运行、运行完成

FutureTask在Excutor框架中表示异步任务,此外还可以表示一些时间比较长的计算,这些计算可以在使用计算结果之前启动

==示例==

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* @author dreamyao
* @title
* @date 2018/3/9 下午1:48
* @since 1.0.0
*/
public class Preloader {
private final FutureTask<ProductInfo> future = new FutureTask<> (new Callable<ProductInfo> {
public ProductInfo call() throws DataLoadException {
return loadProductInfo();
}
});
private final Thread thread =new Thread(future);
public void start() {
thread.start();
}
public ProductInfo get() throws DataLoadException, InterruptedException {
try {
return future.get();
} catch (ExcutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException) {
throw (DataLoadException) cause;
}else {
throw launderThrowable(cause);
}
}
public static RuntimeException launderThrowable(Throwable t) {
if (t instanceof RuntimeException) {
return (RuntimeException) t;
}else if (t instanceof Error) {
throw (Error) t;
}else {
throw new IllegalStateException("Not unchecked", t);
}
}
}
}

信号量

计数信号量用例控制同步访问某个特定资源的操作数量,或者执行某个指定操作的数量,计数器信号量还可以用来实现某种资源池,或者对容器施加边界

Semaphore中管理着一组虚拟的许可,许可的初始数量可通过构造函数来指定,在实现操作时可以首先获得许可(只要还有剩余许可)并在使用后释放许可,如果没有许可,那么acquire方法将阻塞直到有许可为止(或者直到被中断或等待超时),release方法将返回一个许可(即释放占用的许可)给信号量

==在这种实现中不包含真正的许可对象,并且Semaphore也不会将许可与线程关联起来,因此在一个线程中获得的许可可以在另外一个线程释放,可以将acquire方法视为是消费一个许可,而release方法是创建一个许可,Semaphore并不受限于它在创建时的初始许可数量==

==示例==

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* @author dreamyao
* @title
* @date 2018/3/9 下午2:18
* @since 1.0.0
*/
@Immutable
public class BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore semaphore;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<>());
this.semaphore = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
// 获取许可
semaphore.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
}finally {
if (!wasAdded) {
semaphore.release();
}
}
}
public boolean remove(T o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved) {
semaphore.release();
}
return wasRemoved;
}
}

栅栏

栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生,栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行,闭锁用于等待事件,而栅栏用与等待其他线程

CyclicBarrier可以使一定数量的参与方式反复地在栅栏位置汇聚,它在并行迭代算法中非常有用,这种算法通常将一个问题拆分成一系列相互独立的子问题,当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都要到达栅栏位置,如果所有的线程达到了栅栏位置,那么栅栏将打开,如果对await的调用超时或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。

另外一种形式的栅栏式Exchanger,它是一种两方栅栏,各方在栅栏位置上交换数据,当两方的执行不对称的操作时,Exchanger非常有用,例如:当一个线程向缓冲区中写数据时,而另外一个线程从缓冲区读数据

==当两个线程通过Exchanger交换对象时,这种交换就把这两个对象安全的发布给另一方==

构建高效且可伸缩的结果缓存

简单的缓存可能会将性能瓶颈转变成可伸缩性瓶颈,即使缓存是用于单线程的性能

示例

1
2
3
4
5
6
7
8
9
/**
* @author dreamyao
* @title
* @date 2018/3/9 下午4:14
* @since 1.0.0
*/
public interface Computable<A,V> {
V compute(A arg) throws InterruptedException;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* @author dreamyao
* @title
* @date 2018/3/9 下午4:15
* @since 1.0.0
*/
public class ExpensiveFuntion implements Computable<String,BigInteger> {
@Override
public BigInteger compute(String arg) throws InterruptedException {
// 经过很长的计算
return new BigInteger(arg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* @author dreamyao
* @title
* @date 2018/3/9 下午4:17
* @since 1.0.0
*/
@ThreadSafe
public class Memoizer1<A,V> implements Computable<A,V> {
@GuardedBy("this")
private final Map<A, V> cache = new HashMap<>();
private final Computable<A, V> computable;
public Memoizer1(Computable<A, V> computable) {
this.computable = computable;
}
@Override
public synchronized V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = computable.compute(arg);
cache.put(arg, result);
}
return result;
}
}

HashMap不是线程安全的,因此确保不会被同时访问HashMap,Memoizer1采用了一种保守的方法,即对整个compute方法加锁,这边方法确保了线程安全性,但会带来可伸缩性问题,每次只能一个线程执行compute方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* @author dreamyao
* @title
* @date 2018/3/9 下午4:20
* @since 1.0.0
*/
@ThreadSafe
public class Memoizer2<A,V> implements Computable<A,V> {
private final Map<A, V> cache = new ConcurrentHashMap<>();
private final Computable<A, V> computable;
public Memoizer2(Computable<A, V> computable) {
this.computable = computable;
}
@Override
public V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = computable.compute(arg);
cache.put(arg, result);
}
return result;
}
}

Memoizer2的问题在于,如果某个线程启动了一个开销很大的计算,而其他线程不知道这个计算正在进行,那么很可能会出现重复计算的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* @author dreamyao
* @title
* @date 2018/3/9 下午4:21
* @since 1.0.0
*/
@ThreadSafe
public class Memoizer3<A,V> implements Computable<A,V> {
private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> computable;
public Memoizer3(Computable<A, V> computable) {
this.computable = computable;
}
@Override
public V compute(final A arg) throws InterruptedException {
Future<V> future = cache.get(arg);
if (future == null) {
Callable<V> eval= () -> computable.compute(arg);
FutureTask<V> futureTask = new FutureTask<>(eval);
future = futureTask;
cache.put(arg, futureTask);
futureTask.run();
}
try {
return future.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}
}

Memoizer3的实现几乎是完美的,他表现了出了非常高的并发性,但是它有一个缺陷,即任然存在两个线程计算出相同的结果值的漏洞,但这个漏洞发生的概率要远小于Memoizer2,由于在compute方法中的if 代码块是非原子的 “先检查后执行” 操作,因此两个线程任然有可能同时调用compute方法来计算出相同的结果,即二者都没在缓存中找到期望值

Memoizer3中存在的这个问题的原因是:复合操作(若没有则添加)是在底层的Map对象上执行的,而这个对象无法通过加锁来确保原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* @author dreamyao
* @title
* @date 2018/3/9 下午4:28
* @since 1.0.0
*/
@ThreadSafe
public class Memoizer<A,V> implements Computable<A,V> {
private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> computable;
public Memoizer(Computable<A, V> computable) {
this.computable = computable;
}
@Override
public V compute(A arg) throws InterruptedException {
while (true) {
Future<V> future = cache.get(arg);
if (future == null) {
Callable<V> eval = () -> computable.compute(arg);
FutureTask<V> futureTask = new FutureTask<>(eval);
cache.putIfAbsent(arg, futureTask);
future = futureTask;
futureTask.run();
}
try {
return future.get();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (Exception e) {
cache.remove(arg, future);
}
}
}
}

Memoizer使用了ConcurrentMap中的原子方法putIfAbsent,避免了Memoizer3的漏洞

当缓存的是Future而不是值时,将导致缓存污染问题:如果某个计算被取消或失败,那么在计算这个结果时将指明计算过程被取消或失败,为了避免这种情况,如果Memoizer发现计算被取消或失败,那么将把Future从缓存中移除

第一部分小结

以下为并发技巧清单

  • 可变状态是至关重要的,所有的并发问题都可以归结为如何协调对并发状态的访问,可变状态越少就越容易确保线程安全性
  • 尽量将域声明为final类型,除非需要他们是可变的
  • 不可变对象一定是线程安全的,不可变对象能极大降低并发编程的复杂性,他们更为简单而且线程安全,可以任意共享而无需使用加锁或保护性复制等机制
  • 封装有助于管理复制性,在编写安全的程序时,虽然可以将所有数据都保存在全局变量中,但为什么要这样做呢?将数据封装在对象中,更易于维持不变性条件:将同步机制封装在对象中,更易于遵循同步策略
  • 用锁来保护每个可变变量
  • 当保护同一个不变性条件中的所有变量时,需要使用同一个锁
  • 在执行复合操作期间,要持有锁
  • 如果从多个线程中访问同一个可变变量时没有同步机制,那么线程会出现问题
  • 不要故作聪明地推断出不需要使用同步
  • 在设计过程中考虑线程安全,或者在文档中明确地指出它是不是线程安全的
  • 将同步策略文档化

任务执行

在线程中执行任务

在理想状态情况下,各个任务之间是相互独立的:任务并不依赖于其他任务的状态、结果或边界效应,独立性有助于实现并发,因为如果存在足够多的处理器资源,那么这些独立的任务都可以并行执行

大多数服务应用程序提供了一种自然的任务边界选择:以独立的客户请求为边界

串行地执行任务

在应用程序中可以有多种策略在调度任务,而其中一些策略能够更好地利用潜在的并发性,最简单的策略就是在单线程中串行的执行各项任务

在服务器应用程序中串行处理机制通常无法提供高吞吐率和快速响应性

显式地为任务创建线程

  1. 任务处理过程从主线程分离出来,使得主循环能够更快地重新等待下一个到来的连接,这使得程序在完成前面的请求之前可以接受新的请求,从而提高响应性
  2. 任务可以并行处理,从而能同时服务多个请求,如果有多个处理器,或者任务由某种原因被阻塞,例如:I/O完成、获取锁或者资源可用性等,程序的吞吐量将得到提高
  3. 任务处理代码必须是线程安全的,因为当有多个任务时会并发地调用这段代码

无限制创建线程的不足

  • 线程生命周期的开销非常高
  • 资源消耗大
  • 稳定性差

Executor框架

任务是一组逻辑工作单元,而线程则是任务异步执行的机制,串行执行的问题在于其糟糕的响应性和吞吐量,而为每个任务分配一个线程的问题在于资源管理的复杂性。

Executor基于生产者-消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程相当于消费者(执行完这些工作单元),如果想要在程序中实现一个生产者-消费者的设计,那么最简单的方式通常就是使用Executor

==示例== : 基于线程池的Web服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @author dreamyao
* @title 基于线程池的Web服务
* @date 2018/3/12 下午12:57
* @since 1.0.0
*/
public class TaskExecutionWebServer {
private static final int NTHREADS = 100;
private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
for (;;) {
final Socket connect = socket.accept();
Runnable runnable = () -> handleRequest(connect);
exec.execute(runnable);
}
}
private static void handleRequest(Socket connect) {
}
}

改变Executor实现或配置所带来的影响远远小于改变任务提交方式带来的影响,通常Eexcutor的配置是一次性的,因此在部署阶段可以完成,而提交任务的代码却会不断地扩散到整个程序中,增加了修改的难度

==示例== : 为每一个请求启动一个新线程的Executor

1
2
3
4
5
6
7
8
9
10
11
12
/**
* @author dreamyao
* @title 为每一个请求启动一个新线程的Executor
* @date 2018/3/12 下午1:06
* @since 1.0.0
*/
public class ThreadPerTaskExecutor implements Executor{
@Override
public void execute(Runnable command) {
new Thread(command).start();
}
}

执行策略

  • 在什么线程中执行任务
  • 任务按照什么顺序执行(FIFO、LIFO、优先级)
  • 有多少个任务能并发执行
  • 在队列中有多少个任务在等待执行
  • 如果系统由于过载而需要拒绝一个任务,那么应该现在哪一个任务?另外,如何通知应用程序有任务被拒绝
  • 在执行一个任务之前或之后,应该进行哪些动作

各种策略都是资源管理工具,最佳策略取决于可用的计算资源以及对服务质量的需求

线程池

线程池是与工作队列密切相关的,其中在工作队列中保存了所有等待执行的任务,工作者线程的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务

  • newFixedThreadPool:将创建一个固定长度的线程池
  • newCacheThreadPool:将创建一个可缓存的线程池,线程池不存在任何线程
  • newSingleThreadPool:newSingleThreadPool是一个单线程的Executor,它创建当工作者线程来执行任务,这个线程异常结束,会创建另一个线程来替代,newSingleThreadPool能确保依照任务在队列中的顺序来串行执行(例如:FIFO、LIFO、优先级)newSingleThreadPool还提供了大量的内部同步机制,从而确保了任务执行的任何内存写入操作对于后续任务来说都是可见的,这意味着,即使这个线程会不时的被另一个线程替代,但是对象总是可以安全的封闭在任务线程中
  • newScheduledThreadPool:创建一个固定长度的线程池,而且以延迟或者定时的方式来执行任务

ExecutorService的生命周期有3种状态:运行、关闭、已终止,ExecutorService在初始化创建时处于运行状态,shutdown方法将执行平缓的关闭过程,不在接受新的任务,同时等待已提交的任务执行完成(包括哪些还未开始执行的任务),shutdownNow方法将执行粗暴的关闭过程,它尝试取消所有运行中的任务,并且不在启动队列中尚未开始执行的任务

延迟任务与周期任务

Timer类负责管理延迟任务已经周期任务,然而Timer类存在缺陷,因此应该考虑用ScheduledThreadPoolExecutor来替代它,Timer支持基于绝对时间而不是相对时间,因此任务的执行对系统时钟变化很敏感,而ScheduledThreadPoolExecutor只支持基于相对时间的调度

Timer在执行所有定时任务时只会创建一个线程,如果某个任务的执行时间过长,那么将破坏其他TimerTask的定时精确性

Timer的另一个问题是,如果TimerTask抛出一个未检查异常,那么Timer将表现出糟糕的行为,Timer线程并不捕获异常,因此当TinmerTask抛出未检查的异常时将终止定时线程,这种情况下Timer也不会恢复线程的执行,而是会错误地认为整个Timer都被取消了,因此已经被调度但尚未执行的TimerTask将不会再执行,新的任务也不能被调度(这个问题称之为线程泄漏)

如果要构建自己的调度服务,那么可以使用DelayQueue,它实现了BockingQueue,并为ScheduledThreadPoolExecutor提供了调度功能,DelayQueue管理着一组Delayed对象,每个Delayed对象都有一个相应的延迟时间,在DelayQueue中,只有某个元素逾期后,才能从DelayQueue中执行take操作,从DelayQueue中返回的对象根据他们的延迟时间进行排序

找出可利用的并行性

Executor框架帮助指定执行策略,但如果要使用Executor,必须将任务表述为一个Runnable,在大多数服务器应用程序中都存在一个明显的任务边界:单个客户请求。但是有时候任务边界并非是显而易见的,例如:很多桌面应用中,即使是服务器应用程序,在单个客户请求中仍可能存在发掘的并发性,例如:数据库服务器

携带结果的任务Callable与Future

Executor框架使用Runnable作为其基本的任务表示形式,Runnable是一种有很多局限的抽象,虽然run方法能写入到日志文件或者将结果放入某个共享数据结果中,但他不能返回或者抛出受检查异常

Runnable和Callable描述的都是抽象的计算任务,这些任务通常是有范围的,即都有一个明确的起点,并且最终会结束

Executor执行的任务有4个生命周期阶段:创建、提交、开始、完成,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当它们能响应中断时,才能取消,取消一个已完成的任务不会产出任何影响

Future任务的生命周期只能前进,不能后退,就像ExecutorServer的生命周期一样,如果任务被取消,get方法将抛出CancellationException,如果get抛出ExecutionException,则可以通过getCause来获得被封装的初始异常

要使用Callable 来表示无返回值的任务,可以使用Callable

在异构任务并行化中存在的局限

两个不同类型的任务称为异构任务,然而通过对异构任务的并行化来获得重大的性能提升是很困难的,如果没有在相似的任务之间找出细粒度的并行性,那么这种并行执行的方式带来的好处将减少。

只有大量相互独立且同构的任务可以并行进行处理时,才能提现出程序的工作负载分配到多个任务中带来的真正性能提升。

CompletionService:Executor与BlockingQueue

如果向Executor提交了一组计算任务,并且希望计算完成后获得结果,那么可以保留每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成,这种方法虽然可行,但是却有些繁琐,幸运的是还有一种更好的方法:完成服务(CompletionService)

CompletionService将Executor和BlockingQueue的功能融合在一起,可以通过将Callable的任务提交给他来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果

示例:使用CompletionService实现页面渲染器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* @author dreamyao
* @title
* @date 2018/3/12 下午3:58
* @since 1.0.0
*/
public class Renderer {
private final ExecutorService executor;
Renderer(ExecutorService executor) {
this.executor = executor;
}
void renderPage(CharSequence source) {
List<ImageInfo> info =scanForImageInfo(source);
CompletionService<ImageInfo> completionService = new ExecutorCompletionService<>(executor);
for (final ImageInfo imageInfo:info){
completionService.submit(new Callable<ImageInfo>{
public ImageInfo call(){
return imageInfo.downloadImage();
}
});
renderText(source);
try {
for(int t=0,n=info.size();t<n;t++){
Future<ImageInfo> f=completionService.take();
ImageInfo imageInfo=f.get();
renderImage(imageInfo);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}
}

多个ExecutorCompletionService可以共享Executor,因此可以创建一个对于特定计算私有,又能共享一个公共Executor的ExecutorCompletionService。

取消与关闭

要使任务和线程能够安全、快速、可靠地停下来,并不是一件容易的事情,Java没有提供任何机制来安全地停止线程,虽然Thread.stop() 和suspend()等方法提供了这样的机制,但由于存在着一些严重的缺陷,因此避免使用,但它提供了中断,这是一种协作机制,能够使一个线程终止另一个线程的当前工作。

这种协作方法是必要的,因为我们很少希望某个任务或者线程立马停止,因为这种立即停止会使共享的数据结构处于不一致的状态。

任务取消

如果外部代码能在某个操作正常完成之前将其置入完成状态,那么这个操作就可以称为可取消的,取消某个操作的原因有很多,例如:

  • 用户请求取消 :用户显示的进行关闭,如点击取消按钮
  • 有时间限制的操作
  • 应用程序事件
  • 错误
  • 关闭

在Java中没有一种安全的抢占式方法来停止线程,因此也就没有安全的抢占式方法来停止任务,只有一些协作式的机制,使请求取消的任务和代码都遵循一种友好协商的协议。

示例 :使用volatile类型的域来保存取消状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* @author dreamyao
* @title
* @date 2018/3/14 下午8:23
* @since 1.0.0
*/
@ThreadSafe
public class PrimeGenerator implements Runnable {
@GuardedBy("this")
private final List<BigInteger> primes = new ArrayList<>();
private volatile boolean cancelled;
@Override
public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel() {
cancelled = true;
}
public synchronized List<BigInteger> get() {
return new ArrayList<>(primes);
}
}

线程池的使用

在任务与执行策略之间的隐形耦合

依赖性任务 大多数行为正确的任务都是相互独立的,它们不依赖于任务任务的执行结果或者其他效果,当在线程池中执行独立的任务时,可以任意的改变线程池的大小和配置,这些修改只会对执行性能产生影响,但是提交给线程池的任务依赖其他的任务时,那么就隐含地给执行任务带来了约束,此时必须小心的维持这些执行策略以避免产生活跃性问题

使用线程封闭机制的任务 对象可以封闭在任务线程中,使得在该线程中执行的任务在访问该对象时不需要额外的同步机制,即使资源部是线程安全的也没有问题,这种情形将在任务与执行策略之间形成隐式的耦合—— 任务要求其执行所在的Executor是单线程的:这正式newSingleThreadExecutor所保证的

对响应时间敏感的任务 如果将一个运行时间较长的任务提交到单线程的Executor中,或者将多个运行时间较长的任务提交到一个包含较少线程的线程池中那么将降低由该Executor管理的服务的响应性。

使用ThreadLocal的任务 ThreadLocal使每个线程都可以拥有某个变量的一个私有版本,只有当线程本地生命周期受限于任务的生命周期时,在线程池的线程使用ThreadLocal才有意义,而在线程池的线程不应该使用ThreadLocal在任务之间传递值

在一些任务中,需要拥有或排除某种特定的执行策略,如果某些任务依赖于其他的任务,那么会要求线程池足够大,从而确保他们依赖的任务不会被放入等待队列中或被拒绝,而采用线程封闭机制的任务需要串行执行。

线程饥饿死锁

在单线程的Executor中,如果一个任务将另一个任务提交到了同一个Executor,并且等待这个提交任务的结果,那么通常会引发死锁,只有线程池中的任务需要无限期地等待一些必须由池中其他任务才能提供的资源或条件,例如:某个任务等待另一个任务的返回值或执行结果,那么除非线程池足够的大,否则将发生线程饥饿死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* @author dreamyao
* @title
* @date 2018/3/24 下午1:05
* @since 1.0.0
*/
public class ThreadDeadlock {
ExecutorService exec = Executors.newSingleThreadExecutor();
public class RenderPageTask implements Callable<String> {
@Override
public String call() throws Exception {
Future<String> header, footer;
header=exec.execute(new LoadFileTask("header.html"));
footer=exec.execute(new LoadFileTask("footer.html"));
String page=renderBody();
// 将发生死锁由于任务在等待子任务的结果
return header.get() + page + footer.get();
}
}
}

每当提交一个有依赖性的Executor任务时,要清楚地知道可能会出现线程饥饿死锁,因此需要在代码或配置Executor的配置文件中记录线程池的大小限制或配置线程

运行时间较长的任务

如果任务阻塞的时间过长,那么即使不出现死锁,线程池的响应性也好变的糟糕,如果线程池中的线程数量远小于在稳定状态下执行时间较长任务的数量,那么到最后可能所有的线程都会运行这些执行时间较长的任务,从而影响整体的响应性

有一项技术可以缓解时间较长任务造成的影响,即限定任务等待资源的时间,而不要无限制的等待,在平台类库中大多数可阻塞的方法中,都同时定义了限时版本和无限时版本,例如:Thread.join()、BlockingQueue.put()、CountDownLatch.await()以及Selector.select()等

如果线程池中总是充满了被阻塞的任务,那么也可能表明线程池的规模过小

设置线程池的大小

设置线程池的大小也并不困难,只需要避免过大和过小这两种情况

想要真正的设置线程池的大小需要如下分析:

  • 分析计算环境
  • 分析资源预算
  • 分析任务的特性

在部署的系统中需要多少CPU、多大内存、任务是计算密集型、I/O密集型还是二者皆可,他们是否需要像JDBC连接这样的稀缺资源,如果执行不通类别的任务,并且它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每个线程池可以根据各自的工作负载来调整

对应计算密集型任务,在拥有N个处理器的系统上,当线程池的大小为N+1时,通常能实现最优的利用率(即使当计算密集型的线程偶尔由于页缺失故障或其他原因而暂停时,这个额外的线程也能确保CPU的时钟周期不会被浪费)

当然CPU周期并不是唯一影响线程池大小的资源,还包括,内存、文件句柄、套接字句柄和数据库连接等

配置ThreadPoolExecutor

ThreadPoolExecutor为一些Executor提供了基本的实现,这些Executor是由Executor中的newCachedThreadPool、newFixedThreadPool和newScheduleThreadExecutor等工厂方法返回的,ThreadPoolExecutor是一个灵活的、稳定的线程池,允许进行各种定制

如果默认的执行策略不能满足需求,那么可以通过ThreadPoolExecutor的构造函数来实列化一个对象,并根据自己的需求来定制

线程的创建与销毁

线程池的基本大小、最大大小以及存活时间等因素共同负责线程的创建与销毁,基本大小也是就是线程池的目标大小,即在没有任何任务执行时线程池的大小(在创建ThreadPoolExecutor初期,线程并不会立即启动,而是等到有任务提交时才会启动,除非调用prestartAllCoreThreads)并且只有在工作队列满了的情况下才会创建超出这个数量的线程,线程池的最大大小表示同时活动的线程数量的上限,如果某个线程超出了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。

newFixedThreadPool工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时,newCacheThreadPool工厂方法将线程池的最大大小设置为了Integer.MAX_VALUE,而将基本大小设置为0,并将超时时间设置为1分钟,这种方式创建出来的线程池可以被无限扩展,并且当需求降低时会自动收缩

开发人员以避免将线程池的基本大小设置为0,从而最终销毁工作以免阻碍JVM的退出,然而如果线程池中没有使用SynchronizeQueue作为其工作队列(例如:newCacheThreadPool中就是如此)那么这种方式将产生一些奇怪的行为,如果线程池中的线程数量等于线程池的基本大小,那么仅当在工作队列已满的情况下ThreadExecutor才会创建新的线程,因此当线程池的基本大小为0并且其工作队列有一定的容量,那么当把任务提交给线程池时,只有当线程池的工作队列被填满后,才会开始执行任务,而这种行为通常不是我们所希望的。

管理队列任务

有限的线程池中会限制可并发执行的任务数量(单线程的Executor是一种值得注意的特例:他们能确保不会有任何任务并发的执行,因为他们通过线程封闭实现线程安全性)

无限制的创建线程将会导致不稳定性,可以通过固定大小的线程池来解决这个问题,但是这个方案并不完整,在高负载的情况下应用程序扔可能耗尽资源,技术请求平均达到速率很稳定,仍然会出现请求突增的情况,经管队列有助于缓解任务的突增问题,但是如果任务持续高速的到来,那么最终还是会抑制请求的到达速率以避免耗尽内存(类似于通信网络中的限流)甚至在耗尽内存前,响应性能也将随着队列任务的增长变的越来越糟糕

ThreadPoolExecutor允许提供一个BlockingQueue来保存等待执行的任务,基本的任务排队方法有三种:

  • 无界队列
  • 有界队列
  • 同步移交

newFixedThreadPool和newSingleThreadExecutor默认情况下将使用一个无界的LinkedBlockingQueue,如果所有工作者线程处于忙碌状态,那么任务将在队列中等候,如果任务持续的快速到达,并且超过了线程池处理他们的速度,那么队列将无限制的增加

一种更为稳定的资源管理策略是使用有界队列,例如:ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue,有界队列有助于避免资源耗尽的情况发生,但它又带来了新的问题,当队列满了后,新的任务改怎么处理(有许多饱和策略)可以解决这个问题,在使用有界队列时,队列的大小与线程池的大小必须一起调节,如果线程池较小而队列较大,那么有助于减少内存使用量,降低CPU的使用率,同时还可以减少上下文切换,但付出的代价可能会限制吞吐量。

对于非常大的或者无界的线程池,可以通过使用SynchronizeQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程,SynchronizeQueue不是一个真正的队列,而是一种线程之间进行移交的机制,将一个元素放入SynchronizeQueue中,必须有另外一个线程正在等待接收这个元素,如果没有线程正在等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝

饱和策略

当有界队列被填满后,饱和策略就开始发挥作用,ThreadPoolExecutor的饱和策略可以通过调用serRejectedExcutionHandler来修改(如果任务被提交到了一个已经关闭的Executor时,也会用到饱和策略)JDK提供了几种不同的RejectedExecutionExecutionHandler实现,每种实现都包含不同的饱和策略,AbortPolicy、CallerRunsPolicy、DiscarDPolicy和DiscardOldestPolicy。

  • 中止(Abort)策略是默认的饱和策略,该策略将抛出未检查的RejectedExecutionException,调用者可以捕获这个异常,然后根据需求来编写自己的处理代码
  • 抛弃(Discard)当新提交的任务无法保存到队列中的等待执行时,抛弃策略会悄悄的抛弃该任务。
  • 抛弃最旧的(Discard-Oldest)抛弃最旧的测试则会抛弃下一个被执行的任务,然后尝试重新提交新的任务(如果工作队列是一个优先级队列,那么抛弃最旧的策略将导致抛弃优先级最高的任务,因此最好不要将抛弃最旧的饱和策略和优先级队列放在一起使用)
  • 调用者运行(Caller-Runs)策略实现了一种调节机制,该策略不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量,他不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务

示例:创建一个固定大小的线程池,并采用有界队列以及调用者运行饱和策略

1
2
3
4
public void createThreadPool(){
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 60, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(20));
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
}

当工作队列填满后,没有预定义的饱和策略来阻塞execute,然而,通过使用Semaphore来限制任务的到达率,就可以实现这个功能

示例 使用信号量来控制任务的提交速率

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* @author dreamyao
* @title
* @date 2018/3/29 下午1:33
* @since 1.0.0
*/
@ThreadSafe
public class BoundedExcutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExcutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command) throws InterruptedException {
semaphore.acquire();
try {
exec.execute(() -> {
try {
command.run();
} finally {
semaphore.release();
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
}
}
}

线程工厂

每当线程池需要创建线程时,都是通过线程工厂来完成的,默认线程工厂方法创建一个新的、非守护线程,并且不包含特殊的配置信息。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* @author dreamyao
* @title
* @date 2018/3/30 下午1:02
* @since 1.0.0
*/
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String poolName) {
this.poolName = poolName;
}
@Override
public Thread newThread(Runnable r) {
return new MyAppThread(r, poolName);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* @author dreamyao
* @title
* @date 2018/3/30 下午1:03
* @since 1.0.0
*/
public class MyAppThread extends Thread {
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger logger = LoggerFactory.getLogger(MyAppThread.class);
public MyAppThread(Runnable target, String name) {
super(target, name + "_" + created.incrementAndGet());
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.info("UNCAUGHT in thread" + t.getName(), e);
}
});
}
public void run(){
boolean debug = debugLifecycle;
if (debug) {
logger.info("Created" + getName());
}
try {
alive.incrementAndGet();
super.run();
}finally {
alive.decrementAndGet();
if (debug) {
logger.info("Exiting " + getName());
}
}
}
public static int getThreadsCreaded(){
return created.get();
}
public static int getThreadsAlive(){
return alive.get();
}
public static boolean getDebug(){
return debugLifecycle;
}
public static void setDebug(boolean b) {
debugLifecycle = b;
}
}

扩展ThreadPoolExecutor

ThreadPoolExcutor提供了几个在子类化中改写的方法:beforeExecute、afterExecute、terminated

在执行任务的线程中将调用beforeExecute、afterExecute等方法,在这些方法中还可以添加日志、计时、监视、统计信息收集的功能。无论任务是从run中正常返回还是抛出异常而返回,afterExecute方法都会被调用(如果任务在完成后带有一个Error,那么就不会调用afterExecute)如果BeforeExecute抛出一个RuntimeException,那么任务将不被执行,并且afterExecute也不会被调用。

在线程池关闭操作时调用terminated,terminated可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志、或者收集finalize统计信息等操作。

示例 线程池添加统计信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* @author dreamyao
* @title
* @date 2018/3/30 下午1:31
* @since 1.0.0
*/
public class TimingThreadPool extends ThreadPoolExecutor {
private final ThreadLocal<Long> startTime = new ThreadLocal<>();
private final Logger logger = LoggerFactory.getLogger(TimingThreadPool.class);
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
logger.info(String.format("Thread %s: start %s", t, r));
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
logger.info(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));
} finally {
super.afterExecute(r, t);
}
}
@Override
protected void terminated() {
try {
logger.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTasks.get()));
} finally {
super.terminated();
}
}
}

递归算法的并行化

如果在循环体中包含了一些密集计算,或者需要执行可能阻塞的I/O操作,那么只要每次迭代是独立的,都可以对其进行并行优化。

如果循环中的迭代操作都是独立的,并且不需要等待所有的迭代操作都完成再继续执行,那么就可以使用Executor将串行循环转化为并行循环。

如果需要提交一个任务集并等待他们完成,那么可以使用ExecutorService.invokeAll,并且在所有任务都执行完成后调用CompletionService来获得结果。

当串行循环中的各个迭代操作之间彼此独立,并且每个迭代操作执行的工作量比管理一个新任务时带来的开销更多,那么这个串行循环就适合并行化

示例 谜题框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* @author dreamyao
* @title
* @date 2018/3/31 下午7:46
* @since 1.0.0
*/
public interface Puzzle<P,M> {
P initialPosition();
boolean isGoal(P position);
Set<M> legalMoves(P position);
P move(P position, M move);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* @author dreamyao
* @title
* @date 2018/3/31 下午7:53
* @since 1.0.0
*/
public class SequentialPuzzledSolver<P,M> {
private final Puzzle<P, M> puzzle;
private final Set<P> seen = new HashSet<>();
public SequentialPuzzledSolver(Puzzle<P, M> puzzle) {
this.puzzle = puzzle;
}
public List<M> solve(){
P pos = puzzle.initialPosition();
return search(new Node<>(pos, null, null));
}
private List<M> search(Node<P,M> node) {
if (!seen.contains(node.pos)) {
seen.add(node.pos);
if (puzzle.isGoal(node.pos)) {
return node.asMoveList();
}
for (M move : puzzle.legalMoves(node.pos)) {
P pos = puzzle.move(node.pos, move);
Node<P, M> child = new Node<>(pos, move, node);
List<M> result = search(child);
if (result != null) {
return result;
}
}
}
return null;
}
@Immutable
static class Node<P,M>{
final P pos;
final M move;
final Node<P, M> prev;
public Node(P pos, M move, Node<P, M> prev) {
this.pos = pos;
this.move = move;
this.prev = prev;
}
List<M> asMoveList(){
List<M> solution = new LinkedList<>();
for (Node<P, M> n = this; n.move != null; n = n.prev) {
solution.add(0, n.move);
}
return solution;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* @author dreamyao
* @title
* @date 2018/3/31 下午8:03
* @since 1.0.0
*/
public class ConcurrentPuzzleSolver<P, M> {
private final Puzzle<P, M> puzzle;
private final ExecutorService exec;
private final ConcurrentMap<P, Boolean> seen;
final ValueLatch<SequentialPuzzledSolver.Node<P, M>> solution = new ValueLatch<>();
public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle, ExecutorService exec, ConcurrentMap<P, Boolean> seen) {
this.puzzle = puzzle;
this.exec = exec;
this.seen = seen;
}
public List<M> solve() throws InterruptedException {
try {
P p = puzzle.initialPosition();
exec.execute(newTask(p, null, null));
// 阻塞直到找到解答
SequentialPuzzledSolver.Node<P, M> solnNode = solution.getValue();
return (solnNode == null) ? null : solnNode.asMoveList();
} finally {
exec.shutdown();
}
}
protected Runnable newTask(P p, M m, SequentialPuzzledSolver.Node<P, M> n) {
return new SolverTask(p, m, n);
}
class SolverTask extends SequentialPuzzledSolver.Node<P, M> implements Runnable {
public SolverTask(P pos, M move, SequentialPuzzledSolver.Node<P, M> prev) {
super(pos, move, prev);
}
@Override
public void run() {
if (solution.isSet() || seen.putIfAbsent(pos, true) != null) {
// 已经找到了解答或者已经遍历这个位置
return;
}
if (puzzle.isGoal(pos)) {
solution.setValue(this);
} else {
for (M m : puzzle.legalMoves(pos)) {
exec.execute(newTask(puzzle.move(pos, m), m, this));
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* @author dreamyao
* @title
* @date 2018/3/31 下午8:18
* @since 1.0.0
*/
public class PuzzleSolver<P,M> extends ConcurrentPuzzleSolver<P,M> {
private final AtomicInteger taskCount = new AtomicInteger(0);
public PuzzleSolver(Puzzle<P, M> puzzle, ExecutorService exec, ConcurrentMap<P, Boolean> seen) {
super(puzzle, exec, seen);
}
protected Runnable newTask(P p, M m, SequentialPuzzledSolver.Node<P,M> n) {
return new CountingSolverTask(p, m, n);
}
class CountingSolverTask extends SolverTask {
CountingSolverTask(P pos, M move, SequentialPuzzledSolver.Node<P, M> prev) {
super(pos, move, prev);
taskCount.incrementAndGet();
}
@Override
public void run() {
try {
super.run();
} finally {
if (taskCount.decrementAndGet() == 0) {
solution.setValue(null);
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @author dreamyao
* @title
* @date 2018/3/31 下午8:06
* @since 1.0.0
*/
@ThreadSafe
public class ValueLatch<T> {
@GuardedBy("this")
private T value = null;
private final CountDownLatch done = new CountDownLatch(1);
public boolean isSet(){
return done.getCount() == 0;
}
public synchronized void setValue(T newValue) {
if (!isSet()) {
value = newValue;
done.countDown();
}
}
public T getValue() throws InterruptedException {
done.await();
synchronized (this) {
return value;
}
}
}

避免活跃性危险

死锁

当一个线程永远的持有一个锁,并且其他线程尝试获得这个锁时,那么他们将永远被阻塞。

锁顺序死锁

两个线程以不同的顺序来获取相同的锁,那么会发生死锁,如果所有的线程都以固定的顺序来获得锁,那么在程序中就不会出现锁顺序死锁问题。

示例 简单的锁顺序死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* @author dreamyao
* @title
* @date 2018/4/4 下午1:28
* @since 1.0.0
*/
public class LeftRightDeadLock {
private final Object left = new Object();
private final Object right = new Object();
public void leftRight() {
synchronized (left) {
synchronized (right) {
}
}
}
public void rightLeft() {
synchronized (right) {
synchronized (left) {
}
}
}
}

注:文章来源DreamYao,转载请获得许可,谢谢。

给作者买杯咖啡吧。喵~