Toby의 ReactiveProgramming

Reactive Streams - Schedulers

webmaster 2022. 8. 22. 17:55
728x90
@Slf4j
public class SchedulerEx {

  public static void main(String[] args) {
    //모두 main 쓰레드에서 동작한다
    Publisher<Integer> pub = sub -> {
      sub.onSubscribe(new Subscription() {
        @Override
        public void request(long n) {
          sub.onNext(1);
          sub.onNext(2);
          sub.onNext(3);
          sub.onNext(4);
          sub.onNext(5);
          sub.onComplete();
        }

        @Override
        public void cancel() {

        }
      });
    };
    pub.subscribe(new Subscriber<Integer>() {
      @Override
      public void onSubscribe(Subscription s) {
        log.debug("onSubscribe");
        s.request(Long.MAX_VALUE);
      }

      @Override
      public void onNext(Integer integer) {
        log.debug("onNext:{}", integer);
      }

      @Override
      public void onError(Throwable t) {
        log.debug("onError:{}", t);
      }

      @Override
      public void onComplete() {
        log.debug("onComplete");
      }
    });

  }

}

main 스레드가 모든 요청을 받는다(모든 코드가 main 스레드 위에서 동작)

  • 현재는 main 스레드가 모든 요청을 blocking 하고 있다.
  • main 스레드가 모든 요청을 처리를 동작시키게 코드를 작성하면, 오류나 장애가 발생하면 main 스레드가 잘못되기 때문에 이렇게 하면 안 된다.
    • main 스레드가 request에 대한 응답을 받기 전까지 계속 기다리기 때문에 오류가 발생 시 멈춘다.
    • 별도의 스레드로 만들고 동작시켜야 한다(스케줄러를 활용한다)

스케줄러의 종류

SubscribeOn

subscribeOn : subScribeOn에 작성한 스레드를 위에서 해당 동작을 실행하도록 해준다.

@Slf4j
public class SchedulerEx {

  public static void main(String[] args) {
    //모두 main 쓰레드에서 동작한다
    Publisher<Integer> pub = sub -> {
      sub.onSubscribe(new Subscription() {
        @Override
        public void request(long n) {
          log.debug("request()");
          sub.onNext(1);
          sub.onNext(2);
          sub.onNext(3);
          sub.onNext(4);
          sub.onNext(5);
          sub.onComplete();
        }

        @Override
        public void cancel() {

        }
      });
    };
    //pub

    Publisher<Integer> subOnPub = sub -> {
      ExecutorService es = Executors.newSingleThreadExecutor(); //동시에 하나 이상의 스레드를 주지 않는다.
      es.execute(() -> pub.subscribe(sub)); //메인 스레드가 아닌 다른 스레드에서 실행시켜준다.
    };

    //sub
    subOnPub.subscribe(new Subscriber<Integer>() {
      @Override
      public void onSubscribe(Subscription s) {
        log.debug("onSubscribe");
        s.request(Long.MAX_VALUE);
      }

      @Override
      public void onNext(Integer integer) {
        log.debug("onNext:{}", integer);
      }

      @Override
      public void onError(Throwable t) {
        log.debug("onError:{}", t);
      }

      @Override
      public void onComplete() {
        log.debug("onComplete");
      }
    });
    System.out.println("exit");
  }

}

SubscribeOn
SubscribeOn 결과 (다른 쓰레드에 동작을 위임)

  • Publisher 가 느리고, 이를 호출하는 곳이(Subscribe)가 빠를 경우 사용해야 한다.

publishOn

@Slf4j
public class SchedulerEx {

  public static void main(String[] args) {
    Publisher<Integer> pub = sub -> {
      sub.onSubscribe(new Subscription() {
        @Override
        public void request(long n) {
          log.debug("request()");
          sub.onNext(1);
          sub.onNext(2);
          sub.onNext(3);
          sub.onNext(4);
          sub.onNext(5);
          sub.onComplete();
        }

        @Override
        public void cancel() {

        }
      });
    };
 
    Publisher<Integer> pubOnPub = sub -> {
      pub.subscribe(new Subscriber<Integer>() {
        ExecutorService es = Executors.newSingleThreadExecutor();

        @Override
        public void onSubscribe(Subscription s) {
          sub.onSubscribe(s);
        }

        @Override
        public void onNext(Integer integer) {
          es.execute(() -> sub.onNext(integer));
        }

        @Override
        public void onError(Throwable t) {
          es.execute(() -> sub.onError(t));
        }

        @Override
        public void onComplete() {
          es.execute(() -> sub.onComplete());
        }
      });
    };

    //sub
    pubOnPub.subscribe(new Subscriber<Integer>() {
      @Override
      public void onSubscribe(Subscription s) {
        log.debug("onSubscribe");
        s.request(Long.MAX_VALUE);
      }

      @Override
      public void onNext(Integer integer) {
        log.debug("onNext:{}", integer);
      }

      @Override
      public void onError(Throwable t) {
        log.debug("onError:{}", t);
      }

      @Override
      public void onComplete() {
        log.debug("onComplete");
      }
    });
    System.out.println("exit");
  }

}

