为了账号安全,请及时绑定邮箱和手机立即绑定

如何根据 rdd 中包含 pyspark 元组数组的第一个元素进行过滤?

如何根据 rdd 中包含 pyspark 元组数组的第一个元素进行过滤?

幕布斯6054654 2023-09-26 16:40:39
我在从 rdd 中过滤元组列表时遇到问题。示例business.json{"business_id":"gnKjwL_1w79qoiV3IC_xQQ","state":"NC","postal_code":"28210","latitude":35.092564,"longitude":-80.859132,"stars":4.0},{"business_id":"xvX2CttrVhyG2z1dFg_0xw","state":"AZ","postal_code":"85338","latitude":33.4556129678,"longitude":-112.3955963552,"stars":5.0}from pyspark import SparkContextsc = SparkContext.getOrCreate()stars = "4.0"input_business_lines = sc.textFile('data/business.json') \    .map(lambda lines: json.loads(lines))business_ids = input_business_lines \    .map(lambda kv: (kv['business_id'], kv['stars'], kv['state'])) \    .filter(lambda kv: kv[1] >= float(stars)).map(lambda kv: (kv[0], kv[2])).collect()上面的代码返回每个元组具有的元组列表(first element = business_id , second element = state)[('gnKjwL_1w79qoiV3IC_xQQ', 'NC'),('xvX2CttrVhyG2z1dFg_0xw', 'AZ'),...,('HhyxOkGAM07SRYtlQ4wMFQ', 'NC')]到目前为止一切都很好。现在我需要与评论表进行联接,并希望使用评论的 rdd 过滤所有匹配的业务 id。如果那是一个数据框那就容易多了。但对于元组,我不确定我们该怎么做。示例 review.json{"review_id": "c-6aA9Bd7JxpmMroRoas9A", "user_id": "bK4Y_GZUoAUTXIrmeEUGYw", "business_id": "gnKjwL_1w79qoiV3IC_xQQ", "stars": 4.0, "text": "Went there Saturday noon they open at 12pm but people were waiting outside before 12pm so you can tell it should be a good place. Nice Katsu & Eel with rice. Many Japanese go there.", "date": "2014-07-13 20:28:18"},
查看完整描述

2 回答

?
慕标琳琳

TA贡献1830条经验 获得超9个赞

import json

from pyspark import SparkContext


if __name__ == '__main__':

    input_review_json_path = "publicdata/review.json"

    input_business_json_path = "publicdata/business.json"

    output_csv_path = "outputs/user_state.csv"

    stars = "4.0"


    sc = SparkContext.getOrCreate()


    input_business_lines = sc.textFile(input_business_json_path) \

                             .map(lambda lines: json.loads(lines))


    business_ids = input_business_lines \

                        .map(lambda kv: (kv['business_id'], kv['stars'], kv['state'])) \

                        .filter(lambda kv: kv[1] >= float(stars)).map(lambda kv: (kv[0], kv[2]))


    input_review_lines = sc.textFile(input_review_json_path) \

                            .map(lambda lines: json.loads(lines))


    rew_ids_bus_ids = input_review_lines.map(lambda kv: (kv['business_id'], kv['user_id']))

    finalRdd = business_ids.join(rew_ids_bus_ids).map(lambda kv: (kv[0], kv[1][0]))


    review_rdd = finalRdd.collect()


查看完整回答
反对 回复 2023-09-26
?
慕侠2389804

TA贡献1719条经验 获得超6个赞

你可以加入那些rdd。


import json


stars = 4.0

input_business_lines = sc.textFile('test.json') \

    .map(lambda lines: json.loads(lines))


business_ids = input_business_lines \

    .filter(lambda kv: kv['stars'] >= stars) \

    .map(lambda kv: (kv['business_id'], kv['state']))


print(business_ids.collect())


input_review_lines = sc.textFile('test2.json') \

    .map(lambda lines: json.loads(lines))


rew_ids_bus_ids = input_review_lines \

    .map(lambda kv: (kv['business_id'], kv['user_id']))


joined = business_ids \

    .join(rew_ids_bus_ids)


print(joined.collect())



# [('gnKjwL_1w79qoiV3IC_xQQ', 'NC'), ('xvX2CttrVhyG2z1dFg_0xw', 'AZ')]

# [('gnKjwL_1w79qoiV3IC_xQQ', ('NC', 'bK4Y_GZUoAUTXIrmeEUGYw'))]


查看完整回答
反对 回复 2023-09-26
  • 2 回答
  • 0 关注
  • 98 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信