본문 바로가기

Programing/JVM(Java, Kotlin)

[Java] Stream in depth : Stream & Pipeline

스트림의 내부 구조 및 원리를 확인하다 두서없이 정리해두었다.
스트림의 사용에 대한 내용이라면 자바 기본서에 있는 것을 보는 것이 좋을 것이다.

BaseStream 인터페이스

BaseStream 인터페이스는 위의 다이어그램에 적혀있는 메서드들을 가지고 있는 인터페이스이다.
try with resource에 유용하게 사용가능한 AutoCloseable 인터페이스를 상속하고 있다.

public interface BaseStream<T, S extends BaseStream<T, S>>
        extends AutoCloseable

또한 제너릭 인자 두 개를 갖는데 하나는 스트림 원소들의 타입(T)이고 나머지는 BaseStream을 구현하는 또 다른 타입(S)이다.
S는 BaseStream - 즉 자기자신을 bounded 하였기 때문에 BaseStream을 상속하고 있는 서브 타입을 쓸 수 있다.

PrimitiveIterator / Spliterator 인터페이스

BaseStream 인터페이스에는 스트림의 원소를 순회하기 위해 Iterator<T>를 반환해주는 iterator() 메서드를 가지고 있다. Java 1.2부터 있던 것이라 이미 많이 익숙해서 설명은 생략한다. 대신 Java 8에는 Iterator를 상속하는 PrimitiveIterator 인터페이스가 추가되었다.


반면 Spliterator<T>라는 자바 8에 추가된 인터페이스를 반환하는 spliterator() 메서드를 포함한다.
spliterator는 병렬 기능이 추가된(parallel analogue) Iterator라고 볼 수 있다.

원천 데이터(source)의 원소들을 순회(traversing)하고 파티셔닝(partitioning)하는 기능을 한다.
Sliterator 인터페이스에 정의된 인터페이스 중 범위를 가져오는 Range*Spliterator는 Streams 클래스에서 구현한다.

Sink 인터페이스

StreamSupport 클래스

저수준의 스트림을 직접 사용을 한다면 실제 구현까지 잘 알아야 할 것이다.
따라서 자바에서는 이런 스트림들을 다루기위해 StreamSupport 클래스라는 유틸리티 메서드들을 제공한다.
바로 우리가 Collection이나 배열에서 .stream()을 호출하여 사용을 하는데 내부적으로 StreamSupport의 정적 메서드를 호출하게 구현되어 있다.

4가지 타입(아래 서브 타입들에 나온다)에 대해 2가지 형태로 총 8개의 정적 메서드를 가지고 있다.

package java.util.stream;

