그룹 스터디 공부(IT 서적)/모던 자바 인 액션

chapter 15 CompletableFuture와 리액티브 프로그래밍 컨셉의 기초

hanyugyeong 2023. 9. 6. 17:32
반응형
SMALL

이 장의 내용

  • Thread, Future, 자바가 풍부한 동시성 API를 제공하도록 강요하는 진화의 힘
  • 비동기 API
  • 동시 컴퓨팅의 박스와 채널 뷰
  • CompletableFuture 콤비네이터로 박스를 동적으로 연결
  • 리액티브 프로그래밍용 자바 9 플로 API의 기초를 이루는 발행 구독 프로토콜
  • 리액티브 프로그래밍과 리액티브 시스템

15.1 동시성을 구현하는 자바 지원의 진화 

Runnable, Thread → ExecutorService, Callable<T>, Future<T>, 제네릭 → RecursiveTask → 람다 → 분산 비동기 프로그래밍

15.1.1 스레드와 높은 수준의 추상화 

프로세스는 다시 운영체제에 한 개 이상의 스레드 즉, 본인이 가진 프로세스와 같은 주소 공간을 공유하는 프로세스를 요청함으로 태스크를 동시에 또는 협력적으로 실행할 수 있다. 

 

자바 스트림으로 외부 반복(명시적 루프) 대신 내부 반복을 통해 얼마나 쉽게 병렬성을 달성할 수 있는지 설명했다. 

sum = Arrays.stream(stats).parallel().sum();

병렬 스트림 반복은 명시적으로 스레드를 사용하는 것에 비해 높은 수준의 개념이라는 사실을 알 수 있다. 다시 말해 스트림을 이용해 스레드 사용 패턴을 추상화할 수 있다. 

 

15.1.2 Executor와 스레드 풀

스레드의 문제 

자바 스레드는 직접 운영체제 스레드에 접근한다. 운영체제 스레드는 비용이 비싸며, 스레드 숫자도 제한되어 있다.

기기마다 다른 하드웨어 스레드를 가지며, 주어진 프로그램에서 사용할 최적의 자바 스레드 개수는 사용할 수있는 하드웨어 코어 개수에 따라 달라진다.

스레드 풀 그리고 스레드 풀이 더 좋은 이유

자바 ExecutorService는 태스크를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 제공한다. 

ExecutorService newFixedThreadPool(int nThreads)

이 메서드는 워커 스레드라 불리는 nThreads를 포함하는 ExecutorService를 만들고 이들을 스레드 풀에 저장한다. 

이 방식의 장점은 하드웨어에 맞는 수의 태스크를 유지함과 동시에 수 천개의 태스클르 스레드 풀에 아무 오버헤드 없이 제출할 수 있다는 점이다. 큐의 크기 조정, 거부 정책, 태스크 종류에 따른 우선순위 등 다양한 설정을 할 수 있다. 

프로그래머는 태스크(Runnable이나 Callable)를 제공하면 스레드가 이를 실행한다. 

스레드 풀 그리고 스레드 풀이 나쁜 이유

스레드풀을 이용할때는 두 가지 사항을 주의해야한다.

  • k 스레드를 가진 스레드 풀은 오직 k만큼의 스레드를 동시에 실행할 수있다. sleep 중이거나 I/O block 중인 스레드가 있다면 작업 효율성이 떨어지며, 데드락에 걸릴 수도 있다.
    block 할 수 있는 태스크는 스레드에 제출하지 말아야하지만 이를 항상 지키긴 어렵다.
  • 모든 스레드 풀이 종료되기 전에 프로그램을 종료하면, 워커 스레드가 다음 태스크 제출을 기다리며 종료되지 않을 수 있으므로 주의해야한다.

15.1.3 스레드의 다른 추상화 : 중첩되지 않은 메서드 호출 

7장(병렬 스트림 처리와 포크/조인 프레임워크)에서 사용한 동시성에서는 한 개의 특별한 속성 즉, 태스크나 스레드가 메서드 호출 안에서 시작되면 그 메서드 호출은 반환하지 않고 작업이 끝나기를 기다렸다.

이렇게 스레드 생성과 join()이 한 쌍처럼 충첩된 메서드 호출을 엄격한 포크/조인이라 부른다.

시작된 태스크를 내부 호출이 아니라 외부 호출에서 종료하도록 기다리는 좀 더 여유로운 방식의 포크/조인을 사용해도 비교적 안전하다. 

