Woowen You can do anything you set your mind to, man

JAVA8 AbstractQueuedSynchronizer

JAVA中的AbstractQueuedSynchronizer,提供了一些阻塞队列的基础方法,简称AQS框架

  • 1.给阻塞式锁提供了一个框架,跟一些同步器有关,比如信号量,事件等等
  • 2.基于FIFO(先进先出)的阻塞队列实现
  • 3.给很多基于操作一个原子值表示状态的同步器提供基础的方法
  • 4.子类必须给这些状态提供保护方法去改变这些原子状态(一般用来加锁和释放锁)
  • 5.该类还提供其他的阻塞队列方法,他本身并不关心state的具体含义,这个部分由子类去定义,子类可以通过getState 方法去获取这些原子值,或者通过setStatecompareAndSetState去更新他
  • 5.AQS提供了独占和共享两种模式,当在独占模式下,其他线程试图访问都是失败的,共享模式下,其他线程试图访问,可能会成功
  • 6.不同的线程共享同一个队列
  • 7.AQS本身是不实现任何接口的除了序列化的.所以他是通过自己去定义锁的实现方式,并不是通过synchronized等同步方式
  • 8.AQS的等待队列,是CLH Lock的一种变种,CLH Lock经常被用来实现自旋锁
  • 9.当前线程如果处于队列的第一位,那么他回去尝试获取,但是并不会保证一定成功,他只是给了去获取的权利

插入CLH队列,只需要一个原子操作,作为tail,而出队列,只需要设置头部就行了

内部定义一个Node

static final class Node {

    // 表明共享模式中的一个node
    static final Node SHARED = new Node();
    // 表明独占模式中的一个node
    static final Node EXCLUSIVE = null;

    // 表明线程已经取消
    static final int CANCELLED =  1;

    // 表明线程需要挂起挂起
    static final int SIGNAL    = -1;

    // 表明线程需要等待
    static final int CONDITION = -2;

    // 表明下一个共享模式获取无条件
    static final int PROPAGATE = -3;


    volatile int waitStatus;

    // 当前节点的前一个节点,当前节点依赖他来检查waitStatus,在入队期间和出队的时候变成null
    volatile Node prev;

    // 当前节点的后一个节点,       
    volatile Node next;

    // 当前线程
    volatile Thread thread;

    // 下一个等待状态中的节点        
    Node nextWaiter;

    // 当节点以及在共享模式中等待,返回true
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 返回前一个节点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
// 入队操作
private Node enq(final Node node) {
    for (;;) {
        // 获得尾节点
        Node t = tail;
        // 如果尾节点为空,说明这个队列没有值,那么就声明一个新的node并且设置他为头节点,并且把尾节点的标志为头节点
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 当前节点插入到尾节点之后,并且把它设置为尾节点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

ReentrantLock

ReentrantLock实现了公平锁和非公平锁

Lock方法

// 非公平锁
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        // cas更新标志位为1,成功就设置当前线程为独占线程,否则说明锁已经被占用
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // 否则说明有其他锁正在拥有,尝试获取
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

// 公平锁
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    // 公平获取
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                    (current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

// 非公平获取
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

UnLock方法

// 尝试释放锁,成功就唤醒队列中的下一个node的线程
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 释放锁
protected final boolean tryRelease(int releases) {
    // 标志位-1
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

1.公平锁和非公平锁的区别就在于lock方法.非公平锁会先直接尝试去更新state,获取锁,失败了再走队列获取,等于多了一次插队的机会,而公平锁就是老老实实走队列先进先出依次获取锁

2.ReentrantLock是独占锁

3.获取到锁,state + 1, 如果没有获取到,生成一个当前线程和独占模式的node,将他加入FIFO队列等待获取

4.公平锁和非公平锁释放锁的时候没有区别

5.释放锁,state - 1, 如果释放成功,将他下一个node的线程唤醒,并且队列的头节点改成它

6.入队操作调用AQS的实现,而加锁和释放锁是子类自己去实现的

ReentrantReadWriteLock

// 公平锁,非公平锁
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

// 公平锁
static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

WriteLock 的Lock方法

  • 1.如果写锁和读锁没有被其他线程占用,那么立刻获取写锁,并计数+1
  • 2.如果当前线程已经占用了写锁,那么占用计数+1
  • 3.如果其他线程已经占用了锁,那么当前线程等待直到获取到了写锁
public void lock() {
    sync.acquire(1);
}

// 尝试获取,如果获取失败,那么加一个独占node进入队列,中断线程,所以可重入写锁是独占锁
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// 尝试获取锁 
protected final boolean tryAcquire(int acquires) {    
    // 1. 如果读技术不为0,或者写计数不为0,并且当前线程不是加锁线程,那么直接返回失败
    // 2. 如果计数已经达到最大值,失败
    // 3. 或者,当前线程是可重入,并且队列允许,那么久更新状态并设置拥有者为当前线程
    Thread current = Thread.currentThread(); // 当前线程
    int c = getState(); // 获取状态值
    int w = exclusiveCount(c); // 获取独占计数
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 超过最大计数
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 可重入获取 state + 1
        setState(c + acquires);
        return true;
    }
    // 非公平锁,一直返回false
    // 公平锁,队列中只有一个线程等待,或者队列为空时,返回true,反之说明需要加锁,返回false
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // 设置当前线程为独占线程
    setExclusiveOwnerThread(current);
    return true;
}

// 每个node包含两个属性,一个是线程,一个是类型,类型主要用来说明是独占node还是共享node
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

1.8中的tryAcquire方法,将c != 0 判断放到了前面,稍微做了些优化

WriteLock 的UnLock方法

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    // 尝试释放锁
    // 如果释放成功,那么需要将下一个队列中的线程挂起
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 标志位state - 1
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
  • 1.ReentrantWriteReadLock的写锁是独占锁
  • 2.加锁就是state+1, 释放锁就是state-1

ReadLock 的Lock方法

public void lock() {
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
  • 1.ReentrantWriteReadLock中,写锁是独占模式,读锁是共享模式

  • 2.与AQS的独占功能一样,共享锁是否可以被获取的判断为空方法,交由子类去实现。

  • 3.与AQS的独占功能不同,当锁被头节点获取后,独占功能是只有头节点获取锁,其余节点的线程继续沉睡,等待锁被释放后,才会唤醒下一个节点的线程,而共享功能是只要头节点获取锁成功,就在唤醒自身节点对应的线程的同时,继续唤醒AQS队列中的下一个节点的线程,每个节点在唤醒自身的同时还会唤醒下一个节点对应的线程,以实现共享状态的“向后传播”,从而实现共享功能。

  • 4.可重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响。可重入锁最大的作用是避免死锁

当两个方法,方法1,和方法2都依赖锁A,线程获取锁A可以直接调用方法1,和方法2,而不用为了方法2再去获取一次锁A,这有就避免了死锁

参考

CLH Lock

https://segmentfault.com/a/1190000007094429

AQS实现分析

http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizer#anch113563

Woowen 浙ICP备15013647号