모던 자바 인 액션 - 16장 CompletableFuture - 안정적 비동기 프로그래밍

Modern Java In Action 정리 - CompletableFuture - 안정적 비동기 프로그래밍

  • 비동기 작업을 만들고 결과 얻기
  • 비블록 동작으로 생산성 높이기
  • 비동기 API 설계와 구현
  • 동기 API를 비동기적으로 소비하기
  • 두 개 이상의 비동기 연산을 파이프라인으로 만들고 합치기
  • 비동기 작업 완료에 대응하기

모던 자바 인 액션 책을 보고 정리한 글입니다.

Future의 단순활용

  • Java 5 부터 미래의 어느 시점에 결과를 얻는 모델에 활용할수 있도록 Future 인터페이스를 제공하고 있다.
  • 시간이 걸리는 작업들을 Future 내부에 설정하여 호출자 스레드가 결과를 기다리는 동안 다른 유용한 작업들을 할 수 있다.
1
2
3
4
5
6
7
8
ExecutorService es =ex.newCachedThreadPool();
Future<Double> future = ex.submit(new Callable<Double>() {
  public Double call() {
    return doSomeLongComputation();
  }
})
doSomeThingElse();
future.get(1, TimeUnit.SECONDS); // 비동기 결과가 준비되어 있지 않으면 1초간 기다린다. 
  • 위 예재와 같이 시간이 오래 걸리는 작업을 다른 스레드에서 처리하고 메인 스레드에서는 다른 작업들을 미리 수행할 수 있다.
  • 만약 결과가 준비되지 않았다면 작업이 완료될때까지 스레드를 블록할 수 있다. 이때 영원히 스레드가 종료되지 않는 경우를 대비하여 최대 타임아웃을 설정가능하다.

Future 제한

  • 첫번째 예제에서는 Future 인터페이스가 비동기 계산이 끝났는지 확인할수 있는 isDone, 게산이 끝나길 기다리는 메서드, 결과 회수 메서드등을 제공한다. 하지만 이들만으로 동시 실행 코드를 구현하기 어렵다.
  • 오래걸리는 A 계산이 끝나면 B를 실행하라 와 같은 요구사항을 쉽게 구현할 수 있어야 한다.
  • Future로 이와같은 동작을 구현하는것은 쉽지 않다. 다음과 같은 선언형 기능이 있다면 유용하게 사용할 수 있을것이다.
    • 두 개의 비동기 계산 결과를 하나로 합친다. 두 계산은 하나에 의존하는 상황일수도, 독립적인 계산일수도 있다.
    • Future 집합이 샐행하는 모든 테스크의 완료를 기다린다.
    • Future 집합에서 가장 빨리 완료되는 테스크를 기다렸다가 결과를 얻는다. (예를 들어 여러 테스크가 다양한 방식으로 같은 결과를 얻는 방법)
    • 프로그램적으로 Future를 완료 시킨다. (비동기 동작에 수동으로 결과 제공)
    • Future 완료 동작에 반응한다. (결과를 기다리며 블록되지 않고 결과가 준비되었다는 알림을 받은 다음 Future의 결과로 원하는 추가 동작을 수행가능)
  • 선언형으로 Future를 사용할 수 있는 CompletableFuture 클래스에 대해 알아본다.
  • Stream과 비슷하게 람다표현식과 파이프라이닝을 활용, 따라서 Future와 CompletableFuture는 Collection과 Stream의 관계에 비유가능하다.

CompletableFuture로 비동기 애플리케이션 만들기

  • 첫쨰. 고객에게 비동기 API를 제공하는 방법을 배운다.
  • 둘쨰. 동기 API를 사용해야할 때 코드를 비블록으로 만드는 방법을 배운다. 두 개의 비동기 동작을 파이프라인으로 만드는 방법과 두 개의 동작 결과를 하나의 비동기 계산으로 합치는 방법.
  • 셋째. 비동기 동작의 완료에 대응하는 방법을 배운다.

