001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.collections4.queue; 018 019import java.io.IOException; 020import java.io.ObjectInputStream; 021import java.io.ObjectOutputStream; 022import java.io.Serializable; 023import java.util.AbstractCollection; 024import java.util.Arrays; 025import java.util.Collection; 026import java.util.Iterator; 027import java.util.NoSuchElementException; 028import java.util.Objects; 029import java.util.Queue; 030 031import org.apache.commons.collections4.BoundedCollection; 032 033/** 034 * CircularFifoQueue is a first-in first-out queue with a fixed size that 035 * replaces its oldest element if full. 036 * <p> 037 * The removal order of a {@link CircularFifoQueue} is based on the 038 * insertion order; elements are removed in the same order in which they 039 * were added. The iteration order is the same as the removal order. 040 * </p> 041 * <p> 042 * The {@link #add(Object)}, {@link #remove()}, {@link #peek()}, {@link #poll()}, 043 * {@link #offer(Object)} operations all perform in constant time. 044 * All other operations perform in linear time or worse. 045 * </p> 046 * <p> 047 * This queue prevents null objects from being added. 048 * </p> 049 * 050 * @param <E> the type of elements in this collection 051 * @since 4.0 052 */ 053public class CircularFifoQueue<E> extends AbstractCollection<E> 054 implements Queue<E>, BoundedCollection<E>, Serializable { 055 056 /** Serialization version. */ 057 private static final long serialVersionUID = -8423413834657610406L; 058 059 /** Underlying storage array. */ 060 private transient E[] elements; 061 062 /** Array index of first (oldest) queue element. */ 063 private transient int start; 064 065 /** 066 * Index mod maxElements of the array position following the last queue 067 * element. Queue elements start at elements[start] and "wrap around" 068 * elements[maxElements-1], ending at elements[decrement(end)]. 069 * For example, elements = {c,a,b}, start=1, end=1 corresponds to 070 * the queue [a,b,c]. 071 */ 072 private transient int end; 073 074 /** Flag to indicate if the queue is currently full. */ 075 private transient boolean full; 076 077 /** Capacity of the queue. */ 078 private final int maxElements; 079 080 /** 081 * Constructor that creates a queue with the default size of 32. 082 */ 083 public CircularFifoQueue() { 084 this(32); 085 } 086 087 /** 088 * Constructor that creates a queue from the specified collection. 089 * The collection size also sets the queue size. 090 * 091 * @param coll the collection to copy into the queue, may not be null 092 * @throws NullPointerException if the collection is null 093 */ 094 public CircularFifoQueue(final Collection<? extends E> coll) { 095 this(coll.size()); 096 addAll(coll); 097 } 098 099 /** 100 * Constructor that creates a queue with the specified size. 101 * 102 * @param size the size of the queue (cannot be changed) 103 * @throws IllegalArgumentException if the size is < 1 104 */ 105 @SuppressWarnings("unchecked") 106 public CircularFifoQueue(final int size) { 107 if (size <= 0) { 108 throw new IllegalArgumentException("The size must be greater than 0"); 109 } 110 elements = (E[]) new Object[size]; 111 maxElements = elements.length; 112 } 113 114 /** 115 * Adds the given element to this queue. If the queue is full, the least recently added 116 * element is discarded so that a new element can be inserted. 117 * 118 * @param element the element to add 119 * @return true, always 120 * @throws NullPointerException if the given element is null 121 */ 122 @Override 123 public boolean add(final E element) { 124 Objects.requireNonNull(element, "element"); 125 126 if (isAtFullCapacity()) { 127 remove(); 128 } 129 130 elements[end++] = element; 131 132 if (end >= maxElements) { 133 end = 0; 134 } 135 136 if (end == start) { 137 full = true; 138 } 139 140 return true; 141 } 142 143 /** 144 * Clears this queue. 145 */ 146 @Override 147 public void clear() { 148 full = false; 149 start = 0; 150 end = 0; 151 Arrays.fill(elements, null); 152 } 153 154 /** 155 * Decrements the internal index. 156 * 157 * @param index the index to decrement 158 * @return the updated index 159 */ 160 private int decrement(int index) { 161 index--; 162 if (index < 0) { 163 index = maxElements - 1; 164 } 165 return index; 166 } 167 168 @Override 169 public E element() { 170 if (isEmpty()) { 171 throw new NoSuchElementException("queue is empty"); 172 } 173 return peek(); 174 } 175 176 /** 177 * Returns the element at the specified position in this queue. 178 * 179 * @param index the position of the element in the queue 180 * @return the element at position {@code index} 181 * @throws NoSuchElementException if the requested position is outside the range [0, size) 182 */ 183 public E get(final int index) { 184 final int sz = size(); 185 if (index < 0 || index >= sz) { 186 throw new NoSuchElementException( 187 String.format("The specified index %1$d is outside the available range [0, %2$d)", 188 Integer.valueOf(index), Integer.valueOf(sz))); 189 } 190 191 final int idx = (start + index) % maxElements; 192 return elements[idx]; 193 } 194 195 /** 196 * Increments the internal index. 197 * 198 * @param index the index to increment 199 * @return the updated index 200 */ 201 private int increment(int index) { 202 index++; 203 if (index >= maxElements) { 204 index = 0; 205 } 206 return index; 207 } 208 209 /** 210 * Returns {@code true} if the capacity limit of this queue has been reached, 211 * i.e. the number of elements stored in the queue equals its maximum size. 212 * 213 * @return {@code true} if the capacity limit has been reached, {@code false} otherwise 214 * @since 4.1 215 */ 216 public boolean isAtFullCapacity() { 217 return size() == maxElements; 218 } 219 220 /** 221 * Returns true if this queue is empty; false otherwise. 222 * 223 * @return true if this queue is empty 224 */ 225 @Override 226 public boolean isEmpty() { 227 return size() == 0; 228 } 229 230 /** 231 * {@inheritDoc} 232 * <p> 233 * A {@code CircularFifoQueue} can never be full, thus this returns always 234 * {@code false}. 235 * 236 * @return always returns {@code false} 237 */ 238 @Override 239 public boolean isFull() { 240 return false; 241 } 242 243 /** 244 * Returns an iterator over this queue's elements. 245 * 246 * @return an iterator over this queue's elements 247 */ 248 @Override 249 public Iterator<E> iterator() { 250 return new Iterator<E>() { 251 252 private int index = start; 253 private int lastReturnedIndex = -1; 254 private boolean isFirst = full; 255 256 @Override 257 public boolean hasNext() { 258 return isFirst || index != end; 259 } 260 261 @Override 262 public E next() { 263 if (!hasNext()) { 264 throw new NoSuchElementException(); 265 } 266 isFirst = false; 267 lastReturnedIndex = index; 268 index = increment(index); 269 return elements[lastReturnedIndex]; 270 } 271 272 @Override 273 public void remove() { 274 if (lastReturnedIndex == -1) { 275 throw new IllegalStateException(); 276 } 277 278 // First element can be removed quickly 279 if (lastReturnedIndex == start) { 280 CircularFifoQueue.this.remove(); 281 lastReturnedIndex = -1; 282 return; 283 } 284 285 int pos = lastReturnedIndex + 1; 286 if (start < lastReturnedIndex && pos < end) { 287 // shift in one part 288 System.arraycopy(elements, pos, elements, lastReturnedIndex, end - pos); 289 } else { 290 // Other elements require us to shift the subsequent elements 291 while (pos != end) { 292 if (pos >= maxElements) { 293 elements[pos - 1] = elements[0]; 294 pos = 0; 295 } else { 296 elements[decrement(pos)] = elements[pos]; 297 pos = increment(pos); 298 } 299 } 300 } 301 302 lastReturnedIndex = -1; 303 end = decrement(end); 304 elements[end] = null; 305 full = false; 306 index = decrement(index); 307 } 308 309 }; 310 } 311 312 /** 313 * Gets the maximum size of the collection (the bound). 314 * 315 * @return the maximum number of elements the collection can hold 316 */ 317 @Override 318 public int maxSize() { 319 return maxElements; 320 } 321 322 /** 323 * Adds the given element to this queue. If the queue is full, the least recently added 324 * element is discarded so that a new element can be inserted. 325 * 326 * @param element the element to add 327 * @return true, always 328 * @throws NullPointerException if the given element is null 329 */ 330 @Override 331 public boolean offer(final E element) { 332 return add(element); 333 } 334 335 @Override 336 public E peek() { 337 if (isEmpty()) { 338 return null; 339 } 340 return elements[start]; 341 } 342 343 @Override 344 public E poll() { 345 if (isEmpty()) { 346 return null; 347 } 348 return remove(); 349 } 350 351 /** 352 * Deserializes the queue in using a custom routine. 353 * 354 * @param in the input stream 355 * @throws IOException if an I/O error occurs while writing to the output stream 356 * @throws ClassNotFoundException if the class of a serialized object cannot be found 357 */ 358 @SuppressWarnings("unchecked") 359 private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { 360 in.defaultReadObject(); 361 elements = (E[]) new Object[maxElements]; 362 final int size = in.readInt(); 363 for (int i = 0; i < size; i++) { 364 elements[i] = (E) in.readObject(); 365 } 366 start = 0; 367 full = size == maxElements; 368 if (full) { 369 end = 0; 370 } else { 371 end = size; 372 } 373 } 374 375 @Override 376 public E remove() { 377 if (isEmpty()) { 378 throw new NoSuchElementException("queue is empty"); 379 } 380 381 final E element = elements[start]; 382 if (null != element) { 383 elements[start++] = null; 384 385 if (start >= maxElements) { 386 start = 0; 387 } 388 full = false; 389 } 390 return element; 391 } 392 393 /** 394 * Returns the number of elements stored in the queue. 395 * 396 * @return this queue's size 397 */ 398 @Override 399 public int size() { 400 int size = 0; 401 402 if (end < start) { 403 size = maxElements - start + end; 404 } else if (end == start) { 405 size = full ? maxElements : 0; 406 } else { 407 size = end - start; 408 } 409 410 return size; 411 } 412 413 /** 414 * Serializes this object to an ObjectOutputStream. 415 * 416 * @param out the target ObjectOutputStream. 417 * @throws IOException thrown when an I/O errors occur writing to the target stream. 418 */ 419 private void writeObject(final ObjectOutputStream out) throws IOException { 420 out.defaultWriteObject(); 421 out.writeInt(size()); 422 for (final E e : this) { 423 out.writeObject(e); 424 } 425 } 426 427}