Backend

<Backend> Spring / Batch

이게왜 2024. 7. 10. 15:38

목차

1. Spring Batch란?

2. Spring Batch의 목적

      2.1 Spring Batch 주요 기능

           2.1.1 Job

           2.1.2 Step

           2.1.3 Chunk

3. 구성요소 및 동작방식

      3.1 구성요소

      3.2 동작방식

4. 실습


1. Spring Batch란?

Spring Batch는 대용량 데이터의 일괄 처리(batch processing)를 지원하는 스프링 기반 프레임워크입니다.

Spring Batch를 통해 복잡한 Batch 작업을 손쉽게 구성하고 실행할 수 있습니다.

또한, 데이터베이스, 파일 시스템 또는 기타 데이터 소스로부터 데이터를 읽고, 처리하고, 다른 데이터베이스나 파일 시스템으로 쓰는 등의 작업을 자동화할 수 있습니다.


2. Spring Batch의 목적

목적 설명
대량 데이터 처리 
대규모 데이터를 효율적으로 처리할 수 있는 기능을 제공하여, 데이터베이스, 파일 시스템, 메시지 큐 등 다양한 소스로부터 데이터를 읽고 가공하여 저장하는 작업을 자동화합니다.
재사용성 배치 작업을 모듈화하여 재사용할 수 있게 함으로써, 동일한 데이터 처리 로직을 다양한 배치 작업에서 쉽게 재사용할 수 있습니다.
에러 처리  배치 작업 중 발생할 수 있는 다양한 오류를 효과적으로 처리하고 복구할 수 있는 메커니즘을 제공합니다. 이를 통해 배치 작업의 신뢰성을 높이고, 오류 발생 시에도 중단 없이 계속 진행할 수 있습니다.
트랜잭션 관리 
배치 작업 중 데이터의 일관성을 유지하기 위해 트랜잭션 관리 기능을 제공합니다. 각 단계에서 발생하는 트랜잭션을 관리하고, 실패 시 롤백할 수 있습니다.
모니터링 및 보고 
배치 작업의 진행 상황을 모니터링하고, 실행 결과를 보고할 수 있는 기능을 제공합니다. 이를 통해 배치 작업의 상태를 쉽게 추적하고 관리할 수 있습니다.
확장성 
대규모 데이터 처리와 다양한 비즈니스 요구 사항을 지원하기 위해 확장 가능한 아키텍처를 제공합니다. 이를 통해 필요에 따라 배치 작업을 확장할 수 있습니다.
스케줄링
배치 작업을 주기적으로 실행할 수 있도록 스케줄링 기능을 제공합니다. 이를 통해 특정 시간이나 주기에 맞춰 배치 작업을 자동으로 실행할 수 있습니다.

2.1 Spring Batch의 주요 기능

기능 설명
Job 및 Step 배치 작업을 Job과 Step으로 구성하여 관리합니다. 각 Job은 여러 Step으로 구성될 수 있으며, 각 Step은 독립적으로 실행됩니다.
ItemReader, ItemProcessor, ItemWriter  데이터를 읽고, 처리하고, 쓰는 작업을 정의할 수 있는 인터페이스를 제공합니다.
Chunk 기반 처리  데이터를 일정 단위로 나누어 처리하는 Chunk 기반 처리 방식을 지원합니다.
트랜잭션 관리 배치 작업 중 트랜잭션을 관리하여 데이터 일관성을 유지합니다.
재시작 및 복구 실패한 배치 작업을 재시작하고 복구할 수 있는 기능을 제공합니다.
스케줄링 배치 작업을 주기적으로 실행할 수 있도록 스케줄링 기능을 제공합니다.
모니터링 및 보고 배치 작업의 실행 상태를 모니터링하고 보고할 수 있는 기능을 제공합니다.

2.1.1 Job

 

Job은 Spring Batch에서 일괄 처리 작업의 전체 흐름을 정의하는 최상위 개체입니다. Job은 여러 개의 Step으로 구성되며, 각 Step은 특정 작업을 수행합니다. Job은 시작, 중지, 재시작, 상태 모니터링 등이 가능합니다.

 

Job의 특징  

구성 요소 하나 이상의 Step으로 구성됩니다.
 독립성 Job은 독립적으로 실행되고, 여러 Job을 병렬로 실행할 수 있습니다.
상태 관리 Job의 상태(예: 성공, 실패, 재시작 등)를 관리합니다.

2.1.2 Step

 

Step은 Job의 구성 요소로, 특정 작업 단위를 수행합니다. 각 Step은 독립적으로 실행되며, 데이터를 읽고, 처리하고, 쓰는 작업을 수행할 수 있습니다.

 

Step의 특징

구성 요소 ItemReader, ItemProcessor, ItemWriter로 구성됩니다.
독립성 각 Step은 독립적으로 실행되며, 하나의 Job 내에서 여러 Step을 순차적으로 실행할 수 있습니다.
트랜잭션 관리 각 Step은 별도의 트랜잭션 범위에서 실행될 수 있습니다.

2.1.3 Chunk

 

Chunk는 Spring Batch에서 데이터를 일정 단위로 나누어 처리하는 방식을 말합니다. Chunk 기반 처리에서는 데이터를 한 번에 모두 처리하는 대신, 일정 크기의 데이터 묶음(Chunk)을 읽고, 처리하고, 쓰는 작업을 수행합니다.

 

Chunk 특징

ItemReader 데이터를 읽어오는 역할을 합니다.
ItemPrcessor 읽어온 데이터를 가공하는 역할을 합니다.
ItemWriter 가공된 데이터를 저장하는 역할을 합니다.

 

