并发包阻塞队列之ArrayBlockingQueue,并发包阻塞队
分类:新闻中心

10.并签发承包合约阻塞队列之ArrayBlockingQueue,并发队列阻塞队列

  上后生可畏节中对并发包中的非阻塞队列ConcurrentLinkedQueue的入队、出队做了一个轻松的剖判,本文将对并签发承包合约中的阻塞队列做一个简约解析。 

  Java并发包中的阻塞队列风度翩翩共7个,当然他们都是线程安全的。 

  ArrayBlockingQueue:八个由数组结构组成的有界阻塞队列。 

  LinkedBlockingQueue:三个由链表结构重新组合的有界阻塞队列。 

  PriorityBlockingQueue:一个支撑先行级排序的无界阻塞队列。 

  DealyQueue:四个利用优先级队列完成的无界阻塞队列。 

  SynchronousQueue:八个不存款和储蓄成分的短路队列。 

  LinkedTransferQueue:二个由链表结构构成的无界阻塞队列。 

  LinkedBlockingDeque:四个由链表结构组成的双向阻塞队列。(摘自《Java并发编制程序的秘技》) 

  在本文对ArrayBlockingQueue阻塞队列做一个粗略深入分析 

  对于ArrayLinkedQueue,放眼看千古其安全性的担保是由ReentrantLock保障的,有关ReentrantLock的剖释可参照他事他说加以考查《5.Lock接口及其达成ReentrantLock》,在下文笔者也会方便的聊到。 

  首先来查阅其构造函数: 

构造方法 

 

public ArrayBlockingQueue(int capacity) 

构造指定大小的有界队列 

public ArrayBlockingQueue(int capacity, boolean fair) 

构造指定大小的有界队列,指定为公平或非公平锁 

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) 

 

构造指定大小的有界队列,指定为公平或非公平锁,指定在初始化时加入一个集合 

 1 public ArrayBlockingQueue(int capacity) { 
 2   this(capacity, false);//默认构造非公平锁的阻塞队列 
 3 } 
 4 public ArrayBlockingQueue(int capacity, boolean fair) { 
 5   if (capacity <= 0)  
 6     throw new IllegalArgumentException(); 
 7   this.items = new Object[capacity]; 
 8   lock = new ReentrantLock(fair);//初始化ReentrantLock重入锁,出队入队拥有这同一个锁 
 9   notEmpty = lock.newCondition;//初始化非空等待队列,有关Condition可参考《6.类似Object监视器方法的Condition接口》
10   notFull = lock.newCondition;//初始化非满等待队列 
11 } 
12 public ArrayBlockingQueue(int capacity, boolean fair, Collecation<? extends E> c) { 
13   this(capacity, fair); 
14   final ReentrantLock lock = this.lock; 
15   lock.lock();//注意在这个地方需要获得锁,这为什么需要获取锁的操作呢? 
16   try { 
17     int i = 0; 
18     try { 
19       for (E e : c) { 
20         checkNotNull(e); 
21         item[i++] = e;//将集合添加进数组构成的队列中 
22       } 
23     } catch (ArrayIndexOutOfBoundsException ex) { 
24       throw new IllegalArgumentException(); 
25     } 
26     count = i;//队列中的实际数据数量 
27     putIndex = (i == capacity) ? 0 : i; 
28   } finally { 
29     lock.unlock(); 
30   } 
31 } 

  在第15行,源码里给了一句注释: Lock only for visibility, not mutual exclusion。那句话的情趣就是交给,这些锁的操作并非为着排挤操作,而是锁住其可以看到性。线程T1是实例化ArrayBlockingQueue对象,T2是对实例化的ArrayBlockingQueue对象做入队操作(当然要力保T1和T2的施行种种),固然不对它实行加锁操作(加锁会确定保证其可知性,也正是写回主存),T1的集结c有望只存在T1线程维护的缓存中,并未有写回主存,T第22中学实例化的ArrayBlockingQueue维护的缓存以至主存中并不曾群集c,此时就因为可以预知性产生数据不近似的气象,引发线程安全主题材料。 

  以下是ArrayBlockingQueue的部分出队入队操作。

