为了账号安全,请及时绑定邮箱和手机立即绑定

kafka AdminClient API超时,等待节点分配

kafka AdminClient API超时,等待节点分配

BIG阳 2021-05-13 18:19:27
我是Kafka的新手,正在尝试使用AdminClientAPI管理在我的本地计算机上运行的Kafka服务器。我将其设置与Kafka文档的“快速入门”部分完全相同。唯一的区别是我还没有创建任何主题。在此设置上运行任何外壳程序脚本都没有问题,但是当我尝试运行以下Java代码时:public class ProducerMain{    public static void main(String[] args) {        Properties props = new Properties();        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,             "localhost:9092");        try(final AdminClient adminClient =               KafkaAdminClient.create(props)){            try {                final NewTopic newTopic = new NewTopic("test", 1,                     (short)1);                final CreateTopicsResult createTopicsResult =                     adminClient.createTopics(                          Collections.singleton(newTopic));                createTopicsResult.all().get();            }catch (InterruptedException | ExecutionException e) {                e.printStackTrace();            }        }    }}错误: TimeoutException: Timed out waiting for a node assignmentException in thread "main" java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.at ProducerMain.main(ProducerMain.java:41)    <br>Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258)at ProducerMain.main(ProducerMain.java:38)<br>Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.我已经在网上搜索了有关可能是什么问题的指示,但到目前为止没有发现任何问题。任何建议都值得欢迎,因为我已经走到尽头了。
查看完整描述

2 回答

?
函数式编程

TA贡献1807条经验 获得超9个赞

听起来您的经纪人不健康...


该代码可以正常工作


public class Main {


    static final Logger logger = LoggerFactory.getLogger(Main.class);


    public static void main(String[] args) {

        Properties properties = new Properties();

        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        properties.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, "local-test");

        properties.setProperty(AdminClientConfig.RETRIES_CONFIG, "3");


        try (AdminClient client = AdminClient.create(properties)) {

            final CreateTopicsResult res = client.createTopics(

                    Collections.singletonList(

                            new NewTopic("foo", 1, (short) 1)

                    )

            );

            res.all().get(5, TimeUnit.SECONDS);

        } catch (InterruptedException | ExecutionException | TimeoutException e) {

            logger.error("unable to create topic", e);

        }

    }

}

而且我可以在代理日志中看到该主题已创建


查看完整回答
反对 回复 2021-05-26
?
桃花长相依

TA贡献1860条经验 获得超8个赞

我用bitnami / kafka启动了kafka服务,并得到了完全相同的错误。尝试通过此版本启动kafka,它可以正常工作:https : //hub.docker.com/r/wurstmeister/kafka


$ docker run -d --name zookeeper-server --network app-tier \

  -e ALLOW_ANONYMOUS_LOGIN=yes  -p 2181:2181 zookeeper:3.6.2


$ docker run -d --name kafka-server --network app-tier --publish 9092:9092 \

  --env KAFKA_ZOOKEEPER_CONNECT=zookeeper-server:2181 \

  --env KAFKA_ADVERTISED_HOST_NAME=30.225.51.235 \

  --env KAFKA_ADVERTISED_PORT=9092  \

  wurstmeister/kafka

30.225.51.235是主机的IP地址。


查看完整回答
反对 回复 2021-05-26
  • 2 回答
  • 0 关注
  • 320 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信