4장 카프카 상세 개념 설명
4-1) 토픽과 파티션
카프카에서 토픽은 가장 중요한 역할이며 토픽의 파티션 개수는 카프카의 성능과 관련이 많음
4-1-1)적정 파티션 개수
- 고려사항 : 데이터 처리량, 메시지 키 사용 여부, 브로커 & 컨슈머 영향도
1. 데이터 처리량
파티션 : 카프카 병렬처리의 핵심, 파티션-컨슈머 1:1 매핑임
=> 그러므로 파티션 개수를 정할때 해당 토픽에 필요한 데이터 처리량을 측정하여 정하는게 중요함.
데이터 처리 속도를 올리는 방법
1) 컨슈머 처리량을 늘리는 방법
- 컨슈머 서버의 사양을 늘리거나 GC 튜닝 등을 활용할 수 있으나 컨슈머 특성상 다른 시스템(=오라클, 하둡..)과 연동하여 데이터 처리함으로 한계 존재
2) 파티션 개수 늘리는 방법
- 가장 확실한 방법임 프로듀서가 보내는 데이터양보다 파티션을 늘려 컨슈머의 처리량을 늘리 수 있음
공식 : 프로듀서 전송 데이터량 < 컨슈머 데이터 처리량 * 파티션 개수
ex) 프로듀서가 초당 1000 레코드를 보내고 컨슈머가 처리할 수 있는 데이터가 초당 100인 경우 파티션 개수는 10개 이상으로 설정해야됨
만약 10개 미만으로 파티션을 설정할 경우 건슈머 랙이 생기고 데이터 지연이 발생하게 됨.
[주의]
파티션 개수를 늘리면 컨슈머와 브로커에 부담이 생길 수 있음.
그러므로 데이터 처리 지연이 발생할 경우 서비스 영향도를 고려해서 파티션 개수를 구하는 것이 좋음
(무조건 큰게 좋은게 아님!!!)
2. 메시지 키 사용 여부
파티션 개수를 구할 때 메세지 키 사용 여부와 데이터 순서 보장 여부를 함께 고려해야 됨
이유 : 메세지 키를 사용하면서 데이터 순서 보장이 중요한 경우 파티션 개수를 운영 중 변경하면 지정되는 파티션 순서가 변경됨으로 이전 파티션에 대한 데이터 순서를 보장할 수 없게 됨
그러므로 메시지 키를 사용하면서 데이터 순서가 중요한 토픽은 첨부터 파티션 개수를 넉넉히 잡는걸 권함.
그 외엔 데이터 양에 따라 파티션을 늘릴 수 있음으로 넉넉히 잡을 필요는 없음.
3. 브로커 & 컨슈머 영향도
파티션은 각 브로커의 파일 시스템을 사용함으로 개수가 늘면 그만큼 브로커에서 읽어야 될 파일 개수가 늘어남.
운영체제에서는 프로세스당 열 수 있은 파일 개수를 제한하고 있음으로 브로커당 파티션 개수를 모니터링해서 그 개수가 많을 경우 브로커를 늘리는 방안도 함께 고려해야 됨.
4-1-2) 토픽 정리 정책
토픽의 데이터를 시간/용량에 따른 규칙을 적용해서 삭제할 수도 삭제하지 않을 수도 있음
cleanup.policy 옵션으로 2가지 삭제 정책이 제공됨
1. 삭제 정책 (delete)
- 세그먼트 (=파일) 단위로 데이터를 삭제함
파티션별로 별개로 세그먼트를 생성되며 segment.byte 옵션으로 크기를 설정할 수 있음
삭제 주기는 시간 또는 용량으로 설정할 수 있음
2. 압축 정책 (compact)
- 압축 : 메시지 키 기준으로 오래된 데이터를 삭제하는 걸 의미함
데이터 흐름상 가장 마지막 업뎃한 메시지 키의 데이터는 삭제되지 않음. 즉 최신 데이터에 대해선 삭제되지 않음을 보장함
압축 주기는 min.cleanable.dirty.ratio 옵션 값으로 더티 비율로 설정할 수 있음.
= 더티(=헤드영역) 레코드 개수 / (클린(=테일 영역) 레코드 개수 + 더티 레코드 개수)
4-1-3) ISR(in-sync-replicas)
=리더 파티션과 팔로워 파티션이 모두 싱크된 상태를 의미함
리더파티션은 replica.lag.time.max.ms 설정된 주기에 팔로워 파티션이 데이터를 복제했는지 확인함. 만약 해당 값보다 긴 시간 동안 데이터를 가져가지 않았으면 해당 파티션은 ISR 그룹에서 제외됨
(ISR 그룹 제외 : 리더 파티션으로 선출 불가능함을 의미함)
unclean.leader.election.enable 옵션(토픽별로 설정 가능함)으로 ISR 그룹이 아닌 파티션도 리더로 선출할 수 있도록 설정 가능함
(단, 데이터 유실 여부에 따른 영향도에 따라 결정됨, 데이터 유실보다 서비스 무중단 운영이 중요한 경우)
4-2) 카프카 프로듀서
프로듀서는 카프카에서 데이터를 저장하는 단계임
4-2-1) acks 옵션
acks 옵션 : 0, 1, -1 (설정에 따라 성능 변화가 존재함)
acks=0 : 프로듀서가 데이터 전송 후 리더 파티션이 해당 데이터를 정상적으로 받았는지 여부 확인하지 않음
속도 빠름. 데이터 유실에 대한 처리 불가능(retry 처리 X)
acks=1 : 프로듀서가 데이터 전송 후 리더 파티션이 해당 데이터를 정상적으로 받았는지 여부 확인 함
(단, 리더 파티션에 대해서만 정상 수신여부를 확인함으로 복제 실패 시 데이터 유실 가능성은 존재함)
acks=-1 / all : 프로듀서가 데이터 전송 후 리더 파티션 뿐만 아니라 팔로워 파티션까지 데이터를 정상적으로 받았는지 여부 확인 함.
(데이터 유실 가능성이 없음으로 안정성 보장됨)
[주의]
-1/all로 설정할 경우 min.insync.replicas 옵션을 보는데 해당 옵션을 ISR 그룹 중 최소로 정상 수신여부를 확인하는 파티션 개수인데
1로 설정하면 acks=1 옵션과 동일한 동작임으로 -1/all로 설정한 의미가 없어짐 그러므로 min.insync.replicas 값을 2 이상으로 해야 됨.
(이유 : 리더 파티션도 ISR 그룹에 포함되어 있기 때문에)
또한 min.insync.replicas 옵션은 브로커 개수와 동일한 값으로 설정하면 안됨.
(이유 : 버전 업그레이드와 같이 롤링 다운 타임이 발생하는 상황에서 브로커 1대가 작업으로 중단되면 프로듀서가 데이터를 추가할 수 없음으로)
4-2-2) 멱등성 프로듀서
멱등성 : 여러번 연산을 수행하더라도 동일한 결과를 나타내는 것
enable.idempotence 옵션으로 멱등성 프로듀서를 사용할 수 있으며 이 기능을 통해 데이터 중복을 막을 수 있음
해당 옵션을 true로 설정하면 기본 프로듀서와 달리 데이터를 브로커에 전달할 때 PID, 시퀀스 넘버를 함께 전달하여 동일 메시지 전송 요청이 오더라도 중복 체크를 해서 1번만 전달할 수 있음.
멱등성 프로듀서는 동일 세션(PID)에 한에서만 중복 체크를 함으로 만약 프로듀서에 장애가 발생해 재기동할 경우 PID가 달라짐으로 동일한 메세지를 보낼 수 있음.
즉, 장애가 발생하지 않는 경우에만 정확히 한번 데이터 적재는 보장함.
enable.idempotence 옵션을 사용하면 강제로 retry는 기본값으로 Integer.MAX_VALUE로 설정되고, acks 옵션은 all로 설정됨
[주의]
멱등성 프로듀서는 브로커로 데이터 전송이 오직 1번임을 의미하는 것이 아니고 상황에 따라 프로듀서는 여러번 전송하되 브로커에서 데이터의 PID와 시퀀스 값을 통해 여러번 전송된 데이터임을 확인해서 중복된 데이터를 적재하지 않는 것임
멱등성 프로듀서를 사용할 경우 시퀀스 넘버는 0부터 1씩 증가하는데 만약 브로커에서 예상한 시퀀스 넘버와 다른 번호로 데이터 적재 요청이 온다면 OutOfOrderSequenceException 에러가 발생하는데 데이터 순서가 중요한 경우 해당 에러에 대한 대응 방안을 고려해야 됨.
4-2-3) 트랜잭션 프로듀서
다수의 파티션에 데이터를 저장할 경우 모둔 데이터에 대해 동일한 원자성(=다수의 데이터가 트랜잭션으로 묶여 전체 데이터를 처리하거나 처리하지 않음으로 데이터 통일)을 만족하기 위해 사용되는 기능임
enabled.idempotence = true, transactional.id를 임의의 String 값으로 정의한 뒤 컨슈머의 isolation.level을 read_committed로 설정하면 프로듀서와 컨슈머는 트랜잭션으로 처리 완료된 데이터만 쓰고 읽음을 보장할 수 있음
4-3 카프카 컨슈머
컨슈머 : 카프카에 적재된 데이터를 처리함
4-3-1) 멀티 스레드 컨슈머
데이터 병렬처리를 하기 위해선 파티션 개수와 컨슈머 개수를 늘려 처리 속도를 높일 수 있음.
이때 멀티 스레드로 컨슈머 개수를 늘려 운영할 수 있는데 하나의 프로세스에 스레드를 여러개 띄울 수도 있고 프로세스 1개에 스레드 1개로 여러 프로세스를 실행시켜서 병렬처리를 제공할 수 있음.
(주의점 : 다른 스레드에 영향을 주지 않도록 스레드 세이프 로직과 변수를 적용해야됨)
1) 멀티 워커 스레드
컨슈머 스레드는 1개만 실행하고 데이터 처리 담당 워커 스레드를 여러개 실행하는 방법
[주의사항]
- 스레드를 사용함으로써 데이터 처리가 끝나기 전에 커밋됨으로 리밸러싱 또는 컨슈머 장애 발생 시 데이터 유실이 발생할 수 있음.
- 각각 다른 스레드에서 데이터 처리가 진행됨으로 데이터 처리 순서가 보장되지 않음
public class ConsumerWorker implements Runnable{
private final static Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
private String recordValue;
ConsumerWorker(String recordValue) {
this.recordValue = recordValue;
}
@Override
public void run(){
logger.info("thread:{}\treocord:{}", Thread.currentThread().getName(), recordValue);
}
}
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
public class MutiWorkerThreadApplication {
private final static Logger logger = LoggerFactory.getLogger(MutiWorkerThreadApplication.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVICES = "my-kafka:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVICES);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
ExecutorService executorService = Executors.newCachedThreadPool();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<String, String> record : records) {
ConsumerWorker worker = new ConsumerWorker(record.value());
executorService.execute(worker);
}
}
}
}
2) 컨슈머 멀티 스레드
컨슈머 스레드를 여러개 실행하는 방법
[주의사항]
- 토픽의 파티션 개수만큼 컨슈머 스레드를 생성해서 운영해야됨
public class ConsumerWorker implements Runnable{
private final static Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
private Properties properties;
private String topic;
private String threadName;
private KafkaConsumer<String, String> consumer;
public ConsumerWorker(Properties properties, String topic, int number) {
this.properties = properties;
this.topic = topic;
this.threadName = "consumer-thread-"+number;
}
@Override
public void run(){
consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("{}", record);
}
}
}
}
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
public class MutiConsumerThreadApplication {
private final static Logger logger = LoggerFactory.getLogger(MutiConsumerThreadApplication.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVICES = "my-kafka:9092";
private final static String GROUP_ID = "test-group";
private static final int CONSUMER_COUNT = 3;
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVICES);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < CONSUMER_COUNT; i++) {
ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
executorService.execute(worker);
}
}
}
4-3-2) 컨슈머 랙
컨슈머 랙 : 토픽의 최신 오프셋과 컨슈머 오프셋 간의 차이를 의미함
( = 프로듀서가 데이터를 적재한 오프셋 - 컨슈머가 데이터 가져간 오프셋)
컨슈머 랙을 모니터링하는 것은 매우 중요함
왜냐하면 컨슈머 장애를 인지할 수 있고 데이터 지연에 따른 파티션 개수 추가 작업을 결정할 수 있기 때문임
- 랙 확인 방법 : 카프카 명령어, 컨슈머 애플리케이션 metrics() 메소드, 외부 모니터링 툴 사용
컨슈머 배포 방식
무중단 배포
1. 블루/그린 : 기존 버전과 동일한 신규 버전을 구성한 뒤 기존 애플리케이션을 모두 종료해서 자동으로 신규 애플리케이션으로 전환 배포
2. 롤링 : 인스턴스 단위로 전환 배포
3. 카나리 : 파티션 1개를 신규버전 컨슈머로 배정하여 테스트 및 모니터링 진행
[참고자료]
https://product.kyobobook.co.kr/detail/S000001842177