[모던 자바] Future 인터페이스를 활용한 비동기 계산
Future의 단순 활용
Java5 부터는 미래의 어느 시점에 결과를 얻는 모델에 활용할 수 있도록 Future 인터페이스를 제공하고 있다.
비동기 계산을 모델링하는 데 Future를 이용할 수 있으며, Future는 계산이 끝났을 때 결과에 접근할 수 있는 참조를 제공한다.
시간이 걸릴 수 있는 작업을 Future 내부로 설정하면 호출자 스레드가 결과를 기다리는 동안 다른 유용한 작업을 수행할 수 있다.
Future는 저수준의 스레드에 비해 직관적으로 이해하기 쉽다는 장점이 있다.
Future를 이용하려면 시간이 오래 걸리는 작업을 Callable 객체 내부로 감싼 다음에 ExecutorService에 제출해야 한다.
Java8 이전의 예제 코드는 아래와 같다.
ExecutorService executor = Executors.newCachedThreadPool();
Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("Future call Start");
Thread.sleep(5000); // 5초간 지연
System.out.println("Future call End");
return "Future Call";
}
});
try {
Thread.sleep(3000); // 3초간 지연
System.out.println("Do Something else");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
String result = future.get(10, TimeUnit.SECONDS);
} catch (ExecutionException ee) { // 계산 중 예외 발생
ee.printStackTrace();
} catch (InterruptedException ie) { // 현재 스레드에서 대기 중 인터럽트 발생
ie.printStackTrace();
} catch (TimeoutException te) { // Future가 완료되기 전에 타임아웃 발생
te.printStackTrace();
}
총 8초가 지연되는 코드이지만 Future를 활용해 비동시적으로 실행해서 최대 5초가 지연되고 있다.
이처럼 ExecutorService에서 제공하는 스레드가 시간이 오래 걸리는 작업을 처리하는 동안 우리 스레드로 다른 작업을 동시에 실행할 수 있고, 오래 걸리는 작업의 결과가 필요한 시점이 되었을 때 Future의 get 메서드로 결과를 가져올 수 있다.
get 메서드를 호출했을 때 이미 계산이 완료되었다면 즉시 결과를 반환하지만 결과가 준비되지 않았다면 작업이 완료될 때까지 스레드를 블록 시킨다.
만약 오래 걸리는 작업이 영원히 끝나지 않는 것을 방지하기 위해 get 메서드를 overload 해서 스레드가 대기할 최대 타임아웃 시간을 설정하는 것이 좋다.
Future 제한
Future 인터페이스는 비동기 계산이 끝났는지 확인하는 메서드, 계산이 끝나길 기다리는 메서드, 결과 회수 메서드 등을 제공하는데 이들만으로는 간결한 동시 실행 코드를 구현하기에 충분하지 않다.
- 두 개의 비동기 계산 결과를 하나로 합친다. 두 가지 계산 결과는 서로 독립적일 수 있으며 또는 두 번째 결과가 첫 번째 결과에 의존하는 상황일 수 있다.
- Future 집합이 실행하는 모든 태스크의 완료를 기다린다.
- Future 집합에서 가장 빨리 완료되는 태스크를 기다렸다가 결과를 얻는다.
- 프로그램적으로 Future를 완료시킨다.(비동기 동작에 수동으로 결과 제공)
- Future 완료 동작에 반응한다.
위와 같은 기능을 선언형으로 이용할 수 있도록 Java8에서는 CompletableFuture 클래스를 제공한다.
Future와 CompletableFuture의 관계를 Collection과 Stream와 비슷한 패턴(람다 표현식과 파이프라이닝)으로 활용할 수 있다.
CompletableFuture로 비동기 API 만들기
예를 들어 요리사가 동시에 여러 가지 음식을 만든다고 가정해보자.
@Getter
@AllArgsConstructor
public class Food {
private String name;
}
// 요리하기
public Future<Food> cookingAsync(String foodName, long start) {
CompletableFuture<Food> futureFood = new CompletableFuture<>();
new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Food food = new Food(foodName);
futureFood.complete(food);
long cookingAsyncTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println(foodName + " 요리중... (" + cookingAsyncTime + "msecs)");
}).start();
return futureFood;
}
위 코드에서 비동기 요리와 요리가 완료된 음식을 포함하는 CompletableFuture 인스턴스를 만들었다.
그리고 실제 요리를 진행할 다른 스레드를 만든 다음에 결과를 기다리지 않고 결과를 포함할 Future 인스턴스를 바로 반환했다.
요리가 완료된 음식의 정보가 도착하면 complete 메서드를 이용해서 CompletableFuture를 종료할 수 있다.
그럼 두 가지 음식을 한 번에 요리해보자.
long start = System.nanoTime();
Future<Food> pizzaFuture = cookingAsync("피자", start);
System.out.println("pizzaFuture 반환!");
Future<Food> pastaFuture = cookingAsync("파스타", start);
System.out.println("pastaFuture 반환!");
try {
Food pizza = pizzaFuture.get();
long pizzaCompletedTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println(pizza.getName() + " 완성! (" + pizzaCompletedTime + "msecs)");
Food pasta = pastaFuture.get();
long pastaCompletedTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println(pasta.getName() + " 완성! (" + pastaCompletedTime + "msecs)");
} catch (Exception e) {
throw new RuntimeException(e);
}
실행 결과를 보면 요리가 끝나기 전에 Future가 반환되는 것을 확인할 수 있다.
피자와 파스타를 모두 요리하는데 약 4초가 걸리지만 동시에 요리했으므로 2초 만에 두 요리가 모두 완성되었다.
에러 처리 방법
만약에 요리하는 메서드가 진행되는 동안 에러가 발생하면 해당 스레드에만 영향을 미친다.
결과적으로 클라이언트는 get 메서드가 반환될 때까지 영원히 기다리게 될 수도 있다.
위에서 말한 get 메서드를 overload 해서 스레드가 대기할 최대 타임아웃 시간을 설정하는 방법으로 해결할 수 있지만 어떤 오류가 왜 발생했는지 알 수 있는 방법이 없다.
따라서 completeExcpetionally 메서드를 이용해서 CompletableFuture 내부에서 발생한 예외를 클라이언트로 전달해야 한다.
public Future<Food> cookingAsync(String foodName, long start) {
CompletableFuture<Food> futureFood = new CompletableFuture<>();
new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Food food = new Food(foodName);
futureFood.complete(food);
} catch (Exception e) {
futureFood.completeExceptionally(e);
}
long cookingAsyncTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println(foodName + " 요리중... (" + cookingAsyncTime + "msecs)");
}).start();
return futureFood;
}
RuntimeException을 발생시켜보면 아래처럼 ExecutionException이 발생하고 어떤 Exception이 발생하였는지 정보를 출력해준다.
팩토리 메서드 supplyAsync로 CompletableFuture 만들기
좀 더 간단하게 CompletableFuture를 만드는 방법도 있다.
supplyAsync 메서드는 Supplier를 인수로 받아서 비동기 실행 후 CompletableFuture를 반환한다.
ForkJoinPool의 Executor 중 하나가 Supplier를 실행할 것이다.
public Future<Food> cookingAsync(String foodName, long start) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
long cookingAsyncTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println(foodName + " 요리중... (" + cookingAsyncTime + "msecs)");
return new Food(foodName);
});
}
두 번째 인수를 받는 오버로드 버전에 supplyAsync 메서드를 이용해서 다른 Executor를 지정할 수 있다.
결국 모든 다른 CompletableFuture의 팩토리 메서드에 Executor를 선택적으로 전달할 수 있다.
병렬 Stream과 CompletableFuture 비동기 호출
병렬 Stream을 통해 비동기 호출과 같은 효과를 볼 수 있다.
List<String> foodNames = List.of("피자", "파스타", "햄버거");
List<Food> foods = foodNames.parallelStream()
.map(name -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Food(name);
})
.collect(Collectors.toList());
만약 최대로 활용할 수 있는 스레드의 개수가 4개이고 병렬 처리할 스레드가 5개일 때, 4개의 스레드가 끝난 후 다음 스레드가 실행되기 때문에 2배의 시간이 더 걸릴 수 있다.
팩토리 메서드 supplyAsync로 CompletableFuture를 만들고 동기적, 순차적으로 결과를 얻을 수도 있다.
List<String> foodNames = List.of("피자", "파스타", "햄버거");
List<CompletableFuture<Food>> foodFutures = foodNames.stream()
.map(name -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Food(name);
}))
.collect(Collectors.toList());
List<Food> foods = foodFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
CompletableFuture 클래스의 join 메서드는 Future 인터페이스의 get 메서드와 같은 의미를 갖는다. 다만 join은 아무 예외도 발생시키지 않는다는 점이 다르다.
CompletableFuture 버전이 병렬 스트림 버전보다 아주 조금 빠르다.
결과적으로는 비슷하지만 CompletableFuture는 병렬 스트림 버전이 비해 작업에 이용할 수 있는 다양한 Executor를 지정할 수 있어, Executor로 스레드 풀의 크기를 조절하는 등 애플리케이션에 맞는 최적화된 설정을 만들 수 있다.
커스텀 Executor 사용하기
애플리케이션이 실제로 필요한 작업량을 고려한 풀에서 관리하는 스레드 수에 맞게 Executor를 만들 수 있으면 좋을 것이다. 풀에서 관리하는 스레드 수를 어떻게 결정할 수 있을까?
자바 병렬 프로그래밍에서는 스레드 풀이 너무 크면 CPU와 메모리 자원을 서로 경쟁하느라 시간을 낭비할 수 있다.
반면 스레드 풀이 너무 작으면 CPU의 일부 코어는 활용되지 않을 수도 있다.
예를 들어 5개의 음식을 요리하는데 스레드가 10개이면 낭비인 것이다. 음식 하나당 하나의 스레드가 할당될 수 있도록 Executor를 설정해야 한다.
스레드 수가 너무 많으면 오리혀 서버가 크래시 될 수 있으므로 하나의 Executor에서 사용할 스레드의 최대 개수는 100 이하로 설정하는 것이 바람직하다.
ExecutorService executor = Executors.newFixedThreadPool(Math.min(foodNames.size(), 100), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true); // 프로그램 종료를 방해하지 않는 데몬 스레드 사용
return t;
}
});
자바에서 일반 스레드가 실행 중이면 자바 프로그램은 종료되지 않는다. 따라서 어떤 이벤트를 한없이 기다리면서 종료되지 않는 일반 스레드가 있으면 문제가 될 수 있기 때문에 데몬 스레드를 사용해 자바 프로그램이 종료될 때 강제로 종료되도록 만든다.
이렇게 만든 Executor를 supplyAsync 두 번째 인수로 전달하면 된다.
List<CompletableFuture<Food>> foodFutures = foodNames.stream()
.map(name -> CompletableFuture.supplyAsync(() -> {
...
}, executor))
.collect(Collectors.toList());
비동기 동작을 많이 사용하는 상황에서는 이렇게 커스텀 Executor를 만들어 CompletableFuture를 활용하는 것이 가장 효과적일 수 있다.