为了账号安全,请及时绑定邮箱和手机立即绑定

Springboot批量读取/验证多个不同格式的csv文件

Springboot批量读取/验证多个不同格式的csv文件

HUH函数 2021-08-19 16:16:27
我正在评估特定项目的 Spring Batch,在网上进行了大量搜索后,我一直无法找到满足我要求的 Spring Batch 解决方案。我想知道 spring 批处理是否能够在单个作业中读取由不同格式组成的多个 CSV 文件?例如,假设 Person.csv 和 Address.csv,两者都由不同的格式组成,但相互依赖我需要阅读、处理数据更正(即 toUpperCase 等),并验证每条记录。在验证错误的情况下,我需要将错误记录到某种对象数组中,在验证完成后,它将在稍后提供,以通过电子邮件发送给最终用户进行更正。一旦验证了两个文件中的所有数据并且任一文件中均未发生验证错误,请继续执行批处理编写器。如果这两个文件中的任何一个发生任何错误,我都需要停止整个作业。如果在发生错误时写入器已经开始写入数据库,则无论错误是否存在于相反文件中,都需要回滚整个作业。如果其中任何一个存在任何类型的验证错误,我都无法插入两个 CSV 文件中的任何一个。必须将错误通知最终用户。这些错误将用于在重新处理文件之前进行任何必要的更正。SpringBoot 2 中的 Spring 批处理是否能够实现这种行为?例子人物.csvBatchId, personId, firstName, lastName地址.csvBatchId, personId, address1在上面的例子中,两个文件之间的关系是batchId 和personId。如果两个文件中的任何一个中存在任何类型的验证错误,我必须使整个批处理失败。我想完成对两个文件的验证,以便我可以响应所有错误,但只是不写入数据库。
查看完整描述

1 回答

?
临摹微笑

TA贡献1982条经验 获得超2个赞

我想知道 spring 批处理是否能够在单个作业中读取由不同格式组成的多个 CSV 文件?


是的,您可以有一个包含多个步骤的作业,每个步骤处理一个给定类型的文件。关键是如何设计工作。您可以应用的一种技术是使用临时表。批处理作业可以创建临时临时表,在其中加载所需的所有数据,然后在完成后将其删除。


在您的情况下,您可以通过两个步骤将每个文件加载到特定的暂存表中。每个步骤都可以应用特定于每个文件的验证逻辑。如果这些步骤之一失败,您的工作就会失败。临时表可以有一个用于无效记录的标记列(这对报告很有用)。


完成这两个准备步骤后,您可以在另一个步骤中从两个临时表中读取数据,并对连接的数据应用交叉验证规则(例如,从两个表中选择并通过BatchId和连接PersonId)。如果此步骤失败,则作业失败。否则,您将在适当的地方写入数据。


这种技术的优点是数据在整个作业期间都可以在临时表中使用。因此,只要验证步骤失败,您就可以使用流将失败的步骤重定向到“报告步骤”(读取无效数据并发送报告),然后使作业失败。这是您可以使用的自包含示例:


import org.springframework.batch.core.Job;

import org.springframework.batch.core.JobParameters;

import org.springframework.batch.core.Step;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;

import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;

import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;

import org.springframework.batch.core.launch.JobLauncher;

import org.springframework.batch.repeat.RepeatStatus;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.ApplicationContext;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;


@Configuration

@EnableBatchProcessing

public class FlowJobSample {


    @Autowired

    private JobBuilderFactory jobs;


    @Autowired

    private StepBuilderFactory steps;


    @Bean

    public Step personLoadingStep() {

        return steps.get("personLoadingStep")

                .tasklet((contribution, chunkContext) -> {

                    System.out.println("personLoadingStep");

                    return RepeatStatus.FINISHED;

                })

                .build();

    }


    @Bean

    public Step addressLoadingStep() {

        return steps.get("addressLoadingStep")

                .tasklet((contribution, chunkContext) -> {

                    System.out.println("addressLoadingStep");

                    return RepeatStatus.FINISHED;

                })

                .build();

    }


    @Bean

    public Step crossValidationStep() {

        return steps.get("crossValidationStep")

                .tasklet((contribution, chunkContext) -> {

                    System.out.println("crossValidationStep");

                    return RepeatStatus.FINISHED;

                })

                .build();

    }


    @Bean

    public Step reportingStep() {

        return steps.get("reportingStep")

                .tasklet((contribution, chunkContext) -> {

                    System.out.println("reportingStep");

                    return RepeatStatus.FINISHED;

                })

                .build();

    }


    @Bean

    public Job job() {

        return jobs.get("job")

                .start(personLoadingStep()).on("INVALID").to(reportingStep())

                    .from(personLoadingStep()).on("*").to(addressLoadingStep())

                    .from(addressLoadingStep()).on("INVALID").to(reportingStep())

                    .from(addressLoadingStep()).on("*").to(crossValidationStep())

                    .from(crossValidationStep()).on("INVALID").to(reportingStep())

                    .from(crossValidationStep()).on("*").end()

                    .from(reportingStep()).on("*").fail()

                    .build()

                .build();

    }


    public static void main(String[] args) throws Exception {

        ApplicationContext context = new AnnotationConfigApplicationContext(FlowJobSample.class);

        JobLauncher jobLauncher = context.getBean(JobLauncher.class);

        Job job = context.getBean(Job.class);

        jobLauncher.run(job, new JobParameters());

    }


}

要使其中一个步骤失败,请将退出状态设置为INVALID,例如:


@Bean

public Step personLoadingStep() {

    return steps.get("personLoadingStep")

            .tasklet((contribution, chunkContext) -> {

                System.out.println("personLoadingStep");

                chunkContext.getStepContext().getStepExecution().setExitStatus(new ExitStatus("INVALID"));

                return RepeatStatus.FINISHED;

            })

            .build();

}

我希望这有帮助。


查看完整回答
反对 回复 2021-08-19
  • 1 回答
  • 0 关注
  • 317 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信