为了账号安全,请及时绑定邮箱和手机立即绑定

将 JSON 保存到 HDFS 的结构化流

将 JSON 保存到 HDFS 的结构化流

繁星淼淼 2023-04-26 14:18:58
我的 Structured Spark Streaming 程序是从 Kafka 读取 JSON 数据并以 JSON 格式写入 HDFS。我能够将 JSON 保存到 HDFS,但它保存了 JSON 字符串: "jsontostructs(CAST(value AS STRING))"key as below: {"jsontostructs(CAST(value AS STRING))":{"age":42,"name":"John"}}.如何只保存{"age":42,"name":"John"}?StructType schema = kafkaPrimerRow.schema();//Read json from kafka. JSON is: {"age":42,"name":"John"}Dataset<Row> df = spark                    .readStream()                    .format("kafka")                    .option("kafka.bootstrap.servers", input_bootstrap_server)                    .option("subscribe", topics[0])                    .load();    //Save Stream to HDFS    StreamingQuery ds = df             .select(functions.from_json(col("value").cast(DataTypes.StringType),schema)) .writeStream().format("json").outputMode(OutputMode.Append()).option("path", destPath).option("checkpointLocation", checkpoint).start();
查看完整描述

1 回答

?
BIG阳

TA贡献1859条经验 获得超6个赞

以下 .select("data.*") 达到了目的。

StreamingQuery ds = df
                        .select(functions.from_json(col("value").cast(DataTypes.StringType),schema).as("data"))
                        .select("data.*")
                        .writeStream()
                        .format("json")
                        .outputMode(OutputMode.Append())
                        .option("path", destPath)
                        .option("checkpointLocation", checkpoint)
                        .start();


查看完整回答
反对 回复 2023-04-26
  • 1 回答
  • 0 关注
  • 159 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信