Java中的同步原语

volatile

volatile可以保证修饰变量的可见性和有序性,对于被volatile修饰的变量,对其进行单个读写,等价于被synchronized修饰的读操作或者写操作。

如何保证可见性

实现情况视处理器而定,在intel处理器上,对被volatile修饰的变量进行写操作的指令,在翻译为汇编指令时,会加上lock前缀,处理器在执行这一指令时,会将缓存行中的数据写会内存,其他CPU会通过嗅探总线,如果本地内存内缓存了此变量,会时当前值无效,重新读取。

如何保证有序性

在对volatile进行读写操作指令在编译为字节码时,会通过在指令序列中插入内存屏障指令来预防编译器和处理器为提高执行效率而进行的指令重排序,以此保证执行的有序性。

synchronized

java中的每一个对象都可以作为锁,使用synchronized加锁,根据使用场景,有三种不同的情况:

  • 对于普通同步方法,锁是实例对象;
  • 对于同步代码块,锁是当前类的Class对象(访问当前类方法和属性的入口);
  • 对于同步方法块,锁是括号中配置的对象。
    synchronized用的锁是存在Java对象头里的,Java对象头中的Mark Word默认存储对象的hashCode,分段年龄和锁标记位。

lock.png

fianl域的内存语义

编译器和处理器对于final域的的处理,在进行重排序时需要遵守两个规则:

  • 在构造函数内对一个final域的写入与随后将这个引用赋值给其他引用的操作不能重排;
  • 初次读一个包含final域对象的引用,与随后读这个final域,不能重排。
    实现以上规则同样是依赖在字节码指令序列中插入内存屏障。

Java中的锁

Condition接口

synchronized关键字配合Object类提供的wait(), wait(long timeout),notify(), notifyAll()等方法,可以实现等待/通知模式。Condition接口也提供了类似的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface Condition {
//进入等待状态,直到其他线程调用signal()或者signalAll()方法进行唤醒。或者其他线程中断当前线程。如果当前线程返回,则已经重新获得锁。
void await() throws InterruptedException;

//不可中断
void awaitUninterruptibly();

//超时后返回,单位为毫秒
long awaitNanos(long nanosTimeout) throws InterruptedException;

//超时后返回,单位为 TimeUnit中的枚举
boolean await(long time, TimeUnit unit) throws InterruptedException;

//超时到将来的具体时间
boolean awaitUntil(Date deadline) throws InterruptedException;

//唤醒一个线程
void signal();

//唤醒所有线程
void signalAll();
}

Lock接口

synchronized进行加锁时,都是隐式加锁,不容易因为释放锁导致出错,Lock接口需要显式加锁,更加灵活。Lock接口具备synchronized关键字所不具备的灵活性:

  • 超时加锁,在指定时间内如果尚未获取到锁,返回false,如果在超时等待的时间内被中断,则抛出异常,获取到锁则返回true。
  • 可以响应中断,在线程获取锁的过程中,可以响应中断,避免死锁发生。
  • 非阻塞的获取锁,使用synchronized加锁,如果没有获得锁,则会进入阻塞状态,Lock加锁则是进入等待状态。

AbstractQueuedSychronizer

AQS的设计是基于模版方法mo模式的,使用者需要继承AQS,重写指定的方法,然后组合到同步组件当中使用。同步组件对外提供的调用方法,可以委托给AQS子类具体执行。

AQS的使用

同步器提供了三个方法,可以线程安全的访问同步的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
private volatile int state;

protected final int getState() {
return state;
}

protected final void setState(int newState) {
state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

具体同步组件只需视情况实现以下方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//独占式获取同步状态,在具体实现中,需要原子判断当前state是否符合预期(为旧值,其他线程未修改),如果符合,将状态设置为新值。
protected boolean tryAcquire(long arg) {
throw new UnsupportedOperationException();
}

//释放同步状态
protected boolean tryRelease(long arg) {
throw new UnsupportedOperationException();
}

//共享式获取
protected long tryAcquireShared(long arg) {
throw new UnsupportedOperationException();
}

//共享式释放
protected boolean tryReleaseShared(long arg) {
throw new UnsupportedOperationException();
}

//判断当前状态是否被独占
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}

