Spark - load a CSV file as a DataFrame?

I want to read a CSV file in Spark, convert it to a DataFrame, store it in HDFS, and register it with df.registerTempTable("table_name"). I tried:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

The error I got:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

What is the correct command to load a CSV file as a DataFrame in Apache Spark?
3 Answers
PIPIONE
CSV support (originally the separate spark-csv package) is part of core Spark as of Spark 2.x, so no extra library is needed. You can simply do:
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
In Scala (this works for any delimiter, e.g. "," for CSV, "\t" for TSV, etc.):

val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("delimiter", ",")
  .load("csvfile.csv")
慕仙森
This is for Hadoop 2.6 and Spark 1.6, without the "databricks" package.
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first                    // first line holds the column names
val data = rows.filter(_(0) != header(0))  // drop the header row
val rdd = data.map(row => Row(row(0), row(1).toInt))
val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("val", IntegerType, true))
val df = sqlContext.createDataFrame(rdd, schema)
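
A short usage sketch for the hand-built DataFrame above, assuming the Spark 1.6 sqlContext provided by spark-shell; the table name and query are illustrative only:

df.registerTempTable("table_name")
sqlContext.sql("SELECT id, val FROM table_name").show()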