모던 자바 인 액션 - 15장 CompletableFuture와 리액티브 프로그래밍 컨셉의 기초

Modern Java In Action 정리 - CompletableFuture와 리액티브 프로그래밍 컨셉의 기초

  • Thread, Future, 자바가 풍부한 동시성 API를 제공하도록 공요하는 진화의 힘
  • 비동기 API
  • 동시 컴퓨팅의 박스와 채널 뷰
  • CompletableFuture 콤비네이터로 박스를 동적으로 연결
  • 리액티브 프로그래밍용 자바 9 플로 API의 기초를 이루는 발행 구독 프로토콜
  • 리액티브 프로그래밍과 리액티브 시스템

모던 자바 인 액션 책을 보고 정리한 글입니다.

동시성을 구현하는 자비 지원의 진화

  • 처음 자바는 Runnable과 Thread를 동기화된 클래스와 메서드를 이용하여 잠궜다, 그 후 조금더 진화된 ExecutorService 인터페이스, 높은 수준의 결과 즉 Runnable, Thread의 변형을 반환하는 Callable and Future, 제네릭 등을 지원했다.
  • ExecutorServices는 Runnable과 Callable 둘 다 실행 가능하다. 이런 기능들이 추가 됨에 따라 멀티코어 CPU에서 쉽게 병렬 프로그래밍을 구현할 수 있었다.
  • 자바는 Future를 조합하는 기능을 추가하며 동시성을 강화(CompletableFuture)했고, 자바 9에서는 분산 비동기 프로그래밍을 명시적으로 지원.

스레드와 높은 수준의 추상화

  • 일반적으로 프로세스는 운영체제에 한개 이상의 스레드 즉, 본인이 가진 프로세스와 같은 주소 공간을 공유하는 프로세스를 요청함으로 테스트클 동시에 또는 협력적으로 실행할 수 있다.
  • 이전에 스트림을 활용하여 외부 반복(명시적 루프) 대신 내부 반복을 통해 병렬 프로그래밍을 하는 방법을 배웠다. 결론적으로 병렬 스트림의 반복은 명시적인 스레드를 사용하는 것에 비해 높은 수준의 개념이라는것을 알 수 있다.
  • 정리 하자면 스트림을 활용하여 스레드 사용패턴을 추상화 할 수 있다.

Executor와 스레드 풀

  • Executor 프레임 워크와 스레드 풀을 통해 스레드를 테스크 제출과 실행을 분리 할 수 있도록 구현할 수 있다.
  • 일반적으로 스레드를 생성하는 비용이 크기 때문에 일정 갯수의 스레드를 스레드 풀에 미리 생성 및 등록하여 사용한다.
  • 자바 ExecutorService는 테스크를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 제공, newFixedThreadPool과 같은 팩토리 메서드 중 하나를 이용해 스레드 풀을 생성할 수 있다.
  • newFixedThreadPool는 워커 스레드라 불리는 nThreads를 포함하는 ExecutorService를 만들고 이들을 스레드 풀에 저장한다.
  • 프로그래머는 테스크(Runnable, Callable)를 제공하면 스레드가 이를 실행한다.
  • 스레드 풀 그리고 스레드 풀이 나쁜 이유
  • 대부분의 관점에서 스레드를 직접 사용하는것 보단 스레드 풀을 이용하는것이 바람직하지만 두가지 사항을 주의해야 한다.
    • k 스레드를 가진 스레드 풀은 오직 k 만큼의 스레드를 동시에 실행 가능하다.
      • 초과로 제출된 테스크는 큐에 저장되어 이전 테스크가 종료되기 전까지 대기하게 된다.
      • 일반적인 상황에서는 불필요한 스레드를 생성하지 않기 때문에 문제되지 않지만 실행중인 테스크에서 Sleep이 걸리거나 I/O를 기다리게 되면 성능이 급혁하게 저하된다.
      • 만약 5 개의 스레드를 갖는 스레드 풀에 20개의 테스크가 할당되고 3개의 스레드가 I/O를 기다리게 되면 결론적으로 2개의 스레드가 15개의 테스크를 처리해야 한다.
      • 처음 제출한 테스크가 나중의 테스크의 제출을 기다리를 상황에 빠진다면 데드락에 걸릴수도 있다.
  • 프로그램을 종료하기전 모든 스레드 풀을 종료하는 습관을 가져야한다.
    • 풀의 워커 스레드가 만들어진 다음 다른 테스크의 제출을 기다리며 종료되지 않은 상태로 남을수도 있다.
    • 보통 장기간 실행하는 인터넷 서비스를 관리하며 오래 실행되는 ExecutorService를 갖는건 흔한일이며 자바는 이런상황을 다루도록 Thread.setDaemon 메서드를 지원한다.

