为了账号安全,请及时绑定邮箱和手机立即绑定

如何从 Spark 结构化流获取 Kafka 输出中的批次 ID

如何从 Spark 结构化流获取 Kafka 输出中的批次 ID

慕森卡 2023-08-04 15:37:10
我正在更新模式下运行 Spark 结构化流作业,并且无法确定是否可以获取每个更新的批次 ID。例如,当您以更新模式输出到控制台时,Spark 将在输出时显示每个批次编号:-------------------------------------------Batch: 0-------------------------------------------...-------------------------------------------Batch: 1-------------------------------------------...等等。我需要将相同的信息添加到发送到 Kafka 的每条消息中。为此,我只能使用 Spark 2.3,因此我无法使用 forEachBatch。我的工作输出一组特定维度的聚合指标。每个触发器,自上次触发器以来指标可能已更新 - 具有更新指标的维度将在下一批中输出,因为我正在更新模式下运行。当我将这些输出到 Kafka 时,我需要知道哪个批次是最新的 - 因此需要批次号。我认为 forEachBatch 可以满足我的需要,但不幸的是我无法访问 Spark 2.4。我可以使用 forEach 来完成这个任务吗?我仅限于使用更新模式,因为后期事件可能会出现并更新之前已输出的指标。这是我用来测试的控制台模式。此输出分别显示每个批次及其编号:StreamingQuery query = logs.writeStream()        .format("console")        .outputMode(OutputMode.Update())        .start();我想做这样的事情StreamingQuery query = agg.WriteStream()    .format("kafka")    .outputMode(OutputMode.Update())    .option("kafka.bootstrap.servers", "myconnection")    .Option("topic", "mytopic")    .Start();但仍然保留在mytopic中判断消息来自哪个批次的能力。这可能吗?
查看完整描述

1 回答

?
12345678_0001

TA贡献1802条经验 获得超5个赞

我认为你可以使用版本long version号ForeachWriter


你可以像这样实现你自己的KafkaCustomSink。



class KafkaCustomSink(val config: Config) extends ForeachWriter[String] {

  var producer: KafkaProducer[String, String] = _

  var _version: Long = _


  override def open(partitionId: Long, version: Long): Boolean = {

    _version = version

    val props = new Properties()

    props.put("bootstrap.servers", config(Constant.OUTPUT_BOOTSTRAP_SERVER))

    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    props.put("acks", "0")

    producer = new KafkaProducer[String, String](props)

    true

  }


  override def process(value: String): Unit = {

    //use version here

    val record = new ProducerRecord[String, String](config(Constant.OUTPUT_TOPIC), null, "version : %s, data : %s".format(_version, value))

    producer.send(record)

  }


  override def close(errorOrNull: Throwable): Unit = {

    producer.close()

  }

}


并将其分配给


      logs

          .writeStream

          .outputMode("update")

          .foreach(new KafkaCustomSink(config))

          .trigger(Trigger.ProcessingTime(config(Constant.TRIGGER_INTERVAL).toInt, TimeUnit.SECONDS))

          .option("checkpointLocation", config(Constant.CHECKPOINT_LOCATION))


查看完整回答
反对 回复 2023-08-04
  • 1 回答
  • 0 关注
  • 120 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信