Toby의 ReactiveProgramming

Flux의 특징과 활용방법

webmaster 2022. 8. 30. 01:14
728x90

여러 개의 Reactive 스타일로 받는 Flux

@Slf4j
@SpringBootApplication
public class Chapter14Application {

  @RestController
  public static class MyController {

    @GetMapping("/event/{id}")
    public Mono<Event> event(@PathVariable long id) {
      return Mono.just(new Event(id, "event " + id));//특정 오브젝트를 컬렉션으로 다뤄 리액티브 스타일로 다루고 싶다면 어떻게 할까?
    }

    @GetMapping("/events")
    public Flux<Event> events() {
      return Flux.just(new Event(1L, "event1"), new Event(2L, "event2"));
    }
  }

  @Data
  @AllArgsConstructor
  public static class Event {

    long id;
    String value;
  }

  public static void main(String[] args) {
    SpringApplication.run(Chapter14Application.class, args);
  }
}
  • Event라는 static class를 만들어서 id, value를 선언한다.
  • Mono는 하나의 오브젝트를 받아서 처리하는 것에 능하다(Optional과 비슷한 단건 처리에 강하다)
  • 여러 건을 데이터를 스트림처럼 처리하기 위해서는 Flux를 사용해야 한다.
    • 복수개의 데이터를 받아서 onNext를 순차적으로 호출한 뒤, complete를 호출하는 것과 같다

Mono<List<Data>> 와 Flux<Data> 차이

@GetMapping("/event/{id}")
public Mono<List<Event>> event(@PathVariable long id) {
  List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
  return Mono.just(list); //Flux와 차이는 무엇일까?
}

@GetMapping("/events")
public Flux<Event> events() {
  return Flux.just(new Event(1L, "event1"), new Event(2L, "event2"));
}

결과가 다르지 않은것을 확인할 수 있다.

  • Flux<Data> 와 Mono<List<Data>> 는 같은 동작을 하지만, Flux 타입으로 선언하면 많은 기능을 사용할 수 있다.
  • 비슷하게 동작하지만, Mono 같은 경우, 안에 들어 있는 것이 List<Data> 객체이기 때문에, 해당 객체를 통채로 조작해야 하지만, Flux 같은 경우 fromIterable을 사용하면, 해당 스트림에 대한 여러 연산들을 지원하기에 좀 더 다양한 기능을 쓸 수 있다.
  • Http 스트림을 지원하는 방식에서는 Flux를 사용하는 것이 훨씬 간단하게 쓸 수 있다.

Http Stream을 사용한 예시(Flux를 사용하는 것이 좋다)

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) //client 요청에 Accept 헤더를 메핑이 1차 목적, 어떤 타입을 리턴할지 결정하는것이 2차 목적
public Flux<Event> events() {
  List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
  return Flux.fromIterable(list);
}

produces =&nbsp;MediaType.TEXT_EVENT_STREAM_VALUE

  • MediaType.TEXT_EVENT_STREAM_VALUE은 한 번에 리스트로 값을 전달해 주는 것이 아닌 Stream처럼 일정 시간이 지나고, 값을 전달해 준다.
  • 일정 시간이 지나고 값을 전달해주기 때문에, Flux를 사용하기에 최적화가 되어있다.

take()

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) //client 요청에 Accept 헤더를 메핑이 1차 목적, 어떤 타입을 리턴할지 결정하는것이 2차 목적
public Flux<Event> events() {
  //generation 한 코드를 작성해 보자
  Stream<Event> stream = Stream.generate(() -> new Event(System.currentTimeMillis(), "Value"));//.limit();
  return Flux.fromStream(stream) //Stream 된 데이터를 Flux에 넣을 수 있다.
  	  .delayElements(Duration.ofSeconds(1)) //next로 온 데이터에 delay를 걸 수 있다.
      .take(10); //데이터를 끊어서 가지고 올 수 있다( 10개의 데이터를 카운팅 하다 cancel 기능을 수행한다)
}

