카프카/Java 기반 카프카 클라이언트 구현, Producer 내부 메커니즘 1

Simple Producer 구현하기

webmaster 2025. 8. 15. 18:44
728x90

Java Producer Client Api 처리 로직 

  1. Producer 환경 설정(Properties 객체를 이용)
  2. 1번에서 설정한 환경 설정값을 반영하여 KafkaProducer 객체 생성.
  3. 토픽명과 메시지 값(Key, Value)을 입력하여 보낼 메시지인 ProducerRecord 객체 생성
  4. KafkaProducer객체의 send( )메소드를 호출하여 ProducerRecord 전송
  5. KafkaProducer객체의 close( )메소드를 호출하여 종료

실습

1. Topic 생성

kafka-topics --bootstrap-server localhost:9092 --create --topic simple-topic

2. Producer 생성

public class SimpleProducer {
    public static void main(String[] args) {
        String topicName = "simple-topic";

        //KafkaProducer Configuration Setting
        //Null, String
        Properties props = new Properties();

        //Bootstrap.servers, key.serializer.class, value.serializer.class
        //props.setProperty("bootstrap.servers", "192.168.56.101:9092");
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //KafkaProducer 객체 생성
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        //ProducerRecord 객체 생성
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "hello-world");
        
        //KafkaProducer Message Send
        kafkaProducer.send(producerRecord);
        
        kafkaProducer.flush(); //Close시 flush가 자동 호출
        kafkaProducer.close();
    }

}
  • ProducerRecord 생성 시, 첫번째 인자는 무조건 Topic 명이 들어가야 한다.
    • 두 번째는 키가 들어가는데, 생략 시 키가 없는 ProducerRecord가 생성된다.
  • Flush()는 Close 시, 자동으로 호출된다.

3. 확인

kafka-console-consumer --bootstrap-server localhost:9092 --topic simple-topic --from-beginning

728x90