스레드의 다른 추상화 : 중첩되지 않은 메서드 호출

  • 테스크나 스레드가 메서드 호출 안에서 시작되면 그 메서드 호출은 반환하지 않고 작업이 끝나기를 기다리는 방식 즉 스레드 생성과 join이 한쌍처럼 중첩된 메서드 호출내에 추가되는 것을 엄격한 포크 / 조인 이라 부른다.
  • 시작된 테스크를 내부 호출이 아니라 외부 호출에서 종료하도록 기다리는 조금더 여유로운 형태의 포크 / 조인을 사용해도 안전하다.
  • 비동기 메서드를 활용한 구현시 어떤 위험성이 따르는지 확인하자.
    • 스레드 실행은 메서드를 호출한 다음의 코드와 동시에 실행, 데이터 경쟁 문제를 일으키지 않는지 주의.
    • 기존 실행중이던 스레드가 종료되지 않은 상황에서 자바의 main 메서드가 반환되면 어떻게 될까 ?
      • 애플리케이션을 종료하지 못하고 모든 스레드가 실행을 끝낼 때 까지 기다린다.
      • 애플리케이션 종료를 방해하는 스레드를 강제종료시키고 애플리케이션을 종료한다.
    • 첫번째 방법은 종료를 못한 스레드 때문에 애플리케이션이 크래시 날 수 있다. 이러한 문제를 피하려면 애플리케이션에서 만든 모든 스레드를 추적하고 애플리케이션을 종료하기 전 스레드 풀을 포함한 모든 스레드를 종료하는것이 좋다.
    • 자바 스레드는 데몬 또는 비 데몬으로 구분할 수 있다. 데몬 스레드는 애플리케이션이 종료될떄 강제 종료되므로 데이터 일관성을 파괴하지 않는 동작을 수행할때 유용하게 활용할 수 있다. 반면 main 메서드는 모든 비데몬 스레드가 종료될때 까지 프로그램을 종료하지 않고 기다린다.

동기 API와 비동기 API

  • 자바 8 스트림을 활용하여 병렬 하드웨어를 이용할 수 있다. 첫번째로 외부반복(명시적 for 루프)을 내부 반복(스트림 메서드 사용)으로 바꿔야 한다. 그리고 스트림에 parallel 메서드를 이용하므로 자바 런타임 라이브러리가 복잡한 스레드 작업을 하지 않고 병렬로 요소가 처리되도록 할 수 있다.
  • 푸르가 실행될 때 추측에 의존해야 하는 프로그래머와 달리 런타임 시스템은 사용할 수 있는 스레드를 더 잘 알고 있는것이 핵심이다.
1
2
int f(int x);
int g(int y);
  • 위 메서드가 병렬적으로 실행되는 예제를 살펴보자.
1
2
3
4
5
6
7
8
9
main () {
  t1 = new Thread(() -> left = f(x));
  t2 = new Thread(() -> right = g(x));

  t1.start();
  t2.start();
  t1.join();
  t2.join();
}
  • Runnable 대신 Future API 인터페이스를 활용할 수 도 있다. 이미 ExecutorService로 스레드 풀을 설정했다는 가정하에 다음처럼 구현이 가능하다.
