为了账号安全,请及时绑定邮箱和手机立即绑定

如何在 Python 的 Apache-Beam DataFlow 中组合解析的文本文件?

如何在 Python 的 Apache-Beam DataFlow 中组合解析的文本文件?

catspeake 2022-07-19 20:21:17
这在 DirectRunner 中似乎工作正常,但是当我切换到 DataflowRunner 时出错。我基本上需要以某种方式组合读入的文件,但是一旦我beam.combiners.ToList()用来连接我的数据,它就会引入一大堆问题。代码示例:def convert_to_dataframe(readable_file):    yield pd.read_csv(io.TextIOWrapper(readable_file.open()))class merge_dataframes(beam.DoFn):    def process(self, element):        yield pd.concat(element).reset_index(drop=True)    with beam.Pipeline(options=pipeline_options) as p:        (p            | 'Match Files From GCS' >> beam.io.fileio.MatchFiles(raw_data_path)            | 'Read Files' >> beam.io.fileio.ReadMatches()            | 'Shuffle' >> beam.Reshuffle()            | 'Create DataFrames' >> beam.FlatMap(convert_to_dataframe)            | 'Combine To List' >> beam.combiners.ToList()            | 'Merge DataFrames' >> beam.ParDo(merge_dataframes())            | 'Apply Transformations' >> beam.ParDo(ApplyPipeline(creds_path=args.creds_path,                                                                  project_name=args.project_name,                                                                  feature_group_name=args.feature_group_name                                                                  ))            | 'Write To GCS' >> beam.io.WriteToText(feature_data_path,                                                    file_name_suffix='.csv',                                                    shard_name_template='')         )错误:"No objects to concatenate [while running 'Merge DataFrames']" 我不明白这个错误,因为执行“组合到列表”的部分应该生成一个数据帧列表,然后将其传递到“合并数据帧”步骤中,当我使用 DirectRunner 时确实是这种情况。
查看完整描述

1 回答

?
jeck猫

TA贡献1909条经验 获得超7个赞

鉴于这个错误,我怀疑它MatchFiles实际上没有匹配任何东西(例如,由于文件模式错误),因此,输出beam.combiners.ToList是一个空列表。



查看完整回答
反对 回复 2022-07-19
  • 1 回答
  • 0 关注
  • 72 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信