为了账号安全,请及时绑定邮箱和手机立即绑定

如何将侧面输入/额外输入传递给 JdbcIO RowMapper Java

如何将侧面输入/额外输入传递给 JdbcIO RowMapper Java

犯罪嫌疑人X 2023-08-09 17:16:34
我正在尝试使用 JdbcIO.Read 读取 Java Beam 中的云 SQL 表。我想使用 .withRowMapper(Resultset resultSet) 方法将 Resultset 中的每一行转换为 GenericData.Record。有没有办法可以将 JSON 架构字符串作为 .withRowMapper 方法中的输入传递,例如 ParDo 接受 sideInputs 作为 PCollectionView我尝试过执行这两种读取操作(在同一 JdbcIO.Read 转换中从 information_schema.columns 和 My Table 读取)。但是,我想先生成 Schema PCollection,然后使用 JdbcIO.Read 读取表我正在动态生成表的 Avro 模式,如下所示:PCollection<String> avroSchema= pipeline.apply(JdbcIO.<String>read()                .withDataSourceConfiguration(config)                .withCoder(StringUtf8Coder.of())                .withQuery("SELECT DISTINCT column_name, data_type \n" +                        "FROM information_schema.columns\n" +                        "WHERE table_name = " + "'" + tableName + "'")                .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {            // code here to generate avro schema string           // this works fine for me}))创建 PCollectionView 它将保存每个表的 json 模式。 PCollectionView<String> s = avroSchema.apply(View.<String>asSingleton());// I want to access this view as side input in next JdbcIO.Read operation// something like this ;pipeline.apply(JdbcIO.<String>read()        .withDataSourceConfiguration(config)        .withCoder(StringUtf8Coder.of())        .withQuery(queryString)        .withRowMapper(new JdbcIO.RowMapper<String>() {            @Override            public String mapRow(ResultSet resultSet) throws Exception {                // access schema here and use it to parse and create                //GenericData.Record from ResultSet fields as per schema                return null;            }        })).    withSideInputs(My PCollectionView here); // this option is not there right now.有没有更好的方法来解决这个问题?
查看完整描述

1 回答

?
千巷猫影

TA贡献1829条经验 获得超7个赞

此时 IO API 不接受 SideInputs。

在读取后立即添加 ParDo 并在那里进行映射应该是可行的。ParDo 可以接受侧面输入。


查看完整回答
反对 回复 2023-08-09
  • 1 回答
  • 0 关注
  • 95 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信