카프카/Java 기반 카프카 클라이언트 구현, Producer 내부 메커니즘 1

Producer에서 키 값을 가지는 메시지 전송 구현

webmaster 2025. 9. 27. 01:15
728x90

Key 값을 가지는 메시지 전송

실습하기(Callback 없는 구현)

1. ProducerAsyncWithKey 코드 작성

public class ProducerAsyncWithKey {
    public static final Logger logger = LoggerFactory.getLogger(ProducerAsyncWithKey.class.getName());

    public static void main(String[] args) {
        String topicName = "multipart-topic";

        //KafkaProducer Configuration Setting
        //Null, String
        Properties props = new Properties();

        //Bootstrap.servers, key.serializer.class, value.serializer.class
        //props.setProperty("bootstrap.servers", "192.168.56.101:9092");
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //KafkaProducer 객체 생성
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        //ProducerRecord 객체 생성
        for (int seq = 0; seq < 20; ++seq) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, String.valueOf(seq), "hello-world " + seq);

            //KafkaProducer Message Send
            kafkaProducer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    logger.info("\n#### record metadata received #### \n" +
                            "partition: {} \n" +
                            "offset: {} \n" +
                            "timestamp: {} \n", metadata.partition(), metadata.offset(), metadata.timestamp());
                } else {
                    logger.error("exception error from broker {}", exception.getMessage());
                }
            });
        }


        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        kafkaProducer.close();
    }
}
  • 0~20까지 Key를 생성해서, Producer에 요청을 보낸다.
    • Producer에 요청을 보낼 때, Key값 Serializer를 StringSerializer를 사용했기 때문에, String으로 변환해서 전달해야 한다.

2. Multipart-topic Producer 삭제 후, 생성

kafka-topics --bootstrap-server localhost:9092 --delete --topic multipart-topic
kafka-topics --bootstrap-server localhost:9092 --create --topic multipart-topic --partitions 3

3.Kafka-console-Consumer를 통해 파티션 값 읽어서 로그 남기기

kafka-console-consumer --bootstrap-server localhost:9092 --grout.value=true --property print.key=true

  • 코드 실행시, Console에 로그가 남는다.

실습하기(Custom Callback)

1. Custom Callback 생성

public class CustomCallback implements Callback {
    public static final Logger logger = LoggerFactory.getLogger(CustomCallback.class.getName());
    private int seq;

    public CustomCallback(int seq){
        this.seq = seq;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            logger.info("seq:{} partition:{} offset:{}", this.seq, metadata.partition(), metadata.offset());
        } else {
            logger.error("exception error from broker {}", exception.getMessage());
        }
    }
}

2. Producer 수정

public class ProducerAsyncCumtomCB {
    public static final Logger logger = LoggerFactory.getLogger(ProducerAsyncCumtomCB.class.getName());

    public static void main(String[] args) {
        String topicName = "multipart-topic";

        //KafkaProducer Configuration Setting
        //Null, String
        Properties props = new Properties();

        //Bootstrap.servers, key.serializer.class, value.serializer.class
        //props.setProperty("bootstrap.servers", "192.168.56.101:9092");
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //KafkaProducer 객체 생성
        KafkaProducer<Integer, String> kafkaProducer = new KafkaProducer<>(props);

        //ProducerRecord 객체 생성
        for (int seq = 0; seq < 20; ++seq) {
            ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(topicName, seq, "hello-world " + seq);

            Callback callback = new CustomCallback(seq);
            //KafkaProducer Message Send
            kafkaProducer.send(producerRecord, callback);
        }


        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        kafkaProducer.close();
    }
}

 

3. Console-consumer 실행

kafka-console-consumer --bootstrap-server localhost:9092 --group group-01 --topic multipart-topic  --property print.value=true --property print.key=true --key-deserializer "org.apache.kafka.common.serialization.IntegerDeserializer"

  • 기본적으로 Consumer는 String Key가 기본값이기 때문에 IntegerDeserializer를 넣어주어야 한다.
728x90