我正在编写一个库,以将Apache Spark与自定义环境集成。我正在实现自定义流源和流编写器。我正在开发的某些资源至少在应用程序崩溃后是不可恢复的。如果应用程序重新启动,则需要重新加载所有数据。因此,我们希望避免用户不得不显式设置'checkpointLocation'选项。但是,如果未提供该选项,则会看到以下错误:org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);但是,如果我使用控制台流输出,则一切正常。有没有办法获得相同的行为?注意:我们正在将Spark v2接口用于流读取器/写入器。火花日志:18/06/29 16:36:48 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/mydir/spark-warehouse/').18/06/29 16:36:48 INFO SharedState: Warehouse path is 'file:/C:/mydir/spark-warehouse/'.18/06/29 16:36:48 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpointorg.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...); at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:213) at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:208) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:207) at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299) at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296) ...18/06/29 16:36:50 INFO SparkContext: Invoking stop() from shutdown hook这就是我开始流媒体作业的方式:spark.readStream().format("mysource").load() .writeStream().format("mywriter").outputMode(OutputMode.Append()).start();一切正常,相反,例如,如果我运行:spark.readStream().format("mysource").load() .writeStream().format("console").outputMode(OutputMode.Append()).start();
添加回答
举报
0/150
提交
取消