我使用以下 JSON 结构处理来自 Kafka 的消息:{"unix_time": 1557678233, "category_id": 1000, "ip": "172.10.34.17", "type": "view"}我想打印出我收到的内容。这是我已经完成的代码片段:JavaSparkContext sc = createJavaSparkContext();JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.seconds(BATCH_DURATION_IN_SECONDS));SparkSession sparkSession = SparkSession .builder() .config(new SparkConf()) .getOrCreate();Dataset<Row> df = sparkSession .readStream() .format("kafka") .option("kafka.bootstrap.servers", CommonUtils.KAFKA_HOST_PORT) .option("subscribe", KAFKA_TOPIC) .load();StreamingQuery query = df.selectExpr("CAST(value AS STRING)") .select(from_json(new Column("value"), getSchema())).as("data"). select("data.category_id").writeStream().foreach(new ForeachWriter<Row>() { @Override public void process(Row value) { System.out.println(value); } @Override public void close(Throwable errorOrNull) { } @Override public boolean open(long partitionId, long version) { return true; } }) .start(); query.awaitTermination();架构方法:private static StructType getSchema() { return new StructType(new StructField[]{ new StructField(UNIX_TIME, DataTypes.TimestampType, false, Metadata.empty()), new StructField(CATEGORY_ID, DataTypes.IntegerType, false, Metadata.empty()), new StructField(IP, DataTypes.StringType, false, Metadata.empty()), new StructField(TYPE, DataTypes.StringType, false, Metadata.empty()), });}如何克服这个问题?对此有何建议?
1 回答
慕虎7371278
TA贡献1802条经验 获得超4个赞
异常的这一部分准确地告诉您在哪里寻找答案:
无法解析给定输入列的“data.category_id”:[jsontostruct(value)]
换句话说,data.category_id
可用列中没有一列只是 1 列jsontostruct(value)
。
这意味着仅select
在流式查询中不起作用。原因相当简单(我可以将其视为拼写错误)——在Column和Datasetas("data")
类型上可用的右括号太多。
总之,替换查询的以下部分:
.select(from_json(new Column("value"), getSchema())).as("data")
至以下内容:
.select(from_json(new Column("value"), getSchema()).as("data"))
请注意,我将一个右括号移到了末尾。
添加回答
举报
0/150
提交
取消