빅데이터/카프카

카프카 - (4) 프로듀서

_금융덕후_ 2022. 1. 31. 18:00
728x90
반응형

 

카프카 프로듀서

카프카 브로커를 서버, 파티션을 저장소라고 가정하면,
카프카 프로듀서는 카프카에게 데이터를 제공하는 클라이언트에 해당한다.

 

메시지 전송

지난 포스팅에서 생성한 토픽 kafka-test에 메시지를 전송해보자.
메시지를 전송하는 것 역시 bin 디렉토리의 스크립트 중 하나를 활용한다.
bin/kafka-console-producer.sh --bootstrap-server johnny:9092 -- topic kafka-test --property "parse.key=true" --property "key.seperator=";"
> key1;value1
> key2;value2
 
parse.key 프로퍼티는 전송할 메시지에 키를 추가하는 옵션이다.
key.seperator 프로퍼티는 전송할 메시지에서 key와 value를 구분하는 글자이고, 기본값은 탭(\t)이다.
 
메시지 전송의 결과는 다음 포스팅에서 확인해 보도록 하겠다.

 

 

카프카 프로듀서 API

Intellij에서 자바 프로젝트를 생성해 카프카 프로듀서를 구현해 본다.
먼저 Gradle 기반의 새 프로젝트를 생성한다.
 
그리고 프로젝트의 이름을 설정한 뒤 생성해준다. (이름은 원하는것으로 설정)

 

Gradle 파일에 다음과 같은 dependency들을 추가한다.
dependencies {
	compile 'org.apache.kafka:kafka-clients:2.5.0'
	compile 'org.slf4j:slf4j-simple:1.7.30'
}
 
src>main>java 디렉토리에 패키지 경로(com.johnny.study)를 생성하고, TestProducer.java 파일을 생성한다.
그리고 먼저 서버와 토픽 이름을 static 변수로 선언해준다.
*Slf4j 는 로깅을 위한 라이브러리를 추가한 애너테이션 이다.
@Slf4j
public class TestProducer {
	private final static String TOPIC_NAME = "kafka-test";
	private final static String BOOTSTRAP_SERVERS = "johnny:9092";

	...
 
main 함수를 만든 뒤, 프로듀서 설정을 주입시키고 카프카 프로듀서 객체를 생성한다.
설정들은 각, 서버 주소, key serializer, value serializer 설정이다.
public static void main(String[] args) {
	Map<String, Object> props = new HashMap<>();
	props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
	props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
	props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

	KafkaProducer<String, String> producer = new KafkaProducer<>(props);

	...
}
 
이제 레코드를 생성해 토픽으로 전송한다.
위 커맨드라인에서 전송한 레코드와의 구분을 위해 "application-"를 붙여주었다.
public static void main(String[] args) {
	...

	ProducerRecord<String, String> record1 = new ProducerRecord<>(TOPIC_NAME, "application-key1", "value1");
	ProducerRecord<String, String> record2 = new ProducerRecord<>(TOPIC_NAME, "application-key2", "value2");

	producer.send(record1);
	log.info("{} sent", record1);

	producer.send(record2);
	log.info("{} sent", record2);

	producer.flush();
	producer.close();
}
 
작성이 완료되면 main함수를 실행한다.
실행의 결과는 다음 포스팅에서 확인하도록 하겠다.
728x90
반응형

'빅데이터 > 카프카' 카테고리의 다른 글

카프카 - (5) 컨슈머  (0) 2022.02.07
카프카 - (3) 토픽  (0) 2022.01.24
카프카 - (2) 설치  (0) 2022.01.17
카프카 - (1) 개념  (0) 2022.01.10