0. 이 글을 적는 이유
제곧내..라고 하기엔 좀 없어 보이는데 진짜로 java(jdk 17 기준)에서 List를 대상으로 stream을 사용해 간단한 연산을 진행하는 경우 코드에서는 어떤 일이 발생하는지 궁금해서 메모하기 위함
1. 최초 진입(stream open)
이런 데이터가 있다고 가정을 해보자.
List<Integer> tmpList = new ArrayList<>();
그리고 내부에는 데이터가 1부터 100까지 데이터가 있다고 생각해 보자. 이 데이터를 조작하기 위해 아래와 같은 코드를 작성했다.
tmpList.stream().filter(i -> i % 2 == 0).map(i -> i * 4).toList();
그러면 최초에는 stream()을 통해 함수형 프로그래밍을 시작한다. 이 stream함수는 Collection Interface에 default 함수로 잡혀있는 것으로 구현은 아래와 같다.
Collection.java
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
StreamSupport.stream에서 2번째 인자는 parallel여부이니 우선은 신경 쓰지 말도록 하자.
Collection.java
@Override
default Spliterator<E> spliterator() {
return Spliterators.spliterator(this, 0);
}
spliterator를 호출하면서 this키워드를 통해 본인 객체를 넘겨주고 있다. 그러면 저 spliterator는 무엇을 하는 건가
Spliterators.java
public static <T> Spliterator<T> spliterator(Collection<? extends T> c,
int characteristics) {
return new IteratorSpliterator<>(Objects.requireNonNull(c),
characteristics);
}
Spliterators.java에 static함수로 되어있는 <T> Spliterator<T> spliterator에서 첫 번째 인자에 조금 전 tmpList 객체가 넘어왔다. 이후 IteratorSpliterator객체를 반환해주고 있다. 추적해 보자.
Spliterators.java
public IteratorSpliterator(Collection<? extends T> collection, int characteristics) {
this.collection = collection;
this.it = null;
this.characteristics = (characteristics & Spliterator.CONCURRENT) == 0
? characteristics | Spliterator.SIZED | Spliterator.SUBSIZED
: characteristics;
}
이 객체에서 현재 list와 iterator, characteristics에 대해 관리하고 있는데 우리는 함수 내부적으로 두 번째 인자의 characteristics로 0이 넘어온 것을 위에서 확인했다.
spliterator()의 역할은 반복작업을 할 수 있도록 현재 컬렉션 객체와 iterator, Spliterator의 특성을 관리하고 있는 객체를 반환해 주는 것으로 이해를 하고 넘어가 보자.
이제 이렇게 생성된 Spliterator가 StreamSupport 클래스의 stream함수에 인자로 들어간다. 다시 코드를 이어서 확인해 보자.
Collection.java
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
StreamSupport.java
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
일단 spliterator가 null인지 아닌지 체크를 진행하고 이후 ReferencePipeline이라는 클래스 내부 클래스로 존재하는 Head라는 클래스를 반환한다. 이 내부 클래스는 ReferencePipeline을 상속받고 있다.
ReferencePipeline.java
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
/**
* Constructor for the source stage of a Stream.
*
* @param source {@code Supplier<Spliterator>} describing the stream
* source
* @param sourceFlags the source flags for the stream source, described
* in {@link StreamOpFlag}
*/
Head(Supplier<? extends Spliterator<?>> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
/**
* Constructor for the source stage of a Stream.
*
* @param source {@code Spliterator} describing the stream source
* @param sourceFlags the source flags for the stream source, described
* in {@link StreamOpFlag}
*/
Head(Spliterator<?> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
Head에서는 전달받은 source를 다시 부모클래스의 생성자로 올려주고 있다. 추적해 보자.
RefrencePipeline.java
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT> {
/**
* Constructor for the head of a stream pipeline.
*
* @param source {@code Supplier<Spliterator>} describing the stream source
* @param sourceFlags the source flags for the stream source, described in
* {@link StreamOpFlag}
* @param parallel {@code true} if the pipeline is parallel
*/
ReferencePipeline(Supplier<? extends Spliterator<?>> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
/**
* Constructor for the head of a stream pipeline.
*
* @param source {@code Spliterator} describing the stream source
* @param sourceFlags The source flags for the stream source, described in
* {@link StreamOpFlag}
* @param parallel {@code true} if the pipeline is parallel
*/
ReferencePipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
우리는 Spliterator<?> 타입을 받는 생성자로 올려주었다. 여기서도 다시 부모클래스의 생성자로 올려주고 있다. 추적해 보자.
AbstractPipeline.java
AbstractPipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
this.previousStage = null;
this.sourceSpliterator = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
source라는 이름으로 올라간 Spliterator는 sourceSpliterator에 저장이 되었다.
우리는 지금 Supplier<> 타입이 아닌 Spliterator<>타입을 인자로 받는 생성자를 호출했다. 즉 여기까지가 stream()을 호출했을 때 발생하는 일이다.
그럼 여기서 previousStage와 sourceStage가 어떤 걸 의미하는지 확인을 해야 한다.
AbstractPipeline.java
/**
* Backlink to the head of the pipeline chain (self if this is the source
* stage).
*/
@SuppressWarnings("rawtypes")
private final AbstractPipeline sourceStage;
/**
* The "upstream" pipeline, or null if this is the source stage.
*/
@SuppressWarnings("rawtypes")
private final AbstractPipeline previousStage;
sourceStage는 pipeline chain의 헤드로 이동할 수 있는 Backlink용 필드라고 되어있고
previousStage는 upstream pipeline이라고 되어있다.
즉 오히려 previousStage가 현재 단계이고 sourceStage는 헤드로 이동하기 위한 Backlink 필드로 이해를 하면 될 것 같다.
요약
stream() 호출하면 ReferencePipeline 객체 생성 후 진행한다.
우리가 기존에 사용하던 collection은 source라는 이름의 인자로 AbstractPipeline까지 올라간 다음 sourceSpliterator에 저장이 된다.
이 pipeline 단계에서 이전에 했던 것은 없으므로 previousStage는 null로 처리되었고 sourceStage는 현재 객체로 처리되었다.
그러면 이제 filter에서는 어떤 일이 발생하는지 확인해 보자.
2. filter
ReferencePipeline.java 내 존재하는 filter함수를 호출한다. 위에서 우리는 마지막으로 반환된 Head가 ReferencePipeline을 상속받은 자식 클래스라는 것을 확인했다. 이 filter에서는 predicate를 받고 있다.
ReferencePipeline.java
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
그러면 이 Predicate란 도대체 무엇인가?
구현은 이렇게 되어있다.
/**
* Represents a predicate (boolean-valued function) of one argument.
*
* <p>This is a <a href="package-summary.html">functional interface</a>
* whose functional method is {@link #test(Object)}.
*
* @param <T> the type of the input to the predicate
*
* @since 1.8
*/
@FunctionalInterface
public interface Predicate<T> {
/**
* Evaluates this predicate on the given argument.
*
* @param t the input argument
* @return {@code true} if the input argument matches the predicate,
* otherwise {@code false}
*/
boolean test(T t);
/**
* Returns a composed predicate that represents a short-circuiting logical
* AND of this predicate and another. When evaluating the composed
* predicate, if this predicate is {@code false}, then the {@code other}
* predicate is not evaluated.
*
* <p>Any exceptions thrown during evaluation of either predicate are relayed
* to the caller; if evaluation of this predicate throws an exception, the
* {@code other} predicate will not be evaluated.
*
* @param other a predicate that will be logically-ANDed with this
* predicate
* @return a composed predicate that represents the short-circuiting logical
* AND of this predicate and the {@code other} predicate
* @throws NullPointerException if other is null
*/
default Predicate<T> and(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) && other.test(t);
}
/**
* Returns a predicate that represents the logical negation of this
* predicate.
*
* @return a predicate that represents the logical negation of this
* predicate
*/
default Predicate<T> negate() {
return (t) -> !test(t);
}
/**
* Returns a composed predicate that represents a short-circuiting logical
* OR of this predicate and another. When evaluating the composed
* predicate, if this predicate is {@code true}, then the {@code other}
* predicate is not evaluated.
*
* <p>Any exceptions thrown during evaluation of either predicate are relayed
* to the caller; if evaluation of this predicate throws an exception, the
* {@code other} predicate will not be evaluated.
*
* @param other a predicate that will be logically-ORed with this
* predicate
* @return a composed predicate that represents the short-circuiting logical
* OR of this predicate and the {@code other} predicate
* @throws NullPointerException if other is null
*/
default Predicate<T> or(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) || other.test(t);
}
/**
* Returns a predicate that tests if two arguments are equal according
* to {@link Objects#equals(Object, Object)}.
*
* @param <T> the type of arguments to the predicate
* @param targetRef the object reference with which to compare for equality,
* which may be {@code null}
* @return a predicate that tests if two arguments are equal according
* to {@link Objects#equals(Object, Object)}
*/
static <T> Predicate<T> isEqual(Object targetRef) {
return (null == targetRef)
? Objects::isNull
: object -> targetRef.equals(object);
}
/**
* Returns a predicate that is the negation of the supplied predicate.
* This is accomplished by returning result of the calling
* {@code target.negate()}.
*
* @param <T> the type of arguments to the specified predicate
* @param target predicate to negate
*
* @return a predicate that negates the results of the supplied
* predicate
*
* @throws NullPointerException if target is null
*
* @since 11
*/
@SuppressWarnings("unchecked")
static <T> Predicate<T> not(Predicate<? super T> target) {
Objects.requireNonNull(target);
return (Predicate<T>)target.negate();
}
}
FunctionalInterface로 내부 값이 boolean으로 참인지 거짓인지 체크를 한다고 한다. 여기서 재미있는 건 내가 내부적으로 Predicate자리에 집어넣은 람다식도 구현체로 잡힌다는 것이다.
그 이유는 java에서 -> 람다식은 FunctionalInterface일 때 사용 가능하기 때문이다. 단 하나의 추상 메서드만 구현을 하기에 ->로 접근을 하면 바로 해당 인터페이스로 안내를 해준다. 이러한 이유로 내부에서는 Predicate를 받고 있어서 저렇게 구현체가 나오는 것으로 생각된다.
다시 본론으로 들어와서 filter 메서드를 살펴보자.
ReferencePipeline.java
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
여기서는 StatelessOp 객체를 생성해서 반환해주고 있다. 그럼 이건 어떻게 생긴 놈인지 알아보자.
우선 StatelessOp라는 이름부터가 생소하다. 아마 StatelessOperator를 줄인 게 아닐까 싶다.
StatelessOp의 생성자는 이렇게 생겼다.
abstract static class StatelessOp<E_IN, E_OUT>
extends ReferencePipeline<E_IN, E_OUT> {
/**
* Construct a new Stream by appending a stateless intermediate
* operation to an existing stream.
*
* @param upstream The upstream pipeline stage
* @param inputShape The stream shape for the upstream pipeline stage
* @param opFlags Operation flags for the new stage
*/
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape,
int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
@Override
final boolean opIsStateful() {
return false;
}
}
upstream으로 this키워드를 이용해 스스로를 넘겨주고 다시 부모 생성자로 넘겨주었다.
/**
* Constructor for appending an intermediate operation onto an existing
* pipeline.
*
* @param upstream the upstream element source.
*/
ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
super(upstream, opFlags);
}
마지막으로 한번 더 추적해 보자.
/**
* Constructor for appending an intermediate operation stage onto an
* existing pipeline.
*
* @param previousStage the upstream pipeline stage
* @param opFlags the operation flags for the new stage, described in
* {@link StreamOpFlag}
*/
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
전달된 upstream은 previousStage라는 인자로 사용된다.
그리고 sourceStage는 previousStage.sourceStage로 되어있는데 이 경우는 계속 동일 객체를 driven 하다 보니 기존 sourceStage가 들어왔다.
그다음은 Sink라는 단어가 보인다. 사전적인 의미로는 가라앉다, 침몰하다, 내려가다 등의 의미가 있는데 여기서 발생한 흐름에 대해 다음 stage로 넘겨주는 게 내부적인 Sink클래스(객체)의 역할인 것 같다.
opWrapSink클래스에서는 Sink클래스의 내부 클래스로 있는 ChainedReference를 반환해주고 있다.
/**
* Abstract {@code Sink} implementation for creating chains of
* sinks. The {@code begin}, {@code end}, and
* {@code cancellationRequested} methods are wired to chain to the
* downstream {@code Sink}. This implementation takes a downstream
* {@code Sink} of unknown input shape and produces a {@code Sink<T>}. The
* implementation of the {@code accept()} method must call the correct
* {@code accept()} method on the downstream {@code Sink}.
*/
abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
protected final Sink<? super E_OUT> downstream;
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) {
downstream.begin(size);
}
@Override
public void end() {
downstream.end();
}
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
직관적으로 알 수 있을만한 건 override된 accept함수에서는 predicate에서 true처리가 된 항목에 대해 downstream으로 넘기는 것을 알 수 있다. 재미있는 건 여기서 이전 원소들을 계속해서 가져온 다음 작업하는 코드가 보이지 않는다는 점이다. 그럼 map을 호출할 때 최종적으로 계산을 하는 것일까? 확인해 보자.
3. map
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
위 filter에서 봤던 것 과 크게 다르지 않아 보인다. 코드를 같이 놓고 비교해 보자.
여기서 mapper로 우리가 조작을 원했던 i -> i * 4 가 들어왔다.
조금 전 filter에서의 accept는 downstream.accept로 전달해 주는 역할이었고 여기서는 apply라는 새로운 함수가 호출되었다.
그렇다면 여기서는 apply라는 함수를 통해 우리가 작성한 i * 4 가 반환되고 다시 downstream으로 넘어갈 것이다.
4. toList
이제 list로 반환을 할 차례이다. toList는 아래와 같이 구현이 되어있다.
Stream.java
default List<T> toList() {
return (List<T>) Collections.unmodifiableList(new ArrayList<>(Arrays.asList(this.toArray())));
}
Stream클래스에 있는데 this를 호출하고 있다. 우리가 잠시 놓칠 수 있는 부분이겠지만 이 모든 stream(), filter(), map() 함수는 결론적으로 Stream()을 반환하고 있다.
ReferencePipeline으로 계속 설명이 나왔지만 이 클래스는 Stream인터페이스를 구현하고 있다.
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT> {
무튼 이제 toArray함수를 살펴보자.
ReferencePipeline.java
@Override
public final Object[] toArray() {
return toArray(Object[]::new);
}
@Override
@SuppressWarnings("unchecked")
public final <A> A[] toArray(IntFunction<A[]> generator) {
// Since A has no relation to U (not possible to declare that A is an upper bound of U)
// there will be no static type checking.
// Therefore use a raw type and assume A == U rather than propagating the separation of A and U
// throughout the code-base.
// The runtime type of U is never checked for equality with the component type of the runtime type of A[].
// Runtime checking will be performed when an element is stored in A[], thus if A is not a
// super type of U an ArrayStoreException will be thrown.
@SuppressWarnings("rawtypes")
IntFunction rawGenerator = (IntFunction) generator;
return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator)
.asArray(rawGenerator);
}
갑자기 우리가 알던 단어들은 사라지고 다른 게 나오기 시작했다.
IntFunction<A[]> generator 부분에 Object[]::new 가 들어가면서 어떤 데이터가 들어오던 그만큼 새로 사이즈를 잡고 Array를 만들어주고 있다.
이렇게 생성된 Object Array는 generator라는 파라미터 이름에서 형변환되어 rawGenerator라는 이름으로 코드가 진행된다.
우선 evaluateToArrayNode라는 함수를 만나게 된다.
/**
* Collect the elements output from the pipeline stage.
*
* @param generator the array generator to be used to create array instances
* @return a flat array-backed Node that holds the collected output elements
*/
@SuppressWarnings("unchecked")
final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
// If the last intermediate operation is stateful then
// evaluate directly to avoid an extra collection step
if (isParallel() && previousStage != null && opIsStateful()) {
// Set the depth of this, last, pipeline stage to zero to slice the
// pipeline such that this operation will not be included in the
// upstream slice and upstream operations will not be included
// in this slice
depth = 0;
return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
}
else {
return evaluate(sourceSpliterator(0), true, generator);
}
}
우선 우리는 parallel이 false인 상태이기 때문에 else문으로 바로 넘어가자.
sourceSpliterator라는 이름이 보인다. 위에서 sourceSpliterator에 우리가 만든 Collection의 데이터가 저장되어 있다고 했다. 동일한 이름의 함수가 나오는데 코드는 아래와 같다.
AbstractPipeline.java
/**
* Get the source spliterator for this pipeline stage. For a sequential or
* stateless parallel pipeline, this is the source spliterator. For a
* stateful parallel pipeline, this is a spliterator describing the results
* of all computations up to and including the most recent stateful
* operation.
*/
@SuppressWarnings("unchecked")
private Spliterator<?> sourceSpliterator(int terminalFlags) {
// Get the source spliterator of the pipeline
Spliterator<?> spliterator = null;
if (sourceStage.sourceSpliterator != null) {
spliterator = sourceStage.sourceSpliterator;
sourceStage.sourceSpliterator = null;
}
else if (sourceStage.sourceSupplier != null) {
spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
}
else {
throw new IllegalStateException(MSG_CONSUMED);
}
if (isParallel() && sourceStage.sourceAnyStateful) {
// Adapt the source spliterator, evaluating each stateful op
// in the pipeline up to and including this pipeline stage.
// The depth and flags of each pipeline stage are adjusted accordingly.
int depth = 1;
for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
u != e;
u = p, p = p.nextStage) {
int thisOpFlags = p.sourceOrOpFlags;
if (p.opIsStateful()) {
depth = 0;
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
// Clear the short circuit flag for next pipeline stage
// This stage encapsulates short-circuiting, the next
// stage may not have any short-circuit operations, and
// if so spliterator.forEachRemaining should be used
// for traversal
thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
}
spliterator = p.opEvaluateParallelLazy(u, spliterator);
// Inject or clear SIZED on the source pipeline stage
// based on the stage's spliterator
thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
: (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
}
p.depth = depth++;
p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
}
}
if (terminalFlags != 0) {
// Apply flags from the terminal operation to last pipeline stage
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
}
return spliterator;
}
우선 sourceStage.sourceSpliterator가 null인 상황은 아니므로 최초 if문에서 동작을 시작한다.
// Get the source spliterator of the pipeline
Spliterator<?> spliterator = null;
if (sourceStage.sourceSpliterator != null) {
spliterator = sourceStage.sourceSpliterator;
sourceStage.sourceSpliterator = null;
}
현재 sourceSpliterator를 Spliterator<?> spliterator에 옮겨주고 현재 sourceSpliterator를 null로 초기화해 준다.
이후 코드에서는 가장 긴 부분인 parallel 조건이 우리는 false이므로 바로 다음 코드를 확인하자.
if (terminalFlags != 0) {
// Apply flags from the terminal operation to last pipeline stage
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
}
이 또한 우리는 위에서 terminalFlags에 0을 넣어서 전달해 줬다. 이 코드도 동작하지 않는다. 결국 우리가 조금 전 Spliterator<?> spliterator에 옮긴 sourceSpliterator만 반환이 된다.
이제 어떤 값들이 존재하는지 확인이 되었다.
else {
return evaluate(sourceSpliterator(0), true, generator);
}
@Override
@SuppressWarnings("unchecked")
final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
boolean flatten,
IntFunction<E_OUT[]> generator) {
if (isParallel()) {
// @@@ Optimize if op of this pipeline stage is a stateful op
return evaluateToNode(this, spliterator, flatten, generator);
}
else {
Node.Builder<E_OUT> nb = makeNodeBuilder(
exactOutputSizeIfKnown(spliterator), generator);
return wrapAndCopyInto(nb, spliterator).build();
}
}
evaluate함수에서는 <P_IN> Node<E_OUT>타입을 반환해주고 있다. 마찬가지로 우리는 parallel환경이 아니니 else문을 확인해 보자.
@Override
final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
int flags = getStreamAndOpFlags();
long size = StreamOpFlag.SIZED.isKnown(flags) ? spliterator.getExactSizeIfKnown() : -1;
// Currently, we have no stateless SIZE_ADJUSTING intermediate operations,
// so we can simply ignore SIZE_ADJUSTING in parallel streams, since adjustments
// are already accounted in the input spliterator.
//
// If we ever have a stateless SIZE_ADJUSTING intermediate operation,
// we would need step back until depth == 0, then call exactOutputSize() for
// the subsequent stages.
if (size != -1 && StreamOpFlag.SIZE_ADJUSTING.isKnown(flags) && !isParallel()) {
// Skip the source stage as it's never SIZE_ADJUSTING
for (AbstractPipeline<?, ?, ?> stage = sourceStage.nextStage; stage != null; stage = stage.nextStage) {
size = stage.exactOutputSize(size);
}
}
return size;
}
exactOutputSizeIfKnown함수에서는 정말로 현재 spliterator의 사이즈를 return 해주고 있다. 그러면 makeNodeBuilder는 무엇을 할까. 다만 우리는 위에서 다시 확인해 보면 사이즈를 모른다고 명시해 주었다. 즉 size는 -1이 return 될 것이다. 이후 동작을 살펴보자. sss
@Override
final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
return Nodes.builder(exactSizeIfKnown, generator);
}
static <T> Node.Builder<T> builder(long exactSizeIfKnown, IntFunction<T[]> generator) {
return (exactSizeIfKnown >= 0 && exactSizeIfKnown < MAX_ARRAY_SIZE)
? new FixedNodeBuilder<>(exactSizeIfKnown, generator)
: builder();
}
exactSizeIfKnown의 인자로 -1이 들어왔으므로 builder()가 반환된다.
static <T> Node.Builder<T> builder() {
return new SpinedNodeBuilder<>();
}
SpinedNodeBuilder의 구현체다.
/**
* Variable-sized builder class for reference nodes
*/
private static final class SpinedNodeBuilder<T>
extends SpinedBuffer<T>
implements Node<T>, Node.Builder<T> {
private boolean building = false;
SpinedNodeBuilder() {} // Avoid creation of special accessor
@Override
public Spliterator<T> spliterator() {
assert !building : "during building";
return super.spliterator();
}
@Override
public void forEach(Consumer<? super T> consumer) {
assert !building : "during building";
super.forEach(consumer);
}
//
@Override
public void begin(long size) {
assert !building : "was already building";
building = true;
clear();
ensureCapacity(size);
}
@Override
public void accept(T t) {
assert building : "not building";
super.accept(t);
}
@Override
public void end() {
assert building : "was not building";
building = false;
// @@@ check begin(size) and size
}
@Override
public void copyInto(T[] array, int offset) {
assert !building : "during building";
super.copyInto(array, offset);
}
@Override
public T[] asArray(IntFunction<T[]> arrayFactory) {
assert !building : "during building";
return super.asArray(arrayFactory);
}
@Override
public Node<T> build() {
assert !building : "during building";
return this;
}
}
우리가 스트림 작업을 하기 위해 사전에 메모리를 잡아두는 역할을 하고 있다.
이제 다시 wrapAndCopyInto 부분을 살펴보자.
/**
* Applies the pipeline stages described by this {@code PipelineHelper} to
* the provided {@code Spliterator} and send the results to the provided
* {@code Sink}.
*
* @implSpec
* The implementation behaves as if:
* <pre>{@code
* copyInto(wrapSink(sink), spliterator);
* }</pre>
*
* @param sink the {@code Sink} to receive the results
* @param spliterator the spliterator describing the source input to process
*/
abstract<P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);
이해가 잘 가지 않았지만 그래도 작성하신 분께서 친절하게 주석을 잘 달아두셨다.
이 파이프라인 헬퍼가 설명하는 파이프라인 단계를 제공된 spliterator에 적용하고 결과를 제공된 싱크에 보냅니다.
Params:
sink - 결과를 수신할 싱크
spliterator - 처리할 소스 입력을 설명하는 spliterator
번역기를 돌린 거라 조금 이상하지만 정리를 해보면 지금까지의 과정에 대해 spliterator에 적용하고 결과를 sink로 보낸다고 한다.
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
우리는 현재 모든 사이즈를 모른다고 명시하고 작업을 진행하고 있었다.
그렇다면 최초 if문을 진입할 것이고 전달받은 sink가 begin부터 시작해서 end까지 진행될 것이다.
드디어 뭔가 처음부터 끝까지 순회하며 데이터를 조작하는 듯한 코드가 발견되었다.
이 함수가 정확히 어떤 일을 하는지 스켈레톤을 살펴보자.
/**
* Pushes elements obtained from the {@code Spliterator} into the provided
* {@code Sink}. If the stream pipeline is known to have short-circuiting
* stages in it (see {@link StreamOpFlag#SHORT_CIRCUIT}), the
* {@link Sink#cancellationRequested()} is checked after each
* element, stopping if cancellation is requested.
*
* @implSpec
* This method conforms to the {@code Sink} protocol of calling
* {@code Sink.begin} before pushing elements, via {@code Sink.accept}, and
* calling {@code Sink.end} after all elements have been pushed.
*
* @param wrappedSink the destination {@code Sink}
* @param spliterator the source {@code Spliterator}
*/
abstract<P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
스플리터레이터에서 얻은 엘리먼트를 제공된 싱크에 푸시합니다.
스트림 파이프라인에 단락 스테이지가 있는 것으로 알려진 경우(StreamOpFlag.SHORT_CIRCUIT 참조),
각 요소마다 Sink.cancellationRequested()를 검사하여 취소가 요청되면 중지합니다.
Params:
wrappedSink - sink
스플리터 - spliterator
우리가 최초에 컬렉션에 있던 원소 하나하나를 sink에 밀어 넣으면서 진행을 하는 것 같다.
그러면 begin, end, forEachRemaining도 살펴보자.
/**
* Resets the sink state to receive a fresh data set. This must be called
* before sending any data to the sink. After calling {@link #end()},
* you may call this method to reset the sink for another calculation.
* @param size The exact size of the data to be pushed downstream, if
* known or {@code -1} if unknown or infinite.
*
* <p>Prior to this call, the sink must be in the initial state, and after
* this call it is in the active state.
*/
default void begin(long size) {}
새로운 데이터 세트를 수신하도록 싱크 상태를 재설정합니다.
이 메서드는 싱크에 데이터를 보내기 전에 호출해야 합니다.
end()를 호출한 후 이 메서드를 호출하여 다른 계산을 위해 싱크를 재설정할 수 있습니다.
Params:
size - 다운스트림으로 푸시할 데이터의 정확한 크기(알려진 경우), 알 수 없거나 무한한 경우 -1입니다.
이 함수를 호출하기 전에 싱크는 초기 상태여야 하며, 이 함수를 호출한 후에는 활성 상태여야 합니다.
/**
* Indicates that all elements have been pushed. If the {@code Sink} is
* stateful, it should send any stored state downstream at this time, and
* should clear any accumulated state (and associated resources).
*
* <p>Prior to this call, the sink must be in the active state, and after
* this call it is returned to the initial state.
*/
default void end() {}
모든 엘리먼트가 푸시되었음을 나타냅니다.
sync가 stateful인 경우, 이때 저장된 모든 상태를 다운스트림으로 전송하고 누적된 상태(및 관련 리소스)를 모두 지워야 합니다.
이 호출 이전에는 sync가 활성 상태여야 하며, 이 호출 이후에는 초기 상태로 돌아갑니다.
/**
* Performs the given action for each remaining element, sequentially in
* the current thread, until all elements have been processed or the action
* throws an exception. If this Spliterator is {@link #ORDERED}, actions
* are performed in encounter order. Exceptions thrown by the action
* are relayed to the caller.
* <p>
* Subsequent behavior of a spliterator is unspecified if the action throws
* an exception.
*
* @implSpec
* The default implementation repeatedly invokes {@link #tryAdvance} until
* it returns {@code false}. It should be overridden whenever possible.
*
* @param action The action
* @throws NullPointerException if the specified action is null
*/
default void forEachRemaining(Consumer<? super T> action) {
do { } while (tryAdvance(action));
}
모든 요소가 처리되거나 예외가 발생할 때까지 현재 스레드에서 순차적으로 남은 각 요소에 대해 지정된 작업을 수행합니다.
이 spliterator가 ORDERED인 경우, 액션은 발생 순서대로 수행됩니다.
액션이 던진 예외는 호출자에게 전달됩니다.
액션이 예외를 던지면 spliterator의 후속 동작은 지정되지 않습니다.
Params:
action - 액션
Throws:
NullPointerException - 지정된 액션이 null인 경우
/**
* If a remaining element exists, performs the given action on it,
* returning {@code true}; else returns {@code false}. If this
* Spliterator is {@link #ORDERED} the action is performed on the
* next element in encounter order. Exceptions thrown by the
* action are relayed to the caller.
* <p>
* Subsequent behavior of a spliterator is unspecified if the action throws
* an exception.
*
* @param action The action
* @return {@code false} if no remaining elements existed
* upon entry to this method, else {@code true}.
* @throws NullPointerException if the specified action is null
*/
boolean tryAdvance(Consumer<? super T> action);
남은 요소가 존재하면 해당 요소에 지정된 작업을 수행하여 true을 반환하고, 그렇지 않으면 false를 반환합니다.
spliterator가 ORDERED인 경우, 액션은 순서대로 다음 엘리먼트에 대해 수행됩니다.
액션이 던진 예외는 호출자에게 전달됩니다. 액션이 예외를 던지면 spliterator의 후속 동작은 지정되지 않습니다.
Params: action - 액션 리턴합니다: 이 메서드에 진입했을 때 남은 요소가 없으면 거짓, 그렇지 않으면 참입니다. Throws: NullPointerException - 지정된 액션이 null인 경우
이여서 이건 구현체까지 살펴보니
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (action == null)
throw new NullPointerException();
if (index >= 0 && index < fence) {
@SuppressWarnings("unchecked") T e = (T) array[index++];
action.accept(e);
return true;
}
return false;
}
찾았다. 여기서 index를 하나씩 증가시켜 가며 넘겨준다.
5. 전체 요약
stream() : 진짜로 stream을 사용할 수 있게 최초 open하는 과정. 현재의 Collection 데이터가 저장이 됨.
filter() : predicate에 대한 정의 후 sink로 저장
map() : mapper로 정의된 내용에 따라 filter를 거친 후 apply함수에서 downstream으로 넘겨줌
toList() : 지금까지 정의된 모든 filter, map에 대한 작업을 진행 후 원하는 형태(여기서는 list)로 반환
6. 마치며
드디어 전체적인 과정을 살펴보았다.
생각보다 쉽지는 않았고 완벽하게 코드를 이해하지는 못했지만 그래도 내부적으로는 각각 어떤 작업들이 발생하는지 알아볼 수 있는 기회가 되었다.