为了账号安全,请及时绑定邮箱和手机立即绑定

如何实现Python的多处理池来转换数千个文件

如何实现Python的多处理池来转换数千个文件

蝴蝶不菲 2022-08-02 15:51:34
我有一个Python脚本,可以在包含超过10,000个DBF文件的目录中读取,以便将它们转换为CSV。我想将此任务视为一般化,而不是单独转换每个文件。我已经阅读了Python的多处理模块,尽管我在实现此任务时遇到了一些麻烦。具体来说,我想使用 Pool 类在 CPU 内核之间分配工作负载。这是我到目前为止的代码:import osfrom dbfread import DBFimport pandas as pdimport multiprocessingdirectory = 'C:\\Path_to_DBF_Files' #define file directory files_in = os.listdir(directory) #store files in directory to list def convert():    for file in files_in:        if file.startswith('D') and file.endswith('.DBF'): #define parameters of filenames to convert            file_path = os.path.join(files_in, file)            print(f'\nReading in {file}...')            dbf = DBF(file_path) #create DBF object             dbf.encoding = 'utf-8' #set encoding attribute to utf-8 instead of acsii             dbf.char_decode_errors = 'ignore' #set decoding errors attribute to ignore any errors and read in DBF file as is             print('\nConverting to DataFrame...')            df = pd.DataFrame(iter(dbf)) #convert to Pandas dataframe             df.columns.astype(str) #convert column datatypes to string            print(df)            print('\nWriting to CSV...')            dest_directory = 'C:\\Path_to_output_directory\\%s.csv' % ('D' + file.strip('.DBF')) #define destination directory and names for output files             df.to_csv(dest_directory, index = False)            print(f'\nConverted {file} to CSV. Moving to next file...')        elif file.startswith('B') and file.endswith('.DBF'): #define parameters for unnecessary files             print('\nB file not needed.')            continue        elif file.endswith('.FPT'): #skip FPT files             print('Skipping FPT file.')            continue我在StackOverflow上读到了一些与我的问题有些相似的答案;但是,我没有看到任何适用于我的特定任务的内容。如何改进代码,使脚本同时处理多个文件,而不是一次只读取和转换一个文件?感谢您提供的任何帮助。
查看完整描述

1 回答

?
MM们

TA贡献1886条经验 获得超2个赞

一些一般指导:

  1. 您正在创建一个池。池大小应取决于计算机,而不是作业的大小。例如,您希望池中有 4 个进程而不是 10000 个进程,即使您有 10000 个文件要处理

  2. 在每个进程上运行的作业应该简单但已参数化。在您的例子中,创建一个函数来获取文件名作为输入并执行转换。然后将输入文件映射到其中。过滤应在调用之前完成。map

因此,我会将您的代码转换为如下所示的内容:

import os

from dbfread import DBF

import pandas as pd

import multiprocessing


directory = 'C:\\Path_to_DBF_Files' #define file directory 


files_in = os.listdir(directory) #store files in directory to list 


def convert(file):

    file_path = os.path.join(files_in, file)

    print(f'\nReading in {file}...')

    dbf = DBF(file_path) #create DBF object 

    dbf.encoding = 'utf-8' #set encoding attribute to utf-8 instead of acsii 

    dbf.char_decode_errors = 'ignore' #set decoding errors attribute to ignore any errors and read in DBF file as is 

    print('\nConverting to DataFrame...')

    df = pd.DataFrame(iter(dbf)) #convert to Pandas dataframe 

    df.columns.astype(str) #convert column datatypes to string

    print(df)

    print('\nWriting to CSV...')

    dest_directory = 'C:\\Path_to_output_directory\\%s.csv' % ('D' + file.strip('.DBF')) #define destination directory and names for output files 

    df.to_csv(dest_directory, index = False)

    print(f'\nConverted {file} to CSV. Moving to next file...')


pool = multiprocessing.Pool(processes = 4)

pool.map(convert, [file for file in files_in if file.startswith('D') and file.endswith('.DBF')])

查看完整回答
反对 回复 2022-08-02
  • 1 回答
  • 0 关注
  • 81 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号