chapter 16 CompletableFuture: 안정적 비동기 프로그래밍
16.1 Future의 단순 활용
미래의 어느 시점에 결과를 얻는 모델을 활용할 수 있도록 Future 인터페이스를 제공하고 있다.
비동기 계산을 모델링하는 데 Future를 이용할 수 있으며, Future은 계산이 끝났을 때 결과에 접근할 수 있는 참조를 제공한다.
ExecutorService executor = Executors.newCachedThreadPool(); //스레드 풀에 테스크를 제출하려면 ExecutorService를 만들어야한다.
Future<Double> future = executor.submit(new Callable<Double>() { //Callable을 ExecutorService로 제출한다.
@Override
public Double call() throws Exception {
// 시간이 오래 걸리는 작업은 다른 스레드에서 비동기적으로 실행한다.
return doSomeLongComputation();
}
});
//비동기 작업을 수행하는 동안 다른 작업을 한다.
doSomethingElse();
try{
//비동기 작업의 결과를 가져온다. 결과가 준비되어 있지 않으면 호출 스레드가 블록된다. 하지만 최대 1초까지만 기다린다.
Double result = future.get(1, TimeUnit.SECONDS);
}catch (ExecutionException ee){
//계산 중 예외 발생
}catch (InterruptedException ie){
// 현재 스레드에서 대기 중 인터럽트 발생
}catch (TimeoutException te){
// Future가 완료되기 전에 타임아웃 발생
}
16.1.1 Future 제한
* 두 개의 비동기 계산 결과를 하나로 합친다. 두 가지 계산 결과는 서로 독립적일 수 있으며 또는 두 번째 결과가 첫 번째 결과에 의존하는 상황일 수 있다.
* Future 집합이 실행하는 모든 테스트의 완료를 기다린다.
* Future 집합에서 가장 빨리 완료되는 태스크를 기다렸다가 결과를 얻는다(예를 들어 여러 태스크가 다양한 방식으로 같은 결과를 구하는 상황)
* 프로그램적으로 Future를 완료시킨다.(즉, 비동기 동작에 수동으로 결과 제공).
* Future 완료 동작에 반응한다.(즉, 결과를 기다리면서 블록되지 않고 결과가 준비되었다는 알림을 받은 다음에 Future의 결과를 원하는 추가 동작을 수행할 수 있음).
16.1.2 CompletableFuture로 비동기 애플리케이션 만들기
애플리케이션을 만드는 동안 다음과 같은 기술을 배울 수 있다.
1) 고객에게 비동기 API를 제공하는 방법을 배운다.
2) 동기 API를 사용해야 할 때 코드를 비블록으로 만드는 방법을 배운다. 두 개의 비동기 동작을 파이프라인으로 만드는 방법과 두 개의 동작 결과를 하나의 비동기 계산으로 합치는 방법을 살펴본다.
3) 비동기 동작의 완료에 대응하는 방법을 배운다.
동기 API와 비동기 API
동기 API : 메서드를 호출한 다음에 메서드가 계산을 완료할 때까지 기다렸다가 메서드가 반환되면 호출자는 반환된 값으로 계속 다른 동작을 수행한다. 동기 API를 사용하는 상황을 블록 호출이라고 한다.
비동기 API : 메서드가 즉시 반환되며 끝내지 못한 나머지 작업을 호출자 스레드와 동기적으로 실행될 수 있도록 다른 스레드에 할당한다. 이와 같은 비동기 API를 사용하는 상황을 비블록 호출이라고 한다.
16.2 비동기 API 구현
사용자가 이 API(최저가격 검색 애플리케이션)를 호출하면 비동기 동작이 완료될 때까지 1초동안 블록된다.
16.2.1 동기 메서드를 비동기 메서드로 변환
1) getPriceAsync 메서드 구현
public Future<Double> getPriceAsync(String product) {
//계산 결과를 포함할 CompletableFuture를 생성한다.
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
//다른 스레드에서 비동기적으로 계산을 수행한다.
double price = calculatePrice(product);
//오랜 시간이 걸리는 계산이 완료되면 Future에 값을 설정한다.
futurePrice.complete(price);
}).start();
//계산 결과가 완료되길 기다리지 않고 Future를 반환한다.
return futurePrice;
}
2) 비동기 API 사용
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
//상점에 제품가격 정보 요청
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Invocation returned after " + invocationTime
+ " msecs");
// 제품의 가격을 계산하는 동안
doSomethingElse();
//다른 상점 검색 등 다른 작업 수행
try {
/*가격 정보가 있으면 Future에서 가격 정보를 읽고,
* 가격 정보가 없으면 가격 정보를 받을 때까지 블록한다.
* */
double price = futurePrice.get();
System.out.printf("Price is %.2f%n", price);
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Price returned after " + retrievalTime + " msecs");
클라이언트는 반환된 Future를 이용해서 나중에 결과를 얻을 수 있다. 그 사이 클라이언트는 다른 상점에 가격 정보를 요청하는 등 첫 번째 상점의 결과를 기다리면서 대기하지 않고 다른 작업을 처리할 수 있다. 나중에 클라이언트가 특별히 할일이 없으면 Future의 get 메서드를 호출한다. 이때 Future가 결과값을 가지고 있다면 Future에 포함된 값을 읽거나 아니면 값이 계산될 때까지 블록한다.
Invocation returned after 4 msecs
Doing something else...
Price is 123.26
Price returned after 1037 msecs
16.2.2 에러 처리 방법
가격을 계산하는 동안 에러가 발생하면 해당 스레드에만 영향을 미친다. 즉, 에러가 발생해도 가격 계산은 계속 진행되며 일의 순서가 꼬인다. 결과적으로 클라이언트는 get 메서드가 반환될 때까지 영원히 기다리게 될 수 있다.
클라이언트가 영원히 블록되지 않고 타임아웃 시간이 지나면 TimeoutException을 받을 수 있다. 하지만 이때 제품가격 계산에 왜 에러가 발생했는지 알 수 있는 방법이 없다. 따라서 completeExceptionally 메서드를 이용해서 CompletableFuture 내부에서 발생한 예외를 클라이언트로 전달해야 한다.
1) CompletableFuture 내부에서 발생한 에러 전파
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
try{
double price = calculatePrice(product);
//계산이 정상적으로 종료되면 Future에 가격 정보를 저장한채로 Future를 종료한다.
futurePrice.complete(price);
}catch (Exception ex){
//도중에 문제가 발생하면 발생한 에러를 포함시켜 Future를 종료한다.
futurePrice.completeExceptionally(ex);
}
}).start();
return futurePrice;
팩토리 메서드 supplyAsync로 CompletableFuture 만들기
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(()-> calculatePrice(product));
}
16.3 비블록 코드 만들기
상점 리스트가 있다고 가정하자
private final List<Shop> shops = Arrays.asList(
new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll"));
모든 상점에 순차적으로 정보를 요청하는 findPrices
// 모든 상점에 순차적으로 정보를 요청하는 findPrices
public List<String> findPrices(String product){
return shops.stream()
.map(shop -> String.format("%s price is %.2f",
shop.getName(), shop.getPrice(product)))
.collect(Collectors.toList());
}
findPrices의 결과와 성능 확인
long start = System.nanoTime();
System.out.println(findPrices("myPhone27S"));
long duration = (System.nanoTime() - start) /1000000;
System.out.println("Done in"+duration+" msecs");
실행 결과
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74]
Done in4065 msecs
16.3.1 병렬 스트림으로 요청 병렬화하기
병렬 스트림을 이용해서 순차 계산을 병렬로 처리해서 성능을 개선할 수 있다.
1) findPrices 메서드 병렬화
public static List<String> findPrices(String product){
return shops.parallelStream()
.map(shop -> String.format("%s price is %.2f",
shop.getName(), shop.getPrice(product)))
.collect(Collectors.toList());
}
새로운 버전의 findPrices 성능을 확인하자
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74]
Done in1044 msecs
16.3.2 CompletableFuture로 비동기 호출 구현하기
팩토리 메서드 supplyAsync로 CompletableFuture를 만들 수 있음을 배웠다.
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
()-> String.format("%s price is %.2f",
shop.getName(),shop.getPrice(product))))
.collect(Collectors.toList());
return priceFutures;
CompletableFuture로 findPrices 구현하기
// 모든 상점에 순차적으로 정보를 요청하는 findPrices
public static List<String> findPrices(String product){
List<CompletableFuture<String>> priceFutures =
shops.stream()
//CompletableFuture로 각각의 가격을 비동기적으로 계산한다.
.map(shop -> CompletableFuture.supplyAsync(
()-> String.format("%s price is %.2f",
shop.getName(),shop.getPrice(product))))
.collect(Collectors.toList());
//모든 비동기 동작이 끝나길 기다린다.
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74]
Done in1034 msecs
16.3.3 더 확장성이 좋은 해결 방법
CompletableFuture는 병렬 스트림 버전에 비해 작업에 이용할 수 있는 다양한 Executor를 지정할 수 있다는 장점이 있다. 따라서 Executor로 스레드 풀의 크기를 조절하는 등 애플리케이션에 맞는 최적화된 설정을 만들 수 있다.
16.3.4 커스텀 Executor 사용하기
우리 애플리케이션이 실제로 필요한 작업량을 고려한 풀에서 관리하는 스레드 수에 맞게 Executor를 만들 수 있으면 좋을 것이다. 풀에서 관리하는 스레드 수를 어떻게 결정할 수 있을까?
* 스레드 풀 크기 조절
-> [자바 병렬 프로그래밍] 에서는 스레드 풀의 최적값을 찾는 방법을 제안한다. 스레드 풀이 너무 크면 CPU와 메모리 자원을 서로 경쟁하느라 시간을 낭비할 수 있다. 반면 스레드 풀이 너무 작으면 CPU의 일부 코어는 활용되지 않을 수 있다.
가격 정보를 검색하려는 상점 수만큼 스레드를 갖도록 Executor를 설정한다. 스레드 수가 너무 많으면 오히려 서버가 크래시될 수 있으므로 하나의 Executor에서 사용할 스레드의 최대 개수는 100이하로 설정하는 것이 바람직하다.
1) 우리의 최저가격 검색 애플리케이션에 맞는 커스텀 Executor
private final Executor executor =
// 상점 수만큼의 스레드를 갖는 풀을 생성한다(스레드 수의 범위는 0과 100사이)
Executors.newFixedThreadPool(Math.min(shops.size(), 100),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
//프로그램 종료를 방해하지 않는 데몬 스레드를 사용한다.
t.setDaemon(true);
return t;
}
});
우리가 만드는 풀은 데몬 스레드를 포함한다. 자바에서 일반 스레드가 실행 중이면 자바 프로그램은 종료되지 않는다. 따라서 어떤 이벤트를 한없이 기다리면서 종료되지 않는 일반 스레드가 있으면 문제가 될 수 있다. 반면 데몬 스레드는 자바 프로그램이 종료될 때 강제로 실행이 종료될 수 있다.
애플리케이션의 특성에 맞는 Executor를 만들어 CompletableFuture를 활용하는 것이 바람직하다.
16.4 비동기 작업 파이프라인 만들기
할인 서비스에서는 서로 다른 할인율을 제공하는 다섯 가지 코드를 제공한다.
1) enum으로 할인 코드 정의하기
public class Discount {
public enum Code {
NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
private final int percentage;
Code(int percentage) {
this.percentage = percentage;
}
}
public static String applyDiscount(Quote quote) {
return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
}
private static double apply(double price, Code code) {
delay();
return format(price * (100 - code.percentage) / 100);
}
}
미리 계산된 임의의 가격과 임의의 Discount.Code를 반환할 것이다.
16.4.1 할인 서비스 구현
우리의 최저가격 검색 애플리케이션은 여러 상점에서 가격 정보를 얻어오고, 결과 문자열을 파싱하고, 할인 서버에 질의를 보낼 준비가 되었다. 할인 서버에서 할인율을 확인해서 최종가격을 계산할 수 있다. 상점
public class Quote {
private final String shopName;
private final double price;
private final Discount.Code discountCode;
public Quote(String shopName, double price, Discount.Code discountCode) {
this.shopName = shopName;
this.price = price;
this.discountCode = discountCode;
}
public static Quote parse(String s) {
String[] split = s.split(":");
String shopName = split[0];
double price = Double.parseDouble(split[1]);
Discount.Code discountCode = Discount.Code.valueOf(split[2]);
return new Quote(shopName, price, discountCode);
}
public String getShopName() {
return shopName;
}
public double getPrice() {
return price;
}
public Discount.Code getDiscountCode() {
return discountCode;
}
}
상점에서 얻은 문자열을 정적 팩토리 메서드 parse로 넘겨주면 상점 이름, 할인전 가격, 할인된 가격 정보를 포함하는 Quote 클래스 인스턴스가 생성된다.
다음 코드에서 보여주는 것처럼 Discount 서비스에서는 Quote 객체를 인수로 받아 할인된 가격 문자열을 반환하는 applyDiscount 메서드도 제공한다.
public class Discount {
public enum Code {
//소스 생략
NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
private final int percentage;
Code(int percentage) {
this.percentage = percentage;
}
}
public static String applyDiscount(Quote quote) {
// 기존 가격에 할인 코드를 적용한다.
return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
}
private static double apply(double price, Code code) {
//Discount 서비스의 응답 지연을 흉내 낸다.
delay();
return format(price * (100 - code.percentage) / 100);
}
}
16.4.2 할인 서비스 사용
Discount는 원격 서비스이므로 다음 코드에서 보여주는 것처럼 1초의 지연을 추가한다.
1) Discount 서비스를 이용하는 가장 간단한 findPrices 구현
public static List<String> findPrices(String product){
return shops.stream()
//각 상점에서 할인 전 가격 얻기
.map(shop -> shop.getPrice(product))
//상점에서 반환한 문자열을 Quote 객체로 변환한다.
.map(Quote::parse)
//Discount 서비스를 이용해서 각 Quote에 할인을 적용한다.
.map(Discount::applyDiscount)
.collect(Collectors.toList());
}
* 첫 번째 연산에서는 각 상점을 요청한 제품의 가격과 할인 코드로 변환한다.
* 두 번째 연산에서는 이들 문자열을 파싱해서 Quote 객체를 만든다.
* 세 번째 연산에서는 원격 Discount 서비스에 접근해서 최종 할인가격을 계산하고 가격에 대응하는 상점 이름을 포함하는 문자열을 반환한다.
병렬 스트림을 이용하면 성능을 쉽게 개선할 수 있다는 사실은 이미 확인했다.
CompletableFuture에서 수행하는 태스크를 설정할 수 있는 커스텀 Executor를 정의함으로써 우리의 CPU사용을 극대화할 수 있다.
16.7 마치며
* 한 개 이상의 원격 외부 서비스를 사용하는 긴 동작을 실행할 때는 비동기 방식으로 애플리케이션의 성능과 반응성을 향상시킬 수 있다.
* 우리 고객에게 비동기 API를 제공하는 것을 고려해야 한다. CompletableFuture의 기능을 이용하면 쉽게 비동기 API를 구현할 수 있다.
* CompletableFuture를 이용할 때 비동기 태스크에서 발생한 에러를 관리하고 전달할 수 있다.
* 동기 API를 CompletableFuture로 감싸서 비동기적으로 소비할 수 있다.
* 서로 독립적인 비동기 동작이든 아니면 하나의 비동기 동작이 다른 비동기 동작의 결과에 의존하는 상황이든 여러 비동기 동작을 조립하고 조합할 수 있다.
*CompletableFuture에 콜백을 등록해서 Future가 동작을 끝내고 결과를 생산했을 때 어떤 코드를 샐행하도록 지정할 수 있다.
*CompletableFuture 리스트의 모든 값이 완료될 때까지 기다릴지 아니면 첫 값만 왼료되길 기다릴지 선택할 수 있다.
* 자바 9에서는 orTimeout, completeOnTimeout 메서드로 CompletableFuture에 비동기 타임아웃 기능을 추가했다.