为了账号安全,请及时绑定邮箱和手机立即绑定

如何仅从 kafka 源获取值以激发?

如何仅从 kafka 源获取值以激发?

慕的地10843 2023-04-13 10:54:22
我从 kafka 来源获取日志,并将其放入 spark 中。保存在我的 hadoop_path 中的日志格式如下所示{"value":"{\"Name\":\"Amy\",\"Age\":\"22\"}"}{"value":"{\"Name\":\"Jin\",\"Age\":\"26\"}"}但是,我想让它像{\"Name\":\"Amy\",\"Age\":\"22\"}{\"Name\":\"Jin\",\"Age\":\"26\"}任何一种解决方案都会很棒。(使用纯 Java 代码、Spark SQL 或 Kafka)        SparkSession spark = SparkSession.builder()                .master("local")                .appName("MYApp").getOrCreate();        Dataset<Row> df = spark                .readStream()                .format("kafka")                .option("kafka.bootstrap.servers", Kafka_source)                .option("subscribe", Kafka_topic)                .option("startingOffsets", "earliest")                .option("failOnDataLoss",false)                .load();        Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)");        StreamingQuery queryone = dg.writeStream()                .format("json")                .outputMode("append")                .option("checkpointLocation",Hadoop_path)                .option("path",Hadoop_path)                .start();
查看完整描述

3 回答

?
哔哔one

TA贡献1854条经验 获得超8个赞

我已经用 from_json 函数完成了!!

        SparkSession spark = SparkSession.builder()

                .master("local")

                .appName("MYApp").getOrCreate();

        Dataset<Row> df = spark

                .readStream()

                .format("kafka")

                .option("kafka.bootstrap.servers", Kafka_source)

                .option("subscribe", Kafka_topic)

                .option("startingOffsets", "earliest")

                .option("failOnDataLoss",false)

                .load();

        Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)");

        Dataset<Row> dz = dg.select(

                        from_json(dg.col("value"), DataTypes.createStructType(

                        new StructField[] {

                                DataTypes.createStructField("Name", StringType,true)

                        })).getField("Name").alias("Name")

                        ,from_json(dg.col("value"), DataTypes.createStructType(

                        new StructField[] {

                                DataTypes.createStructField("Age", IntegerType,true)

                        })).getField("Age").alias("Age")

        StreamingQuery queryone = dg.writeStream()

                .format("json")

                .outputMode("append")

                .option("checkpointLocation",Hadoop_path)

                .option("path",Hadoop_path)

                .start();


查看完整回答
反对 回复 2023-04-13
?
白猪掌柜的

TA贡献1893条经验 获得超10个赞

您可以使用 Spark 获得预期的结果,如下所示:


SparkSession spark = SparkSession.builder()

                .master("local")

                .appName("MYApp").getOrCreate();


Dataset<Row> df = spark

                .readStream()

                .format("kafka")

                .option("kafka.bootstrap.servers", Kafka_source)

                .option("subscribe", Kafka_topic)

                .option("startingOffsets", "earliest")

                .option("failOnDataLoss",false)

                .load();


Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)")

        .withColumn("Name", functions.json_tuple(functions.col("value"),"Name"))

        .withColumn("Age", functions.json_tuple(functions.col("value"),"Age"));


StreamingQuery queryone = dg.writeStream()

                .format("json")

                .outputMode("append")

                .option("checkpointLocation",Hadoop_path)

                .option("path",Hadoop_path)

                .start();

基本上,您必须为值列中 json 字符串中的每个字段创建单独的列。


查看完整回答
反对 回复 2023-04-13
?
拉丁的传说

TA贡献1789条经验 获得超8个赞

使用以下内容:


Dataframe<Row> df = spark

                .readStream()

                .format("kafka")

                .option("kafka.bootstrap.servers", Kafka_source)

                .option("subscribe", Kafka_topic)

                .option("startingOffsets", "earliest")

                .option("failOnDataLoss",false)

                .load();

df.printSchema();

StreamingQuery queryone = df.selectExpr("CAST(value AS STRING)")

            .writeStream()

            .format("json")

            .outputMode("append")

            .option("checkpointLocation",Hadoop_path)

            .option("path",Hadoop_path)

            .start();

确保架构包含value作为列。


查看完整回答
反对 回复 2023-04-13
  • 3 回答
  • 0 关注
  • 72 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信