14 Apr 2022
Kafka 내부 메커니즘
카프카 내부 메커니즘
- 카프카 복제가 동작하는 방법
- 카프카가 프로듀서와 컨슈머의 요청을 처리하는 방법
- 카프카가 스토리지를 처리하는 방법
클러스터 멤버십
- 내부적으로 주키퍼는 표준 파일 시스템의 디렉터리처럼 계층적인 트리구조로 데이터를 저장한다
- 데이터를 저장하는 노드를 znode라고 부른다.
- 노드의 관리는 주키퍼를 사용하는 클라이언트가 수행
- 노드 생성/삭제
- 해당 경로 노드 존재 여부
- 해당 노드의 데이터를 읽기/쓰기
- 특정 노드의 자식노드 내역 가져오기
ex)
- /kafka-main
- /kafka-sub1
- /kafka-sub2
- /kafka-sub3
노드의 종류
각 노드는 상태와 구성 정보 및 위치 정보등 데이터만 저장되기 때문에 크기가 작으며 메모리에 저장되므로 속도가 빠르다.
- 임시 노드
- 노드를 생성한 클라이언트가 연결되있을때만 존재, 연결이 끊어지는 경우 자동 삭제
- 영구 노드
- 클라잉언트가 삭제하지 않는한 보존, 또한 Watch를 걸 경우 모니터링 가능
브로커 특징
- 브로커는 고유 식별자를 갖는다.
- 브로커 프로세스가 시작될때 /brokers/id 에 임시노드로 자신의 ID등록
- 같은 ID를 갖는 다른 브로커 시작시 같은 주키퍼노드의 존재로 에러발생
- 브로커가 추가,삭제,연결끊김 등이 발생할 경우 임시노드는 주키퍼에서 자동 삭제되며
- 브로커가 중단 되면 브로커의 주키퍼 노드가 삭제되는데 이때 브로커 ID는 데이터 구조에 존재한다.(각 토픽의 레플리카에 브로커 ID포함) 그래서 중단된 브로커 ID를 갖고 새로운 브로커를 시작시키면 삭제된 브로커에 할당됬던 토픽과 파티션을 갖고 클러스터에 합류된다.
컨트롤러
역할
- 일반 브로커 기능+파티션 리더를 선출하는 책임을 갖는다.
컨트롤러 동작 방식
- 컨트롤러 브로커가 중단되거나 주키퍼와 연결이 끊어지면 임시 노드인 /controller가 삭제 된다.
- 삭제시 다른 브로커들이 주키퍼 watch를 통해 그 사실을 알고 /controller 노드 생성을 시도하고 첫번째로 생성한 브로커가 컨트롤러가 된다. 그리고 나머지는 이미 존재한다는 예외를 받는다.
- 컨트롤러는 새로 선출될때 마다 주키퍼로부터 새로운 컨트롤러 세대 번호를 받는다.
- 나머지 컨트롤러는 새로 나온 세대번호를 알게 되고 이전 번호의 컨트롤러 메시지는 무시한다.

복제
복제: 분산과 분할 및 복제 처리되는 커밋 로그 서비스
카프카가 가용성과 내구성을 보장하는 방법

복제의 종류
- 리더 레플리카
- 각 파티션은 리더로 지정된 하나의 리플리카를 갖는다.
- 일관성 보장을 위해 모든 프로듀서와 컨슈머의 클라이언트 요청은 리더를 통해 처리한다.(ex) rdbms의 master-slave의 경우를 떠올려 보자)
- 팔로어 레플리카
- 팔로어는 클라이언트의 요청을 서비스 하지 않는다.
- 리더의 메시지를 복제하여 리더 레플리카가 중단되는경우 팔로어가 리더를 대체한다.
Leader replica <-> Follower replica 동기화 문제
- 리더는 Follower replica중에 어느 것이 최신 리더 메시지를 복제하고 있는지 알아야함
- 팔로어들은 리더가 받는 모든 최신 메시지를 복제하려고한다. 그러나 여러 이유로 실패할 수 있다.
- 네트워크 혼잡으로 복제가 늦어지는 경우
- 브로커가 중단되어 해당 브로커를 다시 복제할 수 있을때까지 해당 브로커의 모든 리플리카들이 복제가 늦어지는 경우
- 리더와 동기화를 하기위해 Follower->Leader fetch요청을 한다.
- fetch요청에는 Follower replica가 다음으로 받기 원하는 메시지의 오프셋이 포함된다.
- 항상 수신된 순서대로 처리된다.

