카프카/Java 기반 카프카 클라이언트 구현, Consumer 내부 메커니즘 2

Consumer의 Heartbeat 스레드와 관련 주요 파라미터

webmaster 2025. 10. 18. 14:34
728x90

Heart Beat Thread
Heart Beat Thread 동작 과정

  • HeartBeat Thread를 통해서 브로커의 Group Coordinator에 Consumer의 상태를 전송한다.

Heart Beat 관련 주요 파라미터

Consumer 파라미터 명 기본값(ms) 설명
heartbeat.interval.ms 3000 - heart beat thread가 heart beat를 보내는 간격
- session.timout.ms보다 낮게 설정되어야 한다. session.timeout.ms의 1/3 보다 낮게 설정하는 것을 권장한다.
session.timeout.ms 45000 - 브로커가 Consumer로 Heart Beat를 기다리는 최대 시간
- 브로커는 이 시간동안 Heart Beat를 Consumer로 부터 받지 못하면 해당 Consumer를 Group에서 제외하고, Rebalance 지시
max.poll.interval.ms 300000 - 이전 poll() 호출 후 다음 poll()이 호출될 때까지 브로커가 기다리는 시간
- 해당 시간동안 poll()이 호출되지 않으면, 해당 Consumer는 문제있는 것으로 판단하여 브로커가 Rebalance 명령을 지시

 

실습

Property 수정 테스트(Heart beat 주요 파라미터)

Property 수정

Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "simple-group");
props.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "90000");
props.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
  • heartbeat.interval.ms = 5000 로 수정
  • session.timeout.ms = 90000로 수정
  • max.poll.interval.ms = 600000 로 수정

session.timeout.ms 파라미터 로그 보기

1) Consumer 실행

2) Consumer 프로세스 종료 후 45초 대기(기본 값)

  • heart beat이 45초 동안 오질 않는다면, Rebalance를 실행해 준다.
  • 보통의 케이스는 Consumer 프로세스가 죽는다면, Heart Beat 스레드도 죽지만, 만약 죽지 않는다면..?
    • poll() 호출 이후 다음 poll()이 호출되지 않기 때문에 해당 Consumer는 300,000 ms(300초) 이후 Rebalance 된다.

max.poll.interval.ms 파라미터 로그 보기

1) ConsumerWakeupV2 생성

public class ConsumerWakeupV2 {
    public static final Logger logger = LoggerFactory.getLogger(ConsumerWakeupV2.class.getName());

    public static void main(String[] args) {
        String topicName = "pizza-topic";

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_02");
        props.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000");


        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(List.of(topicName));

        //main Thread
        Thread mainThread = Thread.currentThread();

        //main thread 종료시 별도의 thread로 kafkaconsumer wakeup() 메서드 호출
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("main program starts to exit by calling wakeup");
            kafkaConsumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        ));

        int loopCnt = 0;

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));//1초 동안 대기
                logger.info(" ########### loopCnt: {}, consumerRecords count: {}", loopCnt++, consumerRecords.count());
                consumerRecords.forEach(record -> {
                    logger.info("record key:{}, partition:{}, record offset:{}, record value:{}",
                            record.key(), record.partition(), record.offset(), record.value());
                });
                try {
                    logger.info("main thread is sleeping {} ms during while loop", loopCnt * 10_000);
                    Thread.sleep(loopCnt * 10_000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (WakeupException e) {
            logger.error("wakeup exception has been called");
        } finally {
            logger.info("finally consumer is closing");
            kafkaConsumer.close();
        }


    }
}
  • max.poll.interval.ms 값을 60초로 변경
  • poll() 호출을 10초 * count(1씩 증가) 마다 호출하여 로그 찍도록 작업

2) Pizza Producer 실행 

3) ConsumerWakeUpV2 실행

60초 이후 로그

브로커 로그

  • 60초 동안 poll()이 호출되지 않았기 때문에 브로커가 Rebalance를 지시한다.
  • Rebalance를 하여도, Consumer가 while문에 의해 다시 poll()을 하기 때문에 해당 Consumer로 다시 호출된다.
728x90