我按照此链接创建了一个模板,该模板构建了一个从 KafkaIO 读取的光束管道。但我总是遇到“不兼容的类型:org.apache.beam.sdk.options.ValueProvider 无法转换为 java.lang.String”。导致错误的是行“.withBootstrapServers(options.getKafkaServer())”。Beam 版本是 2.9.0,这是我的代码的一部分。public interface Options extends PipelineOptions { @Description("Kafka server") @Required ValueProvider<String> getKafkaServer(); void setKafkaServer(ValueProvider<String> value); @Description("Topic to read from") @Required ValueProvider<String> getInputTopic(); void setInputTopic(ValueProvider<String> value); @Description("Topic to write to") @Required ValueProvider<String> getOutputTopic(); void setOutputTopic(ValueProvider<String> value); @Description("File path to write to") @Required ValueProvider<String> getOutput(); void setOutput(ValueProvider<String> value);}public static void main(String[] args) { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); Pipeline p = Pipeline.create(options); PCollection<String> processedData = p.apply(KafkaIO.<Long, String>read() .withBootstrapServers(options.getKafkaServer()) .withTopic(options.getInputTopic()) .withKeyDeserializer(LongDeserializer.class) .withValueDeserializer(StringDeserializer.class) .withoutMetadata() )以下是我运行代码的方式:mvn compile exec:java \-Dexec.mainClass=${MyClass} \-Pdataflow-runner -Dexec.args=" \--project=${MyClass} \--stagingLocation=gs://${MyBucket}/staging \--tempLocation=gs://${MyBucket}/temp \--templateLocation=gs://${MyBucket}/templates/${MyClass} \--runner=DataflowRunner"
2 回答
BIG阳
TA贡献1859条经验 获得超6个赞
为了通过 访问值ValueProvider
,您需要使用该get
方法,然后获取具有具体类型的值。
例如:当有选项时:
ValueProvider<String> getKafkaServer();
您可以通过以下方式访问它:
getKafkaServer().get()
这将返回您的 String 对象。
似乎 KafkaIo Api 需要获取字符串参数而不是 ValueProvider,您必须从 ValueProvider 包装器中提取值。
慕标5832272
TA贡献1966条经验 获得超4个赞
我可能会发现问题,即不支持 kafkaIO。以下来自谷歌创建模板。
" 一些 I/O 连接器包含接受 ValueProvider 对象的方法。要确定对特定连接器和方法的支持,请参阅 I/O 连接器的 API 参考文档。支持的方法具有 ValueProvider 的重载。如果方法没有重载,该方法不支持运行时参数。以下 I/O 连接器至少有部分 ValueProvider 支持:
基于文件的 IO:TextIO、AvroIO、FileIO、TFRecordIO、XmlIO BigQueryIO* BigtableIO(需要 SDK 2.3.0 或更高版本)PubSubIO SpannerIO "
添加回答
举报
0/150
提交
取消