728x90
@Slf4j
public class SchedulerEx {
public static void main(String[] args) {
//모두 main 쓰레드에서 동작한다
Publisher<Integer> pub = sub -> {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}
@Override
public void cancel() {
}
});
};
pub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
log.debug("onNext:{}", integer);
}
@Override
public void onError(Throwable t) {
log.debug("onError:{}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
});
}
}

- 현재는 main 스레드가 모든 요청을 blocking 하고 있다.
- main 스레드가 모든 요청을 처리를 동작시키게 코드를 작성하면, 오류나 장애가 발생하면 main 스레드가 잘못되기 때문에 이렇게 하면 안 된다.
- main 스레드가 request에 대한 응답을 받기 전까지 계속 기다리기 때문에 오류가 발생 시 멈춘다.
- 별도의 스레드로 만들고 동작시켜야 한다(스케줄러를 활용한다)
스케줄러의 종류
SubscribeOn
subscribeOn : subScribeOn에 작성한 스레드를 위에서 해당 동작을 실행하도록 해준다.
@Slf4j
public class SchedulerEx {
public static void main(String[] args) {
//모두 main 쓰레드에서 동작한다
Publisher<Integer> pub = sub -> {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
log.debug("request()");
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}
@Override
public void cancel() {
}
});
};
//pub
Publisher<Integer> subOnPub = sub -> {
ExecutorService es = Executors.newSingleThreadExecutor(); //동시에 하나 이상의 스레드를 주지 않는다.
es.execute(() -> pub.subscribe(sub)); //메인 스레드가 아닌 다른 스레드에서 실행시켜준다.
};
//sub
subOnPub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
log.debug("onNext:{}", integer);
}
@Override
public void onError(Throwable t) {
log.debug("onError:{}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
});
System.out.println("exit");
}
}


- Publisher 가 느리고, 이를 호출하는 곳이(Subscribe)가 빠를 경우 사용해야 한다.
publishOn
@Slf4j
public class SchedulerEx {
public static void main(String[] args) {
Publisher<Integer> pub = sub -> {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
log.debug("request()");
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}
@Override
public void cancel() {
}
});
};
Publisher<Integer> pubOnPub = sub -> {
pub.subscribe(new Subscriber<Integer>() {
ExecutorService es = Executors.newSingleThreadExecutor();
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(Integer integer) {
es.execute(() -> sub.onNext(integer));
}
@Override
public void onError(Throwable t) {
es.execute(() -> sub.onError(t));
}
@Override
public void onComplete() {
es.execute(() -> sub.onComplete());
}
});
};
//sub
pubOnPub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
log.debug("onNext:{}", integer);
}
@Override
public void onError(Throwable t) {
log.debug("onError:{}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
});
System.out.println("exit");
}
}


- publish는 빠르게 동작하지만, 데이터를 받아서 처리하는 부분(Subscribe)은 느리게 처리해도 될 때 사용한다.
- PublishOn에 들어온 요청이 한 번에 처리되는 것이 아니다(동시성을 지켜준다)
- 하나의 Publisher가 여러 스레드를 생성해서 실행시킬 때에도 순서가 지켜진다.
2개의 스케줄러를 모두 적용했을 경우
@Slf4j
public class SchedulerEx {
public static void main(String[] args) {
Publisher<Integer> pub = sub -> {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
log.debug("request()");
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}
@Override
public void cancel() {
}
});
};
//pub
Publisher<Integer> subOnPub = sub -> {
ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
//스레드의 이름을 바꿔주기 위해 스프링에서 제공하는 CustomizableThreadFactory를 사용한다
@Override
public String getThreadNamePrefix() {
return "subOn-";
}
});
es.execute(() -> pub.subscribe(sub));
};
Publisher<Integer> pubOnPub = sub -> {
subOnPub.subscribe(new Subscriber<Integer>() {
ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
@Override
public String getThreadNamePrefix() {
return "pubOn-";
}
});
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(Integer integer) {
es.execute(() -> sub.onNext(integer));
}
@Override
public void onError(Throwable t) {
es.execute(() -> sub.onError(t));
}
@Override
public void onComplete() {
es.execute(() -> sub.onComplete());
}
});
};
//sub
pubOnPub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
log.debug("onNext:{}", integer);
}
@Override
public void onError(Throwable t) {
log.debug("onError:{}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
});
System.out.println("exit");
}
}

