为了账号安全,请及时绑定邮箱和手机立即绑定

使用 SPARK 从 zip 到 seq

使用 SPARK 从 zip 到 seq

慕娘9325324 2021-09-12 14:35:52
我每天都会收到一个 zip 存档“2018-06-26.zip”,大小约为 . 250 Mb 压缩,包含 165-170.000 个小的 XML 文件 (Kb's)。我将 zip-archive 加载到 HDFS(避免小文件问题),并使用 SPARK 从 zip 中提取它们(zip 不可拆分),制作一个 Paired RDD,以文件名为键,内容为值并保存通过成对的 RDD 将它们作为序列文件。使用一个仅包含 3 个用于测试目的的 XML 文件的小型 zip 存档,一切都运行顺畅,但是当我将其提供给大存档时,我得到了   java.lang.OutOfMemoryError: GC overhead limit exceeded   at java.util.Arrays.copyOf(Arrays.java:2367)   at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)   ...   ...我在 Cloudera Quickstart VM 上运行:CDH 5.13.3(HDFS:2.60,JDK:1.7.0.67,SPARK:1.6.0,Scala 2.10)我还没有在成熟的集群上运行它,因为我想在部署它之前确保我的代码是正确的......垃圾收集器在超出开销限制的情况下继续运行 OOM。我知道要增加驱动程序和 Java 堆空间的内存量,但我怀疑我的方法占用了太多内存.... 监视内存使用情况,虽然没有发现任何内存泄漏....这是代码:import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.sql.SQLContextimport java.util.zip.{ZipEntry, ZipInputStream}import org.apache.spark.input.PortableDataStreamimport scala.collection.mutableval conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")val sc = new SparkContext(conf)val sqlContext = new SQLContext(sc)var xml_map = new mutable.HashMap[String, String]()sc.binaryFiles("/user/cloudera/test/2018-06-26.zip", 10).collect   .foreach { zip_file : (String, PortableDataStream) =>    val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)    var zip_entry : ZipEntry = null    while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {      if (!zip_entry.isDirectory) {        val key_file_name = zip_entry.getName        val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n")        xml_map += ( key_file_name -> value_file_content )      }      zip_stream.closeEntry()    }    zip_stream.close()  }val xml_rdd = sc.parallelize(xml_map.toSeq).saveAsSequenceFile("/user/cloudera/2018_06_26")任何帮助或想法都受到高度赞赏。
查看完整描述

1 回答

  • 1 回答
  • 0 关注
  • 151 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信