为了账号安全,请及时绑定邮箱和手机立即绑定

将 JavaRDD<Status> 转换为 JavaRDD<String> 的问题

将 JavaRDD<Status> 转换为 JavaRDD<String> 的问题

人到中年有点甜 2023-06-08 20:51:09
我正在尝试将推文从 twitter 保存到 MongoDb 数据库。我有RDD<Status>,我正在尝试借助 ObjectMapper 将其转换为 JSON 格式。但是这种转换存在一些问题(public class Main {    //set system credentials for access to twitter    private static void setTwitterOAuth() {        System.setProperty("twitter4j.oauth.consumerKey", TwitterCredentials.consumerKey);        System.setProperty("twitter4j.oauth.consumerSecret", TwitterCredentials.consumerSecret);        System.setProperty("twitter4j.oauth.accessToken", TwitterCredentials.accessToken);        System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterCredentials.accessTokenSecret);    }    public static void main(String [] args) {        setTwitterOAuth();        SparkConf conf = new SparkConf().setMaster("local[2]")                                        .setAppName("SparkTwitter");        JavaSparkContext sparkContext = new JavaSparkContext(conf);        JavaStreamingContext jssc = new JavaStreamingContext(sparkContext, new Duration(1000));        JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);        //Stream that contains just tweets in english        JavaDStream<Status> enTweetsDStream=twitterStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));        enTweetsDStream.persist(StorageLevel.MEMORY_AND_DISK());        enTweetsDStream.print();        jssc.start();        jssc.awaitTermination();    }    static void saveRawTweetsToMondoDb(JavaRDD<Status> rdd,JavaSparkContext sparkContext) {     try {            ObjectMapper objectMapper = new ObjectMapper();            SQLContext sqlContext = new SQLContext(sparkContext);            JavaRDD<String> tweet =  rdd.map(status -> objectMapper.writeValueAsString(status));            DataFrame dataFrame = sqlContext.read().json(tweet);        } catch (Exception e) {            System.out.println("Error saving to database");        }    }JavaRDD<String> tweet =  rdd.map(status -> objectMapper.writeValueAsString(status));这是一个问题。需要不兼容的类型JavaRDD<String>,但地图被推断为javaRDD<R>
查看完整描述

1 回答

?
慕标琳琳

TA贡献1830条经验 获得超9个赞

不幸的是,Java 类型推断并不总是非常聪明,所以我在这些情况下所做的是提取我的 lambda 的所有位作为变量,直到我找到一个 Java 无法为其提供准确类型的位。然后我给表达式我认为它应该具有的类型,看看为什么 Java 会抱怨它。有时它只是编译器的一个限制,您必须显式地将表达式“转换”为所需的类型,有时您会发现代码存在问题。在你的情况下,代码对我来说很好,所以一定有别的东西。

然而,我有一个评论:在这里你支付一次 JSON 序列化(从StatusJSON 字符串)然后反序列化(从 JSON 字符串到Row)的成本。另外,您没有向您提供任何架构Dataset,因此它必须两次传递数据(或根据您的配置对其进行采样)以推断架构。如果数据很大,所有这些都可能非常昂贵。如果性能是一个问题并且相对简单,我建议您直接编写从Status到的转换。RowStatus

另一个“顺便说一句”:您正在隐式序列化您的ObjectMapper,很可能您不想这样做。看起来该类确实支持 Java 序列化,但具有特殊的逻辑。由于 Spark 的默认配置是使用 Kryo(其性能比 Java 序列化好得多),我怀疑它在使用默认FieldSerializer. 您有以下三种选择:

  • 使对象映射器静态化以避免序列化它

  • 配置您的 Kryo 注册器以ObjectMapper使用 Java 序列化序列化/反序列化类型的对象。那会起作用,但不值得付出努力。

  • 到处使用 Java 序列化而不是 Kryo。馊主意!它很慢并且占用大量空间(内存和磁盘取决于序列化对象的写入位置)。


查看完整回答
反对 回复 2023-06-08
  • 1 回答
  • 0 关注
  • 114 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信