빅데이터/카프카

카프카 - (5) 컨슈머

_금융덕후_ 2022. 2. 7. 18:00
728x90
반응형

 

 

 

카프카 프로듀서

카프카 브로커를 서버, 파티션을 저장소라고 가정하면,
카프카 컨슈머는 카프카로부터 데이터를 제공받아 소비하는 클라이언트에 해당한다.
 

메시지 소비

지난 포스팅에서 생성한 토픽 kafka-test에 메시지를 소비해보자.
메시지를 소비하는 것 역시 bin 디렉토리의 스크립트 중 하나를 활용한다.
--from-beginning 옵션을 주면 처리 토픽에서 보관중인 첫 오프셋의 데이터부터 모두 받아오게 된다.
bin/kafka-console-consumer.sh --bootstrap-server johnny:9092 \
--topic kafka-test \
--from-beginning
value1
value2
value1
value2
 
위의 예제에서는 value값만 가져온다.
따라서 "print.key"와 "key.separator" 프로퍼티를 추가해 다시 가져와 본다.
bin/kafka-console-consumer.sh --bootstrap-server johnny:9092 \
--topic kafka-test \
--from-beginning \
--property print.key=true \
--property key.separator=";"
key1;value1
key2;value2
application-key1;value1
application-key2;value2
 

카프카 컨슈머 API

프로젝트와 설정들은 지난 포스팅의 설정 그대로 사용한다.
동일한 패키지 경로(com.johnny.study)에 생성하고, TestConsumer.java 파일을 생성한다.
프로듀서때와 동일하게 서버와 토픽 이름을 static 변수로 선언해주고,
 
컨슈머 그룹
이번에는 컨 슈머에서만 사용하는 poll timeout 및 컨슈머 그룹 이름을 설정해 준다.
컨슈머 그룹은 여러 컨슈머들을 목적에 따라 분류하기 위한 이름으로, 카프카는 컨슈머 그룹을 사용해 레코드 오프셋의 기준을 설정해 데이터를 처리하게 된다.
또한 성능 모니터링을 할 때도 컨슈머 그룹별로 모니터링을 하기 용이하다.
 
Poll Timeout
카프카 컨슈머는 poll() 메서드를 사용해 토픽으로부터 데이터를 가져온다.
timeout은 poll 메서드가 특정 시간동안 데이터를 처리하지 못할 때 포기를 하기 위해 설정해 준다.
@Slf4j
public class TestConsumer {
	private final static String TOPIC_NAME = "kafka-test";
	private final static String BOOTSTRAP_SERVERS = "johnny:9092";
	private final static String CONSUMER_GROUP_ID = "test-consumer-group";
	private final static int CONSUMER_POLL_TIMEOUT_SEC = 1;
	...

}
 
main 함수를 만든 뒤, 컨슈머 설정을 주입시키고 카프카 컨슈머 객체를 생성한다.
프로듀서 때와는 다르게 serializer가 아닌 deserializer가 사용되었다.
컨슈머는 여러 토픽을 소비할 수 있기 때문에 토픽이름의 리스트를 주입받는다.
아래의 경우에는 하나의 토픽만 받기 때문에 singletonList가 사용되었다.
@Slf4j
public class TestConsumer {
	...

	public static void main(String[] args) {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);


		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Collections.singletonList(TOPIC_NAME));

	...
}
 
보통 컨슈머는 프로듀서가 생성하는 메시지를 지속적으로 소비하기 위해 사용된다.
아래 코드에서는 지속적으로 토픽에서 데이터를 가져오기 위해 while문을 사용했다.
@Slf4j
public class TestConsumer {

		...

		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(CONSUMER_POLL_TIMEOUT_SEC));
			records.forEach(record -> log.info("{}", record));
		}

	}
}
 
앞선 프로듀서의 코드예제에서 생성한 레코드를 위 컨슈머에서 받으면 아래와 같은 로그를 출력하는 것을 확인할 수 있다.
[main] INFO com.johnny.study.TestConsumer - ConsumerRecord(topic = kafka-test, partition = 1, leaderEpoch = 0, offset = 1, CreateTime = 1628503013011, serialized key size = 16, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = application-key1, value = value1)
[main] INFO com.johnny.study.TestConsumer - ConsumerRecord(topic = kafka-test, partition = 2, leaderEpoch = 0, offset = 1, CreateTime = 1628503013021, serialized key size = 16, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = application-key2, value = value2)
 
이로서 지난 포스팅에서 프로듀싱 한 레코드를 소비하는 컨슈머를 만들었다.
실행한 프로그램은 무한히 돌기 때문에 테스트가 끝나면 멈춰주어야 한다.
728x90
반응형

'빅데이터 > 카프카' 카테고리의 다른 글

카프카 - (4) 프로듀서  (0) 2022.01.31
카프카 - (3) 토픽  (0) 2022.01.24
카프카 - (2) 설치  (0) 2022.01.17
카프카 - (1) 개념  (0) 2022.01.10