jdk1.7.0_79 

队列成分的插入

  

抛出异常 

返回值(非阻塞) 

一定时间内返回值 

返回值(阻塞) 

插入 

add(e)//队列未满时,返回true;队列满则抛出IllegalStateException(“Queue full”)异常——AbstractQueue 

offer(e)//队列未满时,返回true;队列满时返回false。非阻塞立即返回。 

offer(e, time, unit)//设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false,插入成功返回true。 

put(e)//队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。 

//ArrayBlockingQueue#add 
public boolean add(E e) { 
  return super.add(e); 
} 

//AbstractQueue#add,这是一个模板方法,只定义add入队算法骨架,成功时返回true,失败时抛出IllegalStateException异常,具体offer实现交给子类实现。 
public boolean add(E e) { 
  if (offer(e))//offer方法由Queue接口定义 
    return true; 
  else 
    throw new IllegalStateException(); 
}

//ArrayBlockingQueue#offer,队列未满时返回true,满时返回false 
public boolean offer(E e) { 
  checkNotNull(e);//检查入队元素是否为空 
  final ReentrantLock lock = this.lock; 
  lock.lock();//获得锁,线程安全 
  try { 
    if (count == items.length)//队列满时,不阻塞等待,直接返回false 
      return false; 
    else { 
      insert(e);//队列未满,直接插入 
      return true; 
    } 
  } finally {
    lock.unlock();
  }
} 

//ArrayBlockingQueue#insert 
private void insert(E e) { 
  items[putIndex] = x; 
  putIndex = inc(putIndex); 
  ++count; 
  notEmpty.signal();//唤醒非空等待队列中的线程,有关Condition可参考《6.类似Object监视器方法的Condition接口》

 }

  在那地有多少个ArrayBlockingQueue成员变量。items即队列的数组援引,putIndex表示等待插入的数组下标地方。当items[putIndex] = x将新成分插入队列中后,调用inc将数组下标向后活动,如若队列满则将putIndex置为0:

//ArrayBlockingQueue#inc 
private int inc(int i) { 
  return (++i == items.length) ? 0 : i; 
} 

  接着剖判下put方法,阻塞插入队列,当队列满时不会回来false,也不会抛出非常,而是直接不通等待,直到有空位可插入,但它可被中断再次回到。

//ArrayBlockingQueue#put 
public void put(E e) throws InterruptedException { 
  checkNotNull(e);//同样检查插入元素是否为空 
  final ReentrantLock lock = this.lock; 
  lock.lockInterruptibly();//这里并没有调用lock方法,而是调用了可被中断的lockInterruptibly,该方法可被线程中断返回,lock不能被中断返回。 
  try { 
    while (count == items.length) 
      notFull.await();//当队列满时,使非满等待队列休眠 
    insert(e);//此时表示队列非满,故插入元素,同时在该方法里唤醒非空等待队列 
  } finally { 
    lock.unlock(); 
  } 
}  

  在上文《10.并发包阻塞队列之ArrayBlockingQueue》中简要分析了ArrayBlockingQueue部分源码,在本文中后生可畏致要介绍的是Java并发包中的阻塞队列LinkedBlockingQueue。ArrayBlockingQueue队列是由数组完成,而LinkedBlockingQueue队列的兑现则是链表(单向链表)达成,所以在LinkedBlockingQueue有二个Node内部类来表示链表的节点。  

队列成分的删减 

抛出异常 

返回值(非阻塞) 

一定时间内返回值 

返回值(阻塞) 

remove()//队列不为空时,返回队首值并移除;队列为空时抛出NoSuchElementException()异常——AbstractQueue 

poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。 

poll(time, unit)//设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值 

take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。 

 

//AbstractQueue#remove,这也是一个模板方法,定义删除队列元素的算法骨架,队列中元素时返回具体元素,元素为空时抛出异常,具体实现poll由子类实现, 
public E remove() { 
  E x = poll();//poll方法由Queue接口定义 
  if (x != null) 
    return x; 
  else 
    throw new NoSuchElementException(); 
} 

