스프링 배치 5.0 예제로 알아보기
먼저 필자는 스프링 배치에 대해 잘 모르는데 5.0 예제를 간단하게 사용해보고 정리함.
스프링 배치란
Spring Batch는 일괄 처리를 위한 오픈 소스 프레임워크 입니다.
현대 기업 시스템에서 자주 볼 수 있는 강력한 배치 응용 프로그램을 개발할 수 있도록 설계된 가볍고 포괄적인 솔루션 입니다. - 위키피디아
전체적인 개념은 adjh54 - [Java] Spring Boot Batch 이해하고 설정하기님의 블로그 글을 읽고 이해하려고 했다.
https://docs.spring.io/spring-batch/reference/job.html 여기 들어가 보면

Job이라는 가장 큰 단위의 작업이 있고 그 안에 Step이 있고
Step에는 Reader 데이터를 읽는 작업, Proccessor 데이터를 처리하는 작업, Writer 처리한 데이터를 출력(저장)하는 단계로 구성되어 있다.
프로젝트 생성
먼저 이번에 실습 해본 프로젝트는 링크를 통해서 확인할 수 있다.

- 프로젝트 선택
- Project : Gradle - Groovy
- Language : Java
- Spring Boot : 3.3.1
- Project Metadata
- Group : hello
- Artifact : example
- Name : example
- Package name : hello.example
- Packaging : Jar
- Java : 21
- Dependencies: Spring Batch, Lombok, HyperSQL Database
프로젝트를 생성하고 jdk 설정 등 세팅이 끝났다고 가정.
resources 폴더에 먼저 샘플 파일들을 몇 개 생성하겠다.
sample-data.csv
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
schema-all.sql
1
2
3
4
5
6
7
DROP TABLE people IF EXISTS;
CREATE TABLE people (
person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
first_name VARCHAR(20),
last_name VARCHAR(20)
);
Spring Boot는
schema-@@platform@@.sql파일을 시작할 때 자동으로 실행해 버린다.all은 모든 플랫폼의 기본 값.
코드 구현
이제 코드를 구현해 보자.
모든 코드는hello.example.batchprocessing 해당 패키지에서 진행한다.

