Modern Java In Action 정리 - CompletableFuture와 리액티브 프로그래밍 컨셉의 기초
- Thread, Future, 자바가 풍부한 동시성 API를 제공하도록 공요하는 진화의 힘
- 비동기 API
- 동시 컴퓨팅의 박스와 채널 뷰
- CompletableFuture 콤비네이터로 박스를 동적으로 연결
- 리액티브 프로그래밍용 자바 9 플로 API의 기초를 이루는 발행 구독 프로토콜
- 리액티브 프로그래밍과 리액티브 시스템
모던 자바 인 액션 책을 보고 정리한 글입니다.
동시성을 구현하는 자비 지원의 진화
- 처음 자바는 Runnable과 Thread를 동기화된 클래스와 메서드를 이용하여 잠궜다, 그 후 조금더 진화된 ExecutorService 인터페이스, 높은 수준의 결과 즉 Runnable, Thread의 변형을 반환하는 Callable
and Future , 제네릭 등을 지원했다. - ExecutorServices는 Runnable과 Callable 둘 다 실행 가능하다. 이런 기능들이 추가 됨에 따라 멀티코어 CPU에서 쉽게 병렬 프로그래밍을 구현할 수 있었다.
- 자바는 Future를 조합하는 기능을 추가하며 동시성을 강화(CompletableFuture)했고, 자바 9에서는 분산 비동기 프로그래밍을 명시적으로 지원.
스레드와 높은 수준의 추상화
- 일반적으로 프로세스는 운영체제에 한개 이상의 스레드 즉, 본인이 가진 프로세스와 같은 주소 공간을 공유하는 프로세스를 요청함으로 테스트클 동시에 또는 협력적으로 실행할 수 있다.
- 이전에 스트림을 활용하여 외부 반복(명시적 루프) 대신 내부 반복을 통해 병렬 프로그래밍을 하는 방법을 배웠다. 결론적으로 병렬 스트림의 반복은 명시적인 스레드를 사용하는 것에 비해 높은 수준의 개념이라는것을 알 수 있다.
- 정리 하자면 스트림을 활용하여 스레드 사용패턴을 추상화 할 수 있다.
Executor와 스레드 풀
- Executor 프레임 워크와 스레드 풀을 통해 스레드를 테스크 제출과 실행을 분리 할 수 있도록 구현할 수 있다.
- 일반적으로 스레드를 생성하는 비용이 크기 때문에 일정 갯수의 스레드를 스레드 풀에 미리 생성 및 등록하여 사용한다.
- 자바 ExecutorService는 테스크를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 제공, newFixedThreadPool과 같은 팩토리 메서드 중 하나를 이용해 스레드 풀을 생성할 수 있다.
- newFixedThreadPool는 워커 스레드라 불리는 nThreads를 포함하는 ExecutorService를 만들고 이들을 스레드 풀에 저장한다.
- 프로그래머는 테스크(Runnable, Callable)를 제공하면 스레드가 이를 실행한다.
- 스레드 풀 그리고 스레드 풀이 나쁜 이유
- 대부분의 관점에서 스레드를 직접 사용하는것 보단 스레드 풀을 이용하는것이 바람직하지만 두가지 사항을 주의해야 한다.
- k 스레드를 가진 스레드 풀은 오직 k 만큼의 스레드를 동시에 실행 가능하다.
- 초과로 제출된 테스크는 큐에 저장되어 이전 테스크가 종료되기 전까지 대기하게 된다.
- 일반적인 상황에서는 불필요한 스레드를 생성하지 않기 때문에 문제되지 않지만 실행중인 테스크에서 Sleep이 걸리거나 I/O를 기다리게 되면 성능이 급혁하게 저하된다.
- 만약 5 개의 스레드를 갖는 스레드 풀에 20개의 테스크가 할당되고 3개의 스레드가 I/O를 기다리게 되면 결론적으로 2개의 스레드가 15개의 테스크를 처리해야 한다.
- 처음 제출한 테스크가 나중의 테스크의 제출을 기다리를 상황에 빠진다면 데드락에 걸릴수도 있다.
- k 스레드를 가진 스레드 풀은 오직 k 만큼의 스레드를 동시에 실행 가능하다.
- 프로그램을 종료하기전 모든 스레드 풀을 종료하는 습관을 가져야한다.
- 풀의 워커 스레드가 만들어진 다음 다른 테스크의 제출을 기다리며 종료되지 않은 상태로 남을수도 있다.
- 보통 장기간 실행하는 인터넷 서비스를 관리하며 오래 실행되는 ExecutorService를 갖는건 흔한일이며 자바는 이런상황을 다루도록 Thread.setDaemon 메서드를 지원한다.
스레드의 다른 추상화 : 중첩되지 않은 메서드 호출
- 테스크나 스레드가 메서드 호출 안에서 시작되면 그 메서드 호출은 반환하지 않고 작업이 끝나기를 기다리는 방식 즉 스레드 생성과 join이 한쌍처럼 중첩된 메서드 호출내에 추가되는 것을 엄격한 포크 / 조인 이라 부른다.
- 시작된 테스크를 내부 호출이 아니라 외부 호출에서 종료하도록 기다리는 조금더 여유로운 형태의 포크 / 조인을 사용해도 안전하다.
- 비동기 메서드를 활용한 구현시 어떤 위험성이 따르는지 확인하자.
- 스레드 실행은 메서드를 호출한 다음의 코드와 동시에 실행, 데이터 경쟁 문제를 일으키지 않는지 주의.
- 기존 실행중이던 스레드가 종료되지 않은 상황에서 자바의 main 메서드가 반환되면 어떻게 될까 ?
- 애플리케이션을 종료하지 못하고 모든 스레드가 실행을 끝낼 때 까지 기다린다.
- 애플리케이션 종료를 방해하는 스레드를 강제종료시키고 애플리케이션을 종료한다.
- 첫번째 방법은 종료를 못한 스레드 때문에 애플리케이션이 크래시 날 수 있다. 이러한 문제를 피하려면 애플리케이션에서 만든 모든 스레드를 추적하고 애플리케이션을 종료하기 전 스레드 풀을 포함한 모든 스레드를 종료하는것이 좋다.
- 자바 스레드는 데몬 또는 비 데몬으로 구분할 수 있다. 데몬 스레드는 애플리케이션이 종료될떄 강제 종료되므로 데이터 일관성을 파괴하지 않는 동작을 수행할때 유용하게 활용할 수 있다. 반면 main 메서드는 모든 비데몬 스레드가 종료될때 까지 프로그램을 종료하지 않고 기다린다.
동기 API와 비동기 API
- 자바 8 스트림을 활용하여 병렬 하드웨어를 이용할 수 있다. 첫번째로 외부반복(명시적 for 루프)을 내부 반복(스트림 메서드 사용)으로 바꿔야 한다. 그리고 스트림에 parallel 메서드를 이용하므로 자바 런타임 라이브러리가 복잡한 스레드 작업을 하지 않고 병렬로 요소가 처리되도록 할 수 있다.
- 푸르가 실행될 때 추측에 의존해야 하는 프로그래머와 달리 런타임 시스템은 사용할 수 있는 스레드를 더 잘 알고 있는것이 핵심이다.
1 |
|
- 위 메서드가 병렬적으로 실행되는 예제를 살펴보자.
1 |
|
- Runnable 대신 Future API 인터페이스를 활용할 수 도 있다. 이미 ExecutorService로 스레드 풀을 설정했다는 가정하에 다음처럼 구현이 가능하다.
1 |
|
- 여전히 이 코드도 명시적인 submit 메서드 호출같은 불필요한 코드로 오염되었다.
- 명시적 반복으로 병렬화를 수행하던 코드를 스트림을 이용하여 내부 반복으로 바꾼것 처럼 비슷한 방법으로 이 문제를 해결해야 한다.
- 문제의 해결은 비동기 API라는 기능으로 API를 바꿔서 해결할 수 있다.
- 첫번쨰 방법인 Future를 활용하면 문제를 조금 개선할 수 있다.
- 두번째 방법은 Pub - Sub 기반 Flow 인터페이스를 활용 하여 구현할 수 있다.
Future 형식 API
- 대안을 이용하면 이전 f, g 시그니처가 아래 처럼 변경되고 사용할 수 있다.
1 |
|
- 메서드 f는 호출 즉시 자신의 원래 바디를 평가하는 테스크를 포함하는 Futrure를 반환
- get 메서드를 이용하여 두 Future가 완료되어 결과가 합쳐지기를 기다린다.
리액티브 형식 API
- 두 번째 대안에서 핵심은 f, g의 시그니처를 콜백 형식의 프로그래밍을 이용하는 것이다.
1 |
|
- 위처럼 람다를 기용한 계산은 합계를 정확하게 알 수 없다. 어떤 함수가 먼저 계산될지 알수 없기 때문이다.
- if 조건문을 이용해 적절하게 락을 걸어 두 콜백이 모두 호출되었는지 판단한다.
- 리액티브 형식의 API는 보통 한 결과가 아니라 일련의 이벤트에 반응하도록 설계되었으므로 Future를 사용하는것이 적절하다.
- 두 대안 모두 코드를 복잡하게 만들며 어떤 API를 사용할지 결정이 필요하다.
- API는 명시적으로 스레드를 처리하는 코드에 비해 사용코드를 더 단순하게 만들고, 높은 수준의 구조를 유지할 수 있게 도와준다.
Sleep(또는 blocking)은 해롭다.
- sleep 메서드를 호출하더라도 여전히 시스템은 자원을 점유하고 있다. 스레드가 적을떄는 문제되지 않지만 스레드가 많아지고 대부분이 sleep에 빠져든다면 문제가 심각해질것이다.
- 스레드 풀에서 sleep 테스트는 다른 테스크의 할당을 막는것을 기억하자.
- blocking 영역엥서의 sleep 도 문제가 심각하다. 블록 동작은 다른 테스크가 동작을 완료하기를 기다리는 동작과 외부 상호작용을 기다리는 동작 두 가지로 구분가능하다.
- 이런 상황에서 테스크가 기다리는 일을 만들지 않거나 앞 뒤 동작을 구분하여 블록되지 않을경우만 실행할수 있도록 요청 가능하다.
1 |
|
- 아래 코드가 더 좋은 이유는 모두 같은 동작을 수행하나 위 코드는 Sleep 시간 동안 스레드의 자원을 점유하지만 아래 코드는 다른 작업이 실행될수 있도록 허용하는것이다.
비동기 API에서는 예외를 어떻게 처리하는가 ?
- Future를 구현한 CompletableFuture에서는 런타임 get 메서드에 예외를 처리할 수 있는 기능을 제공, exceptionally와 같은 메서드도 제공된다.
- 리액티브 형식의 비동기 API에서는 return 대신 콜백이 호출되므로 에러시 호출할 콜백 함수를 전달해주면 된다.
- 콜백이 여러개 인경우 한 객체로 메서드를 감싸는 것이 좋다. Java 9 Flow API에서는 여러 콜백을 한 객체(네 개의 콜백을 감싸는 Subscriber
)로 감싼다.
박스와 채널 모델
- 동시성 모델을 가장 잘 설계하고 개념화 하는 기법을 박스와 채널 모델이라고 부른다.
- 이전 에제 f(x) + g(x)의 계산을 일반화하면 f나 g를 호출하거나 p 함수에 인수 x를 이용해 호출하고 그 결과를 q1과 q2에 전달하며 두 호출의 결과를 r을 호출한다.
1 |
|
- 위 코드는 q1과 q2를 차례로 호출하여 병렬성 활용과는 거리가 멀다.
1 |
|
- Future를 활용하여 f, g를 병렬로 호출하였다.
- 이 예제에서는 p, r은 다른 작업보다 먼저 선행되고 마지막에 실행되어야 하기 때문에 Future로 감싸지 않았다. 하지만 병렬성을 극대화 하려면 모든 작업을 Future로 감싸야 한다.
CompletableFuture와 콤비네이터를 이용한 동시성
- Java 8에서는 Future 인터페이스의 구현인 CompletableFuture를 이용하여 Future를 조합할 수 있는 기능을 추가했다.
- 일반적으로 Future는 실행하여 get()으로 결과를 얻을 수 있는 Callable로 만들어진다. 하지만 CompletableFuture는 실행할 코드 없이 Future를 만들수 있고 complete() 메서드를 활용하여 나중에 어떤 값을 이용해 다른 스레드가 이를 완료할 수 있고 get()으로 결과를 얻을수 있도록 허용한다.
1 |
|
- 위 코드는 f(X)의 실행이 끝나지 않으면 get()을 기다려야 하기 때문에 프로세싱 자원을 낭비할 수 있다. 이러한 문제점을 CompletableFuture를 사용하여 해결할 수 있다.
- 동작을 조합하는 방식은 다른 언어에서는 많이 사용 되지만 자바에서는 자바 8의 람다가 추가되며 겨우 걸음마를 땐 수준이다.
- stream 연산을 조합하는것과 compose, andThen과 같은 메서드를 두 Function에 이용해 다른 Function을 얻을 수 있다.
- CompletableFuture의 thenCombine 메서드를 활용하여 두 연산결과를 효과적으로 더할수 있다.
1 |
|
- 이 메서드는 두 개의 CompletableFuture 값 (T, U 결과 형식)을 받아 한 개의 새 값을 만든다. 처음 두 작업이 끝나면 두 결과 모두에 fn을 적용하고 블로하지 않은 상태로 결과 Future를 반환한다.
1 |
|
- Future a와 Future b의 결과를 알지 못한 상태에서 thenCombine은 두 연산이 끝났을 때 스레드 풀에서 실행된 연산을 만든다.
- 결과를 추가하는 세번째 연산 c는 다른 두작업이 끝나기 전까지 스레드에서 실행되지 않는다.
발행 - 구독 그리고 리액티브 프로그래밍
- Future와 CompletableFuture는 독린적 실행과 병렬성을 가지고 연산이 끝나면 get()으로 Future의 결과를 얻을 수 있다. 정리하자면 Future는 한번만 실행해 결과를 제공한다.
- 반명 리액티브 프로그래밍은 시간이 흐르며 여러 Future 같은 객체를 통해 여러 결과를 제공한다.
- Java 9 에서는 java.util.concurrent.Flow의 인터페이스에 발행 - 구독 모델(Pub - Sub)을 적용하여 리액티브 프로그래밍을 제공한다.
- 구독자가 구독할 수 있는 발행자
- 이 연결을 구독(subscription)이라고 한다.
- 이 연결을 이용해 메시지(또는 이벤트)를 전송한다.
두 플로를 합치는 예제
- 두 정보 소스로 부터 발생하는 이벤트를 합쳐 다른 구독자가 볼 수 있도록 발행하는 예를 통해 pub - sub의 특징을 간단히 확인할 수 있다.
- C1 + C2라는 공식을 포함하는 C3를 만들자. C1이나 C2의 값이 변경되면 C3에도 새로운 값이 반영되어야 한다.
1 |
|
- c1 이나 c2의 값이 변경되는 경우 어떻게 c3가 두 값을 더하도록 지정할 수 있을까 ? c1과 c2에 이벤트가 발행 했을때 c3를 구독하도록 만들어야 한다.
- 다음과 같이 Publisher 인터페이스를 활용하여 구현할 수 있다.
1 |
|
- 이 인터페이스는 통신할 구독자를 인수로 받는다. Subscriber 인터페이스는 onNext라는 정보를 전달할 단순 메서드를 포함하며 구현자가 필요한 대로 이 메서드를 구현할 수 있다.
1 |
|
- 두 개념을 합친 예시를 보자.
1 |
|
- C3 = C1 + C2는 어떻게 구현할까 ? 왼쪽 / 오른쪽 계산 결과를 저장할 수 있는 별도의 클래스가 필요하다.
1 |
|
- 데이터가 발행자에서 구독자로 흐름에 착안해 개발자는 이를 업스트림 또는 다운 스트림이라고 부른다. 위 예제에서 데이터 newValue는 업스트림 onNext()메서드로 전달되고 notifyAllSubscribers() 호출을 통해 다운 스트림 onNext() 호출로 전달된다.
- 실제로 pub-sub 구조를 적용하려면 onError나 onComplete와 같은 메서드를 통해 예외 발생, 종료등을 알수 있어야 한다.
- 만약 수많은 메시지(이벤트)가 onNext로 전달 된다면 어떤일이 일어날까 ? 이런 상황을 압력이라고 부른다.
- 이런 상황에서는 전달할 메시지를 제어할 수 있는 역압력 기법이 필요하다.
역압력
- Subscriber 객체를 어떻게 Publisher에게 전달해 발행자가 필요한 메서드를 호출할 수 있는지 알아 보았다. 이 객체는 Publisher에서 Subscriber로 정보를 전달한다.
- 정보의 흐름 속도를 역압력(흐름제어)로 제어 즉 Subscriber에서 Publisehr로 정보를 요청해야할 필요가 있을 수 있다.
- Java 9 Flow API의 subscriber 인터페이스는 onSubscribe 메서드를 포함한다.
1 |
|
- Publisher와 Subscriber 사이에 채널이 연결되면 첫 이벤트로 호출된다.
- Subscription 객체는 다음처럼 Subscriber와 Publisher와 통신할 수 있는 메서드를 포함한다.
- 콜백을 이용한 역방향 소통효과에 주목, Pblisher는 Subscription 객체를 만들어 Subscriber로 전달시 Subscriber는 이를 이용하여 Publisher로 정보를 보낼 수 있다.
실제 역압력의 간단한 형태
- 한 번에 한개의 이벤트만 처리하도록 구성시 다음과 같은 작업이 필요하다.
- Subscriber가 OnSubscribe로 전달된 Subscription객체를 subscription 같은 필드에 로컬로 저장
- Subscriber가 수많은 이벤트를 받지 않도록 onSubscribe, onNext, onError의 마지막 동작에 channel.request(1)을 추가, 오직 한 이벤트만 요청
- 요청을 보낸 채널에만 onNext, onError 이벤트를 보내도록 Publisher의 notifyAllSubscribers 코드를 수정
- 구현이 간단해 보일수 있지만 실제로 역압력을 구현시 장단점을 생각해야 한다.
- 여러 Subscriber가 있을 때 이벤트를 가장 느린 속도로 보낼것인가 ? 아니면 각 Subscriber에게 보내지 않은 데이터를 저장할 별도의 큐를 가질것인가 ?
- 큐가 너무 커지면 어떻게 해야 할까 ?
- Subscriber가 준비되지 않으면 큐의 데이터를 폐기 할 것인가 ?
정리
- 자바의 동시성 지원은 계속 진화 해왔으며 앞으로도 진화 할 것이다. 스레드 풀은 일반적으로 유용하지만 블록되는 테스크가 많은 경우 문제가 발생한다.
- 메서드를 비동기로 만들면 병렬성을 추가할 수 있고 부수적으로 루프를 최적화 한다.
- 박스와 채널 모델을 이용해 비동기 시스템을 시각화 할 수 있다.
- CompletableFuture 클래스는 한 번의 비동기 연산을 표현한다. 콤비네이터로 비동기 연산을 조합함으로 Future를 이용할 때 발생한 블로킹 문제를 해결할 수 있다.
- Flow API는 pub - sub 구조, 역얍력을 이용하면 자바의 리액티브 프로그래밍의 기초를 제공.