我们正在创建一个数据流管道,我们将从 postgres 读取数据并将其写入镶木地板文件。ParquetIO.Sink 允许您将 GenericRecord 的 PCollection 写入 Parquet 文件(来自此处https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/parquet/ParquetIO。网页)。但是镶木地板文件架构并不像我预期的那样这是我的模式:schema = new org.apache.avro.Schema.Parser().parse("{\n" + " \"type\": \"record\",\n" + " \"namespace\": \"com.example\",\n" + " \"name\": \"Patterns\",\n" + " \"fields\": [\n" + " { \"name\": \"id\", \"type\": \"string\" },\n" + " { \"name\": \"name\", \"type\": \"string\" },\n" + " { \"name\": \"createdAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" + " { \"name\": \"updatedAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" + " { \"name\": \"steps\", \"type\": [\"null\",{\"type\":\"array\",\"items\":{\"type\":\"string\",\"name\":\"json\"}}] },\n" + " ]\n" + "}");到目前为止,这是我的代码:Pipeline p = Pipeline.create( PipelineOptionsFactory.fromArgs(args).withValidation().create());p.apply(JdbcIO.<GenericRecord> read() .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create( "org.postgresql.Driver", "jdbc:postgresql://localhost:port/database") .withUsername("username") .withPassword("password")) .withQuery("select * from table limit(10)") .withCoder(AvroCoder.of(schema)) .withRowMapper((JdbcIO.RowMapper<GenericRecord>) resultSet -> { GenericRecord record = new GenericData.Record(schema); ResultSetMetaData metadata = resultSet.getMetaData(); int columnsNumber = metadata.getColumnCount();
2 回答
皈依舞
TA贡献1851条经验 获得超3个赞
我没有找到从 Avro 创建不在 GroupType 中的重复元素的方法。
Beam 中的 ParquetIO 使用项目中定义的“标准”avro 转换,在这里parquet-mr
实现。
似乎有两种方法可以将 Avro ARRAY 字段转换为 Parquet 消息——但它们都没有创建您正在寻找的内容。
目前,avro 转换是目前与 ParquetIO 交互的唯一方式。我在 ParquetIO 中看到了这个 JIRA Use Beam 模式,将其扩展到 Beam Rows,这可能允许不同的 parquet 消息策略。
或者,您可以为 ParquetIO 创建 JIRA 功能请求以支持 thrift 结构,这应该允许更好地控制 parquet 结构。
FFIVE
TA贡献1797条经验 获得超6个赞
它是您用来描述预期模式的 protobuf 消息吗?我认为您得到的是从指定的 JSON 模式正确生成的。optional repeated
在 protobuf 语言规范中没有意义:https://developers.google.com/protocol-buffers/docs/reference/proto2-spec
您可以删除null
方括号以生成简单的repeated
字段,它在语义上等同于optional repeated
(因为repeated
意味着零次或多次)。
添加回答
举报
0/150
提交
取消