Back

Java面试之并发

什么是乐观锁和悲观锁?

1、悲观锁
Java在JDK1.5之前都是靠synchronized关键字保证同步的,这种通过使用一致的锁定协议来协调对共享状态的访问,可以确保无论哪个线程持有共享变量的锁,都采用独占的方式来访问这些变量。独占锁其实就是一种悲观锁,所以可以说synchronized是悲观锁。

2、乐观锁
乐观锁(Optimstic Locking)其实是一种思想。相对悲观锁而言,乐观锁假定认为数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式地对数据的冲突与否进行检测,如果发现冲突了,则返回错误信息。

什么是AQS

AbstractQueuedSynchronizer,是一个用于构建锁和同步容器的框架。事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock、FutureTask等。

AQS使用一个FIFO的队列表示排队等待锁的线程,队列头节点称作 哨兵节点或这哑节点,它不与任何线程关联。其他节点与等待线程关联,每个节点维护一个等待状态waitStatus。

同步容器与并发容器

  • 同步容器
    1. 同步容器主要代表用Vector和Hashtable,及Collections.synchronizedXXX等
    2. 锁粒度为整体对象
    3. 迭代器是及时失败的,迭代过程修改会抛出ConcurrentModificationException
  • 并发容器
    1. 主要代表有ConcurrentHashMap、CopyOnWriteArrayList、ConcurrentSkipListMap、ConcurrentSkipListSet
    2. 锁粒度是分散的、细粒度,即读写使用不同锁
    3. 迭代器具有弱一致性,能够迭代时修改。

synchronized 底层如何实现?什么是锁的升级、降级?

synchronized 代码块是由 monitorenter/monitorexit 指令实现的,Monitor 对象是同步的基本实现单元。

在 Java 6 之前,Monitor 的实现完全是依靠操作系统内部的互斥锁,因为需要进行用户态到内核态的切换,所以同步操作是一个无差别的重量级操作。

现代的(Oracle)JDK 中,JVM 对此进行了大刀阔斧地改进,提供了三种不同的 Monitor 实现, 也就是常说的三种不同的锁:偏斜锁(Biased Locking)、轻量级锁和重量级锁,大大改进了其性能。

所谓锁的升级、降级,就是 JVM 优化 synchronized 运行的机制, 当 JVM 检测到不同的竞争状况时,会自动切换到适合的锁实现,这种切换就是锁的升级、降级。

当没有竞争出现时,默认会使用偏斜锁。 JVM 会利用 CAS 操作(compare and swap),在对象头上的 Mark Word 部分设置线程 ID,以表示这个对象偏向于当前线程,所以并不涉及真正的互斥锁。 这样做的假设是基于在很多应用场景中,大部分对象生命周期中最多会被一个线程锁定,使用偏斜锁可以降低无竞争开销。

如果有另外的线程试图锁定某个已经被偏斜过的对象,JVM 就需要撤销(revoke)偏斜锁,并切换到轻量级锁实现。 轻量级锁依赖 CAS 操作 Mark Word 来试图获取锁,如果重试成功,就使用普通的轻量级锁;否则,进一步升级为重量级锁。

当JVM 进入安全点(SafePoint)的时候,会检查是否有闲置的 Monitor,然后试图进行降级。

偏斜锁并不适合所有应用场景,撤销操作(revoke)是比较重的行为, 只有当存在较多不会真正竞争的 synchronized 块儿时,才能体现出明显改善。 实践中对于偏斜锁的一直是有争议的,有人甚至认为,当你需要大量使用并发类库时,往往意味着你不需要偏斜锁。 还有一方面是,偏斜锁会延缓 JIT 预热的进程,所以很多性能测试中会显式地关闭偏斜锁,命令:-XX:-UseBiasedLocking

“自旋锁”是做什么的吗?它的使用场景是什么?

自旋锁:
竞争锁失败的线程,并不会真实的在操作系统层面挂起等待,而是JVM会让线程做几个空循环(基于预测在不久的将来就能获得), 在经过若干次循环后,如果可以获得锁,那么进入临界区,如果还不能获得锁,才会真实的将线程在操作系统层面进行挂起。

