Apache Kafka

Spring Boot에 Kafka 연동

beekei 2021. 9. 24. 17:45
반응형
 

[Kafka] Local에서 Kafka 명령어 날리기

[Kafka] EC2 생성 후 접속, Kafka 설치 및 설정 AWS EC2 생성 후 Inbound rule 추가 1. AWS Console 로그인 후 EC2에 접속 2. EC2 인스턴스 생성 3. 키 페어 생성 4. Inbound rule 추가 Inbound rule은 필요한대로..

devbksheen.tistory.com

앞서 Local에서 여러가지 Kafka 명령어를 날려보았다.

이제는 Spring Boot에 연동해 실제 프로젝트에 적용해보자


Spring Boot에 Kafka를 연동

  • Java(11)
  • Amazon Corretto JDK(11)
  • Spring Boot(2.5.5)

1. Kafka 의존성 추가

dependencies {
	...

	// rest
    implementation 'org.springframework.boot:spring-boot-starter-web'
	// kafka
    implementation 'org.springframework.kafka:spring-kafka'

	...
}

2. application.yml 설정

spring:
  kafka:
    bootstrap-servers: 3.34.97.97:9092
    consumer:
      # consumer bootstrap servers가 따로 존재하면 설정
      # bootstrap-servers: 3.34.97.97:9092

      # 식별 가능한 Consumer Group Id
      group-id: devbeekei
      # Kafka 서버에 초기 offset이 없거나, 서버에 현재 offset이 더 이상 존재하지 않을 경우 수행할 작업을 설정
      # latest: 가장 최근에 생산된 메시지로 offeset reset
      # earliest: 가장 오래된 메시지로 offeset reset
      # none: offset 정보가 없으면 Exception 발생
      auto-offset-reset: earliest
      # 데이터를 받아올 때, key/value를 역직렬화
      # JSON 데이터를 받아올 것이라면 JsonDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # producer bootstrap servers가 따로 존재하면 설정
      # bootstrap-servers: 3.34.97.97:9092

      # 데이터를 보낼 때, key/value를 직렬화
      # JSON 데이터를 보낼 것이라면 JsonDeserializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

참고

 

Springboot Kafka 설정 application.properties 목록

springboot에서 kafka 설정을 위한 property 목록이다. Key Default Value Description spring.kafka.admin.client-id ID to pass to the server when making requests. Used for server-side logging. spring.kaf..

oingdaddy.tistory.com

 

3. Controller, Producer, Consumer

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final KafkaProducer producer;

    @Autowired
    KafkaController(KafkaProducer producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/message")
    public String sendMessage(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
        return "success";
    }

}
@Service
public class KafkaProducer {
    private static final String TOPIC = "fruit";
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        System.out.println(String.format("Produce message : %s", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}
@Service
public class KafkaConsumer {

    @KafkaListener(topics = "fruit", groupId = "devbeekei")
    public void consume(String message) throws IOException {
        System.out.println(String.format("Consumed message : %s", message));
    }

}

4. Server Run!

서버가 정상적으로 켜진것을 확인할 수 있다.

로컬 카프카에서 해당 카프카 서버에 토픽을 조회하면

# 생성된 Topic 조회
bin/kafka-topics.sh --bootstrap-server 3.34.97.97:9092 --list

__consumer_offsets
fruit

코드에서 설정한 __consumer_offsets, fruit Topic이 생성된 것을 확인할 수 있다.

__consumer_offsets은 각각의 consumer group이 어디까지 consume 했는지 저장하는 곳이다.

# __consumer_offsets 조회
bin/kafka-console-consumer.sh --bootstrap-server 3.34.97.97:9092 --topic __consumer_offsets --from-beginning --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

5. Topic의 Record produce

Postman으로 API 호출!

정상적으로 message가 produce, consume 되는 것을 확인할 수 있다.

# fruit Topic에 생성된 Record 조회
bin/kafka-console-consumer.sh --bootstrap-server 3.34.97.97:9092 --topic fruit --from-beginning

apple

전체 소스는 GitHub에서 확인하실 수 있습니다.

 

GitHub - devbeekei/kafka: Kafka example

Kafka example. Contribute to devbeekei/kafka development by creating an account on GitHub.

github.com

반응형