DelayQueue

DelayQueue对元素进行持有直到一个特定的延迟到期,不允许null元素且注入其中的元素必须实现java.util.concurrent.Delayed接口。 PriorityQueue是一个根据队列里元素某些属性排列先后的顺序队列,DelayQueue其实就是在每次往优先级队列中添加元素, 然后以元素的delay过期值作为排序的因素,以此来达到先过期的元素会排在队首,每次从队列里取出来都是最先过期的元素。

源码分析

DelayQueue是容量无界的最大为Integer.MAX_VALUE,默认容量为11,若容量不够以当前容量50%递增, 可以使用一个现有集合对象初始化。

  • DelayQueue()
  • DelayQueue(Collection<? extends E> c)
    元素需要实现的接口Delayed
public interface Delayed extends Comparable<Delayed> {
    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

属性

// 重入锁
private final transient ReentrantLock lock = new ReentrantLock();
// 锁条件对象
private final Condition available = lock.newCondition();
// 根据delay时间排序的优先队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
 * Thread designated to wait for the element at the head of
 * the queue.  This variant of the Leader-Follower pattern
 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 * minimize unnecessary timed waiting.  When a thread becomes
 * the leader, it waits only for the next delay to elapse, but
 * other threads await indefinitely.  The leader thread must
 * signal some other thread before returning from take() or
 * poll(...), unless some other thread becomes leader in the
 * interim.  Whenever the head of the queue is replaced with
 * an element with an earlier expiration time, the leader
 * field is invalidated by being reset to null, and some
 * waiting thread, but not necessarily the current leader, is
 * signalled.  So waiting threads must be prepared to acquire
 * and lose leadership while waiting.
 */
// 用于优化阻塞通知的线程元素leader 用leader来减少不必要的等待时间
// 
private Thread leader = null;

leader的作用?

leader的主要作用用于减少不必要的阻塞时间,例如有多个消费者线程用take方法去取, 内部先加锁,然后每个线程都去peek第一个节点。如果leader不为空说明已经有线程在取了,设置当前线程阻塞。 如果为空说明没有其他线程去取这个节点,设置leader并等待delay延时到期,直到poll后结束循环。

方法

添加元素

  • boolean offer(E e)
    public boolean offer(E e) {
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
          q.offer(e); // 优先队列入队
          if (q.peek() == e) { // 查看元素是否是优先队列队首
              leader = null; // 设置leader为空 唤醒take线程
              available.signal();
          }
          return true;
      } finally {
          lock.unlock();
      }
    }
    
  • boolean offer(E e, long timeout, TimeUnit unit)
    同offer(E e)方法 因为队列没有容量限制故没有超时的offer方法
  • boolean add(E e)
    同offer(E e)方法 因为队列没有容量限制故没有抛出异常的add方法
  • void put(E e)
    同offer(E e)方法 因为队列没有容量限制故没有阻塞的put方法

移除元素

  • E poll()
    public E poll() {
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
          E first = q.peek(); // 查看优先队队首
          // 如果优先队列为空或队首delay时间为达到返回null
          if (first == null || first.getDelay(NANOSECONDS) > 0)
              return null;
          else
              return q.poll(); // 优先队列出队
      } finally {
          lock.unlock();
      }
    }
    
  • E poll(long timeout, TimeUnit unit)
     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
      long nanos = unit.toNanos(timeout);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
          for (;;) {
              E first = q.peek();
              if (first == null) { // 当优先队列为空时
                  if (nanos <= 0) // 超过dequeue操作的timeout时间 返回null
                      return null;
                  else // dequeue操作阻塞
                      nanos = available.awaitNanos(nanos);
              } else {
                  long delay = first.getDelay(NANOSECONDS);
                  if (delay <= 0)
                      return q.poll();
                  if (nanos <= 0)
                      return null;
                  // 如果优先队列队首未达到可用时间 释放队首节点避免内存溢出
                  first = null; // don't retain ref while waiting
                  // 如果队首delay时间大于poll等待时间
                  // 或者leader不为null(已有其他线程在阻塞队首delay时间)
                  // 继续进行poll等待
                  if (nanos < delay || leader != null)
                      nanos = available.awaitNanos(nanos);
                  else {
                      // 如果队首delay时间小于poll阻塞时间
                      // 且leader为null(没有线程在阻塞队首delay时间)
                      // 将当前线程设置为leader
                      Thread thisThread = Thread.currentThread();
                      leader = thisThread;
                      try {
                          // 阻塞delay时间
                          long timeLeft = available.awaitNanos(delay);
                          // 设置poll需要阻塞的时间
                          nanos -= delay - timeLeft;
                      } finally {
                          // 释放leader
                          if (leader == thisThread)
                              leader = null;
                      }
                  }
              }
          }
      } finally {
          // 如果没有线程阻塞delay且优先队列队首不为null
          if (leader == null && q.peek() != null)
              available.signal(); // 唤醒等待消费的线程
          lock.unlock();
      }
    }
    

    出队方法中为什么要释放first元素?

    假如有线程A和B都来获取队首,如果线程A阻塞完毕,获取对象成功,出队完成。 这个对象理应被GC回收,但是他还被线程B持有着,GC链可达,所以不能回收这个first。

  • E remove()
    public E remove() {
      E x = poll(); // 见上poll()方法
      if (x != null)
          return x;
      else
          throw new NoSuchElementException();
    }
    
  • boolean remove(Object o)
    public boolean remove(Object o) {
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
          // 调用优先队列remove(Object o)方法
          return q.remove(o);
      } finally {
          lock.unlock();
      }
    }
    
  • E take()
    public E take() throws InterruptedException {
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
          for (;;) {
              E first = q.peek();
              if (first == null) // 如果队首为null 阻塞当前线程
                  available.await();
              else {
                  long delay = first.getDelay(NANOSECONDS);
                  if (delay <= 0) // 队首可用 队列弹出队首元素
                      return q.poll();
                  // 防止内存溢出
                  first = null; // don't retain ref while waiting
                  if (leader != null) // 如果有其他线程在阻塞队首delay时间 阻塞当前线程
                      available.await();
                  else {
                      Thread thisThread = Thread.currentThread();
                      leader = thisThread;
                      try {
                          // 阻塞队首delay时间
                          available.awaitNanos(delay);
                      } finally {
                          if (leader == thisThread)
                              leader = null;
                      }
                  }
              }
          }
      } finally {
          if (leader == null && q.peek() != null)
              available.signal();
          lock.unlock();
      }
    }
    

总结

DelayQueue底层是一个优先队列(java.util.PriorityQueue),使用一个重入锁和锁生成的条件对象进行并发控制。 toArray、drainTo、clear都加锁但contains、toString未加锁。