스프링 부트에서 Kafka로 메시지 전송하기: 실시간 에러 로그 처리
📌 서론
이전 글에서 docker-compose로 로컬 환경에 카프카를 실행했다.
스프링 부트 프로젝트에서 에러가 발생하면 Slack으로 알림을 보내기 위해서 Kafka를 사용하기 위해서 이전 글에서 로컬 환경에서 docker-compose로 카프카를 실행했다. 이제 스프링 부트에 에러 핸들링하는 부분에 카프카 메시지를 전송(producer)하는 코드를 추가하는 과정을 함께 살펴보자
🔻 macOS 환경에서 도커로 카프카를 실행하는 방법이 궁금하다면? 🔻
macOS에서 docker-compose로 Kafka 설치하기 (Zookeeper 사용 X)
스프링 부트 의존성 주입
우리 프로젝트는 스프링 부트 3.1.2 버전을 사용하고 있다.
build.gradle에 spring-kafka 추가
build.gradle에 spring-kafka를 추가해 주자. (스프링 부트라 따로 버전 명시는 하지 않았다.)
// kafka
implementation 'org.springframework.kafka:spring-kafka'
위 코드를 추가하고 Load Gradle Changes를 클릭해주면 kafka-clients 라이브러리가 추가된 모습을 확인할 수 있다.
스프링 부트와 호환되는 버전은 다음 링크에서 확인할 수 있다.
https://spring.io/projects/spring-kafka
kafka-clients가 다운로드된 이유는 'Apache Kafka용 Spring은 순수 Java kafka-clients jar를 기반으로 합니다. (Spring for Apache Kafka is based on the pure java kafka-clients jar.)'라고 해당 페이지에 명시되어 있다.
우리는 스프링 부트 3.1.2 버전을 사용하기 때문에 kafka-clients:3.4.1 버전이 다운로드됐다.
application.yml에 kafka 환경 변수 추가
application.yml에 kafka 환경 변수를 추가해주자.
kafka:
bootstrap-servers: localhost:10000,localhost:10001,localhost:10002 # 카프카 서버 주소
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: error-handler-group
auto-offset-reset: earliest
template:
default-topic: error-messages
- 공통 설정:
- bootstrap-servers: Kafka 클러스터에 접속하기 위한 서버 주소 목록이다. 이 설정은 프로듀서와 컨슈머 모두에 공통으로 적용되어, 모두가 동일한 Kafka 클러스터에 연결되도록 한다. (포트는 docker-compose에 명시한 포트로 작성하면 된다.)
- bootstrap-servers: Kafka 클러스터에 접속하기 위한 서버 주소 목록이다. 이 설정은 프로듀서와 컨슈머 모두에 공통으로 적용되어, 모두가 동일한 Kafka 클러스터에 연결되도록 한다. (포트는 docker-compose에 명시한 포트로 작성하면 된다.)
- Producer 설정:
- key-serializer: 메시지의 키를 직렬화하는 방법을 지정한다. Kafka는 메시지의 키와 값을 바이트 배열로 변환해야 하므로, StringSerializer를 사용해 문자열 키를 직렬화한다. org.apache.kafka.common.serialization.StringSerializer를 사용하여 문자열 키를 직렬화한다.
- value-serializer: 메시지의 값을 직렬화하는 방법을 지정한다. 이것도 StringSerializer를 사용해 문자열 값을 직렬화한다.
- Consumer 설정:
- group-id: 컨슈머 그룹의 ID를 지정한다. 같은 group-id를 가진 컨슈머들은 메시지를 공유하여 처리하므로, 메시지가 중복 처리되지 않도록 한다.
- auto-offset-reset: 컨슈머 그룹이 이전에 오프셋을 커밋한 적이 없거나, 현재 오프셋이 더 이상 유효하지 않은 경우 어떤 오프셋부터 메시지를 소비할지 결정한다. earliest로 설정하면 가장 초기의 미처리 메시지부터 시작하여 소비한다.
- Template 설정:
- default-topic: KafkaTemplate을 사용해 메시지를 발행할 때 기본적으로 사용할 토픽의 이름을 지정한다. 여기서는 error- messages라는 토픽을 기본값으로 사용한다.
이 설정들은 Spring Boot 애플리케이션에서 Kafka와의 통신을 위해 필요한 기본적인 구성 요소들이다. 각 설정은 애플리케이션의 요구 사항에 맞게 조정될 수 있으며, Kafka 클러스터의 구성이나 메시지 처리 로직에 따라 추가적인 설정이 필요할 수도 있다.
현재 프로젝트에서 카프카를 사용하려는 이유는 에러 로그를 실시간으로 Slack에 알림으로 띄우기 위함이다.
에러가 발생할 때 카프카로 메시지를 전송하기 위해서는 기존에 우리가 사용하고 있던 에러 핸들링에 카프카 메시지 전송 로직을 추가해줘야 한다.
기존 에러 핸들링 수정
기존에 우리는 에러를 중앙에서 처리하기 위해 @RestControllerAdvice 어노테이션을 사용했다.
수정 사항
- 기존에 우리가 정의한 MemberApplicationException에서 500 에러는 슬랙으로 에러 알림을 보낸다.
- 우리가 정의하지 않은 에러가 발생했을 때도 슬랙으로 에러 알림을 보낸다.
@RestControllerAdvice 클래스에 KafkaTemplate 의존성 추가
사용자 정의 Exception에서 500 에러일 때 error-messages 토픽으로 메시지 전송
그 외 에러에도 kafka 메시지 발행 코드 추가
- NullpointException 처리
- RuntimeException 처리
- IllegalArgumentException 처리 (사실 Bad Request는 500대 에러는 아니지만 일단 카프카로 메시지를 전송하는 코드를 추가해 줬다.)
스프링 부트와 카프카 연동 확인
이제 스프링 부트에서 일부러 에러를 발생시켰을 때 카프카로 메시지가 잘 전송되는지 확인해 보자
만약 Kafka에 토픽이 없다면?
만약에 Kafka에 존재하는 토픽이 없다면 다음 명령어를 Kafka 컨테이너 내부에서 실행하면 토픽이 생성된다.
kafka-topics.sh --create --topic <토픽명> --bootstrap-server Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092 --partitions 3 --replication-factor 2
다음 캡처는 Kafka 컨테이너 내부에서 error-messages 토픽을 생성하는 모습이다. 'Created topic <토픽명>'이 뜨면 성공한 거다.
카프카 컨슈머 실행
이제 내가 스프링 부트에서 error-messages 토픽에 메시지를 전송하면 이 메시지를 소비할 컨슈머를 실행해 보자.
새로운 터미널을 열고 다음 명령어를 실행하면 해당 토픽의 메시지를 받는 컨슈머가 실행된다.
kafka-console-consumer.sh --topic <토픽명> --from-beginning --bootstrap-server Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
이제 해당 토픽으로 메시지가 전송되면 자동으로 하단에 메시지가 출력된다.
스프링 부트에서 에러 발생
강제로 스프링 부트에서 500 에러를 발생시켜 보자.
1) RuntimeException 발생
스프링 부트에 다음과 같이 강제로 RuntimeException 에러를 발생시키면 내가 작성한 대로 메시지가 잘 전달된 걸 확인할 수 있다.
throw new RuntimeException("런타임 에러 발생");
2) IllegalArgumentException 발생
throw new IllegalArgumentException("IllegalArgumentException 에러 발생");
3) MemberApplicationException 발생
사용자 정의에서도 500 에러를 발생시켜 보자.
throw new MemberApplicationException(ErrorCode.MAIL_SEND_FAILED);
이렇게 Kafka와 스프링 부트가 잘 연동된 걸 확인했다.
이제 다음글에서 스프링 부트와 슬랙을 연동해서 카프카 error-messages 토픽으로 메시지가 들어오면 이 메시지를 슬랙으로 알림을 보내는 코드를 추가해 보자!
다른 팀원인 "개발자의 서랍"님의 블로그도 한번 방문해 보세요! 좋은 글이 많이 있습니다 :)
'Tools > Kafka (카프카)' 카테고리의 다른 글
스프링 부트, 카프카, 슬랙 연동 - 실시간 에러 알림 시스템 구축 (3) | 2024.02.19 |
---|---|
macOS에서 docker-compose로 Kafka 설치하기 (Zookeeper 사용 X) (1) | 2024.02.17 |