为了账号安全,请及时绑定邮箱和手机立即绑定

Apache Spark - 在流式事件上捕获 Kafka 数据以触发工作流

Apache Spark - 在流式事件上捕获 Kafka 数据以触发工作流

泛舟湖上清波郎朗 2021-11-11 18:17:20
简而言之,我是一名开发人员,试图使用 Spark 将数据从一个系统移动到另一个系统。一个系统中的原始数据以经过处理、汇总的形式进入一个本土的分析系统。我对 Spark 非常陌生——我的知识仅限于我在过去一两周内能够挖掘和试验的内容。我想象的是;使用 Spark 监视来自 Kafka 的事件作为触发器。捕获消费者事件上的实体/数据,并使用它来告诉我分析系统中需要更新的内容。然后,我将对原始 Cassandra 数据运行相关的 Spark 查询,并将结果写入分析端的不同表中,仪表板指标将其称为数据源。我有一个简单的 Kafka 结构化流查询工作。虽然我可以看到消耗的对象被输出到控制台,但当消费者事件发生时,我无法检索 Kafka 记录:try {    SparkSession spark = SparkSession        .builder()        .master(this.sparkMasterAddress)        .appName("StreamingTest2")        .getOrCreate();    //THIS -> None of these events seem to give me the data consumed?    //...thinking I'd trigger the Cassandra write from here?    spark.streams().addListener(new StreamingQueryListener() {        @Override        public void onQueryStarted(QueryStartedEvent queryStarted) {            System.out.println("Query started: " + queryStarted.id());        }        @Override        public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {            System.out.println("Query terminated: " + queryTerminated.id());        }        @Override        public void onQueryProgress(QueryProgressEvent queryProgress) {            System.out.println("Query made progress: " + queryProgress.progress());        }    });    Dataset<Row> reader = spark        .readStream()        .format("kafka")        .option("startingOffsets", "latest")        .option("kafka.bootstrap.servers", "...etc...")        .option("subscribe", "my_topic")        .load();    Dataset<String> lines = reader        .selectExpr("cast(value as string)")        .as(Encoders.STRING());    StreamingQuery query = lines        .writeStream()        .format("console")        .start();    query.awaitTermination();} catch (Exception e) {    e.printStackTrace();}我的想法是;使用前者触发后者,将此东西捆绑为 Spark 应用程序/包/任何东西,然后将其部署到 Spark 中。那时,我希望它不断地将更新推送到指标表。这会是我需要的可行、可扩展、合理的解决方案吗?我在正确的道路上吗?如果以某种方式更容易或更好,我不反对使用 Scala。
查看完整描述

1 回答

?
宝慕林4294392

TA贡献2021条经验 获得超8个赞

知道了。了解了 ForeachWriter。效果很好:


        StreamingQuery query = lines

            .writeStream()

            .format("foreach")

            .foreach(new ForeachWriter<String>() {

                @Override

                public void process(String value) {

                    System.out.println("process() value = " + value);

                }


                @Override

                public void close(Throwable errorOrNull) {}


                @Override

                public boolean open(long partitionId, long version) {

                    return true;

                }

            })

            .start(); 


查看完整回答
反对 回复 2021-11-11
  • 1 回答
  • 0 关注
  • 137 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信