Toby의 ReactiveProgramming

Reactive Streams

webmaster 2022. 7. 16. 13:07
728x90

FRP(Functional Reactive Programming)

Reactive Programing : 함수형 프로그래밍

Reactive : 외부에 이벤트가 발생하면, 거기에 대응하는 방식으로 코드를 작성하는 것

ReactiveX : 외부에서 이벤트가 발생하면 이를 반응하는 프로그래밍(즉, 이벤트 위주의 프로그래밍)

나오게 된 배경

  • Duality = 상대성
  • Observer Pattern = 디자인 패턴, 옵져버와 리스너를 통해 이벤트를 동작
  • Reactive Stream = Reactive 프로그래밍의 표준, JAVA9의 API로 들어가있다.

Collection의 Iterator

public static void main(String[] args) {
    //List<Integer> list = Arrays.asList(1, 2, 3, 4, 5); //리스트는 Iterable를 상속받고 있기 떄문에 해당 타입으로 받아도 오류가 없다.
    Iterable<Integer> iter = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);//Iterable 생성
    //for-each //Iterable 이기떄문에 for-each를 사용할 수 있는것이다.
    //꼭 Collection이 아니더라도, Iterable 인터페이스를 구현한다면 for-each를 사용할 수 있다.
    for (Integer i : iter) {
        System.out.println("i = " + i);
    }
}

Collection&nbsp; Interface의 iterable

  • JAVA의 Collection은 Iterable을 상속받고, Iterator를 구현한 모든 클래스는 for-each문을 동작시킬 수 있다.
  • Iterable 같은 경우 iterator를 구현해야 한다.
    • iterator next(), hasNext()만 구현하면 된다.

Iterable를 구현해야 하는 for-each문

