Toby의 ReactiveProgramming

자바와 스프링의 비동기 기술

webmaster 2022. 8. 22. 22:30
728x90

JAVA에서의 비동기 작업(Future)

JVM의 Future란? JVM 1.5에서 나왔다, 비동기적인 연산,작업에 대한 결과를 가지고 있는 인터페이스

public static void main(String[] args) throws InterruptedException {
    ExecutorService es = Executors.newCachedThreadPool();
    Thread.sleep(2000);
    System.out.println("Hello"); //2초 후 해당 결과가 실행되고 Exit 실행
    System.out.println("Exit");
}
  • 2초 뒤에 Hello가 출력되고, Exit가 출력된다
  • 동기적 실행이 된다.

비동기적 실행

public static void main(String[] args) throws InterruptedException {
  ExecutorService es = Executors.newCachedThreadPool();

  es.execute(() ->{ //Runable은 리턴을 받을 수 없다.
    //독자적으로 작업을 실행후 종료한다.
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) { }
    log.info("Async");

  });

  log.info("Exit"); // 해당 코드가 먼저 실행된다.
}

비동기적 실행

  • 비동기적으로 실행되어 Exit출력 후, Async가 출력이된다.
  • 스레드 풀에서 스레드를 할당해서 실행했는지를 보기 위해 log를 찍었다.
  • es.excute는 파라미터로 Runable를 받으며, Runable은 리턴값이 없다. ->리턴값이 있는것을 사용하기 위해서는 submit을 쓰자
public static void main(String[] args) throws InterruptedException, ExecutionException {
  ExecutorService es = Executors.newCachedThreadPool();
  Future<String> future = es.submit(() -> { //callable를 받기 때문에 return과 예외를 받을 수 있다.
    Thread.sleep(2000);
    log.info("Async");
    return "Hello";
  });    //Future를 통해 결과를 받을 수 있다.
  System.out.println(future.get()); //결과가 나올때까지 코드가 blocking 된다(Blocking)
  //만약 결과를 반환하지 않을경우(Non-Blocking)
  log.info("Exit");//비동기 작업이 끝나고, Exit이 호출되었다.
}
  • Blocking : 비동기적 실행을 결과를 아직 받지 않아 코드가 더이상 실행되지 않고 기다리고 있다
  • Non-Blocking : 비동기적 실행을 결과를 받을 것이 없기 때문에 코드를 Blocking 하지 않고, 바로 실행 시킨다
  • es.submit() 같은 경우 파라미터로 callable를 받기 때문에 리턴값과 예외를 처리해줄수 있다.
  • Future는 비동기적으로 처리한 결과를 받을때 사용하며, .get() 메서드를 사용하게 되면, Blocking상태로 만들 수 있다.
public static void main(String[] args) throws InterruptedException, ExecutionException {
  ExecutorService es = Executors.newCachedThreadPool();
  Future<String> future = es.submit(() -> {
    Thread.sleep(2000);
    log.info("Async");
    return "Hello";
  });

  System.out.println(future.isDone()); //비동기 작업이 완료 됬는가를 return 해준다
  //다른 작업과 비동기 적으로 실행할 때, 사용하기 좋다.
  log.info("Exit"); //해당 코드를 실행 후, blocking 상태로 된다.
  Thread.sleep(2100);
  System.out.println(future.isDone());
  System.out.println(future.get());
}
  • Blocking 이 된다고, 비동기적으로 실행이 되지 않는것이 아니라 Blocking을 실행하기 전까지는 비동기적으로 실행되기 때문에 이를 효과적으로 사용할 수도 있다.
  • future에는 isDone() 메서드가 존재하며, 비동기적으로 실행한 함수의 결과가 반환 되었는지를 즉시 확인할 수도 있다.
    • 이를 응용하면, while문을 조건절에 해당 메서드를 써서 비동기함수가 끝났는지 즉시 확인하여, Non-Blocking상태로 만들 수 있다.

JAVA에서의 비동기 작업을 받아오는 방법중 하나인 Future를 살펴보았고, 다음 방법인 Callback을 살펴보자

JAVA에서의 비동기 작업 (Callback)

