728x90


- HeartBeat Thread를 통해서 브로커의 Group Coordinator에 Consumer의 상태를 전송한다.
Heart Beat 관련 주요 파라미터
| Consumer 파라미터 명 | 기본값(ms) | 설명 |
| heartbeat.interval.ms | 3000 | - heart beat thread가 heart beat를 보내는 간격 - session.timout.ms보다 낮게 설정되어야 한다. session.timeout.ms의 1/3 보다 낮게 설정하는 것을 권장한다. |
| session.timeout.ms | 45000 | - 브로커가 Consumer로 Heart Beat를 기다리는 최대 시간 - 브로커는 이 시간동안 Heart Beat를 Consumer로 부터 받지 못하면 해당 Consumer를 Group에서 제외하고, Rebalance 지시 |
| max.poll.interval.ms | 300000 | - 이전 poll() 호출 후 다음 poll()이 호출될 때까지 브로커가 기다리는 시간 - 해당 시간동안 poll()이 호출되지 않으면, 해당 Consumer는 문제있는 것으로 판단하여 브로커가 Rebalance 명령을 지시 |
실습
Property 수정 테스트(Heart beat 주요 파라미터)
Property 수정
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "simple-group");
props.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "90000");
props.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
- heartbeat.interval.ms = 5000 로 수정
- session.timeout.ms = 90000로 수정
- max.poll.interval.ms = 600000 로 수정
session.timeout.ms 파라미터 로그 보기
1) Consumer 실행
2) Consumer 프로세스 종료 후 45초 대기(기본 값)

- heart beat이 45초 동안 오질 않는다면, Rebalance를 실행해 준다.
- 보통의 케이스는 Consumer 프로세스가 죽는다면, Heart Beat 스레드도 죽지만, 만약 죽지 않는다면..?
- poll() 호출 이후 다음 poll()이 호출되지 않기 때문에 해당 Consumer는 300,000 ms(300초) 이후 Rebalance 된다.
max.poll.interval.ms 파라미터 로그 보기
1) ConsumerWakeupV2 생성
public class ConsumerWakeupV2 {
public static final Logger logger = LoggerFactory.getLogger(ConsumerWakeupV2.class.getName());
public static void main(String[] args) {
String topicName = "pizza-topic";
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_02");
props.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(List.of(topicName));
//main Thread
Thread mainThread = Thread.currentThread();
//main thread 종료시 별도의 thread로 kafkaconsumer wakeup() 메서드 호출
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("main program starts to exit by calling wakeup");
kafkaConsumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
));
int loopCnt = 0;
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));//1초 동안 대기
logger.info(" ########### loopCnt: {}, consumerRecords count: {}", loopCnt++, consumerRecords.count());
consumerRecords.forEach(record -> {
logger.info("record key:{}, partition:{}, record offset:{}, record value:{}",
record.key(), record.partition(), record.offset(), record.value());
});
try {
logger.info("main thread is sleeping {} ms during while loop", loopCnt * 10_000);
Thread.sleep(loopCnt * 10_000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (WakeupException e) {
logger.error("wakeup exception has been called");
} finally {
logger.info("finally consumer is closing");
kafkaConsumer.close();
}
}
}
- max.poll.interval.ms 값을 60초로 변경
- poll() 호출을 10초 * count(1씩 증가) 마다 호출하여 로그 찍도록 작업
2) Pizza Producer 실행
3) ConsumerWakeUpV2 실행
60초 이후 로그


- 60초 동안 poll()이 호출되지 않았기 때문에 브로커가 Rebalance를 지시한다.
- Rebalance를 하여도, Consumer가 while문에 의해 다시 poll()을 하기 때문에 해당 Consumer로 다시 호출된다.
728x90
'카프카 > Java 기반 카프카 클라이언트 구현, Consumer 내부 메커니즘 2' 카테고리의 다른 글
| eager, cooperative 프로토콜과 파티션 할당 전략 (0) | 2025.10.19 |
|---|---|
| Consumer에서 여러 개의 Topic 읽기 (0) | 2025.10.18 |
| Consumer의 Heartbeat 스레드와 관련 주요 파라미터 (0) | 2025.10.18 |
| Consumer의 Static Group Membership (0) | 2025.10.18 |