批处理作业批处理步骤。作业由步骤构成,每个步骤可以涉及读取器处理器写入器

1 项目结构

image.png

2 示例demo

schema-all.sql

sample-data.csv

引入依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

<dependency>
  <groupId>org.hsqldb</groupId>
  <artifactId>hsqldb</artifactId>
  <scope>runtime</scope>
</dependency>

添加实体类

/**
 * Person 实体类
 *
 * @param firstName 名字
 * @param lastName 姓氏
 */
public record Person(String firstName, String lastName) {
}

添加Person 数据处理器

/**
 * Person 数据处理器
 * 实现 ItemProcessor 接口,在批处理中对读取的数据进行转换/处理
 */
public class PersonItemProcessor implements ItemProcessor<Person, Person> {

    private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

    /**
     * 处理单条数据记录
     */
    @Override
    public Person process(final Person person) {
        // 将姓名转换为大写
        final String firstName = person.firstName().toUpperCase();
        final String lastName = person.lastName().toUpperCase();
        final Person transformedPerson = new Person(firstName, lastName);

        log.info("转换 ({}) 为 ({})", person, transformedPerson);
        return transformedPerson;
    }
}

添加Spring Batch 配置类

/**
 * Spring Batch 配置类
 * 用于配置批处理作业的各个组件:Reader、Processor、Writer、Step 和 Job
 */
@Configuration
public class BatchConfiguration {

    /**
     * 读取器
     * 配置 ItemReader - 从 CSV 文件读取数据
     */
    @Bean
    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>()
            .name("personItemReader")                    // 设置读取器名称,用于日志和监控
            .resource(new ClassPathResource("sample-data.csv"))  // 指定 CSV 文件位置(类路径下)
            .delimited()                                  // 使用分隔符格式(默认逗号)
            .names("firstName", "lastName")              // CSV 列名,对应 Person 实体的属性
            .targetType(Person.class)                    // 目标类型,将每行数据映射为 Person 对象
            .build();
    }

    /**
     * 处理器
     * 配置 ItemProcessor - 数据转换/处理
     */
    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    /**
     * 写入器
     * 配置 ItemWriter - 将数据写入数据库
     */
    @Bean
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>()
            .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
            .dataSource(dataSource)
            .beanMapped()
            .build();
    }

    /**
     * 批处理作业
     * Job: 是批处理的最顶层概念,包含一个或多个 Step
     * JobBuilder: 用于构建 Job 对象的构建器
     */
    @Bean
    public Job importUserJob(JobRepository jobRepository, Step step1, JobCompletionNotificationListener listener) {
        return new JobBuilder("importUserJob", jobRepository)  // 创建作业,指定作业名称和仓库
                .listener(listener)                    // 注册监听器,监听作业生命周期事件
                .start(step1)                          // 设置起始步骤
                .build();
    }

    /**
     * 批处理步骤
     * Step: 作业的执行单元,包含 ItemReader、ItemProcessor、ItemWriter
     * StepBuilder: 用于构建 Step 对象的构建器
     */
    @Bean
    public Step step1(JobRepository jobRepository, DataSourceTransactionManager transactionManager,
                      FlatFileItemReader<Person> reader, PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {
        return new StepBuilder("step1", jobRepository)    // 创建步骤,指定步骤名称和仓库
                .<Person, Person>chunk(3)               // 块大小为 3:每读取/处理 3 条记录后执行一次写入操作
                .transactionManager(transactionManager) // 设置事务管理器,确保数据一致性
                .reader(reader)                          // 设置数据读取器
                .processor(processor)                    // 设置数据处理器
                .writer(writer)                          // 设置数据写入器
                .build();
    }
}

作业完成监听器

/**
 * 作业完成监听器
 * 实现 JobExecutionListener 接口,用于监听批处理作业的生命周期事件
 * 在作业完成后执行回调操作(如验证结果、发送通知等)
 */
@Component
public class JobCompletionNotificationListener implements JobExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

    private final JdbcTemplate jdbcTemplate;

    public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * 作业执行后的回调方法
     * 当作业执行完成(无论成功或失败)后自动调用
     *
     * @param jobExecution 作业执行对象,包含作业执行状态、参数等信息
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        // 仅当作业成功完成时才验证结果
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("!!! 任务完成! 验证数据库结果:");

            // 查询数据库中的所有 Person 记录并打印
            jdbcTemplate
                    .query("SELECT first_name, last_name FROM people",
                            new DataClassRowMapper<>(Person.class))
                    .forEach(person -> log.info("数据库记录: <{}>", person));
        }
    }
}

启动类

/**
 * Spring Boot 应用启动类
 * 这是一个 Spring Batch 批处理应用程序的入口点
 */
@SpringBootApplication  // 标识这是一个 Spring Boot 应用,自动配置组件扫描、自动配置等
public class App {

    /**
     * 应用程序主入口方法
     * 启动 Spring Boot 应用,并执行批处理作业
     *
     * @param args 命令行参数
     */
    public static void main(String[] args) {
        // SpringApplication.run: 启动 Spring Boot 应用,返回 ApplicationContext
        // SpringApplication.exit: 优雅地关闭应用并返回退出码
        System.exit(SpringApplication.exit(
                SpringApplication.run(App.class, args)));
    }
}