I have an Apache Beam pipeline that uses Pub/Sub to get some text from an input file. I then apply some transformations and get sentences and scores, but my writer overwrites the results instead of appending them. Is there an append module for beam.filesystems?

    from __future__ import absolute_import

    import argparse
    import logging
    from datetime import datetime

    from past.builtins import unicode
    import json

    from google.cloud import language
    from google.cloud.language import enums
    from google.cloud.language import types

    import apache_beam as beam
    import apache_beam.transforms.window as window
    from apache_beam.io.filesystems import FileSystems
    from apache_beam.io.gcp.pubsub import WriteToPubSub
    from apache_beam.examples.wordcount import WordExtractingDoFn
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    from apache_beam.options.pipeline_options import StandardOptions
    from apache_beam.io.textio import ReadFromText, WriteToText


    def run(argv=None):
        """Build and run the pipeline."""
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--output', dest='output', required=True,
            help='GCS destination folder to save the images to '
                 '(example: gs://BUCKET_NAME/path)')
        group = parser.add_mutually_exclusive_group(required=True)
        group.add_argument(
            '--input_topic',
            help=('Input PubSub topic of the form '
                  '"projects/<project name>/topics/<topic name>".'))
        group.add_argument(
            '--input_subscription',
            help=('Input PubSub subscription of the form '
                  '"projects/<project name>/subscriptions/<subscription name>".'))
        known_args, pipeline_args = parser.parse_known_args(argv)

        # We use the save_main_session option because one or more DoFn's in this
        # workflow rely on global context (e.g., a module imported at module level).
        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(SetupOptions).save_main_session = True
        pipeline_options.view_as(StandardOptions).streaming = True
        p = beam.Pipeline(options=pipeline_options)

All I get in the output is this:

    <sentence n> <score>

I need a small fix. I am stuck, please help.
1 Answer

白板的微信
For this, you can try using beam.io.textio.WriteToText:
    # The question's parser defines --input_subscription, so the value is
    # read from known_args.input_subscription.
    messages = (
        p
        | "Read From PubSub" >> beam.io.ReadFromPubSub(
            subscription=known_args.input_subscription)
        | "Write to GCS" >> beam.io.WriteToText(
            'gs://<your_bucket>/<your_file>',
            file_name_suffix='.txt',
            append_trailing_newlines=True,
            shard_name_template=''))
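When your streaming job finishes, this should give you a single file as output, because the empty shard_name_template drops the shard suffix from the file name.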
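If the overwriting comes from sending every result to the same path through FileSystems.create (an assumption, since the writing step is not shown in the question), another option is to give each write its own file name so that nothing is replaced. As far as I know, FileSystems.create only opens a new file for writing and has no append mode. The sketch below uses only the standard library; the helper name `unique_output_path` and its naming scheme are hypothetical, not part of Beam:

```python
import posixpath
import uuid
from datetime import datetime, timezone


def unique_output_path(output_dir, when=None, suffix='.txt'):
    """Build a file path under output_dir that differs on every call.

    Hypothetical helper: the 'result-<timestamp>-<random>' scheme is only
    an illustration, not a Beam convention.
    """
    when = when or datetime.now(timezone.utc)
    stamp = when.strftime('%Y%m%dT%H%M%S')
    # The random part keeps two writes in the same second from colliding.
    name = 'result-{}-{}{}'.format(stamp, uuid.uuid4().hex, suffix)
    # posixpath.join keeps forward slashes, which gs:// paths need.
    return posixpath.join(output_dir, name)
```

Inside your own DoFn, the returned path could then be passed to FileSystems.create(path), so each batch of sentences and scores lands in a new file instead of replacing the previous one.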
Hope this helps!