为了账号安全,请及时绑定邮箱和手机立即绑定

NFS(Netapp 服务器)-> Flink -> s3

NFS(Netapp 服务器)-> Flink -> s3

Helenr 2023-07-13 18:14:47
我是 flink (java) 的新手,并尝试将作为文件路径安装的 netapp 文件服务器上的 xml 文件移动到安装了 flink 的服务器上。如何实时进行批处理或流处理以获取进入文件夹的文件并使用 s3 接收它。我在 flink-starter 中找不到任何从本地文件系统读取文件的示例,flink 至少是这个用例的正确选择吗?如果是这样,我在哪里可以找到资源来监听文件夹和管理检查点/保存点?
查看完整描述

2 回答

?
料青山看我应如是

TA贡献1772条经验 获得超8个赞

如果您的目标只是将文件复制到 s3,那么有更简单、更合适的工具。也许同步是合适的。

假设使用 Flink 有意义(例如,因为您想要对数据执行一些有状态转换),则所有任务管理器(工作人员)都可以使用相同的 URI 访问要处理的文件。为此,您可以使用 file:// URI。

您可以执行以下操作来监视目录并在新文件出现时摄取它们:

StreamExecutionEnvironment env =    

  StreamExecutionEnvironment.getExecutionEnvironment();


// monitor directory, checking for new files

// every 100 milliseconds


TextInputFormat format = new TextInputFormat(

  new org.apache.flink.core.fs.Path("file:///tmp/dir/"));


DataStream<String> inputStream = env.readFile(

  format, 

  "file:///tmp/dir/",

  FileProcessingMode.PROCESS_CONTINUOUSLY, 

  100, 

  FilePathFilter.createDefaultFilter());

请注意文档中的此警告:

如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,则修改文件时,将完全重新处理其内容。这可能会破坏“仅一次”语义,因为在文件末尾附加数据将导致其所有内容被重新处理。

这意味着您应该自动将准备好摄取的文件移动到正在监视的文件夹中。

您可以使用流文件接收器写入S3。Flink 的写入操作(例如 )writeUsingOutputFormat()不参与检查点,因此在这种情况下这不是一个好的选择。


查看完整回答
反对 回复 2023-07-13
?
烙印99

TA贡献1829条经验 获得超13个赞

此问题的完整工作代码位于以下链接中。您需要启用检查点以将 .inprogress 文件移动到实际文件

// 每 1000 毫秒启动一个检查点 env.enableCheckpointing(1000);

StreamingFileSink 未将数据提取到 s3


查看完整回答
反对 回复 2023-07-13
  • 2 回答
  • 0 关注
  • 150 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信