public static void main(String[] args) {
    //익명 클래스이기 때문에 람다로 구현 가능
    Iterable<Integer> iter = () ->
        new Iterator<>() {
            int i = 0;
            final static int MAX = 10;

            @Override
            public boolean hasNext() {
                return i < MAX;
            }

            @Override
            public Integer next() {
                return ++i;
            }
        };
    for (Integer i : iter) {
        //JAVA의 for-each는 iterator만 구현되면 사용할 수 있다
        System.out.println(i);
    }

    //JAVA 5이전 썻던 방식
    for (Iterator<Integer> it = iter.iterator(); it.hasNext(); ) {
        System.out.println(it.next());
    }
}
  • for-each문을 동작하기 위해서는 Iterator를 구현한 클래스를, Iterator를 구현하기 위해서는 Iterable 클래스를 만들어 주어야 한다.
    • Iterator를 생성하기 위해서는 Iterable이 반드시 필요하다.
    • Iterable를 구현하기 위해서는 반드시 2가지 메서드(next(), hasNext() 메서드를 구현되어 있어야 한다.
  • 여러 클라이언트의 요청이 들어와도 모두 처리할 수 있는 장점이 있다.
  • JAVA5 이전에는 for-each문이 존재하지 않았기 때문에 for문을 사용해서 작성하였다

Observer Pattern

Observable (이벤트들을 등록시킨다) <-> Iterable

Observable = Source -> 즉, Event/Data를 던진다(누구에게? Observer에게)

-> Observer에게 이벤트를 던지기 위해서는 Observer를 Observable에 등록해야 한다.

Observable Iterable
Push(해당 Observable를 정의한 곳에서 준다) Pull(다음것을 내가 가져오는 방식-> it.next())
DATA method() void method(DATA)

JAVA에 이미 정의되어 있는 Observable

public static void main(String[] args) {
    Observer ob = new Observer() { //받는쪽
      @Override
      public void update(Observable o, Object arg) {
        System.out.println(arg);
      }
    };

    IntObservable io = new IntObservable();
    io.addObserver(ob); //등록

    io.run();
  }

  static class IntObservable extends Observable implements Runnable { //비동기 적으로 돌리기 위해 Runnable 추가

    @Override
    public void run() {
      for (int i = 1; i <= 10; ++i) { //source 쪽
        setChanged(); //변화가 생겼다는 것을 해당 메서드로 알려줌
        //int i = it.next()                     //PULL
        notifyObservers(i); //변경되는 것을 알려준다 //PUSH
        //i가 메서드 안에 있으므로 Push 라고 하는것이다.
      }
    }
  }
  • void method(DATA)와 같은 이벤트를 통해 위와 같은 방식과 반대되는 방식으로 동작한다.
    • DATA method()와 기능은 똑같다
  • 여러 이벤트를 재사용하거나, 여러 스레드를 독립적으로 동작시키는 것에 특화되어 있다.
public static void main(String[] args) {
    Observer ob = new Observer() {
        @Override
        public void update(Observable o, Object arg) {
            System.out.println(Thread.currentThread().getName() + " " + arg);
        }
    };
    IntObservable io = new IntObservable();
    io.addObserver(ob);

    ExecutorService es = Executors.newSingleThreadExecutor();
    es.execute(io);

    System.out.println(Thread.currentThread().getName() + " EXIT"); //비동기식으로 동작하기 때문에 해당 코드가 먼저 실행됬다
    es.shutdown();
}

실행 결과

  • 여러 개의 스레드에서 비동기 적으로 동작시키기 수월하다.
  • 재사용성이 증가한다.
  • 별도의 스레드에서 실행을 시켯기 때문에 Sub 쓰레드에서 실행시킨 동작이 더 늦게 출력된다.
    • es.execute(io)는 별도의 스레드(pool-1-thread-1)를 만들어 해당 스레드에서 이를 처리하도록 하고, mainThread는 그대로 실행 되도록 작성해두엇기 때문에 mainThread에 작성된 것이 먼저 출력된다.
    • Push 방식으로 코드를 작성하면 별도의 쓰레드에서 동작하는 코드를 쉽게 작성할 수 있다.

RxJAVA와 Reactive Stream

이전에 작성한 Observer 패턴에는 큰 문제가 있다.

  1. Complete : Complete 개념이 존재가 없다(끝나지 않는 이벤트도 있지만, 끝나야 되는 이벤트도 있다) -> notifyObservers()라는 이벤트밖에 없기 때문에 Complete 개념이 없다.
  2. Error : 복구 가능한 오류가 발생했을 때, 어떻게 처리할지에 대한, 대안이 Observer 패턴에는 전혀 구현이 되어있지 않다.

https://www.reactive-streams.org/

 

https://www.reactive-streams.org/

Reactive Streams Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols. JDK9 java

www.reactive-streams.org

 

https://reactivex.io/

 

ReactiveX

CROSS-PLATFORM Available for idiomatic Java, Scala, C#, C++, Clojure, JavaScript, Python, Groovy, JRuby, and others

reactivex.io

  • Reactive Streams는 간단하게 비동기 스트림을 처리하는 표준을 제공하기 위한 것이라고 생각하면 된다.
  • Reactive Streams의 인터페이스들을 구현한 구현체는 RxJava, RxKotlin, RxJS, RxScala 등이 있다.
    • Reactive Streams이 표준이고, Reactive Streams 표준을 지킨 클래스끼리는 상호 호환이 가능하다고 생각하면 된다

공통 인터페이스(Reactive Streams)에 구현해야 하는 메서드를 살펴보자

인터페이스에 접근하게 되면 4가지 메서드만 구현하면 되는것을 확인할 수 있다(간단하다)
표준문서

 

GitHub - reactive-streams/reactive-streams-jvm: Reactive Streams Specification for the JVM

Reactive Streams Specification for the JVM. Contribute to reactive-streams/reactive-streams-jvm development by creating an account on GitHub.

github.com

/**
 * Observer 패턴을 문제를 해결한 클래스
 */
public class PubSub {

  public static void main(String[] args) {
    //Publisher <- Observable
    //Subscriber <- Observer
    Iterable<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);

    Publisher p = new Publisher() { //Subscribe가 구독하는 방식
      @Override
      public void subscribe(Subscriber subscriber) {

        Iterator<Integer> it = itr.iterator();

        subscriber.onSubscribe(new Subscription() {
          @Override
          public void request(long n) { //파라미터 n = 최대 받을수 있는 갯수
            try {
              //while (true) {
              while (n-- > 0) { //n으로 제약을 준다.
                if (it.hasNext()) {
                  subscriber.onNext(it.next()); //요청값을 보내주면 된다.
                } else {
                  subscriber.onComplete();
                  break;
                }
              }
            }catch (RuntimeException e){
              subscriber.onError(e); //onError 라는 메서드를 호출해서 우아하게 처리가능
            }
          }

          @Override
          public void cancel() {

          }
        });
      }
    };

    Subscriber<Integer> s = new Subscriber<Integer>() {

      Subscription subscription;

      @Override
      public void onSubscribe(Subscription subscription) { //반드시 호출이 되어야한다.
        System.out.println("onSubscribe");
        //최초 request는 onSubscribe 에서 해야한다.
        this.subscription = subscription;
        subscription.request(1);
      }

      @Override
      public void onNext(Integer item) { //0~무제한
        //Publisher가 데이터를 주면 해당 메서드에서 받는다.
        System.out.println("onNext = " + item);
        this.subscription.request(1);
      }

      @Override
      public void onError(Throwable throwable) { //onComplete,onError 둘 중하나만 사용, 아예 안써도 됨
        //Exception 발생시키지 말고, 해당 메서드를 통해서 오류를 전해준다.
        System.out.println("onError");
      }

      @Override
      public void onComplete() {
        //Publisher가 더 이상 줄 데이터가 없을 때
        System.out.println("onComplete");
      }
    };

    p.subscribe(s);
  }

}

