2 Answers
This answerer has contributed 1830 experience points and earned 9+ upvotes
import json
from pyspark import SparkContext

if __name__ == '__main__':
    input_review_json_path = "publicdata/review.json"
    input_business_json_path = "publicdata/business.json"
    output_csv_path = "outputs/user_state.csv"
    stars = 4.0

    sc = SparkContext.getOrCreate()

    # Parse each line of business.json into a dict.
    input_business_lines = sc.textFile(input_business_json_path) \
        .map(lambda line: json.loads(line))

    # Keep businesses rated at least `stars`, keyed by business_id with the state as value.
    business_ids = input_business_lines \
        .map(lambda kv: (kv['business_id'], kv['stars'], kv['state'])) \
        .filter(lambda kv: kv[1] >= stars) \
        .map(lambda kv: (kv[0], kv[2]))

    # Parse review.json and key each review by business_id.
    input_review_lines = sc.textFile(input_review_json_path) \
        .map(lambda line: json.loads(line))
    rew_ids_bus_ids = input_review_lines.map(lambda kv: (kv['business_id'], kv['user_id']))

    # Join on business_id: each element is (business_id, (state, user_id)).
    # kv[1][1] is the user_id and kv[1][0] the state, so this yields (user_id, state) pairs.
    finalRdd = business_ids.join(rew_ids_bus_ids).map(lambda kv: (kv[1][1], kv[1][0]))
    review_rdd = finalRdd.collect()
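The script defines output_csv_path but never writes to it. A minimal sketch of the missing last step, continuing inside the __main__ block above and using Python's csv module (the outputs/ directory creation and the user_id,state header are assumptions, not from the original answer):

    import csv
    import os

    # Assumed: create the output directory and write a simple two-column CSV.
    os.makedirs("outputs", exist_ok=True)
    with open(output_csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['user_id', 'state'])  # assumed header
        writer.writerows(review_rdd)           # one (user_id, state) row per pair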
This answerer has contributed 1719 experience points and earned 6+ upvotes
You can join those RDDs:

import json

# Assumes a live SparkContext `sc`, e.g. in the pyspark shell.
stars = 4.0

input_business_lines = sc.textFile('test.json') \
    .map(lambda line: json.loads(line))
business_ids = input_business_lines \
    .filter(lambda kv: kv['stars'] >= stars) \
    .map(lambda kv: (kv['business_id'], kv['state']))
print(business_ids.collect())

input_review_lines = sc.textFile('test2.json') \
    .map(lambda line: json.loads(line))
rew_ids_bus_ids = input_review_lines \
    .map(lambda kv: (kv['business_id'], kv['user_id']))

joined = business_ids.join(rew_ids_bus_ids)
print(joined.collect())

# business_ids: [('gnKjwL_1w79qoiV3IC_xQQ', 'NC'), ('xvX2CttrVhyG2z1dFg_0xw', 'AZ')]
# joined:       [('gnKjwL_1w79qoiV3IC_xQQ', ('NC', 'bK4Y_GZUoAUTXIrmeEUGYw'))]
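If you only need (user_id, state) pairs, one more map over the `joined` RDD from the snippet above drops the business_id key (a sketch; the commented output follows from the sample join result shown):

user_state = joined.map(lambda kv: (kv[1][1], kv[1][0]))
print(user_state.collect())
# [('bK4Y_GZUoAUTXIrmeEUGYw', 'NC')]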