3 Answers
Same implementation as given, but using the higher-level Dataset API.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

// Project each address column into its own single-column dataset;
// the alias on the first branch names the combined column.
Dataset<Row> Ad1 = df.select(functions.col("Address1").as("Address4"));
Dataset<Row> Ad2 = df.select("Address2");
Dataset<Row> Ad3 = df.select("Address3");

// union resolves columns by position, so all three stack under "Address4".
Dataset<Row> Union_DS = Ad1.union(Ad2).union(Ad3);
Union_DS.show();

// Count each address and sort by frequency, highest first.
Dataset<Row> Union_Sorted = Union_DS
    .groupBy("Address4")
    .agg(functions.count(functions.col("Address4")).as("Count"))
    .sort(functions.desc("Count"));
Union_Sorted.show();
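As an aside, the three selects and unions can be collapsed into a single pass with explode over an array of the columns. A minimal sketch in Scala (my assumption, not part of this answer: the same df with columns Address1..Address3; the Java functions API is analogous):

import org.apache.spark.sql.functions.{explode, array, col, count, desc}

// Melt the three address columns into one, then count as before.
val counts = df
  .select(explode(array(col("Address1"), col("Address2"), col("Address3"))).as("Address4"))
  .groupBy("Address4")
  .agg(count(col("Address4")).as("Count"))
  .sort(desc("Count"))
counts.show()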
You should be able to solve this with a UNION in Spark SQL:
spark.sql(
  """
    |SELECT Address4
    |FROM (
    |  SELECT Address1 AS Address4 FROM table
    |  UNION
    |  SELECT Address2 FROM table
    |  UNION
    |  SELECT Address3 FROM table
    |) t
  """.stripMargin).show()
You can do it with the approach below:
// Build a small sample DataFrame; toDF needs the SparkSession implicits in scope.
import spark.implicits._

val input_rdd = spark.sparkContext.parallelize(List(("a1", "b1", "c1"), ("a1", "b2", "c1"), ("a1", "b1", "c2"), ("a2", "b2", "c2")))
val input_df = input_rdd.toDF("Address1", "Address2", "Address3")
input_df.show()
+--------+--------+--------+
|Address1|Address2|Address3|
+--------+--------+--------+
| a1| b1| c1|
| a1| b2| c1|
| a1| b1| c2|
| a2| b2| c2|
+--------+--------+--------+
import org.apache.spark.sql.functions.{count, col}

// Count occurrences per column, renaming each result to a common ADDRESS/COUNT schema.
val out_address1_df = input_df.groupBy("Address1").agg(count(input_df("Address1")).as("count_address1")).
  select(input_df("Address1").as("ADDRESS"), col("count_address1").as("COUNT"))
//out_address1_df.show()
val out_address2_df = input_df.groupBy("Address2").agg(count(input_df("Address2")).as("count_address2")).
  select(input_df("Address2").as("ADDRESS"), col("count_address2").as("COUNT"))
//out_address2_df.show()
val out_address3_df = input_df.groupBy("Address3").agg(count(input_df("Address3")).as("count_address3")).
  select(input_df("Address3").as("ADDRESS"), col("count_address3").as("COUNT"))

// unionAll is deprecated since Spark 2.0; union keeps duplicate rows just the same.
val output_df = out_address1_df.union(out_address2_df).union(out_address3_df)
output_df.show()
+-------+-----+
|ADDRESS|COUNT|
+-------+-----+
| a2| 1|
| a1| 3|
| b2| 2|
| b1| 2|
| c1| 2|
| c2| 2|
+-------+-----+
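One caveat (my addition, not part of the answer): because each column is counted separately before the union, a value that appears in more than one Address column would show up as separate rows. A minimal sketch to merge such rows, assuming the output_df above:

import org.apache.spark.sql.functions.{sum, desc}

// Re-aggregate so each distinct address appears once, with its total count.
val merged_df = output_df
  .groupBy("ADDRESS")
  .agg(sum("COUNT").as("COUNT"))
  .sort(desc("COUNT"))
merged_df.show()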