我正在尝试使用 Apache Beam 读取 avro 文件并使用 Beam SQL 来转换数据。我对 Beam 和 Java 还是新手。这是我的简单代码:public class BeamSQLReadAvro { @SuppressWarnings("serial") public static void main(String[] args) throws IOException { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create(); Pipeline p = Pipeline.create(options); /* Schema definition */ Schema schema = new Schema.Parser().parse(new File("data/RATE_CODE/RATE_CODE.avsc")); /* Create record/row */ PCollection<GenericRecord> records = p.apply(AvroIO.readGenericRecords(schema).from("data/RATE_CODE/*.avro")); /* SQL Transform */ records.apply("SQL Transform 01",SqlTransform.query("SELECT RCODE,RNAME,RDESC FROM PCOLLECTION LIMIT 10")) /* Print output */ .apply("Output", MapElements.via( new SimpleFunction<Row, Row>() { @Override public Row apply(Row input) { System.out.println("PCOLLECTION: " + input.getValues()); return input; } } ) ); p.run().waitUntilFinish(); }}它给了我错误Exception in thread "main" java.lang.IllegalStateException: Cannot call getSchema when there is no schema我不明白,我定义了一个名为 schema 的变量。这里有什么指点吗?
1 回答
慕姐4208626
TA贡献1852条经验 获得超7个赞
实际上,您的管道中有两种类型的模式 - Avro 和 Beam 模式。Avro 模式用于解析 Avro 输入记录,但对于 SQL 转换,您应该使用具有 Beam 模式的行。为此,AvroIO
提供一个选项withBeamSchemas(boolean)
,应true
根据您的情况设置为,例如:
AvroIO.readGenericRecords(schema).withBeamSchemas(true).from("data/RATE_CODE/*.avro")
添加回答
举报
0/150
提交
取消