3 回答
TA贡献1784条经验 获得超9个赞
Spark Datasets需要Encoders即将存储的数据类型。对于常见类型(原子,产品类型),有许多可用的预定义编码器,但是您必须首先从中导入这些编码器SparkSession.implicits才能使其工作:
val sparkSession: SparkSession = ???
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)
或者,您可以直接提供一个明确的
import org.apache.spark.sql.{Encoder, Encoders}
val dataset = sparkSession.createDataset(dataList)(Encoders.product[SimpleTuple])
或隐式
implicit val enc: Encoder[SimpleTuple] = Encoders.product[SimpleTuple]
val dataset = sparkSession.createDataset(dataList)
Encoder 用于存储的类型。
请注意,Enocders还提供了一些Encoders针对原子类型的预定义,Encoders对于复杂的原子类型,可以使用进行预定义ExpressionEncoder。
进一步阅读:
对于内置编码器未涵盖的自定义对象,请参见如何在数据集中存储自定义对象?
对于Row对象,您必须在尝试将数据框行映射到更新的行时Encoder明确提供如编码器错误所示的对象
TA贡献1841条经验 获得超3个赞
我会用我自己的问题的答案来澄清,如果目标是定义一个简单的文字SparkData框架,而不是使用Scala元组和隐式转换,则更简单的方法是像这样直接使用Spark API:
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
val simpleSchema = StructType(
StructField("a", StringType) ::
StructField("b", IntegerType) ::
StructField("c", IntegerType) ::
StructField("d", IntegerType) ::
StructField("e", IntegerType) :: Nil)
val data = List(
Row("001", 1, 0, 3, 4),
Row("001", 3, 4, 1, 7),
Row("001", null, 0, 6, 4),
Row("003", 1, 4, 5, 7),
Row("003", 5, 4, null, 2),
Row("003", 4, null, 9, 2),
Row("003", 2, 3, 0, 1)
)
val df = spark.createDataFrame(data.asJava, simpleSchema)
添加回答
举报