5 분 소요

먼저 필자는 스프링 배치에 대해 잘 모르는데 5.0 예제를 간단하게 사용해보고 정리함.

스프링 배치란

Spring Batch일괄 처리를 위한 오픈 소스 프레임워크 입니다.

현대 기업 시스템에서 자주 볼 수 있는 강력한 배치 응용 프로그램을 개발할 수 있도록 설계된 가볍고 포괄적인 솔루션 입니다. - 위키피디아

전체적인 개념은 adjh54 - [Java] Spring Boot Batch 이해하고 설정하기님의 블로그 글을 읽고 이해하려고 했다.

https://docs.spring.io/spring-batch/reference/job.html 여기 들어가 보면

Job이라는 가장 큰 단위의 작업이 있고 그 안에 Step이 있고

Step에는 Reader 데이터를 읽는 작업, Proccessor 데이터를 처리하는 작업, Writer 처리한 데이터를 출력(저장)하는 단계로 구성되어 있다.

프로젝트 생성

먼저 이번에 실습 해본 프로젝트는 링크를 통해서 확인할 수 있다.

  • 프로젝트 선택
    • Project : Gradle - Groovy
    • Language : Java
    • Spring Boot : 3.3.1
  • Project Metadata
    • Group : hello
    • Artifact : example
    • Name : example
    • Package name : hello.example
    • Packaging : Jar
    • Java : 21
  • Dependencies: Spring Batch, Lombok, HyperSQL Database

프로젝트를 생성하고 jdk 설정 등 세팅이 끝났다고 가정.

resources 폴더에 먼저 샘플 파일들을 몇 개 생성하겠다.

sample-data.csv

Jill,Doe  
Joe,Doe  
Justin,Doe  
Jane,Doe  
John,Doe

schema-all.sql

1
2
3
4
5
6
7
DROP TABLE people IF EXISTS;  
  
CREATE TABLE people  (  
	 person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,  
	 first_name VARCHAR(20),  
	 last_name VARCHAR(20)  
);

Spring Boot는 schema-@@platform@@.sql파일을 시작할 때 자동으로 실행해 버린다. all은 모든 플랫폼의 기본 값.

코드 구현

이제 코드를 구현해 보자.

모든 코드는hello.example.batchprocessing 해당 패키지에서 진행한다.

도메인 클래스 만들기

1
2
3
4
package hello.example.batchprocessing;  
  
public record Person(String firstName, String lastName) {  
}

간단하게 enum 비슷한 record 클래스로 만들었다.

성과 이름으로 구성했다.

생성자를 통해 Person 객체를 만들 수 있다.

데이터 처리 클래스 만들기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package hello.example.batchprocessing;  
  
import lombok.extern.slf4j.Slf4j;  
import org.springframework.batch.item.ItemProcessor;  
  
/**  
 * 이름을 대문자로 변경하는 역할을 수행  
 * */  
@Slf4j  
public class PersonItemProcessor implements ItemProcessor<Person, Person> {  
  
    @Override  
    public Person process(final Person person) throws Exception {  
        final String firstName = person.firstName().toUpperCase();  
        final String lastName = person.lastName().toUpperCase();  
  
        final Person transformedPerson = new Person(firstName, lastName);  
  
        log.info("Converting ({}) into ({})", person, transformedPerson);  
        return transformedPerson;  
    }  
}

다음과 같이 PersonItemProcessor 클래스를 만들었고, ItemProcessor 인터페이스를 상속 받는다.

ItemProcessor<I, O> 으로 제네릭이 2개인 것 보니 인풋 아웃풋의 타입을 정해주는 것 같다.

process 메서드를 구현해 주면 되는데, 뭐 어렵진 않다. toUpperCase로 대문자로 바꿔줬다.

이제 실제 배치를 작성해 보자.

배치 클래스 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Configuration  
public class BatchConfiguration {  
  
  
    /**  
     * csv 파일을 읽어서 Person 객체로 바인딩 한다.  
     **/    @Bean  
    public FlatFileItemReader<Person> reader() {  
        return new FlatFileItemReaderBuilder<Person>()  
                .name("personItemReader")  
                .resource(new ClassPathResource("sample-data.csv"))  
                .delimited()  
                .names("firstName", "lastName")  
                .targetType(Person.class)  
                .build();  
    }  
  
    /**  
     * Person 객체의 이름을 대문자로 변경하기위한 인스턴스를 생성해준다.  
     **/    @Bean  
    public PersonItemProcessor processor() {  
        return new PersonItemProcessor();  
    }  
  
    /**  
     * DB에 데이터를 저장한다.  
     * */    @Bean  
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {  
        return new JdbcBatchItemWriterBuilder<Person>()  
                .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")  
                .dataSource(dataSource)  
                .beanMapped()  
                .build();  
    }  
  
    /*  
    * ==================================================================================================    * */  
    /**  
     * 작업을 정의한다.  
     * listener 를 통해 작업의 상태를 체크 (구현 필요)  
     * start(Step) 을 통해 구현한 작업을 시작한다.  
     * build()로 Job 객체 반환  
     * */  
    @Bean  
    public Job importUserJob(JobRepository jobRepository, Step step1, JobCompletionNotificationListener listener) {  
        return new JobBuilder("importUserJob", jobRepository)  
                .listener(listener)  
                .start(step1)  
                .build();  
    }  
  