fromStream 마블 다이어그램
delay 마블 다이어그램
take 마블 다이어그램

  • fromStream 같은 경우 stream을 돌면서 해당 값을 순차적으로 돌 수 있다.
  • delayElements 같은 경우 delay를 줄 수 있는데, 해당 delay를 실행하는 스레드는 하나를 사용한다.
  • take 같은 경우 파라미터로 넘긴 값까지, 데이터를 넘기다가 cancel()를 실행하여 더 이상 데이터가 전달되지 않도록 한다.

generate() : Flux을 stream 기능

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> events() {
  return Flux
      .<Event>generate(sink -> sink.next(new Event(System.currentTimeMillis(), "value"))) // sink = 데이터를 흘려 보내는 역할을 sink에서 한다.
      //sink를 넘겨주면 다음 데이터를 실행하는 코드를 작성하면 된다.
      .delayElements(Duration.ofSeconds(1))
      .take(10);
}

generate

  • generate 메서드를 사용하면, stream을 자바에서 만들 필요 없이 flux에서 바로 반환할 수 있다.
  • sink를 파라미터로 하는 람다식을 실행하며, sink의 next를 호출하며, 다음 데이터를 실행하는 코드를 작성하면 된다.
    • sink는 다음 파라미터를 이어주는 역할을 한다.

generate() : callable, bifunction을 파라미터로 받는 함수

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> events() {
  return Flux
      //.range(1, 10) //1~10까지 변하는 스트림 생성
      .<Event, Long>generate(() -> 1L, (id, sink) -> {
        sink.next(new Event(id, "value " + id));
        return id + 1;
      })//take로 뒤에서 끊고 싶을 때 사용
      .delayElements(Duration.ofSeconds(1))
      .take(10);
}

generate
generate의 오버로딩 메서드

  • 첫 번째 파라미터로 상태 값을 받고(generate 할 때마다 바뀌는 값) Bifuntion에 현재 상태와, 변경될 값을 파라미터로 받아 변경된 상태의 값을 반환하는 함수를 작성할 수 있다.
  • Object을 상태를 계속 변경해 가면서, 다음 sink에 변경된 상태를 계속 넘길 수 있다.

Zip(), Interval()

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> events() {
  Flux<Event> flux = Flux.<Event, Long>generate(() -> 1L, (id, sink) -> {
        sink.next(new Event(id, "value " + id));
        return id + 1;
      });
  Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); //데이터를 일정한 주기를 가지고, 0부터 값을 전달하는 함수

  //두가지 Flux를 병합할 수 있다.
  return Flux.zip(flux, interval)//2가지 이상을 작업을 묶어서 동작시킬때 사용한다.
      // 첫번째 flux 이벤트에 interval 이벤트 하나씩 묶어서 전달한다.
      .map(tu -> tu.getT1());
}

interval
zip

  • interval을 사용하게 된다면, 데이터를 일정한 주기를 가지고, 0부터 값을 전달할 수 있다.
  • zip을 사용하면, flux끼리 쌍으로 묶어서, 동작시킬 수 있다.
  • flux1-interval1 , flux2-interval2... 와 같이 묶여 Flux의 Tuple 타입으로 반환하고, Tuple 값을 map을 통해 꺼내거나, 값을 변경할 수 있다.
  • 2개 이상의 작업을 동기화시켜 실행할 때, 자주 사용되는 메서드이다.

Zip() 응용(zip으로 합쳐진 데이터를 통해 Event 생성)

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> events() {
  Flux<String> flux = Flux.generate((sink) -> sink.next("value"));
  Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));

  return Flux.zip(flux, interval)
      .map(tu -> new Event(tu.getT2(), tu.getT1())); //이벤트 조합을 해당 부분에서 사용할 수도 있다.
}
  • zip으로 합쳐진 데이터를 통해 Event 생성할 수 있다.
    • Zip 결과로 반환된 Tuple에서 map 함수를 통해 Event를 생성해서 반환할 수 있다.
    • id 값은 tuple을 2번째, value 값은 tuple을 1번째 에서 꺼내서 값을 넣어준다.
  • Zip 같은 경우 여러 API를 Flux로 호출해 비동기로 호출하며, zip으로 이것들의 조합해, 동기화 실행을 할 수 있다.

 

728x90