为了账号安全,请及时绑定邮箱和手机立即绑定

连接一个 dask 数据框和一个 Pandas 数据框

连接一个 dask 数据框和一个 Pandas 数据框

桃花长相依 2021-11-09 17:02:51
我有一个df包含大约 2.5 亿行(来自 10Gb CSV 文件)的 dask 数据框 ( )。我有另一个ndf25,000 行的Pandas 数据框 ( )。我想通过将每个项目重复 10,000 次,将 Pandas 数据框的第一列添加到 dask 数据框。这是我尝试过的代码。我已将问题缩小到较小的范围。import dask.dataframe as ddimport pandas as pdimport numpy as nppd.DataFrame(np.random.rand(25000, 2)).to_csv("tempfile.csv")df = dd.read_csv("tempfile.csv")ndf = pd.DataFrame(np.random.randint(1000, 3500, size=2500))df['Node'] = np.repeat(ndf[0], 10)使用此代码,我最终遇到错误。ValueError:并非所有分区都是已知的,无法对齐分区。请使用set_index来设置索引。我可以执行 areset_index()后跟 aset_index()来制作df.known_divisions Truedask 数据框。但这是一项耗时的操作。有没有更好更快的方法来做我想做的事情?我可以使用熊猫本身来做到这一点吗?最终目标是从ndf其中的任何相应行中找到df与某些条件匹配的行。
查看完整描述

2 回答

?
holdtom

TA贡献1805条经验 获得超10个赞

您的基本算法是“我希望将 的前 10 个值df['Node']设置为 的第一个值ndf,将接下来的 10 个值设置为 的下一个值ndf,依此类推”。这在 Dask 中很难,因为它不知道每个分区中有多少行:您正在从 CSV 读取,并且您在 X 字节中获得的行数取决于每个部分中的数据是什么样的. 其他格式为您提供更多信息...


因此,您肯定需要两次遍历数据。您可以使用索引来找出划分并可能进行一些排序。在我看来,你能做的最简单的事情就是测量分割长度,然后得到每个开始的偏移量:


lengths = df.map_partitions(len).compute()

offsets = np.cumsum(lengths.values)

offsets -= offsets[0]

现在使用自定义延迟功能来处理零件


@dask.delayed

def add_node(part, offset, ndf):

    index = pd.Series(range(offset, offset + len(part)) // 10,

                      index=part.index)  # 10 is the repeat factor

    part['Node'] = index.map(ndf)

    return part


df2 = dd.from_delayed([add_node(d, off, ndf) 

                       for d, off in zip(df.to_delayed(), offsets)])


查看完整回答
反对 回复 2021-11-09
?
牛魔王的故事

TA贡献1830条经验 获得超3个赞

使用相同的工作流程,您可以divisions按照此处的建议手动设置


import dask.dataframe as dd

import pandas as pd

import numpy as np


pd.DataFrame(np.random.rand(25000, 2)).to_csv("tempfile.csv", index=False)

df = dd.read_csv("tempfile.csv")

ndf = pd.DataFrame(np.random.randint(1000, 3500, size=2500))



df.divisions = (0, len(df)-1)

df["Note"] = dd.from_array(np.repeat(ndf.values, 10))

我不认为使用np.repeat是非常有效的,特别是对于大 df。


查看完整回答
反对 回复 2021-11-09
  • 2 回答
  • 0 关注
  • 197 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信