implementation 'org.springframework.boot:spring-boot-starter-kafka'
SFTP를 이용해 설치된 카프카 브로커 서버에서 발급된 truststore 키를 다운로드합니다.
sftp -i kafka.pem<발급받은 인증서> ubuntu@13.124.253.55<카프카 서버 공인 IP>
sftp> ls # truststore 키 경로 확인
sftp> get kafka.server.truststore.jks # truststore 키 다운로드
sftp> bye # 연결 종료

다운로드한 파일을 스프링 부트 프로젝트의 src/main/resources/ssl 경로에 복사해 넣습니다.
application.yaml 설정에 추가
spring:
application:
name: kafka
kafka:
# 1. 공통 접속 및 보안(SSL) 설정
bootstrap-servers: 13.124.253.55:9092,3.35.173.176:9092,54.180.1.6:9092
properties:
security.protocol: SSL
# 호스트네임 검증 알고리즘을 빈 값으로 설정하여 비활성화
ssl.endpoint.identification.algorithm: ""
ssl:
# 파일 경로 (src/main/resources/ssl/ 하위에 파일이 있어야 함)
trust-store-location: classpath:ssl/kafka.server.truststore.jks
trust-store-password: _aA123456
# 2. 프로듀서 설정 (신뢰성 & 비동기)
producer:
# acks=all: 리더와 모든 팔로워(ISR) 복제 확인
acks: all
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
enable.idempotence: true # 멱등성 보장 (중복 방지), 기본값 true
max.block.ms: 10000 # 연결 대기 시간을 10초로 제한
max.in.flight.requests.per.connection: 5 # 순서 보장 및 성능 최적화
retries: 2147483647 # 사실상 무한 재시도 (delivery.timeout.ms 내에서)
#retries: 3 # 재시도 횟수
delivery.timeout.ms: 120000 # 2분 동안은 전송 시도
# 3. 컨슈머 및 컨슈머 그룹 설정
consumer:
# 컨슈머 그룹 ID 설정 (기본값)
group-id: application-service-group # 적절한 컨슈머 그룹 설정 / 오프셋의 기준
# 수동 커밋을 위해 자동 커밋 꺼짐
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 새로운 그룹이 시작할 때 오프셋 위치 (가장 처음부터)
auto-offset-reset: earliest
properties:
# 세션 타임아웃: 컨슈머가 살아있는지 확인하는 시간 (30초)
session.timeout.ms: 30000
# 최대 폴링 간격: 로직 처리가 너무 길어지면 그룹에서 제외됨 (5분)
max.poll.interval.ms: 300000
# 4. 리스너(컨테이너) 설정
listener:
# 수동 커밋 모드 (acknowledge 호출 시 즉시 커밋)
ack-mode: manual_immediate
# 병렬 처리 스레드 개수 (파티션 개수에 맞춰 설정 권장)
concurrency: 3
# [DLQ 설정]
# 총 시도 횟수: 1회 실행 + 2회 재시도 = 총 3회
delivery-attempts: 3
# 백오프 정책: 실패 시 다시 시도하기 전 대기 시간 정의
back-off:
initial-interval: 2000ms # 첫 재시도 전 2초 대기
multiplier: 2.0 # 실패할 때마다 대기 시간 2배 증가 (2s -> 4s -> 8s)
max-interval: 10000ms # 최대 대기 시간 10초로 제한
logging:
level:
org.springframework.kafka: DEBUG
org.apache.kafka: INFO
bootstrap-servers: 13.124.253.55:9093,54.180.1.6:9093,3.35.173.176:9093 : 설치된 카프카 브로커 서버 주소:포트번호를 설정합니다.
trust-store-password: _aA123456 : 발급할때 설정한 truststore 키 비밀번호를 입력합니다.
프로듀서 관련 보충 설명
acks
카프카에서 acks(Acknowledgments) 설정은 프로듀서가 메시지를 보낸 후 "제대로 저장되었다"는 확인 응답을 브로커로부터 언제 받을지 결정하는 핵심 파라미터입니다.
이 설정은 시스템의 데이터 신뢰성과 처리량 사이의 트레이드오프를 조정합니다.
acks = 0 (Fire and Forget)
프로듀서가 메시지를 네트워크를 통해 브로커에 보내기만 하면 성공으로 간주
acks = 1 (Leader Only)
메시지가 해당 파티션의 리더(Leader) 브로커에 적재되면 성공 응답을 받습니다.
acks = all 또는 -1 (Full Replication)
메시지가 리더뿐만 아니라 모든 ISR(In-Sync Replicas) 팔로워들까지 복제되어야 성공 응답을 받습니다.
멱등성 프로듀서 (Idempotent Producer)
acks: all 환경에서는 네트워크 오류 등으로 인해 프로듀서가 ACK를 받지 못하면 메시지를 다시 보냅니다. 이때 브로커가 이미 메시지를 저장했다면 데이터 중복이 발생할 수 있는데, 이를 방지하는 것이 멱등성 설정입니다.
동작 원리: 프로듀서가 메시지를 보낼 때 고유한 PID(Producer ID)와 시퀀스 번호(Sequence Number)를 함께 보냅니다. 브로커는 이 번호를 확인하여 이미 기록된 번호라면 저장하지 않고 ACK만 다시 보냅니다.
설정 방법: spring.kafka.producer.properties.enable.idempotence: true
acks: all 설정 시 스프링 부트 3.x 이상에서는 기본적으로 true로 설정되지만, 명시적으로 적어주는 것이 안전합니다.
효과: "정확히 한 번(Exactly Once)" 전달을 보장하는 기초가 됩니다.
재시도 및 순서 보장 (Retries & Max In-Flight)
데이터 유실을 막기 위해 전송 실패 시 다시 시도하는 설정입니다.
ISR (In-Sync Replicas)의 심화 이해
acks: all 에서 말하는 "모든 팔로워"는 실제로는 ISR에 속한 브로커들만을 의미합니다.
컨슈머 관련 보충 설명
group-id: 동일한 group-id를 가진 컨슈머들은 하나의 토픽 파티션을 나누어서 처리합니다. (로드 밸런싱) 만약 같은 데이터를 여러 서비스가 각각 받아야 한다면 group-id를 다르게 설정해야 합니다.
concurrency: 하나의 애플리케이션 안에서 몇 개의 스레드로 메시지를 동시에 처리할지 결정합니다. 보통 해당 토픽의 파티션 개수와 맞추는 것이 성능상 가장 효율적입니다.
max.poll.interval.ms: 수동 커밋을 사용할 때 주의할 점입니다. 메시지 처리 로직이 이 설정 시간보다 오래 걸리면, 카프카는 해당 컨슈머가 죽은 것으로 간주하고 리밸런싱(Rebalancing)을 일으킵니다. 복잡한 로직을 수행한다면 이 값을 충분히 늘려주어야 합니다.
isolation.level: read_committed : 카프카 브로커에 메시지가 들어와 있더라도 최종 커밋이 되기 전까지는 컨슈머가 이를 가져가지 않고 대기합니다.
주의사항
package org.sparta.kafka;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
@Configuration
public class KafkaConfig {
@Bean
public CommonErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
// 실패한 메시지를 {원래토픽}.DLT 로 전송하는 리커버러 생성
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
// 에러 핸들러 등록 (yml에 정의한 back-off 설정을 자동으로 적용)
return new DefaultErrorHandler(recoverer);
}
}