为了账号安全,请及时绑定邮箱和手机立即绑定

Spark Dataframe Write to CSV 在独立集群模式下创建

Spark Dataframe Write to CSV 在独立集群模式下创建

jeck猫 2021-08-19 21:35:24
我spark job在一个有 2 个工作节点的集群中运行!我正在使用下面的代码(spark java)将计算出的数据帧作为 csv 保存到工作节点。dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath);我试图了解 spark 如何在每个工作节点上写入多个部分文件。Run1)worker1有part files和SUCCESS; worker2让_temporarty/task*/part*每个任务都运行零件文件。Run2)worker1有部分文件和_temporary目录;worker2拥有multiple part files谁能帮助我理解为什么会出现这种行为?1)我应该将记录outputDir/_temporary视为输出文件的一部分part files in outputDir吗?2)是否_temporary 应该在作业运行后删除 dir 并将part文件移动到outputDir?3)为什么不能直接在输出目录下创建零件文件?coalesce(1)并且repartition(1)不能是选项,因为 outputDir 文件本身将在附近500GBSpark 2.0.2. 2.1.3 和 Java 8, no HDFS
查看完整描述

3 回答

?
凤凰求蛊

TA贡献1825条经验 获得超4个赞

要使用基于文件系统的源正确写入(或读取)数据,您需要一个共享存储。

_temporary目录是 Spark 使用的基本提交机制的一部分 - 数据首先写入临时目录,一旦所有任务完成,原子移动到最终目的地。您可以在Spark _temporary 创建原因中阅读有关此过程的更多信息

要使此过程成功,您需要共享文件系统(HDFS、NFS 等)或等效的分布式存储(如 S3)。由于您没有,因此预计无法清除临时状态 - 将数据帧保存到本地文件系统会导致结果为空。

当某些执行程序与驱动程序位于同一位置并与驱动程序共享文件系统时,可能会发生您观察到的行为(数据部分提交和部分未提交),从而为数据子集启用完全提交。


查看完整回答
反对 回复 2021-08-19
?
慕运维8079593

TA贡献1876条经验 获得超5个赞

经过分析,观察到我的 spark 作业使用的fileoutputcommitter version 1是默认值。然后我包含要使用的配置,fileoutputcommitter version 2而不是version 1在 AWS 中的 10 节点 spark 独立集群中进行测试。所有part-* files都直接在outputDirPath指定的下生成dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath)

我们可以设置属性

  1. 通过包括同--conf 'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2'spark-submit command

  2. 或使用 sparkContext 设置属性 javaSparkContext.hadoopConifiguration().set("mapreduce.fileoutputcommitter.algorithm.version","2")

我了解spark docs 中概述的失败情况下的后果,但我达到了预期的结果!

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version , defaultValue is 1
文件输出提交者算法版本,有效算法版本号:1 或 2。版本 2 可能有更好的性能,但版本 1 在某些情况下可能会更好地处理故障,如根据 MAPREDUCE-4815。


查看完整回答
反对 回复 2021-08-19
?
梵蒂冈之花

TA贡献1900条经验 获得超5个赞

多个部分文件基于您的数据帧分区。写入的文件或数据的数量取决于您写出数据时 DataFrame 的分区数量。默认情况下,每个数据分区写入一个文件。


您可以通过使用合并或重新分区来控制它。您可以减少或增加分区。


如果将合并为 1,则不会在其中看到多个部分文件,但这会影响并行写入数据。


[outputDirPath = /tmp/multiple.csv]


dataframe

 .coalesce(1)

 .write.option("header","false")

 .mode(SaveMode.Overwrite)

 .csv(outputDirPath);

关于如何引用它的问题..


请参阅/tmp/multiple.csv以下所有部分。


/tmp/multiple.csv/part-00000.csv

/tmp/multiple.csv/part-00001.csv

/tmp/multiple.csv/part-00002.csv

/tmp/multiple.csv/part-00003.csv


查看完整回答
反对 回复 2021-08-19
  • 3 回答
  • 0 关注
  • 346 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信