1 回答
TA贡献1818条经验 获得超7个赞
您的第一个问题是您已经在MainThreadwith 调用中调用了您的函数:
pool.apply_async(test_post())
...而不是test_post作为在工作线程中执行的调用的参数传递:
pool.apply_async(test_post)
OP:我有一个非常好的使用线程的脚本,但后来我读到它需要手动编码来维护 n 个并发线程(意思是,旧线程完成后立即启动新线程)......
您需要区分工作单元(作业、任务)和线程。首先使用池的重点是重用执行器,无论是线程还是进程。在实例化池时已经创建了工作线程,只要您不关闭池,所有初始线程都会保持活动状态。所以你不关心重新创建线程,你只需调用现有池的池方法,只要你有一些要分发的工作。Pool 接受这个作业(一个池方法调用)并从中创建任务。这些任务被放在一个无界队列中。每当工作人员完成任务时,它都会阻塞地尝试get()从这样的inqueue.
OP:Pool 只执行一个线程而不是 4 个......我尝试了不同的方法,但它仍然只执行一次。
pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
...是一个单一调用、单一任务的生产工作。如果您想要多次执行func,则必须pool.apply_async()多次调用,或者使用映射池方法,例如
pool.map(func, iterable, chunksize=None)
...,它将一个函数映射到一个可迭代对象上。pool.apply_async是非阻塞的,这就是为什么它是“异步的”。它立即返回一个AsyncResult您可以(阻塞地)调用.wait()或调用.get()的对象。
通过评论很明显,要没完没了并立即对已完成的任务(个体经营产生的输入流)的替代品...和程序应停止在一个KeyboardInterrupt或者当结果不具有一定的价值。
您可以使用 -callback参数在任何旧apply_async任务完成后立即安排新任务。困难在于在 MainThread 的同时如何防止整个脚本过早结束,同时保持它对 KeyboardInterrupt 的响应。让 MainThread 在循环中休眠使其仍然可以立即对 KeyboardInterrupt 做出反应,同时防止提前退出。如果结果应该停止程序,您可以让回调终止池。然后 MainThread 只需要在他的睡眠循环中包含对池状态的检查。
import time
from random import randint, choice
from itertools import count
from datetime import datetime
from threading import current_thread
from multiprocessing.pool import ThreadPool
def test_post(post_id):
time.sleep(randint(1, 3))
status_code = choice([200] * 9 + [404])
return "{} {} Message no.{}: {}".format(
datetime.now(), current_thread().name, post_id, status_code
), status_code
def handle_result(result):
msg, code = result
print(msg)
if code != 200:
print("terminating")
pool.terminate()
else:
pool.apply_async(
test_post, args=(next(post_cnt),), callback=handle_result
)
if __name__ == '__main__':
N_WORKERS = 4
post_cnt = count()
pool = ThreadPool(N_WORKERS)
# initial distribution
for _ in range(N_WORKERS):
pool.apply_async(
test_post, args=(next(post_cnt),), callback=handle_result
)
try:
while pool._state == 0: # check if pool is still alive
time.sleep(1)
except KeyboardInterrupt:
print(" got interrupt")
带有键盘中断的示例输出:
$> python2 scratch.py
2019-02-15 18:46:11.724203 Thread-4 Message no.3: 200
2019-02-15 18:46:12.724713 Thread-2 Message no.1: 200
2019-02-15 18:46:13.726107 Thread-1 Message no.0: 200
2019-02-15 18:46:13.726292 Thread-3 Message no.2: 200
2019-02-15 18:46:14.724537 Thread-4 Message no.4: 200
2019-02-15 18:46:14.726881 Thread-2 Message no.5: 200
2019-02-15 18:46:14.727071 Thread-1 Message no.6: 200
^C got interrupt
由于不需要的返回值而终止的示例输出:
$> python2 scratch.py
2019-02-15 18:44:19.966387 Thread-3 Message no.0: 200
2019-02-15 18:44:19.966491 Thread-4 Message no.1: 200
2019-02-15 18:44:19.966582 Thread-1 Message no.3: 200
2019-02-15 18:44:20.967555 Thread-2 Message no.2: 200
2019-02-15 18:44:20.968562 Thread-3 Message no.4: 404
terminating
请注意,在您的场景中,您还可以apply_async更频繁地调用N_WORKERS-times 为您的初始分发提供一些缓冲区以减少延迟。
添加回答
举报