2 Answers
I was able to get this working with essentially your code, after a few changes to the sbt file: Github Repo with full code sample
name := "test-sftp-upload"
version := "0.0.1"
scalaVersion := "2.11.12"
resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.3.2",
  "org.apache.spark" %% "spark-core" % "2.3.2",
  "com.jcraft" % "jsch" % "0.1.55",
  "org.scalatest" %% "scalatest" % "3.0.5" % "test",
  "com.springml" %% "spark-sftp" % "1.1.3"
)
// JAR file settings
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
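The assemblyOption setting above only works if the sbt-assembly plugin is on the build classpath. A minimal project/plugins.sbt sketch; the plugin version is an assumption, any recent sbt-assembly release compatible with your sbt version should do:

// project/plugins.sbt -- provides the `assembly` task used by the setting above
// (0.14.9 is an assumed version, not taken from the original post)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")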
The application file is almost the same:
package org.twelveHart.example.sftp

import org.apache.spark.sql.DataFrame

object sftpTest extends SparkSessionWrapper {

  def main(args: Array[String]): Unit = {
    import spark.implicits._

    spark.sparkContext.setLogLevel("ERROR")

    val df: DataFrame = Seq(
      ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
      ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
      ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
      ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
      ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
    ).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

    df.printSchema()

    df.write
      .format("com.springml.spark.sftp")
      .option("host", "localhost")
      .option("username", "XXXXXX")
      .option("password", "XXXXXXX")
      .option("fileType", "csv")
      .option("delimiter", ";")
      .option("codec", "bzip2")
      .save("/tmp/daily.csv")

    spark.stop()
  }
}
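The object above extends a SparkSessionWrapper trait that is not shown in the post. A minimal sketch of what such a trait could look like; the builder settings (master, app name) are assumptions, use whatever fits your cluster:

package org.twelveHart.example.sftp

import org.apache.spark.sql.SparkSession

// Hypothetical trait providing the `spark` value used by sftpTest above.
trait SparkSessionWrapper {
  lazy val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]") // assumption: local mode for testing
    .appName("test-sftp-upload")
    .getOrCreate()
}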
I noticed that the spark-sftp library (1.1.3) has several dependencies, among them the sftp-client (1.0.3) library. spark-sftp duplicates some of the methods from sftp-client. Here is my working code.
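(The SFTPClient used below comes from that sftp-client dependency. If you want to pin it explicitly instead of relying on the transitive dependency, a hedged sbt sketch; the artifact coordinates are an assumption based on the spark-sftp POM, verify them against your dependency tree:

// Explicit dependency on the SFTP client used below (coordinates are an assumption)
libraryDependencies += "com.springml" % "sftp.client" % "1.0.3"
)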
import java.io.File
import java.util.UUID

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}

// Package names below are based on the spark-sftp 1.1.x / sftp-client 1.0.x sources:
// SFTPClient comes from the sftp-client library, the shutdown hook from spark-sftp.
import com.springml.sftp.client.SFTPClient
import com.springml.spark.sftp.DeleteTempFileShutdownHook

def runJob(): Unit = {
  try {
    val spark: SparkSession = initializeSpark()
    import spark.sqlContext.implicits._

    // Create DataFrame.
    val df: DataFrame = Seq(
      ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
      ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
      ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
      ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
      ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
    ).toDF("NAME", "START_DATE", "END_DATE", "STATUS")
    df.show()

    // Create the object based on class "SFTPClient".
    val sftpClient = new SFTPClient(null, "username", "password", "host", 22)

    val tmpFolder = System.getProperty("java.io.tmpdir")
    val hdfsTemp = tmpFolder

    val source = writeToTemp(spark, df, hdfsTemp, tmpFolder, "csv", "true", ";", "rowTag", "rootTag")
    println("source: " + source)

    // Copy file to FTP server.
    sftpClient.copyToFTP(source, "/reports/example.csv")
  } catch {
    case e: Exception => e.printStackTrace()
  }
}
def writeToTemp(sparkSession: SparkSession, df: DataFrame, hdfsTemp: String, tempFolder: String, fileType: String, header: String, delimiter: String, rowTag: String, rootTag: String): String = {
  val randomSuffix = "spark_sftp_connection_temp_" + UUID.randomUUID
  val hdfsTempLocation = hdfsTemp + File.separator + randomSuffix
  val localTempLocation = tempFolder + File.separator + randomSuffix
  println("hdfsTempLocation: " + hdfsTempLocation)
  println("localTempLocation: " + localTempLocation)

  addShutdownHook(localTempLocation)

  df.coalesce(1).write.option("header", header).option("delimiter", delimiter).csv(hdfsTempLocation)

  // Copy once and reuse the result instead of calling copyFromHdfs a second time just to print it.
  val copied = copyFromHdfs(sparkSession, hdfsTempLocation, localTempLocation)
  println(copied)

  copiedFile(localTempLocation)
}
def addShutdownHook(tempLocation: String): Unit = {
  println("Adding hook for file " + tempLocation)
  val hook = new DeleteTempFileShutdownHook(tempLocation)
  Runtime.getRuntime.addShutdownHook(hook)
}
def copyFromHdfs(sparkSession: SparkSession, hdfsTemp: String, fileLocation: String): String = {
  val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
  val hdfsPath = new Path(hdfsTemp)
  val fs = hdfsPath.getFileSystem(hadoopConf)
  if ("hdfs".equalsIgnoreCase(fs.getScheme)) {
    fs.copyToLocalFile(new Path(hdfsTemp), new Path(fileLocation))
    fs.deleteOnExit(new Path(hdfsTemp))
    fileLocation
  } else {
    hdfsTemp
  }
}
def copiedFile(tempFileLocation: String): String = {
  val baseTemp = new File(tempFileLocation)
  val files = baseTemp.listFiles().filter { x =>
    !x.isDirectory && !x.getName.contains("SUCCESS") && !x.isHidden && !x.getName.contains(".crc")
  }
  files(0).getAbsolutePath
}
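runJob() also calls an initializeSpark() helper that is not included in the post. A minimal sketch, assuming a plain local SparkSession; the builder settings are assumptions:

// Hypothetical helper returning the SparkSession used in runJob() above.
def initializeSpark(): SparkSession = {
  SparkSession
    .builder()
    .master("local[*]") // assumption: local mode
    .appName("sftp-upload-example")
    .getOrCreate()
}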
I dropped the codec option, because it caused charset problems in the final csv file.