概述
DelayQueue队列中每个元素都有个过期时间,并且队列是个优先级队列,当从队列获取元素时候,只有过期元素才会出队列。
- 使用案例
- 简介
- 源码分析
使用案例
因为DelayQueue要求每一个入队的元素都要实现Delayed
接口,也就是实现一个获取当前对象的延迟时间的方法。另外他的内部是通过使用PriorityQueue存放数据,因此你最好在内部实现一个比较延迟时间的比较器,这样可以按照延迟时间的大小来进行队列的建立,这样也会使得延迟时间最短的放在队列的最前面。不会因为队头延迟时间没到而阻塞了后面延迟时间到的元素出队。后面介绍源码的时候你会更清楚这里说的,先看下面一个简单demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
| package JUC.collect;
import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit;
public class DelayQueueTest {
public static void main(String[] args) { DelayQueue<DelayedElement> delayQueue = new DelayQueue<DelayedElement>();
producer(delayQueue);
consumer(delayQueue);
while (true) { try { TimeUnit.HOURS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }
private static void producer(final DelayQueue<DelayedElement> delayQueue) { new Thread(new Runnable() { @Override public void run() { while (true) { try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); }
DelayedElement element = new DelayedElement(1000, "test"); delayQueue.offer(element); } } }).start();
new Thread(new Runnable() { @Override public void run() { while (true) { try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("delayQueue size:" + delayQueue.size()); } } }).start(); }
private static void consumer(final DelayQueue<DelayedElement> delayQueue) { new Thread(new Runnable() { @Override public void run() { while (true) { DelayedElement element = null; try { element = delayQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis() + "---" + element); } } }).start(); } }
class DelayedElement implements Delayed {
private final long delay;
private final long expire;
private final String msg;
private final long now;
public DelayedElement(long delay, String msg) { this.delay = delay; this.msg = msg; expire = System.currentTimeMillis() + delay; now = System.currentTimeMillis(); }
@Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS); }
@Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); }
@Override public String toString() { final StringBuilder sb = new StringBuilder("DelayedElement{"); sb.append("delay=").append(delay); sb.append(", expire=").append(expire); sb.append(", msg='").append(msg).append('\''); sb.append(", now=").append(now); sb.append('}'); return sb.toString(); } }
|
这个大体上就是实现了一个Delayed
接口的类。然后模拟了生产者消费者模型来回进行入队出队操作。具体的源码解释的比较详细。
2. 简介
类图如下
使用场景主要有以下俩个:
- TimerQueue的内部实现
- ScheduledThreadPoolExecutor中DelayedWorkQueue是对其的优化使用
3. 源码分析
首先介绍后面会用到的属性
1 2 3 4 5 6
| private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final Condition available = lock.newCondition();
|
构造函数
1 2 3 4 5
| public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
|
入队操作
因为DelayQueue
是无界队列,所以入队的时候是不会阻塞住,他们所有的入队操作都是调用的下面这个方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
|
这里入队还是比较简单的,主要是借助PriorityQueue
来进行存储,将等待时间长的放在队尾,短的放在队头。
出队操作
take 操作
获取并移除队列首元素,如果队列没有过期元素则等待。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); else if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
|
第一次调用take时候由于队列空,所以把当前线程放入available的条件队列等待,当执行offer并且添加的元素就是队首元素时候就会通知最先等待的线程激活,循环重新获取队首元素,这时候first假如不空,则调用getdelay方法看该元素海剩下多少时间就过期了,如果delay<=0则说明已经过期,则直接出队返回。否者看leader是否为null,不为null则说明是其他线程也在执行take则把该线程放入条件队列,否者是当前线程执行的take方法,则调用await直到剩余过期时间到(这期间该线程会释放锁,所以其他线程可以offer添加元素,也可以take阻塞自己),剩余过期时间到后,该线程会重新竞争得到锁,重新进入循环。说明当前take返回了元素,如果当前队列还有元素则调用singal激活条件队列里面可能有的等待线程。leader那么为null,那么是第一次调用take获取过期元素的线程,第一次调用的线程调用设置等待时间的await方法等待数据过期,后面调用take的线程则调用await直到signal。
poll操作
获取并移除队头过期元素,否者返回null。这个方法其实和上面的take操作大体上差不多,只是这个操作有一个等待过期的时间,如果超过这个时间还没有获取到元素,则之间返回null。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); if (nanos <= 0) return null; first = null; if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
|
参考
- Java延时队列DelayQueue的使用
- 并发队列-无界阻塞延迟队列DelayQueue原理探究