适用场景:
自旋锁可以减少线程的阻塞,这对于锁竞争不激烈,且占用锁时间非常短的代码块来说,有较大的性能提升, 因为自旋的消耗会小于线程阻塞挂起操作的消耗。 如果锁的竞争激烈,或者持有锁的线程需要长时间占用锁执行同步块,就不适合使用自旋锁了, 因为自旋锁在获取锁前一直都是占用cpu做无用功,线程自旋的消耗大于线程阻塞挂起操作的消耗,造成cpu的浪费。

一个线程两次调用 start() 方法会出现什么情况?谈谈线程的生命周期和状态转移。

Java 的线程是不允许启动两次的,第二次调用必然会抛出 IllegalThreadStateException。
在第二次调用 start() 方法的时候,线程可能处于终止或者其他(非 NEW)状态,但是不论如何,都是不可以再次启动的。

线程的生命周期和状态转移 详见java笔记基础篇

线程是系统调度的最小单元,一个进程可以包含多个线程, 作为任务的真正运作者,有自己的栈(Stack)、寄存器(Register)、本地存储(ThreadLocal)等, 但是会和进程内其他线程共享文件描述符、虚拟地址空间等。

虚假唤醒

在多核处理器下,pthread_cond_signal可能会激活多于一个线程(阻塞在条件变量上的线程)。 结果就是,当一个线程调用pthread_cond_signal()后,多个调用pthread_cond_wait()或pthread_cond_timedwait()的线程返回。这种效应就称为“虚假唤醒”。

举个例子,我们现在有一个生产者-消费者队列和三个线程。

1) 1号线程从队列中获取了一个元素,此时队列变为空。

2) 2号线程也想从队列中获取一个元素,但此时队列为空,2号线程便只能进入阻塞(cond.wait()),等待队列非空。

3) 这时,3号线程将一个元素入队,并调用cond.notify()唤醒条件变量。

4) 处于等待状态的2号线程接收到3号线程的唤醒信号,便准备解除阻塞状态,执行接下来的任务(获取队列中的元素)。

5) 然而可能出现这样的情况:当2号线程准备获得队列的锁,去获取队列中的元素时,此时1号线程刚好执行完之前的元素操作,返回再去请求队列中的元素,1号线程便获得队列的锁,检查到队列非空,就获取到了3号线程刚刚入队的元素,然后释放队列锁。

6) 等到2号线程获得队列锁,判断发现队列仍为空,1号线程“偷走了”这个元素,所以对于2号线程而言,这次唤醒就是“虚假”的,它需要再次等待队列非空。

如果用if判断,多个等待线程在满足if条件时都会被唤醒(虚假的),但实际上条件并不满足,生产者生产出来的消费品已经被第一个线程消费了。 这就是我们使用while去做判断而不是使用if的原因:因为等待在条件变量上的线程被唤醒有可能不是因为条件满足而是由于虚假唤醒。 所以,我们需要对条件变量的状态进行不断检查直到其满足条件,不仅要在pthread_cond_wait前检查条件是否成立,在pthread_cond_wait之后也要检查。

while ( isCondition()) {
    waitForAConfition(...);
}

慎用ThreadLocal

慎用ThreadLocal,这是 Java 提供的一种保存线程私有信息的机制, 因为其在整个线程生命周期内有效,所以可以方便地在一个线程关联的不同业务模块之间传递信息,比如事务 ID、Cookie 等上下文相关信息。

它的实现结构,可以参考源码,数据存储于线程相关的 ThreadLocalMap,其内部条目是弱引用,如下面片段。

static class ThreadLocalMap {
    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;
        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
    // …
}

当 Key 为 null 时,该条目就变成“废弃条目”,相关“value”的回收,往往依赖于几个关键点,即 set、remove、rehash。

下面是 set 的示例:

