1 回答
TA贡献1815条经验 获得超6个赞
我不确切知道该方法如何binaryFiles跨 Spark 分区对文件进行分区。似乎与此相反textFiles,它倾向于只创建一个分区。让我们看看一个名为 的示例目录dir,其中包含 5 个文件。
> ls dir
test1 test2 test3 test4 test5
如果我使用textFile,事情就会并行运行。我不提供输出,因为它不是很漂亮,但你可以自己检查。我们可以验证事物是否与 并行运行getNumPartitions。
>>> sc.textFile("dir").foreach(lambda x: some_function(x, None))
# ugly output, but everything starts at the same time,
# except maybe the last one since you have 4 cores.
>>> sc.textFile("dir").getNumPartitions()
5
由于binaryFiles情况不同,并且由于某种原因,所有内容都进入同一个分区。
>>> sc.binaryFiles("dir").getNumPartitions()
1
我什至尝试使用 10k 个文件,所有内容仍然位于同一分区。我相信这背后的原因是,在scala中,binaryFiles返回一个带有文件名的RDD和一个允许读取文件的对象(但不执行读取)。因此速度很快,并且生成的 RDD 很小。因此,将其放在一个分区上就可以了。在 scala 中,我们可以在使用后使用重新分区binaryFiles,一切都会很好。
scala> sc.binaryFiles("dir").getNumPartitions
1
scala> sc.binaryFiles("dir").repartition(4).getNumPartitions
4
scala> sc.binaryFiles("dir").repartition(4)
.foreach{ case (name, ds) => {
println(System.currentTimeMillis+": "+name)
Thread.sleep(2000)
// do some reading on the DataStream ds
}}
1603352918396: file:/home/oanicol/sandbox/dir/test1
1603352918396: file:/home/oanicol/sandbox/dir/test3
1603352918396: file:/home/oanicol/sandbox/dir/test4
1603352918396: file:/home/oanicol/sandbox/dir/test5
1603352920397: file:/home/oanicol/sandbox/dir/test2
python 中的问题是binaryFiles实际上将文件读取到一个分区上。另外,这对我来说非常神秘,但是 pyspark 2.4 中的以下代码行会产生与您注意到的相同的行为,这是没有意义的。
# this should work but does not
sc.binaryFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))
# this does not work either, which is strange but it would not be advised anyway
# since all the data would be read on one partition
sc.binaryFiles("dir").repartition(4).foreach(lambda x: some_function(x, ''))
然而,由于binaryFiles实际读取文件,您可以使用wholeTextFile它将文件作为文本文件读取并按预期运行:
# this works
sc.wholeTextFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))
添加回答
举报