为了账号安全,请及时绑定邮箱和手机立即绑定

Spark - 将CSV文件加载为DataFrame?

Spark - 将CSV文件加载为DataFrame?

Spark - 将CSV文件加载为DataFrame?我想在spark中读取CSV并将其转换为DataFrame并将其存储在HDFS中 df.registerTempTable("table_name")我试过了:scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")我得到的错误:java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]     at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)     at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)     at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)     at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)     at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)     at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)     at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)     at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)     at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)     at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)     at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)     at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)在Apache Spark中将CSV文件作为DataFrame加载的正确命令是什么?
查看完整描述

3 回答

?
PIPIONE

TA贡献1829条经验 获得超9个赞

spark-csv是Spark核心功能的一部分,不需要单独的库。所以你可以这样做

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

在scala中,(这适用于任何格式的分隔符提及“,”用于csv,“\ t”用于tsv等) val df = sqlContext.read.format("com.databricks.spark.csv")    .option("delimiter", ",")    .load("csvfile.csv")


查看完整回答
反对 回复 2019-08-06
?
慕仙森

TA贡献1827条经验 获得超8个赞

它的Hadoop是2.6,Spark是1.6,没有“databricks”包。

import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};import org.apache.spark.sql.Row;val csv = sc.textFile("/path/to/file.csv")val rows = csv.map(line => line.split(",").map(_.trim))val header = rows.firstval data = rows.filter(_(0) != header(0))val rdd = data.map(row => Row(row(0),row(1).toInt))val schema = new StructType()
    .add(StructField("id", StringType, true))
    .add(StructField("val", IntegerType, true))val df = sqlContext.createDataFrame(rdd, schema)


查看完整回答
反对 回复 2019-08-06
  • 3 回答
  • 0 关注
  • 1649 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信