2 回答
TA贡献1772条经验 获得超8个赞
如果您的目标只是将文件复制到 s3,那么有更简单、更合适的工具。也许同步是合适的。
假设使用 Flink 有意义(例如,因为您想要对数据执行一些有状态转换),则所有任务管理器(工作人员)都可以使用相同的 URI 访问要处理的文件。为此,您可以使用 file:// URI。
您可以执行以下操作来监视目录并在新文件出现时摄取它们:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// monitor directory, checking for new files
// every 100 milliseconds
TextInputFormat format = new TextInputFormat(
new org.apache.flink.core.fs.Path("file:///tmp/dir/"));
DataStream<String> inputStream = env.readFile(
format,
"file:///tmp/dir/",
FileProcessingMode.PROCESS_CONTINUOUSLY,
100,
FilePathFilter.createDefaultFilter());
请注意文档中的此警告:
如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,则修改文件时,将完全重新处理其内容。这可能会破坏“仅一次”语义,因为在文件末尾附加数据将导致其所有内容被重新处理。
这意味着您应该自动将准备好摄取的文件移动到正在监视的文件夹中。
您可以使用流文件接收器写入S3。Flink 的写入操作(例如 )writeUsingOutputFormat()
不参与检查点,因此在这种情况下这不是一个好的选择。
TA贡献1829条经验 获得超13个赞
此问题的完整工作代码位于以下链接中。您需要启用检查点以将 .inprogress 文件移动到实际文件
// 每 1000 毫秒启动一个检查点 env.enableCheckpointing(1000);
StreamingFileSink 未将数据提取到 s3
添加回答
举报