FRP(Functional Reactive Programming)
Reactive Programing : 함수형 프로그래밍
Reactive : 외부에 이벤트가 발생하면, 거기에 대응하는 방식으로 코드를 작성하는 것
ReactiveX : 외부에서 이벤트가 발생하면 이를 반응하는 프로그래밍(즉, 이벤트 위주의 프로그래밍)
나오게 된 배경
- Duality = 상대성
- Observer Pattern = 디자인 패턴, 옵져버와 리스너를 통해 이벤트를 동작
- Reactive Stream = Reactive 프로그래밍의 표준, JAVA9의 API로 들어가있다.
Collection의 Iterator
public static void main(String[] args) {
//List<Integer> list = Arrays.asList(1, 2, 3, 4, 5); //리스트는 Iterable를 상속받고 있기 떄문에 해당 타입으로 받아도 오류가 없다.
Iterable<Integer> iter = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);//Iterable 생성
//for-each //Iterable 이기떄문에 for-each를 사용할 수 있는것이다.
//꼭 Collection이 아니더라도, Iterable 인터페이스를 구현한다면 for-each를 사용할 수 있다.
for (Integer i : iter) {
System.out.println("i = " + i);
}
}

- JAVA의 Collection은 Iterable을 상속받고, Iterator를 구현한 모든 클래스는 for-each문을 동작시킬 수 있다.
- Iterable 같은 경우 iterator를 구현해야 한다.
- iterator next(), hasNext()만 구현하면 된다.
Iterable를 구현해야 하는 for-each문
public static void main(String[] args) {
//익명 클래스이기 때문에 람다로 구현 가능
Iterable<Integer> iter = () ->
new Iterator<>() {
int i = 0;
final static int MAX = 10;
@Override
public boolean hasNext() {
return i < MAX;
}
@Override
public Integer next() {
return ++i;
}
};
for (Integer i : iter) {
//JAVA의 for-each는 iterator만 구현되면 사용할 수 있다
System.out.println(i);
}
//JAVA 5이전 썻던 방식
for (Iterator<Integer> it = iter.iterator(); it.hasNext(); ) {
System.out.println(it.next());
}
}
- for-each문을 동작하기 위해서는 Iterator를 구현한 클래스를, Iterator를 구현하기 위해서는 Iterable 클래스를 만들어 주어야 한다.
- Iterator를 생성하기 위해서는 Iterable이 반드시 필요하다.
- Iterable를 구현하기 위해서는 반드시 2가지 메서드(next(), hasNext() 메서드를 구현되어 있어야 한다.
- 여러 클라이언트의 요청이 들어와도 모두 처리할 수 있는 장점이 있다.
- JAVA5 이전에는 for-each문이 존재하지 않았기 때문에 for문을 사용해서 작성하였다
Observer Pattern
Observable (이벤트들을 등록시킨다) <-> Iterable
Observable = Source -> 즉, Event/Data를 던진다(누구에게? Observer에게)
-> Observer에게 이벤트를 던지기 위해서는 Observer를 Observable에 등록해야 한다.
| Observable | Iterable |
| Push(해당 Observable를 정의한 곳에서 준다) | Pull(다음것을 내가 가져오는 방식-> it.next()) |
| DATA method() | void method(DATA) |

public static void main(String[] args) {
Observer ob = new Observer() { //받는쪽
@Override
public void update(Observable o, Object arg) {
System.out.println(arg);
}
};
IntObservable io = new IntObservable();
io.addObserver(ob); //등록
io.run();
}
static class IntObservable extends Observable implements Runnable { //비동기 적으로 돌리기 위해 Runnable 추가
@Override
public void run() {
for (int i = 1; i <= 10; ++i) { //source 쪽
setChanged(); //변화가 생겼다는 것을 해당 메서드로 알려줌
//int i = it.next() //PULL
notifyObservers(i); //변경되는 것을 알려준다 //PUSH
//i가 메서드 안에 있으므로 Push 라고 하는것이다.
}
}
}
- void method(DATA)와 같은 이벤트를 통해 위와 같은 방식과 반대되는 방식으로 동작한다.
- DATA method()와 기능은 똑같다
- 여러 이벤트를 재사용하거나, 여러 스레드를 독립적으로 동작시키는 것에 특화되어 있다.
public static void main(String[] args) {
Observer ob = new Observer() {
@Override
public void update(Observable o, Object arg) {
System.out.println(Thread.currentThread().getName() + " " + arg);
}
};
IntObservable io = new IntObservable();
io.addObserver(ob);
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(io);
System.out.println(Thread.currentThread().getName() + " EXIT"); //비동기식으로 동작하기 때문에 해당 코드가 먼저 실행됬다
es.shutdown();
}

- 여러 개의 스레드에서 비동기 적으로 동작시키기 수월하다.
- 재사용성이 증가한다.
- 별도의 스레드에서 실행을 시켯기 때문에 Sub 쓰레드에서 실행시킨 동작이 더 늦게 출력된다.
- es.execute(io)는 별도의 스레드(pool-1-thread-1)를 만들어 해당 스레드에서 이를 처리하도록 하고, mainThread는 그대로 실행 되도록 작성해두엇기 때문에 mainThread에 작성된 것이 먼저 출력된다.
- Push 방식으로 코드를 작성하면 별도의 쓰레드에서 동작하는 코드를 쉽게 작성할 수 있다.
RxJAVA와 Reactive Stream
이전에 작성한 Observer 패턴에는 큰 문제가 있다.
- Complete : Complete 개념이 존재가 없다(끝나지 않는 이벤트도 있지만, 끝나야 되는 이벤트도 있다) -> notifyObservers()라는 이벤트밖에 없기 때문에 Complete 개념이 없다.
- Error : 복구 가능한 오류가 발생했을 때, 어떻게 처리할지에 대한, 대안이 Observer 패턴에는 전혀 구현이 되어있지 않다.
https://www.reactive-streams.org/
https://www.reactive-streams.org/
Reactive Streams Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols. JDK9 java
www.reactive-streams.org
ReactiveX
CROSS-PLATFORM Available for idiomatic Java, Scala, C#, C++, Clojure, JavaScript, Python, Groovy, JRuby, and others
reactivex.io
- Reactive Streams는 간단하게 비동기 스트림을 처리하는 표준을 제공하기 위한 것이라고 생각하면 된다.
- Reactive Streams의 인터페이스들을 구현한 구현체는 RxJava, RxKotlin, RxJS, RxScala 등이 있다.
- Reactive Streams이 표준이고, Reactive Streams 표준을 지킨 클래스끼리는 상호 호환이 가능하다고 생각하면 된다
공통 인터페이스(Reactive Streams)에 구현해야 하는 메서드를 살펴보자


- Reactive Streams 인터페이스에 접근하기 위해서는 4가지(Proceesor, Publisher, Subscriber, Subscription) 메서드만 구현하면 된다.
- 해당 메서드만 구현하면 되는 것은 아니고, 스펙이 있어 이를 잘 파악하고 사용해야 한다.
- https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md#specification
GitHub - reactive-streams/reactive-streams-jvm: Reactive Streams Specification for the JVM
Reactive Streams Specification for the JVM. Contribute to reactive-streams/reactive-streams-jvm development by creating an account on GitHub.
github.com
/**
* Observer 패턴을 문제를 해결한 클래스
*/
public class PubSub {
public static void main(String[] args) {
//Publisher <- Observable
//Subscriber <- Observer
Iterable<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);
Publisher p = new Publisher() { //Subscribe가 구독하는 방식
@Override
public void subscribe(Subscriber subscriber) {
Iterator<Integer> it = itr.iterator();
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) { //파라미터 n = 최대 받을수 있는 갯수
try {
//while (true) {
while (n-- > 0) { //n으로 제약을 준다.
if (it.hasNext()) {
subscriber.onNext(it.next()); //요청값을 보내주면 된다.
} else {
subscriber.onComplete();
break;
}
}
}catch (RuntimeException e){
subscriber.onError(e); //onError 라는 메서드를 호출해서 우아하게 처리가능
}
}
@Override
public void cancel() {
}
});
}
};
Subscriber<Integer> s = new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) { //반드시 호출이 되어야한다.
System.out.println("onSubscribe");
//최초 request는 onSubscribe 에서 해야한다.
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) { //0~무제한
//Publisher가 데이터를 주면 해당 메서드에서 받는다.
System.out.println("onNext = " + item);
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) { //onComplete,onError 둘 중하나만 사용, 아예 안써도 됨
//Exception 발생시키지 말고, 해당 메서드를 통해서 오류를 전해준다.
System.out.println("onError");
}
@Override
public void onComplete() {
//Publisher가 더 이상 줄 데이터가 없을 때
System.out.println("onComplete");
}
};
p.subscribe(s);
}
}

