锁
volatile底层实现:
volatile可以保证数据的可见性,被volatile修饰的变量能够保证每个线程能够获取该变量的最新值,从而避免出现数据脏读的现象;同时禁止指令重排。
加入volatile关键字时,汇编代码中会多出一个lock前缀指令,这个指令实际上相当于一个内存屏障(也成内存栅栏),内存屏障会提供3个功能:
- 它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;
- 它会强制将对缓存的修改操作立即写入主存;
- 如果是写操作,它会导致其他CPU中对应的缓存行无效。
AQS:
AQS,全称AbstractQueuedSynchronizer(抽象队列同步器),是Java并发包(java.util.concurrent)中用于构建锁和其他同步组件的基础框架。它提供了一个框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量、事件等)。AQS支持独占和共享两种模式
设计原理:
- 状态管理:AQS使用一个
int
类型的状态变量表示当前同步状态。这个状态可以通过getState()
、setState(int newState)
和compareAndSetState(int expect, int update)
方法进行查询和更新。 - FIFO队列:当一个线程尝试获取锁但失败时(即状态不符合获取条件),该线程会被封装成一个节点(Node),并被加入到同步队列的尾部。队列遵循FIFO原则,即先入队的线程优先获得锁。
- 独占与共享模式:AQS支持两种模式来适应不同的同步需求。在独占模式下,每次只有一个线程能够成功获取锁;而在共享模式下,多个线程可能同时成功获取锁。
- 自旋与阻塞:为了提高性能,AQS在某些情况下会让线程自旋(忙等)一段时间而不是立即进入阻塞状态,这取决于具体实现和系统配置。
应用:
- ReentrantLock:一个可重入的互斥锁,提供了与使用
synchronized
关键字类似的同步语义,并扩展了诸如公平性选择、非块结构锁定操作等功能。 - Semaphore(信号量):控制对某组资源的访问权限,限制同时访问某一资源的最大线程数。
- CountDownLatch:允许一个或多个线程一直等待,直到其他线程执行的一组操作完成为止。
- CyclicBarrier:允许一组线程互相等待,直到到达某个公共屏障点(所有线程都到达此点后才能继续执行)。
- ReadWriteLock:允许多个读操作并发执行,但在写操作发生时不允许任何读操作。【多用于缓存系统、配置管理、文档编辑等】
锁优化:
使用层面:
- 减少锁的时间:不需要同步执行的代码,尽量放到同步块外边(循环的话,可能需要锁粗化,防止频繁加解锁)。
- 减少锁的粒度:物理上的一个锁,拆分为逻辑上的多个锁,增加并行度,以空间换时间,例如java中的ConcurrentHashMap(使用多个segment分别进行加解锁)。
- 使用读写锁:使用ReentranReadWriteLock,读加读锁,写加写锁。
- 使用CAS:线程竞争不激烈,且执行快,可以考虑valatiled+cas。
系统层面:
- 自适应自旋锁:自动自旋,自旋时间可以有上一个在同一个锁上的自旋时间和锁的持有者状态决定。
- 锁消除:如果编译器发现不可能存在共享数据的竞争,会进行锁消除【Netty中无锁设计的pipeline中的chnnelhandler会进行锁消除的优化】
- 锁升级:偏向锁->轻量级锁->重量级锁
ReentrantLock和synchronized:
synchronized:
synchronized基于监视器锁(Monitor Lock)实现,底层依赖于操作系统的互斥锁(Mutex)。每个对象在Java中都与一个隐式的监视器相关联,使用synchronized时,线程需要获得对象的监视器锁,然后才能进入同步代码块。synchronized申请锁、上锁、释放锁都与对象头有关。对象头主要结构是由Mark Word 组成,其中Mark Word存储对象的hashCode、锁信息或分代年龄或GC标志等信息。同步代码块是利用 monitorenter 和 monitorexit 指令实现的,而同步方法则是利用 flags 实现的。
使用:
- 修饰实例⽅法: 作⽤于当前对象实例加锁,进⼊同步代码前要获得当前对象实例的锁
- 修饰静态⽅法: 也就是给当前类加锁,会作⽤于类的所有对象实例,因为静态成员不属于任何⼀个实例对象,是类成员。
- 修饰代码块: 指定加锁对象,对给定对象加锁,进⼊同步代码库前要获得给定对象的锁。
偏向锁、轻量级锁、重量级锁:
- 偏向锁:在锁对象的对象头中记录一下当前获取到该锁的线程ID,该线程下次如果又来获取该锁就可以直接获取到了。一旦有另一个线程尝试获取这个锁,偏向锁将被撤销,并可能升级为轻量级锁或者直接升级为重量级锁,取决于竞争的程度。
- 轻量级锁:轻量级锁采用CAS操作来避免使用操作系统层面的互斥量,当一个线程尝试获取锁时,JVM会在当前线程的栈帧中创建锁记录(Lock Record),并将对象头中的Mark Word复制到该锁记录中。然后尝试通过CAS操作将对象头的Mark Word替换为指向该锁记录的指针。如果成功,则表示获得了锁;否则,表示存在竞争。多个线程同时竞争同一个锁时,可能会出现多次CAS失败的情况。在这种情况下,JVM通常会让未获得锁的线程短暂等待(即自旋)。
- 重量级锁:重量级锁基于操作系统的互斥量实现,涉及到用户态和内核态的转换,开销较大。当一个线程获取重量级锁时,其他试图获取该锁的线程会被阻塞,直到持有锁的线程释放锁为止。这种方式虽然保证了线程安全,但代价较高。
ReentrantLock:
ReentrantLock是java.util.concurrent包下提供的一套互斥锁,需要lock()和unlock()方法配合try/finally语句块来完成。ReenTrantLock的实现是一种自旋锁,通过循环调用CAS操作来实现加锁,它的性能比较好也是因为避免了使线程进入内核态的阻塞状态。
区别:
- synchronized是Java语言的内置特性,是一个关键字,ReentrantLock是一个类。
- sychronized会自动的加锁与释放锁,ReentrantLock需要程序员手动加锁与释放锁。
- sychronized的底层是JVM层面的锁,ReentrantLock是API层面的锁。
- sychronized是非公平锁,ReentrantLock可以选择公平锁或非公平锁。
- sychronized锁的是对象,锁信息保存在对象头中,ReentrantLock通过代码中int类型的state标识来标识锁的状态。
- sychronized底层有一个锁升级的过程。
- 在较低竞争时,synchronized会自动使用优化,比如锁消除和锁粗化,使得它的性能在某些情况下可能高于ReentrantLock。高竞争时,ReentrantLock可能更好,因为提供了更灵活的线程控制。
- ReentrantLock具有与之关联的Condition对象,可以搭配lock来更细粒度的控制线程通信;synchronized配合Object的wait()和notify()/notifyAll()来进行线程之间的通信,但不如Condition灵活。
CountDownLatch和Semaphore:
CountDownLatch表示计数器,可以给CountDownLatch设置一个数字,一个线程调用CountDownLatch的await()将会阻塞,其他线程可以调用CountDownLatch的countDown()方法来对CountDownLatch中的数字减一,当数字被减成0后,所有await的线程都将被唤醒。
对应的底层原理就是,调用await()方法的线程会利用AQS排队,一旦数字被减为0,则会将AQS中排队的线程依次唤醒。
Semaphore表示信号量,可以设置许可的个数,表示同时允许最多多少个线程使用该信号量,通过acquire()来获取许可,如果没有许可可用则线程阻塞,并通过AQS来排队,可以通过release()方法来释放许可,当某个线程释放了某个许可后,会从AQS中正在排队的第一个线程开始依次唤醒,直到没有空闲许可。
伪共享:
在现代处理器中,缓存行(Cache Line)是缓存的最小可分配单位,通常是64字节,当多个线程在不同CPU核心上操作缓存行中的不同变量时,如果这些变量位于同一个缓存行,修改其中一个变量会导致整个缓存行被标记为无效(cpu缓存一致性决定)。这种重复的缓存行无效化和重新加载的现象就是伪共享。
public class FalseSharingExample implements Runnable {
// 定义线程数
public static int NUM_THREADS = 4;
public static final long ITERATIONS = 5000000000L; // 每个线程的迭代次数
private final int arrayIndex; // 此线程操作的共享数据索引
private static ValuePadding[] values; // 用于存储共享数据
public FalseSharingExample(int arrayIndex) {
this.arrayIndex = arrayIndex;
}
public static void main(final String[] args) throws InterruptedException {
Thread[] threads = new Thread[NUM_THREADS];
// 初始化共享变量数组
values = new ValuePadding[NUM_THREADS];
for (int i = 0; i < values.length; i++) {
values[i] = new ValuePadding();
}
final long start = System.nanoTime();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(new FalseSharingExample(i));
}
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
System.out.println("Duration = " + (System.nanoTime() - start));
}
@Override
public void run() {
long i = ITERATIONS + 1;
while (0 != --i) {
// 更新当前线程对应的共享变量
values[arrayIndex].value = i;
}
}
// 内部类,带有缓存行填充以避免伪共享
public static class ValuePadding {
protected long p1, p2, p3, p4, p5, p6, p7; // 填充字段
public volatile long value = 0L; // 实际操作的变量
protected long p9, p10, p11, p12, p13, p14, p15; // 填充字段
}
// @Contended注释的使用演示,如果使用该注解,需要加启动参数启用 -XX:-RestrictContended
/*
public static class ValueWithContended {
@Contended
public volatile long value = 0L;
}
*/
}