2 回答
TA贡献1810条经验 获得超4个赞
如果您没有定义任何自定义分区,它将按照以下规则使用默认分区器
如果记录中指定了分区,则使用该分区进行发布。
如果未指定分区但存在键,则根据键的散列选择分区
如果不存在分区或键,请以循环方式选择一个分区
下面默认,Partition实现来更好的理解
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
添加回答
举报