    /**  
     * 실제 작업 내용을 작성한다.  
     * */    @Bean  
    public Step step1(JobRepository jobRepository, DataSourceTransactionManager transactionManager,  
                      FlatFileItemReader<Person> reader, PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {  
        return new StepBuilder("step1", jobRepository)  
                .<Person, Person> chunk(3, transactionManager)  
                .reader(reader)  
                .processor(processor)  
                .writer(writer)  
                .faultTolerant()  
                .skipPolicy(new CustomSkipPolicy())  
                .build();  
    }  
  
}

배치 클래스는 좀 복잡하다.

단계 별로 보자.

1. 파일 읽기

1
2
3
4
5
6
7
8
9
10
11
12
13
/**  
 * csv 파일을 읽어서 Person 객체로 바인딩 한다.  
**/    
@Bean  
public FlatFileItemReader<Person> reader() {  
	return new FlatFileItemReaderBuilder<Person>()  
			.name("personItemReader")  
			.resource(new ClassPathResource("sample-data.csv"))  
			.delimited()  
			.names("firstName", "lastName")  
			.targetType(Person.class)  
			.build();  
}  

FlatFileItemReaderBuilder는 데이터를 읽어서 여기 서는 Person 객체로 변환할 수 있도록 도와주는 클래스 이다.

resources 폴더에 sample-data.csv을 읽어서

delimited() 구분자로 구분한다. (기본은 , comma)

names로 각 컬럼을 나누고

targetType으로 바인딩 할 객체의 타입을 알아낸다.

2. 읽은 파일 데이터 처리하기

1
2
3
4
5
6
7
/**  
 * Person 객체의 이름을 대문자로 변경하기위한 인스턴스를 생성해준다.  
 **/
@Bean  
public PersonItemProcessor processor() {  
    return new PersonItemProcessor();  
}

맨 처음 만든 대문자로 바꿔주는 작업을 인스턴스 반환 할 수 있게 메서드로 만든다.

3. 처리한 데이터 쓰기

1
2
3
4
5
6
7
8
9
10
/**  
 * DB에 데이터를 저장한다.  
 * */@Bean  
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {  
    return new JdbcBatchItemWriterBuilder<Person>()  
            .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")  
            .dataSource(dataSource)  
            .beanMapped()  
            .build();  
}

JdbcBatchItemWriterBuilder 에서 쿼리를 만들어서 JdbcBatchItemWriter에서 쌓은 쿼리를 DB에 쏜다.

4. 작업(Job)을 정의한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
/**  
 * 작업을 정의한다.  
 * listener 를 통해 작업의 상태를 체크 (구현 필요)  
 * start(Step) 을 통해 구현한 작업을 시작한다.  
 * build()로 Job 객체 반환  
 * */  
@Bean  
public Job importUserJob(JobRepository jobRepository, Step step1, JobCompletionNotificationListener listener) {  
    return new JobBuilder("importUserJob", jobRepository)  
            .listener(listener)  
            .start(step1)  
            .build();  
}

작업을 정의해 주는데

  • listener : 작업의 상태를 체크, 작업 시작 전, 후 뭔가 동작 하게 할 수 있다.
  • start : 작업의 하위 단위인 step을 실행한다.

이런 식으로 작업을 정의해서 jobRepository에 쌓일 수(?) 있도록 정의한다.

5. step 을 정의한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**  
 * 실제 작업 내용을 작성한다.  
 * */
@Bean  
public Step step1(JobRepository jobRepository, DataSourceTransactionManager transactionManager,  
                  FlatFileItemReader<Person> reader, PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {  
    return new StepBuilder("step1", jobRepository)  
            .<Person, Person> chunk(3, transactionManager)  
            .reader(reader)  
            .processor(processor)  
            .writer(writer)  
            .faultTolerant()  
            .skipPolicy(new CustomSkipPolicy())  
            .build();  
}

자 여기 서는 이제 가장 작은 단위인 Step을 정의한다.

  • <Person, Person> chunk(3, transactionManager)
    • 제네릭은 Input, Output 즉 reader, writer에 사용될 타입을 정의
    • chunk : 한 번의 몇 개의 데이터를 작업하는 지, 여기 서는 3개의 데이터를 한 단위로 작업
  • reader : 데이터를 어떻게 읽어 올 것인가 ? 여기 서는 1. 파일 읽기에 해당
  • processor : 데이터를 어떻게 처리할 것인가? 2. 데이터 처리에 해당
  • writer : 파일 결과 출력, 3. 에 해당
  • faultTolerant, skipPolicy : 예외 시 처리

이런 단계로 구성되어 있다.

리스너 작성

hello.example.batchprocessing.JobCompletionNotificationListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@RequiredArgsConstructor  
@Slf4j  
@Component  
public class JobCompletionNotificationListener implements JobExecutionListener {  
  
    private final JdbcTemplate jdbcTemplate;  
  
    /**  
     * 작업이 끝나고 할 행위  
     * */  
    @Override  
    public void afterJob(JobExecution jobExecution) {  
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {  
            log.info("!!! JOB FINISHED! Time to verify the results");  
        }  
  
        jdbcTemplate  
                .query("SELECT first_name, last_name FROM people", new DataClassRowMapper<>(Person.class))  
                .forEach(person -> log.info("Found <> in the database", person));  
    }  
}

JobExecutionListener인터페이스를 상속 받아 메서드를 구현하면 되는데

이렇게 시작하기 전, 종료 후 뭔가 할 수 있도록 메서드를 만들어 놨다.

jobExecution으로 job의 상태를 확인할 수 도 있다.

여기서는 로그를 남기고, SELECT 쿼리를 날려서 조회를 해본다.

실행

이제 실행을 해보자.

main 메서드를 실행하면 된다.

다음과 같이 컨버팅이랑 afterJob이 잘 실행 된 걸 볼 수 있다.

이게 배치에 아주 간단한 예제이고, 많은 기능을 사용하진 못했다.

핵심은 Job -> step 의 흐름이라고 생각한다.

더 공부해서 필요할 때 사용할 수 있었으면 좋겠다.

댓글남기기