Related POST
Producer
- 카프카로 데이터(key, value) 전송
- ProducerRecord객체를 생성
- Java Kafka-client 제공
- 그 외 3rd party language의 경우 아래 링크 참고
https://cwiki.apache.org/confluence/display/KAFKA/Clients - 어떤 데이터를 보내나요?
- Web, Application 클릭로그
- 공유 자전거/자동차의 위치(GPS)정보
- 스마트팩토리 공장의 머신 센서정보
- 상호 통신하기 위한 application
- 기타 …
1 | plugins { |
1 | package com.tacademy; |
Producer options
- 필수옵션 - 반드시 입력
bootstrap.servers
: 카프카 클러스터에 연결하기 위한 브로커 목록key.serializer
: 메시지 키 직렬화에 사용되는 클래스value.serializer
: 메시지 값을 직렬화 하는데 사용되는 클래스- 선택옵션 - default값 존재하니 확인 필요
acks
: 레코드 전송 신뢰도 조절(리플리카)comression.type
: snappy, gzip, lz4 중 하나로 압축하여 전송retries
: 클러스터 장애에 대응하여 메시지 전송을 재시도하는 회수buffer.memory
: 브로커에 전송될 메시지의 버퍼로 사용 될 메모리 양batch.size
: 여러 데이터를 함께 보내기 위한 레코드 크기linger.ms
: 현재의 배치를 전송하기 전까지 기다리는 시간client.id
: 어떤 클라이언트인지 구분하는 식별자
출처: https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/
그냥 실행하면 안됨
- why? 브릿지 네트워크를 만들었기때문에 접근이 안됨
- 어떻게 할까 생각하다가 docker 컨테이너로 그냥 붙이기로.
- 일단 gradle run이 가능하도록 수정
1 | plugins { |
- gradle 컨테이너에 현재 소스폴더 바인딩
도커로 실행 1
2
3docker run -it --rm -v maven-repo:/root/.m2 -v $(pwd):/app -w /app --network kafka-test-net gradle:alpine gradle run #linux
docker run -it --rm -v maven-repo:/root/.m2 -v "%cd%":/app -w /app --network kafka-test-net gradle:alpine gradle run #win
1 | //SimpleProducer |
1 | ./kafka-console-consumer.sh --bootstrap-server 10.10.20.142:9092 --topic test --property pri |
- 프로퍼티 몇개 추가
- 키 보이게
- 키와 value 사이 separator 보이는 부분
- 안나오는 건 from beginning이 아니니까..
- 다시 프로듀서 실행해보면 찍힌다.
- SimplePrducer 원래 코드를 실행해보면 key부분이 null이 틀린다.
key/value
- key
- 레코드 키: 메시지를 구분하는 구분자 역할
- 특징
- 동일 키, 동일 파티션 적재 by Default partitioner
- 순서를 보장하므로 상태머신(state machine)으로 사용 가능
- 역할에 따른 컨슈머 할당 적용 가능
key=order면 주문처리 컨슈머, payment면 결제처리 컨슈머 어플리케이션
- 동일 키, 동일 파티션 적재 by Default partitioner
- 레코드 값을 정의하는 구분자 (separator)
- value
- 레코드 값: 실질적으로 전달하고 싶은 데이터
- type : 사실상 제한 없음
- 선호 포맷? JSON, TSV, CSV등 서비스 특징에 맞게 권장
- JSON 사용시 key/value형태로 확장성이 뛰어남, 컬럼정보(key)포함
JSON자체가 key/value니까 레코드 키 설정없이 레코드 값만 보내도 될듯
- CSV 사용시 : 콤바 기준으로 데이터 구분, 용량이득
- JSON 사용시 key/value형태로 확장성이 뛰어남, 컬럼정보(key)포함
- 포맷을 관리하는 다른 방법?
- 컨플루언트 스키마 레지스트리 –>
1 | ... |
Producer acks
- acks = 0 : 가장 속도가 빠름, 유실 가능성이 높음
- acks = 1(default) : 속도 보통, 유실 가능성이 있음
- acks = all 또는 -1 : 속도 가장 느림. 메시지 전달 손실 가능성 없음
Producer acks Detail
- acks = 0
- 프로듀서가 브로커와 소켓연결을 맺어 보낸 즉시 성공으로 간주
- 브로커가 정상적으로 받아서 리더 파티션에 저장했는지 알 수 없음
- 팔로워 파티션에도 저장됬는지 알 수 없음
- 전송 속도가 중요하고 일부 유실되어도 무관한 데이터에 사용
- acks = 1(default)
- 프로듀서가 보낸 메시지가 리더 파티션에 정상 저장되었는지 (적어도) 확인
- 팔로워 파티션에 저장됬는지는 모름
- 즉, 리더 파티션에 저장되고 해당 브로커가 죽으면 데이터 유실
- acks=0에 비해 신뢰도가 높지만 아직 유실 가능성은 있음
- acks = all 또는 -1
- 프로듀서가 보낸 메시지가 리더, 팔로워 파티션에 정상 저장되었는지 확인
- 리더 파티션의 데이터가 팔로워 파티션까지 복제될때 까지 기다림
- 복제가 완료되기 까지 기다림으로 인해 속도가 느림
- 유실 가능성이 없지만, 속도가 느림
Consumer
- 데이터를 가져가는(polling) 주체
- commit을 통해 읽은 consumer offset을 카프카에 기록
- Java Kafka-client 제공
- 그 외 3rd party language의 경우 아래 링크 참고
https://cwiki.apache.org/confluence/display/KAFKA/Clients
- 그 외 3rd party language의 경우 아래 링크 참고
- 어디에 데이터를 저장하나요?
- FileSystem(.csv .log .tsv)
- Object Storage(S3, Minio)
- Hadoop(Hdfs, Hive)
- RDBMS(Oracle, Mysql)
- NoSql(MongoDB, CouchDB)
- 기타 다양한 저장소들(Elasticsearch, influxDB)
1 | ./kafka-console-producer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test |
1 | package com.tacademy; |
- 데이터가 없을떄 계속 메세지가 나오는게 짜증이 나면 log 레벨을 info로 올리자
1 | //1개 토픽만 구독 |
Consumer options
- 필수옵션 - 반드시 입력
bootstrap.servers
: 카프카 클러스터에 연결하기 위한 브로커 목록group.id
: 컨슈머 그룹 idkey.deserializer
: 메시지 키 역직렬화에 사용되는 클래스value.deserializer
: 메시지 값을 역직렬화 하는데 사용되는 클래스
- 선택옵션 - default값 존재
enable.auto.commit
: 자동 오프셋 커밋 여부auto.commit.interval.ms
: 자동 오프셋 커밋일 때 interval 시간auto.offset.reset
: 신규 컨슈머그룹일 때 읽을 파티션의 오프셋 위치client.id
: 클라이언트 식별값max.poll.records
: poll() 메서드 호출로 반환되는 레코드의 최대 개수session.timeout.ms
: 컨슈머가 브로커와 연결이 끊기는 시간
Consumer commit
- enable.auto.commit=true
- 일정 간격(auto.commit.interval.ms), poll() 메서드 호출시 자동 commit
- commit 관련 코드를 작성할 필요없음. 편리함.
- 속도가 가장 빠름
- 중복 또는 유실이 발생할 수 있음
- 중복/유실을 허용하지 않는 곳(은행, 카드 등)에서는 사용하면 안됨!!
- 일부 데이터가 중복/유실되도 상관 없는 곳(센서, GPS 등)에서 사용
예시
1 | configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); |
- 프로듀서로 test1,test2,test3 생산
- 오토 커밋 설정한 컨슈머 On , test1~3까지 polling 함
- 여기서 Stop으로(
SIGKILL
)정지하고 재시작하는 경우(graceful shutdown이 아닌 경우)
다시 test1~3이 들어옴 –> 데이터 중복문제
데이터 중복처리로 발생하는 문제
- 커머스의 장바구니 시스템에서 중복 이슈 발생
- 나는 장바구니에 1개 상품을 담았는데 2개 상품이 담김
- 카드사의 결제 시스템에서 중복 이슈 발생
- 편의점에서 아이스크림 결제를 했는데 2번 결재됨
- 택배사 SMS 발송 시스템에서 중복 이슈 발생
- 집에 도착 했다는 SMS가 2번 발송
- 데이터 중복을 막을 수 있는 방법
- 오토 커밋을 사용하되, 컨슈머가 죽지 않도록 잘 돌봐준다
- 불가능함. 서버/애플리케이션은 언젠가 죽을 수 있다. ex. 배포
- 오토 커밋을 사용하지 않는다
- Kafka consumer의 commitSync(), commitAsync() 사용
- 오토 커밋을 사용하되, 컨슈머가 죽지 않도록 잘 돌봐준다
enable.auto.commit=false
- commitSync() : 동기 커밋
- ConsumerRecord 처리 순서를 보장함
- 가장 느림(커밋이 완료될 때 까지 block)
- poll() 메서드로 반환된 ConsumerRecord의 마지막 offset을 커밋
- Map<TopicPartition, OffsetAndMetadata> 을 통해 오프셋 지정 커밋 가능
- commitAsync() : 비동기 커밋
- 동기 커밋보다 빠름
- 중복이 발생할 수 있음
� 일시적인 통신 문제로 이전 offset보다 이후 offset이 먼저 커밋 될 때 - ConsumerRecord 처리 순서를 보장하지 못함
� 처리 순서가 중요한 서비스(주문, 재고관리 등)에서는 사용 제한
코드 예시 (enable.auto.commit=false)
1 | while (true) { |
Consumer rebalance
- 리밸런스 - 컨슈머 그룹의 파티션 소유권이 변경될 때 일어나는 현상
- 리밸런스를 하는 동안 일시적으로 메시지를 가져올 수 없음
- 리밸런스 발생시 데이터 유실/중복 발생 가능성 있음
- commitSync() 또는 추가적인 방법(unique key)으로 데이터 유실/중복 방지
- 언제 리밸런스 발생?
- consumer.close() 호출시 또는 consumer의 세션이 끊어졌을 때
- 세션 끊김 예시
1
2configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);- 3초마다 그룹 코디네이터(브로커 중 1대)에게 Heartbeat 전송
- 10초의 세션 타임아웃 시간 안에 Heartbeat가 왔는지 확인
- 만약 Heartbeat가 없으면 해당 컨슈머는 죽은것으로 마킹
- 리밸런스 시작
1 | ... |
리밸런스 리스너는 언제 사용할까?
- 리밸런스 발생에 따른 offset commit
- 리밸런스 시간 측정을 통한 컨슈머 모니터링
- 복구 오래 걸릴 때 해당 로그등을 통해 추후에 시간 추정 가능
Consumer wakeup
- tcp/ip 할때 SIGTERM 받을시 실행되서 리소스(파일,소켓,DB커넥션)닫고 저장하는 cleanUp 함수로 여기면 됨
- 컨슈머 정상적으로 종료하고 싶을 때
- polling 할떄 wakeupException 발생 -> 정상적으로 나올 수 있음
- 예시) [위에서와 같음]
- 중간에 SIGKILL 종료시 마지막 커밋부터 다시 시작하므로 SIGKILL 직전까지 중복처리 됨
1 | Runtime.getRuntime().addShutdownHook(new Thread() { |
1 | try { |
- SIGTERM을 통한 shutdown signal로 kill하여 처리한 데이터 커밋 필요
- SIGKILL(9)는 프로세스 강제 종료로 커밋 불가–> 중복/유실 발생
Consumer thread 전략
1안) 1 프로세스 + 1 스레드(컨슈머)
- 간략한 코드
- 프로세스 단위 실행/종료
- 다수의 컨슈머 실행 필요시 다수의 프로세스 실행 필요 2안) 1 프로세스 + n 스레드(동일 컨슈머 그룹)
1프로세스 + 1스레드 1
2
3$ cat consumer.conf
{"topic":"click_log", "group.id":"hadoop-consumers"}
$ java -jar one-process-one-consumer.jar --path consumer.conf - 복잡한 코드
- 스레드 단위 실행/종료
- 스레드간 간섭 주의(세마포어, 데드락 등)
- 다수 컨슈머 실행시 다수 스레드 실행 가능 3안) 1 프로세스 + n 스레드(다수 컨슈머 그룹)
1프로세스 20스레드 1
2
3$ cat consumer.conf
{"topic":"click_log", "group.id":"hadoop-consumers", "consumer.no":20}
$ java -jar one-process-multiple-consumer.jar --path consumer.conf - 복잡한 코드
- 컨슈머 그룹별 스레드 개수 조절 주의
1
2
3
4
5
6
7$ cat consumer.conf
[
{"topic":"click_log", "group.id":"hadoop-consumers", "consumer.no":20},
{"topic":"click_log", "group.id":"elasticsearch-consumers", "consumer.no":1},
{"topic":"application_log", "group.id":"hadoop-consumers", "consumer.no":5}
]
$ java -jar one-process-multiple-consumer-multiple-group.jar --path consumer.conf