카프카 입문 : 1. 기본 개념 및 생태계

Related POST

follow한 강좌:

참조한 글 :

  • 위 강좌는 aws 위에서 실습하게 되어있습니다
  • 다른 방법이 없을까 해서 찾아보니 좋은 글이 있었습니다.
  • 위 아티클을 참조하면 클라우드 대신 로컬에서 테스트 환경을 구성할 수 있다
  • 다만 해당 아티클처럼 굳이 따로 docker create network를 하지말고 다음처럼
    docker-compose를 사용해서 한번에 네트워크를 가시는게 좋을 듯합니다.
networks 구성까지 포함한 docker-compose.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
 version: '3'

services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "28080:2181"
networks:
dockernet:
ipv4_address: 10.10.20.141
kafka:
image: wurstmeister/kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 10.10.20.142
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
networks:
dockernet:
ipv4_address: 10.10.20.142

networks:
dockernet:
name: kafka-test-net
driver: bridge
ipam:
config:
- subnet: 10.10.20.0/24
gateway: 10.10.20.1

Before Kafka

  • end to end 연결 방식의 아키텍처
  • 데이터 연동의 복잡성 증가(HW, OS, 장애 등)
  • 각기 다른 데이터 파이프라인 연결 구조
  • 확장에 엄청난 노력 필요
  • LinkedIn도 겪은 문제
LinkedIn이 원하는 것은?
  • 모든 시스템으로 데이터를 전송 실시간 처리 가능할 것
  • 데이터가 갑자기 많아져도 확장이 용이한 시스템 필요
  • LinkedIn 자체 관련 팀에서 카프카 개발

After Kafka

  • 프로듀서/컨슈머 분리
  • 메시지 데이터를 여러 컨슈머에게 허용
  • 높은 처리량을 위한 메시지 최적화
  • 스케일 아웃 가능
  • 관련 생태계 제공

Kafka broker

  • 실행된 카프카 애플리케이션 서버 중 1대
  • 3대 이상의 브로커로 클러스터 구성
  • 주키퍼와 연동(~2.5.0버젼)
  • 주키퍼의 역할 : 메타데이터(브로커id, 컨트롤러id 등) 저장
  • n개 브로커 중 1대는 컨트롤러(Controller)기능 수행
  • 컨트롤러 :
    • 각 브로커에게 담당파티션 할당 수행
    • 브로커 정상 동작 모니터링 관리
    • 누가 컨트롤러 인지는 주키퍼에 저장

Record

