为了账号安全,请及时绑定邮箱和手机立即绑定

Apache Flink:由于类型擦除,无法自动确定函数的返回类型

Apache Flink:由于类型擦除,无法自动确定函数的返回类型

12345678_0001 2021-04-05 17:14:17
我使用Java中的Flink编写了一个简单程序,该程序将文件或文本作为输入,然后使用flatMap函数打印所有单词。这是我的代码:        final ParameterTool params = ParameterTool.fromArgs(args);        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.getConfig().setGlobalJobParameters(params);        // show user defined parameters in the apache flink dashboard        DataStream<String> dataStream;        if(params.has("input"))         {            System.out.println("Executing Words example with file input");            dataStream = env.readTextFile(params.get("input"));        }else if (params.has("host") && params.has("port"))         {            System.out.println("Executing Words example with socket stream");            dataStream = env.socketTextStream(params.get("host"), Integer.parseInt(params.get("port")));        }        else {            System.exit(1);            return;        }        DataStream<String> wordDataStream = dataStream.flatMap(                (String sentence, Collector<String> out) -> {                    for(String word: sentence.split(" "))                        out.collect(word);        });        wordDataStream.print();        env.execute("Word Split");  但是当我使用以下命令运行它时:bin/flink run -c Words FlinkExample-0.0.1-SNAPSHOT.jar --host localhost --port 9999我收到以下错误:该程序失败,但有以下异常:由于类型擦除,无法自动确定函数“ main(Words.java:32)”的返回类型。您可以通过在转换调用的结果上使用return(...)方法或通过让函数实现“ ResultTypeQueryable”接口来提供类型信息提示。(第32行指的是第二个DataStream的声明)
查看完整描述

2 回答

?
PIPIONE

TA贡献1829条经验 获得超9个赞

我认为错误消息的简短描述非常好,但让我对其进行扩展。


为了执行程序,Flink需要知道要处理的值的类型,因为它需要序列化和反序列化它们。Flink的类型系统基于TypeInformation它来描述数据类型。当您指定一个函数时,Flink会尝试推断该函数的返回类型。如果您的示例使用FlatMapFunction,则传递给的对象的类型Collector。


不幸的是,某些Lambda函数由于类型擦除而丢失了此信息,因此Flink无法自动推断类型。因此,您必须显式声明返回类型。


您可以提供TypeInformation,如下所示:


DataStream<String> wordDataStream = dataStream.flatMap(

    (String sentence, Collector<String> out) -> {

        for(String word: sentence.split(" "))

        out.collect(word); // collect objects of type String

    }

).returns(Types.STRING); // declare return type of flatmap lambda function as String


查看完整回答
反对 回复 2021-04-18
  • 2 回答
  • 0 关注
  • 354 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信