private void set(ThreadLocal<?> key, Object value) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    for (Entry e = tab[i];; …) {
        //…
        if (k == null) {
            // 替换废弃条目
            replaceStaleEntry(key, value, i);
            return;
        }
    }
    tab[i] = new Entry(key, value);
    int sz = ++size;
    // 扫描并清理发现的废弃条目,并检查容量是否超限
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();// 清理废弃条目,如果仍然超限,则扩容(加倍)
}

通常弱引用都会和引用队列配合清理机制使用,但是 ThreadLocal 是个例外,它并没有这么做。

这意味着,废弃项目的回收依赖于显式地触发,否则就要等待线程结束,进而回收相应ThreadLocalMap! 这就是很多 OOM 的来源,所以通常都会建议,应用一定要自己负责remove,并且不要和线程池配合,因为 worker 线程往往是不会退出的。

JVM 启动 Hello World的线程分析

检测获得 Thread[Reference Handler,10,system]
Thread[Finalizer,8,system]
Thread[main,5,main]
Thread[Signal Dispatcher,9,system]
Hello World!

其中:
Reference Handler:处理引用对象本身的垃圾回收
Finalizer:处理用户的Finalizer方法
Signal Dispatcher:外部jvm命令的转发器
在jdk6环境中,还有一个Attach Listener的线程是负责接收外部命令的,如jmap、jstack

什么情况下 Java 程序会产生死锁?如何定位、修复?

死锁是一种特定的程序状态,在实体之间,由于循环依赖导致彼此一直处于等待之中,没有任何个体可以继续前进。 死锁不仅仅是在线程之间会发生,存在资源独占的进程之间同样也可能出现死锁。 通常来说,我们大多是聚焦在多线程场景中的死锁,指两个或多个线程之间,由于互相持有对方需要的锁,而永久处于阻塞的状态。

定位死锁最常见的方式就是利用 jstack 等工具获取线程栈,然后定位互相之间的依赖关系,进而找到死锁。 如果是比较明显的死锁,往往 jstack 等就能直接定位,类似 JConsole 甚至可以在图形界面进行有限的死锁检测。

如果程序运行时发生了死锁,绝大多数情况下都是无法在线解决的,只能重启、修正程序本身问题。 所以,代码开发阶段互相审查,或者利用工具进行预防性排查,往往也是很重要的。

基本上死锁的发生是因为:

  • 互斥条件,类似 Java 中 Monitor 都是独占的,要么是我用,要么是你用。
  • 互斥条件是长期持有的,在使用结束之前,自己不会释放,也不能被其他线程抢占。
  • 循环依赖关系,两个或者多个个体之间出现了锁的链条环。

如果可能的话,尽量避免使用多个锁,并且只有需要时才持有锁;
如果必须使用多个锁,尽量设计好锁的获取顺序(根据对象之间组合、调用的关系对比和组合,考虑可能调用时序);
使用带超时的方法,为程序带来更多可控性;
业界也有一些其他方面的尝试,比如通过静态代码分析(如 FindBugs)去查找固定的模式,进而定位可能的死锁或者竞争情况

死循环死锁可以认为是自旋锁死锁的一种,其他线程因为等待不到具体的信号提示。导致线程一直饥饿。
这种情况下可以查看线程 cpu 使用情况,排查出使用 cpu 时间片最高的线程,再打出该线程的堆栈信息,排查代码。
比如Linux上,可以使用top命令配合grep Java之类,找到忙的pid;然后,转换成16进制,就是jstack输出中的格式;再定位代码

Java 并发包提供了哪些并发工具类?

我们通常所说的并发包也就是 java.util.concurrent 及其子包,集中了 Java 并发的各种基础工具类,具体主要包括几个方面:

  • 提供了比 synchronized 更加高级的各种同步结构,包括 CountDownLatch、CyclicBarrier、Sempahore 等,可以实现更加丰富的多线程操作,比如利用 Semaphore 作为资源控制器,限制同时进行工作的线程数量。
  • 各种线程安全的容器,比如最常见的 ConcurrentHashMap、有序的ConcunrrentSkipListMap,或者通过类似快照机制,实现线程安全的动态数组CopyOnWriteArrayList 等。
  • 各种并发队列实现,如各种 BlockedQueue 实现,比较典型的 ArrayBlockingQueue、SynchorousQueue 或针对特定场景的 PriorityBlockingQueue 等。
  • 强大的 Executor 框架,可以创建各种不同类型的线程池,调度任务运行等,绝大部分情况下,不再需要自己从头实现线程池和任务调度器。

