程序员求职经验分享与学习资料整理平台

网站首页 > 文章精选 正文

ReentrantLock的底层原理

balukai 2025-01-03 14:15:48 文章精选 10 ℃

主要实现AQS队列和CAS无锁机制

1.CAS

定义:CAS是一种无锁算法。有3个操作数:内存值V、旧的预期值A、要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。

使用方式: 自旋操作,这个操作是一个原子的操作,被广泛的应用在java底层实现

do{

备份旧数据;

基于旧数据构造新数据;

}while(!CAS( 内存地址,备份的旧数据,新数据 ))

实现原理:在java中,CAS主要是由sun.misc.Unsafe这个类通过JNI调用CPU底层指令实现

优点:

  1. CAS速度非常快,CAS是CPU指令级的操作,只有一步原子操作;
  2. CAS避免了请求操作系统来裁定锁的问题,不用麻烦操作系统,直接在CPU内部就搞定了

缺点:

  1. 自旋消耗资源大
  2. CAS只能针对单个变量进行操作,不能用于多个变量来实现原子操作
  3. ABA问题

ABA问题的解决方法:通过引入版本号,更新了内存值就更新版本号,从而知道有没有线程操作过该值。

2.AQS队列

2.1 定义:AQS是一个用于构建锁和同步容器的框架

AQS使用一个FIFO的队列(也叫CLH队列,是CLH锁的一种变形),表示排队等待锁的线程。队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus。

waitstatus的节点状态枚举值:

  • CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
  • SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
  • CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
  • INITAL状态:值为0,代表初始化状态。

2.2 ReentrantLock的流程:

  1. ReentrantLock先通过CAS尝试获取锁,
    1. 如果此时锁已经被占用,该线程加入AQS队列并wait()
    2. 当前线程的锁被释放,挂在CLH队列为首的线程就会被notify(),然后继续CAS尝试获取锁,此时:
      1. 非公平锁,如果有其他线程尝试lock(),有可能被其他刚好申请锁的线程抢占
      2. 公平锁,只有在CLH队列头的线程才可以获取锁,新来的线程只能插入到队尾。

(注:ReentrantLock默认是非公平锁,也可以指定为公平锁)

ReentrantLock的2个构造函数:

public ReentrantLock() {
    sync = new NonfairSync(); //默认,非公平
}
 
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync(); //根据参数创建
}

2.3 lock()和unlock()的实现:

lock()函数:

如果成功通过CAS修改了state,指定当前线程为该锁的独占线程,标志自己成功获取锁。

如果CAS失败的话,调用acquire();

 final void lock() { //非公平锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    final void lock() { //公平锁
            acquire(1);
    }

首先,调用tryAcquire(),会尝试再次通过CAS修改state为1,

如果失败而且发现锁是被当前线程占用的,就执行重入(state++);

如果锁是被其他线程占有,那么当前线程执行tryAcquire返回失败,并且执行addWaiter()进入等待队列,并挂起自己interrupt()。

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire()方法

检查state字段,

若为0,表示锁未被占用,那么尝试占用;

若不为0,检查当前锁是否被自己占用,若被自己占用,则更新state字段,表示重入锁的次数

如果以上两点都没有成功,则获取锁失败,返回false。

 protected final boolean tryAcquire(int acquires) { //注意:这是公平的tryAcquire()
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //需要先判断自己是不是队头的节点
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

final boolean tryAcquire(int acquires) { //注意:这是非公平的tryAcquire()
    //获取当前线程
    final Thread current = Thread.currentThread();
    //获取state变量值
    int c = getState();
    if (c == 0) { //没有线程占用锁
        //没有 !hasQueuedPredecessors() 判断,不考虑自己是不是在队头,直接申请锁
        if (compareAndSetState(0, acquires)) {
            //占用锁成功,设置独占线程为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) { //当前线程已经占用该锁
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 更新state值为新的重入次数
        setState(nextc);
        return true;
    }
    //获取锁失败
    return false;

addWaiter()方法

当前线程加入AQS双向链表队列。

写入之前需要将当前线程包装为一个 Node 对象(addWaiter(Node.EXCLUSIVE))。

首先判断队列是否为空,不为空时则将封装好的 Node 利用 CAS 写入队尾,如果出现并发写入失败就需要调用 enq(node); 来写入了。

/**
 * 将新节点和当前线程关联并且入队列
 * @param mode 独占/共享
 * @return 新节点
 */
private Node addWaiter(Node mode) {
    //初始化节点,设置关联线程和模式(独占 or 共享)
    Node node = new Node(Thread.currentThread(), mode);
    // 获取尾节点引用
    Node pred = tail;
    // 尾节点不为空,说明队列已经初始化过
    if (pred != null) {
        node.prev = pred;
        // 设置新节点为尾节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 尾节点为空,说明队列还未初始化,需要初始化head节点并入队新节点
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq()的处理逻辑就相当于自旋加上CAS保证一定能写入队列。

acquireQueued()方法

写入队列后,需要挂起当前线程

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

首先会根据 node.predecessor() 获取到上一个节点是否为头节点,如果是则尝试获取一次锁,获取成功就万事大吉了。

如果不是头节点,或者获取锁失败,则会根据上一个节点的 waitStatus 状态来处理(shouldParkAfterFailedAcquire(p, node))。

waitStatus 用于记录当前节点的状态,如节点取消、节点等待等。

shouldParkAfterFailedAcquire(p, node) 返回当前线程是否需要挂起,如果需要则调用 parkAndCheckInterrupt():

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

他是利用 LockSupportpark() 方法来挂起当前线程的,直到被唤醒。

unlock()函数

释放时候,state--,通过state==0判断锁是否完全被释放。

成功释放锁的话,唤起一个被挂起的线程

他是利用 LockSupport 的 unpark() 方法来唤醒队列头部节点之后的首个线程。

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
        	   //唤醒被挂起的线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}
//尝试释放锁
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

总结:

1.每一个ReentrantLock自身维护一个AQS队列记录申请锁的线程信息;

2.通过大量CAS保证多个线程竞争锁的时候的并发安全;

3.可重入的功能是通过维护state变量来记录重入次数实现的。

4.公平锁需要维护队列,通过AQS队列的先后顺序获取锁,缺点是会造成大量线程上下文切换;

5.非公平锁可以直接抢占,所以效率更高;

最近发表
标签列表