为了账号安全,请及时绑定邮箱和手机立即绑定

如何在数据集中存储自定义对象?

如何在数据集中存储自定义对象?

婷婷同学_ 2019-06-19 15:45:49
如何在数据集中存储自定义对象?根据介绍星火数据集:在我们期待Spark2.0的同时,我们计划对数据集进行一些令人兴奋的改进,特别是:.自定义编码器-尽管我们目前为各种各样的类型自动生成编码器,但我们希望为自定义对象打开一个API。并尝试将自定义类型存储在Dataset导致以下错误:无法找到存储在数据集中的类型的编码器。导入sqlContext.Inductions支持原始类型(Int、String等)和Producttype(CASE类)。_对序列化其他类型的支持将在以后的版本中添加。或:异常:未找到编码器用于.有什么解决办法吗?注意,这个问题仅作为CommunityWiki答案的入口点存在。随时更新/改进问题和答案。
查看完整描述

3 回答

?
饮歌长啸

TA贡献1951条经验 获得超3个赞

更新

这个答案仍然是有效和信息丰富的,尽管现在情况更好,自从2.2/2.3,这增加了内置编码器的支持SetSeqMapDateTimestamp,和BigDecimal..如果您坚持只使用case类和通常的Scala类型来创建类型,那么应该可以只使用SQLImplicits.


不幸的是,在这方面几乎没有增加任何帮助。寻觅@since 2.0.0在……里面Encoders.scalaSQLImplicits.scala查找主要与原始类型有关的内容(以及对Case类的一些调整)。所以,首先要说的是:目前对自定义类编码器没有真正好的支持。..这样的话,下面是一些我们可以期望做得很好的技巧,考虑到我们目前所拥有的一切。作为一个预先的免责声明:这不会完美的工作,我会尽我最大的努力使所有的限制清楚和预先。

到底是什么问题?

当您想创建数据集时,SPark“需要一个编码器(将T类型的JVM对象与内部SparkSQL表示形式相互转换),该编码器通常是通过从SparkSession,也可以通过调用Encoders“(摘自医生createDataset)。编码器将采用以下形式Encoder[T]哪里T是您正在编码的类型。第一个建议是增加import spark.implicits._(这给了你这些(第二个建议是显式地传递隐式编码器,使用这,这个一组编码器相关功能。

没有普通类可用的编码器,所以

import spark.implicits._class MyObj(val i: Int)// ...val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

将给出以下隐式相关编译时错误:

无法找到存储在数据集中的类型的编码器。导入sqlContext.Inductions支持原始类型(Int、String等)和Producttype(CASE类)。_对序列化其他类型的支持将在以后的版本中添加。

但是,如果将刚才用于在某个类中获取上述错误的任何类型包装,则Product,错误会被延迟到运行时,所以

import spark.implicits._case class Wrap[T](unwrap: T)class MyObj(val i: Int)
// ...val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

编译很好,但是在运行时会失败。

未支持的OperationException:没有为MyObj找到编码器

这样做的原因是,实施者SPark创建的Induces实际上只在运行时(通过Scala关系)生成。在本例中,在编译时所有的SPark检查都是最外层的类扩展Product(所有的CASE类都这样做),并且只在运行时才意识到它仍然不知道如何处理MyObj(如果我试图创建一个Dataset[(Int,MyObj)]-星火等待运行时继续运行MyObj)。这些是迫切需要解决的核心问题:

  • 一些扩展的类

    Product

    编译,尽管在运行时总是崩溃,而且
  • 没有办法传递嵌套类型的自定义编码器(我无法仅为

    MyObj

    使它知道如何编码

    Wrap[MyObj]

    (Int,MyObj)).

只管用kryo

每个人建议的解决方案是使用kryo编码器。

import spark.implicits._class MyObj(val i: Int)implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

不过,这很快就会变得很乏味。特别是当您的代码正在操作各种数据集、加入、分组等时。那么,为什么不直接默示这一切都是自动完成的呢?

import scala.reflect.ClassTagimplicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

现在,我似乎可以做任何我想做的事情(下面的示例在spark-shell哪里spark.implicits._自动导入)

class MyObj(val i: Int)val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))val d2 = d1.map(d => (d.i+1,d)).alias("d2")
 // mapping works fine and ..val d3 = d1.map(d => (d.i,  d)).alias("d3") 
 // .. deals with the new typeval d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

或者几乎。问题是使用kryo导致SPark只将DataSet中的每一行存储为平面二进制对象。为mapfilterforeach这就足够了,但是对于像join,星火确实需要将它们分隔成列。检查架构d2d3,您可以看到只有一个二进制列:

d2.printSchema// root//  |-- value: binary (nullable = true)

元组的部分解

因此,在Scala中使用InstitucesinScala的魔力(更多在6.26.3过载分辨率),我可以为自己做一系列能做得尽可能好的事情,至少对于元组来说是这样,并且可以很好地与现有的Institutions一起工作:

