为了账号安全,请及时绑定邮箱和手机立即绑定

PCollection 与自身的笛卡尔积

PCollection 与自身的笛卡尔积

慕神8447489 2022-08-02 17:19:41
假设我有一个类型的有界 PCollection p。假设 p 不能适合内存,因此不能是 的侧输入。KV<String, Integer>DoFn示例 p:("foo", 0)("bar", 1)("baz", 2)我怎么能取p和自身的笛卡尔积?例如,可能如下所示:p x p("foo+foo", [("foo", 0), ("foo", 0)])("foo+bar", [("foo", 0), ("bar", 1)])("foo+baz", [("foo", 0), ("baz", 2)])("bar+foo", [("bar", 1), ("foo", 0)])("bar+bar", [("bar", 1), ("bar", 1)])("bar+baz", [("bar", 1), ("baz", 2)])("baz+foo", [("baz", 2), ("foo", 0)])("baz+bar", [("baz", 2), ("bar", 1)])("baz+baz", [("baz", 2), ("baz", 2)])
查看完整描述

2 回答

?
慕桂英4014372

TA贡献1871条经验 获得超13个赞

正如你所推测的那样,最简单的方法是使用DoFn将PCollection作为主要和侧面输入进行处理。


如果由于 PCollection 太大而无法放入内存而无法做到这一点,则可以将其划分为 N 个不相交的 PCollections,将其传递到每个 PCollections 中,然后将结果平展。例如,你可以写一些类似的东西


class CrossProduct(beam.PTransform):

  def expand(self, pcoll):

    N = 10

    parts = pcoll | beam.Partition(lambda element, n: hash(element) % n, N)

    cross_parts = [

        pcoll | str(ix) >> beam.FlatMap(

            lambda x, side: [(x, s) for s in side],

            beam.pvalue.AsIter(part))

        for ix, part in enumerate(parts)]

    return cross_parts | beam.Flatten()


output = input | CrossProduct()

但是,请注意,除非 PCollection 的元素特别大,否则如果 PCollection 无法放入内存,则其交叉积的生成(和处理)可能非常大。


查看完整回答
反对 回复 2022-08-02
?
慕田峪9158850

TA贡献1794条经验 获得超7个赞

我将提出一个使用Python的解决方案。


首先,让我们实现算法,然后解决内存限制的问题。


import itertools


# Let's build a list with your pairs

collection_items = [("foo", 0), ("bar", 1), ("baz", 2)]


"""

A Python generator is a function that produces a sequence of results. 

It works by maintaining its local state, so that the function can resume again exactly where 

it left off when called subsequent times. Same generator can't be used twice.

I will explain a little later why I use generators

"""


collection_generator1 = (el for el in collection_items)  # Create the first generator

# For example; calling next(collection_generator1) => ("foo", 0); next(collection_generator1) => ("bar", 1),

# next(collection_generator1) => ("bar": 2)

collection_generator2 = (el for el in collection_items) # Create the second generator

cartesian_product = itertools.product(collection_generator1, collection_generator2) # Create the cartesian product


for pair in cartesian_product:

    first_el, second_el = pair

    str_pair1, val_pair1 = first_el

    str_pair2, val_pair2 = first_el


    name = "{str_pair1}+{str_pair2}".format(str_pair1=str_pair1, str_pair2=str_pair2)

    item = (name, [first_el, second_el]) # Compose the item

    print(item)


# OUTPUT


('foo+foo', [('foo', 0), ('foo', 0)])

('foo+foo', [('foo', 0), ('bar', 1)])

('foo+foo', [('foo', 0), ('baz', 2)])

('bar+bar', [('bar', 1), ('foo', 0)])

('bar+bar', [('bar', 1), ('bar', 1)])

('bar+bar', [('bar', 1), ('baz', 2)])

('baz+baz', [('baz', 2), ('foo', 0)])

('baz+baz', [('baz', 2), ('bar', 1)])

('baz+baz', [('baz', 2), ('baz', 2)])

现在让我们解决内存问题


由于您有很多数据,因此最好将它们存储在文件中,在每行上写入一对(如示例中所示),现在让我们读取文件(“输入.txt”)并创建一个包含其数据的生成器。


file_generator_1 = (line.strip() for line in open("input.txt"))

file_generator_2 = (line.strip() for line in open("input.txt").readlines())

现在,您唯一需要做的修改是替换变量名称collection_generator1,collection_generator2 file_generator_1,file_generator_2


查看完整回答
反对 回复 2022-08-02
  • 2 回答
  • 0 关注
  • 132 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信