1. Overview
Spring Batch is a framework for batch data processing. It supports large-scale batch import, processing, and export, integrates with databases, files, and message queues, and suits scenarios such as data migration, data analysis, and report generation.

2. Basic Structure
The core concepts of Spring Batch are:
🔹 Job: the top-level container of a batch task, made up of one or more Steps.
🔹 Step: the basic execution unit of a Job, consisting of a reader, a processor, and a writer.
🔹 ItemReader: reads data from a source.
🔹 ItemProcessor: processes data (e.g., transformation, filtering).
🔹 ItemWriter: writes the processed data to the target store.
A complete Spring Batch task is composed of Job → Step → Reader/Processor/Writer, as illustrated below:
Job
├── Step 1 (ItemReader → ItemProcessor → ItemWriter)
├── Step 2 (ItemReader → ItemProcessor → ItemWriter)
└── Step 3 (ItemReader → ItemProcessor → ItemWriter)
| Component | Description | Typical Implementations |
|---|---|---|
| Job | Top-level abstraction of a batch job | SimpleJob |
| Step | An independent processing phase of a job | TaskletStep, chunk-oriented step |
| Item | A single unit of data being processed | Reader/Processor/Writer |
3. Basic Example
3.1 Create a Spring Boot project and add dependencies
Add the Spring Batch dependencies to pom.xml:
<dependencies>
    <!-- Spring Boot Batch -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <!-- mybatis-plus -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.3.1</version>
    </dependency>
    <!-- mysql -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.19</version>
    </dependency>
    <!-- druid -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.2.16</version>
    </dependency>
</dependencies>
The MySQL database stores the execution state of batch jobs: Spring Batch persists its metadata in BATCH_*-prefixed tables (BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION, BATCH_STEP_EXECUTION, and so on), and the initialize-schema setting below creates them automatically at startup.
3.2 Configure the Batch data source
Configure the MySQL database in application.yml:
spring:
  batch:
    initialize-schema: always # auto-create the BATCH_* metadata tables (Spring Boot 2.x property)
  datasource:
    druid:
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://192.168.2.181:3306/shop_admin?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
      username: shop
      password: shop123
      initial-size: 5 # initial number of connections
      min-idle: 5 # minimum number of idle connections
      max-active: 20 # maximum number of active connections
      max-wait: 60000 # max wait (ms) when acquiring a connection
      time-between-eviction-runs-millis: 60000 # interval between idle-connection checks
      min-evictable-idle-time-millis: 300000 # minimum idle time before eviction
      validation-query: SELECT 1 # SQL used to validate connections
      test-on-borrow: false # validate when borrowing a connection
      test-on-return: false # validate when returning a connection
      test-while-idle: true # validate connections while idle
      stat-view-servlet:
        enabled: true
        login-username: admin # Druid monitoring page username
        login-password: admin123 # Druid monitoring page password
      web-stat-filter:
        enabled: true
        exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*" # excluded URLs
3.3 Define the data model
Create the Customer entity class:
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@NoArgsConstructor // a no-arg constructor is required for bean-property mapping (e.g., by FlatFileItemReader in section 4)
@TableName("customer")
public class Customer implements Serializable {

    public Customer(String name, String email) {
        this.name = name;
        this.email = email;
    }

    @TableId(value = "id")
    private Long id;

    @TableField("name")
    private String name;

    @TableField("email")
    private String email;
}
3.4 Implement the Reader, Processor, and Writer
✅ Reader - custom in-memory data
import org.springframework.batch.item.ItemReader;

public class CustomerItemReader implements ItemReader<String> {

    private final String[] data = {"1", "2", "3", "4", "5"};
    private int index = 0;

    @Override
    public String read() throws Exception {
        if (index < data.length) {
            return data[index++];
        }
        return null; // returning null signals the end of the data
    }
}
✅ Processor - process the data
import org.springframework.batch.item.ItemProcessor;

public class CustomerItemProcessor implements ItemProcessor<String, String> {

    @Override
    public String process(String item) throws Exception {
        // transform the item
        return item + ":processed";
    }
}
✅ Writer - write to the console
import org.springframework.batch.item.ItemWriter;

import java.util.List;

public class CustomerItemWriter implements ItemWriter<String> {

    @Override
    public void write(List<? extends String> items) throws Exception {
        // print to the console; real jobs would write to a file or database
        for (String item : items) {
            System.out.println(item);
        }
    }
}
3.5 Create the Job configuration
import com.lm.batch.job.CustomerItemProcessor;
import com.lm.batch.job.CustomerItemReader;
import com.lm.batch.job.CustomerItemWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job importCustomerJob(Step step) {
        return jobBuilderFactory.get("importCustomerJob")
                .start(step)
                .build();
    }

    @Bean
    public Step step() {
        return stepBuilderFactory.get("step")
                .<String, String>chunk(10)
                .reader(new CustomerItemReader())
                .processor(new CustomerItemProcessor())
                .writer(new CustomerItemWriter())
                .build();
    }
}
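The examples in this article use the Spring Batch 4 / Spring Boot 2 API. If you are on Spring Boot 3 / Spring Batch 5, JobBuilderFactory and StepBuilderFactory have been removed, @EnableBatchProcessing is no longer required, and ItemWriter.write takes a Chunk instead of a List. A rough sketch of the equivalent configuration (assumptions noted in the comments, not drop-in code):

// Spring Batch 5 style: builders take the JobRepository explicitly
@Bean
public Job importCustomerJob(JobRepository jobRepository, Step step) {
    return new JobBuilder("importCustomerJob", jobRepository)
            .start(step)
            .build();
}

@Bean
public Step step(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("step", jobRepository)
            // chunk() now also takes the transaction manager
            .<String, String>chunk(10, transactionManager)
            .reader(new CustomerItemReader())
            .processor(new CustomerItemProcessor())
            .writer(new CustomerItemWriter()) // writer must be adapted to write(Chunk) in Batch 5
            .build();
}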
3.6 Run the batch job
Create a CommandLineRunner to trigger the job:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class JobRunner implements CommandLineRunner {

    private final JobLauncher jobLauncher;
    private final Job importCustomerJob;

    public JobRunner(JobLauncher jobLauncher, Job importCustomerJob) {
        this.jobLauncher = jobLauncher;
        this.importCustomerJob = importCustomerJob;
    }

    @Override
    public void run(String... args) throws Exception {
        // use a unique parameter per run; re-running a completed JobInstance with
        // identical parameters throws JobInstanceAlreadyCompleteException
        JobParameters params = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        JobExecution execution = jobLauncher.run(importCustomerJob, params);
        System.out.println("Job Status: " + execution.getStatus());
    }
}
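Note that with spring-boot-starter-batch on the classpath, Spring Boot also launches every Job bean automatically at startup (spring.batch.job.enabled defaults to true), so a CommandLineRunner like this would start the job a second time. Set spring.batch.job.enabled=false in application.yml when you only want to trigger jobs manually, as here.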
4. File Processing in Practice
4.1 Reading and processing a CSV file
@Bean
public FlatFileItemReader<Customer> reader() {
    return new FlatFileItemReaderBuilder<Customer>()
            .name("customerItemReader")
            .resource(new ClassPathResource("customers.csv"))
            .linesToSkip(1)
            .skippedLinesCallback(line -> System.out.println("Skipped line: " + line)) // optional
            .delimited()
            .delimiter(",")
            .names(new String[]{"id", "name", "email"})
            .targetType(Customer.class)
            .build();
}
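For reference, a matching customers.csv could look like this (the rows are illustrative sample data; the header line is skipped by linesToSkip(1)):

id,name,email
1,Alice,alice@example.com
2,Bob,bob@example.com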
@Bean
public JdbcBatchItemWriter<Customer> writer(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Customer>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql("INSERT INTO customer (id, name, email) VALUES (:id, :name, :email)")
            .dataSource(dataSource)
            .build();
}

@Bean
public Job importUserJob(Step step1) {
    return jobBuilderFactory.get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .flow(step1)
            .end()
            .build();
}

@Bean
public Step step1(JdbcBatchItemWriter<Customer> writer) {
    return stepBuilderFactory.get("step1")
            .<Customer, Customer>chunk(100)
            .reader(reader())
            .processor(processor())
            .writer(writer)
            .build();
}

@Bean
public ItemProcessor<Customer, Customer> processor() {
    return new ItemProcessor<Customer, Customer>() {
        @Override
        public Customer process(Customer customer) {
            // business logic: upper-case the name
            customer.setName(customer.getName().toUpperCase());
            return customer;
        }
    };
}
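The overview in section 1 also mentions export; the reverse direction (objects out to a flat file) is symmetric. A minimal sketch, assuming the same Customer class and a writable output path (the file name is illustrative):

@Bean
public FlatFileItemWriter<Customer> fileWriter() {
    return new FlatFileItemWriterBuilder<Customer>()
            .name("customerFileWriter")
            .resource(new FileSystemResource("output/customers-out.csv"))
            .delimited()
            .delimiter(",")
            .names("id", "name", "email") // values extracted from bean properties
            .build();
}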
5. Database Batch Processing
5.1 Paged reads from the database
@Bean
public JdbcPagingItemReader<User> databaseReader(DataSource dataSource) throws Exception {
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("status", "ACTIVE");
    return new JdbcPagingItemReaderBuilder<User>()
            .name("databaseReader")
            .dataSource(dataSource)
            .queryProvider(queryProvider(dataSource))
            .parameterValues(parameterValues)
            .rowMapper(new BeanPropertyRowMapper<>(User.class))
            .pageSize(1000)
            .build();
}

@Bean
public PagingQueryProvider queryProvider(DataSource dataSource) throws Exception {
    SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
    provider.setDataSource(dataSource);
    provider.setSelectClause("SELECT id, name, email");
    provider.setFromClause("FROM users");
    provider.setWhereClause("WHERE status = :status");
    provider.setSortKey("id");
    return provider.getObject();
}
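When page-by-page restart metadata matters less than raw throughput, a cursor-based reader is the common alternative: it streams the whole result set over a single query instead of issuing one query per page. A sketch against the same (assumed) users table and User class:

@Bean
public JdbcCursorItemReader<User> cursorReader(DataSource dataSource) {
    return new JdbcCursorItemReaderBuilder<User>()
            .name("cursorReader")
            .dataSource(dataSource)
            .sql("SELECT id, name, email FROM users WHERE status = 'ACTIVE' ORDER BY id")
            .rowMapper(new BeanPropertyRowMapper<>(User.class))
            .fetchSize(1000) // hint for the JDBC driver's fetch size
            .build();
}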
6. Advanced Features
6.1 Parallel processing
@Bean
public Step partitionedStep() {
    return stepBuilderFactory.get("partitionedStep")
            .partitioner("slaveStep", partitioner())
            .step(slaveStep())
            .gridSize(4) // number of partitions passed to the partitioner
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(8);
    executor.setQueueCapacity(20);
    return executor;
}

@Bean
public Partitioner partitioner() {
    return gridSize -> {
        Map<String, ExecutionContext> result = new HashMap<>();
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.put("partitionNumber", i);
            result.put("partition" + i, context);
        }
        return result;
    };
}
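slaveStep() is not shown above. A minimal worker-step sketch (the tasklet body is a placeholder) that reads back the partitionNumber placed into each partition's ExecutionContext might look like:

@Bean
public Step slaveStep() {
    return stepBuilderFactory.get("slaveStep")
            .tasklet((contribution, chunkContext) -> {
                // each worker StepExecution receives its partition's ExecutionContext
                int partition = chunkContext.getStepContext()
                        .getStepExecution()
                        .getExecutionContext()
                        .getInt("partitionNumber");
                System.out.println("Processing partition " + partition);
                return RepeatStatus.FINISHED;
            })
            .build();
}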
6.2 Conditional flow control
@Bean
public Job conditionalJob() {
    return jobBuilderFactory.get("conditionalJob")
            .start(step1())
            .next(decision())
            .on("FAILED").to(failureStep())
            .from(decision())
            .on("*").to(successStep())
            .end()
            .build();
}

@Bean
public JobExecutionDecider decision() {
    return (jobExecution, stepExecution) -> {
        if (someCondition()) { // placeholder for your business check
            return new FlowExecutionStatus("FAILED");
        }
        return FlowExecutionStatus.COMPLETED;
    };
}
7. Monitoring and Management
7.1 Job listeners
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("!!! JOB FINISHED! Time: {}", jobExecution.getEndTime());
            jobExecution.getStepExecutions().forEach(stepExecution -> {
                log.info("Step {} processed {} items",
                        stepExecution.getStepName(),
                        stepExecution.getWriteCount());
            });
        }
    }
}
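A listener is only invoked for jobs it is registered on. A sketch of wiring it into the CSV import job from section 4 (bean names assumed from that example):

@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
    return jobBuilderFactory.get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener) // register the listener on the job
            .flow(step1)
            .end()
            .build();
}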
7.2 Spring Boot Actuator integration
Note that Actuator ships no batch endpoint out of the box; the batchjobs entry below assumes a custom endpoint, such as the sketch that follows the config.
# application.yml
management:
  endpoints:
    web:
      exposure:
        include: health,info,batchjobs
  endpoint:
    batchjobs:
      enabled: true
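A minimal custom endpoint sketch backed by Spring Batch's JobExplorer (the id batchjobs and the class name are assumptions, not a built-in API):

import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@Endpoint(id = "batchjobs") // hypothetical endpoint id matching the config above
public class BatchJobsEndpoint {

    private final JobExplorer jobExplorer;

    public BatchJobsEndpoint(JobExplorer jobExplorer) {
        this.jobExplorer = jobExplorer;
    }

    @ReadOperation
    public List<String> jobNames() {
        // names of all jobs known to the JobRepository
        return jobExplorer.getJobNames();
    }
}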
8. Typical Business Scenarios
8.1 Month-end settlement batch
@Bean
public Job monthlySettlementJob() {
    return jobBuilderFactory.get("monthlySettlementJob")
            .start(step1()) // prepare data
            .next(step2()) // calculate interest
            .next(step3()) // generate report
            .next(step4()) // send notifications
            .build();
}

@Bean
public Step step3() {
    return stepBuilderFactory.get("generateReport")
            .tasklet(new ReportGeneratorTasklet())
            .build();
}

public class ReportGeneratorTasklet implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution,
                                ChunkContext chunkContext) {
        // generate the PDF report (placeholder)
        generatePdfReport();
        // upload it to S3 (placeholder)
        uploadToS3();
        return RepeatStatus.FINISHED;
    }
}
8.2 Data migration job
@Bean
public Job dataMigrationJob() {
    return jobBuilderFactory.get("dataMigrationJob")
            .start(
                stepBuilderFactory.get("migrateUserData")
                        .<User, User>chunk(500)
                        .reader(oldDbUserReader())
                        .processor(userDataProcessor())
                        .writer(newDbUserWriter())
                        .build()
            )
            .next(
                stepBuilderFactory.get("migrateOrderData")
                        .<Order, Order>chunk(500)
                        .reader(oldDbOrderReader())
                        .writer(newDbOrderWriter())
                        .build()
            )
            .build();
}
9. Performance Tuning Tips
9.1 Tuning batch parameters
@Bean
public Step optimizedStep() {
    return stepBuilderFactory.get("optimizedStep")
            .<User, User>chunk(200) // a larger chunk size amortizes transaction overhead
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .throttleLimit(15) // cap concurrency of the multi-threaded step
            .taskExecutor(taskExecutor())
            .build();
}
9.2 Asynchronous ItemProcessor
AsyncItemProcessor and AsyncItemWriter come from the separate spring-batch-integration module, which must be added as a dependency:
@Bean
public AsyncItemProcessor<User, User> asyncItemProcessor() {
    AsyncItemProcessor<User, User> asyncProcessor = new AsyncItemProcessor<>();
    asyncProcessor.setDelegate(syncProcessor());
    asyncProcessor.setTaskExecutor(asyncTaskExecutor());
    return asyncProcessor;
}

@Bean
public AsyncItemWriter<User> asyncItemWriter() {
    AsyncItemWriter<User> asyncWriter = new AsyncItemWriter<>();
    asyncWriter.setDelegate(syncWriter());
    return asyncWriter;
}

@Bean
public Step asyncStep() {
    return stepBuilderFactory.get("asyncStep")
            .<User, Future<User>>chunk(100)
            .reader(reader())
            .processor(asyncItemProcessor())
            .writer(asyncItemWriter())
            .build();
}
10. Summary
By integrating Spring Batch with Spring Boot, developers can quickly build robust batch applications. Key points:
🔹 Design the Job/Step structure deliberately: keep each step's responsibility single
🔹 Take state management seriously: let the JobRepository persist execution state
🔹 Tune performance parameters: adjust chunk size and concurrency
🔹 Handle errors thoroughly: implement skip/retry (see the sketch below)
🔹 Build out monitoring: integrate Actuator and custom listeners
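For the skip/retry point, a fault-tolerant step sketch (reader/writer beans and exception types chosen for illustration):

@Bean
public Step faultTolerantStep(ItemReader<Customer> reader, ItemWriter<Customer> writer) {
    return stepBuilderFactory.get("faultTolerantStep")
            .<Customer, Customer>chunk(100)
            .reader(reader)
            .writer(writer)
            .faultTolerant()
            .skip(FlatFileParseException.class) // drop unparseable records
            .skipLimit(10)                      // but fail the step past 10 skips
            .retry(DeadlockLoserDataAccessException.class) // retry transient DB errors
            .retryLimit(3)
            .build();
}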
Sample code: https://gitee.com/lhdxhl/springboot-example.git
In real projects, also consider:
🔹 Implementing a checkpoint mechanism for long-running tasks
🔹 Adding e-mail notifications for critical jobs
🔹 Cleaning up historical job data periodically
🔹 Integrating with a scheduling system such as Quartz (a sketch follows below)
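For the scheduling point, a sketch using Spring's own @Scheduled (Quartz wiring is analogous; the cron expression and names are illustrative, and @EnableScheduling must be on a configuration class):

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class MonthlySettlementScheduler {

    private final JobLauncher jobLauncher;
    private final Job monthlySettlementJob;

    public MonthlySettlementScheduler(JobLauncher jobLauncher, Job monthlySettlementJob) {
        this.jobLauncher = jobLauncher;
        this.monthlySettlementJob = monthlySettlementJob;
    }

    @Scheduled(cron = "0 0 2 1 * *") // 02:00 on the 1st of each month
    public void run() throws Exception {
        // a fresh timestamp parameter yields a new JobInstance per run
        jobLauncher.run(monthlySettlementJob, new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters());
    }
}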
You can extend all of this to fit your business needs, for example with XML/JSON data sources or parallel processing. 🚀
