为了账号安全,请及时绑定邮箱和手机立即绑定

SparkSQL:如何处理用户定义函数中的空值?

SparkSQL:如何处理用户定义函数中的空值?

MYYA 2019-11-18 13:15:44
给定表1,其中一列“ x”为String类型。我想用“ y”列创建表2,该列是“ x”中给出的日期字符串的整数表示。重要的是将null值保留在“ y”列中。表1(数据框df1):+----------+|         x|+----------+|2015-09-12||2015-09-13||      null||      null|+----------+root |-- x: string (nullable = true)表2(资料框df2):+----------+--------+                                                                  |         x|       y|+----------+--------+|      null|    null||      null|    null||2015-09-12|20150912||2015-09-13|20150913|+----------+--------+root |-- x: string (nullable = true) |-- y: integer (nullable = true)用户定义的函数(udf)将“ x”列的值转换为“ y”列的值是:val extractDateAsInt = udf[Int, String] (  (d:String) => d.substring(0, 10)      .filterNot( "-".toSet)      .toInt )并且有效,无法处理空值。即使,我可以做类似的事情val extractDateAsIntWithNull = udf[Int, String] (  (d:String) =>     if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt     else 1 )我发现没有办法null通过udfs “产生” 值(当然,因为Ints不能null)。我当前用于创建df2的解决方案(表2)如下:// holds data of table 1  val df1 = ... // filter entries from df1, that are not nullval dfNotNulls = df1.filter(df1("x")  .isNotNull)  .withColumn("y", extractDateAsInt(df1("x")))  .withColumnRenamed("x", "right_x")// create df2 via a left join on df1 and dfNotNull having val df2 = df1.join( dfNotNulls, df1("x") === dfNotNulls("right_x"), "leftouter" ).drop("right_x")问题:当前的解决方案似乎很麻烦(并且可能无法有效地提高性能)。有没有更好的办法?@ Spark-developers:是否有NullableInt计划/可用的类型,以便可以使用以下udf(请参见代码摘录)?代码摘录val extractDateAsNullableInt = udf[NullableInt, String] (  (d:String) =>     if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt     else null )
查看完整描述

3 回答

?
守着星空守着你

TA贡献1799条经验 获得超8个赞

这是Option派上用场的地方:


val extractDateAsOptionInt = udf((d: String) => d match {

  case null => None

  case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt)

})

或在一般情况下使其更加安全:


import scala.util.Try


val extractDateAsOptionInt = udf((d: String) => Try(

  d.substring(0, 10).filterNot("-".toSet).toInt

).toOption)

一切归功于德米特里Selivanov谁已经指出,这种解决方案为(失踪?)编辑这里。


另一种方法是null在UDF之外处理:


import org.apache.spark.sql.functions.{lit, when}

import org.apache.spark.sql.types.IntegerType


val extractDateAsInt = udf(

   (d: String) => d.substring(0, 10).filterNot("-".toSet).toInt

)


df.withColumn("y",

  when($"x".isNull, lit(null))

    .otherwise(extractDateAsInt($"x"))

    .cast(IntegerType)

)


查看完整回答
反对 回复 2019-11-18
?
噜噜哒

TA贡献1784条经验 获得超7个赞

我创建了以下代码,以使用户定义的函数可用,以处理所述的空值。希望对别人有帮助!


/**

 * Set of methods to construct [[org.apache.spark.sql.UserDefinedFunction]]s that

 * handle `null` values.

 */

object NullableFunctions {


  import org.apache.spark.sql.functions._

  import scala.reflect.runtime.universe.{TypeTag}

  import org.apache.spark.sql.UserDefinedFunction


  /**

   * Given a function A1 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that

   *   * if fnc input is null, None is returned. This will create a null value in the output Spark column.

   *   * if A1 is non null, Some( f(input) will be returned, thus creating f(input) as value in the output column.

   * @param f function from A1 => RT

   * @tparam RT return type

   * @tparam A1 input parameter type

   * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above

   */

  def nullableUdf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {

    udf[Option[RT],A1]( (i: A1) => i match {

      case null => None

      case s => Some(f(i))

    })

  }


  /**

   * Given a function A1, A2 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that

   *   * if on of the function input parameters is null, None is returned.

   *     This will create a null value in the output Spark column.

   *   * if both input parameters are non null, Some( f(input) will be returned, thus creating f(input1, input2)

   *     as value in the output column.

   * @param f function from A1 => RT

   * @tparam RT return type

   * @tparam A1 input parameter type

   * @tparam A2 input parameter type

   * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above

   */

  def nullableUdf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {

    udf[Option[RT], A1, A2]( (i1: A1, i2: A2) =>  (i1, i2) match {

      case (null, _) => None

      case (_, null) => None

      case (s1, s2) => Some((f(s1,s2)))

    } )

  }

}


查看完整回答
反对 回复 2019-11-18
  • 3 回答
  • 0 关注
  • 1261 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信