카프카 클라이언트(Kafka Client)란?
카프카 클러스터(Kafka Cluster)에 명령을 내리거나 데이터를 송수신하기 위해 다양한 API를 제공합니다.
카프카 클라이언트는 라이브러리이기 때문에 자체 라이프사이클을 가진 프레임워크 혹은 애플리케이션 위에서 구현 및 실행해야 합니다.
각각의 API가 어떻게 동작하며 어떤 기능을 제공하는지 정리하려고 합니다.
예제 스펙은 아래와 같습니다.
- Java 17
- Spring Boot 3.4.1
- Kafka Client 3.8.1
# spring-kafka를 사용하였습니다.
dependencies {
...
implementation 'org.springframework.kafka:spring-kafka'
}
프로듀서(Producer) API
프로듀서 애플리케이션은 데이터를 특정 토픽 파티션에 전송합니다.
프로듀서는 데이털르 직렬화여 카프카 브로커로 보내기 때문에 자바에서 선언 가능한 모든 형태를 브로커에 전송할 수 있습니다.
1. 프로듀서 설정 및 사용
public class KafkaProducerTest {
private static final Logger log = LoggerFactory.getLogger(KafkaProducerTest.class);
private final KafkaProducer<String, String> kafkaProducer;
public KafkaProducerTest() {
Properties properties = new Properties();
// 카프카 클러스터 Host 설정
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// key값 직렬화 클래스 설정
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value값 직렬화 클래스 설정
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
this.kafkaProducer = new KafkaProducer<>(properties);
}
@Test
public void send() {
final String TOPIC_NAME = "producer-test";
String message = "Hello, Kafka!";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
log.info("Send record: {}", record);
Future<RecordMetadata> sendResult = kafkaProducer.send(record);
log.info("Send result: {}", sendResult);
// 프로듀서 내부 레코드 배치를 브로커에 전송
kafkaProducer.flush();
// Producer 인스턴스 리소스 종료
kafkaProducer.close();
}
}
예제에서는 ProducerRecord에 토픽과 메시지값만 설정하였지만 ProducerRecord를 오버로딩하여 파티션 번호나 타임스탬프 같은 내부 변수를 선언할 수 있습니다.
해당 테스트 코드를 실행해보면 다음과 같은 카프카 프로듀서의 설정 옵션과 메시지 발송 로그를 확인할 수 있습니다.
커맨드 명령어로 확인해보면 메시지가 정상적으로 전송된 것을 확인할 수 있습니다.
2. 프로듀서 중요 개념
프로듀서는 카프카 브로커로 데이터를 전송할 때 내부적으로 파티셔너, 배치 생성 단계를 거칩니다.
send() 메서드를 호출하면 ProducerRecord는 파티셔너(Partitioner)에서 토픽의 어느 파티션으로 전송될 것인지 선택됩니다.
파티셔너에 의해 구분된 레코드는 데이터를 전송하기 전에 어큐물레이터(Accumulator)에 데이터를 버퍼로 쌓고 있다가 배치고 묶어서 전송합니다. 이로 인해 카프카 프로듀서에 처리량을 향상하는 데에 상당한 도움을 줍니다.
KafkaProducer 인스턴스를 생성할 때 파티셔너를 따로 설정하지 않으면 기본 파티셔너로 지정되는데 카프라 클라이언트 라이브러리 버전에 따라 사용하는 파티셔너가 달라질 수 있으므로 주의해야 합니다.
v2.4.0 이전 - RoundRobinPartitioner
Round Robin 방식으로 파티션을 순회하면서 레코드를 하나씩 전달하여 배치 기능을 비효율적으로 사용하기 때문에 레이턴시가 늘어가게 됩니다.
v2.4.0 이전 - UniformStrickyPartitioner
RoundRobinPartitioner의 단점인 작은 사이즈의 배치의 영향을 줄이기 위해 서로 가까운 시간에 전송된 메시지를 수신하는 프로듀서는 다른 파티션에 대한 배치를 만들기 전에 배치를 하나의 파티션에 채우려고 시도하여 레이턴시를 감소시킬 수 있었습니다.
3.4.0 이후 - StrictlyUniformStickyPartitioner
UniformStrickyPartitioner는 초기에는 파티션이 불균형한 것 처럼 보이지만 시간이 지나면서 파티션의 균형에는 영향을 주지 않습니다.
하지만 파티션의 리더(Leader)가 바뀌거나 일시적인 네트워크의 문제로 특정한 브로커의 성능이 느려질 경우 새로운 배치가 생성되는 시간이 늘어나기 때문에 분배되는 레코드 수가 늘어나게 됩니다. 이로 인해 느린 브로커는 더욱 느려져 계속해서 불균형해지는 문제 있습니다.
StrictlyUniformStickyPartitioner에서는 UniformStickyPartitioner의 방식 대신 batch.size 기준으로 파티션을 전환합니다.
특정 파티션으로 batch.size 바이트가 생산되면, 새로운 파티션으로 전환합니다. 이는 배치나 기타 설정(linger.ms 등)과 무관하게 작동하며, 단순히 각 파티션에 전송된 바이트 수를 기준으로 전환합니다.
하지만 여기에도 느린 브로커가 병목 되는 문제가 있는데 느린 브로커가 처리량을 따라가지 못하면, 데이터가 누적되어 버퍼 풀 메모리가 고갈되고 전송 속도가 가장 느린 브로커의 처리 용량에 맞춰지게 됩니다.
이러한 문제를 해결하기 위해 대기열에 전송을 기다리는 배치 크기를 통해 브로커의 부하를 판단하고 해당 파티션의 선택 가능성을 낮추게 됩니다.(선택 확률은 대기열 크기의 역비례 관계)
이 방식은 균등성, 배치 효율성, 적응성을 모두 제공하며, Kafka의 데이터 분배와 처리량을 더욱 최적화할 수 있습니다.
출처)
https://devidea.tistory.com/123
https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
추가적으로 카프카 프로듀서는 압축을 통해 브로커로 전송할 수 있는데 gzip, snappy, lz4, zstd를 지원하고 있습니다.
압축하여 데이터 전송 시 네트워크 처리량에서는 이득을 볼 수 있지만 압축하거나 압축을 푸는데 CPU와 메모리 리소스를 사용하게 되므로 환경에 따라 적절하게 사용해야 합니다.
3. 프로듀서 주요 옵션
프로듀서 애플리케이션을 실행할 때 설정해야 하는 필수 옵션과 선택 옵션이 있는데 필수 옵션은 반드시 선언해야 하며, 선택 옵션은 선언하지 않아도 되지만 기본값으로 설정되기 때문에 선택 옵션은 무엇이 있는지 기본값은 무엇인지 알고 있어야 합니다.
필수 옵션
- bootstrap.servers: 데이터를 저송할 카프카 클러스터를 1개 이상 설정
- key.serializer: 레코드의 데이터 키를 직렬화하는 클래스 설정
- value.serializer: 레코드의 데이터 값을 직렬화 하는 클래스 설정
선택 옵션
- acks: 데이터가 브로커에 정상적으로 저장되었는지 판단 기준 설정
- 0: 프로듀서가 전송한 즉시 성공으로 판단
- 1: 리더 파티션에 저장되면 성공으로 판단
- -1(all): min.insync.replicas 개수에 해당하는 리더 파티션과 팔로워 파티션에 데이터가 저장되면 성공
- buffer.memory: 전송할 데이터를 배치로 모으기 위한 설정 버퍼 메모리양 설정
- retries: 프로듀서가 브로커로부터 에러를 전달받았을 때 재전송을 시도하는 횟수 설정
- batch.size: 배치로 전송할 레코드 최대 용량 설정
(너무 작게 설정하면 네트워크가 많아지고, 너무 크게 설정하면 메모리 사용량이 많아질 수 있습니다.) - linger.ms: 배치를 전송하지 전까지 기다리는 최소 시간 설정(default: 0)
- partitioner.class: 파티셔너 클래스 설정
- enable.idempotence: 멱등성 프로듀서 설정
- transactional.id: 고유한 트랜잭션 아이디 설정
(이 옵션을 설정하게 되면 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶어 전송합니다. -> 트랜잭션 프로듀서)
더욱 다양한 옵션은 https://kafka.apache.org/documentation/#producerconfigs에서 확인이 가능합니다.
4. 전송할 메시지에 키와 파티션 설정
메시지 키가 포함된 레코드를 전송하고 싶다면 ProducerRecord 생성 시 파라미터로 추가해야 합니다.
String key = "hello1";
String message = "Hello, Kafka!";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, key, message);
만약 파티션을 직접 지정하고 싶다면 동일하게 ProducerRecord 생성 시 파라미터로 추가하면 됩니다.
int partition = 1;
String key = "hello1";
String message = "Hello, Kafka!";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, partition, key, message);
5. 커스텀 파티셔너 설정
환경에 따라 특정한 데이터를 가지는 레코드를 특정 파티션으로 보내야 할 때가 있습니다.
이때 메시지 키를 사용하더라도 어느 파티션으로 고정되어 들어가는지 알 수 없기 때문에 Partitioner 인터페이스를 사용하여 사용자 저의 파티셔너를 생성하여 특정 파티션으로 전송되도록 고정할 수 있습니다.
이렇게 지정할 경우 파티션이 추가되더라도 해당 메시지는 설정한 파티션에만 전송되게 됩니다.
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if ("p1".equals(key)) {
return 1;
} else if ("p2".equals(key)) {
return 2;
} else {
RoundRobinPartitioner roundRobinPartitioner = new RoundRobinPartitioner();
return roundRobinPartitioner.partition(topic, key, keyBytes, value, valueBytes, cluster);
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
예제에서는 메세지 키가 "p1"일 때는 1번 파티션으로, "p2"일 때는 2번 파티션으로, 그 외에는 RoundRobinPartitioner의 파티션 선택 로직을 실행하도록 구현하였습니다.
사용 시에는 KafkaProducer 인스턴스를 생성할 때 구현한 커스텀 파티션 클래스를 설정해야 합니다.
Properties properties = new Properties();
...
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
this.kafkaProducer = new KafkaProducer<>(properties);
6. 브로커에 정상적인 전송 확인
KafkaProducer의 send() 메서드는 Future 객체를 반환하는데 이 객체는 RecordMetadata의 비동기 결과를 표현하는 것으로 정상적으로 브로커에 적재되었는지에 대한 데이터가 포함되어 있습니다.
send()의 결과값은 카프카 브로커로부터 응답을 기다렸다가 브로커로부터 응답이 오면 RecordMetadata 인스턴스를 반환합니다.
Future<RecordMetadata> sendResult = kafkaProducer.send(record);
log.info("Send result: {}", sendResult.get());
메시지가 정상적으로 적재되었다면 토픽 이름과 파티션 번호, 오프셋 번호가 출력됩니다.
그러나 동기로 프로듀서의 전송 결과를 확인하는 것은 빠른 전송에 장애물이 될 수 있기 때문에 Callback 인터페이스를 구현하여 비동기로 결과를 확인할 수 있습니다.
public class ProducerCallback implements Callback {
private static final Logger log = LoggerFactory.getLogger(ProducerCallback.class);
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
log.error(e.getMessage(), e);
} else {
log.info("Record sent to topic: {}, partition: {}, offset: {}",
recordMetadata.topic(),
recordMetadata.partition(),
recordMetadata.offset()
);
}
}
}
kafkaProducer.send(record, new ProducerCallback());
비동기로 결과를 받을 경우 동기로 결과를 받는 경우보다 더 빠른 속도로 데이터를 추가 처리할 수 있지만 비동기로 결과를 기다리는 동안 다음 데이터 전송이 성공하고 전에 전송한 데이터가 전송에 실패할 경우 데이터 순서가 역전될 위험이 있기 때문에 전송하는 데이터의 순서가 중요한 경우 사용해서는 안됩니다.
'Apache Kafka' 카테고리의 다른 글
카프카 클라이언트(Kafka Client) - 컨슈머(Consumer) API (2/2) (1) | 2025.01.30 |
---|---|
카프카 클라이언트(Kafka Client) - 컨슈머(Consumer) API (1/2) (0) | 2025.01.29 |
카프카(Kafka)의 기본 개념 - 토픽(Topic), 파티션(Partition), 레코드(Record) (0) | 2025.01.17 |
카프카 커맨드 라인 툴(kafka command-line-tool) 명령어 설명 및 사용법 (0) | 2025.01.17 |
카프카(Kafka)의 기본 개념 - 브로커(Broker) (0) | 2025.01.16 |