为了账号安全,请及时绑定邮箱和手机立即绑定

即使有块,Dask也会耗尽内存

即使有块,Dask也会耗尽内存

繁星coding 2022-07-05 19:47:35
我正在处理大型 CSV 文件,我需要制作笛卡尔积(合并操作)。由于内存错误,我试图解决 Pandas 的问题(您可以在此处查看 Pandas 的代码和相同问题的数据格式示例)但没有成功。现在,我正在尝试使用 Dask,它应该可以管理大量数据集,即使它的大小大于可用 RAM。首先,我阅读了两个 CSV:from dask import dataframe as ddBLOCKSIZE = 64000000  # = 64 Mb chunksdf1_file_path = './mRNA_TCGA_breast.csv'df2_file_path = './miRNA_TCGA_breast.csv'# Gets Dataframesdf1 = dd.read_csv(    df1_file_path,    delimiter='\t',    blocksize=BLOCKSIZE)first_column = df1.columns.values[0]df1.set_index(first_column)df2 = dd.read_csv(    df2_file_path,    delimiter='\t',    blocksize=BLOCKSIZE)first_column = df2.columns.values[0]df2.set_index(first_column)# Filter common columnscommon_columns = df1.columns.intersection(df2.columns)df1 = df1[common_columns]df2 = df2[common_columns]然后,我将操作存储在磁盘上以防止内存错误:# Computes a Cartesian productdf1['_tmpkey'] = 1df2['_tmpkey'] = 1# Neither of these two options work# df1.merge(df2, on='_tmpkey').drop('_tmpkey', axis=1).to_hdf('/tmp/merge.*.hdf', key='/merge_data')# df1.merge(df2, on='_tmpkey').drop('_tmpkey', axis=1).to_parquet('/tmp/')我创建了一个 repo 来尝试使用与我使用的完全相同的 CSV 文件。我尝试过使用较小blocksize的值,但我得到了同样的错误。我错过了什么吗?任何形式的帮助将非常感激。
查看完整描述

1 回答

?
明月笑刀无情

TA贡献1828条经验 获得超4个赞

我使用以下方法成功运行了您的代码,内存限制为 32GB。


我已经摆脱了这个论点BLOCKSIZE,repartition而是在 df1 和 df2 上使用。


df1 = df1.repartition(npartitions=50)

df2 = df2.repartition(npartitions=1)

请注意,与 df1 相比,df2 的大小确实更小( 2.5 MB vs 23.75 MB),这就是为什么我只为 df2 保留一个分区并将 df1 分成 50 个分区的原因。


这样做应该使代码为您工作。对我来说,使用的内存保持在 12GB 以下。


为了检查,我计算了结果的 len :


len(df) # 3001995

按照上述内容创建一个包含 50 个分区的 parquet 文件。您可以repartition再次使用来获得所需的 partition_size。


注意:


添加这个应该可以加快你的代码:


from dask.distributed import Client

client = Client()

Client(processes=False)就我而言,由于我的运行环境,我不得不使用该参数。


查看完整回答
反对 回复 2022-07-05
  • 1 回答
  • 0 关注
  • 126 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信