15장에서는 다음 그림과 같이 사용자의 메서드 호출에 의해 스레드가 생성되고 메서드를 벗어나 계속 실행되는 동시성 형태에 초점을 맞춘다.

이렇게 메서드가 반환된 후에도 만들어진 태스크 실행이 계속되는 메서드를 비동기 메서드라 한다.

이 메서드를 사용할때는 다음 사항을 주의해야 한다.

  • 스레드 실행은 메서드를 호출한 다음의 코드와 동시에 실행되므로 데이터 경쟁 문제를 일으키지 않도록해야 한다.
  • 기존 실행 중이던 스레드가 종료되지 않은 상황에서 자바의 main() 메서드가 반환되면 문제가 발생할 수 있다. - 애플리케이션을 종료하지 못하고 모든 스레드가 실행을 끝낼 때가지 기다린다. - 애플리케이션 종료를 방해하는 스레드를 강제종료(kill)시키고 애플리케이션을 종료한다. 

자바 스레드는 setDaemon() 메서드를 이용해 데몬 또는 비데몬으로 구분시킬 수 있다. 

데몬 스레드는 애플리케이션이 종료될 때 강제 종료되므로 디시크의 데이터 일관성을 파괴하지 않는 동작을 수행할 때 유용하게 활용할 수 있는 반면, main() 메서드는 모든 비데몬 스레드가 종료될 때까지 프로그램을 종료하지 않고 기다린다. 

15.1.4 스레드에 무엇을 바라는가?

모든 하드웨어 스레드를 활용해 병렬성의 장점을 극대화하도록 프로그램 구조를 만드는 것 즉, 프로그램을 작은 태스크 단위로 구조화하는 것이 목표다

15.2 동기 API와 비동기 API 

두 가지 단계로 병렬성을 이용할 수 있다. 첫 번째로 외부 반복(명시적 for루프)을 내부 반복(스트림 메서드 사용)으로 바꿔야 한다. 그리고 스트림에 parallel() 메서드를 이용하므로 자바 런타임 라이브러리가 복잡한 스레드 작업을 하지 않고 병렬로 요소가 처리되도록 할 수 있다. 

다음과 같은 시그니처를 갖는 f,g 두 메서드의 호출을 합하는 예제를 살펴보자. 

int f(int x);
int g(int x);

참고로 이들 메서드는 물리적 결과를 반환하므로 동기 API라고 부른다. 다음처럼 두 메서드를 호출하고 합계를 출력하는 코드가 있다. 

int y = f(x);
int z = g(x);
System.out.println(y + z);

f와 g를 실행하는데 오랜 시간이 걸린다고 가정하자 f,g의 작업을 컴파일러가 완전하게 이해하기 어려우므로 보통 자바 컴파일러는 코드 최적화와 관련한 아무 작업도 수행하지 않을 수 있다. 

별도의 스레드로 f와 g를 실행해 이를 구현할 수 있다. 의도는 좋지만 이전의 단순했던 코드가 다음처럼 복잡하게 변한다. 

class ThreadExample {
  public static void main(String[] args) throws InterruptedException {
    int x = 1337;
    Result result = new Result();
    
    Thread t1 = new Thread(() -> { result.left = f(x); });
    Thread t2 = new Thread(() -> { result.right = g(x); });
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println(result.left + result.right);
  }
  
  private static class Result {
    private int left;
    private int right;
  }
}

Runnable 대신 Future API 인터페이스를 이용해 코드를 더 단순화할 수 있다. 이미 ExecutorService로 스레드 풀을 설정했다고 가정하면 다음처럼 코드를 구현할 수 있다. 

public class ExecutorServiceExample {

  public static void main(String[] args) throws ExecutionException, InterruptedException {
    int x = 1337;

    ExecutorService executorService = Executors.newFixedThreadPool(2);
    Future<Integer> y = executorService.submit(() -> fo(x));
    Future<Integer> z = executorService.submit(() -> go(x));
    System.out.println(y.get() + z.get());

    executorService.shutdown();
  }

}

여전히 이 코드도 명시적인 submit 메서드 호출 같은 불필요한 코드로 오염되었다. 명시적 반복으로 병렬화를 수행하던 코드를 스트림을 이용해 내부 반복으로 바꾼것처럼 비슷한 방법으로 이 문제를 해결해야 한다. 

 

문제의 해결은 비동기 API라는 기능으로 API를 바꿔서 해결할 수 있다. 

 

첫 번째 방법은 자바의 Future를 이용하면 이 문제를 조금 개선할 수 있다. 

