4 Answers
You don't need to use an RDD at all. What you need to do is extract the desired schema from the list of maps, convert the list of maps to a list of rows, and then use spark.createDataFrame.
In Java this is a bit painful, especially when creating the Row objects, but here is how it goes:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.types.*;
import scala.collection.JavaConverters;
import java.util.*;
import java.util.stream.Collectors;

// Take the column names from the keys of the first map.
List<String> cols = new ArrayList<>(dataList.get(0).keySet());

// Turn each map into a Row by reading its values in column order.
List<Row> rows = dataList
        .stream()
        .map(row -> cols.stream().map(c -> (Object) row.get(c).toString()))
        .map(row -> row.collect(Collectors.toList()))
        .map(row -> JavaConverters.asScalaBufferConverter(row).asScala().toSeq())
        .map(Row$.MODULE$::fromSeq)
        .collect(Collectors.toList());

// Every value was stringified above, so the schema is all-string.
StructType schema = new StructType(
        cols.stream()
            .map(c -> new StructField(c, DataTypes.StringType, true, new Metadata()))
            .collect(Collectors.toList())
            .toArray(new StructField[0])
);

Dataset<Row> result = spark.createDataFrame(rows, schema);
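For completeness, here is a minimal sketch of how this could be driven end to end. The dataList contents and the SparkSession setup are illustrative assumptions, not part of the original answer:

import org.apache.spark.sql.SparkSession;

// Hypothetical input: two records with the same keys.
Map<String, Object> m1 = new HashMap<>();
m1.put("fund", "f1");
m1.put("qty", 100);
Map<String, Object> m2 = new HashMap<>();
m2.put("fund", "f2");
m2.put("qty", 200);
List<Map<String, Object>> dataList = Arrays.asList(m1, m2);

SparkSession spark = SparkSession.builder()
        .appName("maps-to-dataframe")
        .master("local[*]")
        .getOrCreate();

// ... run the snippet above, then:
result.show();
// Note that both columns come out as strings because of the toString() call.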
The Spark documentation already shows how to load in-memory JSON strings.
Here is the example from https://spark.apache.org/docs/latest/sql-data-sources-json.html:
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset<String> storing one JSON object per string.
List<String> jsonData = Arrays.asList(
        "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
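To connect this to the original List<Map<String, Object>> question, one option is to serialize each map to a JSON string first and let Spark infer the schema from the JSON. A minimal sketch, assuming Jackson (com.fasterxml.jackson.databind.ObjectMapper) is on the classpath:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.sql.Encoders;

ObjectMapper mapper = new ObjectMapper();
List<String> jsonRows = new ArrayList<>();
for (Map<String, Object> m : dataList) {
    // writeValueAsString turns each map into one JSON object per string.
    jsonRows.add(mapper.writeValueAsString(m));
}
Dataset<String> jsonDs = spark.createDataset(jsonRows, Encoders.STRING());
Dataset<Row> df = spark.read().json(jsonDs);

Unlike the all-string approach in the first answer, this lets Spark infer numeric and nested types from the JSON values.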
First, define a serializable JavaBean that models one row:

import java.io.Serializable;

public class MyRow implements Serializable {
    private String fund;
    private String broker;
    private int qty;

    public MyRow(String fund, String broker, int qty) {
        super();
        this.fund = fund;
        this.broker = broker;
        this.qty = qty;
    }

    public String getFund() {
        return fund;
    }

    public void setFund(String fund) {
        this.fund = fund;
    }

    public String getBroker() {
        return broker;
    }

    public void setBroker(String broker) {
        this.broker = broker;
    }

    public int getQty() {
        return qty;
    }

    public void setQty(int qty) {
        this.qty = qty;
    }
}
Now create an ArrayList. Each item in this list will become one row of the final DataFrame.
MyRow r1 = new MyRow("f1", "b1", 100);
MyRow r2 = new MyRow("f2", "b2", 200);
List<MyRow> dataList = new ArrayList<>();
dataList.add(r1);
dataList.add(r2);
Now we convert this list into a Dataset:
Dataset<Row> ds = spark.createDataFrame(dataList, MyRow.class);
ds.show();
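With the two rows above, ds.show() should print something like the following; note that Spark derives the schema from the bean's getters, so the columns may come out in alphabetical order rather than constructor order:

// +------+----+---+
// |broker|fund|qty|
// +------+----+---+
// |    b1|  f1|100|
// |    b2|  f2|200|
// +------+----+---+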
import java.sql.Date;
import java.util.*;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

// Assumes sc (JavaSparkContext), spark (SparkSession) and
// list (List<Map<String, Object>>) are initialized elsewhere in the class.
private static JavaRDD<Map<String, Object>> rows;

// Sort each map's keys with a TreeMap so every Row has the same column order.
private static final Function<Map<String, Object>, Row> f =
        strObjMap -> RowFactory.create(new TreeMap<>(strObjMap).values().toArray(new Object[0]));

public void test() {
    rows = sc.parallelize(list);
    JavaRDD<Row> rowRDD = rows.map(f);

    // Build the schema from the first map, using the same sorted key order.
    Map<String, Object> headMap = list.get(0);
    TreeMap<String, Object> headerMap = new TreeMap<>(headMap);
    List<StructField> fields = new ArrayList<>();
    StructField field;
    for (String key : headerMap.keySet()) {
        Object value = headMap.get(key);
        // Infer a Spark type from the runtime type of the first row's value.
        if (value instanceof Integer) {
            field = DataTypes.createStructField(key, DataTypes.IntegerType, true);
        } else if (value instanceof Double) {
            field = DataTypes.createStructField(key, DataTypes.DoubleType, true);
        } else if (value instanceof Date || value instanceof java.util.Date) {
            field = DataTypes.createStructField(key, DataTypes.DateType, true);
        } else {
            field = DataTypes.createStructField(key, DataTypes.StringType, true);
        }
        fields.add(field);
    }
    StructType struct = DataTypes.createStructType(fields);
    Dataset<Row> data = this.spark.createDataFrame(rowRDD, struct);
}
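A brief usage sketch under the same assumptions (the map keys and values here are hypothetical, only meant to show how the fields the method relies on could be set up):

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

List<Map<String, Object>> list = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("qty", 100);
list.add(row1);

SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

// With those in place, test() builds a two-column DataFrame:
// "fund" inferred as StringType and "qty" as IntegerType.
// The TreeMap sorting is what keeps the Row values aligned with the schema fields.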