분산 시스템에서의 SNS/SQS 메시지 처리: 단일 책임 원칙과 Zipkin의 Trace ID 에러 핸들링을 중심으로
이번 글에서는 단일 책임 원칙에 따라 메소드를 분리하고, SNS 발행 메소드와 SQS 메소드에서 메시지 에러 핸들링 내용을 다뤄보자.
프로젝트를 진행하면서 이제 모든 정상 프로세스가 완료되었고, 배치 작업을 시작하려고 했을때 갑자기 팀원이
근데, 만약에 메세지에 traceId가 없을 때 SQS Listener에서 에러 핸들링은 어떻게 하지?
라는 질문을 했다.
그래서 SNS 메세지에 traceId가 없을 때 발생할 수 있는 에러를 핸들링하는 과정을 소개하겠다.
1. 기존 프로세스 간략 설명
기존에 어떤 프로세스로 진행되었는지 전부 설명하기에는 너무 오래 걸릴것같아서, 아래 글을 통해 확인하고 오면 좋을 것 같다.
스프링 이벤트를 활용해서 SNS, SQS 프로세스를 진행한 구성은 아래 글에 나와있다.
[Spring Boot]MSA 환경에서 SNS/SQS를 활용한 이벤트 처리: 이벤트 유실 문제 해결 방안
SNS 메시지에 traceId를 포함해서 발행하는 구성은 아래 글을 나와있다.
[Spring Boot] AWS 분산 시스템에서 Zipkin으로 로그 추적하기: SNS/SQS 통합 실전 적용
우리 프로젝트에서는 닉네임 변경 이벤트를 다루고 있다. 이 프로세스는 다음과 같다.
우선 닉네임 변경 이벤트가 발생하면, 첫 번째 이벤트 리스너는 이벤트 저장소에 'memberId'를 attribute 컬럼에 포함시켜 저장한다. 그리고 두번째 이벤트 리스너가 memberId를 메세지에 포함시키고 이를 SNS 발행 메소드로 전달한다. 그리고 SNS 발행 메소드에서는 Trace ID를 직접 추출하여 메세지에 추가한다. 이로써 SNS 메시지에서 Trace ID가 누락될 가능성을 줄였다.
그리고 비즈니스 로직을 담당하는 SQS 리스너는 메시지에 포함된 Trace ID를 추출하여 새로운 TraceContext를 생성한다.
정상적으로 진행될 때는 아무 문제 없이 작동하는 코드가 배치 작업을 구현하려고 하면서 여러 에러 사항이 나타났다. 여기서 우리가 진행할 배치 작업이란, 5분마다 이벤트 저장소에서 'published = false'인 데이터들을 찾아내어 다시 SNS에 재발행하는 작업을 의미한다.
2. 프로세스 에러사항
첫 번째로, 이벤트 저장소 테이블의 attribute 컬럼에 memberId만 저장하게 되어, 배치 작업에서 메시지를 재발행해도 기존 Trace Id로 추적할 수 없는 문제가 발생했다. 즉, 최초 HTTP 요청에서 생성된 Trace Id가 사라져(누락되어) Zipkin UI에서 기존 Trace Id로 추적이 불가능한 상황이었다.
▶️ 이 문제는 이벤트 저장소에 Trace Id를 함께 저장함으로써 해결할 수 있다.
두 번째로, 배치 작업에서 SNS 발행 메소드를 재사용할 때 문제가 발생했다. SNS 발행 메소드 안에서 Trace Id를 직접 추출하게 되면, 이벤트 저장소에 Trace Id를 저장해도 그 값으로 메시지가 발행되지 않고, SNS 발행 메소드에서 추출한 Trace Id로 발행된다. 하지만 배치 작업은 일반적인 HTTP 요청이 아니라 백그라운드 작업이기 때문에 별도의 Trace Id가 생성되지 않는다. 이로 인해 SNS 발행 메소드에 작성된 아래 코드에서 traceId가 null이거나 빈 값을 반환하는 문제가 생길 수 있다.
// 현재 TraceID 추출
String traceId = tracer.currentSpan().context().traceIdString();
▶️ 해결책으로는 두 가지 방법이 있다. 첫째, 메시지를 받고 비즈니스 로직을 처리하는 SQS 리스너(지금 상황에서는 레시피 서버의 회원 닉네임을 업데이트시키는 로직)가 메시지에 Trace Id 값이 있고 비어있지 않을 때만 정상 진행하도록 코드를 수정하는 것이다. 둘째, SNS 발행 메소드에서 Trace Id를 추출하는 코드를 제거하는 것이다. SNS 발행 메소드는 단일 책임 원칙에 따라 파라미터로 받은 메시지를 그대로 SNS에 발행하는 메소드로 변경한다. 이를 위해서는 SNS 발행 메소드를 호출하기 전에 이미 완전한 메시지를 준비해야 하는 추가 변경 사항이 생긴다.
이제 이 해결책들을 코드에 적용해 보자.
💡 단일 책임 원칙에 대한 설명:
단일 책임 원칙(Single Responsibility Principle, SRP)은 소프트웨어 설계의 중요한 원칙 중 하나다. 이 원칙은 "한 클래스는 하나의 책임만 가져야 한다"는 개념을 중심으로 한다. 즉, 한 클래스가 시스템의 여러 기능을 담당하면 안 되며, 그 클래스가 변경되어야 하는 이유는 오직 하나여야 한다는 의미다. 이 원칙을 따르면 코드의 유지 보수가 용이해지고, 클래스의 기능이 명확해져 시스템의 복잡성이 줄어든다.
우리 프로젝트에서 SNS 발행 메소드를 단일 책임 원칙에 맞게 수정하는 것은 이러한 이유 때문이다.
처음에 이 메소드는 두 가지 일을 했다. 메시지를 SNS에 발행하는 것과 Trace ID를 추출하는 것이었다. 이렇게 되면 Trace ID 처리 방식에 변화가 생길 때 SNS 발행 부분까지 영향을 받을 수 있다. 이를 방지하기 위해 SNS 발행 메소드를 수정했다. 이제 이 메소드는 메시지 발행만 담당하고 Trace ID 추출은 다른 곳에서 처리한다. 이 변경으로 메소드가 하나의 책임만 지니게 되었고, 시스템 전체가 더 유연하고 관리하기 쉬워졌다.
결국 단일 책임 원칙을 따르면 코드가 더 깔끔해지고, 변경이 필요할 때 해당 부분만 수정하면 되니 전체적인 유지 보수가 훨씬 수월해진다.
3. 코드 수정
3-1. 첫 번째 문제 해결 : 이벤트 저장소에 Trace Id까지 함께 저장하기
- 이벤트 저장소에 이벤트를 저장하는 로직은 도메인 행위(회원 테이블의 닉네임 변경) 트랜잭션과 하나로 묶여있기 때문에 여기서 가져오는 Tracer Id는 최초 HTTP 요청으로 인해 생성된 Trace Id와 동일하다.
- 여기서 추출한 traceId를 메시지에 추가해 주고 이 전체 내용을 이벤트 저장소에 저장해 준다.
(이벤트 저장소에 데이터 저장하는 Spring Event 리스너 전체 코드)
import brave.Tracer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.recipia.member.config.aws.AwsSnsConfig;
import com.recipia.member.domain.Member;
import com.recipia.member.domain.event.MemberEventRecord;
import com.recipia.member.exception.ErrorCode;
import com.recipia.member.exception.MemberApplicationException;
import com.recipia.member.repository.MemberEventRecordRepository;
import com.recipia.member.repository.MemberRepository;
import com.recipia.member.utils.CustomJsonBuilder;
import com.recipia.member.utils.MemberStringUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@RequiredArgsConstructor
@Component
public class SpringEventRecordListener {
private final MemberRepository memberRepository;
private final MemberEventRecordRepository memberEventRecordRepository;
private final AwsSnsConfig awsSnsConfig;
private final CustomJsonBuilder customJsonBuilder;
private final Tracer tracer;
/**
* 이벤트를 호출한 서비스 코드의 트랜잭션과 묶이게 된다.
*/
@Transactional
@EventListener
public void eventRecordListener(NicknameChangeSpringEvent event) throws JsonProcessingException {
Member member = memberRepository.findById(event.memberId()).orElseThrow(() -> new MemberApplicationException(ErrorCode.USER_NOT_FOUND));
// 현재 TraceID 추출
String traceId = tracer.currentSpan().context().traceIdString();
// message에 memberId, traceId 주입
String messageJson = customJsonBuilder
.add("memberId", event.memberId().toString())
.add("traceId", traceId)
.build();
String topicName = MemberStringUtils.extractLastPart(awsSnsConfig.getSnsTopicNicknameChangeARN());
MemberEventRecord memberEventRecord = MemberEventRecord.of(
member,
topicName,
"NicknameChangeEvent",
messageJson,
false,
null
);
memberEventRecordRepository.save(memberEventRecord);
}
}
3-2. SQS 리스너(비즈니스 로직 담당)에서 Trace Id가 있을 때만 정상 진행
SQS 리스너 메소드를 수정하면서 사소한 로직도 조금씩 수정했다.
아래는 기존 SQS 리스너 전체 코드다.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.recipia.recipe.event.springevent.NicknameChangeEvent;
import io.awspring.cloud.sqs.annotation.SqsListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import brave.Span;
import brave.Tracer;
import brave.propagation.TraceContext;
import brave.propagation.TraceContextOrSamplingFlags;
@Slf4j
@RequiredArgsConstructor
@Service
public class AwsSqsListenerService {
private final ObjectMapper objectMapper;
private final ApplicationEventPublisher eventPublisher;
private final Tracer tracer;
@SqsListener(value = "${spring.cloud.aws.sqs.nickname-sqs-name}")
public void receiveMessage(String messageJson) throws JsonProcessingException {
JsonNode messageNode = objectMapper.readTree(messageJson);
String messageId = messageNode.get("MessageId").asText();
String messageContent = messageNode.get("Message").asText();
log.info("[RECIPE] Received message from SQS with messageId: {}", messageId);
JsonNode message = objectMapper.readTree(messageContent);
String traceId = extractTraceIdFromMessage(message);
TraceContext context = buildTraceContext(traceId);
Span span = tracer.nextSpan(TraceContextOrSamplingFlags.create(context))
.name("[RECIPE] nickname-change SQS Received") // Span 이름 지정
.start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
processNicknameMessage(message);
} catch (Exception e) {
span.tag("error", e.toString());
log.error("Error processing SQS message: ", e);
} finally {
span.finish();
}
}
private String extractTraceIdFromMessage(JsonNode message) {
return message.get("traceId").asText();
}
private TraceContext buildTraceContext(String traceId) {
TraceContext.Builder contextBuilder = TraceContext.newBuilder();
if (traceId.length() == 32) {
long traceIdHigh = Long.parseUnsignedLong(traceId.substring(0, 16), 16);
long traceIdLow = Long.parseUnsignedLong(traceId.substring(16), 16);
contextBuilder.traceIdHigh(traceIdHigh).traceId(traceIdLow);
} else {
long traceIdLow = Long.parseUnsignedLong(traceId, 16);
contextBuilder.traceId(traceIdLow);
}
// 새로운 Span ID 생성 (tracer 인스턴스를 사용)
contextBuilder.spanId(tracer.nextSpan().context().spanId());
return contextBuilder.build();
}
private void processNicknameMessage(JsonNode message) throws JsonProcessingException {
// 'message' 필드 내의 JSON 문자열을 추출
String messageContent = message.get("message").asText();
// 추출된 JSON 문자열을 파싱하여 memberId를 얻음
JsonNode innerMessageNode = objectMapper.readTree(messageContent);
Long memberId = Long.valueOf(innerMessageNode.get("memberId").asText());
// 추출된 memberId로 이벤트 발행 및 로깅
eventPublisher.publishEvent(new NicknameChangeEvent(memberId));
log.info("Processed NicknameChangeEvent for memberId: {}", memberId);
}
}
기존에는 발행된 메시지(messageJson)를 JsonNode로 변환해서 접근했다면 이제는 아예 데이터를 받자마자 우리가 알아보기 쉬운 객체로 맵핑해주는 로직으로 수정했다.
SnsInformationDto는 SNS 메시지 전체 Json 데이터다.
public record SnsInformationDto(
String Type,
String MessageId,
String TopicArn,
String Message, // 이 필드는 JSON 문자열이므로 나중에 MemberMessageDto로 파싱됩니다.
String Timestamp,
String SignatureVersion,
String Signature,
String SigningCertURL,
String UnsubscribeURL
) {
}
그리고 SnsMessageDto는 SNS 발행 메소드에서 우리가 직접 넣은 메시지 부분에 해당한다. (현재 로직에서는 memberId, traceId가 존재한다.)
public record SnsMessageDto(Long memberId, String traceId) {
}
그 이후에 snsMessageDto에서 traceId가 없으면 바로 리턴시키는 코드를 추가시켜 줬다.
(traceId가 없어도 에러를 던지지 않는 이유는 4번 항목(비즈니스 로직 담당하는 SQS Listener에서 에러처리 하지 않은 이유)에서 자세히 설명하겠다.)
(수정된 SQS 리스너 전체 코드다)
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.recipia.recipe.dto.SnsInformationDto;
import com.recipia.recipe.dto.SnsMessageDto;
import com.recipia.recipe.event.springevent.NicknameChangeEvent;
import io.awspring.cloud.sqs.annotation.SqsListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import brave.Span;
import brave.Tracer;
import brave.propagation.TraceContext;
import brave.propagation.TraceContextOrSamplingFlags;
@Slf4j
@RequiredArgsConstructor
@Service
public class AwsSqsListenerService {
private final ObjectMapper objectMapper;
private final ApplicationEventPublisher eventPublisher;
private final Tracer tracer;
@SqsListener(value = "${spring.cloud.aws.sqs.nickname-sqs-name}")
public void receiveMessage(String messageJson) throws JsonProcessingException {
SnsInformationDto snsInformationDto = objectMapper.readValue(messageJson, SnsInformationDto.class);
SnsMessageDto snsMessageDto = objectMapper.readValue(snsInformationDto.Message(), SnsMessageDto.class);
if (snsMessageDto.traceId() == null) {
// log.error("No traceId found in the message. memberId: {}, skipping processing.", snsMessageDto.memberId());
return;
}
// 이전 서버에서 보낸 traceId를 사용하여 새로운 TraceContext를 생성
TraceContext context = buildTraceContext(snsMessageDto.traceId());
Span span = tracer.nextSpan(TraceContextOrSamplingFlags.create(context))
.name("[RECIPE] nickname-change SQS Received")
.start();
// 추출된 memberId로 이벤트 발행 및 로깅
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
eventPublisher.publishEvent(new NicknameChangeEvent(snsMessageDto.memberId()));
} catch (Exception e) {
span.tag("error", e.toString());
log.error("Error processing SQS message: ", e);
} finally {
span.finish();
}
}
/**
* [Extract Method] - TraceContext 생성
* 멤버 서버에서 받은 traceId를 TraceContext안에 세팅해 준다.
*/
private TraceContext buildTraceContext(String traceId) {
TraceContext.Builder contextBuilder = TraceContext.newBuilder();
// traceId의 길이가 32자리인 경우 128비트 traceId로 처리하고, 그렇지 않은 경우 64비트 traceId로 처리
if (traceId.length() == 32) {
long traceIdHigh = Long.parseUnsignedLong(traceId.substring(0, 16), 16);
long traceIdLow = Long.parseUnsignedLong(traceId.substring(16), 16);
contextBuilder.traceIdHigh(traceIdHigh).traceId(traceIdLow);
} else {
long traceIdLow = Long.parseUnsignedLong(traceId, 16);
contextBuilder.traceId(traceIdLow);
}
// 새로운 Span ID를 생성 하고 TraceContext 객체를 빌드해서 반환한다.
contextBuilder.spanId(tracer.nextSpan().context().spanId());
return contextBuilder.build();
}
}
3-3. SNS 발행 메소드에서 Trace Id 추출 코드 제거
위에서 설명한 단일 책임 원칙에 따라 SNS 발행 메소드에서 Trace Id를 추출하는 코드를 제거했다. 더 추가된건 새로운 Span을 생성하는 코드다.
(혹시 Trace Id와 Span Id가 헷갈린다면 아래 링크에서 설명을 확인하면 좋다.)
[Spring Boot] AWS 분산 시스템에서 Zipkin으로 로그 추적하기: SNS/SQS 통합 실전 적용
❓이 메소드에서 Trace ID 추출 로직을 제거하고 Zipkin Trace의 Span만 생성하는 것을 보면 처음에는 '이것도 단일 책임 원칙을 위반하는 거 아닌가?'라고 생각할 수 있다. 하지만 이 변경이 단일 책임 원칙에 오히려 부합한다.
단일 책임 원칙은 한 클래스나 메소드가 오직 하나의 기능이나 책임만을 가져야 한다고 말한다. publishNicknameToTopic 메소드의 경우, 이 메소드의 핵심 책임은 SNS에 메시지를 발행하는 것이다. 여기에 Zipkin Trace의 Span을 생성하는 로직은 SNS 발행이 추적 가능하도록 하는 부가적인 역할을 한다. 즉, 이 로직은 메시지 발행과 직접적으로 관련된 추적 기능을 지원하는 것으로, 메소드의 기본 책임을 벗어나지 않는다.
이렇게 메소드의 책임을 명확히 하고, 부가적인 추적 기능은 지원하는 방식으로 코드를 구성하면, 나중에 Trace ID 처리 방식에 변경이 생겨도 SNS 발행 로직 자체에는 영향을 미치지 않는다. 이는 코드의 유지 보수성을 높이고, 각 부분의 역할을 명확히 하는 데 도움이 된다.
결론적으로, publishNicknameToTopic 메소드에서 Trace ID 추출 로직을 제거하고 Span 생성 로직만 남긴 것은 단일 책임 원칙에 부합한다. 이는 메소드가 본연의 책임에 충실하면서도 필요한 추적 기능을 제공하도록 하여, 전체 시스템의 유연성과 관리 용이성을 향상시키는 것이다.
(SNS 발행 메소드 전체 코드)
import brave.Span;
import brave.Tracer;
import com.recipia.member.config.aws.AwsSnsConfig;
import com.recipia.member.exception.ErrorCode;
import com.recipia.member.exception.MemberApplicationException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;
@Slf4j
@RequiredArgsConstructor
@Service
public class SnsService {
private final SnsClient snsClient;
private final AwsSnsConfig awsSnsConfig;
private final Tracer tracer;
public PublishResponse publishNicknameToTopic(String message) {
// SNS 발행 Span 생성
Span span = tracer.nextSpan().name("SNS Publish").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
// SNS 발행 요청 생성
PublishRequest publishRequest = PublishRequest.builder()
.message(message)
.topicArn(awsSnsConfig.getSnsTopicNicknameChangeARN())
.build();
// SNS 클라이언트를 통해 메시지 발행
PublishResponse response = snsClient.publish(publishRequest);
// messageId 로깅
log.info("[MEMBER] Published message to SNS with messageId: {}", response.messageId());
return response;
} catch (Exception e) {
span.tag("error", e.toString()); // 오류 태깅
throw new MemberApplicationException(ErrorCode.AWS_SNS_CLIENT);
} finally {
span.finish(); // Span 종료
}
}
}
4. 비즈니스 로직을 담당하는 SQS Listener에서 에러처리를 하지 않은 이유
이벤트 처리 과정에서 중요한 변경사항으로, 이벤트 저장소에 'is_batch'라는 새 컬럼을 추가했다.
이 변경은 우리 시스템의 두 가지 핵심적인 요소와 관련이 있다.
우리 시스템에서는 비즈니스 로직을 처리하는 SQS Listener가 Trace ID 없이 메시지를 처리할 때 에러를 던지지 않는데, 그 이유는 두 가지다
4-1. 첫 번째 이유: 이벤트 저장소 업데이트를 담당하는 별도의 SQS 리스너 존재
시스템에서 클라이언트의 닉네임 변경 HTTP 요청이 들어오면 이 도메인 행위(회원 테이블 닉네임 변경)는 '이벤트 저장소 저장 담당' 이벤트 리스너와 하나의 트랜잭션으로 처리된다. 도메인 행위 완료 후, 이 리스너('이벤트 저장소 저장 담당')는 이벤트를 published=false, isBatch=false 상태로 이벤트 저장소에 저장한다. 즉, 도메인 행위가 성공하면 무조건 이벤트 저장소에 이벤트가 추가된다는 뜻이다. 이어서, SNS 발행이 성공하면 멤버 서버에서 이 SNS를 구독하고 있던 '이벤트 저장소 업데이트 담당' SQS 리스너가 활성화되어 해당 데이터를 published=true로 업데이트한다.
메시지에 Trace ID가 없다면, 이 SQS 리스너('이벤트 저장소 업데이트 담당')는 DB 업데이트 로직을 수행하지 않는다. Trace ID가 없는 메시지는 제대로 발행되지 않았다고 판단되기 때문이다. 결과적으로 Trace ID가 없는 데이터는 published=false 상태로 남아있게 되며, 이는 배치 작업의 대상이 된다.
4-2. 두 번째 이유: 누락된 이벤트를 재발행하는 배치 작업
배치 작업은 published=false 상태인 모든 데이터를 이벤트 저장소에서 가져와 처리한다. 데이터의 attribute 필드에 Trace ID가 있다면, 바로 SNS 발행 메소드로 전달된다. 반면, Trace ID가 없는 경우에는 각 데이터마다 새로운 Trace ID를 생성해 추가하고, 이를 SNS 발행 메소드로 전달한다. 그 후, is_batch 값을 true로 설정하고, attribute에 새로 생성된 Trace ID를 추가하여 데이터를 업데이트한다.
이 과정은 Trace ID 없이 발행된 메시지도 효과적으로 처리하며, 누락된 이벤트는 재발행된다. 따라서, 비즈니스 로직을 처리하는 SQS 리스너('레시피 서버의 회원 닉네임 변경')에서 Trace ID가 없을 때 굳이 에러 처리를 하지 않는다. 배치 작업이 5분마다 실행되어, 빠른 시간 내에 정상적으로 SNS 발행이 이루어질 것이기 때문이다.
이러한 시스템 설계는 전체적인 효율성과 안정성을 높인다. 비록 Trace ID가 없이 발행되었던 메시지는 최초 HTTP 요청에서 생성된 Trace ID의 추적이 끊기고 배치 작업에서 새로 생성된 Trace ID로 추적되지만, 이는 문제가 되지 않는다.
왜냐하면 도메인 행위 이후의 로직, 즉 SNS 발행 이후의 로직이 중요하기 때문이다. 도메인 행위가 제대로 완료되었다는 것은 이미 이벤트 저장소에 데이터가 존재한다는 것으로 증명이 되었다. 그리고 우리는 Zipkin을 사용하는 이유가 서버 간 연동이 잘 되는지를 확인하기 위함이기 때문에 SNS 발행부터의 추적으로도 충분하다. 또한, 이벤트 저장소에서 published=true로 있는 데이터는 전부 Trace ID가 존재하기 때문에 바로 Zipkin UI에서 검색하여 추적할 수 있어 시스템의 전체적인 추적 가능성과 안정성이 유지된다.
이러한 과정을 통해, 우리 시스템은 누락된 이벤트를 효과적으로 처리하고, 전체 시스템의 안정성과 효율성을 보장한다.
💡 "이벤트 저장소에 published=false로 남아있는 데이터는 대체로 발행에 실패했거나 문제가 있는 데이터로 간주된다. 그런데 이 메시지를 기존의 Trace ID를 그대로 사용해 다시 SNS에 발행하면, 과연 이전과 같은 방식으로 추적이 이어질까?"라는 궁금증이 들 수 있다.
published=false로 남은 데이터에서 Trace ID가 없을 경우, 우리는 부득이하게 새로운 Trace ID를 생성한다. 하지만, 중요한 점은 이미 Trace ID가 있는 데이터에 대한 처리 방식이다.
만약 데이터에 Trace ID가 이미 있다면, 이 Trace ID를 그대로 사용하여 메시지를 다시 SNS에 발행한다.
이렇게 기존의 Trace ID를 사용하는 경우, 추적이 멈췄던 부분부터 다시 추적이 시작된다. 즉, 최초 HTTP 요청으로부터 시작된 추적 체인이 중단된 부분에서 다시 연결되어 전체 트랜잭션을 추적할 수 있게 된다. 이 과정은 시스템의 추적 가능성을 유지하며, 효율적인 문제 해결과 분석을 가능하게 한다.
따라서, Trace ID가 있는 경우 이를 활용하는 것은 시스템의 추적 능력을 극대화하는 중요한 전략이다. Trace ID가 없을 때와는 달리, 기존의 추적 체인을 유지하며 시스템의 동작을 더 명확하게 이해할 수 있게 된다.
(이벤트 저장소 업데이트 담당 SQS 리스너 전체 코드)
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.recipia.member.domain.event.MemberEventRecord;
import com.recipia.member.dto.SnsMessageDto;
import com.recipia.member.dto.SnsInformationDto;
import com.recipia.member.exception.ErrorCode;
import com.recipia.member.exception.MemberApplicationException;
import com.recipia.member.repository.MemberEventRecordRepository;
import com.recipia.member.utils.MemberStringUtils;
import io.awspring.cloud.sqs.annotation.SqsListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* MEMBER 서버에서 SNS가 잘 발행되었을때 DB MEMBER_EVENT_RECORD테이블에 true로 값 업데이트 해주는 SQS
* 단, Message 안에 traceId가 있을때만 정상 실행 되도록 구성
* traceId 없을때는 에러로그만 남기기 (왜냐하면, 어차피 배치 작업을 통해 SNS 재발행 프로세스가 진행되기 때문.)
*/
@Slf4j
@Transactional
@RequiredArgsConstructor
@Service
public class SqsEventRecordListener {
private final ObjectMapper objectMapper;
private final MemberEventRecordRepository memberEventRecordRepository;
@SqsListener(value = "${spring.cloud.aws.sqs.record-sqs-name}")
public void receiveSnsPublishedMessage(String messageJson) throws JsonProcessingException {
SnsInformationDto snsInformationDto = objectMapper.readValue(messageJson, SnsInformationDto.class);
SnsMessageDto snsMessageDto = objectMapper.readValue(snsInformationDto.Message(), SnsMessageDto.class);
// 'traceId' key 확인
if (snsMessageDto.traceId() == null) {
log.error("No traceId found in the message. memberId: {}, skipping processing.", snsMessageDto.memberId());
return;
}
String topicArn = snsInformationDto.TopicArn();
String topicName = MemberStringUtils.extractLastPart(topicArn);
Long memberId = snsMessageDto.memberId();
MemberEventRecord memberEventRecord = memberEventRecordRepository
.findFirstByMember_IdAndSnsTopicOrderByIdDesc(memberId, topicName)
.orElseThrow(() -> new MemberApplicationException(ErrorCode.EVENT_NOT_FOUND));
memberEventRecord.changePublished();
}
}
5. 마무리
사실 이번 로직을 통해 메시지에서 Trace ID가 누락될 가능성이 그리 크지 않음을 확인할 수 있었다. 하지만 이런 상황에 대비한 에러 핸들링을 구현하면서 단일 책임 원칙에 더욱 부합하는 코드 구조로의 개선이 가능했다는 점에서 큰 의미가 있다.
더욱이 이 과정을 통해 Zipkin 같은 분산 추적 시스템에 대한 이해도 깊어졌다. Zipkin의 역할과 중요성, 그리고 효과적인 추적 전략을 구현하는 방법에 대해 배울 수 있었다. 이러한 지식은 분산 시스템 환경에서의 문제 해결 능력을 한층 강화시켜 준다.
이제 다음 단계인 배치 작업의 구현에 집중할 시간이다. 배치 작업은 시스템의 안정성을 더욱 높이고, 이벤트 처리 효율성을 개선하는 중요한 부분이다. 이 부분에 대한 자세한 설명은 다음 포스팅에서 공유할 예정이다.
👉 ECS에 Zipkin 서버 올리는 내용이 궁금하다면?
[AWS] ECS에서 Zipkin을 통한 스프링 부트 서비스 트레이싱 구축하기
이 포스트는 Team chillwave에서 사이드 프로젝트 중 적용했던 부분을 다시 공부하며 기록한 것입니다.
시간이 괜찮다면 팀원 '개발자의 서랍'님의 블로그도 한번 봐주세요 :)
'Spring > Spring Boot' 카테고리의 다른 글
[Spring Boot] 스프링 부트 3.X 버전에서 Spring Batch 테이블 자동 생성 안되는 에러 (0) | 2023.11.27 |
---|---|
[Spring Boot] Spring Boot 3.X 버전에서 Spring Batch 적용하기 (0) | 2023.11.26 |
[Spring Boot] Zipkin에서 URL 추적 설정: Sampler와 Sleuth를 활용한 스프링 부트 적용 (0) | 2023.11.22 |
[Spring Boot] AWS 분산 시스템에서 Zipkin으로 로그 추적하기: SNS/SQS 통합 실전 적용 (1) | 2023.11.20 |
[Spring Boot] 스프링 부트에서 Feign 클라이언트 사용하기: 파라미터 처리 방식 이해하기 (1) | 2023.11.19 |