public final class StreamSupport {
    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
    public static <T> Stream<T> stream(Supplier<? extends Spliterator<T>> supplier,
                                       int characteristics,
                                       boolean parallel) {
        Objects.requireNonNull(supplier);
        return new ReferencePipeline.Head<>(supplier,
                                            StreamOpFlag.fromCharacteristics(characteristics),
                                            parallel);
    }
    public static IntStream intStream(Spliterator.OfInt spliterator, boolean parallel) {
        return new IntPipeline.Head<>(spliterator,
                                      StreamOpFlag.fromCharacteristics(spliterator),
                                      parallel);
    }
    public static IntStream intStream(Supplier<? extends Spliterator.OfInt> supplier,
                                      int characteristics,
                                      boolean parallel) {
        return new IntPipeline.Head<>(supplier,
                                      StreamOpFlag.fromCharacteristics(characteristics),
                                      parallel);
    }
    public static LongStream longStream(Spliterator.OfLong spliterator,
                                        boolean parallel) {
        return new LongPipeline.Head<>(spliterator,
                                       StreamOpFlag.fromCharacteristics(spliterator),
                                       parallel);
    }
    public static LongStream longStream(Supplier<? extends Spliterator.OfLong> supplier,
                                        int characteristics,
                                        boolean parallel) {
        return new LongPipeline.Head<>(supplier,
                                       StreamOpFlag.fromCharacteristics(characteristics),
                                       parallel);
    }
    public static DoubleStream doubleStream(Spliterator.OfDouble spliterator,
                                            boolean parallel) {
        return new DoublePipeline.Head<>(spliterator,
                                         StreamOpFlag.fromCharacteristics(spliterator),
                                         parallel);
    }
    public static DoubleStream doubleStream(Supplier<? extends Spliterator.OfDouble> supplier,
                                            int characteristics,
                                            boolean parallel) {
        return new DoublePipeline.Head<>(supplier,
                                         StreamOpFlag.fromCharacteristics(characteristics),
                                         parallel);
}

BaseStream을 상속하고 있는 서브 타입들

BaseStream JavaDoc에 적혀있듯이 Stream, IntStream, LongStream, DoubleStream이 BaseStream을 상속하고 있는 인터페이스이다.

이름에서 쉽게 유추해볼 수 있듯이 IntStream, LongStream, DoubleStream들은 Primitive 타입(int, long, double)을 위한 스트림이다.

stream pipelines

스트림은 하나의 타입으로의 사용보다는 흐름(stream)으로 사용될 때 의미를 갖는다.
java.util.stream 패키지에 대한 설명을 보면 스트림 연산에는 중간(intermediate)과 끝(terminal) 연산으로 나뉘고, 이런 연산은 스트림 파이프라인의 형태(stream pipelines)로 결합(be combined)된다고 나와있다.

pipeline 클래스도 위에서 살펴본 BaseStream 처럼 계층구조로 되어있다. 다만 위에서는 인터페이스라면 여기는 추상클래스이다.

최상위 클래스인 PipelineHelper 추상 클래스의 경우 JDK 8 기준 10개의 추상 메서드가 정의되어 있는데 아래와 같다.

이중 getSourceShare()라는 메서드가 있는데 스트림의 특성을 반환하는 기능을 한다.

StreamShape enum

package java.util.stream;

enum StreamShape {
    REFERENCE,
    INT_VALUE,
    LONG_VALUE,
    DOUBLE_VALUE
}

StreamShape 열거형에는 참조, int 값, long 값, double 값 이렇게 4가지로 나뉘어 있다.
위에서 본 BaseStream, AbstractPipeline 하위의 타입들의 개수랑 정확히 일치한다.

-----

Example : sum (1~100)

예로 1부터 100까지의 합을 구하는 것을 해보자.
스트림을 쓰지 않는다면 아래와 같은 코드로 합을 구할 수 있다.

public class SumOfOneToOneHundred {
    public static void main(String[] args) {
        int sum = 0;
        for (int i = 1; i <= 100; i++) {
            sum += i;
        }
        System.out.println(sum);
    }
}

만약에 이것을 스트림으로 옮긴다면 아래와 같은 코드가 될 것이다.

public class SumOfOneToOneHundred {
    public static void main(String[] args) {
        int sum = IntStream.rangeClosed(1, 100).reduce(0, (x, y) -> x + y);
        System.out.println(sum);
    }
}

또한 reduce의 (x, y) -> x + y 람다 연산은 아래와 같은 메서드 레퍼런스로 바꿀 수 있다.

public class SumOfOneToOneHundred {
    public static void main(String[] args) {
        int sum = IntStream.rangeClosed(1, 100).reduce(0, Integer::sum);
        System.out.println(sum);
    }
}

참고로 Integer의 sum 메서드는 아래와 같다.

package java.lang;

public final class Integer extends Number implements Comparable<Integer> {
    /**
     * Adds two integers together as per the + operator.
     * ..
     * @see java.util.function.BinaryOperator
     * @since 1.8
     */
    public static int sum(int a, int b) {
        return a + b;
    }
    // ..
}

BinaryOperator는 BiFunction을 상속받은 함수형 인터페이스이고 BiFunction는 apply를 가지고 있는 함수형인터페이스이다.

package java.util.function;

@FunctionalInterface
public interface BiFunction<T, U, R> {
    R apply(T t, U u);
    // ..
}
package java.util.function;

@FunctionalInterface
public interface BinaryOperator<T> extends BiFunction<T,T,T> {
    // ..
}

따라서 BinaryOperator<T>에 의한 BiFunction은 결국 아래와 같은 형태가 될 것이다.

@FunctionalInterface
public interface BiFunction<T, T, T> {
    T apply(T t, T u);
    // ..
}

이것은 Integer의 sum 정적 메서드와 시그너처가 일치한다.
사실 Integer#sum의 JavaDoc에는 BinaryOperator를 보라고 되어 있지만, IntBinaryOperator 라는 함수형 인터페이스가 따로 있다.

package java.util.function;

@FunctionalInterface
public interface IntBinaryOperator {
    int applyAsInt(int left, int right);
}

동작은?

코드가 여러가지 나와서 다시 옮겨적는다.

public class SumOfOneToOneHundred {
    public static void main(String[] args) {
        int sum = IntStream.rangeClosed(1, 100).reduce(0, Integer::sum);
        System.out.println(sum);
    }
}

위의 코드는 어떻게 동작하게 될까?
IntStream.rangeClosed(1, 100).reduce(0, Integer::sum); 문장을 크게 둘로 나누어 보면 아래와 같다.

