Toby의 ReactiveProgramming

Reactive Streams - Operators

webmaster 2022. 7. 17. 20:14
728x90

Stream과 같이 데이터를 Operator(연산)을 통해 가공하여 Subscriber에 전달할 수 있다.

초기

/**
 * Reactive Streams - Operators
 * Publisher -> [Data1] -> Operator -> [Data2] -> Subscriber 
 * Operator 연산을 통하면서 데이터가 가공되서 Subscriber에 제공이 된다.
 */
@Slf4j
public class PubSub {

  public static void main(String[] args) {
    Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
    pub.subscribe(logSub()); //구독 진행
  }


  private static Publisher<Integer> iterPub(final List<Integer> iter) {
    return new Publisher<Integer>() {
      /*
      Iterable<Integer> iter = Stream.iterate(1, a -> a + 1).limit(10)//Seed값에 대한 것을 a로 받아서 쓸 수 있다
          .collect(Collectors.toList());
       */

      @Override
      public void subscribe(Subscriber<? super Integer> sub) {
        sub.onSubscribe(new Subscription() {
          @Override
          public void request(long n) {
            try {
              iter.forEach(s -> sub.onNext(s)); //iterable이 가진 데이터를 모두 넘긴다.
              sub.onComplete();
            } catch (Throwable t) {
              sub.onError(t);
            }
          }

          @Override
          public void cancel() {
            //Subscriber 가 어떠한 이유로 더이상 처리하고 싶지 않을때, cancel 을 호출한다.
            //결론적으로 Publisher 에게 더이상 데이터를 받고 싶지 않을 때 호출
          }
        });
      }
    };
  }

  private static Subscriber<Integer> logSub() {
    return new Subscriber<Integer>() {
      @Override

      public void onSubscribe(Subscription s) {
        log.debug("onSubscribe:");
        s.request(Long.MAX_VALUE); //너가 가지고 있는 데이터 모두 줘(무제한으로 생각하면 된다)
      }

      @Override
      public void onNext(Integer i) {
        log.debug("onNext: {}", i);
      }

      @Override
      public void onError(Throwable t) {
        log.debug("onError:{}", t);
      }

      @Override
      public void onComplete() {
        log.debug("onComplete");
      }
    };
  }

}
  • Publisher가 Data를 SubScriber에 전달한다.
    • 이렇게 Publisher가 Subscriber에 전달하는 과정에서 여러 Operator를 적용해 데이터를 전달할 수 있다.
    • Publisher -> [Data1] -> Operator -> [Data2] -> Subscriber
    • EX) Java의 stream에서 데이터를 가공하는 과정
  • 현재는 SubScription에서 log를 출력하는 부분, Publisher에서 iterator를 생성해서 전달하는 부분을 메서드로 추출하기만 하였다 -> 초기 상태

Map Operation (pub -> data1 -> mapPub -> data2 ->  logSub)

mapPub을 걸쳐서 동작해야되기 때문에 Publisher에서 제공하는 기능을 모두 제공해야 한다.

pub -> data1 -> mapPub -> data2 ->  logSub 흐름으로 동작시킬 것이다.

/**
 * Publisher -> [DATA1] -> mapPub -> [DATA2] -> LogSub
 *                      <- subscribe(logSub)
 *                      -> onSubscribe(s)
 *                      -> onNext
 *                      -> onNext
 *                      -> onComplete
 * mapPub을 걸쳐서 동작해야되기 떄문에 Publisher에서 제공하는 기능을 모두 제공해야한다.
 */
@Slf4j
public class PubSub {

  public static void main(String[] args) {
    Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
    //pub을 mapPub에 연결시켜주었다.
    //Funtion 인터페이스 = 파라미터 타입(Integer) , 반환타입(Integer)라 Function<Integer, Integer> 라고 썻다
    //Publisher<Integer> mapPub = mapPub(pub, (Function<Integer, Integer>)s -> s * 10);
    Publisher<Integer> mapPub = mapPub(pub, s -> s * 10);
    Publisher<Integer> map2Pub = mapPub(mapPub, s -> -s);

    map2Pub.subscribe(logSub()); //구독 진행
  }


  private static Publisher<Integer> iterPub(final List<Integer> iter) {
    return new Publisher<Integer>() {
      @Override
      public void subscribe(Subscriber<? super Integer> sub) {
        sub.onSubscribe(new Subscription() {
          @Override
          public void request(long n) {
            try {
              iter.forEach(s -> sub.onNext(s)); //iterable이 가진 데이터를 모두 넘긴다.
              sub.onComplete();
            } catch (Throwable t) {
              sub.onError(t);
            }
          }

          @Override
          public void cancel() {
            //Subscriber 가 어떠한 이유로 더이상 처리하고 싶지 않을때, cancel 을 호출한다.
            //결론적으로 Publisher 에게 더이상 데이터를 받고 싶지 않을 때 호출
          }
        });
      }
    };
  }