publishOn
publishOn

  • publish는 빠르게 동작하지만, 데이터를 받아서 처리하는 부분(Subscribe)은 느리게 처리해도 될 때 사용한다.
  • PublishOn에 들어온 요청이 한 번에 처리되는 것이 아니다(동시성을 지켜준다)
    • 하나의 Publisher가 여러 스레드를 생성해서 실행시킬 때에도 순서가 지켜진다.

2개의 스케줄러를 모두 적용했을 경우

@Slf4j
public class SchedulerEx {

  public static void main(String[] args) {
    Publisher<Integer> pub = sub -> {
      sub.onSubscribe(new Subscription() {
        @Override
        public void request(long n) {
          log.debug("request()");
          sub.onNext(1);
          sub.onNext(2);
          sub.onNext(3);
          sub.onNext(4);
          sub.onNext(5);
          sub.onComplete();
        }

        @Override
        public void cancel() {

        }
      });
    };
    //pub

    Publisher<Integer> subOnPub = sub -> {
      ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
        //스레드의 이름을 바꿔주기 위해 스프링에서 제공하는 CustomizableThreadFactory를 사용한다
        @Override
        public String getThreadNamePrefix() {
          return "subOn-";
        }
      }); 
      es.execute(() -> pub.subscribe(sub));
    };

    Publisher<Integer> pubOnPub = sub -> {
      subOnPub.subscribe(new Subscriber<Integer>() {
        ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
          @Override
          public String getThreadNamePrefix() {
            return "pubOn-";
          }
        });
        
        @Override
        public void onSubscribe(Subscription s) {
          sub.onSubscribe(s);
        }

        @Override
        public void onNext(Integer integer) {
          es.execute(() -> sub.onNext(integer));
        }

        @Override
        public void onError(Throwable t) {
          es.execute(() -> sub.onError(t));
        }

        @Override
        public void onComplete() {
          es.execute(() -> sub.onComplete());
        }
      });
    };

    //sub
    pubOnPub.subscribe(new Subscriber<Integer>() {
      @Override
      public void onSubscribe(Subscription s) {
        log.debug("onSubscribe");
        s.request(Long.MAX_VALUE);
      }

      @Override
      public void onNext(Integer integer) {
        log.debug("onNext:{}", integer);
      }

      @Override
      public void onError(Throwable t) {
        log.debug("onError:{}", t);
      }

      @Override
      public void onComplete() {
        log.debug("onComplete");
      }
    });
    System.out.println("exit");
  }

}

SubscribeOn, PublishOn 연산을 둘다 적용

  • 스레드의 이름을 주기 위해 new CustomizableThreadFactory()를 (스프링에서 제공) 구현한다.

Flux를 사용하여 작성하기

public class FluxScEx {
  public static void main(String[] args) {
    Flux.range(1, 10)//1~10
        .publishOn(Schedulers.newSingle("pub")) //소비하는 쪽 속도가 느릴 경우 사용
        .log() //위쪽의 publisher 로 부터 받아오는 데이터를 볼 수 있다
        .subscribeOn(Schedulers.newSingle("sub"))
        .subscribe(System.out::println); //출력

    System.out.println("exit");
  }
}
  • 매우 간단하게 publishOn, subscribeOn를 실행 가능하다.
  • log()를 사용하게 되면 Publisher로부터 받아오는 데이터를 출력해서 보여줄 수 있다.

스레드가 종료되지 않고, 계속 실행돼서 동작하고 있는데, Publisher나 Flux는 서버 환경에서 동작하게 되며, 서버처럼 계속 실행되어 요청을 받게 되어 있는데 이를 종료하는 코드가 없어서 계속 동작한다(종료하도록 코드를 작성하자)

SchedulerEx 리펙토링

//...
Publisher<Integer> pubOnPub = sub -> {
  pub.subscribe(new Subscriber<Integer>() {
    ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
      @Override
      public String getThreadNamePrefix() {
        return "pubOn-";
      }
    });
    //...
    @Override
    public void onError(Throwable t) {
      es.execute(() -> sub.onError(t));
      es.shutdown(); //더이상 구독이 유용하지 않으니 종료
    }

    @Override
    public void onComplete() {
      es.execute(() -> sub.onComplete());
      es.shutdown(); //더이상 구독이 유용하지 않으니 종료
    }
    //...
  });
};
//...
  • onError, onComplete 이벤트가 동작할 때에는 더 이상 구독이 유용하지 않으니 shutdown을 실행하면 된다.

Main 스레드 - User 스레드

