java并发编程 | 锁详解:AQS,Lock,ReentrantLock,ReentrantReadWriteLock

锁是用来控制多个线程访问共享资源的方式,java中可以使用synchronizedLock实现锁的功能

synchronized是java中的关键字,隐藏获取和释放锁的过程,Lock是java中的接口,需要主动的获取锁和释放锁,synchronized是排他锁,而Lock支持可中断获取锁,超时获取锁

Lock提供的接口

public interface Lock {

    /**
     * 获取锁,调用该方法后当前线程获取锁,获取到锁之后从该方法返回
     */
    void lock();

    /**
     * 可中断的获取锁,在获取锁的过程中可以中断当前线程
     */
    void lockInterruptibly() throws InterruptedException;

    /**
     * 尝试非阻塞的获取锁,调用方法后立即返回,获取到锁则返回true,否则返回false
     */
    boolean tryLock();

    /**
     * 超时获取锁,在超时时间内获取到锁,在超时时间被中断,超时时间内为获取到锁,三种情况下会从该方法返回
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 释放锁
     */
    void unlock();

    /**
     * 获取等待通知组件,只有当前线程获取到锁之后才可以调用该组件的wait()方法,释放锁
     */
    Condition newCondition();
}

队列同步器

队列同步器AbstractQueuedSynchronizerAQS简称同步器)是用来构建锁或者其他同步组件的基础框架

java中锁的实现基本都是通过聚合了一个同步器的子类完成线程访问控制的,同步器是实现锁的关键,可以这么理解,锁面向编程者,隐藏了实现细节,同步器面向锁的实现,简化了锁的实现方式,屏蔽了同步状态管理,线程排队,等待与唤醒等底层操作,通过AbstractQueuedSynchronizer我们可以很方便的实现一个锁

设计原则

同步器的设计基于模板方法模式,提供的模板方法主要包括:独占锁获取锁与释放同步状态,共享式获取与释放同步状态,获取同步队列中等待线程情况

独占式操作

想要实现一个独占式锁需要重写以下方法

方法名 描述
void acquire(int arg) 独占式获取同步状态,同一时刻只能有一个线程可以获取到同步状态,获取失败进入同步队列等待
void acquireInterruptibly(int arg) 独占式获取同步状态,响应中断操作,被中断时会抛异常并返回
boolean tryAcquireNanos(int arg, long nanosTimeout) 独占式获取同步状态,响应中断操作,并且增加了超时限制,如果规定时间没有获得同步状态就返回false,否则返回true
boolean release(int arg) 独占式释放同步状态,在释放同步状态之后,将同步队列中的第一个节点包含的线程唤醒

共享式操作

想要实现一个共享锁需要重写以下方法

方法名 描述
void acquireShared(int arg) 共享式获取同步状态,同一时刻可以有多个线程获取到同步状态
void acquireSharedInterruptibly(int arg) 共享式获取同步状态,响应中断操作
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 共享式获取同步状态,响应中断操作,并且增加了超时限制,如果规定时间没有获得同步状态就返回false,否则返回true
boolean releaseShared(int arg) 共享式释放同步状态
获取同步队列线程信息
方法名 描述
Collection getQueuedThreads() 获取同步队列上的线程集合

在这些模板方法中,多次提到了同步队列,我们看一下AQS是如何实现同步队列的

首先看下AbstractQueuedSynchronizer的类图

Node

Node类是AbstractQueuedSynchronizer类的内部类,同步器依靠内部的一个同步队列来完成同步状态的管理,当前线程获取同步状态失败的时候,同步器会将当前线程及等待信息构造成一个Node节点加入到同步队列中

属性 描述
waitStatus 该线程等待状态,包含如下:
CANCELLED 值为1,表示需要从同步队列中取消等待
SIGNAL值为-1,表示后继节点处于等待状态,如果当前节点释放同步状态会通知后继节点,使得后继节点的线程能够运行
CONDITION值为-2,表示节点在等待队列中
PROPAGATE值为-3,表示下一次共享式同步状态获取将会无条件传播下去
INITIAL值为0,表示初始状态
prev:Node 前驱节点
next:Node 后继节点
thread:Thread 当前线程
nextWaiter:Node 下一个等待节点

可以看到AQS中的节点信息包含前驱和后继节点,所以我们知道了AQS的同步队列是双向链表结构的

AQS

