为了账号安全,请及时绑定邮箱和手机立即绑定

用火花-csv编写单个csv文件

用火花-csv编写单个csv文件

用火花-csv编写单个csv文件我在用https://github.com/databricks/spark-csv,我试图写一个CSV,但不能,它是一个文件夹。需要一个Scala函数,它将接受像路径和文件名这样的参数,并编写那个CSV文件。
查看完整描述

3 回答

?
慕森王

TA贡献1777条经验 获得超3个赞

它正在创建一个包含多个文件的文件夹,因为每个分区都是单独保存的。如果需要一个输出文件(仍在文件夹中),则可以repartition(如果上游数据很大,但需要洗牌,则首选):

df   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

coalesce:

df   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

保存前的数据帧:

所有数据将写入mydata.csv/part-00000..在使用此选项之前确保您了解正在发生的事情,以及将所有数据传输给单个员工的成本。..如果使用带有复制的分布式文件系统,数据将被多次传输-首先获取到单个工作人员,然后通过存储节点分发。

或者,您可以保留代码的原样,并使用通用工具,如catHDFSgetmerge然后简单地合并所有的部分。


查看完整回答
反对 回复 2019-07-11
?
HUWWW

TA贡献1874条经验 获得超12个赞

如果您正在使用HDFS运行SPark,我一直在通过正常编写CSV文件和利用HDFS进行合并来解决这个问题。我是在星火(1.6)直接这样做的:

import org.apache.hadoop.conf.Configurationimport org.apache.hadoop.fs._def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output}val newData =
    << create your dataframe >>val outputfile = "/user/feeds/project/outputs/subject"  
    var filename = "myinsights"var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filenamevar mergeFindGlob  = outputFileName

    newData.write        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()

我不记得我是从哪里学到这个把戏的,但它可能对你有用。


查看完整回答
反对 回复 2019-07-11
?
慕妹3242003

TA贡献1824条经验 获得超6个赞

我在这里可能有点晚了,但是.coalesce(1)repartition(1)可能适用于小数据集,但大型数据集都将被抛到一个节点上的一个分区中。这可能会抛出OOM错误,或者充其量只能缓慢地处理。

我强烈建议你使用FileUtil.copyMerge()函数来自HadoopAPI。这将把输出合并到一个文件中。

编辑-这有效地将数据带给驱动程序,而不是执行者节点。Coalesce()如果单个执行器具有比驱动程序更多的RAM,就可以了。

编辑2:copyMerge()在Hadoop3.0中被删除。有关如何使用最新版本的更多信息,请参见下面的堆栈溢出文章:Hadoop如何在Hadoop3.0中实现CopyMerge


查看完整回答
反对 回复 2019-07-11
  • 3 回答
  • 0 关注
  • 580 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信