/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;

/**
 * An extension of {@link Consumer} used to conduct values through the stages of
 * a stream pipeline, with additional methods to manage size information,
 * control flow, etc. Before calling the {@code accept()} method on a
 * {@code Sink} for the first time, you must first call the {@code begin()}
 * method to inform it that data is coming (optionally informing the sink how
 * much data is coming), and after all data has been sent, you must call the
 * {@code end()} method. After calling {@code end()}, you should not call
 * {@code accept()} without again calling {@code begin()}. {@code Sink} also
 * offers a mechanism by which the sink can cooperatively signal that it does
 * not wish to receive any more data (the {@code cancellationRequested()}
 * method), which a source can poll before sending more data to the
 * {@code Sink}.
 *
 * <p>A sink may be in one of two states: an initial state and an active state.
 * It starts out in the initial state; the {@code begin()} method transitions
 * it to the active state, and the {@code end()} method transitions it back into
 * the initial state, where it can be re-used. Data-accepting methods (such as
 * {@code accept()}) are only valid in the active state.
 *
 * @apiNote
 * A stream pipeline consists of a source, zero or more intermediate stages
 * (such as filtering or mapping), and a terminal stage, such as reduction or
 * for-each. For concreteness, consider the pipeline:
 *
 * <pre>{@code
 *     OptionalInt longestStringLengthStartingWithA
 *         = strings.stream()
 *                  .filter(s -> s.startsWith("A"))
 *                  .mapToInt(String::length)
 *                  .max();
 * }</pre>
 *
 * <p>Here, we have three stages: filtering, mapping, and reducing. The
 * filtering stage consumes strings and emits a subset of those strings; the
 * mapping stage consumes strings and emits ints; the reduction stage consumes
 * those ints and computes the maximal value.
 *
 * <p>A {@code Sink} instance is used to represent each stage of this pipeline,
 * whether the stage accepts objects, ints, longs, or doubles. Sink has entry
 * points for {@code accept(Object)}, {@code accept(int)}, etc., so that we do
 * not need a specialized interface for each primitive specialization. (It
 * might be called a "kitchen sink" for this omnivorous tendency.) The entry
 * point to the pipeline is the {@code Sink} for the filtering stage, which
 * sends some elements "downstream" -- into the {@code Sink} for the mapping
 * stage, which in turn sends integral values downstream into the {@code Sink}
 * for the reduction stage. The {@code Sink} implementation associated with a
 * given stage is expected to know the data type for the next stage, and to call
 * the correct {@code accept} method on its downstream {@code Sink}. Similarly,
 * each stage must implement the correct {@code accept} method corresponding to
 * the data type it accepts.
 *
 * <p>The specialized subtypes such as {@link Sink.OfInt} override
 * {@code accept(Object)} to call the appropriate primitive specialization of
 * {@code accept}, implement the appropriate primitive specialization of
 * {@code Consumer}, and re-abstract the appropriate primitive specialization of
 * {@code accept}.
 *
 * <p>The chaining subtypes such as {@link ChainedInt} not only implement
 * {@code Sink.OfInt}, but also maintain a {@code downstream} field which
 * represents the downstream {@code Sink}, and implement the methods
 * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
 * delegate to the downstream {@code Sink}. Most implementations of
 * intermediate operations will use these chaining wrappers. For example, the
 * mapping stage in the above example would look like:
 *
 * <pre>{@code
 *     Sink<U> is = new Sink.ChainedReference<U>(sink) {
 *         public void accept(U u) {
 *             downstream.accept(mapper.applyAsInt(u));
 *         }
 *     };
 * }</pre>
 *
 * <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect
 * to receive elements of type {@code U} as input, and pass the downstream sink
 * to the constructor. Because the next stage expects to receive integers, we
 * must call the {@code accept(int)} method when emitting values downstream.
 * The {@code accept()} method applies the mapping function from {@code U} to
 * {@code int} and passes the resulting value to the downstream {@code Sink}.
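 *
 * @param <T> type of elements for value streams
 * @since 1.8
 */
interface Sink<T> extends Consumer<T> {
    /**
     * Resets the sink state to receive a fresh data set. This must be called
     * before sending any data to the sink. After calling {@link #end()},
     * you may call this method to reset the sink for another calculation.
     *
     * <p>Prior to this call, the sink must be in the initial state, and after
     * this call it is in the active state.
     *
     * @param size the exact size of the data to be pushed downstream, if
     *        known, or {@code -1} if unknown or infinite
     */
    default void begin(long size) {}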

    /**
     * Indicates that all elements have been pushed. If the {@code Sink} is
     * stateful, it should send any stored state downstream at this time, and
     * should clear any accumulated state (and associated resources).
     *
     * <p>Prior to this call, the sink must be in the active state, and after
     * this call it is returned to the initial state.
     */
    default void end() {}

    /**
     * Indicates that this {@code Sink} does not wish to receive any more data.
     *
     * @implSpec The default implementation always returns false.
     *
     * @return true if cancellation is requested
     */
    default boolean cancellationRequested() {
        return false;
    }

    /**
     * Accepts an int value.
     *
     * @implSpec The default implementation throws IllegalStateException.
     *
     * @throws IllegalStateException if this sink does not accept int values
     */
    default void accept(int value) {
        throw new IllegalStateException("called wrong accept method");
    }

    /**
     * Accepts a long value.
     *
     * @implSpec The default implementation throws IllegalStateException.
     *
     * @throws IllegalStateException if this sink does not accept long values
     */
    default void accept(long value) {
        throw new IllegalStateException("called wrong accept method");
    }

    /**
     * Accepts a double value.
     *
     * @implSpec The default implementation throws IllegalStateException.
     *
     * @throws IllegalStateException if this sink does not accept double values
     */
    default void accept(double value) {
        throw new IllegalStateException("called wrong accept method");
    }
/**
* {@code Sink} that implements {@code Sink