
How do I convert a DataFrame to a csv file and put it on a remote SFTP server?

慕莱坞森 2022-05-25 10:19:48
In my Spark application I need to convert a DataFrame to a .csv file and put it on a remote SFTP server. I decided to use the spark-sftp library for this task. My sbt file looks like this:

import sbt.Keys.scalaVersion

name := "TEST"
version := "0.1"
scalaVersion := "2.11.12"

val sparkVersion = "2.3.2"
val ENVIRONMENT_MODE = "development"

mainClass in Compile := Some("MainApp")
mainClass in (Compile, packageBin) := Some("MainApp")
mainClass in assembly := Some("MainApp")

assemblyJarName in assembly := ENVIRONMENT_MODE + "_test" + ".jar"

// Spark Packages from "bintray.com"
resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven/"

// "Spark Project SQL"
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion

// "Spark Project Core"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion

// Current library is a PostgreSQL database connection JDBC4 driver.
libraryDependencies += "postgresql" % "postgresql" % "9.1-901-1.jdbc4"

// "scala-xml" is a Scala library for working with XML files.
libraryDependencies += "org.scala-lang.modules" %% "scala-xml" % "1.1.1"

// "Apache Commons VFS" is a virtual file system library.
libraryDependencies += "org.apache.commons" % "commons-vfs2" % "2.2"

libraryDependencies ++= Seq(
  "org.scalatest"    %% "scalatest"  % "3.0.5" % "test",
  "com.jcraft"        % "jsch"       % "0.1.54",
  "com.springml"     %% "spark-sftp" % "1.1.3"
)

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

// The mapping of path names to merge strategies is done via the setting "assemblyMergeStrategy".
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", _ @ _*) => MergeStrategy.discard
  case _ => MergeStrategy.last
}

2 Answers

aluckdog


I was basically able to get this working with your code, with a couple of changes to the sbt file: Github Repo with full code sample


name := "test-sftp-upload"


version := "0.0.1"


scalaVersion := "2.11.12"


resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"


libraryDependencies ++= Seq(

        "org.apache.spark" %% "spark-sql"  % "2.3.2",

        "org.apache.spark" %% "spark-core" % "2.3.2",

        "com.jcraft"        % "jsch"       % "0.1.55",

        "org.scalatest"    %% "scalatest"  % "3.0.5" % "test",

        "com.springml"     %% "spark-sftp" % "1.1.3"

        )



// JAR file settings

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
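
Note that the assembly* keys above come from the sbt-assembly plugin, which has to be enabled in the project separately. A minimal project/plugins.sbt could look like this (the plugin version here is only an assumption; any recent sbt-assembly release should work):

// project/plugins.sbt -- enables the assembly* settings used in build.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")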

The application file is almost the same:


package org.twelveHart.example.sftp


import org.apache.spark.sql.DataFrame


object sftpTest extends SparkSessionWrapper {

  def main(args: Array[String]): Unit = {


    import spark.implicits._

    spark.sparkContext.setLogLevel("ERROR")


    val df: DataFrame = Seq(

      ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),

      ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),

      ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),

      ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),

      ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")

    ).toDF("NAME", "START_DATE", "END_DATE", "STATUS")


    df.printSchema()


    df.write

      .format("com.springml.spark.sftp")

      .option("host", "localhost")

      .option("username", "XXXXXX")

      .option("password", "XXXXXXX")

      .option("fileType", "csv")

      .option("delimiter", ";")

      .option("codec", "bzip2")

      .save("/tmp/daily.csv")


    spark.stop()

  }

}
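
The SparkSessionWrapper trait that sftpTest extends is not shown here; it lives in the linked Github repo. A minimal sketch of such a trait, assuming a local master and an arbitrary app name, could be:

package org.twelveHart.example.sftp

import org.apache.spark.sql.SparkSession

// Minimal sketch of the wrapper trait; the real one is in the linked repo.
// The master URL and app name are assumptions for local testing.
trait SparkSessionWrapper {
  lazy val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("sftpTest")
    .getOrCreate()
}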


慕神8447489


I noticed that the spark-sftp library (1.1.3) has several dependencies, among them the sftp-client (1.0.3) library. spark-sftp duplicates some of the methods from sftp-client. Here is my working code:


// Imports assumed by this snippet (they are not shown in the original answer):
import java.io.File
import java.util.UUID

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}

// SFTPClient comes from the sftp-client dependency mentioned above;
// DeleteTempFileShutdownHook is assumed to be the temp-file cleanup hook
// shipped with the spark-sftp package.
import com.springml.sftp.client.SFTPClient
import com.springml.spark.sftp.DeleteTempFileShutdownHook

def runJob(): Unit = {

    try {

      val spark: SparkSession = initializeSpark()

      import spark.sqlContext.implicits._


      // Create DataFrame.

      val df: DataFrame = Seq(
        ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
        ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
        ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
        ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
        ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
      ).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

      df.show()


      // Create the object based on class "SFTPClient".

      val sftpClient = new SFTPClient(null, "username", "password", "host", 22)


      val tmpFolder = System.getProperty("java.io.tmpdir")

      val hdfsTemp = tmpFolder


      val source = writeToTemp(spark, df, hdfsTemp, tmpFolder, "csv", "true", ";", "rowTag", "rootTag")


      println("source: " + source)


      // Copy file to FTP server.

      sftpClient.copyToFTP(source, "/reports/example.csv")

    } catch {

      case e: Exception => e.printStackTrace()

    }

  }


  def writeToTemp(sparkSession: SparkSession, df: DataFrame, hdfsTemp: String, tempFolder: String, fileType: String, header: String, delimiter: String, rowTag: String, rootTag: String) : String = {


    val randomSuffix = "spark_sftp_connection_temp_" + UUID.randomUUID

    val hdfsTempLocation = hdfsTemp + File.separator + randomSuffix

    val localTempLocation = tempFolder + File.separator + randomSuffix


    println("hdfsTempLocation: " + hdfsTempLocation)

    println("localTempLocation: " + localTempLocation)


    addShutdownHook(localTempLocation)


    df.coalesce(1).write.option("header", header).option("delimiter", delimiter).csv(hdfsTempLocation)


    // Copy the generated file from HDFS (if applicable) to the local temp folder.
    val copiedLocation = copyFromHdfs(sparkSession, hdfsTempLocation, localTempLocation)

    println("copiedLocation: " + copiedLocation)


    copiedFile(localTempLocation)

  }


  def addShutdownHook(tempLocation: String) {

    println("Adding hook for file " + tempLocation)

    val hook = new DeleteTempFileShutdownHook(tempLocation)

    Runtime.getRuntime.addShutdownHook(hook)

  }


  def copyFromHdfs(sparkSession: SparkSession, hdfsTemp : String, fileLocation : String): String  = {

    val hadoopConf = sparkSession.sparkContext.hadoopConfiguration

    val hdfsPath = new Path(hdfsTemp)

    val fs = hdfsPath.getFileSystem(hadoopConf)

    if ("hdfs".equalsIgnoreCase(fs.getScheme)) {

      fs.copyToLocalFile(new Path(hdfsTemp), new Path(fileLocation))

      fs.deleteOnExit(new Path(hdfsTemp))

      fileLocation

    } else {

      hdfsTemp

    }

  }


  def copiedFile(tempFileLocation: String) : String = {

    val baseTemp = new File(tempFileLocation)

    val files = baseTemp.listFiles().filter { x =>

      !x.isDirectory && !x.getName.contains("SUCCESS") && !x.isHidden && !x.getName.contains(".crc")

    }

    files(0).getAbsolutePath

  }

I removed the codec option because it caused character-set problems in the final csv file.
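
The helper initializeSpark() used in runJob() is not shown in the answer either. A minimal sketch, again assuming a local master and a placeholder app name, would be:

// Sketch of the helper assumed by runJob(); master and appName are placeholders.
def initializeSpark(): SparkSession =
  SparkSession
    .builder()
    .master("local[*]")
    .appName("sftp-upload")
    .getOrCreate()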