public static void main(String[] args) throws InterruptedException, ExecutionException {
  ExecutorService es = Executors.newCachedThreadPool();
  FutureTask<String> futureTask = new FutureTask<>(() -> { //Callback 을 처리할 수 있는 메커니즘이 다 존재한다.
    Thread.sleep(2000);
    log.info("Async");
    return "Hello";
  }){ //익명 클래스로 만든다
    @Override
    protected void done() { //비동기 작업이 완료되면 실행되는 hook
      try {
        System.out.println(get()); //get() 을 여기서 실행
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
    }
  };
  es.execute(futureTask);
  es.shutdown(); //executor 서비스가 끝나게 되면 반드시 종료하자
}
  • FutureTask를 통해 Callback을 처리할 수 있는 매커니즘을 해결 할 수 있다.
  • done() 메서드는 비동기 작업이 완료되면 실행되는 것으로, done()메서드에서 get()을 호출해서 비동기로 호출된 결과를 가지고 오고 있다.

Callback 패턴을 클래스로 만들어 사용하기

interface SuccessCallback { //성공 콜백
  void onSuccess(String result);
}

public static class CallbackFutureTask extends FutureTask<String> {
  SuccessCallback sc; //생성시 실행될 람다를 생성자로 받는다.

  public CallbackFutureTask(Callable<String> callable, SuccessCallback sc) {
    super(callable);
    //sc는 널이면 안된다.
    this.sc = Objects.requireNonNull(sc); //sc가 null이 아니면 그 값을 리턴한다.
  }

  @Override
  protected void done() { //hook을 재정의한다.
    try {
      sc.onSuccess(get()); //get()이 반환하는 리턴값을 넣어준다.
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    }
  }
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
  ExecutorService es = Executors.newCachedThreadPool();
  CallbackFutureTask callbackFutureTask = new CallbackFutureTask(() -> {
    Thread.sleep(2000);
    log.info("Async");
    return "Hello";
  //}, res -> System.out.println(res)); //Success 도 람다로 받을 수 있다.
  }, System.out::println);

  es.execute(callbackFutureTask);
  es.shutdown();
}
  • Callback을 파라미터로 받아 실행시킬수 있다.
    • 생성자를 통해, 콜백시, 실행될 람다를 입력받아 done() 메서드(hooks)에서 실행해준다.
    • 콜백 인터페이스 같은 경우에도 실행될 메서드는 하나이므로 람다로 받을 수 있다.
  • 메서드 레퍼런스(System.out::println)을 사용해 코드를 더 간결하고, 가독성 있게 작성할 수 있다.

예외까지 처리하는 Callback 작성하기

@Slf4j
public class FutureEx {
  interface SuccessCallback { 
    void onSuccess(String result);
  }
  interface ExceptionCallback { //비동기 작업을 하다 예외가 발생시 해당 인터페이스를 받게끔 한다.
    void onError(Throwable t);
  }
  public static class CallbackFutureTask extends FutureTask<String> {
    SuccessCallback sc; 
    ExceptionCallback ec;
    public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
      super(callable);
      this.sc = Objects.requireNonNull(sc); 
      this.ec = Objects.requireNonNull(ec);
    }

    @Override
    protected void done() {
      try {
        sc.onSuccess(get()); 
      } catch (InterruptedException e) { //작업을 진행하지 말고, 종료하라는 메시지를 준다.
        Thread.currentThread().interrupt();
      } catch (ExecutionException e) { //비동기 작업시 발생하는 예외, 현재 es에 처리해야될 예외
        ec.onError(e.getCause()); //e는 예외가 포장되어 있기 때문에 까서 전달
      }
    }
  }

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService es = Executors.newCachedThreadPool();
    CallbackFutureTask callbackFutureTask = new CallbackFutureTask(() -> {
      Thread.sleep(2000);
      if(true)
        throw new RuntimeException("Async ERROR!!!");
      log.info("Async");
      return "Hello";
    }, System.out::println,
        e -> System.out.println("Error: " + e.getMessage()));

    es.execute(callbackFutureTask);
    es.shutdown();
  }
}

