为了账号安全,请及时绑定邮箱和手机立即绑定

如何从 PCollection<String> 创建 PCollection<Row> 以执行

如何从 PCollection<String> 创建 PCollection<Row> 以执行

凤凰求蛊 2023-03-09 17:24:37
我正在尝试实现一个数据管道,它连接来自 Kafka 主题的多个无限源。我能够连接到主题并获取数据PCollection<String>,我需要将其转换为PCollection<Row>. 我将逗号分隔的字符串拆分为一个数组,并使用模式将其转换为行。但是,如何实现/构建架构并将值动态绑定到它?即使我为模式构建创建了一个单独的类,有没有办法将字符串数组直接绑定到模式?下面是我当前的工作代码,它是静态的,每次我构建管道时都需要重写,它也会根据字段的数量进行扩展。final Schema sch1 =                Schema.builder().addStringField("name").addInt32Field("age").build();PCollection<KafkaRecord<Long, String>> kafkaDataIn1 = pipeline  .apply(    KafkaIO.<Long, String>read()      .withBootstrapServers("localhost:9092")      .withTopic("testin1")      .withKeyDeserializer(LongDeserializer.class)      .withValueDeserializer(StringDeserializer.class)      .updateConsumerProperties(         ImmutableMap.of("group.id", (Object)"test1")));PCollection<Row> Input1 = kafkaDataIn1.apply(  ParDo.of(new DoFn<KafkaRecord<Long, String>, Row>() {    @ProcessElement    public void processElement(        ProcessContext processContext,        final OutputReceiver<Row> emitter) {          KafkaRecord<Long, String> record = processContext.element();          final String input = record.getKV().getValue();          final String[] parts = input.split(",");          emitter.output(            Row.withSchema(sch1)               .addValues(                   parts[0],                   Integer.parseInt(parts[1])).build());        }}))  .apply("window",     Window.<Row>into(FixedWindows.of(Duration.standardSeconds(50)))       .triggering(AfterWatermark.pastEndOfWindow())       .withAllowedLateness(Duration.ZERO)       .accumulatingFiredPanes());Input1.setRowSchema(sch1);我的期望是以动态/可重用的方式实现与上述代码相同的事情。
查看完整描述

1 回答

?
大话西游666

TA贡献1817条经验 获得超14个赞

该模式是在 pcollection 上设置的,因此它不是动态的,如果您想懒惰地构建它,那么您需要使用支持它的格式/编码器。Java 序列化或 json 就是例子。

也就是说,为了受益于 sql 功能,您还可以使用带有查询字段和其他字段的静态模式,这样静态部分就可以执行 sql,并且不会丢失额外的数据。


查看完整回答
反对 回复 2023-03-09
  • 1 回答
  • 0 关注
  • 67 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信