본문 바로가기
DataOps/Kafka

[Kafka] 카프카 메시지 저장 방식, 오프셋이 2개씩 증가?

by BenKangKang 2024. 11. 25.

카프카 저장 단위

카프카 메시지는 세그먼트라는 저장 단위로 관리되며 세그먼트 내부에 레코드가 쌓이는 형식입니다.

[Topic]
└── [Partition]
    ├── 00000000000000000000.log    # 활성 세그먼트 (Active Segment)
    ├── 00000000000000000000.index  # 오프셋 인덱스
    ├── 00000000000000000000.timeindex  # 타임스탬프 인덱스
    ├── 00000000000000456789.log    # 이전 세그먼트
    ├── 00000000000000456789.index
    └── 00000000000000456789.timeindex

 

실제로 카프카에 접속해 토픽 데이터를 확인할 수 있습니다. test1이라는 토픽을 만들고 메시지를 발행해봤습니다.

ls /var/lib/kafka/data/

ls /var/lib/kafka/data/test1-0/


실제 토픽 데이터에 접근하면 아래처럼 토픽 세그먼트들이 존재하는 걸 볼 수 있습니다. 파티션은 1개라 test1-0 만 존재합니다.

 

활성 세그먼트

현재 카프카가 메시지를 append 하는 세그먼트를 활성 세그먼트라고 부릅니다.

segment1.log (inactive) - 0~999     ← 리텐션 대상
segment2.log (inactive) - 1000~1999  ← 리텐션 대상
segment3.log (active)   - 2000~      ← 리텐션 제외

 

활성 세그먼트와 리텐션

메시지 리텐션을 retension.ms 로 관리하는데, 이는 활성 세그먼트가 롤링된 후 (비활성화)부터 적용이 되니 관련 속성을 잘 파악해야 합니다.

- retention.ms=1일 설정해도 active 세그먼트가 꽉 차지 않으면 
  1일 이상 데이터가 남을 수 있음
- 즉, 실제 보관 기간 = retention.ms + active 세그먼트 기간

# 세그먼트 파일 최대 크기
log.segment.bytes=1073741824  # 기본값 1GB

# 인덱스 파일 최대 크기
log.index.size.max.bytes=10485760  # 기본값 10MB

# 세그먼트 롤링 시간 간격
log.roll.ms=null  # 지정 안하면 log.roll.hours 사용
log.roll.hours=168  # 기본값 168시간(7일)

# 새 세그먼트로 롤링하기 전 활성 세그먼트의 최대 시간

 

세그먼트 롤링 조건:
IF (현재세그먼트크기 >= log.segment.bytes) OR
   (현재세그먼트시간 >= log.roll.hours) THEN
    세그먼트롤링();

예시:
00000000000000000000.log  # 꽉 차서 닫힘
00000000000002000000.log  # 새로 생성된 활성 세그먼트

 

 

카프카 메시지

메시지 계층

카프카 메시지에는 계층이 있습니다. record batch 단위로 저장되며, 1개 이상의 record 가 batch 에 포함될 수 있습니다. 

Segment
└── Record Batch (메시지 배치)
    └── Record (개별 메시지)

 

카프카 디버깅 툴을 이용해 실제 세그먼트를 들여다 볼 수 있습니다.

/bin/kafka-dump-log --files /var/lib/kafka/data/test1-0/00000000000000000000.log

# 출력 예시
baseOffset: 0 lastOffset: 2 count: 3 ... # Record Batch 정보
 | offset: 0 ... key: ... value: ...     # Record 1
 | offset: 1 ... key: ... value: ...     # Record 2
 | offset: 2 ... key: ... value: ...     # Record 3

 

Record Batch 형식

메시지는 항상 아래와 같은 record batch 형식으로 저장이 됩니다. 배치에는 1개 이상의 records 를 담을 수 있습니다.

baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: uint32
attributes: int16
    bit 0~2:
        0: no compression
        1: gzip
        2: snappy
        3: lz4
        4: zstd
    bit 3: timestampType
    bit 4: isTransactional (0 means not transactional)
    bit 5: isControlBatch (0 means not a control batch)
    bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
    bit 7~15: unused
lastOffsetDelta: int32
baseTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]

 

트랜잭션 정보, 압축 정보 등이 모두 포함됩니다.

 

컨트롤 배치와 트랜잭션

카프카에서는 Transaction Producer 를 이용해 트랜잭션 구현이 가능합니다. 그 방법으로 컨트롤 배치를 이용합니다.

 

컨트롤 레코드 스키마

version: int16 (current version is 0)
type: int16 (0 indicates an abort marker, 1 indicates a commit)

 

트랜잭션 방식

트랜잭션 결과가 나오기 전, 메시지는 기본적으로 컨슈머에서 컨슘이 불가능하며, (isolation level 을 read_uncommited 로 설정하면 확인 가능) 컨트롤 배치가 확인되었을 때, commit 이 기록되어 있디면 메시지를 읽는 방식입니다.

1. Transaction Coordinator에 BEGIN_TXN 요청
2. Partition에 데이터 메시지 작성 (uncommitted)
3. Transaction Coordinator에 COMMIT/ABORT 요청
4. Transaction Marker(Control Batch) 작성

 

- LSO(Last Stable Offset)를 사용
- LSO 이전의 메시지만 consumer에게 전달
- Control Batch를 만나면 해당 트랜잭션의 상태 확인
- COMMIT이면 관련 메시지들을 consumer에게 전달
- ABORT면 관련 메시지들을 무시

 

이로인하여 Transaction Producer 를 사용할 경우 Offset 이 2씩 증가하게 됩니다.

 

참고

https://kafka.apache.org/documentation/#controlbatch

 

https://stackoverflow.com/questions/56182606/in-kafka-when-producing-message-with-transactional-consumer-offset-doubled-up

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 



'DataOps > Kafka' 카테고리의 다른 글

[Kafka] Log Compaction, Message Compression  (0) 2024.07.30
[Kafka] 트랜잭션, exactly-once  (0) 2024.06.24
[Kafka] KSQL  (0) 2024.06.05
[Kafka] Kafka Stream  (0) 2024.06.05
[Kafka] MirrorMaker2  (0) 2024.06.05

댓글