Toby의 ReactiveProgramming

AsyncRestTemplate의 콜백 헬과 중복 작업 문제

webmaster 2022. 8. 26. 14:44
728x90

Refectoring: CallBack 지옥에서 벗어나는 코드 Version 1

코드를 자세히 보면, 결과를 리턴으로 받는 것이 아닌 콜백 형식으로 받게 되니, Callback 지옥이 발생하게 된다.

에러를 처리해주는 코드가 매번 중복되는 문제도 계속 발생한다.

@SpringBootApplication
public class Chapter10Application {

  @RestController
  public static class MyController {

    public static final String URL1 = "http://localhost:8081/service1?req={req}";
    public static final String URL2 = "http://localhost:8081/service2?req={req}";

    AsyncRestTemplate rt = new AsyncRestTemplate(
        new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
    @Autowired
    MyService myService;

    @GetMapping("/rest")
    public DeferredResult<String> rest(int idx) {
      DeferredResult<String> dr = new DeferredResult<>();
      Completion
          .from(rt.getForEntity(URL1, String.class, "h" + idx))//비동기 결과를 가지고 있는 오브젝트
          .andAccept(s -> dr.setResult(s.getBody())) //앞에서 전달받은 결과를 람다식안으로 넘겨주는 역할을 하고 싶다
      ;
      return dr;
    }
  }

  public static class Completion {

    private Consumer<ResponseEntity<String>> con;
    private Completion next;

    public Completion() {
    }

    public Completion(Consumer<ResponseEntity<String>> con) {
      this.con = con;
    }

    //비동기 작업을 결과를 담는 클래스
    public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
      //생성하는 static factory 메서드
      Completion completion = new Completion();
      lf.addCallback(s -> {
        //성공
        completion.complete(s);
      }, e -> {
        completion.error(e);
      });
      return completion;
    }

    //성공을 처리하는 메서드
    private void complete(ResponseEntity<String> s) {
      if(next != null){
        next.run(s);
      }
    }

    private void run(ResponseEntity<String> value) {
      if(con != null){
        con.accept(value);
      }
    }

    //에러를 처리하는 메서드
    private void error(Throwable e) {
    }

    public void andAccept(Consumer<ResponseEntity<String>> con) {
      Completion completion = new Completion(con);
      this.next = completion;

    }
  }

  @Service
  public static class MyService {

    @Async
    public ListenableFuture<String> work(String req) {
      return new AsyncResult<>(req + "/asyncwork");
    }
  }

  @Bean
  public ThreadPoolExecutor myThreadPool() {
    ThreadPoolExecutor te = new ThreadPoolExecutor(10, 100, 100,
        TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
    return te;
  }

  public static void main(String[] args) {
    SpringApplication.run(Chapter10Application.class, args);
  }

}
  • 비동기 작업 하나에 대한 결과를, 클래스로 받아 처리하는 작업만 진행하였다.
  • Completion 같은 경우, 내부에 연산을 진행한 결과인 next와, 실행되어야 할 람다 함수인 con을 가지고 있다.
    • from 메서드는 비동기를 진행한 결과를 파라미터로 받아, Completion을 생성하고, 콜백과, 에러를 처리해주는 책임을 가진다.
    • andAccept 메서드는 내부에서 람다 연산을 파라미터로 받는 Completion을 생성한 뒤, 그 Completion을 next에 담아둔다.
    • andAccept 메서드는 결과를 받아 람다를 실행하고, 종료하는 역할만 하는 것이다.

Refectoring: CallBack 지옥에서 벗어나는 코드 Version2

비동기 호출을 결과로 받은 결과를 람다를 실행 후, 리턴 값을 돌려받는 코드를 작성해보자(f2)

 @RestController
public static class MyController {
//..

    @GetMapping("/rest")
    public DeferredResult<String> rest(int idx) {
      DeferredResult<String> dr = new DeferredResult<>();
      Completion
          .from(rt.getForEntity(URL1, String.class, "h" + idx))
          .andApply(s -> rt.getForEntity(URL2, String.class, s.getBody())) // 해당 함수는 람다를 실행한 결과를 반환해 주어야한다.
          .andAccept(s -> dr.setResult(s.getBody())) 
      ;
      return dr;
    }
}

public static class Completion {
    //..
    private Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;