CountDownLatch,允许一个或多个线程等待某些操作完成。
CyclicBarrier,一种辅助性的同步结构,允许多个线程等待到达某个屏障。
Semaphore,Java 版本的信号量实现。

两个应用并发工具的场景:

  1. 请求熔断器,使用 Semaphore 熔断某些请求线程,待系统恢复以后再逐步释放信号量。
  2. Worker 搜索停止标志。使用 countdownlatch 标记 Worker 找到的结果个数,达到结果后其他线程不再继续执行。

Java 并发类库还提供了Phaser,功能与 CountDownLatch 很接近,但是它允许线程动态地注册到 Phaser 上面,而 CountDownLatch 显然是不能动态设置的。 Phaser 的设计初衷是,实现多个线程类似步骤、阶段场景的协调,线程注册等待屏障条件触发,进而协调彼此间行动。

并发包里提供的线程安全 Map、List 和 Set,总体上种类和结构还是比较简单的。
如果我们的应用侧重于 Map 放入或者获取的速度,而不在乎顺序,大多推荐使用 ConcurrentHashMap, 反之则使用ConcurrentSkipListMap;如果我们需要对大量数据进行非常频繁地修改,ConcurrentSkipListMap 也可能表现出优势。

关于两个 CopyOnWrite 容器,其实 CopyOnWriteArraySet 是通过包装了CopyOnWriteArrayList 来实现的 首先,CopyOnWrite 到底是什么意思呢?它的原理是,任何修改操作,如 add、set、remove,都会拷贝原数组,修改后替换原来的数组,通过这种防御性的方式,实现另类的线程安全。

public boolean add (E e){
    synchronized (lock) {
        Object[] elements = getArray();
        int len = elements.length;
        // 拷贝
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        // 替换
        setArray(newElements);
        return true;
    }
}
final void setArray (Object[]a){
    array = a;
}

并发包中的 ConcurrentLinkedQueue 和 LinkedBlockingQueue 有什么区别?

  • Concurrent 类型基于 lock-free,在常见的多线程访问场景,一般可以提供较高吞吐量。
  • LinkedBlockingQueue 内部则是基于锁,并提供了 BlockingQueue 的等待性方法。

java.util.concurrent 包提供的容器(Queue、List、Set)、Map,从命名上可以大概区分为 Concurrent、CopyOnWrite和 Blocking* 等三类,同样是线程安全容,可以简单认为:

  • Concurrent 类型没有类似 CopyOnWrite 之类容器相对较重的修改开销。
  • Concurrent 往往提供了较低的遍历一致性(弱一致性)。例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历。size 等操作准确性是有限的。读取的性能具有一定的不确定性。
  • 与弱一致性对应的,就是我介绍过的同步容器常见的行为“fast-fail”,也就是检测到容器在遍历过程中发生了修改,则抛出 ConcurrentModificationException,不再继续遍历。

哪些队列是有界的,哪些是无界的?

我们可以从不同的角度进行分类,从基本的数据结构的角度分析,有两个特别的Deque实现,ConcurrentLinkedDeque 和 LinkedBlockingDeque。Deque 的侧重点是支持对队列头尾都进行插入和删除,所以提供了特定的方法,如:

  • 尾部插入时需要的addLast(e)、offerLast(e)。
  • 尾部删除所需要的removeLast()、pollLast()。

从行为特征来看,绝大部分 Queue 都是实现了 BlockingQueue 接口。 在常规队列操作基础上,Blocking 意味着其提供了特定的等待性操作,获取时(take)等待元素进队,或者插入时(put)等待队列出现空位。

/**
* 获取并移除队列头结点,如果必要,其会等待直到队列出现元素
…
*/
E take() throws InterruptedException;
/**
* 插入元素,如果队列已满,则等待直到队列出现空闲空间
…
*/
void put(E e) throws InterruptedException;

