所以,我有一个包含空间 ( latitude, longitude) 和时间 ( timestamp) 数据的 CSV 。为了对我们有用,我们将空间信息转换为“ geohash”,将时间信息转换为“ timehash”。问题是,如何使用 spark 为 CSV 中的每一行添加geohash和timehash作为字段(因为数据大约为200 GB)?我们尝试使用JavaPairRDD它的功能mapTopair,但问题仍然在于如何转换回 aJavaRdd然后转换为 CSV?所以我认为这是一个糟糕的解决方案,我要求一个简单的方法。问题更新:在@Alvaro 得到帮助后,我创建了这个 java 类:public class Hash {public static SparkConf Spark_Config;public static JavaSparkContext Spark_Context;UDF2 geohashConverter = new UDF2<Long, Long, String>() { public String call(Long latitude, Long longitude) throws Exception { // convert here return "calculate_hash"; }};UDF1 timehashConverter = new UDF1<Long, String>() { public String call(Long timestamp) throws Exception { // convert here return "calculate_hash"; }};public Hash(String path) { SparkSession spark = SparkSession .builder() .appName("Java Spark SQL Example") .config("spark.master", "local") .getOrCreate(); spark.udf().register("geohashConverter", geohashConverter, DataTypes.StringType); spark.udf().register("timehashConverter", timehashConverter, DataTypes.StringType); Dataset df=spark.read().csv(path) .withColumn("geohash", callUDF("geohashConverter", col("_c6"), col("_c7"))) .withColumn("timehash", callUDF("timehashConverter", col("_c1"))).write().csv("C:/Users/Ahmed/Desktop/preprocess2"); }public static void main(String[] args) { String path = "C:/Users/Ahmed/Desktop/cabs_trajectories/cabs_trajectories/green/2013"; Hash h = new Hash(path);}}然后我得到序列化问题,当我删除时消失 write().csv()
添加回答
举报
0/150
提交
取消