728x90
Stream과 같이 데이터를 Operator(연산)을 통해 가공하여 Subscriber에 전달할 수 있다.
초기
/**
* Reactive Streams - Operators
* Publisher -> [Data1] -> Operator -> [Data2] -> Subscriber
* Operator 연산을 통하면서 데이터가 가공되서 Subscriber에 제공이 된다.
*/
@Slf4j
public class PubSub {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
pub.subscribe(logSub()); //구독 진행
}
private static Publisher<Integer> iterPub(final List<Integer> iter) {
return new Publisher<Integer>() {
/*
Iterable<Integer> iter = Stream.iterate(1, a -> a + 1).limit(10)//Seed값에 대한 것을 a로 받아서 쓸 수 있다
.collect(Collectors.toList());
*/
@Override
public void subscribe(Subscriber<? super Integer> sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
try {
iter.forEach(s -> sub.onNext(s)); //iterable이 가진 데이터를 모두 넘긴다.
sub.onComplete();
} catch (Throwable t) {
sub.onError(t);
}
}
@Override
public void cancel() {
//Subscriber 가 어떠한 이유로 더이상 처리하고 싶지 않을때, cancel 을 호출한다.
//결론적으로 Publisher 에게 더이상 데이터를 받고 싶지 않을 때 호출
}
});
}
};
}
private static Subscriber<Integer> logSub() {
return new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe:");
s.request(Long.MAX_VALUE); //너가 가지고 있는 데이터 모두 줘(무제한으로 생각하면 된다)
}
@Override
public void onNext(Integer i) {
log.debug("onNext: {}", i);
}
@Override
public void onError(Throwable t) {
log.debug("onError:{}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
};
}
}
- Publisher가 Data를 SubScriber에 전달한다.
- 이렇게 Publisher가 Subscriber에 전달하는 과정에서 여러 Operator를 적용해 데이터를 전달할 수 있다.
- Publisher -> [Data1] -> Operator -> [Data2] -> Subscriber
- EX) Java의 stream에서 데이터를 가공하는 과정
- 현재는 SubScription에서 log를 출력하는 부분, Publisher에서 iterator를 생성해서 전달하는 부분을 메서드로 추출하기만 하였다 -> 초기 상태
Map Operation (pub -> data1 -> mapPub -> data2 -> logSub)
mapPub을 걸쳐서 동작해야되기 때문에 Publisher에서 제공하는 기능을 모두 제공해야 한다.
pub -> data1 -> mapPub -> data2 -> logSub 흐름으로 동작시킬 것이다.
/**
* Publisher -> [DATA1] -> mapPub -> [DATA2] -> LogSub
* <- subscribe(logSub)
* -> onSubscribe(s)
* -> onNext
* -> onNext
* -> onComplete
* mapPub을 걸쳐서 동작해야되기 떄문에 Publisher에서 제공하는 기능을 모두 제공해야한다.
*/
@Slf4j
public class PubSub {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
//pub을 mapPub에 연결시켜주었다.
//Funtion 인터페이스 = 파라미터 타입(Integer) , 반환타입(Integer)라 Function<Integer, Integer> 라고 썻다
//Publisher<Integer> mapPub = mapPub(pub, (Function<Integer, Integer>)s -> s * 10);
Publisher<Integer> mapPub = mapPub(pub, s -> s * 10);
Publisher<Integer> map2Pub = mapPub(mapPub, s -> -s);
map2Pub.subscribe(logSub()); //구독 진행
}
private static Publisher<Integer> iterPub(final List<Integer> iter) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
try {
iter.forEach(s -> sub.onNext(s)); //iterable이 가진 데이터를 모두 넘긴다.
sub.onComplete();
} catch (Throwable t) {
sub.onError(t);
}
}
@Override
public void cancel() {
//Subscriber 가 어떠한 이유로 더이상 처리하고 싶지 않을때, cancel 을 호출한다.
//결론적으로 Publisher 에게 더이상 데이터를 받고 싶지 않을 때 호출
}
});
}
};
}
private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
//전에 Publisher 만들던 것과 같다.
//logSubscriber 가 해당 메서드를 호출할 것이다
pub.subscribe(new Subscriber<Integer>() {//logSub가 mapPub을 호출하기 때문에 mapPub에서는 pub를 호출한다고 생각하면 된다
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s); //아무일 없이 중계만 한다
}
@Override
public void onNext(Integer i) {
sub.onNext(f.apply(i)); //Fuction을 적용한 값을 Sub에 넘기고 싶다.
}
@Override
public void onError(Throwable t) {
sub.onError(t);//아무일 없이 중계만 한다
}
@Override
public void onComplete() {
sub.onComplete();//아무일 없이 중계만 한다
}
});
}
};
}
private static Subscriber<Integer> logSub() {
return new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe:");
s.request(Long.MAX_VALUE); //너가 가지고 있는 데이터 모두 줘(무제한으로 생각하면 된다)
}
@Override
public void onNext(Integer i) {
log.debug("onNext: {}", i);
}
@Override
public void onError(Throwable t) {
log.debug("onError:{}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
};
}
}