//ArrayBlockingQueue#poll,队列中有元素时返回元素,不为空时返回null 
public E poll() { 
  final ReentrantLock lock = this.lock; 
  lock.lock(); 
  try { 
    return (count == 0) ? null : extract(); 
  } finally { 
    lock.unlock(); 
  } 
}

//ArrayBlockingQueue#extract 
private E extract() { 
  final Object[] items = this.items; 
  E x = this.<E>cast(items[takeIndex]);//移除队首元素 
  items[takeIndex] = null;//将队列数组中的第一个元素置为null,便于GC回收 
  takeIndex = inc(takeIndex); 
  --count; 
  notFull.signal();//唤醒非满等待队列线程 
  return x; 
} 

  相比add和offer方法,精通了上四个点子后remove和poll实际轻便精通,同理在知晓了put阻塞插入队列后,相比take阻塞删除队列成分同样也很好精通。

//ArrayBlockQueue#take 
public E take() throws InterruptedException { 
  final ReentrantLock lock = this.lock(); 
  lock.lockInterrupted();//这里并没有调用lock方法,而是调用了可被中断的lockInterruptibly,该方法可被线程中断返回,lock不能被中断返回。 
  try { 
    while (count == 0)//队列元素为空 
      notEmpty.await();//非空等待队列休眠 
    return extract();//此时表示队列非空,故删除元素,同时在里唤醒非满等待队列 
  } finally { 
    lock.unlock(); 
  } 
} 

  最后多个主意size。

public int size() { 
  final ReentrantLock lock = this.lock; 
  lock.lock(); 
  try { 
    return count; 
  } finally { 
    lock.unlock(); 
  } 
}

  能够看看ArrayBlockingQueue队列的size方法,是直接回到的count变量,它不像ConcurrentLinkedQueue,ConcurrentLinkedQueue的size则是历次会遍历那个行列,故ArrayBlockingQueue的size方法比ConcurrentLinkedQueue的size方法功效高。何况ConcurrentLinkedQueue的size方法并从未加锁!也正是说很有希望其size并不标准,那在它的笺注中表达了ConcurrentLinkedQueue的size并从未多大的用处。

 

 

上风度翩翩节中对并发包中的非阻塞队列 ConcurrentLinkedQueue 的入队、出队做了三个简便的分...

static final class Node<E> { 
  E item;//入队元素 
  Node<E> next;//指向后继节点 
  Node(E x) { 
    item = x; 
  } 
} 

  相像它也许有3个构造方法,与ArrayBlockingQueue略有分化。

 1 public LinkedBlockingQueue() { 
 2   this(Integer.MAX_VALUE)//默认构造容量为int型的最大值队列 
 3 } 
 4 public LinkedBlockingQueue(int capacity) { 
 5   if (capacity <= o) throw new IllegalArgumentException(); 
 6   this.capacity = capacity; 
 7   last = head = new Node<E>(null);//头指针和尾指针指向头节点(null) 
 8 } 
 9 public LinkedBlockingQueue(Collection<? extends E> c ) { 
10   this(Integer.MAX_VALUE); 
11   final ReentrantLock putLock = this.putLock; 
12   putLock.lock();//这里和ArrayBlockingQueue也会获取锁,但它同样不是为了互斥操作,同样也是为了保证其可见性。 
13   try { 
14       int n = 0; 
15       for (E e : c) { 
16           if (e == null) 
17               throw new NullPointerException(); 
18           if (n == capacity) 
19               throw new IllegalStateException("Queue full"); 
20           enqueue(new Node<E>(e));//入队 
21           ++n; 
22       } 
23       count.set(n); 
24   } finally { 
25       putLock.unlock(); 
26   } 
27 } 

  在第12行中拿到锁是为着确认保障可以预知性,那些的案由小编觉着是,线程T1是实例化LinkedBlockingQueue对象,T2是对实例化的LinkedBlockingQueue对象做入队操作(当然要保证T1和T2的履行顺序),尽管不对它实行加锁操作(加锁会确定保证其可知性,也正是写回主存),T1的集结c有异常的大希望只存在T1线程维护的缓存中,并未写回主存,T第22中学实例化的LinkedBlockingQueue维护的缓存以致主存中并不曾集结c,此时就因为可见性产生数据不黄金时代致的事态,引发线程安全主题材料。  

  在摸底完LinkedBlockingQueue的构造方法后,大家回过头来看LinkedBlockingQueue的七个分子变量: 