- 스레드의 이름을 주기 위해 new CustomizableThreadFactory()를 (스프링에서 제공) 구현한다.
Flux를 사용하여 작성하기
public class FluxScEx {
public static void main(String[] args) {
Flux.range(1, 10)//1~10
.publishOn(Schedulers.newSingle("pub")) //소비하는 쪽 속도가 느릴 경우 사용
.log() //위쪽의 publisher 로 부터 받아오는 데이터를 볼 수 있다
.subscribeOn(Schedulers.newSingle("sub"))
.subscribe(System.out::println); //출력
System.out.println("exit");
}
}
- 매우 간단하게 publishOn, subscribeOn를 실행 가능하다.
- log()를 사용하게 되면 Publisher로부터 받아오는 데이터를 출력해서 보여줄 수 있다.
스레드가 종료되지 않고, 계속 실행돼서 동작하고 있는데, Publisher나 Flux는 서버 환경에서 동작하게 되며, 서버처럼 계속 실행되어 요청을 받게 되어 있는데 이를 종료하는 코드가 없어서 계속 동작한다(종료하도록 코드를 작성하자)
SchedulerEx 리펙토링
//...
Publisher<Integer> pubOnPub = sub -> {
pub.subscribe(new Subscriber<Integer>() {
ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
@Override
public String getThreadNamePrefix() {
return "pubOn-";
}
});
//...
@Override
public void onError(Throwable t) {
es.execute(() -> sub.onError(t));
es.shutdown(); //더이상 구독이 유용하지 않으니 종료
}
@Override
public void onComplete() {
es.execute(() -> sub.onComplete());
es.shutdown(); //더이상 구독이 유용하지 않으니 종료
}
//...
});
};
//...
- onError, onComplete 이벤트가 동작할 때에는 더 이상 구독이 유용하지 않으니 shutdown을 실행하면 된다.
Main 스레드 - User 스레드
public static void main(String[] args) throws InterruptedException {
//User가 만든 스레드는 메인스레드가 종료가 되어도 종료되지 않는다
Executors.newSingleThreadExecutor().execute(() -> {
try {
TimeUnit.SECONDS.sleep(2);
}catch (InterruptedException e){
}
System.out.println("Hello");
});
System.out.println("exit");
}
- 해당 예제를 실행해 보면, Main 스레드가 먼저 종료 후, User 스레드가 종료가 되는데 문제없이 잘 실행된다.
- JVM이 User스레드가 내려가지 않는 이상 내부적으로 종료하지 않는다는 것을 알 수 있다.
그렇다면, 아래와 같은 코드는 왜 동작하지 않을까?
public static void main(String[] args) throws InterruptedException {
//Subscribe를 걸지 않았지만 자동으로 걸어주는 메서드가 있다.
//대표적으로 interval
Flux.interval(Duration.ofMillis(500))
//.take(10) //원하는 갯수만큼 데이터를 자르고 싶을 경우 해당 메서드를 사용하면 된다.
.subscribe(s -> log.debug("onNext:{}", s)); //아무것도 실행이 되지 않는다.
log.debug("exit");
TimeUnit.SECONDS.sleep(5);
}
- TimeUnit.SECONDS.sleep()을 주지 않을 경우 아무것도 실행되지 않고 종료되는 것을 볼 수 있다.
- StackOverFlow와 같은 곳에 해당 문제를 검색하게 되면, TimeUnit.SECONDS.sleep() 같은 해결책을 제시한다.
- 위 예제에서 보았듯이 Main 스레드가 먼저 종료돼서 실행되지 않은 것은 아니다 (그러면 무엇이 문제일까)
- 스레드에는 demon, user 스레드가 있다. 위 코드는 데몬스레드로 만들어 두기 때문에 종료하는 것이다.
- 메인 스레드는 User 스레드가 있다면 종료하지 않지만, 데몬 스레드가 남아있을 경우에는 그냥 종료해 버린다.
- 추가적으로 take라는 메서드를 통해 가지고 오고 싶은 데이터 개수만큼 제약을 걸 수도 있다.
take 구현하기
@Slf4j
public class IntervalEx {
public static void main(String[] args) {
Publisher<Integer> pub = sub -> {
sub.onSubscribe(new Subscription() {
int no = 0;
boolean cancelled = false;
@Override
public void request(long n) {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
exec.scheduleAtFixedRate(() -> {
if(cancelled){
exec.shutdown(); //더이상 실행시키지 않는다.
return;
}
sub.onNext(no++); //프로세스가 종료되기 전까지 계속 데이터를 전송
}, 0, 300, TimeUnit.MILLISECONDS);
}
@Override
public void cancel() {
cancelled = true;
}
});
};
Publisher<Integer> takePub = sub -> {
pub.subscribe(new Subscriber<Integer>() {
int count = 0;
Subscription subsc;
@Override
public void onSubscribe(Subscription s) {
subsc = s;
sub.onSubscribe(s);
}
@Override
public void onNext(Integer integer) {
sub.onNext(integer);
if(++count > 10){ //10개 이상이면 실행을 안시킬 것이다
subsc.cancel();//cancel를 동작시켜 준다.
//데이터를 더이상 필요로 하지 않기 떄문에 cancel을 시키는 것이다
}
}
@Override
public void onError(Throwable t) {
sub.onError(t);
}
@Override
public void onComplete() {
sub.onComplete();
}
});
};
takePub.subscribe(new Subscriber<>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
log.debug("onNext:{}", integer);
}
@Override
public void onError(Throwable t) {
log.debug("onError:{}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
});
}
}

- take 같은 경우 데이터를 더 이상 받지 않는 것이기 때문에, cancel 이벤트로 동작을 시켜야 한다.
- scheduleAtFixedRate 함수를 쓰면 intenal처럼 원하는 시간마다 이벤트를 동작시킬 수도 있다.
- 현재는 300ms 마다 no++를 실행시키며, onNext로 데이터를 전달하고 있다.
- takePub에서는 받은 데이터를 가지고 count값을 증가시키다가, count값이 10 이상이 되면 저장해둔 sub객체의 cancel 이벤트를 동작시킨다.
- onSubscribe를 통해 sub 객체를 저장해 두고 있는다.
- 저장해둔 sub 객체를 cancel 이벤트가 동작이 되면, cancelled 가 true가 되고, request에서 cancelled가 true이기 때문에 스레드를 종료시킨다.
728x90
'Toby의 ReactiveProgramming' 카테고리의 다른 글
| AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (0) | 2022.08.26 |
|---|---|
| 비동기 RestTemplate과 비동기 MVC/Servlet (0) | 2022.08.26 |
| 자바와 스프링의 비동기 기술 (0) | 2022.08.22 |
| Reactive Streams - Operators (0) | 2022.07.17 |
| Reactive Streams (0) | 2022.07.16 |