Java

[모던 자바] 스트림에 여러 연산 병렬로 실행하기(Stream Forking)

Beekei 2022. 4. 5. 14:20
반응형

스트림에서는 한 번만 연산을 수행할 수 있으므로 결과도 한 번만 얻을 수 있다는 것이 Java8 스트림의 가장 큰 단점이다. 스트림을 두 번 탐색하려면 IllegalStateException이 발생한다.

 

하지만 한 스트림에서 여러 결과를 얻어야 하는 상황이 있을 수 있다.

그러려면 한 번에 한 개 이상의 람다를 스트림으로 적용해야 한다.

즉, fork 같은 메서드를 이용해서 스트림을 포크(분기) 시키고 포크된 스트림에 다양한 함수를 적용해야 한다.

심지어 여러 연산을 각각의 스레드에서 병렬로 실행할 수 있다면 더 좋을 것이다.

 

하지만 Java8의 스트림에서는 이 기능을 제공하지 않는다.

Spliterator(특히 늦은 바인딩 기능을 활용), BlockingQueue, Future를 이용해서 Java8에서 제공하지 않는 기능을 직접 편리한 API로 만들어 보자. 

 

스트림 포킹

스트림에서 여러 연산을 병렬로 실행하려면 먼저 원래 스트림을 감싸면서 다른 동작을 정의할 수 있도록 StreamForker를 만들어야 한다.

public class StreamForker<T> {
    private final Stream<T> stream;
    private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap<>();

    public StreamForker(Stream<T> stream) {
        this.stream = stream;
    }
    
    public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {
        forks.put(key, f); // 스트림에 적용함 함수 저장
        return this; // 파이프라인 구축을 위해 this 반환
    }
    
    public Results getResults() {
        // 구현해야 함
    }
}

fork 메서드는 두 인수를 받는다.

  • 스트림을 특정 연산의 결과 형식으로 변환하는 Function
  • 연산의 결과를 제공하는 키. 내부 앱에 키/함수 쌍 저장

fork 메서드는 StreamForker 자신을 반환하므로 여러 연산을 포킹(forking: 분기)해서 파이프라인을 만들 수 있다.

여기서 사용자는 스트림에서 세 가지 키로 인덱스된 세 개의 동작을 정의한다.

StreamForker는 원래 스트림을 탐색하면서 세 개의 스트림으로 포크시킨다.

이제 포크된 스트림에 세 연산을 병렬로 적용할 수 있으며 결과 맵에 각 키의 인덱스를 이용해서 함수 적용 결과를 얻을 수 있다.

 

StreamForker 동작 모습

getResults 메서드를 호출하면 fork 메서드로 추가한 모든 연산이 실행된다.

getResutls는 다음과 같은 Results 인터페이스의 구현을 반환한다.

public interface Results {
    <R> R get(Object key);
}

Results 인터페이스는 fork 메서드에서 사용하는 key 객체를 받는 하나의 메서드 정의를 포함한다.

이 메서드는 키에 대응하는 연산 결과를 반환한다.

 

ForkingStreamConsumer로 Results 인터페이스 구현하기

public class StreamForker<T> {
    
    ...
    
    public Results getResults() {
        ForkingStreamConsumer<T> consumer = build();
        try {
            stream.sequential().forEach(consumer);
        } finally {
            consumer.finish();
        }
        return consumer;
    }
    
}

ForkingStreamConsumer는 Results 인터페이스와 Consumer 인터페이스를 구현한다.

ForkingStreamConsumer의 역활은 스트림의 모든 요소를 소비해서 for 메서드로 전달된 연산 수 만큼의 BlockingQueue로 분산시키는 것이다.

forEach 메서드를 병렬 스트림에 수행하면 큐에 삽입되는 요소의 순서가 흐트러질 수 있으므로 스트림을 순차로 처리하도록 지시한다.

finish 메서드는 더 이상 처리할 항목리 없음을 지시하는 특별한 요소를 추가한다.

 

ForkingStreamConsumer를 반환하는 build 메서드는 다음과 같다.

public class StreamForker<T> {
    
    ...
    
    private ForkingStreamConsumer<T> build() {
        List<BlockingQueue<T>> queues = new ArrayList<>(); // 각각의 연산을 저장할 큐 리스트 생성
        // 연산 결과를 포함하는 Future를 연산을 식별할 수 있는 키에 대응시켜 맵에 저장
        Map<Object, Future<?>> actions = forks.entrySet().stream().reduce(
            new HashMap<>(),
            (map, e) -> {
                map.put(e.getKey(), getOperationResult(queues, e.getValue()));
                return map;
            },
            (m1, m2) -> {
               m1.putAll(m2);
               return m1;
            });
        return new ForkingStreamConsumer<>(queues, actions);
    }

}

연산을 저장한 큐 리스트를 생성하고, 스트림에서 실행할 여러 동작을 식별할 수 있는 키와 대응하는 연산 결과를 포함하는 Future를 값으로 포함하는 맵을 만든다.

