Toby의 ReactiveProgramming

비동기 RestTemplate과 비동기 MVC/Servlet

webmaster 2022. 8. 26. 08:51
728x90

Thread Pool Hell(스레드 풀 지옥)

LinkedIn의 Thread Pool

2~4 시사이에 Pool이 Maxium까지 간다.
Thread pool 이 꽉차기 떄문에 Latency가 급격히 증가했다.
최신의 BackEnd 구조

  • 2~4시 사이에 요청이 급격히 많아졌고, 그에 따라 Latency도 급격히 증가한 것을 볼 수 있다
  • Thread요청이 급격히 늘어나 Queue에 대기하고 있던 요청들도 늘게 되어, Latency가 길어진 것을 볼 수 있다.
  • 왜 이렇게 Latency가 늘어났을까?
    • 최신 BackEnd 구조를 보면, 외부 API 호출을 통한 요청으로 서로 간 통신을 한다
    • 외부 API 호출을 통한 요청은 I/O 요청으로 Blocking 처리가 되고, 스레드가 Blocking 상태가 되어 놀고 있기 때문에 Latancy가 증가한 것이다.
  • 스레드를 늘리는것은 근본적인 해결이 될 수 없다(스레드 개수를 늘리는 것은 한정되어 있으며, 오히려 잦은 Context-Switching으로 인한, 성능 하락이 야기될 수 있다)
    • 특히 Blocking 상태가 되어있는 Thread는 즉시 Context-Switching 대상이 되기 때문에 해결책이 될 수 없다.

CyclicBarrier.await

@RestController //@ResponseBody + @Controller
public class MyController2 {

  @GetMapping("/rest")
  public String rest(int idx){
    return "rest " + idx;
  }
}

application.properties

server.tomcat.threads.max=1
spring.task.execution.pool.core-size= 1
@Slf4j
public class LoadTest {

  private static AtomicInteger counter = new AtomicInteger(0);

  public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
    ExecutorService es = Executors.newFixedThreadPool(100);

    RestTemplate rt = new RestTemplate();
    String url = "http://localhost:8080/rest?idx={idx}";

    //100개의 요청을 모아서 100개 생성후 한번에 돌리고 싶으면 어떻게 할까? 동기화
    CyclicBarrier barrier = new CyclicBarrier(101);


    for (int i = 0; i < 100; ++i) {
      //es.execute(() -> {
      es.submit(() -> {
        int idx = counter.addAndGet(1);

        //exception을 처리하는 코드가 람다식 안(Runnable)에 있다면, 메서드 밖으로 던질 수 없고 try-catch로 잡아야한다.
        barrier.await(); //await를 만난 숫자가 CyclicBarrier에 파라미터로 들어온 숫자로 되게 되면, 아래 코드가 실행이 된다.

        log.info("Thread {}", idx );

        StopWatch sw = new StopWatch();
        sw.start();

        String res = rt.getForObject(url, String.class, idx);

        sw.stop();
        log.info("Elapsed {} {} {}", idx, sw.getTotalTimeMillis(), res);
        return null;
      });
    }

    barrier.await();
    StopWatch main = new StopWatch();
    main.start();

    es.shutdown();
    es.awaitTermination(100, TimeUnit.SECONDS);

    main.stop();
    log.info("Total: {}", main.getTotalTimeSeconds());
  }
}
  • CyclicBarrier를 선언하여, 스레드를 동기화 할 수 있다.
  • CyclicBarrier 생성자로 넘긴 파라미터의 값 만큼 await를 호출하게 되면, 생성된 스레드가 그 순간 실행된다(동기화)
  • 현재 101을 파라미터로 받아 for문에서 100번 호출후, for문 탈출 후 1번을 실행해, 그 순간 thread를 동기화시켜 스레드를 실행시키면서 mainStopWatch 시간을 잰다.

대기상태의 스레드가 어떤문제를 일으키는지 살펴보자

controller(외부 API 호출, Blocking이 된다)

@RestController 
public class MyController2 {
  RestTemplate rt = new RestTemplate();

  @GetMapping("/rest")
  public String rest(int idx) {
    String res = rt.getForObject("http://localhost:8081/service?req={req}", String.class,
        "hello" + idx);
    return res;
  }
}

외부 Service

@SpringBootApplication
public class RemoteService {

  @RestController
  public static class MyController {

    @GetMapping("/service")
    public String service(String req) throws InterruptedException {
      Thread.sleep(2000);
      return req + "/service";
    }
  }

