Modern Java In Action 정리 - CompletableFuture - 안정적 비동기 프로그래밍
- 비동기 작업을 만들고 결과 얻기
- 비블록 동작으로 생산성 높이기
- 비동기 API 설계와 구현
- 동기 API를 비동기적으로 소비하기
- 두 개 이상의 비동기 연산을 파이프라인으로 만들고 합치기
- 비동기 작업 완료에 대응하기
모던 자바 인 액션 책을 보고 정리한 글입니다.
Future의 단순활용
- Java 5 부터 미래의 어느 시점에 결과를 얻는 모델에 활용할수 있도록 Future 인터페이스를 제공하고 있다.
- 시간이 걸리는 작업들을 Future 내부에 설정하여 호출자 스레드가 결과를 기다리는 동안 다른 유용한 작업들을 할 수 있다.
1 |
|
- 위 예재와 같이 시간이 오래 걸리는 작업을 다른 스레드에서 처리하고 메인 스레드에서는 다른 작업들을 미리 수행할 수 있다.
- 만약 결과가 준비되지 않았다면 작업이 완료될때까지 스레드를 블록할 수 있다. 이때 영원히 스레드가 종료되지 않는 경우를 대비하여 최대 타임아웃을 설정가능하다.
Future 제한
- 첫번째 예제에서는 Future 인터페이스가 비동기 계산이 끝났는지 확인할수 있는 isDone, 게산이 끝나길 기다리는 메서드, 결과 회수 메서드등을 제공한다. 하지만 이들만으로 동시 실행 코드를 구현하기 어렵다.
- 오래걸리는 A 계산이 끝나면 B를 실행하라 와 같은 요구사항을 쉽게 구현할 수 있어야 한다.
- Future로 이와같은 동작을 구현하는것은 쉽지 않다. 다음과 같은 선언형 기능이 있다면 유용하게 사용할 수 있을것이다.
- 두 개의 비동기 계산 결과를 하나로 합친다. 두 계산은 하나에 의존하는 상황일수도, 독립적인 계산일수도 있다.
- Future 집합이 샐행하는 모든 테스크의 완료를 기다린다.
- Future 집합에서 가장 빨리 완료되는 테스크를 기다렸다가 결과를 얻는다. (예를 들어 여러 테스크가 다양한 방식으로 같은 결과를 얻는 방법)
- 프로그램적으로 Future를 완료 시킨다. (비동기 동작에 수동으로 결과 제공)
- Future 완료 동작에 반응한다. (결과를 기다리며 블록되지 않고 결과가 준비되었다는 알림을 받은 다음 Future의 결과로 원하는 추가 동작을 수행가능)
- 선언형으로 Future를 사용할 수 있는 CompletableFuture 클래스에 대해 알아본다.
- Stream과 비슷하게 람다표현식과 파이프라이닝을 활용, 따라서 Future와 CompletableFuture는 Collection과 Stream의 관계에 비유가능하다.
CompletableFuture로 비동기 애플리케이션 만들기
- 첫쨰. 고객에게 비동기 API를 제공하는 방법을 배운다.
- 둘쨰. 동기 API를 사용해야할 때 코드를 비블록으로 만드는 방법을 배운다. 두 개의 비동기 동작을 파이프라인으로 만드는 방법과 두 개의 동작 결과를 하나의 비동기 계산으로 합치는 방법.
- 셋째. 비동기 동작의 완료에 대응하는 방법을 배운다.
비동기 API 구현
1 |
|
- getPrice는 외부 요청과 같이 시간이 오래걸리는 작업을 수행 할 수 있으므로 임의로 1초 동안 sleep 메소드를 호출하였다.
- 위 API를 사용자가 호출하는 경우 비동기 동작이 완료될때까지 1초 동안 블록된다.
- 동기 메소드를 비동기 메소드로 소비하는 방법을 알아보자.
동기 메서드를 비동기 메서드로 변환
- 동기 메서드 getPrice를 비동기 메서드로 변환하려면 이름과 반환값을 변경해야 한다.
1 |
|
- Future는 결과값의 핸들일 뿐 계산이 완료되면 get 메서들르 통해 결과를 얻을 수 있다. 다시말해 getPriceAnsync 메서드는 즉시 반환되고 호출자 스레드는 다른 작업을 수행할 수 있다.
1 |
|
- 상점은 비동기 API를 제공함으로 즉시 Future를 반환한다. 클라이언트는 전달 받은 Future를 이용하여 적절한 시점에 결과를 얻을 수 있고, 그 동안 다른 작업을 수행 가능하다.
- Future의 get 메서드 호출시 결과값을 가지고 있다면 곧바로 값을 읽지만 그렇지 않으면 계산이 완료될때 까지 블록한다.
에러 처리 방법
- 가격을 계산하는 동안 에러가 발생하는 경우 해당 쓰레드에만 영향을 미치게 되어 전체적인 결과값이 잘못되거나 클라이언트에서 응답을 무한히 기다리게 될 수 있다.
- 클라이언트는 타임아웃 값을 받는 get 메서드를 활용하여 문제를 해결할 수있다. 하지만 일반적으로는 어떠한 이유로 TimeoutException이 발생한지는 알수 없을것이다.
- CompleteExceptionally 메서드를 이용하여 CompletableFuture 내부에서 발생한 예외를 클라이언트로 전달해보자.
1 |
|
팩토리 메서드 supplyAsync로 CompletableFuture 만들기
1 |
|
비블록 코드 만들기
1 |
|
- 위와 같은 예제를 실행한다면 shops의 수 만큼 price를 순차적으로 계산하게 되어 많은 시간이 소요될 것이다.
병렬 스트림으로 요청 병렬화 하기
1 |
|
- 병렬 스트림을 이용하여 동시에 가격을 계산할 수 있도록 개선하면 시간이 단축된다.
- 다음은 CompletableFuture를 활용하여 findPrices 메서드의 동기 호출을 비동기 호출로 변경해보자.
CompletableFuture로 비동기 호출 구현하기
1 |
|
- CompletableFuture 리스트를 만든 후 계산이 끝난 결과를 추출하는 예시이다. CompletableFuture의 join메서드를 활용했는데 Future의 get 메서드와 같은 의미를 갖는다.
- 다른점으로는 join 메서드는 예외를 발생시키지 않는다는 점이다.
- 두 map연산을 하나의 스트림 파이프라인으로 처리하지 않고 분리했다는것에 주목하자. 스트림 연산은 Lazy한 방식으로 동작하기 때문에 하나의 파이프 라인으로 처리했다면 순차적으로 실행되게 된다.
확장성이 더 좋은 해결 방법
- 병렬 스트림 버전의 코드는 지정한 숫자의 상점에 하나의 스레드를 할당하여 네 개의 작업을 병렬로 처리하였다. 상점이 추가되는 경우는 어떻게 될까 ? (스레드 갯수는 4개가 최대라고 제한)
- 순차 실행인 경우는 5초 이상, 병렬 스트림, CompletableFuture를 사용한 경우 2초 이상이 소모 될것이다.
- 병렬 스트림과 CompletableFuture는 비슷한 결과를 보이지만 CompletableFuture는 병렬 스트림에 비해 작업에 사용할 다양한 Executor를 지정할 수 있다.
커스텀 Executor 사용하기
- 애플리케이션에서 사용하는 자원을 고려하여 풀에서 관리하는 최적의 스레드 수에 맞게 Executor를 만드는 것이 효율적일것이다.
- 스레드가 너무 많을수록 사용하지 않는 스레드가 많아지고 서버가 크래시 날 수 있으므로 적정한 갯수를 지정하는것이 중요하다.
1 |
|
비동기 작업 파이프라인 만들기
- 상점이 하나의 할인 서비스를 사용한다고 가정, 다음과 같이 구현할 수 있다.
1 |
|
할인 서비스 구현
- 할인 정보는 언제든 변경될 수 있으므로 매번 서버에서 받아오는걸로 가정
1 |
|
할인 서비스 사용
- Discount 서비스는 원격 서비스이므로 1초의 지연을 추가.
1 |
|
- 세게의 map 연산을 사용하여 상점 스트림에 파이프라인으로 원하는 결과를 얻었지만 성능 최적화와는 거리가 먼 코드이다.
- 병렬 스트림으로 개선한다면 성능이 좋아지겠지만 스레드 풀의 크기가 고정되어 있다는 단점으로 인해 CompletableFuture에서 수행하는 커스텀 Executor를 정의해보자.
동기 작업과 비동기 작업 조합하기
1 |
|
- 가격 정보 얻기
- 첫번째 연산에서 supplyAsync에 닮다 표현식을 전달하여 비동기적으로 상점에서 정보를 조회 했다. 첫번째 반환 결과는 Stream<CompletableFuture
이다.
- 첫번째 연산에서 supplyAsync에 닮다 표현식을 전달하여 비동기적으로 상점에서 정보를 조회 했다. 첫번째 반환 결과는 Stream<CompletableFuture
- Quote 파싱
- 두번째 변환 과정은 문자열을 파싱하기 위한 Quote 객체를 만든다. 이 과정은 원격 서비스를 이용하지 않기 때문에 thenApply를 활용하여 즉시 지연없이 실행한다.
- thenApply 메서드는 CompletableFuture가 끝날때가지 블록하지 않는다는 점을 주의한다. 즉 CompletableFuture가 동작을 완전히 완료한 다음 thenApply 메서드로 전달된 람다 표현식을 적용할 수 있다.
- 따라서 반환 결과값은 Stream<CompletableFuture
이다.
- CompletableFuture를 조합하여 할인된 가격 계산
- 세번째 연산에서는 Discount 서비스를 활용하여 할인된 가격을 계산해야 한다.
- 람다표현식으로 이 동작을 supplyAsync에 전달할 수 있다. 그러면 다른 CompletableFuture가 반환 되어 결과적으로 두가지 CompletableFuture로 이루어진 비동기 동작을 만들수 있다.
- 상점에서 가격정보를 얻어와 Quote로 변환
- 변환된 Quote를 Discount서비스로 전달하여 할인된 최종 가격 획득
- thenCompose 메서드는 첫 번째 연산의 결과를 두번째 연산으로 전달한다. 즉 첫 번째 CompletableFuture에 thenCompose를 호출하고, Function을 넘겨주는 식으로 두 CompletableFuture를 조합할 수 있다.
- Function은 첫 번째 CompletableFuture 반환 결과를 인수로 받고 두 번째 CompletableFuture를 반환하는데 두번째 CompletableFuture는 첫 번째 CompletableFuture의 결과를 계산의 입력으로 사용한다.
독립 CompletableFuture와 비독립 CompletableFuture 합치기
- 독립적으로 실행된 두 개의 CompletableFuture 결과를 합쳐야 하는 경우를 살펴보자, 첫 번째 CompletableFuture의 완료와는 관계없이 두 번째 CompletableFuture를 실행할 수 있어야 한다.
- thenCombine 메서드를 사용하여 해결할 수 있다. thenCombine 메서드는 BiFunction을 두 번째 인수로 받는다.
- thenCompose와 마찬가지로 thenCombine 메서드에도 Async 버전이 존재. thenCombineAsync 메서드에서는 BiFunction이 정의하는 동작이 스레드 풀로 제출될 경우 별도의 태스크에서 비동기적으로 수행된다.
1 |
|
- 무한정 대기하는것을 막기 위해 TimeOut을 설정할 수 있다. 또한 completeOnTimeOut 메서드를 사용한다면 특정 시간안에 응답이 오지 않는 경우 기본값을 전달할 수있다. completeOnTimeOut의 반환값은 CompletableFuture이므로 파이프라인의 실행은 유지될 것이다.