1 回答
TA贡献1818条经验 获得超3个赞
这是因为反序列化使用由 推断出的架构上的结构匹配Encoder,并且由于 bean 类没有自然结构,架构的字段按名称排序。
所以如果你定义一个像你的 bean 类Entity,从 bean 推断的模式Encoder将是
Encoders.bean(Storage.class).schema().printTreeString();
root
|-- storage: double (nullable = true)
|-- timestamp: timestamp (nullable = true)
不是
root
|-- timestamp: timestamp (nullable = true)
|-- storage: double (nullable = true)
这是应该使用的架构Dataset。换句话说,架构定义为:
StructType schema = Encoders.bean(Entity.class).schema();
或者
StructType schema = StructType.fromDDL(
"broker_name string, order integer, server_name string, " +
"storages array<struct<storage: double, timestamp: timestamp>>"
);
将是有效的,并且可以用于testData直接加载:
Dataset<Entity> ds = spark.read()
.option("multiline", "true")
.schema(schema)
.json("testData.json")
.as(Encoders.bean(Entity.class));
而您当前的架构,相当于:
StructType valid = StructType.fromDDL(
"broker_name string, order integer, server_name string, " +
"storages array<struct<timestamp: timestamp, storage: double>>"
);
不是,尽管它可以与 JSON 阅读器一起使用,它(与 相比Encoders)按名称匹配数据。
可以说,这种行为应该被报告为一个错误——直观地说,不应该有Encoder转储与其自己的加载逻辑不兼容的数据的情况。
添加回答
举报