2 回答
TA贡献1830条经验 获得超9个赞
我修改了我的原始脚本以部署在任意数量的 CPU 上。它运行得更快,因为我可以使用多个线程并部署在 aws 上。我用了一台96核的机器,8小时左右就完成了任务。我很惊讶,因为这几乎是线性缩放!这个想法是使一些重复的任务可分发。然后你就可以将任务分配给 CPU。这里的并行化是通过命令完成的pool.map()。
从命令行使用此脚本非常简单:
python3 transposer.py -i largeFile.tsv
如果需要,您也可以指定其他参数。
import argparse, subprocess
import numpy as np
import pandas as pd
import dask.dataframe as dd
from IPython.display import clear_output
from contextlib import closing
from os import cpu_count
from multiprocessing import Pool
parser = argparse.ArgumentParser(description='Transpose csv')
parser.add_argument('-i', '--infile', help='Path to input folder',
default=None)
parser.add_argument('-s', '--sep', help='input separator',
default='\t')
args = parser.parse_args()
infile = args.infile
sep = args.sep
df = pd.read_csv(infile, sep='\t', nrows=3)
def READ_COL(item):
print(item)
outfile = 'outfile{}.temp'.format(item)
if item !=0:
x = "awk '{print $"+str(item)+"}' "+infile+" > "+outfile
subprocess.check_call([x], shell=True)
col = pd.read_csv(outfile)
row = col.T
display(row)
row.to_csv('col_{:09d}.csv'.format(item), header=False)
subprocess.check_call(['rm '+outfile], shell=True)
print(item/len(df.columns))
with closing(Pool(processes=cpu_count())) as pool:
pool.map(READ_COL, list(range(1, len(df.columns)+1)))
在此之后,您应该有许多转置列的文件。您只需要使用cat或其他命令行工具将它们连接在一起。我刚跑cat col_* > full_file_transposed.csv
添加回答
举报