728x90
준비하기
Java Faker Import
implementation 'com.github.javafaker:javafaker:1.0.2'
https://mvnrepository.com/artifact/com.github.javafaker/javafaker/1.0.2
Pizza Message Source 코드 복사하기
public class PizzaMessage {
// 피자 메뉴를 설정. getRandomValueFromList()에서 임의의 피자명을 출력하는 데 사용.
private static final List<String> pizzaNames = List.of("Potato Pizza", "Cheese Pizza",
"Cheese Garlic Pizza", "Super Supreme", "Peperoni");
// private static final List<String> pizzaNames = List.of("고구마 피자", "치즈 피자",
// "치즈 갈릭 피자", "슈퍼 슈프림", "페페로니 피자");
// 피자 가게명을 설정. getRandomValueFromList()에서 임의의 피자 가게명을 출력하는데 사용.
private static final List<String> pizzaShop = List.of("A001", "B001", "C001",
"D001", "E001", "F001", "G001", "H001", "I001", "J001", "K001", "L001", "M001", "N001",
"O001", "P001", "Q001");
public PizzaMessage() {}
//인자로 피자명 또는 피자가게 List와 Random 객체를 입력 받아서 random한 피자명 또는 피자 가게 명을 반환.
private String getRandomValueFromList(List<String> list, Random random) {
int size = list.size();
int index = random.nextInt(size);
return list.get(index);
}
//random한 피자 메시지를 생성하고, 피자가게 명을 key로 나머지 정보를 value로 하여 Hashmap을 생성하여 반환.
public HashMap<String, String> produce_msg(Faker faker, Random random, int id) {
String shopId = getRandomValueFromList(pizzaShop, random);
String pizzaName = getRandomValueFromList(pizzaNames, random);
String ordId = "ord"+id;
String customerName = faker.name().fullName();
String phoneNumber = faker.phoneNumber().phoneNumber();
String address = faker.address().streetAddress();
LocalDateTime now = LocalDateTime.now();
String message = String.format("order_id:%s, shop:%s, pizza_name:%s, customer_name:%s, phone_number:%s, address:%s, time:%s"
, ordId, shopId, pizzaName, customerName, phoneNumber, address
, now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.KOREAN)));
//System.out.println(message);
HashMap<String, String> messageMap = new HashMap<>();
messageMap.put("key", shopId);
messageMap.put("message", message);
return messageMap;
}
public static void main(String[] args) {
PizzaMessage pizzaMessage = new PizzaMessage();
// seed값을 고정하여 Random 객체와 Faker 객체를 생성.
long seed = 2022;
Random random = new Random(seed);
Faker faker = Faker.instance(random);
for(int i=0; i < 60; i++) {
HashMap<String, String> message = pizzaMessage.produce_msg(faker, random, i);
System.out.println("key:"+ message.get("key") + " message:" + message.get("message"));
}
}
}
KafkaProj-01/producers/src/main/java/com/example/kafka/PizzaMessage.java at AfterSerDe · chulminkw/KafkaProj-01
Contribute to chulminkw/KafkaProj-01 development by creating an account on GitHub.
github.com
실습하기
1. Topic 생성
kafka-topics --bootstrap-server localhost:9092 --create --topic pizza-topic --partitions 3

2. Consumer 생성
kafka-console-consumer --bootstrap-server localhost:9092 --group group_01 --topic pizza-topic \
> --property print.key=true --property print.value=true \
> --property print.partition=true
3. Pizza Producer 작성
public class PizzaProducer {
public static final Logger logger = LoggerFactory.getLogger(PizzaProducer.class.getName());
public static void sendPizzaMessage(
KafkaProducer<String, String> kafkaProducer,
String topicName,
int iterCount, //몇번 돌릴지
int interIntervalMillis, //몇번 쉴지
int intervalMillis, //몇번마다 얼마나 쉴지
int intervalCount, //몇번마다 쉴지
boolean sync
) {
PizzaMessage pizzaMessage = new PizzaMessage();
int iterSeq = 0;
long seed = 2022;
Random random = new Random(seed);
Faker faker = Faker.instance(random);
while (iterSeq++ != iterCount) { //iterCount == -1 무한 루프
HashMap<String, String> pMessage = pizzaMessage.produce_msg(faker, random, iterSeq);
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(topicName, pMessage.get("key"), pMessage.get("message"));
sendMessage(kafkaProducer, producerRecord, pMessage, sync);
if ((intervalCount > 0) && (iterSeq % intervalCount == 0)) { //중간 중간 쉰다.
try {
logger.info("############ IntervalCount: {} intervalMillis: {}", intervalCount, intervalMillis);
Thread.sleep(intervalMillis);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
if (interIntervalMillis > 0) {
try {
logger.info("interIntervalMillis: {}", interIntervalMillis);
Thread.sleep(interIntervalMillis);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
}
}
public static void sendMessage(
KafkaProducer<String, String> kafkaProducer,
ProducerRecord<String, String> producerRecord,
HashMap<String, String> pMessage,
boolean sync
) {
if (!sync) {
kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception == null) {
logger.info("async message: {} partition:{} offset:{}", pMessage.get("key"), metadata.partition(), metadata.offset());
} else {
logger.error("exception error from broker {}", exception.getMessage());
}
});
} else {
try {
RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
logger.info("sync message: {} partition:{} offset:{}", pMessage.get("key"), recordMetadata.partition(), recordMetadata.offset());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) {
String topicName = "pizza-topic";
//KafkaProducer Configuration Setting
//Null, String
Properties props = new Properties();
//Bootstrap.servers, key.serializer.class, value.serializer.class
//props.setProperty("bootstrap.servers", "192.168.56.101:9092");
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//KafkaProducer 객체 생성
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
sendPizzaMessage(kafkaProducer, topicName, -1, 10, 100, 100, true);
kafkaProducer.close();
}
}
- sync 메시지 전송과 Async 메시지 전송을 파라미터마다 다르게 호출한다.
- iterCount가 -1이면 계속 호출한다.
- intervalCount가 0보다 클 경우에는 IntervalCount 횟수마다, IntervalMillis 만큼 쉰다.
- InterIntervalMillis 만큼 매 호출마다, 쉰다.
728x90
'카프카 > Java 기반 카프카 클라이언트 구현, Producer 내부 메커니즘 1' 카테고리의 다른 글
| Producer에서 키 값을 가지는 메시지 전송 구현 (0) | 2025.09.27 |
|---|---|
| Producer의 메시지 동기/비동기 전송 구현 (0) | 2025.09.26 |
| Kafka Producer의 Send() 와 Producer 메시지 동기화 전송 (0) | 2025.09.23 |
| Simple Producer 구현하기 (0) | 2025.08.15 |
| Java 기반 카프카 클라이언트 구현을 위한 프로젝트 세팅 (0) | 2025.08.15 |