JAVA에서의 비동기 작업(Future)
JVM의 Future란? JVM 1.5에서 나왔다, 비동기적인 연산,작업에 대한 결과를 가지고 있는 인터페이스
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
Thread.sleep(2000);
System.out.println("Hello"); //2초 후 해당 결과가 실행되고 Exit 실행
System.out.println("Exit");
}
- 2초 뒤에 Hello가 출력되고, Exit가 출력된다
- 동기적 실행이 된다.
비동기적 실행
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() ->{ //Runable은 리턴을 받을 수 없다.
//독자적으로 작업을 실행후 종료한다.
try {
Thread.sleep(2000);
} catch (InterruptedException e) { }
log.info("Async");
});
log.info("Exit"); // 해당 코드가 먼저 실행된다.
}

- 비동기적으로 실행되어 Exit출력 후, Async가 출력이된다.
- 스레드 풀에서 스레드를 할당해서 실행했는지를 보기 위해 log를 찍었다.
- es.excute는 파라미터로 Runable를 받으며, Runable은 리턴값이 없다. ->리턴값이 있는것을 사용하기 위해서는 submit을 쓰자
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newCachedThreadPool();
Future<String> future = es.submit(() -> { //callable를 받기 때문에 return과 예외를 받을 수 있다.
Thread.sleep(2000);
log.info("Async");
return "Hello";
}); //Future를 통해 결과를 받을 수 있다.
System.out.println(future.get()); //결과가 나올때까지 코드가 blocking 된다(Blocking)
//만약 결과를 반환하지 않을경우(Non-Blocking)
log.info("Exit");//비동기 작업이 끝나고, Exit이 호출되었다.
}
- Blocking : 비동기적 실행을 결과를 아직 받지 않아 코드가 더이상 실행되지 않고 기다리고 있다
- Non-Blocking : 비동기적 실행을 결과를 받을 것이 없기 때문에 코드를 Blocking 하지 않고, 바로 실행 시킨다
- es.submit() 같은 경우 파라미터로 callable를 받기 때문에 리턴값과 예외를 처리해줄수 있다.
- Future는 비동기적으로 처리한 결과를 받을때 사용하며, .get() 메서드를 사용하게 되면, Blocking상태로 만들 수 있다.
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newCachedThreadPool();
Future<String> future = es.submit(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
});
System.out.println(future.isDone()); //비동기 작업이 완료 됬는가를 return 해준다
//다른 작업과 비동기 적으로 실행할 때, 사용하기 좋다.
log.info("Exit"); //해당 코드를 실행 후, blocking 상태로 된다.
Thread.sleep(2100);
System.out.println(future.isDone());
System.out.println(future.get());
}
- Blocking 이 된다고, 비동기적으로 실행이 되지 않는것이 아니라 Blocking을 실행하기 전까지는 비동기적으로 실행되기 때문에 이를 효과적으로 사용할 수도 있다.
- future에는 isDone() 메서드가 존재하며, 비동기적으로 실행한 함수의 결과가 반환 되었는지를 즉시 확인할 수도 있다.
- 이를 응용하면, while문을 조건절에 해당 메서드를 써서 비동기함수가 끝났는지 즉시 확인하여, Non-Blocking상태로 만들 수 있다.
JAVA에서의 비동기 작업을 받아오는 방법중 하나인 Future를 살펴보았고, 다음 방법인 Callback을 살펴보자
JAVA에서의 비동기 작업 (Callback)
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newCachedThreadPool();
FutureTask<String> futureTask = new FutureTask<>(() -> { //Callback 을 처리할 수 있는 메커니즘이 다 존재한다.
Thread.sleep(2000);
log.info("Async");
return "Hello";
}){ //익명 클래스로 만든다
@Override
protected void done() { //비동기 작업이 완료되면 실행되는 hook
try {
System.out.println(get()); //get() 을 여기서 실행
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
};
es.execute(futureTask);
es.shutdown(); //executor 서비스가 끝나게 되면 반드시 종료하자
}
- FutureTask를 통해 Callback을 처리할 수 있는 매커니즘을 해결 할 수 있다.
- done() 메서드는 비동기 작업이 완료되면 실행되는 것으로, done()메서드에서 get()을 호출해서 비동기로 호출된 결과를 가지고 오고 있다.
Callback 패턴을 클래스로 만들어 사용하기
interface SuccessCallback { //성공 콜백
void onSuccess(String result);
}
public static class CallbackFutureTask extends FutureTask<String> {
SuccessCallback sc; //생성시 실행될 람다를 생성자로 받는다.
public CallbackFutureTask(Callable<String> callable, SuccessCallback sc) {
super(callable);
//sc는 널이면 안된다.
this.sc = Objects.requireNonNull(sc); //sc가 null이 아니면 그 값을 리턴한다.
}
@Override
protected void done() { //hook을 재정의한다.
try {
sc.onSuccess(get()); //get()이 반환하는 리턴값을 넣어준다.
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newCachedThreadPool();
CallbackFutureTask callbackFutureTask = new CallbackFutureTask(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
//}, res -> System.out.println(res)); //Success 도 람다로 받을 수 있다.
}, System.out::println);
es.execute(callbackFutureTask);
es.shutdown();
}
- Callback을 파라미터로 받아 실행시킬수 있다.
- 생성자를 통해, 콜백시, 실행될 람다를 입력받아 done() 메서드(hooks)에서 실행해준다.
- 콜백 인터페이스 같은 경우에도 실행될 메서드는 하나이므로 람다로 받을 수 있다.
- 메서드 레퍼런스(System.out::println)을 사용해 코드를 더 간결하고, 가독성 있게 작성할 수 있다.
예외까지 처리하는 Callback 작성하기
@Slf4j
public class FutureEx {
interface SuccessCallback {
void onSuccess(String result);
}
interface ExceptionCallback { //비동기 작업을 하다 예외가 발생시 해당 인터페이스를 받게끔 한다.
void onError(Throwable t);
}
public static class CallbackFutureTask extends FutureTask<String> {
SuccessCallback sc;
ExceptionCallback ec;
public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
super(callable);
this.sc = Objects.requireNonNull(sc);
this.ec = Objects.requireNonNull(ec);
}
@Override
protected void done() {
try {
sc.onSuccess(get());
} catch (InterruptedException e) { //작업을 진행하지 말고, 종료하라는 메시지를 준다.
Thread.currentThread().interrupt();
} catch (ExecutionException e) { //비동기 작업시 발생하는 예외, 현재 es에 처리해야될 예외
ec.onError(e.getCause()); //e는 예외가 포장되어 있기 때문에 까서 전달
}
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newCachedThreadPool();
CallbackFutureTask callbackFutureTask = new CallbackFutureTask(() -> {
Thread.sleep(2000);
if(true)
throw new RuntimeException("Async ERROR!!!");
log.info("Async");
return "Hello";
}, System.out::println,
e -> System.out.println("Error: " + e.getMessage()));
es.execute(callbackFutureTask);
es.shutdown();
}
}

- 생성자의 3번째 파라미터로 예외를 처리하는 방식을 전달한다.
- done(hook)에서 catch 하는 예외중 InterruptedException 같은 경우 조금 특별한 예외이므로 다르게 처리하자
- 실제 우리가 던져주어야 하는 예외는 ExecutionException으로 비동기 작업시 발생한 에외가 해당 부분에 오게 된다.
- 반드시 예외를 발생시키기 위해 if(true)로 컴파일러를 속여 예외를 발생 시켯다.
- 원하는 방식으로 예외가 커스텀되어 출력되는것을 확인할 수 있다.
문제점 : 성격이 다른 기술적인 코드와 비지니스 코드가 main 메서드 안에 짬뽕되어 있다.
Spring에서는 어떻게 동작 시켰는가?
Future 방식
@Slf4j
@Component
public class MyService {
@Async //해당 어노테이션만 붙이는 순간 비동기 작업이 수행된다.
public Future<String> hello() throws InterruptedException{ //비동기 작업은 Future로 결과를 받아야한다
Thread.sleep(2000);
log.info("hello()");
return new AsyncResult<>( "Hello"); //비동기 결과값을 담아주는 클래스
}
}
@Slf4j
@EnableAsync //해당 어노테이션을 선언해야된다
@SpringBootApplication
public class TobyReactiveProgrammingApplication {
public static void main(String[] args) {
try (ConfigurableApplicationContext c = SpringApplication.run(
TobyReactiveProgrammingApplication.class, args)) {
}
}
@Autowired
MyService myService;
@Bean
public ApplicationRunner run() {
//SpringBoot 가 뜨면서 바로 실행할 코드를 작성한다
return args -> {
log.info("run()");
Future<String> future = myService.hello();
log.info("exit: {}", future.isDone());
log.info("result: {}", future.get());
};
}
}

- 전 코드와 달리 관심사가 분리되었다.
- 비동기작업을 처리할 메서드에 @Async 어노테이션을 붙이면 된다.
- 비동기 작업을 동작시키기 위해서는 @EnableAsync 어노테이션을 붙여야 한다.
- run() 빈 같은경우 스프링 부트가 실행될때(모든 빈들이 다 올라왔을 때) 실행되는 메서드이다.
- 비동기 코드를 해당 부분에서 호출하여, 비동기 호출이 되는지 확인하였다.
- 비동기 적으로 실행한 코드는 다른 스레드에서 실행된 것을 볼 수있다.
이와 같은 방식은 예전 스프링에서 사용한 방식으로, f.get() 으로 받아오는 결과가 오래 걸릴경우 어떻게 했냐?
1) 결과 값을 DB에 저장하고, DB를 계속 Access하는 방식으로 동작시켰다.
2) future를 세션에 저장한 뒤, 리턴한다. -> 다음 요청시 세션에서 future를 꺼내 isDone을 확인후, 성공적으로 완료했는지 확인한다.
Callback 방식
@Slf4j
@Component
public class MyService {
@Async
public ListenableFuture<String> hello() throws InterruptedException{ //스프링에서 만들어 놓은 ListenableFuture를 사용해야한다
log.info("hello()");
Thread.sleep(2000);
return new AsyncResult<>( "Hello");
}
}
@Slf4j
@EnableAsync
@SpringBootApplication
public class TobyReactiveProgrammingApplication {
public static void main(String[] args) {
try (ConfigurableApplicationContext c = SpringApplication.run(
TobyReactiveProgrammingApplication.class, args)) {
}
}
@Autowired
MyService myService;
@Bean
public ApplicationRunner run() {
//SpringBoot 가 뜨면서 바로 실행할 코드를 작성한다
return args -> {
log.info("run()");
ListenableFuture<String> listenableFuture = myService.hello();
listenableFuture.addCallback(s -> System.out.println(s),
e -> System.out.println(e.getMessage()));
log.info("exit");
};
}
}

