3 回答
TA贡献1799条经验 获得超9个赞
如果运行repartition(COL)
,则在计算过程中更改分区-您将获得spark.sql.shuffle.partitions
(默认值:200)分区。如果您随后致电,.write
您将获得一个包含许多文件的目录。
如果运行,.write.partitionBy(COL)
则结果将获得与COL中的唯一值一样多的目录。这样可以加快进一步的数据读取速度(如果您按分区列进行过滤),并节省了一些存储空间(分区列已从数据文件中删除)。
更新:参见@conradlee的答案。他不仅详细说明了应用不同方法后的目录结构,而且还解释了两种情况下文件的数量。
TA贡献1871条经验 获得超8个赞
repartition()用于对内存中的数据进行分区,并partitionBy用于对磁盘上的数据进行分区。如本博客文章所述,它们通常结合使用。
二者repartition()并partitionBy可以用于“基于数据帧列分区数据”,但repartition()在存储分区中的数据和partitionBy分区在磁盘上的数据。
repartition()
让我们玩一些代码以更好地了解分区。假设您具有以下CSV数据。
first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China
df.repartition(col("country")) 将按内存中的国家/地区对数据进行分区。
让我们写出数据,以便我们可以检查每个内存分区的内容。
val outputPath = new java.io.File("./tmp/partitioned_by_country/").getCanonicalPath
df.repartition(col("country"))
.write
.csv(outputPath)
这是将数据写到磁盘上的方法:
partitioned_by_country/
part-00002-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
part-00044-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
每个文件都包含一个国家/ part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv地区的数据-该文件包含以下中国数据,例如:
Bruce,Lee,China
Jack,Ma,China
partitionBy()
让我们将数据写到磁盘上partitionBy,看看文件系统输出如何不同。
这是将数据写到磁盘分区的代码。
val outputPath = new java.io.File("./tmp/partitionedBy_disk/").getCanonicalPath
df
.write
.partitionBy("country")
.csv(outputPath)
磁盘上的数据如下所示:
partitionedBy_disk/
country=Argentina/
part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000.csv
country=China/
part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000
country=Russia/
part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000
为什么要对磁盘上的数据进行分区?
如本博文所述,对磁盘上的数据进行分区可以使某些查询运行得更快。
添加回答
举报