3 回答
TA贡献2016条经验 获得超9个赞
filter
def even(x): return x % 2 == 0def odd(x): return not even(x)rdd = sc.parallelize(range(20))rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
kv_rdd = rdd.map(lambda x: (x, odd(x)))kv_rdd.cache()rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()
星火转换是懒惰的,直到您执行一个操作,您的rdd才会成为现实。 这有什么关系?回到我的例子: rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
如果以后我决定我只需要 rdd_odd
那么就没有理由去实现 rdd_even
.如果您查看一下要计算的SAS示例 work.split2
您需要同时实现输入数据和 work.split1
.RDDs提供了一个声明性API。当你使用 filter
或 map
这完全取决于火花引擎如何执行这一操作。只要传递给转换的函数是无副作用的,它就会为优化整个管道创造多种可能性。
randomSplit
partitionBy
DataFrameWriter
def makePairs(row: T): (String, String) = ??? data .map(makePairs).toDF("key", "value") .write.partitionBy($"key").format("text").save(...)
RDD[T]=>RDD[T] RDD[T]=>RDD[U] (RDD[T],RDD[U])=>RDD[W]
TA贡献1893条经验 获得超10个赞
Partitioner
RangePartitioner
.
val filtered = partitioned.mapPartitions { iter => { new Iterator[Int](){ override def hasNext: Boolean = { if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) { false } else { iter.hasNext } } override def next():Int = iter.next() }
添加回答
举报