Toby의 ReactiveProgramming

CompletableFuture

webmaster 2022. 8. 27. 01:31
728x90

CompletableFuture : 비동기 작업을 간단하게 완료하는 작업을 할 수 있다.

public static void main(String[] args) throws ExecutionException, InterruptedException {
  //CompletableFuture<Integer> cf = CompletableFuture.completedFuture(1); //비동기 작업을 간단하게 완료하는 작업을 할 수 있다.
  CompletableFuture<Integer> cf = new CompletableFuture<>(); //값을 넘기지 않았기 때문에 무한정 대기
  //cf.complete(2);//값을 넣어준다
  cf.completeExceptionally(new RuntimeException()); //예외가 발생했다는것을 알려준다
  System.out.println(cf.get());
}
  • CompletableFuture 타입을 사용하게 되면, 비동기 작업을 간단하게 진행할 수 있다.
  • CompletableFuture의 complete 메서드를 사용하여, 값을 넣어주어, 이벤트를 발생시켜 주거나, 생성 시 CompletableFuture.completedFuture(value)를 사용해 이벤트를 바로 넣어 줄 수도 있다.
  • CompletableFuture의 completeExceptionally를 사용해서, 예외를 발생하였다는 것을 호출하는 곳에 전달해 줄 수 있으며, get() 메서드를 호출하는 순간 예외가 전파된다.

runAsync, thenRun

public static void main(String[] args) throws ExecutionException, InterruptedException {
  CompletableFuture
      .runAsync(() -> log.info("runAsync"))//파라미터로 Runnable을 받는다.
      //장점 : 내부적으로 Future, CompletionStage 인터페이스를 구현하고 있다.
      //  (CompletionStage = 하나의 작업을 비동기 작업으로 수행하고, 완료가 됫을때, 해당 작업에 의존적인 메서드를 실행시켜줄 수 있는 클래스)
      .thenRun(() -> log.info("thenRun"))//비동기 작업이 완료되었으면, 해당 백그라운드에서 이 람다를 실행해라(동기적 실행)
      .thenRun(() -> log.info("thenRun"));
  log.info("exit");

  ForkJoinPool.commonPool().shutdown();
  ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}

Future, CompletionStage를 구현한 CompletableFuture

  • CompletableFuture 같은 경우, 내부적으로 Future, CompletionStage를 구현하고, 있다.
    • CompletionStage를 구현했기 때문에 후속 작업이 비동기 작업에 의존적으로 동작하는 코드를 작성하기 용이하다
  • runAsyn 같은 경우 파라미터로  Runnable 타입을 받는다(리턴 값이 없다)
    • 비동기 작업을 해당 메서드를 통해 간단하게 실행이 가능하다
  • thenRun 같은 경우 CompletableFuture 결과를 다시 비동기 적으로 수행하고 싶을 때 사용한다.

supplyAsync, thenApply, thenAccept

public static void main(String[] args) throws ExecutionException, InterruptedException {
  //비동기 결과를 다음 비동기 작업에 전달할 수 있다
  CompletableFuture
      .supplyAsync(() ->{
        log.info("runAsync");
        return 1;
      }) //리턴값이 있어야 한다
      .thenApply(s -> {
        log.info("thenApply {} ", s);
        return s + 1;
      })//앞에 비동기 작업을 결과를 받아 사용할 수 있다.
      .thenApply(s2 -> {
        log.info("thenApply {} ", s2);
        return s2 * 3;
      })
      .thenAccept(s3 -> log.info("thenAccept {}", s3));
  log.info("exit");
  ForkJoinPool.commonPool().shutdown();
  ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);

}
  • supplyAsync 같은 경우 비동기 작업을 결과를 반환해야 한다.
  • thenApply 같은 경우 반환받은 결과를 파라미터로 받을 수 있으며, 반환 값이 반드시 있어야 한다.
  • thenAccept 같은 경우 비동기 작업에 대한 결과를 파라미터로 받을 수 있으며, 함수를 실행할 수 있다(반환 x)

thenCompose, thenApply