同步组件对外提供了一些模版方法,供外部查询和操作同步状态,这些方法可以支持超时和中断的独占式获取和共享式获取同步状态。值得注意的是,这些方法都已经被final修饰,不可重写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

AQS的实现

同步队列

同步器依赖内部的同步队列来完成同步状态的管理,当前线程获取同步状态失败时,会将当前线程以及等待状态信息构成一个Node加入到队尾,同时会阻塞当前线程,当同步状态被释放时,会从队列首部唤醒节点中的线程,使其尝试获取同步状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
NOde的部分字段
static final class Node {
volatile int waitStatus;

volatile Node prev;

volatile Node next;

volatile Thread thread;

Node nextWaiter;


}

AQS使用同步队列维护获取同步状态失败而阻塞的的线程,head指向头节点,每次获取状态失败的线程构成节点以后加入队列尾部。首节点是获取到同步状态的线程,当其释放同步状态时,会将首节点设置为其后继节点。

tail.png

1
2
3
4
5
6
7
8
9
10
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

ConcurrentHashMap

为什么使用ConcurrentHashMap

HashMap是线程不安全的,在多线程执行put操作过程中,有可能会试容量达到阈值,触发扩容操作,HashMap的扩容操作会将数组容量扩大为当前数组长度的两倍,重新遍历HashMap,将每一个链表中的元素进行重新hash,存入新的HashMap中。如果是多线程进行put,会出现链表成环,导致HashMap的get操作无法结束,CPU利用率达到100%;老生常谈,HashMap的死循环

HashTable几乎提供了与HashMap相同的操作,但是HashMap的很多方法都是通过synchronized修饰的,多线程操作会导致线程阻塞,即便是多个只进行查询操作的线程,这样使得效率非常低下。

我们可以使用Collections提供的封装方法,得到线程安全的Map。但是看了下面SynchronizedMap的实现,是使用了一个Object对象作为锁,同样每一个操作方法都被synchronized修饰了,可见效率也不高。如果需要线程安全且比较高效的HashMap,可以使用ConcurrentHashMap。

1
2
3
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
return new SynchronizedMap<>(m);
}

为什么ConcurrentHashMap是线程安全的

ConcurrentHashMap是由Segment数组和HashEntry数组组成的,Segment是一种可重入锁,HashEntry用于存储键值对数据。一个Segment包含一个HashEntry数组,每个HashEntry是一个链表结构,每个Segment守护一个HashEntry数组的元素,当需要对数组中的元素进行修改时,必须先获得对应Segment的锁。

struct.png

初始化

  • 初始化segments数组,通过conrencyLevel计算数组长度,必须是2的整数幂。
  • 初始化segmentShift和segmentMask,segmentShift用于定位参与散列运算的位数,segmentMask是参与与hash做与运算的掩码,为size-1.
  • 初始化每一个segment,每一个HashEntry长度同样需要是2的整数幂长度。

get操作

get操作是不需要获取锁的,因为每一个HashEntry节点的value已经被volatile修饰了,可以保证读到的值是最新的值。
定位节点分为两步,首先定位目标segment,然后再定位具体的节点

1
2
hash >>> segmentShift) & segmentMask  // 定位Segment所使用的hash算法 
int index = hash & (tab.length - 1); // 定位HashEntry所使用的hash算法

put操作

在进行put操作时,首先定位到具体的segment,会对当前segment进行加锁,然后判断是否需要扩容,如果需要扩容,只对当前segment执行扩容操作,最后再添加元素,这不同于HashMap,HashMap添加节点以后进行扩容,如果以后都不再添加元素,这也许是一次无效扩容。

查询size

