728x90
Sync(동기 방식)

- Producer는 브로커로부터 해당 메시지를 성공적으로 받았다는 Ack 메시지를 받은 후 다음 메시지를 전송
- KafkaProducer.send().get() 호출하여, 브로커로 부터 Ack 메시지를 받을 때까지 대기함
Async(비동기 방식)

- Producer는 브로커로 부터 해당 메시지를 성공적으로 받았다는 Ack 메시지를 기다리지 않고 전송
- 브로커로부터 Ack 메시지를 비동기로 Producer에 받기 위해서 Callback을 적용함
- Send() 메서드 호출 시에 callback 객체를 인자로 입력하여, Ack 메시지를 Producer로 전달받을 수 있음
Producer와 브로커와의 메시지 동기화 코드
//동기화 코드 1
Future<RecordMetaData> future = KafkaProducer.send();
RecordMetaData recordMetadata = future.get();
//동기화 코드 2
RecordMetaData recordMetadata = KafkaProducer.send().get();
- Future<RecordMetaData> = KafkaProducer.send() 기본적으로 비동기 호출
- Future객체의 get( )을 호출하여 브로커로부터 메시지 Ack 응답을 받을 때까지 Main Thread를 대기시키는 방식으로 동기화 구성
동기화 실습하기
public class SimpleProducerSync {
public static final Logger logger = LoggerFactory.getLogger(SimpleProducerSync.class.getName());
public static void main(String[] args) {
String topicName = "simple-topic";
//KafkaProducer Configuration Setting
//Null, String
Properties props = new Properties();
//Bootstrap.servers, key.serializer.class, value.serializer.class
//props.setProperty("bootstrap.servers", "192.168.56.101:9092");
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//KafkaProducer 객체 생성
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
//ProducerRecord 객체 생성
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "hello-world");
//KafkaProducer Message Send
/*
Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
RecordMetadata recordMetadata = future.get();
*/
try {
RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
logger.info("\n#### record metadata received #### \n" +
"partition: {} \n" +
"offset: {} \n" +
"timestamp: {} \n" ,recordMetadata.partition(), recordMetadata.offset(), recordMetadata.timestamp());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
kafkaProducer.close();
}
}
}
Response
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1758898981376
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition simple-topic-0 to 0 since the associated topicId changed from null to VWy5UJYKTRiNeBpekJXY9g
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: TeGaZWlGTX2IAWzTWxbePA
[main] INFO com.example.kafka.SimpleProducerSync -
#### record metadata received ####
partition: 0
offset: 7
timestamp: 1758898982545
- partition과 offset, timestamp를 찍는다.
Callback의 이해
다른 코드의 인수로서, 넘겨주는 실행 가능한 코드이며, 콜백을 넘겨받는 코드는 이 콜백을 필요에 따라 즉시 실행할 수도 있고, 아니면 나중에 실행할 수도 있음(위키 백과)
즉, Callback은 다른 함수의 인자로서 전달된 후에 특정 이벤트가 발생 시 해당 함수에서 다시 호출됨
JAVA에서의 Callbak
- Callback을 인터페이스로 구성하고, 호출되어질 메서드 선언
- 해당 Callback()을 구현하는 객체 생성, 호출 되어질 메서드를 구체적으로 구현
- 다른 함수의 인자로 해당 Callback을 인자로 전달
- 해당 함수는 특정 이벤트 발생 시 Callback에 선언된 메서드를 호출
Callback을 이용한 Producer와 브로커와의 메시지 전송/재전송

- Client는 Callback과 메시지를 함께 전달한 뒤, 자신의 일을 계속할 수 있다.
- 브로커로부터, 응답을 받을 때, 오류가 발생하면 이를 재시도 할 수 있는 예외라면 acks 값에 따라 재시도한다(acks = 0: 재시도 안함, acks = all/1: 재시도)
비동기 코드
kafkaProducer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null){
logger.info("\n#### record metadata received #### \n" +
"partition: {} \n" +
"offset: {} \n" +
"timestamp: {} \n" ,metadata.partition(), metadata.offset(), metadata.timestamp());
}else{
exception.printStackTrace();
}
}
});
- onCompletion은 브로커에서 응답이 올 때 실행된다.
- 오류 발생 시, exception에 데이터가 채워지고, 정상 응답 시, metadata에 값이 채워진다.
비동기 코드 실습하기
public class SimpleProducerAsync {
public static final Logger logger = LoggerFactory.getLogger(SimpleProducerAsync.class.getName());
public static void main(String[] args) {
String topicName = "simple-topic";
//KafkaProducer Configuration Setting
//Null, String
Properties props = new Properties();
//Bootstrap.servers, key.serializer.class, value.serializer.class
//props.setProperty("bootstrap.servers", "192.168.56.101:9092");
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//KafkaProducer 객체 생성
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
//ProducerRecord 객체 생성
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "hello-world");
//KafkaProducer Message Send
/*
kafkaProducer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null){
logger.info("\n#### record metadata received #### \n" +
"partition: {} \n" +
"offset: {} \n" +
"timestamp: {} \n" ,metadata.partition(), metadata.offset(), metadata.timestamp());
}else{
logger.error("exception error from broker {}", exception.getMessage());
}
}
});
*/
kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception == null) {
logger.info("\n#### record metadata received #### \n" +
"partition: {} \n" +
"offset: {} \n" +
"timestamp: {} \n", metadata.partition(), metadata.offset(), metadata.timestamp());
} else {
logger.error("exception error from broker {}", exception.getMessage());
}
});
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaProducer.close();
}
}
Response

- MainThread 가 비동기 코드를 실행하기 전에 종료될 수도 있기 때문에 Sleep을 준다.
728x90
'카프카 > Java 기반 카프카 클라이언트 구현, Producer 내부 메커니즘 1' 카테고리의 다른 글
| 피자 주문 시뮬레이션 Producer 구현 (0) | 2025.09.27 |
|---|---|
| Producer에서 키 값을 가지는 메시지 전송 구현 (0) | 2025.09.27 |
| Kafka Producer의 Send() 와 Producer 메시지 동기화 전송 (0) | 2025.09.23 |
| Simple Producer 구현하기 (0) | 2025.08.15 |
| Java 기반 카프카 클라이언트 구현을 위한 프로젝트 세팅 (0) | 2025.08.15 |