    public Completion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
      this.fn = fn;
    }

    private void run(ResponseEntity<String> value) {
      if(con != null){
        con.accept(value);
      } else if (fn != null) { //fn이 존재 할 경우 다음 단계로 넘어가야한다.
        ListenableFuture<ResponseEntity<String>> listenableFuture = fn.apply(value); //앞에 작업 종료
        listenableFuture.addCallback(
            s -> complete(s), e -> error(e)
        ); //콜백을 가지고 연결을 해야한다.
      }
    }
    public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
      Completion completion = new Completion(fn);
      this.next = completion;
      return completion;
    }

    //..
}
  • Completion 클래스의 andApply() 메서드는 람다를 실행한 뒤, 그 결과를 반환해야 한다.
    • 결과로 반환된 타입이 Completion 타입 이어야 한다(비동기 실행 chain이 계속되어야 한다)
  • andApply 메서드에서 Function(함수를 인자 1, 리턴 가능) 형 클래스를 파라미터로 받는 Completion을 생성한 뒤, next에 담고, 생성한 Completion을 반환한다.
    • 이제 run 메서드를 비동기적으로 실행이 될 때, fn 프로퍼티에 값이 비어있지 않을 경우, 람다를 생성한 실행한 뒤, 해당 결과에 대한 콜백 분기를 처리해 줘야 한다
    • 이때 콜백 분기는 이전에 from에서 했던 동작과 동일하다.

Run 코드를 보면 if를 통해 분기 처리에 따른 여러 호출을 하고 있지만, 리펙토링이 필요해 보인다

Version2 Refectoring

public static class Completion {

  private Completion next;
  
  public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
    Completion completion = new Completion();
    lf.addCallback(s -> {
      completion.complete(s);
    }, e -> {
      completion.error(e);
    });
    return completion;
  }

  protected void complete(ResponseEntity<String> s) {
    if(next != null){
      next.run(s);
    }
  }

  protected void run(ResponseEntity<String> value) {

  }
  
  protected void error(Throwable e) {
  }

  public void andAccept(Consumer<ResponseEntity<String>> con) {
    Completion completion = new AcceptCompletion(con);
    this.next = completion;
  }

  public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
    Completion completion = new ApplyCompletion(fn);
    this.next = completion;
    return completion;
  }
}

public static class AcceptCompletion extends Completion{
  private Consumer<ResponseEntity<String>> con;
  public AcceptCompletion(Consumer<ResponseEntity<String>> con) {
    this.con = con;
  }

  @Override
  protected void run(ResponseEntity<String> value) {
    con.accept(value);
  }
}
public static class ApplyCompletion extends Completion{
  private Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;
  public ApplyCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
    this.fn = fn;
  }
  @Override
  protected void run(ResponseEntity<String> value) {
    ListenableFuture<ResponseEntity<String>> listenableFuture = fn.apply(value); //앞에 작업 종료
    listenableFuture.addCallback(
        s -> complete(s), e -> error(e)
    );
  }
}
  • Completion을 상속받는 AcceptCompletion과, ApplyComletion을 만든다.
    • AcceptCompletion 같은 경우 Consumer 함수를 인자로 받으며, run 메서드를 오버 라이딩하여 accept를 호출한다.
    • ApplyComletion 같은 경우 Fuction 함수를 인자로 받으며, run 메서드를 오버라이딩 하여 function 실행 결과를 콜백으로 호출한다.
  • Completion의 Run 메서드는 더 이상 분기 처리할 필요 없다
    • andAccept 호출 시에는 AcceptCompletion을 생성하고, andApply 호출시에는 ApplyCompletion을 생성하도록 코드를 리펙토링 하였다.
    • 따라서, run 메서드를 호출하게 되면, 오버 라이딩된 메서드가 호출이 된다.

예외 처리를 추가한 코드

 @RestController
public static class MyController {
	//..
    
