批处理作业,批处理步骤。作业由步骤构成,每个步骤可以涉及读取器、处理器和写入器。

引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<scope>runtime</scope>
</dependency>
添加实体类
/**
* Person 实体类
*
* @param firstName 名字
* @param lastName 姓氏
*/
public record Person(String firstName, String lastName) {
}
添加Person 数据处理器
/**
* Person 数据处理器
* 实现 ItemProcessor 接口,在批处理中对读取的数据进行转换/处理
*/
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);
/**
* 处理单条数据记录
*/
@Override
public Person process(final Person person) {
// 将姓名转换为大写
final String firstName = person.firstName().toUpperCase();
final String lastName = person.lastName().toUpperCase();
final Person transformedPerson = new Person(firstName, lastName);
log.info("转换 ({}) 为 ({})", person, transformedPerson);
return transformedPerson;
}
}
添加Spring Batch 配置类
/**
* Spring Batch 配置类
* 用于配置批处理作业的各个组件:Reader、Processor、Writer、Step 和 Job
*/
@Configuration
public class BatchConfiguration {
/**
* 读取器
* 配置 ItemReader - 从 CSV 文件读取数据
*/
@Bean
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>()
.name("personItemReader") // 设置读取器名称,用于日志和监控
.resource(new ClassPathResource("sample-data.csv")) // 指定 CSV 文件位置(类路径下)
.delimited() // 使用分隔符格式(默认逗号)
.names("firstName", "lastName") // CSV 列名,对应 Person 实体的属性
.targetType(Person.class) // 目标类型,将每行数据映射为 Person 对象
.build();
}
/**
* 处理器
* 配置 ItemProcessor - 数据转换/处理
*/
@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
/**
* 写入器
* 配置 ItemWriter - 将数据写入数据库
*/
@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>()
.sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
.dataSource(dataSource)
.beanMapped()
.build();
}
/**
* 批处理作业
* Job: 是批处理的最顶层概念,包含一个或多个 Step
* JobBuilder: 用于构建 Job 对象的构建器
*/
@Bean
public Job importUserJob(JobRepository jobRepository, Step step1, JobCompletionNotificationListener listener) {
return new JobBuilder("importUserJob", jobRepository) // 创建作业,指定作业名称和仓库
.listener(listener) // 注册监听器,监听作业生命周期事件
.start(step1) // 设置起始步骤
.build();
}
/**
* 批处理步骤
* Step: 作业的执行单元,包含 ItemReader、ItemProcessor、ItemWriter
* StepBuilder: 用于构建 Step 对象的构建器
*/
@Bean
public Step step1(JobRepository jobRepository, DataSourceTransactionManager transactionManager,
FlatFileItemReader<Person> reader, PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {
return new StepBuilder("step1", jobRepository) // 创建步骤,指定步骤名称和仓库
.<Person, Person>chunk(3) // 块大小为 3:每读取/处理 3 条记录后执行一次写入操作
.transactionManager(transactionManager) // 设置事务管理器,确保数据一致性
.reader(reader) // 设置数据读取器
.processor(processor) // 设置数据处理器
.writer(writer) // 设置数据写入器
.build();
}
}
作业完成监听器
/**
* 作业完成监听器
* 实现 JobExecutionListener 接口,用于监听批处理作业的生命周期事件
* 在作业完成后执行回调操作(如验证结果、发送通知等)
*/
@Component
public class JobCompletionNotificationListener implements JobExecutionListener {
private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
private final JdbcTemplate jdbcTemplate;
public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
/**
* 作业执行后的回调方法
* 当作业执行完成(无论成功或失败)后自动调用
*
* @param jobExecution 作业执行对象,包含作业执行状态、参数等信息
*/
@Override
public void afterJob(JobExecution jobExecution) {
// 仅当作业成功完成时才验证结果
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("!!! 任务完成! 验证数据库结果:");
// 查询数据库中的所有 Person 记录并打印
jdbcTemplate
.query("SELECT first_name, last_name FROM people",
new DataClassRowMapper<>(Person.class))
.forEach(person -> log.info("数据库记录: <{}>", person));
}
}
}
启动类
/**
* Spring Boot 应用启动类
* 这是一个 Spring Batch 批处理应用程序的入口点
*/
@SpringBootApplication // 标识这是一个 Spring Boot 应用,自动配置组件扫描、自动配置等
public class App {
/**
* 应用程序主入口方法
* 启动 Spring Boot 应用,并执行批处理作业
*
* @param args 命令行参数
*/
public static void main(String[] args) {
// SpringApplication.run: 启动 Spring Boot 应用,返回 ApplicationContext
// SpringApplication.exit: 优雅地关闭应用并返回退出码
System.exit(SpringApplication.exit(
SpringApplication.run(App.class, args)));
}
}