为了账号安全,请及时绑定邮箱和手机立即绑定

从Spark压缩中读取整个文本文件

从Spark压缩中读取整个文本文件

冉冉说 2019-11-19 14:51:48
我有以下问题:假设我有一个包含压缩目录的目录,该压缩目录包含多个文件,存储在HDFS上。我想创建一个包含一些T类型对象的RDD,即:context = new JavaSparkContext(conf);JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> {    // The name of the file    String fileName = fileNameContent._1();    // The content of the file    String content = fileNameContent._2();    // Class T has a constructor of taking the filename and the content of each    // processed file (as two strings)    T t = new T(content, fileName);    return t;});现在,当inputDataPath目录包含文件时,可以很好地工作,例如,当它类似于:String inputDataPath =  "hdfs://some_path/*/*/"; // because it contains subfolders但是,当一个tgz包含多个文件时,文件内容(fileNameContent._2())为我提供了一些无用的二进制字符串(相当不错)。我在SO上发现了类似的问题,但是情况不一样,因为解决方案是每次压缩仅包含一个文件,而在我的情况下,还有许多其他文件需要单独读取为整个文件。我还发现了有关的问题wholeTextFiles,但这在我的情况下不起作用。任何想法如何做到这一点?编辑:我试图从读者在这里(试图从测试的读者在这里,就像在功能testTarballWithFolders()),但每当我打电话TarballReader tarballReader = new TarballReader(fileName);我得到NullPointerException:java.lang.NullPointerException    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83)    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77)    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91)    at utils.TarballReader.<init>(TarballReader.java:61)    at main.SparkMain.lambda$0(SparkMain.java:105)    at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source)    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)    at scala.collection.Iterator$class.foreach(Iterator.scala:727)第105行MainSpark是我在帖子编辑中显示的上方的行,而第61行TarballReader是GZIPInputStream gzip = new GZIPInputStream(in);in上面一行输入流的值为空:InputStream in = this.getClass().getResourceAsStream(tarball);我在正确的道路上吗?如果是这样,我如何继续?为什么我得到这个空值,我该如何解决?
查看完整描述

2 回答

?
守着一只汪

TA贡献1872条经验 获得超3个赞

可接受答案的一个小改进是更改


Option(tar.getNextTarEntry)



Try(tar.getNextTarEntry).toOption.filter( _ != null)


以.tar.gz健壮的方式应对格式错误/截断的。


顺便说一句,缓冲区数组的大小有什么特别之处吗?如果接近平均文件大小(在我的情况下可能是500k),平均速度会更快吗?我猜是Stream相对于whileJava式的循环而言,我看到的是速度下降还是更可能是相对而言的开销。


查看完整回答
反对 回复 2019-11-19
  • 2 回答
  • 0 关注
  • 699 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信