1 回答
TA贡献1982条经验 获得超2个赞
我想知道 spring 批处理是否能够在单个作业中读取由不同格式组成的多个 CSV 文件?
是的,您可以有一个包含多个步骤的作业,每个步骤处理一个给定类型的文件。关键是如何设计工作。您可以应用的一种技术是使用临时表。批处理作业可以创建临时临时表,在其中加载所需的所有数据,然后在完成后将其删除。
在您的情况下,您可以通过两个步骤将每个文件加载到特定的暂存表中。每个步骤都可以应用特定于每个文件的验证逻辑。如果这些步骤之一失败,您的工作就会失败。临时表可以有一个用于无效记录的标记列(这对报告很有用)。
完成这两个准备步骤后,您可以在另一个步骤中从两个临时表中读取数据,并对连接的数据应用交叉验证规则(例如,从两个表中选择并通过BatchId和连接PersonId)。如果此步骤失败,则作业失败。否则,您将在适当的地方写入数据。
这种技术的优点是数据在整个作业期间都可以在临时表中使用。因此,只要验证步骤失败,您就可以使用流将失败的步骤重定向到“报告步骤”(读取无效数据并发送报告),然后使作业失败。这是您可以使用的自包含示例:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class FlowJobSample {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public Step personLoadingStep() {
return steps.get("personLoadingStep")
.tasklet((contribution, chunkContext) -> {
System.out.println("personLoadingStep");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step addressLoadingStep() {
return steps.get("addressLoadingStep")
.tasklet((contribution, chunkContext) -> {
System.out.println("addressLoadingStep");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step crossValidationStep() {
return steps.get("crossValidationStep")
.tasklet((contribution, chunkContext) -> {
System.out.println("crossValidationStep");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step reportingStep() {
return steps.get("reportingStep")
.tasklet((contribution, chunkContext) -> {
System.out.println("reportingStep");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Job job() {
return jobs.get("job")
.start(personLoadingStep()).on("INVALID").to(reportingStep())
.from(personLoadingStep()).on("*").to(addressLoadingStep())
.from(addressLoadingStep()).on("INVALID").to(reportingStep())
.from(addressLoadingStep()).on("*").to(crossValidationStep())
.from(crossValidationStep()).on("INVALID").to(reportingStep())
.from(crossValidationStep()).on("*").end()
.from(reportingStep()).on("*").fail()
.build()
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(FlowJobSample.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
}
要使其中一个步骤失败,请将退出状态设置为INVALID,例如:
@Bean
public Step personLoadingStep() {
return steps.get("personLoadingStep")
.tasklet((contribution, chunkContext) -> {
System.out.println("personLoadingStep");
chunkContext.getStepContext().getStepExecution().setExitStatus(new ExitStatus("INVALID"));
return RepeatStatus.FINISHED;
})
.build();
}
我希望这有帮助。
添加回答
举报