1
2
3
4
5
6
7
8
main () {
  ExecutorService executorService = Executors.newFixedThreadPool(2);

  Future<Integer> y = executorService.submit(() -> f(x));
  Future<Integer> z = executorService.submit(() -> g(x));

  y.get() + z.get();
}
  • 여전히 이 코드도 명시적인 submit 메서드 호출같은 불필요한 코드로 오염되었다.
  • 명시적 반복으로 병렬화를 수행하던 코드를 스트림을 이용하여 내부 반복으로 바꾼것 처럼 비슷한 방법으로 이 문제를 해결해야 한다.
  • 문제의 해결은 비동기 API라는 기능으로 API를 바꿔서 해결할 수 있다.
    • 첫번쨰 방법인 Future를 활용하면 문제를 조금 개선할 수 있다.
    • 두번째 방법은 Pub - Sub 기반 Flow 인터페이스를 활용 하여 구현할 수 있다.

Future 형식 API

  • 대안을 이용하면 이전 f, g 시그니처가 아래 처럼 변경되고 사용할 수 있다.
1
2
3
4
5
6
Future<Integer> f(int x);
Future<Integer> g(int x);

Future<Integer> y = f(x);
Future<Integer> z = g(x);
y.get() + z.get();
  • 메서드 f는 호출 즉시 자신의 원래 바디를 평가하는 테스크를 포함하는 Futrure를 반환
  • get 메서드를 이용하여 두 Future가 완료되어 결과가 합쳐지기를 기다린다.

리액티브 형식 API

  • 두 번째 대안에서 핵심은 f, g의 시그니처를 콜백 형식의 프로그래밍을 이용하는 것이다.
1
2
3
4
5
6
7
8
9
10
11
void f(int x, IntConsumer dealWithResult);

f(x, (int y) -> {
  left = y;
  print(letf + right);
})

g(x, (int z) -> {
  rignt = z;
  print(letf + right);
})
  • 위처럼 람다를 기용한 계산은 합계를 정확하게 알 수 없다. 어떤 함수가 먼저 계산될지 알수 없기 때문이다.
    • if 조건문을 이용해 적절하게 락을 걸어 두 콜백이 모두 호출되었는지 판단한다.
    • 리액티브 형식의 API는 보통 한 결과가 아니라 일련의 이벤트에 반응하도록 설계되었으므로 Future를 사용하는것이 적절하다.
  • 두 대안 모두 코드를 복잡하게 만들며 어떤 API를 사용할지 결정이 필요하다.
  • API는 명시적으로 스레드를 처리하는 코드에 비해 사용코드를 더 단순하게 만들고, 높은 수준의 구조를 유지할 수 있게 도와준다.

Sleep(또는 blocking)은 해롭다.

  • sleep 메서드를 호출하더라도 여전히 시스템은 자원을 점유하고 있다. 스레드가 적을떄는 문제되지 않지만 스레드가 많아지고 대부분이 sleep에 빠져든다면 문제가 심각해질것이다.
  • 스레드 풀에서 sleep 테스트는 다른 테스크의 할당을 막는것을 기억하자.
  • blocking 영역엥서의 sleep 도 문제가 심각하다. 블록 동작은 다른 테스크가 동작을 완료하기를 기다리는 동작과 외부 상호작용을 기다리는 동작 두 가지로 구분가능하다.
  • 이런 상황에서 테스크가 기다리는 일을 만들지 않거나 앞 뒤 동작을 구분하여 블록되지 않을경우만 실행할수 있도록 요청 가능하다.
1
2
3
4
5
6
7
8
9
10
work1(); 
Thread.sleep(10000); <- 10초간 슬립
work2(); 

main() {
  ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
  work1();
  scheduledExecutorService.schedule(ScheduledExecutorServiceExample::work2, 10, TimeUnit.SECONDS);
  scheduledExecutorService.shutdown();
}
  • 아래 코드가 더 좋은 이유는 모두 같은 동작을 수행하나 위 코드는 Sleep 시간 동안 스레드의 자원을 점유하지만 아래 코드는 다른 작업이 실행될수 있도록 허용하는것이다.

비동기 API에서는 예외를 어떻게 처리하는가 ?

  • Future를 구현한 CompletableFuture에서는 런타임 get 메서드에 예외를 처리할 수 있는 기능을 제공, exceptionally와 같은 메서드도 제공된다.
  • 리액티브 형식의 비동기 API에서는 return 대신 콜백이 호출되므로 에러시 호출할 콜백 함수를 전달해주면 된다.
  • 콜백이 여러개 인경우 한 객체로 메서드를 감싸는 것이 좋다. Java 9 Flow API에서는 여러 콜백을 한 객체(네 개의 콜백을 감싸는 Subscriber)로 감싼다.

