컨슈머(Consumer) API
프로듀서가 전송한 데이터는 브로커에 적재되고 컨슈머는 적재된 데이터를 사용하기 위해 브로커부터 데이터를 가져와서 필요한 처리를 하게 됩니다.
1. 컨슈머 설정 및 사용
예제 스펙은 프로듀서(Producer) API 설명 및 예제글에서 확인 가능합니다.
public KafkaConsumerTest() {
Properties properties = new Properties();
// 카프카 클러스터 Host 설정
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// key값 역직렬화 클래스 설정
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// value값 역직렬화 클래스 설정
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 컨슈머 그룹 명 설정
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1");
this.kafkaConsumer = new KafkaConsumer<>(properties);
// 구독할 토픽 설정
this.kafkaConsumer.subscribe(List.of("producer-test"));
}
@Test
public void consume() {
while (true) {
// 데이터를 가져올 때 컨슈머 버퍼에 데이터를 기다리기 위한 타임아웃 1초 설정
this.kafkaConsumer.poll(Duration.ofSeconds(1)).forEach(record -> {
log.info("Consume record: {}", record);
});
}
}
예제에는 컨슈머 그룹을 설정해주었는데 컨슈머 그룹을 선언하지 않을 경우 어느 그룹에도 속하지 않는 컨슈머로 동작하게 됩니다.
컨슈머가 중단되거나 재시작되더라도 컨슈머 그룹의 오프셋을 기준으로 이후 데이터 처리를 하기 때문에 subscribe() 메서드를 사용하여 토픽을 구독하는 경우에는 컨슈머 그룹을 선언해야 합니다.
해당 테스트 코드를 실행해보면 다음과 같은 컨슈머의 설정 옵션과 메시지 컨슘 로그를 확인할 수 있습니다.
2. 컨슈머 중요 개념
데이터를 가져오기 위해 컨슈머를 운영하는 두 가지 방법이 있습니다.
첫 번째는 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영하는 것이고 두 번째는 토픽의 특정 파티션만 구독하는 컨슈머를 운영하는 것입니다.
컨슈머 그룹으로 묶인 컨슈머가 토픽을 구독해서 데이터를 가져갈 때 1개의 컨슈머는 1개 이상의 파티션들에 할당이 가능하고, 1개의 파티션은 최대 1개의 컨슈머에 할당이 가능합니다.
그러므로 컨슈머 그룹의 컨슈머 개수는 가져가고자 하는 토픽의 파티션 개수보다 같거나 작아야 합니다.
만약 파티션의 개수보다 컨슈머의 개수가 많다면 남은 컨슈머들은 파티션을 할당받지 못하고 유휴 상태로 남기 때문에 불필요한 쓰레드를 차지하게 됩니다.
컨슈머 그룹은 다른 그룹들과 격리되는 특징을 가지고 있기 때문에 프로듀서가 보낸 데이터를 각기 다른 역할을 하는 컨슈머 그룹끼리 영향을 받지 않게 처리할 수 있다는 장점이 있습니다.
예를 들어 회원이 상품 주문을 완료했을 때 상품 재고를 감소시키고 주문 완료 이메일을 발송하는 프로세스가 있을 때 동기적으로 각기 도메인에 요청할 수 있습니다.
반면 카프카를 사용할 경우 각각 도메인 별로 컨슈머 그룹으로 묶음으로써 격리되어 비동기로 프로세스를 진행할 수 있습니다.
현재 운영하고 있는 토픽의 데이터가 어디에 적재되는지, 어떻게 처리되는지 파악하고 컨슈머 그룹으로 따로 나눌 수 있는 것은 최대한 나누어 적절히 운영하는 것이 매우 중요합니다.
리밸런싱(rebalancing)
만약 컨슈머 그룹 내 특정 컨슈머에 장애가 발생하면 할당된 파티션은 장애가 발생하지 않은 다른 컨슈머로 소유권이 이전됩니다.
이러한 과정을 리밸런싱(rebalancing)이라고 부르는데 크게 두 가지 상황에서 일어나게 됩니다.
첫 번째는 컨슈머가 추가되는 상황이고, 두 번째는 컨슈머에 장애로 인해 제외되는 상황입니다.
한 대의 그룹 조정자(group coordinator) 브로커가 컨슈머 그룹에 컨슈머가 추가되고 삭제되는 상황을 감지하여 리밸런싱을 발동하여 가용성을 높여줍니다.
하지만 리밸런싱이 진행될 때 파티션의 소유권을 컨슈머로 재할당하는 과장에서 해당 컨슈머 그룹의 컨슈머들이 토픽의 데이터를 읽을 수 없으므로 리밸런싱이 자주 발생해서는 안됩니다.
오프셋 커밋(offset commit)
컨슈머는 브로커로부터 데이터를 어디까지 가져갔는지 커밋(commit)을 통해 오프셋을 기록하게 되는데 이때 이슈가 발생하여 기록되지 못했다면 데이터 처리의 중복이 발생하기 때문에 오프셋 커밋을 정상적으로 처리했는지 검증해야 합니다.
오프셋 커밋은 명시적, 비명시적으로 수행할 수 있습니다.
명시적 오프셋
명시적 오프셋 커밋은 poll() 메서드 호출 이후 데이터 처리가 완료되고 commitSync() 메서드를 호출해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋합니다.
commitSync() 메서드는 브로커에 커밋 요청을 하고 커밋이 정상적으로 처리되었는지 응답까지 기다리기 때문에 데이터 처리량에 영향을 끼칩니다.
비명시적 오프셋
기본 옵션인 비명시적 오프셋 커밋은 poll() 메서드가 수행될 때 일정한 간격마다 오프셋을 커밋합니다.
enable.auto.commit=true이고 poll() 메서드가 auto.commit.interval.ms 옵션에 설정된 값 이상이 지났을 때 그 시점까지 읽은 오프셋을 커밋합니다.
코드상에서 따로 커밋 관련 코드를 작성할 필요가 없어서 편리하지만 리밸런싱이나 컨슈머 강제종료가 발생할 경우 데이터 중복 또는 유실될 가능성이 있어 데이터가 중요한 서비스의 경우 자동 커밋을 사용해선 안됩니다.
3. 컨슈머 주요 옵션
컨슈머 애플리케이션을 실행할 때 설정해야 하는 필수 옵션과 선택 옵션이 있는데 필수 옵션은 반드시 선언해야 하며, 선택 옵션은 선언하지 않아도 되지만 기본값으로 설정되기 때문에 선택 옵션은 무엇이 있는지 기본값은 무엇인지 알고 있어야 합니다.
필수 옵션
- bootstrap.servers: 데이터를 전송받을 카프카 클러스터를 1개 이상 설정
- key.deserializer: 레코드의 데이터 키를 역직렬화하는 클래스 설정
- value.deserializer: 레코드의 데이터 값을 역직렬화하는 클래스 설정
선택 옵션
- group.id: 컨슈머의 그룹 아이디 설정, subscribe() 메서드로 토픽을 구독하여 사용할 경우 필수(default: null)
- auto.offset.reset: 컨슈머 그룹이 저장된 컨슈머 오프셋이 없는 경우 처음에 읽을 오프셋 설정(default: latest)
- latest: 가장 높은 오프셋부터 읽음(가장 최신의 데이터)
- earliest: 가장 낮은 오프셋부터 읽음(가장 오래된 데이터)
- none: 커밋한 기록이 없으면 오류 발생, 있다면 이후 오프셋부터 읽음
- enable.auto.commit: 자동 커밋 여부 설정(default: true)
- auto.commit.interval.ms: 오프셋 커밋 간격 설정(default: 5000(5초), enable.auto.commit=true인 경우)
- max.poll.records: poll() 메서드를 통해 반환되는 레코드 개수 설정(default: 500)
- heartbeat.interval.ms: 하트비트를 전송하는 시간 간격(default: 3000(3초))
- session.timeout.ms: 컨슈머가 브로커와 연결이 끊키는 최대 시간 설정(default: 10000(10초))
- 이 시간 내에 하트비트(heartbeat)를 전송하지 않으면 이슈라고 판단 리밸런싱 진행, 보통 하트비트 시간의 3배로 설정
- max.poll.interval.ms: poll() 메서드를 호출하는 간격의 최대 시간 설정(default: 300000(5분))
- poll() 메서드를 호출한 이후 데이터를 처리하는 시간이 오래 걸리는 경우 비정상으로 판단해 리밸런싱 진행
- isolation.level: 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용(default: read_uncommitted)
- read_committed: 커밋이 완료된 레코드만 읽음
- read_uncommitted: 커밋 여부와 관계없이 모든 레코드 읽음
더욱 다양한 옵션은 https://kafka.apache.org/documentation/#consumerconfigs에서 확인이 가능합니다.
카프카 클라이언트(Kafka Client) - 컨슈머(Consumer) API (2/2)
이전 글에서는 컨슈머의 중요 개념과 주요 옵션에 대해 정리해보았습니다. 카프카 클라이언트(Kafka Client) - 컨슈머(Consumer) API (1/2)컨슈머(Consumer) API프로듀서가 전송한 데이터는 브로커에 적재
devbksheen.tistory.com
'Apache Kafka' 카테고리의 다른 글
카프카 클라이언트(Kafka Client) - 어드민(Admin) API (0) | 2025.01.30 |
---|---|
카프카 클라이언트(Kafka Client) - 컨슈머(Consumer) API (2/2) (1) | 2025.01.30 |
카프카 클라이언트(Kafka Client) - 프로듀서(Producer) API (0) | 2025.01.20 |
카프카(Kafka)의 기본 개념 - 토픽(Topic), 파티션(Partition), 레코드(Record) (0) | 2025.01.17 |
카프카 커맨드 라인 툴(kafka command-line-tool) 명령어 설명 및 사용법 (0) | 2025.01.17 |