728x90
- 각각의 MicroService마다 DataBase가 사용되기 때문에 다른 데이터를 가지고 있는 문제가 있다.
- Kafka Sink Connect를 사용해 단일 DB에 저장하여 데이터를 동기화할 것이다.
- DB 변경
- H2 -> MariaDB
- table 생성

통합 DB에 접속하여 테이블을 생성한다.

orderService의 application.yml 수정
- OrderService의 Controller 수정

OrderService.controller OrderProducer 주입 
OrderProducer의 send를 호출
- Schema, Field, Payload 형태의 JSON 형태로 전송해야하기 때문에 클래스로 만들 것이다.
- KafkaOrderDto.class

Connector에 보내는 형식
- Schema.class

테이블 정보
- Field.class

테이블에 컬럼 정보
- payload.class

실제 DB에 저장될 정보
- KafkaOrderDto.class
- OrderProducer 생성
-
@Service @Slf4j @RequiredArgsConstructor public class OrderProducer { private final KafkaTemplate<String, String> kafkaTemplate; List<Field> fields = Arrays.asList( new Field("string", true, "order_id") , new Field("string", true, "user_id") , new Field("string", true, "product_id") , new Field("int32", true, "qty") , new Field("int32", true, "unit_price") , new Field("int32", true, "total_price")); Schema schema = Schema.builder() .type("struct") .fields(fields) .optional(false) .name("orders") .build(); public OrderDto send(String topic, OrderDto orderDto) { //Schema와 Payload 형태로 만들어 주어야한다. Payload payload = Payload.builder() .order_id(orderDto.getOrderId()) .user_id(orderDto.getUserId()) .product_id(orderDto.getProductId()) .qty(orderDto.getQty()) .unit_price(orderDto.getUnitPrice()) .total_price(orderDto.getTotalPrice()) .build(); KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload); ObjectMapper mapper = new ObjectMapper(); String jsonInString = ""; try { jsonInString = mapper.writeValueAsString(kafkaOrderDto); } catch (JsonProcessingException e) { e.printStackTrace(); } kafkaTemplate.send(topic, jsonInString); log.info("Order Producer sent data from the Order Microservice : " + kafkaOrderDto); return orderDto; } } - payload, field, schema를 생성하고, 이를 kafkaOrderDto에 주입한다.
-
- my-order-sink-connect를 생성

connector 추가
728x90
'Spring Cloud로 개발하는 MSA > 데이터 동기화를 위한 Apache Kafka의 활용' 카테고리의 다른 글
| Kafka Topic의 적용 (0) | 2022.02.08 |
|---|---|
| Kafka Sink Connect 사용 (0) | 2022.02.07 |
| Kafka Source Connect 사용 (0) | 2022.02.07 |
| Apache Kafka 사용 - Kafka Connect 설치하기 (0) | 2022.02.07 |
| Apache Kafka 사용 - Producer/Consumer (0) | 2022.02.07 |