我正在尝试使用 JdbcIO.Read 读取 Java Beam 中的云 SQL 表。我想使用 .withRowMapper(Resultset resultSet) 方法将 Resultset 中的每一行转换为 GenericData.Record。有没有办法可以将 JSON 架构字符串作为 .withRowMapper 方法中的输入传递,例如 ParDo 接受 sideInputs 作为 PCollectionView我尝试过执行这两种读取操作(在同一 JdbcIO.Read 转换中从 information_schema.columns 和 My Table 读取)。但是,我想先生成 Schema PCollection,然后使用 JdbcIO.Read 读取表我正在动态生成表的 Avro 模式,如下所示:PCollection<String> avroSchema= pipeline.apply(JdbcIO.<String>read() .withDataSourceConfiguration(config) .withCoder(StringUtf8Coder.of()) .withQuery("SELECT DISTINCT column_name, data_type \n" + "FROM information_schema.columns\n" + "WHERE table_name = " + "'" + tableName + "'") .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> { // code here to generate avro schema string // this works fine for me}))创建 PCollectionView 它将保存每个表的 json 模式。 PCollectionView<String> s = avroSchema.apply(View.<String>asSingleton());// I want to access this view as side input in next JdbcIO.Read operation// something like this ;pipeline.apply(JdbcIO.<String>read() .withDataSourceConfiguration(config) .withCoder(StringUtf8Coder.of()) .withQuery(queryString) .withRowMapper(new JdbcIO.RowMapper<String>() { @Override public String mapRow(ResultSet resultSet) throws Exception { // access schema here and use it to parse and create //GenericData.Record from ResultSet fields as per schema return null; } })). withSideInputs(My PCollectionView here); // this option is not there right now.有没有更好的方法来解决这个问题?
添加回答
举报
0/150
提交
取消