使用Spark将列转换为行我正在尝试将我的表的某些列转换为行。我正在使用Python和Spark 1.5.0。这是我的初始表:+-----+-----+-----+-------+| A |col_1|col_2|col_...|+-----+-------------------+| 1 | 0.0| 0.6| ... || 2 | 0.6| 0.7| ... || 3 | 0.5| 0.9| ... || ...| ...| ...| ... |我想有这样的事情:+-----+--------+-----------+| A | col_id | col_value |+-----+--------+-----------+| 1 | col_1| 0.0|| 1 | col_2| 0.6| | ...| ...| ...| | 2 | col_1| 0.6|| 2 | col_2| 0.7| | ...| ...| ...| | 3 | col_1| 0.5|| 3 | col_2| 0.9|| ...| ...| ...|有人知道我能做到吗?谢谢您的帮助。
3 回答
呼唤远方
TA贡献1856条经验 获得超11个赞
Spark本地线性代数库目前非常弱:它们不包括如上所述的基本操作。
有一个JIRA用于解决Spark 2.1的问题 - 但今天对你没有帮助。
要考虑的事情:执行转置可能需要完全改组数据。
现在您需要直接编写RDD代码。我用transpose
scala 编写- 但不是用python 编写的。这是scala
版本:
def transpose(mat: DMatrix) = { val nCols = mat(0).length val matT = mat .flatten .zipWithIndex .groupBy { _._2 % nCols } .toSeq.sortBy { _._1 } .map(_._2) .map(_.map(_._1)) .toArray matT }
所以你可以将它转换为python供你使用。在这个特定的时刻,我没有带宽来编写/测试:如果你无法进行转换,请告诉我。
至少 - 以下内容很容易转换为python
。
zipWithIndex
- >enumerate()
(等价的python - 归功于@ zero323)map
- >[someOperation(x) for x in ..]
groupBy
- >itertools.groupBy()
这是flatten
没有python等价的实现:
def flatten(L): for item in L: try: for i in flatten(item): yield i except TypeError: yield item
所以你应该能够把它们放在一起寻找解决方案。
不负相思意
TA贡献1777条经验 获得超10个赞
使用flatmap。像下面的东西应该工作
from pyspark.sql import Rowdef rowExpander(row): rowDict = row.asDict() valA = rowDict.pop('A') for k in rowDict: yield Row(**{'A': valA , 'colID': k, 'colValue': row[k]})newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))
添加回答
举报
0/150
提交
取消