비동기 API 구현

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Shop {
  public double getPrice(String product) {
      return calculatePrice(product);
  }

  private double calculatePrice(String product) {
      delay();
      return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
  }
  
  public static void delay() {
      try {
          Thread.sleep(1000L);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  }
}
  • getPrice는 외부 요청과 같이 시간이 오래걸리는 작업을 수행 할 수 있으므로 임의로 1초 동안 sleep 메소드를 호출하였다.
  • 위 API를 사용자가 호출하는 경우 비동기 동작이 완료될때까지 1초 동안 블록된다.
  • 동기 메소드를 비동기 메소드로 소비하는 방법을 알아보자.

동기 메서드를 비동기 메서드로 변환

  • 동기 메서드 getPrice를 비동기 메서드로 변환하려면 이름과 반환값을 변경해야 한다.
1
public Future<Double> getPriceAnsync(String product)
  • Future는 결과값의 핸들일 뿐 계산이 완료되면 get 메서들르 통해 결과를 얻을 수 있다. 다시말해 getPriceAnsync 메서드는 즉시 반환되고 호출자 스레드는 다른 작업을 수행할 수 있다.
1
2
3
4
5
6
7
8
9
10
public Future<Double> getPriceAnsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    
    new Thread(() -> {
        double price = calculatePrice(product); // 다른 스레드에서 비동기로 계산 
        futurePrice.complete(price); // 결과값 전달
    }).start();
    
    return futurePrice;
}
  • 상점은 비동기 API를 제공함으로 즉시 Future를 반환한다. 클라이언트는 전달 받은 Future를 이용하여 적절한 시점에 결과를 얻을 수 있고, 그 동안 다른 작업을 수행 가능하다.
  • Future의 get 메서드 호출시 결과값을 가지고 있다면 곧바로 값을 읽지만 그렇지 않으면 계산이 완료될때 까지 블록한다.

에러 처리 방법

  • 가격을 계산하는 동안 에러가 발생하는 경우 해당 쓰레드에만 영향을 미치게 되어 전체적인 결과값이 잘못되거나 클라이언트에서 응답을 무한히 기다리게 될 수 있다.
  • 클라이언트는 타임아웃 값을 받는 get 메서드를 활용하여 문제를 해결할 수있다. 하지만 일반적으로는 어떠한 이유로 TimeoutException이 발생한지는 알수 없을것이다.
  • CompleteExceptionally 메서드를 이용하여 CompletableFuture 내부에서 발생한 예외를 클라이언트로 전달해보자.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public Future<Double> getPriceAnsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();

    new Thread(() -> {
        try {
            double price = calculatePrice(product); // 다른 스레드에서 비동기로 계산
            futurePrice.complete(price); // 결과값 전달
        } catch (Exception ex) {
            futurePrice.completeExceptionally(ex); // 문제가 발생하는 경우 에러를 포함시켜 Future를 종료
        }
    }).start();

    return futurePrice;
}

팩토리 메서드 supplyAsync로 CompletableFuture 만들기

1
2
3
public Future<Double> getPriceAnsync(String product) {
    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

비블록 코드 만들기

1
2
3
4
5
6
7
8
9
10
private List<Shop> shops = Arrays.asList(new Shop("shop1"), 
            new Shop("shop2"), 
            new Shop("shop3"),
            new Shop("shop4"));

public List<String> findPrices(String product) {
    return shops.stream()
            .map(shop -> String.format("%s is %.2f", shop.getName(), shop.getPrice(product)))
            .collect(Collectors.toList());
}
  • 위와 같은 예제를 실행한다면 shops의 수 만큼 price를 순차적으로 계산하게 되어 많은 시간이 소요될 것이다.

병렬 스트림으로 요청 병렬화 하기

1
2
3
4
5
public List<String> findPrices(String product) {
    return shops.parallelStream()
            .map(shop -> String.format("%s is %.2f", shop.getName(), shop.getPrice(product)))
            .collect(Collectors.toList());
}
  • 병렬 스트림을 이용하여 동시에 가격을 계산할 수 있도록 개선하면 시간이 단축된다.
  • 다음은 CompletableFuture를 활용하여 findPrices 메서드의 동기 호출을 비동기 호출로 변경해보자.

CompletableFuture로 비동기 호출 구현하기

1
2
3
4
5
6
7
8
9
10
11
// 팩토리 메서드를 활용
public List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures = shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s is %.2f", shop.getName(), shop.getPrice(product))))
            .collect(Collectors.toList());


    return priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}
  • CompletableFuture 리스트를 만든 후 계산이 끝난 결과를 추출하는 예시이다. CompletableFuture의 join메서드를 활용했는데 Future의 get 메서드와 같은 의미를 갖는다.
  • 다른점으로는 join 메서드는 예외를 발생시키지 않는다는 점이다.
  • 두 map연산을 하나의 스트림 파이프라인으로 처리하지 않고 분리했다는것에 주목하자. 스트림 연산은 Lazy한 방식으로 동작하기 때문에 하나의 파이프 라인으로 처리했다면 순차적으로 실행되게 된다.

