카프카 커맨드 라인 툴(kafka command-line-tool)
카프카(kafka)에서 제공하는 커맨드 라인 툴을 통해 카프카 브로커 운영에 필요한 다양한 명령을 내릴 수 있습니다.
카프카를 운영할 때는 카프카 클러스터(Kafka Cluster)와 연동하여 데이터를 주고받는 것도 중요하지만 토픽(Topic)이나 파티션(Partition) 개수 변경과 같은 명령을 실행해야 하는 경우도 자주 발생합니다.
커맨드 라인 툴을 통해 명령을 실행할 때 필수 옵션과 선택 옵션이 있는데,
선택 옵션은 지정하지 않을 시 브로커에 설정된 기본 설정값 또는 커맨드 라인 툴의 기본값으로 대체되어 설정되기 때문에 브로커에 옵션이나 커맨드 라인 툴의 기본값이 어떻게 되어있는지 확인한 후에 사용해야 합니다.
1. kafka-topics.sh
kafka-topics.sh을 통해 토픽(Topic)과 관련된 명령을 실행할 수 있습니다.
1-1. 토픽(Topic) 생성
$ /bin/kafka-topics.sh --create \
--bootstrap-server {브로커IP}:{브로커Port} \
--partitions 3 \
--replication-factor 1 \
--config retention.ms 172800000 \
--topic {토픽명}
- --create : 토픽을 생성하는 명령어 명시
- --bootstrap-server : 토픽을 생성할 브로커 설정
- --topic : 토픽 이름 설정
- --partitions : 파티션 개수 설정(default: config/server.properties 파일에 num.partitions 옵션값)
- --replication-factor : 복제할 파티션 개수 설정(1로 설정 시 복제하지 않음, 2로 설정 시 1개의 복제본)
- --config : 추가 설정
- retention.ms config : 토픽에 데이터를 유지하는 기간을 설정 (172800000ms = 2일)
1-2. 토픽 리스트 조회
$ /bin/kafka-topics.sh --bootstrap-server {브로커IP}:{브로커Port} --list --exclude-internal
- --list : 토픽 목록을 조회하는 명령어 명시
- --exclude-internal : 인터널 토픽(internal topic)은 제외 설정(인터널 토픽은 카프카 내부 관리를 위한 토픽)
1-3. 토픽 상세 조회
$ /bin/kafka-topics.sh --bootstrap-server {브로커IP}:{브로커Port} --describe --topic {토픽명}
Topic: {토픽명} TopicId: b1svcmTcQROJyHafTtVZaA PartitionCount: 3 ReplicationFactor: 1 Configs: retention.ms=172800000
Topic: kafka-topic-test Partition: 0 Leader: 3 Replicas: 3 Isr: 3 Elr: N/A LastKnownElr: N/A
Topic: kafka-topic-test Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Elr: N/A LastKnownElr: N/A
Topic: kafka-topic-test Partition: 2 Leader: 2 Replicas: 2 Isr: 2 Elr: N/A LastKnownElr: N/A
- --describe : 토픽 목록을 조회하는 명령어 명시
- PartitionCount : 파티션의 개수
- ReplicationFactor : 복제 파티션의 개수
- Configs : 기타 토픽을 구성하는 옵션값 표시
- Leader : 현재 해당 파티션의 리더 브로커
- Replicas : 해당 파티션의 데이터를 복제하고 있는 모든 브로커
- Isr(In-Sync Replicas) : 리더와 동기화 상태를 유지하고 있는 브로커
1-4. 토픽 옵션 수정
토픽 옵션을 변경하기 위해서는 kafka-topics.sh 또는 kafak-configs.sh를 사용합니다.
토픽에 대한 정보를 관리하는 일부 로직이 다른 명령어로 넘어갔기 때문에 명령어가 파편화되었습니다.
1-4-1. 파티션 개수 변경
파티션 개수 변경을 위해서는 kafka-topics.sh를 사용해야 합니다.
$ /bin/kafka-topics.sh --bootstrap-server {브로커IP}:{브로커Port} \
--topic {토픽명} \
--alter \
--partitions 4
- --alter, partitions : 파티션의 개수 설정(파티션은 줄일 수 없기 때문에 반드시 늘려야 하는 상황에만 사용)
1-4-2. 리텐션 기간 변경
리텐션 기간을 변경하기 위해서는 kafka-configs.sh를 사용해야 합니다.
$ /bin/kafka-configs.sh --bootstrap-server {브로커IP}:{브로커Port} \
--entity-type topics \
--entity-name {토픽명} \
--alter --add-config retention.ms=86400000
- --alter, --add-config : 리텐션 기간 설정 (이미 설정된 옵션값이 있다면 변경, 없다면 추가)
2. kafka-console-producer.sh
kafka-console-producer.sh를 통해 토픽에 데이터를 넣을 수 있습니다.
주의해야 할 점은 kafka-console-producer.sh로 전송되는 레코드 값은 UTF-8을 기반으로 Byte로 변환되고 ByteArraySerializer로만 직열화 되어 String이 아닌 타입으로는 직렬화하여 전송할 수 없습니다.
다른 타입으로 직렬화하여 데이터를 전송하고 싶다면 카프카 프로듀서 애플리케이션을 직접 개발해야 합니다.
토픽에 넣는 데이터는 레코드(Record)라고 부르며 메시지 키(key)와 메시지 값(value)으로 이루어져 있습니다.
# key를 설정하지 않은 경우
$ /bin/kafka-console-producer.sh --bootstrap-server {브로커IP}:{브로커Port} --topic {토픽명}
>beekei
>kafka
>test
>0
>1
>2
>3
# key를 설정한 경우
$ /bin/kafka-console-producer.sh --bootstrap-server {브로커IP}:{브로커Port} --topic {토픽명} \
--property "parse.key=true" \
--property "key.separator=:"
>key1:hasKeyValue1
>key2:hasKeyValue2
>key3:hasKeyValue3
- parse.key=true : 레코드를 전송할 때 키를 추가
- key.separator : 키를 구분하는 구분자 설정(default: Tab delimiter(\t))
키 값을 설정하지 않은 경우 레코드 배치 단위(레코드 전송 묶음)로 라운드 로빈으로 파티션에 전송됩니다.
키 값을 설정한 경우 키의 해시값을 작성하여 존재하는 파티션 중 한 개에 할당됩니다. 이로 인해 키가 동일한 경우에는 파티셔너(Partitioner)에 의해 동일한 파티션으로 전송됩니다.
하지만 파티션을 추가한 경우에는 동일한 파티션으로 전송된다는 보장은 없으니 만약 파티션을 추가했다면 커스텀 파티셔너(Partitioner)를 구현해서 운영해야 합니다.
3. kafka-console-consumer.sh
kafka-console-consumer.sh를 통해 토픽에 저장된 데이터를 확인할 수 있습니다.
# 키를 출력하지 않고 조회
$ /bin/kafka-console-consumer.sh --bootstrap-server {브로커IP}:{브로커Port} --topic {토픽명}
hasKeyValue3
hasKeyValue1
hasKeyValue2
beekei
kafka
test
0
1
2
3
# 키를 출력하고 조회
$ /bin/kafka-console-consumer.sh --bootstrap-server {브로커IP}:{브로커Port} --topic {토픽명} \
> --property print.key=true \
> --property key.separator=":" \
> --group consumer-group-1 \
> --from-beginning
null:beekei
null:kafka
null:test
null:0
null:1
null:2
null:3
key3:hasKeyValue3
key1:hasKeyValue1
key2:hasKeyValue2
- --from-beginning : 가장 처음 데이터부터 출력 설정
- --property : 키와 관련된 옵션 설정
- print.key=true 선언으로 키와 함께 출력
- key.separator 선언으로 키를 구분하는 구분자 설정 - default: Tab delimiter(\t)
- --group : 컨슈머 그룹(Consumer Group) 생성
(컨슈머 그룹은 따로 생성하는 명령을 날리지 않고 컨슈머를 동작할 때 컨슈머 그룹을 지정하면 자동으로 생성됩니다.)
컨슈머 그룹은 1개 이상의 컨슈머로 이루어져 있는데,
해당 그룹이 토픽의 메시지를 어디까지 가져갔는지 레코드의 오프셋 번호를 브로커에 커밋합니다.(__consumer_offsets 토픽에 저장)
이로써 그룹 내 컨슈머들은 같은 메세지를 중복으로 가져갈 수 없게 됩니다.
주의해야 할 점은 전송했던 데이터의 순서가 출력되는 순서와 다르다는 점입니다.
kafka-console-consumer.sh 명령어를 통해 토픽의 데이터를 가져가게 되면 토픽의 모든 파티션으로부터 동일한 중요도로 데이터를 가져가기 때문에 프로듀서가 토픽에 넣은 데이터의 순서와 컨슈머가 토픽에서 가져간 데이터의 순서가 달라지게 됩니다.
만약 데이터의 순서를 보장해야 한다면 파티션을 1개로 구성된 토픽을 만들어 사용해야 합니다.
4. kafka-consumer-groups.sh
kafka-consumer-groups.sh를 통해 생성된 컨슈머 그룹의 리스트와 상세 내용을 확인할 수 있습니다.
4-1. 컨슈머 그룹 리스트 조회
$ /bin/kafka-consumer-groups.sh --bootstrap-server {브로커IP}:{브로커Port} --list
- --list 옵션으로 컨슈머 그룹의 목록을 조회하는 명령어 명시
4-2. 컨슈머 그룹 상세 정보 조회
$ /bin/kafka-consumer-groups.sh --bootstrap-server {브로커IP}:{브로커Port} --group {그룹명} --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
consumer-group-1 topic-test 0 7 7 0 - - -
consumer-group-1 topic-test 1 1 1 0 - - -
consumer-group-1 topic-test 2 2 12 10 - - -
해당 그룹이 마지막으로 커밋한 토픽과 파티션의 상세 정보를 확인할 수 있습니다.
- --group : 조회할 그룹 명시
- --describe : 컨슈머 그룹의 상세 내용을 조회하는 명령어 명시
- TOPIC, PARTITION : 마지막으로 커밋한 토픽과 파티션
- CURRENT-OFFSET : 해당 그룹이 가져간 토픽의 파티션에 가장 최신 오프셋
- LOG-END-OFFSET : 해당 그룹의 컨슈머가 최종으로 커밋한 오프셋
- LAG : 해당 그룹이 토픽의 파티션에 있는 데이터를 가져가는데 발생한 지연 지표
(LOG-END-OFFSET과 CURRENT-OFFSET 차이) - CONSUMER-ID : 컨슈머의 토픽(파티션) 할당을 위해 사용하는 ID
- HOST : 컨슈머가 동작하는 host명
- CLIENT-ID : 컨슈머에 할당된 ID
카프카를 운영할 때 컨슈머 그룹이 중복되지 않는지, 랙이 얼마인지 파악하거나 접근 중이 컨슈머의 호스트, IP를 확인하기에 유용합니다.
5. kafka-verifiable-producer, consumer.sh
kafka-verifiable로 시작하는 2개의 스크립트를 사용하려면 메시지를 코드 없이 주고받을 수 있습니다.
카프카 클러스터 설치 후 간단한 네트워크 통신 테스트를 진행할 때 유용합니다.
$ /bin/kafka-verifiable-producer.sh --bootstrap-server {브로커IP}:{브로커Port} \
--max-messages 10 \
--topic {토픽명}
{"timestamp":1737098564036,"name":"startup_complete"}
{"timestamp":1737098564144,"name":"producer_send_success","key":null,"value":"0","offset":12,"topic":"topic-test","partition":2}
{"timestamp":1737098564146,"name":"producer_send_success","key":null,"value":"1","offset":13,"topic":"topic-test","partition":2}
{"timestamp":1737098564146,"name":"producer_send_success","key":null,"value":"2","offset":14,"topic":"topic-test","partition":2}
{"timestamp":1737098564147,"name":"producer_send_success","key":null,"value":"3","offset":15,"topic":"topic-test","partition":2}
{"timestamp":1737098564147,"name":"producer_send_success","key":null,"value":"4","offset":16,"topic":"topic-test","partition":2}
{"timestamp":1737098564147,"name":"producer_send_success","key":null,"value":"5","offset":17,"topic":"topic-test","partition":2}
{"timestamp":1737098564147,"name":"producer_send_success","key":null,"value":"6","offset":18,"topic":"topic-test","partition":2}
{"timestamp":1737098564147,"name":"producer_send_success","key":null,"value":"7","offset":19,"topic":"topic-test","partition":2}
{"timestamp":1737098564147,"name":"producer_send_success","key":null,"value":"8","offset":20,"topic":"topic-test","partition":2}
{"timestamp":1737098564147,"name":"producer_send_success","key":null,"value":"9","offset":21,"topic":"topic-test","partition":2}
{"timestamp":1737098564155,"name":"shutdown_complete"}
{"timestamp":1737098564156,"name":"tool_data","sent":10,"acked":10,"target_throughput":-1,"avg_throughput":82.64462809917356}
데이터가 모두 전송된 이후 통계값을 확인할 수 있습니다.
- --max-messages : 전송하는 데이터 개수 설정(-1로 설정할 경우 kafka-verifiable-producer.sh가 종료될 때까지 계속 전송)
- --topic : 데이터를 받을 토픽
전송한 데이터는 kafka-verifiable-consumer.sh로 확인할 수 있습니다.
$ /bin/kafka-verifiable-consumer.sh --bootstrap-server {브로커IP}:{브로커Port} \
--topic {토픽명} \
--group-id {그룹ID}
{"timestamp":1737099109207,"name":"startup_complete"}
{"timestamp":1737099112388,"name":"partitions_assigned","partitions":[{"topic":"verifiable-test","partition":0},{"topic":"verifiable-test","partition":1},{"topic":"verifiable-test","partition":2}]}
{"timestamp":1737099112422,"name":"records_consumed","count":10,"partitions":[{"topic":"verifiable-test","partition":1,"count":10,"minOffset":0,"maxOffset":9}]}
{"timestamp":1737099112428,"name":"offsets_committed","offsets":[{"topic":"verifiable-test","partition":1,"offset":10}],"success":true}
{"timestamp":1737099126231,"name":"partitions_revoked","partitions":[{"topic":"verifiable-test","partition":0},{"topic":"verifiable-test","partition":1},{"topic":"verifiable-test","partition":2}]}
{"timestamp":1737099126593,"name":"shutdown_complete"}
- --group-id : 컨슈머 그룹을 지정
6. kafka-delete-records.sh
kafka-delete-records.sh를 통해 토픽의 데이터를 지울 수 있습니다.
토픽의 데이터 중 가장 오래된 데이터(가장 낮은 숫자의 오프셋)부터 특정 시점까지 삭제할 수 있습니다.
삭제하고자 하는 데이터의 대한 토픽, 파티션, 오프셋 정보를 파일로 저장해서 사용해야 합니다.
$ vim delete-records.json # 리코드 삭제 파일 생성
{"partitions": [{"topic": "topic-test", "partition": 0, "offset": 5}], "version": 1}
$ /bin/kafka-delete-records.sh --bootstrap-server {브로커IP}:{브로커Port} \
--offset-json-file delete-records.json
Executing records delete operation
Records delete operation completed:
partition: topic-test-0 low_watermark: 5
- --offset-json-file : 읽어드릴 파일 설정
토픽의 특정 레코드 하나만 삭제되는 것이 아니라, 파티션에 존재하는 가장 오래된 오프셋부터 지정한 오프셋까지 삭제되는 것을 주의해야 합니다. 카프카에서는 토픽의 파티션에 저장된 특정 데이터만 삭제하는 것은 불가능합니다.
다음 글
카프카 클라이언트(Kafka Client) - 프로듀서(Producer) API 설명 및 예제
카프카 클라이언트(Kafka Client)란?카프카 클러스터(Kafka Cluster)에 명령을 내리거나 데이터를 송수신하기 위해 다양한 API를 제공합니다.카프카 클라이언트는 라이브러리이기 때문에 자체 라이프사
devbksheen.tistory.com
'Apache Kafka' 카테고리의 다른 글
카프카 클라이언트(Kafka Client) - 프로듀서(Producer) API (0) | 2025.01.20 |
---|---|
카프카(Kafka)의 기본 개념 - 토픽(Topic), 파티션(Partition), 레코드(Record) (0) | 2025.01.17 |
카프카(Kafka)의 기본 개념 - 브로커(Broker) (0) | 2025.01.16 |
Spring Boot에 Kafka 연동 (0) | 2021.09.24 |
Local에서 Kafka 명령어 날리기 (0) | 2021.09.24 |