카프카 입문 : 3. 카프카 Producer 어플리케이션

Related POST

Producer

  • 카프카로 데이터(key, value) 전송
  • ProducerRecord객체를 생성
  • Java Kafka-client 제공
  • 그 외 3rd party language의 경우 아래 링크 참고
    https://cwiki.apache.org/confluence/display/KAFKA/Clients
  • 어떤 데이터를 보내나요?
  • Web, Application 클릭로그
  • 공유 자전거/자동차의 위치(GPS)정보
  • 스마트팩토리 공장의 머신 센서정보
  • 상호 통신하기 위한 application
  • 기타 …
Producer 어플리케이션 build.gradle
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
plugins {
id 'java'
}

group 'com.tacademy'
version '1.0'

repositories {
mavenCentral()
}

dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
// broker와 client 버전을 가급적 맞추자
// 1.0 이하에서는 호환성 문제가 크니 가급적 피하자
}
Producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.tacademy;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SimpleProducer {
private static String TOPIC_NAME = "test"; //Topic 명
private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092"; //카프카 클러스터 주소

public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

for (int index = 0; index < 10; index++) {
String data = "This is record " + index;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, data);
try {
producer.send(record);
System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e);
}
}
}
}

Producer options

  • 필수옵션 - 반드시 입력
    • bootstrap.servers : 카프카 클러스터에 연결하기 위한 브로커 목록
    • key.serializer : 메시지 키 직렬화에 사용되는 클래스
    • value.serializer : 메시지 값을 직렬화 하는데 사용되는 클래스
    • 선택옵션 - default값 존재하니 확인 필요
    • acks : 레코드 전송 신뢰도 조절(리플리카)
    • comression.type : snappy, gzip, lz4 중 하나로 압축하여 전송
    • retries : 클러스터 장애에 대응하여 메시지 전송을 재시도하는 회수
    • buffer.memory : 브로커에 전송될 메시지의 버퍼로 사용 될 메모리 양
    • batch.size : 여러 데이터를 함께 보내기 위한 레코드 크기
    • linger.ms : 현재의 배치를 전송하기 전까지 기다리는 시간
    • client.id : 어떤 클라이언트인지 구분하는 식별자

출처: https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/

그냥 실행하면 안됨

  • why? 브릿지 네트워크를 만들었기때문에 접근이 안됨
  • 어떻게 할까 생각하다가 docker 컨테이너로 그냥 붙이기로.
  • 일단 gradle run이 가능하도록 수정
수정된 build.gradle
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
plugins {
id 'java'
id 'application' // gradle run 가능하도록

}
application {
mainClassName = 'com.tacademy.SimpleProducer' // 메인 메소드 클래스
}

group 'com.tacademy'
version '1.0'

repositories {
mavenCentral()
}

dependencies {
testCompile 'junit:junit:4.12'
compile 'org.apache.kafka:kafka-clients:2.5.0'
compile 'ch.qos.logback:logback-classic:1.2.3' // slf4j 구현 에러 때문에 추가
compile 'ch.qos.logback:logback-core:1.2.3'
compile 'org.slf4j:slf4j-api:1.7.30'
}
  • gradle 컨테이너에 현재 소스폴더 바인딩
    도커로 실행
    1
    2
    3
    docker run -it --rm -v maven-repo:/root/.m2 -v $(pwd):/app -w /app --network kafka-test-net gradle:alpine gradle run #linux
    docker run -it --rm -v maven-repo:/root/.m2 -v "%cd%":/app -w /app --network kafka-test-net gradle:alpine gradle run #win

