SNS MessageAttributes를 이용한 분산 시스템 추적용 Trace Id 전달 방법
프로젝트를 진행하면서 기존에 진행하던 방법에 많은 에러사항이 있다는 걸 느끼고 SNS, SQS를 사용하면서 Trace Id를 전달하는 새로운 방식인 MessageAttributes를 적용한 과정을 설명해 보겠다.
기존 로직과 코드에 대해서 상세한 설명은 아래 글에 작성되어 있다.
[Spring Boot] 분산 시스템에서의 SNS/SQS 메시지 처리: 단일 책임 원칙과 Zipkin의 Trace ID 에러 핸들링을 중심으로
1. 이벤트 저장소에 이벤트 저장하는 로직 수정
기존에는 memberId와 traceId를 JSON 형태의 문자열로 결합하여 단일 attribute 컬럼에 저장했다. 이 방식은 단순하고 직관적이지만, 몇 가지 문제점을 내포하고 있다.
먼저, 데이터 정규화의 원칙에 어긋나며, 이는 데이터 중복과 불필요한 복잡성을 증가시킨다. 두 번째로, JSON 형태로 데이터를 저장하면 SQL 쿼리의 효율성이 떨어진다. 특정 traceId를 조건으로 데이터를 조회할 때, JSON 문자열을 파싱해야 하므로 성능에 부담을 준다.
따라서, traceId를 별도의 컬럼으로 분리하는 것은 이러한 문제들을 해결하는 효과적인 방법이다. 새로운 trace_id 컬럼을 도입함으로써 데이터 정규화를 개선하고, SQL 쿼리의 성능과 유지보수성을 향상시킬 수 있다.
이 변경으로, memberId는 메시지에 필수적인 데이터로 남게 되며, traceId는 로그 추적을 위한 독립적인 데이터 요소로 취급되어, 각각의 용도에 맞게 효율적으로 사용될 수 있게 된다. 이런 구조적 개선은 데이터 관리의 효율성을 높이고, 시스템의 확장성과 유연성을 강화하는 데 기여한다.
그리고 이벤트를 저장하기 전에 기존에 동일한 SNS 토픽으로 동일한 회원의 이벤트 데이터중에서 'published = false'로 남아있는 데이터를 전부 'published = true'로 바꿔주는 로직도 추가해 줬다.
1-1. 이벤트 저장소 테이블 수정
아래는 기존 is_batch 컬럼이 제거되고 trace_id라는 새로운 컬럼이 추가된 이벤트 저장소 테이블 모습이다.
1-2. 이벤트 저장 로직 전에 기존에 미발행 처리 되었던 이벤트 발행 처리하는 로직 추가
아래 코드는 이벤트 저장소에서 동일한 이벤트와 동일한 회원에 대해서 미발행된 이벤트들을 전부 발행처리로 바꿔주는 로직을 추가한 모습이다.
여기서 중요한 점은 회원의 닉네임 변경 이벤트를 처리하기 전에, 같은 회원의 이전 미발행 닉네임 변경 이벤트들을 발행 상태로 전환하는 것이다.
memberId가 2인 회원의 닉네임 변경 이벤트를 예로 들어 설명하자면, 이 회원이 닉네임을 변경할 때마다 새로운 이벤트(memberId=2, Topic=NicknameChange, published=false)가 저장소에 기록된다. 이때, 저장소에는 이전에 발생했으나 아직 발행되지 않은('published = false' 상태인) 닉네임 변경 이벤트들이 존재할 수 있다.
새로운 닉네임 변경 이벤트가 저장소에 추가되면 이전에 저장된 미발행 이벤트들이 더 이상 중요하지 않다. 이미 동일한 유형의 이벤트가 이벤트 저장소에 추가되었기 때문에 가장 최신 이벤트의 성공 여부만 확인하면 되기 때문이다.
만약 가장 최신 이벤트가 발행에 성공했다면 ('published = true' 상태) 외부 서버는 이미 가장 최신의 닉네임으로 업데이트를 완료한다. 이는 'zero-payload' 방식을 적용했기 때문에 가능한 일이다.
외부 서버는 memberId만을 메시지로 받기 때문에, '닉네임 변경' 이벤트가 성공적으로 발행되면, 외부 서버는 회원 서버에 feign 클라이언트 요청을 통해 'memberId가 2인 회원의 닉네임이 무엇인지'를 다시 확인한다. 따라서 최근에 발행된 이벤트가 성공했다면, 외부 서버 역시 가장 최신의 닉네임 정보를 받게 된다.
만약 가장 최신 이벤트가 발행에 실패했더라도 'published = false'로 데이터가 남아있기 때문에 이벤트가 유실될 일은 없다. 나중에 배치 서버에서 published = false인 데이터를 모아 해당 이벤트를 다시 SNS 재발행해주기 때문이다.
결국, 새로운 닉네임 변경 이벤트를 저장하기 전에 이전의 memberId가 2인 회원의 미발행 닉네임 변경 이벤트들을 'published = true'로 업데이트하는 것은, 데이터의 일관성을 유지하면서 최신 데이터에 대한 접근을 보장하는 효율적인 방법이다.
1-3. attribute 컬럼에는 memberId만 주입
기존에는 attribute 컬럼에 memberId와 traceId를 전부 주입했는데 이제는 memberId만 주입하는 코드로 수정됐다.
(이벤트 저장소에 이벤트를 저장하는 전체 코드)
import brave.Tracer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.recipia.member.config.aws.AwsSnsConfig;
import com.recipia.member.domain.Member;
import com.recipia.member.domain.event.MemberEventRecord;
import com.recipia.member.exception.ErrorCode;
import com.recipia.member.exception.MemberApplicationException;
import com.recipia.member.repository.MemberEventRecordRepository;
import com.recipia.member.repository.MemberRepository;
import com.recipia.member.utils.CustomJsonBuilder;
import com.recipia.member.utils.MemberStringUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
@RequiredArgsConstructor
@Component
public class SpringEventRecordListener {
private final MemberRepository memberRepository;
private final MemberEventRecordRepository memberEventRecordRepository;
private final AwsSnsConfig awsSnsConfig;
private final CustomJsonBuilder customJsonBuilder;
private final Tracer tracer;
/**
* 이벤트를 호출한 서비스 코드의 트랜잭션과 묶이게 된다.
*/
@Transactional
@EventListener
public void eventRecordListener(NicknameChangeSpringEvent event) throws JsonProcessingException {
Member member = memberRepository.findById(event.memberId()).orElseThrow(() -> new MemberApplicationException(ErrorCode.USER_NOT_FOUND));
// 현재 TraceID 추출
String traceId = tracer.currentSpan().context().traceIdString();
// attribute memberId 주입
String attribute = customJsonBuilder
.add("memberId", member.getId().toString())
.build();
String topicName = MemberStringUtils.extractLastPart(awsSnsConfig.getSnsTopicNicknameChangeARN());
List<MemberEventRecord> memberEventFalseList = memberEventRecordRepository.findByMember_IdAndSnsTopicAndPublished(member.getId(), topicName, false);
// forEach 메소드는 리스트가 비어있을 경우, 아무 작업도 수행하지 않기 때문에,
// 빈 리스트에 대한 명시적 체크는 불필요하다.
memberEventFalseList.forEach(MemberEventRecord::changePublishedToTrue);
MemberEventRecord memberEventRecord = MemberEventRecord.of(
member,
topicName,
"NicknameChangeEvent",
attribute,
traceId,
false,
null
);
memberEventRecordRepository.save(memberEventRecord);
}
}
2. SNS 발행할 때 Trace Id 전달 방법을 messageAttributes로 수정
기존 로직에서는 SNS 메시지를 발행할 때 traceId를 메시지 본문에 포함시켜 전달했다. 그러나 이런 방식은 설계상 몇 가지 문제점을 내포하고 있다.
먼저, 메시지 본문에 traceId 같은 로그 추적용 데이터를 포함시키는 것은 메시지의 본래 목적과 맞지 않는다. 메시지 본문은 도메인 행위에 필요한 정보를 전달하는 데 집중되어야 하며, 추적용 데이터를 포함시키는 것은 메시지의 명확성과 가독성을 해칠 수 있다.
또한, traceId와 같은 메타데이터를 메시지 본문에 포함시키는 것은 메시지의 구조를 불필요하게 복잡하게 만들며, 이로 인해 메시지 처리 로직이 더 복잡해질 수 있다.
우리가 그동안 traceId를 메시지에 포함시킨 이유는 SNS에 HTTP 요청의 header와 같은 기능이 없어서였다. 이는 SNS 메시지 처리할 때 메타데이터를 별도로 관리할 방법이 없다는 한계를 보완하기 위한 임시방편이었다.
그러나 최근에 SNS 메시지 처리에서 HTTP header와 유사한 기능을 하는 'messageAttributes'라는 설정이 있다는 것을 발견했다. 이 설정을 활용하면 메시지 본문에서 traceId를 분리하여 메시지의 구조를 단순화하고, 동시에 효율적으로 메타데이터를 관리할 수 있다.
💡 HTTP 요청의 헤더와 AWS SNS의 messageAttributes는 비슷한 개념을 가지고 있다. 둘 다 메시지 또는 요청의 메인 콘텐츠와는 별도로 추가적인 메타데이터를 전달하는 역할을 한다.
HTTP 요청에서 헤더는 클라이언트와 서버 간의 통신에서 요청의 성격, 내용의 타입, 캐싱 정책 등과 같은 정보를 담는다. 이러한 정보는 요청의 본문에 영향을 주지 않으면서 통신의 맥락을 제공한다.
AWS SNS의 messageAttributes도 유사한 방식으로 작동한다. 이것은 메시지의 본문을 수정하지 않고도 필요한 추가 정보를 전달할 수 있는 수단을 제공한다.
예를 들어, Zipkin 추적용 trace ID를 messageAttributes에 포함시키면, 메시지를 수신하는 서비스는 이 정보를 사용하여 전체 시스템에서의 메시지 흐름을 추적할 수 있다. 이 방법을 사용하면 메시지 본문의 순수성과 명확성을 유지하면서도 필요한 메타데이터를 전달할 수 있다. 이는 시스템의 각 구성 요소가 서로의 콘텐츠를 더욱 효과적으로 이해하고 처리할 수 있도록 해준다.결론적으로, HTTP 요청의 헤더와 AWS SNS의 messageAttributes는 메시지나 요청의 주된 내용과 분리된 상태에서 추가적인 정보를 제공함으로써, 통신 프로세스를 더욱 효율적이고 명확하게 만드는 역할을 한다. 이러한 방식은 메시지 본문의 무결성을 보장하고, 시스템 전반에서의 데이터 흐름을 보다 쉽게 관리하고 추적할 수 있게 해준다.
2-1. SNS 메시지 생성할 때 MessageAttribute에 traceId 추가
아래는 MessageAttribute로 traceId를 전달하는 코드다.
(SNS 발행 전체 코드)
import brave.Span;
import brave.Tracer;
import com.recipia.member.config.aws.AwsSnsConfig;
import com.recipia.member.exception.ErrorCode;
import com.recipia.member.exception.MemberApplicationException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.MessageAttributeValue;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@RequiredArgsConstructor
@Service
public class SnsService {
private final SnsClient snsClient;
private final AwsSnsConfig awsSnsConfig;
private final Tracer tracer;
public PublishResponse publishNicknameToTopic(String message, String traceId) {
// SNS 발행 Span 생성
Span span = tracer.nextSpan().name("SNS Publish").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put("traceId", MessageAttributeValue.builder()
.dataType("String")
.stringValue(traceId)
.build());
// SNS 발행 요청 생성
PublishRequest publishRequest = PublishRequest.builder()
.message(message)
.messageAttributes(messageAttributes)
.topicArn(awsSnsConfig.getSnsTopicNicknameChangeARN())
.build();
// SNS 클라이언트를 통해 메시지 발행
PublishResponse response = snsClient.publish(publishRequest);
// messageId 로깅
log.info("[MEMBER] Published message to SNS with messageId: {}", response.messageId());
return response;
} catch (Exception e) {
span.tag("error", e.toString()); // 오류 태깅
throw new MemberApplicationException(ErrorCode.AWS_SNS_CLIENT);
} finally {
span.finish(); // Span 종료
}
}
}
2-2. Message 전체 내용
이렇게 생성된 메시지는 아래와 같은 JSON 형태로 발행된다.
{
"Type" : "Notification",
"MessageId" : "8f53688f-42c1-508a-851d-355246fa17ad",
"TopicArn" : "arn:aws:sns:<REGION>:<ACCOUNT_ID>:<SNS_NAME>",
"Message" : "{\"memberId\":\"2\"}",
"Timestamp" : "2023-11-26T08:29:47.128Z",
"SignatureVersion" : "1",
"Signature" : "T2m6iF1Rx9T9N4erfmmASDFMOktNJv7vBy+TYcgI/FKQlFeblyUoz3Jf7Yxn5d0pGRYlzEzxeCQVVt2SmNszpsxzkIwV70N0h3kxCxCrvS2BZkL2Rm/7fHMANMu0cPY+VSx+3KDS1Z3MNULAo5L8Gwwxm0eVGdQrV3oNwBjRgtkty3pOwkvouywwylWFAiOo8etAcjnCurh3kzcavLmwjt4bMp/9abdVi1kJ4t5l6ZJ2Szvrhog9MYw/5E4ua7odnrLs4silcyqUMKzGahPQINVj51UCov9X0JBksLWEd6KhNaUvRy6vtA==",
"SigningCertURL" : "https://sns.<REGION>.amazonaws.com/SimpleNotificationService-@#$.pem",
"UnsubscribeURL" : "https://sns.<REGION>.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=ASDF",
"MessageAttributes" : {"traceId" : {"Type":"String","Value":"656301fa05eecf2b120320d40e5e650c"}}
}
3. 비즈니스 로직 담당 SQS 리스너에서 Trace Id 추출 및 에러 핸들링 수정
기존에는 메시지 본문을 DTO로 파싱 해서 memberId, traceId를 추출했다. 그리고 만약 traceId가 없다면 해당 로직을 실행하지 않고 그대로 return 시켰다.
이제는 메시지 본문에서는 memberId만 추출하고, traceId는 MessageAttributes에서 추출하는 로직으로 수정했다.
그리고 traceId가 없다면 생성해 주고 계속 프로세스를 이어나가는 로직으로 수정했다.
3-1. messageJson에서 traceId 추출
아래 코드는 messageJson에서 MessageAttributes에 들어있는 traceId를 DTO를 이용해 파싱 하는 로직이다.
3-2. buildTraceContext 메소드에서 새로운 Trace Id 생성 로직 추가
TraceContext를 생성해 주는 buildTraceContext 메소드에서 만약 파라미터로 받은 traceId가 없다면 새로 생성해주는 코드를 추가했다. 이렇게 함으로써 traceId가 없이 메시지가 발행되었어도 이 메소드 이후부터는 새로 생긴 traceId로 추적이 이어진다.
새로 생성한 SnsNotificationDto: SNS 메시지의 전반적인 구조를 나타낸다. 여기에는 메시지 타입, ID, 본문 등이 포함된다.
public record SnsNotificationDto(
String Type,
String MessageId,
String TopicArn,
String Message,
String Timestamp,
String SignatureVersion,
String Signature,
String SigningCertURL,
String UnsubscribeURL,
MessageAttributesDto MessageAttributes
) {
}
새로 생성한 MessageAttributesDto: 메시지 속성을 나타내며, 여기에는 TraceIdDto가 포함된다.
public record MessageAttributesDto(
TraceIdDto traceId
) {
}
새로 생성한 TraceIdDto: 추적 ID의 타입과 값으로 구성된다.
public record TraceIdDto(
String Type,
String Value
) {
}
새로 생성한 MessageMemberIdDto: memberId로 구성된다.
public record MessageMemberIdDto(Long memberId) {
}
(비즈니스 로직 담당 SQS Listener 전체 코드)
import brave.Span;
import brave.Tracer;
import brave.propagation.TraceContext;
import brave.propagation.TraceContextOrSamplingFlags;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.recipia.recipe.dto.message.*;
import com.recipia.recipe.event.springevent.NicknameChangeEvent;
import io.awspring.cloud.sqs.annotation.SqsListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
@Slf4j
@RequiredArgsConstructor
@Service
public class AwsSqsListenerService {
private final ObjectMapper objectMapper;
private final ApplicationEventPublisher eventPublisher;
private final Tracer tracer;
/**
* 아래의 Sqs리스너는 멤버 서버로 FeignClient 요청을 하는 스프링 이벤트를 발행한다.
*
*/
@SqsListener(value = "${spring.cloud.aws.sqs.nickname-sqs-name}")
public void receiveMessage(String messageJson) throws JsonProcessingException {
SnsNotificationDto snsNotificationDto = objectMapper.readValue(messageJson, SnsNotificationDto.class);
TraceIdDto traceIdDto = snsNotificationDto.MessageAttributes().traceId();
String traceId = traceIdDto.Value();
MessageMemberIdDto snsMessageDto = objectMapper.readValue(snsNotificationDto.Message(), MessageMemberIdDto.class);
// 이전 서버에서 보낸 traceId를 사용하여 새로운 TraceContext를 생성
TraceContext context = buildTraceContext(traceId);
Span span = tracer.nextSpan(TraceContextOrSamplingFlags.create(context))
.name("[RECIPE] nickname-change SQS Received")
.start();
// 추출된 memberId로 이벤트 발행 및 로깅
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
eventPublisher.publishEvent(new NicknameChangeEvent(snsMessageDto.memberId()));
} catch (Exception e) {
span.tag("error", e.toString());
log.error("Error processing SQS message: ", e);
} finally {
span.finish();
}
}
/**
* [Extract Method] - TraceContext 생성
* 멤버 서버에서 받은 traceId를 TraceContext안에 세팅해 준다.
*/
private TraceContext buildTraceContext(String traceId) {
if (traceId == null) {
// 새로운 Span 생성
Span newSpan = tracer.newTrace();
// 새로운 traceId 추출
traceId = newSpan.context().traceIdString();
}
TraceContext.Builder contextBuilder = TraceContext.newBuilder();
// traceId의 길이가 32자리인 경우 128비트 traceId로 처리하고, 그렇지 않은 경우 64비트 traceId로 처리
if (traceId.length() == 32) {
long traceIdHigh = Long.parseUnsignedLong(traceId.substring(0, 16), 16);
long traceIdLow = Long.parseUnsignedLong(traceId.substring(16), 16);
contextBuilder.traceIdHigh(traceIdHigh).traceId(traceIdLow);
} else {
long traceIdLow = Long.parseUnsignedLong(traceId, 16);
contextBuilder.traceId(traceIdLow);
}
// 새로운 Span ID를 생성 하고 TraceContext 객체를 빌드해서 반환한다.
contextBuilder.spanId(tracer.nextSpan().context().spanId());
return contextBuilder.build();
}
}
그동안 SNS를 사용하면서 '이렇게 message에 traceId를 직접 넣는 게 올바른 방식인가?'에 대한 끊임없이 질문을 했었지만 답을 못 찾았었다.
그러나 새로운 방식을 발견하고 적용해 보면서, 그 의문을 해소할 수 있었다. 이 방식은 기존의 문제점들을 해결하는 동시에, 데이터 관리의 효율성과 시스템의 확장성을 강화시켰다.
이 변경을 통해 메시지 처리의 정확성이 크게 향상되었으며, 시스템 전반의 추적과 로깅이 더욱 효과적으로 이루어질 수 있게 되었다. 이 경험은 개발 과정에서 마주하는 어려움에 대해 지속적으로 질문하고, 새로운 해결책을 탐색하는 중요성을 다시 한번 일깨워 주었다. 앞으로도 이러한 태도를 계속 유지하며, 지속적으로 발전해 나가고자 한다.
이 포스트는 Team chillwave에서 사이드 프로젝트 중 적용했던 부분을 다시 공부하며 기록한 것입니다.
시간이 괜찮다면 팀원 '개발자의 서랍'님의 블로그도 한번 봐주세요 :)
'Spring > Spring Boot' 카테고리의 다른 글
[Spring Boot] Spring Boot 3.X 버전에 p6spy 적용하기 (0) | 2023.12.08 |
---|---|
[Spring Boot] 스프링 배치와 JPA를 활용해 누락된 SNS 이벤트 재발행 (0) | 2023.11.29 |
[Spring Boot] 스프링 배치에서 chunk와 JPA pageSize 설정의 관계성 (0) | 2023.11.27 |
[Spring Boot] 스프링 부트 3.X 버전에서 Spring Batch 테이블 자동 생성 안되는 에러 (0) | 2023.11.27 |
[Spring Boot] Spring Boot 3.X 버전에서 Spring Batch 적용하기 (0) | 2023.11.26 |