private final ReentrantLock takeLock = new ReentrantLock(); 
private final ReentrantLock putLock = new ReentrantLock(); 

  可知LinkedBlockingQueue中有三个锁,二个是为着锁住入队操作,贰个是为了锁住出队操作。而在ArrayBlockingQueue中则唯有一个锁,同一时间锁住队列的入队、出队操作。 

private final Condition notEmpty = takeLock.newCondition(); 
private final Condition notFull = putLock.newCondition(); 

  那四个成员变量则是线程等待队列,多少个是出队锁上的守候队列,叁个是入队锁上的等候队列。在ArrayBlockingQueue也会有多个等待队列,二个是非空等待队列,另三个则是非满等待队列,在此或多或少上两个如出风姿罗曼蒂克辙。 

队列成分的插入 

  

抛出异常 

返回值(非阻塞) 

一定时间内返回值 

返回值(阻塞) 

插入 

add(e)//队列未满时,返回true;队列满则抛出IllegalStateException(“Queue full”)异常——AbstractQueue 

offer(e)//队列未满时,返回true;队列满时返回false。非阻塞立即返回。 

offer(e, time, unit)//设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false,插入成功返回true。 

put(e)//队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。 

  LinkedBlockingQueue中并不曾像ArrayBlockingQueue那样重写了AbstractQueue的add方法而直白调用父类的add方法,所以LinkedBlockingQueue#add方法与ArrayBlockingQueue#add相符,都是直接调用其AbstractQueue。

//AbstractQueue#add,这是一个模板方法,只定义add入队算法骨架,成功时返回true,失败时抛出IllegalStateException异常,具体offer实现交给子类实现。 
public boolean add(E e) { 
  if (offer(e))//offer方法由Queue接口定义 
    return true; 
  else 
    throw new IllegalStateException(); 
} 

 1 //LinkedBlockingQueue#offer 
 2 public boolean offer(E e) { 
 3   if (e == null) throw new NullPointerException(); 
 4   final AtomicInteger count = this.count;//原子型int变量,线程安全,指向队列数据量引用 
 5   if (count.get() == capacity) //当数据量等于队列容量时,无法入队,返回false 
 6     return false; 
 7   int c = -1; 
 8   Node<E> node = new Node(e); 
 9   final ReentrantLock putLock = this.putLock;//插入锁 
10   putLock.lock();//获得插入锁 
11   try { 
12     if (count.get() < capacity) { 
13       enqueuer(node);//入队 
14       c = count.getAndIncrement();//队列数据总数自增+1后返回 
15       if (c + 1 < capacity) 
16         notFull.signal();//唤醒非满等待队列上的线程 
17     } 
18   } finally { 
19     putLock.unlock(); 
20   } 
21   if (c == 0) 
22     signalNotEmpty();//队列中刚好有一个数据,唤醒非空等待队列 
23   return c >= 0 
24 } 

  在第10行是收获插入锁,和ArrayBlockingQueue唯有叁个锁不相同的是,LinkedBlockingQueue分为入队锁和出队锁,也便是说对于ArrayBlockingQueue同期只可以有多少个线程对它实行入队大概出队操作,而对于LinkedBlockingQueue来讲同临时常间能有三个线程对队列实行入队也许出队操作。 

  前七个add和offer方法都是非阻塞的,对于put方法则是阻塞的,线程会一贯不通直到线程非空或许非满,不过它在堵塞时能被线程中断重回。

