728x90
여러 개의 Reactive 스타일로 받는 Flux
@Slf4j
@SpringBootApplication
public class Chapter14Application {
@RestController
public static class MyController {
@GetMapping("/event/{id}")
public Mono<Event> event(@PathVariable long id) {
return Mono.just(new Event(id, "event " + id));//특정 오브젝트를 컬렉션으로 다뤄 리액티브 스타일로 다루고 싶다면 어떻게 할까?
}
@GetMapping("/events")
public Flux<Event> events() {
return Flux.just(new Event(1L, "event1"), new Event(2L, "event2"));
}
}
@Data
@AllArgsConstructor
public static class Event {
long id;
String value;
}
public static void main(String[] args) {
SpringApplication.run(Chapter14Application.class, args);
}
}
- Event라는 static class를 만들어서 id, value를 선언한다.
- Mono는 하나의 오브젝트를 받아서 처리하는 것에 능하다(Optional과 비슷한 단건 처리에 강하다)
- 여러 건을 데이터를 스트림처럼 처리하기 위해서는 Flux를 사용해야 한다.
- 복수개의 데이터를 받아서 onNext를 순차적으로 호출한 뒤, complete를 호출하는 것과 같다
Mono<List<Data>> 와 Flux<Data> 차이
@GetMapping("/event/{id}")
public Mono<List<Event>> event(@PathVariable long id) {
List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
return Mono.just(list); //Flux와 차이는 무엇일까?
}
@GetMapping("/events")
public Flux<Event> events() {
return Flux.just(new Event(1L, "event1"), new Event(2L, "event2"));
}

- Flux<Data> 와 Mono<List<Data>> 는 같은 동작을 하지만, Flux 타입으로 선언하면 많은 기능을 사용할 수 있다.
- 비슷하게 동작하지만, Mono 같은 경우, 안에 들어 있는 것이 List<Data> 객체이기 때문에, 해당 객체를 통채로 조작해야 하지만, Flux 같은 경우 fromIterable을 사용하면, 해당 스트림에 대한 여러 연산들을 지원하기에 좀 더 다양한 기능을 쓸 수 있다.
- Http 스트림을 지원하는 방식에서는 Flux를 사용하는 것이 훨씬 간단하게 쓸 수 있다.
Http Stream을 사용한 예시(Flux를 사용하는 것이 좋다)
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) //client 요청에 Accept 헤더를 메핑이 1차 목적, 어떤 타입을 리턴할지 결정하는것이 2차 목적
public Flux<Event> events() {
List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
return Flux.fromIterable(list);
}

- MediaType.TEXT_EVENT_STREAM_VALUE은 한 번에 리스트로 값을 전달해 주는 것이 아닌 Stream처럼 일정 시간이 지나고, 값을 전달해 준다.
- 일정 시간이 지나고 값을 전달해주기 때문에, Flux를 사용하기에 최적화가 되어있다.
take()
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) //client 요청에 Accept 헤더를 메핑이 1차 목적, 어떤 타입을 리턴할지 결정하는것이 2차 목적
public Flux<Event> events() {
//generation 한 코드를 작성해 보자
Stream<Event> stream = Stream.generate(() -> new Event(System.currentTimeMillis(), "Value"));//.limit();
return Flux.fromStream(stream) //Stream 된 데이터를 Flux에 넣을 수 있다.
.delayElements(Duration.ofSeconds(1)) //next로 온 데이터에 delay를 걸 수 있다.
.take(10); //데이터를 끊어서 가지고 올 수 있다( 10개의 데이터를 카운팅 하다 cancel 기능을 수행한다)
}



