Toby의 ReactiveProgramming

WebFlux

webmaster 2022. 8. 28. 19:17
728x90

이전에 사용했던 AsyncRestTemplate의 getForEntity 메서드 같은 경우, Http 요청을 모두 받기 때문에 Http Header, 상태 코드 모두 돌아오기 때문에 좋지 않다.

Mono

@GetMapping("/rest")
public Mono<String> rest(int idx) {
  //Mono를 사용하면, 어떤 오브젝트든 받아서 반환할 수 있다.
  //Mono를 컨테이너(컬렉션)이라고, 생각하면 된다 -> 컨테이너 = Optional, List 등과 같이 컨테이너로 데이터를 감싸면 여러 기능을 쓸 수 있다.
  //Mono<String> m = Mono.just("Hello");//일반 String 메서드와 달리 Mono로 감싸게 되면, 많은 기능을 쓸 수 있는것을 볼 수 있다
  return Mono.just("Hello");
}
  • 더 이상, DeferredResult를 사용하지 말고, Mono 형식을 사용하면 된다.
  • Mono를 컨테이너라고, 생각을 하게 되면, 데이터를 Mono라는 컨테이너를 씌어 반환한다고 생각하면 된다.
    • Mono 컨테이너로 감싸기 때문에 Mono을 여러 기능을 사용할 수 있다.

WebClient + Mono로 변경(스프링 5 이상)

@RestController
  public static class MyController {

    public static final String URL1 = "http://localhost:8081/service1?req={req}";
    public static final String URL2 = "http://localhost:8081/service2?req={req}";

    @Autowired
    private MyService myService;

    WebClient client = WebClient.create(); //AsyncRestTemplate 와 비슷하다고 생각하면 된다.

    @GetMapping("/rest")
    public Mono<String> rest(int idx) {
      /*
          Mono<ClientResponse> res = client.get().uri(URL1, idx).exchange(); //URL1에 해당하는 API를 호출한 코드
          //ClientResponse에 header,status,body 정보가 있다
          //정의하는 것만으로는 API가 호출되지 않는다.
          //res.subscribe(); //이 동작을 해야지만, 실제 API를 호출한다. //이 동작은 스프링이 해준다(리턴 타입이 Mono일 경우)
          //Mono 안에 ClientResponse 안에 감싸 있기 때문에 이를 바꿔서 변경해 주어야한다.
          Mono<String> body = res.flatMap(clientResponse -> clientResponse.bodyToMono(String.class));
          //map은 원소를 변환후, 담겨져 있던 곳에 다시 담고, flatMap 같은 경우 다시 담아 주지 않는다.
          return body;
       */
      return client.get().uri(URL1).exchange().flatMap(c -> c.bodyToMono(String.class)); //한 문장으로 리펙토링 가능
    }
  }
  • Spring5 이상부터는 더 이상 AsyncRestTemplate를 사용하지 않고, WebClient를 사용한다.
    • WebClient에는 메서드, 호출할 URI, 전달할 파라미터, 적용(exchange) 등의 메서드가 있다.
    • AsyncRestTemplate과 비슷하다.
    • exchange 한다고, 호출되는 것이 아니라, 실제 subscribe가 실행될 때(구독을 신청할 때) 실행이 된다.
    • 스프링에서는 Mono를 반환하게 되면, 스프링 컨테이너가 비동기 실행을 한 뒤, res.subscribe 동작을 실행해 준다.
  • WebClient가 반환하는 값은 Mono<ClientResponse> 이므로, Mono<String>으로 변환하기 위해 flatMap을 사용했다.
    • bodyToMono가 반환하는 값이 Mono<데이터> 이므로 map을 실행하게 되면 Mono<Mono<데이터>> 형식으로 감싸 진다.
    • flatMap을 사용해 감싸져 있는 값을 분해해서 리턴한다고 생각하면 된다.
  • 변수에 담지 않고, 풀어서 쓰면 한 줄로 간단하게 코드를 쓸 수 있다.

