我正在挖掘我前段时间编写的一些代码,它具有以下结构。我试图在需要的地方添加解释性评论。# Create a reader and a writer processreader_proc = Process(target=self.reader)reader_proc.start()writer_proc = Process(target=self.writer, args=(pfin,))writer_proc.start()# start a pool of workerswith Pool(n_workers, maxtasksperchild=max_tasks_per_child) as pool: # a list to keep track of workers worker_jobs = [] # a list to keep track of return values return_vals = [] # get input chunks from the reader # reader writes input chunks to a work_q (queue) while True: work = work_q.get() if work == 'done': break # process_chunk is a function that ... processes the given chunk # this function will do some computations and write those to a results_q (queue) # which the writer will then write to a file # the function also returns another type of value that is processed below job = pool.apply_async(process_chunk, (work,)) worker_jobs.append(job) print('Done reading chunks!') # reader is done reading reader_proc.join() reader_proc.terminate() # When a worker has finished its job, get its information back for job_idx, job in enumerate(worker_jobs, 1): print(f"Processing job {job_idx}") res1, res2 = job.get() return_vals.append((res1, res2)) # process results in main process process_results(return_vals) # Notify the writer that we're done results_q.put('done')Tl; dr 一个池处理来自队列的块apply_async。队列用完后,我们.get()将结果返回并处理它们。我不确定作业在应用到池时是否立即执行,还是等到.get()被调用?这很重要,因为如果他们等待执行直到队列耗尽,那么对于长队列来说可能需要很长时间。另一方面,如果它们不等待并立即执行,那么这些函数的结果存储在哪里?由于我们正在等待直到.get()获取结果,这是否意味着子进程被阻塞直到.get()被调用?我问的原因是因为第一个打印语句(读取完成)和后续语句(处理作业 x)之间有很长的延迟,我不知道为什么。
1 回答
吃鸡游戏
TA贡献1829条经验 获得超7个赞
一旦工作人员空闲,任务就会执行。得到结果或根本没有得到结果对此没有影响。
您的工作人员结果存储在AsyncResult
对象中,在您的情况下job
是其中之一并且worker_jobs
拥有它们。然后你做正确的事情并遍历你的结果对象并得到结果。
池在内部存储结果,直到您获得它们 - 即使您根本没有获得结果,它也不会阻止工作人员 - 在许多并行处理的情况下,如果工作人员,您甚至可能对来自工作人员的“结果”不感兴趣只是根据输入执行特定任务。一旦工作人员完成并将结果(或异常!)存储在此对象中,就可以自由地从池中接受另一个作业。
这也意味着您必须在关闭池之前获得结果 - 就像现在一样。如果您将“处理作业”循环移到with Pool...
结构之外,那么当您尝试获取结果时,结果就会丢失。
有关可用的对象 方法,请参阅https://docs.python.org/3.4/library/multiprocessing.html?highlight=process 。AsyncResult
如果您的工作人员提出一个异常,该AsyncResult
对象也会存储一个异常。当工作人员遇到异常时不会立即触发它,而是存储在那里并在您 get() 结果时引发。如果您的工作人员可以引发异常,您应该为您的 get 循环而不是工作人员构建异常处理。
添加回答
举报
0/150
提交
取消