  public static void main(String[] args) {
    //SpringBoot를 띄울때 Properties 값을 동적으로 설정해서 띄울수 있다.
    System.setProperty("server.port", "8081");
    System.setProperty("server.tomcat.threads.max", "1000");
    System.setProperty("spring.task.execution.pool.core-size", "1000");
    SpringApplication.run(TobyReactiveProgrammingApplication.class, args);
  }
}
  • setProperty로 동적으로 프로퍼티 값을 전달해 줄 수 있다.
  • 호출되는 API 는 service로 2초의 시간이 지난 뒤 반환한다.
  • 해당 외부 WEB에서 처리하는 스레드 Pool은 여유 있다고 가정

LoadTest를 실행해보자(위와 동일)

@Slf4j
public class LoadTest {

  public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
    //...
    String url = "http://localhost:8080/rest?idx={idx}";//100개의 요청을 모아서 100개 생성후 한번에 돌리고 싶으면 어떻게 할까? 동기화
    for (int i = 0; i < 100; ++i) {
      es.submit(() -> {
      	//...
        String res = rt.getForObject(url, String.class, idx);
        //...
      });
    }
    //...
  }
}
  • 요청 100개를 전달하는데 요청에서 호출하는 외부 API가 한 개당 2초의 Blocking이 걸린다.
    • RestTemplate의 getForObject메서드는 Blocking 메서드이다.
  • 스레드 풀보다 많은 요청이 발생하게 되면, 스레드가 반환될 때까지 Blocking을 기다려 요청을 처리하지 못하고 있다(CPU는 놀고 있다)
  • 해결방안은 API호출 하는 방법(getForObject)을 비동기로 호출하면 된다.
  • Callable과 DefferedResult로는 해결하기 힘들다(callable 같은 경우 워킹 스레드가 늘어나고, DefferedResult처럼 이벤트가 발생하는 것이 아니기 때문에)

AsyncRestTemplate

AsyncRestTemplate rt = new AsyncRestTemplate();
@GetMapping("/rest")
public ListenableFuture<ResponseEntity<String>> rest(int idx) {
  ListenableFuture<ResponseEntity<String>> res = rt.getForEntity(
      "http://localhost:8081/service?req={req}", String.class,
      "hello" + idx);
  //스프링이 ListenableFuture<ResponseEntity<String>>을 반환하게 되면 callback을 알아서 등록하고, 비동기로 실행시켜준다
  return res;
}
  • AsyncRestTemplate을 사용하면 비동기적인 요청을 할 수 있다
  • getForEntity 메서드는 ListenableFuture<ResponseEntity<String>>을 반환하게 되고, ListenableFuture<ResponseEntity<String>>를 스프링에서 반환하게 되면, 스프링이 자동으로 많은 일을 해준다.
    • 스프링이 callback을 알아서 등록하고, 비동기로 실행시켜준다.
    • 따라서 개발자는 따로 CallBack을 생성하지 않아도 된다.
  • AsyncRestTemplate 를 쓰게 되면 사실 백그라운드에서 스레드를 요청 개수만큼 생성해서 하나의 스레드로 보내는 것처럼 해주는 것이기 때문에 효과적인 방법이 아니다(결국 백그라운드에 스레드가 100개 생성된다)

Netty 사용

AsyncRestTemplate rt = new AsyncRestTemplate(
    new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

@GetMapping("/rest")
public ListenableFuture<ResponseEntity<String>> rest(int idx) {
  ListenableFuture<ResponseEntity<String>> res = rt.getForEntity(
      "http://localhost:8081/service?req={req}", String.class,
      "hello" + idx);
  return res;
}

 

  • RestTemplate에서 지원하는 Netty 라이브러리(비동기 라이브러리)를 사용하여 작성해 보자
  • 스레드가 거의 늘어나지 않고 비동기적인 처리가 가능한 것을 볼 수 있다.
  • 스프링에서 Blocking을 호출하는 메서드가 많다고 하더라도, Blocking되지 않고 비동기적으로 호출할 수 있도록 도와주는 라이브러리이다.

데이터 가공

@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
  DeferredResult<String> dr = new DeferredResult<>();
  ListenableFuture<ResponseEntity<String>> res = rt.getForEntity(
      "http://localhost:8081/service?req={req}", String.class,
      "hello" + idx);
  res.addCallback(s -> {
    dr.setResult(
        s.getBody()//원하는 결과값
        + "/work"
    );
  }, e->{
    //비동기 작업을 콜백으로 처리할 때는 예외를 전파하면 안된다.
    //순수하게 deferredResult를 통해 처리를 해야한다.
    dr.setErrorResult(e.getMessage()); //이걸 받는 클라이언트에서 적절히 처리하면 된다.
  });
  return dr;
}
  • 값을 받아 가공을 해야될 경우에는 callback 방식을 사용해서 처리해줘야 한다(get() 방식으로 써서 동기적으로 하면 X)
  • 이때 사용되야 하는 것이 DeferredResult로 스프링에서 결괏값이 나오지 않았을 때는 DeferedResult값을 반환했다가 만약 값이 응답 왔다면 callback 함수에서 setResult로 이벤트를 동작시켜 DeferredResult에 스레드를 할당해서 반환시켜준다.
  • callback 방식에서는 예외를 전파하면 안된다
    • 어느 스레드가 받아 처리하는지 알 수 없기 때문에 callback을 호출하는 곳에서 예외를 처리해주어야 한다

