为了账号安全,请及时绑定邮箱和手机立即绑定

熊猫:参差不齐的时间序列的时间加权滚动平均值

熊猫:参差不齐的时间序列的时间加权滚动平均值

aluckdog 2023-06-06 15:10:20
我有一个参差不齐(意思是不规则频率)的时间索引 DataFrame,我想对其执行时间加权滚动平均值,以保持 DataFrame 的原始索引。假设记录的值在被另一个值取代之前是有效的。实现这一点的一种方法是将参差不齐的 DataFrame 上采样到统一频率,然后进行滚动平均:import pandas as pdimport numpy as npdef time_weighted_average_using_upsampling(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:    # Leads to high memory usage    original_index = df.index.copy()    avg = (        df.resample("1s")        .ffill()        .rolling(avg_window, closed="left", min_periods=int(avg_window[0])))        .mean()        .reindex(original_index)    )    return avgif __name__ == "__main__":    df = pd.DataFrame(        {"A": [0, 1, 2, 3, 4, 5]},        index=[            pd.Timestamp("20130101 09:00:00"),            pd.Timestamp("20130101 09:00:02"),            pd.Timestamp("20130101 09:00:03"),            pd.Timestamp("20130101 09:00:05"),            pd.Timestamp("20130101 09:00:06"),            pd.Timestamp("20130101 09:00:10"),        ],    )    expected_avg = pd.DataFrame(        {"A": [np.nan, np.nan, 1 / 3, 5 / 3, 7 / 3, 4]},        index=[            pd.Timestamp("20130101 09:00:00"),            pd.Timestamp("20130101 09:00:02"),            pd.Timestamp("20130101 09:00:03"),            pd.Timestamp("20130101 09:00:05"),            pd.Timestamp("20130101 09:00:06"),            pd.Timestamp("20130101 09:00:10"),        ],    )    pd.testing.assert_frame_equal(        time_weighted_average_using_upsampling(df=df, avg_window="3s"), expected_avg    )这个问题是上采样破坏了参差不齐的 df 提供的稀疏表示的目的。稀疏表示具有内存效率,而上采样版本则不然。这就引出了一个问题:如何在不必对整个 df 进行上采样的情况下获得上面显示的结果?
查看完整描述

2 回答

?
开满天机

TA贡献1786条经验 获得超13个赞

这是一个替代方案,而不是对整个数据帧进行上采样,您可以首先检查两行之间的时间差异大于间隙的位置。然后将 3s 删除到具有这些特定新时间戳的联合的间隙和reindexdf 的行。创建这些行后,您可以groupby使用添加新索引的位置,resample每组 1 秒,最后rolling使用您所做的方法。Reindex最后有df。


rule = 3

rolling_win = f'{rule}s'


sparse = df.index.to_series().diff().dt.total_seconds().ge(rule)

new_timestamps = df.index[sparse] - pd.Timedelta(seconds=rule)

print(new_timestamps) 

#DatetimeIndex(['2013-01-01 09:00:07'], dtype='datetime64[ns]', freq=None)


#reindex with the new 

df_ = df.reindex(df.index.union(new_timestamps))


#perform first the resample 1s per group, then clean the dataframe to do the rolling.mean

#finally reindex like original df

df_ = (df_.groupby(df_.index.isin(new_timestamps).cumsum())

          .resample("1s").ffill()

          .reset_index(level=0, drop=True).ffill()

          .rolling(rolling_win, closed="left", min_periods=rule)\

          .mean()

          .reindex(df.index)

      )

print(df_)

                            A

2013-01-01 09:00:00       NaN

2013-01-01 09:00:02       NaN

2013-01-01 09:00:03  0.333333

2013-01-01 09:00:05  1.666667

2013-01-01 09:00:06  2.333333

2013-01-01 09:00:10  4.000000

在这种情况下,它并不是很有趣,因为差距实际上很小,但如果差距很大,那么它就会变得有用。


编辑或其他选项,可能更好,union所有索引都是从您删除 1s、2s、3s 的原始索引中创建的,...(取决于规则)。现在你只有滚动 so和reindex所需的索引。最后结果一样ffillrolling.mean


from functools import reduce


rule = 3

rolling_win = f'{rule}s'


idx = df.index

df_ = (df.reindex(reduce(lambda x, y: x.union(y), 

                         [idx - pd.Timedelta(seconds=i) 

                          for i in range(0, rule+1)]))

         .ffill()

         .rolling(rolling_win, closed="left", min_periods=rule)\

         .mean()

         .reindex(df.index)

        )


查看完整回答
反对 回复 2023-06-06
?
撒科打诨

TA贡献1934条经验 获得超2个赞

启发的两种可能的解决方案:


def time_weighted_average_using_local_upsampling(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:

    """Uses second resolution up-sampling only on smaller windows at a time."""

    original_index = df.index.copy()

    avg = (

        df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")

        .rolling(avg_window, closed="both", min_periods=2)

        .apply(lambda x: x.resample("1s").ffill()[:-1].mean(skipna=False))

        .reindex(original_index)

    )

    return avg



def time_weighted_average_using_index_weighting(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:

    """Uses weighting by duration, by ensuring every window has a point at the start."""

    original_index = df.index.copy()

    avg = (

        df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")

        .rolling(avg_window, closed="both", min_periods=2)

        .apply(lambda x: np.average(x[:-1], weights=x.index.to_series().diff()[1:].dt.seconds))

        .reindex(original_index)

    )

    return avg

第一个一次对单个滚动窗口进行上采样,而后者实际上通过确保在我们关心的窗口开始处始终有一个可用点来进行参差不齐的时间加权平均。这是通过包括按窗口长度移动的原始索引来完成的。


我还没有衡量相关案例的表现。


编辑: 我决定在大约 100,000 行的第二个分辨率数据集上测试函数,并使用 20 分钟的窗口(!)两种变体都慢得令人难以忍受,但我认为我有一个新的赢家:


def time_weighted_average_using_index_weighting2(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:

    """Uses weighting by duration, by ensuring every window has a point at the start."""

    original_index = df.index.copy()

    avg = df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")

    avg = (

        avg.multiply(avg.index.to_series().diff().dt.seconds.shift(-1), axis=0)

        .divide(pd.Timedelta(avg_window).seconds)

        .rolling(avg_window, closed="left")

        .sum()

        .reindex(original_index)

    )

    avg[~((avg.index - pd.Timedelta(avg_window)) >= original_index[0])] = np.nan

    return avg

这个在滚动之前预先加权,因此我们不用使用.sum()而不是apply(). 这转化为巨大的速度提升。无论平均窗口的大小如何,我们最多也可以将索引加倍。


查看完整回答
反对 回复 2023-06-06
  • 2 回答
  • 0 关注
  • 126 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信