  private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
    return new Publisher<Integer>() {
      @Override
      public void subscribe(Subscriber<? super Integer> sub) {
        //전에 Publisher 만들던 것과 같다.
        //logSubscriber 가 해당 메서드를 호출할 것이다
        pub.subscribe(new Subscriber<Integer>() {//logSub가 mapPub을 호출하기 때문에 mapPub에서는 pub를 호출한다고 생각하면 된다
          @Override
          public void onSubscribe(Subscription s) {
            sub.onSubscribe(s); //아무일 없이 중계만 한다
          }

          @Override
          public void onNext(Integer i) {
            sub.onNext(f.apply(i)); //Fuction을 적용한 값을 Sub에 넘기고 싶다.

          }

          @Override
          public void onError(Throwable t) {
            sub.onError(t);//아무일 없이 중계만 한다
          }

          @Override
          public void onComplete() {
            sub.onComplete();//아무일 없이 중계만 한다
          }
        });
      }
    };
  }


  private static Subscriber<Integer> logSub() {
    return new Subscriber<Integer>() {
      @Override

      public void onSubscribe(Subscription s) {
        log.debug("onSubscribe:");
        s.request(Long.MAX_VALUE); //너가 가지고 있는 데이터 모두 줘(무제한으로 생각하면 된다)
      }

      @Override
      public void onNext(Integer i) {
        log.debug("onNext: {}", i);
      }

      @Override
      public void onError(Throwable t) {
        log.debug("onError:{}", t);
      }

      @Override
      public void onComplete() {
        log.debug("onComplete");
      }
    };
  }

}

실행 결과

  • mapPub : Operation 연산을 통해 데이터를 변환해서 다른 Pub를 호출해 준다.
    • 1번째 파라미터 : 이전에 들어온 Pub
    • 2번째 파라미터 : 적용시킬 함수(람다가 들어온다) 
  • onNext 함수에서 function을 적용시킨 item을 SubScriber에 전달한다.
  • 여러 개의 Pub를 추가시키는 데에도 비용이 거의 들지 않는다.

리펙토링

@Slf4j
public class PubSub {
	//...
    private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
        return new Publisher<Integer>() {
          @Override
          public void subscribe(Subscriber<? super Integer> sub) {
            pub.subscribe(new DelegateSub(sub){
              @Override
              public void onNext(Integer i) { //오버라이드 해서 적용한다.
                sub.onNext(f.apply(i));
              }
            }); //delegateSub 에 subscriber를 전달하자
          }
        };
    }
	//...
}

DelegateSub

public class DelegateSub implements Subscriber<Integer> {
  private Subscriber sub;

  public DelegateSub(Subscriber sub) {
    this.sub = sub;
  }

  @Override
  public void onSubscribe(Subscription s) {
    sub.onSubscribe(s); //아무일 없이 중계만 한다
  }

  @Override
  public void onNext(Integer i) {
    sub.onNext(i);
  }

  @Override
  public void onError(Throwable t) {
    sub.onError(t);
  }

  @Override
  public void onComplete() {
    sub.onComplete();
  }
}
  • DelegateSub라는 클래스를 만들어, SubScriber를 생성자로 받는다.
    • 어떠한 SubScriber에게 전달할지를 알 수 없기에 생성자로 호출하는 곳에서 전달받는다. 
  • 기본적으로 onNext 같은 경우 위임하는 방식으로 구현해 두었다가, 내가 적용하고 싶은 곳이 있을 경우 Override 해서 구현하면 된다.
    • DeletegateSub(sub)를 호출할 때, 내가 적용하고 싶은 연산을 onNext 메서드를 오버라이드 하여, 적용하면 된다.

SumPub()

public static void main(String[] args) {
  	//...
    Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10)
        .collect(Collectors.toList()));
    Publisher<Integer> sumPub = sumPub(pub);
    sumPub.subscribe(logSub()); //구독이 진행된다.
}

//...
private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
    return new Publisher<Integer>() {
      @Override
      public void subscribe(Subscriber<? super Integer> sub) {
        pub.subscribe(new DelegateSub(sub){
          int sum = 0;

          @Override
          public void onNext(Integer i) {
            sum += i; //Sum을 결과를 어디서 전달해야할까 -> onComplete 에서 전달하면 된다.
          }

          @Override
          public void onComplete() {
            sub.onNext(sum); //complete 이라고, 다른 Subscriber 에 complete 를 넘길 필요 없다.
            sub.onComplete();//onNext 를 한번 호출 후(결과) onComplete 를 호출해준다
          }
        }); //중계할 Sub를 넘긴다.
      }
    };
  }