확장성이 더 좋은 해결 방법

  • 병렬 스트림 버전의 코드는 지정한 숫자의 상점에 하나의 스레드를 할당하여 네 개의 작업을 병렬로 처리하였다. 상점이 추가되는 경우는 어떻게 될까 ? (스레드 갯수는 4개가 최대라고 제한)
  • 순차 실행인 경우는 5초 이상, 병렬 스트림, CompletableFuture를 사용한 경우 2초 이상이 소모 될것이다.
  • 병렬 스트림과 CompletableFuture는 비슷한 결과를 보이지만 CompletableFuture는 병렬 스트림에 비해 작업에 사용할 다양한 Executor를 지정할 수 있다.

커스텀 Executor 사용하기

  • 애플리케이션에서 사용하는 자원을 고려하여 풀에서 관리하는 최적의 스레드 수에 맞게 Executor를 만드는 것이 효율적일것이다.
  • 스레드가 너무 많을수록 사용하지 않는 스레드가 많아지고 서버가 크래시 날 수 있으므로 적정한 갯수를 지정하는것이 중요하다.
1
2
3
4
5
6
7
8
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true); // 프로그램 종료를 방해하지 않는 데몬 스레드를 사용.
        return t;
    }
});

비동기 작업 파이프라인 만들기

  • 상점이 하나의 할인 서비스를 사용한다고 가정, 다음과 같이 구현할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Discount {
  public enum Code {
      NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
      
      private final int percentage;

      Code(int percentage) {
          this.percentage = percentage;
      }
  }
  ...
}

public String getPrice(String product) {
    double price = calculatePrice(product);
    Discount.Code value = Discount.Code.values()[new Random().nextInt(Discount.Code.values().length)];
    return String.format("$s %.2f $s", name, price, value);
}

할인 서비스 구현

  • 할인 정보는 언제든 변경될 수 있으므로 매번 서버에서 받아오는걸로 가정
1
2
3
4
5
6
7
8
9
10
11
12
13
public class Discount {
  public enum Code {
  }

  public static String applyDiscount(Quote quote) {
    return quote.getShopName() + "price is " + Discount.apply(quote,getPrice, quote.getDiscountCoe());
  }

  private static double apply(double price, Code code) {
    delay();
    return format(price * (100 - code.percentage) / 100);
  }
}

할인 서비스 사용

  • Discount 서비스는 원격 서비스이므로 1초의 지연을 추가.
1
2
3
4
5
6
7
public List<String> findPrices(String product) {
  return shops.stream()
              .map(shop -> shop.getPrice(product))
              .map(Quote::parse)
              .map(Discount::applyDiscount)
              .collect(toList());
}
  • 세게의 map 연산을 사용하여 상점 스트림에 파이프라인으로 원하는 결과를 얻었지만 성능 최적화와는 거리가 먼 코드이다.
  • 병렬 스트림으로 개선한다면 성능이 좋아지겠지만 스레드 풀의 크기가 고정되어 있다는 단점으로 인해 CompletableFuture에서 수행하는 커스텀 Executor를 정의해보자.