도메인 클래스 만들기
1
2
3
4
package hello.example.batchprocessing;
public record Person(String firstName, String lastName) {
}
간단하게 enum 비슷한 record 클래스로 만들었다.
성과 이름으로 구성했다.
생성자를 통해 Person 객체를 만들 수 있다.
데이터 처리 클래스 만들기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package hello.example.batchprocessing;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
/**
* 이름을 대문자로 변경하는 역할을 수행
* */
@Slf4j
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
@Override
public Person process(final Person person) throws Exception {
final String firstName = person.firstName().toUpperCase();
final String lastName = person.lastName().toUpperCase();
final Person transformedPerson = new Person(firstName, lastName);
log.info("Converting ({}) into ({})", person, transformedPerson);
return transformedPerson;
}
}
다음과 같이 PersonItemProcessor 클래스를 만들었고, ItemProcessor 인터페이스를 상속 받는다.
ItemProcessor<I, O> 으로 제네릭이 2개인 것 보니 인풋 아웃풋의 타입을 정해주는 것 같다.
process 메서드를 구현해 주면 되는데, 뭐 어렵진 않다. toUpperCase로 대문자로 바꿔줬다.
이제 실제 배치를 작성해 보자.
배치 클래스 작성
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Configuration
public class BatchConfiguration {
/**
* csv 파일을 읽어서 Person 객체로 바인딩 한다.
**/ @Bean
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>()
.name("personItemReader")
.resource(new ClassPathResource("sample-data.csv"))
.delimited()
.names("firstName", "lastName")
.targetType(Person.class)
.build();
}
/**
* Person 객체의 이름을 대문자로 변경하기위한 인스턴스를 생성해준다.
**/ @Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
/**
* DB에 데이터를 저장한다.
* */ @Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>()
.sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
.dataSource(dataSource)
.beanMapped()
.build();
}
/*
* ================================================================================================== * */
/**
* 작업을 정의한다.
* listener 를 통해 작업의 상태를 체크 (구현 필요)
* start(Step) 을 통해 구현한 작업을 시작한다.
* build()로 Job 객체 반환
* */
@Bean
public Job importUserJob(JobRepository jobRepository, Step step1, JobCompletionNotificationListener listener) {
return new JobBuilder("importUserJob", jobRepository)
.listener(listener)
.start(step1)
.build();
}
/**
* 실제 작업 내용을 작성한다.
* */ @Bean
public Step step1(JobRepository jobRepository, DataSourceTransactionManager transactionManager,
FlatFileItemReader<Person> reader, PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {
return new StepBuilder("step1", jobRepository)
.<Person, Person> chunk(3, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skipPolicy(new CustomSkipPolicy())
.build();
}
}
배치 클래스는 좀 복잡하다.
단계 별로 보자.
1. 파일 읽기
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* csv 파일을 읽어서 Person 객체로 바인딩 한다.
**/
@Bean
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>()
.name("personItemReader")
.resource(new ClassPathResource("sample-data.csv"))
.delimited()
.names("firstName", "lastName")
.targetType(Person.class)
.build();
}
FlatFileItemReaderBuilder는 데이터를 읽어서 여기 서는 Person 객체로 변환할 수 있도록 도와주는 클래스 이다.
resources 폴더에 sample-data.csv을 읽어서
delimited() 구분자로 구분한다. (기본은 , comma)
names로 각 컬럼을 나누고
targetType으로 바인딩 할 객체의 타입을 알아낸다.
2. 읽은 파일 데이터 처리하기
1
2
3
4
5
6
7
/**
* Person 객체의 이름을 대문자로 변경하기위한 인스턴스를 생성해준다.
**/
@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
맨 처음 만든 대문자로 바꿔주는 작업을 인스턴스 반환 할 수 있게 메서드로 만든다.
3. 처리한 데이터 쓰기
1
2
3
4
5
6
7
8
9
10
/**
* DB에 데이터를 저장한다.
* */@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>()
.sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
.dataSource(dataSource)
.beanMapped()
.build();
}
JdbcBatchItemWriterBuilder 에서 쿼리를 만들어서 JdbcBatchItemWriter에서 쌓은 쿼리를 DB에 쏜다.
4. 작업(Job)을 정의한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 작업을 정의한다.
* listener 를 통해 작업의 상태를 체크 (구현 필요)
* start(Step) 을 통해 구현한 작업을 시작한다.
* build()로 Job 객체 반환
* */
@Bean
public Job importUserJob(JobRepository jobRepository, Step step1, JobCompletionNotificationListener listener) {
return new JobBuilder("importUserJob", jobRepository)
.listener(listener)
.start(step1)
.build();
}
작업을 정의해 주는데
listener: 작업의 상태를 체크, 작업 시작 전, 후 뭔가 동작 하게 할 수 있다.start: 작업의 하위 단위인step을 실행한다.
이런 식으로 작업을 정의해서 jobRepository에 쌓일 수(?) 있도록 정의한다.
5. step 을 정의한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 실제 작업 내용을 작성한다.
* */
@Bean
public Step step1(JobRepository jobRepository, DataSourceTransactionManager transactionManager,
FlatFileItemReader<Person> reader, PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {
return new StepBuilder("step1", jobRepository)
.<Person, Person> chunk(3, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skipPolicy(new CustomSkipPolicy())
.build();
}
자 여기 서는 이제 가장 작은 단위인 Step을 정의한다.
<Person, Person> chunk(3, transactionManager)- 제네릭은 Input, Output 즉 reader, writer에 사용될 타입을 정의
chunk: 한 번의 몇 개의 데이터를 작업하는 지, 여기 서는 3개의 데이터를 한 단위로 작업
reader: 데이터를 어떻게 읽어 올 것인가 ? 여기 서는 1. 파일 읽기에 해당processor: 데이터를 어떻게 처리할 것인가? 2. 데이터 처리에 해당writer: 파일 결과 출력, 3. 에 해당faultTolerant,skipPolicy: 예외 시 처리
이런 단계로 구성되어 있다.
리스너 작성
hello.example.batchprocessing.JobCompletionNotificationListener
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@RequiredArgsConstructor
@Slf4j
@Component
public class JobCompletionNotificationListener implements JobExecutionListener {
private final JdbcTemplate jdbcTemplate;
/**
* 작업이 끝나고 할 행위
* */
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("!!! JOB FINISHED! Time to verify the results");
}
jdbcTemplate
.query("SELECT first_name, last_name FROM people", new DataClassRowMapper<>(Person.class))
.forEach(person -> log.info("Found <> in the database", person));
}
}
JobExecutionListener인터페이스를 상속 받아 메서드를 구현하면 되는데

이렇게 시작하기 전, 종료 후 뭔가 할 수 있도록 메서드를 만들어 놨다.
jobExecution으로 job의 상태를 확인할 수 도 있다.
여기서는 로그를 남기고, SELECT 쿼리를 날려서 조회를 해본다.
실행
이제 실행을 해보자.
main 메서드를 실행하면 된다.

다음과 같이 컨버팅이랑 afterJob이 잘 실행 된 걸 볼 수 있다.
이게 배치에 아주 간단한 예제이고, 많은 기능을 사용하진 못했다.
핵심은 Job -> step 의 흐름이라고 생각한다.
더 공부해서 필요할 때 사용할 수 있었으면 좋겠다.
댓글남기기