//...
  • onNext에서는 데이터를 Subscriber에 전달해 줄 필요 없이, sum에 데이터 값을 더해주면 된다.
    • 그럼 언제 Subscriber에 데이터를 전달해야 할까?
    • 답은 onComplete 이벤트에서 전달해 주면 된다.
  • onComplete을 실행할 때, onNext()를 통해 더해진 값(sum)을 전달해 준 뒤, onComplete를 호출하면 된다.

reduce()

 public static void main(String[] args) {
        Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
        //ex) 1,2,3,4,5
        // 0 -> (0, 1) -> 0 + 1 = 1
        // 1 -> (1, 2) -> 1 + 2 = 3
        // 3 -> (3, 3) -> 3 + 3 = 6
        // 6 -> (6, 4) -> 6 + 4 = 10
        // 10 -> (10, 5) -> 10 + 5 = 15
        //초기 데이터를 시작으로, 함수의 연산을 통해 연산을 계속 진행해 최종 결과를 반환
        Publisher<Integer> reducePub = reducePub(pub, 0,
            (BiFunction<Integer, Integer, Integer>) (a, b) -> a + b); // aType, bType, returnType
		//BiFunction 을 통해 인자가 2개인 람다를 전달
        
        reducePub.subscribe(logSub()); //구독이 진행된다.
    }

private static Publisher<Integer> reducePub(Publisher<Integer> pub, int init,
    BiFunction<Integer, Integer, Integer> biFunction) {

    return new Publisher<Integer>() {
        @Override
        public void subscribe(Subscriber<? super Integer> subscriber) {
            pub.subscribe(new DelegateSub(subscriber) {
                int result = init;

                @Override
                public void onNext(Integer item) {
                    result = biFunction.apply(result, item);
                }

                @Override
                public void onComplete() {
                    subscriber.onNext(result);
                    subscriber.onComplete();
                }
            });
        }
    };
}
  • BiFunction을 통해 파라미터 인자 2개, 리턴 Value 가 있는 함수를 reducePub에 전달한다.
  • reducePub 같은 경우 파라미터로, (Publisher, 초깃값, 적용될 람다)를 받는다.
    • 초깃값에 람다를 적용해 값을 누적하며, 최종적으로 누적 값을 반환한다.
  • OnNext에서 result에 람다가 적용된 값을 누적한 뒤, onComplete에서 OnNext로 누적 값을 전달 후, onComplete를 호출

Generic으로 적용

@Slf4j
public class PubSub {

    public static void main(String[] args) {
        Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10)
            .collect(Collectors.toList()));
        Publisher<Integer> mapPub = mapPub(pub, s -> s * 10);
        mapPub.subscribe(logSub()); 
    }
	//...
    private static <T> Publisher<T> mapPub(Publisher<T> pub, Function<T, T> f) {
        return new Publisher<T>() {
          @Override
          public void subscribe(Subscriber<? super T> sub) {
            pub.subscribe(new DelegateSub<T>(sub){
              @Override
              public void onNext(T i) {
                sub.onNext(f.apply(i));
              }
            });
          }
        };
    }

    private static <T> Subscriber<T> logSub() {
        Subscriber<T> sub = new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                log.debug("onSubscribe:{}", subscription);
                subscription.request(Long.MAX_VALUE); //해당 값만큼 데이터를 받겠다
            }

            @Override
            public void onNext(T item) {
                log.debug("onNext:{}", item);
            }

            @Override
            public void onError(Throwable throwable) {
                log.debug("onError:{}", throwable);
            }

            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        };
        return sub;
    }
}

DelegateSub <T> : 제네릭 적용 Delegate

public class DelegateSub<T> implements Subscriber<T> {
  private Subscriber sub;

  public DelegateSub(Subscriber<? super T> sub) {
    this.sub = sub;
  }

  @Override
  public void onSubscribe(Subscription s) {
    sub.onSubscribe(s);
  }

  @Override
  public void onNext(T i) {
    sub.onNext(i);
  }

  @Override
  public void onError(Throwable t) {
    sub.onError(t);
  }

  @Override
  public void onComplete() {
    sub.onComplete();
  }
}
  • 제네릭을 적용하여, mapSub, logSub 연산을 모든 타입에 다 적용 가능하도록 메서드를 수정한다.
  • 현재는 pub가 Integer 타입이기 때문에, mapSub, logSub 연산에 전달되는 값들은 모두 Integer이다.