15.2.1 Future 형식 API 

대안을 이용하면  f,g의 시그니처가 다음처럼 바뀐다.

Future<Integer> f(int x);
Future<Integer> g(int x);

그리고 다음처럼 호출이 바뀐다. 

Future<Integer> y = f(x);
Future<Integer> z = g(x);
System.out.println(y.get() + z.get());

메서드 f는 호출 즉시 자신의 원래 바디를 평가하는 태스크를 포함하는 Future를 반환한다. 마찬가지로 메서드 g도 Future를 반환하며 세 번째 코드는 get() 메서드를 이용해 두 Future가 완료되어 결과가 합쳐지기를 기다린다. 

 

예제에서는 API는 그대로 유지하고 g를 그대로 호출하면서 f에만 Future를 적용할 수 있었다. 

하지만 조금 더 큰 프로그램에서는 두 가지 이유로 이런 방식을 사용하지 않는다. 

  • 다른 상황에서는 g에도 future 형식이 필요할 수 있으므로 API 형식을 통일하는 것이 바람직하다.
  • 병렬 하드웨어로 프로그램 실행 속도를 극대화하려면 작고 합리적인 크기의 여러 태스크로 나누는 것이 좋다.

15.2.2 리액티브 형식 API 

두 번째 대안은 f, g의 시그니처를 바꿔 콜백 형식의 프로그래밍을 이용하는 것이다.

void f(int x, IntConsumer dealWithResult);

f가 값을 반환하지 않는데 어떻게 프로그램이 동작할까? f에 추가 인수로 콜백(람다)을 전달해서 f의 바디에서는 return 문으로 결과를 반환하는 것이 아니라 결과가 준비되면 이를 람다로 호출하는 태스크를 만드는 것이 비결이다. 

다시 말해 f는 바디를 실행하면서 태스크를 만든 다음 즉시 반환하므로 코드 형식이 다음처럼 바뀐다. 

public class CallbackStyleExample {

  public static void main(String[] args) {

    int x = 1337;
    Result result = new Result();

    f(x, (int y) -> {
      result.left = y;
      System.out.println((result.left + result.right));
    });

    g(x, (int z) -> {
      result.right = z;
      System.out.println((result.left + result.right));
    });
  }

}

하지만 결과가 달라졌다. f와 g의 호출 합계를 정확하게 출력하지 않고 상황에 따라 먼저 계산된 결과를 출력한다. 락을 사용하지 않으므로 값을 두 번 출력할 수 있을 뿐더러 때로는 +에 제공된 두 피연산자가 println이 호출되기 전에 업데이트될 수도 있다. 

  • if-then-else를 이용해 적절한 락을 이용해 두 콜백이 모두 호출되었는지 확인한 다음 원하는 기능을 수행한다.
  • 리액티브 형식의 API는 보통 한 결과가 아니라 일련의 이벤트에 반응하도록 설계되었으므로 Future를 이용하는 것이 더 적절하다.

15.2.3 잠자기(그리고 기타 블로킹 동작)은 해로운 것으로 간주 

스레드 풀에서 잠을 자는 태스크는 다른 태스크가 시작되지 못하게 막으므로 자원을 소비한다는 사실을 기억하자. 

 

블록 동작은 다른 태스크가 어떤 동작을 완료하기를 기다리는 동작(Future에 get() 호출)과 외부 상호작용을 기다리는 동작 두 가지로 구분할 수 있다. 

다음은 한 개의 작업을 갖는 코드 A다.

work1();
Thread.sleep(10000);
work2();

이를 코드 B와 비교하자.

public class ScheduledExecutorServiceExample {
  public static void main(String[] args) {
    ScheduledExecutorService scheduledExecutorService = Executor.newScheduledThreadPool(1);
    
    work1();
    scheduledExecutorService.schedule(ScheduledExecutorServiceExample::work2, 10, timeUnit.SECONDS);
    
    scheduledExecutorService.shutdown();
  }
  
  public static void work1() {
    ...
  }
  
  
  public static void work2() {
    ...
  }
}

코드 A는 sleep하는 동안 스레드 자원을 점유하는 반면, B는 다른 작업이 실행될 수 있도록 허용한다.

 

15.2.4 현실성 확인 

현실적으로 '모든 것은 비동기'라는 설계 원칙을 지킬 수는 없다.

자바의 개선된 동시성 API를 이용해 유익을 얻을 수 있는 상황을 찾아보고 사용해보길 권장한다.

