728x90
Thread Pool Hell(스레드 풀 지옥)
LinkedIn의 Thread Pool



- 2~4시 사이에 요청이 급격히 많아졌고, 그에 따라 Latency도 급격히 증가한 것을 볼 수 있다
- Thread요청이 급격히 늘어나 Queue에 대기하고 있던 요청들도 늘게 되어, Latency가 길어진 것을 볼 수 있다.
- 왜 이렇게 Latency가 늘어났을까?
- 최신 BackEnd 구조를 보면, 외부 API 호출을 통한 요청으로 서로 간 통신을 한다
- 외부 API 호출을 통한 요청은 I/O 요청으로 Blocking 처리가 되고, 스레드가 Blocking 상태가 되어 놀고 있기 때문에 Latancy가 증가한 것이다.
- 스레드를 늘리는것은 근본적인 해결이 될 수 없다(스레드 개수를 늘리는 것은 한정되어 있으며, 오히려 잦은 Context-Switching으로 인한, 성능 하락이 야기될 수 있다)
- 특히 Blocking 상태가 되어있는 Thread는 즉시 Context-Switching 대상이 되기 때문에 해결책이 될 수 없다.
CyclicBarrier.await
@RestController //@ResponseBody + @Controller
public class MyController2 {
@GetMapping("/rest")
public String rest(int idx){
return "rest " + idx;
}
}
application.properties
server.tomcat.threads.max=1
spring.task.execution.pool.core-size= 1
@Slf4j
public class LoadTest {
private static AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
ExecutorService es = Executors.newFixedThreadPool(100);
RestTemplate rt = new RestTemplate();
String url = "http://localhost:8080/rest?idx={idx}";
//100개의 요청을 모아서 100개 생성후 한번에 돌리고 싶으면 어떻게 할까? 동기화
CyclicBarrier barrier = new CyclicBarrier(101);
for (int i = 0; i < 100; ++i) {
//es.execute(() -> {
es.submit(() -> {
int idx = counter.addAndGet(1);
//exception을 처리하는 코드가 람다식 안(Runnable)에 있다면, 메서드 밖으로 던질 수 없고 try-catch로 잡아야한다.
barrier.await(); //await를 만난 숫자가 CyclicBarrier에 파라미터로 들어온 숫자로 되게 되면, 아래 코드가 실행이 된다.
log.info("Thread {}", idx );
StopWatch sw = new StopWatch();
sw.start();
String res = rt.getForObject(url, String.class, idx);
sw.stop();
log.info("Elapsed {} {} {}", idx, sw.getTotalTimeMillis(), res);
return null;
});
}
barrier.await();
StopWatch main = new StopWatch();
main.start();
es.shutdown();
es.awaitTermination(100, TimeUnit.SECONDS);
main.stop();
log.info("Total: {}", main.getTotalTimeSeconds());
}
}
- CyclicBarrier를 선언하여, 스레드를 동기화 할 수 있다.
- CyclicBarrier 생성자로 넘긴 파라미터의 값 만큼 await를 호출하게 되면, 생성된 스레드가 그 순간 실행된다(동기화)
- 현재 101을 파라미터로 받아 for문에서 100번 호출후, for문 탈출 후 1번을 실행해, 그 순간 thread를 동기화시켜 스레드를 실행시키면서 mainStopWatch 시간을 잰다.
대기상태의 스레드가 어떤문제를 일으키는지 살펴보자
controller(외부 API 호출, Blocking이 된다)
@RestController
public class MyController2 {
RestTemplate rt = new RestTemplate();
@GetMapping("/rest")
public String rest(int idx) {
String res = rt.getForObject("http://localhost:8081/service?req={req}", String.class,
"hello" + idx);
return res;
}
}
외부 Service
@SpringBootApplication
public class RemoteService {
@RestController
public static class MyController {
@GetMapping("/service")
public String service(String req) throws InterruptedException {
Thread.sleep(2000);
return req + "/service";
}
}
public static void main(String[] args) {
//SpringBoot를 띄울때 Properties 값을 동적으로 설정해서 띄울수 있다.
System.setProperty("server.port", "8081");
System.setProperty("server.tomcat.threads.max", "1000");
System.setProperty("spring.task.execution.pool.core-size", "1000");
SpringApplication.run(TobyReactiveProgrammingApplication.class, args);
}
}
- setProperty로 동적으로 프로퍼티 값을 전달해 줄 수 있다.
- 호출되는 API 는 service로 2초의 시간이 지난 뒤 반환한다.
- 해당 외부 WEB에서 처리하는 스레드 Pool은 여유 있다고 가정
LoadTest를 실행해보자(위와 동일)
@Slf4j
public class LoadTest {
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
//...
String url = "http://localhost:8080/rest?idx={idx}";//100개의 요청을 모아서 100개 생성후 한번에 돌리고 싶으면 어떻게 할까? 동기화
for (int i = 0; i < 100; ++i) {
es.submit(() -> {
//...
String res = rt.getForObject(url, String.class, idx);
//...
});
}
//...
}
}
- 요청 100개를 전달하는데 요청에서 호출하는 외부 API가 한 개당 2초의 Blocking이 걸린다.
- RestTemplate의 getForObject메서드는 Blocking 메서드이다.
- 스레드 풀보다 많은 요청이 발생하게 되면, 스레드가 반환될 때까지 Blocking을 기다려 요청을 처리하지 못하고 있다(CPU는 놀고 있다)
- 해결방안은 API호출 하는 방법(getForObject)을 비동기로 호출하면 된다.
- Callable과 DefferedResult로는 해결하기 힘들다(callable 같은 경우 워킹 스레드가 늘어나고, DefferedResult처럼 이벤트가 발생하는 것이 아니기 때문에)
AsyncRestTemplate
AsyncRestTemplate rt = new AsyncRestTemplate();
@GetMapping("/rest")
public ListenableFuture<ResponseEntity<String>> rest(int idx) {
ListenableFuture<ResponseEntity<String>> res = rt.getForEntity(
"http://localhost:8081/service?req={req}", String.class,
"hello" + idx);
//스프링이 ListenableFuture<ResponseEntity<String>>을 반환하게 되면 callback을 알아서 등록하고, 비동기로 실행시켜준다
return res;
}
- AsyncRestTemplate을 사용하면 비동기적인 요청을 할 수 있다
- getForEntity 메서드는 ListenableFuture<ResponseEntity<String>>을 반환하게 되고, ListenableFuture<ResponseEntity<String>>를 스프링에서 반환하게 되면, 스프링이 자동으로 많은 일을 해준다.
- 스프링이 callback을 알아서 등록하고, 비동기로 실행시켜준다.
- 따라서 개발자는 따로 CallBack을 생성하지 않아도 된다.
- AsyncRestTemplate 를 쓰게 되면 사실 백그라운드에서 스레드를 요청 개수만큼 생성해서 하나의 스레드로 보내는 것처럼 해주는 것이기 때문에 효과적인 방법이 아니다(결국 백그라운드에 스레드가 100개 생성된다)
Netty 사용
AsyncRestTemplate rt = new AsyncRestTemplate(
new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
@GetMapping("/rest")
public ListenableFuture<ResponseEntity<String>> rest(int idx) {
ListenableFuture<ResponseEntity<String>> res = rt.getForEntity(
"http://localhost:8081/service?req={req}", String.class,
"hello" + idx);
return res;
}
- RestTemplate에서 지원하는 Netty 라이브러리(비동기 라이브러리)를 사용하여 작성해 보자
- 스레드가 거의 늘어나지 않고 비동기적인 처리가 가능한 것을 볼 수 있다.
- 스프링에서 Blocking을 호출하는 메서드가 많다고 하더라도, Blocking되지 않고 비동기적으로 호출할 수 있도록 도와주는 라이브러리이다.
데이터 가공
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();
ListenableFuture<ResponseEntity<String>> res = rt.getForEntity(
"http://localhost:8081/service?req={req}", String.class,
"hello" + idx);
res.addCallback(s -> {
dr.setResult(
s.getBody()//원하는 결과값
+ "/work"
);
}, e->{
//비동기 작업을 콜백으로 처리할 때는 예외를 전파하면 안된다.
//순수하게 deferredResult를 통해 처리를 해야한다.
dr.setErrorResult(e.getMessage()); //이걸 받는 클라이언트에서 적절히 처리하면 된다.
});
return dr;
}
- 값을 받아 가공을 해야될 경우에는 callback 방식을 사용해서 처리해줘야 한다(get() 방식으로 써서 동기적으로 하면 X)
- 이때 사용되야 하는 것이 DeferredResult로 스프링에서 결괏값이 나오지 않았을 때는 DeferedResult값을 반환했다가 만약 값이 응답 왔다면 callback 함수에서 setResult로 이벤트를 동작시켜 DeferredResult에 스레드를 할당해서 반환시켜준다.
- callback 방식에서는 예외를 전파하면 안된다
- 어느 스레드가 받아 처리하는지 알 수 없기 때문에 callback을 호출하는 곳에서 예외를 처리해주어야 한다
Callback 지옥
여러 비동기 작업을 순차적으로 진행해야 된다면 어떻게 할까?
외부 API (service1, service2)
@SpringBootApplication
public class RemoteService {
@RestController
public static class MyController {
@GetMapping("/service1")
public String service1(String req) throws InterruptedException {
Thread.sleep(2000);
return req + "/service1";
}
@GetMapping("/service2")
public String service2(String req) throws InterruptedException {
Thread.sleep(2000);
return req + "/service2";
}
}
public static void main(String[] args) {
//SpringBoot를 띄울때 Properties 값을 동적으로 설정해서 띄울수 있다.
System.setProperty("server.port", "8081");
System.setProperty("server.tomcat.threads.max", "1000");
System.setProperty("spring.task.execution.pool.core-size", "1000");
SpringApplication.run(TobyReactiveProgrammingApplication.class, args);
}
}
@RestController
public class MyController2 {
AsyncRestTemplate rt = new AsyncRestTemplate(
new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
public static final String URL1 = "http://localhost:8081/service1?req={req}";
public static final String URL2 = "http://localhost:8081/service2?req={req}";
@Autowired
MyService myService;
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();
ListenableFuture<ResponseEntity<String>> res1 = rt.getForEntity(URL1, String.class,"hello" + idx);
res1.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> res2 = rt.getForEntity(URL2, String.class,s.getBody());
res2.addCallback(s2 -> {
ListenableFuture<String> res3 = myService.work(s2.getBody());
res3.addCallback(s3 -> {
dr.setResult(s3);
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
return dr;
}
@Service
public static class MyService {
@Async
public ListenableFuture<String> work(String req) {
return new AsyncResult<>(req + "/asyncwork");
}
}
@Bean
public ThreadPoolExecutor myThreadPool() {
ThreadPoolExecutor te = new ThreadPoolExecutor(10, 100, 100,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
return te;
}
}
- 외부 API 호출 2번 내부적 작업 1번 총 3번의 비동기 작업이 동작한다.
- 3개의 작업을 비동기로 실행시키기 때문에, 빠르게 실행이 가능하다.
- Backgroud에서 비동기적으로 데이터를 가지고 와서 조합하기 때문에 이를 컬렉션으로 저장해서 관리하는 코드를 작성하기는 어렵다.
- 문제를 해결하였지만, 콜백이 계속 연관되어 나오기 때문에 코드를 읽기 복잡하다.
- 병렬로 데이터를 조합하기도 어렵다.
- 매번 에러를 잡아주는 방식 e -> {...} 코드 또한 매번 나오기 때문에 중복 코드가 발생한다
DB를 사용하는 코드는 보통 JDBC를 사용하고, 이런 JDBC는 Blocking 통신을 하기 때문에, 비동기 통신은 불가능하다. -> 현재는 오라클이 뒤늦게 비동기 처리가 가능한 JDBC를 개발하고 있다.
728x90
'Toby의 ReactiveProgramming' 카테고리의 다른 글
| CompletableFuture (0) | 2022.08.27 |
|---|---|
| AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (0) | 2022.08.26 |
| 자바와 스프링의 비동기 기술 (0) | 2022.08.22 |
| Reactive Streams - Schedulers (0) | 2022.08.22 |
| Reactive Streams - Operators (0) | 2022.07.17 |