728x90
CompletableFuture : 비동기 작업을 간단하게 완료하는 작업을 할 수 있다.
public static void main(String[] args) throws ExecutionException, InterruptedException {
//CompletableFuture<Integer> cf = CompletableFuture.completedFuture(1); //비동기 작업을 간단하게 완료하는 작업을 할 수 있다.
CompletableFuture<Integer> cf = new CompletableFuture<>(); //값을 넘기지 않았기 때문에 무한정 대기
//cf.complete(2);//값을 넣어준다
cf.completeExceptionally(new RuntimeException()); //예외가 발생했다는것을 알려준다
System.out.println(cf.get());
}
- CompletableFuture 타입을 사용하게 되면, 비동기 작업을 간단하게 진행할 수 있다.
- CompletableFuture의 complete 메서드를 사용하여, 값을 넣어주어, 이벤트를 발생시켜 주거나, 생성 시 CompletableFuture.completedFuture(value)를 사용해 이벤트를 바로 넣어 줄 수도 있다.
- CompletableFuture의 completeExceptionally를 사용해서, 예외를 발생하였다는 것을 호출하는 곳에 전달해 줄 수 있으며, get() 메서드를 호출하는 순간 예외가 전파된다.
runAsync, thenRun
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture
.runAsync(() -> log.info("runAsync"))//파라미터로 Runnable을 받는다.
//장점 : 내부적으로 Future, CompletionStage 인터페이스를 구현하고 있다.
// (CompletionStage = 하나의 작업을 비동기 작업으로 수행하고, 완료가 됫을때, 해당 작업에 의존적인 메서드를 실행시켜줄 수 있는 클래스)
.thenRun(() -> log.info("thenRun"))//비동기 작업이 완료되었으면, 해당 백그라운드에서 이 람다를 실행해라(동기적 실행)
.thenRun(() -> log.info("thenRun"));
log.info("exit");
ForkJoinPool.commonPool().shutdown();
ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}

