概述
DelayQueue队列中每个元素都有个过期时间,并且队列是个优先级队列,当从队列获取元素时候,只有过期元素才会出队列。
- 使用案例
- 简介
- 源码分析
使用案例
因为DelayQueue要求每一个入队的元素都要实现Delayed
接口,也就是实现一个获取当前对象的延迟时间的方法。另外他的内部是通过使用PriorityQueue存放数据,因此你最好在内部实现一个比较延迟时间的比较器,这样可以按照延迟时间的大小来进行队列的建立,这样也会使得延迟时间最短的放在队列的最前面。不会因为队头延迟时间没到而阻塞了后面延迟时间到的元素出队。后面介绍源码的时候你会更清楚这里说的,先看下面一个简单demo

| package JUC.collect;
import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit;
public class DelayQueueTest {
public static void main(String[] args) { DelayQueue<DelayedElement> delayQueue = new DelayQueue<DelayedElement>();
producer(delayQueue);
consumer(delayQueue);
while (true) { try { TimeUnit.HOURS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }
private static void producer(final DelayQueue<DelayedElement> delayQueue) { new Thread(new Runnable() { @Override public void run() { while (true) { try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); }
DelayedElement element = new DelayedElement(1000, "test"); delayQueue.offer(element); } } }).start();
new Thread(new Runnable() { @Override public void run() { while (true) { try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("delayQueue size:" + delayQueue.size()); } } }).start(); }
private static void consumer(final DelayQueue<DelayedElement> delayQueue) { new Thread(new Runnable() { @Override public void run() { while (true) { DelayedElement element = null; try { element = delayQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis() + "---" + element); } } }).start(); } }
class DelayedElement implements Delayed {
private final long delay;
private final long expire;
private final String msg;
private final long now;
public DelayedElement(long delay, String msg) { this.delay = delay; this.msg = msg; expire = System.currentTimeMillis() + delay; now = System.currentTimeMillis(); }
@Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS); }
@Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); }
@Override public String toString() { final StringBuilder sb = new StringBuilder("DelayedElement{"); sb.append("delay=").append(delay); sb.append(", expire=").append(expire); sb.append(", msg='").append(msg).append('\''); sb.append(", now=").append(now); sb.append('}'); return sb.toString(); } }
|
这个大体上就是实现了一个Delayed
接口的类。然后模拟了生产者消费者模型来回进行入队出队操作。具体的源码解释的比较详细。
2. 简介
类图如下
使用场景主要有以下俩个:
- TimerQueue的内部实现
- ScheduledThreadPoolExecutor中DelayedWorkQueue是对其的优化使用
3. 源码分析
首先介绍后面会用到的属性
1 2 3 4 5 6
| private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final Condition available = lock.newCondition();
|
构造函数
1 2 3 4 5
| public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
|
入队操作
因为DelayQueue
是无界队列,所以入队的时候是不会阻塞住,他们所有的入队操作都是调用的下面这个方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
|
这里入队还是比较简单的,主要是借助PriorityQueue
来进行存储,将等待时间长的放在队尾,短的放在队头。
出队操作
take 操作
获取并移除队列首元素,如果队列没有过期元素则等待。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); else if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
|
第一次调用take时候由于队列空,所以把当前线程放入available的条件队列等待,当执行offer并且添加的元素就是队首元素时候就会通知最先等待的线程激活,循环重新获取队首元素,这时候first假如不空,则调用getdelay方法看该元素海剩下多少时间就过期了,如果delay<=0则说明已经过期,则直接出队返回。否者看leader是否为null,不为null则说明是其他线程也在执行take则把该线程放入条件队列,否者是当前线程执行的take方法,则调用await直到剩余过期时间到(这期间该线程会释放锁,所以其他线程可以offer添加元素,也可以take阻塞自己),剩余过期时间到后,该线程会重新竞争得到锁,重新进入循环。说明当前take返回了元素,如果当前队列还有元素则调用singal激活条件队列里面可能有的等待线程。leader那么为null,那么是第一次调用take获取过期元素的线程,第一次调用的线程调用设置等待时间的await方法等待数据过期,后面调用take的线程则调用await直到signal。
poll操作
获取并移除队头过期元素,否者返回null。这个方法其实和上面的take操作大体上差不多,只是这个操作有一个等待过期的时间,如果超过这个时间还没有获取到元素,则之间返回null。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); if (nanos <= 0) return null; first = null; if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
|
参考
- Java延时队列DelayQueue的使用
- 并发队列-无界阻塞延迟队列DelayQueue原理探究