Below is my code. I iterate over the DataFrame `prodRows` and, for each `product_PK`, look up a matching sub-list of `product_PK`s from `prodRows`:

```scala
val numRecProducts = 10
var listOfProducts: Map[Long, Array[(Long, Int)]] = Map()

prodRows.foreach { row: Row =>
  val product_PK = row.get(row.fieldIndex("product_PK")).toString.toLong
  val gender = row.get(row.fieldIndex("gender_PK")).toString
  val selection = prodRows
    .filter($"gender_PK" === gender || $"gender_PK" === "UNISEX")
    .limit(numRecProducts)
    .select($"product_PK")
  var productList: Array[(Long, Int)] = Array()
  if (!selection.rdd.isEmpty()) {
    productList = selection.rdd.map(x => (x(0).toString.toLong, 1)).collect()
  }
  listOfProducts = listOfProducts + (product_PK -> productList)
}
```

But when I execute it, it fails with the error below. It looks as if `selection` is empty in some iterations, but I don't understand how to handle this error:

```
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
```

What does this error mean, and how can I handle it?
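To make the intent clearer, here is the result I am trying to build, sketched on plain Scala collections with made-up sample data (the tuples below stand in for `prodRows` rows with their `product_PK` and `gender_PK` columns; this is only an illustration of the desired mapping, not the Spark job itself):

```scala
// Hypothetical sample data mirroring prodRows: (product_PK, gender_PK)
val prodRowsLocal: Seq[(Long, String)] = Seq(
  (1L, "MALE"), (2L, "FEMALE"), (3L, "UNISEX"), (4L, "MALE")
)
val numRecProducts = 10

// For each product: up to numRecProducts products whose gender matches or is UNISEX
val listOfProducts: Map[Long, Array[(Long, Int)]] =
  prodRowsLocal.map { case (productPK, gender) =>
    val selection = prodRowsLocal
      .filter { case (_, g) => g == gender || g == "UNISEX" }
      .take(numRecProducts)
      .map { case (pk, _) => (pk, 1) }
      .toArray
    productPK -> selection
  }.toMap

// e.g. product 1 (MALE) should map to the MALE and UNISEX products
println(listOfProducts(1L).toList)
```

This local version produces the `Map[Long, Array[(Long, Int)]]` I expect; the question is how to get the same result from the Spark code above without triggering the error.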