为了账号安全,请及时绑定邮箱和手机立即绑定

在Python多处理中将Pool.map与共享内存数组结合

在Python多处理中将Pool.map与共享内存数组结合

aluckdog 2019-10-17 10:22:58
我有一个很大的(只读)数据数组,我想由多个进程并行处理。我喜欢Pool.map函数,并希望使用它来并行计算该数据上的函数。我看到可以使用Value或Array类在进程之间使用共享内存数据。但是,当我尝试使用它时,我得到一个RuntimeError:'使用Pool.map函数时,应该仅通过继承在进程之间共享SynchronizedString对象:这是我正在尝试做的一个简化示例:from sys import stdinfrom multiprocessing import Pool, Arraydef count_it( arr, key ):  count = 0  for c in arr:    if c == key:      count += 1  return countif __name__ == '__main__':  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"  # want to share it using shared memory  toShare = Array('c', testData)  # this works  print count_it( toShare, "a" )  pool = Pool()  # RuntimeError here  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )谁能告诉我我在做什么错?因此,我想做的是在进程池中创建新的共享内存分配数组后,将信息传递给这些进程。
查看完整描述

3 回答

?
千万里不及你

TA贡献1784条经验 获得超9个赞

当我刚看到赏金的时候再试一次;)


基本上,我认为错误消息的含义是它的意思-多处理共享内存数组不能作为参数传递(通过酸洗)。序列化数据没有意义-关键是数据是共享内存。因此,您必须使共享数组成为全局数组。我认为像我的第一个答案一样,将其作为模块的属性比较整洁,但在示例中将其保留为全局变量也可以很好地工作。考虑到您不想在fork之前设置数据的观点,这是一个修改后的示例。如果您希望拥有多个共享数组(这就是为什么要将toShare作为参数传递的原因),则可以类似地创建共享数组的全局列表,然后将索引传递给count_it(将变为for c in toShare[i]:)。


from sys import stdin

from multiprocessing import Pool, Array, Process


def count_it( key ):

  count = 0

  for c in toShare:

    if c == key:

      count += 1

  return count


if __name__ == '__main__':

  # allocate shared array - want lock=False in this case since we 

  # aren't writing to it and want to allow multiple processes to access

  # at the same time - I think with lock=True there would be little or 

  # no speedup

  maxLength = 50

  toShare = Array('c', maxLength, lock=False)


  # fork

  pool = Pool()


  # can set data after fork

  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

  if len(testData) > maxLength:

      raise ValueError, "Shared array too small to hold data"

  toShare[:len(testData)] = testData


  print pool.map( count_it, ["a", "b", "s", "d"] )

[编辑:以上内容由于未使用fork而无法在Windows上运行。但是,下面的方法在Windows上仍然可以使用Pool,但仍然可以使用,因此我认为这与您想要的最接近:


from sys import stdin

from multiprocessing import Pool, Array, Process

import mymodule


def count_it( key ):

  count = 0

  for c in mymodule.toShare:

    if c == key:

      count += 1

  return count


def initProcess(share):

  mymodule.toShare = share


if __name__ == '__main__':

  # allocate shared array - want lock=False in this case since we 

  # aren't writing to it and want to allow multiple processes to access

  # at the same time - I think with lock=True there would be little or 

  # no speedup

  maxLength = 50

  toShare = Array('c', maxLength, lock=False)


  # fork

  pool = Pool(initializer=initProcess,initargs=(toShare,))


  # can set data after fork

  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

  if len(testData) > maxLength:

      raise ValueError, "Shared array too small to hold data"

  toShare[:len(testData)] = testData


  print pool.map( count_it, ["a", "b", "s", "d"] )

不知道为什么map不会腌制数组,而Process和Pool会腌制-我想也许它已经在Windows上的子进程初始化时转移了。请注意,尽管在派生之后仍然设置了数据。


查看完整回答
反对 回复 2019-10-17
?
蛊毒传说

TA贡献1895条经验 获得超3个赞

如果数据是只读的,只需在从Pool派生之前将其设置为模块中的变量即可。然后,所有子进程都应该能够访问它,并且只要您不对其进行写操作,就不会被复制。


import myglobals # anything (empty .py file)

myglobals.data = []


def count_it( key ):

    count = 0

    for c in myglobals.data:

        if c == key:

            count += 1

    return count


if __name__ == '__main__':

myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"


pool = Pool()

print pool.map( count_it, ["a", "b", "s", "d"] )

如果您确实想尝试使用Array,则可以尝试使用lock=False关键字参数(默认情况下为true)。


查看完整回答
反对 回复 2019-10-17
  • 3 回答
  • 0 关注
  • 1183 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信