BlockingQueue의 생성한 큐 리스트와 Future의 Map을 ForkingStreamConsumer 생성자로 전달한다.

 

여기서 getOperationResult로 각각의 Future를 만든다.

public class StreamForker<T> {
    
    ...
    
    private Future<?> getOperationResult(List<BlockingQueue<T>> queues, Function<Stream<T>, ?> f) {
        BlockingQueue<T> queue = new LinkedBlockingQueue<>();
        queues.add(queue); // 큐를 만들어 큐 리스트에 추가

        // 큐의 요소를 탐색하는 Spliterator 생성
        Spliterator<T> spliterator = new BlockingQueueSpliterator<>(queue);
        // Spliterator를 소스로 갖는 스트림을 생성
        Stream<T> source = StreamSupport.stream(spliterator, false);
        // 스트림에서 주어진 함수를 비동기로 적용해서 결과를 얻을 Future 생성
        return CompletableFuture.supplyAsync(() -> f.apply(source)); 
    }
    
}

getOperationResult 메서드는 새로운 BlockingQueue를 생성하여 큐 리스트에 추가한다.

그리고 큐를 새로운 BlockingQueueSpliterator로 전달한다.

이것은 큐에서 탐색할 항목을 읽는 늦은 바인딩(late-binding) Spliterator다.

Spliterator를 탐색하는 순차 스트림을 만든 다음에 스트림에서 수행할 연산을 포함하는 함수를 정용한 결과를 계산할 Future를 만든다.

 

ForkingStreamConsumer, BlockingQueueSpliterator 구현하기

ForkingStreamConsumer는 다음처럼 구현할 수 있다.

public static class ForkingStreamConsumer<T> implements Consumer<T>, Results {

    static final Object END_OF_STREAM = new Object();

    private final List<BlockingQueue<T>> queues;
    private final Map<Object, Future<?>> actions;

