스프링 배치와 JPA를 활용해 누락된 SNS 이벤트 재발행
SNS 이벤트를 저장하는 이벤트 저장소에서 발행이 누락된 ('published = false') 이벤트를 재발행해주는 배치 작업을 생성해 보자
1. SNS 미발행 이벤트를 스프링 배치로 재발행하는 프로세스 설명
1-1. 이벤트 저장소와 SNS/SQS 메시지 처리
우리 프로젝트는 SNS 이벤트 발행 여부를 추적하는 이벤트 저장소를 갖추고 있다. 이벤트 저장소에 이벤트가 기록된다는 것은 해당 도메인 행위가 성공적으로 완료되었음을 의미한다. 이때, 이벤트의 'published' 속성은 기본적으로 'false'로 설정된다. 프로세스가 원활하게 진행되어 SNS 발행과 SQS 구독이 정상적으로 이루어진 경우, 해당 이벤트는 'published = true'로 업데이트되어 저장된다.
(잘 이해가 안 된다면 아래 글을 읽고 오면 좋다.)
[Spring Boot]MSA 환경에서 SNS/SQS를 활용한 이벤트 처리: 이벤트 유실 문제 해결 방안
1-2. 네트워크 에러와 메시지 재발행
SNS 발행 과정은 HTTP 요청을 포함하기 때문에 네트워크 에러가 발생할 수 있다. 이러한 이유로 'published = false' 상태인 이벤트들이 이벤트 저장소에 남아있을 수 있으며, 이는 SNS 발행이 완료되지 않아 SQS 대기열로 메시지가 전송되지 않았음을 의미한다. 이러한 이벤트를 처리하기 위해, 5분마다 SNS로 재발행하는 배치 작업을 도입할 계획이다.
1-3. 배치 작업의 목적과 중요성
이 배치 작업은 시스템의 신뢰성을 높이는 중요한 역할을 한다. 네트워크 장애나 기타 예외 상황으로 인해 발행에 실패한 메시지를 주기적으로 확인하고 필요에 따라 재시도함으로써, 시스템의 연속성과 데이터의 일관성을 유지한다.
1-4. AWS SNS 재시도 정책과의 연계
현재 우리의 SNS 전송 정책은 아래와 같다.
{
"http": {
"defaultHealthyRetryPolicy": {
"numRetries": 5,
"numNoDelayRetries": 0,
"minDelayTarget": 60,
"maxDelayTarget": 60,
"numMinDelayRetries": 5,
"numMaxDelayRetries": 0,
"backoffFunction": "linear"
},
"disableSubscriptionOverrides": false,
"defaultRequestPolicy": {
"headerContentType": "text/plain; charset=UTF-8"
}
}
}
이 설정에서 "defaultHealthyRetryPolicy" 항목을 살펴보면, 재시도와 관련된 여러 파라미터들이 설정되어 있다:
- numRetries: 5 - 메시지 전송 실패 시, 총 5번의 재시도를 수행한다.
- numNoDelayRetries: 0 - 즉시 재시도를 수행하지 않는다. 따라서, 첫 번째 재시도 전에는 어떠한 지연 시간도 없다.
- minDelayTarget: 60 및 maxDelayTarget: 60 - 각 재시도 사이에는 60초의 지연 시간이 설정된다. 이는 최소 지연 시간(minDelayTarget)과 최대 지연 시간(maxDelayTarget)이 동일하게 설정되어 있으므로, 모든 재시도 간격은 일정하게 60초가 된다.
- numMinDelayRetries: 5 - 최소 지연 시간을 갖는 재시도를 총 5번 수행한다. 이 설정은 전체 재시도 횟수와 일치한다.
- numMaxDelayRetries: 0 - 최대 지연 시간을 갖는 재시도를 수행하지 않는다.
- backoffFunction: "linear" - 재시도 간의 지연 시간이 일정하게 유지된다 (선형 증가 없음).
이 설정에 따르면, 첫 번째 전송 시도가 실패한 후, 60초 간격으로 총 5번의 재시도가 이루어진다. 각 재시도 사이에는 60초의 일정한 지연 시간이 있으며, 이는 전체 재시도 과정이 최대 5분 동안 지속됨을 의미한다. 첫 번째 재시도는 처음 실패 후 60초가 지난 후에 이루어지고, 마지막 재시도는 네 번째 재시도 후 60초가 지난 후에 이루어진다. 이 설정은 시스템에 장애가 발생했을 때 충분한 시간 동안 메시지를 재시도할 수 있게 하여, 메시지 전송의 신뢰성을 높인다.
1-5. SNS 재시도 메커니즘과 어플리케이션 배치 주기 설정
우리의 SNS 전송 정책에 따라, 메시지 전송 실패 시 총 5번의 재시도가 60초 간격으로 이루어진다. 이는 첫 번째 전송 시도 후부터 시작되어, 전체 재시도 과정이 최대 5분 동안 지속될 수 있다는 것을 의미한다. 이러한 SNS의 내부 재시도 메커니즘은 네트워크 오류나 기타 장애 상황에서 발생한 문제를 해결하는 데 시간적 여유를 제공한다.
하지만, SNS의 재시도 시간인 5분이 지난 후에도 메시지가 성공적으로 전송되지 않는 경우가 있을 수 있다. 이를 대비하여, 우리는 어플리케이션 수준에서도 추가적인 안전장치를 마련했다. SNS의 재시도 과정이 끝난 이후, 여전히 'published = false' 상태인 이벤트들을 처리하기 위해 어플리케이션 단계에서 5분마다 실행되는 배치 작업을 구현했다.
이 배치 작업은 SNS의 재시도 시간과 동기화되어 운영되며, SNS의 재시도 과정이 완료된 후에도 전송되지 않은 메시지들을 감지하고 필요한 조치를 취한다. 이러한 방식으로, 우리는 SNS의 내부 메커니즘과 어플리케이션 수준의 배치 작업을 연계하여 시스템의 신뢰성을 더욱 강화했다. 이는 잠재적인 메시지 전송 실패를 최소화하고, 시스템 전반의 안정성을 보장하기 위한 조치다.
1-6. 시스템 부하와 성능에 대한 고려
5분 주기의 배치 작업 설정은 시스템 부하를 최소화하면서도 효율적으로 장애를 복구할 수 있는 균형을 찾는다. 너무 빈번한 배치 실행은 시스템 리소스에 부담을 줄 수 있으며, 반대로 너무 드문 실행은 장애 상황에 늦게 대응할 수 있다. 따라서, 5분 주기의 배치 작업은 시스템의 부하를 관리하고 성능을 유지하는 데 이상적인 간격이다.
2. 스프링 배치를 사용하는데 스프링 스케줄러를 연동한 이유
첫 번째로, 스프링 스케줄러를 사용하면 정해진 시간 또는 간격에 따라 배치 작업을 자동으로 실행할 수 있다. 이는 일정한 주기로 반복되는 데이터 처리 작업에 매우 적합하다. 예를 들어, 매일 또는 매주 특정 시간에 데이터를 집계하거나 업데이트하는 작업 등이 있을 수 있다.
두 번째로, 스케줄러를 사용하면 시스템 리소스 사용을 최적화할 수 있다. 작업을 비즈니스 시간 외에 실행하여 시스템 부하를 분산시키거나, 리소스 사용이 낮은 시간에 배치 작업을 실행하여 전체 시스템 성능에 미치는 영향을 최소화할 수 있다.
세 번째로, 스프링 배치는 로깅, 트랜잭션 관리, 예외 처리 등을 제공하여 배치 작업의 안정성을 높인다. 스케줄러와 연동하면 이러한 기능을 주기적으로 실행되는 작업에 적용하여, 작업의 실패나 예외 상황을 쉽게 감지하고 대응할 수 있다.
마지막으로, 스프링 배치와 스케줄러를 함께 사용하면, 작업의 로직과 스케줄링을 분리할 수 있다. 이는 코드의 확장성과 유지 보수성을 향상시킨다. 작업의 로직이 변경되어도 스케줄링 부분은 영향을 받지 않으며, 반대로 스케줄링 방식을 변경해도 배치 작업 로직에 영향을 미치지 않는다.
3. 스프링 스케줄러 코드
아래는 스프링 스케줄러 코드 화면이다.
@EnableScheduling 어노테이션은 스프링의 스케줄링 기능을 활성화한다.
jobLauncher는 스프링 배치 작업을 실행하는 데 사용되는 객체이며, job은 실행할 배치 작업 자체를 나타낸다.
@Scheduled(cron = "0 */5 * * * *") 이 표현식은 @Scheduled 어노테이션과 cron 표현식을 사용하여 배치 작업의 실행 주기를 정의하고 있다. 해당 코드는 5분마다 작업을 실행하라는 것을 의미한다.
- 초(0): 매 분의 0초에 실행
- 분(*/5): 매 시간의 5분 간격 (예: 0분, 5분, 10분,...)
- 시(*): 매 시간
- 일(*): 매월 모든 날짜
- 월(*): 모든 달
- 요일(*): 매주 모든 요일
public void runBatchJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
runBatchJob 메서드는 배치 작업을 실행한다. JobParameters는 배치 작업에 전달되는 파라미터들을 정의한다. 여기서는 현재 시간을 밀리초 단위로 전달하고 있다.
jobLauncher.run(job, jobParameters)는 정의된 작업(job)을 주어진 파라미터(jobParameters)와 함께 실행한다.
메서드는 여러 종류의 예외를 던질 수 있으며, 각 예외는 배치 작업 실행 중 발생할 수 있는 다양한 문제들(예: 이미 완료된 작업의 재실행 시도, 이미 실행 중인 작업의 중복 실행 시도 등)을 나타낸다.
💡 현재 시간을 밀리초 단위로 전달하는 부분은 스프링 배치에서 매우 중요한 역할을 한다.
이는 JobParameters를 통해 배치 작업의 실행마다 고유한 파라미터를 제공하는 데 사용된다. 이렇게 함으로써 배치 작업이 각 실행마다 서로 다른 인스턴스로 인식되도록 하며, 이는 다음과 같은 이유로 중요하다:
중복 실행 방지: 배치 작업이 이미 실행되었던 상태로 인식되지 않게 하기 위해 각 실행을 고유하게 식별할 필요가 있다. 만약 동일한 JobParameters가 사용된다면, 스프링 배치는 이전에 실행된 동일한 작업으로 간주하고, JobInstanceAlreadyCompleteException 같은 예외를 발생시킬 수 있다.
작업 실행 추적: 고유한 파라미터를 사용함으로써, 각 배치 작업 실행을 개별적으로 추적하고 로깅할 수 있다. 이는 특히 오류 발생 시 문제를 진단하고 해결하는 데 도움이 된다.
재시도 및 복구 용이성: 배치 작업이 실패한 경우, 고유한 파라미터를 가진 새로운 작업 인스턴스를 생성하여 재시도할 수 있다. 이는 오류 복구 프로세스를 간소화하고, 작업 실행의 일관성을 유지하는 데 유용하다.
작업의 독립성 보장: 시간 기반의 고유한 파라미터를 사용함으로써, 각 작업 실행이 서로 독립적임을 보장할 수 있다. 이는 데이터 무결성 및 처리 정확성을 유지하는 데 중요한 요소다.
따라서, 현재 시간을 밀리초 단위로 전달하는 것은 스프링 배치 작업의 실행 관리에 있어서 중요한 기법 중 하나로, 배치 작업의 고유성, 추적성, 그리고 재시도 및 복구 프로세스의 용이성을 제공한다.
(스프링 스케줄러 전체 코드)
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
@RequiredArgsConstructor
@Configuration
@EnableScheduling
public class SchedulerConfig {
private final JobLauncher jobLauncher;
private final Job job;
// 매 5분마다 실행
// @Scheduled(cron = "0 */5 * * * *")
@Scheduled(cron = "*/10 * * * * *") // 개발용 10초마다
public void runBatchJob() throws JobInstanceAlreadyCompleteException
, JobExecutionAlreadyRunningException
, JobParametersInvalidException
, JobRestartException {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
}
4. 스프링 배치 코드
우리 프로젝트에서 스프링 배치는 이벤트 저장소에서 published = false인 데이터를 읽어와 해당 sns-topic에 맞게 SNS를 재발행해주는 코드로 작성했다.
4-1. 클래스 선언 및 초기화
@Slf4j
@RequiredArgsConstructor
@Configuration
public class BatchConfig extends DefaultBatchConfiguration {
private final SnsService snsService;
private final EntityManagerFactory entityManagerFactory;
BatchConfig 클래스는 DefaultBatchConfiguration을 상속받는다. 이는 스프링 배치 5에서 새로 도입된 접근 방식으로, 기본 배치 인프라 구성을 제공한다.
(스프링 배치 5에서 변경된 사항이 궁금하다면 아래 글에 작성해 놨다.)
[Spring Boot] Spring Boot 3.X 버전에서 Spring Batch 적용하기
클래스에는 SnsService, EntityManagerFactory 등의 필요한 의존성을 주입했다.
EntityManagerFactory는 JPA(Java Persistence API)를 사용하는 애플리케이션에서 데이터베이스와의 상호작용을 관리하는 핵심 컴포넌트다. 이 팩토리를 통해 EntityManager 인스턴스를 생성하고, 데이터베이스와의 연결, 엔티티의 생명 주기 관리 등의 작업을 수행할 수 있다.
4-2. 상수 선언
@Value("${chunkSize:2}")
private int chunkSize;
@Value("${pageSize:4}")
private int pageSize;
@Value 어노테이션을 사용해 프로퍼티 파일(.properties 또는 .yml)에서 정의된 값을 주입받기 위해 사용된다. 즉, 이 어노테이션은 설정 파일에서 정의한 값을 Java 클래스의 필드에 연결해 준다.
${chunkSize:2} 이 구문은 chunkSize라는 이름의 프로퍼티 값을 찾아 그 값을 chunkSize 필드에 할당한다. 여기서 :2는 기본값을 의미한다. 즉, chunkSize라는 프로퍼티가 설정 파일에 없을 경우 2가 기본값으로 적용된다는 의미다.
@Value("${pageSize:4}") 이 구문은 pageSize라는 프로퍼티 값을 찾아 그 값을 pageSize 필드에 할당한다. 여기서 :4는 기본값을 의미하며, pageSize 프로퍼티가 설정 파일에 없을 경우 4가 기본값으로 적용된다는 의미다.
4-3. Job 정의
/**
* Job을 정의. 하나 이상의 Step을 포함할 수 있으며, 여기서는 sendSmsStackStep을 시작점으로 설정함.
*/
@Bean
public Job job(final JobRepository jobRepository, final Step sendSmsStackStep) {
return new JobBuilder("job", jobRepository)
.start(sendSmsStackStep)
.build();
}
job 메서드는 배치 작업을 정의한다.
JobBuilder는 스프링 배치의 Job 인스턴스를 구성하고 생성하는 데 사용된다. JobBuilder의 생성자는 작업의 이름("job")과 JobRepository를 파라미터로 받는다. 여기서 JobRepository는 배치 작업의 상태와 관련된 데이터(예: 실행, 완료, 실패 등)를 저장하고 관리하는 데 사용된다.
JobBuilder의 start 메서드는 배치 작업의 첫 번째 단계를 설정한다. 여기서 sendSmsStackStep이라는 Step 객체가 시작점으로 설정된다. Step은 배치 작업의 개별 단계를 나타내며, 데이터를 읽고, 처리하고, 쓰는 등의 구체적인 작업을 수행한다. 여기서 sendSmsStackStep은 데이터를 처리하는 실질적인 로직을 포함하고 있다.
JobBuilder의 build 메서드는 최종적으로 Job 객체를 구성하고 생성한다. 이렇게 생성된 Job 객체는 스프링 배치 프레임워크에 의해 실행되며, 설정된 Step들을 순서대로 실행하게 된다.
4-4. Step 정의
@Bean
@Transactional
public Step step(final JobRepository jobRepository,
final PlatformTransactionManager transactionManager) {
return new StepBuilder("step", jobRepository)
.<MemberEventRecord, MemberEventRecord>chunk(this.chunkSize, transactionManager)
.reader(this.jpaPagingItemReader(entityManagerFactory))
.processor(this.itemProcessor())
.writer(this.itemWriter())
.build();
}
step 메서드는 데이터 처리 단계를 정의한다. 이 단계에서는 JpaPagingItemReader를 사용하여 published=false인 MemberEventRecord 데이터를 읽어온다. 또한, ItemProcessor와 ItemWriter를 통해 읽은 데이터를 처리한다.
@Transactional 어노테이션은 step 메서드 내에서 수행되는 작업들이 트랜잭션의 일부로 관리되어야 함을 나타낸다. 즉, 이 단계의 모든 데이터베이스 작업은 하나의 트랜잭션으로 묶여서, 오류 발생 시 전체 작업이 롤백될 수 있다.
StepBuilder는 스프링 배치의 Step 인스턴스를 구성하는 데 사용된다. StepBuilder의 생성자는 단계의 이름("step")과 JobRepository를 파라미터로 받는다. JobRepository는 배치 작업의 상태를 관리하는 데 필요하다.
.chunk(this.chunkSize, transactionManager) 부분은 이 단계가 'chunk' 지향 처리 방식을 사용함을 나타낸다. 여기서 chunkSize는 한 번에 처리할 아이템의 수를 정의한다. 즉, 이 크기만큼의 데이터를 읽고, 처리하고, 쓰는 작업이 하나의 트랜잭션으로 묶여서 처리된다. transactionManager는 이러한 트랜잭션 관리를 담당한다. 이 방식은 대량의 데이터를 작은 단위의 'Chunk'로 나누어 처리함으로써 메모리 효율성과 성능을 개선한다.
JpaPagingItemReader를 사용하여 published=false인 MemberEventRecord 데이터를 읽는다. 이는 대량의 데이터를 효율적으로 처리하기 위한 전략으로, 페이징 기법을 이용하여 데이터베이스에서 데이터를 분할하여 읽어 들인다.
읽어온 데이터는 ItemProcessor를 통해 가공되고, ItemWriter를 통해 최종적으로 데이터베이스에 저장하거나 다른 형태로 출력된다. 이 과정에서 비즈니스 로직이 적용되어 데이터 변환 및 처리가 이루어진다.
💡 왜 reader, processor, writer에 각각 @Transactional을 적용하지 않고 Step에 적용해 주는 걸까??
이러한 구성은 스프링 배치의 특성과 트랜잭션 관리 방식에 기반했다.
스프링 배치는 기본적으로 'Chunk-oriented processing' 모델을 사용한다. 이 모델에서는 하나의 트랜잭션 내에서 여러 레코드를 읽고(Reader), 처리(Processor), 쓰기(Writer)하는 방식으로 작동한다. 이렇게 함으로써, 트랜잭션 관리가 효율적으로 이루어지며, 오류 발생 시 롤백도 용이하다.
@Transactional을 step 메소드에 적용함으로써, 스프링 배치는 해당 Step의 모든 작업을 하나의 트랜잭션으로 관리할 수 있다. 이는 Chunk 단위로 트랜잭션을 관리하는 스프링 배치의 기본 동작과 일치한다.
❓ 만약 processor, writer에 각각 @Transactional 어노테이션을 적용하면 어떻게 될까?
ItemProcessor 나 ItemWriter에 개별적으로 @Transactional을 적용하면, 각 레코드 처리마다 별도의 트랜잭션이 생성되고 관리된다. 이는 트랜잭션 오버헤드를 증가시키고 성능에 부정적인 영향을 미칠 수 있다. 특히 대량의 데이터를 처리하는 배치 작업에서는 이러한 오버헤드가 누적되어 상당한 성능 저하를 초래할 수 있다.
또한 ItemProcessor나 ItemWriter에 개별적으로 트랜잭션을 적용하면, 전체 Chunk 처리 과정에서 데이터 일관성을 유지하기 어려워질 수 있다. 예를 들어, Processor에서 처리한 데이터가 Writer 단계에서 실패할 경우, 이미 처리된 데이터에 대한 롤백이 복잡해질 수 있다.
따라서, 일반적인 경우에는 Step 수준에서 @Transactional 을 적용하여 전체 Chunk의 읽기, 처리, 쓰기 작업을 하나의 트랜잭션으로 관리하는 것이 바람직하다. 물론, 특정 비즈니스 요구사항이나 성능 최적화를 위해 각 컴포넌트에 별도의 트랜잭션을 적용해야 하는 경우도 있을 수 있으나, 이러한 경우에는 추가적인 고려와 테스트가 필요하다.
4-5. Reader 설정
@Bean
public JpaPagingItemReader<MemberEventRecord> jpaPagingItemReader(EntityManagerFactory entityManagerFactory) {
log.info("jpaPagingItemReader 동작!");
JpaPagingItemReader<MemberEventRecord> reader = new JpaPagingItemReader<>();
String jpql = """
SELECT mer
FROM MemberEventRecord mer
WHERE mer.published = false
AND mer.id IN (
SELECT MAX(innerMer.id)
FROM MemberEventRecord innerMer
WHERE innerMer.published = false
GROUP BY innerMer.member.id, innerMer.snsTopic
)
ORDER BY mer.id ASC
""";
reader.setQueryString(jpql);
reader.setEntityManagerFactory(entityManagerFactory);
reader.setPageSize(pageSize); // 페이지 크기 설정
return reader;
}
JpaPagingItemReader 객체는 Spring Batch에서 사용되는 구성 요소로, JPA를 사용하여 데이터베이스에서 페이지 단위로 데이터를 읽는 데 사용된다. 이 코드에서 JpaPagingItemReader<MemberEventRecord> 객체는 특정 조건을 만족하는 MemberEventRecord 엔티티를 데이터베이스에서 페이징 방식으로 읽어오는 역할을 한다.
실행 순서를 설명하자면
먼저, JpaPagingItemReader 객체는 생성되어 초기 설정을 받는다. 이 객체는 데이터베이스에서 MemberEventRecord라는 엔티티를 읽어오는 데 사용된다. 이 과정에서 중요한 부분은 JPQL 쿼리다. 이 쿼리는 published 상태가 false인 MemberEventRecord들 중에서, 각 member.id와 sns Topic별로 가장 큰 id (가장 최신) 값을 가진 레코드들을 선택한다. 쉽게 말해, 아직 발행되지 않은 최신 이벤트 레코드들을 찾는 것이다.
다음으로, 이 객체는 페이징 처리를 한다. reader.setPageSize(pageSize);를 통해 설정된 페이지 크기에 따라, 한 번에 읽어올 데이터의 양이 결정된다. 이는 데이터를 작은 단위로 나누어 처리함으로써, 대용량 데이터를 다룰 때 메모리 사용량을 줄이고 전체 성능을 향상시키는 데 도움을 준다.
마지막으로, 실제 배치 작업이 실행될 때, 이 JpaPagingItemReader는 설정된 페이지 크기에 따라 데이터베이스로부터 MemberEventRecord 데이터를 순차적으로 읽어온다. 이 과정에서 각 페이지를 차례대로 처리하면서, 필요한 데이터를 효율적으로 가져오고 처리한다.
이 코드는 Spring Batch와 JPA를 사용하여 페이징을 통해 메모리 사용량을 제어하고 대용량 데이터를 효과적으로 처리할 수 있다.
(published 상태가 false인 MemberEventRecord들 중에서, 각 member.id와 sns Topic별로 가장 최신 데이터만 가져오는 이유는 아래 글에서 자세히 설명했다.)
[Spring Boot] SNS MessageAttributes를 이용한 분산 시스템 추적용 Trace Id 전달 방법
4-6. Processor 설정
@Bean
public ItemProcessor<MemberEventRecord, MemberEventRecord> itemProcessor() {
return item -> {
log.info("processItem 동작중");
switch (item.getSnsTopic()) {
case "NicknameChange" -> snsService.publishNicknameToTopic(item.getAttribute(), item.getTraceId());
}
return item;
};
}
itemProcessor 메서드는 읽은 데이터를 처리하는 방법을 정의한다. ItemReader가 읽어온 단일 이벤트에 대해서 sns topic 별로 호출해야 하는 service가 다르기 때문에 processor 로직 내에서 분기처리를 했다. 이 과정에서 데이터는 필터링되고 필요한 처리가 수행된다.
현재 로직에서는 닉네임 변경 이벤트밖에 없기 때문에 swich문에 하나의 case만 있는 모습이지만 나중에는 더 추가될 예정이다.
publishNicknameToTopic 메소드는 아래처럼 생겼다.
기존에 SNS 발행 전에 이벤트 저장소에 이벤트를 저장하면서 SNS 발행하기 위한 메시지 본문과 추적을 위한 trace id를 각각 attribute, traceId 컬럼에 저장했었다. 이러한 과정이 있기 때문에 배치 작업에서 후처리 없이 DB에서 가져온 데이터를 이용해 그대로 메소드를 재사용할 수 있었다.
4-7. Writer 설정
@Bean
public ItemWriter<MemberEventRecord> itemWriter() {
log.info("itemWriter 동작!");
JpaItemWriter<MemberEventRecord> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManagerFactory);
return writer;
}
itemWriter 메서드는 처리된 데이터를 어떻게 기록할지 정의한다. 여기서는 JpaItemWriter를 사용하여 데이터베이스에 업데이트된 엔티티를 저장한다. 이 작업이 마무리되면 트랜잭션이 종료되고 커밋된다.
💡 현재 코드를 따라오다 보면 현재 배치 작업에서는 reader로 데이터를 읽어오고, processor에서 SNS 발행을 처리하기 때문에 writer에서 실제로 데이터베이스에 대한 업데이트나 쓰기 작업을 수행하지 않는다.
스프링 배치에서 ItemWriter는 일반적으로 데이터를 변형하거나 새로운 데이터를 생성한 후, 이를 데이터베이스에 저장하거나 외부 시스템에 전송하는 역할을 한다. 그러나 여기서는 ItemProcessor가 비즈니스 로직을 수행하고 있으며, 이 로직이 데이터베이스의 상태 변경을 포함하지 않는다.
이 경우, ItemWriter는 실제로 필요하지 않을 수 있다. 하지만 스프링 배치의 표준 패턴을 따르기 위해, 또는 향후 이 배치 작업이 확장되어 데이터베이스에 대한 쓰기 작업이 필요해질 수도 있기 때문에 ItemWriter를 구현해 놓았다.
만약 현재와 같은 상태에서 ItemWriter가 불필요하다고 확신한다면, 이를 제거하고 ItemProcessor만 사용하는 것도 가능하다. 이렇게 하면 코드의 명확성과 유지 관리성이 향상될 수 있다. 하지만 이 경우, 향후 배치 프로세스가 변경되어 데이터베이스에 쓰기 작업이 필요해지면, ItemWriter를 다시 추가해야 할 수도 있다.
💡 ItemWriter가 없는 경우에도, 트랜잭션 관리는 유효하다.
스프링 배치에서 트랜잭션 관리는 PlatformTransactionManager에 의해 수행된다. Step 정의 시 PlatformTransactionManager를 설정하면, 스프링 배치는 이를 사용하여 해당 Step의 트랜잭션을 관리한다.
ItemWriter가 없더라도, Step에 설정된 트랜잭션 관리자는 Chunk 단위로 트랜잭션을 시작하고 커밋(또는 롤백)한다. 이는 ItemReader와 ItemProcessor 가 포함된 전체 Chunk 처리 과정에 적용된다.
Chunk의 처리가 시작될 때 트랜잭션이 시작되고, Chunk의 처리가 완료되면 트랜잭션이 커밋된다. 이 과정은 ItemWriter의 유무와 관계없이 동일하게 작동한다. ItemWriter 가 없는 경우, 트랜잭션은 ItemProcessor의 처리가 완료된 후에 커밋된다.
트랜잭션 중에 예외가 발생하면, 해당 Chunk의 처리는 롤백된다. 이는 데이터의 일관성을 유지하는 데 중요하다.
따라서 ItemWriter가 없는 경우에도, 트랜잭션 관리에 대한 걱정은 안 해도 된다. 하지만, ItemWriter의 생략은 배치 프로세스의 전체적인 설계와 요구 사항을 고려하여 결정해야 한다.
이 설정을 통해, 어플리케이션은 주기적으로 데이터베이스에서 특정 조건을 만족하는 데이터를 조회하고, 이를 SNS 서비스를 통해 처리하는 자동화된 배치 작업을 수행할 수 있다. 이는 효율적인 데이터 관리와 자동화된 알림 발송을 가능하게 하며, 시스템의 전반적인 운영을 간소화한다.
(스프링 배치 전체 코드)
import com.recipia.member.aws.SnsService;
import com.recipia.member.domain.event.MemberEventRecord;
import jakarta.persistence.EntityManagerFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.support.DefaultBatchConfiguration;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Transactional;
/**
* DefaultBatchConfiguration을 확장하는 방식은 스프링 배치 5에서 새로 도입된 접근 방식이다.
* 이는 @EnableBatchProcessing의 기능을 포함하며, 기본 배치 인프라 구성을 제공한다.
*/
@Slf4j
@RequiredArgsConstructor
@Configuration
public class BatchConfig extends DefaultBatchConfiguration {
private final SnsService snsService;
private final EntityManagerFactory entityManagerFactory;
// 배치 작업에서 사용할 chunkSize 값 설정. 기본값은 1000
@Value("${chunkSize:2}")
private int chunkSize;
@Value("${pageSize:4}")
private int pageSize;
/**
* Job을 정의. 하나 이상의 Step을 포함할 수 있으며, 여기서는 sendSmsStackStep을 시작점으로 설정함.
*/
@Bean
public Job job(final JobRepository jobRepository, final Step sendSmsStackStep) {
return new JobBuilder("job", jobRepository)
.start(sendSmsStackStep)
.build();
}
@Bean
@Transactional
public Step step(final JobRepository jobRepository,
final PlatformTransactionManager transactionManager) {
return new StepBuilder("step", jobRepository)
.<MemberEventRecord, MemberEventRecord>chunk(this.chunkSize, transactionManager)
.reader(this.jpaPagingItemReader(entityManagerFactory))
.processor(this.itemProcessor())
.writer(this.itemWriter())
.build();
}
/**
* JpaPagingItemReader를 설정. publishedAt이 false인 Member 데이터를 조회하는 쿼리를 실행함.
*/
@Bean
public JpaPagingItemReader<MemberEventRecord> jpaPagingItemReader(EntityManagerFactory entityManagerFactory) {
log.info("jpaPagingItemReader 동작!");
JpaPagingItemReader<MemberEventRecord> reader = new JpaPagingItemReader<>();
String jpql = """
SELECT mer
FROM MemberEventRecord mer
WHERE mer.published = false
AND mer.id IN (
SELECT MAX(innerMer.id)
FROM MemberEventRecord innerMer
WHERE innerMer.published = false
GROUP BY innerMer.member.id, innerMer.snsTopic
)
ORDER BY mer.id ASC
""";
reader.setQueryString(jpql);
reader.setEntityManagerFactory(entityManagerFactory);
reader.setPageSize(pageSize); // 페이지 크기 설정
return reader;
}
@Bean
public ItemProcessor<MemberEventRecord, MemberEventRecord> itemProcessor() {
return item -> {
log.info("processItem 동작중");
switch (item.getSnsTopic()) {
case "NicknameChange" -> snsService.publishNicknameToTopic(item.getAttribute(), item.getTraceId());
}
return item;
};
}
/**
* ItemWriter를 설정. 여기서 작업이 마무리되면 트랜잭션이 종료되고 커밋된다.
*/
@Bean
public ItemWriter<MemberEventRecord> itemWriter() {
log.info("itemWriter 동작!");
JpaItemWriter<MemberEventRecord> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManagerFactory);
return writer;
}
}
5. 마무리
이렇게 스프링 배치를 이용해 누락되거나 유실되는 이벤트 없이 정상적으로 SNS 재발행을 할 수 있었다.
배치를 진행하면서 메소드 재사용 관련 이슈도 해결하면서 여러가지로 좋은 기회였던것같다.
👇 스프링 배치에서 chunk와 pageSize의 차이점이 궁금하다면? 👇
[Spring Boot] 스프링 배치에서 chunk와 JPA pageSize 설정의 관계성
이 포스트는 Team chillwave에서 사이드 프로젝트 중 적용했던 부분을 다시 공부하며 기록한 것입니다.
시간이 괜찮다면 팀원 '개발자의 서랍'님의 블로그도 한번 봐주세요 :)
'Spring > Spring Boot' 카테고리의 다른 글
헥사고날 아키텍처 실전 적용 (1) - 클래스 의존성 주입 및 도메인, 엔티티의 객체 변환 과정 (0) | 2023.12.16 |
---|---|
[Spring Boot] Spring Boot 3.X 버전에 p6spy 적용하기 (0) | 2023.12.08 |
[Spring Boot] SNS MessageAttributes를 이용한 분산 시스템 추적용 Trace Id 전달 방법 (0) | 2023.11.28 |
[Spring Boot] 스프링 배치에서 chunk와 JPA pageSize 설정의 관계성 (0) | 2023.11.27 |
[Spring Boot] 스프링 부트 3.X 버전에서 Spring Batch 테이블 자동 생성 안되는 에러 (0) | 2023.11.27 |