AQS中的几个重要属性

属性 描述
state:int 同步状态:如果等于0,锁属于空闲状态,如果等于1,标识锁被占用,如果大于1,则表示锁被当前持有的线程多次加锁,即重入状态
head:Node 队列的头节点
tail:Node 队列的尾节点
unsafe:Unsafe AQS中的cas算法实现

AQS中提供了三个方法对同步状态进行操作

  1. getState()获取到同步状态
  2. setState(int newState)设置同步状态
  3. compareAndSetState(int expect, int update)使用CAS设置当前状态,该方法能够保证设置的原子性

AQS的基本结构如下图所示

在同步器中headtail的节点的引用指向同步队列的头,尾节点,这样在后面操作节点入列和出列的时候只需要操作同步器中的headtail节点就可以

独占式锁

ReentrantLock

ReentrantLock重入锁,内部AQS的实现是基于独占式获取/释放同步状态的。我们学习一下ReentrantLock的实现原理来进一步加深对AQS的理解

重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁阻塞,它表示一个线程可以对资源重复加锁,同时支持获取锁时使用公平锁还是非公平锁

例:

/**
 * @author: chenmingyu
 * @date: 2019/4/12 15:09
 * @description: ReentrantLock
 */
public class ReentrantLockTest {

    private static Lock LOCK = new ReentrantLock();

    public static void main(String[] args) {
        Runnable r1 = new TestThread();
        new Thread(r1,"r1").start();
        Runnable r2 = new TestThread();
        new Thread(r2,"r2").start();
    }

    public static class TestThread implements Runnable{

        @Override
        public void run() {
            LOCK.lock();
            try {
                System.out.println(Thread.currentThread().getName()+":获取到锁 "+LocalTime.now());
                TimeUnit.SECONDS.sleep(3L);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                LOCK.unlock();
            }
        }
    }
}

输出

只有在r1线程释放锁之后r2线程才获取到锁去执行代码打印数据

源码分析

创建的实例,默认使用非公平锁,如果需要公平锁,需要调用有参的构造函数

/**
 * 非公平锁
 * 创建ReentrantLock实例,默认使用非公平锁
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
 * 公平锁
 * 创建ReentrantLock实例,fair为true使用公平锁
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

NonfairSyncFairSync都是ReentrantLock类的内部类,继承自ReentrantLock类的内部类SyncSync类继承了AbstractQueuedSynchronizer

类图如下

独占式锁的获取

非公平锁的实现

/**
 * Performs lock.  Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

非公平锁会在调用lock()方法的时候首先调用compareAndSetState(0, 1)方法尝试获取锁,如果没有获取到锁则调用acquire(1)方法

compareAndSetState(0, 1)方法是一个CAS操作,如过设置成功,则为获取到同步状态,并调用setExclusiveOwnerThread(Thread.currentThread());方法将当前线程设置为独占模式同步状态的所有者

我们所说的获取同步状态其实指的就是获取锁的状态,获取同步状态成功则加锁成功

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

acquire(1)方法是提供的模板方法,调用tryAcquire(arg)acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire(arg)方法调用的是子类的实现,NonfairSynctryAcquire方法

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

nonfairTryAcquire(acquires)方法

/**
 * 非公平尝试获取同步状态
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        /**
         * 首先根据`getState()`方法获取同步状态,如果等于0尝试调用`compareAndSetState(0,                 * acquires)`方法获取同步状态,如果设置成功则获取同步状态成功,设置当前线程为独占模式同步状态的          * 所有者
         */
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
  1. 根据getState()方法获取同步状态,如果等于0尝试调用compareAndSetState(0, acquires)方法获取同步状态,如果设置成功则获取同步状态成功,设置当前线程为独占模式同步状态的所有者
  2. 如果当前线程等于独占式同步状态所有者的线程,那么就将state+1,表示当前线程多次加锁

如果tryAcquire(arg) 返回false,表示没有获取到同步状态,即没有拿到锁,所以需要调用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法将当前线程加入到同步队列中,并且以死循环的方式获取同步状态,如果获取不到则阻塞节点中的线程,而被阻塞的线程只能通过前驱节点的出队,或者阻塞线程被中断来实现唤醒

addWaiter(Node.EXCLUSIVE)方法的作用就是构造同步队列的节点信息,然后加入到同步队列尾部

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

