카프카/Java 기반 카프카 클라이언트 구현, Consumer 내부 메커니즘 1

wakeup을 이용한 Consumer 효과적 종료

webmaster 2025. 10. 6. 22:31
728x90

실습하기

1. main Thread가 죽었을 때, wakeup() 실행하도록  Hook 추가

//main thread 종료시 별도의 thread로 kafkaconsumer wakeup() 메서드 호출
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    logger.info("main program starts to exit by calling wakeup");
    kafkaConsumer.wakeup();
    try {
        mainThread.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
));

 

2. wakeUp Exception Catch 추가

try {
    while (true) {
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));//1초 동안 대기
        consumerRecords.forEach(record -> {
            logger.info("record key:{}, record value:{}, partition:{}",
                    record.key(), record.value(), record.partition());
        });
    }
} catch (WakeupException e) {
    logger.error("wakeup exception has been called");
} finally {
    logger.info("finally consumer is closing");
    kafkaConsumer.close();
}

 

결과

  • Main Thread 종료 시, 해당 메시지가 뜨면서 우아하게 종료되는 것을 볼 수 있다.
728x90