4-4) 스프링 카프카
스프링 카프카 : 카프카를 스프링 프레임워크에서 효과적으로 사용할 수 있도록 만들어진 라이브러리를 의미함
4-4-1) 스프링 카프카 프로듀서
카프카 템플릿 클래스를 통해서 데이터를 전송할 수 있음.
[카프카 템플릿을 사용하는 방법]
1) 기본 카프카 템플릿을 사용하는 방법
[application.yaml]
spring:
kafka:
producer:
bootstrap-servers: my-kafka:9092
acks: all
@SpringBootApplication
public class DefaultKafkaProducer implements CommandLineRunner {
private static String TOPIC_NAME = "test";
@Autowired
private KafkaTemplate<Integer, String> template;
public static void main(String[] args) {
SpringApplication application = new SpringApplication(DefaultKafkaProducer.class);
application.run(args);
}
@Override
public void run(String... args) throws Exception {
for (int i = 0; i < 10; i++) {
template.send(TOPIC_NAME, "test"+i);
}
System.exit(0);
}
}
2) 직접 프로듀서 팩토리로 카프카 템플릿을 생성하는 방법
커스텀 카프카 템플릿은 프로듀서 팩토리를 통해 만든 카프카 템플릿 객체를 빈으로 등록하여 사용하는 것을 의미함
예를 들어 A클러스터로 전송하는 카프카 프로듀서와 B클러스터로 전송하는 카프카 프로듀서를 동시에 사용하고 싶은 경우 커스텀 카프카 템플릿을 통해 2개의 카프카 템플릿을 빈으로 등록해서 사용할 수 있음
@Configuration
public class KafkaTempleteConfiguration {
@Bean
public KafkaTemplate<String, String> customKafkaTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
return new KafkaTemplate<>(pf);
}
}
빈 객체로 사용할 KafkaTemplate 외에도 ReplyingKafkaTemplate(컨슈머가 특정 데이터를 전달받았는지 여부 확인 가능), RoutingKafkaTemplate(전송하는 토픽별로 옵션을 다르게 설정할 수 있음)도 제공함
@SpringBootApplication
public class CustomKafkaProducer implements CommandLineRunner {
private static String TOPIC_NAME = "test";
@Autowired
private KafkaTemplate<String, String> customKafkaTemplate; //ㅋㅓ스텀한 카프카 템플릿 빈객체 네임이랑 동일해야됨!!
public static void main(String[] args) {
SpringApplication application = new SpringApplication(CustomKafkaProducer.class);
application.run(args);
}
@Override
public void run(String... args) throws Exception {
ListenableFuture<SendResult<String, String>> future = customKafkaTemplate.send(TOPIC_NAME, "test");
future.addCallback(new KafkaSendCallback<String, String>() {
@Override
public void onFailure(KafkaProducerException ex) {
}
@Override
public void onSuccess(SendResult<String, String> result) {
}
});
System.exit(0);
}
}
ListenableFuture 인스턴스를 통해 비동기로 전송 후 정상 적재여부를 확인할 수 있음
4-4-2) 스프링 카프카 컨슈머
스프링 카프카 컨슈머는 기존 컨슈머를 2개의 타입으로 나누고 커밋은 7가지로 나누어 세분화된 개념이 적용되어 있음.
타입 : 레코드 리스너(1개씩 읽어서 처리), 배치 리스너(n개씩 읽어서 처리)
커밋 : RECORD(레코드 단위로 프로세싱 이후 커밋), BATCH(poll 메소드로 호출된 레코드가 모두 처리된 이수 커밋), TIME(특정시간 이후 커밋), COUNT(특정 개수만큼 처리 후 커밋), COUNT_TIME(TIME, COUNT 둘중 한개라로 맞는게 있음 커밋), MANUAL(Acknowledgement.acknowledge 메소드 호출 후 다음번 poll 때 커밋), MANUAL_IMMEDIATE(Acknowledgement.acknowledge 메소드 호출 즉시 커밋)
커밋 = ackMode
기본값은 BATCH이며 enable.auto.commit = False임.
리스너를 사용하는 방법
1. 기본 리스너 컨테이너 사용
[application.yaml]
spring:
kafka:
consumer:
bootstrap-servers: my-kafka:9092
listener:
ack-mode: MANUAL_IMMEDIATE
type: SINGLE
@SpringBootApplication
public class DefaultKafkaConsumer {
public static Logger logger = LoggerFactory.getLogger(DefaultKafkaConsumer.class);
public static void main(String[] args) {
SpringApplication application = new SpringApplication(DefaultKafkaConsumer.class);
application.run(args);
}
@KafkaListener(topics = "test",
groupId = "test-group-00")
public void recordListener(ConsumerRecord<String, String> record) {
logger.info(record.toString());
}
@KafkaListener(topics = "test",
groupId = "test-group-01")
public void singleTopicListener(String messageValue) {
logger.info(messageValue);
}
@KafkaListener(topics = "test",
groupId = "test-group-02",
properties = {
"max.poll.interval.ms:60000",
"auto.offset.reset:earliest"
})
public void singleTopicWithPropertiesListener(String messageValue) {
logger.info(messageValue);
}
@KafkaListener(topics = "test",
groupId = "test-group-03",
concurrency = "3")
public void concurrencyTopicListener(String messageValue) {
logger.info(messageValue);
}
@KafkaListener(topicPartitions = {
@TopicPartition(topic = "test01", partitions = {"0","1"}),
@TopicPartition(topic = "test02", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3")),
},
groupId = "test-group-04")
public void listenSpecificPartition(ConsumerRecord<String, String> record) {
logger.info(record.toString());
}
}
배치 리스너는 리스너 타입을 BATCH 로 선언한 뒤 각 컨슈머 호출 메소드에 레코드가 리스트 형태입으로 record.foreach를 통해 데이터 처리를 구현하면 됨.
배치 컨슈머는 2가지가 존재하는데 "배치 컨슈머 리스너"와 "배치 커밋 리스너"가 있음.
배치 컨슈머 리스너 : 컨슈머를 직접 사용하기 위해서 컨슈머 인스턴스를 파라미터로 받아 처리하는 리스너
- commitSync, commitAsync를 사용할 수 있음
배치 커밋 리스너 : 컨테이너에서 관리하는 AckMode를 사용하기 위해 Acknowledgement 인스턴스를 파라미터로 받아 처리하는 리스너
-AckMode 제공하는 방식만 사용할 수 있음
주의점 : 수동 커밋을 하기 위해선 반드시 AckMode를 MANUAL 또는 MANUAL_IMMEDIATE로 설정해야 됨.
@SpringBootApplication
public class KafkaCommitConsumer {
public static Logger logger = LoggerFactory.getLogger(KafkaCommitConsumer.class);
public static void main(String[] args) {
SpringApplication application = new SpringApplication(KafkaCommitConsumer.class);
application.run(args);
}
@KafkaListener(topics = "test",
groupId = "test-group-00")
public void recordListener(ConsumerRecords<String, String> records, Acknowledgment ack) {
records.forEach(a -> logger.info(a.toString()));
ack.acknowledge();
}
@KafkaListener(topics = "test",
groupId = "test-group-01")
public void singleTopicListener(ConsumerRecords<String, String> records, Consumer<String, String> consumer) {
records.forEach(a -> logger.info(a.toString()));
consumer.commitAsync();
}
}
만약 둘다 쓰고 싶다면 "배치 컨슈머 리스너"를 사용해서 구현할 수 있음.
2. 컨테이너 팩토리로 직접 리스너 생성해서 사용
주로 서로 다른 설정을 가진 2개 이상의 리스너를 구현하거나 리밸런스 리스너를 구현하기 위해서 커스텀 리스너 컨테이너를 사용함
@Configuration
public class ListenerContainerConfiguration {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> customContainerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
ConsumerAwareRebalanceListener.super.onPartitionsRevokedBeforeCommit(consumer, partitions);
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
ConsumerAwareRebalanceListener.super.onPartitionsRevokedAfterCommit(consumer, partitions);
}
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
ConsumerAwareRebalanceListener.super.onPartitionsAssigned(consumer, partitions);
}
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
ConsumerAwareRebalanceListener.super.onPartitionsLost(partitions);
}
});
factory.setBatchListener(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
factory.setConsumerFactory(cf);
return factory;
}
}
onPartitionsRevokedBeforeCommit : 커밋 되기 전에 리밸런스가 발생했을때
onPartitionsRevokedAfterCommit : 커밋이 일어난 이후에 리밸런스가 발생했을때
@SpringBootApplication
public class CustomKafkaConsumer {
public static Logger logger = LoggerFactory.getLogger(CustomKafkaConsumer.class);
public static void main(String[] args) {
SpringApplication application = new SpringApplication(CustomKafkaConsumer.class);
application.run(args);
}
@KafkaListener(topics = "test",
groupId = "test-group-00",
containerFactory = "customContainerFactory") //커스텀 빈으로 등록한 객체의 네임이랑 동일하게 선언!!
public void recordListener(ConsumerRecord<String, String> record) {
logger.info(record.toString());
}
}
[참고자료]
https://product.kyobobook.co.kr/detail/S000001842177
'IT > 자바' 카테고리의 다른 글
코틀린 기본 문법 (0) | 2024.08.04 |
---|---|
4장 카프카 상세 개념 설명 (0) | 2024.02.04 |
3장 카프카 기본 개념 설명 (0) | 2024.01.27 |
Test-DrivenDevelopment : 테스트 주도 개발 (0) | 2023.03.12 |
자바 플레이그라운드 with TDD, 클린코드 강좌 후기 Day 3 (0) | 2022.12.18 |