실무 프로젝트로 배우는 Kotlin & Spring/스프링 WebFlux 이해하기

스프링 데이터 R2DBC

webmaster 2022. 12. 11. 14:52
728x90

JDBC

String selectSql = "SELECT * FROM employees";
try (ResultSet resultSet = stmt.executeQuery(selectSql)) {
	List<Employee> employees = new ArrayList<>();
    while (resultSet.next()) {
        Employee emp = new Employee();
        emp.setId(resultSet.getInt("emp_id"));
        emp.setName(resultSet.getString("name"));
        emp.setPosition(resultSet.getString("position"));
        emp.setSalary(resultSet.getDouble("salary"));
        employees.add(emp);
    }
}
  • 전통적인 JDBC 드라이버는 하나의 커넥션에 하나의 스레드를 사용하는 Thread Per Connection 방식이다.
  • Thread Per Connection 방식은 데이터베이스로부터 응답을 받기 전까지 스레드는 Blocking 된다.
    • 높은 처리량과 대규모 애플리케이션을 위해서는 비동기/Non-Blocking 데이터베이스 API에 대한 요구가 생긴다.
    • 애플리케이션 로직이 비동기/Non-Blocking이어도 DB 드라이버가 JDBC면 Blocking이 발생하므로 100% 비동기 성능을 내기 힘들다.

R2DBC

connection.createStatement("SELECT * FROM employess")
    .execute()
    .flatMap(r -> r.map((row, metadata) -> {
        Employee emp = new Employee();
        emp.setId(row.get("emp_id", Integer.class));
        emp.setName(row.get("name", String.class));
        emp.setPosition(row.get("position", String.class));
        emp.setSalary(row.get("salary", Double.class));
        return emp;
    }))
    .close()
    .subscribe();
  • R2DBC는 리액티브 기반의 비동기/Non-Blocking 데이터베이스 드라이버이다.
  • 다양한 DB를 지원한다(Oracle, Postgres, H2, MSSQL, Google Spanner, MariaDB)
  • 리액티브 스트림 구현체인 Project Reactor, RxJava 등을 지원한다.

스프링 데이터 R2DBC

  • 스프링 데이터 R2DBC는 R2DBC 기반의 스프링 데이터 프로젝트이다.
  • 스프링 데이터 프로젝트이므로 스프링 애플리케이션에 쉽게 통합할 수 있으며, 스프링 데이터 JPA, 스프링 데이터 몽고 DB 같은 프로젝트처럼 뛰어난 추상화를 제공한다.
  • 스프링 WebFlux와 스프링 데이터 R2DBC를 같이 사용하면, 전 구간 비동기/Non-Blocking 애플리케이션을 구현할 수 있다.
  • 많은 ORM에서 제공하는 LazyLoading, Dirty-Checking, Cache 등을 지원하지는 않아 기능은 적지만, 오히려 심플하게 사용 가능하다.

ReactiveCrudRepository

@NoRepositoryBean
public interface ReactiveCrudRepository<T, ID> extends Repository<T, ID> {

 /**
  * Saves a given entity. Use the returned instance for further operations as the save operation might have changed the
  * entity instance completely.
  *
  * @param entity must not be {@literal null}.
  * @return {@link Mono} emitting the saved entity.
  * @throws IllegalArgumentException in case the given {@literal entity} is {@literal null}.
  * @throws OptimisticLockingFailureException when the entity uses optimistic locking and has a version attribute with
  *           a different value from that found in the persistence store. Also thrown if the entity is assumed to be
  *           present but does not exist in the database.
  */
 <S extends T> Mono<S> save(S entity);

 /**
  * Saves all given entities.
  *
  * @param entities must not be {@literal null}.
  * @return {@link Flux} emitting the saved entities.
  * @throws IllegalArgumentException in case the given {@link Iterable entities} or one of its entities is
  *           {@literal null}.
  * @throws OptimisticLockingFailureException when at least one entity uses optimistic locking and has a version
  *           attribute with a different value from that found in the persistence store. Also thrown if at least one
  *           entity is assumed to be present but does not exist in the database.
  */
 <S extends T> Flux<S> saveAll(Iterable<S> entities);

