1 回答
TA贡献1794条经验 获得超8个赞
解决方案是创建第三个任务(在示例中Dynamic),它产生等待动态输入的任务,并使依赖项成为参数而不是requires方法。
class ToDatabase(luigi.Task):
fp = luigi.Parameter()
def output(self):
with open(self.fp, 'r') as f:
valid_ids = [str(e) for e in f.read().split(',')]
client = pymongo.MongoClient('localhost', 27017, ssl=False)
return mongodb.MongoRangeTarget(client, 'myDB', 'myData',
valid_ids, 'myField')
def run(self):
with open(self.fp, 'r') as f:
valid_ids = [str(e) for e in f.read().split(',')]
self.output().write({k: 5 for k in valid_ids})
class Dynamic(luigi.Task):
def output(self):
return self.input()
def requires(self):
return MakeIDs()
def run(self):
yield(AddToDatabase(fp=self.input().path))
添加回答
举报