이전 글에서는 컨슈머의 중요 개념과 주요 옵션에 대해 정리해보았습니다.
카프카 클라이언트(Kafka Client) - 컨슈머(Consumer) API (1/2)
컨슈머(Consumer) API프로듀서가 전송한 데이터는 브로커에 적재되고 컨슈머는 적재된 데이터를 사용하기 위해 브로커부터 데이터를 가져와서 필요한 처리를 하게 됩니다.1. 컨슈머 설정 및 사용
devbksheen.tistory.com
이번 글에는 더욱 다양한 설정과 안전한 종료에 대해 정리해보겠습니다.
4. 동기 오프셋 커밋
poll() 메서드가 호출된 이후에 commitSync() 메서드를 호출하여 오프셋 커밋을 명시적으로 수행할 수 있습니다.
...
// 명시적 오프셋을 구현하기 위헤 자동 커밋 옵션을 false로 설정
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(List.of("consumer-test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
log.info("record: {}", record);
}
consumer.commitSync();
}
commitSync()는 poll() 메서드로 받은 가장 마지막 레코드의 오프셋을 기준으로 커밋하기 때문에 모든 레코드의 처리가 끝난 후 commitSync() 메서드를 호출해야 합니다.
동기 커밋의 경우 브로커로 커밋을 요청한 이후에 커밋이 완료되기까지 데이터를 처리하지 않기 때문에 자동 커밋이나 비동기 오프셋 커밋보다 동일 시간당 데이터 처리량이 적다는 특징이 있습니다.
만약 개별 레코드 단위로 매번 오프셋을 커밋하고 싶다면, commitSync() 메서드에 Map<TopicPartition, OffsetAndMetadata> 인스턴스를 파라미터로 넣으면 됩니다.
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
currentOffset.put(
// 해당 토픽, 파티션의 오프셋 커밋
new TopicPartition(record.topic(), record.partition()),
// 이후 컨슈머가 poll()을 수행할때 마지막으로 커밋한 오프셋부터 읽기 때문에 현재 오프셋에 +1 처리
new OffsetAndMetadata(record.offset() + 1));
consumer.commitSync(currentOffset);
}
}
5. 비동기 오프셋 커밋
동일 시간당 더 많은 데이터를 처리하기 위해 commitAsync() 메서드를 호출하여 비동기 오프셋 커밋을 사용할 수 있습니다.
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
log.info("record: {}", record);
}
consumer.commitAsync();
}
동기 오프셋 커밋과 마찬가지로 poll() 메서드로 리턴된 가장 마지막 레코드를 기준으로 오프셋을 커밋합니다.
다만 다른 점은 커밋이 완료될 때까지 응답을 기다리지 않기 때문에 데이터 처리량이 더 많습니다.
비동기 오프셋 커밋을 사용할 경우 비동기로 커밋 응답을 받기 때문에 callback 함수를 파라미터로 받아서 결과를 얻을 수 있습니다.
consumer.commitAsync((map, e) -> {
if (e != null) {
log.error("Commit failed", e);
} else {
log.info("Commit succeeded");
}
});
OffsetCommitCallback라는 함수형 인터페이스의 onComplete 메서드를 람다식으로 구현하여 매개변수에 넘겨주어 응답을 받을 수 있습니다.
정상적으로 커밋되었다면 Exception(e) 변수는 null이고, 커밋 완료된 오프셋 정보가 Map<TopicPartition, OffsetAndMetadata>에 포함되어 있습니다.
만약 커밋이 실패했다면 Exception(e) 변수를 통해 커밋이 실패한 이유를 확인할 수 있습니다.
6. 리밸런스 리스너를 가진 컨슈머
poll() 메서드를 통해 받은 데이터를 처리하는 중에 리밸런스가 발생하게 되면 아직 커밋하지 않았기 때문에 데이터가 중복 처리될 가능성이 있습니다.
이러한 문제를 방지하기 위해선 데이터 처리 도중 리밸런스를 감지해 처리한 데이터를 기준으로 커밋을 시도해야 합니다.
카프카 라이브러리는 리밸런스를 감지하기 위해 ConsumerRebalanceListerner 인터페이스를 지원하는데 이 인터페이스는 onPartitionRevoked() 메서드와 onPartitionAssigned() 메서드로 이루어져 있습니다.
- onPartitionRevoked(): 리밸런스가 시작되기 직전에 호출
- onPartitionAssigned(): 리밸런스가 끝난 뒤 파티션 할당이 완료되면 호출
private KafkaConsumer<String, String> consumer;
private Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
public void rebalanceTest() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 수동 커밋을 위해 설정
consumer = new KafkaConsumer<>(properties);
// ConsumerRebalanceListener 구현체 설정
consumer.subscribe(List.of("consumer-test"), new CustomConsumerRebalanceListener());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
log.info("record: {}", record);
currentOffset.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
consumer.commitSync(currentOffset);
}
}
}
private class CustomConsumerRebalanceListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
log.warn("Partitions revoked: {}", collection);
// 리밸런싱이 발생하기 직전에 오프셋 커밋
consumer.commitSync(currentOffset);
currentOffset.clear();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
log.warn("Partitions assigned: {}", collection);
}
}
7. 파티션 할당 컨슈머
컨슈머를 운영할 때 assign() 메서드를 사용해 파티션을 명시적으로 할당하여 운영할 수도 있습니다.
이 방법은 컨슈머가 특정 토픽, 파티션에 할당되므로 리밸런싱 하는 과정이 없습니다.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// TopicPartition 클래스에 토픽과 함께 파티션 번호 설정
consumer.assign(List.of(new TopicPartition("consumer-test", 0)));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
log.info("record: {}", record);
...
}
}
assignment() 메서드로 컨슈머에 할당된 파티션을 확인할 수 있습니다.
Set<TopicPartition> topicPartitions = consumer.assignment();
8. 컨슈머의 안전한 종료
정상적으로 종료되지 않은 컨슈머는 세션 타임아웃이 발생할때까지 컨슈머 그룹에 남게 됩니다.
이로 인해 파티션의 데이터를 소모하지 못하게 되어 컨슈머 랙이 늘어나고 데이터 처리 지연이 발생하게 됩니다.
KafkaConsumer 클래스는 wakeup() 메서드를 지원하여 컨슈머를 안전하게 종료할 수 있습니다.
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
log.info("record: {}", record);
}
}
} catch (WakeupException e) {
log.warn("WakeupException", e);
} finally {
consumer.close();
}
poll() 메서드를 통해 데이터를 처리하다 wakeup() 메서드가 호출되면 WakeupException 예외가 발생하고 catch 문에서 리소스를 종료하는 작업을 처리합니다.
finally 문에서는 close() 메서드를 호출하여 카프카 클러스터(Kafka cluster)에 컨슈머가 안전하게 종료되었음을 알려주게 되면 해당 컨슈머는 컨슈머 그룹에서 제거되고 나머지 컨슈머들이 파티션을 할당받게 됩니다.
자바 애플리케이션의 경우 코드 내부에 셧다운 훅(shutdown hook)을 구현하여 안전한 종료를 명시적으로 구현할 수 있습니다.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("Shutting down");
consumer.wakeup();
}));
'Apache Kafka' 카테고리의 다른 글
카프카 스트림즈(Kafka Streams)의 기본 개념 (0) | 2025.02.07 |
---|---|
카프카 클라이언트(Kafka Client) - 어드민(Admin) API (0) | 2025.01.30 |
카프카 클라이언트(Kafka Client) - 컨슈머(Consumer) API (1/2) (0) | 2025.01.29 |
카프카 클라이언트(Kafka Client) - 프로듀서(Producer) API (0) | 2025.01.20 |
카프카(Kafka)의 기본 개념 - 토픽(Topic), 파티션(Partition), 레코드(Record) (0) | 2025.01.17 |