为了账号安全,请及时绑定邮箱和手机立即绑定

如果我不提任何,kafka如何决定分区

如果我不提任何,kafka如何决定分区

婷婷同学_ 2021-05-31 15:49:44
这就是我产生消息的方式:String json = gson.toJson(msg);ProducerRecord<String, String> record = new ProducerRecord<>(kafkaProducerConfig.getTopic(), json);long startTime = System.currentTimeMillis();try {    RecordMetadata meta = producer.send(record).get(5, TimeUnit.SECONDS);} catch (InterruptedException e) {    e.printStackTrace();} catch (ExecutionException e) {    e.printStackTrace();} catch (TimeoutException e) {    e.printStackTrace();}我有15这个主题的分区,我在制作时没有提到分区键,分配的默认分区是什么?
查看完整描述

2 回答

?
蝴蝶不菲

TA贡献1810条经验 获得超4个赞

如果您没有定义任何自定义分区,它将按照以下规则使用默认分区器

  1. 如果记录中指定了分区,则使用该分区进行发布。

  2. 如果未指定分区但存在键,则根据键的散列选择分区

  3. 如果不存在分区或键,请以循环方式选择一个分区

下面默认,Partition实现来更好的理解

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

        int numPartitions = partitions.size();

        if (keyBytes == null) {

            int nextValue = nextValue(topic);

            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);

            if (availablePartitions.size() > 0) {

                int part = Utils.toPositive(nextValue) % availablePartitions.size();

                return availablePartitions.get(part).partition();

            } else {

                // no partitions are available, give a non-available partition

                return Utils.toPositive(nextValue) % numPartitions;

            }

        } else {

            // hash the keyBytes to choose a partition

            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

        }

    }


查看完整回答
反对 回复 2021-06-10
  • 2 回答
  • 0 关注
  • 189 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信