另一个 BlockingQueue 经常被考察的点,就是是否有界(Bounded、Unbounded),这一点也往往会影响我们在应用开发中的选择。

  • ArrayBlockingQueue 是最典型的的有界队列,其内部以 final 的数组保存数据,数组的大小就决定了队列的边界,所以我们在创建 ArrayBlockingQueue 时,都要指定容量,如

    public ArrayBlockingQueue(int capacity, boolean fair)
    
  • LinkedBlockingQueue,容易被误解为无边界,但其实其行为和内部代码都是基于有界的逻辑实现的,只不过如果我们没有在创建队列时就指定容量,那么其容量限制就自动被设置为Integer.MAX_VALUE,成为了无界队列。

  • SynchronousQueue,这是一个非常奇葩的队列实现,每个删除操作都要等待插入操作,反之每个插入操作也都要等待删除动作。那么这个队列的容量是多少呢?是 1 吗?其实不是的,其内部容量是 0。

  • PriorityBlockingQueue 是无边界的优先队列,虽然严格意义上来讲,其大小总归是要受系统资源影响。

  • DelayedQueue 和 LinkedTransferQueue 同样是无边界的队列。对于无边界的队列,有一个自然的结果,就是 put 操作永远也不会发生其他 BlockingQueue 的那种等待情况。

分析不同队列的底层实现,BlockingQueue 基本都是基于锁实现,一起来看看典型的LinkedBlockingQueue。

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

ArrayBlockingQueue,其条件变量与 LinkedBlockingQueue 版本的实现是有区别的,notEmpty、notFull都是同一个再入锁的条件变量, 而 LinkedBlockingQueue 则改进了锁操作的粒度,头、尾操作使用不同的锁,所以在通用场景下,它的吞吐量相对要更好一些。

LinkedBlockingQueue 的 take 方法与 ArrayBlockingQueue 中的实现,也是有不同的,由于其内部结构是链表,需要自己维护元素数量值。

类似 ConcurrentLinkedQueue 等,则是基于 CAS 的无锁技术,不需要在每个操作时使用锁,所以扩展性表现要更加优异。

相对比较另类的 SynchronousQueue,在 Java 6 中,其实现发生了非常大的变化,利用 CAS 替换掉了原本基于锁的逻辑,同步开销比较小。它是 Executors.newCachedThreadPool() 的 默认队列。

队列使用场景与典型用例

在实际开发中,我提到过 Queue 被广泛使用在生产者 - 消费者场景,比如利用BlockingQueue 来实现,由于其提供的等待机制,我们可以少操心很多协调工作。

在日常的应用开发中,如何进行选择呢?
以 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue 为例

  • 考虑应用场景中对队列边界的要求。ArrayBlockingQueue 是有明确的容量限制的,而LinkedBlockingQueue 则取决于我们是否在创建时指定,SynchronousQueue 则干脆不能缓存任何元素。
  • 从空间利用角度,数组结构的 ArrayBlockingQueue 要比 LinkedBlockingQueue 紧凑,因为其不需要创建所谓节点,但是其初始分配阶段就需要一段连续的空间,所以初始内存需求更大。
  • 通用场景中,LinkedBlockingQueue 的吞吐量一般优于 ArrayBlockingQueue,因为它实现了更加细粒度的锁操作。
  • ArrayBlockingQueue 实现比较简单,性能更好预测,属于表现稳定的“选手”。
  • 如果我们需要实现的是两个线程之间接力性(handoff)的场景,可选择 CountDownLatch,但是SynchronousQueue也是完美符合这种场景的,而且线程间协调和数据传输统一起来,代码更加规范。
  • 可能令人意外的是,很多时候 SynchronousQueue 的性能表现,往往大大超过其他实现,尤其是在队列元素较小的场景。

Java 并发类库提供的线程池有哪几种? 分别有什么特点?

通常开发者都是利用 Executors 提供的通用线程池创建方法,去创建不同配置的线程池,主要区别在于不同的 ExecutorService 类型或者不同的初始参数。

