728x90
Kafka Consumer

- 브로커의 Topic 메시지를 읽는 역할을 수행한다.
- 모든 Consumer들은 고유한 그룹 아이디(group.id)를 가지는 Consumer Group에 소속되어야 한다.
- 개별 Consumer Group 내에서 여러 개의 Consumer 들은 토픽 파티션 별로 분배된다.
Consumer의 Subscribe, Poll, Commit

- Consumer는 subscribe()를 호출하여 읽어 들이려는 토픽을 등록
- Consumer는 poll() 메서드를 이용하여, 주기적으로 브로커의 토픽 파티션에서 메시지를 가지고 온다.
- 메시지를 성공적으로 가져 왔으면, Commit을 통해 __consumer_offset에 다음에 읽을 offset 위치를 기재함
Consumer의 주요 수행

- KafkaConsumer는 Fetcher, ConsumerClientNetwork 등의 주요 내부 객체와 별도의 Heart Beat Thread를 생성한다.
- Fetch, ConsumerClientNetwork 객체는 Broker의 토픽 파티션에서 메시지를 Fetch 및 Poll 수행
- Heart Beat Thread는 Consumer의 정상적인 활동을 Group Coordinator에 보고하는 역할을 수행(Group Coordinator는 주어진 시간 동안 Heart Beat를 받지 못하면, Consumer들의 Rebalance를 수행 명령)
실습하기(Java Consumer Client API 구현하기)

1. Consumer 환경 설정(Properties 객체 이용)
String topicName = "simple-topic";
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_01");
2. 환경 설정 값을 반영하여 KafkaConsumer 생성
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
3. 읽어들일 Topic을 subscribe()를 호출하여 설정
kafkaConsumer.subscribe(List.of(topicName));
- 여러개의 Topic을 구독할 수 있다.
4. 지속적으로 poll()을 호출하여 topic의 새로운 메시지를 계속 읽어 들인다.
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));//1초 동안 대기
consumerRecords.forEach(record -> {
logger.info("record key:{}, record value:{}, partition:{}",
record.key(), record.value(), record.partition());
});
}
5. kafkaConsumer 객체를 명확하게 Close() 수행
- wake-up을 통해 종료 해야함.. 다음 섹션에 진행
728x90
'카프카 > Java 기반 카프카 클라이언트 구현, Consumer 내부 메커니즘 1' 카테고리의 다른 글
| Consumer의 auto.offset.reset (0) | 2025.10.06 |
|---|---|
| Consumer의 읽기 Commit 이해 (0) | 2025.10.06 |
| wakeup을 이용한 Consumer 효과적 종료 (0) | 2025.10.06 |
| Consumer Fetcher 관련 주요 파라미터와 메커니즘 이해 (0) | 2025.10.06 |
| KafkaConsumer 주요 구성 요소와 Poll() 메소드 (0) | 2025.10.06 |