3 回答
TA贡献1825条经验 获得超4个赞
要使用基于文件系统的源正确写入(或读取)数据,您需要一个共享存储。
_temporary
目录是 Spark 使用的基本提交机制的一部分 - 数据首先写入临时目录,一旦所有任务完成,原子移动到最终目的地。您可以在Spark _temporary 创建原因中阅读有关此过程的更多信息
要使此过程成功,您需要共享文件系统(HDFS、NFS 等)或等效的分布式存储(如 S3)。由于您没有,因此预计无法清除临时状态 - 将数据帧保存到本地文件系统会导致结果为空。
当某些执行程序与驱动程序位于同一位置并与驱动程序共享文件系统时,可能会发生您观察到的行为(数据部分提交和部分未提交),从而为数据子集启用完全提交。
TA贡献1876条经验 获得超5个赞
经过分析,观察到我的 spark 作业使用的fileoutputcommitter version 1
是默认值。然后我包含要使用的配置,fileoutputcommitter version 2
而不是version 1
在 AWS 中的 10 节点 spark 独立集群中进行测试。所有part-* files
都直接在outputDirPath
指定的下生成dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath)
我们可以设置属性
通过包括同
--conf 'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2'
在spark-submit command
或使用 sparkContext 设置属性
javaSparkContext.hadoopConifiguration().set("mapreduce.fileoutputcommitter.algorithm.version","2")
我了解spark docs 中概述的失败情况下的后果,但我达到了预期的结果!
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version , defaultValue is 1
文件输出提交者算法版本,有效算法版本号:1 或 2。版本 2 可能有更好的性能,但版本 1 在某些情况下可能会更好地处理故障,如根据 MAPREDUCE-4815。
TA贡献1900条经验 获得超5个赞
多个部分文件基于您的数据帧分区。写入的文件或数据的数量取决于您写出数据时 DataFrame 的分区数量。默认情况下,每个数据分区写入一个文件。
您可以通过使用合并或重新分区来控制它。您可以减少或增加分区。
如果将合并为 1,则不会在其中看到多个部分文件,但这会影响并行写入数据。
[outputDirPath = /tmp/multiple.csv]
dataframe
.coalesce(1)
.write.option("header","false")
.mode(SaveMode.Overwrite)
.csv(outputDirPath);
关于如何引用它的问题..
请参阅/tmp/multiple.csv以下所有部分。
/tmp/multiple.csv/part-00000.csv
/tmp/multiple.csv/part-00001.csv
/tmp/multiple.csv/part-00002.csv
/tmp/multiple.csv/part-00003.csv
添加回答
举报