Apache Kafka

카프카 프로세서(Processor) API 사용 예제

Beekei 2025. 2. 24. 16:49
반응형
※ 예제 코드의 스펙은 아래와 같습니다.
- JDK 21
- Spring Boot 3.4.2
- Kafka Streams 3.5.1
- confluentinc/cp-kafka:latest(Docker Image, 2025-02-24 기준,
sha256:e6b87a4a8ca07aadba9c04d86515a340f67cd11ca6160c9b07205f3d88dfb5f1)

구현 예제

프로세서(Processor) API에서 로직을 구현하기 위해서는 스트림 프로세서 역할을 하는 Processor 인터페이스를 구현해야 합니다.

스트림즈(Streams)DSL의 filter() 메서드와 동일한 기능을 하는 FilterProcessor를 구현해 보겠습니다.

Processor

스트림 프로세서 클래스를 생성하기 위해서는 카프카 스트림즈(kafka streams) 라이브러리에서 제공하는 Processor 또는 Transformer 인터페이스를 구현해야 합니다.

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class FilterProcessor implements Processor<String, String, String, String> {

  private ProcessorContext context;

  @Override
  public void init(ProcessorContext context) {
    this.context = context;
  }

  @Override
  public void process(Record<String, String> record) {
    if (record.value().length() > 5) {
      context.forward(record);
    }
    context.commit();
  }

  @Override
  public void close() {
    // 사용한 리소스 해체 로직
  }

}

Processor<KIn, VIn, KOut, VOut>

프로세서 클래스에서 제네릭 변수들에 대한 설명은 아래와 같습니다.

  • KIn: 전달받을 메시지 키의 유형
  • VIn: 전달받을 메시지 값의 유형
  • KOut: 전달할 메시지 키의 유형
  • VOut: 전달할 메시지 값의 유형

ProcessorContext

ProcessorContext는 프로세서 내부에서 사용할 수 있는 컨텍스트 객체로 프로세서에 대한 정보를 담고 있어, 현재 스트림 처리 중인 토폴로지의 토픽 정보, 애플리케이션 아이디를 조회할 수 있습니다. 프로세싱 처리에 제공하고 있는 메서드는 아래와 같습니다.

  • currentSystemTime()
    • 현재 시스템 시간을 밀리초 단위로 반환합니다.
  • currentStreamTime()
    • 스트림즈의 이벤트 시간(스트림 시간)을 반환합니다. 이벤트 시간은 Kafka에서 설정한 타임스탬프를 기준으로 합니다.
  • topic()
    • 현재 처리 중인 토픽 이름을 반환합니다.
  • partition()
    • 현재 처리 중인 파티션 번호를 반환합니다.
  • offset()
    • 현재 처리 중인 레코드의 오프셋(offset)을 반환합니다.
  • getStateStore(String storeName)
    • State Store에 접근하여 데이터를 읽고 쓸 수 있습니다.
  • forward(Record<KOut, VOut> record)
    • 현재 처리된 데이터를 다음 프로세서로 전달합니다.
  • schedule(Duration interval, PunctuationType type, Punctuator callback)
    • 특정 시간 후에 실행될 작업을 예약합니다.
  • commit()
    • 현재의 상태 저장소에서 변경된 내용을 커밋합니다.
  • close()
    • 프로세서가 종료될 때 호출됩니다. 리소스를 정리하거나 상태 저장소를 닫는 등의 작업을 처리할 수 있습니다.

 

이렇게 Processor 인터페이스를 구현한 클래스를 addProcessor 메서드를 사용해 추가합니다.

Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

Topology topology = new Topology();
topology.addSource("Source", "processor-test-1")
  .addProcessor("Process", FilterProcessor::new, "Source")
  .addSink("Sink", "processor-test-2", "Process");

KafkaStreams streams = new KafkaStreams(topology, properties);
streams.start();

Topology

Topology 클래스는 포톨리지를 구성할 수 있으며 제공하는 메서드는 다음과 같습니다.

  • addSource(String name, String... topics)
    • 데이터를 읽어올 소스 노드(Source Node)를 추가하는 메서드입니다.
    • name: 노드명 설정
    • topics: 데이터를 가져올 토픽명
  • addProcessor(String name, ProcessorSupplier<KIn, VIn, KOut, VOut> supplier, String... parentNames)
    • 데이터를 변환, 필터링, 집계하는 핵심 로직을 정의하는 프로세서 노드(Processor Node)를 추가하는 메서드입니다.
    • name: 노드명 설정
    • supplier: 프로세서 로직을 정의하는 메서드(함수형 인터페이스)
    • parentNames: 데이터를 전달받을 노드명
  • addSink(String name, String topic, String... parentNames) 
    • 데이터를 출력하는 싱크 노드(Sink Node)를 추가하는 메서드입니다.
    • name: 노드명 설정
    • topic: 데이터를 전달할 토픽명
    • parentNames: 데이터를 전달받을 노드명
  • addStateStore(StoreBuilder<?> storeBuilder, String... processorNames)
    • 특정 프로세서에서 사용할 수 있는 State Store를 추가하는 메서드입니다. 이를 통해 데이터를 저장하고 다시 검색할 수 있습니다.
    • storeBuilder: Key, Value Store 객체
    • processorNames: 해당 Store을 사용할 수 있는 프로세서 노드명 
  • describe()
    • 현재 Topology의 구조를 문자열로 반환하는 메서드입니다.

 

이제 토픽에 데이터를 추가해 보면 5자 이상인 데이터만 필터링되어 다른 토픽으로 전송된 것을 확인할 수 있습니다.

processor-test-1 topic 데이터 produce
processor-test-2 topic 데이터 consume


프로세서 API를 활용하는 추가적인 정보는 공식 가이드 문서에서 확인할 수 있습니다.

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

 

반응형