 /**
  * Saves all given entities.
  *
  * @param entityStream must not be {@literal null}.
  * @return {@link Flux} emitting the saved entities.
  * @throws IllegalArgumentException in case the given {@link Publisher entityStream} is {@literal null}.
  * @throws OptimisticLockingFailureException when at least one entity uses optimistic locking and has a version
  *           attribute with a different value from that found in the persistence store. Also thrown if at least one
  *           entity is assumed to be present but does not exist in the database.
  */
 <S extends T> Flux<S> saveAll(Publisher<S> entityStream);

 /**
  * Retrieves an entity by its id.
  *
  * @param id must not be {@literal null}.
  * @return {@link Mono} emitting the entity with the given id or {@link Mono#empty()} if none found.
  * @throws IllegalArgumentException in case the given {@literal id} is {@literal null}.
  */
 Mono<T> findById(ID id);

 /**
  * Retrieves an entity by its id supplied by a {@link Publisher}.
  *
  * @param id must not be {@literal null}. Uses the first emitted element to perform the find-query.
  * @return {@link Mono} emitting the entity with the given id or {@link Mono#empty()} if none found.
  * @throws IllegalArgumentException in case the given {@link Publisher id} is {@literal null}.
  */
 Mono<T> findById(Publisher<ID> id);

 /**
  * Returns whether an entity with the given {@literal id} exists.
  *
  * @param id must not be {@literal null}.
  * @return {@link Mono} emitting {@literal true} if an entity with the given id exists, {@literal false} otherwise.
  * @throws IllegalArgumentException in case the given {@literal id} is {@literal null}.
  */
 Mono<Boolean> existsById(ID id);

 /**
  * Returns whether an entity with the given id, supplied by a {@link Publisher}, exists. Uses the first emitted
  * element to perform the exists-query.
  *
  * @param id must not be {@literal null}.
  * @return {@link Mono} emitting {@literal true} if an entity with the given id exists, {@literal false} otherwise.
  * @throws IllegalArgumentException in case the given {@link Publisher id} is {@literal null}.
  */
 Mono<Boolean> existsById(Publisher<ID> id);

 /**
  * Returns all instances of the type.
  *
  * @return {@link Flux} emitting all entities.
  */
 Flux<T> findAll();

 /**
  * Returns all instances of the type {@code T} with the given IDs.
  * <p>
  * If some or all ids are not found, no entities are returned for these IDs.
  * <p>
  * Note that the order of elements in the result is not guaranteed.
  *
  * @param ids must not be {@literal null} nor contain any {@literal null} values.
  * @return {@link Flux} emitting the found entities. The size can be equal or less than the number of given
  *         {@literal ids}.
  * @throws IllegalArgumentException in case the given {@link Iterable ids} or one of its items is {@literal null}.
  */
 Flux<T> findAllById(Iterable<ID> ids);

 /**
  * Returns all instances of the type {@code T} with the given IDs supplied by a {@link Publisher}.
  * <p>
  * If some or all ids are not found, no entities are returned for these IDs.
  * <p>
  * Note that the order of elements in the result is not guaranteed.
  *
  * @param idStream must not be {@literal null}.
  * @return {@link Flux} emitting the found entities.
  * @throws IllegalArgumentException in case the given {@link Publisher idStream} is {@literal null}.
  */
 Flux<T> findAllById(Publisher<ID> idStream);

 /**
  * Returns the number of entities available.
  *
  * @return {@link Mono} emitting the number of entities.
  */
 Mono<Long> count();

 /**
  * Deletes the entity with the given id.
  * <p>
  * If the entity is not found in the persistence store it is silently ignored.
  *
  * @param id must not be {@literal null}.
  * @return {@link Mono} signaling when operation has completed.
  * @throws IllegalArgumentException in case the given {@literal id} is {@literal null}.
  */
 Mono<Void> deleteById(ID id);

 /**
  * Deletes the entity with the given id supplied by a {@link Publisher}.
  * <p>
  * If the entity is not found in the persistence store it is silently ignored.
  *
  * @param id must not be {@literal null}.
  * @return {@link Mono} signaling when operation has completed.
  * @throws IllegalArgumentException in case the given {@link Publisher id} is {@literal null}.
  */
 Mono<Void> deleteById(Publisher<ID> id);

