2 Answers
Contributor: 1,871 experience points, 13+ upvotes
As you guessed, the simplest approach is to use a DoFn that processes the PCollection as both the main input and a side input.
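A minimal sketch of the per-element logic of that simple approach. The function name pair_with_all is hypothetical, and the list comprehension at the bottom only simulates locally what Beam does when every main-input element sees the whole side input. The Beam wiring in the docstring uses beam.FlatMap with beam.pvalue.AsIter, the same calls the answer's own code uses, but it is not executed here.

```python
def pair_with_all(element, side):
    """Yield (element, other) for every item in the side input.

    In Beam, this would be the callable passed to beam.FlatMap, with the same
    PCollection supplied again as an iterable side input, e.g.:
        pairs = pcoll | beam.FlatMap(pair_with_all, beam.pvalue.AsIter(pcoll))
    """
    for other in side:
        yield (element, other)


# Local stand-in for the pipeline: each element of the "main input"
# is paired with the full collection acting as the "side input".
collection = ["a", "b", "c"]
pairs = [pair for x in collection for pair in pair_with_all(x, collection)]
print(len(pairs))  # 9 pairs for 3 elements
```

This only works in Beam if the whole side input can be iterated by each worker, which is exactly the limitation the partitioned version addresses.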
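If you cannot do that because the PCollection is too large to fit in memory, you can partition it into N disjoint PCollections, pass each one in as a side input alongside the full PCollection, and then flatten the results. For example, you could write something like: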
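import apache_beam as beam


class CrossProduct(beam.PTransform):
    """Cross product of a PCollection with itself, using N side-input parts."""

    def expand(self, pcoll):
        N = 10
        # Split the input into N disjoint PCollections.
        parts = pcoll | beam.Partition(lambda element, n: hash(element) % n, N)
        # Pair every element of the full input with each (smaller) part.
        cross_parts = [
            pcoll | str(ix) >> beam.FlatMap(
                lambda x, side: [(x, s) for s in side],
                beam.pvalue.AsIter(part))
            for ix, part in enumerate(parts)]
        # Merge the N partial cross products into a single PCollection.
        return cross_parts | beam.Flatten()


# Here, input stands for your existing PCollection.
output = input | CrossProduct()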
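To see why partitioning and flattening gives the full cross product, here is a minimal local simulation in plain Python (no Beam). The function partitioned_cross_product is hypothetical: it splits the items into disjoint buckets, pairs every item with each bucket (the role of the side input), and concatenates the results (the role of Flatten). Because the buckets are disjoint and together cover the input, the union of the partial products is exactly the full product, while only one bucket at a time has to be materialized as a side input.

```python
import itertools


def partitioned_cross_product(items, num_parts, partition_fn=hash):
    """Build items x items by crossing the full collection with disjoint parts."""
    # Partition: each item lands in exactly one bucket, so buckets are disjoint.
    parts = [[] for _ in range(num_parts)]
    for item in items:
        parts[partition_fn(item) % num_parts].append(item)

    # One "FlatMap with side input" per bucket, then flatten the results.
    result = []
    for part in parts:
        result.extend((x, s) for x in items for s in part)
    return result


items = [("foo", 0), ("bar", 1), ("baz", 2), ("qux", 3)]
crossed = partitioned_cross_product(items, num_parts=3)
# Same pairs as the direct Cartesian product, only in a different order.
print(sorted(crossed) == sorted(itertools.product(items, items)))  # True
```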
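Note, however, that if the PCollection does not fit in memory, then unless its individual elements are particularly large, its cross product (n² pairs for n elements) can be extremely expensive to generate and process.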
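A quick back-of-the-envelope estimate makes the scale concrete. The input size below (100 million elements of about 100 bytes each, roughly 10 GB) is purely hypothetical, and the estimate ignores serialization overhead.

```python
def cross_product_size(num_elements, bytes_per_element):
    """Return (number of pairs, approximate bytes) for a self cross product.

    Each pair holds two elements, so it costs roughly 2 * bytes_per_element.
    """
    num_pairs = num_elements ** 2
    return num_pairs, num_pairs * 2 * bytes_per_element


# Hypothetical input: 100 million elements of ~100 bytes (~10 GB).
num_pairs, total_bytes = cross_product_size(100_000_000, 100)
print(num_pairs)               # 10**16 pairs
print(total_bytes / 1e18)      # 2.0, i.e. about 2 exabytes
```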
Contributor: 1,794 experience points, 7+ upvotes
I'll propose a solution in Python.
First, let's implement the algorithm, and then deal with the memory constraint.
import itertools

# Build a list with your pairs
collection_items = [("foo", 0), ("bar", 1), ("baz", 2)]

# A Python generator produces a sequence of results lazily. It keeps its local
# state, so each call to next() resumes exactly where it left off. A generator
# can only be iterated once: once exhausted, it cannot be reused.
# I will explain a little later why I use generators.
collection_generator1 = (el for el in collection_items)  # Create the first generator
# For example: next(collection_generator1) => ("foo", 0); next(collection_generator1) => ("bar", 1);
# next(collection_generator1) => ("baz", 2)
collection_generator2 = (el for el in collection_items)  # Create the second generator

cartesian_product = itertools.product(collection_generator1, collection_generator2)  # Create the Cartesian product
for pair in cartesian_product:
    first_el, second_el = pair
    str_pair1, val_pair1 = first_el
    str_pair2, val_pair2 = second_el  # Unpack the second element of the pair
    name = "{str_pair1}+{str_pair2}".format(str_pair1=str_pair1, str_pair2=str_pair2)
    item = (name, [first_el, second_el])  # Compose the item
    print(item)

# OUTPUT
('foo+foo', [('foo', 0), ('foo', 0)])
('foo+bar', [('foo', 0), ('bar', 1)])
('foo+baz', [('foo', 0), ('baz', 2)])
('bar+foo', [('bar', 1), ('foo', 0)])
('bar+bar', [('bar', 1), ('bar', 1)])
('bar+baz', [('bar', 1), ('baz', 2)])
('baz+foo', [('baz', 2), ('foo', 0)])
('baz+bar', [('baz', 2), ('bar', 1)])
('baz+baz', [('baz', 2), ('baz', 2)])
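Before relying on generators to save memory here, it is worth checking how itertools.product treats them. Its documentation states that it completely consumes the input iterables before running, keeping pools of values in memory. The small experiment below (the tracked helper is hypothetical, written only for this check) shows both generators being exhausted as soon as the product is created, even though only one pair has been requested.

```python
import itertools

consumed = []


def tracked(values):
    """Generator that records every value it hands out."""
    for v in values:
        consumed.append(v)
        yield v


pairs = itertools.product(tracked([1, 2, 3]), tracked([4, 5, 6]))
first = next(pairs)
# Only one pair was requested, yet both generators are already exhausted.
print(first, consumed)  # (1, 4) [1, 2, 3, 4, 5, 6]
```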
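Now let's deal with the memory problem.
Since you have a lot of data, it is better to store it in a file, writing one pair per line (as in the example). Then read the file ("input.txt") and create generators over its lines: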
file_generator_1 = (line.strip() for line in open("input.txt"))
file_generator_2 = (line.strip() for line in open("input.txt"))
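Then replace collection_generator1 and collection_generator2 with file_generator_1 and file_generator_2. Each line is read as a plain string, so it also has to be parsed back into a (name, value) tuple before str_pair1, val_pair1 = first_el can unpack it. Keep in mind that itertools.product reads all of its input iterables into memory before producing the first pair, so swapping in file generators does not by itself bound memory usage.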
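If memory really is the constraint, one option is to drop itertools.product and use two nested loops that re-read the file, so only a couple of parsed lines (plus file buffers) are held at a time, at the cost of one full pass over the file per line. This is a minimal sketch under stated assumptions: the "name,value" line format and the helpers parse_pair and stream_cross_product are hypothetical, since the answer does not specify how pairs are written to the file.

```python
import os
import tempfile


def parse_pair(line):
    """Parse a line in the assumed 'name,value' format into (name, int)."""
    name, value = line.strip().split(",")
    return name, int(value)


def stream_cross_product(path):
    """Yield (name, [first, second]) for every ordered pair of lines in the file."""
    with open(path) as outer:
        for outer_line in outer:
            if not outer_line.strip():
                continue  # skip blank lines
            first = parse_pair(outer_line)
            # Re-open the file for each outer line instead of buffering it.
            with open(path) as inner:
                for inner_line in inner:
                    if not inner_line.strip():
                        continue
                    second = parse_pair(inner_line)
                    yield ("{}+{}".format(first[0], second[0]), [first, second])


# Usage: write the example pairs to a temporary file, one pair per line.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
    f.write("foo,0\nbar,1\nbaz,2\n")
    path = f.name

items = list(stream_cross_product(path))  # list() only for this small demo
os.remove(path)
for item in items:
    print(item)
```

For real data you would iterate over stream_cross_product(path) directly rather than collecting it into a list.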