public static void main(String[] args) throws ExecutionException, InterruptedException {
  CompletableFuture
      .supplyAsync(() ->{
        log.info("runAsync");
        return 1;
      })
      .thenCompose(s -> { //flatMap 에 해당하는 부분으로 보면 된다.
        log.info("thenApply {} ", s);
        //비동기 작업으로 인해 타입이 CompletableFuture 타입으로 변경이된다.
        return CompletableFuture.completedFuture(s + 1);//function을 결과 값이 또 다른 Completable 결과로 나와야 할 경우가 있다.
      })
      .thenApply(s2 -> { //map에 해당
        log.info("thenApply {} ", s2);
        return s2 * 3;
      })
      .thenAccept(s3 -> log.info("thenAccept {}", s3));
  log.info("exit");
  ForkJoinPool.commonPool().shutdown();
  ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
  • thenApply는 원소 하나하나에 접근해서 값을 반환하는 Stream에 map과 비슷한 함수이다.
  • thenCompose 같은 경우 반환해야 하는 타입을 전체 CompletableFuture 타입으로 감싸 반환하므로, Stream에 flatmap과 비슷한 함수이다.
    • CompletableFuture로 감싸서 반환하기 위해 CompletableFuture의 static 메서드를 사용해서 반환한다.

참고 : map <-> flatMap

https://www.techiedelight.com/ko/difference-map-flatmap-java/

 

Java에서 map()과 flatMap()의 차이점

이 게시물은 차이점에 대해 설명합니다. map() 그리고 flatMap() 방법 Stream Java의 클래스. 1. 서명 프로토타입 Stream.map() 이다: /** * @파람 새 스트림의 요소 유형 * @파람 매퍼 각 요소에 적용할 비간섭,

www.techiedelight.com

exceptionally

public static void main(String[] args) throws ExecutionException, InterruptedException {
  CompletableFuture
      .supplyAsync(() ->{
        log.info("runAsync");
        if(true)
          throw new RuntimeException();
        return 1;
      })
      .thenCompose(s -> {
        log.info("thenApply {} ", s);
        return CompletableFuture.completedFuture(s + 1);
      })
      .thenApply(s2 -> {
        log.info("thenApply {} ", s2);
        return s2 * 3;
      })
      .exceptionally(e -> -10) //예외를 잡아 복구하는 기능을 적용할 수 있다
      .thenAccept(s3 -> log.info("thenAccept {}", s3));
  log.info("exit");
  ForkJoinPool.commonPool().shutdown();
  ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
  • 비동기로 작업을 진행하는 도중에 예외가 발생하였을 때, exceptionally에서 예외를 복구할 수 있다.
  • supplyAsync에서 예외를 발생시키게 된다면, thenCompose, thenApply 가 실행되지 않고 exceptionally에서 예외를 복구 후, thenAccept에서 결과를 출력하는 것을 볼 수 있다.

비동기 작업을 다른 스레드로 실행

public static void main(String[] args) throws ExecutionException, InterruptedException {
  //현재는 하나의 스레드를 통해 모든 비동기 작업을 하지만, 만약 비동기 작업을 다른 스레드에서 실행하고 싶다면 어떻게 할까?
  ExecutorService es = Executors.newFixedThreadPool(10);

  CompletableFuture
      .supplyAsync(() ->{
        log.info("runAsync");
        return 1;
      }, es)
      .thenCompose(s -> {
        log.info("thenApply {} ", s);
        return CompletableFuture.completedFuture(s + 1);
      })
      .thenApplyAsync(s2 -> { //현재 스레드 풀 정책에 따라 새로운 스레드를 할당한다.
        log.info("thenApply {} ", s2);
        return s2 * 3;
      }, es)
      .exceptionally(e -> -10)
      .thenAcceptAsync(s3 -> log.info("thenAccept {}", s3), es);
  log.info("exit");
  ForkJoinPool.commonPool().shutdown();
  ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}

결과

  • Async 키워드가 붙지 않은 메서드를 실행하게 된다면, 해당 비동기 메서드의 실행은 하나의 스레드에서 동작하게 된다.
  • 여러 비동기 작업을 현재 내 스레드 풀 정책에 맞춰 다른 스레드에게 할당시킬 수 있는 방법으로는 Async 키워드가 붙은 메서드를 호출하면 된다
    • 항상 Async키워드가 붙은 메서드, 붙지 않은 메서드가 쌍으로 존재한다.
  • 실행 결과를 보게 되면 현재 FixedThreadPool 정책에 따라 비동기 작업마다 스레드를 할당한다.

ListenableFuture을 Callback 지옥을 수정하자

@RestController
public static class MyController {

  public static final String URL1 = "http://localhost:8081/service1?req={req}";
  public static final String URL2 = "http://localhost:8081/service2?req={req}";

  AsyncRestTemplate rt = new AsyncRestTemplate(
      new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
  @Autowired
  MyService myService;

  @GetMapping("/rest")
  public DeferredResult<String> rest(int idx) {
    DeferredResult<String> dr = new DeferredResult<>();
    //해당 값이 반환하는 것이 반환하는것이 CompletableFuture 타입이 아니다.
    toCompletableFuture(rt.getForEntity(URL1, String.class, "h" + idx))
        .thenCompose(s -> toCompletableFuture(rt.getForEntity(URL2, String.class, s.getBody()))) //2번쨰 비동기 결과
        .thenApplyAsync(s2 -> myService.work(s2.getBody())) //myService가 더이상 비동기가 아니므로 thenApply를 사용하면 된다
          //스레드를 비동기로 하는 Async 키워드를 붙이자
        .thenAccept(s3 -> dr.setResult(s3))
        .exceptionally(e -> {dr.setErrorResult(e.getMessage()); return (Void)null;} )
    ;

    return dr;
  }

  private <T> CompletableFuture<T> toCompletableFuture(ListenableFuture<T> listenableFuture) {
    CompletableFuture<T> completableFuture = new CompletableFuture<>();
    listenableFuture.addCallback(s -> {
      completableFuture.complete(s);
    }, e -> {
      completableFuture.completeExceptionally(e);
    });
    return completableFuture;
  }
  
  @Service
  public static class MyService {
    //@Async //CompletableFuture를 사용하면 더이상 스프링을 비동기 어노테이션이 필요 없다
    public String work(String req) {
      return req + "/asyncwork";
    }
  }

  @Bean
  public ThreadPoolExecutor myThreadPool() {
    ThreadPoolExecutor te = new ThreadPoolExecutor(10, 100, 100,
        TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
    return te;
  }

  public static void main(String[] args) {
    SpringApplication.run(Chapter11Application.class, args);
  }
}

 

  • rt.getForEntity 가 반환하는 값이 아직, CompletableFuture 형식이 아닌 ListenableFuture 형식이기 때문에 이를 CompletableFuture로 변경해줘야 한다.
  • toCompletableFuture 메서드를 통해, listenableFuture에 callback에서 Completable의 complete를 호출하고, CompletableFuture를 반환한다.
  • 비동기 작업 2 같은 경우 CompletableFuture을 파라미터로 받아 CompletableFuture을 반환하는 함수이므로 thenCompose를 사용해야 한다.
  • 비동기 작업 3 같은 경우 더 이상 스프링에서 비동기 작업을 실행시키지 않아도 된다(@Async 키워드를 제거하자)
    • 일반적으로 값을 반환하는 코드를 작성한 뒤, 비동기로 실행하는 곳에서, thenApply를 실행할 수 있다.
    • 비동기로 이를 실행해야 하므로, 메서드에 Async키워드를 붙여야 한다.
  • 모든 비동기 작업을 수행한 결과를, thenAccept에서 DeferredResult에 담아 준다.
  • 예외처리 같은 경우, exceptionally 함수를 마지막에 두어 처리를 하면 되는데 exceptionally 같은 경우 반드시 반환 값이 있어야 하기 때문(예외를 복구해주는..)에 return (Void) null을 반환한다.
728x90