Producer 부분
1
new ProducerRecord<String,String>("topic","key","message");
Consumer 부분
1
2
3
4
ConsumerRecords<String,String> records = consumer.poll(1000);
for (ConsumerRecord<String,String> record : records) {
...
{
  • 객체를 프로듀서에서 컨슈머로 전달하기 위해 Kafka 내부에
    byte형태로 저장할 수 있도록 직렬화/역직렬화하여 사용.
  • 기본 제공 직렬화 class : StringSerializer, ShortSerializer 등
  • 커스텀 직렬화 class를 통해 Custom Object 직렬화/역직렬화 가능
  • SK Planet에서는 key는 null, Value는 JSON으로 된 자체 형식 사용

Topic & Partition

  • 메시지 분류 단위
  • n개의 파티션 할당 가능
  • 각 파티션마다 고유한 오프셋(offset)을 가짐.
  • 메시지 처리순서는 파티션 별로 유지 관리됨

Producer & Consumer

  • 프로듀서는 레코드를 생성하여 브로커로 전송
  • 전송된 레코드는 파티션에 신규 오프셋과 함께 기록됨
  • 컨슈머는 브로커로 부터 레코드를 요청하여 가져감(polling)
    절대 브로커가 push하는게 아님에 주의!
  • 그림처럼 컨슈머 B가 offet11을 가져갔다는 것은 앞에 것도 가져갔다는 이야기임

Kafka log and segment

  • 실제로 메시지가 저장되는 파일시스템 단위
  • 메시지가 저장될때는 세그먼트파일이 열려있음
  • 세그먼트는 시간 또는 크기 기준으로 닫힘
  • 세그먼트가 닫힌 이후 일정 시간(또는 용량)에 따라
    삭제(delete) 또는 압축(compact)
    1
    2
    3
    4
    -rw-rw-r-- 1 ec2-user ec2-user 10485760 6월 5 03:54 00000000000000000000.index
    -rw-rw-r-- 1 ec2-user ec2-user 0 6월 5 03:54 00000000000000000000.log
    -rw-rw-r-- 1 ec2-user ec2-user 10485756 6월 5 03:54 00000000000000000000.timeindex
    -rw-rw-r-- 1 ec2-user ec2-user 8 6월 5 03:54 leader-epoch-checkpoint

파티션 과 컨슈머 와 관계

파티션3짜리 토픽에 1개 프로듀서가 레코드를 보내고 있을때

  • 컨슈머가 1개면 3개의 파티션으로부터 polling한다
  • 컨슈머가 3개면 3개의 컨슈머로 이루어진 1개의 컨슈머 그룹이
    토픽으로부터 polling (각 컨슈퍼가 각 파티션과 1:1로 매칭)
    각컨슈머는 각각의 작업(db저장, 파일저장, 로그기록등)을 하므로
    각 컨슈머 스레드가 병렬처리 되서 퍼포먼스 증가
  • 컨슈머가 4개가 되면 남은 컨슈머는 파티션 할당을 받지 못하고 대기
    따라서 가능한 파티션 개수 >= 컨슈머 개수
    [4개의 컨슈머가 같은 컨슈머 그룹에 있다는 가정하에 ]
  • 컨슈머 3개 운영 중1개 장애시 리밸런스 발생:
    파티션 컨슈머 할당이 재조정되서 남은 컨슈머가 2개를 파티션을 할당받음
  • 각각의 목적을 위해 2개이상의 컨슈머 그룹을 운영하는 경우
    각 그룹은 다른 그룹과 영향없이 할당 받는다
    SK Planet의 경우 장애 대응하기 위해 재입수(or재처리)목적으로
    임시 신규 컨슈머 그룹을 생성해서 사용하기도 한다고 한다
    즉, 중간에 어떤 데이터가 재처리되어야 한다고 하면 현재 이미 스트리밍
    라이브 중인 라이브그룹을 남겨두고 새로운 컨슈머 그룹을 만들어서
    중간부더 재처리한다는 것
    (각 컨슈머그룹은 서로 영향이 없음)

카프카 토픽 생성

파티션 3개짜리 토픽 생성
1
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic_name --partitions 3
  • kafka-topics.sh : 토픽을 생성할 수 있는 쉘 스크립트

  • --bootstrap-server : 서버지정

  • --create: topic을 만들겠다는 뜻

  • --partitions 3: 파티션 3개

  • 이렇게 만들면 브로커가 3개일 경우
    브로커 1개당 파티션 하나로 균등하게 만들어진다
    이 경우 broker #1이 장애가 나면 복구 될때까지 파티션 #1은 사용불가

  • 이 이슈를 해결하는 방법은 파티션을 다른 Broker에 replication
    broker #1 에 문제가 발생하면 다른 브로커에 복제된 데이터를 사용
    Sk Planet의 경우 --replicator-factor 3을 기본 설정
    보통 2,3을 하는 이유는 복제 하는 것도 프로세싱이므로 ISR을 맞추는
    데 부하가 많이 걸리기 때문

  • 원본 파티션을 리더 파티션, 복제된 파티션은 팔로워 파티션이라고 부른다
    리더 파티션 동작이 불가능하면 나머지 팔로워중 1개가 리더로 선출된다

  • 특정 파티션의 리더, 팔로워가 레코드가 모두 복제되서 sync가 맞는 경우
    ISR(In-Sync Replica)이라고 한다

  • ISR이 아닌 상태에서 장애가 나면?

    • unclean.leader.election.enable
    • 기본은 false
    • false면 ISR이 될때까지(장애 브로커가 복구되서 ISR 될때까지) 대기
    • true면 다 복제가 안된 부분이 유실되더라도 우선적 처리를 중요시

    Kafka rack-awareness

    • 1개의 Rack에 다수의 브로커를 몰아 넣는 것은 위험
      rack파워가 나가면 그 서버 다나가므로.
    • 다수의 Rack에 분산하여 브로커 옵션(broker.rack) 설정 및 배치\
    • 파티션 할당 및 레플리케이션 동작시 특정 브로커에 몰리는 현상 방지

왜 카프카 클러스터는 서버 장애에 대응한 로직이 많나요?

  • 서비스운영에 있어 장애 허용(Fault-tolerant)은 아주 중요
  • 서버의 중단(이슈발생, 재시작) 언제든 발생할 수 있음.
  • Example)
    30대 브로커로 이루어진 카프카 클러스터가 있을때,
    1대의 서버가 365일 중 1일 중단이 발생할 가능성이 있다고 가정하면
    12.1일(약 2주)에 한번씩 브로커 이슈 발생!!
  • 일부 서버가 중단되더라도 데이터가 유실되면 안됨
  • 안정성이 보장되지 않으면 신뢰도가 하락(사용 중단)

Kafka의 핵심요소 중간정리

  • Broker : 카프카 애플리케이션 서버 단위
  • Topic : 데이터 분리 단위. 다수 파티션 보유
  • Partition : 레코드를 담고 있음. 컨슈머 요청시 레코드 전달
  • Offset : 각 레코드당 파티션에 할당된 고유 번호
  • Consumer : 레코드를 polling하는 애플리케이션
  • Consumer group : 다수 컨슈머 묶음
  • Consumer offset : 특정 컨슈머가 가져간 레코드의 번호
  • Producer : 레코드를 브로커로 전송하는 애플리케이션
  • Replication : 파티션 복제 기능
  • ISR : 리더+팔로워 파티션의 sync가 된 묶음
  • Rack-awareness : Server rack 이슈에 대응

Kafka Client

Kafka Streams

  • 데이터를 변환(Transformation)하기 위한 목적으로 사용하는 API
  • 스트림 프로세싱을 지원하기 위한 다양한 기능을 제공
  • Stateful 또는 Stateless 와 같이 상태기반 스트림 처리 가능
  • Stream api와 DSL(Domain Specific Language)를 동시 지원
  • Exactly-once 처리, 고 가용성 특징
  • Kafka security(acl, sasl 등) 완벽 지원
  • 스트림 처리를 위한 별도 클러스터(ex. yarn 등) 불필요
  • LINE등의 기업에서 사용

Kafka Connect

  • 많은 경우 Kafka client로 Kafka로 데이터를 넣는 코드를 작성할때도 있지만,
    Kafka connect를 통해 data를 Import/Export 할 수 있음.
  • 즉 이 자체가 이미 데이터 import/export하는 애플리케이션으로 볼 수 있음
  • 코드 없이 configuration으로 데이터를 이동시키는 것이 목적
    • Standalone mode, distribution mode 지원
    • REST api interface를 통해 제어
    • Stream 또는 Batch 형태로 데이터 전송 가능
    • 커스텀 connector을 통한 다양한 plugin 제공(File, S3, Hive, Mysql etc…)

Kafka Mirror maker

  • 특정 카프카 클러스터에서 다른 카프카 클러스터로
    Topic 및 Record를 복제하는 Standalone tool
  • 2019년 11월, 기존 MirrorMaker를 개선한 MirrorMaker2.0 release.
  • 클러스터간 토픽에 대한 모든 것을 복제하는 것이 목적
  • 신규 토픽, 파티션 감지기능 및 토픽 설정 자동 Sync기능
  • 양방향 클러스터 토픽 복제
  • 미러링 모니터링을 위한 다양한 metric(latency, count 등) 제공

그 외 Kafka 생태계를 지탱하는 application들

  • confluent/ksqlDB : sql구문을 통한 stream data processing 지원
    이거 정말 재미있다고 하다고 하니 나중에 따로 실습해보자
  • confluent/Schema Registry : avro기반의 스키마 저장소
  • confluent/REST Proxy : REST api를 통한 consumer/producer
  • linkedin/Kafka burrow : consumer lag 수집 및 분석
  • yahoo/CMAK : 카프카 클러스터 매니저
  • uber/uReplicator : 카프카 클러스터 간 토픽 복제(전달)
  • Spark stream : 다양한 소스(카프카 포함)로 부터 실시간 데이터 처리

Kafka @SKPlanet

  • 3 Cluster
  • 20+ brokers
  • 3TB per day
  • 100+ Topics
  • 10+ Consumers
  • 2TB HDFS files write per day
공유하기