为了账号安全,请及时绑定邮箱和手机立即绑定

使用 Spark 将字段添加到 Csv

使用 Spark 将字段添加到 Csv

偶然的你 2021-06-21 10:09:26
所以,我有一个包含空间 ( latitude, longitude) 和时间 ( timestamp) 数据的 CSV 。为了对我们有用,我们将空间信息转换为“ geohash”,将时间信息转换为“ timehash”。问题是,如何使用 spark 为 CSV 中的每一行添加geohash和timehash作为字段(因为数据大约为200 GB)?我们尝试使用JavaPairRDD它的功能mapTopair,但问题仍然在于如何转换回 aJavaRdd然后转换为 CSV?所以我认为这是一个糟糕的解决方案,我要求一个简单的方法。问题更新:在@Alvaro 得到帮助后,我创建了这个 java 类:public class Hash {public static SparkConf Spark_Config;public static JavaSparkContext Spark_Context;UDF2 geohashConverter = new UDF2<Long, Long, String>() {        public String call(Long latitude, Long longitude) throws Exception {        // convert here        return "calculate_hash";    }};UDF1 timehashConverter = new UDF1<Long, String>() {        public String call(Long timestamp) throws Exception {        // convert here        return "calculate_hash";    }};public Hash(String path) {    SparkSession spark = SparkSession            .builder()            .appName("Java Spark SQL Example")            .config("spark.master", "local")            .getOrCreate();        spark.udf().register("geohashConverter", geohashConverter, DataTypes.StringType);    spark.udf().register("timehashConverter", timehashConverter, DataTypes.StringType);    Dataset df=spark.read().csv(path)    .withColumn("geohash", callUDF("geohashConverter", col("_c6"), col("_c7")))    .withColumn("timehash", callUDF("timehashConverter", col("_c1"))).write().csv("C:/Users/Ahmed/Desktop/preprocess2"); }public static void main(String[] args) {    String path = "C:/Users/Ahmed/Desktop/cabs_trajectories/cabs_trajectories/green/2013";    Hash h = new Hash(path);}}然后我得到序列化问题,当我删除时消失 write().csv()
查看完整描述

1 回答

  • 1 回答
  • 0 关注
  • 274 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信