2 回答
TA贡献1805条经验 获得超10个赞
您的基本算法是“我希望将 的前 10 个值df['Node']设置为 的第一个值ndf,将接下来的 10 个值设置为 的下一个值ndf,依此类推”。这在 Dask 中很难,因为它不知道每个分区中有多少行:您正在从 CSV 读取,并且您在 X 字节中获得的行数取决于每个部分中的数据是什么样的. 其他格式为您提供更多信息...
因此,您肯定需要两次遍历数据。您可以使用索引来找出划分并可能进行一些排序。在我看来,你能做的最简单的事情就是测量分割长度,然后得到每个开始的偏移量:
lengths = df.map_partitions(len).compute()
offsets = np.cumsum(lengths.values)
offsets -= offsets[0]
现在使用自定义延迟功能来处理零件
@dask.delayed
def add_node(part, offset, ndf):
index = pd.Series(range(offset, offset + len(part)) // 10,
index=part.index) # 10 is the repeat factor
part['Node'] = index.map(ndf)
return part
df2 = dd.from_delayed([add_node(d, off, ndf)
for d, off in zip(df.to_delayed(), offsets)])
TA贡献1830条经验 获得超3个赞
使用相同的工作流程,您可以divisions按照此处的建议手动设置
import dask.dataframe as dd
import pandas as pd
import numpy as np
pd.DataFrame(np.random.rand(25000, 2)).to_csv("tempfile.csv", index=False)
df = dd.read_csv("tempfile.csv")
ndf = pd.DataFrame(np.random.randint(1000, 3500, size=2500))
df.divisions = (0, len(df)-1)
df["Note"] = dd.from_array(np.repeat(ndf.values, 10))
我不认为使用np.repeat是非常有效的,特别是对于大 df。
添加回答
举报