의존성 추가

implementation 'org.springframework.boot:spring-boot-starter-kafka'

설정하기

SFTP를 이용해 설치된 카프카 브로커 서버에서 발급된 truststore 키를 다운로드합니다.

 sftp -i kafka.pem<발급받은 인증서> ubuntu@13.124.253.55<카프카 서버 공인 IP>
 sftp> ls # truststore 키 경로 확인
 sftp> get kafka.server.truststore.jks # truststore 키 다운로드
 sftp> bye # 연결 종료

image.png

다운로드한 파일을 스프링 부트 프로젝트의 src/main/resources/ssl 경로에 복사해 넣습니다.

application.yaml 설정에 추가

spring:
  application:
    name: kafka
  kafka:
    # 1. 공통 접속 및 보안(SSL) 설정
    bootstrap-servers: 13.124.253.55:9092,3.35.173.176:9092,54.180.1.6:9092
    properties:
      security.protocol: SSL
      # 호스트네임 검증 알고리즘을 빈 값으로 설정하여 비활성화
      ssl.endpoint.identification.algorithm: ""
    ssl:
      # 파일 경로 (src/main/resources/ssl/ 하위에 파일이 있어야 함)
      trust-store-location: classpath:ssl/kafka.server.truststore.jks
      trust-store-password: _aA123456

    # 2. 프로듀서 설정 (신뢰성 & 비동기)
    producer:
      # acks=all: 리더와 모든 팔로워(ISR) 복제 확인
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        enable.idempotence: true # 멱등성 보장 (중복 방지), 기본값 true
        max.block.ms: 10000 # 연결 대기 시간을 10초로 제한
        max.in.flight.requests.per.connection: 5 # 순서 보장 및 성능 최적화
        retries: 2147483647 # 사실상 무한 재시도 (delivery.timeout.ms 내에서)
        #retries: 3 # 재시도 횟수
        delivery.timeout.ms: 120000 # 2분 동안은 전송 시도

    # 3. 컨슈머 및 컨슈머 그룹 설정
    consumer:
      # 컨슈머 그룹 ID 설정 (기본값)
      group-id: application-service-group # 적절한 컨슈머 그룹 설정 / 오프셋의 기준
      # 수동 커밋을 위해 자동 커밋 꺼짐
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 새로운 그룹이 시작할 때 오프셋 위치 (가장 처음부터)
      auto-offset-reset: earliest
      properties:
        # 세션 타임아웃: 컨슈머가 살아있는지 확인하는 시간 (30초)
        session.timeout.ms: 30000
        # 최대 폴링 간격: 로직 처리가 너무 길어지면 그룹에서 제외됨 (5분)
        max.poll.interval.ms: 300000

    # 4. 리스너(컨테이너) 설정
    listener:
      # 수동 커밋 모드 (acknowledge 호출 시 즉시 커밋)
      ack-mode: manual_immediate
      # 병렬 처리 스레드 개수 (파티션 개수에 맞춰 설정 권장)
      concurrency: 3

      # [DLQ 설정]
      # 총 시도 횟수: 1회 실행 + 2회 재시도 = 총 3회
      delivery-attempts: 3

      # 백오프 정책: 실패 시 다시 시도하기 전 대기 시간 정의
      back-off:
        initial-interval: 2000ms  # 첫 재시도 전 2초 대기
        multiplier: 2.0           # 실패할 때마다 대기 시간 2배 증가 (2s -> 4s -> 8s)
        max-interval: 10000ms     # 최대 대기 시간 10초로 제한

logging:
  level:
    org.springframework.kafka: DEBUG
    org.apache.kafka: INFO

DLQ Recoverer 등록 설정

**KafkaConfig.java**

package org.sparta.kafka;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;

@Configuration
public class KafkaConfig {
    @Bean
    public CommonErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        // 실패한 메시지를 {원래토픽}.DLT 로 전송하는 리커버러 생성
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);

        // 에러 핸들러 등록 (yml에 정의한 back-off 설정을 자동으로 적용)
        return new DefaultErrorHandler(recoverer);
    }
}