- Publisher = Observable, Subscriber = Observer
- Subscriber는 반드시 onSubscribe라는 함수를 반드시 호출해야 한다.
- onSubscribe 메서드는 반드시 호출해야 한다.
- onNext* : 0~ 무한대(N)까지 호출이 가능하다, Publisher가 데이터를 주면 해당 메서드에서 받는다.
- OnError | onComplete : 둘 중 하나를 호출하거나, 호출하지 않거나 할 수 있다.
- OnError : Exception을 발생시키지 말고, 해당 메서드를 통해서 오류를 전달(해당 메서드에서 try-catch로 예외를 복구할 수 있다)
- onComplete : Publisher가 더 이상 줄 데이터가 없을 때, 호출되는 메서드
- Subscription : Subscriber, Publisher 사이에서 구독 정보를 가진 중간 매개체 역할을 한다(속도 조절?)
ExecutorService를 사용해 다른 스레드에서 실행하기
public class PubSub {
public static void main(String[] args) throws InterruptedException {
Iterable<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);
ExecutorService es = Executors.newSingleThreadExecutor();
Publisher p = new Publisher() { //Subscribe가 구독하는 방식
@Override
public void subscribe(Subscriber subscriber) {
Iterator<Integer> it = itr.iterator();
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) { //파라미터 n = 최대 받을수 있는 갯수
es.execute(() -> {
int i = 0;
try {
while (i++ < n) { //n으로 제약을 준다.
if (it.hasNext()) {
subscriber.onNext(it.next()); //요청값을 보내주면 된다.
} else {
subscriber.onComplete();
break;
}
}
} catch (RuntimeException e) {
subscriber.onError(e); //onError 라는 메서드를 호출해서 우아하게 처리가능
}
});
}
@Override
public void cancel() {
}
});
}
};
Subscriber<Integer> s = new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) { //반드시 호출이 되어야한다.
System.out.println(Thread.currentThread().getName() + " onSubscribe");
//최초 request는 onSubscribe 에서 해야한다.
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) { //0~무제한
//Publisher가 데이터를 주면 해당 메서드에서 받는다.
System.out.println(Thread.currentThread().getName() + " onNext = " + item);
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) { //onComplete,onError 둘 중하나만 사용, 아예 안써도 됨
//Exception 발생시키지 말고, 해당 메서드를 통해서 오류를 전해준다.
System.out.println("onError = " + throwable.getMessage());
}
@Override
public void onComplete() {
//Publisher가 더 이상 줄 데이터가 없을 때
System.out.println("onComplete");
}
};
p.subscribe(s);
//es.shutdown(); //쓰레드가 종료가 되어서 안된다.
es.awaitTermination(10, TimeUnit.HOURS);
es.shutdown();
}
}
- ExecutorService를 활용하여, 다른 스레드를 손쉽게 호출해서 사용할 수 있다.
- 비동기 방식(Subscription request에서 다른 쓰레드를 통해 메서드를 호출할 수 있게끔 한다)
'Toby의 ReactiveProgramming' 카테고리의 다른 글
| AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (0) | 2022.08.26 |
|---|---|
| 비동기 RestTemplate과 비동기 MVC/Servlet (0) | 2022.08.26 |
| 자바와 스프링의 비동기 기술 (0) | 2022.08.22 |
| Reactive Streams - Schedulers (0) | 2022.08.22 |
| Reactive Streams - Operators (0) | 2022.07.17 |