1 /**
2   A FIFO queue implementation
3 */
4 module dcord.util.queue;
5 
6 import vibe.core.sync;
7 
8 import std.container.dlist,
9        std.array,
10        core.time;
11 
12 /**
13   A simple Queue of type T
14 */
15 class Queue(T) {
16   private DList!(T) data;
17 
18   /// Returns the size of the array
19   private size_t _size;
20 
21   /// Set the size of the array
22   void setSize(size_t size) {
23     this._size = size;
24   }
25   
26   /// Get the size of the array.
27   @property size_t size() {
28     return this._size;
29   }
30 
31   /// Returns true if the queue is empty
32   @property bool empty() {
33     return this.data.empty;
34   }
35 
36   /// Raw array access
37   @property T[] array() {
38     return this.data.array;
39   }
40 
41   /// Clear the entire queue
42   void clear() {
43     this.data.removeFront(this.size);
44     this.setSize(0);
45   }
46 
47   /// Push an item to the back of the queue. Returns false if the queue is full.
48   bool push(T item) {
49     this.data.insertBack(item);
50     this.setSize(this.size + 1);
51     return true;
52   }
53 
54   /// Push multiple items to the back of the queue. Returns false if the queue is
55   //   full.
56   bool push(T[] arr) {
57     this.data.insertBack(arr);
58     this.setSize(this.size + 1);
59     return true;
60   }
61 
62   /// Pops a single item off the front of the queue. Throws an exception if the
63   //   queue is empty.
64   T pop() {
65     if(this.data.empty) throw new Exception("Cannot peak into empty Queue");
66     T v = this.data.front;
67     this.data.removeFront();
68     this.setSize(this.size - 1);
69     return v;
70   }
71 
72   /// Peaks at the last item in the queue.
73   T peakBack() {
74     if(this.data.empty) throw new Exception("Cannot peak into empty Queue");
75     return this.data.back;
76   }
77 
78   /// Peaks at the first item in the queue.
79   T peakFront() {
80     if(this.data.empty) throw new Exception("Cannot peak into empty Queue");
81     return this.data.front;
82   }
83 }
84 
85 /**
86   A blocking queue of type T.
87 */
88 class BlockingQueue(T): Queue!T {
89   private ManualEvent onInsert;
90   private ManualEvent onModify;
91 
92   this() {
93     this.onInsert = createManualEvent();
94     this.onModify = createManualEvent();
95   }
96 
97   override void setSize(size_t size) {
98     if (this._size < size) {
99       this.onInsert.emit();
100     }
101 
102     this.onModify.emit();
103     this._size = size;
104   }
105 
106   bool wait(Duration timeout) {
107     if (this.onModify.wait(timeout, 0) != 0) {
108       return false;
109     }
110     return true;
111   }
112 
113   void wait() {
114     this.onModify.wait();
115   }
116 
117   T pop(Duration timeout) {
118     if (this.onInsert.wait(timeout, 0) != 0) {
119       return null;
120     }
121 
122     return this.pop(false);
123   }
124 
125   /// Pops a single item off the front of the queue. Throws an exception if the
126   //   queue is empty.
127   T pop(bool block=true) {
128     if (this.data.empty) {
129       if (block) {
130         this.onInsert.wait();
131       } else {
132         return null;
133       }
134     }
135 
136     return this.pop();
137   }
138 
139   override T pop() {
140     T v = this.data.front;
141     this.data.removeFront();
142     this.setSize(this.size - 1);
143     return v;
144   }
145 }
146 
147 /**
148   A SizedQueue of type T
149 */
150 class SizedQueue(T): Queue!T {
151   /// Maximum size of the queue
152   size_t maxSize;
153 
154   this(size_t maxSize) {
155     this.maxSize = maxSize;
156   }
157 
158   /// True if the queue is full
159   @property bool full() {
160     return this.size == this.maxSize;
161   }
162 
163   /// Push an element to the queue
164   override bool push(T item) {
165     if (this.size == this.maxSize) return false;
166     return super.push(item);
167   }
168 
169   /// Push another queue to the queue
170   override bool push(T[] arr) {
171     if (arr.length + this.size > this.maxSize) return false;
172     return super.push(arr);
173   }
174 }