동기 작업과 비동기 작업 조합하기

1
2
3
4
5
6
7
8
9
10
11
12
public List<String> findPrices(String product) {
  List<CompletableFuture<String>> priceFutures = 
    shops.stream()
          .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
          .map(future -> future.thenApply(Quote::parse))
          .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discoount.applyDiscount(qoute), executor)))
          .collect(toList());

  return priceFutures.stream()
                      .map(CompletableFuture::join)
                      .collect(toList());
}
  • 가격 정보 얻기
    • 첫번째 연산에서 supplyAsync에 닮다 표현식을 전달하여 비동기적으로 상점에서 정보를 조회 했다. 첫번째 반환 결과는 Stream<CompletableFuture 이다.
  • Quote 파싱
    • 두번째 변환 과정은 문자열을 파싱하기 위한 Quote 객체를 만든다. 이 과정은 원격 서비스를 이용하지 않기 때문에 thenApply를 활용하여 즉시 지연없이 실행한다.
    • thenApply 메서드는 CompletableFuture가 끝날때가지 블록하지 않는다는 점을 주의한다. 즉 CompletableFuture가 동작을 완전히 완료한 다음 thenApply 메서드로 전달된 람다 표현식을 적용할 수 있다.
    • 따라서 반환 결과값은 Stream<CompletableFuture 이다.
  • CompletableFuture를 조합하여 할인된 가격 계산
    • 세번째 연산에서는 Discount 서비스를 활용하여 할인된 가격을 계산해야 한다.
    • 람다표현식으로 이 동작을 supplyAsync에 전달할 수 있다. 그러면 다른 CompletableFuture가 반환 되어 결과적으로 두가지 CompletableFuture로 이루어진 비동기 동작을 만들수 있다.
      • 상점에서 가격정보를 얻어와 Quote로 변환
      • 변환된 Quote를 Discount서비스로 전달하여 할인된 최종 가격 획득
    • thenCompose 메서드는 첫 번째 연산의 결과를 두번째 연산으로 전달한다. 즉 첫 번째 CompletableFuture에 thenCompose를 호출하고, Function을 넘겨주는 식으로 두 CompletableFuture를 조합할 수 있다.
    • Function은 첫 번째 CompletableFuture 반환 결과를 인수로 받고 두 번째 CompletableFuture를 반환하는데 두번째 CompletableFuture는 첫 번째 CompletableFuture의 결과를 계산의 입력으로 사용한다.

독립 CompletableFuture와 비독립 CompletableFuture 합치기

  • 독립적으로 실행된 두 개의 CompletableFuture 결과를 합쳐야 하는 경우를 살펴보자, 첫 번째 CompletableFuture의 완료와는 관계없이 두 번째 CompletableFuture를 실행할 수 있어야 한다.
  • thenCombine 메서드를 사용하여 해결할 수 있다. thenCombine 메서드는 BiFunction을 두 번째 인수로 받는다.
  • thenCompose와 마찬가지로 thenCombine 메서드에도 Async 버전이 존재. thenCombineAsync 메서드에서는 BiFunction이 정의하는 동작이 스레드 풀로 제출될 경우 별도의 태스크에서 비동기적으로 수행된다.
1
2
3
4
Future<Double> futurePriceInUSD = 
  CompletableFuture.supplyAsync(() -> shop.getPrice(product)) // 제품가격을 요청하는 첫번째 테스크 생성.
                   .thenCombie(CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate) // USD, EUR의 환율 정보를 요청하는 독립적인 두 번째 테스크 생성.
                   .orTimeOut(3, TimeUnit.SECONDS); // 3초 뒤 작업이 완료되지 않으면 TimeoutException을 발생시키도록 설정
  • 무한정 대기하는것을 막기 위해 TimeOut을 설정할 수 있다. 또한 completeOnTimeOut 메서드를 사용한다면 특정 시간안에 응답이 오지 않는 경우 기본값을 전달할 수있다. completeOnTimeOut의 반환값은 CompletableFuture이므로 파이프라인의 실행은 유지될 것이다.