1 回答
TA贡献1804条经验 获得超7个赞
您可以按照以下方式进行操作 - 下面是 100% 有效的示例。请注意,在生产中使用 Celery 来执行此类任务,或者自己编写另一个守护程序应用程序(另一个进程),并借助消息队列(例如 RabbitMQ)或公共数据库的帮助从 http 服务器向其提供任务。
如果对下面的代码有任何疑问,请随时提问,这对我来说是很好的练习:
from flask import Flask, current_app
import threading
from threading import Thread, Event
import time
from random import randint
app = Flask(__name__)
# use the dict to store events to stop other treads
# one event per thread !
app.config["ThreadWorkerActive"] = dict()
def do_work(e: Event):
"""function just for another one thread to do some work"""
while True:
if e.is_set():
break # can be stopped from another trhead
print(f"{threading.current_thread().getName()} working now ...")
time.sleep(2)
print(f"{threading.current_thread().getName()} was stoped ...")
@app.route("/long_thread", methods=["GET"])
def long_thread_task():
"""Allows to start a new thread"""
th_name = f"Th-{randint(100000, 999999)}" # not really unique actually
stop_event = Event() # is used to stop another thread
th = Thread(target=do_work, args=(stop_event, ), name=th_name, daemon=True)
th.start()
current_app.config["ThreadWorkerActive"][th_name] = stop_event
return f"{th_name} was created!"
@app.route("/stop_thread/<th_id>", methods=["GET"])
def stop_thread_task(th_id):
th_name = f"Th-{th_id}"
if th_name in current_app.config["ThreadWorkerActive"].keys():
e = current_app.config["ThreadWorkerActive"].get(th_name)
if e:
e.set()
current_app.config["ThreadWorkerActive"].pop(th_name)
return f"Th-{th_id} was asked to stop"
else:
return "Sorry something went wrong..."
else:
return f"Th-{th_id} not found"
@app.route("/", methods=["GET"])
def index_route():
text = ("/long_thread - create another thread. "
"/stop_thread/th_id - stop thread with a certain id. "
f"Available Threads: {'; '.join(current_app.config['ThreadWorkerActive'].keys())}")
return text
if __name__ == '__main__':
app.run(host="0.0.0.0", port=9999)
添加回答
举报