//LinkedBlockingQueue#put 
public void put(E e) throws InterruptedException { 
  if (e == null) throws new NullPointerException(); 
  int c = -1; 
  Node<E> node = new Node(e); 
  final ReentrantLock putLock = this.putLock; 
  final AtomicInteger count = this.count; 
  putLock.lockInterrupted();//能被线程中断地获取锁 
  try { 
    while (count.get() == capacity) {//队列数据量等于队列容量 
      notFull.await();//休眠非满等待队列上的线程 
    } 
    enqueuer(node);//入队 
    c = count.getAndIncrement();//队列数据总数自增+1后返回 
    if (c + 1 < capacity)//还没有达到队列容量 
      notFull.signal();//唤醒非满等待队列上的线程 
  } finally { 
    putLock.unlock(); 
  } 
  if (c == 0) 
  signalNotEmpty();//唤醒非空等待队列上的线程 
} 

  队列插入的最终一个艺术来看上边现身的enqueue入队方法。

private void enqueuer(Node<E> node) { 
  last = last.next = node;//将LinkedBlockingQueue中指向队尾的last.next指向新加入的node节点 
} 

队列成分的去除 

抛出异常  

返回值(非阻塞)  

一定时间内返回值  

返回值(阻塞)  

remove()//队列不为空时,返回队首值并移除;队列为空时抛出NoSuchElementException()异常——AbstractQueue  

poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。  

poll(time, unit)//设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值  

take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。  

//AbstractQueue#remove,同样这也是一个模板方法,定义删除队列元素的算法骨架,具体实现由子类来实现poll方法 
public E remove() {  
  E x = poll();//poll方法由Queue接口定义  
  if (x != null)  
    return x;  
  else  
    throw new NoSuchElementException();  
} 

//LinkedBlockingQueue#poll 
public E poll() { 
  final AtomicInteger count = this.count; 
  if (count.get() == 0)  
    return null; 
  E x = null; 
  int c = -1; 
  final ReentrantLock takeLock = this.takeLock; 
  takeLock.lock();//获取出队锁 
  try { 
    if (count.get() > 0) {//队列不为空 
      x = dequeuer();//出队 
      c = count.getAndDecrement();//队列数据自减-1返回 
      if ( c > 1)  
        notEmpty.signal();//唤醒非空等待队列上的线程 
    } 
  } finally { 
    takeLock.unlock(); 
  } 
  if (c == capacity) 
    signalNotFull();//唤醒非满等待队列上的线程 
  return x; 
} 

  前两个remove和poll方法都以非阻塞的,对于take方准绳是阻塞的,线程会向来不通直到线程非空恐怕非满,不过它在堵塞时能被线程中断重临。

public E take() throws InterruptedException { 
  E x; 
  int c = -1; 
  final AtomicInteger count = this.count; 
  final ReentrantLock takeLock = this.takeLock; 
  take.lockInterruptibly();//可被线程中断返回地获取锁 
  try { 
    while (count.get() == 0) {//队列数据为空 
      notEmpty.await();//休眠非空等待队列上的线程 
    } 
    x = dequeuer();//此时非空等待队列上的线程被唤醒,队列数据不为空,出队 
    c = count.getAndDecrement(); 
  if (c > 1) 
    notEmpty.signal();//唤醒非空等待队列上的线程 
  } finally { 
    takeLock.unlock(); 
  } 
  if (c == capacity) 
    signalNotFull();//唤醒非满等待队列 
  return x; 
} 

  队列出队的结尾二个格局来看下边现身的dequeue入队方法。

private E dequeue() { 
  Node<E> h = head;//头节点,为空 
  Node<E> first = h.next; 
  h.next = h;//此时没有节点指向头节点,便于GC 
  head = first; 
  E x = first.item; 
  first.item = null; 
  return x; 
} 

  最终二个主意size。

public int size() { 
  return count.get();//和ArrayBlockingQueue类似,与ConcurrentLinkedQueue不同,没有遍历整个队列,而是直接返回count变量。此处的count是AtomicInteger变量。 
} 

 


这是三个能给程序猿加buff的万众号 

图片 1

 

本文由美高梅网址发布于新闻中心,转载请注明出处:并发包阻塞队列之ArrayBlockingQueue,并发包阻塞队

上一篇:1安装及基本设置,多语言版 下一篇:没有了
猜你喜欢
热门排行
精彩图文