결과

  • 생성자의 3번째 파라미터로 예외를 처리하는 방식을 전달한다.
  • done(hook)에서 catch 하는 예외중 InterruptedException 같은 경우 조금 특별한 예외이므로 다르게 처리하자
  • 실제 우리가 던져주어야 하는 예외는 ExecutionException으로 비동기 작업시 발생한 에외가 해당 부분에 오게 된다.
    • 반드시 예외를 발생시키기 위해 if(true)로 컴파일러를 속여 예외를 발생 시켯다.
  • 원하는 방식으로 예외가 커스텀되어 출력되는것을 확인할 수 있다.

문제점 : 성격이 다른 기술적인 코드와 비지니스 코드가 main 메서드 안에 짬뽕되어 있다.

Spring에서는 어떻게 동작 시켰는가?

Future 방식

@Slf4j
@Component
public class MyService {

  @Async //해당 어노테이션만 붙이는 순간 비동기 작업이 수행된다.
  public Future<String> hello() throws InterruptedException{ //비동기 작업은 Future로 결과를 받아야한다
    Thread.sleep(2000);
    log.info("hello()");
    return new AsyncResult<>( "Hello"); //비동기 결과값을 담아주는 클래스
  }

}
@Slf4j
@EnableAsync //해당 어노테이션을 선언해야된다
@SpringBootApplication
public class TobyReactiveProgrammingApplication {

  public static void main(String[] args) {
    try (ConfigurableApplicationContext c = SpringApplication.run(
        TobyReactiveProgrammingApplication.class, args)) {

    }
  }

  @Autowired
  MyService myService;

  @Bean
  public ApplicationRunner run() {
    //SpringBoot 가 뜨면서 바로 실행할 코드를 작성한다
    return args -> {
      log.info("run()");
      Future<String> future = myService.hello();
      log.info("exit: {}", future.isDone());
      log.info("result: {}", future.get());
    };
  }

}

  • 전 코드와 달리 관심사가 분리되었다.
  • 비동기작업을 처리할 메서드에 @Async 어노테이션을 붙이면 된다.
    • 비동기 작업을 동작시키기 위해서는 @EnableAsync 어노테이션을 붙여야 한다.
  • run() 빈 같은경우 스프링 부트가 실행될때(모든 빈들이 다 올라왔을 때) 실행되는 메서드이다.
    • 비동기 코드를 해당 부분에서 호출하여, 비동기 호출이 되는지 확인하였다.
    • 비동기 적으로 실행한 코드는 다른 스레드에서 실행된 것을 볼 수있다.

이와 같은 방식은 예전 스프링에서 사용한 방식으로, f.get() 으로 받아오는 결과가 오래 걸릴경우 어떻게 했냐?

1) 결과 값을 DB에 저장하고, DB를 계속 Access하는 방식으로 동작시켰다.

2) future를 세션에 저장한 뒤, 리턴한다. -> 다음 요청시 세션에서 future를 꺼내 isDone을 확인후, 성공적으로 완료했는지 확인한다.

Callback 방식

@Slf4j
@Component
public class MyService {
  @Async
  public ListenableFuture<String> hello() throws InterruptedException{ //스프링에서 만들어 놓은 ListenableFuture를 사용해야한다
    log.info("hello()");
    Thread.sleep(2000);
    return new AsyncResult<>( "Hello");
  }
}
@Slf4j
@EnableAsync 
@SpringBootApplication
public class TobyReactiveProgrammingApplication {

  public static void main(String[] args) {
    try (ConfigurableApplicationContext c = SpringApplication.run(
        TobyReactiveProgrammingApplication.class, args)) {

    }
  }

  @Autowired
  MyService myService;

  @Bean
  public ApplicationRunner run() {
    //SpringBoot 가 뜨면서 바로 실행할 코드를 작성한다
    return args -> {
      log.info("run()");
      ListenableFuture<String> listenableFuture = myService.hello();
      listenableFuture.addCallback(s -> System.out.println(s),
          e -> System.out.println(e.getMessage()));
      log.info("exit");
    };
  }

}

  • 콜백 방식으로 하면은 더 장점이 많다
    • future처럼 get()메서드 호출시 기다릴 필요없이 바로 메서드가 실행이 가능하다.
  • 스프링이 만들어 놓은 ListenableFuture를 사용해야한다.
    • 자바 8를 적극적으로 활용한 CompleteFuture도 있다.