    public ForkingStreamConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> actions) {
        this.queues = queues;
        this.actions = actions;
    }

    @Override
    public <R> R get(Object key) {
        try {
            return ((Future<R>) actions.get(key)).get(); // 키에 대응하는 동작의 결과를 반환(Future의 계산 완료 대기)
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void accept(T t) {
        queues.forEach(q -> q.add(t)); // 스트림에서 탐색한 요소를 모든 큐로 전달
    }

    void finish() {
        accept((T) END_OF_STREAM); // 스트림의 끝을 알리는 마지막 요소를 큐에 삽입
    }
}

ForkingStreamConsumer 클래스는 Consumer 인터페이스와 Results 인터페이스를 구현하며, BlockingQueue의 List 참조와 스트림에 다양한 연산을 수행하는 Future의 Map 참조를 유지한다.

 

Consumer 인터페이스는 accept 메서드를 정의한다.

ForkingStreamConsumer가 스트림 요소를 받을 때마다 요소를 BlockingQueue로 추가한다.

그리고 기존 스트림의 모든 요소를 큐에 추가한 다음에는 finish 메서드를 호출해서 마지막 요소를 추가한다.

BlockingQueueSpliterator가 큐의 마지막 요소를 확인하는 순간 더 이상 처리할 요소가 없을음 판단할 수 있다.

 

Results 인터페이스는 get 메서드를 정의한다.

get 메서드는 인수 키로 맵에서 Future를 가져온 다음 값을 언랩하거나 값이 없으면 결과를 기다린다.

 

마지막으로 스트림에서 수행될 각 연산에 대한 BlockingQueueSpliterator를 구현해야 한다.

BlockingQueueSpliterator는 ForkingStreamConsumer에서 만든 BlockingQueue의 참조 중 하나를 포함한다.

public class BlockingQueueSpliterator<T> implements Spliterator<T> {

    private final BlockingQueue<T> q;

    public BlockingQueueSpliterator(BlockingQueue<T> q) {
        this.q = q;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        T t;
        while (true) {
            try {
                t = q.take();
                break;
            } catch (InterruptedException e) {

            }
        }

        if (t != ForkingStreamConsumer.END_OF_STREAM) {
            action.accept(t);
            return true;
        }

        return false;
    }
	
    // 분할을 하지 않는다.
    @Override
    public Spliterator<T> trySplit() {
        return null;
    }
	
    // 큐에서 몇 개의 요소를 가져올 수 있는지 미리 알 수 없으므로 estimatedSize값은 의미가 없다.
    // 분할을 하지 않는 상황이므로 활용하지 않는다.
    @Override
    public long estimateSize() { 
        return 0;
    }
	
    // Spliterator 특정을 사용하지 않으므로 0을 반환
    @Override
    public int characteristics() {
        return 0;
    }
}

위 코드에서는 스트림을 어떻게 분할할지는 정의하지 않고 늦은 바인딩 기능만 활용하도록 Spliterator를 정의했다.

tryAdvance 메서드는 ForkingStreamConsumer가 원래 스트림에서 추가한 요소를 BlockingQueue에서 가져온다.

이렇게 가져온 요소를 다음 스트림의 소스로 사용할 수 있도록 Consumer로 보낸다.

getOperationResult에서 생성한 Spliterator에서 요소를 보낼 Consumer를 결정하며, fork 메서드로 전달된 함수를 새로 만든 스트림에 적용한다.

 

tryAdvance 메서드는 ForkingStreamConsumer가 더 이상 큐에 처리할 요소가 없음을 알릴 목적으로 추가한 특별한 객체를 발견하기 전까지 true를 반환하며 소비할 다른 요소가 있음을 알린다.

StreamForker 빌딩 블록

왼쪽 상단의 StreamForker는 스트림에 수행할 각 연산을 포함하는 맵을 포함한다. 맵의 인덱스는 키며, 값은 수행할 함수다. 

오른쪽 ForkingStreamConsumer는 이들 연산을 저장할 큐를 가지고 있으며 원래 스트림에서 모든 요소를 소비해서 모든 큐로 요소를 분산시킨다.

 

그림 아래쪽을 보면 각 큐는 항목을 가져오면서 다른 스트림의 소스 역활을 하는 BlockingQueueSpliterator를 포함하고 있다. 

마지막으로 원래 스트림에서 포크된 각 스트림은 함수의 인수로 전달되면 각 수행해야 할 연산을 실행한다.

StreamForker 활용

이제 위에서 구현한 StreamForker를 활용해 여러개의 요리 데이터에서 모든 요리명을 콤마로 분리한 리스트를 만들고, 메뉴의 총 칼로리를 계산하고, 가장 칼로리가 높은 요리를 찾고, 요리를 종류별로 그룹화하는 연산을 수행해보자.

Stream<Dish> menuStream = Stream.of(
    new Dish("김치찌개", 630, DishType.KOREAN_FOOD),
    new Dish("된장찌개", 600, DishType.KOREAN_FOOD),
    new Dish("핫도그", 340, DishType.AMERICAN_FOOD),
    new Dish("애플파이", 720, DishType.AMERICAN_FOOD),
    new Dish("탕수육", 890, DishType.CHINESE_FOOD),
    new Dish("깐풍기", 930, DishType.CHINESE_FOOD)
);

Results results = new StreamForker<Dish>(menuStream)
    .fork("shortMenu", s -> s.map(Dish::getName).collect(Collectors.joining(", ")))
    .fork("totalCalories", s -> s.mapToInt(Dish::getCalories).sum())
    .fork("mostCaloricDish", s-> s.reduce((d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2).get())
    .fork("dishesByType", s -> s.collect(Collectors.groupingBy(Dish::getType)))
    .getResults();

String shortMenu = results.get("shortMenu");
int totalCalories = results.get("totalCalories");
Dish mostCaloricDish = results.get("mostCaloricDish");
Map<DishType, List<Dish>> dishesByType = results.get("dishesByType");

System.out.println("shortMenu : " + shortMenu);
System.out.println("totalCalories : " + totalCalories);
System.out.println("mostCaloricDish : " + mostCaloricDish);
System.out.println("dishesByType : " + dishesByType);

실행결과

StreamForker는 스트림을 포크하고 포크된 스트림에 다른 연산을 할당할 수 있도록 편리하고 유연한 API를 제공한다.

각각의 연산을 스트림에 함수를 적용하는 형식으로 되어있는데, 이 연산을 객체로 식별할 수 있다. 위 예제에서는 문자열로 연산을 식별한다.

더 포크할 스트림이 없으면 StreamForker에 getResults를 호출해서 정의한 연산을 모두 수행하고 Results를 얻을 수 있다.

내부에서는 연산을 비동기적으로 실행하므로 getResults 메서드를 호출하면 결과를 기다리는 것이 아니라 즉시 반환한다.

 

Results 인터페이스에 키를 전달해서 특정 연산의 결과를 얻을 수 있다.

연산 결과가 끝난 상황이라면 get 메서드가 결과를 반환하며, 아직 연산이 끝나지 않았으면 결과가 나올때까지 호출이 블록된다.

 

성능 문제

위에서 살펴본 방법이 스트림을 여러 번 탐색하는 방식에 비해 성능이 더 좋을 것이라고 생각하면 오산이다.

특히 메모리에 있는 데이터로 스트림을 만든 상황에서는 블록 큐를 사용하면서 발생하는 오버헤드가 병렬 실행으로 인한 이득보다 클 수 있다.

반대로 아주 큰 파일 스트림으로 사용하는 등 비싼 I/O 동작을 수행하는 상황에서는 한 번마 스트림을 활용하는 것이 더 좋은 선택일 수 있다.

지금까지 강조한 것처럼 이번에도 어느 쪽이 성능이 좋은지 판한다는 가장 좋은 방법은 직접 측정해 보는 것이다.

 

반응형