/* * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package java.util.stream; import java.util.Objects; import java.util.Spliterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountedCompleter; import java.util.concurrent.ForkJoinTask; import java.util.function.Consumer; import java.util.function.DoubleConsumer; import java.util.function.IntConsumer; import java.util.function.IntFunction; import java.util.function.LongConsumer; /** * Factory for creating instances of {@code TerminalOp} that perform an * action for every element of a stream. Supported variants include unordered * traversal (elements are provided to the {@code Consumer} as soon as they are * available), and ordered traversal (elements are provided to the * {@code Consumer} in encounter order.) * *
Elements are provided to the {@code Consumer} on whatever thread and * whatever order they become available. For ordered traversals, it is * guaranteed that processing an element happens-before processing * subsequent elements in the encounter order. * *
Exceptions occurring as a result of sending an element to the
* {@code Consumer} will be relayed to the caller and traversal will be
* prematurely terminated.
*
* @since 1.8
*/
final class ForEachOps {
private ForEachOps() { }
/**
* Constructs a {@code TerminalOp} that perform an action for every element
* of a stream.
*
* @param action the {@code Consumer} that receives all elements of a
* stream
* @param ordered whether an ordered traversal is requested
* @param This terminal operation is stateless. For parallel evaluation, each
* leaf instance of a {@code ForEachTask} will send elements to the same
* {@code TerminalSink} reference that is an instance of this class.
*
* @param Void evaluateSequential(PipelineHelper spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
@Override
public Void evaluateParallel(PipelineHelper spliterator) {
if (ordered)
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
else
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
return null;
}
// TerminalSink
@Override
public Void get() {
return null;
}
// Implementations
/** Implementation class for reference streams */
static final class OfRef extends CountedCompleter spliterator;
private final Sink sink;
private final PipelineHelper spliterator,
Sink sink) {
super(null);
this.sink = sink;
this.helper = helper;
this.spliterator = spliterator;
this.targetSize = 0L;
}
ForEachTask(ForEachTask parent, Spliterator spliterator) {
super(parent);
this.spliterator = spliterator;
this.sink = parent.sink;
this.targetSize = parent.targetSize;
this.helper = parent.helper;
}
// Similar to AbstractTask but doesn't need to track child tasks
public void compute() {
Spliterator rightSplit = spliterator, leftSplit;
long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
if ((sizeThreshold = targetSize) == 0L)
targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
boolean forkRight = false;
Sink taskSink = sink;
ForEachTask task = this;
while (!isShortCircuit || !taskSink.cancellationRequested()) {
if (sizeEstimate <= sizeThreshold ||
(leftSplit = rightSplit.trySplit()) == null) {
task.helper.copyInto(taskSink, rightSplit);
break;
}
ForEachTask leftTask = new ForEachTask<>(task, leftSplit);
task.addToPendingCount(1);
ForEachTask taskToFork;
if (forkRight) {
forkRight = false;
rightSplit = leftSplit;
taskToFork = task;
task = leftTask;
}
else {
forkRight = true;
taskToFork = leftTask;
}
taskToFork.fork();
sizeEstimate = rightSplit.estimateSize();
}
task.spliterator = null;
task.propagateCompletion();
}
}
/**
* A {@code ForkJoinTask} for performing a parallel for-each operation
* which visits the elements in encounter order
*/
@SuppressWarnings("serial")
static final class ForEachOrderedTask extends CountedCompleter spliterator;
private final long targetSize;
private final ConcurrentHashMap> completionMap;
private final Sink leftPredecessor;
private Node spliterator,
Sink parent,
Spliterator spliterator,
ForEachOrderedTask leftPredecessor) {
super(parent);
this.helper = parent.helper;
this.spliterator = spliterator;
this.targetSize = parent.targetSize;
this.completionMap = parent.completionMap;
this.action = parent.action;
this.leftPredecessor = leftPredecessor;
}
@Override
public final void compute() {
doCompute(this);
}
private static void doCompute(ForEachOrderedTask task) {
Spliterator rightSplit = task.spliterator, leftSplit;
long sizeThreshold = task.targetSize;
boolean forkRight = false;
while (rightSplit.estimateSize() > sizeThreshold &&
(leftSplit = rightSplit.trySplit()) != null) {
ForEachOrderedTask leftChild =
new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
ForEachOrderedTask rightChild =
new ForEachOrderedTask<>(task, rightSplit, leftChild);
// Fork the parent task
// Completion of the left and right children "happens-before"
// completion of the parent
task.addToPendingCount(1);
// Completion of the left child "happens-before" completion of
// the right child
rightChild.addToPendingCount(1);
task.completionMap.put(leftChild, rightChild);
// If task is not on the left spine
if (task.leftPredecessor != null) {
/*
* Completion of left-predecessor, or left subtree,
* "happens-before" completion of left-most leaf node of
* right subtree.
* The left child's pending count needs to be updated before
* it is associated in the completion map, otherwise the
* left child can complete prematurely and violate the
* "happens-before" constraint.
*/
leftChild.addToPendingCount(1);
// Update association of left-predecessor to left-most
// leaf node of right subtree
if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) {
// If replaced, adjust the pending count of the parent
// to complete when its children complete
task.addToPendingCount(-1);
} else {
// Left-predecessor has already completed, parent's
// pending count is adjusted by left-predecessor;
// left child is ready to complete
leftChild.addToPendingCount(-1);
}
}
ForEachOrderedTask taskToFork;
if (forkRight) {
forkRight = false;
rightSplit = leftSplit;
task = leftChild;
taskToFork = rightChild;
}
else {
forkRight = true;
task = rightChild;
taskToFork = leftChild;
}
taskToFork.fork();
}
/*
* Task's pending count is either 0 or 1. If 1 then the completion
* map will contain a value that is task, and two calls to
* tryComplete are required for completion, one below and one
* triggered by the completion of task's left-predecessor in
* onCompletion. Therefore there is no data race within the if
* block.
*/
if (task.getPendingCount() > 0) {
// Cannot complete just yet so buffer elements into a Node
// for use when completion occurs
@SuppressWarnings("unchecked")
IntFunction leftDescendant = completionMap.remove(this);
if (leftDescendant != null)
leftDescendant.tryComplete();
}
}
}