Produce with key/value records
1
2
3
4
5
6
//SimpleProducer
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, data);
// Produce with key/value records
// key: Integer.toString(index)
// value: data
ProducerRecord<String, String> record = new ProducerRecord<>((TOPIC_NAME, Integer.toString(index), data);
1
2
3
4
5
6
7
8
9
10
11
12
./kafka-console-consumer.sh --bootstrap-server 10.10.20.142:9092 --topic test --property pri
nt.key=true --property key.separator="-"
0-This is record 0
1-This is record 1
2-This is record 2
3-This is record 3
4-This is record 4
5-This is record 5
6-This is record 6
7-This is record 7
8-This is record 8
9-This is record 9
  • 프로퍼티 몇개 추가
    • 키 보이게
    • 키와 value 사이 separator 보이는 부분
    • 안나오는 건 from beginning이 아니니까..
    • 다시 프로듀서 실행해보면 찍힌다.
    • SimplePrducer 원래 코드를 실행해보면 key부분이 null이 틀린다.

key/value

  • key
    • 레코드 키: 메시지를 구분하는 구분자 역할
    • 특징
      • 동일 키, 동일 파티션 적재 by Default partitioner
        • 순서를 보장하므로 상태머신(state machine)으로 사용 가능
        • 역할에 따른 컨슈머 할당 적용 가능
          key=order면 주문처리 컨슈머, payment면 결제처리 컨슈머 어플리케이션
    • 레코드 값을 정의하는 구분자 (separator)
  • value
    • 레코드 값: 실질적으로 전달하고 싶은 데이터
    • type : 사실상 제한 없음
    • 선호 포맷? JSON, TSV, CSV등 서비스 특징에 맞게 권장
      • JSON 사용시 key/value형태로 확장성이 뛰어남, 컬럼정보(key)포함

        JSON자체가 key/value니까 레코드 키 설정없이 레코드 값만 보내도 될듯

      • CSV 사용시 : 콤바 기준으로 데이터 구분, 용량이득
    • 포맷을 관리하는 다른 방법?
      • 컨플루언트 스키마 레지스트리 –> confluentinc/schema-registry
파티션 직접 지정해서 넣기
1
2
3
4
5
...
private static int PARTITION_NUMBER = 1;
...
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, PARTITION_NUMBER, Integer.toString(index), data);
// 이렇게 되는 경우 순서가 보장된다

Producer acks

  • acks = 0 : 가장 속도가 빠름, 유실 가능성이 높음
  • acks = 1(default) : 속도 보통, 유실 가능성이 있음
  • acks = all 또는 -1 : 속도 가장 느림. 메시지 전달 손실 가능성 없음

Producer acks Detail

  • acks = 0
    • 프로듀서가 브로커와 소켓연결을 맺어 보낸 즉시 성공으로 간주
    • 브로커가 정상적으로 받아서 리더 파티션에 저장했는지 알 수 없음
    • 팔로워 파티션에도 저장됬는지 알 수 없음
    • 전송 속도가 중요하고 일부 유실되어도 무관한 데이터에 사용
  • acks = 1(default)
    • 프로듀서가 보낸 메시지가 리더 파티션에 정상 저장되었는지 (적어도) 확인
    • 팔로워 파티션에 저장됬는지는 모름
    • 즉, 리더 파티션에 저장되고 해당 브로커가 죽으면 데이터 유실
    • acks=0에 비해 신뢰도가 높지만 아직 유실 가능성은 있음
  • acks = all 또는 -1
    • 프로듀서가 보낸 메시지가 리더, 팔로워 파티션에 정상 저장되었는지 확인
    • 리더 파티션의 데이터가 팔로워 파티션까지 복제될때 까지 기다림
    • 복제가 완료되기 까지 기다림으로 인해 속도가 느림
    • 유실 가능성이 없지만, 속도가 느림

Consumer

  • 데이터를 가져가는(polling) 주체
  • commit을 통해 읽은 consumer offset을 카프카에 기록
  • Java Kafka-client 제공
  • 어디에 데이터를 저장하나요?
    • FileSystem(.csv .log .tsv)
    • Object Storage(S3, Minio)
    • Hadoop(Hdfs, Hive)
    • RDBMS(Oracle, Mysql)
    • NoSql(MongoDB, CouchDB)
    • 기타 다양한 저장소들(Elasticsearch, influxDB)
컨슈머 테스트를 위한 프로듀서 켜기
1
./kafka-console-producer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test
SimpleConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SimpleConsumer {
private static String TOPIC_NAME = "test";
private static String GROUP_ID = "testgroup";
private static String BOOTSTRAP_SERVERS = "10.10.20.142:9092";

public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);

consumer.subscribe(Arrays.asList(TOPIC_NAME));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
  • 데이터가 없을떄 계속 메세지가 나오는게 짜증이 나면 log 레벨을 info로 올리자
Consumer subscribe
1
2
3
4
5
6
7
//1개 토픽만 구독
consumer.subscribe(Arrays.asList("click_log"));
//n개 토픽 구독 : 정규 표현식(regex)를 통한 토픽 구독
consumer.subscribe(Pattern.compile("dev.*"));