首先调用Node类的构造方法创建一个实例,tailAQS中队列的尾节点

如果tail节点不为空,将实例的前驱节点置为tail指向的节点,然后调用compareAndSetTail(pred, node)方法,compareAndSetTail(pred, node)方法调用unsafe.compareAndSwapObject(this, tailOffset, expect, update),此方法是一个CAS操作,不可中断,用来保证节点能够被线程安全的添加,设置成功后,将节点tail的后继节点指向当前实例,以此来实现将当前实例加入到同步队列尾部

如果tail节点等于空或者compareAndSetTail(pred, node)设置失败,则会调用enq(node)方法

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

在这个方法中利用for循环构造了一个死循环,如果当前AQStail节点为空,则证明当前同步队列中没有等待的线程,也就是没有节点,调用compareAndSetHead(new Node())方法构造了一个头节点,然后循环调用compareAndSetTail(t, node)将当前实例加入到队列的尾部,如果失败就一直调用,直到成功为止

在调用addWaiter(Node mode)方法后会调用acquireQueued(final Node node, int arg)方法,作用是在每个节点进入到同步队列中后就进入了一个自旋的状态,通过校验自己的前驱节点是否是头节点,并且是否获取到同步状态为条件进行判断,如果满足条件则从自旋中退出,负责一直自旋

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

方法内也是一个for的死循环,通过node.predecessor()方法获取传入的Node实例的前驱节点并与AQShead节点进行比较,如果相等,则尝试获取同步状态获取锁,如果获取成功就调用setHead(node);方法将当前Node实例节点设置为head节点,将原来head节点的后继节点置为null,有助于GC回收

setHead(node);

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

如果传入的Node实例的前驱节点与AQShead节点不相等或者获取同步状态失败,则调用shouldParkAfterFailedAcquire(p, node)parkAndCheckInterrupt()方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

通过CAS操作,设置节点的前驱节点等待状态为Node.SIGNAL,如果设置失败,返回false,因为外层是死循环,会重复当前方法直到设置成功

parkAndCheckInterrupt()方法调用LookSupport.park()阻塞线程,然后清除掉中断标识

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法返回后,调用selfInterrupt(),将线程中断

公平锁的实现

在了解acquire(1);方法的作用之后,在理解公平锁的实现就容易了

final void lock() {
    acquire(1);
}

对比非公平锁的实现少了一步上来就获取同步状态的操作,其余操作跟非公平锁的实现一样

公平锁与非公平锁总结

  1. 公平锁,在加锁之前如果有同步对列,则加入到同步队列尾部
  2. 非公平锁,在加锁之前不管有没有同步队列,先尝试获取同步状态,获取不到在加入到同步队列尾部
  3. 非公平锁比公平锁效率要高很多,公平锁保证了同步状态的获取按照FIFO原则,代价是需要进行大量的线程切换,而非公平锁情况下,当前线程在释放了同步状态之后再次获取到同步状态的记录非常大,可以减少大量的线程切换,但是可能会出现在同步队列中的某个线程一直获取不到锁的情况

独占式获取锁的流程

独占式锁的释放

ReentrantLockunlock()方法实际调用的AQSrelease(int arg)方法

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

首先调用tryRelease(arg)释放同步状态

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

获取同步状态,并减1,如果此时c==0则释放锁,将当前独占式锁的拥有线程置为null,然后设置state为0

然后调用unparkSuccessor(Node node)方法唤醒后继节点的线程

private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        /*
         * 唤醒后继节点的线程
         */
        LockSupport.unpark(s.thread);
}

总结一下独占式获取锁和释放锁的过程:

  1. 获取锁的时候,首先会获取同步状态,如果获取成功则加锁成功,如果获取失败,将当前线程信息构造成节点信息并则加入到AQS维护的同步队列的尾部,并且开始自旋,跳出自旋的条件就是前驱节点为AQS的头节点并且获取到了同步状态,此时将节点移除同步队列
  2. 释放锁的时候,首先会释放同步状态,然后唤醒节点的后继节点
  3. 一个线程N次加锁之后,在释放锁的时候需要释放N次,之后才会被别的线程获取到锁
自己实现一个独占式锁

在了解了ReentrantLock的实现原理之后,我们就可以仿照着自己去实现一个自定义独占式锁了

