为了账号安全,请及时绑定邮箱和手机立即绑定

无法使用 Flink Table API 打印 CSV 文件

无法使用 Flink Table API 打印 CSV 文件

皈依舞 2021-06-08 17:01:11
我正在尝试使用 Netbeans 在控制台上读取一个包含 34 个字段的文件。但是,我所能打印的只是模式。因为在与 csvreader 一起使用的这个特定版本的 Flink 中缺少打印选项。请查看代码并帮助我了解应该更正的地方。我本来会使用CSVReader内置 API,但事实证明它不支持超过 22 个字段,因此求助于使用 Table API。还尝试使用CsvTableSource1.5.1 版 Flink,但语法不走运。由于.field("%CPU", Types.FLOAT())不断给出类型浮点数的错误无法识别的符号。我的主要目标是能够读取 CSV 文件然后发送到 Kafka 主题,但在此之前我想检查文件是否已读取,但还没有运气。import org.apache.flink.table.api.Table;import  org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.TableEnvironment;import org.apache.flink.table.sources.CsvTableSource;import org.apache.flink.types.Row;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSink;import org.apache.flink.table.api.Types;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.table.sinks.CsvTableSink;import org.apache.flink.table.sinks.TableSink;import org.apache.flink.table.api.java.Slide;public class CsvReader {  public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();TableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);CsvTableSource csvTableSource = new CsvTableSource("/home/merlin/Experiments/input_container/container_data1.csv",            new String[] { "%CPU", "MEM", "VSZ", "RSS", "timestamp",    "OOM_Score", "io_read_count", "io_write_count", "io_read_bytes", "io_write_bytes",    "io_read_chars", "io_write_chars", "num_fds", "num_ctx_switches_voluntary", "num_ctx_switches_involuntary",    "mem_rss", "mem_vms", "mem_shared", "mem_text", "mem_lib", "mem_data", "mem_dirty", "mem_uss", "mem_pss",    "mem_swap", "num_threads", "cpu_time_user", "cpu_time_system", "cpu_time_children_user",    "cpu_time_children_system", "container_nr_sleeping", "container_nr_running",    "container_nr_stopped", "container_nr_uninterruptible","container_nr_iowait" },
查看完整描述

2 回答

?
狐的传说

TA贡献1804条经验 获得超3个赞

这段代码:


package example.flink;


import org.apache.flink.api.common.typeinfo.Types;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.java.StreamTableEnvironment;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.TableEnvironment;

import org.apache.flink.table.sources.CsvTableSource;

import org.apache.flink.types.Row;


public class TestFlink {

    public static void main(String[] args) {


        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);


        CsvTableSource csvTableSource = CsvTableSource

                .builder()

                .path("container_data.csv")

                .field("%CPU", Types.FLOAT)

                .field("MEM", Types.FLOAT)

                .field("VSZ", Types.FLOAT)

                .field("RSS", Types.FLOAT)

                .field("timestamp", Types.FLOAT)

                .field("OOM_Score", Types.FLOAT)

                .field("io_read_count", Types.FLOAT)

                .field("io_write_count", Types.FLOAT)

                .field("io_read_bytes", Types.FLOAT)

                .field("io_write_bytes", Types.FLOAT)

                .field("io_read_chars", Types.FLOAT)

                .field("io_write_chars", Types.FLOAT)

                .field("num_fds", Types.FLOAT)

                .field("num_ctx_switches_voluntary", Types.FLOAT)

                .field("num_ctx_switches_involuntary", Types.FLOAT)

                .field("mem_rss", Types.FLOAT)

                .field("mem_vms", Types.FLOAT)

                .field("mem_shared", Types.FLOAT)

                .field("mem_text", Types.FLOAT)

                .field("mem_lib", Types.FLOAT)

                .field("mem_data", Types.FLOAT)

                .field("mem_dirty", Types.FLOAT)

                .field("mem_uss", Types.FLOAT)

                .field("mem_pss", Types.FLOAT)

                .field("mem_swap", Types.FLOAT)

                .field("num_threads", Types.FLOAT)

                .field("cpu_time_user", Types.FLOAT)

                .field("cpu_time_system", Types.FLOAT)

                .field("cpu_time_children_user", Types.FLOAT)

                .field("cpu_time_children_system", Types.FLOAT)

                .field("container_nr_sleeping", Types.FLOAT)

                .field("container_nr_running", Types.FLOAT)

                .field("container_nr_stopped", Types.FLOAT)

                .field("container_nr_uninterruptible", Types.FLOAT)

                .field("container_nr_iowait", Types.FLOAT)

                .fieldDelimiter(",")

                .lineDelimiter("\n")

                .ignoreFirstLine()

                .ignoreParseErrors()

                .commentPrefix("%")

                .build();

        // name your table source

        tEnv.registerTableSource("container", csvTableSource);

        Table table = tEnv.scan("container");

        DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);

        // define the sink as common print on console here

        stream.print();

        try {

            env.execute();

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

使用这些库(有些可能是多余的):


compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25'

compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.5.1'

compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.5.1'

compile group: 'org.apache.flink', name: 'flink-core', version: '1.5.1'

compile group: 'org.apache.flink', name: 'flink-java', version: '1.5.1'

compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: '1.5.1'

compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.5.1'

compile group: 'org.apache.flink', name: 'flink-runtime_2.11', version: '1.5.1'

至少正在运行。我不确定它是否提供了您需要的输出,但它几乎与您在最新编辑中的输出完全相同,但它正在 IDE 中运行。这有帮助吗?


如果您的分隔符仍然是空格,请记住更改 .fieldDelimiter(",")


查看完整回答
反对 回复 2021-06-17
?
慕神8447489

TA贡献1780条经验 获得超1个赞

您必须将其转换Table为 aDataStream才能打印它。最简单的方法是将其转换为 a DataStream<Row>,如下所示:


DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);

// print the stream & execute the program

stream.print();

env.execute();

有关更多详细信息,请参阅文档


查看完整回答
反对 回复 2021-06-17
  • 2 回答
  • 0 关注
  • 276 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信