简而言之,我是一名开发人员,试图使用 Spark 将数据从一个系统移动到另一个系统。一个系统中的原始数据以经过处理、汇总的形式进入一个本土的分析系统。我对 Spark 非常陌生——我的知识仅限于我在过去一两周内能够挖掘和试验的内容。我想象的是;使用 Spark 监视来自 Kafka 的事件作为触发器。捕获消费者事件上的实体/数据,并使用它来告诉我分析系统中需要更新的内容。然后,我将对原始 Cassandra 数据运行相关的 Spark 查询,并将结果写入分析端的不同表中,仪表板指标将其称为数据源。我有一个简单的 Kafka 结构化流查询工作。虽然我可以看到消耗的对象被输出到控制台,但当消费者事件发生时,我无法检索 Kafka 记录:try { SparkSession spark = SparkSession .builder() .master(this.sparkMasterAddress) .appName("StreamingTest2") .getOrCreate(); //THIS -> None of these events seem to give me the data consumed? //...thinking I'd trigger the Cassandra write from here? spark.streams().addListener(new StreamingQueryListener() { @Override public void onQueryStarted(QueryStartedEvent queryStarted) { System.out.println("Query started: " + queryStarted.id()); } @Override public void onQueryTerminated(QueryTerminatedEvent queryTerminated) { System.out.println("Query terminated: " + queryTerminated.id()); } @Override public void onQueryProgress(QueryProgressEvent queryProgress) { System.out.println("Query made progress: " + queryProgress.progress()); } }); Dataset<Row> reader = spark .readStream() .format("kafka") .option("startingOffsets", "latest") .option("kafka.bootstrap.servers", "...etc...") .option("subscribe", "my_topic") .load(); Dataset<String> lines = reader .selectExpr("cast(value as string)") .as(Encoders.STRING()); StreamingQuery query = lines .writeStream() .format("console") .start(); query.awaitTermination();} catch (Exception e) { e.printStackTrace();}我的想法是;使用前者触发后者,将此东西捆绑为 Spark 应用程序/包/任何东西,然后将其部署到 Spark 中。那时,我希望它不断地将更新推送到指标表。这会是我需要的可行、可扩展、合理的解决方案吗?我在正确的道路上吗?如果以某种方式更容易或更好,我不反对使用 Scala。
1 回答
宝慕林4294392
TA贡献2021条经验 获得超8个赞
知道了。了解了 ForeachWriter。效果很好:
StreamingQuery query = lines
.writeStream()
.format("foreach")
.foreach(new ForeachWriter<String>() {
@Override
public void process(String value) {
System.out.println("process() value = " + value);
}
@Override
public void close(Throwable errorOrNull) {}
@Override
public boolean open(long partitionId, long version) {
return true;
}
})
.start();
添加回答
举报
0/150
提交
取消