为了账号安全,请及时绑定邮箱和手机立即绑定

如何有效地转置 67 gb 文件/Dask 数据帧而不将其完全加载到内存中?

如何有效地转置 67 gb 文件/Dask 数据帧而不将其完全加载到内存中?

小唯快跑啊 2021-10-12 10:16:25
我有 3 个相当大的文件(67gb、36gb、30gb)需要训练模型。但是,特征是行,样本是列。由于 Dask 尚未实现转置并存储按行拆分的 DataFrame,因此我需要自己编写一些东西来执行此操作。有没有一种方法可以有效地转置而不加载到内存中?我有 16 GB 的内存可供我使用,并且正在使用 jupyter notebook。我写了一些相当慢的代码,但真的很感激更快的解决方案。以下代码的速度将需要一个月才能完成所有文件。几个数量级的最慢步骤是 awk。import dask.dataframe as ddimport subprocessfrom IPython.display import clear_outputdf = dd.read_csv('~/VeryLarge.tsv')with open('output.csv','wb') as fout:    for i in range(1, len(df.columns)+1):        print('AWKing')        #read a column from the original data and store it elsewhere        x = "awk '{print $"+str(i)+"}' ~/VeryLarge.tsv > ~/file.temp"        subprocess.check_call([x], shell=True)        print('Reading')        #load and transpose the column        col = pd.read_csv('~/file.temp')        row = col.T        display(row)        print('Deleting')        #remove the temporary file created        !rm ../file.temp        print('Storing')        #store the row in its own csv just to be safe. not entirely necessary        row.to_csv('~/columns/col_{:09d}'.format(i), header=False)        print('Appending')        #append the row (transposed column) to the new file        with open('~/columns/col_{:09d}', 'rb') as fin:            for line in fin:                fout.write(line)        clear_output()        #Just a measure of progress        print(i/len(df.columns))数据本身有 1000 万行(特征)和 2000 列(样本)。它只需要转置。目前,它看起来像这样:
查看完整描述

2 回答

?
慕标琳琳

TA贡献1830条经验 获得超9个赞

我修改了我的原始脚本以部署在任意数量的 CPU 上。它运行得更快,因为我可以使用多个线程并部署在 aws 上。我用了一台96核的机器,8小时左右就完成了任务。我很惊讶,因为这几乎是线性缩放!这个想法是使一些重复的任务可分发。然后你就可以将任务分配给 CPU。这里的并行化是通过命令完成的pool.map()。


从命令行使用此脚本非常简单:


python3 transposer.py -i largeFile.tsv

如果需要,您也可以指定其他参数。


import argparse, subprocess

import numpy as np

import pandas as pd

import dask.dataframe as dd

from IPython.display import clear_output

from contextlib import closing

from os import cpu_count

from multiprocessing import Pool


parser = argparse.ArgumentParser(description='Transpose csv')

parser.add_argument('-i', '--infile', help='Path to input folder',

                    default=None)

parser.add_argument('-s', '--sep', help='input separator',

                    default='\t')


args = parser.parse_args()

infile = args.infile

sep = args.sep    

df = pd.read_csv(infile, sep='\t', nrows=3)    


def READ_COL(item):

    print(item)

    outfile = 'outfile{}.temp'.format(item)

    if item !=0:

                x = "awk '{print $"+str(item)+"}' "+infile+" > "+outfile

                subprocess.check_call([x], shell=True)

                col = pd.read_csv(outfile)

                row = col.T

                display(row)

                row.to_csv('col_{:09d}.csv'.format(item), header=False)

                subprocess.check_call(['rm '+outfile], shell=True)

                print(item/len(df.columns))


with closing(Pool(processes=cpu_count())) as pool:

    pool.map(READ_COL, list(range(1, len(df.columns)+1)))

在此之后,您应该有许多转置列的文件。您只需要使用cat或其他命令行工具将它们连接在一起。我刚跑cat col_* > full_file_transposed.csv


查看完整回答
反对 回复 2021-10-12
  • 2 回答
  • 0 关注
  • 153 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信