카프카 저장 단위
카프카 메시지는 세그먼트라는 저장 단위로 관리되며 세그먼트 내부에 레코드가 쌓이는 형식입니다.
[Topic]
└── [Partition]
├── 00000000000000000000.log # 활성 세그먼트 (Active Segment)
├── 00000000000000000000.index # 오프셋 인덱스
├── 00000000000000000000.timeindex # 타임스탬프 인덱스
├── 00000000000000456789.log # 이전 세그먼트
├── 00000000000000456789.index
└── 00000000000000456789.timeindex
실제로 카프카에 접속해 토픽 데이터를 확인할 수 있습니다. test1이라는 토픽을 만들고 메시지를 발행해봤습니다.
ls /var/lib/kafka/data/
ls /var/lib/kafka/data/test1-0/
실제 토픽 데이터에 접근하면 아래처럼 토픽 세그먼트들이 존재하는 걸 볼 수 있습니다. 파티션은 1개라 test1-0 만 존재합니다.
활성 세그먼트
현재 카프카가 메시지를 append 하는 세그먼트를 활성 세그먼트라고 부릅니다.
segment1.log (inactive) - 0~999 ← 리텐션 대상
segment2.log (inactive) - 1000~1999 ← 리텐션 대상
segment3.log (active) - 2000~ ← 리텐션 제외
활성 세그먼트와 리텐션
메시지 리텐션을 retension.ms 로 관리하는데, 이는 활성 세그먼트가 롤링된 후 (비활성화)부터 적용이 되니 관련 속성을 잘 파악해야 합니다.
- retention.ms=1일 설정해도 active 세그먼트가 꽉 차지 않으면
1일 이상 데이터가 남을 수 있음
- 즉, 실제 보관 기간 = retention.ms + active 세그먼트 기간
# 세그먼트 파일 최대 크기
log.segment.bytes=1073741824 # 기본값 1GB
# 인덱스 파일 최대 크기
log.index.size.max.bytes=10485760 # 기본값 10MB
# 세그먼트 롤링 시간 간격
log.roll.ms=null # 지정 안하면 log.roll.hours 사용
log.roll.hours=168 # 기본값 168시간(7일)
# 새 세그먼트로 롤링하기 전 활성 세그먼트의 최대 시간
세그먼트 롤링 조건:
IF (현재세그먼트크기 >= log.segment.bytes) OR
(현재세그먼트시간 >= log.roll.hours) THEN
세그먼트롤링();
예시:
00000000000000000000.log # 꽉 차서 닫힘
00000000000002000000.log # 새로 생성된 활성 세그먼트
카프카 메시지
메시지 계층
카프카 메시지에는 계층이 있습니다. record batch 단위로 저장되며, 1개 이상의 record 가 batch 에 포함될 수 있습니다.
Segment
└── Record Batch (메시지 배치)
└── Record (개별 메시지)
카프카 디버깅 툴을 이용해 실제 세그먼트를 들여다 볼 수 있습니다.
/bin/kafka-dump-log --files /var/lib/kafka/data/test1-0/00000000000000000000.log
# 출력 예시
baseOffset: 0 lastOffset: 2 count: 3 ... # Record Batch 정보
| offset: 0 ... key: ... value: ... # Record 1
| offset: 1 ... key: ... value: ... # Record 2
| offset: 2 ... key: ... value: ... # Record 3
Record Batch 형식
메시지는 항상 아래와 같은 record batch 형식으로 저장이 됩니다. 배치에는 1개 이상의 records 를 담을 수 있습니다.
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: uint32
attributes: int16
bit 0~2:
0: no compression
1: gzip
2: snappy
3: lz4
4: zstd
bit 3: timestampType
bit 4: isTransactional (0 means not transactional)
bit 5: isControlBatch (0 means not a control batch)
bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
bit 7~15: unused
lastOffsetDelta: int32
baseTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]
트랜잭션 정보, 압축 정보 등이 모두 포함됩니다.
컨트롤 배치와 트랜잭션
카프카에서는 Transaction Producer 를 이용해 트랜잭션 구현이 가능합니다. 그 방법으로 컨트롤 배치를 이용합니다.
컨트롤 레코드 스키마
version: int16 (current version is 0)
type: int16 (0 indicates an abort marker, 1 indicates a commit)
트랜잭션 방식
트랜잭션 결과가 나오기 전, 메시지는 기본적으로 컨슈머에서 컨슘이 불가능하며, (isolation level 을 read_uncommited 로 설정하면 확인 가능) 컨트롤 배치가 확인되었을 때, commit 이 기록되어 있디면 메시지를 읽는 방식입니다.
1. Transaction Coordinator에 BEGIN_TXN 요청
2. Partition에 데이터 메시지 작성 (uncommitted)
3. Transaction Coordinator에 COMMIT/ABORT 요청
4. Transaction Marker(Control Batch) 작성
- LSO(Last Stable Offset)를 사용
- LSO 이전의 메시지만 consumer에게 전달
- Control Batch를 만나면 해당 트랜잭션의 상태 확인
- COMMIT이면 관련 메시지들을 consumer에게 전달
- ABORT면 관련 메시지들을 무시
이로인하여 Transaction Producer 를 사용할 경우 Offset 이 2씩 증가하게 됩니다.
참고
https://kafka.apache.org/documentation/#controlbatch
'DataOps > Kafka' 카테고리의 다른 글
[Kafka] Log Compaction, Message Compression (0) | 2024.07.30 |
---|---|
[Kafka] 트랜잭션, exactly-once (0) | 2024.06.24 |
[Kafka] KSQL (0) | 2024.06.05 |
[Kafka] Kafka Stream (0) | 2024.06.05 |
[Kafka] MirrorMaker2 (0) | 2024.06.05 |
댓글