3 Answers
Contributor with 1854 experience points · 8+ upvotes
I got it working with the from_json function!
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;

SparkSession spark = SparkSession.builder()
        .master("local")
        .appName("MYApp")
        .getOrCreate();

// Read the raw records from Kafka
Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", Kafka_source)
        .option("subscribe", Kafka_topic)
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss", false)
        .load();

// The Kafka value column is binary, so cast it to a JSON string first
Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)");

// Parse each field out of the JSON string with from_json
Dataset<Row> dz = dg.select(
        from_json(dg.col("value"), DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("Name", DataTypes.StringType, true)
        })).getField("Name").alias("Name"),
        from_json(dg.col("value"), DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("Age", DataTypes.IntegerType, true)
        })).getField("Age").alias("Age"));

// Write the parsed dataset (dz) out as JSON files
StreamingQuery queryone = dz.writeStream()
        .format("json")
        .outputMode("append")
        .option("checkpointLocation", Hadoop_path)
        .option("path", Hadoop_path)
        .start();
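A note on this approach: the snippet above parses the same JSON string twice, once per field. A slightly leaner sketch, assuming the same Name/Age payload and the dg dataset from above, parses once with a two-field schema and then selects the struct's fields:

import org.apache.spark.sql.types.StructType;

// One schema covering both fields, so the JSON is parsed only once
StructType schema = new StructType()
        .add("Name", DataTypes.StringType)
        .add("Age", DataTypes.IntegerType);

Dataset<Row> dz = dg
        .select(from_json(dg.col("value"), schema).alias("data"))
        .select("data.Name", "data.Age");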
Contributor with 1893 experience points · 10+ upvotes
You can get the expected result with Spark as follows:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;

SparkSession spark = SparkSession.builder()
        .master("local")
        .appName("MYApp")
        .getOrCreate();

// Read the raw records from Kafka
Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", Kafka_source)
        .option("subscribe", Kafka_topic)
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss", false)
        .load();

// Cast the binary value to a string, then pull each JSON field into its own column
// (json_tuple returns the extracted values as strings)
Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)")
        .withColumn("Name", functions.json_tuple(functions.col("value"), "Name"))
        .withColumn("Age", functions.json_tuple(functions.col("value"), "Age"));

// Write the result out as JSON files
StreamingQuery queryone = dg.writeStream()
        .format("json")
        .outputMode("append")
        .option("checkpointLocation", Hadoop_path)
        .option("path", Hadoop_path)
        .start();
Basically, you have to create a separate column for each field of the JSON string held in the value column.
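As a minor variant, json_tuple can also extract several fields in one call; a sketch, assuming the same Name/Age payload (json_tuple always returns strings, so Age would need a cast if an integer column is required):

Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)")
        .select(functions.json_tuple(functions.col("value"), "Name", "Age")
                .as(new String[] {"Name", "Age"}));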
Contributor with 1789 experience points · 8+ upvotes
Use the following:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

// Read the raw records from Kafka
Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", Kafka_source)
        .option("subscribe", Kafka_topic)
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss", false)
        .load();

df.printSchema();

// Cast the binary value column to a string and write it out as JSON
StreamingQuery queryone = df.selectExpr("CAST(value AS STRING)")
        .writeStream()
        .format("json")
        .outputMode("append")
        .option("checkpointLocation", Hadoop_path)
        .option("path", Hadoop_path)
        .start();
Make sure the schema includes value as a column.
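For reference, df.printSchema() on the Kafka source should print roughly the following; value (binary) is the column that holds each record's payload and is what the CAST above turns into a string:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)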