为了账号安全,请及时绑定邮箱和手机立即绑定

了解 Python 多处理 apply_aync 并获取

了解 Python 多处理 apply_aync 并获取

月关宝盒 2022-05-24 15:07:05
我正在挖掘我前段时间编写的一些代码,它具有以下结构。我试图在需要的地方添加解释性评论。# Create a reader and a writer processreader_proc = Process(target=self.reader)reader_proc.start()writer_proc = Process(target=self.writer, args=(pfin,))writer_proc.start()# start a pool of workerswith Pool(n_workers, maxtasksperchild=max_tasks_per_child) as pool:    # a list to keep track of workers    worker_jobs = []    # a list to keep track of return values    return_vals = []    # get input chunks from the reader    # reader writes input chunks to a work_q (queue)    while True:        work = work_q.get()        if work == 'done':            break        # process_chunk is a function that ... processes the given chunk        # this function will do some computations and write those to a results_q (queue)        # which the writer will then write to a file        # the function also returns another type of value that is processed below        job = pool.apply_async(process_chunk, (work,))        worker_jobs.append(job)    print('Done reading chunks!')    # reader is done reading    reader_proc.join()    reader_proc.terminate()    # When a worker has finished its job, get its information back    for job_idx, job in enumerate(worker_jobs, 1):        print(f"Processing job {job_idx}")        res1, res2 = job.get()        return_vals.append((res1, res2))    # process results in main process    process_results(return_vals)    # Notify the writer that we're done    results_q.put('done')Tl; dr 一个池处理来自队列的块apply_async。队列用完后,我们.get()将结果返回并处理它们。我不确定作业在应用到池时是否立即执行,还是等到.get()被调用?这很重要,因为如果他们等待执行直到队列耗尽,那么对于长队列来说可能需要很长时间。另一方面,如果它们不等待并立即执行,那么这些函数的结果存储在哪里?由于我们正在等待直到.get()获取结果,这是否意味着子进程被阻塞直到.get()被调用?我问的原因是因为第一个打印语句(读取完成)和后续语句(处理作业 x)之间有很长的延迟,我不知道为什么。
查看完整描述

1 回答

?
吃鸡游戏

TA贡献1829条经验 获得超7个赞

一旦工作人员空闲,任务就会执行。得到结果或根本没有得到结果对此没有影响。

您的工作人员结果存储在AsyncResult对象中,在您的情况下job是其中之一并且worker_jobs拥有它们。然后你做正确的事情并遍历你的结果对象并得到结果。

池在内部存储结果,直到您获得它们 - 即使您根本没有获得结果,它也不会阻止工作人员 - 在许多并行处理的情况下,如果工作人员,您甚至可能对来自工作人员的“结果”不感兴趣只是根据输入执行特定任务。一旦工作人员完成并将结果(或异常!)存储在此对象中,就可以自由地从池中接受另一个作业。

这也意味着您必须在关闭池之前获得结果 - 就像现在一样。如果您将“处理作业”循环移到with Pool...结构之外,那么当您尝试获取结果时,结果就会丢失。

有关可用的对象 方法,请参阅https://docs.python.org/3.4/library/multiprocessing.html?highlight=process 。AsyncResult

如果您的工作人员提出一个异常,该AsyncResult对象也会存储一个异常。当工作人员遇到异常时不会立即触发它,而是存储在那里并在您 get() 结果时引发。如果您的工作人员可以引发异常,您应该为您的 get 循环而不是工作人员构建异常处理。


查看完整回答
反对 回复 2022-05-24
  • 1 回答
  • 0 关注
  • 146 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信