Publisher, Subscriber, Subscription 관계

  • Publisher = Observable, Subscriber = Observer
  • Subscriber는 반드시 onSubscribe라는 함수를 반드시 호출해야 한다.
    • onSubscribe 메서드는 반드시 호출해야 한다.
    • onNext* : 0~ 무한대(N)까지 호출이 가능하다, Publisher가 데이터를 주면 해당 메서드에서 받는다.
    • OnError | onComplete : 둘 중 하나를 호출하거나, 호출하지 않거나 할 수 있다.
      • OnError : Exception을 발생시키지 말고, 해당 메서드를 통해서 오류를 전달(해당 메서드에서 try-catch로 예외를 복구할 수 있다)
      • onComplete : Publisher가 더 이상 줄 데이터가 없을 때, 호출되는 메서드
  • Subscription : Subscriber, Publisher 사이에서 구독 정보를 가진 중간 매개체 역할을 한다(속도 조절?)

ExecutorService를 사용해 다른 스레드에서 실행하기

public class PubSub {

  public static void main(String[] args) throws InterruptedException {
    Iterable<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);
    ExecutorService es = Executors.newSingleThreadExecutor();

    Publisher p = new Publisher() { //Subscribe가 구독하는 방식
      @Override
      public void subscribe(Subscriber subscriber) {

        Iterator<Integer> it = itr.iterator();

        subscriber.onSubscribe(new Subscription() {
          @Override
          public void request(long n) { //파라미터 n = 최대 받을수 있는 갯수
            es.execute(() -> {
              int i = 0;
              try {
                while (i++ < n) { //n으로 제약을 준다.
                  if (it.hasNext()) {
                    subscriber.onNext(it.next()); //요청값을 보내주면 된다.
                  } else {
                    subscriber.onComplete();
                    break;
                  }
                }
              } catch (RuntimeException e) {
                subscriber.onError(e); //onError 라는 메서드를 호출해서 우아하게 처리가능
              }
            });
          }

          @Override
          public void cancel() {

          }
        });
      }
    };

    Subscriber<Integer> s = new Subscriber<Integer>() {

      Subscription subscription;

      @Override
      public void onSubscribe(Subscription subscription) { //반드시 호출이 되어야한다.
        System.out.println(Thread.currentThread().getName() + " onSubscribe");
        //최초 request는 onSubscribe 에서 해야한다.
        this.subscription = subscription;
        subscription.request(1);
      }

      @Override
      public void onNext(Integer item) { //0~무제한
        //Publisher가 데이터를 주면 해당 메서드에서 받는다.
        System.out.println(Thread.currentThread().getName() + " onNext = " + item);
        this.subscription.request(1);
      }

      @Override
      public void onError(Throwable throwable) { //onComplete,onError 둘 중하나만 사용, 아예 안써도 됨
        //Exception 발생시키지 말고, 해당 메서드를 통해서 오류를 전해준다.
        System.out.println("onError = " + throwable.getMessage());
      }

      @Override
      public void onComplete() {
        //Publisher가 더 이상 줄 데이터가 없을 때
        System.out.println("onComplete");
      }
    };

    p.subscribe(s);
    //es.shutdown(); //쓰레드가 종료가 되어서 안된다.
    es.awaitTermination(10, TimeUnit.HOURS);
    es.shutdown();
  }

}
  • ExecutorService를 활용하여, 다른 스레드를 손쉽게 호출해서 사용할 수 있다.
  • 비동기 방식(Subscription request에서 다른 쓰레드를 통해 메서드를 호출할 수 있게끔 한다)

 

728x90