每个Segment使用volatile维护了一个表示当前segment内元素数量的count,但是显然对每个segment中count进行求和的操作不是原子性的。ConcurentHashMap还维护了一个变量,modCount,每次对ConcurrentHashMap中元素的修改操作会使得当前变量加1,所以在对count进行求和之前,保留modCount的副本,在求和以后如果modCount没有发生变化,证明求和这段时间没有线程对容器内元素进行操作,对count的求和是可靠的,如果modCount发生了改变,则需要重新求和,连续两次容器的大小都没有成功正确统计到,则对所有的segment加锁然后求和。


集合框架的使用

Java集合框架是一类用来存放元素的容器,主要分为实现了Map接口和实现了Collection接口的两类,前者存放的元素是一对key-value的映射,要求key唯一,后者存放的是单个元素,其中实现了Set接口的容器要求容器中的元素不可重复。

泛型

所谓泛型,就是类型参数话,集合框架下定义的容器希望不被局限于某一类元素,而是一套通用框架,可以存放除基本类型以为的其他类型。在集合容器变量声明时使用某一类型,那么此容器就只能存放这一类型的元素,并且容器中的元素可以调用这一类型元素可以调用的方法。

Collection类

collection.png

ArrayList

ArrayList是通过数组实现的List,具有快速随机查找的特点,但是删除元素和插入元素效率较低。常用的API有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int size();
public boolean isEmpty()
public boolean contains(Object o)
public int indexOf(Object o)
public Object[] toArray() //返回一个包含当前所有元素的数组
public int lastIndexOf(Object o)

public E set(int index, E element)
public E get(int index)
public void add(int index, E element) //在指定下标插入元素
public boolean add(E e)
public boolean remove(Object o) //删除指定下标的元素
public E remove(int index)
public void clear()
public boolean addAll(Collection<? extends E> c) //在当前列表后依次添加c中的元素

ArrayList实现了Cloneable接口,可以实现存储元素引用的浅拷贝,即两个容器中引用对象仍然相同,实现了Serializable接口,支持序列化和反序列化。

LinkedList

使用链表实现,每个节点item引用对象,和next,pre两个指针。LinkedList实现了Deque接口,可以从链表双端对链表进行操作。由于实现方式是链表,所以LinkedList具有插入和删除比较快,随机访问较慢的特点。LinkedList具有很多双端操作链表的API,可以认为如果不带显式的first和last方法,那么删除元素默认在链表尾部进行,添加元素在链表头部进行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static class Node<E> {
E item;
Node<E> next;
Node<E> prev;

Node(Node<E> prev, E element, Node<E> next) {
this.item = element;
this.next = next;
this.prev = prev;
}
}
public boolean removeLastOccurrence(Object o)
public boolean removeFirstOccurrence(Object o)

public E pop()
public void push(E e)
public E pollFirst() //删除链表首部元素
public E pollLast()
public E peekFirst() //取得链表头节点
public E peekLast() //取得链表尾节点
public boolean offerFirst(E e)
public boolean offerLast(E e)

LinkedList不是线程安全的,如果有多个线程同时对LinkedList的结构进行修改(增加或者删除节点),有可能会导致链表出现环状。可以使用如下方法进行包装得到线程安全的List。

1
public static <T> List<T> synchronizedList(List<T> list)

Vector

Vector是线程安全的,有可能引起线程不安全的操作和获取相关Vector信息的值的方法都被synchronized修饰了。但是在不要求线程安全的场景下,推荐使用ArrayList代替Vector,性能更好。

Stack

Stack是继承自Vector的集合,具有后进先出的特点。

1
2
3
4
5
public E push(E item)
public synchronized E pop()
public boolean empty()
public synchronized int search(Object o)
public synchronized E peek()

HashSet

HashSet实现了Set接口,可以保证元素的容器中元素唯一,但是不保证有序。HastSet是通过HashMap实现的,保证容器中元素唯一的方式是,每次插入的元素实际是作为(key,value)中的key,value是一个Object对象,HashMap是可以保证存入元素key是唯一的,所以HashSet能够保证容器中的元素唯一。

1
2
3
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}

HashSet同样是线程不安全的,如果需要在多线程场景中使用hashSet,推荐如下封装:

1
Set s = Collections.synchronizedSet(new HashSet(...));

