반응형
앞서 Local에서 여러가지 Kafka 명령어를 날려보았다.
이제는 Spring Boot에 연동해 실제 프로젝트에 적용해보자
Spring Boot에 Kafka를 연동
- Java(11)
- Amazon Corretto JDK(11)
- Spring Boot(2.5.5)
1. Kafka 의존성 추가
dependencies {
...
// rest
implementation 'org.springframework.boot:spring-boot-starter-web'
// kafka
implementation 'org.springframework.kafka:spring-kafka'
...
}
2. application.yml 설정
spring:
kafka:
bootstrap-servers: 3.34.97.97:9092
consumer:
# consumer bootstrap servers가 따로 존재하면 설정
# bootstrap-servers: 3.34.97.97:9092
# 식별 가능한 Consumer Group Id
group-id: devbeekei
# Kafka 서버에 초기 offset이 없거나, 서버에 현재 offset이 더 이상 존재하지 않을 경우 수행할 작업을 설정
# latest: 가장 최근에 생산된 메시지로 offeset reset
# earliest: 가장 오래된 메시지로 offeset reset
# none: offset 정보가 없으면 Exception 발생
auto-offset-reset: earliest
# 데이터를 받아올 때, key/value를 역직렬화
# JSON 데이터를 받아올 것이라면 JsonDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# producer bootstrap servers가 따로 존재하면 설정
# bootstrap-servers: 3.34.97.97:9092
# 데이터를 보낼 때, key/value를 직렬화
# JSON 데이터를 보낼 것이라면 JsonDeserializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
참고
3. Controller, Producer, Consumer
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
private final KafkaProducer producer;
@Autowired
KafkaController(KafkaProducer producer) {
this.producer = producer;
}
@PostMapping(value = "/message")
public String sendMessage(@RequestParam("message") String message) {
this.producer.sendMessage(message);
return "success";
}
}
@Service
public class KafkaProducer {
private static final String TOPIC = "fruit";
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
System.out.println(String.format("Produce message : %s", message));
this.kafkaTemplate.send(TOPIC, message);
}
}
@Service
public class KafkaConsumer {
@KafkaListener(topics = "fruit", groupId = "devbeekei")
public void consume(String message) throws IOException {
System.out.println(String.format("Consumed message : %s", message));
}
}
4. Server Run!
로컬 카프카에서 해당 카프카 서버에 토픽을 조회하면
# 생성된 Topic 조회
bin/kafka-topics.sh --bootstrap-server 3.34.97.97:9092 --list
__consumer_offsets
fruit
코드에서 설정한 __consumer_offsets, fruit Topic이 생성된 것을 확인할 수 있다.
__consumer_offsets은 각각의 consumer group이 어디까지 consume 했는지 저장하는 곳이다.
# __consumer_offsets 조회
bin/kafka-console-consumer.sh --bootstrap-server 3.34.97.97:9092 --topic __consumer_offsets --from-beginning --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
5. Topic의 Record produce
정상적으로 message가 produce, consume 되는 것을 확인할 수 있다.
# fruit Topic에 생성된 Record 조회
bin/kafka-console-consumer.sh --bootstrap-server 3.34.97.97:9092 --topic fruit --from-beginning
apple
전체 소스는 GitHub에서 확인하실 수 있습니다.
반응형
'Apache Kafka' 카테고리의 다른 글
Local에서 Kafka 명령어 날리기 (0) | 2021.09.24 |
---|---|
EC2 생성 후 접속, Kafka 설치 및 설정 (0) | 2021.09.24 |
Kafka Producer & Consumer란? (0) | 2021.09.17 |
Kafka Topic & Partition이란? (0) | 2021.09.17 |
Kafka Broker & Zookeeper란? (0) | 2021.09.17 |