为了账号安全,请及时绑定邮箱和手机立即绑定

在多线程环境下使用DataFrame.to_sql时MySQL死锁

在多线程环境下使用DataFrame.to_sql时MySQL死锁

烙印99 2023-08-15 16:44:32
我在 docker 容器内有一个多线程 ETL 进程,看起来像这样的简化代码:class Query(abc.ABC):    def __init__(self):        self.connection = sqlalchemy.create_engine(MYSQL_CONNECTION_STR)    def load(self, df: pd.DataFrame) -> None:        df.to_sql(            name=self.table, con=self.connection, if_exists="replace", index=False,        )    @abc.abstractmethod    def transform(self, data: object) -> pd.DataFrame:        pass    @abc.abstractmethod    def extract(self) -> object:        pass    #  other methods...class ComplianceInfluxQuery(Query):    # Implements abstract methods... load method is the same as Query classALL_QUERIES = [ComplianceInfluxQuery("cc_influx_share_count"),ComplianceInfluxQuery("cc_influx_we_count")....]while True:    with ThreadPoolExecutor(max_workers=8) as pool:        for query in ALL_QUERIES:            pool.submit(execute_etl, query) # execute_etl function calls extract, transform and loadload()许多类继承自 Query,具有与类中所示相同的实现,Query它只是将 pandas DataFrame 对象加载到 sql 表中,并替换该表(如果存在)。所有类同时运行,并在完成后将结果加载到 MySQLExtract()数据库Transform()。每个类都会将不同的表加载到数据库中。load()当调用该方法时,我经常会从随机线程中遇到死锁:日志显示了load()两个线程几乎同时调用的方法。无论数据如何,这种情况都可能发生在所有类中。我运行了命令SHOW ENGINE INNODB STATUS,那里没有列出死锁。cc_influx_share_count我检查了 General_log 表以更好地了解死锁期间发生的情况,但除了死锁的线程在(我认为)应该具有的情况下没有向表中插入任何值这一事实之外,没有注意到任何有用的信息:该错误于 09:48:28,241 提出SELECT * FROM mysql.general_log WHERE event_time >= "2020-09-17 09:48:27" AND event_time <= "2020-09-17 09:48:29" ORDER BY event_time ASC;此 ETL 是运行 MySQL 的唯一进程。我已阅读有关死锁发生原因的文档,但我无法理解两个之间没有连接的不同表如何导致死锁。我知道我可以简单地load()再次运行该方法直到成功,但我想了解为什么会发生死锁以及如何防止它们。MySQL版本是8.0.21。蟒蛇3.8.4。sqlalchemy 1.3.19。熊猫 1.0.5。PyMySQL 0.10.1。
查看完整描述

2 回答

?
拉丁的传说

TA贡献1789条经验 获得超8个赞

如果多个连接尝试同时 INSERT 或 UPDATE 到同一个表,则可能会因表索引的争用而导致死锁。

你的问题说你从多个线程执行 INSERT。执行 INSERT 需要检查主键唯一性和外键有效性等约束,然后更新表示这些约束的索引。所以多个并发更新

  1. 必须锁定索引以供读取,然后

  2. 锁定它们以进行写入。

从你的问题来看,MySQL有时会陷入死锁情况(一个线程按a,b顺序锁定索引,另一个线程按b,a顺序锁定索引)。如果不同的线程可以同时将行插入到不同的表中,并且这些表通过外键约束相互关联,则索引维护相对容易陷入死锁情况。

您可以通过在执行加载之前更改要填充的表以删除所有索引(自动增量主键除外),然后在之后重新创建它们来解决此问题。

或者,您可以摆脱并发性并仅使用一个线程执行ETLL部分。由于所有索引维护,线程并不能像直观上那样帮助提高吞吐量。

避免在多个并发线程上运行数据定义语言(CREATE TABLE、CREATE INDEX 等)。对这些东西进行故障排除比其价值更麻烦。

此外,在事务中包装大约数百行的每个块的 INSERT 可以以惊人的方式帮助 ETL 吞吐量。在每个主干之前,说BEGIN TRANSACTION; 在每个块之后说COMMIT; 为什么这有帮助?因为 COMMIT 操作需要时间,并且不在显式事务中的每个操作后面都会有一个隐式 COMMIT。


查看完整回答
反对 回复 2023-08-15
?
鸿蒙传说

TA贡献1865条经验 获得超7个赞

我发现这个问题的一个可能的解决方案是重试机制。如果发生死锁 - 睡眠并多尝试几次直到成功,同时将 DF 保留在内存中:


class Query(abc.ABC):

    def __init__(self):

        self.engine = MysqlEngine.engine()


    ....

    ....


    def load(self, df: pd.DataFrame) -> None:

        for i in range(5):  # If load fails due to a deadlock, try 4 more times

            try:

                df.to_sql(

                    name=self.table,

                    con=self.engine.connect(),

                    if_exists="replace",

                    index=False,

                )

                return

            except sqlalchemy.exc.OperationalError as ex:

                if "1213" in repr(ex):

                    logging.warning(

                        "Failed to acquire lock for %s", self.__class__.__name__

                    )

                sleep(1)

死锁仍然会发生,并且您会损失一些性能,但它胜过重新执行整个 Extrac - Transform。


查看完整回答
反对 回复 2023-08-15
  • 2 回答
  • 0 关注
  • 159 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信