Spring Cloud로 개발하는 MSA/데이터 동기화를 위한 Apache Kafka의 활용

Multi Orders Microservice 사용에 대한 데이터 동기화 문제

webmaster 2022. 2. 8. 14:07
728x90
  • 각각의 MicroService마다 DataBase가 사용되기 때문에 다른 데이터를 가지고 있는 문제가 있다.
  • Kafka Sink Connect를 사용해 단일 DB에 저장하여 데이터를 동기화할 것이다.
  • DB 변경 
    • H2 -> MariaDB
    • table 생성
      • 통합 DB에 접속하여 테이블을 생성한다.
    • orderService의 application.yml 수정
  • OrderService의 Controller 수정
    • OrderService.controller OrderProducer 주입
    • OrderProducer의 send를 호출
  • Schema, Field, Payload 형태의 JSON 형태로 전송해야하기 때문에 클래스로 만들 것이다.
    • KafkaOrderDto.class
      • Connector에 보내는 형식
    • Schema.class
      • 테이블 정보
    • Field.class
      • 테이블에 컬럼 정보
    • payload.class
      • 실제 DB에 저장될 정보
  • OrderProducer 생성
    • 
      @Service
      @Slf4j
      @RequiredArgsConstructor
      public class OrderProducer {
      
          private final KafkaTemplate<String, String> kafkaTemplate;
      
          List<Field> fields = Arrays.asList(
              new Field("string", true, "order_id")
              , new Field("string", true, "user_id")
              , new Field("string", true, "product_id")
              , new Field("int32", true, "qty")
              , new Field("int32", true, "unit_price")
              , new Field("int32", true, "total_price"));
          Schema schema = Schema.builder()
              .type("struct")
              .fields(fields)
              .optional(false)
              .name("orders")
              .build();
      
      
          public OrderDto send(String topic, OrderDto orderDto) {
              //Schema와 Payload 형태로 만들어 주어야한다.
              Payload payload = Payload.builder()
                  .order_id(orderDto.getOrderId())
                  .user_id(orderDto.getUserId())
                  .product_id(orderDto.getProductId())
                  .qty(orderDto.getQty())
                  .unit_price(orderDto.getUnitPrice())
                  .total_price(orderDto.getTotalPrice())
                  .build();
              KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
      
              ObjectMapper mapper = new ObjectMapper();
              String jsonInString = "";
              try {
                  jsonInString = mapper.writeValueAsString(kafkaOrderDto);
              } catch (JsonProcessingException e) {
                  e.printStackTrace();
              }
              kafkaTemplate.send(topic, jsonInString);
              log.info("Order Producer sent data from the Order Microservice : " + kafkaOrderDto);
              return orderDto;
          }
      }
    • payload, field, schema를 생성하고, 이를 kafkaOrderDto에 주입한다.
  • my-order-sink-connect를 생성
    • connector 추가
728x90