728x90
실습하기
1. main Thread가 죽었을 때, wakeup() 실행하도록 Hook 추가
//main thread 종료시 별도의 thread로 kafkaconsumer wakeup() 메서드 호출
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("main program starts to exit by calling wakeup");
kafkaConsumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
));
2. wakeUp Exception Catch 추가
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));//1초 동안 대기
consumerRecords.forEach(record -> {
logger.info("record key:{}, record value:{}, partition:{}",
record.key(), record.value(), record.partition());
});
}
} catch (WakeupException e) {
logger.error("wakeup exception has been called");
} finally {
logger.info("finally consumer is closing");
kafkaConsumer.close();
}
결과

- Main Thread 종료 시, 해당 메시지가 뜨면서 우아하게 종료되는 것을 볼 수 있다.
728x90
'카프카 > Java 기반 카프카 클라이언트 구현, Consumer 내부 메커니즘 1' 카테고리의 다른 글
| Consumer의 auto.offset.reset (0) | 2025.10.06 |
|---|---|
| Consumer의 읽기 Commit 이해 (0) | 2025.10.06 |
| Consumer Fetcher 관련 주요 파라미터와 메커니즘 이해 (0) | 2025.10.06 |
| KafkaConsumer 주요 구성 요소와 Poll() 메소드 (0) | 2025.10.06 |
| Consumer 주요 메커니즘 (0) | 2025.10.06 |