为了账号安全,请及时绑定邮箱和手机立即绑定

在组归约上按键触发嵌套结构的错误序列化

在组归约上按键触发嵌套结构的错误序列化

慕标5832272 2022-07-20 16:11:18
我想按键减少数据帧。reduce 逻辑非常复杂,需要更新大约 10-15 个字段。这就是为什么我想将 DataFrame 转换为 DataSet 并减少 Java POJO。问题问题是,在groupByKey-reduceByKey我得到一些非常奇怪的值之后。Encoders.bean(Entity.class)读取正确的数据。请参阅代码示例部分。变通方法替换Encoders.bean为Encoders.kryo不起作用,异常:Try to map struct<broker_name:string,server_name:string,order:int,storages:array<struct<timestamp:timestamp,storage:double>>> to Tuple1, but failed as the number of fields does not line up.我也看到了这个 workarround,但Encoders.product需要TypeTag. 我不知道如何TypeTag在 Java 代码中创建。
查看完整描述

1 回答

?
泛舟湖上清波郎朗

TA贡献1818条经验 获得超3个赞

这是因为反序列化使用由 推断出的架构上的结构匹配Encoder,并且由于 bean 类没有自然结构,架构的字段按名称排序。


所以如果你定义一个像你的 bean 类Entity,从 bean 推断的模式Encoder将是


Encoders.bean(Storage.class).schema().printTreeString();

root

 |-- storage: double (nullable = true)

 |-- timestamp: timestamp (nullable = true)

不是



root

 |-- timestamp: timestamp (nullable = true)

 |-- storage: double (nullable = true)


这是应该使用的架构Dataset。换句话说,架构定义为:


StructType schema = Encoders.bean(Entity.class).schema();

或者


StructType schema = StructType.fromDDL(

  "broker_name string, order integer, server_name string, " + 

  "storages array<struct<storage: double, timestamp: timestamp>>" 

);

将是有效的,并且可以用于testData直接加载:


Dataset<Entity> ds = spark.read()

  .option("multiline", "true")

  .schema(schema)

  .json("testData.json")

  .as(Encoders.bean(Entity.class));

而您当前的架构,相当于:



StructType valid = StructType.fromDDL(

  "broker_name string, order integer, server_name string, " + 

  "storages array<struct<timestamp: timestamp, storage: double>>" 

);


不是,尽管它可以与 JSON 阅读器一起使用,它(与 相比Encoders)按名称匹配数据。


可以说,这种行为应该被报告为一个错误——直观地说,不应该有Encoder转储与其自己的加载逻辑不兼容的数据的情况。



查看完整回答
反对 回复 2022-07-20
  • 1 回答
  • 0 关注
  • 106 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信