import org.apache.spark.sql.{Encoder,Encoders}import scala.reflect.ClassTagimport spark.implicits._  
// we can still take advantage of all the old implicitsimplicit def single[A](implicit c: ClassTag[A]):
 Encoder[A] = Encoders.kryo[A](c)implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)// ... you can keep making these

然后,带着这些请求,我可以让上面的例子起作用,尽管用了一些列重命名。

class MyObj(val i: Int)val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))val d2 = d1.map(d => (d.i+1,d))
.toDF("_1","_2").as[(Int,MyObj)].alias("d2")val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

我还没有弄清楚如何获得预期的元组名称(_1_2.)默认情况下不用重命名-如果有人想玩这个游戏,这,这个名字"value"被介绍和这,这个通常添加元组名称的位置。但是,关键是我现在有了一个很好的结构化模式:

d4.printSchema// root//  |-- _1: struct (nullable = false)//  |    |-- _1: integer (nullable = true)//  |   
 |-- _2: binary (nullable = true)//  |-- _2: struct (nullable = false)//  |    |-- _1: integer (nullable = true)//  | 
    |-- _2: binary (nullable = true)

总之,这个解决办法是:

  • 允许我们为元组获得单独的列(因此我们可以再次加入元组,耶!)
  • 我们可以再一次依赖于请求(所以不需要经过。)

    kryo

    (到处都是)
  • 几乎完全向后兼容

    import spark.implicits._

    (涉及一些重命名)
  • 是吗?

    让我们加入

    kyro

    序列化二进制列,更不用说那些可能具有
  • 将某些元组列重命名为“value”(如果有必要的话,可以通过转换将其撤消),会产生令人不快的副作用。

    .toDF

    ,指定新的列名,并将其转换回DataSet-模式名称似乎通过联接(最需要它们的地方)被保留。

一般类的部分解

这个不太愉快,也没有很好的解决办法。但是,现在我们有了上面的元组解决方案,我有一个预感-来自另一个答案的隐式转换解决方案也不会那么痛苦,因为您可以将更复杂的类转换为元组。然后,在创建DataSet之后,您可能会使用dataframe方法重命名这些列。如果一切顺利,这是真的一个改进,因为我现在可以在类的字段上执行联接。如果我只使用了一个平面二进制kryo序列化程序是不可能的。

下面是一个做了一些事情的例子:我有一个类MyObj其中有类型的字段。Intjava.util.UUID,和Set[String]..第一个照顾好自己。第二个,虽然我可以使用kryo如果存储为String(自UUID这通常是我想反对的事情。第三个真正属于二进制列。

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])// alias for the type to convert to and fromtype MyObjEncoded = 
(Int, String, Set[String])// implicit conversionsimplicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)implicit 
def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

现在,我可以使用这个机器创建一个具有良好模式的数据集:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar")))).toDF("i","u","s").as[MyObjEncoded]

模式向我展示了I列的正确名称和前两种情况,这两种情况我都可以使用。

d.printSchema// root//  |-- i: integer (nullable = false)//  |-- u: string (nullable = true)//  |-- s: binary (nullable = true)


查看完整回答
反对 回复 2019-06-19
?
繁花如伊

TA贡献2012条经验 获得超12个赞

您可以使用UDT注册,然后使用案例类、元组等.所有正确的工作与您的用户定义的类型!

假设您想使用自定义Enum:

trait CustomEnum { def value:String }case object Foo extends CustomEnum  { val value = "F" }case object Bar extends CustomEnum  
{ val value = "B" }object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get}

登记如下:

// First define a UDT class for it:class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]}// Then Register the UDT Class!
  // NOTE: you have to put this file into the org.apache.spark package!UDTRegistration.register(classOf[CustomEnum].
  getName, classOf[CustomEnumUDT].getName)

那就用它!

case class UsingCustomEnum(id:Int, en:CustomEnum)val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)).toDS()seq.filter(_.en == Foo).show()println(seq.collect())

假设您想使用多态记录:

trait CustomPolycase class FooPoly(id:Int) extends CustomPolycase class BarPoly(value:String, secondValue:Long) extends CustomPoly

..它的用法是这样的:

case class UsingPoly(id:Int, poly:CustomPoly)Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false}).show()

您可以编写一个自定义的UDT,它将所有内容编码为字节(我在这里使用java序列化,但更好的方法可能是检测SPark的Kryo上下文)。

首先定义UDT类:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]}

然后注册:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!UDTRegistration.register(classOf[CustomPoly].
getName, classOf[CustomPolyUDT].getName)

那你就可以用它了!

// As shown above:case class UsingPoly(id:Int, poly:CustomPoly)Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false}).show()


查看完整回答
反对 回复 2019-06-19
  • 3 回答
  • 0 关注
  • 659 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信