为了账号安全,请及时绑定邮箱和手机立即绑定

有没有办法将具有值范围的列添加到 Spark Dataframe 中?

有没有办法将具有值范围的列添加到 Spark Dataframe 中?

慕哥9229398 2023-12-09 16:43:51
我有一个 Spark 数据框:df1 如下:age = spark.createDataFrame(["10","11","13"], "string").toDF("age")age.show()+---+|age|+---+| 10|| 11|| 13|+---+我需要向数据框中添加行号列以使其:+---+------+|age|col_id|+---+------+| 10|   1  || 11|   2  || 13|   3  |+---+------+我的数据框中的所有列都不包含唯一值。我尝试使用F.monotonically_increasing_id()),但它只是按递增顺序生成随机数。>>> age = spark.createDataFrame(["10","11","13"], "string").toDF("age").withColumn("rowId1", F.monotonically_increasing_id())>>> ageDataFrame[age: string, rowId1: bigint]>>> age.show<bound method DataFrame.show of DataFrame[age: string, rowId1: bigint]>>>> age.show()+---+-----------+|age|     rowId1|+---+-----------+| 10|17179869184|| 11|42949672960|| 13|60129542144|+---+-----------+由于我没有任何包含唯一数据的列,因此我担心使用窗口函数和生成row_numbers。那么,有没有一种方法可以row_count在数据框中添加一列,该列给出:+---+------+|age|col_id|+---+------+| 10|   1  || 11|   2  || 13|   3  |+---+------+如果窗口功能是唯一的实现方法,我如何确保所有数据都位于单个分区下?或者如果有一种方法可以在不使用窗口函数的情况下实现相同的功能,那么如何实现它?任何帮助表示赞赏。
查看完整描述

2 回答

?
慕标琳琳

TA贡献1830条经验 获得超9个赞

使用zipWithIndex

pyspark 与 Scala 不同。

其他答案对性能不利 - 使用单个执行器。zipWithIndexnarrow transformation这样,它可以按partition.

在这里,您可以进行相应的定制:

from pyspark.sql.types import StructField

from pyspark.sql.types import StructType

from pyspark.sql.types import StringType, LongType

import pyspark.sql.functions as F


df1 = spark.createDataFrame([ ('abc'),('2'),('3'),('4'), ('abc'),('2'),('3'),('4'), ('abc'),('2'),('3'),('4') ], StringType())


schema = StructType(df1.schema.fields[:] + [StructField("index", LongType(), True)])

rdd = df1.rdd.zipWithIndex()

rdd1 = rdd.map(lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],))

df1 = spark.createDataFrame(rdd1, schema)

df1.show()

返回:


+-----+-----+

|value|index|

+-----+-----+

|  abc|    0|

|    2|    1|

|    3|    2|

|    4|    3|

|  abc|    4|

|    2|    5|

|    3|    6|

|    4|    7|

|  abc|    8|

|    2|    9|

|    3|   10|

|    4|   11|

+-----+-----+


查看完整回答
反对 回复 2023-12-09
?
宝慕林4294392

TA贡献2021条经验 获得超8个赞

假设:这个答案基于以下假设: 的顺序col_id应取决于age列。如果假设不成立,则其他建议的解决方案是问题评论中提到的zipWithIndex。zipWithIndex可以在此答案中找到 的示例用法。


建议的解决方案:您可以使用window带有空partitionBy和行号的 a 来获取预期的数字。


from pyspark.sql.window import Window

from pyspark.sql import functions as F


windowSpec = Window.partitionBy().orderBy(F.col('age').asc())

age = age.withColumn(

    'col_id',

    F.row_number().over(windowSpec)

)


查看完整回答
反对 回复 2023-12-09
  • 2 回答
  • 0 关注
  • 79 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信