728x90

실습하기(Callback 없는 구현)
1. ProducerAsyncWithKey 코드 작성
public class ProducerAsyncWithKey {
public static final Logger logger = LoggerFactory.getLogger(ProducerAsyncWithKey.class.getName());
public static void main(String[] args) {
String topicName = "multipart-topic";
//KafkaProducer Configuration Setting
//Null, String
Properties props = new Properties();
//Bootstrap.servers, key.serializer.class, value.serializer.class
//props.setProperty("bootstrap.servers", "192.168.56.101:9092");
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//KafkaProducer 객체 생성
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
//ProducerRecord 객체 생성
for (int seq = 0; seq < 20; ++seq) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, String.valueOf(seq), "hello-world " + seq);
//KafkaProducer Message Send
kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception == null) {
logger.info("\n#### record metadata received #### \n" +
"partition: {} \n" +
"offset: {} \n" +
"timestamp: {} \n", metadata.partition(), metadata.offset(), metadata.timestamp());
} else {
logger.error("exception error from broker {}", exception.getMessage());
}
});
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaProducer.close();
}
}
- 0~20까지 Key를 생성해서, Producer에 요청을 보낸다.
- Producer에 요청을 보낼 때, Key값 Serializer를 StringSerializer를 사용했기 때문에, String으로 변환해서 전달해야 한다.
2. Multipart-topic Producer 삭제 후, 생성
kafka-topics --bootstrap-server localhost:9092 --delete --topic multipart-topic
kafka-topics --bootstrap-server localhost:9092 --create --topic multipart-topic --partitions 3

3.Kafka-console-Consumer를 통해 파티션 값 읽어서 로그 남기기
kafka-console-consumer --bootstrap-server localhost:9092 --grout.value=true --property print.key=true

- 코드 실행시, Console에 로그가 남는다.
실습하기(Custom Callback)
1. Custom Callback 생성
public class CustomCallback implements Callback {
public static final Logger logger = LoggerFactory.getLogger(CustomCallback.class.getName());
private int seq;
public CustomCallback(int seq){
this.seq = seq;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
logger.info("seq:{} partition:{} offset:{}", this.seq, metadata.partition(), metadata.offset());
} else {
logger.error("exception error from broker {}", exception.getMessage());
}
}
}
2. Producer 수정
public class ProducerAsyncCumtomCB {
public static final Logger logger = LoggerFactory.getLogger(ProducerAsyncCumtomCB.class.getName());
public static void main(String[] args) {
String topicName = "multipart-topic";
//KafkaProducer Configuration Setting
//Null, String
Properties props = new Properties();
//Bootstrap.servers, key.serializer.class, value.serializer.class
//props.setProperty("bootstrap.servers", "192.168.56.101:9092");
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//KafkaProducer 객체 생성
KafkaProducer<Integer, String> kafkaProducer = new KafkaProducer<>(props);
//ProducerRecord 객체 생성
for (int seq = 0; seq < 20; ++seq) {
ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(topicName, seq, "hello-world " + seq);
Callback callback = new CustomCallback(seq);
//KafkaProducer Message Send
kafkaProducer.send(producerRecord, callback);
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaProducer.close();
}
}
3. Console-consumer 실행
kafka-console-consumer --bootstrap-server localhost:9092 --group group-01 --topic multipart-topic --property print.value=true --property print.key=true --key-deserializer "org.apache.kafka.common.serialization.IntegerDeserializer"

- 기본적으로 Consumer는 String Key가 기본값이기 때문에 IntegerDeserializer를 넣어주어야 한다.
728x90
'카프카 > Java 기반 카프카 클라이언트 구현, Producer 내부 메커니즘 1' 카테고리의 다른 글
| 피자 주문 시뮬레이션 Producer 구현 (0) | 2025.09.27 |
|---|---|
| Producer의 메시지 동기/비동기 전송 구현 (0) | 2025.09.26 |
| Kafka Producer의 Send() 와 Producer 메시지 동기화 전송 (0) | 2025.09.23 |
| Simple Producer 구현하기 (0) | 2025.08.15 |
| Java 기반 카프카 클라이언트 구현을 위한 프로젝트 세팅 (0) | 2025.08.15 |