JAVA: Integrating Spring Batch with Spring Boot for Batch Data Processing


1. Overview

Spring Batch is a framework for processing data in bulk. It supports large-scale batch import, processing, and export of data, integrates with databases, files, and message queues, and is well suited to scenarios such as data migration, data analysis, and report generation.



2. Basic Structure

The core concepts of Spring Batch are:

🔹 Job: the top-level container of a batch task, made up of one or more Steps.

🔹 Step: the basic execution unit of a Job, consisting of a reader (Reader), a processor (Processor), and a writer (Writer).

🔹 ItemReader: reads data from a data source.

🔹 ItemProcessor: processes data (e.g. transformation, filtering).

🔹 ItemWriter: writes the processed data to the target store.

A complete Spring Batch task is composed of Job → Step → Reader/Processor/Writer, as shown below:

Job
 ├── Step 1 (ItemReader → ItemProcessor → ItemWriter)
 ├── Step 2 (ItemReader → ItemProcessor → ItemWriter)
 ├── Step 3 (ItemReader → ItemProcessor → ItemWriter)
Component   Description                                     Typical Implementation
Job         Top-level abstraction of a batch job            SimpleJob
Step        An independent processing phase within a job    TaskletStep, chunk-oriented step
Item        The unit of single-record processing            Reader / Processor / Writer

3. Basic Example

3.1 Create a Spring Boot Project and Add Dependencies

Add the Spring Batch and database dependencies to pom.xml:

<dependencies>
    <!-- Spring Boot Batch -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>

    <!-- mybatis-plus -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.3.1</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.19</version>
    </dependency>
    <!-- druid -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.2.16</version>
    </dependency>
</dependencies>

The MySQL database stores the execution state of Batch jobs: Spring Batch's JobRepository persists job and step metadata in tables such as BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION, and BATCH_STEP_EXECUTION, which are created automatically at startup when initialize-schema: always is set (see below).

3.2 Configure the Batch Data Source

Configure MySQL in application.yml:

spring:
  batch:
    initialize-schema: always     # create the Spring Batch metadata tables on startup
  datasource:
    druid:
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://192.168.2.181:3306/shop_admin?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
      username: shop
      password: shop123
      initial-size: 5               # initial number of connections
      min-idle: 5                   # minimum number of idle connections
      max-active: 20                # maximum number of active connections
      max-wait: 60000               # max wait (ms) when acquiring a connection
      time-between-eviction-runs-millis: 60000 # interval (ms) between idle-connection checks
      min-evictable-idle-time-millis: 300000  # minimum idle time (ms) before eviction
      validation-query: SELECT 1    # SQL used to validate a connection
      test-on-borrow: false         # validate when borrowing a connection
      test-on-return: false         # validate when returning a connection
      test-while-idle: true         # validate connections while idle
      stat-view-servlet:
        enabled: true
        login-username: admin         # Druid monitoring console username
        login-password: admin123      # Druid monitoring console password
      web-stat-filter:
        enabled: true
        exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*" # URLs excluded from stat collection


3.3 Define the Data Model

Create the Customer entity class. Note that a no-argument constructor is required so that bean-mapping components (MyBatis-Plus, and the FlatFileItemReader used later) can instantiate it:

import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@NoArgsConstructor // needed for bean-property mapping; @Data alone does not generate one once another constructor exists
@TableName("customer")
public class Customer implements Serializable {

    public Customer(String name, String email) {
        this.name = name;
        this.email = email;
    }

    @TableId(value = "id")
    private Long id;

    @TableField("name")
    private String name;

    @TableField("email")
    private String email;
}

3.4 Implement the Reader, Processor, and Writer

Reader - returns custom in-memory data:

import org.springframework.batch.item.ItemReader;

public class CustomerItemReader implements ItemReader<String> {

    private String[] data = {"1", "2", "3", "4", "5"};
    private int index = 0;

    @Override
    public String read() throws Exception {
        if (index < data.length) {
            return data[index++];
        }
        return null;  // returning null tells Spring Batch that reading is finished
    }
}

Processor - processes each item:

import org.springframework.batch.item.ItemProcessor;

public class CustomerItemProcessor implements ItemProcessor<String, String> {
    @Override
    public String process(String item) throws Exception {
        // process the item (transformation, filtering, etc.)
        return item + ":processed";
    }
}

Writer - writes to the console:

import org.springframework.batch.item.ItemWriter;

import java.util.List;

public class CustomerItemWriter implements ItemWriter<String> {

    @Override
    public void write(List<? extends String> items) throws Exception {
        // print each chunk to the console; a real writer would target a file or database
        for (String item : items) {
            System.out.println(item);
        }
    }
}

3.5 Create the Job Configuration

import com.lm.batch.job.CustomerItemProcessor;
import com.lm.batch.job.CustomerItemReader;
import com.lm.batch.job.CustomerItemWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job importCustomerJob(Step step) {
        return jobBuilderFactory.get("importCustomerJob")
                .start(step)
                .build();
    }


    @Bean
    public Step step() {
        return stepBuilderFactory.get("step")
                .<String, String>chunk(10)
                .reader(new CustomerItemReader())
                .processor(new CustomerItemProcessor())
                .writer(new CustomerItemWriter())
                .build();
    }
}

3.6 Run the Batch Job

Create a CommandLineRunner to trigger the job:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class JobRunner implements CommandLineRunner {

    private final JobLauncher jobLauncher;
    private final Job importCustomerJob;

    public JobRunner(JobLauncher jobLauncher, Job importCustomerJob) {
        this.jobLauncher = jobLauncher;
        this.importCustomerJob = importCustomerJob;
    }

    @Override
    public void run(String... args) throws Exception {
        // use a unique parameter: a JobInstance with identical parameters
        // will not run again once it has completed successfully
        JobParameters params = new JobParametersBuilder()
                .addLong("timestamp", System.currentTimeMillis())
                .toJobParameters();
        JobExecution execution = jobLauncher.run(importCustomerJob, params);
        System.out.println("Job Status: " + execution.getStatus());
    }
}

Note that by default Spring Boot also launches every Job bean at application startup; set spring.batch.job.enabled=false if the job should run only through this runner.

4. File Processing in Practice

4.1 Reading and Processing a CSV File

@Bean
public FlatFileItemReader<Customer> reader() {
    return new FlatFileItemReaderBuilder<Customer>()
            .name("customerItemReader")
            .resource(new ClassPathResource("customers.csv"))
            .linesToSkip(1)
            .skippedLinesCallback(line -> System.out.println("Skipped line: " + line)) // optional
            .delimited()
            .delimiter(",")
            .names(new String[]{"id", "name", "email"})
            .targetType(Customer.class)
            .build();
}
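For reference, a minimal customers.csv matching the configured column names (the header row is consumed by linesToSkip(1)); the rows shown here are made-up sample data:

id,name,email
1,alice,alice@example.com
2,bob,bob@example.com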

@Bean
public JdbcBatchItemWriter<Customer> writer(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Customer>()
            .itemSqlParameterSourceProvider(
                    new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql("INSERT INTO customer (id, name, email) VALUES (:id, :name, :email)")
            .dataSource(dataSource)
            .build();
}

@Bean
public Job importUserJob(Step step1) {
    return jobBuilderFactory.get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .flow(step1)
            .end()
            .build();
}

@Bean
public Step step1(JdbcBatchItemWriter<Customer> writer) {
    return stepBuilderFactory.get("step1")
            .<Customer, Customer>chunk(100)
            .reader(reader())
            .processor(processor())
            .writer(writer)
            .build();
}

@Bean
public ItemProcessor<Customer, Customer> processor() {
    return new ItemProcessor<Customer, Customer>() {
        @Override
        public Customer process(Customer customer) {
            // business logic: convert the name to upper case
            customer.setName(customer.getName().toUpperCase());
            return customer;
        }
    };
}

5. Database Batch Processing

5.1 Paged Reading from the Database

@Bean
public JdbcPagingItemReader<User> databaseReader(DataSource dataSource) throws Exception {
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("status", "ACTIVE");
  
    return new JdbcPagingItemReaderBuilder<User>()
        .name("databaseReader")
        .dataSource(dataSource)
        .queryProvider(queryProvider(dataSource))
        .parameterValues(parameterValues)
        .rowMapper(new BeanPropertyRowMapper<>(User.class))
        .pageSize(1000)
        .build();
}

@Bean
public PagingQueryProvider queryProvider(DataSource dataSource) throws Exception {
    SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
    provider.setDataSource(dataSource);
    provider.setSelectClause("SELECT id, name, email");
    provider.setFromClause("FROM users");
    provider.setWhereClause("WHERE status = :status");
    provider.setSortKey("id");
    return provider.getObject();
}
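To wire the paged reader into a job, a chunk-oriented step could look like the sketch below; the User entity and the userWriter bean (for example a JdbcBatchItemWriter built like the customer writer in section 4.1) are assumptions, not part of the original sample:

@Bean
public Step pagedDbStep(JdbcPagingItemReader<User> databaseReader,
                        ItemWriter<User> userWriter) {
    return stepBuilderFactory.get("pagedDbStep")
            .<User, User>chunk(1000) // keep the chunk size aligned with the reader's pageSize
            .reader(databaseReader)
            .writer(userWriter)      // assumed bean, e.g. a JdbcBatchItemWriter<User>
            .build();
}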

6. Advanced Features

6.1 Parallel Processing (Partitioning)

@Bean
public Step partitionedStep() {
    return stepBuilderFactory.get("partitionedStep")
        .partitioner("slaveStep", partitioner())
        .step(slaveStep())
        .taskExecutor(taskExecutor())
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(8);
    executor.setQueueCapacity(20);
    return executor;
}

@Bean
public Partitioner partitioner() {
    return gridSize -> {
        Map<String, ExecutionContext> result = new HashMap<>();
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.put("partitionNumber", i);
            result.put("partition" + i, context);
        }
        return result;
    };
}
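The slaveStep() referenced above is not shown in the original; a minimal sketch of a worker step, which reads the partition number that the partitioner stored in each ExecutionContext, might look like this:

@Bean
public Step slaveStep() {
    return stepBuilderFactory.get("slaveStep")
        .tasklet((contribution, chunkContext) -> {
            // each worker receives the ExecutionContext built by the partitioner
            int partitionNumber = chunkContext.getStepContext()
                    .getStepExecution()
                    .getExecutionContext()
                    .getInt("partitionNumber");
            System.out.println("Processing partition " + partitionNumber);
            return RepeatStatus.FINISHED;
        })
        .build();
}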

6.2 Conditional Flow Control

@Bean
public Job conditionalJob() {
    return jobBuilderFactory.get("conditionalJob")
        .start(step1())
        .next(decision())
            .on("FAILED").to(failureStep())
            .from(decision())
            .on("*").to(successStep())
        .end()
        .build();
}

@Bean
public JobExecutionDecider decision() {
    return (jobExecution, stepExecution) -> {
        if (someCondition()) { // someCondition() stands in for a real business check
            return new FlowExecutionStatus("FAILED");
        }
        return FlowExecutionStatus.COMPLETED;
    };
}

7. Monitoring and Management

7.1 Job Listeners

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("!!! JOB FINISHED! Time: {}", jobExecution.getEndTime());
        
            jobExecution.getStepExecutions().forEach(stepExecution -> {
                log.info("Step {} processed {} items", 
                    stepExecution.getStepName(), 
                    stepExecution.getWriteCount());
            });
        }
    }
}
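For the listener to fire, it must be registered on the job; for example, the importCustomerJob bean from section 3.5 could be extended like this:

@Bean
public Job importCustomerJob(Step step, JobCompletionNotificationListener listener) {
    return jobBuilderFactory.get("importCustomerJob")
            .listener(listener) // called before and after every execution
            .start(step)
            .build();
}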

7.2 Spring Boot Actuator Integration

Spring Boot does not provide a batch-specific Actuator endpoint out of the box. The configuration below exposes the standard endpoints; since Spring Batch 4.2, job and step metrics are published through Micrometer and appear under the metrics endpoint with the spring.batch prefix (for example spring.batch.job):

# application.yml
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics

8. Typical Business Scenarios

8.1 Month-End Settlement Batch

@Bean
public Job monthlySettlementJob() {
    return jobBuilderFactory.get("monthlySettlementJob")
        .start(step1()) // prepare data
        .next(step2())  // calculate interest
        .next(step3())  // generate reports
        .next(step4())  // send notifications
        .build();
}

@Bean
public Step step3() {
    return stepBuilderFactory.get("generateReport")
        .tasklet(new ReportGeneratorTasklet())
        .build();
}

public class ReportGeneratorTasklet implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, 
            ChunkContext chunkContext) {
        // generate the PDF report (generatePdfReport() is a placeholder)
        generatePdfReport();
        // upload it to S3 (uploadToS3() is a placeholder)
        uploadToS3();
        return RepeatStatus.FINISHED;
    }
}

8.2 Data Migration Job

@Bean
public Job dataMigrationJob() {
    return jobBuilderFactory.get("dataMigrationJob")
        .start(
            stepBuilderFactory.get("migrateUserData")
                .<User, User>chunk(500)
                .reader(oldDbUserReader())
                .processor(userDataProcessor())
                .writer(newDbUserWriter())
                .build()
        )
        .next(
            stepBuilderFactory.get("migrateOrderData")
                .<Order, Order>chunk(500)
                .reader(oldDbOrderReader())
                .writer(newDbOrderWriter())
                .build()
        )
        .build();
}
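The reader and writer beans referenced in this job are not shown; one possible shape for the user reader, assuming a hypothetical oldDataSource bean pointing at the legacy database, is a plain JdbcCursorItemReader:

@Bean
public JdbcCursorItemReader<User> oldDbUserReader() {
    return new JdbcCursorItemReaderBuilder<User>()
        .name("oldDbUserReader")
        .dataSource(oldDataSource) // hypothetical DataSource for the legacy schema
        .sql("SELECT id, name, email FROM users")
        .rowMapper(new BeanPropertyRowMapper<>(User.class))
        .build();
}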

9. Performance Tuning Tips

9.1 Tuning Batch Parameters

@Bean
public Step optimizedStep() {
    return stepBuilderFactory.get("optimizedStep")
        .<User, User>chunk(200) // a larger chunk size amortizes transaction overhead
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .throttleLimit(15)      // cap the number of concurrent worker threads
        .taskExecutor(taskExecutor())
        .build();
}
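The summary below also recommends skip/retry handling; a fault-tolerant variant of such a step might look like the following sketch (the exception classes are illustrative; choose ones actually thrown by your reader and writer):

@Bean
public Step faultTolerantStep() {
    return stepBuilderFactory.get("faultTolerantStep")
        .<User, User>chunk(200)
        .reader(reader())
        .writer(writer())
        .faultTolerant()
        .skip(FlatFileParseException.class)            // tolerate malformed input records
        .skipLimit(10)                                 // fail the step after 10 skipped items
        .retry(DeadlockLoserDataAccessException.class) // retry transient database errors
        .retryLimit(3)
        .build();
}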

9.2 Asynchronous ItemProcessor

AsyncItemProcessor and AsyncItemWriter come from the separate spring-batch-integration module, so that dependency must be on the classpath; the syncProcessor(), syncWriter(), and asyncTaskExecutor() beans referenced below are assumed to be defined elsewhere in the configuration.

@Bean
public AsyncItemProcessor<User, User> asyncItemProcessor() {
    AsyncItemProcessor<User, User> asyncProcessor = new AsyncItemProcessor<>();
    asyncProcessor.setDelegate(syncProcessor());
    asyncProcessor.setTaskExecutor(asyncTaskExecutor());
    return asyncProcessor;
}

@Bean
public AsyncItemWriter<User> asyncItemWriter() {
    AsyncItemWriter<User> asyncWriter = new AsyncItemWriter<>();
    asyncWriter.setDelegate(syncWriter());
    return asyncWriter;
}

@Bean
public Step asyncStep() {
    return stepBuilderFactory.get("asyncStep")
        .<User, Future<User>>chunk(100)
        .reader(reader())
        .processor(asyncItemProcessor())
        .writer(asyncItemWriter())
        .build();
}
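A minimal asyncTaskExecutor() bean could reuse the ThreadPoolTaskExecutor pattern from section 6.1; the pool size here is an assumption to tune against your workload:

@Bean
public TaskExecutor asyncTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(8);                  // sized for the expected I/O-bound concurrency
    executor.setThreadNamePrefix("batch-async-"); // easier to identify in thread dumps
    return executor;
}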

10. Summary

Integrating Spring Batch with Spring Boot lets developers build robust batch-processing applications quickly. The key points:

🔹 Design the Job-Step structure carefully: keep each step's responsibility focused

🔹 Take state management seriously: use the JobRepository to persist execution state

🔹 Tune performance parameters: adjust chunk size and concurrency

🔹 Handle errors properly: implement skip/retry mechanisms

🔹 Build a monitoring story: integrate Actuator and custom listeners

Sample code: https://gitee.com/lhdxhl/springboot-example.git

Recommendations for real-world projects:

🔹 Implement a checkpoint mechanism for long-running jobs

🔹 Add e-mail notifications for critical jobs

🔹 Clean up historical job metadata regularly

🔹 Consider integrating with a scheduler such as Quartz

You can extend this further to match your business needs, for example with XML/JSON data sources or parallel processing. 🚀
