※ 예제 코드의 스펙은 아래와 같습니다.
- JDK 21
- Spring Boot 3.4.2
- Kafka Streams 3.5.1
- confluentinc/cp-kafka:latest(Docker Image, 2025-02-24 기준,
sha256:e6b87a4a8ca07aadba9c04d86515a340f67cd11ca6160c9b07205f3d88dfb5f1)
Stream(), to()
스트림즈DSL로 구현할 수 있는 가장 간단한 프로세싱은 특정 토픽(Topic)의 데이터(Record)를 다른 토픽으로 전달하는 것입니다.
특정 토픽을 KStream 형태로 가져오려면 스트림즈DSL의 stream() 메서드를 사용하고(소스 프로세서) 이렇게 가져온 데이터를 다른 토픽으로 전달하려면 to() 메서드를 사용합니다.(싱크 프로세서)
아래 코드는 streams-test1 토픽의 데이터를 streams-test2 토픽으로 전달하는 예제입니다.
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsBuilder streamsBuilder = new StreamsBuilder();
// KStream 형태로 데이터 조회(소스 프로세서)
KStream<String, String> testStream = streamsBuilder.stream("streams-test1");
// 데이터 확인
testStream.peek((key, value) -> System.out.println("Received Key: " + key + ", Value: " + value));
// streams-test2 Topic으로 데이터 전달(싱크 프로세서)
testStream.to("streams-test2");
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
streams.start();
streams-test1 토픽에 데이터를 넣어 확인해 보면 streams-test2 토픽에 정상적으로 전달된 것을 확인할 수 있습니다.
filter()
filter() 메서드를 사용해 특정 조건에 맞는 데이터를 필러링 할 수 있습니다.(스트림 프로세서)
아래 코드는 streams-test1 토픽의 데이터 값의 길이가 5 이상인 데이터만 필터링하여 streams-test2 토픽으로 전달하는 예제입니다.
StreamsBuilder streamsBuilder = new StreamsBuilder();
// KStream 형태로 데이터 조회(소스 프로세서)
KStream<String, String> testStream = streamsBuilder.stream("streams-test1");
// 메시지 값의 길이가 5보다 큰 경우만 필터링(스트림 프로세서)
KStream<String, String> filteredStream = testStream.filter((key, value) -> value.length() > 5);
// 데이터 확인
filteredStream.peek((key, value) -> System.out.println("Received Key: " + key + ", Value: " + value));
// streams-test2 Topic으로 데이터 전달(싱크 프로세서)
filteredStream.to("streams-test2");
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
streams.start();
streams-test1 토픽에 데이터를 넣어 확인해 보면 streams-test2 토픽에 필터링한 데이터만 정상적으로 전달된 것을 확인할 수 있습니다.
(apple앞에 이상한 특수문자가 들어가서 5자 이상으로 필터링되었네요;;)
join()
대부분의 데이터베이스는 정적으로 저장된 데이터를 조인할 수 있지만 카프카에서는 실시간으로 들어오는 데이터를 조인할 수 있습니다.
이를 통해 데이터베이스에 이벤트를 저장하지 않고도 이벤트 기반 스트리밍 데이터 파이프라인을 구성할 수 있습니다.
KTable과 KStream을 join()
아래 코드는 KTable과 KStream을 소스 프로세서로 가져와서 조인을 수행하는 스트림 프로세서를 거쳐 특정 토픽에 저장하는 싱크 프로세서의 로직입니다.
StreamsBuilder streamsBuilder = new StreamsBuilder();
// KTable 형태로 데이터 조회(소스 프로세서)
KTable<String, String> userProfiles = streamsBuilder.table("user-profile");
// KStream 형태로 데이터 조회(소스 프로세서)
KStream<String, String> userActions = streamsBuilder.stream("user-action");
// KStream과 KTable을 조인(스트림 프로세서)
KStream<String, String> enrichedStream = userActions.join(userProfiles,
(action, profile) -> "Action: " + action + ", Profile: " + profile);
// enriched-user-actions Topic으로 데이터 전달(싱크 프로세서)
enrichedStream.to("enriched-user-action");
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
streams.start();
user-profile, user-action 토픽에 데이터를 넣어 확인해 보면 enriched-user-action 토픽에 조인한 데이터를 확인할 수 있습니다.
예제를 통해 메시지 키를 기반으로 데이터를 조인하고, 조인된 메시지의 키로 등록되는 것을 확인할 수 있습니다.
이번엔 KTable 토픽의 값이 바뀌게 된다면 어떻게 될까 예제를 통해 확인해 보겠습니다.
새로운 데이터가 조인된 것을 확인할 수 있습니다.
GlobalKTable과 KStream을 join()
user-profile, user-action 토픽은 코파티셔닝이 되어 있으므로 조인이 가능했지만 만약 코파티셔닝이 되어있지 않은 토픽을 조인해야 할 때는 리파티셔닝을 수행한 이후 코파티셔닝 된 상태로 조인하거나 GlobalKTable을 사용하여 조인해야 합니다.
먼저 코파티셔닝이 되지 않은 토픽들을 생성해 GlobalKTable과 KStream을 소스 프로세서로 가져와서 조인을 수행하는 스트림 프로세서를 거쳐 특정 토픽에 저장하는 싱크 프로세서의 로직을 살펴보겠습니다.
StreamsBuilder streamsBuilder = new StreamsBuilder();
// GlobalKTable 형태로 데이터 조회(소스 프로세서)
GlobalKTable<String, String> userProfiles = streamsBuilder.globalTable("user-profile-v2");
// KStream 형태로 데이터 조회(소스 프로세서)
KStream<String, String> userActions = streamsBuilder.stream("user-action-v2");
// KStream과 KTable을 조인(스트림 프로세서)
KStream<String, String> enrichedStream = userActions.join(userProfiles,
(key, value) -> key, // 메시지 키와 매칭
(action, profile) -> "Action: " + action + ", Profile: " + profile);
// enriched-user-actions-v2 Topic으로 데이터 전달(싱크 프로세서)
enrichedStream.to("enriched-user-action-v2");
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
streams.start();
user-profile-v2, user-action-v2 토픽에 데이터를 넣어 확인해 보면 enriched-user-action-v2 토픽에 조인한 데이터를 확인할 수 있습니다.
결과물을 보면 KTable과 크게 다르지 않아 보이지만 GlobalKTable로 선언한 토픽은 토픽에 존재하는 모든 데이터를 태스크마다 저장하고 조인 처리를 수행하고, KStream의 메시지 키뿐만 아니라 메시지 값을 기준으로도 매칭하여 조인할 수 있다는 점이 다릅니다.
스트림즈DSL에서는 각 프로세서 역할을 하는 메서드들을 제공하기 때문에 메서드를 찾아서 적재적소에 구현할 수 있습니다.
스트림즈DSL에서 사용 가능한 모든 프로세서와 사용 방법은 공식 가이드 문서에서 확인할 수 있습니다.
Apache Kafka
Apache Kafka: A Distributed Streaming Platform.
kafka.apache.org
'Apache Kafka' 카테고리의 다른 글
카프카 프로세서(Processor) API 사용 예제 (0) | 2025.02.24 |
---|---|
카프카 프로세서(Processor) API의 기본 개념 (0) | 2025.02.24 |
카프카 스트림즈(Streams) DSL의 주요 옵션 (0) | 2025.02.07 |
카프카 스트림즈(Streams) DSL의 기본 개념 (0) | 2025.02.07 |
카프카 스트림즈(Kafka Streams)의 기본 개념 (0) | 2025.02.07 |