목차
1. Spring Batch란?
2. Spring Batch의 목적
2.1 Spring Batch 주요 기능
2.1.1 Job
2.1.2 Step
2.1.3 Chunk
3. 구성요소 및 동작방식
3.1 구성요소
3.2 동작방식
4. 실습
1. Spring Batch란?
Spring Batch는 대용량 데이터의 일괄 처리(batch processing)를 지원하는 스프링 기반 프레임워크입니다.
Spring Batch를 통해 복잡한 Batch 작업을 손쉽게 구성하고 실행할 수 있습니다.
또한, 데이터베이스, 파일 시스템 또는 기타 데이터 소스로부터 데이터를 읽고, 처리하고, 다른 데이터베이스나 파일 시스템으로 쓰는 등의 작업을 자동화할 수 있습니다.
2. Spring Batch의 목적
목적 | 설명 |
대량 데이터 처리 | 대규모 데이터를 효율적으로 처리할 수 있는 기능을 제공하여, 데이터베이스, 파일 시스템, 메시지 큐 등 다양한 소스로부터 데이터를 읽고 가공하여 저장하는 작업을 자동화합니다. |
재사용성 | 배치 작업을 모듈화하여 재사용할 수 있게 함으로써, 동일한 데이터 처리 로직을 다양한 배치 작업에서 쉽게 재사용할 수 있습니다. |
에러 처리 | 배치 작업 중 발생할 수 있는 다양한 오류를 효과적으로 처리하고 복구할 수 있는 메커니즘을 제공합니다. 이를 통해 배치 작업의 신뢰성을 높이고, 오류 발생 시에도 중단 없이 계속 진행할 수 있습니다. |
트랜잭션 관리 | 배치 작업 중 데이터의 일관성을 유지하기 위해 트랜잭션 관리 기능을 제공합니다. 각 단계에서 발생하는 트랜잭션을 관리하고, 실패 시 롤백할 수 있습니다. |
모니터링 및 보고 | 배치 작업의 진행 상황을 모니터링하고, 실행 결과를 보고할 수 있는 기능을 제공합니다. 이를 통해 배치 작업의 상태를 쉽게 추적하고 관리할 수 있습니다. |
확장성 | 대규모 데이터 처리와 다양한 비즈니스 요구 사항을 지원하기 위해 확장 가능한 아키텍처를 제공합니다. 이를 통해 필요에 따라 배치 작업을 확장할 수 있습니다. |
스케줄링 | 배치 작업을 주기적으로 실행할 수 있도록 스케줄링 기능을 제공합니다. 이를 통해 특정 시간이나 주기에 맞춰 배치 작업을 자동으로 실행할 수 있습니다. |
2.1 Spring Batch의 주요 기능
기능 | 설명 |
Job 및 Step | 배치 작업을 Job과 Step으로 구성하여 관리합니다. 각 Job은 여러 Step으로 구성될 수 있으며, 각 Step은 독립적으로 실행됩니다. |
ItemReader, ItemProcessor, ItemWriter | 데이터를 읽고, 처리하고, 쓰는 작업을 정의할 수 있는 인터페이스를 제공합니다. |
Chunk 기반 처리 | 데이터를 일정 단위로 나누어 처리하는 Chunk 기반 처리 방식을 지원합니다. |
트랜잭션 관리 | 배치 작업 중 트랜잭션을 관리하여 데이터 일관성을 유지합니다. |
재시작 및 복구 | 실패한 배치 작업을 재시작하고 복구할 수 있는 기능을 제공합니다. |
스케줄링 | 배치 작업을 주기적으로 실행할 수 있도록 스케줄링 기능을 제공합니다. |
모니터링 및 보고 | 배치 작업의 실행 상태를 모니터링하고 보고할 수 있는 기능을 제공합니다. |
2.1.1 Job
Job은 Spring Batch에서 일괄 처리 작업의 전체 흐름을 정의하는 최상위 개체입니다. Job은 여러 개의 Step으로 구성되며, 각 Step은 특정 작업을 수행합니다. Job은 시작, 중지, 재시작, 상태 모니터링 등이 가능합니다.
Job의 특징
구성 요소 | 하나 이상의 Step으로 구성됩니다. |
독립성 | Job은 독립적으로 실행되고, 여러 Job을 병렬로 실행할 수 있습니다. |
상태 관리 | Job의 상태(예: 성공, 실패, 재시작 등)를 관리합니다. |
2.1.2 Step
Step은 Job의 구성 요소로, 특정 작업 단위를 수행합니다. 각 Step은 독립적으로 실행되며, 데이터를 읽고, 처리하고, 쓰는 작업을 수행할 수 있습니다.
Step의 특징
구성 요소 | ItemReader, ItemProcessor, ItemWriter로 구성됩니다. |
독립성 | 각 Step은 독립적으로 실행되며, 하나의 Job 내에서 여러 Step을 순차적으로 실행할 수 있습니다. |
트랜잭션 관리 | 각 Step은 별도의 트랜잭션 범위에서 실행될 수 있습니다. |
2.1.3 Chunk
Chunk는 Spring Batch에서 데이터를 일정 단위로 나누어 처리하는 방식을 말합니다. Chunk 기반 처리에서는 데이터를 한 번에 모두 처리하는 대신, 일정 크기의 데이터 묶음(Chunk)을 읽고, 처리하고, 쓰는 작업을 수행합니다.
Chunk 특징
ItemReader | 데이터를 읽어오는 역할을 합니다. |
ItemPrcessor | 읽어온 데이터를 가공하는 역할을 합니다. |
ItemWriter | 가공된 데이터를 저장하는 역할을 합니다. |
Chunk의 흐름
1. 읽기(Read): ItemReader를 통해 데이터를 읽어옵니다.
2. 처리(Process): 읽어온 데이터를 ItemProcessor를 통해 가공합니다.
3. 쓰기(Write): 가공된 데이터를 ItemWriter를 통해 저장합니다.
4. 반복: 위의 과정을 Chunk 크기만큼 반복합니다. 모든 데이터가 처리될 때까지 이 과정이 계속됩니다.
Spring Batch 사용해 귀찮고 복잡한 대용량 데이터 처리 작업을 편하고 안전하게 관리해 봅시다.
3. Batch 구성요소 및 동작 방식
3.1 구성요소
Job Scheduler | Job Scheduler는 배치 작업을 주기적으로 또는 특정 시간에 실행되도록 스케줄링하는 역할을 합니다. |
JobLauncher | JobLauncher는 배치 Job을 시작하는 역할을 합니다. JobScheduler로부터 호출되어 Job을 실행하게 됩니다. |
Execution Context | Execution Context는 Job과 Step의 상태 정보를 저장하는 역할을 합니다. 이를 통해 Job과 Step의 상태를 추적하고, 재시작 시 이전 상태를 복원할 수 있습니다. |
JobRepository | JobRepository는 Job과 Step의 실행 정보를 데이터베이스에 저장하고 관리하는 역할을 합니다. 이를 통해 실행 이력 관리, 재시작 시점 관리 등이 가능합니다. |
3.2 동작 방식
1. JobExecution
JobLauncher는 Job을 실행하며, 이때 JobExecution 객체가 생성되어 Job의 실행 상태를 관리합니다.
2. StepExecution
각 Step이 실행되면 StepExecution 객체가 생성되어 Step의 실행 상태를 관리합니다. 각 Step은 ItemReader, ItemProcessor, ItemWriter로 구성된 Chunk 단위로 데이터를 처리합니다.
3. Chunk 기반 처리
ItemReader를 통해 데이터를 일정 크기(Chunk)로 읽어옵니다.
읽어온 데이터는 ItemProcessor를 통해 가공됩니다.
가공된 데이터는 ItemWriter를 통해 저장됩니다.
이 과정은 모든 데이터가 처리될 때까지 반복됩니다.
4. 데이터 저장 및 재시작
JobRepository는 Job과 Step의 실행 상태와 데이터를 데이터베이스에 저장합니다. 이를 통해 실행 중 문제가 발생하거나 중단되었을 때, 저장된 상태를 기반으로 Job을 재시작할 수 있습니다.
4. 실습
본 실습은 Spring 2.x 버전을 사용합니다.
Sample-data.csv를 읽고 해당 데이터를 firstName과 lastName으로 나눠 DB에 저장
Sample-data.csv
batch DB의 people Table
실습 목적
- local에 있는 .csv 파일을 읽어 local DB에 입력하기 위함.
- 스케쥴러를 사용하여 사용자가 지정한 시간마다 Batch가 실행되도록 구현.
Directory 구조
BatchConfig
package com.example.batch.config;
import com.example.batch.listener.JobCompletionNotificationListener;
import com.example.batch.model.Person;
import com.example.batch.processor.PersonItemProcessor;
import javax.sql.DataSource;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
@Configuration
@EnableBatchProcessing
@EnableScheduling
@RequiredArgsConstructor
public class BatchConfig {
public final JobBuilderFactory jobBuilderFactory;
public final StepBuilderFactory stepBuilderFactory;
public final DataSource dataSource;
private final JobLauncher jobLauncher;
@Bean
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>()
.name("personItemReader")
.resource(new ClassPathResource("sample-data.csv"))
.delimited()
.names("firstName", "lastName")
.fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
setTargetType(Person.class);
}})
.build();
}
@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
.dataSource(dataSource)
.build();
}
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {
return stepBuilderFactory.get("step1")
.<Person, Person>chunk(5) // chunk size를 5로 설정 (row 5줄)
.reader(reader())
.processor(processor())
.writer(writer)
.build();
}
@Scheduled(cron = "*/3 * * * * *") // 3초마다 실행
public void perform() throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException {
JobParameters params = new JobParametersBuilder()
.addString("JobID", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
jobLauncher.run(importUserJob(), params); // 메서드를 호출하여 Job을 가져옴
}
// 스케줄링 메서드에서 사용할 Job을 반환하는 메서드
public Job importUserJob() {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.start(step1(writer(dataSource)))
.build();
}
}
JobCompletionNotificationListener
package com.example.batch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
@Component
public class JobCompletionNotificationListener implements JobExecutionListener {
private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
private final JdbcTemplate jdbcTemplate;
@Autowired
public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void beforeJob(JobExecution jobExecution) {
}
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("Job finished! Verifying the results...");
jdbcTemplate.query("SELECT first_name, last_name FROM people",
(rs, row) -> String.format("FirstName: %s, LastName: %s", rs.getString(1), rs.getString(2))
).forEach(str -> log.info("Found <" + str + "> in the database."));
}
}
}
Person
package com.example.batch.model;
public class Person {
private String firstName;
private String lastName;
public Person() {
}
public Person(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
}
PersonItemProcessor
package com.example.batch.processor;
import com.example.batch.model.Person;
import org.springframework.batch.item.ItemProcessor;
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
@Override
public Person process(final Person person) throws Exception {
final String firstName = person.getFirstName().toUpperCase();
final String lastName = person.getLastName().toUpperCase();
return new Person(firstName, lastName);
}
}
Log History 및 실행 후 Table
'Backend' 카테고리의 다른 글
<Backend> Spring / Spring Security (0) | 2024.07.11 |
---|---|
<Backend> Spring / QueryDSL (0) | 2024.07.09 |
<Backend> Java / 상속 (0) | 2024.04.05 |
<Backend> Java / Class (3) 접근제한자와 변수의 타입 (0) | 2024.04.04 |
<Backend> Java / Class (2) this와 메소드 (0) | 2024.04.04 |