- mapPub : Operation 연산을 통해 데이터를 변환해서 다른 Pub를 호출해 준다.
- 1번째 파라미터 : 이전에 들어온 Pub
- 2번째 파라미터 : 적용시킬 함수(람다가 들어온다)
- onNext 함수에서 function을 적용시킨 item을 SubScriber에 전달한다.
- 여러 개의 Pub를 추가시키는 데에도 비용이 거의 들지 않는다.
리펙토링
@Slf4j
public class PubSub {
//...
private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new DelegateSub(sub){
@Override
public void onNext(Integer i) { //오버라이드 해서 적용한다.
sub.onNext(f.apply(i));
}
}); //delegateSub 에 subscriber를 전달하자
}
};
}
//...
}
DelegateSub
public class DelegateSub implements Subscriber<Integer> {
private Subscriber sub;
public DelegateSub(Subscriber sub) {
this.sub = sub;
}
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s); //아무일 없이 중계만 한다
}
@Override
public void onNext(Integer i) {
sub.onNext(i);
}
@Override
public void onError(Throwable t) {
sub.onError(t);
}
@Override
public void onComplete() {
sub.onComplete();
}
}
- DelegateSub라는 클래스를 만들어, SubScriber를 생성자로 받는다.
- 어떠한 SubScriber에게 전달할지를 알 수 없기에 생성자로 호출하는 곳에서 전달받는다.
- 기본적으로 onNext 같은 경우 위임하는 방식으로 구현해 두었다가, 내가 적용하고 싶은 곳이 있을 경우 Override 해서 구현하면 된다.
- DeletegateSub(sub)를 호출할 때, 내가 적용하고 싶은 연산을 onNext 메서드를 오버라이드 하여, 적용하면 된다.
SumPub()
public static void main(String[] args) {
//...
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10)
.collect(Collectors.toList()));
Publisher<Integer> sumPub = sumPub(pub);
sumPub.subscribe(logSub()); //구독이 진행된다.
}
//...
private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new DelegateSub(sub){
int sum = 0;
@Override
public void onNext(Integer i) {
sum += i; //Sum을 결과를 어디서 전달해야할까 -> onComplete 에서 전달하면 된다.
}
@Override
public void onComplete() {
sub.onNext(sum); //complete 이라고, 다른 Subscriber 에 complete 를 넘길 필요 없다.
sub.onComplete();//onNext 를 한번 호출 후(결과) onComplete 를 호출해준다
}
}); //중계할 Sub를 넘긴다.
}
};
}
//...
- onNext에서는 데이터를 Subscriber에 전달해 줄 필요 없이, sum에 데이터 값을 더해주면 된다.
- 그럼 언제 Subscriber에 데이터를 전달해야 할까?
- 답은 onComplete 이벤트에서 전달해 주면 된다.
- onComplete을 실행할 때, onNext()를 통해 더해진 값(sum)을 전달해 준 뒤, onComplete를 호출하면 된다.
reduce()
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
//ex) 1,2,3,4,5
// 0 -> (0, 1) -> 0 + 1 = 1
// 1 -> (1, 2) -> 1 + 2 = 3
// 3 -> (3, 3) -> 3 + 3 = 6
// 6 -> (6, 4) -> 6 + 4 = 10
// 10 -> (10, 5) -> 10 + 5 = 15
//초기 데이터를 시작으로, 함수의 연산을 통해 연산을 계속 진행해 최종 결과를 반환
Publisher<Integer> reducePub = reducePub(pub, 0,
(BiFunction<Integer, Integer, Integer>) (a, b) -> a + b); // aType, bType, returnType
//BiFunction 을 통해 인자가 2개인 람다를 전달
reducePub.subscribe(logSub()); //구독이 진행된다.
}
private static Publisher<Integer> reducePub(Publisher<Integer> pub, int init,
BiFunction<Integer, Integer, Integer> biFunction) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
pub.subscribe(new DelegateSub(subscriber) {
int result = init;
@Override
public void onNext(Integer item) {
result = biFunction.apply(result, item);
}
@Override
public void onComplete() {
subscriber.onNext(result);
subscriber.onComplete();
}
});
}
};
}
- BiFunction을 통해 파라미터 인자 2개, 리턴 Value 가 있는 함수를 reducePub에 전달한다.
- reducePub 같은 경우 파라미터로, (Publisher, 초깃값, 적용될 람다)를 받는다.
- 초깃값에 람다를 적용해 값을 누적하며, 최종적으로 누적 값을 반환한다.
- OnNext에서 result에 람다가 적용된 값을 누적한 뒤, onComplete에서 OnNext로 누적 값을 전달 후, onComplete를 호출
Generic으로 적용
@Slf4j
public class PubSub {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10)
.collect(Collectors.toList()));
Publisher<Integer> mapPub = mapPub(pub, s -> s * 10);
mapPub.subscribe(logSub());
}
//...
private static <T> Publisher<T> mapPub(Publisher<T> pub, Function<T, T> f) {
return new Publisher<T>() {
@Override
public void subscribe(Subscriber<? super T> sub) {
pub.subscribe(new DelegateSub<T>(sub){
@Override
public void onNext(T i) {
sub.onNext(f.apply(i));
}
});
}
};
}
private static <T> Subscriber<T> logSub() {
Subscriber<T> sub = new Subscriber<T>() {
@Override
public void onSubscribe(Subscription subscription) {
log.debug("onSubscribe:{}", subscription);
subscription.request(Long.MAX_VALUE); //해당 값만큼 데이터를 받겠다
}
@Override
public void onNext(T item) {
log.debug("onNext:{}", item);
}
@Override
public void onError(Throwable throwable) {
log.debug("onError:{}", throwable);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
};
return sub;
}
}
DelegateSub <T> : 제네릭 적용 Delegate
public class DelegateSub<T> implements Subscriber<T> {
private Subscriber sub;
public DelegateSub(Subscriber<? super T> sub) {
this.sub = sub;
}
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(T i) {
sub.onNext(i);
}
@Override
public void onError(Throwable t) {
sub.onError(t);
}
@Override
public void onComplete() {
sub.onComplete();
}
}
- 제네릭을 적용하여, mapSub, logSub 연산을 모든 타입에 다 적용 가능하도록 메서드를 수정한다.
- 현재는 pub가 Integer 타입이기 때문에, mapSub, logSub 연산에 전달되는 값들은 모두 Integer이다.
Generic으로 받는 타입 반환 타입 입력받기(T -> R)
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10)
.collect(Collectors.toList()));
Publisher<String> mapPub = mapPub(pub, s -> "[" + s + "]");
mapPub.subscribe(logSub());
}
//T -> R
private static <T,R> Publisher<R> mapPub(Publisher<T> pub,
Function<T, R> function) {
return new Publisher<R>() {
@Override
public void subscribe(Subscriber<? super R> subscriber) {
pub.subscribe(new DelegateSub<T, R>(subscriber) {
@Override
public void onNext(T item) {
sub.onNext(function.apply(item));
}
});
}
};
}
DelegateSub<T, R> : 제네릭 적용 Delegate 반환 타입 다르다.
public class DelegateSub<T,R> implements Subscriber<T> {
Subscriber sub;
public <R> DelegateSub(Subscriber<? super R> sub) {
this.sub = sub;
}
@Override
public void onSubscribe(Subscription subscription) {
sub.onSubscribe(subscription); //중계역할만한다.
}
@Override
public void onNext(T item) {
sub.onNext(item);
}
@Override
public void onError(Throwable throwable) {
sub.onError(throwable);
}
@Override
public void onComplete() {
sub.onComplete();
}
}
- mapPub에서 입력되는 타입과, 반환되는 타입이 다를 경우이다.
- 현재 예제에서는 Interger -> String으로 변환하는 것이다.
- mapPub에서 타입 인자
Reduce<T, R> 제네릭으로 변경하기
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10)
.collect(Collectors.toList()));
//Publisher<String> reducePub = reducePub(pub, "", (a, b) -> a + "-" + b);
Publisher<StringBuilder> reducePub = reducePub(pub, new StringBuilder(),
(a, b) -> a.append(b + ","));
reducePub.subscribe(logSub());
}
private static <T, R> Publisher<R> reducePub(Publisher<T> pub, R init,
BiFunction<R, T, R> bf) {
return new Publisher<R>() {
@Override
public void subscribe(Subscriber<? super R> sub) {
pub.subscribe(new DelegateSub<T, R>(sub) {
R result = init;
@Override
public void onNext(T i) {
result = bf.apply(result, i);
}
@Override
public void onComplete() {
sub.onNext(result);
sub.onComplete();
}
});
}
};
}
- 제네릭으로 바꾸기 위해 바로 변환하지 말고, 타입을 대입한 후, 하나씩 비교해 가면서 진행해야 한다.
- String 뿐만 아니라, StringBuilder에 대해서도 잘 동작하는 것을 확인할 수 있다.
Reactive 시작하기
리액터 공식 문서
https://projectreactor.io/docs/core/release/api/
reactor-core 3.4.21
projectreactor.io
ReactorEx
public class ReactorEx {
public static void main(String[] args) {
Flux.<Integer>create(e -> { //타입을 지정하지 않으면 Object로 받는다.
e.next(1);
e.next(2);
e.next(3);
e.complete();
})//Flux = Publisher
.log() //log를 통해 데이터가 어떤식으로 전달되는지 볼 수 있다.
.map(s -> s*10)
.reduce(0, (a,b) -> a + b)
.log()
.subscribe(System.out::println); //publisher 이니 Subscribe 가능
}
}