박스와 채널 모델

  • 동시성 모델을 가장 잘 설계하고 개념화 하는 기법을 박스와 채널 모델이라고 부른다.
  • 이전 에제 f(x) + g(x)의 계산을 일반화하면 f나 g를 호출하거나 p 함수에 인수 x를 이용해 호출하고 그 결과를 q1과 q2에 전달하며 두 호출의 결과를 r을 호출한다.
1
2
int t = p(x);
print(r(q1(t), q2(t)));
  • 위 코드는 q1과 q2를 차례로 호출하여 병렬성 활용과는 거리가 멀다.
1
2
3
4
int t = p(x);
Future<Integer> a1 = executorService.submit(() -> q1(t));
Future<Integer> a2 = executorService.submit(() -> q2(t));
print(r(a1.get(), a2.get()));
  • Future를 활용하여 f, g를 병렬로 호출하였다.
  • 이 예제에서는 p, r은 다른 작업보다 먼저 선행되고 마지막에 실행되어야 하기 때문에 Future로 감싸지 않았다. 하지만 병렬성을 극대화 하려면 모든 작업을 Future로 감싸야 한다.

CompletableFuture와 콤비네이터를 이용한 동시성

  • Java 8에서는 Future 인터페이스의 구현인 CompletableFuture를 이용하여 Future를 조합할 수 있는 기능을 추가했다.
  • 일반적으로 Future는 실행하여 get()으로 결과를 얻을 수 있는 Callable로 만들어진다. 하지만 CompletableFuture는 실행할 코드 없이 Future를 만들수 있고 complete() 메서드를 활용하여 나중에 어떤 값을 이용해 다른 스레드가 이를 완료할 수 있고 get()으로 결과를 얻을수 있도록 허용한다.
1
2
3
4
5
6
7
8
9
10
main() {
 ExecutorService executorService = Executors.newFixedThreadPool(10);
 int x = 1337;

 CompletableFuture<Integer> a = new CompletableFuture<>();
 executorService.submit(() -> a.complete(f(x)));
 int b = g(x);
 print(a.get() + b);

}
  • 위 코드는 f(X)의 실행이 끝나지 않으면 get()을 기다려야 하기 때문에 프로세싱 자원을 낭비할 수 있다. 이러한 문제점을 CompletableFuture를 사용하여 해결할 수 있다.
  • 동작을 조합하는 방식은 다른 언어에서는 많이 사용 되지만 자바에서는 자바 8의 람다가 추가되며 겨우 걸음마를 땐 수준이다.
  • stream 연산을 조합하는것과 compose, andThen과 같은 메서드를 두 Function에 이용해 다른 Function을 얻을 수 있다.
  • CompletableFuture의 thenCombine 메서드를 활용하여 두 연산결과를 효과적으로 더할수 있다.
1
CompletableFuture<V> thenCombine(CompletableFuture<U> other, BiFunction<T, U, V> fn);
  • 이 메서드는 두 개의 CompletableFuture 값 (T, U 결과 형식)을 받아 한 개의 새 값을 만든다. 처음 두 작업이 끝나면 두 결과 모두에 fn을 적용하고 블로하지 않은 상태로 결과 Future를 반환한다.
1
2
3
4
5
6
7
8
9
10
11
12
main() {
 ExecutorService executorService = Executors.newFixedThreadPool(10);
 int x = 1337;

 CompletableFuture<Integer> a = new CompletableFuture<>();
 CompletableFuture<Integer> b = new CompletableFuture<>();
 CompletableFuture<Integer> c = a.thenCombine(b, (y, z) + y + z);
 executorService.submit(() -> a.complete(f(x)));
 executorService.submit(() -> b.complete(g(x)));

 print(c.get());
}
  • Future a와 Future b의 결과를 알지 못한 상태에서 thenCombine은 두 연산이 끝났을 때 스레드 풀에서 실행된 연산을 만든다.
  • 결과를 추가하는 세번째 연산 c는 다른 두작업이 끝나기 전까지 스레드에서 실행되지 않는다.

