为了账号安全,请及时绑定邮箱和手机立即绑定

如何修复“不兼容的类型:

如何修复“不兼容的类型:

智慧大石 2022-06-04 09:30:32
我按照此链接创建了一个模板,该模板构建了一个从 KafkaIO 读取的光束管道。但我总是遇到“不兼容的类型:org.apache.beam.sdk.options.ValueProvider 无法转换为 java.lang.String”。导致错误的是行“.withBootstrapServers(options.getKafkaServer())”。Beam 版本是 2.9.0,这是我的代码的一部分。public interface Options extends PipelineOptions {    @Description("Kafka server")    @Required    ValueProvider<String> getKafkaServer();    void setKafkaServer(ValueProvider<String> value);    @Description("Topic to read from")    @Required    ValueProvider<String> getInputTopic();    void setInputTopic(ValueProvider<String> value);    @Description("Topic to write to")    @Required    ValueProvider<String> getOutputTopic();    void setOutputTopic(ValueProvider<String> value);    @Description("File path to write to")    @Required    ValueProvider<String> getOutput();    void setOutput(ValueProvider<String> value);}public static void main(String[] args) {    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);    Pipeline p = Pipeline.create(options);    PCollection<String> processedData = p.apply(KafkaIO.<Long, String>read()            .withBootstrapServers(options.getKafkaServer())            .withTopic(options.getInputTopic())            .withKeyDeserializer(LongDeserializer.class)            .withValueDeserializer(StringDeserializer.class)            .withoutMetadata()     )以下是我运行代码的方式:mvn compile exec:java \-Dexec.mainClass=${MyClass} \-Pdataflow-runner -Dexec.args=" \--project=${MyClass} \--stagingLocation=gs://${MyBucket}/staging \--tempLocation=gs://${MyBucket}/temp \--templateLocation=gs://${MyBucket}/templates/${MyClass} \--runner=DataflowRunner"
查看完整描述

2 回答

?
BIG阳

TA贡献1859条经验 获得超6个赞

为了通过 访问值ValueProvider,您需要使用该get方法,然后获取具有具体类型的值。

例如:当有选项时:

ValueProvider<String> getKafkaServer();

您可以通过以下方式访问它:

getKafkaServer().get()这将返回您的 String 对象。

似乎 KafkaIo Api 需要获取字符串参数而不是 ValueProvider,您必须从 ValueProvider 包装器中提取值。


查看完整回答
反对 回复 2022-06-04
?
慕标5832272

TA贡献1966条经验 获得超4个赞

我可能会发现问题,即不支持 kafkaIO。以下来自谷歌创建模板

" 一些 I/O 连接器包含接受 ValueProvider 对象的方法。要确定对特定连接器和方法的支持,请参阅 I/O 连接器的 API 参考文档。支持的方法具有 ValueProvider 的重载。如果方法没有重载,该方法不支持运行时参数。以下 I/O 连接器至少有部分 ValueProvider 支持:

基于文件的 IO:TextIO、AvroIO、FileIO、TFRecordIO、XmlIO BigQueryIO* BigtableIO(需要 SDK 2.3.0 或更高版本)PubSubIO SpannerIO "


查看完整回答
反对 回复 2022-06-04
  • 2 回答
  • 0 关注
  • 149 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信