我正在尝试为我的 Kafka 使用者编写集成测试。我已经遵循了官方参考文档,但是当我开始测试时,我只看到这个重复的广告无限期:-2019-04-03 15:47:34.002 WARN 13120 --- [主] 组织.apache.kafka.clients.Network客户端 : [消费者客户端 Id=消费者-1,groupId=我的-组] 无法建立与节点 -1 的连接。经纪人可能不可用。我做错了什么?我正在使用 JUnit5、弹簧靴和 .spring-kafkaspring-kafka-test我有我的班级的注释。@EnableKafka@Configuration我的测试类是这样的:@ExtendWith(SpringExtension::class)@SpringBootTest(classes = [TestKafkaConfig::class])@DirtiesContext@EmbeddedKafka( partitions = 1, topics = [KafkaIntegrationTest.MY_TOPIC])class KafkaIntegrationTest { @Autowired private lateinit var embeddedKafka: EmbeddedKafkaBroker @Test fun test() { val senderProps = KafkaTestUtils.senderProps(embeddedKafka.brokersAsString) val template = KafkaTemplate(DefaultKafkaProducerFactory<Int, String>(senderProps)) template.defaultTopic = KafkaIntegrationTest.MY_TOPIC template.sendDefault("foo") }}我看起来像这样:application.ymlkafka: consumer: group-id: my-group bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092} value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer key-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081} specific.avro.reader: true我也尝试过设置一个,但我得到完全相同的重复消息。(这就是我尝试设置的方式):MockSchemaRegistryClientMockSchemaRegistryClient@TestConfiguration@Import(TestConfig::class)class TestKafkaConfig { @Autowired private lateinit var props: KafkaProperties @Bean fun schemaRegistryClient() = MockSchemaRegistryClient() @Bean fun kafkaAvroSerializer() = KafkaAvroSerializer(schemaRegistryClient()) @Bean fun kafkaAvroDeserializer() = KafkaAvroDeserializer(schemaRegistryClient(), props.buildConsumerProperties())我做错了什么?请注意,我正在使用融合模式注册表,并尝试从Avro反序列化。
1 回答
炎炎设计
TA贡献1808条经验 获得超4个赞
我相信您错过了为测试设置代理 URL。
文档中有一条关于如何获取此值的说明:
当嵌入式卡夫卡和嵌入式动物园管理员服务器由嵌入式卡夫布鲁克启动时,名为 spring.embedded.kafka.brokers 的系统属性将设置为卡夫卡代理的地址,而名为 spring.embedded.zookeeper.连接的系统属性将设置为动物园管理员的地址。为此属性提供了方便的常量(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS和EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT)。
(它位于此处的 junit 部分的底部)
解决此问题的一种方法是在测试中将此值设置为此值,例如kafka.consumers.bootstrap-servers
spring: kafka: consumer: bootstrap-servers: ${spring.embedded.kafka.brokers}
添加回答
举报
0/150
提交
取消