    @GetMapping("/rest")
    public DeferredResult<String> rest(int idx) {
      DeferredResult<String> dr = new DeferredResult<>();
      Completion
          .from(rt.getForEntity(URL1, String.class, "h" + idx))
          .andApply(s -> rt.getForEntity(URL2, String.class, s.getBody())) 
          .andError(e -> dr.setErrorResult(e.toString())) //.andError 가 호출이 되면, 더이상 뒤에 코드가 동작하지 않고 종료하고 싶다.
          .andAccept(s -> dr.setResult( s.getBody()))
      ;
      return dr;
    }
  }

  public static class Completion {

    protected Completion next;

    public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
      Completion completion = new Completion();
      lf.addCallback(s -> {
        //성공
        completion.complete(s);
      }, e -> {
        completion.error(e);
      });
      return completion;
    }

    //에러를 처리하는 메서드
    protected void error(Throwable e) {
      if (next != null) {
        next.error(e);
      }
    }

    public Completion andError(Consumer<Throwable> econ) {
      Completion completion = new ErrorCompletion(econ);
      this.next = completion;
      return completion;
    }
}

public static class ErrorCompletion extends Completion {
    private Consumer<Throwable> econ;

    public ErrorCompletion(Consumer<Throwable> econ) {
      this.econ = econ;
    }

    @Override
    protected void run(ResponseEntity<String> value) {
      if (next != null) {
        next.run(value);
      }
    }

    @Override
    protected void error(Throwable e) {
      econ.accept(e); //다음으로 패싱한다.
    }
}
  • . andError를 추가하여, 만약 예외가 발생하지 않았을 경우에는 아래 코드가 계속 실행되고, 만약 예외가 발생하였다면 더 이상 실행시키면 안 된다.
    • andError 메서드에서는 ErrorCompletion을 생성하고, next에 생성한 Completion을 담고, 반환한다.
  • ErrorCompletion 은 생성자로 예외를 받으며, run 메서드를 오버 라이딩하여, next가 null이 아니면 next.run을 실행하도록 한다.
    • error 메서드도 오버 라이딩을 해야 하는데, 해당 비동기 함수를 실행하다가 오류가 발생하면, 더 이상 코드를 실행시키지 않고, 람다의 dr.setErrorResult(e)를 실행해준다.

두 가지 경우를 실행해봐야 한다 (1. 정상 케이스 2. andError에서 오류 발생)

@RestController
public static class MyController {

  @GetMapping("/service1")
  public String service1(String req) throws InterruptedException {
    Thread.sleep(2000);
    throw new RuntimeException();
    //return req + "/service1";
  }

}
  • URL1에서 반드시 오류가 발생하도록 코드를 변경했다(2번째 케이스)
  • 에러가 발생되면 밑에 코드가 더 이상 실행되지 않는 것을 알 수 있다.

andApply의 제네릭

@SpringBootApplication
public class Chapter10Application {

  @RestController
  public static class MyController {

    public static final String URL1 = "http://localhost:8081/service1?req={req}";
    public static final String URL2 = "http://localhost:8081/service2?req={req}";

    AsyncRestTemplate rt = new AsyncRestTemplate(
        new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
    @Autowired
    MyService myService;

    @GetMapping("/rest")
    public DeferredResult<String> rest(int idx) {
      DeferredResult<String> dr = new DeferredResult<>();
      Completion
          .from(rt.getForEntity(URL1, String.class, "h" + idx))//비동기 결과를 가지고 있는 오브젝트
          .andApply(s -> rt.getForEntity(URL2, String.class,
              s.getBody())) // 해당 함수는 람다를 실행한 결과를 반환해 주어야한다.
          .andApply(s -> myService.work(
              s.getBody())) //반환타입이 다르기 때문에 오류가 발생한다(구조는 같다) -> 오버로딩을 작성해도 되지만 별로 않좋다(제네릭으로)
          .andError(e -> dr.setErrorResult(
              e.toString())) //.andError 가 호출이 되면, 더이상 뒤에 코드가 동작하지 않고 종료하고 싶다.
          .andAccept(s -> dr.setResult(s)) //앞에서 전달받은 결과를 람다식안으로 넘겨주는 역할을 하고 싶다 //리턴값을 이용하지 않기때문에 실행만 하면된다
      ;
      return dr;
    }
  }

