网站首页 > 文章精选 正文
主要实现AQS队列和CAS无锁机制
1.CAS
定义:CAS是一种无锁算法。有3个操作数:内存值V、旧的预期值A、要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。
使用方式: 自旋操作,这个操作是一个原子的操作,被广泛的应用在java底层实现
do{
备份旧数据;
基于旧数据构造新数据;
}while(!CAS( 内存地址,备份的旧数据,新数据 ))
实现原理:在java中,CAS主要是由sun.misc.Unsafe这个类通过JNI调用CPU底层指令实现
优点:
- CAS速度非常快,CAS是CPU指令级的操作,只有一步原子操作;
- CAS避免了请求操作系统来裁定锁的问题,不用麻烦操作系统,直接在CPU内部就搞定了
缺点:
- 自旋消耗资源大
- CAS只能针对单个变量进行操作,不能用于多个变量来实现原子操作
- ABA问题
ABA问题的解决方法:通过引入版本号,更新了内存值就更新版本号,从而知道有没有线程操作过该值。
2.AQS队列
2.1 定义:AQS是一个用于构建锁和同步容器的框架
AQS使用一个FIFO的队列(也叫CLH队列,是CLH锁的一种变形),表示排队等待锁的线程。队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus。
waitstatus的节点状态枚举值:
- CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
- SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
- CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
- PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
- INITAL状态:值为0,代表初始化状态。
2.2 ReentrantLock的流程:
- ReentrantLock先通过CAS尝试获取锁,
- 如果此时锁已经被占用,该线程加入AQS队列并wait()
- 当前线程的锁被释放,挂在CLH队列为首的线程就会被notify(),然后继续CAS尝试获取锁,此时:
- 非公平锁,如果有其他线程尝试lock(),有可能被其他刚好申请锁的线程抢占。
- 公平锁,只有在CLH队列头的线程才可以获取锁,新来的线程只能插入到队尾。
(注:ReentrantLock默认是非公平锁,也可以指定为公平锁)
ReentrantLock的2个构造函数:
public ReentrantLock() {
sync = new NonfairSync(); //默认,非公平
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync(); //根据参数创建
}
2.3 lock()和unlock()的实现:
lock()函数:
如果成功通过CAS修改了state,指定当前线程为该锁的独占线程,标志自己成功获取锁。
如果CAS失败的话,调用acquire();
final void lock() { //非公平锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
final void lock() { //公平锁
acquire(1);
}
首先,调用tryAcquire(),会尝试再次通过CAS修改state为1,
如果失败而且发现锁是被当前线程占用的,就执行重入(state++);
如果锁是被其他线程占有,那么当前线程执行tryAcquire返回失败,并且执行addWaiter()进入等待队列,并挂起自己interrupt()。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire()方法
检查state字段,
若为0,表示锁未被占用,那么尝试占用;
若不为0,检查当前锁是否被自己占用,若被自己占用,则更新state字段,表示重入锁的次数。
如果以上两点都没有成功,则获取锁失败,返回false。
protected final boolean tryAcquire(int acquires) { //注意:这是公平的tryAcquire()
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//需要先判断自己是不是队头的节点
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
final boolean tryAcquire(int acquires) { //注意:这是非公平的tryAcquire()
//获取当前线程
final Thread current = Thread.currentThread();
//获取state变量值
int c = getState();
if (c == 0) { //没有线程占用锁
//没有 !hasQueuedPredecessors() 判断,不考虑自己是不是在队头,直接申请锁
if (compareAndSetState(0, acquires)) {
//占用锁成功,设置独占线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) { //当前线程已经占用该锁
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 更新state值为新的重入次数
setState(nextc);
return true;
}
//获取锁失败
return false;
addWaiter()方法
当前线程加入AQS双向链表队列。
写入之前需要将当前线程包装为一个 Node 对象(addWaiter(Node.EXCLUSIVE))。
首先判断队列是否为空,不为空时则将封装好的 Node 利用 CAS 写入队尾,如果出现并发写入失败就需要调用 enq(node); 来写入了。
/**
* 将新节点和当前线程关联并且入队列
* @param mode 独占/共享
* @return 新节点
*/
private Node addWaiter(Node mode) {
//初始化节点,设置关联线程和模式(独占 or 共享)
Node node = new Node(Thread.currentThread(), mode);
// 获取尾节点引用
Node pred = tail;
// 尾节点不为空,说明队列已经初始化过
if (pred != null) {
node.prev = pred;
// 设置新节点为尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾节点为空,说明队列还未初始化,需要初始化head节点并入队新节点
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq()的处理逻辑就相当于自旋加上CAS保证一定能写入队列。
acquireQueued()方法
写入队列后,需要挂起当前线程
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
首先会根据 node.predecessor() 获取到上一个节点是否为头节点,如果是则尝试获取一次锁,获取成功就万事大吉了。
如果不是头节点,或者获取锁失败,则会根据上一个节点的 waitStatus 状态来处理(shouldParkAfterFailedAcquire(p, node))。
waitStatus 用于记录当前节点的状态,如节点取消、节点等待等。
shouldParkAfterFailedAcquire(p, node) 返回当前线程是否需要挂起,如果需要则调用 parkAndCheckInterrupt():
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
他是利用 LockSupport 的 park() 方法来挂起当前线程的,直到被唤醒。
unlock()函数
释放时候,state--,通过state==0判断锁是否完全被释放。
成功释放锁的话,唤起一个被挂起的线程
他是利用 LockSupport 的 unpark() 方法来唤醒队列头部节点之后的首个线程。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒被挂起的线程
unparkSuccessor(h);
return true;
}
return false;
}
//尝试释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
总结:
1.每一个ReentrantLock自身维护一个AQS队列记录申请锁的线程信息;
2.通过大量CAS保证多个线程竞争锁的时候的并发安全;
3.可重入的功能是通过维护state变量来记录重入次数实现的。
4.公平锁需要维护队列,通过AQS队列的先后顺序获取锁,缺点是会造成大量线程上下文切换;
5.非公平锁可以直接抢占,所以效率更高;
猜你喜欢
- 2025-01-03 ant design Modal关闭时清除数据的解决方案
- 2025-01-03 Flink状态管理详解:Keyed State和Operator List State深度解析
- 2025-01-03 电气自动化专业词汇中英文对照表
- 2025-01-03 事务相关知识集锦
- 2025-01-03 FlexSim常用脚本语言汇总--搜集最常用
- 2025-01-03 停止javascript的ajax请求,取消axios请求,取消reactfetch请求
- 2025-01-03 MySQL进阶垫脚石:线程长时间处于killed状态怎么破?
- 2025-01-03 还搞不定 SQL 阻塞与超时?早晚得出事!
- 2025-01-03 细说ReactiveCocoa的冷信号与热信号(二):为什么要区分冷热信号
- 2025-01-03 我画了35张图就是为了让你深入 AQS
- 最近发表
- 标签列表
-
- newcoder (56)
- 字符串的长度是指 (45)
- drawcontours()参数说明 (60)
- unsignedshortint (59)
- postman并发请求 (47)
- python列表删除 (50)
- 左程云什么水平 (56)
- 计算机网络的拓扑结构是指() (45)
- 稳压管的稳压区是工作在什么区 (45)
- 编程题 (64)
- postgresql默认端口 (66)
- 数据库的概念模型独立于 (48)
- 产生系统死锁的原因可能是由于 (51)
- 数据库中只存放视图的 (62)
- 在vi中退出不保存的命令是 (53)
- 哪个命令可以将普通用户转换成超级用户 (49)
- noscript标签的作用 (48)
- 联合利华网申 (49)
- swagger和postman (46)
- 结构化程序设计主要强调 (53)
- 172.1 (57)
- apipostwebsocket (47)
- 唯品会后台 (61)
- 简历助手 (56)
- offshow (61)