3-1) 카프카 브로커, 클러스터, 주키퍼
브로커 : 클라이언트와 데이터를 주고받는 주체, 데이터 분산 저장으로 장애 발생에 대한 대응이 가능하도록 도와주는 애플리케이션
(한 클러스터(=렉)에 3대 이상의 브로커로 운영하길 권장함 - 클러스터로 묶인 브로커끼린 데이터 분산 저장 및 복제하는 역할)
프로듀서 -> 브로커 -> 컨슈머 관계로 데이터를 주고받는다.
프로듀서 : 데이터 전송
브로커 : 요청받은 토픽의 파티션에 데이터 저장 및 컨슈머가 데이터 요청하는 경우 데이터 전달
컨슈머 : 데이터 요청
config/server.properties의 log.dir 옵션에 정의된 디렉토리에 데이터를 저장함
카프카는 별도 db나 캐시 메모리를 사용하지 않고 파일 시스템에 데이터를 저장하기 때문에 파일 입출력으로 속도 이슈가 있을 수 있다고 생각하지만 전혀 그렇지 않다.
이유 : 페이지 캐시를 사용해서 디스크 입출력 속도를 높였기에 속도 이슈가 없으며 힙메모리 사이즈도 크게 설정할 필요가 없음
(페이지 캐시 : OS에서 파일 입출력 성능 향상을 위해 만들어 놓은 메모리 영역)
데이터 복제는 카프카 장애허용시스템으로 동작하도록 하는 원동력임
이유: 클러스터로 묶인 브로커 중 일부에 장애가 발생하더라도 데이터 유실이 없고 안정하게 사용하기 위함
복제는 파티션 단위로 이뤄지고 토픽 생성때 파티션의 복제 개수를 설정함 (기본값 : 1 - 복제 없음, 최대 브로커 개수만큼, 금융데이터인 경우 최소 3개로 설정하길 권장)
복제 : 리더 파티션의 오프셋을 확인해서 팔로워 파티션 자신들의 오프셋과 차이가 있다면 리더 파티션에서 데이터를 가져와 저장하는 과정을 뜻함.
컨트롤러 : 브로커 상태 체크 및 리더 파티션 분배 역할을 함 (만약, 컨트롤러가 장애나면 다른 브로커가 해당 역할을 위임받음)
데이터 삭제 : 특정 데이터만 삭제 불가, 로그세크먼트(=파일)단위로만 삭제 가능
- log.retention.bytes, log.retention.ms 설정에 맞춰 체크해서 삭제함.
컨슈머 오프셋 저장 : 처리한 데이터 위치(=오프셋)을 consumer_offset 토픽에 저장
코디네이터 : 컨슈머 상태 체크 및 파티션을 컨슈머와 매칭되도록 분배하는 역할
- 리밸런스 : 컨슈머 재할당하는 과정을 의미함
주키퍼 : 카프카의 메타데이터를 관리하는 역할(카프카 서버랑 동일한 서버에 설치되며 포트는 2181임)
3-2) 토픽과 파티션
토픽 : 데이터를 구분하기 위해 사용하는 단위
파티션 : 프로듀서가 보낸 데이터(=레코드 : 오프셋, 타임스탬프+메시지키+메시지값) 저장
- 파티션은 카프카의 병렬처리의 핵심으로 그룹으로 묶인 컨슈머들이 레코드(=데이터)를 병렬로 처리할 수 있게 매칭됨
- 파티션 : 큐형태로 FIFO(선입선출)구조이나 큐와 다르게 데이터는 삭제되지 않음
3-3) 레코드
레코드 : 타임스탬프, 메세지 키, 메세지 값, 오프셋, 헤더로 구성됨
브로커에 한번 적재된 레코드는 수정 및 삭제가 안됨 (특정 규칙에 의한 파일 단위로는 삭제됨)
타임스탬프 : 프로듀서에서 해당 레코드가 생성된 시점 또는 브로커에 적재된 시점을 의미함
메세지 키 : 메시지 값을 순서대로 처리하거나 종류를 나타내는 값으로 해당 키 값의 해시값을 토대로 파티션을 지정함
(동일 메세지 키라면 동일 파티션으로 들어감)
※ 파티션 개수 변경 시 재분배됨으로 운영 중에 값을 변경해서는 안됨
메세지 값 : 실제 처리할 데이터
오프셋 : 컨슈머가 데이터를 가져갔음을 나타내는 위치값
헤더 : 레코드의 추가적인 정보를 담는 메타데이터 저장소 용도
3-4) 카프카 클라이언트
프로듀서 API : 리더 브로커와 연결되어 레코드(=데이터)를 특정 파티션으로 전송함
- 이때 사용되는 애플리케이션 기반으로 연동됨으로 자바를 사용하는 경우 자바에서 제공되는 형태로 데이터를 직렬화(=외부 시스템에서 사용 가능하도록 바이트 형태로 데이터를 변환하는 기술)해서 보낼 수 있음
[참고사항]
send() - 바로 브로커로 데이터 전송이 아니고 배치 형태로 묶여서 브로커에 전송되도록 대기열에 넣는걸 의미함
flush() - 레코드 배치를 브로커로 전송함 (바로 전송)
close() - 인스턴스의 리소스 안전하게 종료
프로듀서 전송절차
카프카 프로듀서의 인스턴스가 send() 메소드 호출 -> 파티셔너를 통해 어느 파티션으로 전송될 건지 결정 -> 어큐뮬레이터에 토픽별로 버퍼로 쌓아둠 -> sender 를 통해서 클러스터로 데이터 전송
(배치 형태로 묶어서 전송함으로 프로듀서의 처리량을 향상시킴)
프로듀서 파티셔너 : uniformStickyPartitioner(카프카 2.4.0 부터 디폴트 값) RoundRobinPartitioner 2가지 존재함
uniformStickyPartitioner: 라운드로빈 파티셔너의 단점을 보완한 버전으로 어큐뮬레이터에서 데이터가 배치로 모두 묶일 때까지 기다렸다가 전송함으로 높으 처리량과 낮은 리소스 사용률이 특징임
RoundRobinPartitioner : 대기 없이 바로 전송
※ 프로듀서는 전송 시 압축 방식을 통해 네트워크 처리량을 높일 수 있으나 압축으로 CPU, 메모리 리소스를 사용하게 됨으로 해당 옵션은 적철한 상화에서만 사용해야 됨
프로듀서 옵션들
주요 옵션 :
bootstrap.servers : 데이터 전송 대상 카프카 클러스터에 속한 브로커 호스트 이름 (2개 이상 입력 권장)
key.serializer : 메시지 키 직렬화 클래스 지정
value.serializer : 메시지 값 직렬화 클래스 지정
선택 옵션 :
acks : 전송 데이터가 정상적으로 브로커로 저장되었는지 전송 성공 여부 확인 (0: 전송만 check, 1: 저장까지 check, -1 or all: 리더&팔로워 모두 저장 check)
buffer.memory : 브로커에 전송할 데이터의 배치의 버퍼 메모리 사이즈 (기본값 : 32MB)
retries : 재시도 횟수
batch.size : 배치로 전송할 레코드의 최대 용량
linger.ms : 배치를 전송하기 전까지 기다리는 최소 시간 (기본값 0)
patitioner.class : 파티셔너 클래스 지정
enable.idempotence : 멱등성 프로듀서 동작 여부
transactional.id : 프로듀서가 레코드를 전송할 때 레코드 트랜잭션 단위로 묵을지 여부 설정
프로듀서 레코드 생성 때 파티션 번호를 넣으면 해당 토픽의 특정 파티션으로 지정해서 레코드를 전송할 수 있음
ex) new ProducerRecord<>(TOPIC_NAME, partionNo, messageKey, messageValue);
브로커 정상 전송 여부를 확인은 get() 메소드를 통한 동기적 확인과 callback 인터페이스를 활용한 비동기적 확인이 있음.
(단, 비동기 확인은 재전송으로 데이터 순서가 꼬일 수 있음)
컨슈머 API : 브로커에 적재된 데이터를 가져와 필요한 처리를 함
컨슈머 그룹을 통해서 컨슈머의 목적을 구분할 수 있음 ex) email-application-group인 경우 email 발송 처리 애플리케이션을 의미함
※ 컨슈머 그룹을 기준으로 오프셋을 관리하기 때문에 subscribe() 메서드를 사용하여 토픽을 구독하는 경우 컨슈머 그룹을 선언해야 됨
역직렬화는 반드시 프로듀서에 선언한 직렬화 클래스와 동일하게 선언
poll() 메소드를 통해서 데이터 가져옴
컨슈머 중요 개념
컨슈머 운영방식은 2가지 존재함
1) 1개 이상의 컨슈머로 이뤄진 컨슈머 그룹으로 운영 : Subscribe 메소드 사용 - 리밸런싱 발생함
- 컨슈머 개수는 가져가려는 토픽의 파트션 개수보다 같거나 작아야 됨 안그럼 유휴상태인 불필요한 컨슈머가 생길 수 있음
2) 토픽의 특정 파티션만 구독하는 컨슈머로 운영 : assign 메소드 사용 - 리밸런싱 발생하지 않음
컨슈머 그룹의 장점 : 각 그룹은 서로 격리된 상태로 운영됨으로 장애가 전파되는걸 막을 수 있음
※ 장애 발생 시 자동으로 리밸런싱되어 정상 컨슈머들끼리 데이터 처리를 재분배하여 운영됨으로 서비스 장애를 막을 수 있음
그러므로 반드시 데이터 처리 중 발생한 리밸런싱에 대응하는 코드를 작성해야 됨
컨슈머는 consumer_offset 에 카프카 브로커로부터 데이터를 어디까지 가져갔는지 커밋을 통해 기록함
만약 데이터 처리 후 consumer_offset에 기록되지 않으면 중복으로 재처리될 수 있음
그러므로 오프셋 커밋을 정상적으로 처리했는지 검증해야 됨
오프셋 커밋 방법
1) 명시적: 직접 데이터 처리 후 커밋하는 방법 (중복처리 가능성 없음)
- poll() 호출 후 반환받은 데이터 처리의 완료 후 commitSync()-동기식, commitAsync()-비동기식 메소드로 직접 커밋함.
※ 개별 레코드 단위 커밋을 원하는 경우 commitSycn() 메소드에 Map<TopicPartition, OffsetAndMetadata> 인스턴스를 파라미터로 넘기면 됨
2) 비명시적 : enable.auto.commit = true 옵션으로 일정 가격마다 자동 커밋하는 방법 (중복처리 가능성 다소 있음)
- poll() 호출 이후 auto.commit.interval.ms에 설정된 값 이상이 지나면 그시점에 읽은 레코드의 오프셋을 커밋함.
컨슈머 내부 구조
컨슈머 애플리케이션이 실행되면 Fetcher 인스턴스가 생성되며 미리 레코드들을 내뷰 큐로 가져옴 그 뒤에 poll() 메소드를 호출하면 큐에 담겨 있는 레코드를 꺼내 반환받아 처리를 수행할 수 있음.
컨슈머 옵션
주요옵션
bootstrap.servers : 데이터 전송 대상 카프카 클러스터에 속한 브로커 호스트 이름 (2개 이상 입력 권장)
key.deserializer : 메시지 키 역직렬화 클래스 지정
value.deserializer : 메시지 값 역직렬화 클래스 지정
선택옵션
group.id : 컨슈머 그룹 아이디, subsecribe() 메서드로 토픽 구독할 경우 반드시 선언 필요
auto.offse.reset : 저장된 오프셋이 없는 경우 어디서부터 읽을지 선택 (latest : 가장 최근에 넣은 오프셋부터, earliest: 가장 오래전에 넣은 오프셋부터, none : 컨슈머 그룹이 커밋한 기록을 확인해 없는 경우 에러 반환, 있으면 기존 커밋 이후 오프셋부터, 기본값 : latest)
enable.auto.commit : 자동 커밋 동작 여부
auto.commit.interval.ms : 자동 커밋일 경우 커밋 간격 (기본값 : 5000)
max.poll.records: poll() 호출 시 반환되는 레코드 개수
session.timout.ms : 컨슈머와 브로커 연결이 끊기는 최대시간 (리밸러싱 발생 기준시간)
heartbeat.interval.ms : 하트비트 전송 간격 시간
max.poll.interval.ms : poll() 호출 간격의 최대 시간, poll 호출 이후 데이터 처리에 시간이 너무 많이 걸리는 경우 비정상으로 판단하여 리밸런싱함
isolation.level : 트랜잭션 프로듀서가 레코드를 트랜잭션 단우로 보낼 경우 사용, read_commited(커밋 완료된 레코드만 읽음), read_uncommited(커밋 상관없이 모든 레코드 읽음) 설정 가능(기본값 : read_commited)
리밸러스 리스너
poll() 메서드를 통해 반환받은 데이터를 모두 처리하기 전에 리밸런스가 발생하면 데이터가 중복 처리될 수 있다 왜냐하면 데이터 중 일부를 처리했으나 커밋하지 않았기 때문임.
ComsumerRebalanceListener 인터페이스를 통해 리밸런싱을 감지할 수 있는데 onPartitionAsstigned()(=리밸런스 완료 후 호출), onPartitionRevoked()(=리밸런스 시작 전에 호출)메소드를 통해서 처리할 수 있다.
onPartitionRevoked 메소드에서 마지막 처리 기준으로 커밋을 수행시킴으로 중복처리를 방지할 수 있음
※ 컨슈머는 정상적으로 종료되어야 함 왜냐하면 종료하지 않을 경우 세션 타임아웃이 발생할때까지 컨슈머 그룹에 남게되고 동작하지 않는 컨슈머가 존재함으로 파티션 데이터가 소모되지 못하고 컨슈머 렉이 늘어나 데이터 처리 지연이 발생하기 때문임.
컨슈머 애플리케이션 (자바) 셧다운 훅에서 wakeup 메소드 호출하여 자바쓰레드 종료 시 반드시 해당 컨슈머도 종료되도록 처리해야 됨.
이유: wakeup이 호출된 컨슈머에 poll이 호출되면 WakeupExcetion이 발생함으로 해당 예외를 캐치해서 close()(=해당 컨슈머는 더는 동작하지 않는다는걸 명시적으로 알리는 메서드)하여 컨슈머 자원 해제됨.
3-5) 카프카 스트림즈
카프카 스트림즈 : 토픽에 적재된 데이터를 상태기반 또는 비상태기반으로 실시간 변환하여 다른 토픽에 적재하는 라이브러리임.
장점 : 카프카 클러스터와 완벽한 호환, 장애허용시스템을 갖춰 안정성이 매우 높음
즉, 카프카 클러스터를 운영하면서 실시간 스트림 처리가 필요할 경우 1순위로 고려해야됨.
프로듀서, 컨슈머를 조합하지 않고 스트림즈를 사용해야 되는 이유
- 스트림즈 DSL, 프로세스 API로 기능을 쉽게 확장할 수 있고, 스트림즈에서 제공하는 단 한번 데이터 처리, 장애 허용 시스템등의 특징을 구현하기 어려움으로
(단, 소스 토픽(=사용하는 토픽), 싱크토픽(=저장하는 토픽)의 카프카 클러스터가 다른 경우엔 스트림즈를 적용할 수 없음으로 컨슈머, 프로듀서 조합으로 클라스터를 지정하는 방식으로 개발해야 됨)
태스크(Task) : 스트림즈 애플리케이션을 실행하면 생기는 데이터 처리 최소 단위
카프카 스트림즈는 트리형 토폴로지 형태와 유사함.
소스 프로세스 : O
(가져오기) / \
---------------------------------------------------
스트림 프로세스 : O O
(분기, 변환) \ / \
---------------------------------------------------
싱크 프로세스: O O
(저장)
스트림즈를 구현하는 방법
1) 스트림즈DSL : 프로세싱에 쓰일만한 다양한 기능을 자체 API로 제공함
ex) 메시지 값을 기반으로 토픽 분기처리, 지난 10분간 들어온 데이터 개수 집계, 토픽과 다른 토픽 결합으로 새로운 데이터 생성
2) 프로세스 API : DSL로 구현하기 어려운 경우 해당 API를 사용하면 됨
ex) 메시지 값의 종류에 따라 토픽을 가변적으로 전송, 일정한 시가 간격으로 데이터 처리
스트림즈DSL 레코드 흐름 개념
1) Kstream : 데이터 조회 시 모든 레코드를 흐름대로 출력함(동일한 키에 대한 새로운 데이터를 추가하는 경우 새로운 row에 저장)
2) KTable : 키를 기준으로 묶어서 출력됨 (동일한 키에 대한 새로운 데이터 추가하는 경우 기존 키의 값을 새로운 값으로 업뎃)
3) GlobalKTable : 키를 기준으로 묶어서 출력되는 KTable과 동일하지만 다른 점은 모든 파티션 데이터가 각 태스크에 사용되는 점과 조인할 경우 KStream의 메시지 키뿐만 아니라 메시지 값이랑도 매핑하여 조인되는 차이점이 있음 (단, 모든 데이터를 모든 테스크에서 동일하게 쓰임으로 로컬 스토리지 사용량과 네트워트 및 브로커의 부하가 생길 수 있어 적은 데이터에서만 사용하는걸 권장함)
stream() : Kstream 형태로 데이터를 가져오기
filter() : 특정 조건에 맞는 데이터를 골라내기
to() : 특정 토픽에 저장
join() : 특정 토픽들의 데이터를 조인하기
카프카에선 실시간으로 들어오는 데이터들은 조인할 수 있음
예를 들어 이름-주소를 가지고 있는 KTalble이 있고, 이름-주문 정보가 있는 KStream이 있는 경우 두 토픽을 조인해서 고객의 주소와 주문정보를 가져와 처리할 수 있음.
-이때 KTable과 KStream은 서로 코파티셔닝이 되어야 됨으로 동일한 파티션 개수, 동일한 파티셔닝 전략을 사용해야 됨.
(만약 코파티셔닝이 되지 않는 경우 리파티셔닝 수행 후 처리 또는 KTable로 사용한 토픽을 GlobalKTable로 선언하여 해결할 수 있음)
3-6) 카프카 커넥트
카프카 커넥트 : 데이터 파이트라인 생성 시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 애플리케이션임(특정작업을 템플릿으로 만듦으로서)
커넥터 - 소스커넥트(=프로듀서), 싱크 커넥트(=컨슈머) 2가지로 나뉨
카프카 커넥트는 오픈소스 커넥터(jar 형태)를 쓸 수도 있고 직접 개발한 커넥터를 쓸 수도 있음
카프카 커넥트 내부에슷 커넥터와 테스크를 생성하는데 테스크는 커넥터 내부에서 실질적으로 데이터를 처리하는 공간임
커넥터 옵션
1) 컨버터 : 데이터 처리 전 스키마 변경을 도움
2) 트랜스폼 : 데이터 처리 시 메시지 단위로 데이터를 간단하게 변환
커넥트 실행방법
1) 단일모드 커넥트 : 1개 프로세스로만 실행됨으로 고가용 구성이 불가능하고 spof(단일 장애점)이 될 수 있음
2) 분산모드 커넥트 : 2대 이상의 서버에서 클러스터 형태로 운영함으로써 고가용성 구성 및 무중단 스케일 아웃도 가능함
소스커넥터 : 소스 애플리케이션 또는 소스파일로부터 데이터를 가져와 토핏으로 넣는 역할
- sourceConnector, sourceTask 클래스를 사용해서 구현할 수 있고 반드시 connect-api라이브러 추가가 필요함
* sourceTask는 토픽에서 사용하는 오프셋이 아닌 자체 오프셋을 사용함
커넥터 직접 개발할 경우 반드시 작성한 클래스 뿐만 아니라 참조하는 라이브러리도 함께 빌드하여 jar로 압축해야됨
- jar { from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } }
커넥터 옵션값 설정시 중요도 지정 기준
High : 사용자가 입력하는 설정이 필요한 경우
Medium : 사용자가 입력하는 설정이 없더라고 기본값이 있는 경우
Low : 사용자의 입력값이 없어도 되는 경우
소스 테스크에서 가장 중요한 부분은 소스파일로부터 읽고 토픽으로 보낸 지잠을 기록하고 사용한다는 점이다 그리하여 커넥터가 재시작할때 데이터 중복을 막기 위해서다.
[참고자료]
https://product.kyobobook.co.kr/detail/S000001842177
'IT > 자바' 카테고리의 다른 글
4장 카프카 상세 개념 설명(2) (0) | 2024.02.15 |
---|---|
4장 카프카 상세 개념 설명 (0) | 2024.02.04 |
Test-DrivenDevelopment : 테스트 주도 개발 (0) | 2023.03.12 |
자바 플레이그라운드 with TDD, 클린코드 강좌 후기 Day 3 (0) | 2022.12.18 |
자바 플레이그라운드 with TDD, 클린코드 강좌 후기 Day 2 (0) | 2022.11.27 |