각 팔로어 레플리카가 요청한 마지막 오프셋을 보면 복제가 얼마나 지연되고 있는지 리더는 알 수 있다.
- ex) 팔로우 레플리카가 10초 이상 메시지를 요청하지 않았거나 요청했지만 10초 이상 최근 메시지를 복제하지 못했다면 동기화되지 않은 것으로 간주한다.
replica.lag.time.max.ms 매개변수로 해당 time을 조절
선호 리더: 토픽이 생성될때 각 파티션의 리더였던 리플리카
요청 처리
- 카프카는 TCP로 전송되는 이진 프로토콜을 갖는다.
- 특정 클라이언트로부터 브로커에 전송된 모든 요청은 항상 수신된 순서로 처리된다.
- 카프카가 메시지 큐처럼 동작할 수 있어서 저장되는 메시지의 순서가 보장된다.
모든 요청에 포함되는 표준헤더

요청 전송시 어디로 해야하는지 클라이언트가 어떻게 알 수 있는가?
카프카 클라이언트는 메타 데이터 요청을 통해 해당 정보를 얻고 캐싱한뒤 이후 요청에 이용한다.

쓰기 요청
- acks 는 구성 매개변수에는 메시지를 수신해야하는 브로커수를 설정한다
ack=0 : 브로커의 수신 응답을 기다리지 않음
ack=1 : 리더만 메시지를 받으면 됨
ack=all : 모든 동기화 리플리카가 메시지를 받아야 함
쓰기 요청이 왔을때 브로커의 검사 요소
- 데이터를 전송한 사용자가 해당 토픽 쓰기 권한이 있는가?
- acks 값 검사
- acks=all일 경우 충분한 동기화 레플리카가 존재하는가?
이후에 브로커는 로컬 디스크에 새로받은 메시지를 쓰는데 이때 리눅스는 파일 시스템 캐시에 메시지를 쓰지만 언제 디스크에 쓰는지 알 수 없다. 카프카는 디스크에 데이터 쓰기를 기다리지 않고 복제에 의존한다.
해당 동작 방식은 Mysql에서 쓰기가 발생했을때 바로 디스크에 쓰는게 아니라 더티 페이지에 모아두고 추후 디스크에 쓰는 동작방식과 유사하다.
읽기 요청
클라이언트는 읽기를 원하는 토픽과 파티션 및 오프셋에 있는 메시들의 읽기 요청을 브로커에게 전송한다.
- 클라이언트는 각 파티션마다 브로커가 반환할 수 있는 데이터 크기를 제한할 수 있다.
- 클라이언트는 브로커가 전송한 응답을 저장하는 메모리를 할당해야 하므로 데이터 크기 제한이 중요할 수 있다(읽는 쪽 pc or 기기의 성능이 매우 떨어지는 경우)
- 읽기 요청 또한 메타데이터가 필요하며 리더는 요청을 받고 적합한지 검사한다.
- 이미 삭제된 메시지
- 존재하지 않는 오프셋 요청
zero copy
카프카는 메시지를 중간 버퍼 메모리에 쓰지 않고 곧바로 네트워크 채널로 전달한다.

컨슈머가 읽을 수 있는 메시지
- 파티션 리더에 존재하는 모든 데이터를 읽을 수 없고 모든 동기화 레플리카에 쓴 메시지들만 읽을 수 있다.
- 파티션 리더는 어떤 메시지들이 어떤 리플리카에 복제되있는지 알고 있으며 모든 동기화 리플리카들이 메시지를 쓸 때까지 컨슈머에게 전송되지 않는다.

replica.lag.time.max.ms : 복제 끝날때까지 기다리는 지연시간
스토리지
- log.dirs 를 통해 저장될 디렉토리 경로 지정
파티션 할당
- 파티션 리플리카들을 브로커 간에 고르게 분산 시킨다.
- 각 파티션의 리플리카는 서로 다른 브로커에 할당한다.
- 브로커가 랙 정보를 갖고 있다면 가능한 파티션의 리플리카는 서로 다른 랙에 있는 것으로 지정한다.

