为了账号安全,请及时绑定邮箱和手机立即绑定

如何让 Flask 网页(路由)在另一个网页(路由)上后台运行

如何让 Flask 网页(路由)在另一个网页(路由)上后台运行

梦里花落0921 2024-01-15 17:20:02
所以我正在创建这个应用程序,它的一部分是一个网页,其中交易算法正在使用实时数据进行自我测试。所有这些都有效,但问题是如果我离开(退出)网页,它就会停止。我想知道如何让它无限期地在后台运行,因为我希望算法继续做它的事情。这是我想在后台运行的路线。@app.route('/live-data-source')def live_data_source():    def get_live_data():        live_options = lo.Options()        while True:            live_options.run()            live_options.update_strategy()            trades = live_options.get_all_option_trades()            trades = trades[0]            json_data = json.dumps(                {'data': trades})            yield f"data:{json_data}\n\n"            time.sleep(5)    return Response(get_live_data(), mimetype='text/event-stream')我研究过多线程,但不太确定这是否适合这项工作。我对烧瓶还是个新手,所以这是一个可怜的问题。如果您需要更多信息,请发表评论。
查看完整描述

1 回答

?
三国纷争

TA贡献1804条经验 获得超7个赞

您可以按照以下方式进行操作 - 下面是 100% 有效的示例。请注意,在生产中使用 Celery 来执行此类任务,或者自己编写另一个守护程序应用程序(另一个进程),并借助消息队列(例如 RabbitMQ)或公共数据库的帮助从 http 服务器向其提供任务。

如果对下面的代码有任何疑问,请随时提问,这对我来说是很好的练习:

from flask import Flask, current_app

import threading

from threading import Thread, Event

import time

from random import randint


app = Flask(__name__)

# use the dict to store events to stop other treads

# one event per thread !

app.config["ThreadWorkerActive"] = dict()



def do_work(e: Event):

    """function just for another one thread to do some work"""

    while True:

        if e.is_set():

            break  # can be stopped from another trhead

        print(f"{threading.current_thread().getName()} working now ...")

        time.sleep(2)

    print(f"{threading.current_thread().getName()} was stoped ...")



@app.route("/long_thread", methods=["GET"])

def long_thread_task():

    """Allows to start a new thread"""

    th_name = f"Th-{randint(100000, 999999)}"  # not really unique actually

    stop_event = Event()  # is used to stop another thread

    th = Thread(target=do_work, args=(stop_event, ), name=th_name, daemon=True)

    th.start()

    current_app.config["ThreadWorkerActive"][th_name] = stop_event

    return f"{th_name} was created!"



@app.route("/stop_thread/<th_id>", methods=["GET"])

def stop_thread_task(th_id):

    th_name = f"Th-{th_id}"

    if th_name in current_app.config["ThreadWorkerActive"].keys():

        e = current_app.config["ThreadWorkerActive"].get(th_name)

        if e:

            e.set()

            current_app.config["ThreadWorkerActive"].pop(th_name)

            return f"Th-{th_id} was asked to stop"

        else:

            return "Sorry something went wrong..."

    else:

        return f"Th-{th_id} not found"



@app.route("/", methods=["GET"])

def index_route():

    text = ("/long_thread - create another thread.  "

            "/stop_thread/th_id - stop thread with a certain id.  "

            f"Available Threads: {'; '.join(current_app.config['ThreadWorkerActive'].keys())}")

    return text



if __name__ == '__main__':

    app.run(host="0.0.0.0", port=9999)


查看完整回答
反对 回复 2024-01-15
  • 1 回答
  • 0 关注
  • 145 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信