我正在尝试实现一个数据管道,它连接来自 Kafka 主题的多个无限源。我能够连接到主题并获取数据PCollection<String>,我需要将其转换为PCollection<Row>. 我将逗号分隔的字符串拆分为一个数组,并使用模式将其转换为行。但是,如何实现/构建架构并将值动态绑定到它?即使我为模式构建创建了一个单独的类,有没有办法将字符串数组直接绑定到模式?下面是我当前的工作代码,它是静态的,每次我构建管道时都需要重写,它也会根据字段的数量进行扩展。final Schema sch1 = Schema.builder().addStringField("name").addInt32Field("age").build();PCollection<KafkaRecord<Long, String>> kafkaDataIn1 = pipeline .apply( KafkaIO.<Long, String>read() .withBootstrapServers("localhost:9092") .withTopic("testin1") .withKeyDeserializer(LongDeserializer.class) .withValueDeserializer(StringDeserializer.class) .updateConsumerProperties( ImmutableMap.of("group.id", (Object)"test1")));PCollection<Row> Input1 = kafkaDataIn1.apply( ParDo.of(new DoFn<KafkaRecord<Long, String>, Row>() { @ProcessElement public void processElement( ProcessContext processContext, final OutputReceiver<Row> emitter) { KafkaRecord<Long, String> record = processContext.element(); final String input = record.getKV().getValue(); final String[] parts = input.split(","); emitter.output( Row.withSchema(sch1) .addValues( parts[0], Integer.parseInt(parts[1])).build()); }})) .apply("window", Window.<Row>into(FixedWindows.of(Duration.standardSeconds(50))) .triggering(AfterWatermark.pastEndOfWindow()) .withAllowedLateness(Duration.ZERO) .accumulatingFiredPanes());Input1.setRowSchema(sch1);我的期望是以动态/可重用的方式实现与上述代码相同的事情。
1 回答
大话西游666
TA贡献1817条经验 获得超14个赞
该模式是在 pcollection 上设置的,因此它不是动态的,如果您想懒惰地构建它,那么您需要使用支持它的格式/编码器。Java 序列化或 json 就是例子。
也就是说,为了受益于 sql 功能,您还可以使用带有查询字段和其他字段的静态模式,这样静态部分就可以执行 sql,并且不会丢失额外的数据。
添加回答
举报
0/150
提交
取消