카프카/Java 기반 카프카 클라이언트 구현, Producer 내부 메커니즘 1

피자 주문 시뮬레이션 Producer 구현

webmaster 2025. 9. 27. 10:12
728x90

준비하기

Java Faker Import

implementation 'com.github.javafaker:javafaker:1.0.2'

https://mvnrepository.com/artifact/com.github.javafaker/javafaker/1.0.2

 

Pizza Message Source 코드 복사하기

public class PizzaMessage {
    // 피자 메뉴를 설정. getRandomValueFromList()에서 임의의 피자명을 출력하는 데 사용.
    private static final List<String> pizzaNames = List.of("Potato Pizza", "Cheese Pizza",
            "Cheese Garlic Pizza", "Super Supreme", "Peperoni");
//    private static final List<String> pizzaNames = List.of("고구마 피자", "치즈 피자",
//            "치즈 갈릭 피자", "슈퍼 슈프림", "페페로니 피자");

    // 피자 가게명을 설정. getRandomValueFromList()에서 임의의 피자 가게명을 출력하는데 사용.
    private static final List<String> pizzaShop = List.of("A001", "B001", "C001",
            "D001", "E001", "F001", "G001", "H001", "I001", "J001", "K001", "L001", "M001", "N001",
            "O001", "P001", "Q001");

    public PizzaMessage() {}

    //인자로 피자명 또는 피자가게 List와 Random 객체를 입력 받아서 random한 피자명 또는 피자 가게 명을 반환.
    private String getRandomValueFromList(List<String> list, Random random) {
        int size = list.size();
        int index = random.nextInt(size);

        return list.get(index);
    }

    //random한 피자 메시지를 생성하고, 피자가게 명을 key로 나머지 정보를 value로 하여 Hashmap을 생성하여 반환.
    public HashMap<String, String> produce_msg(Faker faker, Random random, int id) {

        String shopId = getRandomValueFromList(pizzaShop, random);
        String pizzaName = getRandomValueFromList(pizzaNames, random);

        String ordId = "ord"+id;
        String customerName = faker.name().fullName();
        String phoneNumber = faker.phoneNumber().phoneNumber();
        String address = faker.address().streetAddress();
        LocalDateTime now = LocalDateTime.now();
        String message = String.format("order_id:%s, shop:%s, pizza_name:%s, customer_name:%s, phone_number:%s, address:%s, time:%s"
                , ordId, shopId, pizzaName, customerName, phoneNumber, address
                , now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.KOREAN)));
        //System.out.println(message);
        HashMap<String, String> messageMap = new HashMap<>();
        messageMap.put("key", shopId);
        messageMap.put("message", message);

        return messageMap;
    }

    public static void main(String[] args) {
        PizzaMessage pizzaMessage = new PizzaMessage();
        // seed값을 고정하여 Random 객체와 Faker 객체를 생성.
        long seed = 2022;
        Random random = new Random(seed);
        Faker faker = Faker.instance(random);

        for(int i=0; i < 60; i++) {
            HashMap<String, String> message = pizzaMessage.produce_msg(faker, random, i);
            System.out.println("key:"+ message.get("key") + " message:" + message.get("message"));
        }

    }
}

https://github.com/chulminkw/KafkaProj-01/blob/AfterSerDe/producers/src/main/java/com/example/kafka/PizzaMessage.java

 

KafkaProj-01/producers/src/main/java/com/example/kafka/PizzaMessage.java at AfterSerDe · chulminkw/KafkaProj-01

Contribute to chulminkw/KafkaProj-01 development by creating an account on GitHub.

github.com

 

실습하기

1. Topic 생성

kafka-topics --bootstrap-server localhost:9092 --create --topic pizza-topic --partitions 3

2. Consumer 생성

kafka-console-consumer --bootstrap-server localhost:9092 --group group_01 --topic pizza-topic \
> --property print.key=true --property print.value=true \
> --property print.partition=true

 

3. Pizza Producer 작성

public class PizzaProducer {
    public static final Logger logger = LoggerFactory.getLogger(PizzaProducer.class.getName());

    public static void sendPizzaMessage(
            KafkaProducer<String, String> kafkaProducer,
            String topicName,
            int iterCount, //몇번 돌릴지
            int interIntervalMillis, //몇번 쉴지
            int intervalMillis, //몇번마다 얼마나 쉴지
            int intervalCount, //몇번마다 쉴지
            boolean sync
    ) {
        PizzaMessage pizzaMessage = new PizzaMessage();

        int iterSeq = 0;
        long seed = 2022;
        Random random = new Random(seed);
        Faker faker = Faker.instance(random);

        while (iterSeq++ != iterCount) { //iterCount == -1 무한 루프
            HashMap<String, String> pMessage = pizzaMessage.produce_msg(faker, random, iterSeq);
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>(topicName, pMessage.get("key"), pMessage.get("message"));
            sendMessage(kafkaProducer, producerRecord, pMessage, sync);
            if ((intervalCount > 0) && (iterSeq % intervalCount == 0)) { //중간 중간 쉰다.
                try {
                    logger.info("############ IntervalCount: {} intervalMillis: {}", intervalCount, intervalMillis);
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage());
                }
            }

            if (interIntervalMillis > 0) {
                try {
                    logger.info("interIntervalMillis: {}", interIntervalMillis);
                    Thread.sleep(interIntervalMillis);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage());
                }
            }
        }
    }

    public static void sendMessage(
            KafkaProducer<String, String> kafkaProducer,
            ProducerRecord<String, String> producerRecord,
            HashMap<String, String> pMessage,
            boolean sync
    ) {
        if (!sync) {
            kafkaProducer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    logger.info("async message: {} partition:{} offset:{}", pMessage.get("key"), metadata.partition(), metadata.offset());
                } else {
                    logger.error("exception error from broker {}", exception.getMessage());
                }
            });
        } else {
            try {
                RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
                logger.info("sync message: {} partition:{} offset:{}", pMessage.get("key"), recordMetadata.partition(), recordMetadata.offset());
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }

        }
    }

    public static void main(String[] args) {
        String topicName = "pizza-topic";

        //KafkaProducer Configuration Setting
        //Null, String
        Properties props = new Properties();

        //Bootstrap.servers, key.serializer.class, value.serializer.class
        //props.setProperty("bootstrap.servers", "192.168.56.101:9092");
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //KafkaProducer 객체 생성
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        sendPizzaMessage(kafkaProducer, topicName, -1, 10, 100, 100, true);

        kafkaProducer.close();
    }
}
  • sync 메시지 전송과 Async 메시지 전송을 파라미터마다 다르게 호출한다.
  • iterCount가 -1이면 계속 호출한다.
  • intervalCount가 0보다 클 경우에는 IntervalCount 횟수마다, IntervalMillis 만큼 쉰다.
  • InterIntervalMillis 만큼 매 호출마다, 쉰다.
728x90