발행 - 구독 그리고 리액티브 프로그래밍

  • Future와 CompletableFuture는 독린적 실행과 병렬성을 가지고 연산이 끝나면 get()으로 Future의 결과를 얻을 수 있다. 정리하자면 Future는 한번만 실행해 결과를 제공한다.
  • 반명 리액티브 프로그래밍은 시간이 흐르며 여러 Future 같은 객체를 통해 여러 결과를 제공한다.
  • Java 9 에서는 java.util.concurrent.Flow의 인터페이스에 발행 - 구독 모델(Pub - Sub)을 적용하여 리액티브 프로그래밍을 제공한다.
    • 구독자가 구독할 수 있는 발행자
    • 이 연결을 구독(subscription)이라고 한다.
    • 이 연결을 이용해 메시지(또는 이벤트)를 전송한다.

두 플로를 합치는 예제

  • 두 정보 소스로 부터 발생하는 이벤트를 합쳐 다른 구독자가 볼 수 있도록 발행하는 예를 통해 pub - sub의 특징을 간단히 확인할 수 있다.
  • C1 + C2라는 공식을 포함하는 C3를 만들자. C1이나 C2의 값이 변경되면 C3에도 새로운 값이 반영되어야 한다.
1
2
3
4
5
6
7
8
9
10
11
private class SimpleCell {
  private int value = 0;
  private String name;

  public SimpleCell(String name) {
    this.name = name;
  }
}

SimpleCell c2 = new SimpleCell("c2");
SimpleCell c1 = new SimpleCell("c1");
  • c1 이나 c2의 값이 변경되는 경우 어떻게 c3가 두 값을 더하도록 지정할 수 있을까 ? c1과 c2에 이벤트가 발행 했을때 c3를 구독하도록 만들어야 한다.
  • 다음과 같이 Publisher 인터페이스를 활용하여 구현할 수 있다.
1
2
3
interface Publisher<T> {
  void subscribe(Subscriber<? super T> subscriber);
}
  • 이 인터페이스는 통신할 구독자를 인수로 받는다. Subscriber 인터페이스는 onNext라는 정보를 전달할 단순 메서드를 포함하며 구현자가 필요한 대로 이 메서드를 구현할 수 있다.
1
2
3
interface Subscriber<T> {
  void onNext(T t);
}
  • 두 개념을 합친 예시를 보자.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class SimpleCell implements Flow.Publisher<Integer>, Flow.Subscriber<Integer> {
    private int value = 0;
    private String name;
    private List<Flow.Subscriber> subscribers = new ArrayList<>();

    public SimpleCell(String name) {
        this.name = name;
    }
    
    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscribers.add(subscriber);
    }

    @Override
    public void onNext(Integer newValue) {
        this.value = newValue;
        notifyAllSubscribers();
    }

    private void notifyAllSubscribers() {
        subscribers.forEach(subscriber -> subscriber.onNext(this.value));
    }

    ...
}


SimpleCell c3 = new SimpleCell("c3");
SimpleCell c2 = new SimpleCell("c2");
SimpleCell c1 = new SimpleCell("c1");

c1.subscribe(c3);

c1.onNext(10); // c1의 값을 10으로 갱신
c2.onNext(20); // c2의 값을 20으로 갱신

c3는 직접 c1을 구독하므로 다음과 같은 결과가 출력 
c1 : 10
c3 : 10
c2 : 20 

  • C3 = C1 + C2는 어떻게 구현할까 ? 왼쪽 / 오른쪽 계산 결과를 저장할 수 있는 별도의 클래스가 필요하다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

public class ArithmeticCell extends SimpleCell {
    private int left;
    private int right; 
    
    public ArithmeticCell(String name) {
        super(name);
    }
    
    public void setLeft(int left) {
        this.left = left;
        onNext(left + this.right); // 셀 값을 갱신하고 구독자에게 알림
    }
    
    public void setRight(int right) {
        this.right = right;
        onNext(right + this.left);
    }
}


ArithmeticCell c3 = new ArithmeticCell("c3");
SimpleCell c2 = new SimpleCell("c2");
SimpleCell c1 = new SimpleCell("c1");