public static void main(String[] args) throws InterruptedException {
  //User가 만든 스레드는 메인스레드가 종료가 되어도 종료되지 않는다
  Executors.newSingleThreadExecutor().execute(() -> {
    try {
      TimeUnit.SECONDS.sleep(2);
    }catch (InterruptedException e){

    }
    System.out.println("Hello");
  });
  System.out.println("exit");
  
}
  • 해당 예제를 실행해 보면, Main 스레드가 먼저 종료 후, User 스레드가 종료가 되는데 문제없이 잘 실행된다.
  • JVM이 User스레드가 내려가지 않는 이상 내부적으로 종료하지 않는다는 것을 알 수 있다.

그렇다면, 아래와 같은 코드는 왜 동작하지 않을까?

  public static void main(String[] args) throws InterruptedException {
    //Subscribe를 걸지 않았지만 자동으로 걸어주는 메서드가 있다.
    //대표적으로 interval
    Flux.interval(Duration.ofMillis(500))
    	//.take(10) //원하는 갯수만큼 데이터를 자르고 싶을 경우 해당 메서드를 사용하면 된다.
        .subscribe(s -> log.debug("onNext:{}", s)); //아무것도 실행이 되지 않는다.

    log.debug("exit");
    TimeUnit.SECONDS.sleep(5);
  }
  • TimeUnit.SECONDS.sleep()을 주지 않을 경우 아무것도 실행되지 않고 종료되는 것을 볼 수 있다.
    • StackOverFlow와 같은 곳에 해당 문제를 검색하게 되면, TimeUnit.SECONDS.sleep() 같은 해결책을 제시한다.
    • 위 예제에서 보았듯이 Main 스레드가 먼저 종료돼서 실행되지 않은 것은 아니다 (그러면 무엇이 문제일까)
  • 스레드에는 demon, user 스레드가 있다. 위 코드는 데몬스레드로 만들어 두기 때문에 종료하는 것이다.
    • 메인 스레드는 User 스레드가 있다면 종료하지 않지만, 데몬 스레드가 남아있을 경우에는 그냥 종료해 버린다.
  • 추가적으로 take라는 메서드를 통해 가지고 오고 싶은 데이터 개수만큼 제약을 걸 수도 있다.

take 구현하기

@Slf4j
public class IntervalEx {

  public static void main(String[] args) {
    Publisher<Integer> pub = sub -> {
      sub.onSubscribe(new Subscription() {
        int no = 0;
        boolean cancelled = false;

        @Override
        public void request(long n) {
          ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
          exec.scheduleAtFixedRate(() -> {
            if(cancelled){
              exec.shutdown(); //더이상 실행시키지 않는다.
              return;
            }
            sub.onNext(no++); //프로세스가 종료되기 전까지 계속 데이터를 전송
          }, 0, 300, TimeUnit.MILLISECONDS);
        }

        @Override
        public void cancel() {
          cancelled = true;
        }
      });
    };

    Publisher<Integer> takePub = sub -> {
      pub.subscribe(new Subscriber<Integer>() {
        int count = 0;
        Subscription subsc;
        @Override
        public void onSubscribe(Subscription s) {
          subsc = s;
          sub.onSubscribe(s);
        }

        @Override
        public void onNext(Integer integer) {
          sub.onNext(integer);
          if(++count > 10){ //10개 이상이면 실행을 안시킬 것이다
            subsc.cancel();//cancel를 동작시켜 준다.
            //데이터를 더이상 필요로 하지 않기 떄문에 cancel을 시키는 것이다
          }
        }

        @Override
        public void onError(Throwable t) {
          sub.onError(t);
        }

        @Override
        public void onComplete() {
          sub.onComplete();
        }
      });
    };

    takePub.subscribe(new Subscriber<>() {
      @Override
      public void onSubscribe(Subscription s) {
        log.debug("onSubscribe");
        s.request(Long.MAX_VALUE);
      }

      @Override
      public void onNext(Integer integer) {
        log.debug("onNext:{}", integer);
      }

      @Override
      public void onError(Throwable t) {
        log.debug("onError:{}", t);
      }

      @Override
      public void onComplete() {
        log.debug("onComplete");
      }
    });
  }
}

결과

  • take 같은 경우 데이터를 더 이상 받지 않는 것이기 때문에, cancel 이벤트로 동작을 시켜야 한다.
  • scheduleAtFixedRate 함수를 쓰면 intenal처럼 원하는 시간마다 이벤트를 동작시킬 수도 있다.
    • 현재는 300ms 마다 no++를 실행시키며, onNext로 데이터를 전달하고 있다.
  • takePub에서는 받은 데이터를 가지고 count값을 증가시키다가, count값이 10 이상이 되면 저장해둔 sub객체의 cancel 이벤트를 동작시킨다.
    • onSubscribe를 통해 sub 객체를 저장해 두고 있는다.
    • 저장해둔 sub 객체를 cancel 이벤트가 동작이 되면, cancelled 가 true가 되고, request에서 cancelled가 true이기 때문에 스레드를 종료시킨다.
728x90