Message queuing Service
카프카를 실행해 보았다면 이제 스프링 부트와 연결해 보자.
디펜던시 추가부터 시작한다.
implementation 'org.springframework.kafka:spring-kafka'
먼저
Kafka Consumer 에 대해 구성해보자
/**
 * Spring configuration for the Kafka consumer side of the application.
 *
 * <p>Exposes a {@link ConsumerFactory} that reads String keys and String values,
 * and a listener container factory built on top of it.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /**
     * Creates the consumer factory.
     *
     * <p>The consumer connects to the broker at {@code 127.0.0.1:9092}, joins the
     * group {@code consumerGroupId}, and deserializes both key and value as strings.
     *
     * @return a consumer factory for String/String records
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerProps = new HashMap<>();

        // Where to connect and which consumer group to join.
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");

        // Keys and values are both consumed as plain strings.
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(consumerProps);
    }

    /**
     * Creates the listener container factory backed by {@link #consumerFactory()}.
     *
     * @return a concurrent listener container factory for String/String records
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }
}
보면 알겠지만 카프카 Consumer Factory 에 서버 config(브로커 주소), group config(컨슈머 그룹 ID), 키와 밸류의 역직렬화 config 를 등록한다.
또한 가져올 타입(키와 밸류 모두 String)도 지정한다.
다음으로 이 Consumer Factory 를 카프카 리스너 컨테이너 팩토리에 등록해준다.
@Slf4j
@Service
public class KafkaConsumer {
CatalogRepository repository;
@Autowired
public KafkaConsumer(CatalogRepository catalogRepository){
this.repository = catalogRepository;
}
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage){
log.info("Kafka message -> "+ kafkaMessage);
Map<Object,Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
}catch (JsonProcessingException ex){
ex.printStackTrace();
}
CatalogEntity entity = repository.findByProductId((String) map.get("productId"));
if(entity != null){
entity.setStock(entity.getStock() - (Integer)map.get("qty"));
repository.save(entity);
}
}
}