[모던 자바] CompletableFuture를 활용한 비동기 작업 파이프라인 만들기
앞서 Future와 CompletableFuture를 활용해 비동기 작업 처리를 해보았다.
이를 이용해 비동기 작업 파이프라인 만드는 법을 살펴보자.
동기 작업과 비동기 작업 조합하기
만약 음식을 요리하고 완성된 후 접시에 담는다고 가정해보자.
List<String> foodNames = List.of("피자", "햄거버", "파스타", "김밥", "떡볶이");
ExecutorService executor = Executors.newFixedThreadPool(Math.min(foodNames.size(), 100), runnable -> {
Thread t = new Thread(runnable);
t.setDaemon(true);
return t;
});
List<CompletableFuture<Dish>> dishFutures = foodNames.stream()
.map(name -> CompletableFuture.supplyAsync(() -> {
System.out.println(name + " 요리중...");
return new Food(name);
}, executor))
.map(future -> future.thenApply(food -> {
System.out.println(food.getName() + " 완성");
return food;
}))
.map(future -> future.thenCompose(food -> CompletableFuture.supplyAsync(() -> {
System.out.println(food.getName() + " 접시에 담기");
return new Dish(food);
}, executor)))
.collect(Collectors.toList());
List<Dish> dishes = dishFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
supplyAsync 메서드를 이용해 비동기적으로 음식을 요리하고
CompletableFuture에 thenApply 메서드를 호출해 어떤 음식이 완성되었는지 확인 후 다시 음식을 전달한다.
thenApply 메서드는 CompletableFuture가 끝날 때까지 블록하지 않는다는 점을 주의해야 한다.
즉, CompletableFuture가 동작을 완전히 완료한 다음에 thenApply 메서드로 전달된 람다 표현식을 적용할 수 있다.
세 번째 map에서는 CompletableFuture에 thenCompose 메서드를 호출해 두 CompletableFuture을 조합했다.
thenCompose 메서드는 첫 번째 연산의 결과를 두 번째 연산으로 전달한다.
thenCompose 메서드도 Async로 끝나는 버전이 존재한다.
Aync로 끝나지 않는 메서드는 이전 작업을 수행한 스레드와 같은 스레드에서 작업을 실행함을 의미하며, Aync로 끝나는 메서드는 다음 작업이 다른 스레드에서 실행되도록 스레드 풀로 작업을 제출한다.
위 예제는 두 번째 CompletableFuture가 첫 번째 CompletableFuture에 의존하므로 Aync 버전에 thenCompose 메서드를 사용해도 실행시간에는 영향을 미치지 않는다.
따라서 스레드 전환 오버헤드가 적제 발생하면서 효율성이 좀 더 좋은 thenCompose를 사용했다.
독립된 두 개의 CompletableFuture 합치기
때로는 두 개의 CompletableFuture의 비동기 계산이 끝난 후 두 결과물을 합쳐야 할 경우가 있다.
물론 첫 번째 CompletableFuture의 동작 완료와 상관없이 두 번째 CompletableFuture를 실행할 수 있어야 한다.
이런 상황에서는 thenCombine 메서드를 사용할 수 있다.
thenCombine 메서드는 BiFunction을 두 번째 인수로 받는다. BiFunction은 두 개의 CompletableFuture 결과를 어떻게 합칠지 정의한다.
thenCombine도 Async 버전이 존재하는데 thenCombineAsync 메서드는 BiFunction이 정의하는 조합 동작이 스레드 풀로 제출되면서 별도의 태스크에서 비동기적으로 수행된다.
위 예제를 thenCombine 메서드를 사용해서 구현해보자.
List<CompletableFuture<Dish>> dishFutures = foodNames.stream()
.map(name -> CompletableFuture.supplyAsync(() -> {
System.out.println(name + " 요리중...");
return new Food(name);
}, executor))
.map(future -> future.thenApply(food -> {
System.out.println(food.getName() + " 완성");
return food;
}))
.map(future -> future.thenCombine(
CompletableFuture.supplyAsync(Dish::new, executor),
(food, dish) -> {
dish.setFood(food);
System.out.println(food.getName() + " 접시에 담기");
return dish;
}
)).collect(Collectors.toList());
List<Dish> dishes = dishFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
thenCompose 메서드와는 달리 각각에 CompletableFuture의 동작이 비동기 식으로 완료되는 것을 확인할 수 있다.
타임아웃 효과적으로 사용하기
Future의 계산 결과를 읽을 때는 무한정 기다리는 상황이 발생할 수 있으므로 블록을 하지 않는 것이 좋다.
Java9에서는 CompletableFuture에서 제공하는 몇 가지 기능을 이용해 이런 문제를 해결할 수 있다.
orTimeout 메서드는 지정된 시간이 지난 후에 CompletableFuture를 TimeoutException으로 완료하면서 또 다른 CompletableFuture를 반환할 수 있도록 내부적으로 ScheduledThreadExecutor를 활용한다.
이 메서드를 이용하면 계산 파이프라인을 연결하고 여기서 TimeoutException이 발생하도록 메서드 체인의 끝에 orTimeout 메서드를 추가할 수 있고, 길이도 필요에 따라 조절할 수 있다.
CompletableFuture.supplyAsync(() -> {
...
}).orTimeout(3, TimeUnit.SECONDS);
만약 어떤 동작이 설정한 Timeout에 완료되지 못했을때 completeOnTimeout를 이용해 기본값을 지정할 수도 있다.
CompletableFuture.supplyAsync(() -> {
...
}).completeOnTimeout(DEFAULT_VALUE, 3, TimeUnit.SECONDS);
CompletableFuture 종료에 대응하는 방법
앞서 위 CompletableFuture를 조합하는 예제에서 모든 음식이 완성된 후에 접시에 모든 음식을 담을 수 있었다.
하지만 이런 경우 사용자는 모든 음식이 완성될 때까지 기다려야 한다. 심지어 너무 오래 걸리는 음식이 있다면 타임아웃도 일어날 수 있다.
모든 음식이 완성될 때까지 기다리지 말고 완성된 음식부터 그릇에 담을 수 없을까?
먼저 모든 음식이 완성될때까지 리스트 생성을 기다리지 않도록 하려면 CompletableFuture의 스트림을 직접 제어해야 한다.
음식마다 완성되는 시간이 다르게 설정을 해서 테스트를 해보자.
List<String> foodNames = List.of("피자", "햄거버", "파스타", "김밥", "떡볶이");
ExecutorService executor = Executors.newFixedThreadPool(Math.min(foodNames.size(), 100), runnable -> {
Thread t = new Thread(runnable);
t.setDaemon(true);
return t;
});
Random random = new Random();
Stream<CompletableFuture<Dish>> dishFutures = foodNames.stream()
.map(name -> CompletableFuture.supplyAsync(() -> {
int delay = random.nextInt(2000);
System.out.println(name + " 요리 시작 (" + delay + " msecs)");
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Food(name);
}, executor))
.map(future -> future.thenCompose(food ->
CompletableFuture.supplyAsync(() -> new Dish(food), executor)
));
이렇게 만든 스트림에 map을 하나 더 추가하고 thenAccept 라는 메서드를 사용한다.
thenAccept 메서드는 새로운 스레드를 이용해서 CompletableFuture의 결과를 소비한다.
CompletableFuture[] futures = dishFutures
.map(f -> f.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
CompletableFuture의 결과를 소비 후 배열로 반환하고 팩토리 메서드 allOf를 사용해 CompletableFuture<Void>를 반환한다. 전달된 모든 CompletableFuture가 완료되어야 CompletableFuture<Void>가 완료된다.
따라서 allOf 메서드가 반환하는 CompletableFuture에 join을 호출하면 원래 스트림의 모든 CompletableFuture의 실행 완료를 기다릴 수 있다.
실행 결과를 확인해보면 가장 빨리 완성된 음식부터 접시에 담긴것을 확인할 수 있다.