为了账号安全,请及时绑定邮箱和手机立即绑定

无法在beamSql中调用UDF

无法在beamSql中调用UDF

DIEA 2021-05-14 18:24:27
我在下面的beamSql查询中:PCollectionTuple query0 = PCollectionTuple.of(            new TupleTag<BeamRecord>("temp2"), temp2).and(new TupleTag<BeamRecord>("temp3"), temp3)"));                 PCollection<BeamRecord> rec_3 = query0.apply(            BeamSql.queryMulti("SELECT a.*, \r\n" +                    "(case \r\n" +                    "when a.grp > 5 then 1 \r\n" +                    "when b.grp > 5 then 1 \r\n" +                    "else 0 end) as flag \r\n" +                    "from temp2 a left join \r\n" +                    "temp3 b on a.eventid = b.eventid and b.Weekint = c1(a.Weekint)").withUdf("c1", AddS.class));在上面的查询中,我在表temp2和temp3之间进行左连接,在ON条件下,我使用名称'AddS'调用UDF。在此UDF AddS中,将Weekint用作BigInt。UDF将Weekint用作输入,并将其转换为日期格式,然后向其添加7,然后将值返回为BigInt。以下是UDF:public static class AddS implements BeamSqlUdf {                     private static final long serialVersionUID = 1L;     public static BigInteger eval(BigInteger input) throws ParseException{         SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");         String strdate = input.toString();            Date date1 = dateFormat.parse(strdate);             Calendar c = Calendar.getInstance();               c.setTime(date1);             c.add(Calendar.DATE, 7);             String f =c.getTime().toString();             BigInteger x = new BigInteger(f);                   return (x);              }            } 我无法弄清楚是什么原因导致此错误,可能是UDF创建不正确或未正确调用?或如果有人可以向我解释此错误的原因。
查看完整描述

1 回答

?
青春有我

TA贡献1784条经验 获得超8个赞

您的UDF创建不正确。Beam SQL在内部不支持Java BigInteger类型。如果您的SQL数据类型为BigInt,则应改用Java Long类型。



查看完整回答
反对 回复 2021-05-19
  • 1 回答
  • 0 关注
  • 121 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信