Callback 지옥

여러 비동기 작업을 순차적으로 진행해야 된다면 어떻게 할까?

외부 API (service1, service2)

@SpringBootApplication
public class RemoteService {

  @RestController
  public static class MyController {

    @GetMapping("/service1")
    public String service1(String req) throws InterruptedException {
      Thread.sleep(2000);
      return req + "/service1";
    }

    @GetMapping("/service2")
    public String service2(String req) throws InterruptedException {
      Thread.sleep(2000);
      return req + "/service2";
    }

  }

  public static void main(String[] args) {
    //SpringBoot를 띄울때 Properties 값을 동적으로 설정해서 띄울수 있다.
    System.setProperty("server.port", "8081");
    System.setProperty("server.tomcat.threads.max", "1000");
    System.setProperty("spring.task.execution.pool.core-size", "1000");
    SpringApplication.run(TobyReactiveProgrammingApplication.class, args);
  }
}
@RestController 
public class MyController2 {
  AsyncRestTemplate rt = new AsyncRestTemplate(
      new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

  public static final String URL1 = "http://localhost:8081/service1?req={req}";
  public static final String URL2 = "http://localhost:8081/service2?req={req}";

  @Autowired
  MyService myService;

  @GetMapping("/rest")
  public DeferredResult<String> rest(int idx) {
    DeferredResult<String> dr = new DeferredResult<>();
    ListenableFuture<ResponseEntity<String>> res1 = rt.getForEntity(URL1, String.class,"hello" + idx);

    res1.addCallback(s -> {
      ListenableFuture<ResponseEntity<String>> res2 = rt.getForEntity(URL2, String.class,s.getBody());
      res2.addCallback(s2 -> {
        ListenableFuture<String> res3 = myService.work(s2.getBody());
        res3.addCallback(s3 -> {
          dr.setResult(s3);
        }, e -> {
          dr.setErrorResult(e.getMessage());
        });
      }, e -> {
        dr.setErrorResult(e.getMessage());
      });
    }, e -> {
      dr.setErrorResult(e.getMessage());
    });
    return dr;
  }

  @Service
  public static class MyService {
    @Async
    public ListenableFuture<String> work(String req) {
      return new AsyncResult<>(req + "/asyncwork");
    }
  }

  @Bean
  public ThreadPoolExecutor myThreadPool() {
    ThreadPoolExecutor te = new ThreadPoolExecutor(10, 100, 100,
        TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
    return te;
  }
}
  • 외부 API 호출 2번 내부적 작업 1번 총 3번의 비동기 작업이 동작한다.
    • 3개의 작업을 비동기로 실행시키기 때문에, 빠르게 실행이 가능하다.
  • Backgroud에서 비동기적으로 데이터를 가지고 와서 조합하기 때문에 이를 컬렉션으로 저장해서 관리하는 코드를 작성하기는 어렵다.
  • 문제를 해결하였지만, 콜백이 계속 연관되어 나오기 때문에 코드를 읽기 복잡하다.
    • 병렬로 데이터를 조합하기도 어렵다.
    • 매번 에러를 잡아주는 방식 e -> {...} 코드 또한 매번 나오기 때문에 중복 코드가 발생한다

DB를 사용하는 코드는 보통 JDBC를 사용하고, 이런 JDBC는 Blocking 통신을 하기 때문에, 비동기 통신은 불가능하다.  -> 현재는 오라클이 뒤늦게 비동기 처리가 가능한 JDBC를 개발하고 있다. 

 

 

728x90