카프카 입문 : 2. 실습

Related POST

사실 도커로 설치했기때문에 그냥 눈으로만 학습

  • 카프카는 1GB heap이 디폴트. aws 프리티어로는 부족하므로 힙을 400으로 줄이는 부분.
  • config 바꾸는 방법은 docker-compose에서 ENV로 주어져서 바꿔짐
  • 카프카(2.5기준)는 zookeeper가 반드시 필수
  • zookeeper standalone 모드에서는 replication이 없음

EC2,Kafka 설정

Kafka 실행 최소 Heap size 설정 제거
1
$ export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"
  • Kafka 2.5.0은 1G의 Heap memory가 default
  • 테스트용 ec2인 t2.micro에 실행하기 위해 heap size 환경변수 선언
Kafka 실행 최소 Heap size 설정 제거
1
2
3
4
export KAFKA_HEAP_OPTS="
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 "
  • 링크드인에서 테스트한 최적의 Java option 추천값
  • 60대 브로커, 5만개 파티션, Replication-factor 2로 구성시
  • 300MB/sec inbound, 1GB/sec outbound 보장
  • 환산 : 1TB/hour inbound, 3TB/hour inbound

PRACTICE - config/server.properties

  • broker.id : 정수로 된 브로커 번호. 클러스터 내 고유번호로 지정
  • listeners : kafka 통신에 사용되는 host:port
  • advertised.listeners : Kafka client가 접속할 host:port
  • log.dirs : 메시지를 저장할 디스크 디렉토리. 세그먼트가 저장됨
  • log.segment.bytes : 메시지가 저장되는 파일의 크기 단위
  • log.retention.ms : 메시지를 얼마나 보존할지 지정. 닫힌 세그먼트를 처리
    SK planet의 경우 3일로 지정
  • zookeeper.connect : 브로커의 메타데이터를 저장하는 주키퍼의 위치
  • auto.create.topics.enable : 자동으로 토픽이 생성여부 (Producer가 데이터 넣을 시)
  • num.partitions : 자동생성된 토픽의 default partition 개수
  • message.max.bytes : kafka broker에 쓰려는 메시지 최대 크기

Zookeeper 실행, Kafka 실행

Zookeeper 실행
1
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
Kafka 실행
1
$ bin/kafka-server-start.sh -daemon config/server.properties
  • Zookeeper는 카프카의 메타데이터(브로커id, 컨트롤러id 등) 저장
  • 카프카(~2.5.0까지)는 반드시 주키퍼가 필요함
  • 주키퍼 디펜던시를 제거하는 작업이 KIP-500을 통해 진행 중
  • https://cwiki.apache.org/confluence/display/KAFKA/KIP-500
  • JPS 명령어를 통해 카프카, 주키퍼가 정상적으로 Running중인지 확인 가능

Kafka shell scripts

  • kafka-topics.sh
    • 토픽 생성, 조회, 수정 등 역할
  • kafka-console-consumer.sh
    • 토픽의 레코드 즉시 조회
  • kafka-console-producer.sh
    • 토픽의 레코드를 전달(String)
  • kafka-consumer-groups.sh
    • 컨슈머그룹 조회, 컨슈머 오프셋 확인, 수정
  • 위 4개를 포함하여 33개의 Kafka shell script가 제공됨

카프카 토픽 생성

aws 예제에서 카프카 토픽 생성
1
2
3
 ./kafka-topics.sh --create --bootstrap-server {aws ec2 public ip}:9092 --replication-factor 1 --partitions 3 --topic test
#결과
Created topic test. # 토픽이 생성되었다는 말이 나옴

도커 예제를 사용시엔 컨테이너에 들어가서 해야한다

카프카 컨테이너 직접 들어가기
1
docker exec -it kafka ash
docker예제에서는 docker-compose에 설정된 kafka 서버 ip를 넣는다
1
2
# pwd: /opt/kafka_2.13-2.6.0/bin
./kafka-topics.sh --create --bootstrap-server 10.10.20.142:9092 --replication-factor 1 --partitions 3 --topic test
데이터 넣어보기
1
2
3
4
5
./kafka-console-producer.sh --bootstrap-server 10.10.20.142:9092 --topic test
>hello # 이런식으로 데이터를 넣을 수 있음
>kafka
>hello
>world
다른 터미널에서 polling해보자
1
2
3
4
5
./kafka-console-consumer.sh --bootstrap-server 10.10.20.142:9092 --topic test --from-beginning
hello # 이런식으로 데이터를 넣을 수 있음
kafka
hello
world
  • --from-beginning : 첫 offset부터 가져오라는 뜻
Consumer Group 테스트
1
2
3
4
5
6
7
8
./kafka-console-consumer.sh --bootstrap-server 10.10.20.142:9092 --topic test -group testgroup --from-beginning
world
hello world
hello
hello
hello kafka
kafka
send?
  • producer가 보낸 것과 Consumer 그룹이 받은 것과 순서가 다른 이유?
    • 파티션 3개에 데이터가 들어가 있는 상태에서 순서 없이 가져가므로
    • 파티션 1개 사용하면 queue처럼 순서가 같게 된다.
  • Consumer 그룹 종료 -> Producer로 더 생산 –> 다시 위의 명령어 실행 하면?
    • 해당 그룹이 어디까지 받았는지 기록이 되어 있기 때문에
      그룹 종료 이후 생산되는 것부터 받아오게 된다
그룹 확인
1
2
3
4
5
6
7
8
9
10
11
./kafka-consumer-groups.sh --bootstrap-server 10.10.20.142:9092 --list # 그룹 리스트
testgroup # 그룹 출력

./kafka-consumer-groups.sh --bootstrap-server 10.10.20.142:9092 --group testgroup --describe # 특정 그룹 상세

Consumer group 'testgroup' has no active members.

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testgroup test 1 2 2 0 - - -
testgroup test 0 6 6 0 - - -
testgroup test 2 4 4 0 - - -
  • PARTITION : 0~2까지 파티션 3개를 확인 가능
  • CURRENT-OFFSET : 컨슈퍼 오프셋 (BY testgroup)
  • LOG-END-OFFSET : 현재 토픽의 마지막 오프셋
  • LAG : 컨슈머 랙
    • 두 오프셋의 차이
    • 컨슈머가 안가져가고 있는 양의 크기
    • 크면 실시간으로 빨리 처리를 못하고 있다는 반증
offset reset
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
./kafka-consumer-groups.sh --bootstrap-server 10.10.20.142:9092 --group testgroup --topic test --reset-offsets --to-earliest --execute

GROUP TOPIC PARTITION NEW-OFFSET
testgroup test 0 0 # 0 초기화
testgroup test 1 0 # 0 초기화
testgroup test 2 0 # 0 초기화


./kafka-consumer-groups.sh --bootstrap-server 10.10.20.142:9092 --group testgroup --describe

Consumer group 'testgroup' has no active members.

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testgroup test 1 0 2 2 - - -
testgroup test 0 0 7 7 - - -
testgroup test 2 0 5 5 - - -
# Current 오프셋 초기화 부분 확인 과 그에 따른 랙 증가 확인 가능

# 특정 파티션만 변경
# 1번 파티션만 10으로 바꾸기
./kafka-consumer-groups.sh --bootstrap-server 10.10.20.142:9092 --group testgroup --topic test:1 --reset-offsets --to-offset 10 --execute
공유하기