3 Answers
Answer from a contributor with 1951 experience points · 3 upvotes
Update

This answer is still valid and informative, although things have improved since Spark 2.2/2.3, which added built-in encoder support for Set, Seq, Map, Date, Timestamp, and BigDecimal. If you stick to types built only from case classes and the usual Scala types, the implicits in SQLImplicits should be enough.

Unfortunately, almost nothing else has been added to help. Searching for @since 2.0.0 in Encoders.scala or SQLImplicits.scala turns up mostly primitive types (plus some tweaking of case classes). So, the first thing to say: there is currently no really good solution for encoding custom classes. What follows are a few tricks that do as good a job as can be hoped for with what is available; none of them is perfect, and the limitations are spelled out up front.
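As a quick illustration of what the built-in support now covers, here is a minimal sketch (assuming Spark 2.3+ and a running SparkSession named spark; the Event case class is made up for this example):

import java.sql.Date
import spark.implicits._

// Every field type below is covered by the built-in encoders in recent Spark:
case class Event(id: Int, tags: Set[String], counts: Map[String, Long], day: Date)

val events = Seq(
  Event(1, Set("a", "b"), Map("clicks" -> 3L), Date.valueOf("2020-01-01"))
).toDS()

events.printSchema()   // every column gets a proper (non-binary) SQL type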
What exactly is the problem?

When you want to make a Dataset, Spark "requires an encoder (to convert a JVM object of type T to and from the internal Spark SQL representation) that is generally created automatically through implicits from a SparkSession, or can be created explicitly by calling static methods on Encoders" (quoted from the createDataset docs). An encoder has the form Encoder[T], where T is the type being encoded. The first suggestion is to add import spark.implicits._ (which brings the implicit encoders into scope); the second is to pass an encoder in explicitly via the static methods on Encoders. There is no encoder available for regular classes, so
import spark.implicits._

class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1), new MyObj(2), new MyObj(3)))
will give the following implicit-related compile-time error:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases.
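For contrast, the explicit route mentioned above does work for types Spark already knows how to encode; a minimal sketch (assuming the same SparkSession named spark):

import org.apache.spark.sql.Encoders

// Pass the encoder explicitly instead of relying on imported implicits:
val nums = spark.createDataset(Seq(1, 2, 3))(Encoders.scalaInt)
nums.show()

The trouble, as the rest of this answer shows, is that no such factory exists for an arbitrary class like MyObj.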
However, if you wrap the offending type in some class that extends Product, the error is confusingly delayed until runtime, so
import spark.implicits._

case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)), Wrap(new MyObj(2)), Wrap(new MyObj(3))))
compiles just fine, but fails at runtime with:

java.lang.UnsupportedOperationException: No Encoder found for MyObj
The reason is that the encoders Spark creates through the implicits are only actually built at runtime (via Scala reflection). At compile time Spark only checks that the outermost class extends Product (which every case class does), and only at runtime does it realize that it still has no idea what to do with MyObj (the same thing happens if you try to build a Dataset[(Int,MyObj)]: Spark waits until runtime to choke on MyObj). These are the central problems that badly need fixing:

- some classes that extend Product compile even though they always crash at runtime; and
- there is no way to pass in a custom encoder for a nested type (there is no way to hand Spark an encoder for just MyObj so that it then knows how to encode Wrap[MyObj] or (Int,MyObj)).
Just use kryo

The solution everyone suggests is to use the kryo encoder:
import spark.implicits._

class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1), new MyObj(2), new MyObj(3)))
This gets tedious fast, though, especially if your code manipulates all sorts of datasets, joins, groups, and so on: you end up accumulating a pile of extra implicits. So why not write one implicit that does it all automatically?

import scala.reflect.ClassTag

implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
  org.apache.spark.sql.Encoders.kryo[A](ct)
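A variant of the same trick, not from the original answer: org.apache.spark.sql.Encoders also offers javaSerialization, which can be substituted for kryo in exactly the same way (define one generic implicit or the other, not both, to avoid ambiguous implicits):

import scala.reflect.ClassTag
import org.apache.spark.sql.{Encoder, Encoders}

// Same pattern, backed by plain Java serialization instead of Kryo:
implicit def javaEncoder[A](implicit ct: ClassTag[A]): Encoder[A] =
  Encoders.javaSerialization[A](ct)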
And now it seems like almost anything is possible (the example below will not work in the spark-shell, where spark.implicits._ is imported automatically):
class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1), new MyObj(2), new MyObj(3)))
val d2 = d1.map(d => (d.i + 1, d)).alias("d2")      // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3")          // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")     // Boom!
Or almost. The problem is that using kryo makes Spark store every row of the dataset as one flat binary object. That is enough for map, filter, and foreach, but for operations like join Spark really needs the data separated into columns. Inspecting the schema of d2 or d3 shows a single binary column:
d2.printSchema
// root
//  |-- value: binary (nullable = true)
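A quick sketch of the row-at-a-time operations that remain fine with the kryo-backed dataset (it assumes d1 and the kryoEncoder implicit defined above are in scope):

d1.filter(_.i > 1).count()       // predicates run on the deserialized objects
d1.map(_.i).collect()            // the result Encoder[Int] also comes from kryoEncoder here
d1.foreach(o => println(o.i))    // side effects need no columns at all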
Partial solution for tuples

Using the magic of Scala implicits, we can build a small family of implicits that does as good a job as possible, at least for tuples, and plays well with the existing implicits:
import org.apache.spark.sql.{Encoder, Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1, A2)] = Encoders.tuple[A1, A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1, A2, A3)] = Encoders.tuple[A1, A2, A3](e1, e2, e3)

// ... you can keep making these
Armed with these implicits, the example above now works, albeit with some column renaming:

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1), new MyObj(2), new MyObj(3)))
val d2 = d1.map(d => (d.i + 1, d)).toDF("_1", "_2").as[(Int, MyObj)].alias("d2")
val d3 = d1.map(d => (d.i, d)).toDF("_1", "_2").as[(Int, MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
I have not yet figured out how to get the expected tuple column names (_1, _2, ...) by default without renaming them; this is where the name "value" otherwise gets introduced. The point, though, is that there is now a nicely structured schema:
d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
So, in summary, this workaround:

- lets us get separate columns for tuples (so we can join on tuples again, hooray!);
- lets us rely on the implicits again (no need to pass kryo around everywhere);
- is almost entirely backwards compatible with import spark.implicits._ (with some renaming involved);
- does not let us join on the kryo-serialized binary columns, let alone on fields inside them;
- has the unpleasant side effect of renaming some tuple columns to "value" (if necessary this can be undone by converting with .toDF, specifying new column names, and converting back to a Dataset; and the schema names appear to be preserved through joins, which is where they are needed most).
Partial solution for classes in general

This one is less pleasant and has no good solution. However, given the tuple solution above, the implicit-conversion approach becomes less painful too, because you can convert your more complex classes to tuples; after creating the dataset, you can rename the columns using the DataFrame approach. If all goes well this is a real improvement, since it becomes possible to join on the fields of your classes, which would be impossible with a single flat binary kryo serializer.

Here is an example that does a bit of everything: a class MyObj with fields of type Int, java.util.UUID, and Set[String]. The first takes care of itself. The second could be serialized with kryo, but is more useful stored as a String (UUIDs are usually something you want to join against). The third really does belong in a binary column.
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
Now a dataset with a nice schema can be created using this machinery (the Seq[MyObjEncoded] ascription triggers the implicit conversion):

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i", "u", "s").as[MyObjEncoded]
The schema shows columns with the right names, and the first two are both things that can be joined against:

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
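A sketch of such a field-level join, not taken from the original answer (it assumes the implicits from the tuple section are still in scope, plus spark.implicits._ for the $ column syntax):

// Self-join on the string-typed UUID column, which would be impossible if
// the whole object were stored as one kryo binary blob:
val joined = d.alias("a").joinWith(d.alias("b"), $"a.u" === $"b.u")
joined.show()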
Answer from a contributor with 2012 experience points · 12 upvotes
You can use UDTRegistration, and then case classes, tuples, and so on all work correctly with your user-defined type. Say you want to use a custom enum:
trait CustomEnum { def value: String }
case object Foo extends CustomEnum { val value = "F" }
case object Bar extends CustomEnum { val value = "B" }
object CustomEnum {
  def fromString(str: String) = Seq(Foo, Bar).find(_.value == str).get
}
Register it like so:

import org.apache.spark.sql.types.{DataType, UDTRegistration, UserDefinedType}

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any =
    org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that the serialized form is a UTF8String
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then register the UDT class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)
Then use it!

// assumes spark.implicits._ is in scope for .toDS()
case class UsingCustomEnum(id: Int, en: CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()

seq.filter(_.en == Foo).show()
println(seq.collect().mkString(", "))
Now say you want to use a polymorphic record:
trait CustomPoly
case class FooPoly(id: Int) extends CustomPoly
case class BarPoly(value: String, secondValue: Long) extends CustomPoly
... and you want to use it like this:

case class UsingPoly(id: Int, poly: CustomPoly)

val polySeq = Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS()

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()
You can write a custom UDT that encodes everything to bytes (plain Java serialization is used here; instrumenting Kryo would probably be better). First define the UDT class:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.sql.types.{DataType, UserDefinedType}

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType

  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    bos.toByteArray
  }

  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    ois.readObject().asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}
Then register it:

// NOTE: the file you do this in has to be inside the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
Then you can use it!

// As shown above:
case class UsingPoly(id: Int, poly: CustomPoly)

val polySeq = Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS()

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()