2 回答
TA贡献1789条经验 获得超8个赞
如果多个连接尝试同时 INSERT 或 UPDATE 到同一个表,则可能会因表索引的争用而导致死锁。
你的问题说你从多个线程执行 INSERT。执行 INSERT 需要检查主键唯一性和外键有效性等约束,然后更新表示这些约束的索引。所以多个并发更新
必须锁定索引以供读取,然后
锁定它们以进行写入。
从你的问题来看,MySQL有时会陷入死锁情况(一个线程按a,b顺序锁定索引,另一个线程按b,a顺序锁定索引)。如果不同的线程可以同时将行插入到不同的表中,并且这些表通过外键约束相互关联,则索引维护相对容易陷入死锁情况。
您可以通过在执行加载之前更改要填充的表以删除所有索引(自动增量主键除外),然后在之后重新创建它们来解决此问题。
或者,您可以摆脱并发性并仅使用一个线程执行ETL的L部分。由于所有索引维护,线程并不能像直观上那样帮助提高吞吐量。
避免在多个并发线程上运行数据定义语言(CREATE TABLE、CREATE INDEX 等)。对这些东西进行故障排除比其价值更麻烦。
此外,在事务中包装大约数百行的每个块的 INSERT 可以以惊人的方式帮助 ETL 吞吐量。在每个主干之前,说BEGIN TRANSACTION;
在每个块之后说COMMIT;
为什么这有帮助?因为 COMMIT 操作需要时间,并且不在显式事务中的每个操作后面都会有一个隐式 COMMIT。
TA贡献1865条经验 获得超7个赞
我发现这个问题的一个可能的解决方案是重试机制。如果发生死锁 - 睡眠并多尝试几次直到成功,同时将 DF 保留在内存中:
class Query(abc.ABC):
def __init__(self):
self.engine = MysqlEngine.engine()
....
....
def load(self, df: pd.DataFrame) -> None:
for i in range(5): # If load fails due to a deadlock, try 4 more times
try:
df.to_sql(
name=self.table,
con=self.engine.connect(),
if_exists="replace",
index=False,
)
return
except sqlalchemy.exc.OperationalError as ex:
if "1213" in repr(ex):
logging.warning(
"Failed to acquire lock for %s", self.__class__.__name__
)
sleep(1)
死锁仍然会发生,并且您会损失一些性能,但它胜过重新执行整个 Extrac - Transform。
添加回答
举报