新手-ish python/pandas 用户在这里。我一直在尝试在 read_fwf 中使用 chunksize arg 并迭代 value_counts 变量。我编写了一个函数来传递参数,例如文件迭代器和变量来解析和计数。我希望并行化这个函数,并能够同时将 2 个文件读入同一个函数。它似乎确实有效……但是,我的速度出乎意料地变慢了。线程在同一时间完成,但一个似乎正在减慢另一个(IO 瓶颈?)。通过按顺序而不是并行运行函数(324 秒对 172 秒),我得到了更快的时间。想法?我在执行这个错误吗?我试过多进程但启动映射错误,我无法腌制文件迭代器(read_fwf 的输出)。testdf1=pd.read_fwf(filepath_or_buffer='200k.dat',header=None,colspecs=wlist,names=nlist,dtype=object,na_values=[''],chunksize=1000) testdf2=pd.read_fwf(filepath_or_buffer='200k2.dat',header=None,colspecs=wlist,names=nlist,dtype=object,na_values=[''],chunksize=1000)def tfuncth(df,varn,q,*args): td={} for key in varn.keys(): td[key]=pd.Series() for rdf in df: if args is not None: for arg in args: rdf=eval(f"rdf.query(\"{arg}\")") for key in varn.keys(): ecode=f'rdf.{varn[key]}.value_counts()' td[key]=pd.concat([td[key],eval(ecode)]) td[key]=td[key].groupby(td[key].index).sum() for key in varn.keys(): td[key]=pd.DataFrame(td[key].reset_index()).rename(columns={'index':'Value',0:'Counts'}).assign(Var=key,PCT=lambda x:round(x.Counts/x.Counts.sum()*100,2))[['Var','Value','Counts','PCT']] q.put(td) bands={ '1':'A', '2':'B', '3':'C', '4':'D', '5':'E', '6':'F', '7':'G', '8':'H', '9':'I' } vdict={ 'var1':'e1270.str.slice(0,2)', 'var2':'e1270.str.slice(2,3)', 'band':'e7641.str.slice(0,1).replace(bands)' }更新:经过大量阅读,这也是我得出的结论。这是非常简单的结论,我敢肯定,所以如果有人知道,请告诉我。Pandas 不是一个完全多线程友好的包显然有一个名为 'dask' 的包,它复制了很多 Pandas 函数。所以我会调查一下。在许多情况下,Python 并不是真正的多线程兼容语言因此,诸如“dask”之类的包可以利用多线程。可以分离多个线程,但只能并行非 CPU 绑定函数。我的代码是用 IO 和 CPU 包装的。简单的 IO 可能是并行运行的,但会等待处理器执行。我计划通过编写仅 IO 操作并尝试线程来测试这一点。Python 受其编译器的约束。在纯 python 中,它被 GIL 解释和绑定,一次只执行一个线程Python 可以使用在线程上没有全局解释器锁 (GIL) 的不同编译器进行编译。
1 回答
眼眸繁星
TA贡献1873条经验 获得超9个赞
我确实设法通过使用多处理包来解决这个问题并解决我的问题。我遇到了两个问题。
1) multiprocessing 包与 Juypter Notebook 不兼容
和
2)你不能pickle一个pandas阅读器的句柄(传递给进程的多处理pickles对象)。
我通过在 Notebook 环境之外编码来修复 1,我通过将打开分块文件所需的参数传递给每个进程并让每个进程开始自己的块读取来修复 2。
完成这两件事后,我的速度比顺序运行提高了 60%。
- 1 回答
- 0 关注
- 290 浏览
添加回答
举报
0/150
提交
取消