为了账号安全,请及时绑定邮箱和手机立即绑定

使用 Beam SQL 查询 Avro 架构

使用 Beam SQL 查询 Avro 架构

HUH函数 2024-01-05 16:43:20
我正在尝试使用 Apache Beam 读取 avro 文件并使用 Beam SQL 来转换数据。我对 Beam 和 Java 还是新手。这是我的简单代码:public class BeamSQLReadAvro {    @SuppressWarnings("serial")    public static void main(String[] args) throws IOException {        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();        Pipeline p = Pipeline.create(options);        /* Schema definition */        Schema schema = new Schema.Parser().parse(new File("data/RATE_CODE/RATE_CODE.avsc"));        /* Create record/row */        PCollection<GenericRecord> records = p.apply(AvroIO.readGenericRecords(schema).from("data/RATE_CODE/*.avro"));        /* SQL Transform */        records.apply("SQL Transform 01",SqlTransform.query("SELECT RCODE,RNAME,RDESC FROM PCOLLECTION LIMIT 10"))        /* Print output */               .apply("Output",                      MapElements.via(                        new SimpleFunction<Row, Row>() {                          @Override                          public Row apply(Row input) {                            System.out.println("PCOLLECTION: " + input.getValues());                            return input;                          }                        }                      )               );        p.run().waitUntilFinish();    }}它给了我错误Exception in thread "main" java.lang.IllegalStateException: Cannot call getSchema when there is no schema我不明白,我定义了一个名为 schema 的变量。这里有什么指点吗?
查看完整描述

1 回答

?
慕姐4208626

TA贡献1852条经验 获得超7个赞

实际上,您的管道中有两种类型的模式 - Avro 和 Beam 模式。Avro 模式用于解析 Avro 输入记录,但对于 SQL 转换,您应该使用具有 Beam 模式的行。为此,AvroIO提供一个选项withBeamSchemas(boolean),应true根据您的情况设置为,例如:

AvroIO.readGenericRecords(schema).withBeamSchemas(true).from("data/RATE_CODE/*.avro")



查看完整回答
反对 回复 2024-01-05
  • 1 回答
  • 0 关注
  • 88 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信