  public static class Completion<S, T> {//andApply를 적용하면 타입이 2가지를 받아야한다

    protected Completion next;

    public static <S, T> Completion<S, T> from(ListenableFuture<T> lf) {
      Completion<S, T> completion = new Completion<>();
      lf.addCallback(s -> {
        //성공
        completion.complete(s);
      }, e -> {
        completion.error(e);
      });
      return completion;
    }

	protected void complete(T s) {
      if (next != null) {
        next.run(s);
      }
    }

    protected void run(S value) {

    }

    protected void error(Throwable e) {
      if (next != null) {
        next.error(e);
      }
    }

    public void andAccept(Consumer<T> con) {
      Completion<T, Void> completion = new AcceptCompletion<>(con);
      this.next = completion;
    }

    public <V> Completion<T, V> andApply(
        Function<T, ListenableFuture<V>> fn) { //현재 생성이 되는 결과 값이라 T타입
      Completion<T, V> completion = new ApplyCompletion<>(fn);
      this.next = completion;
      return completion;
    }

    public Completion<T, T> andError(Consumer<Throwable> econ) {
      Completion<T, T> completion = new ErrorCompletion<>(econ);
      this.next = completion;
      return completion;
    }
  }

  public static class AcceptCompletion<S> extends Completion<S, Void> {

    private Consumer<S> con;

    public AcceptCompletion(Consumer<S> con) {
      this.con = con;
    }

    @Override
    protected void run(S value) {
      con.accept(value);
    }
  }

  public static class ApplyCompletion<S, T> extends Completion<S, T> {

    private Function<S, ListenableFuture<T>> fn;

    public ApplyCompletion(
        Function<S, ListenableFuture<T>> fn) {
      this.fn = fn;
    }

    @Override
    protected void run(S value) {
      ListenableFuture<T> listenableFuture = fn.apply(value);
      listenableFuture.addCallback(
          s -> complete(s), e -> error(e)
      );
    }
  }

  public static class ErrorCompletion<T> extends Completion<T, T> {

    private Consumer<Throwable> econ;

    public ErrorCompletion(Consumer<Throwable> econ) {
      this.econ = econ;
    }

    @Override
    protected void run(T value) {
      if (next != null) {
        next.run(value);
      }
    }

    @Override
    protected void error(Throwable e) {
      econ.accept(e); 
    }
  }

  @Service
  public static class MyService {

    @Async
    public ListenableFuture<String> work(String req) {
      return new AsyncResult<>(req + "/asyncwork");
    }
  }

  @Bean
  public ThreadPoolExecutor myThreadPool() {
    ThreadPoolExecutor te = new ThreadPoolExecutor(10, 100, 100,
        TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
    return te;
  }

  public static void main(String[] args) {
    SpringApplication.run(Chapter10Application.class, args);
  }

}
  • 제네릭을 적용하여, 비동기 호출 코드의 반환 값에 어떤 타입이라도 다 받을 수 있도록 했다.
  • Completion 같은 경우,. andApply에서 받을 수 있는 최대 타입이 2개밖에 없으므로 <S, T>로 선언했다.
  • 이를 상속하는 하위 클래스 같은 경우에도 타입을 주어서 정의하였다
  • 제네릭을 주는 것은 굉장히 복잡하기 때문에 코드를 한번 더 분석하고, 살펴보는 것이 중요할 것으로 보인다.
  • Completion에 next 프로퍼티 같은 경우 어떤 타입을 주어서 선언해야 할지 모르기 때문에 작성하지 않았다
    • 어차피 내부적인 메서드만 사용하기 때문에 타입에 대해 작성하지 않아도 컴파일 시 문제는 없다.
728x90

'Toby의 ReactiveProgramming' 카테고리의 다른 글

WebFlux  (0) 2022.08.28
CompletableFuture  (0) 2022.08.27
비동기 RestTemplate과 비동기 MVC/Servlet  (0) 2022.08.26
자바와 스프링의 비동기 기술  (0) 2022.08.22
Reactive Streams - Schedulers  (0) 2022.08.22