LinkedHashSet

继承自HashSet,与HashSet相同,可保证元素唯一,使用LinkedHashMap实现,可以保证插入元素顺序可知。

TreeSet

实现了SortedSet接口,可以保证容器内元素不重复,可排序。

Map类

map.png
实现了Map接口的容器类用来存放的元素是一组key到value的映射关系,LinkedHashMap继承自HashMap,可以在HashMap的基础上保证存入顺序是可知的。

HashMap

LinkedHashMap

1
2
3
4
5
6
static class Entry<K,V> extends HashMap.Node<K,V> {
Entry<K,V> before, after;
Entry(int hash, K key, V value, Node<K,V> next) {
super(hash, key, value, next);
}
}

LinkedHashMap实现这一功能的方式是,其中的每一个节点,都有两个指针,before和after,分别指向前一个插入节点,和后一个插入的节点。

LinkedhashMap是如何保证插入顺序有序的?重写了HashMap的newNode方法,将当前插入节点的before指针指向了插入前的尾节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Node<K,V> newNode(int hash, K key, V value, Node<K,V> e) {
LinkedHashMap.Entry<K,V> p =
new LinkedHashMap.Entry<K,V>(hash, key, value, e);
linkNodeLast(p);
return p;
}
private void linkNodeLast(LinkedHashMap.Entry<K,V> p) {
LinkedHashMap.Entry<K,V> last = tail;
tail = p;
if (last == null)
head = p;
else {
p.before = last;
last.after = p;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {
Map<String, Integer> scores = new LinkedHashMap<>();
scores.put("chemistry", 93);
scores.put("math", 98);
scores.put("biology", 92);
scores.put("english", 97);
scores.put("physics", 94);
scores.put("chinese", 99);
scores.put("geography", 95);
Iterator<Map.Entry<String, Integer>> iterator = scores.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Integer> score = iterator.next();
System.out.println("subject: " + score.getKey() + " score: " + score.getValue());
}
}

map_answer.png

TreeMap

基于红黑树实现的,对可对key进行排序的NavigableMap。

reference:
https://zhuanlan.zhihu.com/p/34490361


HashMap的使用

Node和初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
V value;
Node<K,V> next;

Node(int hash, K key, V value, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
//hashCode通过key于value计算得到
public final int hashCode() {
return Objects.hashCode(key) ^ Objects.hashCode(value);
}
}
  • 哈希桶的大小,默认为16,如果自定义,需要为2的n次幂,这样可以使用与运算高效的代替模运算。
  • 负载因子,负载因子*数组长度等于容量阈值。
  • 阈值,当前HashMap中元素元素超过阈值,需要扩容,防止各种操作hash冲突增多,效率变低。

    hash和寻址

    1
    2
    3
    4
    5
    static final int hash(Object key) {
    int h;
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
    }
    index = hash&(tab.length-1);
    hash值通过key的hashCode异或本身高位得到的,这样可以让高位参与运算,否则hash值异或(tab.length-1)将不能利用hash值的高位,导致hash冲突变多。

put过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
if ((p = tab[i = (n - 1) & hash]) == null) //判断数组位置是否已经插入元素,如果没有就插入数组中
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k)))) //通过hash和key确定,是否key相同,是的话覆盖value即可
e = p;
else if (p instanceof TreeNode) //判断是不是转为了红黑树,是的话将此节点插入树中
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else { //循环遍历链表
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) { //到达尾部,没有找到key相同的节点,插入尾部
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
//找到key相同的节点,直接返回,执行1

if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
//1,在key相同的节点,修改value
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
//size大于阈值,扩容
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}

