为了账号安全,请及时绑定邮箱和手机立即绑定

Pyspark:在 python 中将所有压缩的 csv 合并为一个 csv

Pyspark:在 python 中将所有压缩的 csv 合并为一个 csv

Smart猫小萌 2021-11-16 10:38:24
如果我有压缩 csvs 形式的大量数据,我如何将它组合成一个 csv 文件(压缩输出与否无关紧要)?我正在将它读入 spark Dataframes,但后来我陷入了如何连接 pyspark Dataframes 的问题。下面是我运行循环的代码,并希望为每次循环运行附加数据帧:        schema=StructType([])        result = spark.createDataFrame(sc.emptyRDD(), schema)        for day in range(1,31):            day_str = str(day) if day>=10 else "0"+str(day)            print 'Ingesting %s' % day_str            df = spark.read.format("csv").option("header", "false").option("delimiter", "|").option("inferSchema", "true").load("s3a://key/201811%s" % (day_str))            result = result.unionAll(df)        result.write.save("s3a://key/my_result.csv", format='csv')这给了我错误AnalysisException: u"Union can only be performed on tables with the same number of columns, but the first table has 0 columns and the second table has 1 columns;;\n'Union\n:- LogicalRDD\n+- Relation[_c0#75] csv\n"。任何人都可以帮助我如何继续?
查看完整描述

1 回答

?
阿晨1998

TA贡献2037条经验 获得超6个赞

这对我有用:


result=spark.createDataFrame(sc.emptyRDD(), schema_mw)


for day in range(1,31):

    day_str = str(day) if day>=10 else "0"+str(day)

    print 'Ingesting %s' % day_str


    df = spark.read.format("csv").option("header", "false").option("delimiter", ",").schema(schema_mw).load("s3a://bucket/201811%s" % (day_str))


    if result:

        result = result.union(df)

    else:

        result = df

result.repartition(1).write.save("s3a://bucket/key-Compiled", format='csv', header=False)

但是,当我尝试在重新分区的最后一步中将标头加载为 true 时,这有效,标头存储为一行。我不确定如何将这些标题添加为标题而不是一行。


查看完整回答
反对 回复 2021-11-16
  • 1 回答
  • 0 关注
  • 271 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信