我有以下问题:假设我有一个包含压缩目录的目录,该压缩目录包含多个文件,存储在HDFS上。我想创建一个包含一些T类型对象的RDD,即:context = new JavaSparkContext(conf);JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> { // The name of the file String fileName = fileNameContent._1(); // The content of the file String content = fileNameContent._2(); // Class T has a constructor of taking the filename and the content of each // processed file (as two strings) T t = new T(content, fileName); return t;});现在,当inputDataPath目录包含文件时,可以很好地工作,例如,当它类似于:String inputDataPath = "hdfs://some_path/*/*/"; // because it contains subfolders但是,当一个tgz包含多个文件时,文件内容(fileNameContent._2())为我提供了一些无用的二进制字符串(相当不错)。我在SO上发现了类似的问题,但是情况不一样,因为解决方案是每次压缩仅包含一个文件,而在我的情况下,还有许多其他文件需要单独读取为整个文件。我还发现了有关的问题wholeTextFiles,但这在我的情况下不起作用。任何想法如何做到这一点?编辑:我试图从读者在这里(试图从测试的读者在这里,就像在功能testTarballWithFolders()),但每当我打电话TarballReader tarballReader = new TarballReader(fileName);我得到NullPointerException:java.lang.NullPointerException at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83) at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77) at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91) at utils.TarballReader.<init>(TarballReader.java:61) at main.SparkMain.lambda$0(SparkMain.java:105) at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source) at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727)第105行MainSpark是我在帖子编辑中显示的上方的行,而第61行TarballReader是GZIPInputStream gzip = new GZIPInputStream(in);in上面一行输入流的值为空:InputStream in = this.getClass().getResourceAsStream(tarball);我在正确的道路上吗?如果是这样,我如何继续?为什么我得到这个空值,我该如何解决?
2 回答

守着一只汪
TA贡献1872条经验 获得超3个赞
可接受答案的一个小改进是更改
Option(tar.getNextTarEntry)
至
Try(tar.getNextTarEntry).toOption.filter( _ != null)
以.tar.gz健壮的方式应对格式错误/截断的。
顺便说一句,缓冲区数组的大小有什么特别之处吗?如果接近平均文件大小(在我的情况下可能是500k),平均速度会更快吗?我猜是Stream相对于whileJava式的循环而言,我看到的是速度下降还是更可能是相对而言的开销。
添加回答
举报
0/150
提交
取消