카프카/Java 기반 카프카 클라이언트 구현, Producer 내부 메커니즘 1

Producer의 메시지 동기/비동기 전송 구현

webmaster 2025. 9. 26. 23:33
728x90

Sync(동기 방식)

  • Producer는 브로커로부터 해당 메시지를 성공적으로 받았다는 Ack 메시지를 받은 후 다음 메시지를 전송
  • KafkaProducer.send().get() 호출하여, 브로커로 부터 Ack 메시지를 받을 때까지 대기함

Async(비동기 방식)

  • Producer는 브로커로 부터 해당 메시지를 성공적으로 받았다는 Ack 메시지를 기다리지 않고 전송
  • 브로커로부터 Ack 메시지를 비동기로 Producer에 받기 위해서 Callback을 적용함
  • Send() 메서드 호출 시에 callback 객체를 인자로 입력하여, Ack 메시지를 Producer로 전달받을 수 있음

Producer와 브로커와의 메시지 동기화 코드

//동기화 코드 1
Future<RecordMetaData> future = KafkaProducer.send();
RecordMetaData recordMetadata = future.get();

//동기화 코드 2
RecordMetaData recordMetadata = KafkaProducer.send().get();
  • Future<RecordMetaData> = KafkaProducer.send() 기본적으로 비동기 호출
  • Future객체의 get( )을 호출하여 브로커로부터 메시지 Ack 응답을 받을 때까지 Main Thread를 대기시키는 방식으로 동기화 구성

동기화 실습하기

public class SimpleProducerSync {
    public static final Logger logger = LoggerFactory.getLogger(SimpleProducerSync.class.getName());

    public static void main(String[] args) {
        String topicName = "simple-topic";

        //KafkaProducer Configuration Setting
        //Null, String
        Properties props = new Properties();

        //Bootstrap.servers, key.serializer.class, value.serializer.class
        //props.setProperty("bootstrap.servers", "192.168.56.101:9092");
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //KafkaProducer 객체 생성
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        //ProducerRecord 객체 생성
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "hello-world");

        //KafkaProducer Message Send
        /*
        Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
        RecordMetadata recordMetadata = future.get();
         */
        try {
            RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
            logger.info("\n#### record metadata received #### \n" +
                    "partition: {} \n" +
                    "offset: {} \n" +
                    "timestamp: {} \n"  ,recordMetadata.partition(), recordMetadata.offset(), recordMetadata.timestamp());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            kafkaProducer.close();
        }

    }

}

Response

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1758898981376
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition simple-topic-0 to 0 since the associated topicId changed from null to VWy5UJYKTRiNeBpekJXY9g
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: TeGaZWlGTX2IAWzTWxbePA
[main] INFO com.example.kafka.SimpleProducerSync - 
#### record metadata received #### 
partition: 0 
offset: 7 
timestamp: 1758898982545
  • partition과 offset, timestamp를 찍는다.

Callback의 이해

다른 코드의 인수로서, 넘겨주는 실행 가능한 코드이며, 콜백을 넘겨받는 코드는 이 콜백을 필요에 따라 즉시 실행할 수도 있고, 아니면 나중에 실행할 수도 있음(위키 백과)

즉, Callback은 다른 함수의 인자로서 전달된 후에 특정 이벤트가 발생 시 해당 함수에서 다시 호출됨

 

JAVA에서의 Callbak

  • Callback을 인터페이스로 구성하고, 호출되어질 메서드 선언
  • 해당 Callback()을 구현하는 객체 생성, 호출 되어질 메서드를 구체적으로 구현
  • 다른 함수의 인자로 해당 Callback을 인자로 전달
  • 해당 함수는 특정 이벤트 발생 시 Callback에 선언된 메서드를 호출

Callback을 이용한 Producer와 브로커와의 메시지 전송/재전송

  • Client는 Callback과 메시지를 함께 전달한 뒤, 자신의 일을 계속할 수 있다.
  • 브로커로부터, 응답을 받을 때, 오류가 발생하면 이를 재시도 할 수 있는 예외라면 acks 값에 따라 재시도한다(acks = 0: 재시도 안함, acks = all/1: 재시도)

비동기 코드

kafkaProducer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null){
                    logger.info("\n#### record metadata received #### \n" +
                            "partition: {} \n" +
                            "offset: {} \n" +
                            "timestamp: {} \n"  ,metadata.partition(), metadata.offset(), metadata.timestamp());
                }else{
                    exception.printStackTrace();
                }
            }
        });
  • onCompletion은 브로커에서 응답이 올 때 실행된다.
  • 오류 발생 시, exception에 데이터가 채워지고, 정상 응답 시, metadata에 값이 채워진다.

비동기 코드 실습하기

public class SimpleProducerAsync {
    public static final Logger logger = LoggerFactory.getLogger(SimpleProducerAsync.class.getName());

    public static void main(String[] args) {
        String topicName = "simple-topic";

        //KafkaProducer Configuration Setting
        //Null, String
        Properties props = new Properties();

        //Bootstrap.servers, key.serializer.class, value.serializer.class
        //props.setProperty("bootstrap.servers", "192.168.56.101:9092");
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //KafkaProducer 객체 생성
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        //ProducerRecord 객체 생성
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "hello-world");

        //KafkaProducer Message Send
        /*
        kafkaProducer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null){
                    logger.info("\n#### record metadata received #### \n" +
                            "partition: {} \n" +
                            "offset: {} \n" +
                            "timestamp: {} \n"  ,metadata.partition(), metadata.offset(), metadata.timestamp());
                }else{
                    logger.error("exception error from broker {}", exception.getMessage());
                }
            }
        });
         */
        kafkaProducer.send(producerRecord, (metadata, exception) -> {
            if (exception == null) {
                logger.info("\n#### record metadata received #### \n" +
                        "partition: {} \n" +
                        "offset: {} \n" +
                        "timestamp: {} \n", metadata.partition(), metadata.offset(), metadata.timestamp());
            } else {
                logger.error("exception error from broker {}", exception.getMessage());
            }
        });
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        kafkaProducer.close();
    }
}

Response

  • MainThread 가 비동기 코드를 실행하기 전에 종료될 수도 있기 때문에 Sleep을 준다.
728x90