- fromStream 같은 경우 stream을 돌면서 해당 값을 순차적으로 돌 수 있다.
- delayElements 같은 경우 delay를 줄 수 있는데, 해당 delay를 실행하는 스레드는 하나를 사용한다.
- take 같은 경우 파라미터로 넘긴 값까지, 데이터를 넘기다가 cancel()를 실행하여 더 이상 데이터가 전달되지 않도록 한다.
generate() : Flux을 stream 기능
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> events() {
return Flux
.<Event>generate(sink -> sink.next(new Event(System.currentTimeMillis(), "value"))) // sink = 데이터를 흘려 보내는 역할을 sink에서 한다.
//sink를 넘겨주면 다음 데이터를 실행하는 코드를 작성하면 된다.
.delayElements(Duration.ofSeconds(1))
.take(10);
}

- generate 메서드를 사용하면, stream을 자바에서 만들 필요 없이 flux에서 바로 반환할 수 있다.
- sink를 파라미터로 하는 람다식을 실행하며, sink의 next를 호출하며, 다음 데이터를 실행하는 코드를 작성하면 된다.
- sink는 다음 파라미터를 이어주는 역할을 한다.
generate() : callable, bifunction을 파라미터로 받는 함수
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> events() {
return Flux
//.range(1, 10) //1~10까지 변하는 스트림 생성
.<Event, Long>generate(() -> 1L, (id, sink) -> {
sink.next(new Event(id, "value " + id));
return id + 1;
})//take로 뒤에서 끊고 싶을 때 사용
.delayElements(Duration.ofSeconds(1))
.take(10);
}


- 첫 번째 파라미터로 상태 값을 받고(generate 할 때마다 바뀌는 값) Bifuntion에 현재 상태와, 변경될 값을 파라미터로 받아 변경된 상태의 값을 반환하는 함수를 작성할 수 있다.
- Object을 상태를 계속 변경해 가면서, 다음 sink에 변경된 상태를 계속 넘길 수 있다.
Zip(), Interval()
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> events() {
Flux<Event> flux = Flux.<Event, Long>generate(() -> 1L, (id, sink) -> {
sink.next(new Event(id, "value " + id));
return id + 1;
});
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); //데이터를 일정한 주기를 가지고, 0부터 값을 전달하는 함수
//두가지 Flux를 병합할 수 있다.
return Flux.zip(flux, interval)//2가지 이상을 작업을 묶어서 동작시킬때 사용한다.
// 첫번째 flux 이벤트에 interval 이벤트 하나씩 묶어서 전달한다.
.map(tu -> tu.getT1());
}


- interval을 사용하게 된다면, 데이터를 일정한 주기를 가지고, 0부터 값을 전달할 수 있다.
- zip을 사용하면, flux끼리 쌍으로 묶어서, 동작시킬 수 있다.
- flux1-interval1 , flux2-interval2... 와 같이 묶여 Flux의 Tuple 타입으로 반환하고, Tuple 값을 map을 통해 꺼내거나, 값을 변경할 수 있다.
- 2개 이상의 작업을 동기화시켜 실행할 때, 자주 사용되는 메서드이다.
Zip() 응용(zip으로 합쳐진 데이터를 통해 Event 생성)
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> events() {
Flux<String> flux = Flux.generate((sink) -> sink.next("value"));
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
return Flux.zip(flux, interval)
.map(tu -> new Event(tu.getT2(), tu.getT1())); //이벤트 조합을 해당 부분에서 사용할 수도 있다.
}
- zip으로 합쳐진 데이터를 통해 Event 생성할 수 있다.
- Zip 결과로 반환된 Tuple에서 map 함수를 통해 Event를 생성해서 반환할 수 있다.
- id 값은 tuple을 2번째, value 값은 tuple을 1번째 에서 꺼내서 값을 넣어준다.
- Zip 같은 경우 여러 API를 Flux로 호출해 비동기로 호출하며, zip으로 이것들의 조합해, 동기화 실행을 할 수 있다.
728x90
'Toby의 ReactiveProgramming' 카테고리의 다른 글
| Spring 6의 새로운 HTTP Interface와 3가지 REST Clients (0) | 2023.06.06 |
|---|---|
| Mono의 동작방식과 block() (0) | 2022.08.28 |
| WebFlux (0) | 2022.08.28 |
| CompletableFuture (0) | 2022.08.27 |
| AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (0) | 2022.08.26 |