Generic으로 받는 타입 반환 타입 입력받기(T -> R)

public static void main(String[] args) {
    Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10)
        .collect(Collectors.toList()));
    Publisher<String> mapPub = mapPub(pub, s -> "[" + s + "]");
    mapPub.subscribe(logSub()); 
}

//T -> R
private static <T,R> Publisher<R> mapPub(Publisher<T> pub,
    Function<T, R> function) {
    return new Publisher<R>() {
        @Override
        public void subscribe(Subscriber<? super R> subscriber) {
            pub.subscribe(new DelegateSub<T, R>(subscriber) {
                @Override
                public void onNext(T item) {
                    sub.onNext(function.apply(item));
                }
            });
        }
    };
}

DelegateSub<T, R> : 제네릭 적용 Delegate 반환 타입 다르다.

public class DelegateSub<T,R> implements Subscriber<T> {
    Subscriber sub;
    public <R> DelegateSub(Subscriber<? super R> sub) {
        this.sub = sub;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        sub.onSubscribe(subscription); //중계역할만한다.
    }

    @Override
    public void onNext(T item) {
        sub.onNext(item);
    }

    @Override
    public void onError(Throwable throwable) {
        sub.onError(throwable);
    }

    @Override
    public void onComplete() {
        sub.onComplete();
    }
}
  • mapPub에서 입력되는 타입과, 반환되는 타입이 다를 경우이다.
    • 현재 예제에서는 Interger -> String으로 변환하는 것이다.
  • mapPub에서 타입 인자 

Reduce<T, R> 제네릭으로 변경하기

public static void main(String[] args) {

    Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10)
        .collect(Collectors.toList()));

    //Publisher<String> reducePub = reducePub(pub, "", (a, b) -> a + "-" + b);
    Publisher<StringBuilder> reducePub = reducePub(pub, new StringBuilder(),
        (a, b) -> a.append(b + ","));
    reducePub.subscribe(logSub()); 
}

private static <T, R> Publisher<R> reducePub(Publisher<T> pub, R init,
      BiFunction<R, T, R> bf) {
    return new Publisher<R>() {
      @Override
      public void subscribe(Subscriber<? super R> sub) {
        pub.subscribe(new DelegateSub<T, R>(sub) {
          R result = init;

          @Override
          public void onNext(T i) {
            result = bf.apply(result, i);
          }

          @Override
          public void onComplete() {
            sub.onNext(result);
            sub.onComplete();
          }
        });
      }
    };
  }
  • 제네릭으로 바꾸기 위해 바로 변환하지 말고, 타입을 대입한 후, 하나씩 비교해 가면서 진행해야 한다.
  • String 뿐만 아니라, StringBuilder에 대해서도 잘 동작하는 것을 확인할 수 있다.

Reactive 시작하기

리액터 공식 문서

https://projectreactor.io/

https://projectreactor.io/docs/core/release/api/

 

reactor-core 3.4.21

 

projectreactor.io

ReactorEx

public class ReactorEx {

  public static void main(String[] args) {
    Flux.<Integer>create(e -> { //타입을 지정하지 않으면 Object로 받는다.
          e.next(1);
          e.next(2);
          e.next(3);
          e.complete();
        })//Flux = Publisher
        .log() //log를 통해 데이터가 어떤식으로 전달되는지 볼 수 있다.
        .map(s -> s*10)
        .reduce(0, (a,b) -> a + b) 
        .log()
        .subscribe(System.out::println); //publisher 이니 Subscribe 가능
  }

}

출력 로그

  • Flux를 통해, Subscribe, Publisher 구현을 간단하게 할 수 있다.
    • Flux = Publisher로 subscribe 이벤트를 작동시킬 수 있다.
  • log()를 적절한 위치에 써주면, 주고받는 데이터를 확인할 수 있다.

main

@RestController
public class Chapter06Controller {
  @RequestMapping("/hello")
  public Publisher<String> hello(String name){ //Publisher 로 리턴이 가능
    return new Publisher<String>() {
      @Override
      public void subscribe(Subscriber<? super String> s) {
        s.onSubscribe(new Subscription() {
          @Override
          public void request(long n) {
            s.onNext("Hello " + name);
            s.onComplete();
          }

          @Override
          public void cancel() {

          }
        });
      }
    }; //Subscribe 를 만들고, 데이터를 요청하는 것들은 스프링이 할일이다.
  }
}
  • Subscribe를 만들고, 데이터를 요청하는 것은 Spring이 할 일이다.
    • 개발자는 Publisher만 만들어서 리턴만 해주면 된다.

접속하기 : http://localhost:8080/hello?name=spring

 

728x90