Chunk의 흐름

 

1. 읽기(Read): ItemReader를 통해 데이터를 읽어옵니다.

2. 처리(Process): 읽어온 데이터를 ItemProcessor를 통해 가공합니다.

3. 쓰기(Write): 가공된 데이터를 ItemWriter를 통해 저장합니다.

4. 반복: 위의 과정을 Chunk 크기만큼 반복합니다. 모든 데이터가 처리될 때까지 이 과정이 계속됩니다.

 

Spring Batch 사용해 귀찮고 복잡한 대용량 데이터 처리 작업을 편하고 안전하게 관리해 봅시다.


 

3. Batch 구성요소 및 동작 방식

 

3.1 구성요소

Job Scheduler Job Scheduler는 배치 작업을 주기적으로 또는 특정 시간에 실행되도록 스케줄링하는 역할을 합니다.
JobLauncher JobLauncher는 배치 Job을 시작하는 역할을 합니다. JobScheduler로부터 호출되어 Job을 실행하게 됩니다.
Execution Context Execution Context는 Job과 Step의 상태 정보를 저장하는 역할을 합니다. 이를 통해 Job과 Step의 상태를 추적하고, 재시작 시 이전 상태를 복원할 수 있습니다.
JobRepository JobRepository는 Job과 Step의 실행 정보를 데이터베이스에 저장하고 관리하는 역할을 합니다. 이를 통해 실행 이력 관리, 재시작 시점 관리 등이 가능합니다.

 


3.2 동작 방식

 

1. JobExecution

JobLauncher는 Job을 실행하며, 이때 JobExecution 객체가 생성되어 Job의 실행 상태를 관리합니다.

 

2. StepExecution

각 Step이 실행되면 StepExecution 객체가 생성되어 Step의 실행 상태를 관리합니다. 각 Step은 ItemReader, ItemProcessor, ItemWriter로 구성된 Chunk 단위로 데이터를 처리합니다.

 

3. Chunk 기반 처리

ItemReader를 통해 데이터를 일정 크기(Chunk)로 읽어옵니다.

읽어온 데이터는 ItemProcessor를 통해 가공됩니다.

가공된 데이터는 ItemWriter를 통해 저장됩니다.

이 과정은 모든 데이터가 처리될 때까지 반복됩니다.

 

4. 데이터 저장 및 재시작

JobRepository는 Job과 Step의 실행 상태와 데이터를 데이터베이스에 저장합니다. 이를 통해 실행 중 문제가 발생하거나 중단되었을 때, 저장된 상태를 기반으로 Job을 재시작할 수 있습니다.


4. 실습

본 실습은 Spring 2.x 버전을 사용합니다.

Sample-data.csv를 읽고 해당 데이터를 firstName과 lastName으로 나눠 DB에 저장

 

Sample-data.csv

 

batch DB의 people Table


실습 목적

  1. local에 있는 .csv 파일을 읽어 local DB에 입력하기 위함.
  2. 스케쥴러를 사용하여 사용자가 지정한 시간마다 Batch가 실행되도록 구현.

Directory 구조

 

BatchConfig 

더보기
package com.example.batch.config;

import com.example.batch.listener.JobCompletionNotificationListener;
import com.example.batch.model.Person;
import com.example.batch.processor.PersonItemProcessor;
import javax.sql.DataSource;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@Configuration
@EnableBatchProcessing
@EnableScheduling
@RequiredArgsConstructor
public class BatchConfig {

    public final JobBuilderFactory jobBuilderFactory;

    public final StepBuilderFactory stepBuilderFactory;

    public final DataSource dataSource;

    private final JobLauncher jobLauncher;

    @Bean
    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>()
                .name("personItemReader")
                .resource(new ClassPathResource("sample-data.csv"))
                .delimited()
                .names("firstName", "lastName")
                .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
                    setTargetType(Person.class);
                }})
                .build();
    }

    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
                .dataSource(dataSource)
                .build();
    }

    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1)
                .end()
                .build();
    }

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(5) // chunk size를 5로 설정 (row 5줄)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }

    @Scheduled(cron = "*/3 * * * * *") // 3초마다 실행
    public void perform() throws JobExecutionAlreadyRunningException, JobRestartException,
            JobInstanceAlreadyCompleteException, JobParametersInvalidException {
        JobParameters params = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();
        jobLauncher.run(importUserJob(), params); // 메서드를 호출하여 Job을 가져옴
    }

    // 스케줄링 메서드에서 사용할 Job을 반환하는 메서드
    public Job importUserJob() {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .start(step1(writer(dataSource)))
                .build();
    }
}

JobCompletionNotificationListener

더보기
package com.example.batch.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener implements JobExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void beforeJob(JobExecution jobExecution) {
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("Job finished! Verifying the results...");

            jdbcTemplate.query("SELECT first_name, last_name FROM people",
                    (rs, row) -> String.format("FirstName: %s, LastName: %s", rs.getString(1), rs.getString(2))
            ).forEach(str -> log.info("Found <" + str + "> in the database."));
        }
    }
}

Person

더보기
package com.example.batch.model;

public class Person {

    private String firstName;
    private String lastName;

    public Person() {
    }

    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}

 

PersonItemProcessor

더보기
package com.example.batch.processor;

import com.example.batch.model.Person;
import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {

    @Override
    public Person process(final Person person) throws Exception {
        final String firstName = person.getFirstName().toUpperCase();
        final String lastName = person.getLastName().toUpperCase();
        return new Person(firstName, lastName);
    }
}

 

Log History 및 실행 후 Table