728x90
Refectoring: CallBack 지옥에서 벗어나는 코드 Version 1
코드를 자세히 보면, 결과를 리턴으로 받는 것이 아닌 콜백 형식으로 받게 되니, Callback 지옥이 발생하게 된다.
에러를 처리해주는 코드가 매번 중복되는 문제도 계속 발생한다.
@SpringBootApplication
public class Chapter10Application {
@RestController
public static class MyController {
public static final String URL1 = "http://localhost:8081/service1?req={req}";
public static final String URL2 = "http://localhost:8081/service2?req={req}";
AsyncRestTemplate rt = new AsyncRestTemplate(
new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
@Autowired
MyService myService;
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();
Completion
.from(rt.getForEntity(URL1, String.class, "h" + idx))//비동기 결과를 가지고 있는 오브젝트
.andAccept(s -> dr.setResult(s.getBody())) //앞에서 전달받은 결과를 람다식안으로 넘겨주는 역할을 하고 싶다
;
return dr;
}
}
public static class Completion {
private Consumer<ResponseEntity<String>> con;
private Completion next;
public Completion() {
}
public Completion(Consumer<ResponseEntity<String>> con) {
this.con = con;
}
//비동기 작업을 결과를 담는 클래스
public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
//생성하는 static factory 메서드
Completion completion = new Completion();
lf.addCallback(s -> {
//성공
completion.complete(s);
}, e -> {
completion.error(e);
});
return completion;
}
//성공을 처리하는 메서드
private void complete(ResponseEntity<String> s) {
if(next != null){
next.run(s);
}
}
private void run(ResponseEntity<String> value) {
if(con != null){
con.accept(value);
}
}
//에러를 처리하는 메서드
private void error(Throwable e) {
}
public void andAccept(Consumer<ResponseEntity<String>> con) {
Completion completion = new Completion(con);
this.next = completion;
}
}
@Service
public static class MyService {
@Async
public ListenableFuture<String> work(String req) {
return new AsyncResult<>(req + "/asyncwork");
}
}
@Bean
public ThreadPoolExecutor myThreadPool() {
ThreadPoolExecutor te = new ThreadPoolExecutor(10, 100, 100,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
return te;
}
public static void main(String[] args) {
SpringApplication.run(Chapter10Application.class, args);
}
}
- 비동기 작업 하나에 대한 결과를, 클래스로 받아 처리하는 작업만 진행하였다.
- Completion 같은 경우, 내부에 연산을 진행한 결과인 next와, 실행되어야 할 람다 함수인 con을 가지고 있다.
- from 메서드는 비동기를 진행한 결과를 파라미터로 받아, Completion을 생성하고, 콜백과, 에러를 처리해주는 책임을 가진다.
- andAccept 메서드는 내부에서 람다 연산을 파라미터로 받는 Completion을 생성한 뒤, 그 Completion을 next에 담아둔다.
- andAccept 메서드는 결과를 받아 람다를 실행하고, 종료하는 역할만 하는 것이다.
Refectoring: CallBack 지옥에서 벗어나는 코드 Version2
비동기 호출을 결과로 받은 결과를 람다를 실행 후, 리턴 값을 돌려받는 코드를 작성해보자(f2)
@RestController
public static class MyController {
//..
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();
Completion
.from(rt.getForEntity(URL1, String.class, "h" + idx))
.andApply(s -> rt.getForEntity(URL2, String.class, s.getBody())) // 해당 함수는 람다를 실행한 결과를 반환해 주어야한다.
.andAccept(s -> dr.setResult(s.getBody()))
;
return dr;
}
}
public static class Completion {
//..
private Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;
public Completion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
this.fn = fn;
}
private void run(ResponseEntity<String> value) {
if(con != null){
con.accept(value);
} else if (fn != null) { //fn이 존재 할 경우 다음 단계로 넘어가야한다.
ListenableFuture<ResponseEntity<String>> listenableFuture = fn.apply(value); //앞에 작업 종료
listenableFuture.addCallback(
s -> complete(s), e -> error(e)
); //콜백을 가지고 연결을 해야한다.
}
}
public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
Completion completion = new Completion(fn);
this.next = completion;
return completion;
}
//..
}
- Completion 클래스의 andApply() 메서드는 람다를 실행한 뒤, 그 결과를 반환해야 한다.
- 결과로 반환된 타입이 Completion 타입 이어야 한다(비동기 실행 chain이 계속되어야 한다)
- andApply 메서드에서 Function(함수를 인자 1, 리턴 가능) 형 클래스를 파라미터로 받는 Completion을 생성한 뒤, next에 담고, 생성한 Completion을 반환한다.
- 이제 run 메서드를 비동기적으로 실행이 될 때, fn 프로퍼티에 값이 비어있지 않을 경우, 람다를 생성한 실행한 뒤, 해당 결과에 대한 콜백 분기를 처리해 줘야 한다
- 이때 콜백 분기는 이전에 from에서 했던 동작과 동일하다.
Run 코드를 보면 if를 통해 분기 처리에 따른 여러 호출을 하고 있지만, 리펙토링이 필요해 보인다
Version2 Refectoring
public static class Completion {
private Completion next;
public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
Completion completion = new Completion();
lf.addCallback(s -> {
completion.complete(s);
}, e -> {
completion.error(e);
});
return completion;
}
protected void complete(ResponseEntity<String> s) {
if(next != null){
next.run(s);
}
}
protected void run(ResponseEntity<String> value) {
}
protected void error(Throwable e) {
}
public void andAccept(Consumer<ResponseEntity<String>> con) {
Completion completion = new AcceptCompletion(con);
this.next = completion;
}
public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
Completion completion = new ApplyCompletion(fn);
this.next = completion;
return completion;
}
}
public static class AcceptCompletion extends Completion{
private Consumer<ResponseEntity<String>> con;
public AcceptCompletion(Consumer<ResponseEntity<String>> con) {
this.con = con;
}
@Override
protected void run(ResponseEntity<String> value) {
con.accept(value);
}
}
public static class ApplyCompletion extends Completion{
private Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;
public ApplyCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
this.fn = fn;
}
@Override
protected void run(ResponseEntity<String> value) {
ListenableFuture<ResponseEntity<String>> listenableFuture = fn.apply(value); //앞에 작업 종료
listenableFuture.addCallback(
s -> complete(s), e -> error(e)
);
}
}
- Completion을 상속받는 AcceptCompletion과, ApplyComletion을 만든다.
- AcceptCompletion 같은 경우 Consumer 함수를 인자로 받으며, run 메서드를 오버 라이딩하여 accept를 호출한다.
- ApplyComletion 같은 경우 Fuction 함수를 인자로 받으며, run 메서드를 오버라이딩 하여 function 실행 결과를 콜백으로 호출한다.
- Completion의 Run 메서드는 더 이상 분기 처리할 필요 없다
- andAccept 호출 시에는 AcceptCompletion을 생성하고, andApply 호출시에는 ApplyCompletion을 생성하도록 코드를 리펙토링 하였다.
- 따라서, run 메서드를 호출하게 되면, 오버 라이딩된 메서드가 호출이 된다.
예외 처리를 추가한 코드
@RestController
public static class MyController {
//..
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();
Completion
.from(rt.getForEntity(URL1, String.class, "h" + idx))
.andApply(s -> rt.getForEntity(URL2, String.class, s.getBody()))
.andError(e -> dr.setErrorResult(e.toString())) //.andError 가 호출이 되면, 더이상 뒤에 코드가 동작하지 않고 종료하고 싶다.
.andAccept(s -> dr.setResult( s.getBody()))
;
return dr;
}
}
public static class Completion {
protected Completion next;
public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
Completion completion = new Completion();
lf.addCallback(s -> {
//성공
completion.complete(s);
}, e -> {
completion.error(e);
});
return completion;
}
//에러를 처리하는 메서드
protected void error(Throwable e) {
if (next != null) {
next.error(e);
}
}
public Completion andError(Consumer<Throwable> econ) {
Completion completion = new ErrorCompletion(econ);
this.next = completion;
return completion;
}
}
public static class ErrorCompletion extends Completion {
private Consumer<Throwable> econ;
public ErrorCompletion(Consumer<Throwable> econ) {
this.econ = econ;
}
@Override
protected void run(ResponseEntity<String> value) {
if (next != null) {
next.run(value);
}
}
@Override
protected void error(Throwable e) {
econ.accept(e); //다음으로 패싱한다.
}
}
- . andError를 추가하여, 만약 예외가 발생하지 않았을 경우에는 아래 코드가 계속 실행되고, 만약 예외가 발생하였다면 더 이상 실행시키면 안 된다.
- andError 메서드에서는 ErrorCompletion을 생성하고, next에 생성한 Completion을 담고, 반환한다.
- ErrorCompletion 은 생성자로 예외를 받으며, run 메서드를 오버 라이딩하여, next가 null이 아니면 next.run을 실행하도록 한다.
- error 메서드도 오버 라이딩을 해야 하는데, 해당 비동기 함수를 실행하다가 오류가 발생하면, 더 이상 코드를 실행시키지 않고, 람다의 dr.setErrorResult(e)를 실행해준다.
두 가지 경우를 실행해봐야 한다 (1. 정상 케이스 2. andError에서 오류 발생)
@RestController
public static class MyController {
@GetMapping("/service1")
public String service1(String req) throws InterruptedException {
Thread.sleep(2000);
throw new RuntimeException();
//return req + "/service1";
}
}
- URL1에서 반드시 오류가 발생하도록 코드를 변경했다(2번째 케이스)
- 에러가 발생되면 밑에 코드가 더 이상 실행되지 않는 것을 알 수 있다.
andApply의 제네릭
@SpringBootApplication
public class Chapter10Application {
@RestController
public static class MyController {
public static final String URL1 = "http://localhost:8081/service1?req={req}";
public static final String URL2 = "http://localhost:8081/service2?req={req}";
AsyncRestTemplate rt = new AsyncRestTemplate(
new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
@Autowired
MyService myService;
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();
Completion
.from(rt.getForEntity(URL1, String.class, "h" + idx))//비동기 결과를 가지고 있는 오브젝트
.andApply(s -> rt.getForEntity(URL2, String.class,
s.getBody())) // 해당 함수는 람다를 실행한 결과를 반환해 주어야한다.
.andApply(s -> myService.work(
s.getBody())) //반환타입이 다르기 때문에 오류가 발생한다(구조는 같다) -> 오버로딩을 작성해도 되지만 별로 않좋다(제네릭으로)
.andError(e -> dr.setErrorResult(
e.toString())) //.andError 가 호출이 되면, 더이상 뒤에 코드가 동작하지 않고 종료하고 싶다.
.andAccept(s -> dr.setResult(s)) //앞에서 전달받은 결과를 람다식안으로 넘겨주는 역할을 하고 싶다 //리턴값을 이용하지 않기때문에 실행만 하면된다
;
return dr;
}
}
public static class Completion<S, T> {//andApply를 적용하면 타입이 2가지를 받아야한다
protected Completion next;
public static <S, T> Completion<S, T> from(ListenableFuture<T> lf) {
Completion<S, T> completion = new Completion<>();
lf.addCallback(s -> {
//성공
completion.complete(s);
}, e -> {
completion.error(e);
});
return completion;
}
protected void complete(T s) {
if (next != null) {
next.run(s);
}
}
protected void run(S value) {
}
protected void error(Throwable e) {
if (next != null) {
next.error(e);
}
}
public void andAccept(Consumer<T> con) {
Completion<T, Void> completion = new AcceptCompletion<>(con);
this.next = completion;
}
public <V> Completion<T, V> andApply(
Function<T, ListenableFuture<V>> fn) { //현재 생성이 되는 결과 값이라 T타입
Completion<T, V> completion = new ApplyCompletion<>(fn);
this.next = completion;
return completion;
}
public Completion<T, T> andError(Consumer<Throwable> econ) {
Completion<T, T> completion = new ErrorCompletion<>(econ);
this.next = completion;
return completion;
}
}
public static class AcceptCompletion<S> extends Completion<S, Void> {
private Consumer<S> con;
public AcceptCompletion(Consumer<S> con) {
this.con = con;
}
@Override
protected void run(S value) {
con.accept(value);
}
}
public static class ApplyCompletion<S, T> extends Completion<S, T> {
private Function<S, ListenableFuture<T>> fn;
public ApplyCompletion(
Function<S, ListenableFuture<T>> fn) {
this.fn = fn;
}
@Override
protected void run(S value) {
ListenableFuture<T> listenableFuture = fn.apply(value);
listenableFuture.addCallback(
s -> complete(s), e -> error(e)
);
}
}
public static class ErrorCompletion<T> extends Completion<T, T> {
private Consumer<Throwable> econ;
public ErrorCompletion(Consumer<Throwable> econ) {
this.econ = econ;
}
@Override
protected void run(T value) {
if (next != null) {
next.run(value);
}
}
@Override
protected void error(Throwable e) {
econ.accept(e);
}
}
@Service
public static class MyService {
@Async
public ListenableFuture<String> work(String req) {
return new AsyncResult<>(req + "/asyncwork");
}
}
@Bean
public ThreadPoolExecutor myThreadPool() {
ThreadPoolExecutor te = new ThreadPoolExecutor(10, 100, 100,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
return te;
}
public static void main(String[] args) {
SpringApplication.run(Chapter10Application.class, args);
}
}
- 제네릭을 적용하여, 비동기 호출 코드의 반환 값에 어떤 타입이라도 다 받을 수 있도록 했다.
- Completion 같은 경우,. andApply에서 받을 수 있는 최대 타입이 2개밖에 없으므로 <S, T>로 선언했다.
- 이를 상속하는 하위 클래스 같은 경우에도 타입을 주어서 정의하였다
- 제네릭을 주는 것은 굉장히 복잡하기 때문에 코드를 한번 더 분석하고, 살펴보는 것이 중요할 것으로 보인다.
- Completion에 next 프로퍼티 같은 경우 어떤 타입을 주어서 선언해야 할지 모르기 때문에 작성하지 않았다
- 어차피 내부적인 메서드만 사용하기 때문에 타입에 대해 작성하지 않아도 컴파일 시 문제는 없다.
728x90
'Toby의 ReactiveProgramming' 카테고리의 다른 글
| WebFlux (0) | 2022.08.28 |
|---|---|
| CompletableFuture (0) | 2022.08.27 |
| 비동기 RestTemplate과 비동기 MVC/Servlet (0) | 2022.08.26 |
| 자바와 스프링의 비동기 기술 (0) | 2022.08.22 |
| Reactive Streams - Schedulers (0) | 2022.08.22 |