Executors 目前提供了 5 种不同的线程池创建配置:

  • newCachedThreadPool(),它是一种用来处理大量短时间工作任务的线程池,具有几个鲜明特点:它会试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程;如果线程闲置的时间超过 60 秒,则被终止并移出缓存;长时间闲置时,这种线程池,不会消耗什么资源。其内部使用 SynchronousQueue 作为工作队列。
  • newFixedThreadPool(int nThreads),重用指定数目(nThreads)的线程,其背后使用的是无界的工作队列,任何时候最多有 nThreads 个工作线程是活动的。这意味着,如果任务数量超过了活动队列数目,将在工作队列中等待空闲线程出现;如果有工作线程退出,将会有新的工作线程被创建,以补足指定的数目 nThreads。
  • newSingleThreadExecutor(),它创建的是个 ScheduledExecutorService,也就是可以进行定时或周期性的工作调度。工作线程数目被限制为 1,所以它保证了所有任务的都是被顺序执行,最多会有一个任务处于活动状态,并且不允许使用者改动线程池实例,因此可以避免其改变线程数目。
  • newScheduledThreadPool(int corePoolSize),同样是 ScheduledExecutorService,区别在于它会保持 corePoolSize 个工作线程。
  • newWorkStealingPool(int parallelism),这是一个经常被人忽略的线程池,Java 8 才加入这个创建方法,其内部会构建ForkJoinPool,利用Work-Stealing算法,并行地处理任务,不保证处理顺序。

  • Executor 是一个基础的接口,其初衷是将任务提交和任务执行细节解耦,这一点可以体会其定义的唯一方法。

    void execute(Runnable command);
    
  • ExecutorService 则更加完善,不仅提供 service 的管理功能,比如 shutdown 等方法,也提供了更加全面的提交任务机制,如返回Future而不是 void 的 submit 方法。

    <T> Future<T> submit(Callable<T> task);
    

    注意,这个例子输入的可是Callable,它解决了 Runnable 无法返回结果的困扰。

  • Java 标准类库提供了几种基础实现,比如ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool。这些线程池的设计特点在于其高度的可调节性和灵活性,以尽量满足复杂多变的实际应用场景。

  • Executors 则从简化使用的角度,为我们提供了各种方便的静态工厂方法

ThreadPoolExecutor

  • 工作队列负责存储用户提交的各个任务,这个工作队列,可以是容量为 0 的 SynchronousQueue(使用 newCachedThreadPool),也可以是像固定大小线程池(newFixedThreadPool)那样使用 LinkedBlockingQueue
    private final BlockingQueue<Runnable> workQueue;
    
  • 内部的“线程池”,这是指保持工作线程的集合,线程池需要在运行过程中管理线程创建、销毁。例如,对于带缓存的线程池,当任务压力较大时,线程池会创建新的工作线程;当业务压力退去,线程池会在闲置一段时间(默认 60 秒)后结束线程。
    private final HashSet<Worker> workers = new HashSet<>();
    
  • ThreadFactory 提供上面所需要的创建线程逻辑。
  • 如果任务提交时被拒绝,比如线程池已经处于 SHUTDOWN 状态,需要为其提供处理逻辑,Java 标准库提供了类似ThreadPoolExecutor.AbortPolicy等默认实现,也可以按照实际需求自定义。

线程池的几个基本组成部分,一起都体现在线程池的构造函数中:

  • corePoolSize,所谓的核心线程数,可以大致理解为长期驻留的线程数目(除非设置了allowCoreThreadTimeOut)。对于不同的线程池,这个值可能会有很大区别,比如newFixedThreadPool 会将其设置为 nThreads,而对于 newCachedThreadPool 则是为0。
  • maximumPoolSize,顾名思义,就是线程不够时能够创建的最大线程数。同样进行对比,对于 newFixedThreadPool,当然就是 nThreads,因为其要求是固定大小,而newCachedThreadPool 则是 Integer.MAX_VALUE。
  • keepAliveTime 和 TimeUnit,这两个参数指定了额外的线程能够闲置多久,显然有些线程池不需要它。
  • workQueue,工作队列,必须是 BlockingQueue。

