2 回答
TA贡献1111条经验 获得超0个赞
TextIO
逐行读取文件。因此,在您的 test.json 中,每一行都需要包含一个单独的 Json 对象。
光束或任何分布式处理引擎的想法是能够并行化输入数据。从您的问题来看,似乎需要进行一些预处理才能将它们拆分为多个 json。请注意,它不必位于单个文件中,您可以拥有多个文件,每个文件都包含任意数量的 json 文件。Beam 将并行读取行。
如果有帮助,请接受答案。
TA贡献1871条经验 获得超8个赞
希望对从文件读取的对象进行并行化是一个合理的用例。
import apache_beam as beam
from apache_beam.io import fileio
import json
# Make some fake data
for i in range(0,10):
with open(f'/tmp/data{i}.json', 'w') as f:
json.dump({'somethinig':i,'otherthing':[1,2,3]}, f)
filenames = [f'/tmp/data{i}.json' for i in range(0,10)]
with beam.Pipeline() as pipeline:
lines = (
pipeline
| beam.Create(filenames)
| fileio.MatchAll()
| fileio.ReadMatches()
| beam.Map(lambda file: print(file.read_utf8()))
)
添加回答
举报