3 回答
TA贡献1772条经验 获得超5个赞
foldApache Spark中的内容与fold未分发的集合中的内容不同。实际上,它需要交换函数才能产生确定性的结果:
这与以Scala之类的功能语言为非分布式集合实现的折叠操作有些不同。该折叠操作可以单独应用于分区,然后将那些结果折叠为最终结果,而不是以某些定义的顺序将折叠应用于每个元素。对于非交换函数,结果可能与应用于非分布式集合的折叠结果不同。
Mishael Rosenthal 已证明了这一点,Make42在其评论中建议了这一点。
有人建议观察到的行为与HashPartitioner何时parallelize不洗牌和不使用有关HashPartitioner。
import org.apache.spark.sql.SparkSession
/* Note: standalone (non-local) mode */
val master = "spark://...:7077"
val spark = SparkSession.builder.master(master).getOrCreate()
/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })
/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)
解释:
foldRDD的结构
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
var jobResult: T
val cleanOp: (T, T) => T
val foldPartition = Iterator[T] => T
val mergeResult: (Int, T) => Unit
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
与RDD的结构reduce相同:
def reduce(f: (T, T) => T): T = withScope {
val cleanF: (T, T) => T
val reducePartition: Iterator[T] => Option[T]
var jobResult: Option[T]
val mergeResult = (Int, Option[T]) => Unit
sc.runJob(this, reducePartition, mergeResult)
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
在runJob不考虑分区顺序的情况下执行,导致需要交换功能。
foldPartition并且reducePartition在处理顺序上有效,reduceLeft并且foldLeft在上有效执行(通过继承和委派)TraversableOnce。
结论:foldRDD不能依赖于块的顺序,而是需要可交换性和关联性。
- 3 回答
- 0 关注
- 1014 浏览
添加回答
举报