通过配置不同的参数,我们就可以创建出行为大相径庭的线程池,这就是线程池高度灵活性的基础。

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler)

线程池实践

  • 避免任务堆积。newFixedThreadPool 是创建指定数目的线程,但是其工作队列是无界的,如果工作线程数目太少,导致处理跟不上入队的速度,这就很有可能占用大量统内存,甚至是出现 OOM。诊断时,你可以使用 jmap 之类的工具,查看是否有大量的任务对象入队。
  • 避免过度扩展线程。我们通常在处理大量短时任务时,使用缓存的线程池,比如在最新的HTTP/2 client API 中,目前的默认实现就是如此。我们在创建线程池的时候,并不能准确预计任务压力有多大、数据特征是什么样子(大部分请求是 1K 、100K 还是 1M 以上?),所以很难明确设定一个线程数目。
  • 如果线程数目不断增长(可以使用 jstack 等工具检查),也需要警惕另外一种可能性,就是线程泄漏,这种情况往往是因为任务逻辑有问题,导致工作线程迟迟不能被释放。建议你排查下线程栈,很有可能多个线程都是卡在近似的代码处。
  • 避免死锁等同步问题,对于死锁的场景和排查
  • 尽量避免在使用线程池时操作 ThreadLocal。

线程池大小的选择策略

  • 如果我们的任务主要是进行计算,那么就意味着 CPU 的处理能力是稀缺的资源,我们能够通过大量增加线程数提高计算能力吗?往往是不能的,如果线程太多,反倒可能导致大量的上下文切换开销。所以,这种情况下,通常建议按照 CPU 核的数目 N 或者 N+1。
  • 如果是需要较多等待的任务,例如 I/O 操作比较多,可以参考 Brain Goetz 推荐的计算方法:线程数 = CPU 核数 × (1 + 平均等待时间 / 平均工作时间)
  • 上面是仅仅考虑了 CPU 等限制,实际还可能受各种系统资源(文件句柄、内存)限制影响

AtomicInteger 底层实现原理是什么?如何在自己的代码中应用CAS 操作?

AtomicIntger 是对 int 类型的一个封装,提供原子性的访问和更新操作,其原子性操作的实现是基于 CAS(compare-and-swap)技术

CAS,表征的是一些列操作的集合,获取当前数值,进行一些运算,利用 CAS 指令试图进行更新。 如果当前数值未变,代表没有其他线程进行并发修改,则成功更新。否则,可能出现不同的选择,要么进行重试,要么就返回一个成功或者失败的结果。

从 AtomicInteger 的内部属性可以看出,它依赖于 Unsafe 提供的一些底层能力,进行底层操作;以 volatile 的 value 字段,记录数值,以保证可见性。

private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");
private volatile int value;

具体的原子操作细节,可以参考任意一个原子更新方法,比如下面的 getAndIncrement。Unsafe 会利用 value 字段的内存地址偏移,直接完成操作。

public final int getAndIncrement() {
    return U.getAndAddInt(this, VALUE, 1);
}

因为 getAndIncrement 需要返归数值,所以需要添加失败重试逻辑。

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}

而类似 compareAndSet 这种返回 boolean 类型的函数,因为其返回值表现的就是成功与否,所以不需要重试。

public final boolean compareAndSet(int expectedValue, int newValue)

CAS 是 Java 并发中所谓 lock-free 机制的基础。

Java 9 中移除了 Unsafe.moniterEnter()/moniterExit(),导致无法平滑升级到新的 JDK 版本。 目前 Java 提供了两种公共 API,可以实现这种 CAS 操作,比如使用 java.util.concurrent.atomic.AtomicLongFieldUpdater,它是基于反射机制创建,我们需要保证类型和字段名称正确。

Atomic 包提供了最常用的原子性数据类型,甚至是引用、数组等相关原子类型和更新操作工具,是很多线程安全程序的首选。
atomic 包提供的LongAdder,在高度竞争环境下,可能就是比 AtomicLong 更佳的选择,尽管它的本质是空间换时间。

comments powered by Disqus