  1. IntStream.rangeClosed(1, 100)
  2. IntStream.reduce(0, Integer::sum)

IntStream.rangeClosed

IntStream에는 range와 rangeClosed가 있는데 전자는 끝의 조건을 포함하지 않는다.
만약 IntStream.range(1, 100)을 했더라면 1부터 99까지의 범위에 대한 스트림을 만들게 된다.
물론 IntStream.range(1, 101)와 같이 하면 동일하게 동작을 하겠지만 오히려 IntStream.rangeClosed(1, 100)가 더 명확해보인다.

package java.util.stream;

public interface IntStream extends BaseStream<Integer, IntStream> {
    // ..
    public static IntStream rangeClosed(int startInclusive, int endInclusive) {
        if (startInclusive > endInclusive) {
            return empty();
        } else {
            return StreamSupport.intStream(
                    new Streams.RangeIntSpliterator(startInclusive, endInclusive, true), false);
        }
    }

내부적으로 StreamSupport의 intStream을 호출한다. (StreamSupport는 이미 위에서 등장했다.)

public final class StreamSupport {
    // ..
    public static IntStream intStream(Spliterator.OfInt spliterator, boolean parallel) {
        return new IntPipeline.Head<>(spliterator,
                                      StreamOpFlag.fromCharacteristics(spliterator),
                                      parallel);
    }

인자로 받는 Streams.RangeIntSpliterator는 위에서 보는 것 처럼 Spliterator.OfInt이다.
RangeIntSpliterator에 대해서는 https://namocom.tistory.com/771 의 아랫부분을 참고하자.

IntStream.reduce(0, Integer::sum)

IntStream 인터페이스의 reduce는 아래의 형태이고 실제 사용은 IntPipeline에 의해 구현되어 있다.

public interface IntStream extends BaseStream<Integer, IntStream> {

    // ..
    int reduce(int identity, IntBinaryOperator op);
abstract class IntPipeline<E_IN>
        extends AbstractPipeline<E_IN, Integer, IntStream>
        implements IntStream {

    // ..
    @Override
    public final int reduce(int identity, IntBinaryOperator op) {
        return evaluate(ReduceOps.makeInt(identity, op));
    }

evaluate는 AbstractPipeline에 정의된 것을 사용한다.

abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    // ..
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

ReduceOps.makeInt(identity, op)

final class ReduceOps {
    //..
    public static TerminalOp<Integer, Integer> makeInt(int identity, IntBinaryOperator operator) {
        Objects.requireNonNull(operator);
        class ReducingSink
                implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
            private int state;

            @Override
            public void begin(long size) {
                state = identity;
            }

            @Override
            public void accept(int t) {
                state = operator.applyAsInt(state, t);
            }

            @Override
            public Integer get() {
                return state;
            }

            @Override
            public void combine(ReducingSink other) {
                accept(other.state);
            }
        }
        return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

성능을 생각한다면 반복자를 통하는 것 보다 공식을 통한 덧셈의 방법이 훨씬 빠르다.
반복적으로 구하는 것이 O(n)의 시간 복잡도가 필요하다면 공식을 통한 계산 방법은 O(1)의 복잡도를 가지기 때문이다.
이 공식은 등비수열의 합이나 가우스의 일화를 생각하면 쉽게 만들 수 있다.

public class SumOfOneToOneHundred {
    public static void main(String[] args) {
        int sum = IntStream.rangeClosed(1, 201).reduce(0, Integer::sum);
        System.out.println(sum);
        System.out.println(sum(1, 201));
    }

    private static int sum(int startInclusive, int endInclusive) {
        return (startInclusive + endInclusive) * (endInclusive - startInclusive + 1) / 2;
    }
}

스프링 프레임워크도 ControllerAdviceBean클래스에 findAnnotatedBeans부분에 스트림으로 적용을 했다가 뺀 커밋로그가 있다.

따라서 성능에 민감하다면 스트림보다는 다른 방법을 생각해보는 것이 좋다.
또한 스트림도 잘 써야 한다. 누적 연산을 하는 경우라면 reduce를 쓰는 것보다는 collector를 쓰는 것이 좋을 것이다.
이 경우에는 reduce를 쓰면 O(n2)인데 collector를 쓰면 O(n)으로 줄일 수 있다.