步骤

  1. 创建一个LockTest类,实现Lock接口,重写必要的接口
  2. LockTest类里创建一个内部类Sync,继承AQS,因为要实现独占式锁,所以重写tryAcquire(int arg)tryRelease(int arg)方法就可以了

LockTest代码

/**
 * @author: chenmingyu
 * @date: 2019/4/11 15:11
 * @description: 自定义独占式锁
 */
public class LockTest implements Lock{

    private final Sync SYNC = new Sync();

    public static class Sync extends AbstractQueuedSynchronizer{

        @Override
        protected boolean tryAcquire(int arg) {
            if(compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if(getState()<1){
                throw new IllegalMonitorStateException("释放同步状态不可小于1");
            }
            int c = getState() - arg;
            if (c == 0) {
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return true;
        }
    }

    @Override
    public void lock() {
        SYNC.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        SYNC.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return SYNC.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        SYNC.release(1);
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
验证
/**
 * @author: chenmingyu
 * @date: 2019/4/12 15:09
 * @description: LockTest
 */
public class ReentrantLockTest {

    private static Lock LOCKTEST = new LockTest();

    public static void main(String[] args) {
        Runnable r1 = new TestThread();
        new Thread(r1,"LockTest 1").start();
        Runnable r2 = new TestThread();
        new Thread(r2,"LockTest 2").start();
    }

    public static class TestThread implements Runnable{

        @Override
        public void run() {
            LOCKTEST.lock();
            try {
                System.out.println(Thread.currentThread().getName()+":获取到锁 "+LocalTime.now());
                TimeUnit.SECONDS.sleep(3L);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                LOCKTEST.unlock();
            }
        }
    }
}

输出

共享式锁

读写锁

ReentrantReadWriteLock是读写锁的实现,实现ReadWriteLock接口

ReentrantReadWriteLock内部同样维护这一个Sync内部类,实现了AQS,通过重写对应方法实现读锁和写锁

现在已经知道了同步状态是由AQS维护的一个整型变量state,独占式锁获取到锁时会对其进行加1,支持重入,而读写锁ReentrantReadWriteLock在设计的时候也是通过一个整型变量进行读锁的同步状态和写锁的同步状态维护,在一个变量上维护两种状态就需要对整型变量进行按位分割,一个int类型的变量包含4个字符,一个字符8个bit,就是32bit,在ReentrantReadWriteLock中,高16位表示读,低16位表示写

写锁的获取

读写锁中的写锁,支持重进入的排它锁

重写ReentrantReadWriteLock的内部类Sync中的tryAcquire(int acquires)方法

protected final boolean tryAcquire(int acquires) {

    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    /*
     * 1,如果同步状态c不等于0,代表着有读锁或者写锁
     */
    if (c != 0) {
        // 2,如果c不等于0,w写锁的同步状态为0,切当前线程不是持有锁的线程,返回false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

解读

如果存在读锁,写锁不能被获取,必须要等到其他读线程释放读锁,才可以获取到写锁,这么做的原因是要确保写锁做的操作对读锁可见,如果写锁被获取,则其他读写线程的后续访问均会被阻塞

写锁的释放

读写锁中的读锁,支持重进入的共享锁

写锁的释放与独占式锁释放过程相似,每次都是减少写锁的同步状态,直到为0时,表示写锁已被释放

读锁的获取与释放

读锁是一个支持重入的共享锁,重写ReentrantReadWriteLock的内部类Sync中的tryAcquireShared(int unused)方法

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

如果其他线程获取了写锁,则当前线程获取读锁状态失败进入等待状态,如果当前线程获取了写锁或者写锁未被获取,则当前线程获取同步状态成功,获取到读锁

释放读锁的时候就是每次释放都会对同步状态进行-1,直到为0时,表示读锁已被释放

锁降级

锁降级是指将写锁降级为读锁,这个过程就是当前线程已经获取到写锁的时候,在获取到读锁,随后释放写锁的过程,这么做的目的为的就是保证数据的可见性

当前线程A获取到写锁后,对数据进行修改,之后在获取到读锁,然后释放写锁,完成锁降级,这时候线程A还没释放读锁,别的线程就无法获取到写锁,就无法对数进行修改,以此来保证数据的可见性

参考:java并发编程的艺术

推荐:

java并发编程 | 线程详解

文章作者: 陈明羽
文章链接: https://chenmingyu.top/concurrent-lock/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 陈明羽
打赏
  • 微信

评论