TIP : Async를 아무것도 설정하지 않고, 실행하면 SimpleAsyncTaskExecutor가 실행된다

좋지 않다!! 매번 새로운 스레드를 생성하고, 캐싱등의 기능을 사용하지 않고, 사용하고 버린다.

ThreadPool Bean을 하나 등록해서 ThreadPool를 설정하자!

@Bean
public ThreadPoolExecutor threadPoolExecutor() {
  ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 100,
      TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
  return threadPoolExecutor;
}

Bean으로 등록한 threadPool을 사용해 실행한 것을 볼수 있다.

  • maxPoolSize는 Queue가 꽉 찾을때 동작한다
    • CorePoolSize가 꽉차면 Queue에 데이터를 쌓고, 만약 이 Queue까지 모두 다 차게된다면 maxPoolSize만큼 늘리는 것이다.
  • ThreadPool이 여러개 정의되어 있고, Async에서 이를 지정해서 사용해야될 경우 @Async(value=빈이름)으로 스레드 풀을 지정하면 된다.

비동기 서블릿

서블릿에서의 비동기

  • Servlet 3.0 이전에는 OutputStream,InputStream을 사용하므로, Blocking 구조이다
    • 스레드가 Blocking이 되면 CPU와, 메모리 자원을 많이 소모하게 된다.
    • 스레드가 많아지게 되면 오히려 ContextSwitch가 빈번하게 발생하기 때문에 효율적이지 않다.
  • 하나의 스레드가 req -> blocking IO(DB, API Call) -> res(html) 동작을 한다고 하면, blocking 상태일때, 해당 스레드는 대기하고 있기 때문에 동시에 처리할 수 있는 작업량이 낮아지게 된다.
    • 외부의 리소스를 불러오거나 하는 Blocking 작업에 너무 많은 리소스를 소비한다는 것이 큰 문제이다.
  • 서블릿 3.0, 3.1에서는 이러한 문제를 해결하고 비동기로 요청을 해결하도록 업그레이드 되었다.

서블릿은 잘 사용되지 않고, 대부분 Spring Framework를 활용하여 사용하기 때문에 Spring에서는 이를 어떻게 반영했나 살펴보자

Spring에서의 비동기1 (Callable 리턴)

Spring에서 비동기 프로세스

  • NIO커넥트가 요청을 받아 서블릿 스레드 풀로부터 스레드를 할당 받고, 작업스레드에 요청을 맡긴 후, 스레드를 서블릿 스레드 풀로 반환한다.
  • 작업 스레드가 결과를 비동기로 실행하고, 결과를 반환하게 되면, 이때 다시 서블릿 스레드 풀에서 스레드를 할당해서 NIO 커넥트에게 반환한다.
@Slf4j
@RestController
public class MyController {

  @Autowired
  private MyService myService;

  @GetMapping("/callable")
  public Callable<String> callable() throws InterruptedException {
    log.info("callable");
    return () -> {
      log.info("async");
      Thread.sleep(2000); //이 작업을 수행하는 동안 2초간 Blocking 이 된다
      return "Hello";
    };
  }
}

결과

  • Callable를 리턴하도록 하면 Non-Blocking으로 코드가 동작한다.
  • Tomcat에서 할당된 스레드가 callable 로그를 찍고, 비동기로 코드를 실행 후, 바로 스레드 풀에 스레드를 반환한다.
    • 비동기로 실행된 코드가 이를 실행하고, 결과를 반환할 때, 다시 스레드 풀에 스레드를 할당 받아 요청을 반환하고 종료한다.

다음 예제를 살펴보자!

@GetMapping("/callable")
public String callable() throws InterruptedException {
  log.info("async");
  Thread.sleep(2000); //이 작업을 수행하는 동안 2초간 Blocking 이 된다
  return "Hello";
}
@Slf4j
public class LoadTest {