get过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k)))) //首先通过hash值在哈希表数组中寻找,如果hash和key都符合,则表示找到,可以返回
return first;
if ((e = first.next) != null) { //出现hash冲突,
if (first instanceof TreeNode)//如果子节点是树节点,在红黑树中寻找
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
do { //循环遍历链表,在链表中查找
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}

扩容

在容量超过阈值以后,会进行扩容,数组容量是当前数组的两倍。扩容后会为当前HashMap中每一个桶中的元素重新寻址,新的下标是e.hash&(newCapcity-1)。如果新的hash桶中已有元素,则类似插入元素的方式将当前元素插入hash表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) { //决定存在数组高位还是低位
if (loTail == null)
loHead = e;
else
loTail.next = e; //会形成一个链表,link_1;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e; //也会形成一个链表,link_2;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead; //存link_1在新数组低位
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead; //存link_2在数组高位
}
}

Executor框架的使用

在Executor框架中,工作单元包括Runnable和Callable,执行机制由Executor框架提供。

两层调度模型

executor.png

多线程程序将任务分解为多个任务,然后由用户级调度器Executor将这些任务交由Java线程执行。Java线程并不是直接被CPU调度执行,还会映射为操作系统内核线程,由内核调度器将内核线程调度到CPU执行。祥见Java线程和os线程

Exectuor框架的结构

worker.png

manager.png
Executor框架由三大部分组成:

  • 任务,包括被执行任务需要实现的接口,Runnable接口和Callable接口;
  • 任务的执行,任务机制的和讯即接口及其实现类,ThreadPoolExecutor和ScheduledThreadPool;
  • 异步计算的结果,Future及其实现类FutureTask;

Executor框架的使用

use.png
主线程创建一个Runnable或者Callable任务(Executor可以将Runnable类型转换为Callable类型),然后交给Executor执行,主线程通过返回的Future接口,阻塞等待任务执行以后返回结果,也可以在等待过程中取消任务执行。

Executor框架核心类

  • ThreadPoolExecutor,线程池的核心实现类,用来执行被提交的任务。通过工厂类Executors实现三种类型线程池:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//核心线程数和最大工作线程数量相等,避免创建大量线程,适用于服务器负载较重的情况。
//使用无界队列LinkedBlockingQueue作为任务管理队列,意味着线程池中工作线程不会超过核心线程。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

//没有核心线程,最大工作线程取整数的最大值,适用于有比较多短期的小任务场景;
//使用SynchronousQueue作为工作队列,当主线程提交任务速度大于线程处理速度时,会不断创建线程,有可能会因为创建过多线程导致CPU和内存耗尽
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

//只会创建一个工作线程,适用于需要保证顺序的执行各个任务的场景。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

ScheduledThreadPoolExecutor用于在固定延迟后执行任务,通过Executors创建,包括两种类型:

1
2
3
4
5
6
7
8
9
10
//只会创建单个核心线程,工作线程为Integer.MAX_VALUE
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}

//可以设置多个核心线程,工作线程为Integer.MAX_VALUE
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

Runnable接口和Callable接口的实现类,都可以交个线程池执行,不同的是Callable接口可以返回结果.也可以将一个Runnable对象封装为Callable对象:

1
2
3
4
5
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

ScheduledThreadPoolExecutor

1
2
3
4
5
6
7
//使用DelayWorkerQueue作为工作队列,这是一个无界阻塞队列,使用PriorityQueue实现。
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}

schedule.png
主线程向DelayWorkerQueue中添加任务时,任务会被包装为ScheduledFutureTask,线程池中的线程会从队列中取出任务执行。

ScheduledThreadPoolExecutor的实现

1
2
3
4
5
6
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns; //表示这个任务将要被执行的具体时间
this.period = 0; //表示任务执行的间隔周期,
this.sequenceNumber = sequencer.getAndIncrement(); //被添加入任务队列的顺序
}

DelayQueue中的任务是ScheduledFutureTask类型,包括三个成员变量。DelayQueue封装了一个PriorityQueue,会根据ScheduledFutureTask的time和sqquenceNumber进行排序。线程池中的线程会从任务队列中取出time大于当前时间的任务进行执行。在执行结束以后,会更新time,重新将任务放回队列之中。

1
2
3
4
5
6
7
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}

FutureTask

由于FutureTask继承了Future接口和Runnable接口,所以可以把一个FutureTask接口交由实现了Executor的线程池执行,也可以作为计算结果返回,然后执行FutureTask.get()阻塞当前线程等待返回计算结果。