- 콜백 방식으로 하면은 더 장점이 많다
- future처럼 get()메서드 호출시 기다릴 필요없이 바로 메서드가 실행이 가능하다.
- 스프링이 만들어 놓은 ListenableFuture를 사용해야한다.
- 자바 8를 적극적으로 활용한 CompleteFuture도 있다.
TIP : Async를 아무것도 설정하지 않고, 실행하면 SimpleAsyncTaskExecutor가 실행된다
좋지 않다!! 매번 새로운 스레드를 생성하고, 캐싱등의 기능을 사용하지 않고, 사용하고 버린다.
ThreadPool Bean을 하나 등록해서 ThreadPool를 설정하자!
@Bean
public ThreadPoolExecutor threadPoolExecutor() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 100,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
return threadPoolExecutor;
}


- maxPoolSize는 Queue가 꽉 찾을때 동작한다
- CorePoolSize가 꽉차면 Queue에 데이터를 쌓고, 만약 이 Queue까지 모두 다 차게된다면 maxPoolSize만큼 늘리는 것이다.
- ThreadPool이 여러개 정의되어 있고, Async에서 이를 지정해서 사용해야될 경우 @Async(value=빈이름)으로 스레드 풀을 지정하면 된다.
비동기 서블릿

- Servlet 3.0 이전에는 OutputStream,InputStream을 사용하므로, Blocking 구조이다
- 스레드가 Blocking이 되면 CPU와, 메모리 자원을 많이 소모하게 된다.
- 스레드가 많아지게 되면 오히려 ContextSwitch가 빈번하게 발생하기 때문에 효율적이지 않다.
- 하나의 스레드가 req -> blocking IO(DB, API Call) -> res(html) 동작을 한다고 하면, blocking 상태일때, 해당 스레드는 대기하고 있기 때문에 동시에 처리할 수 있는 작업량이 낮아지게 된다.
- 외부의 리소스를 불러오거나 하는 Blocking 작업에 너무 많은 리소스를 소비한다는 것이 큰 문제이다.
- 서블릿 3.0, 3.1에서는 이러한 문제를 해결하고 비동기로 요청을 해결하도록 업그레이드 되었다.
서블릿은 잘 사용되지 않고, 대부분 Spring Framework를 활용하여 사용하기 때문에 Spring에서는 이를 어떻게 반영했나 살펴보자
Spring에서의 비동기1 (Callable 리턴)