//특정 토픽의 파티션 구독 : 키포함 레코드를 컨슘 할 때 특정 파티션 할당
consumer.assign(Collections.singleton(new TopicPartition("web_log", 1)));

Consumer options

  • 필수옵션 - 반드시 입력
    • bootstrap.servers : 카프카 클러스터에 연결하기 위한 브로커 목록
    • group.id : 컨슈머 그룹 id
    • key.deserializer : 메시지 키 역직렬화에 사용되는 클래스
    • value.deserializer : 메시지 값을 역직렬화 하는데 사용되는 클래스
  • 선택옵션 - default값 존재
    • enable.auto.commit : 자동 오프셋 커밋 여부
    • auto.commit.interval.ms : 자동 오프셋 커밋일 때 interval 시간
    • auto.offset.reset : 신규 컨슈머그룹일 때 읽을 파티션의 오프셋 위치
    • client.id : 클라이언트 식별값
    • max.poll.records : poll() 메서드 호출로 반환되는 레코드의 최대 개수
    • session.timeout.ms : 컨슈머가 브로커와 연결이 끊기는 시간

Consumer commit

  • enable.auto.commit=true
  • 일정 간격(auto.commit.interval.ms), poll() 메서드 호출시 자동 commit
  • commit 관련 코드를 작성할 필요없음. 편리함.
  • 속도가 가장 빠름
  • 중복 또는 유실이 발생할 수 있음
    • 중복/유실을 허용하지 않는 곳(은행, 카드 등)에서는 사용하면 안됨!!
    • 일부 데이터가 중복/유실되도 상관 없는 곳(센서, GPS 등)에서 사용

예시

컨슈머에 오토커밋 설정
1
2
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000);
  1. 프로듀서로 test1,test2,test3 생산
  2. 오토 커밋 설정한 컨슈머 On , test1~3까지 polling 함
  3. 여기서 Stop으로(SIGKILL)정지하고 재시작하는 경우(graceful shutdown이 아닌 경우)
    다시 test1~3이 들어옴 –> 데이터 중복문제

데이터 중복처리로 발생하는 문제

  • 커머스의 장바구니 시스템에서 중복 이슈 발생
    • 나는 장바구니에 1개 상품을 담았는데 2개 상품이 담김
  • 카드사의 결제 시스템에서 중복 이슈 발생
    • 편의점에서 아이스크림 결제를 했는데 2번 결재됨
  • 택배사 SMS 발송 시스템에서 중복 이슈 발생
    • 집에 도착 했다는 SMS가 2번 발송
  • 데이터 중복을 막을 수 있는 방법
    • 오토 커밋을 사용하되, 컨슈머가 죽지 않도록 잘 돌봐준다
      • 불가능함. 서버/애플리케이션은 언젠가 죽을 수 있다. ex. 배포
    • 오토 커밋을 사용하지 않는다
      • Kafka consumer의 commitSync(), commitAsync() 사용

enable.auto.commit=false

  1. commitSync() : 동기 커밋
  • ConsumerRecord 처리 순서를 보장함
  • 가장 느림(커밋이 완료될 때 까지 block)
  • poll() 메서드로 반환된 ConsumerRecord의 마지막 offset을 커밋
  • Map<TopicPartition, OffsetAndMetadata> 을 통해 오프셋 지정 커밋 가능
  1. commitAsync() : 비동기 커밋
  • 동기 커밋보다 빠름
  • 중복이 발생할 수 있음
    � 일시적인 통신 문제로 이전 offset보다 이후 offset이 먼저 커밋 될 때
  • ConsumerRecord 처리 순서를 보장하지 못함
    � 처리 순서가 중요한 서비스(주문, 재고관리 등)에서는 사용 제한

코드 예시 (enable.auto.commit=false)

동기 커밋
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
 while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
try {
consumer.commitSync(); //이렇게
} catch (CommitFailedException e) {
System.err.println("commit failed");
}
}

//offset 지정 커밋
Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
offset.put(new TopicPartition(record.topic(), record.partition()), null);
try {
consumer.commitSync(offset); //이렇게
} catch (CommitFailedException e) {
System.err.println("commit failed");
}


//비동기 커밋
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
consumer.commitASync();
}

//비동기 , 동기 커밋 같이 쓰기
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
consumer.commitAsync();
}
} catch (CommitFailedException e) {
System.err.println("commit failed");
}finally {
consumer.commitSync();
}

