为了账号安全,请及时绑定邮箱和手机立即绑定

Pool 只执行一个线程而不是 4 个,我如何使它无限?

Pool 只执行一个线程而不是 4 个,我如何使它无限?

胡子哥哥 2021-11-09 15:22:16
所以我正在开发一个小的 Python 工具来对应用程序的 API 进行压力测试。我有一个使用线程的非常好的脚本,但后来我读到它需要手动编码来维护 n 个并发线程(意思是,旧线程完成后立即启动新线程),以及这里的建议:如何开始旧线程结束后的新线程?就是使用ThreadPool,我尝试如下:def test_post():    print "Executing in " + threading.currentThread().getName() + "\n"    time.sleep(randint(1, 3))    return randint(1, 5), "Message"if args.send:    code, content = post()    print (code, "\n")    print (content)elif args.test:    # Create new threads    print threads    results_list = []    pool = ThreadPool(processes=threads)    results = pool.apply_async(test_post())    pool.close()  # Done adding tasks.    pool.join()  # Wait for all tasks to complete.    # results = list(pool.imap_unordered(    #     test_post(), ()    # ))    # thread_list = []    # while threading.activeCount() <= threads:    #     thread = LoadTesting(threadID=free_threads, name="Thread-" + str(threading.activeCount()), counter=1)    #     thread.start()    #     thread_list.append(thread)    print "Exiting Main Thread" + "\n"else:    print ("cant get here!")当我调用脚本时,我得到一致的输出,例如:4在主线程中执行退出主线程我不知道为什么......正如你在注释掉的块中看到的那样,我尝试了不同的方法,但它仍然只执行一次。我的目标是让脚本循环运行,始终运行 n 个线程。在test_post(分别,post)函数返回的HTTP响应代码和内容-我想以后使用该打印/停止时响应代码是不是200 OK。
查看完整描述

1 回答

?
qq_笑_17

TA贡献1818条经验 获得超7个赞

您的第一个问题是您已经在MainThreadwith 调用中调用了您的函数:


pool.apply_async(test_post())

...而不是test_post作为在工作线程中执行的调用的参数传递:


pool.apply_async(test_post)

OP:我有一个非常好的使用线程的脚本,但后来我读到它需要手动编码来维护 n 个并发线程(意思是,旧线程完成后立即启动新线程)......


您需要区分工作单元(作业、任务)和线程。首先使用池的重点是重用执行器,无论是线程还是进程。在实例化池时已经创建了工作线程,只要您不关闭池,所有初始线程都会保持活动状态。所以你不关心重新创建线程,你只需调用现有池的池方法,只要你有一些要分发的工作。Pool 接受这个作业(一个池方法调用)并从中创建任务。这些任务被放在一个无界队列中。每当工作人员完成任务时,它都会阻塞地尝试get()从这样的inqueue.


OP:Pool 只执行一个线程而不是 4 个......我尝试了不同的方法,但它仍然只执行一次。


pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)

...是一个单一调用、单一任务的生产工作。如果您想要多次执行func,则必须pool.apply_async()多次调用,或者使用映射池方法,例如


pool.map(func, iterable, chunksize=None)

...,它将一个函数映射到一个可迭代对象上。pool.apply_async是非阻塞的,这就是为什么它是“异步的”。它立即返回一个AsyncResult您可以(阻塞地)调用.wait()或调用.get()的对象。


通过评论很明显,要没完没了并立即对已完成的任务(个体经营产生的输入流)的替代品...和程序应停止在一个KeyboardInterrupt或者当结果不具有一定的价值。


您可以使用 -callback参数在任何旧apply_async任务完成后立即安排新任务。困难在于在 MainThread 的同时如何防止整个脚本过早结束,同时保持它对 KeyboardInterrupt 的响应。让 MainThread 在循环中休眠使其仍然可以立即对 KeyboardInterrupt 做出反应,同时防止提前退出。如果结果应该停止程序,您可以让回调终止池。然后 MainThread 只需要在他的睡眠循环中包含对池状态的检查。


import time

from random import randint, choice

from itertools import count

from datetime import datetime

from threading import current_thread

from multiprocessing.pool import ThreadPool



def test_post(post_id):

    time.sleep(randint(1, 3))

    status_code = choice([200] * 9 + [404])

    return "{} {} Message no.{}: {}".format(

        datetime.now(), current_thread().name, post_id, status_code

    ), status_code



def handle_result(result):

    msg, code = result

    print(msg)

    if code != 200:

        print("terminating")

        pool.terminate()

    else:

        pool.apply_async(

            test_post, args=(next(post_cnt),), callback=handle_result

        )



if __name__ == '__main__':


    N_WORKERS = 4


    post_cnt = count()


    pool = ThreadPool(N_WORKERS)


    # initial distribution

    for _ in range(N_WORKERS):

        pool.apply_async(

            test_post, args=(next(post_cnt),), callback=handle_result

        )


    try:

        while pool._state == 0:  # check if pool is still alive

            time.sleep(1)

    except KeyboardInterrupt:

        print(" got interrupt")

带有键盘中断的示例输出:


$> python2 scratch.py

2019-02-15 18:46:11.724203 Thread-4 Message no.3: 200

2019-02-15 18:46:12.724713 Thread-2 Message no.1: 200

2019-02-15 18:46:13.726107 Thread-1 Message no.0: 200

2019-02-15 18:46:13.726292 Thread-3 Message no.2: 200

2019-02-15 18:46:14.724537 Thread-4 Message no.4: 200

2019-02-15 18:46:14.726881 Thread-2 Message no.5: 200

2019-02-15 18:46:14.727071 Thread-1 Message no.6: 200

^C got interrupt

由于不需要的返回值而终止的示例输出:


$> python2 scratch.py

2019-02-15 18:44:19.966387 Thread-3 Message no.0: 200

2019-02-15 18:44:19.966491 Thread-4 Message no.1: 200

2019-02-15 18:44:19.966582 Thread-1 Message no.3: 200

2019-02-15 18:44:20.967555 Thread-2 Message no.2: 200

2019-02-15 18:44:20.968562 Thread-3 Message no.4: 404

terminating

请注意,在您的场景中,您还可以apply_async更频繁地调用N_WORKERS-times 为您的初始分发提供一些缓冲区以减少延迟。


查看完整回答
反对 回复 2021-11-09
  • 1 回答
  • 0 关注
  • 201 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信