My Spark Structured Streaming program reads JSON data from Kafka and writes it to HDFS in JSON format. The JSON does get saved to HDFS, but every record is wrapped under the key "jsontostructs(CAST(value AS STRING))", like this: {"jsontostructs(CAST(value AS STRING))":{"age":42,"name":"John"}}. How can I save only {"age":42,"name":"John"}?

StructType schema = kafkaPrimerRow.schema();

// Read JSON from Kafka. The JSON is: {"age":42,"name":"John"}
Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", input_bootstrap_server)
        .option("subscribe", topics[0])
        .load();

// Save the stream to HDFS
StreamingQuery ds = df
        .select(functions.from_json(col("value").cast(DataTypes.StringType), schema))
        .writeStream()
        .format("json")
        .outputMode(OutputMode.Append())
        .option("path", destPath)
        .option("checkpointLocation", checkpoint)
        .start();
1 Answer
BIG阳
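Adding .select("data.*") does the trick. The wrapper key is just the auto-generated name of the from_json column. Give the parsed struct an alias with .as("data"), then select "data.*" to expand its fields (age, name) into top-level columns. The json sink then writes each row as {"age":42,"name":"John"}, with no wrapper key.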
StreamingQuery ds = df
        .select(functions.from_json(col("value").cast(DataTypes.StringType), schema).as("data"))
        .select("data.*")
        .writeStream()
        .format("json")
        .outputMode(OutputMode.Append())
        .option("path", destPath)
        .option("checkpointLocation", checkpoint)
        .start();
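To check the effect without Kafka or HDFS, here is a minimal sketch that applies the same from_json → .as("data") → .select("data.*") chain to a static batch DataFrame in local mode. The class name FlattenJsonExample, the SCHEMA constant, the helper methods and the sample payload are illustrative assumptions, not part of the original program. toJSON() is used only as a stand-in for roughly what the json sink writes per row.

```java
import static org.apache.spark.sql.functions.col;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class FlattenJsonExample {

    // Hypothetical schema matching the sample payload {"age":42,"name":"John"}
    // (the question derives its schema from kafkaPrimerRow instead).
    static final StructType SCHEMA = new StructType()
            .add("age", DataTypes.IntegerType)
            .add("name", DataTypes.StringType);

    // Same transformation as the answer: parse the string, alias the struct as "data",
    // then expand "data.*" so age and name become top-level columns.
    static Dataset<Row> parseAndFlatten(Dataset<Row> raw, StructType schema) {
        return raw
                .select(functions.from_json(col("value").cast(DataTypes.StringType), schema).as("data"))
                .select("data.*");
    }

    // Runs the transformation on a static batch standing in for the Kafka "value" column
    // and returns each resulting row serialised as JSON text.
    static List<String> flattenedJson(SparkSession spark, List<String> payloads) {
        Dataset<Row> raw = spark.createDataset(payloads, Encoders.STRING()).toDF("value");
        return parseAndFlatten(raw, SCHEMA).toJSON().collectAsList();
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("flatten-json-example")
                .getOrCreate();
        List<String> out = flattenedJson(spark, Arrays.asList("{\"age\":42,\"name\":\"John\"}"));
        out.forEach(System.out::println); // prints {"age":42,"name":"John"}
        spark.stop();
    }
}
```

If you remove .as("data") and .select("data.*") from parseAndFlatten, the result is a single struct column with an auto-generated name (the exact name depends on the Spark version). toJSON() then shows the same wrapped shape as in the question.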