728x90
DB Reader
ItemReaders and ItemWriters
So far, only delimited files have been discussed in much detail. However, they represent only half of the file reading picture. Many organizations that use flat files use fixed length formats. An example fixed length file follows: The following example sho
docs.spring.io
DB Writer
ItemReaders and ItemWriters
So far, only delimited files have been discussed in much detail. However, they represent only half of the file reading picture. Many organizations that use flat files use fixed length formats. An example fixed length file follows: The following example sho
docs.spring.io
초기 설정
SQL
-- 주문 테이블 생성
CREATE TABLE `spring_batch`.`orders`
(
`id` INT NOT NULL AUTO_INCREMENT,
`order_item` VARCHAR(45) NULL,
`price` INT NULL,
`order_date` DATE NULL,
PRIMARY KEY (`id`)
)
;
-- 정산 테이블 생성
CREATE TABLE `spring_batch`.`accounts`
(
`id` INT NOT NULL AUTO_INCREMENT,
`order_item` VARCHAR(45) NULL,
`price` INT NULL,
`order_date` DATE NULL,
`account_date` DATE NULL,
PRIMARY KEY (`id`)
)
;
INSERT INTO spring_batch.orders(`order_item`, `price`, `order_date`)
values ('카카오 선물', 15000, '2022-03-01');
INSERT INTO spring_batch.orders(`order_item`, `price`, `order_date`)
values ('배달주문', 18000, '2022-03-01');
INSERT INTO spring_batch.orders(`order_item`, `price`, `order_date`)
values ('교보문고', 14000, '2022-03-02');
INSERT INTO spring_batch.orders(`order_item`, `price`, `order_date`)
values ('아이스크림', 3800, '2022-03-03');
INSERT INTO spring_batch.orders(`order_item`, `price`, `order_date`)
values ('치킨', 21000, '2022-03-04');
INSERT INTO spring_batch.orders(`order_item`, `price`, `order_date`)
values ('커피', 4000, '2022-03-04');
INSERT INTO spring_batch.orders(`order_item`, `price`, `order_date`)
values ('교보문고', 13800, '2022-03-05');
INSERT INTO spring_batch.orders(`order_item`, `price`, `order_date`)
values ('카카오 선물', 5500, '2022-03-06');
select *
from spring_batch.orders
;
select *
from spring_batch.accounts
;
- 해당 SQL을 선반영하여, orders, accounts 테이블을 생성 후 orders 테이블의 데이터를 미리 채운다.
- accounts_date를 제외한 데이터 외에는 모두 같다.
Orders
@Getter
@ToString
@Entity
public class Orders {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private String orderItem;
private Integer price;
private Date orderDate;
}
OrdersRepository
public interface OrdersRepository extends JpaRepository<Orders, Integer> {
}
Accounts
@Getter
@ToString
@Entity
public class Accounts {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private String orderItem;
private Integer price;
private Date orderDate;
private Date accountDate;
}
AccountsRepository
public interface AccountsRepository extends JpaRepository<Accounts, Integer> {
}
- Orders, Accounts 엔티티를 생성하고, JpaRepository를 만들어 준다.
- 초기 설정 완료
데이터 이관(Writer를 사용하지 않고 출력만 하기)
/**
* desc: 주문 테이블 -> 정산 테이블 데이터 이관
* run: --spring.batch.job.name=trMigrationJob
*/
@Configuration
@RequiredArgsConstructor
public class TrMigrationConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final OrdersRepository ordersRepository;
private final AccountsRepository accountsRepository;
@Bean
public Job trMigrationJob(Step trMigrationStep) {
return jobBuilderFactory.get("trMigrationJob")
.incrementer(new RunIdIncrementer())
.start(trMigrationStep)
.build();
}
@Bean
@JobScope
public Step trMigrationStep(ItemReader trOrdersReader) {
return stepBuilderFactory.get("trMigrationStep")
.<Orders, Orders>chunk(5) //어떤 데이터를 읽어와서 어떤 데이터로 쓸지 작성하고, 몇개의 데이터 단위로 커밋할 지 적어준다.
//<읽어올 객체, 쓸 객체>(단위)
.reader(trOrdersReader)
.writer(new ItemWriter() {
@Override
public void write(List items) throws Exception {
items.forEach(System.out::println); //지금은 단순히 쓰는 작업만 진행
}
}
)
.build();
}
@Bean
@StepScope
public RepositoryItemReader<Orders> trOrdersReader() {
return new RepositoryItemReaderBuilder<Orders>()
.name("trOrdersReader")
.repository(ordersRepository) //repository와
.methodName("findAll") //메서드만 맞게 작성해준면 된다.
.pageSize(5) //보통 chunk size와 동일하게 일어온다.
.arguments(Arrays.asList())
.sorts(Collections.singletonMap("id", Direction.ASC)) //오름 차순 정렬
.build();
}
}
- <Orders, Orders>chunk(5)
- <읽어올 객체, 쓸 객체>chunk(커밋 크기)
- 읽어올 객체(Reader), 쓸 객체(Writer), 커밋 크기(size)를 작성한다.
- reader(trOrdersReader)
- repository에 실제 주입받은 repository를, method에 실행시킬 메서드를 작성하면, 해당 repository의 메서드를 실행한다.
- PageSize를 통해 읽어올 데이터를 크기를 지정할 수 있으며, 보통 chunk 사이즈와 동일하게 한다.
- repository 메서드에 추가되어야 하는 파라미터가 있다면 arguments에 추가해 주면 된다.
- sorts를 통해 정렬된 상태로 읽어올 수 있으며, Map 형식으로 정렬 조건을 입력받는다 -> 여기서는 id를 오름차순 정렬
- writer
- 현재 writer 같은 경우에는 콘솔에 출력만 하도록 작성하였다.
- 실행 결과 데이터를 5개를 읽어온 것을 알 수 있다.
데이터 이관(Writer를 통한 DB 쓰기 작업)
/**
* desc: 주문 테이블 -> 정산 테이블 데이터 이관
* run: --spring.batch.job.name=trMigrationJob
*/
@Configuration
@RequiredArgsConstructor
public class TrMigrationConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final OrdersRepository ordersRepository;
private final AccountsRepository accountsRepository;
@Bean
public Job trMigrationJob(Step trMigrationStep) {
return jobBuilderFactory.get("trMigrationJob")
.incrementer(new RunIdIncrementer())
.start(trMigrationStep)
.build();
}
@Bean
@JobScope
public Step trMigrationStep(ItemReader trOrdersReader, ItemProcessor trOrderProcessor, ItemWriter trOrderWriter) {
return stepBuilderFactory.get("trMigrationStep")
//.<Orders, Orders>chunk(5) //어떤 데이터를 읽어와서 어떤 데이터로 쓸지 작성하고, 몇개의 데이터 단위로 커밋할 지 적어준다.
//<읽어올 객체, 쓸 객체>(단위)
.<Orders, Accounts>chunk(5)
.reader(trOrdersReader)
/*
.writer(new ItemWriter() {
@Override
public void write(List items) throws Exception {
items.forEach(System.out::println); //지금은 단순히 쓰는 작업만 진행
}
}
)
*/
.processor(trOrderProcessor)
.writer(trOrderWriter)
.build();
}
@Bean
@StepScope
public RepositoryItemReader<Orders> trOrdersReader() {
return new RepositoryItemReaderBuilder<Orders>()
.name("trOrdersReader")
.repository(ordersRepository) //repository와
.methodName("findAll") //메서드만 맞게 작성해준면 된다.
.pageSize(5) //보통 chunk size와 동일하게 일어온다.
.arguments(Arrays.asList())
.sorts(Collections.singletonMap("id", Direction.ASC)) //오름 차순 정렬
.build();
}
//ItemProcessor 작성
@StepScope
@Bean
public ItemProcessor<Orders, Accounts> trOrderProcessor(){
return new ItemProcessor<Orders, Accounts>(){
@Override
public Accounts process(Orders item) throws Exception {
return new Accounts(item); //실무에서는 더 복잡한 코드 실행
}
};
}
@Bean
@StepScope
public RepositoryItemWriter<Accounts> trOrderWriter(){
return new RepositoryItemWriterBuilder<Accounts>()
.repository(accountsRepository) //Repository
.methodName("save") //실행할 메서드
.build();
}
//직접 구현할수도 있다.
/*
@StepScope
@Bean
public ItemWriter<Accounts> trOrderWriter(){
return new ItemWriter<Accounts>() {
@Override
public void write(List<? extends Accounts> items) throws Exception {
items.forEach(accountsRepository::save);
}
};
}
*/
}
Accounts
@Getter
@ToString
@Entity
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Accounts {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private String orderItem;
private Integer price;
private Date orderDate;
private Date accountDate;
public Accounts(Orders orders) {
this.id = orders.getId();
this.orderItem = orders.getOrderItem();
this.price = orders.getPrice();
this.orderDate = orders.getOrderDate();
this.accountDate = new Date();
}
}
- Processor를 생성하여, 동작시킬 비즈니스 로직을 작성한다.
- 현재는 간단하게 생성자를 통해 Orders -> Accounts로 변경할 뿐이지만 실무에서는 더 복잡한 로직이 들어간다.
- Accounts에 Orders를 인자로 받는 생성자를 추가하고, 기본 생성자를 추가해 주는 @NoArgsConstructor 어노테이션을 추가한다(JPA 기본 스펙)
- Writer를 사용하여, DB에 데이터를 Write 하는 작업을 진행한다.
- RepositoryItemWriter를 통해 실행할 Repository의 Method만 작성해 주면, 알아서 실행해 준다.
- 직접 ItemWriter를 반환하도록 작성할 수도 있다.
- 여기서는 ItemWriter에 write 메서드를 오버라이드하여, items의 원소 하나씩을 accountsRepository에 save 한다.
728x90
'예제로 배우는 핵심 Spring Batch' 카테고리의 다른 글
| Ch07. 여러개의 step 구동 및 실행 상태에 따른 분기처리 (0) | 2023.06.10 |
|---|---|
| Ch06. 배치 작업의 기본, 파일 읽기와 쓰기 (0) | 2023.06.09 |
| Ch04. 배치 작업 실행 전, 후 로그 추가를 위한 리스너 (0) | 2023.06.07 |
| Ch03. 배치 실행 시 파라미터 (파일 이름) 받기 및 (csv)검증 (0) | 2023.06.07 |
| Ch02. 일단 실행 - Hello, World (스프링 배치 구조 익히기) (0) | 2023.05.22 |