为了账号安全,请及时绑定邮箱和手机立即绑定

调用 SQL Server 存储过程来提取 CSV 的 Python 代码需要几个小时才能执行

调用 SQL Server 存储过程来提取 CSV 的 Python 代码需要几个小时才能执行

有只小跳蛙 2023-07-18 18:02:54
我使用 python 使用 Pandas 读取 CSV,修复一些字段,然后将数据逐行写入 SQL Server 中的表。服务器上禁用批量导入 - 另外,因为最终会有数十个这样的文件,以自动下载和摄取文件。我可以看到这需要几分钟,但运行需要几个小时。我知道如果启用的话,我可以在几秒钟内批量上传这些内容,但这可能是不可能的。问题是使用 python 每次运行可能需要 1 到 3 个小时。这是不可接受的。我想知道是否有更快的方法来完成此上传。我可以对表格做些什么来加快导入速度,或者采用不同的编码方式吗?这是我正在使用的代码类型的示例:def ingest_glief_reporting_exceptions_csv():    global conn    global cursor    filename = r"20200824-0800-gleif-goldencopy-repex-golden-copy.csv"    # filename = r"repex_1K.csv"    full_filename = os.path.join(raw_data_dir, filename)    sql_str = "exec dbo.util_get_gleif_last_reporting_exception"    cursor.execute(sql_str)    last_lei = ''    for result in cursor.fetchall():        last_lei = result[0]    # "repex" is short for "reporting exceptions", shorten the headers    repex_headers = [        'LEI',        'ExceptionCategory',        'ExceptionReason1',        'ExceptionReason2',        'ExceptionReason3',        'ExceptionReason4',        'ExceptionReason5',        'ExceptionReference1',        'ExceptionReference2',        'ExceptionReference3',        'ExceptionReference4',        'ExceptionReference5'    ]    df = pd.read_csv(full_filename, header=0, quotechar='"')    # Change to the column headers generated in VBA    df.columns = repex_headers    for colname in df.columns:        df[colname] = df[colname].astype(str)        df[colname] = df[colname].replace({'nan': ''})    place_holder = '?,?'    for i in range(1, len(repex_headers)):        place_holder += ',?'    sql_str = "exec save_gleif_reporting_exception " + place_holder    row_count = 0    row = dict()    do_not_upload = True    if last_lei == '':        do_not_upload = False   # There was no last uploaded record, so we can start now    for index, row in df.iterrows():        row_count += 1        if do_not_upload:            if row['LEI'] == last_lei:                do_not_upload = False                continue            else:                continue        )
查看完整描述

2 回答

?
慕丝7291255

TA贡献1859条经验 获得超6个赞

由于您不需要存储过程的返回值,因此您应该能够使用 pandas 的to_sql方法将行直接插入表中。这段代码...


from time import time

import pandas as pd

import sqlalchemy as sa


from_engine = sa.create_engine("mssql+pyodbc://@mssqlLocal64")

to_engine = sa.create_engine(

    "mssql+pyodbc://sa:_whatever_@192.168.0.199/mydb"

    "?driver=ODBC+Driver+17+for+SQL+Server",

    fast_executemany=False,

)


# set up test

to_cnxn = to_engine.raw_connection()

to_cnxn.execute("TRUNCATE TABLE MillionRows")

to_cnxn.commit()

num_rows_to_upload = 10000

df = pd.read_sql_query(

    f"SELECT TOP {num_rows_to_upload} "

    "[TextField], [LongIntegerField], [DoubleField], [varchar_column] "

    "FROM MillionRows ORDER BY ID",

    from_engine,

)


# run test

t0 = time()

df.to_sql("MillionRows", to_engine, index=False, if_exists="append")

s = f"{(time() - t0):0.1f} seconds"

print(f"uploading {num_rows_to_upload:,d} rows took {s}")

… 表示与您现在所做的工作大致相同的内部工作水平,即,将每个单独的行作为单独的调用上传.execute。结果是


uploading 10,000 rows took 60.2 seconds

但是,简单地更改to_engine为使用fast_executemany=True结果


uploading 10,000 rows took 1.4 seconds


查看完整回答
反对 回复 2023-07-18
?
侃侃尔雅

TA贡献1801条经验 获得超16个赞

关闭自动提交


 conn = pyodbc.connect(

        r'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + server_name + '; \

        Database=' + db_name + ';Trusted_Connection=yes;', timeout=5, autocommit=False)

 

并在此处和循环结束时提交。


    if index % 1000 == 0:

            print("Imported %s rows" % (index))

使用自动提交,您必须等待日志文件在每一行之后保存到磁盘。


为了进一步优化,如果您使用 SQL 2016+,请使用 JSON 将批量行发送到 SQL Server,并使用OPENJSON在服务器端进行解析。



查看完整回答
反对 回复 2023-07-18
  • 2 回答
  • 0 关注
  • 145 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信