为了账号安全,请及时绑定邮箱和手机立即绑定

如何在流查询中使用 from_json 标准函数(在 select 中)?

如何在流查询中使用 from_json 标准函数(在 select 中)?

摇曳的蔷薇 2023-07-19 16:32:33
我使用以下 JSON 结构处理来自 Kafka 的消息:{"unix_time": 1557678233, "category_id": 1000, "ip": "172.10.34.17", "type": "view"}我想打印出我收到的内容。这是我已经完成的代码片段:JavaSparkContext sc = createJavaSparkContext();JavaStreamingContext streamingContext =                new JavaStreamingContext(sc, Durations.seconds(BATCH_DURATION_IN_SECONDS));SparkSession sparkSession = SparkSession        .builder()        .config(new SparkConf())        .getOrCreate();Dataset<Row> df = sparkSession        .readStream()        .format("kafka")        .option("kafka.bootstrap.servers", CommonUtils.KAFKA_HOST_PORT)        .option("subscribe", KAFKA_TOPIC)        .load();StreamingQuery query = df.selectExpr("CAST(value AS STRING)")            .select(from_json(new Column("value"), getSchema())).as("data").                    select("data.category_id").writeStream().foreach(new ForeachWriter<Row>() {                @Override                public void process(Row value) {                    System.out.println(value);                }                @Override                public void close(Throwable errorOrNull) {                }                @Override                public boolean open(long partitionId, long version) {                    return true;                }            })            .start();    query.awaitTermination();架构方法:private static StructType getSchema() {    return new StructType(new StructField[]{            new StructField(UNIX_TIME, DataTypes.TimestampType, false, Metadata.empty()),            new StructField(CATEGORY_ID, DataTypes.IntegerType, false, Metadata.empty()),            new StructField(IP, DataTypes.StringType, false, Metadata.empty()),            new StructField(TYPE, DataTypes.StringType, false, Metadata.empty()),    });}如何克服这个问题?对此有何建议?
查看完整描述

1 回答

?
慕虎7371278

TA贡献1802条经验 获得超4个赞

异常的这一部分准确地告诉您在哪里寻找答案:

无法解析给定输入列的“data.category_id”:[jsontostruct(value)]

换句话说,data.category_id可用列中没有一列只是 1 列jsontostruct(value)

这意味着仅select在流式查询中不起作用。原因相当简单(我可以将其视为拼写错误)——在Column和Datasetas("data")类型上可用的右括号太多。

总之,替换查询的以下部分:

.select(from_json(new Column("value"), getSchema())).as("data")

至以下内容:

.select(from_json(new Column("value"), getSchema()).as("data"))

请注意,我将一个右括号移到了末尾。


查看完整回答
反对 回复 2023-07-19
  • 1 回答
  • 0 关注
  • 108 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信