 /**
  * Deletes a given entity.
  *
  * @param entity must not be {@literal null}.
  * @return {@link Mono} signaling when operation has completed.
  * @throws IllegalArgumentException in case the given entity is {@literal null}.
  * @throws OptimisticLockingFailureException when the entity uses optimistic locking and has a version attribute with
  *           a different value from that found in the persistence store. Also thrown if the entity is assumed to be
  *           present but does not exist in the database.
  */
 Mono<Void> delete(T entity);

 /**
  * Deletes all instances of the type {@code T} with the given IDs.
  * <p>
  * Entities that aren't found in the persistence store are silently ignored.
  *
  * @param ids must not be {@literal null}.
  * @return {@link Mono} signaling when operation has completed.
  * @throws IllegalArgumentException in case the given {@literal ids} or one of its elements is {@literal null}.
  *           {@literal null}.
  * @since 2.5
  */
 Mono<Void> deleteAllById(Iterable<? extends ID> ids);

 /**
  * Deletes the given entities.
  *
  * @param entities must not be {@literal null}.
  * @return {@link Mono} signaling when operation has completed.
  * @throws IllegalArgumentException in case the given {@link Iterable entities} or one of its entities is
  *           {@literal null}.
  * @throws OptimisticLockingFailureException when at least one entity uses optimistic locking and has a version
  *           attribute with a different value from that found in the persistence store. Also thrown if at least one
  *           entity is assumed to be present but does not exist in the database.
  */
 Mono<Void> deleteAll(Iterable<? extends T> entities);

 /**
  * Deletes the given entities supplied by a {@link Publisher}.
  *
  * @param entityStream must not be {@literal null}.
  * @return {@link Mono} signaling when operation has completed.
  * @throws IllegalArgumentException in case the given {@link Publisher entityStream} is {@literal null}.
  * @throws OptimisticLockingFailureException when at least one entity uses optimistic locking and has a version
  *           attribute with a different value from that found in the persistence store. Also thrown if at least one
  *           entity is assumed to be present but does not exist in the database.
  */
 Mono<Void> deleteAll(Publisher<? extends T> entityStream);

 /**
  * Deletes all entities managed by the repository.
  *
  * @return {@link Mono} signaling when operation has completed.
  */
 Mono<Void> deleteAll();
}
  • 모든 반환 타입이 Mono/Flux 같은 리액터의 Publisher인 것을 확인할 수 있다.
  • ReactiveCrudRepository 같은 경우 리액티브를 지원하는 CRUD 인터페이스이다.

예제

Schema 등록을 위한 초기 설정

scripts/schema.sql

DROP TABLE IF EXISTS book;
CREATE TABLE book
(
    id bigint not null  AUTO_INCREMENT,
    name varchar(50),
    price int,
    primary key (id)
)

SpringBootApplication(빈 등록)

@SpringBootApplication
class SpringwebfluxApplication{
   @Bean
   fun init(connectionFactory: ConnectionFactory) =
      ConnectionFactoryInitializer().apply {
         setConnectionFactory(connectionFactory)
         setDatabasePopulator(ResourceDatabasePopulator(ClassPathResource("scripts/schema.sql")))
      }
}

fun main(args: Array<String>) {
   runApplication<SpringwebfluxApplication>(*args)
}

Repository

interface BookRepository : ReactiveCrudRepository<Book, Long> {
    fun findByName(name: String): Mono<Book>
}

Controller

@RestController
@RequestMapping("/book2s")
class BookController(
    private val bookRepository: BookRepository,
) {

    @GetMapping("{name}")
    fun getByName(@PathVariable name: String): Mono<Book>{
        return bookRepository.findByName(name)
    }

    @PostMapping("")
    fun create(@RequestBody map: Map<String, Any>): Mono<Book>{
        val book = Book(
            name = map["name"].toString(),
            price = map["price"] as Int,
        )
        return bookRepository.save(book)
    }
}
  • scripts/schema.sql 파일을 읽기 위해 Springboot를 실행하는 곳에서 init 빈을 정의하였다
  • Repository에서 ReactiveCrudRepository를 구현하며, 스프링 데이터 R2DBC의 기능을 사용하였다.

 

728x90