为了账号安全,请及时绑定邮箱和手机立即绑定

使用原始 PyMySQL 进行多线程处理 celery

使用原始 PyMySQL 进行多线程处理 celery

子衿沉夜 2023-12-26 15:20:12
在我目前正在进行的项目中,我不允许使用 ORM,所以我自己做了它工作得很好,但我在 Celery 和它的并发性方面遇到了问题。有一段时间,我将其设置为1(using --concurrency=1),但我添加了新任务,这些任务的处理时间比使用 celerybeat 运行所需的时间要长,这会导致任务大量积压。当我将 celery 的并发设置为 > 1 时,会发生以下情况(pastebin 因为它很大):https://pastebin.com/M4HZXTDC关于如何在其他进程上实现某种锁定/等待以使不同的工作人员不会相互交叉的任何想法?编辑:这是我设置PyMySQL 实例以及如何处理打开和关闭的位置
查看完整描述

1 回答

?
鸿蒙传说

TA贡献1865条经验 获得超7个赞

PyMSQL不允许线程共享同一个连接(模块可以共享,但线程不能共享连接)。您的模型类在各处重用相同的连接

因此,当不同的工作人员调用模型进行查询时,他们使用相同的连接对象,从而导致冲突。

确保您的连接对象是线程本地的。不要使用db类属性,而是考虑一种检索线程本地连接对象的方法,而不是重用可能在不同线程中创建的连接对象。

例如,在任务中创建连接

现在,您在每个模型的任何地方都使用全局连接。

# Connect to the database

connection = pymysql.connect(**database_config)



class Model(object):

    """

    Base Model class, all other Models will inherit from this

    """


    db = connection

为了避免这种情况,您可以在方法中创建数据库__init__......


class Model(object):

    """

    Base Model class, all other Models will inherit from this

    """


    def __init__(self, *args, **kwargs):

        self.db = pymysql.connect(**database_config)

但是,这可能不高效/不实用,因为 db 对象的每个实例都会创建一个会话。


为了改进这一点,您可以使用一种方法threading.local来将连接保持在线程本地。




class Model(object):

    """

    Base Model class, all other Models will inherit from this

    """

    _conn = threading.local()

    @property

    def db(self):

        if not hasattr(self._conn, 'db'):

            self._conn.db = pymysql.connect(**database_config)

        return self._conn.db

请注意,假设您使用线程并发模型,线程本地解决方案就可以工作。另请注意,celery 默认情况下使用多个进程(prefork)。这可能是问题,也可能不是问题。



查看完整回答
反对 回复 2023-12-26
  • 1 回答
  • 0 关注
  • 84 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信