Consumer rebalance

  • 리밸런스 - 컨슈머 그룹의 파티션 소유권이 변경될 때 일어나는 현상
  • 리밸런스를 하는 동안 일시적으로 메시지를 가져올 수 없음
  • 리밸런스 발생시 데이터 유실/중복 발생 가능성 있음
    • commitSync() 또는 추가적인 방법(unique key)으로 데이터 유실/중복 방지
  • 언제 리밸런스 발생?
    • consumer.close() 호출시 또는 consumer의 세션이 끊어졌을 때
    • 세션 끊김 예시
      1
      2
      configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
      configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
      1. 3초마다 그룹 코디네이터(브로커 중 1대)에게 Heartbeat 전송
      2. 10초의 세션 타임아웃 시간 안에 Heartbeat가 왔는지 확인
      3. 만약 Heartbeat가 없으면 해당 컨슈머는 죽은것으로 마킹
      4. 리밸런스 시작
컨슈머 리밸런스 리스너 활용 예
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
consumer.subscribe(Arrays.asList("test"), new RebalanceListener());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
static class RebalanceListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions.");
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Assigned partitions.");
}
}

리밸런스 리스너는 언제 사용할까?

  • 리밸런스 발생에 따른 offset commit
  • 리밸런스 시간 측정을 통한 컨슈머 모니터링
    • 복구 오래 걸릴 때 해당 로그등을 통해 추후에 시간 추정 가능

Consumer wakeup

  • tcp/ip 할때 SIGTERM 받을시 실행되서 리소스(파일,소켓,DB커넥션)닫고 저장하는 cleanUp 함수로 여기면 됨
  • 컨슈머 정상적으로 종료하고 싶을 때
  • polling 할떄 wakeupException 발생 -> 정상적으로 나올 수 있음
  • 예시) [위에서와 같음]
    • 중간에 SIGKILL 종료시 마지막 커밋부터 다시 시작하므로 SIGKILL 직전까지 중복처리 됨
자바에서 shutdown에 hook을 받아 wakeup실행
1
2
3
4
5
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
consumer.wakeup();
}
});
WakeupException처리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
consumer.commitSync();
}
} catch (WakeupException e) { //SIGTERM시 위의 셧다운 훅에 의해 발생
System.out.println("poll() method trigger WakeupException");
}finally {
//안정한 종료
consumer.commitSync();
consumer.close();
}
  • SIGTERM을 통한 shutdown signal로 kill하여 처리한 데이터 커밋 필요
  • SIGKILL(9)는 프로세스 강제 종료로 커밋 불가–> 중복/유실 발생

Consumer thread 전략
1안) 1 프로세스 + 1 스레드(컨슈머)

  • 간략한 코드
  • 프로세스 단위 실행/종료
  • 다수의 컨슈머 실행 필요시 다수의 프로세스 실행 필요
    1프로세스 + 1스레드
    1
    2
    3
    $ cat consumer.conf
    {"topic":"click_log", "group.id":"hadoop-consumers"}
    $ java -jar one-process-one-consumer.jar --path consumer.conf
    2안) 1 프로세스 + n 스레드(동일 컨슈머 그룹)
  • 복잡한 코드
  • 스레드 단위 실행/종료
  • 스레드간 간섭 주의(세마포어, 데드락 등)
  • 다수 컨슈머 실행시 다수 스레드 실행 가능
    1프로세스 20스레드
    1
    2
    3
    $ cat consumer.conf
    {"topic":"click_log", "group.id":"hadoop-consumers", "consumer.no":20}
    $ java -jar one-process-multiple-consumer.jar --path consumer.conf
    3안) 1 프로세스 + n 스레드(다수 컨슈머 그룹)
  • 복잡한 코드
  • 컨슈머 그룹별 스레드 개수 조절 주의
    1
    2
    3
    4
    5
    6
    7
    $ cat consumer.conf
    [
    {"topic":"click_log", "group.id":"hadoop-consumers", "consumer.no":20},
    {"topic":"click_log", "group.id":"elasticsearch-consumers", "consumer.no":1},
    {"topic":"application_log", "group.id":"hadoop-consumers", "consumer.no":5}
    ]
    $ java -jar one-process-multiple-consumer-multiple-group.jar --path consumer.conf
공유하기