카프카/Java 기반 카프카 클라이언트 구현, Consumer 내부 메커니즘 2

Consumer에서 여러 개의 Topic 읽기

webmaster 2025. 10. 18. 15:24
728x90

1) Topic 생성

Topic-p3-t1

kafka-topics --bootstrap-server localhost:9092 --create --topic topic-p3-t1 --partitions 3

Topic-p3-t2

kafka-topics --bootstrap-server localhost:9092 --create --topic topic-p3-t2 --partitions 3

 

2) ConsumerMTopicRebalance 생성

 public static void main(String[] args) {
        //String topicName = "pizza-topic";

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-mtopic");


        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(List.of("topic-p3-t1", "topic-p3-t2"));

        //main Thread
        Thread mainThread = Thread.currentThread();

        //main thread 종료시 별도의 thread로 kafkaconsumer wakeup() 메서드 호출
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("main program starts to exit by calling wakeup");
            kafkaConsumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        ));

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));//1초 동안 대기
                consumerRecords.forEach(record -> {
                    logger.info("topic:{}, record key:{}, partition:{}, record offset:{}, record value:{}",
                            record.topic(), record.key(), record.partition(), record.offset(), record.value());
                });
            }
        } catch (WakeupException e) {
            logger.error("wakeup exception has been called");
        } finally {
            logger.info("finally consumer is closing");
            kafkaConsumer.close();
        }


    }

Topic 등록 로그

  • topic을 하나가 아닌 생성한 Topic(Topic-p3-t1, Topic-p3-t2)을 List로 전달한다.

3) Console Producer로 메시지 전송

Topic-p3-t1 토픽에 메시지 전송

kafka-console-producer --bootstrap-server localhost:9092 --topic topic-p3-t1

Topic-p3-t2 토픽에 메시지 전송

kafka-console-producer --bootstrap-server localhost:9092 --topic topic-p3-t2

결과 로그

728x90