- Flux를 통해, Subscribe, Publisher 구현을 간단하게 할 수 있다.
- Flux = Publisher로 subscribe 이벤트를 작동시킬 수 있다.
- log()를 적절한 위치에 써주면, 주고받는 데이터를 확인할 수 있다.
main
@RestController
public class Chapter06Controller {
@RequestMapping("/hello")
public Publisher<String> hello(String name){ //Publisher 로 리턴이 가능
return new Publisher<String>() {
@Override
public void subscribe(Subscriber<? super String> s) {
s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
s.onNext("Hello " + name);
s.onComplete();
}
@Override
public void cancel() {
}
});
}
}; //Subscribe 를 만들고, 데이터를 요청하는 것들은 스프링이 할일이다.
}
}
- Subscribe를 만들고, 데이터를 요청하는 것은 Spring이 할 일이다.
- 개발자는 Publisher만 만들어서 리턴만 해주면 된다.
접속하기 : http://localhost:8080/hello?name=spring
728x90
'Toby의 ReactiveProgramming' 카테고리의 다른 글
| AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (0) | 2022.08.26 |
|---|---|
| 비동기 RestTemplate과 비동기 MVC/Servlet (0) | 2022.08.26 |
| 자바와 스프링의 비동기 기술 (0) | 2022.08.22 |
| Reactive Streams - Schedulers (0) | 2022.08.22 |
| Reactive Streams (0) | 2022.07.16 |