2 Answers

There may be a smarter way to achieve this, but you can also use an RDD: collect the dates per key, sort them, compute the gap in days between consecutive dates, and keep a running mean of those gaps:
from operator import add
from datetime import datetime
from numpy import mean
from pyspark.sql.types import (StructType, StructField, IntegerType, DateType)

data = [(1, "2018-12-25"), (2, "2019-01-15"), (1, "2019-01-20"), (3, "2018-01-01"),
        (2, "2019-01-01"), (4, "2018-04-09"), (3, "2018-11-08"), (1, "2018-03-20")]

# Parse each date string to a datetime.date (so the DateType column accepts it)
# and collect all dates per key into one list.
rdd = (sc.parallelize(data)
         .mapValues(lambda v: [datetime.strptime(v, "%Y-%m-%d").date()])
         .reduceByKey(add))

def computeMvgAvg(values):
    # Sort the dates, compute the day gaps between consecutive dates,
    # and keep a running mean of the gaps seen so far.
    sorted_dates = sorted(values)
    diffs = []
    mvg_avg = []
    for i in range(1, len(sorted_dates)):
        diffs.append((sorted_dates[i] - sorted_dates[i - 1]).days)
        mvg_avg.append(int(mean(diffs)))
    # The first date of each key has no predecessor, hence None.
    diffs = [None] + diffs
    mvg_avg = [None] + mvg_avg
    return zip(sorted_dates, diffs, mvg_avg)

sch = StructType([
    StructField("A", IntegerType(), True),
    StructField("B", DateType(), True),
    StructField("C", IntegerType(), True),
    StructField("moving_avg", IntegerType(), True)
])

rdd.flatMapValues(computeMvgAvg).map(lambda row: [row[0]] + list(row[1])).toDF(schema=sch).show()
+---+----------+----+----------+
| A| B| C|moving_avg|
+---+----------+----+----------+
| 1|2018-03-20|null| null|
| 1|2018-12-25| 280| 280|
| 1|2019-01-20| 26| 153|
| 2|2019-01-01|null| null|
| 2|2019-01-15| 14| 14|
| 3|2018-01-01|null| null|
| 3|2018-11-08| 311| 311|
| 4|2018-04-09|null| null|
+---+----------+----+----------+
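To read the output: for key 1 the sorted dates give gaps of 280 and 26 days, so C is 280 then 26, and moving_avg is the running mean, 280 then (280 + 26) / 2 = 153. For comparison, here is a minimal DataFrame-based sketch of the same running mean using window functions instead of RDDs; this is an assumption on my part, not part of the answer: it reuses the data list above, assumes a SparkSession named spark, and uses F.floor to stand in for the int() truncation.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.createDataFrame(data, ['A', 'B']).withColumn('B', F.to_date('B'))
w = Window.partitionBy('A').orderBy('B')
running = w.rowsBetween(Window.unboundedPreceding, Window.currentRow)

# C = gap in days to the previous date; moving_avg = running mean of all gaps so far
(df.withColumn('C', F.datediff('B', F.lag('B', 1).over(w)))
   .withColumn('moving_avg', F.floor(F.avg('C').over(running)))
   .show())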

Documentation: Window
Documentation: lag
# Creating a DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import col, to_date, lag, datediff, udf
from pyspark.sql.types import IntegerType

df = sqlContext.createDataFrame(
    [(1, '2018-12-25'), (2, '2019-01-15'), (1, '2019-01-20'), (3, '2018-01-01'),
     (2, '2019-01-01'), (4, '2018-04-09'), (3, '2018-11-08'), (1, '2018-03-20')],
    ['A', 'B'])
df = df.withColumn('B', to_date(col('B'), 'yyyy-MM-dd'))

# Window over each key, ordered by date; used with lag to reach the previous row
my_window = Window.partitionBy('A').orderBy('B')

# UDF that averages a window of size 2, ignoring nulls
def row_avg(c1, c2):
    count_non_null = 2
    if c1 is None:
        c1 = 0
        count_non_null -= 1
    if c2 is None:
        c2 = 0
        count_non_null -= 1
    if count_non_null == 0:
        return None
    return int((c1 + c2) / count_non_null)

row_avg = udf(row_avg, IntegerType())

# C = days since the previous date for the same key;
# moving_avg = average of the current and previous value of C
df = df.withColumn('B_Lag_1', lag(col('B'), 1).over(my_window))\
       .withColumn('C', datediff(col('B'), col('B_Lag_1'))).drop('B_Lag_1')\
       .withColumn('C_Lag_1', lag(col('C'), 1).over(my_window))\
       .withColumn('moving_avg', row_avg(col('C'), col('C_Lag_1'))).drop('C', 'C_Lag_1')
df.show()
+---+----------+----------+
| A| B|moving_avg|
+---+----------+----------+
| 1|2018-03-20| null|
| 1|2018-12-25| 280|
| 1|2019-01-20| 153|
| 3|2018-01-01| null|
| 3|2018-11-08| 311|
| 2|2019-01-01| null|
| 2|2019-01-15| 14|
| 4|2018-04-09| null|
+---+----------+----------+
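The size-2 UDF could likely be replaced by Spark's built-in avg over a bounded window, since avg already skips nulls. A hedged sketch, not from the answer itself: it assumes df still holds only the A and B columns, reuses my_window, and uses floor to mirror the int() truncation.

from pyspark.sql.functions import avg, floor

w2 = my_window.rowsBetween(-1, 0)  # the previous row and the current row
df.withColumn('C', datediff(col('B'), lag(col('B'), 1).over(my_window)))\
  .withColumn('moving_avg', floor(avg(col('C')).over(w2)))\
  .drop('C')\
  .show()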