각 파티션과 리플리카의 브로커가 선택 되었으면 새 파티션들이 생길 경우 디렉토리는 각 파티션 마다 독립적으로 하면 되며 각 디렉토리의 파티션 개수를 계산하고 가장 적은 파티션을 갖는 디렉터리에 새 파티션을 추가하면 된다.
파일 관리
카프카는 데이터를 영원히 보존하지 않고 대신 삭제전 보존하는 시간, 또는 오래된 메시지의 제거전에 보존할 데이터의 크기를 설정한다.
하나의 큰 파일에서 제거해야 하는 메시지를 찾아서 일부 삭제하는 것은 시간이 많이 소요된다. 그래서 카프카는 파티션을 세그먼트로 나눈다.
기본적으로 카프카는 세그먼트를 최대 1GB or 1주일 보존한다.
세그먼트의 제한 크기나 보존 기간에 도달하면 새로운 세그먼트 파일에 계속 쓴다.
메시지를 쓰기 위해 사용 중인 세그먼트를 액티브 세그먼트라 한다.
- 카프카 브로커는 모든 파티션의 모든 세그먼트에 대해 각각 하나의 열린 파일 핸들을 유지한다.(현재 사용중이 아닌 세그먼트도 동일)
- 파일 핸들이 일정 개수를 넘어가면 운영체제의 조정이 필요하다.
파일 형식
- 각 세그먼트는 하나의 데이터 파일로 생성되며 메시지와 오프셋이 저장된다.
- 디스크에 수록되는 데이터의 형식은 메시지의 형식과 동일하다.
- 디스크와 네트워크가 형식이 동일 하기때문에 제로카피로 별도의 메모리 버퍼를 사용하지 않고 디스크-> 네트워크로 바로 전송하고 압축해서 전송되어 온 메시지를 해지와 재압축을 하지 않아도 된다.
메시지 구성 요소
- 키,값,오프셋
- 메시지 크기
- 손상여부 검출 체크섬 코드
- 메시지 형식 버전의 매직 바이트
- 압축 코덱
인덱스
- 카프카는 컨슈머가 특정 오프셋부터 메시지를 읽을 수 있게 해준다
오프셋 100에서 1MB 요청할 경우 브로커가 오프셋 100을 빨리 찾아야 함으로 카프카는 각 파티션의 인덱스를 유지 관리하며 인덱스는 세그먼트 파일과 이 파일의 내부 위치로 오프셋을 연관시킨다.
- 인덱스 또한 세그먼트로 분할되며 메시지가 삭제되면 그것과 연관된 인덱스도 삭제된다.
- 카프카는 인덱스의 체크섬을 유지 관리 하지않기 때문에 인덱스가 손상될 경우 연관된 로그 세그먼트로부터 메시지를 다시 읽고 오프셋과 위치를 다시 생성한다.
압축
- 카프카는 설정된 시간 동안 메시지들을 저장하며 보존 기간이 지난 메시지들을 삭제한다.
ex)
카프카를 사용해서 자신의 현재상태를 저장하는 애플리케이션에서 매번 상태가 변경될 때마다 이 애플리케이션에서는 상태 데이터를 카프카에 쓴다. 이때 문제가 생겨 종료됬다가 복구될 경우 카프카에 저장된 상태 메시지들을 읽어 가장 최근 상태로 복구한다.
- 토픽 보존 정책
- 삭제 보존 정책에서는 보존 기간 이전의 메시지들을 삭제
- 압축 보존 정책에서는 각 키의 가장 최근 값만 토픽에 저장할 수 있다.
- 키와 값을 갖는 메시지를 생성하는 애플리케이션의 토픽에만 적용가능,null키가 포함되면 압축 X
압축 처리 방법
- 클린
- 이전에 압축되었던 메시지가 존재하고 각 키에 대해 하나의 값만 포함하며, 이전에 압축할 당시 가장 최근 값이다.
- 더티
- 직접 압축 이후에 추가로 쓴 메시지들이 저장된 부분이다.
압축 활성화가 되려면 각 브로커는 하나의 압축 매니저 스레드와 여러개의 매니저 스레드를 시작시키며 전체 파티션 크기보다 더티 메시지 비율이 가장 큰 파티션을 선택,압축한다.

삭제된 메시지
각 키의 가장 최근 메시지가 항상 보존된다면 특정 키를갖는 모든 메시지를 삭제할때는 어떻게 해야 하는가?
ex) 사용자가 더 이상 서비스를 받지 않는다면, 법적으로 모든 기록을 삭제해야 하는 경우
- 가장 최근 메시지조차 남기지 않고 특정 키를 완전히 삭제할 때는 애플리케이션에서 해당 키와 null값을 포함하는 메시지를 카프카에 쓰면 된다.
- 평상시 대로 압축한뒤 해당 키에 대해서는 null값을 갖는 메시지만 남겨둔다.
- 툼스톤이라고 하는 이런 메시지는 설정 기간동안 보존되고 이 기간 동안 메시지를 읽으면 null값으로 삭제됬음을 알 수 있다.
- 이후 카프카에 설정된 시간이 지나면 압축 스레드가 툼스톤 메시지를 삭제하고 해당 키의 메시지는 파티션에서 사라진다.
토픽은 언제 압축되는지?
- 현재 사용중인 액티브 세그먼틀르 삭제하지 않는 삭제 보존 정책과 마찬가지로, 압축 보존 정책에서도 현재 사용중인 세그먼트는 압축하지 않는다.
- 압축은 토픽의 읽기 쓰기 성능에 영향을 주기 때문에 자주하면 좋지 않다.
- 디스크 공간을 차지 하기 때문에 더티 레코드가 토픽의 50%에 해당하는 공간을 차지하면 압축하면 좋다.