为了账号安全,请及时绑定邮箱和手机立即绑定

使用 Google Cloud DataFlow python sdk 读取一组 xml 文件

使用 Google Cloud DataFlow python sdk 读取一组 xml 文件

一只名叫tom的猫 2021-05-30 14:54:57
我正在尝试从GCS存储桶中读取XML文件的集合,并对其进行处理,其中集合中的每个元素都是代表整个文件的字符串,但是我找不到如何实现此目的的示例,我也无法理解它来自 Apache Beam 文档,主要是关于 Java 版本。我当前的管道如下所示:p = beam.Pipeline(options=PipelineOptions(pipeline_args))(p | 'Read from a File' >> beam.io.Read(training_files_folder) | 'String To BigQuery Row' >> beam.Map(lambda s:                                        data_ingestion.parse_method(s)) | 'Write to BigQuery' >> beam.io.Write(            beam.io.BigQuerySink(                known_args.output,                schema='title:STRING,text:STRING,id:STRING',                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))p.run().wait_until_finish()解决了第一个问题:事实证明这不适用于DirectRunner,将运行器更改为DataFlowRunner并将Read替换为ReadFromText可以解决此异常:p = beam.Pipeline(options=PipelineOptions(pipeline_args))(p | 'Read from a File' >> beam.io.ReadFromText(training_files_folder) | 'String To BigQuery Row' >> beam.Map(lambda s:                                        data_ingestion.parse_method(s)) | 'Write to BigQuery' >> beam.io.Write(            beam.io.BigQuerySink(                known_args.output,                schema='title:STRING,text:STRING,id:STRING',                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))p.run().wait_until_finish() 但是现在我看到这种方法给了我每个文件中的一行作为管道元素,而我希望将整个文件作为一个字符串作为每个元素。不知道该怎么做。我找到了这篇文章,但它是用 java 编写的,完全不知道它是如何与 python 和 gcs 版本一起工作的。所以看起来 ReadFromText 对我的用例不起作用,我不知道如何创建文件管道。解决方案:在Ankur的帮助下,我修改了代码,以包括从MatchResult对象列表转换所需的步骤,这是GCSFileSystem返回到字符串pCollection的内容,每个字符串代表一个文件。p = beam.Pipeline(options=PipelineOptions(pipeline_args))gcs = GCSFileSystem(PipelineOptions(pipeline_args))gcs_reader = GCSFileReader(gcs)
查看完整描述

1 回答

  • 1 回答
  • 0 关注
  • 161 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信