네트워크 서버의 블록/비블록 API를 일관적으로 제공하는 Netty 같은 새로운 라이브러리를 사용하는 것도 도움이 된다.

15.2.5 비동기 API에서 예외는 어떻게 처리할까?

비동기 API에서 호출된 메서드의 실제 바디는 별도의 스레드에서 호출되며 이때 발생하는 어떤 에러는 이미 호출자의 실행 범위와는 관계가 없는 상황이 된다.

Future를 구현한 CompletableFuture에서는 런타임 get() 메서드에 예외를 처리할 수 있는 기능을 제공하며 예외에서 회복할 수 있도록 exceptionally()
같은 메서드도 제공한다.

리액티브 형식의 비동기 API에서는 return 대신 기존 콜백이 호출되므로 예외가 발생했을 때 실행될 추가 콜백을 만들어 인터페이스를 바꿔야한다.

15.3 박스와 채널 모델 

  • 박스와 채널 모델은 동시성을 설계하고 계념화하기 위한 모델이다.
  • 박스와 채널 모델을 이용하면 생각과 코드를 구조화할 수 있으며, 시스템 구현의 추상화 수준을 높일 수 있다.
  • 박스로 원하는 연산을 표현하면 계산을 손으로 코딩한 결과보다 더 효율적일 것이다.
  • 또한 병렬성을 직접 프로그래밍하는 관점을 콤비네이터를 이용해 내부적으로 작업을 처리하는 관점으로 바꿔준다.

간단한 박스와 채널 다이어그램 

위 태스크를 코드로 구현해보자 

int t = p(x);
System.out.println(r(q1(t), q2(t));
// 위 방식은 q1, q2를 차례로 호출하여 하드웨어 병렬성 활용과는 거리가 멀다.
int t = p(x);
Future<integer> a1 = executorService.submit(() -> q1(t));
Future<integer> a2 = executorService.submit(() -> q2(t));
System.out.println(r(a1.get(), a2.get());

박스와 채널 다이어그램의 모양상 p와 r을 Future로 감싸지 않았지만, 병렬성을 극대화하려면 모든 함수를 Future로 감싸야 한다.
많은 태스크가 get() 메서드를 호출해서 Future가 끝나기를 기다리게 되면 하드웨어의 병렬성을 제대로 활용하지 못하거나 데드락에 걸릴 수도 있다.

15.4 CompletableFuture와 콤비네이터를 이용한 동시성

CompletableFuture는 Future를 조합할 수 있는 기능이 있다.

ComposableFuture가 아닌 CompletableFuture라 부르는 이유는 실행할 코드 없이 Future를 만들 수 있고, complete() 메서드를 이용해 다른
스레드가 완료한 후에 get()으로 값을 얻을 수 있도록 허용하기 때문이다.

public class CFComplete {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFiexedthreadPool(10);
        int x = 1337;

        CompletableFuture<Integer> a = new CompletableFuture<>();
        executorService.submit(() -> a.complete(f(x)));
        int b = g(x);
        System.out.println(a.get() + b);

        executorService.shutdown();

    }
}

f(x)와 g(x)를 동시에 실행해 합계를 구하는 위 코드에서, f(x)의 실행이 끝나지 않은 상황에서 get()을 기다리며 프로세싱 자원을 낭비할 수 있다.

ComposableFuture의 thenCombine 메서드를 사용하면 연산 결과를 효과적으로 더할 수 있다.

ComposableFuture<V> thenCombine(CompletableFuture<U> other, Bifunction<T, U, V> fn)

이 메서드는 T, U 값을 받아 새로운 V값을 만든다.

public class CFComplete {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFiexedthreadPool(10);
        int x = 1337;

        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();

        CompletableFuture<Integer> c = a.thenCombine(b, (y, z) -> y + z);
        executorService.submit(() -> a.complete(f(x)));
        executorService.submit(() -> b.complete(g(x)));

        System.out.println(c.get());
        executorService.shutdown();

    }
}

결과를 추가하는 세 번째 연산 c는 다른 두 작업이 끝날때까지 실행되지 않으므로 먼저 시작해서 블록되지 않는다.

이전 버전의 y+z 연산은 g(x)를 실행한 스레드에서 수행되어 f(x)가 완료될 때까지 블록될 여지가 있었다.

반면 thenCombine을 이용하면 f(x)와 g(x)가 끝난 다음에 덧셈 계산이 실행된다.

반응형
LIST