728x90

- 아무것도 설정하지 않을 경우 CustomPartitioner가 기본적으로 DefaultPartitioner로 설정이 되고, Key가 있으면 Key값에 따른 해시 함수 실행, Key가 없으면 스티키 파티셔닝으로 동작한다.
Producer의 메시지 파티셔닝
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param numPartitions The number of partitions of the given {@code topic}
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
int numPartitions) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
- KafkaProducer는 기본적으로 DefaultPartitioner 클래스를 이용해, 메시지 전송 시 도착할 Partition을 지정
- DefaultPartitioner는 키를 가지는 메시지의 경우 키 값을 해싱하여 키 값에 따른 파티션을 균일하게 전송
Custom Partitioning 예시

- 특정 키는 Partition#0,#1로 그 외는 나머지 파티션으로 균일 분배하고 싶다면 Custom Partitioning을 구현해야 한다.
- Custom Partitioning을 구현하기 위해서는 Partitioner 인터페이스를 구현해야 하고, partition() 메서드에 Custom Partitioning로직을 직접 구현해야 한다.
실습하기
1) Custom Partitioner 생성
p001만 파티션 2개, 나머지는 균일하게 동작
public class CustomPartitioner implements Partitioner {
public static final Logger logger = LoggerFactory.getLogger(CustomPartitioner.class.getName());
private String specialKeyName;
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
@Override
public void configure(Map<String, ?> configs) {
specialKeyName = configs.get("custom.specialKey").toString();
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int partitionIndex = 0;
//전체 파티션 읽어오기
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int numPartitions = partitionInfos.size();
int numSpecialPartitions = (int) (numPartitions * 0.5); //절반 할당
if (keyBytes == null) {
//return stickyPartitionCache.partition(topic, cluster);
throw new InvalidRecordException("key should not be null");
}
if (((String) key).equals(specialKeyName)) {
partitionIndex = Utils.toPositive(Utils.murmur2(valueBytes)) % numSpecialPartitions;
} else {
partitionIndex = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - numSpecialPartitions) + numSpecialPartitions;
}
logger.info("key:{} is sent to partitioner: {}", key.toString(), partitionIndex);
return partitionIndex;
}
@Override
public void close() {
}
}
- configure에 들어오는 값은 PizzaProducerCustomPartitioner에서 설정한 Property 값이다.
- partitionIndex는 SpecialKey가 라면, 파티션 중 절반으로 % 연산한 값 중 하나로 할당된다.
- partitionIndex가 SpecialKey가 아니라면, SpecialKey 영역이 아닌 파티션 중 하나로 할당된다.
- ex) 5개의 파티션이 있다면, 1/2 파티션에 SpecialKey로 들어올 수 있고, 3/4/5 파티션에 SpecialKey가 아닌 값이 들어올 수 있다.
2) PizzaProducerCustomPartitioner Config 수정
String topicName = "pizza-topic-partitioner";
//KafkaProducer Configuration Setting
//Null, String
Properties props = new Properties();
//Bootstrap.servers, key.serializer.class, value.serializer.class
//props.setProperty("bootstrap.servers", "192.168.56.101:9092");
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty("custom.specialKey", "P001");
props.setProperty(PARTITIONER_CLASS_CONFIG, "com.example.kafka.CustomPartitioner");
- 다른 패키지일 경우 Full 패키지 이름을 적어주어야 한다.
3) 파티션 생성
kafka-topics --bootstrap-server localhost:9092 --create --topic pizza-topic-partitioner --partitions 5
- 파티션 5개인 Topic 생성
4-1) 확인하기(Dump로그로 확인하기)
kafka-dump-log --deep-iteration --files /home/min/data/kafka-logs/pizza-topic-partitioner-0/00000000000000000000.log --print-data-log

- "P001" Key 값만 들어온 것을 확인할 수 있다.
4-2) 확인하기(Consumer로 확인하기)
kafka-console-consumer --bootstrap-server localhost:9092 --topic pizza-topic-partitioner \
> --property print.key=true --property print.value=true --partition 0

- 운영환경에서는 잘 쓰이지 않음
728x90
'카프카 > Java 기반 카프카 클라이언트 구현, Producer 내부 메커니즘 2' 카테고리의 다른 글
| idempotence(멱등성) 기반 중복 없이 전송 (0) | 2025.10.04 |
|---|---|
| Producer의 max.in.flight.request.per.connection의 이해 (0) | 2025.10.04 |
| Producer의 전송/재전송 내부 메커니즘 및 재전송 동작 관련 주요 파라미터 (0) | 2025.09.30 |
| Producer의 메시지 배치 전송 내부 메커니즘 (0) | 2025.09.29 |
| acks 값 설정에 따른 Producer의 전송 방식 차이 이해 (0) | 2025.09.29 |