# 一.并发工具包
# 1.JUC 包概述?
juc 是 java.util.concurrent 的简称,为了支持高并发任务,在编程时可以有效减少竞争条件和死锁线程.
juc 主要包含 5 大工具包
工具包 | 描述 |
---|---|
locks | - ReentrantLock: 独占锁,同一时间只能被一个线程获取,支持重入性。 - ReentrantReadWriteLock: 读写锁,ReadLock 是共享锁,WriteLock 是独占锁。 - LockSupport: 提供阻塞和解除阻塞线程的功能,不会导致死锁。 |
executor | - Executor: 线程池的顶级接口。 - ExecutorService: 扩展了 Executor 接口,用于管理线程池的生命周期和任务执行。 - ScheduledExecutorService: 继承自 ExecutorService,支持周期性执行任务。 - ScheduledThreadPoolExecutor: ScheduledExecutorService 的实现类,用于周期性调度任务。 |
tools | - CountDownLatch: 一个同步辅助类,等待其他线程执行完任务后再执行。 - CyclicBarrier: 另一个同步辅助类,当所有线程都到达某个公共的屏障点时,才继续执行。 - Semaphore: 计数信号量,本质上是共享锁,通过 acquire() 获取信号量许可,通过 release() 释放许可。 |
atomic | - AtomicBoolean: 原子操作类,提供原子更新 boolean 值的功能。 - AtomicInteger: 原子操作类,提供原子更新 int 值的功能。 |
collections | - HashMap: 线程不安全的哈希表实现,对应的高并发类是 ConcurrentHashMap。 - ArrayList: 线程不安全的动态数组实现,对应的高并发类是 CopyOnWriteArrayList。 - HashSet: 线程不安全的哈希集合实现,对应的高并发类是 CopyOnWriteArraySet。 - Queue: 线程安全的队列实现,对应的高并发类是 ConcurrentLinkedQueue。 - SimpleDateFormat: 线程不安全的日期格式化类,对应的高并发类是 FastDateFormat 或者 DateFormatUtils。 |
请注意,JUC(Java.util.concurrent)是 Java 中处理并发编程的工具包,包含了大量用于处理多线程编程的类和接口。以上列出的工具包和类仅是其中的一部分,还有很多其他有用的工具供开发者使用。
# 2.并发编程逻辑
底层依赖—>中层工具—>具体实现
- 首先,声明共享变量为 volatile。
- 然后,使用 CAS 的原子条件更新来实现线程之间的同步。
- 同时,配合以 volatile 的读/写和 CAS 所具有的 volatile 读和写的内存语义来实现线程之间的通信。
# 3.为什么用 ConcurrentHashMap?
ConcurrentHashmap 存在的原因:
- HashMap 线程不安全的 map,多线程环境下 put 操作会出现死循环.会导致 Entry 链表变为环形结构.next 节点用不为空,就成了死循环获取 Entry.
- HashTable 效率低下
- ConcurrentHashMap 锁分段(jdk1.7)可以提高并发访问效率
ConcurrentHashmap 和 HashMap 区别:
HashMap 是非线程安全的,而 HashTable 和 ConcurrentHashMap 都是线程安全的
HashMap 的 key 和 value 均可以为 null;而 HashTable 和 ConcurrentHashMap 的 key 和 value 均不可以为 null
HashTable 和 ConcurrentHashMap 的区别:保证线程安全的方式不同,HashTable 是使用 synchronized,ConcurrentHashMap 在 jdk1.8 中使用 cas 和 synchronized
# 4.ConcurrentHashMap 数据结构?
jdk1.8 之前的数据结构:
ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结合组成。默认有 16 个区段。Segment 是一种可重入锁(ReentrantLock),在 ConcurrentHashMap 里扮演锁的角色;HashEntry 则用于存储键值对数据。一个 ConcurrentHashMap 里包含一个 Segment 数组。Seqment 的结构和 HashMap 类似,是一种数组和链表结构。一个 Segment 里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素,每个 Segment 守护着一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得与它对应的 Segment 锁.
不同线程操作不同的 Segment 就不会出现安全问题,性能上大提升.在 jdk1.8 之前,多线程操作同一个 Segment 不能并发,在 jdk1.8 优化了.
jdk1.8 之后的数据结构:
改进:没有 Segment 区段了,和 HashMap 一致了,数组+链表+红黑树 +乐观锁 + synchronized
ConcurrentHashMap 数据结构与 1.8 中的 HashMap 保持一致,均为数组+链表+红黑树,是通过乐观锁+Synchronized 来保证线程安全的.当多线程并发向同一个散列桶添加元素时。
若散列桶为空:
此时触发乐观锁机制,线程会获取到桶中的版本号,在添加节点之前,判断线程中获取的版本号与桶中实际存在的版本号是否一致,若一致,则添加成功,若不一致,则让线程自旋。
若散列桶不为空:
此时使用 synchronized 来保证线程安全,先访问到的线程会给桶中的头节点加锁,从而保证线程安全。
# 5.ConcurrentHashMap 初始化?
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
2
3
4
5
6
7
8
9
10
11
这段代码是用来创建一个 ConcurrentHashMap
(并发哈希表)实例的构造函数。让我们逐步解释它:
- 参数检查:
- 首先,代码对传入的参数进行检查,确保它们符合创建
ConcurrentHashMap
的要求。 loadFactor
:必须大于 0.0,因为它表示内部哈希表进行扩容的加载因子阈值。initialCapacity
:必须大于等于 0,代表哈希表的初始容量。concurrencyLevel
:必须大于 0,表示预估的并发更新线程数。
- 首先,代码对传入的参数进行检查,确保它们符合创建
- 设置初始容量:
- 接下来,构造函数检查
initialCapacity
是否小于concurrencyLevel
。如果是,它会将initialCapacity
设置为concurrencyLevel
。 - 这个步骤确保哈希表至少有和预估的并发更新线程数一样多的桶(存储空间),以提高并发性能。
- 接下来,构造函数检查
- 计算大小:
- 构造函数接着根据给定的
initialCapacity
和loadFactor
计算内部哈希表的大小。 size
的计算为1.0 + initialCapacity / loadFactor
。- 这个大小用来确定哈希表的容量。
- 构造函数接着根据给定的
- 确定哈希表的容量:
- 接下来,构造函数使用计算得到的
size
来决定哈希表的容量。 - 如果计算得到的
size
大于等于MAXIMUM_CAPACITY
,则将哈希表的容量设置为MAXIMUM_CAPACITY
。 - 否则,会调用
tableSizeFor()
方法,根据size
计算出最接近且大于size
的 2 的幂次方值,并将其作为哈希表的容量。
- 接下来,构造函数使用计算得到的
- 设置
sizeCtl
:- 最后,将计算得到的容量
cap
赋值给sizeCtl
,这是ConcurrentHashMap
内部用于管理表的控制参数。
- 最后,将计算得到的容量
总体来说,这段代码创建了一个 ConcurrentHashMap
的实例,并对传入的参数进行了合法性检查,然后根据参数计算了哈希表的大小和容量。这些控制参数的设置有助于确保并发性能和哈希表的正确功能。
# 6.sizeCtl 的作用
sizeCtl
是 ConcurrentHashMap
内部的一个控制参数,用于控制哈希表的状态和并发扩容操作。
具体来说,sizeCtl
的作用如下:
- 控制表的初始化大小:
- 在构造函数中,我们可以看到
sizeCtl
被用来设置哈希表的初始容量,它是在构造函数末尾通过this.sizeCtl = cap;
进行赋值的。 cap
是根据传入的initialCapacity
和loadFactor
计算得到的哈希表容量。
- 在构造函数中,我们可以看到
- 控制表的扩容:
sizeCtl
还用于控制哈希表的并发扩容操作。当哈希表需要进行扩容时,会使用sizeCtl
的值来判断当前是否有其他线程正在进行扩容操作,或者标记当前线程正在进行扩容。- sizeCtl 的值可以是负数或者正数,具体的含义如下:
- 负数:表示当前有线程正在进行扩容操作,而绝对值表示正在扩容的线程数减 1(因为扩容线程计数是从-1 开始的)。
- 0:表示当前没有进行扩容操作,且可以接受新的扩容请求。
- 正数:表示当前没有进行扩容操作,但是已经有线程发起了扩容请求,该正数值表示发起扩容请求的线程数。
- 控制并发性:
sizeCtl
的负值和正值都表示当前存在扩容请求或正在扩容的情况,因此在进行扩容操作时,其他线程的访问可能需要等待。- 这样的设计可以防止多个线程同时触发扩容操作,从而避免对哈希表的结构进行重复扩容,提高了并发性能。
总结起来,sizeCtl
是 ConcurrentHashMap
内部用于控制哈希表状态、扩容操作和并发性的重要参数。它在哈希表的初始化和扩容过程中起着关键的作用,保障了 ConcurrentHashMap
的正确性和高并发性能。
# 7.定位数据?
首先需要找到 segment,通过散列算法对 hashCode 进行再散列找到 segment,主要是为了分布更加均匀,否则都在一个 segment 上,起不到分段锁的作用.散列函数如下,为了让高位参与到运算中.
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift)&segmentMask];
}
2
3
hash >>> segmentShift)&segmentMask // 定位Segment所使用的hash 算法
int index= hash &(tab.length -1); // 定位HashEntry所使用的hash算法
2
# 8.ConcurrentHashMap 的 get 操作?
transient volatile int count;
volatile V value;
public V get(Object key) {
int hash =hash(key.hashCode());//先进行一次再散列
return segmentFor(hash).get(key,hash);//再通过散列值定位到segment
//最后通过散列获取到元素
}
2
3
4
5
6
7
8
get 操作的高效之处在于整个 get 过程不需要加锁。原因是它的 get 方法里将要使用的共享变量都定义成 volatile 类型
# 9.ConcurrentHashMap 的 put 操作?
put 方法必须加锁,需要做 2 件事
- 是否扩容
- 查找插入位置
在扩容的时候,首先会创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组里。为了高效, ConcurrentHashMap 不会对整个容器进行扩容,而只对某个 segment 进行扩容(jdk1.7)。数据量相对较小,注意和 HashMap 的区别.
# 10.说说 ConcurrentLinkedQueue?
线程安全的非阻塞队列 ConcurrentLinkedQueue
offer(E e)
方法是 ConcurrentLinkedQueue
提供的一个添加元素的方法。它用于将指定的元素插入队列的末尾(尾部)。
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {//p是最后一个节点
if (p.casNext(null, newNode)) {//通过cas替换节点p的next节点
if (p != t) // 是否通过cas替换tail的指向
casTail(t, newNode); // Failure is OK.
return true;
}
}
else if (p == q)//tail未初始化
p = (t != (t = tail)) ? t : head;
else
p = (p != t && t != (t = tail)) ? t : q;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
详细解释 ConcurrentLinkedQueue
的 offer(E e)
方法:
- 参数:
E e
e
是要插入队列的元素。
- 功能:
offer(E e)
方法将元素e
插入队列的末尾(尾部)。- 如果队列当前没有被其他线程占用或被修改,则插入操作是立即执行的。
- 如果队列当前正在被其他线程进行修改(比如扩容等操作),那么插入操作可能会被阻塞,直到队列可用为止。
- 返回值:
true
:如果元素e
成功插入队列。false
:如果由于某种原因(例如队列已满)未能插入元素e
。
- 注意事项:
ConcurrentLinkedQueue
是一个无界队列,不会限制队列的容量,因此offer(E e)
永远不会阻塞,除非由于资源限制或其他异常情况导致的插入失败。- 由于
ConcurrentLinkedQueue
是无界的,因此在不断地添加元素的情况下,它可能会占用大量的内存。
总的来说,ConcurrentLinkedQueue
的 offer(E e)
方法用于向队列的末尾添加元素 e
,它是一个非阻塞方法,可以在高并发环境下安全使用。需要注意的是,由于它是无界队列,添加元素时应当注意不要无限制地添加,以免占用过多的内存。
poll()
方法是ConcurrentLinkedQueue
的一个核心方法,用于从队列头部获取并移除元素。下面详细解释poll()
方法的工作原理:
poll()
方法用于从队列头部获取并移除元素。如果队列为空,poll()
方法会返回null
。poll()
方法是一个非阻塞方法,它不会阻塞线程,即使队列为空。- 在执行
poll()
方法时,它会检查队列头部是否有元素。 - 若队列为空(即没有任何元素),
poll()
方法会立即返回null
。 - 若队列非空,
poll()
方法会移除队列头部的元素,并将其返回。
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 11.BlockingQueue 的实现原理?
方法 | 1 | 2 | 3 | 4 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() |
阻塞队列使用 Lock+多个 Condition 实现的 FIFO 的队列.多线程环境下安全的,如果队列满了,放入元素的线程会被阻塞,如果队列空了,取元素的线程会被阻塞.具体原理一起看源代码。通过 Condition 来实现队列元素的阻塞,是空还是满.
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
2
3
4
5
6
7
8
# 12.JDK 中阻塞队列
JDK 7 提供了 7 个阻塞队列,如下。
队列类型 | 描述 |
---|---|
ArrayBlockingQueue | 一个由数组结构组成的有界阻塞队列。它按照 FIFO(先进先出)原则对元素进行排序。在队列已满时,插入操作将被阻塞,直到队列有空间可用。在队列为空时,获取操作也将被阻塞,直到队列中有元素可获取。 |
LinkedBlockingQueue | 一个由链表结构组成的有界阻塞队列。也按照 FIFO 原则对元素进行排序。与 ArrayBlockingQueue 不同的是,它没有固定的容量上限,可以在构造时指定容量,如果未指定则默认为 Integer.MAX_VALUE。当队列为空或已满时,插入和获取操作将被阻塞。 |
PriorityBlockingQueue | 一个支持优先级排序的无界阻塞队列。元素需要实现 Comparable 接口或者通过构造时提供的 Comparator 进行比较。无论队列是否为空,获取操作都会立即返回最高优先级的元素。当队列为空时,获取操作将被阻塞。 |
DelayQueue | 一个使用优先级队列实现的无界阻塞队列。其中的元素必须实现 Delayed 接口,只有在其指定的延迟时间到达后才能被获取。元素按照延迟时间的长短进行排序。当队列为空时,获取操作将被阻塞。 |
SynchronousQueue | 一个不存储元素的阻塞队列。每个插入操作必须等待一个相应的移除操作,反之亦然。它在并发场景中用于线程间直接传递数据,是一种特殊的同步工具。 |
LinkedTransferQueue | 一个由链表结构组成的无界阻塞队列。它在 LinkedBlockingQueue 的基础上添加了更高级的传输操作。除了基本的 FIFO 排序外,它还支持直接传输元素给消费者。在传输操作前后,获取操作和插入操作都可以被阻塞。 |
LinkedBlockingDeque | 一个由链表结构组成的双向阻塞队列。可以在队列的两端进行插入和获取操作。它也可以在构造时指定容量,如果未指定则默认为 Integer.MAX_VALUE。当队列为空或已满时,插入和获取操作将被阻塞。 |
以上表格中列出的队列类型是 Java 中并发包(java.util.concurrent)中提供的主要队列实现。每种队列类型在不同的场景中都有特定的用途,开发者可以根据具体需求选择合适的队列来实现线程安全的数据传递和任务调度。
# 13.DelayQueue 的使用场景?
在很多场景我们需要用到延时任务,比如给客户异步转账操作超时后发通知告知用户,还有客户下单后多长时间内没支付则取消订单等等,这些都可以使用延时任务来实现。
关闭空闲连接.服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
缓存.缓存中的对象,超过了空闲时间,需要从缓存中移出。
任务超时处理.在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
# 14.TransferQueue 的使用?
使用 TransferQueue 交替打印字符串
public class Juc_03_question_AbcAbc_06 {
public static void main(String[] args){
char[] aC = "ABC".toCharArray();
char[] bC = "123".toCharArray();
TransferQueue<Character> queue = new LinkedTransferQueue<>();
new Thread(()-> {
try {
for (char c : aC){
System.out.println(queue.take());
queue.transfer(c);
}
} catch (InterruptedException e){
e.printStackTrace();
}
},"t1").start();
new Thread(()-> {
try {
for (char c : bC){
queue.transfer(c);
System.out.println(queue.take());
}
} catch (InterruptedException e){
e.printStackTrace();
}
},"t2").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
为什么 SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue:
SynchronousQueue 无锁竞争,需要依据实际情况注意生产者线程和消费者线程的配比.
# 15.Fork/Join 框架原理?
Fork/Join 框架是 Java 中用于并行任务执行的一种框架,它基于"分治"(divide-and-conquer)的思想。Fork/Join 框架允许将一个大任务划分为多个小任务,然后并行地执行这些小任务,并最终将它们的结果合并起来得到最终的结果。
Fork/Join 框架的原理如下:
- 分解任务:在 Fork/Join 框架中,一个大任务会被逐步地拆分成多个小任务,直到这些小任务可以直接处理(通常是足够小到不可再拆分的大小)。这个过程称为"分解"(Forking)。
- 并行执行:一旦任务被成功地拆分成多个小任务,这些小任务就可以并行地在不同的处理器上执行。Fork/Join 框架通过工作窃取(Work-Stealing)算法来实现任务的动态调度。当一个线程完成了它所拥有的小任务后,它会尝试从其他线程的任务队列中"窃取"一个新的任务进行处理,以保持线程的高利用率。
- 合并结果:在并行执行的过程中,每个小任务都会产生一个局部结果。当所有小任务都完成后,这些局部结果将会被合并成整个大任务的最终结果。这个过程称为"合并"(Joining)。
Fork/Join 框架主要涉及以下两个关键类:
ForkJoinTask
:这是一个抽象类,用于表示一个可以并行执行的任务。它有两个重要的子类:RecursiveTask
用于有返回值的任务,RecursiveAction
用于没有返回值的任务。ForkJoinPool
:这是 Fork/Join 框架的线程池,负责管理和调度任务的执行。它维护了一个工作队列和多个工作线程,以便高效地执行分解和合并任务。
在使用 Fork/Join 框架时,开发者需要继承RecursiveTask
或RecursiveAction
类,实现compute()
方法,在compute()
方法中将大任务分解成小任务,并实现任务的执行和结果合并逻辑。然后,将这些小任务提交给 Fork/Join 框架进行并行执行,最终得到任务的结果。Fork/Join 框架的自动任务调度和工作窃取算法能够有效地利用多核处理器的计算资源,提高并行任务执行的效率。
ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成, ForkJoinTask 数组负责将存放程序提交给 ForkJoinPool 的任务,而 ForkJoinWorkerThread 数组负责执行这些任务。
# 16.fork 方法解读
当我们调用 ForkJoinTask 的 fork 方法时,程序会调用 ForkJoinWorkerThread 的 pushTask 方法异步地执行这个任务,然后立即返回结果.代码如下。
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
2
3
4
5
6
7
8
- 获取当前线程:
- 首先,代码通过
Thread.currentThread()
方法获取当前正在执行fork()
方法的线程对象,并将其赋值给变量t
。
- 首先,代码通过
- 判断当前线程类型:
- 然后,代码通过
instanceof
关键字判断当前线程是否是ForkJoinWorkerThread
的实例。 - 如果当前线程是
ForkJoinWorkerThread
的实例,说明该线程是 Fork/Join 框架中的工作线程。
- 然后,代码通过
- 提交任务:
- 如果当前线程是 Fork/Join 框架的工作线程,即
ForkJoinWorkerThread
的实例,那么代码会将当前任务this
推送(push)到该工作线程所绑定的工作队列中。 - 如果当前线程不是 Fork/Join 框架的工作线程,即是普通的 Java 线程,那么代码会通过
ForkJoinPool.common.externalPush(this)
方法将当前任务this
提交给 Fork/Join 框架的公共池(commonPool
)。
- 如果当前线程是 Fork/Join 框架的工作线程,即
- 返回任务:
- 最后,
fork()
方法会返回当前任务this
,以便链式调用或其他后续操作。
- 最后,
总结:fork()
方法用于将当前任务提交给 Fork/Join 框架进行并行执行。如果当前线程是 Fork/Join 框架的工作线程,任务会被推送到该工作线程的工作队列中;如果当前线程不是 Fork/Join 框架的工作线程,任务会被提交给 Fork/Join 框架的公共池。通过fork()
方法,开发者可以将一个大任务拆分成子任务,实现任务的并行执行。
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 17.join 方法解读
Join 方法的主要作用是阻塞当前线程并等待获取结果.让我们一起看看 ForkJoinTask 的 join 方法的实现,代码如下。
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
2
3
4
5
6
- 首先,代码声明一个整型局部变量
s
,用于保存任务执行的状态。 - 定义局部变量:
- 首先,代码声明一个整型局部变量
s
,用于保存任务执行的状态。
- 首先,代码声明一个整型局部变量
- 调用
doJoin()
方法:- 接下来,代码调用
doJoin()
方法,该方法实际上是ForkJoinTask
类的一个抽象方法,需要在子类中实现。doJoin()
方法用于实际等待任务的完成并获取其执行状态。
- 接下来,代码调用
- 获取执行状态:
doJoin()
方法返回的是任务执行状态的值。通过位运算& DONE_MASK
,将s
的值与DONE_MASK
(一个常量,表示任务状态的掩码)进行按位与运算,可以得到任务的实际执行状态。
- 判断执行状态:
- 如果执行状态
s
不等于NORMAL
(其中NORMAL
是ForkJoinTask
类中的一个常量,表示任务正常完成),则说明任务执行过程中出现了异常或被取消。在这种情况下,代码会调用reportException(s)
方法,对异常进行处理和报告。
- 如果执行状态
- 返回结果:
- 如果任务的执行状态
s
等于NORMAL
,则说明任务已经正常完成。此时,代码调用getRawResult()
方法,获取任务的执行结果,并将结果返回给调用者。
- 如果任务的执行状态
总结:join()
方法用于等待当前任务的执行结果。它通过调用doJoin()
方法获取任务的执行状态,判断任务是否正常完成。如果任务正常完成,则调用getRawResult()
方法获取任务的执行结果并返回;如果任务执行过程中出现异常或被取消,则通过reportException(s)
方法对异常进行处理。通过join()
方法,可以实现对任务执行结果的获取和等待。
再来分析一下 doJoin()方法的实现代码
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
2
3
4
5
6
7
8
9
在 doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;如果没有执行完,则从任务数组里取出任务并执行.如果任务顺利执行完成,则设置任务状态为 NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。
# 18.说说 java 中的原子操作类?
常用的有 13 个原子操作类,都是通过 cas 实现的.
Java 从 JDK1.5 开始提供了 java.util.concurrent.atomic 包(以下简称 Atomic 包),这个包中的原子操作类提供了一种用法简单、性能高效、线程安全地更新一个变量的方式.因为变量的类型有很多种
- Atomic 包里一共提供了 13 个类
- 属于 4 种类型的原子更新方式
- 原子更新基本类型、
- 原子更新数组、
- 原子更新引用
- 原子更新属性(字段).
- Atomic 包里的类基本都是使用 Unsafe 实现的包装类。
atomic 提供了 3 个类用于原子更新基本类型:
- AtomicInteger 原子更新整形
- AtomicLong 原子更新长整形
- AtomicBoolean 原子更新 bool 值
atomic 里提供了三个类用于原子更新数组里面的元素:
- AtomicIntegerArray:原子更新整形数组里的元素;
- AtomicLongArray:原子更新长整形数组里的元素;
- AtomicReferenceArray:原子更新引用数组里的元素。
原子更新基本类型的 AtomicInteger 只能更新一个变量,如果要原子更新多个变量,就需要使用原子更新引用类型提供的类了.原子引用类型 atomic 包主要提供了以下几个类:
- AtomicReference:原子更新引用类型;
- AtomicReferenceFieldUpdater:原子更新引用类型里的字段;
- AtomicMarkableReference:原子更新带有标记位的引用类型.可以原子更新一个布尔类型的标记位和引用类型.构造方法是 AtomicMarkableReference(V initialRef, boolean initialMark)
如果需要原子更新某个对象的某个字段,就需要使用原子更新属性的相关类,atomic 中提供了一下几个类用于原子更新属性:
- AtomicIntegerFieldUpdater:原子更新整形属性的更新器;
- AtomicLongFieldUpdater:原子更新长整形的更新器;
AtomicStampedReference
:原子更新带有版本号的引用类型.该类将整数值与引用关联起来,可用于原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。AtomicMarkableReference
也可以解决 ABA 问题.
# 19.说说对 CountDownLatch 的理解?
CountDownLatch
是 Java 中并发包(java.util.concurrent)提供的一个同步工具类,它的原理主要基于倒计数的方式。CountDownLatch
允许一个或多个线程等待其他线程完成一组操作,然后再继续执行自己的任务。
CountDownLatch
的原理如下:
- 初始化计数值:
- 在创建
CountDownLatch
实例时,需要指定一个初始的计数值(count)。该计数值表示需要等待的操作的数量。
- 在创建
- 等待操作:
- 在主线程或某个线程中,调用
CountDownLatch
的await()
方法,该方法会使当前线程进入等待状态,直到计数值变为零。 - 如果计数值当前不为零,则
await()
方法会一直阻塞当前线程,直到计数值为零。
- 在主线程或某个线程中,调用
- 完成操作:
- 在其他线程中,执行一组操作,并在每个操作完成后,调用
CountDownLatch
的countDown()
方法。每次调用countDown()
方法,计数值减 1。
- 在其他线程中,执行一组操作,并在每个操作完成后,调用
- 计数归零:
- 当
countDown()
方法的调用次数累计达到初始计数值时,计数值将变为零。
- 当
- 继续执行:
- 一旦计数值变为零,所有因调用
await()
方法而进入等待状态的线程都会被唤醒,继续执行后续的任务。
- 一旦计数值变为零,所有因调用
通过以上原理,CountDownLatch
可以实现线程间的协作和同步,使得某个线程等待其他线程完成特定操作后再继续执行。这对于多线程场景中需要等待其他线程完成某些初始化、数据加载或任务执行的情况非常有用。一旦计数值变为零,所有等待的线程都会被唤醒,从而实现了线程的协调和同步。
latch.await();
latch.countDown();
2
# 20.同步屏障 CyclicBarrier 理解?
CyclicBarrier
是 Java 中并发包(java.util.concurrent)提供的另一个同步工具类,它的原理是循环栅栏。CyclicBarrier
允许一组线程互相等待,直到所有线程都到达一个公共的屏障点,然后再同时继续执行。
CyclicBarrier
的原理如下:
- 初始化屏障点和参与线程数:
- 在创建
CyclicBarrier
实例时,需要指定一个屏障点(barrier),表示所有线程都要等待达到的点。同时,需要指定参与线程数(parties),表示需要等待的线程数量。
- 在创建
- 等待到达屏障点:
- 在各个线程中,调用
CyclicBarrier
的await()
方法,该方法会使当前线程等待,直到所有线程都到达屏障点。 - 每次调用
await()
方法,当前线程会被阻塞,直到所有参与线程都调用了await()
方法。
- 在各个线程中,调用
- 达到屏障点后继续执行:
- 一旦所有参与线程都调用了
await()
方法,它们都会在屏障点处等待。 - 当所有参与线程都到达屏障点后,
CyclicBarrier
会解除所有线程的阻塞状态,使它们可以继续执行后续的任务。
- 一旦所有参与线程都调用了
- 循环使用:
- 与
CountDownLatch
不同,CyclicBarrier
是可以循环使用的。一旦所有线程都到达屏障点并被释放,CyclicBarrier
会被重置,所有线程可以再次使用它进行下一轮的同步。
- 与
通过以上原理,CyclicBarrier
可以实现多个线程之间的协作和同步,让它们在公共的屏障点处等待,直到所有线程都到达后再同时继续执行。这对于多个线程需要同时完成某个阶段,然后再一起继续执行后续阶段的情况非常有用。每当CyclicBarrier
被重置,新的一轮同步过程又可以开始。
CyclicBarrier 依赖于 ReentrantLock 实现
barrier.await();
barrier.getNumberWaiting()
2
# 21. CyclicBarrier 和 CountDownLatch?
CyclicBarrier
和CountDownLatch
是 Java 并发包中两种不同的同步工具,它们在使用场景和原理上有一些区别。
- 同步方式:
CyclicBarrier
:采用循环栅栏的同步方式。多个线程在达到公共的屏障点处等待,直到所有线程都到达后才同时继续执行。CyclicBarrier
可以循环使用,每当所有线程都到达屏障点并被释放,它会被重置,可以进行下一轮的同步。CountDownLatch
:采用倒计数的同步方式。一个或多个线程等待其他线程执行完成一组操作后再继续执行。CountDownLatch
的计数值在创建时被初始化,每当一个线程完成一个操作,计数值减 1,直到计数值变为零时,等待的线程被唤醒。
- 参与方面:
CyclicBarrier
:需要指定参与线程数,在每次等待时都要等待所有参与线程到达屏障点。CountDownLatch
:需要指定倒计数值,在倒计数值变为零时所有等待的线程都会被唤醒,不需要指定参与线程数。
- 循环使用:
CyclicBarrier
可以循环使用,每次达到屏障点后,它会被重置,可以进行下一轮的同步。CountDownLatch
在计数值减为零后,无法重置或再次使用,一旦倒计数值为零,它就失去了继续等待其他线程的能力。
- 作用场景:
CyclicBarrier
适用于多个线程需要等待其他线程同时到达某个屏障点
后再一起继续执行的场景,常用于解决复杂任务的拆分和合并问题。CountDownLatch
适用于一个或多个线程需要等待其他线程完成一组操作
后再继续执行的场景,常用于线程间协调和同步。
# 22.说说 Semaphore ?
Semaphore
是 Java 中并发包(java.util.concurrent)提供的另一个同步工具类,它的原理基于信号量的概念。Semaphore
用于控制同时访问某个共享资源的线程数量,它维护一个许可证(permit)的计数,用于限制同时访问共享资源的线程数量。
Semaphore
的原理如下:
- 初始化许可证数量:
- 在创建
Semaphore
实例时,需要指定初始的许可证数量。这个数量表示同时允许的线程数。
- 在创建
- 获取许可证:
- 当一个线程需要访问共享资源时,它首先需要调用
Semaphore
的acquire()
方法。如果当前许可证数量大于零,该线程将获得一个许可证,并将许可证数量减 1。这样,许可证数量就可以反映当前可用的资源数。
- 当一个线程需要访问共享资源时,它首先需要调用
- 许可证数量为零时阻塞:
- 如果当前许可证数量为零,即所有的许可证都被其他线程占用,那么
acquire()
方法将会阻塞当前线程,直到有其他线程释放许可证。
- 如果当前许可证数量为零,即所有的许可证都被其他线程占用,那么
- 释放许可证:
- 当一个线程使用完共享资源后,它需要调用
Semaphore
的release()
方法来释放许可证。这将增加许可证数量,并允许其他等待许可证的线程继续执行。
- 当一个线程使用完共享资源后,它需要调用
通过以上原理,Semaphore
可以控制同时访问某个共享资源的线程数量,以防止过多的线程同时竞争资源导致资源过度消耗或产生冲突。Semaphore
是一种有效的并发控制工具,常用于限制同时访问共享资源的线程数量,控制并发访问的并发性。
semaphore.acquire();//阻塞
semaphore.release();//释放
intavailablePermits():返回此信号量中当前可用的许可证数。
intgetQueueLength():返回正在等待获取许可证的线程数。
booleanhasQueuedThreads():是否有线程正在等待获取许可证。
void reducePermits(intreduction):减少reduction个许可证,是个protected方法。
CollectiongetQueuedThreads):返回所有等待获取许可证的线程集合,是个protected方法。
2
3
4
5
6
7
# 23.CopyOnWriteArrayList 原理?
它相当于线程安全的 ArrayList。和 ArrayList 一样,它是个可变数组;但是和 ArrayList 不同的时
它具有以下特性:
- 它最适合于具有以下特征的应用程序:List 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突。
- 它是线程安全的。
- 因为通常需要复制整个基础数组,所以可变操作(add()、set() 和 remove() 等等)的开销很大。
- 迭代器支持 hasNext(), next()等不可变操作,但不支持可变 remove()等操作。
- 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。
原理:
- CopyOnWriteArrayList 实现了 List 接口,因此它是一个队列。
- CopyOnWriteArrayList 包含了成员 lock。每一个 CopyOnWriteArrayList 都和一个监视器锁 lock 绑定,通过 lock,实现了对 CopyOnWriteArrayList 的互斥访问。
- CopyOnWriteArrayList 包含了成员 array 数组,这说明 CopyOnWriteArrayList 本质上通过数组实现的。
- CopyOnWriteArrayList 的“动态数组”机制 -- 它内部有个“volatile 数组”(array)来保持数据。在“添加/修改/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给“volatile 数组”。这就是它叫做 CopyOnWriteArrayList 的原因!CopyOnWriteArrayList 就是通过这种方式实现的动态数组;不过正由于它在“添加/修改/删除”数据时,都会新建数组,所以涉及到修改数据的操作,CopyOnWriteArrayList 效率很低;但是单单只是进行遍历查找的话,效率比较高。
- CopyOnWriteArrayList 的“线程安全”机制 -- 是通过 volatile 和监视器锁 Synchrnoized 来实现的。
- CopyOnWriteArrayList 是通过“volatile 数组”来保存数据的。一个线程读取 volatile 数组时,总能看到其它线程对该 volatile 变量最后的写入;就这样,通过 volatile 提供了“读取到的数据总是最新的”这个机制的 保证。
- CopyOnWriteArrayList 通过监视器锁 Synchrnoized 来保护数据。在“添加/修改/删除”数据时,会先“获取监视器锁”,再修改完毕之后,先将数据更新到“volatile 数组”中,然后再“释放互斥锁”;这样,就达到了保护数据的目的。
# 24.LongAdder 原理
LongAdder 是 Java 并发包中的一个类,用于高效地支持并发计数操作。它在 Java 8 中被引入,是对 AtomicLong 的改进和优化。
在多线程环境下,通常需要对共享的计数器进行增加操作。传统的 AtomicLong 类在高并发环境下会存在性能问题,因为它使用 CAS(Compare and Swap)指令来保证操作的原子性。在高并发情况下,多个线程竞争同一个 AtomicLong 实例可能导致大量的 CAS 操作,从而降低性能。
LongAdder 通过在内部使用一种更加高效的技术,将计数分散到多个变量中,从而减少了竞争。它维护了一个数组来保存多个变量,每个线程在进行计数操作时会根据哈希算法选择一个特定的变量进行增加,而不是像 AtomicLong 那样直接竞争一个变量。
这样做的好处是,在高并发情况下,线程之间几乎没有竞争,从而减少了 CAS 操作的次数,提高了并发性能。当需要获取总计数时,LongAdder 将所有变量的值求和得到结果。
使用 LongAdder 时,你可以通过调用 add()
方法增加计数,也可以通过 sum()
方法获取当前的总计数。
以下是 LongAdder 的简单示例:
import java.util.concurrent.atomic.LongAdder;
public class LongAdderExample {
public static void main(String[] args) {
LongAdder counter = new LongAdder();
// 多个线程增加计数
Runnable incrementTask = () -> {
for (int i = 0; i < 1000; i++) {
counter.add(1);
}
};
Thread thread1 = new Thread(incrementTask);
Thread thread2 = new Thread(incrementTask);
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 获取总计数
long total = counter.sum();
System.out.println("Total count: " + total);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
总结一下,LongAdder 是在高并发环境下用于替代 AtomicLong 的一种高效并发计数器。它通过将计数分散到多个变量中,减少了线程之间的竞争,从而提高了并发性能。
# 二.volatile 与 Synchonized
# 1.说说重排序的分类?
在执行程序时,为了提高性能,编译器(jvm 里的)和处理器(操作系统级别的)常常会对指令做重排序.重排序分 3 种类型。
编译器优化的重排序:
编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
指令级并行的重排序:
现代处理器采用了指令级并行技术(Instruction-Level Parallelism, ILP)来将多条指令重叠执行.如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。
内存系统的重排序:
由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。
上述的 1 属于编译器重排序,2 和 3 属于处理器重排序.这些重排序可能会导致多线程程序出现内存可见性问题.对于编译器,JMM 的编译器重排序规则会禁止特定类型的编译器重排序(不是所有的编译器重排序都要禁止).
对于处理器重排序,JMM 的处理器重排序规则会要求 Java 编译器在生成指令序列时,插入特定类型的内存屏障(Memory Barriers, Intel 称之为 Memory Fence)指令,通过内存屏障指令来禁止特定类型的处理器重排序。
在单线程程序中,对存在数据依赖的操作重排序,不会改变执行结果(这也是 as-if-serial 语义允许对存在控制依赖的操作做重排序的原因);但在多线程程序中,对存在控制依赖的操作重排序,可能会改变程序的执行结果,因此必须要通过一定的同步手段加以控制。
# 2.说说重排序?
重排序是指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段。
jmm 在实现上在不改变结果的前提下,编译器和处理器可以进行优化性的重排序.
# 3.volatile 实现原则
相关名词
- TPS(Transactions Per Second):每秒事务处理数,衡量一个服务性能好坏的评判标准。
- JMM(Java Memory Model):Java 内存模型。内存模型 JMM 控制多线程对共享变量的可⻅性
volatile 实现原则有 2 条:
- lock 前缀指令会使处理器缓存写回内存.
- 一个处理器的缓存写回内存,会导致其他处理器的缓存失效.
# 4.内存屏障的种类以及说明?
StoreLoad Barriers 是一个“全能型”的屏障,它同时具有其他 3 个屏障的效果.现代的多处理器大多支持该屏障(其他类型的屏障不一定被所有处理器支持).执行该屏障开销会很昂贵,因为当前处理器通常要把写缓冲区中的数据全部刷新到内存中(Buffer Fully Flush).
# 5.volatile 的实现原理
为了实现 volatile 内存语义,JMM 会分别禁止如下两种类型的重排序类型:
从图中可看出:
volatile 写写禁止重排序
volatile 读写,读读禁止重排序; volatile 读和普通写禁止重排序
volatile 写读,volatile 写写禁止重排序。
为了实现 volatile 的内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序.对于编译器来说,发现一个最优布置来最小化插入屏障的总数几乎不可能.为此,JMM 采取保守策略.下面是基于保守策略的 JMM 内存屏障插入策略。
- 在每个 volatile 写操作的前面插入一个 StoreStore 屏障。
- 在每个 volatile 写操作的后面插入一个 StoreLoad 屏障。
- 在每个 volatile 读操作的后面插入一个 LoadLoad 屏障。
- 在每个 volatile 读操作的后面插入一个 LoadStore 屏障。
public class Juc_book_fang_03_VolatileBarrierExample {
int a;
volatile int v1 = 1;
volatile int v2 = 2;
void readAndWrite(){
int i = v1; //第一个volatile读
int j = v2; //第二个volatile读
a = i + j; //普通写
v1 = i +1; //第一个volatile写
v2 = j *2; //第二个 volatile写
//其他方法
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
屏障说明:
# 6.说说 volatile 的内存语义?
从内存语义的角度来说,volatile 的写读与锁的释放-获取有相同的内存效果:
- volatile 写和锁的释放有相同的内存语义;
- volatile 读与锁的获取有相同的内存语义。
volatile 写的内存语义如下。
当写一个 volatile 变量时,JMM 会把该线程对应的本地内存中的共享变量值刷新到主内存。
volatile 读的内存语义如下。
当读一个 volatile 变量时,JMM 会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量。
# 7.volatile 的特性?
volatile
是 Java 中的一个关键字,用于修饰变量。volatile
具有以下特性:
- 可见性:当一个变量被
volatile
修饰时,在一个线程中修改该变量后,其他线程可以立即看到修改后的值。这是因为volatile
会强制将变量的值从线程的本地内存中刷新到主内存中,使得其他线程可以读取到最新的值。 - 有序性:当一个变量被
volatile
修饰时,对该变量的读写操作具有有序性。这是因为volatile
会禁止指令重排序,保证指令执行的顺序与程序代码中的顺序一致。这个特性主要是为了解决多线程中的并发问题,保证线程之间的操作顺序与程序代码中的顺序一致。 - 不保证原子性:虽然
volatile
具有可见性和有序性,但是并不保证对变量的操作是原子性的,即不一定能够保证在多线程环境下操作变量时的安全性。例如,对于a++
这样的操作,虽然是原子操作,但是由于包含了读取、加 1、写回三个步骤,因此在多线程环境下仍然可能出现线程安全问题。
因此,volatile
主要用于保证变量的可见性和有序性,而不是用于解决线程安全问题。如果需要保证线程安全,需要结合其他机制,例如synchronized
、Lock
等。
多线程i++操作不保证原子性:
虽然使用了 volatile 关键字来修饰 count 变量,但是仍然无法保证输出结果为 50。这是因为 volatile 只能保证变量在多线程之间的可见性,但并不能保证对变量的操作是原子的。
在多线程环境下,即使使用 volatile 关键字修饰变量,当多个线程同时对这个变量进行自增操作时,仍然可能会出现竞争条件,导致结果不是预期的。
# 8.volatile 是如何保证可见性的?
示例代码中, instance 被 volatile 修饰。
volatile instance = new instance();
上边的 new 操作,转化成汇编代码如下: 有 volatile 变量修饰的共享变量进行写操作的时候会多出第二行 Lock 汇编代码, Lock 前缀的指令在多核处理器下会引发了两件事情:
- 将当前处理器缓存行的数据写回到系统内存.
(volatile 写的内存语义)
- 这个写回内存的操作会使在其他 CPU 里缓存了该内存地址的数据无效.
(volatile 读的内存语义)
如果对声明了 volatile 的变量进行写操作,JVM 就会向处理器发送一条 Lock 前缀的指令,将这个变量所在缓存行的数据写回到系统内存。
在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器对这个数据进行修改操作的时候,会重新从系统内存中把数据读到处理器缓存里(volatile 读的内存语义)
# 9.synchronized 三种使用
java 中的每一个对象都可以作为锁。具体表现为以下 3 种形式。
- 对于普通同步方法,锁是当前实例对象。
ACC_SYNCHRONIZED
- 对于静态同步方法,锁是当前类的 Class 对象。
ACC_SYNCHRONIZED
和ACC_STATIC
- 对于同步方法块,锁是
synchronized
括号里配置的对象。monitorenter
和monitorexit
,其中monitorexit
至少有 2 个出口,一个正常出口,一个异常出口
当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁
当出现读少写多时,volatile 并不合适,提高吞吐量还得靠重量级锁.
# 10.synchronized 的实现原理
在 Java 早期版本中,synchronized
属于重量级锁,效率低下,因为监视器锁(monitor
)是依赖于底层的操作系统的 MutexLock
来实现的,Java 的线程是映射到操作系统的原生线程之上的。如果要挂起或者唤醒一个线程,都需要操作系统帮忙完成,而操作系统实现线程之间的切换时需要从用户态转换到内核态,这个状态之间的转换需要相对比较长的时间,时间成本相对较高,这也是为什么早期的 synchronized
效率低的原因。庆幸的是在 Java6 之后 Java 官方对从 JVM 层面对 synchronized
较大优化,主要是锁升级的过程. owner 表示 Monitor 锁的持有者,而且同一个时刻只能有一个 owner.
JVM 基于进入和退出 Monitor
对象来实现方法同步和代码块同步,但两者的实现细节不一样。
代码块同步是使用 monitorenter
和 monitorexit
指令实现的,而方法同步是使用另外一种方式实现的。
monitorenter
指令是在编译后插入到同步代码块的开始位置,而 monitorexit
是插入到方法结束处和异常处,JVM 要保证每个 monitorenter
必须有对应的 monitorexit
与之配对。
任何对象都有一个 monitor 与之关联,当且一个 monitor 被持有后,它将处于锁定状态.线程执行到 monitorenter
指令时,将会尝试获取对象所对应的 monitor 的所有权,即尝试获得对象的锁。
修饰方法:
public class SynchonizedTest1 {
private static int a = 0;
public synchronized void add(){
a++;
}
}
2
3
4
5
6
可以看到在 add 方法的 flags 里面多了一个 ACC_SYNCHRONIZED 标志,这标志用来告诉 JVM 这是一个同步方法
┌─[qinyingjie@qinyingjiedeMacBook-Pro]-[~/Documents/idea-workspace/ant/ant-juc/target/classes/com/xiaofei/antjuc/方腾飞]-[Thu Apr 14,18:51]
└─[$]<git:(master*)> javap -v SynchonizedTest1.class
Classfile /Users/qinyingjie/Documents/idea-workspace/ant/ant-juc/target/classes/com/xiaofei/antjuc/方腾飞/SynchonizedTest1.class
Last modified 2022-4-14; size 486 bytes
MD5 checksum 1a0bdb0e66832a2980bb5b8c0a58eff7
Compiled from "SynchonizedTest1.java"
public class com.xiaofei.antjuc.方腾飞.SynchonizedTest1
minor version: 0
major version: 52
flags: ACC_PUBLIC, ACC_SUPER
Constant pool:
#1 = Methodref #4.#18// java/lang/Object."<init>":()V
#2 = Fieldref #3.#19// com/xiaofei/antjuc/方腾飞/SynchonizedTest1.a:I
#3 = Class #20// com/xiaofei/antjuc/方腾飞/SynchonizedTest1
#4 = Class #21// java/lang/Object
#5 = Utf8 a
#6 = Utf8 I
#7 = Utf8 <init>
#8 = Utf8 ()V
#9 = Utf8 Code
#10 = Utf8 LineNumberTable
#11 = Utf8 LocalVariableTable
#12 = Utf8 this
#13 = Utf8 Lcom/xiaofei/antjuc/方腾飞/SynchonizedTest1;
#14 = Utf8 add
#15 = Utf8 <clinit>
#16 = Utf8 SourceFile
#17 = Utf8 SynchonizedTest1.java
#18 = NameAndType #7:#8 //"<init>":()V
#19 = NameAndType #5:#6 // a:I
#20 = Utf8 com/xiaofei/antjuc/方腾飞/SynchonizedTest1
#21 = Utf8 java/lang/Object
{
public com.xiaofei.antjuc.方腾飞.SynchonizedTest1();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=1, locals=1, args_size=1
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: return
LineNumberTable:
line 3: 0
LocalVariableTable:
Start Length Slot Name Signature
0 5 0 this Lcom/xiaofei/antjuc/方腾飞/SynchonizedTest1;
public synchronized void add();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED
Code:
stack=2, locals=1, args_size=1
0: getstatic #2 // Field a:I
3: iconst_1
4: iadd
5: putstatic #2 // Field a:I
8: return
LineNumberTable:
line 6: 0
line 7: 8
LocalVariableTable:
Start Length Slot Name Signature
0 9 0 this Lcom/xiaofei/antjuc/方腾飞/SynchonizedTest1;
static {};
descriptor: ()V
flags: ACC_STATIC
Code:
stack=1, locals=0, args_size=0
0: iconst_0
1: putstatic #2 // Field a:I
4: return
LineNumberTable:
line 4: 0
}
SourceFile: "SynchonizedTest1.java"
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
修饰类:
主要是使用 monitorenter 和 monitorexit 实现
有两个 monitorexit 呢?
第一个:正常退出
第二个:异常退出
public class SynchonizedTest2 {
private static int a = 0;
public void add(){
synchronized (SynchonizedTest2.class){
a++;
}
}
}
2
3
4
5
6
7
8
9
方腾飞|master⚡⇒ jjavap -v SynchonizedTest2
警告: 二进制文件SynchonizedTest2包含com.xiaofei.antjuc.方腾飞.SynchonizedTest2
Classfile /Users/qinyingjie/Documents/idea-workspace/ant/ant-juc/target/classes/com/xiaofei/antjuc/方腾飞/SynchonizedTest2.class
Last modified 2022-4-14; size 599 bytes
MD5 checksum e4ad4e62082f26cefee3bb1715e94295
Compiled from "SynchonizedTest2.java"
public class com.xiaofei.antjuc.方腾飞.SynchonizedTest2
minor version: 0
major version: 52
flags: ACC_PUBLIC, ACC_SUPER
Constant pool:
#1 = Methodref #4.#22// java/lang/Object."<init>":()V
#2 = Class #23// com/xiaofei/antjuc/方腾飞/SynchonizedTest2
#3 = Fieldref #2.#24// com/xiaofei/antjuc/方腾飞/SynchonizedTest2.a:I
#4 = Class #25// java/lang/Object
#5 = Utf8 a
#6 = Utf8 I
#7 = Utf8 <init>
#8 = Utf8 ()V
#9 = Utf8 Code
#10 = Utf8 LineNumberTable
#11 = Utf8 LocalVariableTable
#12 = Utf8 this
#13 = Utf8 Lcom/xiaofei/antjuc/方腾飞/SynchonizedTest2;
#14 = Utf8 add
#15 = Utf8 StackMapTable
#16 = Class #23// com/xiaofei/antjuc/方腾飞/SynchonizedTest2
#17 = Class #25// java/lang/Object
#18 = Class #26// java/lang/Throwable
#19 = Utf8 <clinit>
#20 = Utf8 SourceFile
#21 = Utf8 SynchonizedTest2.java
#22 = NameAndType #7:#8 //"<init>":()V
#23 = Utf8 com/xiaofei/antjuc/方腾飞/SynchonizedTest2
#24 = NameAndType #5:#6 // a:I
#25 = Utf8 java/lang/Object
#26 = Utf8 java/lang/Throwable
{
public com.xiaofei.antjuc.方腾飞.SynchonizedTest2();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=1, locals=1, args_size=1
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: return
LineNumberTable:
line 3: 0
LocalVariableTable:
Start Length Slot Name Signature
0 5 0 this Lcom/xiaofei/antjuc/方腾飞/SynchonizedTest2;
public void add();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=2, locals=3, args_size=1
0: ldc #2 // class com/xiaofei/antjuc/方腾飞/SynchonizedTest2
2: dup
3: astore_1
4: monitorenter
5: getstatic #3 // Field a:I
8: iconst_1
9: iadd
10: putstatic #3 // Field a:I
13: aload_1
14: monitorexit
15: goto 23
18: astore_2
19: aload_1
20: monitorexit
21: aload_2
22: athrow
23: return
Exception table:
from to target type
5 15 18 any
18 21 18 any
LineNumberTable:
line 7: 0
line 8: 5
line 9: 13
line 10: 23
LocalVariableTable:
Start Length Slot Name Signature
0 24 0 this Lcom/xiaofei/antjuc/方腾飞/SynchonizedTest2;
StackMapTable: number_of_entries = 2
frame_type = 255 /* full_frame */
offset_delta = 18
locals = [ class com/xiaofei/antjuc/方腾飞/SynchonizedTest2, class java/lang/Object ]
stack = [ class java/lang/Throwable ]
frame_type = 250 /* chop */
offset_delta = 4
static {};
descriptor: ()V
flags: ACC_STATIC
Code:
stack=1, locals=0, args_size=0
0: iconst_0
1: putstatic #3 // Field a:I
4: return
LineNumberTable:
line 4: 0
}
SourceFile: "SynchonizedTest2.java"
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# 11.synchronized 同步队列与等待队列?
synchronized
是 Java 中用于实现同步的一种机制。在synchronized
中,同步队列和等待队列是两个重要的概念:
- 同步队列:同步队列是一个双向队列,用于存放已经获得锁的线程。当一个线程成功地获得锁时,它会被加入到同步队列中。同步队列中的线程按照获取锁的先后顺序排队,先获取锁的线程排在队列的前面。
- 等待队列:等待队列是一个单向队列,用于存放等待锁的线程。当一个线程尝试获取锁时,如果锁已经被其他线程持有,那么它就会被加入到等待队列中。等待队列中的线程被阻塞,直到获取锁的线程释放锁后,它们才有机会重新竞争锁。
synchronized
中的同步队列和等待队列是通过内置锁(也称为监视器锁)来实现的。当一个线程成功获取锁时,它会持有锁并进入同步队列;当一个线程无法获取锁时,它会进入等待队列并释放锁,在等待队列中等待被唤醒。当持有锁的线程释放锁时,它会从同步队列中唤醒下一个等待的线程,使其重新竞争锁。
需要注意的是,同步队列和等待队列是在 Java 虚拟机层面实现的,而不是由操作系统的线程调度器来管理。因此,在synchronized
中的线程调度是由 Java 虚拟机来负责调度的,可能与操作系统的线程调度器有所不同。
# 12.synchronized 不同方法?
当一个线程进入一个对象的 synchronized 方法 A 之后,其它线程是否可进入此对象的 synchronized 方法 B?
如果一个线程进入一个对象的synchronized
方法 A,那么其他线程在此期间是无法进入该对象的任何synchronized
方法(包括方法 A 和方法 B)的。这是因为,当一个线程进入对象的synchronized
方法时,它会获取该对象的锁,并将该对象的锁计数器加 1,直到该线程执行完该方法并释放锁后,其他线程才有机会获取该对象的锁。
因此,在同一个对象上,同一时间只能有一个线程执行该对象的synchronized
方法,其他线程必须等待该线程执行完方法并释放锁后,才有机会获取该对象的锁并执行该对象的synchronized
方法。如果多个线程需要同时访问该对象的不同synchronized
方法,可以使用不同的锁对象,或者使用synchronized
代码块,并使用不同的锁对象。
# 13.ObjectMonitor 的属性
ObjectMonitor
是 Java 中的一个内部类,用于实现synchronized
的同步机制。ObjectMonitor
类中包含了以下一些重要的属性:
_owner
:表示当前获得锁的线程对象。_count
:表示当前线程已经重入锁的次数。_waitSet
:表示等待锁的线程队列,即等待队列。_waitSetLock
:表示等待队列的锁对象,用于对等待队列进行同步操作。_recursions
:表示当前线程重入锁的次数,与_count
类似。_EntryList
:表示同步队列,即已获得锁的线程队列。_WaitSetNext
:表示等待队列中下一个等待锁的线程。
需要注意的是,这些属性是在 Java 虚拟机层面实现的,而不是 Java 语言层面的。因此,它们的具体实现可能与不同的 Java 虚拟机实现有所不同。
# 14.MESI 与 volatile
既然 CPU 有缓存一致性协议(MESI),JVM 为啥还需要 volatile 关键字?
CPU 的缓存一致性协议(如 MESI:修改、独占、共享、无效)确实可以帮助处理多个 CPU 核心之间的缓存一致性问题,但是 Java 中的volatile
关键字在某些情况下仍然是必要的,因为它涉及到更高级的内存可见性问题,而不仅仅是底层的缓存一致性。
以下是为什么在某些情况下需要volatile
关键字的原因:
禁止指令重排序:Java 中的
volatile
关键字不仅仅保证了对变量的写操作会刷新到主内存中,还保证了对volatile
变量的读操作不会发生在写操作之前,即禁止了指令重排序。这对于确保线程安全非常重要,因为如果没有这个保证,其他线程可能会在写操作之前看到过时的值。内存可见性:
volatile
关键字保证了一个线程对volatile
变量的写操作对于其他线程来说是可见的。如果一个线程在一个volatile
变量上进行了写操作,那么其他线程在读取该变量时会立即看到最新的值。这是因为volatile
变量的读操作会从主内存中读取,而不是从线程的本地缓存中读取。不适用于复合操作:缓存一致性协议通常只关注单个变量的读写操作,而不考虑复合操作的一致性。在 Java 中,许多操作都是复合操作,例如递增操作(i++),如果不使用
volatile
或其他同步机制,可能会导致不正确的结果。volatile
关键字可以确保这些复合操作在多线程环境中的正确性。
虽然 CPU 的缓存一致性协议可以处理底层的缓存一致性问题,但volatile
关键字在 Java 中提供了更高级别的内存可见性和线程安全保证,特别是在处理复合操作和避免指令重排序方面非常重要。所以,volatile
关键字和 CPU 的缓存一致性协议并不冲突,而是在不同层面上处理多线程的问题。
# 三.锁升级
# 1.描述下锁分类?
锁的类型 | 锁的标志 |
---|---|
non-biasable | 01 (偏向标志位为 0) |
biasable | 01 (偏向标志位为 1) |
biased | 01 (偏向标志位为 1) |
thin lock | 00 |
fat lock | 10 |
GC | 11 |
需要注意的是,标志的描述中有一些错误。biasable
和biased
都使用相同的标志,即偏向标志位为 1,而非 0。此外,GC 标志通常不用于描述锁的状态,而是与垃圾回收相关。
因此,正确的锁类型和标志的描述应为:
- non-biasable(无锁且不可偏向): 01 (偏向标志位为 0)
- biasable(无锁可偏向): 01 (偏向标志位为 1)
- biased(偏向锁): 01 (偏向标志位为 1)
- thin lock(轻量级锁): 00
- fat lock(重量级锁): 10
以下是 Java 中常见的锁的标志以及对应的特点:
锁的类型 | 特点 | 锁标识 | 偏向标志位 |
---|---|---|---|
无锁 | 无锁是指多个线程可以同时访问同一个变量或资源,而不需要进行同步控制,也不会发生冲突。无锁通常适用于读多写少的场景,可以提高并发性能。 | 01 | 0 |
偏向锁 | 偏向锁是一种优化手段,用于减少无竞争情况下的锁操作,从而提高性能。偏向锁会在对象头中记录拥有锁的线程 ID,当一个线程进入同步块时,如果该对象的锁状态为无锁状态,那么当前线程会尝试获取锁并将对象头中的线程ID 设置为自己的 ID。如果当前线程已经拥有了该对象的锁,那么它可以直接进入同步块执行,无需再次获取锁。 | 01 | 1 |
轻量级锁 | 轻量级锁是一种基于自旋的锁,用于减少线程的上下文切换和线程阻塞、唤醒的开销。轻量级锁的实现基于对象头中的标志位 ,当一个线程进入同步块时,如果该对象的锁状态为无锁状态,那么当前线程会尝试使用 CAS 操作将对象头中的标志位设置为轻量级锁,并将锁的拥有者线程 ID 记录在锁记录(Lock Record)中。如果当前线程已经拥有了该对象的锁,那么它可以直接进入同步块执行,无需再次获取锁。如果锁操作失败,那么当前线程会进入自旋状态,尝试等待锁的释放。 | 00 | |
重量级锁 | 重量级锁是一种基于阻塞的锁,用于解决多个线程同时访问同一个资源时的互斥问题。当一个线程尝试获取重量级锁时,如果该锁已经被其他线程占用,那么当前线程会进入阻塞状态,直到该锁被释放并且当前线程重新获得锁。重量级锁的实现基于操作系统的线程调度器,线程的阻塞和唤醒需要操作系统进行调度,开销较大。 | 10 |
需要注意的是,以上锁的类型是在 Java 虚拟机层面实现的,而不是 Java 语言层面的。不同的 Java 虚拟机实现可能会有所不同。
# 2.描述下锁升级?
这几个状态会随着竞争情况逐渐升级.锁可以升级但不能降级,意味着偏向锁升级成轻量级锁后不能降级成偏向锁.这种锁升级却不能降级的策略,目的是为了提高获得锁和释放锁的效率
# 3.偏向锁的原理?
为了在只有一个线程执行同步块时提高性能,当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储锁偏向的线程 ID
,以后该线程在进入和退出同步块时不需要进行 CAS 操作来加锁和解锁,只需简单地测试一下对象头的 Mark Word 里是否存储着指向当前线程的偏向锁。如果测试成功,表示线程已经获得了锁。如果测试失败,则需要再测试一下 Mark Word 中偏向锁的标识是否设置成 1 (表示当前是偏向锁):
- 如果没有设置,则使用 CAS 竞争锁;
- 如果设置了,则尝试使用 CAS 将对象头的偏向锁指向当前线程。
当设置了 HashCode 后,不能设置偏向锁,只能升级为轻量级锁,因为 HashCode 生成后,没办法把 HashCode 占位改为偏向线程的 id,这里 HashCode 会存在显式调用(直接调用 HashCode 方法)和隐式调用(比如 HashMap 的 put 方法)
# 4.JVM 偏向锁可以关闭吗?
偏向锁在 Java 6 和 Java 7 里是默认启用的,但是它在应用程序启动 4 秒钟之后才激活,如有必要可以使用 JVM 参数来关闭延迟:
-XX:BiasedLockingStartupDelay=0
如果你确定应用程序里所有的锁通常情况下处于竞争状态,可以通过 JVM 参数关闭偏向锁:
-XX:-UseBiasedLocking=false
那么程序默认会进入轻量级锁状态。
锁相关的参数
-XX:+UseBiasedLocking 启用偏向锁,默认启用
-XX:+PrintFlagsFinal 打印JVM所有参数
-XX:BiasedLockingStartupDelay=4000 偏向锁启用延迟时间,默认4秒
-XX:BiasedLockingBulkRebiasThreshold=20 批量重偏向阈值,默认20
-XX:BiasedLockingBulkRevokeThreshold=40 批量撤销阈值,默认40
-XX:BiasedLockingDecayTime=25000
2
3
4
5
6
# 5.偏向锁撤销原理?
偏向锁使用了一种等到竞争出现才释放偏向锁的机制:偏向锁只有遇到其他线程尝试竞争偏向锁时
,持有偏向锁的线程才会释放锁,线程不会主动去释放偏向锁。偏向锁的撤销,需要等待全局安全点
(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态,撤销偏向锁后恢复到未锁定
(标志位为“01”,偏向标志位为 0)或轻量级锁
(标志位为“00”)的状态。
如果线程仍然活着,拥有偏向锁的栈会被执行,遍历偏向对象的锁记录,栈中的锁记录和对象头的 Mark Word 要么重新偏向于其他线程,要么恢复到无锁或者标记对象不适合作为偏向锁,最后唤醒暂停的线程。
# 6.批量重偏向和批量撤销?
rebias & revoke
bulk rebias(批量重偏向):
如果已经偏向 t1 线程的对象,在 t2 线程申请锁时撤销偏向后升级为轻量级锁的对象数量达到一定值(20),后续的申请会批量重偏向到 t2 线程;
bulk revoke(批量撤销):
在单位时间(25s)内某种 Class 的对象撤销偏向的次数达到一定值(40),JVM 认定该 Class 竞争激烈,撤销所有关联对象的偏向锁,且新实例也是不可偏向的;并且有第三条线程 C 加入了,这个时候会触发批量撤销。JVM 会标记该对象不能使用偏向锁,以后新创建的对象,直接以轻量级锁开始。这个时候,才是真正的完成了锁升级。
# 7.轻量级锁加锁和解锁的过程?
轻量级锁是为了在线程交替执行同步块时提高性能。
轻量级锁加锁:
- 线程在执行同步块之前,JVM 会先在当前线程的栈桢中创建用于存储锁记录的空间,并将对象头中的 Mark Word 复制到锁记录空间中,官方称为 Displaced Mark Word;
- 然后线程尝试使用 CAS 将对象头中的 Mark Word 替换为指向锁记录的指针。
- 如果成功,当前线程获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋来获取锁。
轻量级解锁:
会使用原子的 CAS 操作将 Displaced Mark Word 替换回到对象头,如果成功,则表示没有竞争发生.如果失败,表示当前锁存在竞争,锁就会膨胀成重量级锁
# 8.Mark Word 说明
Mark Word 64 位,格式如下:
java 对象头长度:
如果是无锁状态,则分布为 25 位 unused,31 位 hashcode,1 位 unused,4 位 age,偏向标志位,锁的标志位
如果是偏向锁状态,54 位 thread 的 id,2 位 epoch,,1 位 unused,4 位 age,偏向标志位,锁的标志位
如果是轻量级锁,62 位的 lock_record,锁的标志位
如果是重量级锁,62 位的 monitor 指针,锁的标志位,owner 指向持有锁的线程
# 9.Mark Word 中的 epoch
epoch 是 Mark Word 中的一部分,只存在于偏向锁状态,占 2 位。锁对应的类同样用 2 位存了 epoch。
epoch 的作用就是标记偏向的合法性
,说通俗点,就是看这个偏向锁有没有其他线程正在用
。如果不用 epoch,那每次想要重偏向,都得去遍历所有的线程栈看看有没有其他线程在用。
每次撤销数量刚到 20 的时候,锁的类 epoch 都会+1,并且更新加锁状态的同类锁对象,那么那些不加锁的锁对象 epoch 就和类的 epoch 不一样了,那就可以知道哪些偏向锁是空闲的了。
epoch 只有两位,肯定会循环,但不会影响准确性。
# 10.synchronized 锁信息在对象中存储?
synchronized 用的锁是存在 Java 对象头(Mark Word)里的。如果对象是数组类型,则虚拟机用 3 个字宽(Word)存储对象头,如果对象是非数组类型,则用 2 字宽存储对象头。在 32 位虚拟机中,1 字宽等于 4 字节,即 32bit.数组的 1 字宽(4 字节)存储数组的长度信息.
无锁状态下 32 位 JVM 的 Mark Word 的默认存储结构如下:
有锁状态的 Mark Word 的信息变化如下,并从下图中能够看到锁的信息的确是放到 Mark Word 中的,并且不同的锁类型, Mark Word 中的信息会有变化。
# 11.偏向锁和轻量级锁区别?
偏向锁和轻量级锁都是为了提高多线程并发访问的性能,但是它们的实现方式和适用场景有所不同。
- 实现方式:偏向锁和轻量级锁的实现方式不同。偏向锁在对象头中记录拥有锁的线程 ID,判断锁状态时只需要比较锁的标记位和当前线程的 ID 是否相等即可。轻量级锁则需要使用 CAS 操作来进行锁操作,同时需要使用锁记录来存储锁的状态和拥有者线程的 ID。
- 适用场景:偏向锁适用于线程之间的竞争较少的情况,例如对象被一个线程锁定后,其他线程很少访问该对象。在这种情况下,使用偏向锁可以避免无谓的锁竞争,从而提高性能。轻量级锁适用于线程之间的竞争较多,但是锁持有时间较短的情况。在这种情况下,使用轻量级锁可以减少线程的上下文切换和阻塞唤醒的开销,从而提高性能。
- 锁状态转换:偏向锁和轻量级锁的锁状态转换也有所不同。偏向锁在有其他线程竞争时会立即升级成轻量级锁或重量级锁,而轻量级锁在自旋超过一定次数或竞争线程过多时会膨胀成重量级锁。
# 12.描述下锁升级的过程?
偏向锁:
- A 线程获取偏向锁,并且 A 线程死亡退出。B 线程争抢偏向锁,会直接升级当前对象的锁为轻量级锁。这只是针对我们争抢了一次。
- A 线程获取偏向锁,并且 A 线程没有释放偏向锁,还在 sync 的代码块里边。B 线程此时过来争抢偏向锁,会直接升级为重量级锁。
- A 线程获取偏向锁,并且 A 线程释放了锁,但是 A 线程并没有死亡还在活跃状态。B 线程过来争抢,会直接升级为轻量级锁。综上所述,当我们尝试第一次竞争偏向锁时,如果 A 线程已经死亡,升级为轻量级锁;如果 A 线程未死亡,并且未释放锁,直接升级为重量级锁;如果 A 线程未死亡,并且已经释放了锁,直接升级为轻量级锁。
- A 线程获取偏向锁,并且 A 线程没有释放偏向锁,还在 sync 的代码块里边。B 线程多次争抢锁,会在加锁过程中采用重量级锁;但是,一旦锁被释放,当前对象还是会以轻量级锁的初始状态执行。
- A 线程获取偏向锁,并且 A 线程释放了锁,但是 A 线程并没有死亡还在活跃状态。B 线程过来争抢。部分争抢会升级为轻量级锁;部分争抢会依旧保持偏向锁。
偏向锁到轻量级锁:
线程 1 作为持有者,线程 2 作为竞争者出现了,线程 2 由于 cas 替换偏向锁中的线程 id 失败,发起了撤销偏向锁的动作.此时线程 1 还存活,暂停了线程 1 的线程,此时线程 1 的栈中的锁记录会被执行遍历,将对象头中的锁的是否是偏向锁位置改成 0,并将锁标志位从 01(偏向锁)改成 00(轻量级锁),升级为轻量级锁。
轻量级锁到重量级锁:
线程 1 为锁的持有者,线程 2 为竞争者.线程 2 尝试 CAS 操作将轻量级锁的指针指向自己栈中的锁记录失败后。发起了升级锁的动作。线程 2 会将 Mark Word 中的锁指针升级为重量级锁指针。自己处于阻塞状态,因为此时线程 1 还没释放锁。当线程 1 执行完同步体后,尝试 CAS 操作将 Displaced Mark Word 替换回到对象头时,此时肯定会失败,因为 mark word 中已经不是原来的轻量级指针了,而是线程 2 的重量级指针.那么此时线程 1 很无奈,只能释放锁,并唤醒其他线程进行锁竞争。此时线程 2 被唤醒了,获取了重量级锁。
# 13.比较三种锁的优缺点及使用场景?
其实偏向锁,本就为一个线程的同步访问的场景.在出现线程竞争非常小的环境下,适合偏向锁。轻量级锁自旋获取线程,如果同步块执行很快,能减少线程自旋时间,采用轻量级锁很适合。重量级锁就不用多说了,synchronized 就是经典的重量级锁。使用 synchronized 不一定会升级为重量级锁,得看条件.
偏向锁:
自始至终,对这把锁都不存在竞争,只需要做个标记,这就是偏向锁,每个对象都是一个内置锁(内置锁是可重入锁),一个对象被初始化后,还没有任何线程来获取它的锁时,那么它就是可偏向的,当有线程来访问并尝试获取锁的时候,它就会把这个线程记录下来,以后如果获取锁的线程正式偏向锁的拥有者,就可以直接获得锁,偏向锁性能最好。
轻量级锁:
轻量级锁是指原来是偏向锁的时候,这时被另外一个线程访问,存在锁竞争,那么偏向锁就会升级为轻量级锁,线程会通过自旋的形式尝试获取锁,而不会陷入阻塞。
重量级锁:
重量级锁是互斥锁,主要是利用操作系统的同步机制实现的,当多个线程直接有并发访问的时候,且锁竞争时间长的时候,轻量级锁不能满足需求,锁就升级为重量级锁,重量级锁会使得其他拿不到锁的线程陷入阻塞状态,重量级锁的开销相对较大。
偏向所锁,轻量级锁都是乐观锁,重量级锁是悲观锁。
一个对象刚开始实例化的时候,没有任何线程来访问它的时候。它是可偏向的,意味着,它现在认为只可能有一个线程来访问它,所以当第一个线程来访问它的时候,它会偏向这个线程,此时,对象持有偏向锁。偏向第一个线程,这个线程在修改对象头成为偏向锁的时候使用 CAS 操作,并将对象头中的 ThreadID 改成自己的 ID,之后再次访问这个对象时,只需要对比 ID,不需要再使用 CAS 在进行操作。
一旦有第二个线程访问这个对象,因为偏向锁不会主动释放,所以第二个线程可以看到对象是偏向状态,这时表明在这个对象上已经存在竞争了。
- 检查原来持有该对象锁的线程是否依然存活,如果挂了,则可以将对象变为无锁状态,然后重新偏向新的线程。
- 如果原来的线程依然存活,则马上执行那个线程的操作栈,检查该对象的使用情况,
- 如果仍然需要持有偏向锁,则偏向锁升级为轻量级锁,(偏向锁就是这个时候升级为轻量级锁的),此时轻量级锁由原持有偏向锁的线程持有,继续执行其同步代码,而正在竞争的线程会进入自旋等待获得该轻量级锁;
- 如果不存在使用了,则可以将对象回复成无锁状态,然后重新偏向。
轻量级锁认为竞争存在,但是竞争的程度很轻,一般两个线程对于同一个锁的操作都会错开,或者说稍微等待一下(自旋),另一个线程就会释放锁。 但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁膨胀为重量级锁,重量级锁使除了拥有锁的线程以外的线程都阻塞,防止 CPU 空转。
# 14.为什么要引入轻量级锁?
解答这个问题,先要自问一句,不引入轻量级锁,直接用重量级锁有什么坏处。我们知道重量级锁,如果线程竞争锁失败,会直接进入阻塞(Blocked)状态,阻塞线程需要 CPU 从用户态转到内核态,代价较大,假设一个线程刚刚阻塞不久这个锁就被释放了,这个线程被唤醒后,还需要从内核态切换到用户态,一来一回就两次状态切换,那这个代价就有点得不偿失了,因此这个时候就干脆不阻塞这个线程,让它自旋的等待锁释放。如果自旋的等待锁的释放,正好是我们的轻量级锁的特性,那么为什么引入轻量级锁就明白了。
JVM 如何开启轻量级锁:
JDK 1.5 使用- XX:+UseSpinning 手动开启。
JDK1.6 及后续版本默认开启轻量级锁。
# 15.什么是适应性自旋?
和普通自旋的区别?
JDK 1.5 的自旋锁(spinlock):是指当一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,直到获取到锁才会退出循环。自旋次数可以设定,通过自行设置自旋次数,此处举例说明设置为 10 次。
-XX:PreBlockSpin=10
在 JDK 1.6 引入了适应性自旋锁,XX:PreBlockSpin 参数也就没有用了.适应性自旋锁意味着自旋的时间不在是固定的了,而是由前一次在同一个锁上的自旋时间以及锁的拥有者的状态来决定,基本认为一个线程上下文切换的时间是最佳的一个时间,同时 JVM 还针对当前 CPU 的负荷情况做了较多的优化,如下三点优化非常突出。
如果平均负载小于 CPU 则一直自旋
如果有超过(CPU/2)个线程正在自旋,则后来线程直接阻塞(升级为重量级锁)
如果正在自旋的线程发现 Owner 发生了变化则延迟自旋时间(自旋计数)或进入阻塞(升级为重量级锁)
# 四.锁的内存语义
# 1.对于同步方法,如何实现原子操作?
处理器提供总线锁定和缓存锁定
两个机制来保证复杂内存操作的原子性。
总线锁定:
如果多个处理器同时对非同步共享变量进行读改写操作(i++就是经典的读改写操作),那么共享变量就会被多个处理器同时进行操作,这样读改写操作就不是原子的,操作完之后共享变量的值会和期望的不一致.原因可能是多个处理器同时从各自的缓存中读取变量 i,分别进行加 1 操作,然后分别写入系统内存中。
对于同步方法操作 i++时,部分处理器使用总线锁就是来解决这个问题的.所谓总线锁就是使用处理器提供的一个 LOCK#信号(参见 93 题的 Lock 汇编指令),当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占共享内存,只不过总线锁定开销很大。
缓存锁定:
所谓“缓存锁定”是指内存区域如果被缓存在处理器的缓存行中,并且在 Lock 操作期间被锁定,那么当它执行锁操作回写到内存时,处理器不在总线上声言 LOCK#信号,而是修改内部的内存地址,并允许它的缓存一致性机制来保证操作的原子性,因为缓存一致性机制会阻止同时修改由两个以上处理器缓存的内存区域数据,当其他处理器回写已被锁定的缓存行的数据时,会使缓存行无效.
# 2.为什么不淘汰总线锁定?
缓存锁定性能优于总线锁定,为什么不淘汰总线锁定?
有两种情况下处理器不会使用缓存锁定。
第一种情况是:
当操作的数据不能被缓存在处理器内部(比如外部磁盘数据),或操作的数据跨多个缓存行(cache line)时,则处理器会调用总线锁定。第二种情况是:
有些处理器不支持缓存锁定.对于 Intel 486 和 Pentium 处理器,就算锁定的内存区域在处理器的缓存行中也会调用总线锁定。
# 3.什么是原子操作?说说 i++操作
原子(atomic)本意是“不能被进一步分割的最小粒子”,而原子操作(atomic operation)意为“不可被中断的一个或一系列操作”。
i++是读改写系列操作,操作中包括如下三个:
- 读操作:读 i 的当前值;
- 改操作:在 i 的当前值上做+1 操作;
- 写:将修改后的值写回内存。
# 4.java 如何保证原子操作的?
在 Java 中可以通过锁和循环 CAS 的方式
来实现原子操作
CAS:
从 Java 1.5 开始,JDK 的并发包里提供了一些类来支持原子操作,如 AtomicBoolean (用原子方式更新 boolean 值)、 AtomicInteger (用原子方式更新 int 值)和 AtomicLong (用原子方式更新的 long 值),其中就是依靠 CAS 操作来完成的。
锁:
如 synchronized 以及 Lock 锁,线程获取对象锁之后,会完成系列操作后释放锁,运行期间,其他线程会处于阻塞状态,因此是原子性的操作.
锁机制保证了只有获得锁的线程才能够操作锁定的内存区域.JVM 内部实现了很多种锁机制,有偏向锁、轻量级锁和互斥锁.有意思的是除了偏向锁,JVM 实现锁的方式都用了循环 CAS,即当一个线程想进入同步块的时候使用循环 CAS 的方式来获取锁,当它退出同步块的时候使用循环 CAS 释放锁
# 5.锁的获取和释放内存语义?
对比锁释放-获取的内存语义与 volatile 写一读的内存语义可以看出:
锁释放与 volatile 写有相同的内存语义;
锁获取与 volatile 读有相同的内存语义。
下面对锁释放和锁获取的内存语义做个总结。
线程 A 释放一个锁,实质上是线程 A 向接下来将要获取这个锁的某个线程发出了(线程 A 对共享变量所做修改的)消息。
线程 B 获取一个锁,实质上是线程 B 接收了之前某个线程发出的(在释放这个锁之前对共享变变量所做修改的)消息。
线程 A 释放锁,随后线程 B 获取这个锁,这个过程实质上是线程 A 通过主内存向线程 B 发送消息。
从对 ReentrantLock 的分析可以看出,锁释放-获取的内存语义的实现至少有下面两种方式。
- 利用 volatile 变量的写-读所具有的内存语义。
- 利用 CAS 所附带的 volatile 读和 volatile 写的内存语义。
# 6.CAS 操作的原理?
JDK 文档对该方法的说明如下:如果当前状态值(内存值)等于预期值,则以原子方式将同步状态设置为给定的更新值。此操作具有 volatile 读和写的内存语义。
所谓的 CAS,其实是个简称,全称是 Compare And Swap,对比之后交换数据.内存值–预期值–新值
//原子类Atomic中的cas
public final boolean compareAndSet(boolean expect, boolean update){
int e = expect ? 1 : 0;
int u = update ? 1 : 0;
return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}
2
3
4
5
6
//底层实现是用的Unsafe的cas,包含3个方法
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
2
3
4
5
6
上面的方法,有几个重要的参数:
this:
Unsafe 对象本身,需要通过这个类来获取 value 的内存偏移地址。valueOffset:
value 变量的内存偏移地址。expect:
期望更新的值。update:
要更新的最新值。
//CAS-c++源码:
inline jint Atomic::cmpxchg (jint exchange value, volatile jint*dest,
jint compare value){
// alternative for InterlockedCompareExchange
int mp=os::isMP();//是否为多核心处理器
_asm {
mov edx, dest //要修改的地址
mov ecx, exchange_value //新值值
mov eax,compare_value //期待值
LOCK_IF_MP(mp) //如果是多处理器,在下面指令前加上LOCK前缀
cmpxchg dword ptr [edx],ecx//[edx]与eax对比,相同则[edx]=ecx,否则不操作
}
}
2
3
4
5
6
7
8
9
10
11
12
13
这里看到有一个 LOCK_IF_MP,作用是如果是多处理器,在指令前加上 LOCK 前缀,因为在单处理器中,是不会存在缓存不一致的问题的,所有线程都在一个 CPU 上跑,使用同一个缓存区,也就不存在本地内存与主内存不一致的问题,不会造成可见性问题.然而在多核处理器中,共享内存需要从写缓存中刷新到主内存中去,并遵循缓存一致性协议通知其他处理器更新缓存.
Lock 在这里的作用:
- 在 cmpxchg 执行期间,锁住内存地址[edx],其他处理器不能访问该内存,保证原子性.即使是在 32 位机器上修改 64 位的内存也可以保证原子性。
- 将本处理器上写缓存全部强制写回主存中去,保证每个线程的本地内存与主存一致。
- 禁止 cmpxchg 与前后任何指令重排序,防止指令重排序。
# 7.CAS 存在的问题?
CAS 主要有 3 个问题:
- ABA 问题
- 循环时间长开销大
- 只能保证一个共享变量的原子操作
ABA 问题.因为 CAS 需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是 A,变成了 B,又变成了 A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了.ABA 问题的解决思路就是使用版本号.在变量前面追加上版本号,每次变量更新的时候把版本号加 1,那么 A→ B→A 就会变成 1A→2B→3A。
解决一:
从 Java 1.5 开始,JDK 的 Atomic 包里提供了一个类 AtomicStampedReference
来解决 ABA 问题.这个类的 compareAndSet 方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
解决二:
使用 AtomicMarkableReference 可以通过 Boolean 类型进行判断
CAS 循环时间太长,会有什么问题:
自旋 CAS 如果长时间不成功,会给 CPU 带来非常大的执行开销。
使用 CAS 自旋,需要考虑业务场景是否是多任务快速处理的场景,如果单个任务处理够快且任务量大,使用 CAS 会带来很好地效果.轻量级锁的设计原理底层就是使用了 CAS 的操作原理。
# 8.AtomicMarkableReference
AtomicMarkableReference 与 AtomicStampedReference 一样也可以解决 ABA 的问题,两者唯一的区别是,AtomicStampedReference 是通过 int 类型的版本号,而 AtomicMarkableReference 是通过 boolean 型的标识来判断数据是否有更改过。
既然有了 AtomicStampedReference 为啥还需要再提供 AtomicMarkableReference 呢,在现实业务场景中,不关心引用变量被修改了几次,只是单纯的关心是否更改过。
AtomicMarkableReference详解:
// 静态内部类,封装了 变量引用 和 版本号
private static class Pair<T> {
final T reference; // 变量引用
final boolean mark; // 修改标识
private Pair(T reference, boolean mark) {
this.reference = reference;
this.mark = mark;
}
static <T> Pair<T> of(T reference, boolean mark) {
return new Pair<T>(reference, mark);
}
}
private volatile Pair<V> pair;
/**
*
初始化,构造成一个 Pair 对象,由于 pair 是用 volatile 修饰的所以在构造是线程安全的
* @param initialRef 初始化变量引用
* @param initialMark 修改标识
*/
public AtomicMarkableReference(V initialRef, boolean initialMark) {
pair = Pair.of(initialRef, initialMark);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
常用方法:
// 构造函数,初始化引用和标记值
public AtomicMarkableReference(V initialRef, boolean initialMark)
// 以原子方式获取当前引用值
public V getReference()
// 以原子方式获取当前标记值
public int isMarked()
// 以原子方式获取当前引用值和标记值
public V get(boolean[] markHolder)
// 以原子的方式同时更新引用值和标记值
// 当期望引用值不等于当前引用值时,操作失败,返回false
// 当期望标记值不等于当前标记值时,操作失败,返回false
// 在期望引用值和期望标记值同时等于当前值的前提下
// 当新的引用值和新的标记值同时等于当前值时,不更新,直接返回true
// 当新的引用值和新的标记值不同时等于当前值时,同时设置新的引用值和新的标记值,返回true
public boolean weakCompareAndSet(V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark)
// 以原子的方式同时更新引用值和标记值
// 当期望引用值不等于当前引用值时,操作失败,返回false
// 当期望标记值不等于当前标记值时,操作失败,返回false
// 在期望引用值和期望标记值同时等于当前值的前提下
// 当新的引用值和新的标记值同时等于当前值时,不更新,直接返回true
// 当新的引用值和新的标记值不同时等于当前值时,同时设置新的引用值和新的标记值,返回true
public boolean compareAndSet(V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark)
// 以原子方式设置引用的当前值为新值newReference
// 同时,以原子方式设置标记值的当前值为新值newMark
// 新引用值和新标记值只要有一个跟当前值不一样,就进行更新
public void set(V newReference, boolean newMark)
// 以原子方式设置标记值为新的值
// 前提:引用值保持不变
// 当期望的引用值与当前引用值不相同时,操作失败,返回fasle
// 当期望的引用值与当前引用值相同时,操作成功,返回true
public boolean attemptMark(V expectedReference, boolean newMark)
// 使用`sun.misc.Unsafe`类原子地交换两个对象
private boolean casPair(Pair<V> cmp, Pair<V> val)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# 9.final 域的内存语义?
final
域的内存语义是指一个被final
修饰的域在构造函数执行完成后,其值对于其他线程是可见的,因此其他线程可以安全地访问该域的值,而无需进行同步控制。
具体来说,当一个线程在构造函数中完成对一个final
域的赋值后,该线程会释放所有已经初始化的final
域的内存屏障(Memory Barrier),这会导致所有后续的读操作都可以看到该域的值,而不会看到该域的默认值或初始值。同时,由于内存屏障的作用,其他线程也可以看到该域的最新值,而不需要进行同步控制。
需要注意的是,final
域的内存语义仅适用于被final
修饰的域,而不适用于在构造函数中赋值但未被final
修饰的域。对于非final
域,其他线程在访问该域的值时,可能会看到该域的默认值或初始值,而不是构造函数中赋予的值。因此,在多线程环境下,应该尽可能地使用final
域来保证线程安全性。
public class Juc_book_fang_12_FinalExample {
int i; // 普通变量
final int j; // final变量
static Juc_book_fang_12_FinalExample obj;
public Juc_book_fang_12_FinalExample() { // 构造函数
i = 1; // 写普通域
j = 2; // 写final域
}
public static void writer() { // 写线程A执行
obj = new Juc_book_fang_12_FinalExample();
}
public static void reader() { // 读线程B执行
Juc_book_fang_12_FinalExample object = obj;// 读对象引用
int a = object.i; // 读普通域
int b = object.j; // 读final域
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
写final
规则:
- 一个被
final
修饰的域必须在声明时或构造函数中进行初始化。 - 对于基本类型和不可变对象,可以将其声明为
public static final
,表示常量。 - 对于可变对象,应该避免将其声明为
public static final
,因为这样会使对象的引用被固定下来,而无法被替换。 - 在多线程环境下,
final
域可以用于保证线程安全性,因为final
域的内存语义保证了它在构造函数中赋值后对于其他线程是可见的。
读final
重排序规则:
在 Java 中,读final
域的重排序规则是比较宽松的,即在读操作之前可以进行一定的重排序,但是不能影响到读操作的正确性。
具体来说,当一个线程在读取一个final
域的值时,可能会出现以下情况:
- 读操作可以在构造函数中的写操作之前执行,但是读操作不能看到构造函数中未初始化的值。
- 读操作可以在构造函数中的写操作之后执行,但是读操作必须看到构造函数中初始化的值。
这种规则保证了在多线程环境下,对于被final
修饰的域的读操作是安全的,不会看到未初始化的值或者重复的值。需要注意的是,这种规则仅适用于读操作,对于写操作,final
域的内存语义要求必须在构造函数中完成初始化,不能进行重排序。
# 10.happens-before 的理解?
与程序员密切相关的 happens-before 规则如下。
程序顺序规则:一个线程中的每个操作,happens-before 于该线程中的任意后续操作。
监视器锁规则:对一个锁的解锁,happens-before 于随后对这个锁的加锁。
volatile 变量规则:对一个 volatile 域的写,happens-before 于任意后续对这个 volatile 域的读。
传递性:如果 Ahappens-beforeB,且 Bhappens-beforeC,那么 Ahappens-beforeC
注意两个操作之间具有 happens-before 关系,并不意味着前一个操作必须要在后一个操作之前执行!happens-before 仅仅要求前一个操作(执行的结果)对后一个操作可见,且前一个操作按顺序排在第二个操作之前(the frst is visible to and ordered before the second)。happens-before 的定义很微妙,后文会具体说明 happens-before 为什么要这么定义。
# 11.什么是 as-if-serial 语义?
不管怎么重排序,单线程执行结果不变
为了遵守 as-if-serial 语义,编译器和处理器不会对存在数据依赖关系的操作做重排序,因为为这种重排序会改变执行结果。但是,如果操作之间不存在数据依赖关系,这些操作就可能被编译器和处理器重排序。
# 五.AQS 介绍
# 1.谈谈常见的锁有哪些?
下面是对锁的粒度、乐观锁/悲观锁、公平锁/非公平锁、排他锁/共享锁、读写锁、自旋锁的描述和区分,使用表格的形式美化并用中文描述:
类别 | 描述 |
---|---|
锁的粒度 | 锁的粒度指的是锁定的范围,即锁保护的是整个对象还是对象的一部分。 |
synchronized 锁的状态 | Java 中最常用的锁机制,使用关键字synchronized 实现。在锁定对象时,对象的状态可以是无锁、偏向锁、轻量级锁或重量级锁。 |
偏向锁 | 偏向锁是指当只有一个线程访问对象的同步块时,该线程会自动获取锁,避免了多次加锁和解锁的开销,提高性能。 |
轻量级锁 | 轻量级锁是指当多个线程争用锁时,通过自旋的方式进行一段时间的忙等待,不进入阻塞状态。如果在自旋期间成功获取到锁,那么就是轻量级锁;否则,升级为重量级锁。 |
重量级锁 | 重量级锁是指当多个线程争用锁时,无法获取锁的线程会进入阻塞状态,释放 CPU 资源,直到持有锁的线程释放锁,阻塞的线程才会被唤醒。重量级锁的状态切换需要在用户态和内核态之间进行上下文切换,开销较大。 |
乐观锁/悲观锁 | 乐观锁和悲观锁是针对并发访问的数据的不同策略。 |
悲观锁 | 悲观锁认为"坏事一定会发生",在操作数据前先锁定数据,避免其他线程修改数据,保证数据的一致性。悲观锁常用于 synchronized、ReentrantLock 等锁机制。 |
乐观锁 | 乐观锁认为"坏事未必会发生",在操作数据时不会立即锁定数据,而是先进行操作,然后再检查是否有其他线程同时修改了数据。如果没有冲突,则操作成功;如果发现冲突,则需要进行回滚或重试操作。乐观锁的实现常用的方式是 CAS(Compare And Swap)。 |
公平锁/非公平锁 | 公平锁和非公平锁是针对线程获取锁的顺序的不同策略。 |
公平锁 | 公平锁遵循"先到先得"的原则,等待时间最长的线程优先获取锁。所有线程按照它们请求锁的顺序进行获取。 |
非公平锁 | 非公平锁是一种抢占式的锁策略,没有严格的获取顺序。新来的线程有可能在老的线程之前获取到锁。非公平锁通过减少线程上下文切换的次数,提高了性能。ReentrantLock 可以作为公平锁或非公平锁使用。 |
排他锁/共享锁 | 排他锁和共享锁是针对多个线程对同一资源的访问权限的不同策略。 |
排他锁 | 排他锁也称为写锁,表示只有一个线程能够独占地获取锁,其他线程不能同时访问被锁定的代码。在排他锁下,线程在获取到锁之前都会被阻塞。ReentrantLock 和 synchronized 都可以实现排他锁。 |
共享锁 | 共享锁也称为读锁,表示多个线程可以同时获取锁并共享被锁定的代码。只有在没有任何线程持有写锁的情况下,才能获取读锁。在共享锁下,多个线程可以同时读取数据,但不能写入数据。ReentrantReadWriteLock 是实现读写锁的一种机制。 |
读写锁 | 读写锁是一种特殊的锁机制,它允许多个线程同时读取数据,但只允许一个线程写入数据。在读写锁下,当没有线程持有写锁时,多个线程可以获取读锁;当有线程持有写锁时,其他线程不能获取读锁或写锁。读写锁适用于对读操作较多、写操作较少的情况,可以提高并发性。ReentrantReadWriteLock 是实现读写锁的一种机制。 |
自旋锁 | 自旋锁是一种乐观锁的实现方式,当一个线程尝试获取锁时,如果锁已经被其他线程持有,该线程会循环等待,不断尝试获取锁,直到获取成功。自旋锁适用于锁的持有时间非常短暂的场景,避免了线程切换的开销。但是如果锁的持有时间较长,自旋会导致 CPU 资源的浪费。自旋锁的实现常用的方式是 CAS。 |
# 2.什么是 AQS?
AQS(AbstractQueuedSynchronizer)是 Java 并发包中实现锁、同步器等的基础框架,它提供了一种便于实现自定义同步器的模板方法。AQS 的核心思想是使用一个 FIFO 双向链表(即等待队列)来管理线程的状态和竞争资源的获取。AQS 基于这个等待队列,通过使用 volatile 变量和 CAS(Compare And Swap)指令来实现线程的等待和唤醒、资源的获取和释放。
AQS 的主要原理如下:
- 状态管理:
- AQS 使用一个整型的状态变量(state)来表示资源的状态。线程在访问共享资源时,需要通过 CAS 原子操作来获取或释放这个状态变量。根据不同的同步器实现,状态变量可以表示锁的状态、信号量的剩余许可数等。
- 等待队列:
- AQS 内部维护一个 FIFO 双向链表,用于保存等待获取资源的线程。这个等待队列是 AQS 的核心数据结构,它管理着所有在同步器上等待的线程。等待队列中的每个节点代表一个等待线程。
- 线程状态转换:
- 当一个线程需要获取资源时,如果发现资源已被其他线程占用,则该线程会进入等待状态。在 AQS 中,等待状态有两种:独占模式(exclusive mode)和共享模式(shared mode)。独占模式用于实现排他锁,而共享模式用于实现读写锁等并发控制机制。
- 等待与唤醒:
- 当一个线程需要等待资源时,它会被包装成一个等待节点(Node)并加入到等待队列中。等待节点会被挂起,进入等待状态。当资源被释放或满足某个条件时,AQS 会根据具体的同步器规则,从等待队列中唤醒等待的线程,使其重新进入就绪状态,准备竞争资源。
- 自旋与阻塞:
- 在 AQS 中,线程在等待状态时,会进行自旋尝试获取资源。自旋是指线程在不断地检查资源的状态,如果资源可用则获取;否则,线程可能会自旋等待一段时间。如果自旋等待仍然无法获取资源,线程将被阻塞,进入阻塞状态,不再占用 CPU 资源。
总结下 AQS 是什么:
- AQS 是一个同步的基础框架,基于一个先进先出的队列。
- 锁机制依赖一个原子值的状态。
- AQS 的子类负责定义与操作这个状态值,但必须通过 AQS 提供的原子操作。
- AQS 剩余的方法就是围绕队列,与线程阻塞唤醒等功能。
# 4.Node 和 ConditionObject
AQS 中有两个重要的成员变量:Node 和 ConditionObject
- Node
- Node 的作用是存储获取锁失败的线程,并且维护一个 CLH FIFO 队列,该队列是会被多线程操作的,所以 Node 中大部分变量都是被 volatile 修饰,并且通过自旋和 CAS 进行原子性的操作。
- 有一个模式的属性:
独占模式和共享模式
,独占模式下资源是线程独占的,共享模式下,资源是可以被多个线程占用的。Node 的数据结构其实也挺简单的,就是 thread + waitStatus + pre + next 四个属性而已.
- ConditionObject
- 条件队列
- 该类主要是为了让子类实现独占模式。AQS 框架下独占模式的获取资源、释放等操作到最后都是基于这个类实现的。只有在独占模式下才会去使用该类。
node 方法和属性值的含义:
方法和属性值 | 含义 |
---|---|
waitStatus | 当前节点在队列中的状态 |
thread | 表示处于该节点的线程 |
prev | 前驱指针 |
predecessor | 返回前驱节点,没有的话抛出 npe |
nextWaiter | 指向下一个处于 CONDITION 状态的节点 |
next | 后继指针 |
线程两种锁的模式:
模式 | 含义 |
---|---|
SHARED | 共享,多个线程可同时执行,如Semaphore /CountDownLatch |
EXCLUSIVE | 独占,只有一个线程能执行,如ReentrantLock |
# 5.AQS 基于什么设计模式实现的?
同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
同步器可重写的方法如下图所示:
方法名称 | 描述 |
---|---|
protected boolean tryAcquire(int arg) | 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行 CAS 设置同步状态 |
protected boolean tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
实现自定义同步组件时,将会调用同步器提供的模板方法
方法名称 | 描述 |
---|---|
protected int tryAcquireShared(int arg) | 共享式获取同步状态,返回大于等于 0 的值,表示获取成功,反之,获取失败 |
protected boolean tryReleaseShared(int arg) | 共享式释放同步状态 |
protected boolean isHeldExclusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 |
# 6.AQS 底层同步队列的原理?
同步器依赖内部的同步队列(一个 FIFO 双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点,节点的属性类型与名称以及描述.
方法名称 | 描述 |
---|---|
void acquire(int arg) | 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的 tryAcquire(int arg) 方法 |
void acquireInterruptibly(int arg) | 与 acquire(intarg)相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出 InterruptedException 并返回 |
boolean tryAcquireNanos(int arg,long nanos) | 在 acquireInterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回 false,如果获取到了返回 true |
void acquireShared(int arg) | 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进人同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态 |
void acquireSharedInteruptibly(int arg) | 与 acquireShared(intarg)相同,该方法响应中断 |
boolean tryAcquireSharedNanos(int arg,long nanos) | 在 acquireSharedInterruptibly(intarg)基础上增加了超时限制 |
boolean release(int arg) | 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒 |
boolean releaseShared(int arg) | 共享式的释放同步状态 |
Collection<Thread>getQueuedThreads | 获取等待在同步队列上的线程集合 |
同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部
同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点.试想一下,当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于 CAS 的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联
同步队列遵循 FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。
设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用 CAS 来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的 next 引用即可。
AQS会旋转几次获取锁:
会旋转 2 次,第一次的时候,未获取到会生成队列节点,第二次是是否为头结点,第二次可以是多次,可能出现非公平锁的饥饿状态,获取锁的过程实际上是获取 state 状态.
# 7.AQS 独占式同步状态获取与释放?
AQS 的 acquire 方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2
3
4
5
通过调用同步器的 acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出
上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:首先调用自定义同步器实现的 tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式 Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过 addWaiter(Node node)方法将该节点加入到同步队列的尾部,当出现竞争时,采用 CAS 的方式加入到同步队列的尾部.最后调用 acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态.如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//快速尝试在尾部添加
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize 头节点可能为空
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
上述代码通过使用 compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全添加。
在 enq(final Node node)方法中,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过 CAS 将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置.可以看出,enq(final Node node)方法将并发添加节点的请求通过 CAS 变得“串行化”了。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
如果前驱节点是头节点,则尝试获取同步锁.而只有前驱节点是头节点才能够尝试获取同步状态,这是为什么?头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。如果前驱节点不是头节点,则获取同步锁失败,那么线程继续在同步队列中等待.独占式同步状态获取流程,也就是 acquire(int arg)方法调用流程如下所示:
当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态.通过调用同步器的 release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态).该方法代码如下所示:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
该方法执行时,会唤醒头节点的后继节点线程, unparkSuccessor(Node node)方法使用 LockSupport 来唤醒处于等待状态的线程。
# 8.共享式同步状态获取与释放?
共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态.以文件的读写为例,如果一个程序在对文件进行读操作,那么这一时刻对于该文件的写操作均被阻塞,而读操作能够同时进行。在 acquireShared(int arg)方法中,同步器调用 tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为 int 类型,当返回值大于等于 0 时,表示能够获取到同步状态。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是 tryAcquireShared(intarg)方法返回值大于等于 0。在 doAcquireShared(int arg)方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于 0,表示该次获取同步状态成功并从自旋过程中退出。
该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点.对于能够支持多个线程同时访问的并发组件,它和独占式主要区别在于 tryReleaseShared(int arg)方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和 CAS 来保证的,因为释放同步状态的操作可能会同时来自多个线程。
//获取同步状态
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//释放同步状态
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;// loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# 9.AQS 独占式超时获取锁和可中断获取锁?
在 Java 5 之前,当一个线程获取不到锁而被阻塞在 synchronized 之外时,对该线程进行中断操作,此时该线程的中断标志位会被修改,但线程依旧会阻塞在 synchronized 上,等待着获取锁.在 Java 5 中,同步器提供了 acquireInterruptibly(int arg)方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出 InterruptedException。
超时获取同步状态过程可以被视作响应中断获取同步状态过程的“增强版”, doAcquireNanos(int
arg,long nanosTimeout)方法在支持响应中断的基础上,增加了超时获取的特性.针对超时获取,主要需要计算出需要睡眠的时间间隔 nanosTimeout,为了防止过早通知, nanosTimeout 计算公式为: nanosTimeout-=now-lastTime,其中 now 为当前唤醒时间, lastTime 为上次唤醒时间,如果 nanosTimeout 大于 0 则表示超时时间未到,需要继续睡眠 nanosTimeout 纳秒,反之,表示已经超时。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
如果 nanosTimeout 小于等于 spinForTimeoutThreshold (1000 纳秒)时,将不会使该线程进行超时等待,而是进入快速的自旋过程.原因在于,非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让 nanosTimeout 的超时从整体上表现得反而不精确.因此,在超时非常短的场景下,同步器会进入无条件的快速自旋。
# 10.AQS 实现的工具类?
AbstractQueuedSynchronizer (java.util.concurrent.locks)
CountDownLatch (java.util.concurrent)
ThreadPoolExecutor (java.util.concurrent)
LimitLatch (org.apache.tomcat.util.threads)
ReentrantLock (java.util.concurrent.locks)
ReentrantReadWriteLock (java.util.concurrent.locks)
Semaphore (java.util.concurrent)
# 11.tryAcquireShared(int)函数返回值?
- 负数:表示获取失败;
- 0:获取成功,但没有剩余资源;
- 正数:获取成功,且有剩余资源;
# 六.ReentrantLock 和 LockSupport
# 1.什么是可重入锁?
首先明确下 synchronized 和 lock 接口均为可重入锁。
重入锁,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。
实现原理如下:
- 线程再次获取锁.锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
- 锁的最终释放.线程重复 n 次获取了锁,随后在第 n 次释放该锁后,其他线程能够获取到该锁.锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于 0 时表示锁已经成功释放。
# 2.ReentrantLock 非公平锁获取与释放?
使用非公平锁加锁过程:
- ReentrantLock:lock()。
- NonfairSync:lock()。
- AbstractQueuedSynchronizer:compareAndSetState(int expect,int update)。
final boolean nonfairTryAcquire(int acquires){
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0){
if (compareAndSetState(0, acquires)){
setExclusiveOwnerThread(current);//以cas原子操作更新state
return true;
}
} else if (current == getExclusiveOwnerThread()){
int nextc = c + acquires;
if (nextc < 0)// overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
成功获取锁的线程再次获取锁,只是增加了同步状态值,这也就要求 ReentrantLock 在释放同步状态时减少同步状态值
在使用非公平锁时,解锁方法 unlock()调用轨迹如下。
- ReentrantLock:unlock()
- AbstractQueuedSynchronizer:release(int arg)。
- Sync:tryRelease(int releases)。
protected final boolean tryRelease(int releases){
int c = getState()- releases;
if (Thread.currentThread()!= getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0){
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2
3
4
5
6
7
8
9
10
11
12
# 3.ReentrantLock 公平锁获取和释放?
公平锁就是很公平,在并发环境中,每个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有锁,否则就会加入到等待队列中,以后会按照 FIFO 的规则从队列中取到自己。公平锁的优点是等待锁的线程不会饿死。缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU 唤醒阻塞线程的开销比非公平锁大。
使用公平锁时,加锁方法 lock()调用轨迹如下。
- ReentrantLock:lock()。
- FairSync:lock()
- AbstractQueuedSynchronizer:acquire(int arg)。
- ReentrantLock:tryAcquire(int acquires)。
在第 4 步真正开始加锁,下面是该方法的源代码。
在使用公平锁时,解锁方法 unlock()调用轨迹如下。
- ReentrantLock:unlock()
- AbstractQueuedSynchronizer:release(int arg)。
- Sync:tryRelease(int releases)。
在第 3 步真正开始释放锁,下面是该方法的源代码。
c==0 代表是第一次进入,不是重复获取锁,所以不需要加其他的判断,第一步需要读取 volatile 变量 state
公平性与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是 FIFO。该方法与 nonfairTryAcquire(int acquires)比较,唯一不同的置为判断条件多了 hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回 true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock(){
acquire(1);
}
protected final boolean tryAcquire(int acquires){
final Thread current = Thread.currentThread();
int c = getState();//获取锁,读取volatile变量state
if (c == 0){
if (!hasQueuedPredecessors()&&
compareAndSetState(0, acquires)){
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()){
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
释放锁的过程和非公平锁一样
protected final boolean tryRelease(int releases){
int c = getState()- releases;
if (Thread.currentThread()!= getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0){
free = true;
setExclusiveOwnerThread(null);
}
setState(c);//释放锁,写volatile变量state
return free;
}
2
3
4
5
6
7
8
9
10
11
12
# 4.公平锁和非公平锁总结?
ReentrantLock 是 Java 并发包提供的一种重入锁,它可以作为公平锁或非公平锁来使用。下面对 ReentrantLock 的公平锁和非公平锁进行总结:
- 公平锁(Fair Lock):
- 特点:公平锁遵循"先到先得"的原则,即线程按照请求锁的顺序依次获取锁。当有多个线程竞争锁时,等待时间最长的线程会被优先获取锁。
- 实现:公平锁的实现需要维护一个等待队列,当线程请求锁时,如果锁已被其他线程占用,则该线程会进入等待队列,按照 FIFO 的顺序等待。
- 优点:公平锁能够避免饥饿现象,所有线程都有机会获得锁,公平性比较高。
- 缺点:由于涉及到频繁的线程切换和调度,可能会导致性能下降。
- 非公平锁(Non-Fair Lock):
- 特点:非公平锁是一种抢占式的锁策略,线程在请求锁时,不管是否有其他线程在等待,它都会直接尝试获取锁,不会考虑等待队列中的顺序。
- 实现:非公平锁的实现不维护等待队列,当线程请求锁时,直接尝试获取锁,如果锁被其他线程占用,则进入自旋等待或阻塞状态。
- 优点:非公平锁由于省去了等待队列的维护和线程调度的开销,通常比公平锁具有更高的吞吐量和更低的延迟。
- 缺点:非公平锁可能会导致某些线程长时间无法获取锁,产生线程饥饿现象,不具备公平性。
# 5.为什么非公平锁会造成线程饥饿?
首先说下公平锁.假设目前 AQS 的同步队列中有 A B C 三个线程.线程 A 排在最前边.当线程 A 获取锁的同时,线程 D 也要获取锁.此时 D 线程先会通过 tryAcquire 方法判断是否自己是同步队列的头结点,如果不是,则乖乖的去同步队列中等待.线程 A 处于无竞争状态下获取锁.因此说公平锁完全按照线程先后进行 FIFO 的获取锁。非公平锁不必唤醒所有线程,cpu 开销小.
ReentrantLock:lock()。
FairSync:lock()。
AbstractQueuedSynchronizer:acquire(int arg)。
ReentrantLock:tryAcquire(int acquires)
再说非公平锁.假设目前 AQS 的同步队列中有 A B C 三个线程.线程 A 排在最前边.当线程 A 获取锁的同时,线程 D 也要获取锁.此时 D 线程直接进行争夺,虽说 D 是后来的,但是作为非公平锁,会直接进行 cas 的竞争.如果竞争成功,线程 A 继续作为头结点,等待 D 线程释放锁,参与下一轮竞争.如果竞争失败,线程 D 也需要乖乖的到同步队列中排队。从这段话我们可以看到,如果线程 A 一直竞争不到锁,那么就会一直留在同步队列中等待,造成线程饥饿,没事儿可干。
使用非公平锁时,加锁方法 lock()调用轨迹如下。
- ReentrantLock:lock()。
- NonfairSync:lock()。
- AbstractQueuedSynchronizer:compareAndSetState(int expect,int update)。
# 6.ReentrantLock 使用和理解?
在 ReentrantLock 中,调用 lock()方法获取锁;调用 unlock()方法释放锁。
注意:lock()是在 try 的外面,为了防止获取锁失败,还去释放锁.
ReentrantLock 的实现依赖于 Java 同步器框架 AbstractQueuedSynchronizer(本文简称之之为 AQS)。AQS 使用一个整型的 volatile 变量(命名为 state)来维护同步状态,这个 volatile 变量是 ReentrantLock 内存语义实现的关键。
ReentrantLock 默认是非公平锁,如果需要公平锁,参数传入 true
# 7.什么是读写锁?
读写锁,读读不排它,读写排它,写写排它.。Java 并发包提供读写锁的实现是 ReentrantReadWriteLock,它提供的特性如下所示:
特性 | 说明 |
---|---|
公平性选择 | 支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平 |
重进入 | 该锁支持重进入,以读写线程为例:读线程在获取了读锁之后,能够再次获取读锁。而写线程在获取了写锁之后能够再次获取写锁,同时也可以获取读锁 |
锁降级 | 遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁 |
ReentrantReadWriteLock 作为 ReadWriteLock 的子类.ReadWriteLock 仅定义了获取读锁和写锁的两个方法,即 readLock()方法和 writeLock()方法
public interface ReadWriteLock {
/**
* Returns the lock used for reading
*@return the lock used for reading
*/
Lock readLock();
/**
* Returns the lock used for writing
*@return the lock used for writing
*/
Lock writeLock();
}
2
3
4
5
6
7
8
9
10
11
12
13
而其实现 ReentrantReadWriteLock,除了接口方法之外,还提供了一些便于外界监控其内部工作状态的方法,这些方法以及描述如下所示。
方法名称 | 描述 |
---|---|
int getReadLockCount() | 返回当前读锁被获取的次数。该次数不等于获取读锁的线程数,例如,仅一个线程,它连续获取(重进入)了 n 次读锁,那么占据读锁的线程数是 1.但该方法返回 n |
int getReadHoldCount() | 返回当前线程获取读锁的次数。该方法在 Java6 中加入到 ReentrantReadWriteLock 中,使用 ThreadLocal 保存当前线程获取的的次数,这也使得 Java6 的实现变得更加复杂 |
boolean isWriteLocked() | 判断写锁是否被获取 |
int getWriteHoldCount() | 返回当前写锁被获取的次数 |
如下代码,Cache 组合一个非线程安全的 HashMap 作为缓存的实现,同时使用读写锁的读锁和写锁来保证 Cache 是线程安全的.在读操作 get(String key)方法中,需要获取读锁,这使得并发访问该方法时不会被阻塞。写操作 put(String key,Object value)方法和 clear()方法,在更新 HashMap 必须提前获取写锁,当获取写锁后,其他线程对于读锁和写锁的获取均被阻塞,而只有写锁被释放之后,其他读写操作才能继续.Cache 使用读写锁提升读操作的并发性,也保证每次操作对所有的读写操作的可见性,同时简化了编程方式。
public class Cache {
static Map<String, Object> map = new HashMap<String, Object>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock r = rwl.readLock();
static Lock w = rwl.writeLock();
//获取一个key对应的value
public static final Object get(String key){
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
//设置key对应的value,并返回旧的value
public static final Object put(String key, Object value){
w.lock();
try {
return map.put(key, value);
} finally {
r.unlock();
}
return null;
}
//清空所有的内容
public static final void clear(){
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 8.ReentrantReadWriteLock 原理?
读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态 state.回想 ReentrantLock 中自定义同步器的实现,同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。
如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高 16 位表示读,低 16 位表示写,划分方式如下图所示:
# 9.ReentrantReadWriteLock 写锁的获取与释放?
写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,**读锁已经被获取(读状态不为 0)**或者当前线程不是已经获取写锁的线程(意思是非重入状态),则当前线程进入等待状态:
protected final boolean tryAcquire(int acquires){
Thread current = Thread.currentThread();
int c = getState();//读锁和写锁的总体状态
int w = exclusiveCount(c);//写锁状态
if (c != 0){
//(Note: if c != 0 and w == 0 then shared count != 0)
//存在读锁或者当前获取线程不是已经获取写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires)> MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock()||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
从代码上看,如果 c!= 0,w=0。说明 r!=0。则 return false,当前线程进入同步队列等待。否则进行加锁,更改 state 的值,如果 c==0,这说明读写锁都没有。则进行判断是否写线程需要 block 或者进行更新同步状态失败。否则设置当前线程为 owner 线程,获取锁成功。写锁的释放与 ReentrantLock 的释放过程基本类似,每次释放均减少写状态,当写状态为 0 时表示写锁已被释放.
ReentrantReadWriteLock
是 Java 并发包提供的读写锁,用于实现读写分离的并发控制。在ReentrantReadWriteLock
中,有一个名为"c"的整型变量,用于表示读写状态。
# 10.读写锁 c 值的含义
c 的值及其含义如下:
- c = 0:
- 表示当前没有任何线程持有读锁或写锁。即没有线程正在读取或写入共享资源。
- c > 0:
- 表示当前有一个或多个线程持有读锁。c 的值等于持有读锁的线程数量。
- c = -1:
- 表示当前有一个线程持有写锁。读写锁中只能有一个线程持有写锁,因此 c 的值只会是 0 或-1。
读写锁允许多个线程同时获取读锁,只要没有线程持有写锁,而写锁是独占的,只能有一个线程获取写锁。读锁和写锁之间是互斥的,即当有线程持有写锁时,其他线程无法获取读锁或写锁,直到写锁被释放。
# 11.ReentrantReadWriteLock 读锁的获取与释放?
读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为 0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。类似于修改文件的时候不能打开查看.
protected final int tryAcquireShared(int unused){
Thread current = Thread.currentThread();
int c = getState();//读写总状态
//判断是否有其他线程获取到写锁
if (exclusiveCount(c)!= 0 &&
getExclusiveOwnerThread()!= current)
return -1;
int r = sharedCount(c);//读状态
if (!readerShouldBlock()&&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)){
if (r == 0){
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current){
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;//读重入
}
return 1;
}
return fullTryAcquireShared(current);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
在 tryAcquireShared(int unused)方法中,如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的值是(1<<16)。
# 12.ReentrantReadWriteLock 的锁降级问题?
锁降级指的是写锁降级成为读锁.如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级.锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。state 是包含读写锁的状态,保证安全性.
为什么先获取读锁才能释放写锁:
因为 state 是包含读写锁的状态,如果先释放写锁,则获取读锁的时候,写锁被其他线程获取,不能再次获取到读锁.而拥有写锁的时候可以同时获取读锁,所以需要先获取读锁才能释放写锁.原理上来说只能有一个线程持有写锁.
RentrantReadWriteLock 为什么不支持锁升级:
读锁升级为写锁。首先自己有读锁,之后拿到写锁。如果有两个读锁同时升级为写锁。那么只有一个能升级成功。但是这两个线程同时拥有读锁。其中一个线程还一直在申请写锁。其他线程获取到读锁时,不能获取写锁.这就会造成死锁。但是锁降级是可以的。因为写锁只有一个线程占有。
# 13.LockSupport 工具是什么?
当需要阻塞或唤醒一个线程的时候,都会使用 LockSupport 工具类来完成相应工作.LockSupport 定义了一组的公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能,而 LockSupport 也成为构建同步组件的基础工具.(对比使用 synchronized 方法的阻塞唤醒功能.).LockSupport 定义了一组以 park 开头的方法用来阻塞当前线程,以及 unpark(Thread thread)方法来唤醒一个被阻塞的线程。
方法名称 | 描述 |
---|---|
void park() | 阻塞当前线程,如果调用 unpar ik(Thread thread)方法或者当前线程被中断,才能从 park()方法返回 |
void parkNanos(long nanos) | 阻塞当前线程,最长不超过 na anos 纳秒,返回条件在 park(的基础上增加了超时返回 |
void parkUntil(long deadline) | 阻塞当前线程,直到 deadline 日时间(从 1970 年开始到 deadline 时间的毫秒数) |
void unpark(Thread thread) | 唤醒处于阻塞状态的线程 thread |
先唤醒线程,再阻塞线程,线程不会真的阻塞;但是先唤醒线程两次再阻塞两次时就会导致线程真的阻塞。
LockSupport 就是通过控制变量_counter
来对线程阻塞唤醒进行控制的。原理有点类似于信号量机制。
- 当调用
park()
方法时,会将_counter 置为 0,同时判断前值,小于 1 说明前面被unpark
过,则直接退出,否则将使该线程阻塞。 - 当调用
unpark()
方法时,会将_counter 置为 1,同时判断前值,小于 1 会进行线程唤醒,否则直接退出。 形象的理解,线程阻塞需要消耗凭证(permit),这个凭证最多只有 1 个。当调用 park 方法时,如果有凭证,则会直接消耗掉这个凭证然后正常退出;但是如果没有凭证,就必须阻塞等待凭证可用;而 unpark 则相反,它会增加一个凭证,但凭证最多只能有 1 个。 - 为什么可以先唤醒线程后阻塞线程? 因为 unpark 获得了一个凭证,之后调用 park 因为有凭证消费,故不会阻塞。
- 为什么唤醒两次后阻塞两次会阻塞线程。 因为凭证的数量最多为 1,连续调用两次 unpark 和调用一次 unpark 效果一样,只会增加一个凭证;而调用两次 park 却需要消费两个凭证。
# 14.parkNanos 中 blocker 含义
parkNanos(long nanos)和 parkNanos(Object blocker,long nanos)异同?
对比 parkNanos(long nanos)方法和 parkNanos(Object blocker,long nanos)方法在使用场景上有什么不同?
LockSupport 增加了 3 个方法
- park(Object blocker)
- parkNanos(Object blocker,long nanos)
- parkUntil(Object blocker,long deadline)
用于实现阻塞当前线程的功能,其中参数 blocker 是用来标识当前线程在等待的对象(以下称为阻塞对象),该对象主要用于问题排查和系统监控。
从右图的线程 dump 结果可以看出,代码片段的内容都是阻塞当前线程 10 秒,但从线程 dump 结果可以看出,有阻塞对象的 parkNanos 方法能够传递给开发人员更多的现场信息.这是由于在 Java 5 之前,当线程阻塞(使用 synchronized 关键字)在一个对象上时,通过线程 dump 能够查看到该线程的阻塞对象,方便问题定位,而 Java 5 推出的 Lock 等并发工具时却遗漏了这一点,致使在线程 dump 时无法提供阻塞对象的信息.因此,在 Java 6 中,LockSupport 新增了上述 3 个含有阻塞对象的 park 方法,用以替代原有的 park 方法。
# 15.park unpark 原理
每个线程都有自己的一个 Parker 对象,由三部分组成 _counter
, _cond
和 _mutex
打个比喻线程就像一个旅人,Parker 就像他随身携带的背包,条件变量就好比背包中的帐篷。_counter
就好比背包中的备用干粮(0 为耗尽,1 为充足)
调用 park 就是要看需不需要停下来歇息
- 如果备用干粮耗尽,那么钻进帐篷歇息
- 如果备用干粮充足,那么不需停留,继续前进
调用 unpark,就好比令干粮充足
- 如果这时线程还在帐篷,就唤醒让他继续前进
- 如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留继续前进
因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮
# 16.Synchronized 和 Lock 区别?
synchronized | Lock | |
---|---|---|
原始构成 | sync 是 JVM 层面的,底层通过monitorenter 和monitorexit 来实现的 | Lock 是 JDK API 层面的 |
使用方法 | sync 不需要手动释放锁 | Lock 需要手动释放 |
是否可中断 | sync 不可中断,除非抛出异常或者正常运行完成 | Lock 是可中断的,通过调用interrupt() 方法 |
是否为公平锁 | sync 只能是非公平锁 | Lock 既能是公平锁,又能是非公平锁 |
绑定多个条件 | sync 不能,只能随机唤醒 | Lock 可以通过Condition 来绑定多个条件,精确唤醒 |
synchronized 依赖于 JVM 而 ReenTrantLock 依赖于 API
Lock 接口提供的 synchronized 关键字不具备的主要特性
特性 | 描述 |
---|---|
尝试非阻塞地获取锁 | 当前线程尝试获取锁,如果这一时刻锁没有被其他线程获取到,则成功获取并持有锁 |
能被中断地获取锁 | 与 synchronized 不同,获取到锁的线程能能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放 |
超时获取锁 | 在指定的截止时间之前获取锁,如果截止时间到了仍旧无法获取锁,则返回 |
Lock 的 API:
方法名称 | 描述 |
---|---|
void lock() | 获取锁,调用该方法当前线程将会获取锁,当锁获得后,从该方法返回 |
void lockInterruptibly()) throws InterruptedException | 可中断地获取锁,和 lock0 方法的不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程 |
boolean tryLock() | 尝试非阻塞的获取锁,调用该方法后立刻返回,如果能够获取则返回 true,否则返回 false |
boolean tryLock(long time.TimeUnit unit)throws InterruptedException | 超时的获取锁,当前线程在以下 3 种情况下会返回: ① 当前线程在超时时间内获得了锁 ② 当前线程在超时时间内被中断 ③ 超时时间结束,返回 false |
void unlock() | 释放锁 |
Condition newCondition() | 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的 waitO 方法,而调用后,当前线程将释放锁 |
synchronized和lock区别:
- synchronized 是一个关键词,lock 是一个接口
- synchronized 是隐式调用,lock 是显示调用
- synchronized 可以作用到方法,lock 只能作用到代码块
- Lock 支持非阻塞式加锁,
- Lock 支持超时加锁
- Lock 支持可中断加锁
- synchronized 采用的 monitor 监视器,lock 采用的是 AQS
- synchronized 只有 2 个同步队列,一个等待队列,lock 可以有多个等待队列,一个同步队列
- synchronized 只支持非公平锁,lock 支持公平锁和非公平锁
- synchronized 与 wait 和 notify 和 notifyAll 配合使用,lock 和 condition 和 await 和 signal 和 signalAll 配合使用
- lock 是模板方法模式,可以自定义实现锁机制
- lock 有读写锁,可以支持同时读读
# 17.Lock 的 Condition 接口作用?
任意一个 Java 对象,都拥有一组监视器方法(定义在 java.lang.Object 上),主要包括 wait()、 wait(long timeout)、 notify()以及 notifyAll()方法,这些方法与 synchronized 同步关键字配合,可以实现等待/通知模式.Condition 接口也提供了类似 Object 的监视器方法,与 Lock 配合可以实现等待/通知模式。
ConditionObject 是 AQS 的一个内部类,Condition 操作需要获取同步状态.节点类型和 AQS 中的节点是一样的.
通过对比 Object 的监视器方法和 Condition 接口,可以更详细地了解 Condition 的特性
Object 的监视器方法与 Condition 接口的对比
对比项 | Obiect Monitor Methods | Condition |
---|---|---|
前置条件 | 获取对象的锁 | 调用 Locklock 获取锁 调用 LocknewCondition 获取 Condition 对象 |
调用方式 | 直接调用 如:object.wait() | 直接调用 如:condition.await() |
等待队列个数 | 一个 | 多个 |
当前线程释放锁并进人等待状态 | 支持 | 支持 |
当前线程释放锁并进人等待状态,在等待状态中不响应中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来的某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
Condition 定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到 Condition 对象关联的锁.Condition 对象是由 Lock 对象(调用 Lock 对象的 newCondition()方法)创建出来的,换句话说,Condition 是依赖 Lock 对象的。
public class Juc_15_keyword_Condition {
final Lock lock = new ReentrantLock();//定义锁对象
//通过锁对象获取Condition实例
final Condition notFull = lock.newCondition(); //用于控制put操作
final Condition notEmpty = lock.newCondition(); //用于控制take操作
final Object[] items = new Object[100];//缓冲队列,初始容量100
int putptr, takeptr, count;//分别记录put,take当前的索引,count用于记录当前item的个数
/**
* 往缓冲队列中添加数据
*/
public void put(Object x) throws InterruptedException {
//上锁,作用和synchronized一样,保证代码同一时刻只有一个线程可以操作,也保证了和take方法的互斥
lock.lock();
try {
while (count == items.length) {
notFull.await();//如果队列满了,则put线程等待被唤醒
}
items[putptr] = x; //队列未满,则添加数据
if (++putptr == items.length) putptr = 0;//添完后,如果记录读数据的索引到了最后一个位置,则重置为0
++count;//item总数自增
notEmpty.signal();//唤醒take线程取数据
} finally {
lock.unlock();//put操作完后,释放锁.
}
}
/**
* 从缓冲队列中取数据
*/
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();//如果队列空了,则take线程等待被唤醒
}
Object x = items[takeptr]; //队列未空,则取数据
if (++takeptr == items.length) takeptr = 0;//取完后,如果记录取数据的索引到了最后一个位置,则重置为0
--count;//item总数自减
notFull.signal();//唤醒put线程添加数据
return x;//返回取得的数据
} finally {
lock.unlock();//take操作完后,释放锁对象
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
当调用 await()方法后,当前线程会释放锁并在此等待,而其他线程调用 Condition 对象的 signal()方法,通知当前线程后,当前线程才从 await()方法返回,并且在返回前已经获取了锁。一个线程进入等待状态----释放锁----线程被唤醒----线程获取锁----等待状态结束。
Condition 相关的方法,Condition 其实就是一个队列,好理解
方法 | 描述 |
---|---|
void await() | 造成当前线程在接到信号或被中断之前一直处于等待状态。 |
boolean await(long time, TimeUnit unit) | 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。 |
long awaitNanos(long nanosTimeout) | 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。 |
void awaitUninterruptibly() | 造成当前线程在接到信号之前一直处于等待状态。 |
boolean awaitUntil(Date deadline) | 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。 |
void signal() | 唤醒一个等待线程。 |
void signalAll() | 唤醒所有等待线程。 |
# 18.Condition 的实现原理?
ConditionObject 是同步器 AbstractQueuedSynchronizer 的内部类,因为 Condition 的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理.每个 Condition 对象都包含着一个队列(以下称为等待队列),该队列是 Condition 对象实现等待/通知功能的关键。
如果一个线程调用了 Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。
一个 Condition 包含一个等待队列,Condition 拥有首节点(firstWaiter)和尾节点(lastWaiter).当前线程调用 Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列
Condition 拥有首尾节点的引用,而新增节点只需要将原有的尾节点 nextWaiter 指向它,并且更新尾节点即可.上述节点引用更新的过程并没有使用 CAS 保证,原因在于调用 await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
在 Object 的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的 Lock (更确切地说是同步器)拥有一个同步队列和多个等待队列
如果从队列(同步队列和等待队列)的角度看 await()方法,当调用 await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到 Condition 的等待队列中。
//AbstractQueuedLongSynchronizer中的内部类ConditionObject,实现了Condition接口
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
2
3
4
5
6
7
8
调用 Condition 的 signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中.调用该方法的前置条件是当前线程必须获取了锁,可以看到 signal()方法进行了 isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程.接着获取等待队列的首节点,将其移动到同步队列并使用 LockSupport 唤醒节点中的线程。
Condition 的 signalAll()方法,相当于对等待队列中的每个节点均执行一次 signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。唤醒代表着移动到同步队列中.
# 七.线程池
# 1.线程池的实现原理?
当提交一个新任务到线程池时,线程池的处理流程如下。
- 线程池判断核心线程池里的线程是否都在执行任务.如果不是,则创建一个新的工作线程来执行任务.如果核心线程池里的线程都在执行任务,则进入下个流程。
- 线程池判断工作队列是否已经满.如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
- 线程池判断线程池的线程是否都处于工作状态.如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
线程池创建线程时,会将线程封装成工作线程 Worker , Worker 在执行完任务后,还会循环获取工作队列里的任务来执行.我们可以从 Worker 类的 run()方法里看到这点。
ThreadPoolExecutor 执行 execute 方法分下面 4 种情况。
- 如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁).
- 如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue。
- 如果无法将任务加入 BlockingQueue (队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁).
- 如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor 采取上述步骤的总体设计思路,是为了在执行 execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈).在 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize),几乎所有的 execute()方法调用都是执行步骤 2,而步骤 2 不需要获取全局锁。
# 2.创建线程池的重要参数?
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//等待时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//等待队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler) {} //拒绝策略
2
3
4
5
6
7
- 核心线程数
- 最大线程数
- 生存时间
- 时间单位
- 任务队列
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
- LinkedBlockingQueue:一个基于链表结构的无界阻塞队列,此队列按 FIFO 排序元素,吞吐量通常要高于 ArrayBlockingQueue.静态工厂方法 Executors.newFixedThreadPool()使用了这个队列。
- SynchronousQueue:一个不存储元素的阻塞队列.每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 Linked-BlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列。
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
- 线程工厂:可以自定义线程的名字,方便区分
- 拒绝策略
- AbortPolicy:直接抛出异常。是默认的策略.
- CallerRunsPolicy:只用调用者所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
细说参数:
核心线程数(Core Pool Size):
- 核心线程数是线程池中一直保持活动的线程数量,即使它们处于空闲状态。线程池会根据工作队列的任务数量自动调整活动线程的数量,但不会低于核心线程数。
最大线程数(Maximum Pool Size):
- 最大线程数是线程池中允许的最大线程数量。当工作队列已满且活动线程数小于最大线程数时,线程池会创建新的线程来执行任务。
任务队列(Work Queue):
- 任务队列用于保存等待执行的任务。当线程池的活动线程数达到核心线程数时,新的任务会被放入任务队列等待执行。任务队列可以是有界队列(如 ArrayBlockingQueue)或无界队列(如 LinkedBlockingQueue)。
线程存活时间(Keep Alive Time):
- 线程存活时间是当线程池中的线程数量
超过核心线程数时
,多余的空闲线程等待新任务的最长时间。如果超过这个时间仍然没有新任务到来,超过核心线程数的空闲线程将被终止。
- 线程存活时间是当线程池中的线程数量
TimeUnit (线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒).
拒绝策略(Rejected Execution Policy):
- 当线程池的任务队列已满且活动线程数已达到最大线程数时,新的任务将无法提交执行。拒绝策略定义了当线程池无法接受新任务时的处理方式,例如抛出异常、丢弃任务、或在调用者线程中直接执行任务。
# 3.谈谈 PriorityQueue 理解?
PriorityQueue
是优先级队列,通过自然排序或者用 java
的比较器实现自定义排序,无界队列,但是可以在创建时指定大小,不允许有空值,默认是最小堆,当排序相同时,随机返回一个,PriorityQueue
是非线程安全的,PriorityBlockingQueue
是线程安全的,用于多线程环境.PriorityBlockingQueue
实现原理是使用了可重入锁
private final ReentrantLock lock;
PriorityQueue
通过二叉小顶堆实现,任意一个非叶子节点的权值,都不大于其左右子节点的权值
大根堆也叫大顶堆
小根堆也叫小顶堆
top 问题时,求最小 k 个数用大根堆,因为大根堆根节点是最大的值,保存的都是小值
top 问题时,求最大 k 个数用小根堆,因为小根堆根节点是最小的值,保存的都是大值
方法 | 作用 | 失败处理方式 |
---|---|---|
add() | 插入元素 | 抛出异常 |
offer() | 插入元素 | 返回 false |
element() | 获取队首元素不删除 | 抛出异常 |
peek() | 获取队首元素不删除 | null |
remove() | 取出队首元素删除 | 抛出异常 |
poll() | 取出队首元素删除 | null |
经典方法源码:从k
指定的位置开始,将x
逐层与当前点的parent
进行比较并交换,直到满足x >= queue[parent]
为止
//siftUp()
private void siftUp(int k, E x){
while (k > 0){
int parent = (k -1)>>> 1;//parentNo = (nodeNo-1)/2
Object e = queue[parent];
if (comparator.compare(x,(E) e)>= 0)//调用比较器的比较方法
break;
queue[k]= e;
k = parent;
}
queue[k]= x;
}
2
3
4
5
6
7
8
9
10
11
12
该方法的作用是从k
指定的位置开始,将x
逐层向下与当前点的左右孩子中较小的那个交换,直到x
小于或等于左右孩子中的任何一个为止。
//siftDown()
private void siftDown(int k, E x){
int half = size >>> 1;
while (k < half){
//首先找到左右孩子中较小的那个,记录到c里,并用child记录其下标
int child = (k << 1)+1;//leftNo = parentNo*2+1
Object c = queue[child];
int right = child +1;
if (right < size &&
comparator.compare((E) c,(E) queue[right])> 0)
c = queue[child = right];
if (comparator.compare(x,(E) c)<= 0)
break;
queue[k]= c;//然后用c取代原来的值
k = child;
}
queue[k]= x;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 4.execute 和 submit 的区别?
public interface Executor {
void execute(Runnable command);
}
public interface ExecutorService extends Executor {
Future<?> submit(Runnable task);
<T> Future<T> submit(Callable<T> task);
}
2
3
4
5
6
7
8
9
execute
和submit
都是用于向线程池提交任务的方法,但它们在使用方式和返回结果上有一些区别:
- 使用方式:
execute
:execute
方法是Executor
接口中定义的方法,它用于提交不需要返回结果的任务。该方法只接受Runnable
类型的任务,即没有返回值的任务。submit
:submit
方法是ExecutorService
接口中定义的方法,它用于提交既可以有返回结果也可以没有返回结果的任务。submit
方法可以接受Runnable
和Callable
类型的任务。Callable
是一个带有泛型返回值的任务类型,通过它可以获得任务执行的结果。
- 返回结果:
execute
:execute
方法没有返回结果,因为它只用于提交没有返回值的任务,所以无法获得任务的执行结果。submit
:submit
方法可以获得任务执行的结果。当使用submit
提交Callable
任务时,会返回一个Future
对象,通过这个对象可以异步获取任务执行的结果。
- 异常处理:
execute
:execute
方法不会抛出任务执行时的异常,因为没有返回结果,所以任务执行的异常只能由任务本身处理。submit
:submit
方法可以通过Future
对象来处理任务执行时的异常。调用Future
对象的get()
方法获取任务的执行结果时,如果任务抛出异常,get()
方法会将异常封装在ExecutionException
中并抛出。
总结:
- 如果你只关心任务的执行,不需要获取返回结果,可以使用
execute
方法。 - 如果你需要获取任务的执行结果或处理任务执行的异常,可以使用
submit
方法,并将任务封装为Callable
类型。
# 5.shutDown 和 shutDownNow
shutdown()
和shutdownNow()
都是用于关闭线程池的方法,但它们有一些区别:
shutdown()
方法:shutdown()
方法是ExecutorService
接口中定义的方法。- 调用
shutdown()
方法后,线程池会拒绝接受新的任务提交,但会继续执行已经提交的任务和队列中的任务。 shutdown()
方法会平缓地关闭线程池,它会等待所有已提交的任务执行完成,并且不会中断正在执行的任务。
shutdownNow()
方法:shutdownNow()
方法也是ExecutorService
接口中定义的方法。- 调用
shutdownNow()
方法后,线程池会立即停止接受新的任务提交,并且尝试中断正在执行的任务。 shutdownNow()
方法会尝试停止所有任务的执行,包括已经提交但未执行的任务,它会返回一个 List 集合,包含那些未执行的任务。
总结:
shutdown()
方法是平缓关闭线程池的方式,它会等待所有任务执行完成后关闭。shutdownNow()
方法是立即关闭线程池的方式,它会尝试中断正在执行的任务,并返回未执行的任务列表。
# 6.线程池监控?
- taskCount:线程池需要执行的任务数量。
- completedTaskCount:线程池在运行过程中已完成的任务数量小于或等于 taskCount。
- largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
- getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减。
- getActiveCount:获取活动的线程数。
# 7.Executor 框架的结构?
Executor 框架主要由 3 大部分组成如下。
任务:
包括被执行任务需要实现的接口- Runnable 接口,Runnable 不会有返回结果
- Callable 接口,Callable 有返回结果
任务的执行。
包括任务执行机制的核心接口 Executor,
以及继承自 Executor 的 ExecutorService 接口。
Executor 框架有两个关键类实现了 ExecutorService 接口(ThreadPoolExecutor 和 ScheduledThreadPoolExecutor)
异步计算的结果。
- 包括接口 Future
- 实现 Future 接口的 FutureTask 类。
核心类和接口:
- Executor 是一个接口,它是 Executor 框架的基础,它将任务的提交与任务的执行分离开来。
- ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务。
- ScheduledThreadPoolExecutor 是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。 ScheduledThreadPoolExecutor 比 Timer 更灵活,功能更强大。
- Future 接口和实现 Future 接口的 FutureTask 类,代表异步计算的结果。
- Runnable 接口和 Callable 接口的实现类,都可以被 ThreadPoolExecutor 或 Scheduled-ThreadPoolExecutor 执行。
# 8.说说四种线程池?
主要通过各个线程池的特点和工作队列来进行说明.
ThreadPoolExecutor:
通常使用工厂类 Executors
来创建。 Executors
可以创建 3 种类型的 ThreadPoolExecutor
:
- SingleThreadExecutor
- 适用于保证顺序执行各个任务.
- FixedThreadPool
- 适用于需要限制当前线程数量的场景.比如负载比较重的服务器.
- CachedThreadPool
- 大小无界的线程池.适用于大量短期任务.或者负载比较轻的服务器.
FixedThreadPool:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2
3
4
5
核心线程数和最大线程数都是用户自己设置的 size
多余的线程会被立即终止,等待时间为 0
FixedThreadPool 使用无界队列 LinkedBlockingQueue 作为线程池的工作队列
队列的容量为 Integer MAX_VALUE
线程池中的线程数不会超过核心线程数
最大线程数是个无效参数
保活时间是个无效参数
不会拒绝任务
SingleThreadExecutor:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
2
3
4
5
6
- 核心线程数和最大线程数都是默认的 1
- 多余的线程会被立即终止,等待时间为 0
- FixedThreadPool 使用无界队列 LinkedBlockingQueue 作为线程池的工作队列
- 队列的容量为 Integer MAX_VALUE。
CachedThreadPool:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2
3
4
5
- 核心线程数为 0
- 最大线程数无界
- 保活时间是 60S,等待新任务的最长时间是 60s,超过 60s 将被终止
- CachedThreadPool 使用没有容量的 SynchronousQueue 作为线程池的工作队列,但 CachedThreadPool 的 maximumPool 是无界的。
- 如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新线程。
- 极端情况下,CachedThreadPool 会因为创建过多线程而耗尽 CPU 和内存资源。
ScheduledThreadPoolExecuton:
ScheduledThreadPoolExecutor 通常使用工厂类 Executors 来创建。Executors 可以创建 2 种类型的 ScheduledThreadPoolExecutor,如下。
ScheduledThreadPoolExecutor。包含若干个线程的 ScheduledThreadPoolExecutor
SingleThreadScheduledExecutor。只包含一个线程的 ScheduledThreadPoolExecutor。
ScheduledFutureTask 主要包含 3 个成员变量,如下。
long 型成员变量 time,表示这个任务将要被执行的具体时间。
long 型成员变量 sequenceNumber,表示这个任务被添加到 ScheduledThreadPoolExecutor 中的序号。
long 型成员变量 period,表示任务执行的间隔周期。
ScheduledThreadPoolExecutor:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
2
3
4
- 使用 DelayedWorkQueue 作为队列
- DelayQueue 封装了一个 PriorityQueue,这个 PriorityQueue 会对队列中的 Scheduled-FutureTask 进行排序。排序时,time 小的排在前面(时间早的任务将被先执行)。如果两个 ScheduledFutureTask 的 time 相同,就比较 sequenceNumber,sequenceNumber 小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。
- 核心线程数为指定值
- 最大线程数为无界值
- 保活时间为 0
SingleThreadScheduledExecutor:
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
2
3
4
5
6
7
8
9
10
- 核心线程数是 1
- 最大线程数无界值
- 保活时间为 0
- 使用 DelayedWorkQueue 队列
- 指定线程工厂,自定义线程名字前缀
# 9.不推荐 Executors 创建线程
推荐使用 ThreadPoolExecutor 方式创建线程,在阿里的 Java 开发手册时有一条是不推荐使用 Executors 去创建,而是推荐去使用 ThreadPoolExecutor 来创建线程池。 这样做的目的主要原因是:使用 Executors 创建线程池不会传入核心参数,而是采用的默认值,这样的话我们往往会忽略掉里面参数的含义,如果业务场景要求比较苛刻的话,存在资源耗尽的风险;另外采 ThreadPoolExecutor 的方式可以让我们更加清楚地了解线程池的运行规则,不管是面试还是对技术成长都有莫大的好处。
线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 各个方法的弊端:
- newFixedThreadPool 和 newSingleThreadExecutor: 主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。
- newCachedThreadPool 和 newScheduledThreadPool: 主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。
# 10.说一下 CTL?
ctl 变量是整个线程池的核心控制状态,它是一个 AtomicInteger 类型的原子对象,它记录了线程池中生效线程数和线程池的运行状态。
- workerCount,生效的线程数,基本上可以理解为存活的线程数。
- runState,线程池运行状态。
ctl 总共 32 位,其中低 29 位代表 workerCount,所以最大线程数为 2^29^-1。高 3 位代表 runState。
runState 有 5 个值:
- RUNNING: 对应的高 3 位值是 111。接收新任务处理队列任务。
- SHUTDOWN:对应的高 3 位值是 000。不接收新任务,但处理队列任务。
- STOP:对应的高 3 位值是 001。不接收新任务,也不处理队列任务,并且中断所有处理中的任务。
- TIDYING: 对应的高 3 位值是 010。所有任务都被终结,有效线程为 0,并触发 terminated()方法。
- TERMINATED :对应的高 3 位值是 011。当 terminated()方法执行结束。
状态转换过程:
- 当调用了 shutdown(),状态会从 RUNNING 变成 SHUTDOWN,不再接收新任务,此时会处理完队列里面的任务。
- 如果调用的是 shutdownNow(),状态会直接变成 STOP。
- 当线程或者队列都是空的时候,状态就会变成 TIDYING。
- 当 terminated()执行完的时候,就会变成 TERMINATED。
# 11.线程是如何被回收的
ThreadPoolExecutor 回收工作线程,一条线程 getTask()返回 null,就会被回收。
分两种场景:
未调用 shutdown() :
RUNNING 状态下全部任务执行完成的场景,线程数量大于 corePoolSize,线程超时阻塞,超时唤醒后 CAS 减少工作线程数,如果 CAS 成功,返回 null,线程回收。否则进入下一次循环。当工作者线程数量小于等于 corePoolSize,就可以一直阻塞了。调用 shutdown():
,全部任务执行完成的场景,shutdown() 会向所有线程发出中断信号,这时有两种可能。
所有线程都在阻塞:
中断唤醒,进入循环,都符合第一个 if 判断条件,都返回 null,所有线程回收。
任务还没有完全执行完:
至少会有一条线程被回收。在 processWorkerExit(Worker w, boolean completedAbruptly)方法里会调用 tryTerminate(),向任意空闲线程发出中断信号。所有被阻塞的线程,最终都会被一个个唤醒,回收。
# 12.创建多少线程合适?
我们从线程的应用场景来分析,由于 IO 操作比 Cpu 计算耗时要久的多的,如果我们一段程序有 IO 操作和 Cpu 计算,我们可以调用 IO 密集型计算。程序中没有 IO 操作只有 Cpu 的话称为 Cpu 密集型程序。
Cpu密集型:
Cpu 的核数=线程数就行,一般我们会设置 Cpu 核数+ 1,防止由于其他因素导致阻塞。
IO密集型:
确定在 IO 密集型计算中创建多少线程合适是一个复杂的问题,因为它涉及到多个因素,例如计算机的硬件配置、任务的性质和操作系统的特性。IO 密集型任务通常涉及大量的输入/输出操作,例如读写文件、网络通信等,而不是 CPU 密集型任务,这些任务主要涉及大量的计算。
在 IO 密集型任务中,线程通常会在等待 IO 操作完成时被阻塞,而不是在 CPU 上执行计算。因此,创建过多的线程可能会导致线程切换开销增加,从而导致性能下降。同时,创建过少的线程可能导致 CPU 资源得不到充分利用,从而造成性能浪费。
一般来说,建议的线程数量取决于以下几个因素:
- CPU 核心数:通常建议创建与 CPU 核心数相当数量的线程,这可以充分利用 CPU 资源,并避免过多的线程切换开销。
- IO 操作的类型和数量:如果 IO 操作非常耗时并且较多,可以考虑创建稍多于 CPU 核心数的线程,以便在等待 IO 时可以切换到其他线程执行任务。
- 内存:每个线程都需要一定的内存资源,过多的线程可能导致内存占用过大,影响系统的稳定性和性能。
- 操作系统的调度策略:不同的操作系统在线程调度方面有不同的策略,这也会影响合适的线程数量。
一种常见的做法是,首先根据 CPU 核心数来确定线程池的大小,然后根据实际的性能测试进行调优。可以逐渐增加线程数量,并监测系统性能的变化,找到最佳的线程数量。
值得注意的是,如果任务中有长时间的阻塞 IO 操作,也可以考虑使用异步 IO 或者事件驱动的编程模型,以减少线程数量并提高系统的吞吐量。
综上所述,IO 密集型任务的合适线程数量没有固定的标准,需要根据具体情况进行评估和调优。在实际应用中,可以进行性能测试和监测,找到最佳的线程数量来提高系统性能。
# 13.自定义线程池
- 600l 是 600,其中 l 是 long 的简写
- 使用阻塞队列,并设置最大容量为 1_000_000
- 自定义拒绝策略进行补偿机制处理
private ExecutorService service = new ThreadPoolExecutor(5,
5,
6001,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1_000_000),
Executors.defaultThreadFactory(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 补偿机制
}
});
2
3
4
5
6
7
8
9
10
11
12
拒绝策略有4种:
- AbortPolicy(默认):
ThreadPoolExecutor.AbortPolicy()
是默认的拒绝策略。当线程池无法接受新任务时,会抛出RejectedExecutionException
异常。 - DiscardPolicy:
ThreadPoolExecutor.DiscardPolicy()
是另一种简单的拒绝策略。当线程池无法接受新任务时,新任务会被丢弃,不会抛出异常。 - DiscardOldestPolicy:
ThreadPoolExecutor.DiscardOldestPolicy()
是一种稍微高级一点的策略。当线程池无法接受新任务时,会丢弃队列中最老的任务,然后尝试重新提交新任务。 - CallerRunsPolicy:
ThreadPoolExecutor.CallerRunsPolicy()
。当线程池无法接受新任务时,它会将任务交给调用线程来执行。