- NIO커넥트가 요청을 받아 서블릿 스레드 풀로부터 스레드를 할당 받고, 작업스레드에 요청을 맡긴 후, 스레드를 서블릿 스레드 풀로 반환한다.
- 작업 스레드가 결과를 비동기로 실행하고, 결과를 반환하게 되면, 이때 다시 서블릿 스레드 풀에서 스레드를 할당해서 NIO 커넥트에게 반환한다.
@Slf4j
@RestController
public class MyController {
@Autowired
private MyService myService;
@GetMapping("/callable")
public Callable<String> callable() throws InterruptedException {
log.info("callable");
return () -> {
log.info("async");
Thread.sleep(2000); //이 작업을 수행하는 동안 2초간 Blocking 이 된다
return "Hello";
};
}
}

- Callable를 리턴하도록 하면 Non-Blocking으로 코드가 동작한다.
- Tomcat에서 할당된 스레드가 callable 로그를 찍고, 비동기로 코드를 실행 후, 바로 스레드 풀에 스레드를 반환한다.
- 비동기로 실행된 코드가 이를 실행하고, 결과를 반환할 때, 다시 스레드 풀에 스레드를 할당 받아 요청을 반환하고 종료한다.
다음 예제를 살펴보자!
@GetMapping("/callable")
public String callable() throws InterruptedException {
log.info("async");
Thread.sleep(2000); //이 작업을 수행하는 동안 2초간 Blocking 이 된다
return "Hello";
}
@Slf4j
public class LoadTest {
private static AtomicInteger counter = new AtomicInteger(0); //다른 스레드 동시에 접근X
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(100); //스레드 100개 할당
RestTemplate rt = new RestTemplate();
String url = "http://localhost:8080/callable";
StopWatch main = new StopWatch(); //매인 스레드가 실행되는 시간을 찍어보자
main.start();
for (int i = 0; i < 100; ++i) {
es.execute(() -> {
int idx = counter.addAndGet(1);
log.info("Thread {}", idx );
StopWatch sw = new StopWatch();
sw.start();
rt.getForObject(url, String.class);
sw.stop();
log.info("Elapsed {} {}", idx, sw.getTotalTimeMillis());
});
}
es.shutdown();
es.awaitTermination(100, TimeUnit.SECONDS);//100초 안에 안끝나면 강제 종료
main.stop();
log.info("Total: {}", main.getTotalTimeSeconds());
}
}

