为了账号安全,请及时绑定邮箱和手机立即绑定

如何使用python pandas处理传入的实时数据

如何使用python pandas处理传入的实时数据

跃然一笑 2021-03-17 17:32:23
哪一种是使用熊猫处理实时传入数据的最推荐/ Python方法?每隔几秒钟,我就会收到以下格式的数据点:{'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}我想将其附加到现有的DataFrame上,然后对其进行一些分析。问题是,仅将DataFrame.append添加到行中可能会导致所有复制的性能问题。我尝试过的事情:一些人建议预分配一个大的DataFrame并在数据输入时对其进行更新:In [1]: index = pd.DatetimeIndex(start='2013-01-01 00:00:00', freq='S', periods=5)In [2]: columns = ['high', 'low', 'open', 'close']In [3]: df = pd.DataFrame(index=t, columns=columns)In [4]: dfOut[4]:                     high  low open close2013-01-01 00:00:00  NaN  NaN  NaN   NaN2013-01-01 00:00:01  NaN  NaN  NaN   NaN2013-01-01 00:00:02  NaN  NaN  NaN   NaN2013-01-01 00:00:03  NaN  NaN  NaN   NaN2013-01-01 00:00:04  NaN  NaN  NaN   NaNIn [5]: data = {'time' :'2013-01-01 00:00:02', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}In [6]: data_ = pd.Series(data)In [7]: df.loc[data['time']] = data_In [8]: dfOut[8]:                     high  low open close2013-01-01 00:00:00  NaN  NaN  NaN   NaN2013-01-01 00:00:01  NaN  NaN  NaN   NaN2013-01-01 00:00:02    4    3    2     12013-01-01 00:00:03  NaN  NaN  NaN   NaN2013-01-01 00:00:04  NaN  NaN  NaN   NaN另一种选择是建立字典列表。只需将传入的数据附加到列表中,然后将其切成较小的DataFrame,即可完成工作。In [9]: ls = []In [10]: for n in range(5):   .....:     # Naive stuff ahead =)   .....:     time = '2013-01-01 00:00:0' + str(n)   .....:     d = {'time' : time, 'stock' : 'BLAH', 'high' : np.random.rand()*10, 'low' : np.random.rand()*10, 'open' : np.random.rand()*10, 'close' : np.random.rand()*10}   .....:     ls.append(d)In [11]: df = pd.DataFrame(ls[1:3]).set_index('time')In [12]: dfOut[12]:                         close      high       low      open stocktime                                                             2013-01-01 00:00:01  3.270078  1.008289  7.486118  2.180683  BLAH2013-01-01 00:00:02  3.883586  2.215645  0.051799  2.310823  BLAH或类似的东西,也许要多处理一些输入。
查看完整描述

3 回答

?
阿波罗的战车

TA贡献1862条经验 获得超6个赞

我将使用HDF5 / pytables如下:

  1. 将数据尽可能长地保留为python列表。

  2. 将结果追加到该列表。

  3. 当它变大时:

    • 使用pandas io(和一个可附加的表)推送到HDF5 Store。

    • 清除列表。

  4. 重复。

实际上,我定义的函数为每个“键”使用一个列表,以便您可以在同一过程中将多个DataFrame存储到HDF5存储。


我们定义一个函数,您需要在每一行中调用它d

CACHE = {}

STORE = 'store.h5'   # Note: another option is to keep the actual file open


def process_row(d, key, max_len=5000, _cache=CACHE):

    """

    Append row d to the store 'key'.


    When the number of items in the key's cache reaches max_len,

    append the list of rows to the HDF5 store and clear the list.


    """

    # keep the rows for each key separate.

    lst = _cache.setdefault(key, [])

    if len(lst) >= max_len:

        store_and_clear(lst, key)

    lst.append(d)


def store_and_clear(lst, key):

    """

    Convert key's cache list to a DataFrame and append that to HDF5.

    """

    df = pd.DataFrame(lst)

    with pd.HDFStore(STORE) as store:

        store.append(key, df)

    lst.clear()

注意:我们使用with语句在每次写入后自动关闭存储。它可以更快地保持开放,但即便如此我们建议您定期刷新(收盘刷新)。还要注意,使用collection deque而不是列表可能更易读,但是列表的性能在这里会稍好一些。


要使用此功能,请致电:


process_row({'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0},

            key="df")

注意:“ df”是pytables存储中使用的存储键。


作业完成后,请确保您store_and_clear剩余的缓存:


for k, lst in CACHE.items():  # you can instead use .iteritems() in python 2

    store_and_clear(lst, k)

现在,您可以通过以下方式使用完整的DataFrame:


with pd.HDFStore(STORE) as store:

    df = store["df"]                    # other keys will be store[key]


查看完整回答
反对 回复 2021-03-26
?
慕斯王

TA贡献1864条经验 获得超2个赞

您实际上是在尝试解决两个问题:捕获实时数据并分析该数据。第一个问题可以通过为此目的设计的Python日志记录来解决。然后可以通过读取相同的日志文件来解决另一个问题。


查看完整回答
反对 回复 2021-03-26
  • 3 回答
  • 0 关注
  • 229 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号