1
<T> Future<T> submit(Runnable task, T result);

FutureTask是基于AbstractQueuedSynchronizer(AQS)实现的,很多可阻塞类都是基于AQS实现的,AQS是一个原子框架,提供了通用机制来原子性的管理状态,阻塞和唤醒线程,以及维护被阻塞线程的队列。对于很多阻塞类,其具体操作都会委托给实现了AQS的内部类Sync,由Sync进行具体的操作。

aqs.png


线程池的使用

使用线程池的益处

  • 避免重复创建线程执行任务,减少了创建线程和销毁线程需要的时间开销和性能开销;
  • 提高任务响应速度,线程池中通常缓存有线程,当提交任务以后,可迅速执行;
  • 避免了无规则的创建大量线程,导致大量线程排队等待CPU,响应速度变慢;

线程池处理流程

stage.png

我们可以调用线程池的execute和submit方法来提交任务,提交参数都是一个Runnable实例,不同的是,submit会返回一个Future类型的对象,可以通过future对象的get方法获得返回值,注意这个方法会阻塞当前线程。

  1. 当一个任务提交给线程池时,如果当前线程池中线程数量少于核心线程数,会重新创建新的线程执行这个任务,然后通过线程安全的方法更新当前线程数。注意,当提交一个任务给线程池时,线程池会创建一个核心线程来执行,即使其他核心线程空闲,直到核心线程达到预设值。
  2. 如果当前线程数已经大于或者等于核心线程数,那会尝试判断阻塞队列是否已满,未满的话将任务加入到阻塞队列中。
  3. 已满则判断当前线程池数量是否小于最大线程数量,如果是的话,创建工作线程执行,否则的话执行拒绝策略。

线程池参数说明

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize, //最大工作线程数,不能小于corePoolSize
long keepAliveTime, //非核心线程超时时间,闲置时间超过会被销毁
TimeUnit unit,
BlockingQueue<Runnable> workQueue, //用于保存等待执行的任务的阻塞队列
ThreadFactory threadFactory,
RejectedExecutionHandler handler //拒绝策略)

阻塞队列有很多种,newCachedThreadPool使用SynchronousQueue,这种阻塞队列不存储任务,每个插入操作必须等待一个线程执行取出操作,否则插入线程阻塞。newFixedThreadPool则使用LinkedBlockingQueue。

handle.png

拒接策略分为四种,分别是直接只用调用者所在线程执行任务、直接抛出异常、丢弃队列里等待最长时间的任务,执行当前任务、或者直接丢弃掉当前任务。

配置线程池

根据不同的情况配置线程池:

  • 根据任务性质,比如是CPU密集型还是IO密集型,CPU密集型应该配置尽可能少的线程,防止线程持续等待CPU分配时间,IO密集型就可以多一些线程,因为大部分线程可能在等待IO;
  • 任务优先级,是否有些任务是高优执行的,可以使用ProrityQueue作为阻塞队列。

Java中的并发工具类

Java中有许多工具类可以在并发场景中使用,简化并发编程难度,提高程序准确性。

CountDaowLatch

可以实现类似于fork-join模型提供的功能,在多线程场景中,用于等待其他线程完成的线程可以调用countDownLatch的await方法进入等待状态,只有当其他线程将CountDownLatch中保存的值递减到0时,等待线程才会继续运行。

使用方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CountDownLatchTest {
private static CountDownLatch countDownLatch = new CountDownLatch(2);
public static void main(String[]args) throws InterruptedException {
Thread threadOne = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("continue");
countDownLatch.countDown();
System.out.println("continue");
countDownLatch.countDown();
}
});
threadOne.start();
countDownLatch.await();
System.out.println("over");
}
}

answer.png

运行结果总是相同,主线程在运行到await时,会进入等待状态,只有当子线程两次执行完countDown之后,主线程才会继续执行。

实现原理

查看CountDownLatch的实现,可以看到在new一个CountDownLatch时,需要一个int类型的参数:

1
2
3
4
5
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

Sync是CountDownLatch的一个继承了AbstractQueuedSynchronizer的内部类,用于实现同步控制,具有一些列加锁和解锁的方法。构造CountDownLatch时传入的参数最终用来设置一个被volatile修饰的属性state。这个state值可以理解了当前所有线程可重入的获得了多少锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void countDown() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}


protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

countDown会将当前的state值减1,这可以理解为释放一把锁的过程。

当主线程调用await方法之后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

如果当前的state没有减到0,主线程就会去执行doAcquireSharedInterruptibly,这个方法会使得主线程不断死循环的去获取“锁”,或者直到中断,直到state减少到0,主线程才能得到“锁”,解除循环,继续执行。

Semaphore

使用方式

可以用于做流量控制,限制多线程对有限资源的访问,在多并发场景下,如果资源数量有限,只能够支持有限的线程的使用,可以使用信号量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SemaphoreTest {
private static final int THREAD_COUNT = 30; //线程规模
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);

public static void main(String[] args) {
for (int i = 0; i < 100 * THREAD_COUNT; i++) { //任务数量远大于线程数
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire(); //获取信号量
printThreadCount();
s.release(); //释放信号量
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
threadPool.shutdown();
}

private static synchronized void printThreadCount() {
System.out.println("当前可用许可证数:"+s.availablePermits() + " 等待线程数" + s.getQueueLength());
}
}

ww.png

实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

private final Sync sync;

public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

Sync(int permits) {
setState(permits);
}

protected final void setState(int newState) {
state = newState;
}

在初始化Semaphore时,默认构造非公平的同步器,传入参数为信号量的值。
在线程执行到acquire时,会尝试可中断的去获取信号量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg); //1
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;//目前剩余信号量,小于0证明已经无可用
//尝试判断,小于0或者原子重置信号量值失败,都会返回负值,然后进入等待队列
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

在释放信号量以后,会通过原子操作给state值加一,如果当前state的值大于0,会在等待队列中唤醒队列首部的线程去获得信号量。


Java中的原子类

在多线程读写共享变量的场景中,很容易出现数据竞争,导致数据不一致。Java提供了synchronized关键字和Lock接口来保证多线程对同步块的有序访问,但是这两种方式都需要隐式或者显式的获取锁,性能开销略大。Java的Atomic包提供了多个原子操作类,可以安全、高效、简单的实现在多线程场景下读写变量。Atomic包中的类基本上都是使用Unsafe实现的包装类。

AtomicInteger

包括诸多方法实现整型数据的原子操作。比如对于整数的递增操作i++,由于这一操作并不是原子的,所以即便使用volatile修饰也不能保证线程安全,这种场景就可以使用AtomicInteger的方法:

1
2
3
4
5
6
7
8
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

以下通过AtomitInteger的addAndGet方法来分析这一原子类是如何实现线程安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @param valueOffset the value memory address.
* @return the updated value
*/
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}

public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

var5是通过native方法取得的当前变量的值,compareAndSwapInt通过原子操作,将预期的的结果var5+var4替换为当前变量的值var5,如果方法成功就,compareAndSwapInt返回ture,循环结束,如果返回false,说明有其他线程在这段时间修改了当前变量的值,会重新通过循环获取var5,继续重试。

AtomicBoolean

实现原理与AtomicInteger基本相同,核心思想是将true和false映射为1和0。int类型的value存储的就是当前AtomicBoolean的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private volatile int value;

/**
* Creates a new {@code AtomicBoolean} with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicBoolean(boolean initialValue) {
value = initialValue ? 1 : 0;
}

/**
* Atomically sets to the given value and returns the previous value.
*
* @param newValue the new value
* @return the previous value
*/
public final boolean getAndSet(boolean newValue) {
boolean prev;
do {
prev = get();
} while (!compareAndSet(prev, newValue));
return prev;
}

public final boolean compareAndSet(boolean expect, boolean update) {
int e = expect ? 1 : 0; //将boolean类型映射为int类型
int u = update ? 1 : 0;
return unsafe.compareAndSwapInt(this, valueOffset, e, u);//通过原子操作更新当前值。
}

