| Matching Statements |
| File1 Line# |
File2 Line# |
Statement |
| 8 | 7 | package java.util.concurrent |
| 9 | 8 | import java.util.concurrent.atomic.* |
| 10 | 9 | import java.util.concurrent.locks.* |
| 11 | 10 | import java.util.* |
| 45 | 44 | public class LinkedBlockingQueue<E> extends AbstractQueue<E> |
| 46 | 45 | implements BlockingQueue<E>, java.io.Serializable { |
| 47 | 46 | private static final long serialVersionUID = -6903933977591709194L |
| 66 | 65 | static class Node<E> { |
| 68 | 67 | volatile E item |
| 69 | 68 | Node<E> next |
| 70 | 69 | Node(E x) { item = x |
| 74 | 73 | private final int capacity |
| 77 | 76 | private final AtomicInteger count = new AtomicInteger(0) |
| 80 | 79 | private transient Node<E> head |
| 83 | 82 | private transient Node<E> last |
| 86 | 85 | private final ReentrantLock takeLock = new ReentrantLock() |
| 89 | 88 | private final Condition notEmpty = takeLock.newCondition() |
| 92 | 91 | private final ReentrantLock putLock = new ReentrantLock() |
| 95 | 94 | private final Condition notFull = putLock.newCondition() |
| 101 | 100 | private void signalNotEmpty() { |
102 354 382 416 437 | 101 354 382 416 437 | final ReentrantLock takeLock = this.takeLock |
103 149 417 438 617 636 658 | 102 150 417 438 694 713 735 | takeLock.lock() |
105 361 368 390 398 423 | 104 361 368 390 398 423 | notEmpty.signal() |
107 156 370 403 426 446 623 647 676 | 106 157 370 403 426 446 700 724 755 | takeLock.unlock() |
| 114 | 113 | private void signalNotFull() { |
115 238 288 332 | 114 240 286 332 | final ReentrantLock putLock = this.putLock |
116 148 333 616 635 657 | 115 149 333 693 712 734 | putLock.lock() |
118 255 261 297 305 339 | 117 257 263 295 303 339 | notFull.signal() |
120 157 263 310 342 624 648 677 | 119 158 265 308 342 701 725 756 | putLock.unlock() |
| 128 | 127 | private void insert(E x) { |
| 129 | 128 | last = last.next = new Node<E>(x) |
| 136 | 135 | private E extract() { |
137 440 | 440 | Node<E> first = head.next |
| 138 | 139 | head = first |
| 139 | 140 | E x = first.item |
| 140 | 141 | first.item = null |
141 374 407 430 645 | 142 374 407 430 722 | return x |
| 147 | 148 | private void fullyLock() { |
| 155 | 156 | private void fullyUnlock() { |
| 165 | 166 | public LinkedBlockingQueue() { |
166 192 | 167 194 | this(Integer.MAX_VALUE) |
| 176 | 177 | public LinkedBlockingQueue(int capacity) { |
| 177 | 178 | if (capacity <= 0) throw new IllegalArgumentException() |
| 178 | 179 | this.capacity = capacity |
179 720 | 180 799 | last = head = new Node<E>(null) |
| 191 | 193 | public LinkedBlockingQueue(Collection<? extends E> c) { |
| 205 | 207 | public int size() { |
| 206 | 208 | return count.get() |
| 222 | 224 | public int remainingCapacity() { |
| 223 | 225 | return capacity - count.get() |
237 287 331 352 379 415 | 239 285 331 352 379 415 | int c = -1 |
239 289 328 353 381 411 | 241 287 328 353 381 411 | final AtomicInteger count = this.count |
240 290 | 242 288 | putLock.lockInterruptibly() |
| 252 | 254 | while (count.get() == capacity) |
| 253 | 255 | notFull.await() |
254 304 360 397 | 256 302 360 397 | } catch (InterruptedException ie) { |
256 306 362 399 | 258 304 362 399 | throw ie |
259 295 337 | 261 293 337 | c = count.getAndIncrement() |
260 296 338 | 262 294 338 | if (c + 1 < capacity) |
265 312 344 | 267 310 344 | if (c == 0) |
266 313 345 | 268 311 345 | signalNotEmpty() |
| 283 | 281 | throws InterruptedException { |
286 380 | 284 380 | long nanos = unit.toNanos(timeout) |
293 335 | 291 335 | if (count.get() < capacity) { |
300 393 | 298 393 | if (nanos <= 0) |
| 303 | 301 | nanos = notFull.awaitNanos(nanos) |
| 329 | 329 | if (count.get() == capacity) |
| 346 | 346 | return c >= 0 |
| 350 | 350 | public E take() throws InterruptedException { |
| 351 | 351 | E x |
355 383 | 355 383 | takeLock.lockInterruptibly() |
| 358 | 358 | while (count.get() == 0) |
| 359 | 359 | notEmpty.await() |
365 387 420 | 365 387 420 | x = extract() |
366 388 421 | 366 388 421 | c = count.getAndDecrement() |
367 389 422 | 367 389 422 | if (c > 1) |
372 405 428 672 | 372 405 428 751 | if (c == capacity) |
373 406 429 | 373 406 429 | signalNotFull() |
| 377 | 377 | public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
378 414 | 378 414 | E x = null |
386 419 | 386 419 | if (count.get() > 0) { |
| 396 | 396 | nanos = notEmpty.awaitNanos(nanos) |
| 410 | 410 | public E poll() { |
412 435 | 412 435 | if (count.get() == 0) |
| 434 | 434 | public E peek() { |
| 441 | 441 | if (first == null) |
| 444 | 444 | return first.item |
| 454 | 461 | public boolean remove(Object o) { |
| 455 | 462 | if (o == null) return false |
| 456 | 463 | boolean removed = false |
457 482 496 513 526 542 568 693 | 464 504 554 573 586 610 642 772 | fullyLock() |
459 662 | 466 739 | Node<E> trail = head |
460 571 663 | 467 645 740 | Node<E> p = head.next |
| 461 | 468 | while (p != null) { |
| 462 | 469 | if (o.equals(p.item)) { |
| 463 | 470 | removed = true |
466 665 | 473 742 | trail = p |
467 575 666 | 474 649 743 | p = p.next |
| 469 | 476 | if (removed) { |
470 555 574 669 | 477 625 648 746 | p.item = null |
471 670 | 478 747 | trail.next = p.next |
| 472 | 481 | if (count.getAndDecrement() == capacity) |
473 530 547 581 673 | 482 592 617 658 752 | notFull.signalAll() |
476 491 508 517 532 549 585 705 | 485 513 568 577 594 619 662 784 | fullyUnlock() |
| 478 | 487 | return removed |
| 481 | 503 | public Object[] toArray() { |
484 498 | 506 556 | int size = count.get() |
| 485 | 507 | Object[] a = new Object[size] |
486 503 | 508 561 | int k = 0 |
487 699 | 509 778 | for (Node<E> p = head.next |
487 504 553 699 | 509 562 623 778 | p != null |
487 504 699 | 509 562 778 | p = p.next) |
| 488 | 510 | a[k++] = p.item |
489 506 | 511 566 | return a |
| 495 | 553 | public <T> T[] toArray(T[] a) { |
| 499 | 557 | if (a.length < size) |
| 500 | 558 | a = (T[])java.lang.reflect.Array.newInstance |
| 501 | 559 | (a.getClass().getComponentType(), size) |
| 504 | 562 | for (Node p = head.next |
| 505 | 563 | a[k++] = (T)p.item |
| 512 | 572 | public String toString() { |
| 515 | 575 | return super.toString() |
| 525 | 585 | public void clear() { |
528 545 | 588 613 | head.next = null |
529 546 | 591 616 | if (count.getAndSet(0) == capacity) |
| 536 | 604 | public int drainTo(Collection<? super E> c) { |
537 562 | 605 638 | if (c == null) |
538 563 | 606 639 | throw new NullPointerException() |
539 564 | 607 640 | if (c == this) |
540 565 | 608 641 | throw new IllegalArgumentException() |
| 544 | 612 | first = head.next |
552 570 | 622 644 | int n = 0 |
| 553 | 623 | for (Node<E> p = first |
| 553 | 623 | p = p.next) { |
554 573 | 624 647 | c.add(p.item) |
556 576 | 626 650 | ++n |
558 583 | 628 660 | return n |
| 561 | 637 | public int drainTo(Collection<? super E> c, int maxElements) { |
| 572 | 646 | while (p != null && n < maxElements) { |
| 578 | 652 | if (n != 0) { |
| 579 | 653 | head.next = p |
| 580 | 657 | if (count.getAndAdd(-n) == capacity) |
| 599 | 676 | public Iterator<E> iterator() { |
| 600 | 677 | return new Itr() |
| 603 | 680 | private class Itr implements Iterator<E> { |
| 609 | 686 | private Node<E> current |
| 610 | 687 | private Node<E> lastRet |
| 611 | 688 | private E currentElement |
| 613 | 690 | Itr() { |
614 633 655 | 691 710 732 | final ReentrantLock putLock = LinkedBlockingQueue.this.putLock |
615 634 656 | 692 711 733 | final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock |
| 619 | 696 | current = head.next |
620 643 | 697 720 | if (current != null) |
621 644 | 698 721 | currentElement = current.item |
| 628 | 705 | public boolean hasNext() { |
| 629 | 706 | return current != null |
| 632 | 709 | public E next() { |
| 638 | 715 | if (current == null) |
| 639 | 716 | throw new NoSuchElementException() |
| 640 | 717 | E x = currentElement |
| 641 | 718 | lastRet = current |
| 642 | 719 | current = current.next |
| 652 | 729 | public void remove() { |
| 653 | 730 | if (lastRet == null) |
| 654 | 731 | throw new IllegalStateException() |
| 660 | 737 | Node<E> node = lastRet |
| 661 | 738 | lastRet = null |
| 664 | 741 | while (p != null && p != node) { |
| 668 | 745 | if (p == node) { |
| 671 | 750 | int c = count.getAndDecrement() |
| 690 | 769 | private void writeObject(java.io.ObjectOutputStream s) |
| 691 | 770 | throws java.io.IOException { |
| 696 | 775 | s.defaultWriteObject() |
| 700 | 779 | s.writeObject(p.item) |
| 703 | 782 | s.writeObject(null) |
| 714 | 793 | private void readObject(java.io.ObjectInputStream s) |
| 715 | 794 | throws java.io.IOException, ClassNotFoundException { |
| 717 | 796 | s.defaultReadObject() |
| 719 | 798 | count.set(0) |
| 724 | 803 | E item = (E)s.readObject() |
| 725 | 804 | if (item == null) |
| 727 | 806 | add(item) |
| Matching Comments and Strings |
| File1 Line# |
File2 Line# |
Comment/String |
| 14 | 17 | * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on |
| 15 | 18 | * linked nodes. |
| 16 | 19 | * This queue orders elements FIFO (first-in-first-out). |
| 17 | 20 | * The <em>head</em> of the queue is that element that has been on the |
| 18 | 21 | * queue the longest time. |
| 19 | 22 | * The <em>tail</em> of the queue is that element that has been on the |
| 20 | 23 | * queue the shortest time. New elements |
| 21 | 24 | * are inserted at the tail of the queue, and the queue retrieval |
| 22 | 25 | * operations obtain elements at the head of the queue. |
| 23 | 26 | * Linked queues typically have higher throughput than array-based queues but |
| 24 | 27 | * less predictable performance in most concurrent applications. |
| 26 | 29 | * <p> The optional capacity bound constructor argument serves as a |
| 27 | 30 | * way to prevent excessive queue expansion. The capacity, if unspecified, |
| 28 | 31 | * is equal to {@link Integer#MAX_VALUE}. Linked nodes are |
| 29 | 32 | * dynamically created upon each insertion unless this would bring the |
| 30 | 33 | * queue above capacity. |
| 32 | 35 | * <p>This class and its iterator implement all of the |
| 33 | 36 | * <em>optional</em> methods of the {@link Collection} and {@link |
| 34 | 37 | * Iterator} interfaces. |
| 40 | 39 | * @since 1.5 |
| 41 | 40 | * @author Doug Lea |
| 42 | 41 | * @param <E> the type of elements held in this collection |
| 50 | 49 | * A variant of the "two lock queue" algorithm. The putLock gates |
| 51 | 50 | * entry to put (and offer), and has an associated condition for |
| 52 | 51 | * waiting puts. Similarly for the takeLock. The "count" field |
| 53 | 52 | * that they both rely on is maintained as an atomic to avoid |
| 54 | 53 | * needing to get both locks in most cases. Also, to minimize need |
| 55 | 54 | * for puts to get takeLock and vice-versa, cascading notifies are |
| 56 | 55 | * used. When a put notices that it has enabled at least one take, |
| 57 | 56 | * it signals taker. That taker in turn signals others if more |
| 58 | 57 | * items have been entered since the signal. And symmetrically for |
| 59 | 58 | * takes signalling puts. Operations such as remove(Object) and |
| 60 | 59 | * iterators acquire both locks. |
| 64 | 63 | * Linked list node class |
| 67 | 66 | * The item, volatile to ensure barrier separating write and read |
| 73 | 72 | * The capacity bound, or Integer.MAX_VALUE if none |
| 76 | 75 | * Current number of elements |
| 79 | 78 | * Head of linked list |
| 82 | 81 | * Tail of linked list |
| 85 | 84 | * Lock held by take, poll, etc |
| 88 | 87 | * Wait queue for waiting takes |
| 91 | 90 | * Lock held by put, offer, etc |
| 94 | 93 | * Wait queue for waiting puts |
| 99 | 98 | * otherwise ordinarily lock takeLock.) |
| 126 | 125 | * @param x the item |
| 134 | 133 | * @return the node |
| 145 | 146 | * Lock to prevent both puts and takes. |
| 153 | 154 | * Unlock to allow both puts and takes. |
162 183 | 163 184 | * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of |
| 163 | 164 | * {@link Integer#MAX_VALUE}. |
| 170 | 171 | * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity. |
| 173 | 174 | * @throws IllegalArgumentException if <tt>capacity</tt> is not greater |
| 184 | 185 | * {@link Integer#MAX_VALUE}, initially containing the elements of the |
| 185 | 186 | * given collection, |
| 186 | 187 | * added in traversal order of the collection's iterator. |
| 187 | 189 | * @param c the collection of elements to initially contain |
| 198 | 200 | this doc comment is overridden to remove the reference to collections |
| 199 | 201 | greater in size than Integer.MAX_VALUE |
| 201 | 203 | * Returns the number of elements in this queue. |
| 209 | 211 | this doc comment is a modified copy of the inherited doc comment, |
| 210 | 212 | without the reference to unlimited queues. |
| 214 | 216 | * blocking. This is always equal to the initial capacity of this queue |
| 215 | 217 | * less the current <tt>size</tt> of this queue. |
| 228 | 230 | * necessary for space to become available. |
| 235 | 237 | Note: convention in all put/take/etc is to preset |
| 236 | 238 | local var holding count negative to indicate failure unless set. |
| 243 | 245 | * Note that count is used in wait guard even though it is |
| 244 | 246 | * not protected by lock. This works because count can |
| 245 | 247 | * only decrease at this point (all other puts are shut |
| 246 | 248 | * out by lock), and we (or some other waiting put) are |
| 247 | 249 | * signalled if it ever changes from |
| 248 | 250 | * capacity. Similarly for all other uses of count in |
| 249 | 251 | * other wait guards. |
255 305 361 398 | 257 303 361 398 | propagate to a non-interrupted thread |
| 270 | 229 272 | * Inserts the specified element at the tail of this queue, waiting if |
| 271 | 273 | * necessary up to the specified wait time for space to become available. |
| 277 | 275 | * @return <tt>true</tt> if successful, or <tt>false</tt> if |
| 278 | 276 | * the specified waiting time elapses before space is available. |
| 522 | 582 | * Atomically removes all of the elements from this queue. |
| 523 | 583 | * The queue will be empty after this call returns. |
| 551 | 621 | Transfer the elements outside of locks |
| 590 | 667 | * Returns an iterator over the elements in this queue in proper sequence. |
| 591 | 668 | * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that |
| 593 | 670 | * and guarantees to traverse elements as they existed upon |
| 594 | 671 | * construction of the iterator, and may (but is not guaranteed to) |
| 595 | 672 | * reflect any modifications subsequent to construction. |
| 605 | 682 | * Basic weak-consistent iterator. At all times hold the next |
| 606 | 683 | * item to hand out so that if hasNext() reports true, we will |
| 607 | 684 | * still have it to return even if lost race with a take etc. |
| 683 | 762 | * Save the state to a stream (that is, serialize it). |
| 685 | 764 | * @serialData The capacity is emitted (int), followed by all of |
| 686 | 765 | * its elements (each an <tt>Object</tt>) in the proper order, |
| 687 | 766 | * followed by a null |
688 712 | 767 791 | * @param s the stream |
| 695 | 774 | Write out any hidden stuff, plus capacity |
| 698 | 777 | Write out all elements in the proper order. |
| 702 | 781 | Use trailing null as sentinel |
| 710 | 789 | * Reconstitute this queue instance from a stream (that is, |
| 711 | 790 | * deserialize it). |
| 716 | 795 | Read in capacity, and any hidden stuff |
| 722 | 801 | Read in all elements and place in queue |