application.properties
server.tomcat.threads.max=1
- 스레드를 100개를 스레드풀에 생성하여, 할당하는 방식이다.
- 톰캣의 기본 스레드 요청 처리량은 200이기 때문에 100개는 문제없이 처리하지만, 더 늘어나게 된다면 효율적이지 않다
- 스레드를 요청만큼 생성하기 때문에 좋지 않은 방법이다.
- 총 걸리는 시간이 5초가 걸리지만, 원래는 2.N 초가 걸려야한다(톰캣의 기본 스레드 요청 처리량이 200이기 때문)
- 총 스레드가 1개여도 걸리는 시간은 차이가 안난다(why? 비동기적으로 스레드를 실행시키기 때문에)
스프링에서의 비동기2( DeferredResult를 반환하여 대기, Event 전달 -> 실행)
@GetMapping("/dr")
public DeferredResult<String> dr() throws InterruptedException{
log.info("dr");
DeferredResult<String> dr = new DeferredResult<>();
results.add(dr);
return dr;
}
@GetMapping("/dr/count")
public String drCount(){
return String.valueOf(results.size());
}
@GetMapping("/dr/event")
public String drEvent(String msg){
for(DeferredResult<String> dr : results){
dr.setResult("Hello " + msg);
results.remove(dr);
}
return "OK";
}

- 요청이 들어오면, DeferredResult 큐에 담아둔 뒤, 외부에 이벤트가 발생하면 스레드를 할당 받아 처리해준다(지연 처리)
- DeferredResult타입을 담은 큐가 이벤트를 기다리고 있다가 이벤트가 발생하면 큐에 대기중인 결과를 출력하고 큐를 비어준다
- DefaultResult 큐에 들어간 데이터 DefaultResult는 setResult()나 예외가 발생하게 되면 이벤트를 발생시킨다.
- 서블릿 스레드를 최소화하여 많은 응답을 처리해 줄수 있다.
스프링에서의 비동기3( Emitter를 통한 Stream한 데이터 전달)
@GetMapping("/emitter")
public ResponseBodyEmitter emmiter(){
//데이터를 여러번에 나눠서 보낼수 있다.
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
Executors.newSingleThreadExecutor().submit(() -> {
try {
for(int i=1; i<=50; ++i) {
emitter.send("<p>Stream " + i + "</p>");
Thread.sleep(2000);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return emitter;
}

- http 에서 결과를 모아서 한번에 전달하는 것이 아닌 스트리밍 방식으로 데이터를 응답할 수 있다.
- 스프링을 이용하면 http에서 따로 Streame 처리를 하지 않더라도 간편하게 할 수 있다.
'Toby의 ReactiveProgramming' 카테고리의 다른 글
| AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (0) | 2022.08.26 |
|---|---|
| 비동기 RestTemplate과 비동기 MVC/Servlet (0) | 2022.08.26 |
| Reactive Streams - Schedulers (0) | 2022.08.22 |
| Reactive Streams - Operators (0) | 2022.07.17 |
| Reactive Streams (0) | 2022.07.16 |