728x90
1) Topic 생성
Topic-p3-t1
kafka-topics --bootstrap-server localhost:9092 --create --topic topic-p3-t1 --partitions 3
Topic-p3-t2
kafka-topics --bootstrap-server localhost:9092 --create --topic topic-p3-t2 --partitions 3
2) ConsumerMTopicRebalance 생성
public static void main(String[] args) {
//String topicName = "pizza-topic";
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-mtopic");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(List.of("topic-p3-t1", "topic-p3-t2"));
//main Thread
Thread mainThread = Thread.currentThread();
//main thread 종료시 별도의 thread로 kafkaconsumer wakeup() 메서드 호출
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("main program starts to exit by calling wakeup");
kafkaConsumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
));
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));//1초 동안 대기
consumerRecords.forEach(record -> {
logger.info("topic:{}, record key:{}, partition:{}, record offset:{}, record value:{}",
record.topic(), record.key(), record.partition(), record.offset(), record.value());
});
}
} catch (WakeupException e) {
logger.error("wakeup exception has been called");
} finally {
logger.info("finally consumer is closing");
kafkaConsumer.close();
}
}

- topic을 하나가 아닌 생성한 Topic(Topic-p3-t1, Topic-p3-t2)을 List로 전달한다.
3) Console Producer로 메시지 전송
Topic-p3-t1 토픽에 메시지 전송
kafka-console-producer --bootstrap-server localhost:9092 --topic topic-p3-t1
Topic-p3-t2 토픽에 메시지 전송
kafka-console-producer --bootstrap-server localhost:9092 --topic topic-p3-t2

728x90
'카프카 > Java 기반 카프카 클라이언트 구현, Consumer 내부 메커니즘 2' 카테고리의 다른 글
| eager, cooperative 프로토콜과 파티션 할당 전략 (0) | 2025.10.19 |
|---|---|
| Consumer의 Heartbeat 스레드와 관련 주요 파라미터 (0) | 2025.10.18 |
| Consumer의 Heartbeat 스레드와 관련 주요 파라미터 (0) | 2025.10.18 |
| Consumer의 Static Group Membership (0) | 2025.10.18 |