原子更新数组

更新数组的值是通过调用其他原子更新基本类型或者引用类型来实现的,重点在于通过数组下标获得当前值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public final int updateAndGet(int i, IntUnaryOperator updateFunction) {
long offset = checkedByteOffset(i);//得到下标在内存中的地址
int prev, next;
do {
prev = getRaw(offset);
next = updateFunction.applyAsInt(prev);//转为int
} while (!compareAndSetRaw(offset, prev, next));
return next;
}

//在内存中取得当前值
private int getRaw(long offset) {
return unsafe.getIntVolatile(array, offset);
}

//以原子方式修改数组偏移量的值
private boolean compareAndSetRaw(long offset, int expect, int update) {
return unsafe.compareAndSwapInt(array, offset, expect, update);
}

原子更新引用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package longkai;

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {
public static void main(String[] args) {
AtomicReference<User> atomicReference = new AtomicReference<User>();
User firstUser = new User("long", 23);
atomicReference.set(firstUser);
User secondUser = new User("kai", 24);
atomicReference.compareAndSet(firstUser, secondUser);
System.out.println(atomicReference.get().getName());
System.out.println(atomicReference.get().getAge());

}
}

class User {
private String name;
private int age;

public User(String name, int age) {
this.name = name;
this.age = age;
}

public int getAge() {
return age;
}

public String getName() {
return name;
}
}

public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

实现方式

通过查看源码,最后发现Unfase类只提供了三种CAS方法:

1
2
3
4
5
6
7
8
9
/**
* 如果当前值为var4,则将值更新为var5
*/
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var5);

与AtomicBoolean类似,原子更新char、short、float、double可以实现。CAS算法是一种乐观锁的实现思想,在更新变量之前不对更新操作进行加锁,而是在更新之后再去看当前变量内存地址的值有没有发生改变,如果没有,就将希望更新的值写入到内存,如果发生了更改,则继续重试。


在冬季和北京再次相遇

昨天晚上八点到的北京,从广州穿着短袖飞越两千多公里,下飞机以后穿着和路人显得格格不入,大家都是穿着厚厚的棉袄了。

走了二十多分钟到之前订的民宿里暂住一晚,一路上是那种好久没有感觉过的冷。房东是个大学老师,很客气。十一点吃完外卖直接睡下了,感觉还是没有住酒店轻松,民宿总感觉不方便,比如担心声音大了一点惹主人家不高兴。

早上起来赶到回龙观和房东谈房子,感觉北京的空气比前些年要好很多了,很明显有那种高纬度地区天空纯洁的感觉。前几天从广州寄的被子今天刚好送到,不早不晚,可以直接用上。

秋招的offer拿到的全是北京的公司。在面试时很多面试官都会问我为什么要来北京?和同学说我打算去北京,他们也会问为什么要去那么远呢?北京空气质量不好,气候干燥,交通还拥堵。这些都是我真真实实要面对的问题,

但是我还是没有打算去其他城市,我觉得北京对于我有一种莫名的吸引力,或许是他深厚的文化底蕴,或许是他领先的经济发展水平,或者是他各类的活跃人才,这些也许其他城市也有,但总不能和北京比肩。在北京工作的经历会是一段富有意义的经历,这样的经历,越早越好。

近来也会思考人生的意义,以前觉得人生应该最求快乐,怎样快乐就怎样去度过一生,但是现在不这么觉得了,人生应该是追求充实,同样要去接纳苦难和波澜不惊,应该去领略各式各样的人生,如果总是快乐的,不免显得太单调无味。

距离今年春节已经不足三个月了,发现手里的事情还蛮多的,而且都有难度,毕业设计已经提上日程,目前还没有方向,想要在春招转后端开发,但是要学的东西还有很多,在快手的实习工作量应该会很足,也担心一些工作可能做不好,嗯,是的,不会说做完什么事就没事了,事情永远一件接一件,那,干就完了。