- CompletableFuture 같은 경우, 내부적으로 Future, CompletionStage를 구현하고, 있다.
- CompletionStage를 구현했기 때문에 후속 작업이 비동기 작업에 의존적으로 동작하는 코드를 작성하기 용이하다
- runAsyn 같은 경우 파라미터로 Runnable 타입을 받는다(리턴 값이 없다)
- 비동기 작업을 해당 메서드를 통해 간단하게 실행이 가능하다
- thenRun 같은 경우 CompletableFuture 결과를 다시 비동기 적으로 수행하고 싶을 때 사용한다.
supplyAsync, thenApply, thenAccept
public static void main(String[] args) throws ExecutionException, InterruptedException {
//비동기 결과를 다음 비동기 작업에 전달할 수 있다
CompletableFuture
.supplyAsync(() ->{
log.info("runAsync");
return 1;
}) //리턴값이 있어야 한다
.thenApply(s -> {
log.info("thenApply {} ", s);
return s + 1;
})//앞에 비동기 작업을 결과를 받아 사용할 수 있다.
.thenApply(s2 -> {
log.info("thenApply {} ", s2);
return s2 * 3;
})
.thenAccept(s3 -> log.info("thenAccept {}", s3));
log.info("exit");
ForkJoinPool.commonPool().shutdown();
ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
- supplyAsync 같은 경우 비동기 작업을 결과를 반환해야 한다.
- thenApply 같은 경우 반환받은 결과를 파라미터로 받을 수 있으며, 반환 값이 반드시 있어야 한다.
- thenAccept 같은 경우 비동기 작업에 대한 결과를 파라미터로 받을 수 있으며, 함수를 실행할 수 있다(반환 x)
thenCompose, thenApply
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture
.supplyAsync(() ->{
log.info("runAsync");
return 1;
})
.thenCompose(s -> { //flatMap 에 해당하는 부분으로 보면 된다.
log.info("thenApply {} ", s);
//비동기 작업으로 인해 타입이 CompletableFuture 타입으로 변경이된다.
return CompletableFuture.completedFuture(s + 1);//function을 결과 값이 또 다른 Completable 결과로 나와야 할 경우가 있다.
})
.thenApply(s2 -> { //map에 해당
log.info("thenApply {} ", s2);
return s2 * 3;
})
.thenAccept(s3 -> log.info("thenAccept {}", s3));
log.info("exit");
ForkJoinPool.commonPool().shutdown();
ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
- thenApply는 원소 하나하나에 접근해서 값을 반환하는 Stream에 map과 비슷한 함수이다.
- thenCompose 같은 경우 반환해야 하는 타입을 전체 CompletableFuture 타입으로 감싸 반환하므로, Stream에 flatmap과 비슷한 함수이다.
- CompletableFuture로 감싸서 반환하기 위해 CompletableFuture의 static 메서드를 사용해서 반환한다.
참고 : map <-> flatMap
https://www.techiedelight.com/ko/difference-map-flatmap-java/
Java에서 map()과 flatMap()의 차이점
이 게시물은 차이점에 대해 설명합니다. map() 그리고 flatMap() 방법 Stream Java의 클래스. 1. 서명 프로토타입 Stream.map() 이다: /** * @파람 새 스트림의 요소 유형 * @파람 매퍼 각 요소에 적용할 비간섭,
www.techiedelight.com
exceptionally
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture
.supplyAsync(() ->{
log.info("runAsync");
if(true)
throw new RuntimeException();
return 1;
})
.thenCompose(s -> {
log.info("thenApply {} ", s);
return CompletableFuture.completedFuture(s + 1);
})
.thenApply(s2 -> {
log.info("thenApply {} ", s2);
return s2 * 3;
})
.exceptionally(e -> -10) //예외를 잡아 복구하는 기능을 적용할 수 있다
.thenAccept(s3 -> log.info("thenAccept {}", s3));
log.info("exit");
ForkJoinPool.commonPool().shutdown();
ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
- 비동기로 작업을 진행하는 도중에 예외가 발생하였을 때, exceptionally에서 예외를 복구할 수 있다.
- supplyAsync에서 예외를 발생시키게 된다면, thenCompose, thenApply 가 실행되지 않고 exceptionally에서 예외를 복구 후, thenAccept에서 결과를 출력하는 것을 볼 수 있다.
비동기 작업을 다른 스레드로 실행
public static void main(String[] args) throws ExecutionException, InterruptedException {
//현재는 하나의 스레드를 통해 모든 비동기 작업을 하지만, 만약 비동기 작업을 다른 스레드에서 실행하고 싶다면 어떻게 할까?
ExecutorService es = Executors.newFixedThreadPool(10);
CompletableFuture
.supplyAsync(() ->{
log.info("runAsync");
return 1;
}, es)
.thenCompose(s -> {
log.info("thenApply {} ", s);
return CompletableFuture.completedFuture(s + 1);
})
.thenApplyAsync(s2 -> { //현재 스레드 풀 정책에 따라 새로운 스레드를 할당한다.
log.info("thenApply {} ", s2);
return s2 * 3;
}, es)
.exceptionally(e -> -10)
.thenAcceptAsync(s3 -> log.info("thenAccept {}", s3), es);
log.info("exit");
ForkJoinPool.commonPool().shutdown();
ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}

- Async 키워드가 붙지 않은 메서드를 실행하게 된다면, 해당 비동기 메서드의 실행은 하나의 스레드에서 동작하게 된다.
- 여러 비동기 작업을 현재 내 스레드 풀 정책에 맞춰 다른 스레드에게 할당시킬 수 있는 방법으로는 Async 키워드가 붙은 메서드를 호출하면 된다
- 항상 Async키워드가 붙은 메서드, 붙지 않은 메서드가 쌍으로 존재한다.
- 실행 결과를 보게 되면 현재 FixedThreadPool 정책에 따라 비동기 작업마다 스레드를 할당한다.
ListenableFuture을 Callback 지옥을 수정하자
@RestController
public static class MyController {
public static final String URL1 = "http://localhost:8081/service1?req={req}";
public static final String URL2 = "http://localhost:8081/service2?req={req}";
AsyncRestTemplate rt = new AsyncRestTemplate(
new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
@Autowired
MyService myService;
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();
//해당 값이 반환하는 것이 반환하는것이 CompletableFuture 타입이 아니다.
toCompletableFuture(rt.getForEntity(URL1, String.class, "h" + idx))
.thenCompose(s -> toCompletableFuture(rt.getForEntity(URL2, String.class, s.getBody()))) //2번쨰 비동기 결과
.thenApplyAsync(s2 -> myService.work(s2.getBody())) //myService가 더이상 비동기가 아니므로 thenApply를 사용하면 된다
//스레드를 비동기로 하는 Async 키워드를 붙이자
.thenAccept(s3 -> dr.setResult(s3))
.exceptionally(e -> {dr.setErrorResult(e.getMessage()); return (Void)null;} )
;
return dr;
}
private <T> CompletableFuture<T> toCompletableFuture(ListenableFuture<T> listenableFuture) {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
listenableFuture.addCallback(s -> {
completableFuture.complete(s);
}, e -> {
completableFuture.completeExceptionally(e);
});
return completableFuture;
}
@Service
public static class MyService {
//@Async //CompletableFuture를 사용하면 더이상 스프링을 비동기 어노테이션이 필요 없다
public String work(String req) {
return req + "/asyncwork";
}
}
@Bean
public ThreadPoolExecutor myThreadPool() {
ThreadPoolExecutor te = new ThreadPoolExecutor(10, 100, 100,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
return te;
}
public static void main(String[] args) {
SpringApplication.run(Chapter11Application.class, args);
}
}
- rt.getForEntity 가 반환하는 값이 아직, CompletableFuture 형식이 아닌 ListenableFuture 형식이기 때문에 이를 CompletableFuture로 변경해줘야 한다.
- toCompletableFuture 메서드를 통해, listenableFuture에 callback에서 Completable의 complete를 호출하고, CompletableFuture를 반환한다.
- 비동기 작업 2 같은 경우 CompletableFuture을 파라미터로 받아 CompletableFuture을 반환하는 함수이므로 thenCompose를 사용해야 한다.
- 비동기 작업 3 같은 경우 더 이상 스프링에서 비동기 작업을 실행시키지 않아도 된다(@Async 키워드를 제거하자)
- 일반적으로 값을 반환하는 코드를 작성한 뒤, 비동기로 실행하는 곳에서, thenApply를 실행할 수 있다.
- 비동기로 이를 실행해야 하므로, 메서드에 Async키워드를 붙여야 한다.
- 모든 비동기 작업을 수행한 결과를, thenAccept에서 DeferredResult에 담아 준다.
- 예외처리 같은 경우, exceptionally 함수를 마지막에 두어 처리를 하면 되는데 exceptionally 같은 경우 반드시 반환 값이 있어야 하기 때문(예외를 복구해주는..)에 return (Void) null을 반환한다.
728x90
'Toby의 ReactiveProgramming' 카테고리의 다른 글
| Mono의 동작방식과 block() (0) | 2022.08.28 |
|---|---|
| WebFlux (0) | 2022.08.28 |
| AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (0) | 2022.08.26 |
| 비동기 RestTemplate과 비동기 MVC/Servlet (0) | 2022.08.26 |
| 자바와 스프링의 비동기 기술 (0) | 2022.08.22 |