  private static AtomicInteger counter = new AtomicInteger(0); //다른 스레드 동시에 접근X

  public static void main(String[] args) throws InterruptedException {
    ExecutorService es = Executors.newFixedThreadPool(100); //스레드 100개 할당

    RestTemplate rt = new RestTemplate();
    String url = "http://localhost:8080/callable";

    StopWatch main = new StopWatch(); //매인 스레드가 실행되는 시간을 찍어보자
    main.start();
    for (int i = 0; i < 100; ++i) {
      es.execute(() -> {
        int idx = counter.addAndGet(1);
        log.info("Thread {}", idx );

        StopWatch sw = new StopWatch();
        sw.start();

        rt.getForObject(url, String.class);

        sw.stop();
        log.info("Elapsed {} {}", idx, sw.getTotalTimeMillis());
      });
    }

    es.shutdown();
    es.awaitTermination(100, TimeUnit.SECONDS);//100초 안에 안끝나면 강제 종료

    main.stop();
    log.info("Total: {}", main.getTotalTimeSeconds());
  }
}

총 5초의 시간이 걸린것을 알 수 있으며, 스레드가 각각 할당된 것을 확인할 수 있다

application.properties

server.tomcat.threads.max=1
  • 스레드를 100개를 스레드풀에 생성하여, 할당하는 방식이다.
  • 톰캣의 기본 스레드 요청 처리량은 200이기 때문에 100개는 문제없이 처리하지만, 더 늘어나게 된다면 효율적이지 않다
    • 스레드를 요청만큼 생성하기 때문에 좋지 않은 방법이다.
    • 총 걸리는 시간이 5초가 걸리지만, 원래는 2.N 초가 걸려야한다(톰캣의 기본 스레드 요청 처리량이 200이기 때문)
  • 총 스레드가 1개여도 걸리는 시간은 차이가 안난다(why? 비동기적으로 스레드를 실행시키기 때문에)

스프링에서의 비동기2( DeferredResult를 반환하여 대기, Event 전달 -> 실행)

@GetMapping("/dr")
public DeferredResult<String> dr() throws InterruptedException{
  log.info("dr");
  DeferredResult<String> dr = new DeferredResult<>();
  results.add(dr);
  return dr;
}

@GetMapping("/dr/count")
public String drCount(){
  return String.valueOf(results.size());
}

@GetMapping("/dr/event")
public String drEvent(String msg){
  for(DeferredResult<String> dr : results){
    dr.setResult("Hello " + msg);
    results.remove(dr);
  }
  return "OK";
}

비동기 작업 후 서블릿 응답 처리

  • 요청이 들어오면, DeferredResult 큐에 담아둔 뒤, 외부에 이벤트가 발생하면 스레드를 할당 받아 처리해준다(지연 처리)
  • DeferredResult타입을 담은 큐가 이벤트를 기다리고 있다가 이벤트가 발생하면 큐에 대기중인 결과를 출력하고 큐를 비어준다
    • DefaultResult 큐에 들어간 데이터 DefaultResult는 setResult()나 예외가 발생하게 되면 이벤트를 발생시킨다.
  • 서블릿 스레드를 최소화하여 많은 응답을 처리해 줄수 있다.

스프링에서의 비동기3( Emitter를 통한 Stream한 데이터 전달)

@GetMapping("/emitter")
public ResponseBodyEmitter emmiter(){
  //데이터를 여러번에 나눠서 보낼수 있다.
  ResponseBodyEmitter emitter = new ResponseBodyEmitter();
  Executors.newSingleThreadExecutor().submit(() -> {
      try {
        for(int i=1; i<=50; ++i) {
          emitter.send("<p>Stream " + i + "</p>");
          Thread.sleep(2000);
        }
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
  });
  return emitter;
}

Stream 한 데이터가 비동기적으로 전달 받는다

  • http 에서 결과를 모아서 한번에 전달하는 것이 아닌 스트리밍 방식으로 데이터를 응답할 수 있다.
  • 스프링을 이용하면 http에서 따로 Streame 처리를 하지 않더라도 간편하게 할 수 있다.
728x90