c1.subscribe(c3::setLeft);
c2.subscribe(c3::setRight);

c1.onNext(10); // c1의 값을 10으로 갱신
c2.onNext(20); // c2의 값을 20으로 갱신
c1.onNext(15); // c1의 값을 15으로 갱신

C1: 10
C3: 10
C2: 20
C3: 30
C1: 15
C3: 35
  • 데이터가 발행자에서 구독자로 흐름에 착안해 개발자는 이를 업스트림 또는 다운 스트림이라고 부른다. 위 예제에서 데이터 newValue는 업스트림 onNext()메서드로 전달되고 notifyAllSubscribers() 호출을 통해 다운 스트림 onNext() 호출로 전달된다.
  • 실제로 pub-sub 구조를 적용하려면 onError나 onComplete와 같은 메서드를 통해 예외 발생, 종료등을 알수 있어야 한다.
  • 만약 수많은 메시지(이벤트)가 onNext로 전달 된다면 어떤일이 일어날까 ? 이런 상황을 압력이라고 부른다.
  • 이런 상황에서는 전달할 메시지를 제어할 수 있는 역압력 기법이 필요하다.

역압력

  • Subscriber 객체를 어떻게 Publisher에게 전달해 발행자가 필요한 메서드를 호출할 수 있는지 알아 보았다. 이 객체는 Publisher에서 Subscriber로 정보를 전달한다.
  • 정보의 흐름 속도를 역압력(흐름제어)로 제어 즉 Subscriber에서 Publisehr로 정보를 요청해야할 필요가 있을 수 있다.
  • Java 9 Flow API의 subscriber 인터페이스는 onSubscribe 메서드를 포함한다.
1
2
3
4
5
6
void onSubscribe(Subscription subscription);

interface Subscription {
  void cancel();
  void request(long n);
}
  • Publisher와 Subscriber 사이에 채널이 연결되면 첫 이벤트로 호출된다.
  • Subscription 객체는 다음처럼 Subscriber와 Publisher와 통신할 수 있는 메서드를 포함한다.
  • 콜백을 이용한 역방향 소통효과에 주목, Pblisher는 Subscription 객체를 만들어 Subscriber로 전달시 Subscriber는 이를 이용하여 Publisher로 정보를 보낼 수 있다.

실제 역압력의 간단한 형태

  • 한 번에 한개의 이벤트만 처리하도록 구성시 다음과 같은 작업이 필요하다.
    • Subscriber가 OnSubscribe로 전달된 Subscription객체를 subscription 같은 필드에 로컬로 저장
    • Subscriber가 수많은 이벤트를 받지 않도록 onSubscribe, onNext, onError의 마지막 동작에 channel.request(1)을 추가, 오직 한 이벤트만 요청
    • 요청을 보낸 채널에만 onNext, onError 이벤트를 보내도록 Publisher의 notifyAllSubscribers 코드를 수정
  • 구현이 간단해 보일수 있지만 실제로 역압력을 구현시 장단점을 생각해야 한다.
    • 여러 Subscriber가 있을 때 이벤트를 가장 느린 속도로 보낼것인가 ? 아니면 각 Subscriber에게 보내지 않은 데이터를 저장할 별도의 큐를 가질것인가 ?
    • 큐가 너무 커지면 어떻게 해야 할까 ?
    • Subscriber가 준비되지 않으면 큐의 데이터를 폐기 할 것인가 ?

정리

  • 자바의 동시성 지원은 계속 진화 해왔으며 앞으로도 진화 할 것이다. 스레드 풀은 일반적으로 유용하지만 블록되는 테스크가 많은 경우 문제가 발생한다.
  • 메서드를 비동기로 만들면 병렬성을 추가할 수 있고 부수적으로 루프를 최적화 한다.
  • 박스와 채널 모델을 이용해 비동기 시스템을 시각화 할 수 있다.
  • CompletableFuture 클래스는 한 번의 비동기 연산을 표현한다. 콤비네이터로 비동기 연산을 조합함으로 Future를 이용할 때 발생한 블로킹 문제를 해결할 수 있다.
  • Flow API는 pub - sub 구조, 역얍력을 이용하면 자바의 리액티브 프로그래밍의 기초를 제공.