비동기 요청을 여러개 순차적으로 보내기

@GetMapping("/rest")
public Mono<String> rest(int idx) {
  return client.get().uri(URL1, idx).exchange() //Mono<ClientResponse>
      .flatMap(c -> c.bodyToMono(String.class)) //Mono<String>
      .flatMap(res1 -> client.get().uri(URL2, res1).exchange()) //Mono<ClientResponse>
      .flatMap(c -> c.bodyToMono(String.class)) //Mono<String>
    ;
}
  • flatMap에 파라미터 인자로, res1을 결과를 전달한다.
  • 전달된 파라미터를 URL2의 파라미터로 넘겨 Mono<ClientResponse>로 받아온다
  • 받아온 결과는 다시 Mono<String>으로 바꿔주어야 한다.
  • flatMap은 Mono의 값을 파라미터로 받아 람다식을 실행하고, 어떤 값으로 바꿔주는 역할만 한다고 생각하면 된다.
    • 현재는 API를 Call한 값으로 바꿔주거나, Mono<String>으로 바꿔주는 형식으로 동작한다.

Service 비동기로 실행하기

Mono : 하나의 결과값만 가지고 있음을 증빙하는 Reactive 스타일 데이터

@Slf4j
@EnableAsync
@SpringBootApplication
public class Chapter12Application {

    @RestController
    public static class MyController {

        public static final String URL1 = "http://localhost:8081/service1?req={req}";
        public static final String URL2 = "http://localhost:8081/service2?req={req}";

        @Autowired
        private MyService myService;

        WebClient client = WebClient.create(); //AsyncRestTemplate 와 비슷하다고 생각하면 된다.

        @GetMapping("/rest")
        public Mono<String> rest(int idx) {
            return client.get().uri(URL1, idx).exchange()
                .flatMap(c -> c.bodyToMono(String.class))
                .flatMap(res1 -> client.get().uri(URL2, res1).exchange())
                .doOnNext(c -> log.info("log1 : {}",c)) // Publisher 에서 넘어올때, doOnNext로 로그를 찍을 수 있다.
                .flatMap(c -> c.bodyToMono(String.class))
                .doOnNext(c -> log.info("log2 : {}",c)) //.map( res2 -> myService.work(res2)) //퍄랴미터도 Mono<String> 반환도 Mono<String>이라 map을 써도 된다.
                // 만약 myService.work 가 오래걸리는 서비스이다? -> 동기적으로 실행시키게 되므로, 스레드가 blocking 된다.
                .flatMap( res2 -> Mono.fromCompletionStage(myService.work(res2))) //CompletableFuture<String> -> Mono<String>
                .doOnNext(c -> log.info("log3 : {}",c)) //끝나고 로그를 찍어도 된다
            ;
        }
    }
    @Service
    public static class MyService {
      @Async
      public CompletableFuture<String> work(String req) {
        return CompletableFuture.completedFuture(req + "/asyncwork");
      }
    }
}
  • 이전에는 service를 호출할 때, 하나의 스레드에서 동기적으로 호출되기 때문에 Service가 실행되는 동안 스레드는 Blocking 됐다.
  • 이러한 문제를 해결하기 위해 메서드에 @Async 애노테이션을 붙이며, CompletableFuture를 반환형으로 받는 코드로 수정한다.
    • 꼭 CompletableFuture 일 필요는 없으며, ListableFuture 등, 다른 비동기를 받을 수 있는 타입으로 선언하면 된다.
  • myService.work는 CompletableFuture를 반환하므로, flatMap에서 호출한 결과를 Mono의 static 메서드인 fromCompletionStage을 호출하여, 값을 CompletableFuture<String>에서 Mono<String>으로 바꾼다.
  • 중간중간 Publisher에 어떤 값이 넘어갔는지를 확인하기 위해서는 doOnNext 함수를 통해 로그도 찍어 볼 수 있다.

 

 

728x90